Using Tasks to implement the APM Pattern

Several times recently, folks have asked how to use tasks to implement the APM pattern, otherwise known as the Asynchronous Programming Model pattern, the IAsyncResult pattern, or the Begin/End pattern.  While moving forward we encourage folks to use a Task-based pattern for exposing asynchronous operations, the APM pattern has been the prevalent pattern for asynchrony since the .NET Framework’s initial release, and thus there is a fair amount of infrastructure today built up around consuming implementations of the APM pattern.  Given how common Task-based APIs will be moving forward, it makes sense that developers want to implement the APM pattern with Tasks.  In this post we examine how to do so in .NET 4.

Consider that we have a method which returns a Task<int>:

static Task<int> FooAsync();

and we want to wrap it in an APM implementation with the signatures:

static IAsyncResult BeginFoo(AsyncCallback callback, object state);
static int EndFoo(IAsyncResult asyncResult);

Implementing BeginFoo is the most interesting piece, so we’ll start there. The hardest part of implementing the begin method is implementing the IAsyncResult interface.  Luckily, Task already does this, so the hardest part is out of the way.  The basic additional semantics we need to encode are:

  1. When the task completes, invoke the provided AsyncCallback.
  2. Pass to the invoked AsyncCallback an IAsyncResult, the same IAsyncResult that’s returned from BeginFoo.
  3. Ensure that the IAsyncResult’s AsyncState property returns the supplied state object.

Notice that none of these three requirements is specific to BeginFoo; they are general enough to apply to any Task<TResult> and any BeginXx method, so we’ll build this logic into a reusable ToApm extension method:

static Task<TResult> ToApm<TResult>(this Task<TResult> task, AsyncCallback callback, object state);

Let’s start with #3 above.  We need a Task<TResult> whose IAsyncResult.AsyncState property returns the supplied state.  Task lets you set this value when the task is constructed, but after that the task is meant to be observationally pure, so IAsyncResult.AsyncState can’t be changed.  As such, we’ll create a new task to represent the operation, using the TaskCompletionSource<TResult> type from the System.Threading.Tasks namespace. TaskCompletionSource serves two purposes: to create the task instance, and to provide the methods necessary to manually complete that task instance in one of the three final states (RanToCompletion, Faulted, or Canceled).

static Task<TResult> ToApm<TResult>(this Task<TResult> task, AsyncCallback callback, object state)
{
    var tcs = new TaskCompletionSource<TResult>(state);
 
    …
    return tcs.Task;
}
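As a quick check of the idea above, here is a minimal sketch showing that the state handed to the TaskCompletionSource<TResult> constructor surfaces through the task's IAsyncResult.AsyncState, and that the task stays incomplete until it is completed manually. The names AsyncStateDemo and StateIsVisible are illustrative only and are not part of the original post.

```csharp
using System;
using System.Threading.Tasks;

static class AsyncStateDemo
{
    // Illustrative helper (not from the post): returns true when the state
    // passed to the TaskCompletionSource constructor is visible through the
    // task's IAsyncResult view.
    public static bool StateIsVisible(object state)
    {
        var tcs = new TaskCompletionSource<int>(state);
        IAsyncResult asyncResult = tcs.Task;   // Task implements IAsyncResult
        return ReferenceEquals(asyncResult.AsyncState, state);
    }

    static void Main()
    {
        var state = new object();
        Console.WriteLine(StateIsVisible(state));   // True

        var tcs = new TaskCompletionSource<int>(state);
        Console.WriteLine(tcs.Task.IsCompleted);    // False: nothing has completed it yet
        tcs.TrySetResult(42);                       // complete the task manually
        Console.WriteLine(tcs.Task.Result);         // 42
    }
}
```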

We need this newly created task to mirror the results of the supplied task, which means that when the supplied task completes, we need to transfer its completion state over to this newly created task:

static Task<TResult> ToApm<TResult>(this Task<TResult> task, AsyncCallback callback, object state)
{
    var tcs = new TaskCompletionSource<TResult>(state); 

    task.ContinueWith(delegate
    {
        if (task.IsFaulted) tcs.TrySetException(task.Exception.InnerExceptions);
        else if (task.IsCanceled) tcs.TrySetCanceled();
        else tcs.TrySetResult(task.Result);
    }, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default); 

    return tcs.Task;
}
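To see the mirroring in action, the sketch below feeds already-completed tasks in each of the three final states through the same continuation logic and prints the status of the mirrored task. Mirror and FinalStatus are hypothetical helper names introduced for this illustration; Mirror is simply the ToApm body above without the callback parameter.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class MirrorDemo
{
    // Hypothetical helper: the mirroring half of ToApm, without the callback.
    public static Task<TResult> Mirror<TResult>(Task<TResult> task, object state)
    {
        var tcs = new TaskCompletionSource<TResult>(state);
        task.ContinueWith(delegate
        {
            if (task.IsFaulted) tcs.TrySetException(task.Exception.InnerExceptions);
            else if (task.IsCanceled) tcs.TrySetCanceled();
            else tcs.TrySetResult(task.Result);
        }, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default);
        return tcs.Task;
    }

    // Blocks until the mirrored task finishes, then reports its final status.
    public static TaskStatus FinalStatus<TResult>(Task<TResult> mirror)
    {
        try { mirror.Wait(); }
        catch (AggregateException) { /* expected for faulted or canceled tasks */ }
        return mirror.Status;
    }

    static void Main()
    {
        var ok = new TaskCompletionSource<int>();
        ok.SetResult(1);
        var failed = new TaskCompletionSource<int>();
        failed.SetException(new InvalidOperationException("boom"));
        var canceled = new TaskCompletionSource<int>();
        canceled.SetCanceled();

        Console.WriteLine(FinalStatus(Mirror(ok.Task, "s")));        // RanToCompletion
        Console.WriteLine(FinalStatus(Mirror(failed.Task, "s")));    // Faulted
        Console.WriteLine(FinalStatus(Mirror(canceled.Task, "s")));  // Canceled
    }
}
```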

Now, for #1 and #2 above, when the task completes, we also need to invoke the callback, passing in the IAsyncResult.  We already have a continuation in place, so this part is easy:

static Task<TResult> ToApm<TResult>(this Task<TResult> task, AsyncCallback callback, object state)
{
    var tcs = new TaskCompletionSource<TResult>(state);

    task.ContinueWith(delegate
    {
        if (task.IsFaulted) tcs.TrySetException(task.Exception.InnerExceptions);
        else if (task.IsCanceled) tcs.TrySetCanceled();
        else tcs.TrySetResult(task.Result);

        if (callback != null) callback(tcs.Task);

    }, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default);

    return tcs.Task;
}

And that’s it!  We can now easily implement our BeginFoo method:

static IAsyncResult BeginFoo(AsyncCallback callback, object state)
{
    return FooAsync().ToApm(callback, state);
}

For our EndFoo method, we simply need to cast the supplied IAsyncResult back into a Task<TResult> and then wait for it to complete and return its result when it’s completed.  Task<TResult>’s Result property handles all of that, as well as throwing an exception if the task completes due to a failure or due to cancellation, so we can write the EndFoo method simply as:

static int EndFoo(IAsyncResult asyncResult)
{
    return ((Task<int>)asyncResult).Result;
}

Of course there’s more we could do here in terms of error conditions (e.g. throwing a better exception than an InvalidCastException if the supplied asyncResult wasn’t a Task<int>), but our basic implementation is complete.
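One possible shape for that extra argument checking is sketched below. The choice of ArgumentNullException and ArgumentException, the message text, and the class name FooEnd are assumptions made for illustration; the post does not prescribe specific exception types.

```csharp
using System;
using System.Threading.Tasks;

static class FooEnd
{
    // A sketch of EndFoo with friendlier argument validation.
    public static int EndFoo(IAsyncResult asyncResult)
    {
        if (asyncResult == null) throw new ArgumentNullException("asyncResult");

        var task = asyncResult as Task<int>;
        if (task == null)
            throw new ArgumentException("The IAsyncResult was not returned by BeginFoo.", "asyncResult");

        // Blocks until completion; throws AggregateException on fault or cancellation.
        return task.Result;
    }

    static void Main()
    {
        var tcs = new TaskCompletionSource<int>();
        tcs.SetResult(42);
        Console.WriteLine(EndFoo(tcs.Task));   // 42

        // A non-generic Task is an IAsyncResult, but not the kind EndFoo expects.
        try { EndFoo(new Task(() => { })); }
        catch (ArgumentException e) { Console.WriteLine(e.ParamName); }   // asyncResult
    }
}
```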

There are variations on these that we could employ as well.  For example, if the task faulted or was canceled, the EndFoo method will end up propagating an AggregateException that wraps the one or more exceptions stored in the Task<int>.  Propagating an AggregateException serves two purposes: first and foremost, it avoids the overwriting of the stack trace and Watson bucket information that would occur if the original exception were rethrown, and second, it enables more than one exception from the task to be propagated.  However, if you would rather propagate the original exception, that can certainly still be done, e.g.

static int EndFoo(IAsyncResult asyncResult)
{
    try
    {
        return ((Task<int>)asyncResult).Result;
    }
    // Rethrows only the first inner exception; its stack trace is reset at this throw.
    catch (AggregateException ae) { throw ae.InnerException; }
}
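If you are able to target .NET 4.5 or later (an assumption that goes beyond this post, which covers .NET 4), the ExceptionDispatchInfo type in System.Runtime.ExceptionServices offers a way to rethrow the inner exception while keeping its original stack trace. The following is a sketch of that variation, not something the .NET 4 code above can use; the class name FooEndRethrow is illustrative.

```csharp
using System;
using System.Runtime.ExceptionServices;
using System.Threading.Tasks;

static class FooEndRethrow
{
    public static int EndFoo(IAsyncResult asyncResult)
    {
        var task = (Task<int>)asyncResult;
        try
        {
            return task.Result;
        }
        catch (AggregateException ae)
        {
            // Assumes .NET 4.5+: rethrow the first inner exception with its
            // original stack trace preserved.
            ExceptionDispatchInfo.Capture(ae.InnerException).Throw();
            throw;   // not reached; satisfies the compiler's return-path analysis
        }
    }

    static void Main()
    {
        var tcs = new TaskCompletionSource<int>();
        tcs.SetException(new InvalidOperationException("boom"));

        try { EndFoo(tcs.Task); }
        catch (InvalidOperationException e) { Console.WriteLine(e.Message); }   // boom
    }
}
```

As with the throw ae.InnerException version, only the first inner exception is propagated, so any additional exceptions stored in the task are dropped.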

There are also further optimizations we could employ for certain use cases.  For example, if the object state provided by the caller is the same state that is stored in the supplied task (e.g. if the task was created with an overload of Task.Factory.StartNew that accepts object state), then there’s no reason to create a new task.  And in that case, if there is no callback provided, there’s no reason to have a continuation at all:

static Task<TResult> ToApm<TResult>(this Task<TResult> task, AsyncCallback callback, object state)
{
    if (task.AsyncState == state)
    {
        if (callback != null)
        {
            task.ContinueWith(delegate { callback(task); },
                CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default);
        }
        return task;
    }

    // Otherwise, mirror the original task onto a new task that carries the requested state.
    var tcs = new TaskCompletionSource<TResult>(state);
    task.ContinueWith(delegate
    {
        if (task.IsFaulted) tcs.TrySetException(task.Exception.InnerExceptions);
        else if (task.IsCanceled) tcs.TrySetCanceled();
        else tcs.TrySetResult(task.Result);

        if (callback != null) callback(tcs.Task);

    }, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default); 
    return tcs.Task;
}

With this ToApm function in hand, you can now easily convert your Task-based APIs into APM APIs in order to support integration with consumers of the old pattern.
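Putting the pieces together, the following self-contained sketch exercises the wrapper from a caller's point of view. The FooAsync body (which just returns 42 from a thread-pool task), the class names ApmExtensions and FooDemo, and the "my state" value are all stand-ins chosen for the example; the ToApm, BeginFoo, and EndFoo bodies are the ones developed above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class ApmExtensions
{
    public static Task<TResult> ToApm<TResult>(this Task<TResult> task, AsyncCallback callback, object state)
    {
        // Fast path: the task already carries the requested state.
        if (task.AsyncState == state)
        {
            if (callback != null)
            {
                task.ContinueWith(delegate { callback(task); },
                    CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default);
            }
            return task;
        }

        // Otherwise mirror the task onto a new one that exposes the state.
        var tcs = new TaskCompletionSource<TResult>(state);
        task.ContinueWith(delegate
        {
            if (task.IsFaulted) tcs.TrySetException(task.Exception.InnerExceptions);
            else if (task.IsCanceled) tcs.TrySetCanceled();
            else tcs.TrySetResult(task.Result);

            if (callback != null) callback(tcs.Task);
        }, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default);
        return tcs.Task;
    }
}

static class FooDemo
{
    // Stand-in for the FooAsync method from the text; 42 is an arbitrary result.
    public static Task<int> FooAsync() { return Task.Factory.StartNew(() => 42); }

    public static IAsyncResult BeginFoo(AsyncCallback callback, object state)
    {
        return FooAsync().ToApm(callback, state);
    }

    public static int EndFoo(IAsyncResult asyncResult)
    {
        return ((Task<int>)asyncResult).Result;
    }

    static void Main()
    {
        var done = new ManualResetEventSlim();
        int result = 0;
        object seenState = null;

        // Classic APM consumption: call EndFoo from inside the callback.
        BeginFoo(iar =>
        {
            seenState = iar.AsyncState;
            result = EndFoo(iar);
            done.Set();
        }, "my state");

        done.Wait();
        Console.WriteLine(result);      // 42
        Console.WriteLine(seenState);   // my state

        // Fast path: same state object and no callback returns the same task instance.
        var state = new object();
        Task<int> original = Task.Factory.StartNew<int>(s => 42, state);
        Console.WriteLine(ReferenceEquals(original.ToApm(null, state), original));   // True
    }
}
```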

Comments
  • We've been using similar code to implement async WCF calls server-side. If we had to implement them with Begin/End calls all the way through, we'd just stick with blocking synchronous calls (yuck!). Can't wait for vNext with native support for Task based async WCF interfaces out of the box.

    We don't currently do the AsyncState or null callback optimizations (because we've always passed in state and had a callback). It's nice to have an "official" reference to update our library with.

    Thanks!

  • What if the callback throws an exception?

    You must observe it, or else your process will shut down!

  • Hi Maxim-

    That's a valid and even expected result here.  If you allow exceptions to propagate out of many APM implementations' callbacks, the process will crash due to the default .NET exception escalation policy, e.g.

    using System;
    using System.IO;

    class Program
    {
        static void Main(string[] args)
        {
            var ms = new MemoryStream();
            var buffer = new byte[0x1000];
            ms.BeginWrite(buffer, 0, buffer.Length, iar =>
            {
                ms.EndWrite(iar);
                throw new Exception("uh oh");
            }, null);
            Console.ReadLine();
        }
    }

  • This is a fantastic little extension! I am using it in an async (service side) WCF service.

    One question, which is a safer way to invoke?

    // this one
    return Task.Factory.StartNew(someLongRunningAction).ToApm(callback, state);

    // OR this one
    var t = new Task(someLongRunningAction).ToApm(callback, state);
    t.Start();
    return t;

    Thanks, I appreciate your feedback!

  • Hi Rob-

    Either is fine, and neither is any more or less safe than the other.  I'd recommend the former (using StartNew) as it's a bit cleaner to write and it's slightly more efficient.

  • Hi Stephen, thank you for this post, it helped me a lot sorting out the mixup of TPL and APM scenarios.

    The question is, how about the rule of calling EndXxxx only once? There is currently no validation that EndFoo was called only once. How would you share the state (End was called or not flag) between EndFoo calls?

  • Hi George K-

    I'm not sure what you're asking. Can you elaborate?  In the code shown in this post, the delegate passed to ContinueWith will only be invoked at most once, and thus the AsyncCallback will only be invoked at most once.

  • Looks like my comment was filtered out.

    Well, I was talking about these guidelines msdn.microsoft.com/.../ms228963(v=vs.80).aspx

    which suggest throwing InvalidOperationException if EndXxx is called more than once or when you pass the wrong IAsyncResult to EndXxx. To do that you will have to wrap the task in another proxy class, which would implement IAsyncResult and maintain the internal state. You can add something like EnvInvocation method there, which would throw exceptions in case of misuse.

    So the point is, just calling return ((Task<int>)asyncResult).Result in EndFoo is not enough.

  • Hi George-

    The MSDN docs you link to state that "The effect of calling the EndOperationName method multiple times with the same IAsyncResult is not defined." Because it's not defined, it's valid for you to implement EndXx in a way that succeeds if it's used multiple times.  The docs also note that "For either of the undefined scenarios, implementers should consider throwing InvalidOperationException", but it's just stating that as a consideration, and is by no means a hard and fast rule (in fact, there are multiple IAsyncResult implementations in the .NET Framework itself that don't throw in this case). If you want to enforce throwing an exception in this case, you can of course create a wrapper IAsyncResult object around the Task to track whether EndXx has already been called.

  • Yes, you're right Stephen, it's not mandatory, just a good practice. I think in a WCF scenario you don't really have to do this. I asked because it wasn't clear to me how to do this at the time of asking :)

  • Is there a reason you use TaskContinuationOptions.ExecuteSynchronously when wrapping via TaskCompletionSource and TaskContinuationOptions.None otherwise (when optimizing/short-circuiting)? Is that intended or an oversight? Any reason not to use TaskContinuationOptions.ExecuteSynchronously for both?

  • raboof, thanks, I'd actually not intended to use it anywhere... I've fixed the post.

  • Regarding your comments to Maxim about the callback throwing...is there a way to actually handle this scenario?

    I'm using an AsyncCodeActivity in WF4 that exposes Begin/End methods to override and the work I need to do uses HttpClient.GetAsync (task-based).  In this case, if the callback throws, it appears that it fundamentally bombs out the workflow instance such that any exceptions aren't caught by a try-catch activity that wraps my async activity.

    I suspect (hope?) that writing my own task-based async activity might be a way to resolve this issue, but before I undertook that I thought I'd ask.

    Thanks

  • Hi Rob-

    If the caller of the callback isn't set up to deal well with exceptions coming from the callback (most aren't), then as the developer of the callback, you can avoid throwing exceptions.  That either means eating them or somehow routing them elsewhere, via some custom mechanism you come up with.

  • Thanks, Stephen. A complete code snippet, with support for both Task and Task<T>, and added a couple more optimizations, is available here: gist.github.com/4608941
