Implementing the Asynchronous Programming Model with Future<T>

One of our design goals for the Task Parallel Library is to integrate well with existing asynchronous mechanisms in the .NET Framework.  One of the most common concurrency-related patterns in the .NET Framework is the Asynchronous Programming Model (APM), which typically manifests as a BeginXx method that kicks off an asynchronous operation and returns an IAsyncResult, along with an EndXx method that accepts an IAsyncResult and returns the computed value.

Implementing this for computationally intensive asynchronous operations can be done with System.Threading.Tasks.Future<T>, as Future<T> derives from System.Threading.Tasks.Task, and Task implements IAsyncResult.  Imagine you had a class used for computing the value of pi to an arbitrary number of decimal places:

public class PI
{
    public string Calculate(int decimalPlaces) { ... }
}

We can add BeginCalculate and EndCalculate methods that provide a trimmed-down implementation of the APM for Calculate with only a few lines of code:

public class PI
{
    ...

    public IAsyncResult BeginCalculate(int decimalPlaces)
    {
        return Future.Create(() => Calculate(decimalPlaces));
    }

    public string EndCalculate(IAsyncResult ar)
    {
        var f = ar as Future<string>;
        if (f == null) throw new ArgumentException("Unexpected IAsyncResult type.", "ar");
        return f.Value;
    }
}

However, the full APM pattern dictates that BeginXx should also accept an AsyncCallback that is invoked when the operation completes, along with some user-defined state.  The AsyncCallback addition is easy to implement with the current design of Future<T>:

public IAsyncResult BeginCalculate(
    int decimalPlaces, AsyncCallback ac)
{
    var f = Future.Create(() => Calculate(decimalPlaces));
    if (ac != null) f.Completed += delegate { ac(f); };
    return f;
}

I simply register with the Future<T>'s Completed event such that when the Future<T> completes, it invokes the AsyncCallback.  Easy enough.  Unfortunately, with the current design of Future<T>, supporting the state argument is more difficult, since unlike Task, none of the factories for Future<T> accept an arbitrary state argument (this is something we're planning to rectify).  For now with the current CTP bits, you can work around this by creating your own small wrapper for the Future<T> and the state object:

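private class FutureWithState<T> : IAsyncResult
{
    // Typed as Future<T> (not IAsyncResult) so that EndCalculate can read Value.
    public Future<T> Future;
    public object State;

    // The casts below work whether or not these members are implemented explicitly.
    public object AsyncState { get { return State; } }
    public WaitHandle AsyncWaitHandle {
        get { return ((IAsyncResult)Future).AsyncWaitHandle; } }
    public bool CompletedSynchronously {
        get { return ((IAsyncResult)Future).CompletedSynchronously; } }
    public bool IsCompleted {
        get { return ((IAsyncResult)Future).IsCompleted; } }
}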

The modifications to our implementations of BeginCalculate and EndCalculate to support state are then very straightforward:

public IAsyncResult BeginCalculate(
    int decimalPlaces, AsyncCallback ac, object state)
{
    var f = Future.Create(() => Calculate(decimalPlaces));
    var fws = new FutureWithState<string> {
        Future = f, State = state };
    if (ac != null) f.Completed += delegate { ac(fws); };
    return fws;
}

public string EndCalculate(IAsyncResult ar)
{
    var f = ar as FutureWithState<string>;
    if (f == null) throw new ArgumentException("Unexpected IAsyncResult type.", "ar");
    return f.Future.Value;
}

This same container approach can work for scenarios where you want to store more arbitrary state with your IAsyncResult.
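For comparison, here is a minimal sketch of the same Begin/End pair without a wrapper. It assumes the Task<TResult> API that later shipped in .NET Framework 4 rather than the CTP's Future<T>: there, Task<TResult>.Factory.StartNew accepts a state object and exposes it through AsyncState. The class name PiSketch and the placeholder body of Calculate are hypothetical, since the real computation is elided in this post.

```csharp
using System;
using System.Globalization;
using System.Threading.Tasks;

public class PiSketch
{
    // Hypothetical stand-in for the real computation: formats Math.PI,
    // so it is only meaningful for a small number of decimal places.
    public string Calculate(int decimalPlaces)
    {
        return Math.PI.ToString("F" + decimalPlaces, CultureInfo.InvariantCulture);
    }

    public IAsyncResult BeginCalculate(
        int decimalPlaces, AsyncCallback ac, object state)
    {
        // The state object is handed to the task and comes back as AsyncState.
        var task = Task<string>.Factory.StartNew(
            _ => Calculate(decimalPlaces), state);

        // The continuation runs once the task has finished, even if the task
        // completed before the continuation was registered.
        if (ac != null) task.ContinueWith(t => ac(t));
        return task;
    }

    public string EndCalculate(IAsyncResult ar)
    {
        var task = ar as Task<string>;
        if (task == null)
            throw new ArgumentException("Unexpected IAsyncResult type.", "ar");

        // Blocks until the task is done; a failure in Calculate surfaces
        // here wrapped in an AggregateException.
        return task.Result;
    }
}
```

Because the task itself carries the state, the callback receives the same object that BeginCalculate returned, so calling EndCalculate from inside the callback works without any extra bookkeeping.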

Note, too, that Future<T> can also be used for situations where the asynchronous work isn't computationally intensive.  Consider the System.Net.NetworkInformation.Ping class that was introduced in the .NET Framework 2.0.  Rather than following the APM, Ping derives from Component and follows the Event-based Asynchronous Pattern (EAP).  So rather than exposing BeginSend and EndSend methods that deal with IAsyncResults, it exposes a SendAsync method that returns void, along with a PingCompleted event that is raised when the asynchronous ping send operation completes.  What if we wanted to use asynchronous pings, but expose them through the APM?  We could easily implement this with Future<T> as follows:

public class MyPing
{
    public IAsyncResult BeginPing(string host)
    {
        return Future.Create(() => new Ping().Send(host));
    } 

    public PingReply EndPing(IAsyncResult ar)
    {
        var f = ar as Future<PingReply>;
        if (f == null) throw new ArgumentException("Unexpected IAsyncResult type.", "ar");
        return f.Value;
    }
}

but this has some serious downsides: foremost, the implementation of Send is not computationally intensive, and yet the Future<T> is going to be using and blocking a thread to do a synchronous send.  What we really want to do is take full advantage of Ping's asynchronous send implementation.  And we can do that by using Future<T>'s "promise" capabilities:

public class MyPing
{
    public IAsyncResult BeginPing(string host, AsyncCallback ac)
    {
        var f = Future<PingReply>.Create();
        if (ac != null) f.Completed += delegate { ac(f); };

        Ping p = new Ping();
        p.PingCompleted += (sender, e) =>
        {
            // e is the PingCompletedEventArgs for the finished send.
            if (e.Error != null) f.Exception = e.Error;
            else f.Value = e.Reply;
        };
        p.SendAsync(host, null);

        return f;
    } 

    public PingReply EndPing(IAsyncResult ar)
    {
        var f = ar as Future<PingReply>;
        if (f == null) throw new ArgumentException("Unexpected IAsyncResult type.", "ar");
        return f.Value;
    }
}

The Future<T> I create here has no delegate associated with it.  Instead, it allows its Value and Exception properties to be set (only once), and someone waiting on Future<T>'s Value will block until either Exception or Value is set.  I take advantage of that by setting these properties in the PingCompleted event handler.  With very little code, I now can get a fully-working IAsyncResult that represents an asynchronous ping, where the original asynchronous behavior was exposed through a different mechanism.
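The same promise idea can be sketched with TaskCompletionSource<TResult>, the type that filled this role in the released .NET Framework 4. That substitution is an assumption on my part, not the CTP API used above, and the class name MyPingSketch is hypothetical. The sketch also takes the state argument and handles the cancelled case, which the version above omits.

```csharp
using System;
using System.Net.NetworkInformation;
using System.Threading.Tasks;

public class MyPingSketch
{
    public IAsyncResult BeginPing(string host, AsyncCallback ac, object state)
    {
        // The completion source is the promise: no delegate runs on its
        // behalf, and its task finishes only when a result is set on it.
        var tcs = new TaskCompletionSource<PingReply>(state);
        if (ac != null) tcs.Task.ContinueWith(t => ac(t));

        var p = new Ping();
        p.PingCompleted += (sender, e) =>
        {
            // Exactly one of these transitions completes the task.
            if (e.Error != null) tcs.TrySetException(e.Error);
            else if (e.Cancelled) tcs.TrySetCanceled();
            else tcs.TrySetResult(e.Reply);
        };
        p.SendAsync(host, null);

        return tcs.Task;
    }

    public PingReply EndPing(IAsyncResult ar)
    {
        var task = ar as Task<PingReply>;
        if (task == null)
            throw new ArgumentException("Unexpected IAsyncResult type.", "ar");

        // Blocks until the ping completes; an error recorded by the handler
        // surfaces here wrapped in an AggregateException.
        return task.Result;
    }
}
```

No thread is blocked while the ping is in flight; only a caller that invokes EndPing before completion waits.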


  • Hi,

    It seems like you have a race condition in your BeginXXX and EndXXX implementation, because you first create the Future<> and then register to its Completed event, but it might already complete before you had the time to register.

    Is there anything inherent to prevent it that I'm missing?

    Sasha

  • The implementation of Completed accounts for that (or, at least, it's meant to... there was a related bug in the CTP that has since been fixed).  Any delegates registered with the Completed event after the Task completes will be invoked immediately.

  • In a previous post, I talked about implementing the Asynchronous Programming Model pattern using Future<T>

  • I want to comment on your response to Sasha's remark.

    When execution is very quick and the Future has already completed before we register our callback, you said that the delegate will nevertheless be called.

    What I'd like to know is whether this delegate will be called in the threading context of the code calling "+=" (in short, in the EndXXX calling context).

    I ask because I can't see any other simple solution.

    I know that with the APM pattern we can't predict in which threading context the callback will be called, but I'd like to know whether it's my own. If the callback takes a long time to execute, I'd like to know whether that time will be spent in my context.

    Moreover, suppose I need to register two callbacks. This must be done sequentially, like this:

    1: var f = Future.Create(()=>Calculate(decimalPlaces));

    2: if (ac1 != null) f.Completed += delegate{ac1(f);};

    3: if (ac2 != null) f.Completed += delegate{ac2(f);};

    If Calculate completes between line 2 and line 3, what happens?

    ac1 could be called in the TaskManager threading context and ac2 in the "+=" caller's threading context. Strange behavior, isn't it?

    I think a better way would be to create the future in a suspended state. That would permit registering callbacks before resuming, so each callback would be called in the TaskManager threading context.

    Such an implementation would preserve the IAsyncResult Completed event logic.

  • Trust me, it's not persecution, I really enjoy reading about PFX :).

    But in the "BeginCalculate (int decimalPlaces, AsyncCallback ac, object state)" implementation, you call the callback with the Future instance and not the FutureWithState<string>.

    I think that if we call EndCalculate from the callback (as usual), we will always get an ArgumentException.

  • Olivier, thanks for the great feedback.  

    Regarding your concerns around the Completed event, we have a new design we're working on which I believe will address all of your concerns (and those others have expressed). Stay tuned.

    Regarding BeginCalculate/EndCalculate with FutureWithState, oops :) You're right.  I fixed the problem in the post; I should have been passing fws (rather than f) to the callback.

    Thanks.

  • Please help me :-(

    I am using the following class, which makes an async call to the DB. It has a GetData() method that will be overridden in a derived class, for example to fill a datagrid.

    When I run the application, sometimes it displays the datagrid and sometimes it does not.

    Can you please suggest whether there is anything I am missing here?

    Thanks in advance

    public class Class1 : System.Web.UI.WebControls.WebParts.WebPart
    {
        public delegate void FillGrid();
        private FillGrid flgrd = null;

        // when class gets loaded this event is fired
        protected override void OnLoad(EventArgs e)
        {
            base.OnLoad(e);
            CallAsyncMethod();
        }

        private void CallAsyncMethod()
        {
            flgrd = new FillGrid(processData);
            IAsyncResult ar = flgrd.BeginInvoke(new AsyncCallback(this.callAsyncResultMethod), flgrd);
            //AutoResetEvent a = new AutoResetEvent(false);
            //a.WaitOne();
        }

        private void callAsyncResultMethod(IAsyncResult ar)
        {
            flgrd = ((FillGrid)(ar.AsyncState));
            ar.AsyncWaitHandle.WaitOne();
            flgrd.EndInvoke(ar);
        }

        protected virtual void GetData()
        {
            // User will override this method in derived class for filling data grid
        }

        private void processData()
        {
            this.GetData();
        }
    }

    I am new to delegates and async programming; if I am making any mistakes, please point them out.

    It will be very helpful for me :-)

    Ashish
