Awaiting Socket Operations

Awaiting Socket Operations

Rate This
  • Comments 52

The System.Net.Sockets.Socket class in .NET exposes multiple sets of asynchronous methods that perform the same basic operations but that are exposed with different patterns.

The first set follows the APM pattern, where for a synchronous method like Receive, the BeginReceive and EndReceive methods are exposed.  If you want to be able to “await” such asynchronous operations in an async method, the easiest way to do so is to create a Task-based wrapper.  That can be done using Task.Factory.FromAsync, or it can be done more manually by using a TaskCompletionSource<TResult>, e.g.

public static Task<int> ReceiveAsync(
    this Socket socket, byte[] buffer, int offset, int size,
    SocketFlags socketFlags)
{
    var tcs = new TaskCompletionSource<int>();
    socket.BeginReceive(buffer, offset, size, socketFlags, iar =>
    { 
        try { tcs.TrySetResult(socket.EndReceive(iar)); }
        catch (Exception exc) { tcs.TrySetException(exc); }
    }, null);
    return tcs.Task;
}

or with a slightly more efficient implementation that avoids some of the extra allocations here:

public static Task<int> ReceiveAsync(
    this Socket socket, byte[] buffer, int offset, int size,
    SocketFlags socketFlags)
{
    var tcs = new TaskCompletionSource<int>(socket);
    socket.BeginReceive(buffer, offset, size, socketFlags, iar =>
    {
        var t = (TaskCompletionSource<int>)iar.AsyncState;
        var s = (Socket)t.Task.AsyncState;
        try { t.TrySetResult(s.EndReceive(iar)); }
        catch (Exception exc) { t.TrySetException(exc); }
    }, tcs);
    return tcs.Task;
}

For many asynchronous methods, this approach is good enough.  The overhead involved in creating a new TaskCompletionSource<TResult> for each operation is often dwarfed by the costs of the operation itself.

However, there are some scenarios where this is not sufficient, and not only the additional tasks, but the APM implementation itself.  If you’re making thousands upon thousands of socket calls asynchronously per second, that’s thousands upon thousands of IAsyncResult objects getting created.  That’s a lot of pressure on the garbage collector, and can result in unacceptable pauses in some networking-intensive apps.

To address that, Socket exposes another set of asynchronous methods.  These methods look similar to the Event-based Async Pattern (EAP), but they’re subtly different.  Basically, you create a SocketAsyncEventArgs instance, and you configure that instance with a buffer, with a Completion event handler, and so on. Then you pass this instance into methods like ReceiveAsync and  SendAsync.  Those methods return a Boolean indicating whether they completed synchronously, and if they did, then the event args instance is immediately reusable, and the caller is responsible for finishing the operation synchronously (e.g. checking the status of the operation, getting from the SocketAsyncEventArgs instance the number of bytes received, etc.)  If the operation didn’t complete synchronously, then the completion event will be raised when it does complete, and that handler does the same completion logic we would have done synchronously.  While a bit more complicated, the benefit of this pattern is that these SocketAsyncEventArgs instances can be reused, such that once an instance is no longer being used by a first asynchronous call, it may be reused for a second, and then for a third, and so on.  This avoids needing to construct additional objects per async operation, which thereby decreases pressure on the garbage collector.

We could of course wrap such asynchronous methods with Task, as well, but that would largely defeat the purpose of these methods, similar to allocating an IAsyncResult in the APM case.  Still, though, we want to be able to take advantage of the compiler’s async/await support to make it easier to write asynchronous code using sockets.  We want our cake and to eat it, too.

Of course, the compiler supports awaiting more than just Tasks.  So if you have a specialized scenario like this, you can take advantage of the compiler’s pattern-based support for awaiting things.  Below, I’ve implemented an awaitable wrapper for a SocketAsyncEventArgs:

public sealed class SocketAwaitable : INotifyCompletion
{
    private readonly static Action SENTINEL = () => { };

    internal bool m_wasCompleted;
    internal Action m_continuation;
    internal SocketAsyncEventArgs m_eventArgs;

    public SocketAwaitable(SocketAsyncEventArgs eventArgs)
    {
        if (eventArgs == null) throw new ArgumentNullException("eventArgs");
        m_eventArgs = eventArgs;
        eventArgs.Completed += delegate
        {
            var prev = m_continuation ?? Interlocked.CompareExchange(
                ref m_continuation, SENTINEL, null);
            if (prev != null) prev();
        };
    }

    internal void Reset()
    {
        m_wasCompleted = false;
        m_continuation = null;
    }

    public SocketAwaitable GetAwaiter() { return this; }

    public bool IsCompleted { get { return m_wasCompleted; } }

    public void OnCompleted(Action continuation)
    {
        if (m_continuation == SENTINEL ||
            Interlocked.CompareExchange(
                ref m_continuation, continuation, null) == SENTINEL)
        {
            Task.Run(continuation);
        }
    }

    public void GetResult()
    {
        if (m_eventArgs.SocketError != SocketError.Success)
            throw new SocketException((int)m_eventArgs.SocketError);
    }
}

With that helper type, I can write a short three-line wrapper method for each of the Socket methods I care about, e.g.

public static class SocketExtensions
{
    public static SocketAwaitable ReceiveAsync(this Socket socket,
        SocketAwaitable awaitable)
    {
        awaitable.Reset();
        if (!socket.ReceiveAsync(awaitable.m_eventArgs))
            awaitable.m_wasCompleted = true;
        return awaitable;
    }

    public static SocketAwaitable SendAsync(this Socket socket,
        SocketAwaitable awaitable)
    {
        awaitable.Reset();
        if (!socket.SendAsync(awaitable.m_eventArgs)) 
            awaitable.m_wasCompleted = true;
        return awaitable;
    }

    // ...
}

And with that, I can use async/await with these very efficient asynchronous methods on Socket, e.g.

static async Task ReadAsync(Socket s)
{
    // Reusable SocketAsyncEventArgs and awaitable wrapper
    var args = new SocketAsyncEventArgs();
    args.SetBuffer(new byte[0x1000], 0, 0x1000);
    var awaitable = new SocketAwaitable(args);

    // Do processing, continually receiving from the socket
    while (true)
    {
        await s.ReceiveAsync(awaitable);
        int bytesRead = args.BytesTransferred;
        if (bytesRead <= 0) break;

        Console.WriteLine(bytesRead);
    }
}

Here, when I make a call to my ReceiveAsync extension method, I’m using a reusable SocketAsyncEventArgs that was previously created and wrapped into one of my helper SocketAwaitable types.  ReceiveAsync doesn’t need to do any allocation; it simply calls to the underlying ReceiveAsync on the Socket, passing in the wrapped SocketAsyncEventArgs.  If the operation completes synchronously, we just set the awaitable’s m_wasCompleted to true, such that when the compiler-generated code awaits the returned SocketAwaitable, it sees that IsCompleted is true, and simply proceeds to call GetResult, which will throw an exception if the socket operation failed.  If the underlying ReceiveAsync call does not complete synchronously, then m_wasCompleted will remain false, and it’ll be up to the OnCompleted method to store the await continuation to be run when the SocketAsyncEventArgs’ Completed handler is invoked.  The only time we need to schedule a task here is if the socket operation asynchronously completes between the time that the compiler-generated code checks IsCompleted and then invokes OnCompleted, and that should be rare.

If you wanted to write a bit more code, you could even create a specialized version of SocketAwaitable such that its GetResult method returned the result of the operation, e.g. a ReceiveSocketAwaitable whose GetResult method returned the SocketAsyncEventArgs.BytesTransferred value.  With that, you could then write the above loop almost as if it were synchronous code, e.g.

static async Task ReadAsync(Socket s)
{
    // Reusable SocketAsyncEventArgs and awaitable wrapper
    var args = new SocketAsyncEventArgs();
    args.SetBuffer(new byte[0x1000], 0, 0x1000);
    var awaitable = new ReceiveSocketAwaitable(args);

    // Do processing, continually receiving from the socket
    int bytesRead;
    while ((bytesRead = await s.ReceiveAsync(awaitable)) > 0)
    {
        Console.WriteLine(bytesRead);
    }
}

Enjoy.

Leave a Comment
  • Please add 8 and 4 and type the answer here:
  • Post
  • Stephen,

    Now I get it :D thank you so much.

    Can I ask one more?

    After calling `prev = m_continuation ?? Interlocked.CompareExchange(ref m_continuation, SENTINEL, null)`, how is it possible for `prev` to be null?

  • @Şafak Gür: CompareExchange returns the value that was previously in the target slot, so if m_continuation is null prior to the call, prev will be null.

  • Hey stephan

    I was under the impression that invoking a Task.Run on a pure asynchronous method is kind of "cheating" since it isnt really pure async.

    I was wondering, if so, why is the continuation run via a Task.Run.

    Thanks

  • Hi Yuval... please see my earlier response at 11 Jan 2013 8:17 AM where I address the use of Task.Run.  Thanks.

  • Got it. Thanks alot!

  • Are there any echo server example that i can quickly refer to?

    THanks.

  • Is there any way to get information about the sender?

                           RemoteEndPoint = new IPEndPoint(IPAddress.Any , 22112);

    server = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);

    server.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.PacketInformation, true);

    When I do this, awaitable.m_eventArgs.ReceiveMessageFromPacketInfo always contains null for an address.

  • OK, I think I've solved the previous:  I need to create an extension method and call ReceiveFrom.

    However, as far as I can see, that requires me to create a new EndPoint each time.

    Also, looking through the Pattern Based Async, I had always assumed (I now believe incorrectly) that when you used await you ended up back on the same thread.  However, it looks like the continuation now can run on any thread.  Is that the case with task based await as well?

  • @Anthony Wieser: When you await a task, by default if there's a SynchronizationContext associated with the current thread, the continuation will be posted back to that SynchronizationContext; this is how code running on the UI thread makes it back to the UI thread after an await.  If there's no custom SynchronizationContext but there is a custom TaskScheduler, then the continuation will be scheduled to that scheduler.  Otherwise, in general the continuation will run wherever the completing task completes.

  • Hi Stephen!

    I like your implementation so I created a small TCP server (console app) that will use it.

    Server is "attacked" from another machine with 1000 requests asynchronously.

    At some point server hangs and cannot accept new clients, although main thread is running.

    It is like tasks are locked somewhere. Any ideas ?

    Regards,

    Marko

  • @Marko: Do you have a small repro?

  • Sorry, but I don't understand what do you mean by repro...

  • @Marko: a small standalone unit of code that's compilable and that exemplifies the problem.

  • Stephen, thanks for your work!

    I have a question for you. Why SENTINEL is defined as static?

    How this class could be used for multiple SocketAsyncEventArgs objects at the same time?

    Thanks.

  • @DDS: SENTINEL is an object that's simply used for its object identity.  Its usage doesn't cause any concurrency problems, and it's fine to use different instances of SocketAsyncEventArgs concurrently.

Page 3 of 4 (52 items) 1234