Awaiting Socket Operations

The System.Net.Sockets.Socket class in .NET exposes multiple sets of asynchronous methods that perform the same basic operations but follow different patterns.

The first set follows the Asynchronous Programming Model (APM) pattern: for a synchronous method like Receive, there are corresponding BeginReceive and EndReceive methods.  If you want to be able to “await” such asynchronous operations in an async method, the easiest way to do so is to create a Task-based wrapper.  That can be done using Task.Factory.FromAsync, or more manually by using a TaskCompletionSource<TResult>, e.g.

public static Task<int> ReceiveAsync(
    this Socket socket, byte[] buffer, int offset, int size,
    SocketFlags socketFlags)
{
    var tcs = new TaskCompletionSource<int>();
    socket.BeginReceive(buffer, offset, size, socketFlags, iar =>
    { 
        try { tcs.TrySetResult(socket.EndReceive(iar)); }
        catch (Exception exc) { tcs.TrySetException(exc); }
    }, null);
    return tcs.Task;
}

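or with a slightly more efficient implementation that avoids some of those extra allocations.  Here the callback lambda captures no variables (the socket and the TaskCompletionSource<int> travel through the async state instead), so neither a closure object nor a new delegate needs to be allocated per call: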

public static Task<int> ReceiveAsync(
    this Socket socket, byte[] buffer, int offset, int size,
    SocketFlags socketFlags)
{
    var tcs = new TaskCompletionSource<int>(socket);
    socket.BeginReceive(buffer, offset, size, socketFlags, iar =>
    {
        var t = (TaskCompletionSource<int>)iar.AsyncState;
        var s = (Socket)t.Task.AsyncState;
        try { t.TrySetResult(s.EndReceive(iar)); }
        catch (Exception exc) { t.TrySetException(exc); }
    }, tcs);
    return tcs.Task;
}
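For comparison, here is a minimal sketch of the Task.Factory.FromAsync route mentioned earlier; the class name SocketApmExtensions is an assumption for illustration.  BeginReceive takes four arguments before its callback, more than the argument-forwarding FromAsync overloads accept (they go up to three), so a lambda adapts it to the (AsyncCallback, object) shape that FromAsync expects.

```csharp
using System;
using System.Net.Sockets;
using System.Threading.Tasks;

public static class SocketApmExtensions
{
    // Task-based wrapper over BeginReceive/EndReceive built with FromAsync.
    public static Task<int> ReceiveAsync(
        this Socket socket, byte[] buffer, int offset, int size,
        SocketFlags socketFlags)
    {
        return Task.Factory.FromAsync<int>(
            // Adapt the four-argument BeginReceive to (callback, state).
            (callback, state) => socket.BeginReceive(
                buffer, offset, size, socketFlags, callback, state),
            // The single-argument EndReceive overload returns the byte count.
            socket.EndReceive,
            null);
    }
}
```

Like the first TaskCompletionSource version, this lambda captures its arguments, so every call still allocates a closure in addition to the Task and the IAsyncResult.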

For many asynchronous methods, this approach is good enough.  The overhead involved in creating a new TaskCompletionSource<TResult> for each operation is often dwarfed by the costs of the operation itself.

However, there are some scenarios where this is not sufficient, where the overhead comes not only from the additional tasks but from the APM implementation itself.  If you’re making thousands upon thousands of asynchronous socket calls per second, that’s thousands upon thousands of IAsyncResult objects getting created.  That’s a lot of pressure on the garbage collector, and it can result in unacceptable pauses in some networking-intensive apps.

To address that, Socket exposes another set of asynchronous methods.  These methods look similar to the Event-based Async Pattern (EAP), but they’re subtly different.  Basically, you create a SocketAsyncEventArgs instance and configure it with a buffer, a Completed event handler, and so on.  Then you pass this instance into methods like ReceiveAsync and SendAsync.  Those methods return a Boolean indicating whether the operation is still pending: true means it will complete asynchronously, and false means it completed synchronously.  If it completed synchronously, the Completed event will not be raised, the event args instance is immediately reusable, and the caller is responsible for finishing the operation itself (e.g. checking the status of the operation, getting the number of bytes received from the SocketAsyncEventArgs instance, etc.).  If the operation didn’t complete synchronously, then the Completed event will be raised when it does complete, and that handler performs the same completion logic we would otherwise have run synchronously.

While a bit more complicated, the benefit of this pattern is that SocketAsyncEventArgs instances can be reused: once an instance is no longer being used by one asynchronous call, it may be used for a second, then for a third, and so on.  This avoids constructing additional objects per asynchronous operation, which decreases pressure on the garbage collector.
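To make that completion contract concrete, here is a rough sketch of a receive loop written directly against SocketAsyncEventArgs, with no tasks and no async/await.  RawReceiveLoop, ProcessReceive, TotalBytes and Done are hypothetical names; the sketch assumes an already-connected stream socket and treats a successful zero-byte receive as the remote side closing the connection.

```csharp
using System;
using System.Net.Sockets;
using System.Threading;

// Hypothetical receive loop using one reusable SocketAsyncEventArgs.
public sealed class RawReceiveLoop
{
    private readonly Socket _socket;
    private readonly SocketAsyncEventArgs _args = new SocketAsyncEventArgs();

    public int TotalBytes { get; private set; }
    public ManualResetEventSlim Done { get; } = new ManualResetEventSlim();

    public RawReceiveLoop(Socket socket)
    {
        _socket = socket;
        // Configure the reusable instance once: a buffer and a Completed handler.
        _args.SetBuffer(new byte[0x1000], 0, 0x1000);
        _args.Completed += (sender, e) =>
        {
            // Raised only when ReceiveAsync returned true (operation was pending).
            if (ProcessReceive()) Start();
        };
    }

    public void Start()
    {
        // ReceiveAsync returns false when the receive completed synchronously;
        // Completed is then NOT raised, so we finish the operation here and
        // issue the next receive, looping instead of recursing.
        while (!_socket.ReceiveAsync(_args))
        {
            if (!ProcessReceive()) return;
        }
        // Otherwise the receive is pending and Completed will fire later.
    }

    // Shared completion logic; returns true if another receive should be issued.
    private bool ProcessReceive()
    {
        if (_args.SocketError != SocketError.Success || _args.BytesTransferred == 0)
        {
            Done.Set(); // error, or the remote side closed the connection
            return false;
        }
        TotalBytes += _args.BytesTransferred;
        Console.WriteLine(_args.BytesTransferred);
        return true;
    }
}
```

Handling synchronous completions in a loop inside Start, rather than calling back into the completion logic recursively, is one way to keep the stack flat when many receives complete synchronously in a row.  The split between Start and the Completed handler is exactly the kind of control flow that async/await could otherwise hide.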

We could, of course, wrap these asynchronous methods with Tasks as well, but that would largely defeat their purpose, much as allocating an IAsyncResult does in the APM case.  Still, we want to be able to take advantage of the compiler’s async/await support to make it easier to write asynchronous code using sockets.  We want to have our cake and eat it, too.

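Of course, the compiler supports awaiting more than just Tasks: it will await any type that exposes a GetAwaiter method whose result has an IsCompleted property, an OnCompleted method (via INotifyCompletion), and a GetResult method.  So if you have a specialized scenario like this, you can take advantage of the compiler’s pattern-based support for awaiting things.  Below, I’ve implemented an awaitable wrapper for a SocketAsyncEventArgs: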

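using System;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public sealed class SocketAwaitable : INotifyCompletion
{
    // Marker meaning "the operation completed before OnCompleted was called".
    private static readonly Action SENTINEL = () => { };

    internal bool m_wasCompleted;
    internal Action m_continuation;
    internal SocketAsyncEventArgs m_eventArgs;

    public SocketAwaitable(SocketAsyncEventArgs eventArgs)
    {
        if (eventArgs == null) throw new ArgumentNullException("eventArgs");
        m_eventArgs = eventArgs;
        eventArgs.Completed += delegate
        {
            // Runs when the operation completes asynchronously. If OnCompleted
            // already stored the await continuation, invoke it; otherwise
            // atomically mark m_continuation with SENTINEL so OnCompleted
            // knows the operation has already finished.
            var prev = m_continuation ?? Interlocked.CompareExchange(
                ref m_continuation, SENTINEL, null);
            if (prev != null) prev();
        };
    }

    // Prepares the instance for reuse by the next socket operation.
    internal void Reset()
    {
        m_wasCompleted = false;
        m_continuation = null;
    }

    // The awaitable is its own awaiter.
    public SocketAwaitable GetAwaiter() { return this; }

    // True only if the socket operation completed synchronously.
    public bool IsCompleted { get { return m_wasCompleted; } }

    public void OnCompleted(Action continuation)
    {
        // Store the continuation for the Completed handler to run. If the
        // handler already ran (SENTINEL), nothing else will invoke the
        // continuation, so schedule it ourselves.
        if (m_continuation == SENTINEL ||
            Interlocked.CompareExchange(
                ref m_continuation, continuation, null) == SENTINEL)
        {
            Task.Run(continuation);
        }
    }

    // Surfaces any socket error as an exception at the await site.
    public void GetResult()
    {
        if (m_eventArgs.SocketError != SocketError.Success)
            throw new SocketException((int)m_eventArgs.SocketError);
    }
}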

With that helper type, I can write a short three-line wrapper method for each of the Socket methods I care about, e.g.

public static class SocketExtensions
{
    public static SocketAwaitable ReceiveAsync(this Socket socket,
        SocketAwaitable awaitable)
    {
        awaitable.Reset();
        if (!socket.ReceiveAsync(awaitable.m_eventArgs))
            awaitable.m_wasCompleted = true;
        return awaitable;
    }

    public static SocketAwaitable SendAsync(this Socket socket,
        SocketAwaitable awaitable)
    {
        awaitable.Reset();
        if (!socket.SendAsync(awaitable.m_eventArgs)) 
            awaitable.m_wasCompleted = true;
        return awaitable;
    }

    // ...
}

And with that, I can use async/await with these very efficient asynchronous methods on Socket, e.g.

static async Task ReadAsync(Socket s)
{
    // Reusable SocketAsyncEventArgs and awaitable wrapper
    var args = new SocketAsyncEventArgs();
    args.SetBuffer(new byte[0x1000], 0, 0x1000);
    var awaitable = new SocketAwaitable(args);

    // Do processing, continually receiving from the socket
    while (true)
    {
        await s.ReceiveAsync(awaitable);
        int bytesRead = args.BytesTransferred;
        if (bytesRead <= 0) break;

        Console.WriteLine(bytesRead);
    }
}

Here, when I make a call to my ReceiveAsync extension method, I’m using a reusable SocketAsyncEventArgs that was previously created and wrapped in one of my SocketAwaitable helper instances.  ReceiveAsync doesn’t need to allocate anything; it simply calls the underlying ReceiveAsync on the Socket, passing in the wrapped SocketAsyncEventArgs.  If the operation completes synchronously, we just set the awaitable’s m_wasCompleted to true, so that when the compiler-generated code awaits the returned SocketAwaitable, it sees that IsCompleted is true and immediately calls GetResult, which throws an exception if the socket operation failed.  If the underlying ReceiveAsync call does not complete synchronously, then m_wasCompleted remains false, and it’s up to the OnCompleted method to store the await continuation so that it runs when the SocketAsyncEventArgs’ Completed handler is invoked.  The only time we need to schedule a task here is if the socket operation completes asynchronously between the moment the compiler-generated code checks IsCompleted and the moment it invokes OnCompleted, and that should be rare.
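The IsCompleted/OnCompleted/GetResult handshake can be exercised without a socket.  The sketch below uses a hypothetical, socket-free stand-in, SignalAwaitable, whose Signal() method plays the role of the SocketAsyncEventArgs.Completed handler and reuses SocketAwaitable’s SENTINEL logic.  ManualAwait.AwaitManually (also hypothetical) is a hand-expanded approximation of what the compiler-generated code does for an await; the real generated code additionally manages a state machine.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for SocketAwaitable with no socket involved.
public sealed class SignalAwaitable : INotifyCompletion
{
    private static readonly Action SENTINEL = () => { };
    private Action? m_continuation;

    public SignalAwaitable GetAwaiter() => this;

    // Always false: models an operation that did not complete synchronously.
    public bool IsCompleted => false;

    // Same logic as SocketAwaitable's Completed handler.
    public void Signal()
    {
        var prev = m_continuation ?? Interlocked.CompareExchange(
            ref m_continuation, SENTINEL, null);
        if (prev != null) prev();
    }

    public void OnCompleted(Action continuation)
    {
        // If Signal() already ran, nobody else will invoke the continuation.
        if (m_continuation == SENTINEL ||
            Interlocked.CompareExchange(ref m_continuation, continuation, null) == SENTINEL)
        {
            Task.Run(continuation);
        }
    }

    public void GetResult() { }
}

public static class ManualAwait
{
    // Approximates "await awaitable": check IsCompleted, otherwise register a
    // continuation with OnCompleted, and call GetResult once resumed.
    public static Task AwaitManually(SignalAwaitable awaitable)
    {
        var done = new TaskCompletionSource<bool>();
        var awaiter = awaitable.GetAwaiter();
        if (awaiter.IsCompleted)
        {
            awaiter.GetResult();
            done.SetResult(true);
        }
        else
        {
            awaiter.OnCompleted(() => { awaiter.GetResult(); done.SetResult(true); });
        }
        return done.Task;
    }
}
```

Calling Signal() before AwaitManually reproduces the rare race: OnCompleted finds SENTINEL already in place and schedules the continuation with Task.Run.  Calling it afterwards takes the common path, where Signal() invokes the stored continuation directly.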

If you wanted to write a bit more code, you could even create a specialized version of SocketAwaitable such that its GetResult method returned the result of the operation, e.g. a ReceiveSocketAwaitable whose GetResult method returned the SocketAsyncEventArgs.BytesTransferred value.  With that, you could then write the above loop almost as if it were synchronous code, e.g.

static async Task ReadAsync(Socket s)
{
    // Reusable SocketAsyncEventArgs and awaitable wrapper
    var args = new SocketAsyncEventArgs();
    args.SetBuffer(new byte[0x1000], 0, 0x1000);
    var awaitable = new ReceiveSocketAwaitable(args);

    // Do processing, continually receiving from the socket
    int bytesRead;
    while ((bytesRead = await s.ReceiveAsync(awaitable)) > 0)
    {
        Console.WriteLine(bytesRead);
    }
}
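The post doesn’t show ReceiveSocketAwaitable itself.  One possible sketch, assuming it mirrors SocketAwaitable but returns SocketAsyncEventArgs.BytesTransferred from GetResult, is below.  The Start helper and the ReceiveSocketExtensions class are hypothetical names, and folding the reset and the underlying ReceiveAsync call into Start is a design choice of this sketch rather than something from the original.

```csharp
using System;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Sketch: same mechanics as SocketAwaitable, but "await" yields an int.
public sealed class ReceiveSocketAwaitable : INotifyCompletion
{
    private static readonly Action SENTINEL = () => { };

    private bool m_wasCompleted;
    private Action? m_continuation;
    private readonly SocketAsyncEventArgs m_eventArgs;

    public ReceiveSocketAwaitable(SocketAsyncEventArgs eventArgs)
    {
        m_eventArgs = eventArgs ?? throw new ArgumentNullException(nameof(eventArgs));
        eventArgs.Completed += delegate
        {
            var prev = m_continuation ?? Interlocked.CompareExchange(
                ref m_continuation, SENTINEL, null);
            if (prev != null) prev();
        };
    }

    // Resets state, starts a receive, and returns this instance to be awaited.
    internal ReceiveSocketAwaitable Start(Socket socket)
    {
        m_wasCompleted = false;
        m_continuation = null;
        if (!socket.ReceiveAsync(m_eventArgs)) m_wasCompleted = true;
        return this;
    }

    public ReceiveSocketAwaitable GetAwaiter() => this;

    public bool IsCompleted => m_wasCompleted;

    public void OnCompleted(Action continuation)
    {
        if (m_continuation == SENTINEL ||
            Interlocked.CompareExchange(ref m_continuation, continuation, null) == SENTINEL)
        {
            Task.Run(continuation);
        }
    }

    // Unlike SocketAwaitable.GetResult, this returns the number of bytes received.
    public int GetResult()
    {
        if (m_eventArgs.SocketError != SocketError.Success)
            throw new SocketException((int)m_eventArgs.SocketError);
        return m_eventArgs.BytesTransferred;
    }
}

public static class ReceiveSocketExtensions
{
    public static ReceiveSocketAwaitable ReceiveAsync(
        this Socket socket, ReceiveSocketAwaitable awaitable)
        => awaitable.Start(socket);
}
```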

Enjoy.

Comments
  • That's a sweet solution, Stephen! Are you planning to include a SocketAwaitable sort of class in .NET vNext?

  • Nice trick.

  • tobi, thanks.

    Stephen, glad you like it. Right now I don't believe there are any such plans for .NET 4.5.

  • Is there any advantage in calling s.ReceiveAsync multiple times so that the underlying I/O completion ports can fulfill the data as it arrives?

  • Hi Dave-

    It's possible you could get some performance benefits from doing that, but you'd really need to test it out in your scenario to see whether the difference is measurable and whether it's impactful enough to warrant the resulting complication of the programming model.

  • In the constructor for SocketAwaitable could you explain the lines

    internal Action m_continuation;

    var prev = m_continuation ?? Interlocked.CompareExchange(ref m_continuation, SENTINEL, null);

    To me it read as,

    m_continuation is set to null by CLR

    if m_continuation is not null assign it to prev   (not true)

    so use the CompareExchange() to compare m_continuation to null and if equal swap with SENTINEL (true)

    so it looks to me like we compare something we know is null twice just to assign prev to SENTINEL

    What am I missing?

  • Hi Paul-

    That second line isn't actually executed in the ctor... note that it's part of a delegate that's set as an event handler for Completed, so it's executed when the socket operation completes, not when the ctor runs.

  • Sorry, my bad for explaining it badly. I understand it's a delegate and is executed later when the event is raised, not in the ctor. But it still looks to me like the line with the null-coalescing operator is checking for null twice. Is this to prevent a race condition? Zen needed ;-0

  • Hi Paul-

    It's an optimization.  This event handler needs to mark the m_continuation delegate as SENTINEL if the OnCompleted method has not run yet, so that OnCompleted will know not to store the delegate in m_continuation in the hope that it will later be run when the socket operation completes.  That's why we need an interlocked operation: we need to synchronize between this event handler and OnCompleted.  However, at this point in the lifecycle of this type, the only transition that could have already occurred for m_continuation is from null to the continuation delegate supplied to OnCompleted.  Therefore, if m_continuation is already non-null, we can just use its value, rather than paying the cost of the interlocked operation.  So we first check whether it's already non-null, in which case we can use its value without additional synchronization.  If it's still null, then we need the synchronization, so we check again, this time with the atomic compare-and-swap provided by CompareExchange.

    Does that help?

  • Hi Stephen, would you comment on which of the async methods use I/O completion ports under the covers? Thanks, Dave

  • Hi Dave-  I haven't done an exhaustive tour of the various implementations.  Is there a particular method you're interested in?

  • Do you have a complete working sample for download please?

  • herbertf, I don't have a complete sample that uses this.  Sorry.

  • I just watched "The zen of async: Best practices for best performance" and I have to ask: does this approach (awaiting a socket awaitable) have less overhead than creating a TaskCompletionSource each time?

    I mean, if I were to wrap EAP methods with Task and call s.ReceiveAsync().ContinueWith(rt => s.SendAsync(stuff)), would it be faster or slower than awaiting a SocketAwaitable twice?

    I cache SocketAsyncEventArgs objects and reuse them when needed. I guess I could also cache whole SocketAwaitable instances along with their SocketAsyncEventArgs objects, and that's a plus, but it's still not clear to me how expensive it is to "await" a Task compared to using its ContinueWith() method.

  • @Şafak Gür: Yes, if you're concerned about allocations (which would be one of the primary reasons you'd be using these Socket APIs to begin with), this awaitable approach will result in fewer allocations than creating TaskCompletionSource instances and using ContinueWith.  The awaitable approach I outlined doesn't allocate per socket operation, except in what should be relatively rare cases.  The point here would be to create and reuse these XxSocketAwaitable instances.
