Processing tasks as they complete

Recently I’ve had several folks ask me about how to process the results of tasks as those tasks complete.

A developer will have multiple tasks representing asynchronous operations they’ve initiated, and they want to process the results of these tasks, e.g.

List<Task<T>> tasks = …;
foreach(var t in tasks) {
    try { Process(await t); }
    catch(OperationCanceledException) {}
    catch(Exception exc) { Handle(exc); }
}

This approach is fine for many situations.  However, it enforces an additional constraint that the initial problem statement never implied: the code processes the tasks in the order they were supplied rather than in the order they complete.  As a result, a task that has already completed may sit unprocessed because an earlier task in the list has not yet finished.

There are many ways to process the tasks in completion order instead.  One approach involves simply using Task’s ContinueWith method, e.g.

List<Task<T>> tasks = …;
foreach(var t in tasks)
    t.ContinueWith(completed => {
        switch(completed.Status) {
            case TaskStatus.RanToCompletion: Process(completed.Result); break;
            case TaskStatus.Faulted: Handle(completed.Exception.InnerException); break;
        }
    }, TaskScheduler.Default);

This solution actually removes an additional constraint: the original solution forced all of the processing to run serially (and on the original SynchronizationContext if there was one), whereas this allows the processing to run in parallel and on the ThreadPool.  If you wanted to go with this approach but also wanted to force the continuations to run serially, you could do so by supplying a serializing scheduler to ContinueWith (i.e. a scheduler that forces the continuations to run exclusively with respect to each other).  For example, you could use a ConcurrentExclusiveSchedulerPair, e.g.

List<Task<T>> tasks = …;
var scheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
foreach(var t in tasks)
    t.ContinueWith(completed => {
        switch(completed.Status) {
            case TaskStatus.RanToCompletion: Process(completed.Result); break;
            case TaskStatus.Faulted: Handle(completed.Exception.InnerException); break;
        }
    }, scheduler);

or if, for example, you were on the UI thread of your application, you could supply a scheduler that represents that UI thread, e.g.

var scheduler = TaskScheduler.FromCurrentSynchronizationContext();

Even so, this ContinueWith-based approach does force you into a callback-based model to handle your processing.  If you wanted the process-serially-as-the-tasks-complete behavior, but using async/await rather than ContinueWith, that’s also possible.

There are a few ways to achieve this.  One relatively simple way is by using Task.WhenAny.  WhenAny accepts a set of tasks and will asynchronously provide the first one that completes.  As such, you can repeatedly invoke WhenAny, each time removing the previously completed task such that you’re asynchronously waiting for the next to complete, e.g.

List<Task<T>> tasks = …;
while(tasks.Count > 0) {
    var t = await Task.WhenAny(tasks);
    tasks.Remove(t);

    try { Process(await t); }
    catch(OperationCanceledException) {}
    catch(Exception exc) { Handle(exc); }
}

Functionally, this is fine, and as long as the number of tasks is small, the performance should be fine, too.  However, if the number of tasks is large, this could result in non-negligible performance overhead.  What we’ve effectively created here is an O(N²) algorithm: for each task, we search the list for the task to remove it, which is an O(N) operation, and we register a continuation with each task, which is also an O(N) operation.  For example, if we had 10,000 tasks, over the span of this whole operation we’d end up registering and unregistering upwards of 50 million continuations as part of the WhenAny calls.  Now, that’s not quite as bad as it sounds, as WhenAny is smart about how it manages its resources, for example not registering continuations with tasks that are already completed, stopping as soon as it finds a completed task, reusing the same continuation object across all of the tasks, etc.  Still, there’s work here we can avoid if profiling deems it to be problematic.
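The 50 million figure can be checked with a quick upper-bound calculation.  This is only a sketch: it assumes, pessimistically, that each WhenAny call registers a continuation with every task still in the list and that exactly one task is removed per iteration.

```latex
\sum_{k=1}^{N} k \;=\; \frac{N(N+1)}{2},
\qquad
N = 10{,}000 \;\Rightarrow\; \frac{10{,}000 \times 10{,}001}{2} = 50{,}005{,}000
```

In practice the count would be lower, for the reasons given above (already-completed tasks are skipped, and so on).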

An alternate approach is to create a new “combinator” method specifically geared towards this purpose.  When working with a collection of Task<T> instances, WhenAny returns a Task<Task<T>>; this is a task that will complete when the first supplied task has completed, and that Task<Task<T>>’s Result will be that first completed task.  In our case, we don’t just want the first task, but rather we want all of the tasks, ordered in how they’ll complete.  We can represent this with a Task<Task<T>>[].  Think of this as an array of buckets where we’ll place the input tasks as they complete, one task per bucket.  So, if you want to get the first task to complete, you can await the first bucket of this array, and if you want to get the sixth task to complete, you can await the sixth bucket of this array.

public static Task<Task<T>>[] Interleaved<T>(IEnumerable<Task<T>> tasks)
{
    var inputTasks = tasks.ToList();

    var buckets = new TaskCompletionSource<Task<T>>[inputTasks.Count];
    var results = new Task<Task<T>>[buckets.Length];
    for (int i = 0; i < buckets.Length; i++) 
    {
        buckets[i] = new TaskCompletionSource<Task<T>>();
        results[i] = buckets[i].Task;
    }

    int nextTaskIndex = -1;
    Action<Task<T>> continuation = completed =>
    {
        var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)];
        bucket.TrySetResult(completed);
    };

    foreach (var inputTask in inputTasks)
        inputTask.ContinueWith(continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

    return results;
}

So what’s happening here?  First we force our enumerable of tasks into a List<Task<T>>; this is to ensure that any tasks that might be produced lazily by enumerating the enumerable are reified exactly once.  Next, we create TaskCompletionSource<Task<T>> instances to represent the buckets, one bucket for each of the tasks that will eventually complete.  Then we hook up a continuation to each input task: this continuation will get the next available bucket and store the newly completed task into it.  With this combinator, we can now rewrite our original code as follows:

List<Task<T>> tasks = …;
foreach(var bucket in Interleaved(tasks)) {
    var t = await bucket;
    try { Process(await t); }
    catch(OperationCanceledException) {}
    catch(Exception exc) { Handle(exc); }
}
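As an aside on the ToList call inside Interleaved: the small sketch below illustrates why reifying a lazy enumerable matters.  LazyTasks, TasksCreated, and the counter are hypothetical names invented for this illustration; the point is only that each enumeration of a lazy sequence produces a fresh set of tasks, whereas a list produces them once.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class LazyEnumerationDemo
{
    // Counts how many tasks the lazy source has produced so far.
    static int created;

    // Hypothetical lazy source: every enumeration creates brand-new tasks.
    static IEnumerable<Task<int>> LazyTasks()
    {
        for (int i = 1; i <= 3; i++)
        {
            created++;
            yield return Task.FromResult(i);
        }
    }

    // Enumerates the source twice and reports how many tasks were created.
    public static int TasksCreated(bool reifyFirst)
    {
        created = 0;
        IEnumerable<Task<int>> source = LazyTasks();
        if (reifyFirst) source = source.ToList(); // enumerate once, keep the tasks

        foreach (var t in source) { }
        foreach (var t in source) { }
        return created;
    }

    public static void Main()
    {
        Console.WriteLine(TasksCreated(reifyFirst: false)); // prints 6
        Console.WriteLine(TasksCreated(reifyFirst: true));  // prints 3
    }
}
```

Without the ToList, the continuations could end up attached to a different set of task objects than the ones counted when sizing the buckets.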

To close out this discussion, let’s look at an example of what this actually does in practice.  Consider the following:

var tasks = new[] { 
    Task.Delay(3000).ContinueWith(_ => 3),
    Task.Delay(1000).ContinueWith(_ => 1), 
    Task.Delay(2000).ContinueWith(_ => 2),
    Task.Delay(5000).ContinueWith(_ => 5),
    Task.Delay(4000).ContinueWith(_ => 4),
};
foreach (var bucket in Interleaved(tasks)) {
    var t = await bucket;
    int result = await t;

    Console.WriteLine("{0}: {1}", DateTime.Now, result);
}

Here we have an array of Task<int>, each of which will complete after N seconds and return the integer N (e.g. the first task in the array will complete after 3 seconds and return the number 3).  We then loop through these tasks using our handy Interleaved method, printing out results as we get them.  When I run this code, I see the following output:

8/2/2012 7:37:48 AM: 1
8/2/2012 7:37:49 AM: 2
8/2/2012 7:37:50 AM: 3
8/2/2012 7:37:51 AM: 4
8/2/2012 7:37:52 AM: 5

and this is exactly the behavior we wanted.  Note the times that each element was output. All of the tasks were started at the same time, so their timers are all running concurrently.  As each task completes, our loop is able to process it, and as a result we get one line of output each second. 

In contrast, consider the same example but not using Interleaved:

var tasks = new[] { 
    Task.Delay(3000).ContinueWith(_ => 3),
    Task.Delay(1000).ContinueWith(_ => 1), 
    Task.Delay(2000).ContinueWith(_ => 2),
    Task.Delay(5000).ContinueWith(_ => 5),
    Task.Delay(4000).ContinueWith(_ => 4),
};
foreach (var t in tasks) {
    int result = await t;
    Console.WriteLine("{0}: {1}", DateTime.Now, result);
}

When I run this variation, I see:

8/2/2012 7:42:08 AM: 3
8/2/2012 7:42:08 AM: 1
8/2/2012 7:42:08 AM: 2
8/2/2012 7:42:10 AM: 5
8/2/2012 7:42:10 AM: 4

Now look at the times.  Because we’re now processing the tasks in order, we couldn’t print out the results for the 1s task or the 2s task until the 3s task had completed (since it was before them in the array).  Similarly, we couldn’t print out the result for the 4s task until the 5s task completed.
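For anyone who wants to reproduce the comparison, the fragments above can be assembled into a single console program along the following lines.  This is a sketch rather than the exact code I ran: the DelayedValue helper, the RunAsync method, and the unitMs parameter are names introduced here for illustration, and it prints elapsed milliseconds instead of wall-clock times.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class InterleavedDemo
{
    // The combinator from the post: bucket i completes with the i-th task to finish.
    public static Task<Task<T>>[] Interleaved<T>(IEnumerable<Task<T>> tasks)
    {
        var inputTasks = tasks.ToList();
        var buckets = new TaskCompletionSource<Task<T>>[inputTasks.Count];
        var results = new Task<Task<T>>[buckets.Length];
        for (int i = 0; i < buckets.Length; i++)
        {
            buckets[i] = new TaskCompletionSource<Task<T>>();
            results[i] = buckets[i].Task;
        }

        int nextTaskIndex = -1;
        Action<Task<T>> continuation = completed =>
        {
            var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)];
            bucket.TrySetResult(completed);
        };

        foreach (var inputTask in inputTasks)
            inputTask.ContinueWith(continuation, CancellationToken.None,
                TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

        return results;
    }

    // Hypothetical helper: completes after n * unitMs milliseconds and returns n.
    static async Task<int> DelayedValue(int n, int unitMs)
    {
        await Task.Delay(n * unitMs);
        return n;
    }

    // Starts five tasks and returns their results in the order the loop processed them.
    public static async Task<List<int>> RunAsync(bool interleaved, int unitMs)
    {
        var tasks = new[] { 3, 1, 2, 5, 4 }.Select(n => DelayedValue(n, unitMs)).ToArray();
        var clock = Stopwatch.StartNew();
        var seen = new List<int>();

        if (interleaved)
        {
            foreach (var bucket in Interleaved(tasks))
            {
                int result = await await bucket; // first await: the task; second: its value
                Console.WriteLine("{0,6} ms: {1}", clock.ElapsedMilliseconds, result);
                seen.Add(result);
            }
        }
        else
        {
            foreach (var t in tasks)
            {
                int result = await t;
                Console.WriteLine("{0,6} ms: {1}", clock.ElapsedMilliseconds, result);
                seen.Add(result);
            }
        }
        return seen;
    }

    public static async Task Main()
    {
        Console.WriteLine("Interleaved:");
        await RunAsync(interleaved: true, unitMs: 1000);
        Console.WriteLine("In supplied order:");
        await RunAsync(interleaved: false, unitMs: 1000);
    }
}
```

With the interleaved loop the results should arrive roughly one unit apart; with the plain loop they arrive in two bursts, matching the two outputs shown above.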

  • Could TPL DataFlow make this task easier?

  • Hi Mike-

    You could certainly implement the same pattern using constructs from the dataflow library.  For example, as the Tasks complete, they could store themselves into a BufferBlock<Task<T>> which the loop would then ReceiveAsync from to get the next completed task.

    You could also of course reframe the whole problem, using the dataflow library from the beginning to handle the asynchronous processing, rather than using tasks to represent each operation.  I was assuming for the purpose of this post, though, that you do have a task for each operation.
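The BufferBlock<Task<T>> idea from this reply might look roughly like the sketch below.  It assumes the System.Threading.Tasks.Dataflow library is referenced; the class and method names (DataflowSketch, ResultsInCompletionOrderAsync, DelayedValue) are made up for illustration, and error handling is left out to keep it short.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // assumed to be available as a package or framework reference

public static class DataflowSketch
{
    // Each input task posts itself into the buffer when it completes;
    // the loop then receives the tasks in completion order.
    public static async Task<List<T>> ResultsInCompletionOrderAsync<T>(IEnumerable<Task<T>> tasks)
    {
        var inputTasks = tasks.ToList();
        var completedTasks = new BufferBlock<Task<T>>();

        foreach (var inputTask in inputTasks)
            inputTask.ContinueWith(completed => { completedTasks.Post(completed); },
                TaskScheduler.Default);

        var results = new List<T>();
        for (int i = 0; i < inputTasks.Count; i++)
        {
            Task<T> next = await completedTasks.ReceiveAsync();
            results.Add(await next); // rethrows if the task faulted or was canceled
        }
        return results;
    }

    // Hypothetical helper: completes after n * unitMs milliseconds and returns n.
    public static async Task<int> DelayedValue(int n, int unitMs)
    {
        await Task.Delay(n * unitMs);
        return n;
    }

    public static async Task Main()
    {
        var tasks = new[] { 3, 1, 2 }.Select(n => DelayedValue(n, 1000)).ToArray();
        foreach (int result in await ResultsInCompletionOrderAsync(tasks))
            Console.WriteLine(result);
    }
}
```

Compared with Interleaved, the buffer plays the role of the bucket array: the order in which tasks are posted is the order in which the loop receives them.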

  • Interesting reading.

    Is it somehow possible to implement Interleave so that it returns IEnumerable<Task<T>> returning tasks in the order of completion?

  • OmariO: It's possible, though the tasks that get returned wouldn't be the original tasks supplied but rather proxies to the results of those tasks.  For example, instead of the implementation shown, I could have written something like this:

    public static IEnumerable<Task<T>> Interleaved<T>(IEnumerable<Task<T>> tasks)
    {
        var inputTasks = tasks.ToList();

        var buckets = new TaskCompletionSource<T>[inputTasks.Count];
        var results = new Task<T>[buckets.Length];
        for (int i = 0; i < buckets.Length; i++)
        {
            buckets[i] = new TaskCompletionSource<T>();
            results[i] = buckets[i].Task;
        }

        int nextTaskIndex = -1;
        Action<Task<T>> continuation = completed =>
        {
            var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)];
            if (completed.IsFaulted) bucket.TrySetException(completed.Exception.InnerExceptions);
            else if (completed.IsCanceled) bucket.TrySetCanceled();
            else bucket.TrySetResult(completed.Result);
        };

        foreach (var inputTask in inputTasks)
            inputTask.ContinueWith(continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

        return results;
    }

    Note that in the continuation delegate, I'm not storing the task itself into the bucket, but rather I'm transferring the exception, cancellation, or result from the completed task to the one that was provided to the caller.  Then as a consumer, I could do:

    foreach(var t in Interleaved(tasks)) { int result = await t; ...; }

    Unless you care about task reference equality, this would be basically what I think you're asking for.

    The reason you can't just return the original tasks is because IEnumerator.MoveNext & Current are synchronous members.  If you invoke MoveNext and the next task hasn't yet completed, in order to return the right task object from Current, you'd need to block in MoveNext or Current until the next task had completed so that you knew which object to return.

  • That would be a great use case for Reactive Extensions (Rx). It's best at handling streams of multiple asynchronous values, where the TPL is best at handling single asynchronous values.

    It should also be easy to convert the array of tasks into an observable stream.

  • Omer:

    Sure.  If you wanted to use async/await to write the processing code with the same kind of control flow you get with this example Interleaved method, I don't know that Rx would yield much improvement.  But certainly if you wanted to write this kind of logic in a callback style and composed into a larger solution, it would be helpful, e.g.

    List<Task<T>> tasks = ...;
    Observable
        .Merge(
            from t in tasks
            select t.ToObservable()
                    .Materialize())
        .Subscribe(n =>
        {
            switch (n.Kind)
            {
                case NotificationKind.OnNext: Process(n.Value); break;
                case NotificationKind.OnError: Handle(n.Exception); break;
            }
        });

    I say "composed into a larger solution", because just with this it's not much different than the ContinueWith-based solutions shown.  The real benefit to Rx here would be when you want to start chaining on more processing... then it becomes quite powerful.  It's very useful that Rx provides the ToObservable, ToTask, FirstAsync, LastAsync, etc. conversions that allow you to go back and forth between tasks and observables.

  • You're right Stephen in your observation (pun intended).

    However, I wouldn't normally deal with the observable source this way, i.e. handle each element's error individually. If the errors are a genuinely exceptional case, then an error should probably end the stream, in accordance with the Rx grammar and the fail-fast philosophy.

    This will also avoid the need to materialize the stream, and will result in more compact and simpler code.

    If the errors are not exceptional and should be expected, I'd open a 2nd Rx stream for them to propagate through, rather than the tuple approach (for either-value-or-error elements) that is achieved with the materialize operator. But that's of course debatable.

  • Thanks, Omer.  Understood, and I was writing it this way to remain consistent with the semantics of the other examples (if you were ok stopping processing as soon as the first exception occurred, then all of my previous examples would also be simplified, e.g. no try/catch or checking of the Task's status).  I think in these cases one also needs to be very thoughtful about ignoring exceptions; you've launched work that may or may not fail, so if an error does occur and you stop paying attention to those subsequent operations as a result, as you say you do need to strongly consider if and how you should make forward progress in your app.

  • Again - you're right of course.

    If an error does occur, I normally fail the app as fast as I can, so subsequent errors are not really a concern anymore.

    Unless of course the error is to be expected and then I would either handle it inside the task or within another channel like a 2nd Rx stream.

  • Thanks, Stephen. That is exactly what I meant.

  • Stephen, you have quickly become my new favourite blogger. This new TPL stuff can be quite deep and I've pretty much learned most of what I know through your articles.

    Now, if only I could get the hang of this Rx stuff which looks equally interesting.

  • Thanks, BlackLight.  I'm very glad you've found the posts helpful.
