Asynchronous methods, C# iterators, and Tasks

More and more, developers are realizing the significant scalability advantages that asynchronous programming can provide, especially as it relates to I/O.

Consider an application that needs to copy data from one stream to another, as in the following synchronous implementation:

static void CopyStreamToStream(Stream input, Stream output)
{
    // Buffer space for the data to be read and written
    byte [] buffer = new byte[0x2000];

    // While there’s data to be read and written
    while(true)
    {
        // Read data. If we weren’t able to read any, bail.
        // Otherwise, write it out and start over again.
        int numRead = input.Read(buffer, 0, buffer.Length);
        if (numRead == 0) break;
        output.Write(buffer, 0, numRead);
    }
}

In many cases like this, such as when the Streams are FileStream instances representing files on disk or NetworkStream instances representing remote data, the operations require little to no computational power; the majority of the time is spent waiting on the I/O subsystems and devices. If only one such operation is being performed at a time, that waiting isn’t such a big deal. But in a synchronous implementation like the one above, the waiting blocks a thread, leaving that thread unable to do anything else in the meantime. Thus, if multiple concurrent calls to CopyStreamToStream are executed, multiple threads may be wasted. Since each thread by default consumes a non-negligible chunk of memory as well as kernel resources, we try to limit the number of threads in an application at any one time, such as by using a thread pool. This synchronous style of programming I/O can therefore lead to scalability bottlenecks, especially in server components where we want to process as many user requests concurrently as the machine’s resources will allow.

The alternative is to perform the reads and writes asynchronously, so that no thread is blocked while the I/O is in flight. One way to get there is through compiler support. A compiler could recognize this synchronous pattern and translate it into an asynchronous one. Such a transformation of that same snippet might look like the following (note that this is hand-written rather than the actual output of any particular compiler; it is my attempt to write the code out concisely while keeping it human-readable):

static void CopyStreamToStreamAsync(
    Stream input, Stream output, Action<Exception> completed)
{
    // Buffer space for the data to be read and written
    byte[] buffer = new byte[0x2000];

    // The read/write loop. The parameter is the IAsyncResult of the
    // last read operation that still needs to be completed with a call
    // to EndRead. If the parameter is null, that means a new read needs
    // to be started.
    Action<IAsyncResult> readWriteLoop = null;
    readWriteLoop = iar =>
    {
        try
        {
            // Determine whether to start with a BeginRead or 
            // an EndRead/BeginWrite, based on whether iar is null.
            // Then, as long as the loop continues, alternate between
            // reading and writing.
            for (bool isRead = iar == null; ; isRead = !isRead)
            {
                switch (isRead)
                {
                    // Do BeginRead(...)
                    case true:
                        // Start the asynchronous read
                        iar = input.BeginRead(buffer, 0, buffer.Length, readResult =>
                        {
                            // If the read completed synchronously, immediately
                            // return from the callback, as the processing of the
                            // read will be handled synchronously
                            // from the same thread that called BeginRead
                            if (readResult.CompletedSynchronously) return;

                            // The read completed asynchronously, so we need 
                            // to run the processing loop,
                            // starting with EndRead/BeginWrite.
                            readWriteLoop(readResult);
                        }, null);

                        // If the read is completing asynchronously, bail, as
                        // there's nothing more to do.
                        // If it completed synchronously, loop around to do
                        // the EndRead/BeginWrite synchronously.
                        if (!iar.CompletedSynchronously) return;
                        break;

                    // Do BeginWrite(...)
                    case false:
                        // Complete the previous read. If there's no more data
                        // to be read/written, bail.
                        int numRead = input.EndRead(iar);
                        if (numRead == 0)
                        {
                            completed(null);
                            return;
                        }

                        // Now that we know how much data was read, write it
                        // out asynchronously
                        iar = output.BeginWrite(buffer, 0, numRead, writeResult =>
                        {
                            // If the write completed synchronously, allow
                            // the thread that called BeginWrite
                            // to handle it.
                            if (writeResult.CompletedSynchronously) return;

                            // Otherwise, complete the asynchronous write
                            // and launch the read/write loop
                            // to continue all over again.
                            output.EndWrite(writeResult);
                            readWriteLoop(null);
                        }, null);

                        // If the write is completing asynchronously, bail,
                        // as there's nothing more to do.
                        // Otherwise, complete the write synchronously and
                        // loop around.
                        if (!iar.CompletedSynchronously) return;
                        output.EndWrite(iar);
                        break;
                }
            }
        } catch(Exception e) { completed(e); }
    };

    // Start the whole process off with a read.
    readWriteLoop(null);
}

The Axum compiler, available on DevLabs, is actually capable of these kinds of transformations for asynchronous programming, and you could imagine such functionality being baked into a mainstream language like C#. Here’s how the code could be written asynchronously with Axum:

static asynchronous void CopyStreamToStream(Stream input, Stream output)
{
    byte [] buffer = new byte[0x2000];
    while(true)
    {
        int numRead = input.Read(buffer, 0, buffer.Length);
        if (numRead == 0) break;
        output.Write(buffer, 0, numRead);
    }
}

That’s quite lovely. Comparatively, the hand-written code is mind-numbing, and it’s not something you want to have to write each and every time you need to perform some kind of repeating operation like this asynchronously.

As such, some developers have started to take advantage of C# iterators for writing asynchronous code. While not originally designed for this purpose, the compiler transformations employed for C# iterators are similar to what’s necessary for writing asynchronous code, and thus with a bit of library-based support, it’s possible to write an iterator that looks sequential but that takes advantage of asynchrony. Several libraries have been based on this approach, including the Concurrency & Coordination Runtime (CCR) from Microsoft Robotics, Jeffrey Richter’s AsyncEnumerator, and others.

The key to taking advantage of this pattern is yielding something from an iterator that can invoke a callback when an operation completes. The pattern then becomes:

IEnumerable<ThingThatHasCallbackWhenCompletes> AsyncMethod()
{
    ...
    yield return SomethingThatReturnsThingThatHasCallbackWhenCompletes();
    ... // code here executes when the 
        // yielded ThingThatHasCallbackWhenCompletes completes
}

The idea is that the iterator method returns an IEnumerable of instances that represent a piece of an asynchronous operation. A utility function is used to invoke the iterator method, and iterates over the resulting enumerator. Each time the utility function gets the next instance from the enumerator, it registers some code to monitor the operation for completion, and when the operation completes, it moves next on the enumerator. Moving next on the enumerator results in re-entering the iterator method at the code location after the last yield point, thus allowing another asynchronous operation to be yielded. In this fashion, the iterator method can in effect yield asynchronous operations, and by using “yield return” to instrument the code with those async points, you as the developer can write an asynchronous method in a manner that looks largely sequential.
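The driving loop described above can be sketched in a few lines. In this minimal, single-threaded sketch, AsyncStep is a hypothetical stand-in for the "thing that has a callback when it completes" and IteratorDriver.Run is a hypothetical name for the utility function. Neither comes from a real library, and the sketch has no error handling or thread safety.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for "a thing that has a callback when it completes".
public sealed class AsyncStep
{
    private Action _continuation;
    private bool _completed;

    // Registers the code to run when the step completes. If the step has
    // already completed, the code runs immediately.
    public void OnCompleted(Action continuation)
    {
        if (_completed) continuation();
        else _continuation = continuation;
    }

    // Called by whoever performs the operation to signal completion.
    public void Complete()
    {
        _completed = true;
        Action c = _continuation;
        _continuation = null;
        if (c != null) c();
    }
}

public static class IteratorDriver
{
    // Hypothetical utility function: moves next on the enumerator once at
    // the start and then once per completed step.
    public static void Run(IEnumerable<AsyncStep> iterator, Action done)
    {
        IEnumerator<AsyncStep> e = iterator.GetEnumerator();
        Action moveNext = null;
        moveNext = () =>
        {
            // Re-enter the iterator after its last yield point. If it
            // yields another step, wait for that step; otherwise finish.
            if (e.MoveNext()) e.Current.OnCompleted(moveNext);
            else { e.Dispose(); done(); }
        };
        moveNext();
    }
}

public static class IteratorDriverDemo
{
    public static readonly List<string> Log = new List<string>();

    // Looks sequential, but only advances as each yielded step completes.
    static IEnumerable<AsyncStep> Method(AsyncStep first, AsyncStep second)
    {
        Log.Add("start");
        yield return first;
        Log.Add("after first");
        yield return second;
        Log.Add("after second");
    }

    public static void Main()
    {
        var first = new AsyncStep();
        var second = new AsyncStep();
        IteratorDriver.Run(Method(first, second), () => Log.Add("done"));
        first.Complete();  // resumes the iterator after the first yield
        second.Complete(); // resumes the iterator after the second yield

        // Prints: start, after first, after second, done
        Console.WriteLine(string.Join(", ", Log));
    }
}
```

The iterator body never registers a callback itself; all of the completion plumbing lives in the driver.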

Now, think about the description above for the kind of object that needs to be yielded: “something that can invoke a callback when an operation completes.” Sound familiar? The System.Threading.Tasks.Task class in .NET 4 provides this exact functionality. A Task represents an asynchronous operation, and it has a ContinueWith method that enables a callback to be invoked when that asynchronous operation completes. Thus, we should be able to yield Task instances from an iterator in order to write an asynchronous method. Here’s the same CopyStreamToStream example implemented asynchronously in this fashion:

static IEnumerable<Task> CopyStreamToStreamAsync(
    Stream input, Stream output)
{
    // Buffer space for the data to be read and written
    byte [] buffer = new byte[0x2000];

    // While there’s data to be read and written
    while(true)
    {
        // Read data asynchronously. When the operation completes,
        // if no data could be read, we’re done.
        var read = Task<int>.Factory.FromAsync(
            input.BeginRead, input.EndRead, buffer, 0, buffer.Length, null,
            TaskCreationOptions.DetachedFromParent);
        yield return read;
        if (read.Result == 0) break;

        // Write the data asynchronously
        yield return Task.Factory.FromAsync(
            output.BeginWrite, output.EndWrite, buffer, 0, read.Result, null,
            TaskCreationOptions.DetachedFromParent);
    }
}

Much simpler. Where we were previously doing synchronous reads and writes, now we’re yielding the result of calling the built-in Task.Factory.FromAsync method, which creates Tasks that represent asynchronous reads and writes following the Asynchronous Programming Model (APM) pattern of paired Begin/End methods. We could of course simplify this code further by using a few helper extension methods to hide some of the asynchronous details (these helpers are part of the Beta 1 samples at http://code.msdn.microsoft.com/ParExtSamples):

public static Task<int> ReadTask(this Stream stream,
    byte [] buffer, int offset, int count)
{
    return Task<int>.Factory.FromAsync(stream.BeginRead, stream.EndRead,
        buffer, offset, count, null, TaskCreationOptions.DetachedFromParent);
}

public static Task WriteTask(this Stream stream,
    byte [] buffer, int offset, int count)
{
    return Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite,
        buffer, offset, count, null, TaskCreationOptions.DetachedFromParent);
}

This then enables the previous code to be simplified to:

static IEnumerable<Task> CopyStreamToStreamAsync(
    Stream input, Stream output)
{
    byte [] buffer = new byte[0x2000];
    while(true)
    {
        var read = input.ReadTask(buffer, 0, buffer.Length);
        yield return read;
        if (read.Result == 0) break;
        yield return output.WriteTask(buffer, 0, read.Result);
    }
}

That looks a lot like the synchronous version, and is *much* easier and less error-prone to write than the manual version shown earlier.
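To see the two building blocks in isolation, here is a small sketch that wraps a single APM read in a Task and attaches a callback with ContinueWith. It assumes the FromAsync overload without a creation-options argument, and the class and method names are hypothetical. The MemoryStream only keeps the example self-contained; a FileStream or NetworkStream would be the realistic case.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class FromAsyncDemo
{
    // Performs one read through the Begin/End pair and returns the number
    // of bytes reported by the continuation.
    public static int ReadOnce()
    {
        var input = new MemoryStream(new byte[] { 1, 2, 3, 4, 5 });
        var buffer = new byte[0x2000];

        // Wrap BeginRead/EndRead in a Task<int>. The final argument is the
        // APM state object, which this example does not need.
        Task<int> read = Task<int>.Factory.FromAsync(
            input.BeginRead, input.EndRead, buffer, 0, buffer.Length, null);

        // ContinueWith registers the callback that is invoked once the
        // read has completed; here it just passes along the byte count.
        Task<int> continuation = read.ContinueWith(t => t.Result);

        // Blocking on Result is only for the sake of the demo.
        return continuation.Result;
    }

    public static void Main()
    {
        Console.WriteLine(ReadOnce()); // prints 5
    }
}
```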

Of course, now we need a mechanism for iterating over the asynchronous iterator. As mentioned, we can take advantage of ContinueWith for the main body of the operation (as with the earlier helpers, the following method and several variants of it are available in the Beta 1 samples):

public static Task Iterate(this TaskFactory factory,
    IEnumerable<Task> asyncIterator)
{
    // Validate parameters
    if (factory == null) throw new ArgumentNullException("factory");
    if (asyncIterator == null)
        throw new ArgumentNullException("asyncIterator");

    // Get the scheduler to use, either the one provided by the factory
    // or the current one if the factory didn’t have one specified.
    var scheduler = factory.TaskScheduler ?? TaskScheduler.Current;

    // Get an enumerator from the enumerable
    var enumerator = asyncIterator.GetEnumerator();
    if (enumerator == null) throw new InvalidOperationException();

    // Create the task to be returned to the caller. And ensure
    // that when everything is done, the enumerator is cleaned up.
    var trs = new TaskCompletionSource<object>(factory.CreationOptions);
    trs.Task.ContinueWith(_ => enumerator.Dispose(),
        TaskContinuationOptions.DetachedFromParent, scheduler);

    // This will be called every time more work can be done.
    Action<Task> recursiveBody = null;
    recursiveBody = antecedent =>
    {
        try
        {
            // If the previous task completed with any exceptions, bail
            if (antecedent != null && antecedent.IsFaulted)
                trs.TrySetException(antecedent.Exception);

            // If the user requested cancellation, bail.
            else if (trs.Task.IsCancellationRequested) trs.TrySetCanceled();

            // If we should continue iterating and there's more to iterate
            // over, create a continuation to continue processing. We only
            // want to continue processing once the current Task (as yielded
            // from the enumerator) is complete.
            else if (enumerator.MoveNext())
                enumerator.Current.ContinueWith(recursiveBody,
                    TaskContinuationOptions.DetachedFromParent, scheduler).
                        IgnoreExceptions();

            // Otherwise, we're done!
            else trs.TrySetResult(null);
        }
        // If MoveNext throws an exception, propagate that to the user
        catch (Exception exc) { trs.TrySetException(exc); }
    };

    // Get things started by launching the first task
    factory.StartNew(() => recursiveBody(null),
        TaskCreationOptions.DetachedFromParent, scheduler).
            IgnoreExceptions();

    // Return the representative task to the user
    return trs.Task;
}

Using this implementation, we can now run “asynchronous methods” that return IEnumerable<Task>, as did our CopyStreamToStreamAsync method:

var asyncOperation = Task.Factory.Iterate(
    CopyStreamToStreamAsync(input, output));

Note that the Iterate implementation shown includes a few handy additions on top of the previous hand-coded solution. First, the Iterate method returns a Task, which can be used to track the entire asynchronous operation. Second, it supports cancellation, meaning a caller can request that the asynchronous iteration shut down early, even if it hasn’t completed yet. Third, we’ve now moved the run logic into its own method, which means that we no longer need all of that goop in the actual target asynchronous method.

On top of all that, this implementation is now based on TaskFactory, which means we can do things like ensure that the code runs on a certain scheduler, such as a scheduler that targets the UI. As an example of where that is handy, consider a method that asynchronously reads from a long, remote stream and stores the resulting data into a TextBox as it’s available:

static IEnumerable<Task> ReadStreamIntoTextBox(Stream stream)
{
    byte [] buffer = new byte[0x2000];
    Encoding enc = new UTF8Encoding();
    while(true)
    {
        var read = stream.ReadTask(buffer, 0, buffer.Length);
        yield return read;
        if (read.Result == 0) break;
        myTextBox.Text += enc.GetString(buffer, 0, read.Result);
    }
}

As previously shown, I could invoke this method as follows:

public void button1_Click(object sender, EventArgs e)
{
    Task.Factory.Iterate(ReadStreamIntoTextBox(inputStream)); // buggy
}

However, accessing myTextBox.Text from a thread other than the thread that created myTextBox is a no-no, and yet that’s potentially what will happen in the above. To address that, I want to ensure that the actual code from the iterator is executed on the UI thread (but I still don’t want the asynchronous operations to block the UI thread). To accomplish that, I can create a TaskFactory that will run tasks on the UI thread, as I do in the following code:

public void button1_Click(object sender, EventArgs e)
{
    var uiFactory = new TaskFactory(
        TaskScheduler.FromCurrentSynchronizationContext());
    uiFactory.Iterate(ReadStreamIntoTextBox(inputStream));
}

Of course, while just being able to yield individual operations is useful, things get more interesting when you start considering multi-task continuations, as exposed through ContinueWhenAny and ContinueWhenAll. For example, in our previous copy stream example, we’re reading, then writing, then reading, then writing, and so forth. But we should be able to write the previously read bits while reading the next chunk, thereby achieving better speeds by overlapping latencies. Writing the code to do that using manual asynchrony would be a nightmare… with iterators and tasks, it’s manageable, almost fun:

static IEnumerable<Task> CopyStreamToStreamAsync(
    Stream input, Stream output)
{
    const int BUFFER_SIZE = 0x2000; // same buffer size as the earlier examples
    byte[][] buffers = new byte[2][] {
        new byte[BUFFER_SIZE], new byte[BUFFER_SIZE] };
    int filledBufferNum = 0;
    Task writeTask = null;

    while (true)
    {
        var readTask = input.ReadTask(
            buffers[filledBufferNum], 0, buffers[filledBufferNum].Length);
        yield return writeTask == null ?
            readTask :
            Task.Factory.ContinueWhenAll(new[] { readTask, writeTask },
                tasks => Task.WaitAll(tasks), // to propagate exceptions
                TaskContinuationOptions.DetachedFromParent);
        if (readTask.Result == 0) break;

        writeTask = output.WriteTask(
            buffers[filledBufferNum], 0, readTask.Result);
        filledBufferNum = filledBufferNum == 0 ? 1 : 0;
    }
}

Here, ContinueWhenAll is used to ensure that the iterator isn’t re-entered until both the current read and any pending write have completed.
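The behavior of ContinueWhenAll, including the Task.WaitAll call used to propagate exceptions, can be tried on its own. The following is a small sketch with hypothetical class and method names, using overloads that take no continuation options.

```csharp
using System;
using System.Threading.Tasks;

public static class WhenAllDemo
{
    // The continuation runs only after both antecedents have completed.
    public static int SumWhenBothDone()
    {
        Task<int> a = Task.Factory.StartNew(() => 2);
        Task<int> b = Task.Factory.StartNew(() => 40);

        Task<int> both = Task.Factory.ContinueWhenAll(
            new Task[] { a, b },
            tasks =>
            {
                Task.WaitAll(tasks); // already complete, so this does not block
                return a.Result + b.Result;
            });
        return both.Result;
    }

    // If any antecedent faulted, WaitAll throws inside the continuation,
    // which in turn faults the task returned by ContinueWhenAll.
    public static bool FailurePropagates()
    {
        Task ok = Task.Factory.StartNew(() => { });
        Task bad = Task.Factory.StartNew(
            (Action)(() => { throw new InvalidOperationException("boom"); }));

        Task both = Task.Factory.ContinueWhenAll(
            new[] { ok, bad }, tasks => Task.WaitAll(tasks));

        try { both.Wait(); return false; }
        catch (AggregateException) { return true; }
    }

    public static void Main()
    {
        Console.WriteLine(SumWhenBothDone());   // prints 42
        Console.WriteLine(FailurePropagates()); // prints True
    }
}
```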

There are of course many variations to this code that you could implement. At the end of the day, I find it quite interesting to see how the Task primitive can be used to enable such scenarios.
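As one such variation, here is a sketch of the same iteration idea written against the Task API as it later shipped in .NET 4. It assumes that the DetachedFromParent options and Task.IsCancellationRequested used in the Beta 1 listings are not available there, so tasks are left with default options and cancellation is expressed with a CancellationToken. The class name, the method signatures, and the demo are hypothetical, and this is not the implementation from the samples.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TaskIteration
{
    // Runs the iterator, moving next each time a yielded task completes.
    public static Task Iterate(IEnumerable<Task> asyncIterator,
        CancellationToken cancellationToken, TaskScheduler scheduler)
    {
        if (asyncIterator == null)
            throw new ArgumentNullException("asyncIterator");
        if (scheduler == null) throw new ArgumentNullException("scheduler");

        IEnumerator<Task> enumerator = asyncIterator.GetEnumerator();
        var tcs = new TaskCompletionSource<object>();

        // Clean up the enumerator once the overall operation has finished.
        tcs.Task.ContinueWith(_ => enumerator.Dispose(), scheduler);

        Action<Task> step = null;
        step = antecedent =>
        {
            try
            {
                // Propagate a failure from the previously yielded task.
                if (antecedent != null && antecedent.IsFaulted)
                    tcs.TrySetException(antecedent.Exception.InnerExceptions);
                // Stop early if the caller requested cancellation.
                else if (cancellationToken.IsCancellationRequested)
                    tcs.TrySetCanceled();
                // Resume the iterator; continue when its next task completes.
                else if (enumerator.MoveNext())
                    enumerator.Current.ContinueWith(step, scheduler);
                else
                    tcs.TrySetResult(null);
            }
            // Exceptions thrown by the iterator body surface here.
            catch (Exception exc) { tcs.TrySetException(exc); }
        };

        Task.Factory.StartNew(() => step(null), CancellationToken.None,
            TaskCreationOptions.None, scheduler);
        return tcs.Task;
    }

    // The sequential-looking copy loop, yielding one task per I/O operation.
    static IEnumerable<Task> CopyAsync(Stream input, Stream output)
    {
        var buffer = new byte[0x2000];
        while (true)
        {
            Task<int> read = Task<int>.Factory.FromAsync(
                input.BeginRead, input.EndRead, buffer, 0, buffer.Length, null);
            yield return read;
            if (read.Result == 0) break;
            yield return Task.Factory.FromAsync(
                output.BeginWrite, output.EndWrite, buffer, 0, read.Result, null);
        }
    }

    // Copies 20,000 bytes between in-memory streams and checks the result.
    public static bool CopyRoundTrips()
    {
        var data = new byte[20000];
        new Random(1).NextBytes(data);
        var input = new MemoryStream(data);
        var output = new MemoryStream();

        // Waiting here is only for the demo; a real caller would attach
        // a continuation to the returned task instead.
        Iterate(CopyAsync(input, output),
            CancellationToken.None, TaskScheduler.Default).Wait();
        return output.ToArray().SequenceEqual(data);
    }

    public static void Main()
    {
        Console.WriteLine(CopyRoundTrips()); // prints True
    }
}
```

Passing the scheduler explicitly plays the role of the TaskFactory parameter in the listing above: handing in a scheduler that targets the UI would run the iterator body there.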

Comments
  • Any iterator based scheduling is easier to use than closure continuations, but you can't put yield statement in a try-catch block. It makes error handling difficult.

    That's why good support of asynchrony/continuation at a language/runtime level is so important.

  • Omari, you'll hear no argument from me; language-based support is definitely welcome.  Have you looked at Axum and how it handles asynchronous methods?  Thoughts?  In the meantime, I think it's nice to see how Tasks can be used with an iterator-based scheme to support asynchronous programming.

  • Hi there,

    What's wrong with using monads/"workflows" like F# does? F#'s async expressions add very little overhead to the code, but they're defined completely in normal user code -- the only thing hard-baked is the desugaring code.

  • The support F# provides is cool, and I'd personally welcome similar support in languages like C# and Visual Basic. (As an example, Luke Hoban has a nice snippet in his blog post at http://blogs.msdn.com/lukeh/archive/2009/06/26/icfp-programming-contest-2009.aspx.)  I'll also note that F# provides conversions between Async<'T> and Task<T>, so that given a Task<T>, you can use .AwaitValue to get an Async<'T>, and given an Async<'T>, you can use .StartAsTask to get a Task<T>.

  • Great stuff! It's good that the TPL has enough expressive power to handle this.

  • awesome stuff :)

    regarding the yield return and exceptions, isn't that sort of handled in the iterating code? there you wrap the moveNext in a try catch and in the case of errors -that- task aggregates the error right?

    i guess it makes life for the implementer more difficult but does it really matter from the consumer code perspective?

    i'm not against language integration, but concurrency is so hard and locking a particular approach into the language seems risky :) i think i saw an interview with anders where he gave this as a reason for not including something like that in c# 3.0

    but in the future, who knows :) it would certainly be useful :)

  • Don't know about C# . But asynchronous methods - yes - spot on - much simpler and safer .

  • aL, re: "regarding the yield return and exceptions, isnt that sort of handled in the iterating code?"... yes, it is.  Exceptions that occur in the iterator body as well as exceptions that occur in yielded tasks are captured and transferred to the task returned from Task.Factory.Iterate.

    Joe, Kim, aL, thanks for the thoughts.

  • Hey, i just wrote an asynchronous foreach extension. I am clearly not as knowledgeable as you are on this topic. Would love for you to check it out.

    http://thelifepattern.blogspot.com/2009/07/threaded-foreach-extension-for.html

  • toub said: "Have you looked at Axum and how it handles asynchronous methods?"

    I even made a small suggestion for asynchronous methods on the forum :)

  • Stan R., thanks for the link to your article.  What is the goal of your example?  The way it's written, you're queueing up multiple items, but you end up serializing all of them due to the lock around the action (and in the process blocking threads in the pool that are waiting on that lock).  If your goal is just to offload the work from the current thread, you could simply create a single work item to run the original foreach loop.  If you want to actually execute the actions in parallel, you can use Parallel.ForEach, and if you want to offload that ForEach to a different thread, you can wrap the call to ForEach in a Task.

    omario, thanks.

  • Great stuff!

    @Omari Omarov - it's true you can't put a try/catch around a yield. But you should be able to! If that were implemented, would other specific language extensions for asynchronous calls be quite so necessary?

    The likely reason why you can't is simply that it was already complicated enough to make it hard for the C# team to implement it.

    See my SO question: http://stackoverflow.com/questions/346365/why-cant-yield-return-appear-inside-a-try-block-with-a-catch

    Perhaps it would be possible to stuff an exception into an IEnumerable object, via some new optional interface (IEnumerableException?), thus causing an exception to be apparently thrown by yield-return. Then the iterator method's own code could handle exceptions that occurred in other contexts while execution was paused. The for-each construct could participate in this, allowing an iterator to handle an exception thrown in the body of the loop.

  • Daniel Earwicker said:  

    "@Omari Omarov - it's true you can't put a try/catch around a yield. But you should be able to! If that were implemented, would other specific language extensions for asynchronous calls be quite so necessary?

    The likely reason why you can't is simply that it was already complicated enough to make it hard for the C# team to implement it."

    That's why I think it'd be better to leave iterators to generate sequences and use Axum (when it or its successor comes) for efficient continuations.

  • Stephen,

    Excellent content!!!

    Are there any scenarios that would warrant using AsyncEnumerator over mechanisms such as these that would be available via TPL in .NET 4?

  • Thanks, Abhijeet.  Jeffrey's AsyncEnumerator provides some really useful features already built and available.  If they meet your needs, great!  The purpose of this post was meant to show how TPL provides core primitives that enable building such higher level constructs, so if you want something custom tuned, or if you want some functionality not in AsyncEnumerator, etc., you could use TPL to do it.
