Awaiting Socket Operations

The System.Net.Sockets.Socket class in .NET exposes multiple sets of asynchronous methods that perform the same basic operations but that are exposed with different patterns.

The first set follows the APM pattern, where for a synchronous method like Receive, the BeginReceive and EndReceive methods are exposed.  If you want to be able to “await” such asynchronous operations in an async method, the easiest way to do so is to create a Task-based wrapper.  That can be done using Task.Factory.FromAsync, or it can be done more manually by using a TaskCompletionSource<TResult>, e.g.

public static Task<int> ReceiveAsync(
    this Socket socket, byte[] buffer, int offset, int size,
    SocketFlags socketFlags)
{
    var tcs = new TaskCompletionSource<int>();
    socket.BeginReceive(buffer, offset, size, socketFlags, iar =>
    { 
        try { tcs.TrySetResult(socket.EndReceive(iar)); }
        catch (Exception exc) { tcs.TrySetException(exc); }
    }, null);
    return tcs.Task;
}

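or with a slightly more efficient implementation that avoids some of the extra allocations here, by passing the TaskCompletionSource through the callback’s state object rather than capturing variables in a closure: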

public static Task<int> ReceiveAsync(
    this Socket socket, byte[] buffer, int offset, int size,
    SocketFlags socketFlags)
{
    var tcs = new TaskCompletionSource<int>(socket);
    socket.BeginReceive(buffer, offset, size, socketFlags, iar =>
    {
        var t = (TaskCompletionSource<int>)iar.AsyncState;
        var s = (Socket)t.Task.AsyncState;
        try { t.TrySetResult(s.EndReceive(iar)); }
        catch (Exception exc) { t.TrySetException(exc); }
    }, tcs);
    return tcs.Task;
}

For many asynchronous methods, this approach is good enough.  The overhead involved in creating a new TaskCompletionSource<TResult> for each operation is often dwarfed by the costs of the operation itself.
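For comparison, the Task.Factory.FromAsync route mentioned earlier might look like the minimal sketch below.  The method name ReceiveViaFromAsync and the class name are hypothetical, chosen only so the example does not collide with Socket’s own ReceiveAsync overloads.

```csharp
using System;
using System.Net.Sockets;
using System.Threading.Tasks;

public static class SocketFromAsyncExtensions
{
    // Hypothetical name; not part of the original post or of the framework.
    public static Task<int> ReceiveViaFromAsync(
        this Socket socket, byte[] buffer, int offset, int size,
        SocketFlags socketFlags)
    {
        // FromAsync invokes the begin delegate once, then calls EndReceive
        // when the operation completes and uses its return value (or its
        // exception) to complete the returned Task<int>.
        return Task.Factory.FromAsync<int>(
            (callback, state) => socket.BeginReceive(
                buffer, offset, size, socketFlags, callback, state),
            socket.EndReceive,
            null);
    }
}
```

This is shorter than the TaskCompletionSource versions, but it still allocates a Task and an IAsyncResult per call, so the same overhead considerations apply.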

However, in some scenarios this is not sufficient, and the cost comes not only from the additional tasks but from the APM implementation itself.  If you’re making thousands upon thousands of socket calls asynchronously per second, that’s thousands upon thousands of IAsyncResult objects getting created.  That’s a lot of pressure on the garbage collector, and it can result in unacceptable pauses in some networking-intensive apps.

To address that, Socket exposes another set of asynchronous methods.  These methods look similar to the Event-based Async Pattern (EAP), but they’re subtly different.  Basically, you create a SocketAsyncEventArgs instance, and you configure that instance with a buffer, with a Completed event handler, and so on.  Then you pass this instance into methods like ReceiveAsync and SendAsync.  Those methods return a Boolean indicating whether the operation is still pending: false means it completed synchronously, in which case the Completed event is not raised, the event args instance is immediately reusable, and the caller is responsible for finishing the operation synchronously (e.g. checking the status of the operation, getting from the SocketAsyncEventArgs instance the number of bytes received, etc.).  If the method returns true, the operation didn’t complete synchronously, and the Completed event will be raised when it does complete; that handler does the same completion logic we would have done synchronously.

While a bit more complicated, the benefit of this pattern is that these SocketAsyncEventArgs instances can be reused, such that once an instance is no longer being used by a first asynchronous call, it may be reused for a second, and then for a third, and so on.  This avoids needing to construct additional objects per async operation, which thereby decreases pressure on the garbage collector.
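To make the pattern concrete, here is a minimal sketch of a receive loop written directly against SocketAsyncEventArgs, without any awaiting.  The RawReceiveLoop class and its members are hypothetical names used only for illustration; the single TaskCompletionSource exists just to report when the loop ends, and nothing is allocated per receive.

```csharp
using System;
using System.Net.Sockets;
using System.Threading.Tasks;

// Hypothetical helper; only one receive is outstanding at any time.
public sealed class RawReceiveLoop
{
    private readonly Socket _socket;
    private readonly SocketAsyncEventArgs _args = new SocketAsyncEventArgs();
    private readonly TaskCompletionSource<int> _done = new TaskCompletionSource<int>();
    private int _totalBytes;

    public RawReceiveLoop(Socket socket)
    {
        _socket = socket;
        _args.SetBuffer(new byte[0x1000], 0, 0x1000);
        // Raised only for operations that did NOT complete synchronously.
        _args.Completed += (sender, e) => { if (Finish()) Pump(); };
    }

    // Completes with the total byte count once the peer closes, or faults on error.
    public Task<int> Completion { get { return _done.Task; } }

    public void Start() { Pump(); }

    private void Pump()
    {
        // ReceiveAsync returns false when it completed synchronously; no event
        // is raised in that case, so finish the operation inline and loop.
        while (!_socket.ReceiveAsync(_args))
        {
            if (!Finish()) return;
        }
        // It returned true: the operation is pending and Completed will fire.
    }

    // Shared completion logic; returns true if another receive should be issued.
    private bool Finish()
    {
        if (_args.SocketError != SocketError.Success)
        {
            _done.TrySetException(new SocketException((int)_args.SocketError));
            return false;
        }
        if (_args.BytesTransferred == 0) // peer closed the connection
        {
            _done.TrySetResult(_totalBytes);
            return false;
        }
        _totalBytes += _args.BytesTransferred;
        return true;
    }
}
```

Note how the same Finish logic has to be reachable from two places, the synchronous path in Pump and the Completed handler.  That duplication of control flow is what makes this pattern harder to write than sequential code.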

We could of course wrap such asynchronous methods with Task, as well, but that would largely defeat the purpose of these methods, similar to allocating an IAsyncResult in the APM case.  Still, though, we want to be able to take advantage of the compiler’s async/await support to make it easier to write asynchronous code using sockets.  We want our cake and to eat it, too.

Of course, the compiler supports awaiting more than just Tasks.  So if you have a specialized scenario like this, you can take advantage of the compiler’s pattern-based support for awaiting things.  Below, I’ve implemented an awaitable wrapper for a SocketAsyncEventArgs:

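using System;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public sealed class SocketAwaitable : INotifyCompletion
{
    // Marker stored in m_continuation when the operation completes
    // before a continuation has been registered.
    private static readonly Action SENTINEL = () => { };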

    internal bool m_wasCompleted;
    internal Action m_continuation;
    internal SocketAsyncEventArgs m_eventArgs;

    public SocketAwaitable(SocketAsyncEventArgs eventArgs)
    {
        if (eventArgs == null) throw new ArgumentNullException("eventArgs");
        m_eventArgs = eventArgs;
        eventArgs.Completed += delegate
        {
            var prev = m_continuation ?? Interlocked.CompareExchange(
                ref m_continuation, SENTINEL, null);
            if (prev != null) prev();
        };
    }

    internal void Reset()
    {
        m_wasCompleted = false;
        m_continuation = null;
    }

    public SocketAwaitable GetAwaiter() { return this; }

    public bool IsCompleted { get { return m_wasCompleted; } }

    public void OnCompleted(Action continuation)
    {
        if (m_continuation == SENTINEL ||
            Interlocked.CompareExchange(
                ref m_continuation, continuation, null) == SENTINEL)
        {
            Task.Run(continuation);
        }
    }

    public void GetResult()
    {
        if (m_eventArgs.SocketError != SocketError.Success)
            throw new SocketException((int)m_eventArgs.SocketError);
    }
}

With that helper type, I can write a short three-line wrapper method for each of the Socket methods I care about, e.g.

public static class SocketExtensions
{
    public static SocketAwaitable ReceiveAsync(this Socket socket,
        SocketAwaitable awaitable)
    {
        awaitable.Reset();
        if (!socket.ReceiveAsync(awaitable.m_eventArgs))
            awaitable.m_wasCompleted = true;
        return awaitable;
    }

    public static SocketAwaitable SendAsync(this Socket socket,
        SocketAwaitable awaitable)
    {
        awaitable.Reset();
        if (!socket.SendAsync(awaitable.m_eventArgs)) 
            awaitable.m_wasCompleted = true;
        return awaitable;
    }

    // ...
}

And with that, I can use async/await with these very efficient asynchronous methods on Socket, e.g.

static async Task ReadAsync(Socket s)
{
    // Reusable SocketAsyncEventArgs and awaitable wrapper
    var args = new SocketAsyncEventArgs();
    args.SetBuffer(new byte[0x1000], 0, 0x1000);
    var awaitable = new SocketAwaitable(args);

    // Do processing, continually receiving from the socket
    while (true)
    {
        await s.ReceiveAsync(awaitable);
        int bytesRead = args.BytesTransferred;
        if (bytesRead <= 0) break;

        Console.WriteLine(bytesRead);
    }
}

Here, when I make a call to my ReceiveAsync extension method, I’m using a reusable SocketAsyncEventArgs that was previously created and wrapped into one of my helper SocketAwaitable types.  ReceiveAsync doesn’t need to do any allocation; it simply calls to the underlying ReceiveAsync on the Socket, passing in the wrapped SocketAsyncEventArgs.  If the operation completes synchronously, we just set the awaitable’s m_wasCompleted to true, such that when the compiler-generated code awaits the returned SocketAwaitable, it sees that IsCompleted is true, and simply proceeds to call GetResult, which will throw an exception if the socket operation failed.  If the underlying ReceiveAsync call does not complete synchronously, then m_wasCompleted will remain false, and it’ll be up to the OnCompleted method to store the await continuation to be run when the SocketAsyncEventArgs’ Completed handler is invoked.  The only time we need to schedule a task here is if the socket operation asynchronously completes between the time that the compiler-generated code checks IsCompleted and then invokes OnCompleted, and that should be rare.
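To see the sequence of calls the compiler-generated code makes against an awaitable, the following sketch uses a stand-in type that simply records which members are invoked.  TracingAwaitable and TracingDemo are hypothetical names and have nothing to do with sockets; they only illustrate the pattern-based await described above.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

// Hypothetical awaitable that logs each member the compiler calls.
public sealed class TracingAwaitable : INotifyCompletion
{
    private readonly bool _completed;
    public readonly List<string> Calls = new List<string>();

    public TracingAwaitable(bool completed) { _completed = completed; }

    public TracingAwaitable GetAwaiter() { Calls.Add("GetAwaiter"); return this; }

    public bool IsCompleted
    {
        get { Calls.Add("IsCompleted"); return _completed; }
    }

    public void OnCompleted(Action continuation)
    {
        Calls.Add("OnCompleted");
        // A real awaiter would store the continuation until the operation
        // finishes; this stand-in just schedules it immediately.
        Task.Run(continuation);
    }

    public void GetResult() { Calls.Add("GetResult"); }
}

public static class TracingDemo
{
    public static async Task<string> RunAsync(bool completed)
    {
        var awaitable = new TracingAwaitable(completed);
        await awaitable;
        return string.Join(",", awaitable.Calls);
    }
}
```

When the awaitable reports IsCompleted as true, RunAsync yields "GetAwaiter,IsCompleted,GetResult" and OnCompleted is never called.  When it reports false, the result is "GetAwaiter,IsCompleted,OnCompleted,GetResult".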

If you wanted to write a bit more code, you could even create a specialized version of SocketAwaitable such that its GetResult method returned the result of the operation, e.g. a ReceiveSocketAwaitable whose GetResult method returned the SocketAsyncEventArgs.BytesTransferred value.  With that, you could then write the above loop almost as if it were synchronous code, e.g.

static async Task ReadAsync(Socket s)
{
    // Reusable SocketAsyncEventArgs and awaitable wrapper
    var args = new SocketAsyncEventArgs();
    args.SetBuffer(new byte[0x1000], 0, 0x1000);
    var awaitable = new ReceiveSocketAwaitable(args);

    // Do processing, continually receiving from the socket
    int bytesRead;
    while ((bytesRead = await s.ReceiveAsync(awaitable)) > 0)
    {
        Console.WriteLine(bytesRead);
    }
}
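A ReceiveSocketAwaitable along those lines might look like the sketch below.  It is a self-contained assumption rather than code from this post: it repeats the continuation logic of SocketAwaitable, folds Reset and the socket call into a hypothetical Start method, and returns BytesTransferred from GetResult.

```csharp
using System;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant of SocketAwaitable whose GetResult returns a byte count.
public sealed class ReceiveSocketAwaitable : INotifyCompletion
{
    private static readonly Action s_sentinel = () => { };

    private readonly SocketAsyncEventArgs _eventArgs;
    private Action _continuation;
    private bool _wasCompleted;

    public ReceiveSocketAwaitable(SocketAsyncEventArgs eventArgs)
    {
        if (eventArgs == null) throw new ArgumentNullException("eventArgs");
        _eventArgs = eventArgs;
        eventArgs.Completed += delegate
        {
            // Run the stored continuation, or leave the sentinel behind if
            // no continuation has been registered yet.
            var prev = _continuation ?? Interlocked.CompareExchange(
                ref _continuation, s_sentinel, null);
            if (prev != null) prev();
        };
    }

    // Resets the state and starts a receive, reusing this instance.
    public ReceiveSocketAwaitable Start(Socket socket)
    {
        _wasCompleted = false;
        _continuation = null;
        if (!socket.ReceiveAsync(_eventArgs)) _wasCompleted = true;
        return this;
    }

    public ReceiveSocketAwaitable GetAwaiter() { return this; }

    public bool IsCompleted { get { return _wasCompleted; } }

    public void OnCompleted(Action continuation)
    {
        if (_continuation == s_sentinel ||
            Interlocked.CompareExchange(
                ref _continuation, continuation, null) == s_sentinel)
        {
            Task.Run(continuation);
        }
    }

    // Throws on failure; otherwise returns the number of bytes received.
    public int GetResult()
    {
        if (_eventArgs.SocketError != SocketError.Success)
            throw new SocketException((int)_eventArgs.SocketError);
        return _eventArgs.BytesTransferred;
    }
}

public static class ReceiveSocketExtensions
{
    public static ReceiveSocketAwaitable ReceiveAsync(
        this Socket socket, ReceiveSocketAwaitable awaitable)
    {
        return awaitable.Start(socket);
    }
}
```

As with SocketAwaitable, each instance should own its SocketAsyncEventArgs and be used for only one operation at a time.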

Enjoy.

  • Just a quick idea Marko. Are you running server and client(s) on the same machine? I've done a fair bit of socket work over the years and using the same machine has tripped me up a few times.

  • Sorry for late response.

    @Spud: No, I am not running tests on the same machine.

    @Stephen: Code is not small because it is a part of communication library I am writing that will provide Client and/or Server classes for different communication types (TCP, UDP, SSL, Serial, ...) and each class supports different 'AsyncModes' (APM, EAP, ...). Your model is called TEAP. I am planning to publish it on Codeplex or Sourceforge, but I don't know when, so I can send you the library and test app by email if you want to take a look.

  • Hi Stephen,

    Is there any way to safely reset m_continuation to null? I've tried to reset it after async operation is complete. But in my test app, that uses a lot of sockets and sometimes it aborts async read operations with Socket.Dispose, I experience app hang because async continuation action is not called. That happens after Socket.Dispose. If I remove reset of m_continuation to null after async operation completion everything runs fine. And Socket.Dispose doesn't cause hang.

    Sure I call Reset method before new async operation.

    It is strange to me that I see such behavior.

    I want to reset m_continuation to null because my memory profiler tool (.NET Memory Profiler) thinks that m_continuation holds indirect reference to async state objects that can't be fully collected by GC.

    Thank you.

  • @DDS: Do you have a small repro of the issue?  It sounds like you're setting m_continuation to null while someone's still using the instance.  Maybe you're accidentally using the awaitable multiple times concurrently?  Maybe you're setting m_continuation to null before all work associated with the async operation has actually completed? Etc.

  • I don't have repro code, as test app is a part of large project that uses awaitable sockets. Sorry about that.

    Awaitable object should not be used multiple times. But I will add tracing for this now.

    I tried to call Reset after async operation is completed, e.g. as first line of continuation code. Could it be somehow that if I reset one awaitable it breaks waiting chain of other awaitable objects? Maybe in Task.Run.

    Thank you.

  • After your comment I did find a bug in our Dispose code that was called after connection was aborted. Now all works well.

    Thank you again for great work!

  • @DDS: Glad to hear you got your problem solved.

  • Stephen, I have a socket application, which has many clients. Can I use the method?

  • @ardmore: Sure.

  • Hi Stephen, thanks for the nice solution. Unfortunately I am running into a problem on the subsequent calls. The first call executes Task.Run(continuation) consistently and the subsequent call throws exception that says "An attempt was made to transition a task to a final state when it had already completed.". I understand that this error occurs in "..Builder.SetResult(result)" compiler generated code. Also, I noticed that "...Builder.Task.Status" is "WaitingForActivation". Do you know what's going on? Thanks in advance.

  • @Matthew: Sounds like there's a problem in your code that's resulting in the compiler-provided delegate being invoked multiple times.  What does your code look like?

  • @Stephen,

    thanks for looking in this.

    SocketAwaitable and SocketExtensions classes are exact copy from your code. And here is my method that calls Socket.SendAsync() method.

    Where 'sendSAEA' is SocketAsyncEventArgs from the pool.

    public async Task<int> SendAsync(byte[] data)
    {
        if (data == null || data.Length == 0)
            return 0;

        Socket socket = (Socket)this.sendSAEA.UserToken;

        // set data
        this.sendSAEA.SetBuffer(0, data.Length);
        Array.Copy(data, this.sendSAEA.Buffer, data.Length);

        // create awaitable
        SocketAwaitable awaitable = new SocketAwaitable(this.sendSAEA);
        await socket.SendAsync(awaitable);

        // return number of bytes transferred
        return this.sendSAEA.BytesTransferred;
    }

  • @Matthew: Your code is creating a new SocketAwaitable each time, wrapping a SocketAsyncEventArgs that's getting reused repeatedly.  Each time you call the SocketAwaitable constructor, you're hooking up another event handler to the SAEA, and it's never being removed, so subsequent uses will cause the previous use's delegate to be invoked again, which is why you're getting the errors you're getting: a previous async method in which you used this SAEA is having its compiler-provided continuation delegate invoked again.  If you're going to pool something, you should be pooling instances of the SocketAwaitable, rather than the underlying SocketAsyncEventArgs.  The design of the SocketAwaitable I posted expects that it logically owns the SocketAsyncEventArgs instance and that no one else will be using it.  By pooling the SocketAwaitable instead of the SocketAsyncEventArgs, you'll also avoid an extra allocation per usage.  Good luck.

  • @Stephen, thanks for the response. I moved the SocketAwaitable to the pool and it is working as expected. Thank you.
