Waiting for Tasks

Parallel Extensions makes it easy to wait for Tasks to complete.  Task exposes a Wait method, which can be used trivially:

Task t = Task.Create(...);
...
t.Wait();

Task also exposes several static methods for waiting on an array of tasks, either for all of them to complete or for any of them to complete:

Task t1 = Task.Create(...);
Task t2 = Task.Create(...);
Task t3 = Task.Create(...);
...
Task.WaitAll(t1, t2, t3); // or Task.WaitAny(t1, t2, t3)

Moreover, Tasks have implicit parent/child relationships, and a parent Task will wait for all of its child tasks to complete before it completes:

Task p = Task.Create(delegate
{
    Task c1 = Task.Create(...);
    Task c2 = Task.Create(...);
    Task c3 = Task.Create(...);
});
...
p.Wait(); // will not wake up until c1, c2, and c3 have finished
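
The snippets in this post target a pre-release API. As a minimal sketch of the same parent/child behavior against the released .NET 4 library, where (as far as I know) a child has to opt in with TaskCreationOptions.AttachedToParent rather than attaching implicitly, the following shows a parent's Wait returning only after its attached children have finished. RunParentWithChildren and the 50 ms sleep are illustrative choices, not part of the original example:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ParentChildSketch
{
    // Starts a parent task that spawns childCount attached children and
    // returns how many children had finished when parent.Wait() returned.
    public static int RunParentWithChildren(int childCount)
    {
        int finished = 0;
        Task parent = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < childCount; i++)
            {
                // AttachedToParent ties the child's completion to the parent's.
                Task.Factory.StartNew(() =>
                {
                    Thread.Sleep(50); // simulate some work
                    Interlocked.Increment(ref finished);
                }, TaskCreationOptions.AttachedToParent);
            }
        });

        parent.Wait(); // does not return until every attached child is done
        return finished;
    }

    public static void Main()
    {
        Console.WriteLine(RunParentWithChildren(3)); // prints 3
    }
}
```

Without the AttachedToParent option, the children in this sketch would be detached, and the parent could complete before they do.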

Sometimes, however, you want to wait for a list of tasks to complete, but you don't necessarily want to have to hold onto references to all of the relevant Task objects.  Imagine a scenario where I'm creating tasks for each of an unknown number of data elements to be processed:

IEnumerable<Data> data = ...;
List<Task> tasks = new List<Task>();
foreach(var item in data) tasks.Add(Task.Create(delegate { Process(item); }));
Task.WaitAll(tasks.ToArray());

The code is straightforward, but it also requires that I hold onto references to all of the Tasks I've created so that I can wait for all of them.  It's interesting to note, however, that a task that has already completed doesn't really need to be waited on.  As an example, if I create 10 tasks and 9 of them complete, waiting on the remaining 1 task is sufficient to know that all 10 completed.  This means that I only really need to wait on a count of tasks, where each new task increases the count, and where each task completing decreases the count: when the count reaches 0, all tasks have completed. 

Sound familiar? This is the domain of a CountdownEvent, which we can use to codify this approach:

IEnumerable<Data> data = ...;
using(CountdownEvent tasksRemaining = new CountdownEvent(1))
{
    foreach(var item in data)
    {
        tasksRemaining.Increment();
        Task.Create(delegate
        {
            Process(item);
            tasksRemaining.Decrement();
        });
    }
    tasksRemaining.Decrement(); // to counteract the initial 1
    tasksRemaining.Wait();
}

The CountdownEvent is initialized to a count of 1 to ensure that the event doesn't become signaled before I'm done creating all of the tasks; I remove that 1 count when all tasks have been created.  Before creating each Task, I up the count by 1, and when each task finishes, I decrease the count by 1.  Then I just wait on the CountdownEvent, which will be signaled when all of the tasks have completed.
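
For readers on the released .NET 4 library, here is a minimal, runnable sketch of the same counting technique. It assumes that Increment and Decrement correspond to AddCount and Signal there, and that Task.Factory.StartNew stands in for Task.Create. The Process method below is a hypothetical stand-in that just counts items, and ProcessAll is an illustrative name:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class CountdownSketch
{
    // Hypothetical stand-in for Process(item): counts the items it sees.
    private static int _processed;
    private static void Process(int item) { Interlocked.Increment(ref _processed); }

    // Runs Process for every item on its own task and returns the number
    // of items processed once all of the tasks have finished.
    public static int ProcessAll(IEnumerable<int> data)
    {
        _processed = 0;
        // The initial count of 1 means "still creating tasks".
        using (var tasksRemaining = new CountdownEvent(1))
        {
            foreach (var item in data)
            {
                var current = item;        // per-iteration copy for the closure
                tasksRemaining.AddCount(); // one more task outstanding
                Task.Factory.StartNew(() =>
                {
                    try { Process(current); }
                    finally { tasksRemaining.Signal(); } // signal even if Process throws
                });
            }
            tasksRemaining.Signal(); // counteract the initial 1
            tasksRemaining.Wait();   // returns when the count reaches 0
        }
        return _processed;
    }

    public static void Main()
    {
        Console.WriteLine(ProcessAll(new[] { 1, 2, 3, 4, 5 })); // prints 5
    }
}
```

The try/finally is an addition of this sketch: without it, a Process call that throws would leave the count above zero and Wait would never return.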

Of course, this requires that I incorporate such logic directly into my code, but there's really no need for that.  I can extract out this logic into a separate class:

public class TaskWaiter
{
    private CountdownEvent _ce = new CountdownEvent(1);
    private bool _doneAdding = false;

    public void Add(Task t)
    {
        if (t == null) throw new ArgumentNullException("t"); 
        _ce.Increment();
        t.ContinueWith(ct => _ce.Decrement());
    }

    public void Wait()
    {
        if (!_doneAdding) { _doneAdding = true; _ce.Decrement(); } 
        _ce.Wait();
    }
}

The previous example, rewritten to use this class, is almost identical; I just substitute TaskWaiter for the List<Task>:

IEnumerable<Data> data = ...;
TaskWaiter tasks = new TaskWaiter();
foreach(var item in data) tasks.Add(Task.Create(delegate { Process(item); }));
tasks.Wait();

Unlike the previous implementation that uses a List<Task>, this implementation doesn't need to store references to any of the tasks.  For just a few tasks, it's probably not warranted.  But if lots of tasks are being created and/or any of the tasks will be long-running, such that holding onto tasks could cause them to be promoted to higher GC generations, this could have a noticeable impact on the application's performance.

One neat thing to notice about the implementation of TaskWaiter: it uses ContinueWith.  I previously wrote about useful abstractions enabled with ContinueWith, and this is another such abstraction.  ContinueWith is used in order to get a bit of extra code to execute when a Task completes, such that the CountdownEvent is properly decremented.  This allows me to move that Decrement call out of the Task's body itself (as I had with the original CountdownEvent-based implementation shown earlier), which means this technique can be used with any arbitrary Task you're handed, rather than one you have to create yourself.

There are some downsides to the TaskWaiter approach, however.  The biggest that jumps to mind with the current implementation is that we lose the ability to propagate any exceptions that may have occurred from tasks tracked by TaskWaiter.  We can fix that by storing references to tasks that complete with unhandled exceptions, and then doing a true wait only on those tasks:

public class TaskWaiter
{
    private CountdownEvent _ce = new CountdownEvent(1);
    private bool _doneAdding = false;
    private ConcurrentQueue<Task> _faulted =
        new ConcurrentQueue<Task>();

    public void Add(Task t)
    {
        if (t == null) throw new ArgumentNullException("t"); 
        _ce.Increment();
        t.ContinueWith(ct =>
        { 
            if (ct.Exception != null) _faulted.Enqueue(ct);
            _ce.Decrement();
        });
    }

    public void Wait()
    {
        if (!_doneAdding) { _doneAdding = true; _ce.Decrement(); } 
        _ce.Wait(); 
        if (!_faulted.IsEmpty) Task.WaitAll(_faulted.ToArray());
    }
}

Notice the few changes.  A ConcurrentQueue<Task> was added to track any tasks that completed due to exception; this collection needs to be thread-safe, as multiple tasks could fail on different threads and attempt to access the queue from different threads, hence the ConcurrentQueue<Task>.  The delegate passed to ContinueWith has been augmented to check to see if the completing Task completed due to an exception, and if it did, the Task is added to the queue.  Then in Wait, after waiting on the CountdownEvent, I use Task.WaitAll to wait on all of the tasks that completed due to exceptions.  By this point, they've of course already completed, so I'm using Task.WaitAll simply to aggregate and throw all of the exceptions.

There's a subtlety about how I made these changes that's important to keep in mind.  Specifically, it's crucial that the faulting task be added to the ConcurrentQueue<T> before the CountdownEvent is decremented; if those lines were reversed in the ContinueWith delegate, the CountdownEvent could become set before the Task was tracked in the queue, in which case the exceptions from that Task may be missed in the call to TaskWaiter.Wait, since the Task may not yet have made it into the queue by the time Task.WaitAll is called. 
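
To see the exception propagation end to end, here is a hedged sketch of the same class against the released .NET 4 names, again assuming Increment and Decrement map to AddCount and Signal. TaskWaiterDemo, Fail, and the exception messages are made up for illustration:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class TaskWaiter
{
    private readonly CountdownEvent _ce = new CountdownEvent(1);
    private readonly ConcurrentQueue<Task> _faulted = new ConcurrentQueue<Task>();
    private bool _doneAdding;

    public void Add(Task t)
    {
        if (t == null) throw new ArgumentNullException("t");
        _ce.AddCount();
        t.ContinueWith(ct =>
        {
            // Track the faulted task BEFORE signaling, so that Wait cannot
            // wake up and miss it.
            if (ct.Exception != null) _faulted.Enqueue(ct);
            _ce.Signal();
        });
    }

    public void Wait()
    {
        if (!_doneAdding) { _doneAdding = true; _ce.Signal(); }
        _ce.Wait();
        // The faulted tasks have already completed; WaitAll is used only to
        // aggregate and rethrow their exceptions.
        if (!_faulted.IsEmpty) Task.WaitAll(_faulted.ToArray());
    }
}

public static class TaskWaiterDemo
{
    // Hypothetical failing workload.
    private static void Fail(string message) { throw new InvalidOperationException(message); }

    // Returns the number of exceptions surfaced by TaskWaiter.Wait.
    public static int Run()
    {
        var waiter = new TaskWaiter();
        waiter.Add(Task.Factory.StartNew(() => { }));
        waiter.Add(Task.Factory.StartNew(() => Fail("boom")));
        waiter.Add(Task.Factory.StartNew(() => Fail("bang")));
        try { waiter.Wait(); return 0; }
        catch (AggregateException ae) { return ae.InnerExceptions.Count; }
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints 2
    }
}
```

One successful task and two failing ones are added; Wait returns only after all three complete, then throws a single AggregateException carrying both failures.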

There are, of course, other optimizations that could be made to an implementation like this one, but we'll save those for another day.

Comments

  • Hi,

    You could also extend the code to follow through the waitable time period of the CountdownEvent

    public bool Wait()
    {
        return Wait(TimeSpan.MinValue);
    }

    public bool Wait(TimeSpan wait)
    {
        bool ret = false;
        if (!_doneAdding)
        {
            _doneAdding = true;
            _ce.Decrement();
        }
        ret = _ce.Wait(wait);
        if (!ret)
            return ret;
        if (!_faulted.IsEmpty)
            Task.WaitAll(_faulted.ToArray());
        return true;
    }

  • Thanks, Marshall; certainly something you could add. Although you wouldn't want to use TimeSpan.MinValue like this: -1 is actually the sentinel value for an infinite timeout.  You probably want to make the overload "public bool Wait(int millisecondsTimeout)" and then pass in Timeout.Infinite or -1 as the timeout value used by the parameterless Wait overload.

  • I assume what Marshall wanted is to wait for the minimum amount of time, which would be this:

       return Wait(TimeSpan.Zero);

    Using TimeSpan.MinValue in that way will actually throw an ArgumentOutOfRangeException.

    You could wait infinitely like this:

    task.Wait(new TimeSpan(0,0,0,0,-1));

    But, I think that's unintuitive and contradictory.  I think it would be much clearer if Task.Wait(TimeSpan) threw an ArgumentOutOfRangeException on any "negative" TimeSpan (i.e. where ticks is less than 0, not -1).  The sentinel value for "infinite", with Wait(TimeSpan) should then be TimeSpan.MaxValue.

    The fact that the underlying framework uses -1 for infinity is a leaky abstraction with Wait(TimeSpan).

    I would suggest Task.Wait(TimeSpan timeout) be written more like:

    long milliseconds = -1;
    if (timeout != TimeSpan.MaxValue)
    {
        if (timeout.Ticks < 0 || timeout.TotalMilliseconds > Int32.MaxValue)
        {
            throw new ArgumentOutOfRangeException("timeout");
        }
        milliseconds = (long)timeout.TotalMilliseconds;
    }
    return this.Wait((int)milliseconds);

  • Yes, I caught that issue with TimeSpan; it seems weird to me that MinValue is not actually the minimum value, but actually means something else.  TimeSpan.Infinite would be more logical, but hey ho.

    The reason I needed to add the Timeout was that you can never guarantee that a task will actually ever return (infinite loop etc) and this would break long running apps.  Perhaps the Task.Create could include a Timeout overload that is a reasonable period of time for the task to complete before firing an exception?

    By the way guys, I was using the CCR before and took a lot of persuading to move over to TPL as I really liked the Port methodology.  CCR however is licensed at $399 and I'm really liking the TPL API at the moment.  One issue I had with the CCR was the Policy Constraints you could place on a port for scheduling which were giving me some problems where the Pool was hung in cases, but this was resolved with the TPL by creating different TaskManagers with different number of threads - really nice :)

    I am now fully converted to TPL, but with the abstract nature of the library, the posts in this blog are essential to understanding how to use it.

    Thanks

    Marshall

  • @Marshall: TimeSpan.MinValue is the minimum value; but as with signed types, it's a negative value.

    In many of the contexts that TimeSpan is used, a negative span of time is meaningless, unfortunately.

  • are there similar examples for the new .net 4.0? some of the core methods have been changed, also make sure you include visual basic 10 examples, not everyone can translate what you have from C# to vb.net.

    -thanks.

  • Hi Tamer-

    This functionality and more is certainly possible with .NET 4, albeit with slightly changed syntax.  We'll definitely follow-up with such examples, hopefully in the near future.

    Thanks for the interest!  Requests like this help us to prioritize.

  • Hi Stephen,

    Could you comment on and confirm Jeffrey Richter's explanation of Wait? I feel uncomfortable that Wait may, in certain cases, not wait...

    "When a thread calls the Wait method, the system checks if the Task that the thread is waiting for has started executing. If it has, then the thread calling Wait will block until the Task has completed running. But if the Task has not started executing yet, then the system may (depending on the TaskScheduler) execute the Task by using the thread that called Wait. If this happens, then the thread calling Wait does not block; it executes the Task and returns immediately."

  • @Rohit:  Wait will not return until the Task has completed.  The distinction Jeffrey is making is what Wait does while it's waiting for the Task to complete... does it block waiting for another thread to execute the task, or does it execute it itself?  In both cases, the Task will be completed by the time Wait returns, it's just a question of which thread is actually processing the task in the meantime.

  • I didn't find TaskWaiter in Parallel Extra. Is there an updated version available, or can I use this one as-is?

    As you mentioned in the post, we held references to the Tasks in a List and called WaitAll once a threshold was reached, but we later found that it was not performing well. I used TaskWaiter and like the approach; it's a cleaner version than the Task Batch concept. I'd like to use it in my project and want to know whether an updated version is available.

  • @Brijen: The code on this post is a bit stale, as it was written based on pre-.NET 4 CTPs of the Task Parallel Library.  However, the TaskWaiter code above should be fairly close to working; from a quick glance, I think if you just change Increment to AddCount and Decrement to Signal, it should compile and work.
