Advanced APM Consumption in Async Methods

I’ve previously blogged about how to expose existing Asynchronous Programming Model (APM) implementations as Task-based methods.  This can be done manually using a TaskCompletionSource<TResult>, or it can be done using the built-in wrapper provided in TPL via the Task.Factory.FromAsync method.  By creating a Task-based wrapper for such BeginXx/EndXx method pairs, APM implementations become consumable in async methods in C# and Visual Basic: since Tasks are awaitable, any operation exposed as a Task is then inherently awaitable.
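
As a rough sketch of the two wrapping options just mentioned, here is how Stream.BeginRead/EndRead might be exposed as a Task<int>. The class and method names (ApmToTaskExamples, ReadViaFromAsync, ReadViaTcs) are made up for illustration; Task<int>.Factory.FromAsync and TaskCompletionSource<int> are the real TPL types.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper class, for illustration only.
static class ApmToTaskExamples
{
    // Option 1: let TPL's built-in wrapper pair BeginRead with EndRead.
    public static Task<int> ReadViaFromAsync(
        Stream s, byte[] buffer, int offset, int count)
    {
        return Task<int>.Factory.FromAsync(
            s.BeginRead, s.EndRead, buffer, offset, count, null);
    }

    // Option 2: do the same thing by hand with a TaskCompletionSource.
    public static Task<int> ReadViaTcs(
        Stream s, byte[] buffer, int offset, int count)
    {
        var tcs = new TaskCompletionSource<int>();
        try
        {
            s.BeginRead(buffer, offset, count, iar =>
            {
                // EndRead either yields the byte count or rethrows the failure.
                try { tcs.TrySetResult(s.EndRead(iar)); }
                catch (Exception e) { tcs.TrySetException(e); }
            }, null);
        }
        catch (Exception e)
        {
            // BeginRead itself can throw, e.g. on a disposed stream.
            tcs.TrySetException(e);
        }
        return tcs.Task;
    }
}
```

Either method can then be awaited from an async method, e.g. `int n = await ApmToTaskExamples.ReadViaFromAsync(stream, buffer, 0, buffer.Length);`.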

What I haven’t discussed before is how you can consume an APM implementation without wrapping it in a Task, and that’s what I’ll do in this blog post.  What follows is an advanced approach whose only benefit is potentially reduced overhead, and the resulting performance gains are really only relevant in extreme situations.  But such situations do exist, primarily in high-throughput server scenarios.  Further, the techniques involved here are interesting regardless of whether you decide to use them in practice (I previously discussed a similar approach specifically geared towards sockets).  Without further ado…

The key to this approach is that the C# and Visual Basic compilers support awaiting things other than Tasks, and in fact support awaiting anything that follows the right pattern.  We can take advantage of that to implement a custom awaitable geared explicitly towards optimizing performance when consuming APM implementations.  For very high-throughput workloads, allocations and the resulting garbage collections can lead to noticeable overheads, and thus we want to minimize allocations as much as possible.  In particular, there are many APM implementations that can complete their processing synchronously (i.e. during the BeginXx method), e.g. if you asynchronously retrieve a value that happens to already be buffered or cached, it’s possible to synchronously satisfy the request from that buffer or cache, rather than performing I/O to retrieve or generate it.  For situations where it’s possible for requests to be satisfied synchronously, it’s common that these synchronous code paths become hot paths, and as such we want to avoid as much overhead as possible on them.
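
To make "the right pattern" concrete, here is a minimal sketch of an awaitable that is not a Task. The type names (ReadyValue<T>, AwaitPatternDemo) are hypothetical; the pattern itself is what the compiler looks for: a GetAwaiter method returning a type that implements INotifyCompletion and exposes IsCompleted and GetResult.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

// Hypothetical awaitable: always reports completion and hands back a
// precomputed value, so awaiting it never suspends the async method.
struct ReadyValue<T>
{
    private readonly T _value;
    public ReadyValue(T value) { _value = value; }

    // The compiler calls GetAwaiter() on whatever expression is awaited.
    public Awaiter GetAwaiter() { return new Awaiter(_value); }

    public struct Awaiter : INotifyCompletion
    {
        private readonly T _value;
        public Awaiter(T value) { _value = value; }

        // True means "keep running synchronously; skip OnCompleted".
        public bool IsCompleted { get { return true; } }

        // Required by the pattern; the compiler-generated code only calls
        // it when IsCompleted returns false, which never happens here.
        public void OnCompleted(Action continuation) { continuation(); }

        // Supplies the value of the await expression.
        public T GetResult() { return _value; }
    }
}

static class AwaitPatternDemo
{
    public static async Task<int> AddOneAsync(int x)
    {
        int v = await new ReadyValue<int>(x); // no Task involved in this await
        return v + 1;
    }
}
```

Because both the awaitable and the awaiter are structs and IsCompleted is always true, awaiting a ReadyValue<T> allocates nothing, which is the property a synchronous hot path wants.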

One of the tricks I’ve mentioned in the past for minimizing allocations is caching and reusing the objects used to represent asynchronous operations.  For example, in the case of async methods with C# and Visual Basic, the async method infrastructure caches common Task and Task<TResult> instances that it can use repeatedly (if the right conditions are met) as the return object from an async method. As another example (and an internal implementation detail, so this could of course change in the future), MemoryStream’s ReadAsync method will remember the last Task<int> it returned for a successful read, and if the subsequent ReadAsync operation completes with the same number of bytes, it can reuse and return that same cached Task<int> rather than instantiating a new one.  We can take advantage of this same kind of optimization in order to minimize allocations when consuming APM implementations directly.
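
The "remember the last Task" idea can be sketched in a few lines. This is an illustration of the technique only, not MemoryStream's actual code; CachedResultReader and its bytesAvailable parameter are invented for the example, and the type is assumed to be used by one caller at a time.

```csharp
using System.Threading.Tasks;

// Hypothetical type whose reads always complete synchronously.
sealed class CachedResultReader
{
    // The last completed task that was handed out, if any.
    private Task<int> _lastTask;

    public Task<int> ReadAsync(int bytesAvailable)
    {
        Task<int> t = _lastTask;

        // Same result as last time: reuse the existing task object.
        // (t is already completed, so reading Result does not block.)
        if (t != null && t.Result == bytesAvailable) return t;

        // Different result: allocate once and remember it for next time.
        t = Task.FromResult(bytesAvailable);
        _lastTask = t;
        return t;
    }
}
```

A loop that repeatedly reads full buffers of the same size would hit the cached path on every call after the first, so the steady state allocates no Task objects.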

To do so, we’ll develop an “AsyncApmAdapter” type that implements the await pattern and that we can reuse over and over again for sequential APM calls.  This type will be awaitable, with its GetResult method returning the IAsyncResult for the operation.  We’ll pass an adapter instance as the object state into the BeginXx method, along with an AsyncCallback that will configure the adapter instance appropriately when the asynchronous operation completes.  We’ll then await the adapter and pass the resulting IAsyncResult into the EndXx method. This yields the following pattern:

var adapter = new AsyncApmAdapter();
...
BeginXx(..., AsyncApmAdapter.Callback, adapter);
EndXx(await adapter);

To use my favorite looping example, if we wanted to write a read / write loop to copy from one stream to another using Stream.Begin/EndRead and Stream.Begin/EndWrite, that could look like:

static async Task CopyStreamAsync(Stream source, Stream dest)
{
    var adapter = new AsyncApmAdapter();
    var buffer = new byte[0x1000];
 
    while(true)
    {
        source.BeginRead(buffer, 0, buffer.Length,
            AsyncApmAdapter.Callback, adapter);
        int numRead = source.EndRead(await adapter);
        if (numRead == 0) break;
 
        dest.BeginWrite(buffer, 0, numRead,
            AsyncApmAdapter.Callback, adapter);
        dest.EndWrite(await adapter);
    }
}

Note that we’ve only had to allocate the one AsyncApmAdapter instance, which we can reuse over and over again as we make Begin/EndRead and Begin/EndWrite APM calls to the source and destination streams.

How do we achieve such magic? Let’s start by defining our adapter type, which will act both as an awaitable and as an awaiter:

internal sealed class AsyncApmAdapter : INotifyCompletion
{
    public AsyncApmAdapter GetAwaiter() { return this; }

    public bool IsCompleted { get { ... } }
    public void OnCompleted(Action continuation) { ... }
    public IAsyncResult GetResult() { ... }

    ... // fields
}

We’ll need a few fields in our adapter.  First, we’ll need to be able to store an IAsyncResult for the APM side of things, and second, we’ll need to be able to store the Action continuation delegate that will be supplied by the async method when awaiting on our adapter instance.  We’re also going to need a dummy Action delegate that we can use as a sentinel, a marker to help synchronize (we’ll see where and why shortly).

private IAsyncResult _asyncResult;
private Action _continuation;
private readonly static Action CALLBACK_RAN = () => { };

As used in the earlier example, our adapter exposes a static Callback field of type AsyncCallback; by creating and caching this delegate once, we can use it for all invocations (it doesn’t mutate any shared state).  It is the responsibility of this AsyncCallback to mutate the supplied adapter instance (passed in via the object state) when the async operation completes.  If the operation completes synchronously (such that CompletedSynchronously is true), the only thing the callback needs to do is store the IAsyncResult for the operation into the adapter.  If, however, the operation completes asynchronously, then we need to do a bit more.

public readonly static AsyncCallback Callback = asyncResult =>
{
    var adapter = (AsyncApmAdapter) asyncResult.AsyncState;
    adapter._asyncResult = asyncResult;

    if (!asyncResult.CompletedSynchronously)
    {
        Action continuation = adapter._continuation ??
            Interlocked.CompareExchange(
                ref adapter._continuation, CALLBACK_RAN, null);
        if (continuation != null) continuation();
    }
};

In the case where we’re completing asynchronously, we’re potentially racing with the call site that will be checking whether the async method should continue running synchronously (via the awaiter’s IsCompleted) or whether it should yield, suspending the async method’s execution (via the awaiter’s OnCompleted).  As such, there’s some coordination that needs to happen between the callback and IsCompleted / OnCompleted.  The OnCompleted method will set the _continuation field to the Action provided by the async method infrastructure.  As such, if _continuation is already non-null, then OnCompleted was already invoked, and we thus need to invoke the supplied Action _continuation here.  If, however, _continuation is null, we need to set it to our sentinel Action delegate (CALLBACK_RAN)… this will indicate to IsCompleted / OnCompleted that the async operation has already finished, and thus IsCompleted / OnCompleted needs to take care of continuing the execution.

IsCompleted’s job is relatively simple.  The _asyncResult field starts out on the adapter as null.  If it’s still null when IsCompleted is accessed, then we must yield so that execution can continue asynchronously.  If the _asyncResult field is non-null when IsCompleted is accessed, that means the callback has at least started running (and filled in _asyncResult as part of that), but we don’t know whether it’s finished or not.  So, we check one of two additional conditions.  If the IAsyncResult’s CompletedSynchronously is true, then we know the callback completed, because it was invoked synchronously as part of the call to BeginXx, and we’re not accessing IsCompleted until after BeginXx returns.  If, however, CompletedSynchronously is false, then we’ll check the _continuation field itself.  Remember that the callback will try to set the _continuation field to the CALLBACK_RAN sentinel, so here in IsCompleted if _continuation is already CALLBACK_RAN, then we know that, well, the callback ran.

public bool IsCompleted
{
    get
    {
        IAsyncResult iar = _asyncResult;
        return iar != null &&
               (iar.CompletedSynchronously || 
                _continuation == CALLBACK_RAN);
    }
}

There are some benign races here.  It’s possible that the async operation could be completing concurrently with IsCompleted being called, and that’s ok… in the rare case where the operation has already completed but where IsCompleted doesn’t notice, it’ll simply return false, and we’ll just rely on OnCompleted to handle synchronizing with the callback.

Now on to OnCompleted.  In the await pattern, OnCompleted is invoked to schedule the Action continuation delegate.  To do so in our case, ideally OnCompleted can just store the provided Action into the _continuation field, such that when the async operation completes, the callback will invoke _continuation (as we saw earlier).  However, it’s possible that IsCompleted returns false, but concurrently with invoking IsCompleted and OnCompleted, the operation then completes.  Thus, OnCompleted must be able to deal with the case where the operation has already completed by the time OnCompleted is invoked. However, OnCompleted must not invoke the delegate synchronously, even if it determines that the async operation has already completed, otherwise we risk “stack dives”.  So, OnCompleted checks whether the callback has already tried to pick up the _continuation delegate, which OnCompleted knows by seeing if _continuation is CALLBACK_RAN.  If it’s not yet CALLBACK_RAN, then it’ll simply store the supplied continuation delegate into _continuation.  If, however, it is already CALLBACK_RAN, then it’ll use a Task to schedule the continuation to run asynchronously.  And, for correctness, any writes here need to happen as part of an atomic compare-and-swap operation.

public void OnCompleted(Action continuation)
{
    if (_continuation == CALLBACK_RAN ||
        Interlocked.CompareExchange(
            ref _continuation, continuation, null) == CALLBACK_RAN)
    {
        Task.Run(continuation);
    }
}

Finally, we need to implement GetResult.  As I mentioned previously, GetResult’s role will be to return the stored IAsyncResult.  However, we also want to be able to reuse this adapter instance over and over, which means we need to reset its state after each operation completes, and GetResult is a good place to do that:

public IAsyncResult GetResult() {
    IAsyncResult result = _asyncResult;
    _asyncResult = null;
    _continuation = null;
    return result;
}
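
For readers who want to try the pieces end to end, the fragments above can be assembled into a single listing roughly as follows. The adapter members are the ones shown in this post; the AdapterDemo class and its CopyBytesAsync driver, which copies between two MemoryStream instances, are assumptions added purely for experimentation.

```csharp
using System;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

internal sealed class AsyncApmAdapter : INotifyCompletion
{
    private IAsyncResult _asyncResult;
    private Action _continuation;
    private static readonly Action CALLBACK_RAN = () => { };

    public static readonly AsyncCallback Callback = asyncResult =>
    {
        var adapter = (AsyncApmAdapter)asyncResult.AsyncState;
        adapter._asyncResult = asyncResult;
        if (!asyncResult.CompletedSynchronously)
        {
            // Run the stored continuation, or leave the sentinel behind so
            // that IsCompleted / OnCompleted can see the callback already ran.
            Action continuation = adapter._continuation ??
                Interlocked.CompareExchange(
                    ref adapter._continuation, CALLBACK_RAN, null);
            if (continuation != null) continuation();
        }
    };

    public AsyncApmAdapter GetAwaiter() { return this; }

    public bool IsCompleted
    {
        get
        {
            IAsyncResult iar = _asyncResult;
            return iar != null &&
                   (iar.CompletedSynchronously || _continuation == CALLBACK_RAN);
        }
    }

    public void OnCompleted(Action continuation)
    {
        // If the callback already ran, schedule the continuation rather
        // than invoking it inline, to avoid stack dives.
        if (_continuation == CALLBACK_RAN ||
            Interlocked.CompareExchange(
                ref _continuation, continuation, null) == CALLBACK_RAN)
        {
            Task.Run(continuation);
        }
    }

    public IAsyncResult GetResult()
    {
        // Hand back the result and reset state so the adapter can be reused.
        IAsyncResult result = _asyncResult;
        _asyncResult = null;
        _continuation = null;
        return result;
    }
}

// Hypothetical driver, not part of the adapter itself.
static class AdapterDemo
{
    public static async Task CopyStreamAsync(Stream source, Stream dest)
    {
        var adapter = new AsyncApmAdapter();
        var buffer = new byte[0x1000];
        while (true)
        {
            source.BeginRead(buffer, 0, buffer.Length,
                AsyncApmAdapter.Callback, adapter);
            int numRead = source.EndRead(await adapter);
            if (numRead == 0) break;

            dest.BeginWrite(buffer, 0, numRead,
                AsyncApmAdapter.Callback, adapter);
            dest.EndWrite(await adapter);
        }
    }

    // Copies data through the adapter and reports how many bytes arrived.
    public static async Task<long> CopyBytesAsync(byte[] data)
    {
        using (var source = new MemoryStream(data))
        using (var dest = new MemoryStream())
        {
            await CopyStreamAsync(source, dest);
            return dest.Length;
        }
    }
}
```

Whether a given stream completes its Begin calls synchronously or on another thread depends on the stream implementation, so a driver like this is useful mainly for checking that both code paths in the adapter behave.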

That’s it for the implementation.  A bit tricky, certainly, but relatively little code, and it can provide for improved performance in those extreme situations I alluded to earlier.  Of course, that potential overhead reduction comes at the price of readability.  For example, with this approach you can no longer use the async operation as an expression.  To address that, a colleague of mine, Eric Eilebrecht, suggested a cute hack: implement a GetAwaiter extension method on IAsyncResult:

public static class IAsyncResultExtensions
{
    public static AsyncApmAdapter GetAwaiter(this IAsyncResult iar)
    {
        return (AsyncApmAdapter)iar.AsyncState;
    }
}

With that, instead of writing:

BeginXx(..., AsyncApmAdapter.Callback, adapter);
EndXx(await adapter);

you could write:

EndXx(await BeginXx(..., AsyncApmAdapter.Callback, adapter));

thereby regaining some of the expressiveness lost in the approach.  Of course, this extension method only makes sense when the object state is an AsyncApmAdapter used in this manner, so be careful with such an approach.

Per my comments at the beginning of this post, if you need to consume an APM implementation, converting it into a Task (via Task.Factory.FromAsync or other means) is a great way to do so.  However, if you do find yourself in a position where you’re using an async method to consume an APM implementation, and if it’s crucial to your scenario that you eke out every last bit of performance possible, you could consider an approach like that outlined in this post.  As with any micro-optimization that leads to (slightly) less readable code, make sure it’s worthwhile before paying for it, and measure, measure, measure.

Enjoy!

Comments
  • Great post! I agree with phrase: Measure thrice, cut once! :D

  • Thanks for this post; I learned a lot from it.

    A question is when a delegate be called by BeginInvoke and that method included other EndInvoke and await...

    The AsyncApmAdapter be raised when the delegate meet the first return in MoveNext().

    Then the nested method has not finished exactly.

    Does that mean this pattern cannot be used in nested calls?

  • @neverser: Sorry, I'm not understanding the question.  Can you elaborate?

  • I tested this solution over the past few days

    and I found something interesting.

    If I have some methods like this:

    public void Test()
    {
       Action t = new Action(foo);
       t.BeginInvoke(ar =>
       {
           t.EndInvoke(ar as IAsyncResult);
           Console.WriteLine("5");
       }, null);
       Console.ReadLine();
    }

    void foo()
    {
       A();
    }

    async void A()
    {
       Console.WriteLine("1");
       await B();
       Console.WriteLine("2");
    }

    async Task B()
    {
       Console.WriteLine("3");
       await Task.Delay(5000);
       Console.WriteLine("4");
    }

    Running this case, I get output like:

    13542

    I found that the await keyword builds a state machine,

    and the continuation of the await statement is delegated to another delegate that points at the same method in the state machine.

    Finally, when the await statement runs, the state machine just returns and waits for the continuation to be invoked.

    Unfortunately, once that return happens, the AsyncCallback of the BeginInvoke will be invoked.

    The await will not be a real await.

    I guess async/await can't be nested in a method that is called by BeginInvoke. Am I right?

    P.S. Sorry for my description.

  • @neverser:

    When you invoke a method implemented using the async keyword, it will start executing synchronously, and it will return to the caller as soon as it hits the first await on something that's not yet completed.  If your async method returns a Task, then when it returns to you, you'll be given a Task that can be used to track the eventual completion of that method.  However, if you make it return void, you're not giving back any handle with which to track completion.  And that's what you've done with your foo method (and your A method).  See blogs.msdn.com/.../10293335.aspx and  blogs.msdn.com/.../10265476.aspx for more info.

    Additionally, if your goal with your Test method is to offload work to another thread, you don't need to use a delegate and its BeginInvoke/EndInvoke method... you can just use Task.Run or Task.Factory.StartNew.
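
    To illustrate the reply above, here is a minimal sketch of the same scenario with A changed to return Task and the work offloaded with Task.Run instead of a delegate's BeginInvoke. The AsyncTaskDemo class, the StringBuilder log (used in place of Console.WriteLine so the order can be inspected), and the shortened 100 ms delay are all assumptions made for the example.

```csharp
using System.Text;
using System.Threading.Tasks;

// Hypothetical rework of the Test / foo / A / B example from the question.
static class AsyncTaskDemo
{
    // Returns Task instead of void, so callers get a handle they can await.
    static async Task A(StringBuilder log)
    {
        log.Append('1');
        await B(log);
        log.Append('2');
    }

    static async Task B(StringBuilder log)
    {
        log.Append('3');
        await Task.Delay(100); // shortened from the 5000 ms in the question
        log.Append('4');
    }

    public static async Task<string> RunAsync()
    {
        var log = new StringBuilder();

        // Task.Run(Func<Task>) returns a task that completes only when the
        // Task returned by A completes, not when A first yields.
        await Task.Run(() => A(log));

        log.Append('5');
        return log.ToString(); // "13425"
    }
}
```

    With a Task to await, "5" is recorded only after A has fully finished, so the order becomes 1, 3, 4, 2, 5 rather than 1, 3, 5, 4, 2.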
