Asynchronous methods, C# iterators, and Tasks

More and more, developers are realizing the significant scalability advantages that asynchronous programming can provide, especially as it relates to I/O.

Consider an application that needs to copy data from one stream to another, as in the following synchronous implementation:

static void CopyStreamToStream(Stream input, Stream output)
{
    // Buffer space for the data to be read and written
    byte [] buffer = new byte[0x2000];

    // While there’s data to be read and written
    while(true)
    {
        // Read data. If we weren’t able to read any, bail.
        // Otherwise, write it out and start over again.
        int numRead = input.Read(buffer, 0, buffer.Length);
        if (numRead == 0) break;
        output.Write(buffer, 0, numRead);
    }
}

In many cases like this, such as when the Streams are FileStream instances representing files on disk or NetworkStream instances representing remote data, the operations being performed require little-to-no computational power, as the majority of the time is spent waiting on the I/O subsystems and devices. If only one such operation is being performed at a time, that waiting isn’t such a big deal. But in a synchronous implementation like the one above, the waiting blocks a thread, rendering that thread unable to do anything else in the meantime. Threads by default take up a sizeable chunk of memory as well as kernel resources, so if multiple concurrent calls to CopyStreamToStream are executed, multiple threads end up tied up doing nothing but waiting. Because threads consume a non-negligible amount of resources, we try to limit the number of threads in an application at any one time, such as by using a thread pool. As a result, this synchronous style of programming I/O can lead to scalability bottlenecks, especially in server components where we want to process as many user requests concurrently as the machine’s resources will allow.

One solution to this problem is through compiler support. A compiler could recognize this synchronous pattern and translate it into an asynchronous one. Such a transformation of that same snippet might look like the following (note that this is hand-generated and is not the actual output of any particular compiler; rather, it is my attempt to write this out concisely while keeping it human-understandable):

static void CopyStreamToStreamAsync(
    Stream input, Stream output, Action<Exception> completed)
{
    // Buffer space for the data to be read and written
    byte[] buffer = new byte[0x2000];

    // The read/write loop. The parameter is the IAsyncResult of the
    // last read operation that still needs to be completed with a call
    // to EndRead. If the parameter is null, that means a new read needs
    // to be started.
    Action<IAsyncResult> readWriteLoop = null;
    readWriteLoop = iar =>
    {
        try
        {
            // Determine whether to start with a BeginRead or 
            // an EndRead/BeginWrite, based on whether iar is null.
            // Then, as long as the loop continues, alternate between
            // reading and writing.
            for (bool isRead = iar == null; ; isRead = !isRead)
            {
                switch (isRead)
                {
                    // Do BeginRead(...)
                    case true:
                        // Start the asynchronous read
                        iar = input.BeginRead(buffer, 0, buffer.Length, readResult =>
                        {
                            // If the read completed synchronously, immediately
                            // return from the callback, as the processing of the
                            // read will be handled synchronously
                            // from the same thread that called BeginRead
                            if (readResult.CompletedSynchronously) return;

                            // The read completed asynchronously, so we need 
                            // to run the processing loop,
                            // starting with EndRead/BeginWrite.
                            readWriteLoop(readResult);
                        }, null);

                        // If the read is completing asynchronously, bail, as
                        // there's nothing more to do.
                        // If it completed synchronously, loop around to do
                        // the EndRead/BeginWrite synchronously.
                        if (!iar.CompletedSynchronously) return;
                        break;

                    // Do BeginWrite(...)
                    case false:
                        // Complete the previous read. If there's no more data
                        // to be read/written, bail.
                        int numRead = input.EndRead(iar);
                        if (numRead == 0)
                        {
                            completed(null);
                            return;
                        }

                        // Now that we know how much data was read, write it
                        // out asynchronously
                        iar = output.BeginWrite(buffer, 0, numRead, writeResult =>
                        {
                            // If the write completed synchronously, allow
                            // the thread that called BeginWrite
                            // to handle it.
                            if (writeResult.CompletedSynchronously) return;

                            // Otherwise, complete the asynchronous write
                            // and launch the read/write loop
                            // to continue all over again.
                            output.EndWrite(writeResult);
                            readWriteLoop(null);
                        }, null);

                        // If the write is completing asynchronously, bail,
                        // as there's nothing more to do.
                        // Otherwise, complete the write synchronously and
                        // loop around.
                        if (!iar.CompletedSynchronously) return;
                        output.EndWrite(iar);
                        break;
                }
            }
        }
        catch (Exception e) { completed(e); }
    };

    // Start the whole process off with a read.
    readWriteLoop(null);
}

The Axum compiler, available on DevLabs, is actually capable of these kinds of transformations for asynchronous programming, and you could imagine such functionality being baked into a mainstream language like C#. Here’s how the code could be written asynchronously with Axum:

static asynchronous void CopyStreamToStream(Stream input, Stream output)
{
    byte [] buffer = new byte[0x2000];
    while(true)
    {
        int numRead = input.Read(buffer, 0, buffer.Length);
        if (numRead == 0) break;
        output.Write(buffer, 0, numRead);
    }
}

That’s quite lovely. Comparatively, the hand-written code is mind-numbing, and it’s not something you want to have to write each and every time you need to perform some kind of repeating operation like this asynchronously.

As such, some developers have started to take advantage of C# iterators for writing asynchronous code. While not originally designed for this purpose, the compiler transformations employed for C# iterators are similar to what’s necessary for writing asynchronous code, and thus with a bit of library-based support, it’s possible to write an iterator that looks sequential but that takes advantage of asynchrony. Several libraries have been based on this approach, including the Concurrency & Coordination Runtime (CCR) from Microsoft Robotics, Jeffrey Richter’s AsyncEnumerator, and others.

The key to taking advantage of this pattern is yielding something from an iterator that can invoke a callback when an operation completes. The pattern then becomes:

IEnumerable<ThingThatHasCallbackWhenCompletes> AsyncMethod()
{
    ...
    yield return SomethingThatReturnsThingThatHasCallbackWhenCompletes();
    ... // code here executes when the 
        // yielded ThingThatHasCallbackWhenCompletes completes
}

The idea is that the iterator method returns an IEnumerable of instances that represent a piece of an asynchronous operation. A utility function is used to invoke the iterator method, and iterates over the resulting enumerator. Each time the utility function gets the next instance from the enumerator, it registers some code to monitor the operation for completion, and when the operation completes, it moves next on the enumerator. Moving next on the enumerator results in re-entering the iterator method at the code location after the last yield point, thus allowing another asynchronous operation to be yielded. In this fashion, the iterator method can in effect yield asynchronous operations, and by using “yield return” to instrument the code with those async points, you as the developer can write an asynchronous method in a manner that looks largely sequential.
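To make the mechanics just described a little more concrete, here is a minimal sketch of such a utility function. It doesn't use any particular library: AsyncStep, CallbackDriver, and DriverDemo are hypothetical names invented for this illustration, and the yielded "operations" complete synchronously purely to keep the example self-contained.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for "a thing that has a callback when it completes":
// a delegate that starts an operation and invokes onCompleted when it's done.
public delegate void AsyncStep(Action onCompleted);

public static class CallbackDriver
{
    // Walks the iterator, moving to the next step only after the
    // previously yielded step has reported completion.
    public static void Run(IEnumerable<AsyncStep> steps, Action onFinished)
    {
        IEnumerator<AsyncStep> enumerator = steps.GetEnumerator();
        Action moveNext = null;
        moveNext = () =>
        {
            if (enumerator.MoveNext())
            {
                // Re-enters the iterator after its last yield point, then
                // registers moveNext as the completion callback for the
                // newly yielded step.
                enumerator.Current(moveNext);
            }
            else
            {
                enumerator.Dispose();
                onFinished();
            }
        };
        moveNext();
    }
}

public static class DriverDemo
{
    public static readonly List<string> Log = new List<string>();

    // Looks sequential, but each yield return hands control to the driver.
    public static IEnumerable<AsyncStep> Steps()
    {
        Log.Add("before first step");
        yield return done => { Log.Add("step 1 running"); done(); };
        Log.Add("between steps");
        yield return done => { Log.Add("step 2 running"); done(); };
        Log.Add("after last step");
    }

    public static void Main()
    {
        CallbackDriver.Run(Steps(), () => Log.Add("finished"));
        foreach (string line in Log) Console.WriteLine(line);
    }
}
```

Because the steps here complete synchronously, each completion callback nests inside the previous one. A real driver would need to guard against deep recursion for long sequences of synchronously completing operations.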

Now, think about the description above for the kind of object that needs to be yielded: “something that can invoke a callback when an operation completes.” Sound familiar? The System.Threading.Tasks.Task class in .NET 4 provides this exact functionality. A Task represents an asynchronous operation, and it has a ContinueWith method that enables a callback to be invoked when that asynchronous operation completes. Thus, we should be able to yield Task instances from an iterator in order to write an asynchronous method. Here’s the same CopyStreamToStream example implemented asynchronously in this fashion:

static IEnumerable<Task> CopyStreamToStreamAsync(
    Stream input, Stream output)
{
    // Buffer space for the data to be read and written
    byte [] buffer = new byte[0x2000];

    // While there’s data to be read and written
    while(true)
    {
        // Read data asynchronously. When the operation completes,
        // if no data could be read, we’re done.
        var read = Task<int>.Factory.FromAsync(
            input.BeginRead, input.EndRead, buffer, 0, buffer.Length, null,
            TaskCreationOptions.DetachedFromParent);
        yield return read;
        if (read.Result == 0) break;

        // Write the data asynchronously
        yield return Task.Factory.FromAsync(
            output.BeginWrite, output.EndWrite, buffer, 0, read.Result, null,
            TaskCreationOptions.DetachedFromParent);
    }
}

Much simpler. Where we were previously doing synchronous reads and writes, now we’re yielding the result of calling the built-in Task.Factory.FromAsync method, which creates Tasks that represent asynchronous reads and writes following the APM pattern. We could of course simplify this code further by using a few helper extension methods to hide some of the asynchronous details (these helpers are part of the Beta 1 samples at http://code.msdn.microsoft.com/ParExtSamples):

public static Task<int> ReadTask(this Stream stream,
    byte [] buffer, int offset, int count)
{
    return Task<int>.Factory.FromAsync(stream.BeginRead, stream.EndRead,
        buffer, offset, count, null, TaskCreationOptions.DetachedFromParent);
}

public static Task WriteTask(this Stream stream,
    byte [] buffer, int offset, int count)
{
    return Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite,
        buffer, offset, count, null, TaskCreationOptions.DetachedFromParent);
}

This then enables the previous code to be simplified to:

static IEnumerable<Task> CopyStreamToStreamAsync(
    Stream input, Stream output)
{
    byte [] buffer = new byte[0x2000];
    while(true)
    {
        var read = input.ReadTask(buffer, 0, buffer.Length);
        yield return read;
        if (read.Result == 0) break;
        yield return output.WriteTask(buffer, 0, read.Result);
    }
}

That looks a lot like the synchronous version, and is *much* easier and less error-prone to write than the manual version shown earlier.

Of course, now we need a mechanism for iterating over the asynchronous iterator. As mentioned, we can take advantage of ContinueWith for the main body of the operation (as with the earlier helpers, the following method and several variants of it are available in the Beta 1 samples):

public static Task Iterate(this TaskFactory factory,
    IEnumerable<Task> asyncIterator)
{
    // Validate parameters
    if (factory == null) throw new ArgumentNullException("factory");
    if (asyncIterator == null)
        throw new ArgumentNullException("asyncIterator");

    // Get the scheduler to use, either the one provided by the factory
    // or the current one if the factory didn’t have one specified.
    var scheduler = factory.TaskScheduler ?? TaskScheduler.Current;

    // Get an enumerator from the enumerable
    var enumerator = asyncIterator.GetEnumerator();
    if (enumerator == null) throw new InvalidOperationException();

    // Create the task to be returned to the caller. And ensure
    // that when everything is done, the enumerator is cleaned up.
    var trs = new TaskCompletionSource<object>(factory.CreationOptions);
    trs.Task.ContinueWith(_ => enumerator.Dispose(),
        TaskContinuationOptions.DetachedFromParent, scheduler);

    // This will be called every time more work can be done.
    Action<Task> recursiveBody = null;
    recursiveBody = antecedent =>
    {
        try
        {
            // If the previous task completed with any exceptions, bail
            if (antecedent != null && antecedent.IsFaulted)
                trs.TrySetException(antecedent.Exception);

            // If the user requested cancellation, bail.
            else if (trs.Task.IsCancellationRequested) trs.TrySetCanceled();

            // If we should continue iterating and there's more to iterate
            // over, create a continuation to continue processing. We only
            // want to continue processing once the current Task (as yielded
            // from the enumerator) is complete.
            else if (enumerator.MoveNext())
                enumerator.Current.ContinueWith(recursiveBody,
                    TaskContinuationOptions.DetachedFromParent, scheduler).
                        IgnoreExceptions();

            // Otherwise, we're done!
            else trs.TrySetResult(null);
        }
        // If MoveNext throws an exception, propagate that to the user
        catch (Exception exc) { trs.TrySetException(exc); }
    };

    // Get things started by launching the first task
    factory.StartNew(() => recursiveBody(null),
        TaskCreationOptions.DetachedFromParent, scheduler).
            IgnoreExceptions();

    // Return the representative task to the user
    return trs.Task;
}

Using this implementation, we can now run “asynchronous methods” that return IEnumerable<Task>, as did our CopyStreamToStreamAsync method:

var asyncOperation = Task.Factory.Iterate(
    CopyStreamToStreamAsync(input, output));

Note that the Iterate implementation shown includes a few handy additions on top of the previous hand-coded solution. First, the Iterate method returns a Task, which can be used to track the entire asynchronous operation. Second, it supports cancellation, meaning a caller can request that the asynchronous iteration shut down early, even if it hasn’t completed yet. Third, we’ve now factored the run logic out into a separate method, which means that we no longer need all of that goop in the actual target asynchronous method.
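The Iterate listing above is written against the Beta 1 API surface. As a rough sketch only, the following shows how the core of the same driver might look using only members I'm confident exist in the released System.Threading.Tasks API (TaskCompletionSource<T>, ContinueWith, TaskFactory.Scheduler). It deliberately leaves out cancellation and the IgnoreExceptions helper, and the IterateSketch and IterateDemo names are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class IterateSketch
{
    // Simplified variant of Iterate: still an extension on TaskFactory,
    // but with no cancellation support.
    public static Task Iterate(this TaskFactory factory,
        IEnumerable<Task> asyncIterator)
    {
        if (factory == null) throw new ArgumentNullException("factory");
        if (asyncIterator == null)
            throw new ArgumentNullException("asyncIterator");

        // Use the factory's scheduler if it has one, else the current one.
        TaskScheduler scheduler = factory.Scheduler ?? TaskScheduler.Current;
        IEnumerator<Task> enumerator = asyncIterator.GetEnumerator();

        // The task handed back to the caller; dispose the enumerator
        // once the whole operation has finished.
        var tcs = new TaskCompletionSource<object>();
        tcs.Task.ContinueWith(_ => enumerator.Dispose());

        Action<Task> step = null;
        step = antecedent =>
        {
            try
            {
                // Propagate a failure or cancellation of the yielded task.
                if (antecedent != null && antecedent.IsFaulted)
                    tcs.TrySetException(antecedent.Exception.InnerExceptions);
                else if (antecedent != null && antecedent.IsCanceled)
                    tcs.TrySetCanceled();

                // Otherwise resume the iterator, and continue again when
                // the next yielded task completes.
                else if (enumerator.MoveNext())
                    enumerator.Current.ContinueWith(step, CancellationToken.None,
                        TaskContinuationOptions.None, scheduler);

                // Nothing left to iterate: we're done.
                else tcs.TrySetResult(null);
            }
            // If the iterator body itself throws, surface that too.
            catch (Exception exc) { tcs.TrySetException(exc); }
        };

        // Kick things off on the chosen scheduler.
        factory.StartNew(() => step(null), CancellationToken.None,
            TaskCreationOptions.None, scheduler);
        return tcs.Task;
    }
}

public static class IterateDemo
{
    public static int Total;

    // A toy "asynchronous method": each value is produced by a Task.
    public static IEnumerable<Task> AddUp(int[] values)
    {
        foreach (int value in values)
        {
            Task<int> fetch = Task.Factory.StartNew(() => value);
            yield return fetch;
            Total += fetch.Result;
        }
    }

    public static void Main()
    {
        Task whole = Task.Factory.Iterate(AddUp(new[] { 1, 2, 3 }));
        whole.Wait();
        Console.WriteLine(Total);
    }
}
```

The returned Task is what lets a caller wait on, or attach a continuation to, the entire sequence rather than any individual step.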

On top of all that, this implementation is now based on TaskFactory, which means we can do things like ensure that the code runs on a certain scheduler, such as a scheduler that targets the UI. As an example of where that is handy, consider a method that asynchronously reads from a long, remote stream and stores the resulting data into a TextBox as it’s available:

static IEnumerable<Task> ReadStreamIntoTextBox(Stream stream)
{
    byte [] buffer = new byte[0x2000];
    Encoding enc = new UTF8Encoding();
    while(true)
    {
        var read = stream.ReadTask(buffer, 0, buffer.Length);
        yield return read;
        if (read.Result == 0) break;
        myTextBox.Text += enc.GetString(buffer, 0, read.Result);
    }
}

As previously shown, I could invoke this method as follows:

public void button1_Click(object sender, EventArgs e)
{
    Task.Factory.Iterate(ReadStreamIntoTextBox(inputStream)); // buggy
}

However, accessing myTextBox.Text from a thread other than the thread that created myTextBox is a no-no, and yet that’s potentially what will happen in the above. To address that, I want to ensure that the actual code from the iterator is executed on the UI thread (but I still don’t want the asynchronous operations to block the UI thread). To accomplish that, I can create a TaskFactory that will run tasks on the UI thread, as I do in the following code:

public void button1_Click(object sender, EventArgs e)
{
    var uiFactory = new TaskFactory(
        TaskScheduler.FromCurrentSynchronizationContext());
    uiFactory.Iterate(ReadStreamIntoTextBox(inputStream));
}

Of course, while just being able to yield individual operations is useful, things get more interesting when you start considering multi-task continuations, as exposed through ContinueWhenAny and ContinueWhenAll. For example, in our previous copy stream example, we’re reading, then writing, then reading, then writing, and so forth. But we should be able to write the previously read bits while reading the next chunk, thereby achieving better speeds by overlapping latencies. Writing the code to do that using manual asynchrony would be a nightmare… with iterators and tasks, it’s manageable, almost fun:

static IEnumerable<Task> CopyStreamToStreamAsync(
    Stream input, Stream output)
{
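    // Assumed buffer size, matching the 0x2000 used in the earlier examples
    const int BUFFER_SIZE = 0x2000;
    byte[][] buffers = new byte[2][] { 
        new byte[BUFFER_SIZE], new byte[BUFFER_SIZE] };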
    int filledBufferNum = 0;
    Task writeTask = null;

    while (true)
    {
        var readTask = input.ReadTask(
            buffers[filledBufferNum], 0, buffers[filledBufferNum].Length);
        yield return writeTask == null ?
            readTask :
            Task.Factory.ContinueWhenAll(new[] { readTask, writeTask },
                tasks => Task.WaitAll(tasks), // to propagate exceptions
                TaskContinuationOptions.DetachedFromParent);
        if (readTask.Result == 0) break;

        writeTask = output.WriteTask(
            buffers[filledBufferNum], 0, readTask.Result);
        filledBufferNum = filledBufferNum == 0 ? 1 : 0;
    }
}

Here, ContinueWhenAll is used to ensure that the iterator isn’t re-entered until both the current read and any pending write have completed.
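For readers who haven't used it, the following is a small standalone sketch of ContinueWhenAll on its own, separate from the stream-copy scenario. The WhenAllDemo class and SumWhenAllDone method are hypothetical names used only for this illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class WhenAllDemo
{
    // Returns a task that completes only after every task in 'parts' has
    // completed, producing the sum of their results. Reading Result also
    // propagates any exception from a faulted antecedent.
    public static Task<int> SumWhenAllDone(Task<int>[] parts)
    {
        return Task.Factory.ContinueWhenAll<int, int>(parts, completed =>
        {
            int sum = 0;
            foreach (Task<int> part in completed) sum += part.Result;
            return sum;
        });
    }

    public static void Main()
    {
        Task<int>[] parts =
        {
            Task.Factory.StartNew(() => 1),
            Task.Factory.StartNew(() => 2)
        };
        Console.WriteLine(SumWhenAllDone(parts).Result);
    }
}
```

In the copy loop above, the continuation plays the same role: it is the single task the iterator yields, standing in for "the read and the write are both finished."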

There are of course many variations to this code that you could implement. At the end of the day, I find it quite interesting to see how the Task primitive can be used to enable such scenarios.

  • Two more points:

    First, the yield feature in C# (I think) and Python (I'm sure) is limited to one function. You can't call another function out of your generator function and yield from both within the same conceptual generator. This is because of how it is implemented: The generating context doesn't have its own stack. In my experience, this renders the whole yield feature useless (in the two times when I ever wanted to use it in real life, I couldn't express the whole generation in just one function; suppose you want to generate SAX events in an XML parser - one function?).

    Second, there is another alternative (in C/C++) called contexts (posix) or fibers (windows). These are "cooperative threads". You use them just like threads, but schedule them manually (via a switch-context call). Since these are entirely user-space, they have virtually no overhead (apart from the allocated stacks, but you define their sizes manually too).

    I recently learned about their existence and implemented a "yield" feature with it in C++; the notion is that you derive from a class and yield by calling a protected yield function in the base. The cool thing is that then you can indeed have your generator span multiple functions, since you actually have multiple stacks.

    contexts/fibers are not available on any virtual machine to my knowledge though, so C#/Java/Python/whatever can't use them.

  • Hi Jens-

    Thanks for the comments.

    Regarding yield in C#, you're correct, though it is possible to yield the results of other calls, e.g.

    public static IEnumerable<T> EnumerateTree<T>(Node<T> root)
    {
        if (root != null)
        {
            yield return root.Data;
            foreach(var child in root.Children)
            {
                foreach(var data in EnumerateTree(child)) yield return data;
            }
        }
    }

    Regarding fibers, while fast, they're typically extremely difficult to use correctly, especially when you consider libraries (ones you've written or otherwise) that may use thread-local storage in some way or another.

  • One important issue I notice about IEnumerable<Task> and Iterate() from above:

    else if (enumerator.MoveNext())

                   enumerator.Current.ContinueWith( ...

    this code assumes that the task returned by enumerator.Current is always ALREADY STARTED (in the "MoveNext" block that ends with yield return).

    It is true in case of Task<T>.Factory.FromAsync() (at least now) but not necessarily in general.

  • Hi Holatom-

    Why do you say it requires that the Task be started, and in what situation do you foresee that it wouldn't be?  The point of the operation is that the iterator is yielding a Task that represents an asynchronous continuation of the operation in progress.

  • Hi

    I was simply pointing to the fact that Iterate() is only setting the continuation not actually "running" the tasks. Sure it's fine with asynchronous operations and scenarios targeted with Iterate() where setting continuation is the only thing that must be done but:

    Task (as a type) can also represent some chunk of work that will be started later (using Task's ctor). So it is theoretically still possible to create and yield such a task and end up with (probably) unexpected termination.

  • Hi Holatom-

    That's true, though as you point out, that's not the intended use of this Iterate method.  Moreover, if you did return an unstarted Task, presumably you'd be doing so with the plan of starting it at some future point; in that case, it's fine to yield such a task from an iterator, as the continuations from the iteration will continue on their way once the yielded Task is started and completed.

    Thanks.

  • What about timeouts?  With most asynchronous operations, the built-in timeout of the synchronous method is lost.  For example, let's say we need to make an HTTP web request.  Using the synchronous methods, there is a default timeout value of like 30 seconds.  However, if you use the asynchronous method, there is no timeout.  You have to handle it manually and make a call to abort the request and also be sure to dispose of your resources.  I'd be curious to see you put together a complete example that makes an asynchronous web request, handles possible exceptions (you can get a WebException and still have data in the response stream), supports a timeout value, and disposes of all the resources after it's done.

  • Hi Kevin-

    The await support in the languages doesn't impact the underlying API's capabilities, e.g. if HttpWebRequest doesn't support timeouts for asynchronous operations, the language support can't magically make such things work.  What it can do, however, is make it easier to implement your own timeout/abort on top of HttpWebRequest, e.g.

    HttpWebRequest req = ...;
    var t = req.GetResponseAsync();
    if (t == await Task.WhenAny(t, Task.Delay(timeout)))
    {
        using(var response = await t)
        {
            ... // process response
        }
    }
    else
    {
        ... // abort the request
    }
