Waiting for Tasks

Parallel Extensions makes it easy to wait for Tasks to complete.  Task exposes a Wait method, which can be used trivially:

Task t = Task.Create(...);
...
t.Wait();

Task also exposes several static methods for waiting on an array of tasks, either for all of them to complete or for any of them to complete:

Task t1 = Task.Create(...);
Task t2 = Task.Create(...);
Task t3 = Task.Create(...);
...
Task.WaitAll(t1, t2, t3); // or Task.WaitAny(t1, t2, t3)
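
The snippets in this post use the preview API (Task.Create). For anyone who wants something that compiles against the released framework, here is a minimal sketch, assuming .NET Framework 4.5 or later, where tasks are started with Task.Run. The WaitDemo class and the sleep durations are purely illustrative.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitDemo
{
    // Returns the index reported by WaitAny, then blocks until every task is done.
    public static int Run()
    {
        Task[] tasks =
        {
            Task.Run(() => Thread.Sleep(50)),
            Task.Run(() => Thread.Sleep(10)),
            Task.Run(() => Thread.Sleep(30)),
        };

        int first = Task.WaitAny(tasks); // index of a task that has completed
        Task.WaitAll(tasks);             // blocks until all three have completed
        return first;
    }
}
```

WaitAny returns an index into the array rather than the Task itself, so the caller can look up which task finished.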

Moreover, Tasks have implicit parent/child relationships, and a parent Task will wait for all of its child tasks to complete before it completes:

Task p = Task.Create(delegate
{
    Task c1 = Task.Create(...);
    Task c2 = Task.Create(...);
    Task c3 = Task.Create(...);
});
...
p.Wait(); // will not wake up until c1, c2, and c3 have finished
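
One caveat if you try this against the released framework: in .NET Framework 4 and later the parent/child link is opt-in, so a child only attaches when it is created with TaskCreationOptions.AttachedToParent, and a parent started with Task.Run refuses attached children. The following is a minimal sketch under that assumption; ParentChildDemo and the counter are illustrative names, not part of the library.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ParentChildDemo
{
    // Returns how many children had finished by the time parent.Wait() returned.
    public static int Run()
    {
        int finished = 0;
        Task parent = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 3; i++)
            {
                Task.Factory.StartNew(() =>
                {
                    Thread.Sleep(20); // stand-in for real work
                    Interlocked.Increment(ref finished);
                }, TaskCreationOptions.AttachedToParent); // opt in to the parent/child link
            }
        });

        parent.Wait(); // returns only after all three attached children complete
        return finished;
    }
}
```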

Sometimes, however, you want to wait for a list of tasks to complete, but you don't necessarily want to have to hold onto references to all of the relevant Task objects.  Imagine a scenario where I'm creating tasks for each of an unknown number of data elements to be processed:

IEnumerable<Data> data = ...;
List<Task> tasks = new List<Task>();
foreach(var item in data) tasks.Add(Task.Create(delegate { Process(item); }));
Task.WaitAll(tasks.ToArray());

The code is straightforward, but it also requires that I hold onto references to all of the Tasks I've created so that I can wait for all of them.  It's interesting to note, however, that a task that has already completed doesn't really need to be waited on.  As an example, if I create 10 tasks and 9 of them complete, waiting on the remaining 1 task is sufficient to know that all 10 completed.  This means that I only really need to wait on a count of tasks, where each new task increases the count, and where each task completing decreases the count: when the count reaches 0, all tasks have completed. 

Sound familiar? This is the domain of a CountdownEvent, which we can use to codify this approach:

IEnumerable<Data> data = ...;
using(CountdownEvent tasksRemaining = new CountdownEvent(1))
{
    foreach(var item in data)
    {
        tasksRemaining.Increment();
        Task.Create(delegate
        {
            Process(item);
            tasksRemaining.Decrement();
        });
    }
    tasksRemaining.Decrement(); // to counteract the initial 1
    tasksRemaining.Wait();
}

The CountdownEvent is initialized to a count of 1 to ensure that the event doesn't become signaled before I'm done creating all of the tasks; I remove that 1 count when all tasks have been created.  Before creating each Task, I up the count by 1, and when each task finishes, I decrease the count by 1.  Then I just wait on the CountdownEvent, which will be signaled when all of the tasks have completed.
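
For reference, here is a minimal sketch of the same pattern against the released framework, assuming .NET Framework 4.5 or later, where CountdownEvent's Increment and Decrement shipped as AddCount and Signal. CountdownDemo and the processed counter are illustrative stand-ins for the real Process(item) work.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class CountdownDemo
{
    // Runs one task per item and returns how many items were processed.
    public static int ProcessAll(IEnumerable<int> data)
    {
        int processed = 0;
        using (var tasksRemaining = new CountdownEvent(1)) // initial 1 = "still adding tasks"
        {
            foreach (int item in data)
            {
                tasksRemaining.AddCount(); // one more task outstanding
                Task.Run(() =>
                {
                    try { Interlocked.Increment(ref processed); } // stand-in for Process(item)
                    finally { tasksRemaining.Signal(); }          // count down even on failure
                });
            }
            tasksRemaining.Signal(); // counteract the initial 1
            tasksRemaining.Wait();   // returns once the count reaches 0
        }
        return processed;
    }
}
```

The try/finally is a small addition of mine: without it, a task body that throws would never count down and the Wait would block forever.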

Of course, this requires that I incorporate such logic directly into my code, but there's really no need for that.  I can extract out this logic into a separate class:

public class TaskWaiter
{
    private CountdownEvent _ce = new CountdownEvent(1);
    private bool _doneAdding = false;

    public void Add(Task t)
    {
        if (t == null) throw new ArgumentNullException("t"); 
        _ce.Increment();
        t.ContinueWith(ct => _ce.Decrement());
    }

    public void Wait()
    {
        if (!_doneAdding) { _doneAdding = true; _ce.Decrement(); } 
        _ce.Wait();
    }
}

Rewriting the previous example is now almost identical, just substituting TaskWaiter for the List<Task>:

IEnumerable<Data> data = ...;
TaskWaiter tasks = new TaskWaiter();
foreach(var item in data) tasks.Add(Task.Create(delegate { Process(item); }));
tasks.Wait();

Unlike the previous implementation that uses a List<Task>, this implementation doesn't need to store references to any of the tasks.  For just a few tasks, it's probably not warranted.  But if lots of tasks are being created and/or any of the tasks will be long-running such that holding onto tasks could cause them to be promoted to higher GC generations, this could have a noticeable impact on the application's performance.

One neat thing to notice about the implementation of TaskWaiter: it uses ContinueWith.  I previously wrote about useful abstractions enabled with ContinueWith, and this is another such abstraction.  ContinueWith is used in order to get a bit of extra code to execute when a Task completes, such that the CountdownEvent is properly decremented.  This allows me to move that Decrement call out of the Task's body itself (as I had with the original CountdownEvent-based implementation shown earlier), which means this technique can be used with any arbitrary Task you're handed, rather than one you have to create yourself.

There are some downsides to the TaskWaiter approach, however.  The biggest that jumps to mind with the current implementation is that we lose the ability to propagate any exceptions that may have occurred from tasks tracked by TaskWaiter.  We can fix that by storing references to tasks that complete with unhandled exceptions, and then do a true wait only on those tasks:

public class TaskWaiter
{
    private CountdownEvent _ce = new CountdownEvent(1);
    private bool _doneAdding = false;
    private ConcurrentQueue<Task> _faulted =
        new ConcurrentQueue<Task>();

    public void Add(Task t)
    {
        if (t == null) throw new ArgumentNullException("t"); 
        _ce.Increment();
        t.ContinueWith(ct =>
        { 
            if (ct.Exception != null) _faulted.Enqueue(ct);
            _ce.Decrement();
        });
    }

    public void Wait()
    {
        if (!_doneAdding) { _doneAdding = true; _ce.Decrement(); } 
        _ce.Wait(); 
        if (!_faulted.IsEmpty) Task.WaitAll(_faulted.ToArray());
    }
}

Notice the few changes.  A ConcurrentQueue<Task> was added to track any tasks that completed due to exception; this collection needs to be thread-safe, as multiple tasks could fail on different threads and attempt to access the queue from different threads, hence the ConcurrentQueue<Task>.  The delegate passed to ContinueWith has been augmented to check to see if the completing Task completed due to an exception, and if it did, the Task is added to the queue.  Then in Wait, after waiting on the CountdownEvent, I use Task.WaitAll to wait on all of the tasks that completed due to exceptions.  By this point, they've of course already completed, so I'm using Task.WaitAll simply to aggregate and throw all of the exceptions.

There's a subtlety about how I made these changes that's important to keep in mind.  Specifically, it's crucial that the faulting task be added to the ConcurrentQueue<T> before the CountdownEvent is decremented; if those lines were reversed in the ContinueWith delegate, the CountdownEvent could become set before the Task was tracked in the queue, in which case the exceptions from that Task may be missed in the call to TaskWaiter.Wait, since the Task may not yet have made it into the queue by the time Task.WaitAll is called. 
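
To make the ordering concrete, here is a minimal sketch of the fault-tracking waiter against the released framework, assuming .NET Framework 4 or later (AddCount and Signal in place of Increment and Decrement). FaultTrackingWaiter is a hypothetical name, and using Interlocked.Exchange for the done-adding flag is my own choice rather than something from the code above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical name; mirrors the post's TaskWaiter using the shipped API names.
public sealed class FaultTrackingWaiter
{
    private readonly CountdownEvent _ce = new CountdownEvent(1);
    private readonly ConcurrentQueue<Task> _faulted = new ConcurrentQueue<Task>();
    private int _doneAdding; // 0 until Wait removes the initial count

    public void Add(Task t)
    {
        if (t == null) throw new ArgumentNullException("t");
        _ce.AddCount();
        t.ContinueWith(ct =>
        {
            if (ct.Exception != null) _faulted.Enqueue(ct); // track the failure first...
            _ce.Signal();                                    // ...and only then count down
        });
    }

    public void Wait()
    {
        // Remove the initial count exactly once, even if Wait is called again.
        if (Interlocked.Exchange(ref _doneAdding, 1) == 0) _ce.Signal();
        _ce.Wait();
        // The queued tasks have already completed; WaitAll just rethrows their exceptions.
        if (!_faulted.IsEmpty) Task.WaitAll(_faulted.ToArray());
    }
}
```

If the Enqueue and Signal lines were swapped, Wait could observe a count of 0, find the queue still empty, and return without throwing.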

There are, of course, other optimizations that could be made to an implementation like this one, but we'll save those for another day.

Posted: Tuesday, August 05, 2008 6:52 PM by toub

Comments

Marshall Brooke said:

Hi,

You could also extend the code to follow through the waitable time period of the CountdownEvent

       public bool Wait()
       {
           return Wait(TimeSpan.MinValue);
       }

       public bool Wait(TimeSpan wait)
       {
           bool ret = false;
           if (!_doneAdding)
           {
               _doneAdding = true;
               _ce.Decrement();
           }
           ret = _ce.Wait(wait);
           if (!ret)
               return ret;
           if (!_faulted.IsEmpty)
               Task.WaitAll(_faulted.ToArray());
           return true;
       }

# August 6, 2008 4:51 AM

toub said:

Thanks, Marshall; certainly something you could add. Although you wouldn't want to use TimeSpan.MinValue like this: -1 is actually the sentinel value for an infinite timeout.  You probably want to make the overload "public bool Wait(int millisecondsTimeout)" and then pass in Timeout.Infinite or -1 as the timeout value used by the parameterless Wait overload.

# August 6, 2008 11:21 AM
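
A minimal sketch of the overload shape suggested in the reply above, assuming the released .NET Framework 4.5 API (AddCount, Signal, and CountdownEvent.Wait(int), which returns false on timeout). TimedTaskWaiter is a hypothetical name, and the fault tracking from the post is left out to keep the sketch short.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical name; fault tracking is omitted for brevity.
public sealed class TimedTaskWaiter
{
    private readonly CountdownEvent _ce = new CountdownEvent(1);
    private int _doneAdding; // 0 until the first Wait removes the initial count

    public void Add(Task t)
    {
        if (t == null) throw new ArgumentNullException("t");
        _ce.AddCount();
        t.ContinueWith(ct => { _ce.Signal(); });
    }

    // Blocks indefinitely; Timeout.Infinite is the -1 sentinel.
    public void Wait() { Wait(Timeout.Infinite); }

    // Returns true if every added task finished within the timeout, false otherwise.
    public bool Wait(int millisecondsTimeout)
    {
        if (Interlocked.Exchange(ref _doneAdding, 1) == 0) _ce.Signal();
        return _ce.Wait(millisecondsTimeout);
    }
}
```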

Peter Ritchie said:

I assume what Marshall wanted is to wait for the minimum amount of time, which would be this:

   return Wait(TimeSpan.Zero);

Using TimeSpan.MinValue in that way will actually throw an ArgumentOutOfRangeException.

You could wait infinitely like this:

task.Wait(new TimeSpan(0,0,0,0,-1));

But, I think that's unintuitive and contradictory.  I think it would be much clearer if Task.Wait(TimeSpan) threw an ArgumentOutOfRangeException on any "negative" TimeSpan (i.e. where ticks is less than 0, not -1).  The sentinel value for "infinite", with Wait(TimeSpan) should then be TimeSpan.MaxValue.

The fact that the underlying framework uses -1 for infinity is a leaky abstraction with Wait(TimeSpan).

I would suggest Task.Wait(TimeSpan timeout) be written more like:

long milliseconds = -1;
if (timeout != TimeSpan.MaxValue)
{
    if (timeout.Ticks < 0 || timeout.TotalMilliseconds > Int32.MaxValue)
    {
        throw new ArgumentOutOfRangeException("timeout");
    }
    milliseconds = (long)timeout.TotalMilliseconds;
}
return this.Wait((int)milliseconds);

# August 6, 2008 12:40 PM
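
The conversion proposed in the comment above can be isolated into a small helper, sketched here under the commenter's proposed semantics (TimeSpan.MaxValue means "infinite", any negative span is rejected). This is not how Task.Wait(TimeSpan) actually behaves, and TimeoutConversion is a hypothetical name.

```csharp
using System;
using System.Threading;

public static class TimeoutConversion
{
    // Maps TimeSpan.MaxValue to Timeout.Infinite (-1); rejects negative or oversized spans.
    public static int ToMilliseconds(TimeSpan timeout)
    {
        if (timeout == TimeSpan.MaxValue) return Timeout.Infinite;
        if (timeout.Ticks < 0 || timeout.TotalMilliseconds > int.MaxValue)
            throw new ArgumentOutOfRangeException("timeout");
        return (int)timeout.TotalMilliseconds;
    }
}
```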

Marshall Brooke said:

Yes, I caught that issue with TimeSpan, seems weird to me that MinValue is not actually the minimum value, but actually means something else.  TimeSpan.Infinite would be more logical, but hey ho.

The reason I needed to add the Timeout was that you can never guarantee that a task will actually ever return (infinite loop etc) and this would break long running apps.  Perhaps the Task.Create could include a Timeout overload that is a reasonable period of time for the task to complete before firing an exception?

By the way guys, I was using the CCR before and took a lot of persuading to move over to TPL as I really liked the Port methodology.  CCR however is licensed at $399 and I'm really liking the TPL API at the moment.  One issue I had with the CCR was the Policy Constraints you could place on a port for scheduling which were giving me some problems where the Pool was hung in cases, but this was resolved with the TPL by creating different TaskManagers with different number of threads - really nice :)

I am now fully converted to TPL, but with the abstract nature of the library, the posts in this blog are essential to understanding how to use it.

Thanks

Marshall

# August 7, 2008 7:19 AM

Peter Ritchie said:

@Marshall.  TimeSpan.MinValue is the minimum value; but as with signed types it's a negative value.

In many of the contexts that TimeSpan is used, a negative span of time is meaningless, unfortunately.

# August 8, 2008 11:55 AM