Awaiting Socket Operations

The System.Net.Sockets.Socket class in .NET exposes multiple sets of asynchronous methods that perform the same basic operations but that are exposed with different patterns.

The first set follows the APM pattern, where for a synchronous method like Receive, the BeginReceive and EndReceive methods are exposed.  If you want to be able to “await” such asynchronous operations in an async method, the easiest way to do so is to create a Task-based wrapper.  That can be done using Task.Factory.FromAsync, or it can be done more manually by using a TaskCompletionSource<TResult>, e.g.

public static Task<int> ReceiveAsync(
    this Socket socket, byte[] buffer, int offset, int size,
    SocketFlags socketFlags)
{
    var tcs = new TaskCompletionSource<int>();
    socket.BeginReceive(buffer, offset, size, socketFlags, iar =>
    { 
        try { tcs.TrySetResult(socket.EndReceive(iar)); }
        catch (Exception exc) { tcs.TrySetException(exc); }
    }, null);
    return tcs.Task;
}

or with a slightly more efficient implementation that avoids some of the extra allocations here:

public static Task<int> ReceiveAsync(
    this Socket socket, byte[] buffer, int offset, int size,
    SocketFlags socketFlags)
{
    var tcs = new TaskCompletionSource<int>(socket);
    socket.BeginReceive(buffer, offset, size, socketFlags, iar =>
    {
        var t = (TaskCompletionSource<int>)iar.AsyncState;
        var s = (Socket)t.Task.AsyncState;
        try { t.TrySetResult(s.EndReceive(iar)); }
        catch (Exception exc) { t.TrySetException(exc); }
    }, tcs);
    return tcs.Task;
}
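
For comparison, the Task.Factory.FromAsync route mentioned above could look roughly like the following. This is a minimal sketch, not code from the original post: the method name ReceiveViaFromAsync and the class name are hypothetical, chosen to avoid clashing with the other ReceiveAsync overloads. Because BeginReceive takes four arguments before the callback and state, which is more than the FromAsync overloads accept directly, the sketch captures them in a lambda.

```csharp
using System.Net.Sockets;
using System.Threading.Tasks;

public static class SocketFromAsyncExtensions
{
    // Hypothetical name; wraps the APM pair BeginReceive/EndReceive in a Task<int>.
    public static Task<int> ReceiveViaFromAsync(
        this Socket socket, byte[] buffer, int offset, int size,
        SocketFlags socketFlags)
    {
        // The lambda closes over the receive arguments and forwards the
        // callback and state that FromAsync supplies.
        return Task<int>.Factory.FromAsync(
            (callback, state) => socket.BeginReceive(
                buffer, offset, size, socketFlags, callback, state),
            socket.EndReceive,
            null);
    }
}
```

The closure costs an extra allocation per call, so this version is about convenience rather than efficiency.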

For many asynchronous methods, this approach is good enough.  The overhead involved in creating a new TaskCompletionSource<TResult> for each operation is often dwarfed by the costs of the operation itself.

However, there are some scenarios where this is not sufficient, where the cost comes not only from the additional tasks but from the APM implementation itself.  If you’re making thousands upon thousands of socket calls asynchronously per second, that’s thousands upon thousands of IAsyncResult objects getting created.  That’s a lot of pressure on the garbage collector, and can result in unacceptable pauses in some networking-intensive apps.

To address that, Socket exposes another set of asynchronous methods.  These methods look similar to the Event-based Async Pattern (EAP), but they’re subtly different.  Basically, you create a SocketAsyncEventArgs instance, and you configure that instance with a buffer, with a Completed event handler, and so on.  Then you pass this instance into methods like ReceiveAsync and SendAsync.  Those methods return a Boolean indicating whether the operation is still pending: a return value of false means it completed synchronously, in which case the Completed event will not be raised, the event args instance is immediately reusable, and the caller is responsible for finishing the operation synchronously (e.g. checking the status of the operation, getting from the SocketAsyncEventArgs instance the number of bytes received, etc.).  If the operation didn’t complete synchronously, then the Completed event will be raised when it does complete, and that handler does the same completion logic we would have done synchronously.  While a bit more complicated, the benefit of this pattern is that these SocketAsyncEventArgs instances can be reused, such that once an instance is no longer being used by a first asynchronous call, it may be reused for a second, and then for a third, and so on.  This avoids needing to construct additional objects per async operation, which thereby decreases pressure on the garbage collector.
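
To make the shape of that pattern concrete, here is a minimal sketch of a receive loop written directly against SocketAsyncEventArgs. The ReceiveLoop class, its TotalBytes counter, and its Finished event are hypothetical names introduced only for illustration; the Socket and SocketAsyncEventArgs members it calls are the real ones described above.

```csharp
using System;
using System.Net.Sockets;
using System.Threading;

// Hypothetical helper illustrating the raw SocketAsyncEventArgs pattern.
public sealed class ReceiveLoop
{
    private readonly Socket m_socket;
    private readonly SocketAsyncEventArgs m_args = new SocketAsyncEventArgs();

    public long TotalBytes;   // bytes received so far
    public readonly ManualResetEventSlim Finished = new ManualResetEventSlim();

    public ReceiveLoop(Socket socket)
    {
        m_socket = socket;
        m_args.SetBuffer(new byte[0x1000], 0, 0x1000);
        // Raised only when an operation completes asynchronously.
        m_args.Completed += (sender, e) => { if (Process(e)) Start(); };
    }

    public void Start()
    {
        // A false return means the receive completed synchronously: Completed
        // is not raised, so finish it inline and reuse the same instance.
        while (!m_socket.ReceiveAsync(m_args))
        {
            if (!Process(m_args)) return;
        }
    }

    // Shared completion logic; returns false on error or end of stream.
    private bool Process(SocketAsyncEventArgs e)
    {
        if (e.SocketError != SocketError.Success || e.BytesTransferred == 0)
        {
            Finished.Set();
            return false;
        }
        TotalBytes += e.BytesTransferred;
        return true;
    }
}
```

Note how the completion logic has to be reachable from two places (the loop in Start and the Completed handler), which is the complication the pattern trades for reuse of a single event args instance.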

We could of course wrap such asynchronous methods with Task, as well, but that would largely defeat the purpose of these methods, similar to allocating an IAsyncResult in the APM case.  Still, though, we want to be able to take advantage of the compiler’s async/await support to make it easier to write asynchronous code using sockets.  We want our cake and to eat it, too.

Of course, the compiler supports awaiting more than just Tasks.  So if you have a specialized scenario like this, you can take advantage of the compiler’s pattern-based support for awaiting things.  Below, I’ve implemented an awaitable wrapper for a SocketAsyncEventArgs:

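using System;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public sealed class SocketAwaitable : INotifyCompletion
{
    // Marker stored in m_continuation when the operation completes
    // before a continuation has been registered.
    private static readonly Action SENTINEL = () => { };

    internal bool m_wasCompleted;
    internal Action m_continuation;
    internal SocketAsyncEventArgs m_eventArgs;

    public SocketAwaitable(SocketAsyncEventArgs eventArgs)
    {
        if (eventArgs == null) throw new ArgumentNullException("eventArgs");
        m_eventArgs = eventArgs;
        eventArgs.Completed += delegate
        {
            // Run the registered continuation if there is one; otherwise
            // publish SENTINEL so OnCompleted knows we already finished.
            var prev = m_continuation ?? Interlocked.CompareExchange(
                ref m_continuation, SENTINEL, null);
            if (prev != null) prev();
        };
    }

    // Called before each new socket operation so the instance can be reused.
    internal void Reset()
    {
        m_wasCompleted = false;
        m_continuation = null;
    }

    // The awaitable serves as its own awaiter.
    public SocketAwaitable GetAwaiter() { return this; }

    public bool IsCompleted { get { return m_wasCompleted; } }

    public void OnCompleted(Action continuation)
    {
        // If the operation already completed, the event handler will not
        // run the continuation, so queue it here instead.
        if (m_continuation == SENTINEL ||
            Interlocked.CompareExchange(
                ref m_continuation, continuation, null) == SENTINEL)
        {
            Task.Run(continuation);
        }
    }

    public void GetResult()
    {
        if (m_eventArgs.SocketError != SocketError.Success)
            throw new SocketException((int)m_eventArgs.SocketError);
    }
}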

With that helper type, I can write a short three-line wrapper method for each of the Socket methods I care about, e.g.

public static class SocketExtensions
{
    public static SocketAwaitable ReceiveAsync(this Socket socket,
        SocketAwaitable awaitable)
    {
        awaitable.Reset();
        if (!socket.ReceiveAsync(awaitable.m_eventArgs))
            awaitable.m_wasCompleted = true;
        return awaitable;
    }

    public static SocketAwaitable SendAsync(this Socket socket,
        SocketAwaitable awaitable)
    {
        awaitable.Reset();
        if (!socket.SendAsync(awaitable.m_eventArgs)) 
            awaitable.m_wasCompleted = true;
        return awaitable;
    }

    // ...
}

And with that, I can use async/await with these very efficient asynchronous methods on Socket, e.g.

static async Task ReadAsync(Socket s)
{
    // Reusable SocketAsyncEventArgs and awaitable wrapper
    var args = new SocketAsyncEventArgs();
    args.SetBuffer(new byte[0x1000], 0, 0x1000);
    var awaitable = new SocketAwaitable(args);

    // Do processing, continually receiving from the socket
    while (true)
    {
        await s.ReceiveAsync(awaitable);
        int bytesRead = args.BytesTransferred;
        if (bytesRead <= 0) break;

        Console.WriteLine(bytesRead);
    }
}

Here, when I make a call to my ReceiveAsync extension method, I’m using a reusable SocketAsyncEventArgs that was previously created and wrapped into one of my helper SocketAwaitable types.  ReceiveAsync doesn’t need to do any allocation; it simply calls the underlying ReceiveAsync on the Socket, passing in the wrapped SocketAsyncEventArgs.  If the operation completes synchronously, we just set the awaitable’s m_wasCompleted to true, such that when the compiler-generated code awaits the returned SocketAwaitable, it sees that IsCompleted is true, and simply proceeds to call GetResult, which will throw an exception if the socket operation failed.  If the underlying ReceiveAsync call does not complete synchronously, then m_wasCompleted will remain false, and it’ll be up to the OnCompleted method to store the await continuation to be run when the SocketAsyncEventArgs’ Completed handler is invoked.  The only time we need to schedule a task here is if the socket operation asynchronously completes between the time that the compiler-generated code checks IsCompleted and the time it invokes OnCompleted, and that should be rare.
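
One reason to schedule the continuation with Task.Run in that rare case, rather than invoke it inline from OnCompleted, is to avoid what is sometimes called a "stack dive": each inline invocation re-enters the async method while the previous invocation is still on the stack. The sketch below illustrates the effect with a deliberately naive awaiter. InlineAwaitable and StackDiveDemo are hypothetical types written only for this illustration, and the Depth counters stand in for real stack frames.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

// Hypothetical awaiter that always reports "not completed" and then runs the
// continuation inline from OnCompleted, which SocketAwaitable avoids doing.
public sealed class InlineAwaitable : INotifyCompletion
{
    public static int Depth;      // current nesting of OnCompleted calls
    public static int MaxDepth;   // deepest nesting observed

    public InlineAwaitable GetAwaiter() { return this; }
    public bool IsCompleted { get { return false; } }

    public void OnCompleted(Action continuation)
    {
        Depth++;
        MaxDepth = Math.Max(MaxDepth, Depth);
        continuation();   // re-enters the async method on the current stack
        Depth--;
    }

    public void GetResult() { }
}

public static class StackDiveDemo
{
    public static async Task LoopAsync(int iterations)
    {
        var awaitable = new InlineAwaitable();
        for (int i = 0; i < iterations; i++)
            await awaitable;   // every await nests one level deeper
    }
}
```

After running LoopAsync, MaxDepth reflects how deeply the continuations nested. With a large enough iteration count, the same nesting on a real call stack would eventually overflow it, which is why queuing the continuation is the safer choice even though it costs a task.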

If you wanted to write a bit more code, you could even create a specialized version of SocketAwaitable such that its GetResult method returned the result of the operation, e.g. a ReceiveSocketAwaitable whose GetResult method returned the SocketAsyncEventArgs.BytesTransferred value.  With that, you could then write the above loop almost as if it were synchronous code, e.g.

static async Task ReadAsync(Socket s)
{
    // Reusable SocketAsyncEventArgs and awaitable wrapper
    var args = new SocketAsyncEventArgs();
    args.SetBuffer(new byte[0x1000], 0, 0x1000);
    var awaitable = new ReceiveSocketAwaitable(args);

    // Do processing, continually receiving from the socket
    int bytesRead;
    while ((bytesRead = await s.ReceiveAsync(awaitable)) > 0)
    {
        Console.WriteLine(bytesRead);
    }
}
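
The post does not show ReceiveSocketAwaitable itself. One possible shape for it, as a minimal sketch under stated assumptions, is below: it repeats the SocketAwaitable state handling (that class is sealed, so it cannot simply be derived from) and changes only GetResult to return BytesTransferred. The type layout and the ReceiveSocketExtensions class are assumptions, not the author's implementation.

```csharp
using System;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant of SocketAwaitable whose GetResult returns the byte count.
public sealed class ReceiveSocketAwaitable : INotifyCompletion
{
    private static readonly Action Sentinel = () => { };

    internal bool m_wasCompleted;
    internal Action m_continuation;
    internal readonly SocketAsyncEventArgs m_eventArgs;

    public ReceiveSocketAwaitable(SocketAsyncEventArgs eventArgs)
    {
        if (eventArgs == null) throw new ArgumentNullException(nameof(eventArgs));
        m_eventArgs = eventArgs;
        eventArgs.Completed += delegate
        {
            // Same hand-off as SocketAwaitable: run the stored continuation,
            // or mark completion with Sentinel if none is registered yet.
            var prev = m_continuation ?? Interlocked.CompareExchange(
                ref m_continuation, Sentinel, null);
            if (prev != null) prev();
        };
    }

    internal void Reset() { m_wasCompleted = false; m_continuation = null; }

    public ReceiveSocketAwaitable GetAwaiter() { return this; }
    public bool IsCompleted { get { return m_wasCompleted; } }

    public void OnCompleted(Action continuation)
    {
        if (m_continuation == Sentinel ||
            Interlocked.CompareExchange(
                ref m_continuation, continuation, null) == Sentinel)
        {
            Task.Run(continuation);
        }
    }

    // The only behavioral difference: the await expression yields an int.
    public int GetResult()
    {
        if (m_eventArgs.SocketError != SocketError.Success)
            throw new SocketException((int)m_eventArgs.SocketError);
        return m_eventArgs.BytesTransferred;
    }
}

public static class ReceiveSocketExtensions
{
    public static ReceiveSocketAwaitable ReceiveAsync(this Socket socket,
        ReceiveSocketAwaitable awaitable)
    {
        awaitable.Reset();
        if (!socket.ReceiveAsync(awaitable.m_eventArgs))
            awaitable.m_wasCompleted = true;
        return awaitable;
    }
}
```

Because the compiler's await support is pattern-based, returning int from GetResult is all it takes for `await s.ReceiveAsync(awaitable)` to produce the byte count used in the loop above.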

Enjoy.

  • Hey Stephen, thanks for sharing these methods. I've used the SocketAwaitable way of async socket configuration. However, I'm not sure what the best way of cancelling out of the ReadAsync method is. I tried instantiating a CancellationToken within the async method in hopes that it would be used by the compiler when generating the appropriate task, but cancelling the token doesn't cancel the async await. I tried disposing of the socket, but I get socket exceptions. I could in theory handle those exceptions, but would like to know if there's a more elegant way of cancelling the async await on ReadAsync.

    Thanks

  • @Dimitri: You're welcome, glad you've found this useful.  With regards to cancellation, I don't believe the socket class provides any other mechanism for canceling/aborting such receive operations other than by closing the socket.  Other than that, you're stuck with non-cancelable operations, and hence blogs.msdn.com/.../how-do-i-cancel-non-cancelable-async-operations.aspx .

  • Hey Stephen, Very informative article ;-)

    One issue not directly related to this post but in the same vein, regarding Authentication pops up. How do we authenticate users server-side when SocketAsyncEventArgs used? It seems as if NegotiateStream etc. can not be used for this purpose. Any input here?

    Thanks

  • @Morten Overgaard:

    I'm not sure; I'm not an expert in those particular APIs, unfortunately.  You might try asking your question in the forum at social.msdn.microsoft.com/.../threads.

  • I am just confused when "m_continuation == SENTINEL" would be true?

    Why the continuation be "Task.Run"ed immediately after OnCompleted called? Then why you run it again in eventArgs.Completed.

    Sorry. I google around for anything about the OnCompleted and the compiler, but no luck.

  • @Logan Zhou:

    re: "I am just confused when "m_continuation == SENTINEL" would be true?"

    The event handler sets m_continuation to SENTINEL when the socket operation completes before a continuation has been registered, so the comparison could be true in OnCompleted if the socket operation completed fast enough that this had already happened by the time OnCompleted was called.

    re: "Why the continuation be "Task.Run"ed immediately after OnCompleted called? Then why you run it again in eventArgs.Completed."

    It's only going to be executed in one of these two cases, not both (that's what the Interlocked.CompareExchange guarantees).  If OnCompleted has already been called by the time the async operation completes, then the user's code will have already provided the continuation delegate that should be executed, and the event handler will invoke that continuation.  If instead the async operation has already completed by the time OnCompleted is called, then OnCompleted needs to be responsible for invoking the continuation.  However, it shouldn't just run the continuation synchronously, as that could lead to something called a "stack dive", so instead it queues it to run the continuation delegate asynchronously via Task.Run.

  • @Stephen Toub, could you use volatile instead of CompareExchange and would it be faster?  Thanks.

  • @AceHack: Neither use of CompareExchange in my sample could be replaced with just volatile; the atomic compare-and-swap semantics are necessary for correctness.

  • Thanks for that sweet SocketAwaitable code. Will be used in production with about 200000 terminals around the world soon. :-)

  • @rudimenter:  That's great to hear :)  It's working well for you?

  • @Stephen: More than well. We are bound to .NET 4 so we could not upgrade to 4.5. Async/await simplified our AsyncSocket code a lot. We also got rid of many try/catches lurking around every code corner. SocketAwaitable went through a few code reviews but we are all really happy with the performance and coding style which async/await brings.

    Thx :-)

  • Dear Stephen! Thank you for this amazing solution!

    I was using solution based on TaskCompletionSource in my winphone app and I had no idea that custom awaiter is possible :D

    But now I am switching to this because it's pretty awesome. Btw, I am using your WithCancellation extension for Task too :)

    Many thanks to you!

  • @Vladimir: Thanks :) I'm glad you've found it useful.

  • Hi Stephen,

    Thank you -again- for this, but I don't get what the "stack drive" is.

    ```
    var a = e.GetAwaiter();
    if (!a.IsCompleted)
    {
       SAVE_STATE()
       // Why can't a.OnCompleted run the continuation synchronously and return?
       a.OnCompleted(&cont);
       return;
    cont:
       RESTORE_STATE()
    }
    var r = a.GetResult();
    ```

    Can you provide some insight?

  • @Şafak Gür: "stack dive" ;)  Let's say the async method is executing on the stack and awaits something that's not yet completed, but then by the time it calls OnCompleted it has completed.  If OnCompleted were to run the delegate synchronously, that delegate reenters the async method and now continues executing it... remember that the previous invocation of the async method is still on the stack, so we're now one frame deeper.  We now await another task that's not yet completed, but by the time we call OnCompleted it is completed, and if we synchronously invoke the delegate, we're now one frame deeper.  And so on.  If this happened enough times in a row, we'd overflow the stack.  That's a "stack dive".
