Useful Abstractions Enabled with ContinueWith

In the June 2008 CTP of Parallel Extensions to the .NET Framework, we introduced the ContinueWith method on both Task and Future<T>.  ContinueWith is, in effect, a callback, very much like events in .NET.  With events, a causal action results in the event being raised, which by default causes all of the delegates registered with the event to be invoked.  ContinueWith supports this, but rather than just registering the callback, it also returns a Task or Future<T> that represents the callback; the lifecycle of that returned Task or Future<T> is tied specifically to the callback, and it won't be marked as IsCompleted until that callback completes.  While at its core this could be considered a relatively simple idea and is useful for general dataflow in applications, it also enables some important patterns and can serve as the building block for a whole host of larger abstractions.
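
To make that lifecycle concrete, here is a minimal sketch.  It assumes the Task API as it was later released in .NET (Task<T> and its ContinueWith), not the June 2008 CTP types such as Future<T>, and the ContinuationDemo class and DoubleWhenDone method are hypothetical names used only for illustration:

```csharp
using System;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    // Hypothetical helper (assumes the released Task<T> API, not the CTP's Future<T>).
    // The returned task represents the callback itself: it is not completed
    // until the lambda has finished running.
    public static Task<int> DoubleWhenDone(Task<int> antecedent)
    {
        // ContinueWith registers the callback and hands back a task for it.
        return antecedent.ContinueWith(t => t.Result * 2);
    }
}
```

A caller can wait on the returned task, or chain a further ContinueWith onto it, without ever blocking on the antecedent directly.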

For example, the June 2008 CTP doesn't provide support for multi-item continuations (is this something you think we should provide in a future release?): you can create a continuation Task or Future<T> for when one Task or Future<T> finishes, but out of the box you can't create one for when all of multiple Task or Future<T> instances complete.  Nevertheless, you can use the basic ContinueWith support to implement this additional support.  Here's an example of a ContinueWhenAll method you could implement, where you provide it with multiple Task instances, and the resulting Task will only be scheduled and executed when all of the provided tasks complete (I've omitted parameter validation and the like to simplify the code):

static Task ContinueWhenAll(
    Action<Task[]> continuation, params Task[] tasks)
{
    // The workaround: a Future<bool> that completes only when its Value
    // is set, so it acts as a manually triggered starting point.
    var starter = Future<bool>.Create();
    var task = starter.ContinueWith(o => continuation(tasks));

    // Count down once per completed task; the last one sets the Value.
    CountdownEvent ce = new CountdownEvent(tasks.Length);
    Action<Task> whenComplete = delegate {
        if (ce.Decrement()) starter.Value = true;
    };
    foreach (var t in tasks)
        t.ContinueWith(whenComplete, TaskContinuationKind.OnAny,
            TaskCreationOptions.None, true);

    return task;
}

This ContinueWhenAll method takes two parameters: an Action<Task[]> to execute when all of the provided tasks have completed (supplied with those tasks), and the params array of the Tasks to monitor for completion.  It then uses a workaround to create a Task that can be started at an arbitrary time in the future (in the June 2008 CTP, we don't directly support this capability, hence the workaround, but we're planning to in a future release as that capability is useful for a bunch of scenarios, including this one).  A delegate is created that will count down from the number of provided tasks every time it's invoked, and when the count reaches 0, it will start the task.  ContinueWith is then used to register that whenComplete action as a continuation for all of the parameter tasks.  And voila, we now have a ContinueWhenAll method, which can be used like:

Task t1 = ..., t2 = ...;
Task t3 = ContinueWhenAll(
    delegate { Console.WriteLine("t1 and t2 finished"); }, t1, t2);
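
For comparison, the same countdown idea can be sketched against the Task API that was later released in .NET.  This is only an illustration under that assumption: TaskCompletionSource<bool> stands in for the Future<bool> workaround, an Interlocked counter stands in for CountdownEvent, and the WhenAllSketch class name is hypothetical:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WhenAllSketch
{
    // Hypothetical helper; assumes the released Task API rather than the CTP types.
    public static Task ContinueWhenAll(Action<Task[]> continuation, params Task[] tasks)
    {
        // A task we complete by hand once every antecedent has finished.
        var starter = new TaskCompletionSource<bool>();
        Task task = starter.Task.ContinueWith(_ => continuation(tasks));

        int remaining = tasks.Length;
        if (remaining == 0) starter.SetResult(true); // nothing to wait for

        foreach (Task t in tasks)
        {
            // Each antecedent decrements once; the final decrement releases the starter.
            t.ContinueWith(_ =>
            {
                if (Interlocked.Decrement(ref remaining) == 0) starter.SetResult(true);
            }, TaskContinuationOptions.ExecuteSynchronously);
        }
        return task;
    }
}
```

The released library also ships a built-in Task.Factory.ContinueWhenAll, so a hand-rolled version like this is mainly useful for seeing how the pieces fit together.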

Similarly, a method could be implemented for creating a task continuation for when any of a set of tasks completes (rather than when they all complete):

static Task ContinueWhenAny(
    Action<Task> continuation, params Task[] tasks)
{
    // Holds the first task to complete; later attempts to set it fail.
    WriteOnce<Task> theCompletedTask = new WriteOnce<Task>();
    var starter = Future<bool>.Create();
    var task = starter.ContinueWith(o =>
        continuation(theCompletedTask.Value));

    // Only the task that wins the TrySetValue race triggers the starter.
    Action<Task> whenComplete = t => {
        if (theCompletedTask.TrySetValue(t)) starter.Value = true;
    };
    foreach (var t in tasks)
        t.ContinueWith(whenComplete, TaskContinuationKind.OnAny,
            TaskCreationOptions.None, true);

    return task;
}

This implementation is very similar to ContinueWhenAll.  However, the action that's registered as the continuation for each task uses a WriteOnce<T> to store the first Task to complete and to schedule the continuation task when it does.  ContinueWhenAny can be used like:

Task t1 = ..., t2 = ...;
Task t3 = ContinueWhenAny(
    delegate { Console.WriteLine("t1 or t2 finished"); }, t1, t2);
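
The "first one wins" logic can also be sketched against the later released Task API.  This is an illustration under that assumption, not the CTP code above: a TaskCompletionSource<Task> plays both the WriteOnce<Task> role and the starter role, and the WhenAnySketch class name is hypothetical:

```csharp
using System;
using System.Threading.Tasks;

public static class WhenAnySketch
{
    // Hypothetical helper; assumes the released Task API rather than the CTP types.
    public static Task ContinueWhenAny(Action<Task> continuation, params Task[] tasks)
    {
        // Completed by hand with whichever antecedent finishes first.
        var winner = new TaskCompletionSource<Task>();
        Task task = winner.Task.ContinueWith(w => continuation(w.Result));

        foreach (Task t in tasks)
        {
            // TrySetResult succeeds only for the first caller; later completions are ignored.
            t.ContinueWith(done => { winner.TrySetResult(done); },
                TaskContinuationOptions.ExecuteSynchronously);
        }
        return task;
    }
}
```

Because TrySetResult returns false rather than throwing when a result is already set, no extra locking is needed around the race between antecedents.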

There are other neat patterns enabled with ContinueWith.  One pattern that's starting to become popular with frameworks like the CCR or AsyncEnumerator is to take advantage of C#'s iterator support to make writing code that uses asynchronous operations a bit more like sequential code.  While not the primary focus of the Task Parallel Library, ContinueWith can be used to implement such patterns in at least a limited capacity.  Consider the following method:

static void RunAsync(IEnumerable<Task> iterator)
{
    var enumerator = iterator.GetEnumerator();
    Action a = null;
    a = delegate
    {
        if (enumerator.MoveNext())
        { 
            enumerator.Current.ContinueWith(delegate { a(); });
        }
        else enumerator.Dispose();
    };
    a();
}

This method accepts an IEnumerable<Task>.  It retrieves an enumerator from the enumerable and uses that enumerator in a delegate.  The delegate moves the enumerator to its next element, and if there is a next element, retrieves it and uses ContinueWith to schedule the delegate (recursively, in a sense) for execution when that current Task completes.  When the enumerator reaches the end, it's disposed.  With that delegate created, RunAsync simply executes the delegate to get the execution started.
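
To see the control flow end to end, here is a small sketch using the later released Task API (an assumption; Task.Delay does not exist in the CTP).  The IteratorSketch class and the Steps iterator are hypothetical, and this variant of RunAsync additionally returns a Task that completes when the iterator is exhausted, which the version above does not do:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class IteratorSketch
{
    // Same shape as RunAsync above, plus a task signalling that the iterator has ended.
    public static Task RunAsync(IEnumerable<Task> iterator)
    {
        var finished = new TaskCompletionSource<bool>();
        IEnumerator<Task> enumerator = iterator.GetEnumerator();
        Action step = null;
        step = () =>
        {
            if (enumerator.MoveNext())
            {
                // Resume the iterator only after the yielded task completes.
                enumerator.Current.ContinueWith(_ => step());
            }
            else
            {
                enumerator.Dispose();
                finished.SetResult(true);
            }
        };
        step();
        return finished.Task;
    }

    // Hypothetical iterator: reads like sequential code, but each yield
    // hands control back to RunAsync until the yielded task is done.
    public static IEnumerable<Task> Steps(List<string> log)
    {
        log.Add("start");
        yield return Task.Delay(10);
        log.Add("after first delay");
        yield return Task.Delay(10);
        log.Add("done");
    }
}
```

Calling IteratorSketch.RunAsync(IteratorSketch.Steps(log)) returns immediately after the first yield; the remaining entries are added from continuations as each delay elapses.  Exceptions thrown by the iterator are not handled in this sketch.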

Now imagine I wanted a method that would asynchronously read a file, processing it as it's read in.  I can implement such a method as follows:

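static IEnumerable<Task> ReadFile(string path)
{
    // 0x1000-byte buffer; the final 'true' opens the stream for asynchronous I/O.
    using (FileStream fs = new FileStream(
        path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
    {
        var enc = new UTF8Encoding();
        byte[] data = new byte[0x1000];
        while (true)
        {
            var pendingRead = CreateFutureFromApm<int>(ac =>
                fs.BeginRead(data, 0, data.Length, ac, null), fs.EndRead);
            // Hand the pending read to RunAsync; execution resumes here
            // once the read has completed.
            yield return pendingRead;
            int bytesRead = pendingRead.Value;
            if (bytesRead <= 0) break; // 0 bytes read means end of file

            // Note: a multi-byte character split across two reads
            // will not decode correctly with this per-chunk GetString.
            Console.WriteLine(enc.GetString(data, 0, bytesRead));
        }
    }
}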

ReadFile opens a file stream and continually reads from it asynchronously.  The code wraps usage of the FileStream's BeginRead/EndRead methods with a Future<int> (using the example method from http://blogs.msdn.com/8272833.aspx); that Future<int> is yielded from the ReadFile method (which returns an enumerable).  We'll execute ReadFile by passing its result to RunAsync.  When the Future<int> completes, its continuation will be executed, which will cause RunAsync's delegate to call MoveNext on the enumerator, thus ending up back in the ReadFile method just after the yield location.  The ContinueWhenAll method can be used to allow more complicated asynchronous methods to be written, by wrapping multiple asynchronous invocations and yielding a continuation for when they all complete.

There are a myriad of such interesting patterns that ContinueWith enables.  We'd love to hear about any useful ones you come up with.

  • Hi toub.

    When I get a 42 core multi-core system...

    I'd like to watch 42 HD-DVD's in 42 windows at the same time...

    Is ContinueWith a useful abstraction for that scenario? (Maybe I need 1024 cores?)

    Thanks,

    art

  • Hi toub!

    Currently I’m writing a network communication application that relies on the .NET APM (IAsyncResult + Begin... End… methods). I use this pattern because I want to minimize thread blocking. Threads are quite expensive, so I can’t allow a thread to just wait for a response from a remote system, wasting system resources. The problem is that my code becomes too complicated quickly, especially when I introduce task cancellation, fault handling and time-outs. Currently I’m looking for a simplified pattern to replace the .NET APM and, probably, a standard library written by someone else (ok, MS will suit me well :). Parallel Extensions to the .NET Framework seems to be a good choice, but I’m still missing some functionality.

    The first is the Task in non-blocking scenarios. In “Wrapping an APM implementation with Future<T>” you wrote how to use Future to implement non-blocking APM pattern wrapping, but a lot of methods do not produce a result, so using Task for wrapping would be preferred.

    Second is Future chaining. Consider that I have some service which should process requests sequentially in some order (FIFO in most cases, but more complex scenarios are also possible). I need to implement a non-blocking post method, which will schedule a request for execution. Also, due to the limited number of threads, I can’t allocate a separate thread for each service; a thread should be allocated only when it is needed and released when the work is done.  Currently I’m using something like this:

    public Future<Result> Process()
    {
        var newFuture = Future<Result>.Create();
        var prevFuture = Interlocked.Exchange(ref PrevFuture, newFuture);
        if (prevFuture != null)
            prevFuture.ContinueWith(delegate
            {
                // here "if (!newFuture.IsCanceled)" may be placed
                OnProcess(newFuture);
            });
        else
            OnProcess(newFuture);
        return newFuture;
    }

    where OnProcess() starts a non-blocking APM method and subscribes to the result.  As you can see, this approach creates a nearly unnecessary short-running task whose only work is to start a non-blocking asynchronous operation. It would be nice to have some standard method to chain APM method wrappers.

  • Parallel Extensions makes it easy to wait for Tasks to complete.  Task exposes a Wait method, which

  • Hi toub,

    regarding your ContinueWhenAll example.

    Why don't you just do this?:

    Task.WaitAll(t1, t2);

    Task.Create(delegate { Console.WriteLine("t1 and t2 finished"); });

    What is the difference to your ContinueWhenAll?

    Cheers, Urs

  • Urs,

    WaitAll blocks, while ContinueWhenAll does not.  ContinueWhenAll just schedules a delegate to run when all tasks are complete, and then continues on, while WaitAll blocks the current thread until all the tasks are complete.  This is an important distinction.

  • .Net 4.0 is going to be very interesting to exploit multicore processors. However, do you know a due date?

    In the meantime, I began working with multicore programming using current C# 2008, as I need more performance in some batch processes.

    I bought the book for beginners by Packt Publishing: "C# 2008 and 2005 Threaded Programming: Beginner’s Guide", by Gaston C. Hillar - http://www.packtpub.com/beginners-guide-for-C-sharp-2008-and-2005-threaded-programming/book

    The book covers a few chapters about Parallel Extensions. However, it also teaches to use the current threading model. I am achieving amazing results with my dual core CPU.

    Recommended for those who are trying to program for multicore CPUs.

    I can't wait to see Visual Studio 2010 in the market.

    But, please, provide a date!!!
