Implementing a simple ForEachAsync, part 2


After my previous post, I received several emails and comments from folks asking why I chose to implement ForEachAsync the way I did.  My goal with that post wasn’t to prescribe a particular approach to iteration, but rather to answer a question I’d received… obviously, however, I didn’t provide enough background. Let me take a step back then so as to put the post in context.

Iteration is a common development task, and there are many different variations on how iteration might be implemented.  For example, a basic synchronous ForEach might be implemented as follows:

public static void ForEach<T>(this IEnumerable<T> source, Action<T> body)
{
    foreach(var item in source) body(item);
}

That, however, encapsulates just one particular semantic, that of looping through the source, executing the action one element at a time, and stopping if an exception is thrown.  Here’s another implementation, this time continuing the processing even if an exception is thrown, propagating any exceptions only once we’re done with the whole loop:

public static void ForEach<T>(this IEnumerable<T> source, Action<T> body)
{
    List<Exception> exceptions = null;
    foreach(var item in source)
    {
        try { body(item); }
        catch(Exception exc)
        {
            if (exceptions == null) exceptions = new List<Exception>();
            exceptions.Add(exc);
        }
    }
    if (exceptions != null)
        throw new AggregateException(exceptions);
}
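
As a rough sketch of how a caller might observe the continue-on-error semantic, the snippet below repeats that second ForEach so it compiles on its own and adds a made-up helper, CountFailures, whose body throws for even items. The helper name and the failing condition are assumptions for illustration only.

```csharp
using System;
using System.Collections.Generic;

public static class ContinueOnErrorDemo
{
    // Same continue-on-error ForEach as above, repeated so this sketch is self-contained.
    public static void ForEach<T>(this IEnumerable<T> source, Action<T> body)
    {
        List<Exception> exceptions = null;
        foreach (var item in source)
        {
            try { body(item); }
            catch (Exception exc)
            {
                if (exceptions == null) exceptions = new List<Exception>();
                exceptions.Add(exc);
            }
        }
        if (exceptions != null)
            throw new AggregateException(exceptions);
    }

    // Hypothetical helper: the body throws for even items. Returns how many exceptions
    // were aggregated and reports (via 'visited') how many items the loop reached.
    public static int CountFailures(IEnumerable<int> source, out int visited)
    {
        int seen = 0;
        int failures = 0;
        try
        {
            source.ForEach(item =>
            {
                seen++;
                if (item % 2 == 0)
                    throw new InvalidOperationException("even item: " + item);
            });
        }
        catch (AggregateException ae)
        {
            // One inner exception per failed body invocation.
            failures = ae.InnerExceptions.Count;
        }
        visited = seen;
        return failures;
    }
}
```

For the input 1 through 5, every item is visited and two exceptions are aggregated (for 2 and 4), whereas the first ForEach would have stopped at 2.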

These are both synchronous examples.  Once asynchrony is introduced, additional variations are possible.  We can of course create asynchronous versions that match the two examples just shown, e.g.

public static async Task ForEachAsync<T>(this IEnumerable<T> source, Func<T,Task> body)
{
    foreach(var item in source) await body(item);
}

and:

public static async Task ForEachAsync<T>(this IEnumerable<T> source, Func<T,Task> body)
{
    List<Exception> exceptions = null;
    foreach(var item in source)
    {
        try { await body(item); }
        catch(Exception exc)
        {
            if (exceptions == null) exceptions = new List<Exception>();
            exceptions.Add(exc);
        }
    }
    if (exceptions != null)  
        throw new AggregateException(exceptions);
}

respectively. But we can also go beyond this.  Once we’re able to launch work asynchronously, we can achieve concurrency and parallelism, invoking the body for each element and waiting on them all at the end, rather than waiting for each in turn, e.g.

public static Task ForEachAsync<T>(this IEnumerable<T> source, Func<T,Task> body)
{
    return Task.WhenAll(
        from item in source
        select body(item));
}

This serially invokes all of the body delegates, but it allows any continuations used in the bodies to run concurrently (depending on whether we’re in a serializing SynchronizationContext and whether the code in the body delegate is forcing continuations back to that context).  We could force more parallelism by wrapping each body invocation in a Task:

public static Task ForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> body)
{
    return Task.WhenAll(
        from item in source
        select Task.Run(() => body(item)));
}

This will schedule a Task to invoke the body for each item and will then asynchronously wait for all of the async invocations to complete.  Note that this also means that the code run by the body delegate won’t be forced back to the current SynchronizationContext, even if there is one, since the async invocations are occurring on ThreadPool threads where there is no SynchronizationContext set.
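
To make the "serially invokes" behavior of the plain Task.WhenAll version (the one without Task.Run) a bit more concrete, here is a small sketch. It assumes a body that records each item before its first await; RecordStartOrderAsync is a hypothetical helper, not something from the previous post. Because the synchronous prefix of each body runs on the calling thread while WhenAll enumerates the query, the recorded order should match the source order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllOrderingDemo
{
    // Same shape as the Task.WhenAll version above (no Task.Run), repeated for self-containment.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> body)
    {
        return Task.WhenAll(
            from item in source
            select body(item));
    }

    // Hypothetical helper: records the order in which bodies begin executing.
    public static async Task<List<int>> RecordStartOrderAsync(int count)
    {
        var started = new List<int>();
        await Enumerable.Range(0, count).ForEachAsync(async item =>
        {
            // Runs synchronously on the caller's thread, one item at a time,
            // so the unsynchronized List<int> is only touched by one thread.
            started.Add(item);

            // Only the code after this await may overlap with other bodies.
            await Task.Delay(10);
        });
        return started;
    }
}
```

With the Task.Run variant, the same unsynchronized started.Add call would no longer be safe, since the bodies would begin on ThreadPool threads in no particular order.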

We could further expand on this if we wanted to limit the number of operations that are able to run in parallel.  One way to achieve that is to partition the input data set into N partitions, where N is the desired maximum degree of parallelism, and schedule a separate task to begin the execution for each partition (this uses the Partitioner class from the System.Collections.Concurrent namespace):

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current);
        }));
}

This last example is similar in nature to Parallel.ForEach, with the primary difference being that Parallel.ForEach is a synchronous method and uses synchronous delegates.
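
As a sketch of what the dop parameter buys you, the following repeats the partitioned ForEachAsync and adds a hypothetical helper, MeasurePeakConcurrencyAsync, that tracks how many bodies are in flight at once. The helper and the 10 ms delay are assumptions made for illustration. Since each partition awaits its bodies one at a time, the observed peak should never exceed dop.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PartitionedForEachDemo
{
    // Same shape as the partitioned ForEachAsync above, repeated for self-containment.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }

    // Hypothetical helper: returns the highest number of bodies that were running at the same time.
    public static async Task<int> MeasurePeakConcurrencyAsync(int itemCount, int dop)
    {
        var gate = new object();
        int current = 0, peak = 0;

        await Enumerable.Range(0, itemCount).ForEachAsync(dop, async item =>
        {
            lock (gate)
            {
                current++;
                if (current > peak) peak = current;
            }

            await Task.Delay(10); // stand-in for real asynchronous work

            lock (gate) { current--; }
        });

        return peak;
    }
}
```

For example, MeasurePeakConcurrencyAsync(20, 4) should report a peak of at most 4, however the 20 items end up distributed across the partitions.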

The point is that there are many different semantics possible for iteration, and each will result in different design choices and implementations.  The ForEachAsync example from my previous post was just one more such variation, accounting for the behavior that I’d been asked about.  As should now hopefully be obvious from this post, it is in no way the only way to iterate asynchronously.

Thanks for all the interest.

  • Perhaps some folks sensed that something was missing from, and incompatible with, the original solution -- namely, the ability to break from the loop. This new kind of loop seems like it should go by a different name, given the substantially different semantics. Might I suggest "SomewhatParallelForEach"?

  • These new await and async keywords really wreck my brain. At one point I think I understand them and at another I realize that I don't.

    Do you know any good resources to learn those keywords and how to use them?

  • Dalibor, try http://msdn.com/async. There are a lot of resources there.  Good luck.

  • I run a method asynchronously using BeginInvoke and EndInvoke like this:

    void Main()
    {
        Random rnd = new Random();
        Func<int> work = ()=> {
            var delay = rnd.Next(2000);
            delay = 5000;
            Thread.Sleep(delay);
            return delay;
        };
        for (int i = 0; i < 100; i++)
        {
            work.DoAsync(j=>Console.WriteLine("hello:{0}",j));
        }
        Console.WriteLine ("non-block");
        Console.Read();
    }

    public static class Extensions
    {
        public static void DoAsync<TResult>(this Func<TResult> f, Action<TResult> callback)
        {
            f.BeginInvoke(x => callback(f.EndInvoke(x)), null);
        }

        public static void DoAsync<TInput, TResult>(this Func<TInput, TResult> f, TInput arg, Action<TResult> callback)
        {
            f.BeginInvoke(arg, x => callback(f.EndInvoke(x)), null);
        }
    }

    What is different in the async CTP?

  • "What is different?": Several things, but most importantly, your solution is callback-based. You have to turn your control flow inside out by providing a delegate that specifies what to do when the first operation completes (note that you're also not properly handling exceptions... if the async operation fails, that exception will propagate out of the EndInvoke call and likely crash the process).  In contrast, the new async support in C#/VB allows you to write your normal control flow, without using callbacks.  For more info, see http://msdn.com/async.
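
    As a rough illustration of that contrast (a sketch, not code from the thread), the same "run some work, then use the result" flow can be written with await and an ordinary try/catch. The class and method names here are made up, and Task.Run stands in for the delegate's BeginInvoke:

```csharp
using System;
using System.Threading.Tasks;

public static class AwaitInsteadOfCallbacks
{
    // Hypothetical helper: runs 'work' on the ThreadPool and formats the outcome.
    public static async Task<string> RunAsync(Func<int> work)
    {
        try
        {
            // Control flow reads top to bottom; no callback delegate is needed.
            int result = await Task.Run(work);
            return "hello:" + result;
        }
        catch (InvalidOperationException exc)
        {
            // A failure inside 'work' surfaces here, at the await,
            // instead of escaping from an EndInvoke call inside a callback.
            return "failed:" + exc.Message;
        }
    }
}
```

    Here a failing delegate produces a "failed:..." string rather than an unhandled exception on a callback thread.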

  • async/await is really good, but I feel like it is missing the final piece: an "async foreach" construct, maybe based on IObservable.

    Can you write a post about how that can be simply simulated?

  • Hi Flavien-

    A synchronous foreach is basically syntactic sugar for (ignoring disposability for the purposes of this discussion):

    while(e.MoveNext()) Body(e.Current);

    So, if you had an API which exposed a "Task<bool> MoveNextAsync()" and a "T Current;", you could do the same basic thing with await:

    while(await e.MoveNextAsync()) Body(e.Current);

    If you were using a BufferBlock<T> from System.Threading.Tasks.Dataflow.dll, for example, you could do that with:

    while(await buffer.OutputAvailableAsync()) Body(buffer.Receive());

    or a bit more efficiently as:

    while(await buffer.OutputAvailableAsync())
    {
       T current;
       while(buffer.TryReceive(out current)) Body(current);
    }
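
    The MoveNextAsync idea can be sketched without any extra libraries. Everything named below (ISimpleAsyncEnumerator<T>, DelayedRange, AsyncEnumerationSketch) is hypothetical and exists only to show the shape of the loop; the Task.Delay stands in for whatever asynchronous work produces the next element.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical interface exposing the two members described above.
public interface ISimpleAsyncEnumerator<T>
{
    Task<bool> MoveNextAsync();
    T Current { get; }
}

// Hypothetical source: yields 'count' consecutive integers, each after a short delay.
public sealed class DelayedRange : ISimpleAsyncEnumerator<int>
{
    private readonly int _end;
    private int _next;

    public DelayedRange(int start, int count)
    {
        _next = start;
        _end = start + count;
    }

    public int Current { get; private set; }

    public async Task<bool> MoveNextAsync()
    {
        await Task.Delay(1); // pretend the next element arrives asynchronously
        if (_next >= _end) return false;
        Current = _next++;
        return true;
    }
}

public static class AsyncEnumerationSketch
{
    // The "async foreach": the same while loop as the synchronous expansion, with an await added.
    public static async Task ForEachAsync<T>(ISimpleAsyncEnumerator<T> e, Action<T> body)
    {
        while (await e.MoveNextAsync()) body(e.Current);
    }

    // Usage example: sums whatever the enumerator produces.
    public static async Task<int> SumAsync(ISimpleAsyncEnumerator<int> e)
    {
        int sum = 0;
        await ForEachAsync(e, item => sum += item);
        return sum;
    }
}
```

    Summing new DelayedRange(1, 4) this way visits 1, 2, 3 and 4 in order, one element per completed MoveNextAsync call.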

  • I implemented the last ForEachAsync example above (the one that uses the partitioner) and I call ForEachAsync prefixed with an await, but execution in the calling method continues while the tasks are still being processed instead of pausing until all the tasks are complete (WhenAll). Am I doing something wrong? Below is a sample of my code...

    await new DocumentChunkHelper(document).GetChunkCollection().ForEachAsync<DocumentChunk>(
        DegreeOfParallelism.Low,
        chunk =>
        {
            return this._service.UpdateServiceTemplateWriteUp(
                new DocumentBuffer()
                {
                    Id = this.ServiceTemplate.ServiceTemplateId,
                    Buffer = chunk.Buffer,
                    BufferIndex = chunk.BufferIndex,
                    MaxBufferSize = chunk.MaxBufferSize,
                    TotalLength = chunk.TotalLength
                }
            );
        }
    );

    This UpdateServiceTemplateWriteUp method's return type is Task<DocumentChunk>

  • What does your ForEachAsync code look like... is it 100% exactly what's written above?  Or did you maybe replace the Task.Run with Task.Factory.StartNew or something like that?  It could also be that the task you're returning from UpdateServiceTemplateWriteUp is completing before the represented work is actually done.

  • Stephen,

    Can you explain why nothing is happening when I use the ForEachAsync:

    public static void TestAsyncP()
    {
        Enumerable.Range(0, 100).ToList()
            .ForEachAsync<int>(
            10,
            task =>
            {
                return new Task(async () => {
                    await ia(task);
                    //Console.WriteLine(ii);
                });
                //return task;
            });
        Console.WriteLine("ForEachAsync");
    }

    public static async Task<long> ia(int x)
    {
        long ii = 1;
        Parallel.ForEach(Partitioner.Create(1, x), range =>
        {
            for (var ij = range.Item1; ij <= range.Item2; ij++)
            {
                Console.WriteLine("{0} - {1} - {2} - {3}", x, ij, range.Item1, range.Item2);
            }
        });
        await Task.WhenAll(
            Enumerable.Range(1, x).ToList()
            .ConvertAll(y =>
                Task.Run(() =>
                {
                    System.Threading.Thread.Sleep(x);
                    ii *= y;
                })).ToArray());
        Console.WriteLine("ia: {0} - {1}", x, ii);
        return ii;
    }

  • @Lamb:  I've not run your code, but a few things that jump out at me:

    1. Task's ctor creates an unstarted Task.  It will never be executed (and thus never complete) unless you call its Start method, which your code does not do.

    2. Task's ctor does not work as you're expecting with async lambdas.  See the post at blogs.msdn.com/.../10229468.aspx for related information; it's talking about Task.Factory.StartNew, but similar issues apply to the ctor.

  • If I try to start the task though I get an exception. Do you have any working examples of using the ForEachAsync?

    Enumerable.Range(0, 100).ToList()
        .ForEachAsync<int>(
        10,
        task =>
        {
            return new Task(async () =>
            {
                await ia(task);
                //Console.WriteLine(ii);
            });
            //return task;
        }).Start();

    Returns: Start may not be called on a promise-style task.

  • @BermudaLamb: The Task you're trying to start in your example is the one being returned from ForEachAsync, not the one that you're constructing.  You could instead put the .Start on the Task that you're constructing, but then you'll still have the other issue I mentioned.  Just get rid of the .Start and replace "new Task" with "Task.Run".
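
    A minimal sketch of that change follows, under a few assumptions: SlowSquareAsync is a made-up stand-in for the commenter's ia method, the ForEachAsync is the partitioned version from the post (written here as a plain static method), and the caller awaits the returned Task so that it actually waits for the loop to finish.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TaskRunFixDemo
{
    // Partitioned ForEachAsync from the post, repeated for self-containment.
    public static Task ForEachAsync<T>(IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }

    // Hypothetical stand-in for the commenter's 'ia' method.
    private static async Task<long> SlowSquareAsync(int x)
    {
        await Task.Delay(5);
        return (long)x * x;
    }

    public static async Task<long> SumOfSquaresAsync(int count)
    {
        long total = 0;

        // Task.Run accepts the async lambda as a Func<Task> and returns a Task that
        // completes when the lambda's work completes; no Start call is involved.
        await ForEachAsync(Enumerable.Range(0, count), 10, item =>
            Task.Run(async () =>
            {
                long square = await SlowSquareAsync(item);
                Interlocked.Add(ref total, square); // bodies may run concurrently
            }));

        return total;
    }
}
```

    In a case like this the Task.Run wrapper is optional: passing "async item => { ... }" directly as the body also returns a started Task that represents the whole asynchronous operation.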
