Implementing a simple ForEachAsync

Jon Skeet recently asked me how I might go about implementing the following “asynchronous ForEach” behavior:

  • For each element in an enumerable, run a function that returns a Task<TResult> to represent the completion of processing that element. All of these functions may run asynchronously and concurrently.
  • As each task completes, run a second processing action over the results.  All of these actions must be run sequentially, but order doesn’t matter.

Given what we now know about SemaphoreSlim from my previous post, here’s one way to achieve this:

public static Task ForEachAsync<TSource, TResult>(
    this IEnumerable<TSource> source,
    Func<TSource, Task<TResult>> taskSelector, Action<TSource, TResult> resultProcessor)
{
    var oneAtATime = new SemaphoreSlim(initialCount:1, maxCount:1);
    return Task.WhenAll(
        from item in source
        select ProcessAsync(item, taskSelector, resultProcessor, oneAtATime));
}

private static async Task ProcessAsync<TSource, TResult>(
    TSource item,
    Func<TSource, Task<TResult>> taskSelector, Action<TSource, TResult> resultProcessor,
    SemaphoreSlim oneAtATime)
{
    TResult result = await taskSelector(item);
    await oneAtATime.WaitAsync();
    try { resultProcessor(item, result); }
    finally { oneAtATime.Release(); }
}

We instantiate a semaphore with an initial count of 1 and use it to throttle the follow-up actions so that only one runs at a time.  We then call the ProcessAsync method for each element, passing in the element, the semaphore, and the delegates to invoke.  Inside ProcessAsync, we first run the function and await its completion.  Once it’s complete, we acquire the semaphore to ensure that the result-processing action runs sequentially with respect to all of the other result-processing actions.
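To see the two phases in action, here is a minimal, self-contained sketch. It repeats the ForEachAsync code from above so that it compiles on its own; SquareAsync, RunAsync, and the ForEachAsyncDemo class are hypothetical names made up for this illustration, and Task.Delay merely stands in for real asynchronous work. It assumes a compiler that supports tuples and an async Main (C# 7.1 or later).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncDemo
{
    // Same code as in the post, repeated so the sketch stands alone.
    public static Task ForEachAsync<TSource, TResult>(
        this IEnumerable<TSource> source,
        Func<TSource, Task<TResult>> taskSelector, Action<TSource, TResult> resultProcessor)
    {
        var oneAtATime = new SemaphoreSlim(initialCount: 1, maxCount: 1);
        return Task.WhenAll(
            from item in source
            select ProcessAsync(item, taskSelector, resultProcessor, oneAtATime));
    }

    private static async Task ProcessAsync<TSource, TResult>(
        TSource item,
        Func<TSource, Task<TResult>> taskSelector, Action<TSource, TResult> resultProcessor,
        SemaphoreSlim oneAtATime)
    {
        TResult result = await taskSelector(item);
        await oneAtATime.WaitAsync();
        try { resultProcessor(item, result); }
        finally { oneAtATime.Release(); }
    }

    // Hypothetical stand-in for real asynchronous work: smaller inputs take longer,
    // so the tasks complete in a different order than they were started.
    private static async Task<int> SquareAsync(int n)
    {
        await Task.Delay(10 * (5 - n)).ConfigureAwait(false);
        return n * n;
    }

    // Runs the demo and reports the sum of the results together with the largest
    // number of result processors that were ever observed running at the same time.
    public static async Task<(int Sum, int MaxConcurrent)> RunAsync()
    {
        int sum = 0, active = 0, maxActive = 0;
        await ForEachAsync(Enumerable.Range(1, 4), n => SquareAsync(n), (n, square) =>
        {
            int now = Interlocked.Increment(ref active);
            maxActive = Math.Max(maxActive, now);
            sum += square;   // safe without a lock: the semaphore serializes these callbacks
            Interlocked.Decrement(ref active);
        });
        return (sum, maxActive);
    }

    public static async Task Main()
    {
        var (sum, maxConcurrent) = await RunAsync();
        // prints "sum = 30, max concurrent processors = 1"
        Console.WriteLine($"sum = {sum}, max concurrent processors = {maxConcurrent}");
    }
}
```

The four SquareAsync calls overlap in time, but because every callback runs while holding the semaphore, the unsynchronized `sum += square` is never executed by two callbacks at once.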

Thanks for the question/suggestion, Jon!

  • An example about how to use it? thanks!

  • Also, you might want to investigate using a Partitioner. I implemented an async ForEach myself, and using one improved performance greatly for most workloads.

  • This seems useful as a simple projection from the elements of a hot sequence into concurrent Tasks, but it's not composable; e.g., via LINQ.

    Jon's requirements actually sound like a job for IObservable<T> to me.  But I wonder if what Jon was really interested in are cold sequences; i.e., he wants to model computations between yields of an iterator block using Task.  Although perhaps not, because in that case the Tasks cannot be executed concurrently, which is why IObservable<T> is perhaps more appropriate.

    Regardless, as I'm sure you're aware, Ix Experimental (from the Rx team) provides a ToAsyncEnumerable extension method that converts IEnumerable<T> into IAsyncEnumerable<T>, which seems to basically do what your extension is doing.  I've recently blogged about "Async Iterators" (davesexton.com/.../async-iterators.aspx) and I think my conclusion to use IObservable<T> may be more appropriate (again, assuming that my interpretation of Jon's requirements is correct).

    Do you at least agree that IAsyncEnumerable<T> in conjunction with Task.WhenAll is a reasonable solution to Jon's query for hot sequences?

  • My last question was wrong.  I don't think IAsyncEnumerable<T> is a reasonable solution for a hot sequence.  I meant for a cold sequence, where it's the computation between values that must be asynchronous (assuming there's no need to make the iterator block async as well, as shown in my blog post.)

  • Steph, here's a simple example:

       var client = new HttpClient();
       var results = new Dictionary<string,string>();
       await ForEachAsync(urls, url => client.GetStringAsync(url), (url,contents) => results.Add(url, contents));

    Cory, are you talking about implementing a serialized ForEachAsync like I've done here, or a ParallelForEachAsync?  I can definitely see using a Partitioner for the latter (and I'll add that to the list of subsequent blog posts to write :), but with the former there's only one thread enumerating the IEnumerable<T>, so I'm not sure how a Partitioner would be helpful here.

    Dave, thanks.  I wasn't trying to make a comparison between async/await and Rx, but rather show how these kinds of simple processing functions can be built up.  Yes, if you want to use LINQ-based processing, Rx can be a very useful addition.

  • @Stephen Whoops, yes, you're right. I wrote a ParallelAsync.ForEach. It's unfortunate that Parallel and Task can't yet integrate (especially when Parallel uses Task underneath the hood!) -- I hope this functionality is looked at for future revisions.

  • In which situations should we use Parallel.ForEach, and in which should we use AsyncForEach? I don't understand.

  • Ozy, this was just an example where I was asked how to implement something to meet a particular desired behavior, and I showed how it could be done using .NET 4.5 Beta.  This was not meant to be contrasted with Parallel.ForEach.  If the described semantics are what you need, then you could use the sample code I provided.  In contrast, Parallel.ForEach is about taking an IEnumerable<T> with data, and using multiple threads to process all of those elements in parallel... it's not about asynchrony, it's about using multiple cores to get work done faster.  Parallel.ForEach is actually a synchronous call.  I hope that helps.

  • How would one add a SemaphoreSlim.Dispose call to that?  I can think of two ways but am not sure that they would work.

    (a) After Task.WhenAll, call Task.ContinueWith(Action<Task, Object>, Object) to add an action that calls Dispose on the Object.

    (b) Make ForEachAsync an async function too.  Await the result of Task.WhenAll and wrap a using-statement around that.

  • Kalle, both of your solutions would work and are fine.  

    For (a), for example you could change:

       return Task.WhenAll(...);

    to instead be:

       var t = Task.WhenAll(...);
       t.ContinueWith(delegate { oneAtATime.Dispose(); }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
       return t;

    For (b), you could mark the method as async and change:

       var oneAtATime = new SemaphoreSlim(...);
       return Task.WhenAll(...);

    to instead be:

       using(var oneAtATime = new SemaphoreSlim(...))
           await Task.WhenAll(...);

    That said, you're probably better off just not disposing of it; I purposely didn't Dispose of it in my example.
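    For reference, option (b) written out in full might look like the sketch below. The class name DisposingForEach is a hypothetical name chosen for this illustration; the rest follows the code from the post. It relies on Task.WhenAll completing only after every task has finished, including faulted ones, so the semaphore is not disposed while a ProcessAsync call is still using it. One caveat that the sketch does not handle: if enumerating the source itself throws partway through, tasks that were already started could still be running when the using block exits.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class DisposingForEach
{
    // Variant (b): the method is async, so the using block can span the await
    // and dispose the semaphore once all of the per-item tasks have completed.
    public static async Task ForEachAsync<TSource, TResult>(
        this IEnumerable<TSource> source,
        Func<TSource, Task<TResult>> taskSelector, Action<TSource, TResult> resultProcessor)
    {
        using (var oneAtATime = new SemaphoreSlim(initialCount: 1, maxCount: 1))
        {
            await Task.WhenAll(
                from item in source
                select ProcessAsync(item, taskSelector, resultProcessor, oneAtATime));
        }
    }

    // Unchanged from the post.
    private static async Task ProcessAsync<TSource, TResult>(
        TSource item,
        Func<TSource, Task<TResult>> taskSelector, Action<TSource, TResult> resultProcessor,
        SemaphoreSlim oneAtATime)
    {
        TResult result = await taskSelector(item);
        await oneAtATime.WaitAsync();
        try { resultProcessor(item, result); }
        finally { oneAtATime.Release(); }
    }
}
```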

  • Wouldn't this be a good fit for TPL DataFlow? Something like:

    public static async Task ForEachAsync<TSource, TResult>(
       this IEnumerable<TSource> source,
       Func<TSource, Task<TResult>> taskSelector, Action<TResult> resultProcessor)
    {
       var taskSelectorBlock = new TransformBlock<TSource, TResult>(
           taskSelector,
           new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
       var resultProcessorBlock = new ActionBlock<TResult>(resultProcessor);
       taskSelectorBlock.LinkTo(resultProcessorBlock, new DataflowLinkOptions { PropagateCompletion = true });
       foreach (var item in source)
           await taskSelectorBlock.SendAsync(item);
       taskSelectorBlock.Complete();
       await resultProcessorBlock.Completion;
    }

    It's about the same amount of code and I think it would be easier to extend or modify (e.g. if you wanted to limit the parallelism of the first step).
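    For comparison, limiting the parallelism of the first step is also possible in the semaphore-based version from the post, at the cost of a second semaphore. The following is only a sketch of that alternative, not the Dataflow approach described above: the ThrottledForEach class, the maxConcurrency parameter, and the throttle semaphore are hypothetical additions that do not appear in the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledForEach
{
    // maxConcurrency (hypothetical parameter) caps how many taskSelector calls
    // may be in flight at once; result processing still runs one at a time.
    public static Task ForEachAsync<TSource, TResult>(
        this IEnumerable<TSource> source, int maxConcurrency,
        Func<TSource, Task<TResult>> taskSelector, Action<TSource, TResult> resultProcessor)
    {
        var throttle = new SemaphoreSlim(maxConcurrency, maxConcurrency);
        var oneAtATime = new SemaphoreSlim(initialCount: 1, maxCount: 1);
        return Task.WhenAll(
            from item in source
            select ProcessAsync(item, taskSelector, resultProcessor, throttle, oneAtATime));
    }

    private static async Task ProcessAsync<TSource, TResult>(
        TSource item,
        Func<TSource, Task<TResult>> taskSelector, Action<TSource, TResult> resultProcessor,
        SemaphoreSlim throttle, SemaphoreSlim oneAtATime)
    {
        // First step: wait for a free slot before starting the asynchronous work.
        TResult result;
        await throttle.WaitAsync();
        try { result = await taskSelector(item); }
        finally { throttle.Release(); }

        // Second step: unchanged from the post, serialized by the second semaphore.
        await oneAtATime.WaitAsync();
        try { resultProcessor(item, result); }
        finally { oneAtATime.Release(); }
    }
}
```

    With this shape, a call such as `urls.ForEachAsync(4, url => client.GetStringAsync(url), ...)` would start at most four downloads at a time, where `urls` and `client` are the names used in the earlier example.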

  • svick, definitely, thanks for pointing that out!

  • Hi all,

    Just wanted to say thanks for this post. I'm sharing one of the results of using these snippets:

    A small helper I needed for quickly downloading images in parallel and asynchronously:

    stackoverflow.com/.../asynchronously-and-parallelly-downloading-files

    Thanks !
