ParallelExtensionsExtras Tour - #2 - Task<TResult>.ToObservable

ParallelExtensionsExtras Tour - #2 - Task<TResult>.ToObservable

  • Comments 2

(The full set of ParallelExtensionsExtras Tour posts is available here.)  

In our previous ParallelExtensionsExtras tour post, we discussed a custom implementation of the LINQ operators, in particular for working with Task<TResult> instances in an asynchronous manner. There is already an impressive implementation of the LINQ operators and more for working with asynchronous operations: Reactive Extensions (Rx).  Rx provides a full LINQ implementation based on the IObservable<T> interface that was added to the .NET Framework 4. As it turns out, while Task<TResult> does not currently implement IObservable<T>, it’s actually quite a good fit to do so.  An observable represents any number of values terminated by either an end of the stream or by an exception.  A Task<TResult> fits this description: it completes with either a single value or with an exception.  While it’s possible that a future version of the .NET Framework will see Task<TResult> implement IObservable<T>, we can get the relevant behavior now by implementing it as an extension method for Task<TResult>; in fact, Rx includes just such an extension method, as does ParallelExtensionsExtras. 

Here we’ll take a look at the implementation available in ParallelExtensionsExtras, as part of the TaskExtrasExtensions.cs file.  I’ve omitted parameter validation for the sake of conciseness.   With such an extension method, we can rewrite the LINQ query from our last post to instead be based on observables, e.g.

IObservable<string> result = from x in Task.Factory.StartNew(

                                 () => ProduceInt()).ToObservable()

                             from y in Task.Factory.StartNew(

                                 () => Process(x)).ToObservable()

                             select y.ToString();

 

First, we need our ToObservable extension method itself, which should accept a Task<TResult> and return an IObservable<TResult>.  To do that, we need a type that implements IObservable<TResult> and wraps the Task<TResult>, so that we can work with the Task<TResult> when Subscribe is called on the IObservable<TResult>.

public static IObservable<TResult> ToObservable<TResult>(

    this Task<TResult> task)

{

    return new TaskObservable<TResult> { _task = task };

}

 

Now we just need to implement TaskObservable<TResult>, which implements IObservable<TResult> and its one method: Subscribe.  When Subscribe is called to register an IObserver<T>, we’ll take advantage of ContinueWith to get a callback when the task completes.  If the task completed successfully, we’ll pass along its Result to the observer’s OnNext, and then notify the observer through OnCompleted that the observable will not be sending out any more values.  If the task failed, we’ll pass along its Exception the observer through its OnError method.  And if the task was canceled, we’ll pass along a TaskCanceledException (which derives from OperationCanceledException).

That’s the bulk of the implementation, shown here:

private sealed class TaskObservable<TResult> : IObservable<TResult>

{

    internal Task<TResult> _task;

 

    public IDisposable Subscribe(IObserver<TResult> observer)

    {

        var cts = new CancellationTokenSource();

 

        _task.ContinueWith(t =>

        {

            switch (t.Status)

            {

                case TaskStatus.RanToCompletion:

                    observer.OnNext(_task.Result);

                    observer.OnCompleted();

                    break;

 

                case TaskStatus.Faulted:

                    observer.OnError(_task.Exception);

                    break;

 

                case TaskStatus.Canceled:

                    observer.OnError(new TaskCanceledException(t));

                    break;

            }

        }, cts.Token);

 

        return new CancelOnDispose { Source = cts };

    }

}

 

There’s one piece I have not described, and that’s dealing with unsubscription.  The Subscribe call returns an IDisposable that can be used to cancel the observer’s subscription, effectively unsubscribing the observer.  We handle that with task cancellation.  The IDisposable we return from Subscribe is just a simple wrapper around a CancellationTokenSource, such that the source will have cancellation requested when the object is disposed (e.g. Dispose() { _source.Cancel(); }).  This source’s CancellationToken is provided to the ContinueWith method, such that if the subscription is canceled, so too is the continuation.

Leave a Comment
  • Please add 6 and 7 and type the answer here:
  • Post
  • I'm not overly familar with the details of TPL yet. If thread A creates task B and converts it to an IObservable, and then disposes its IObservable, that cancels the CancellationTokenSource -- I got that. But if the task is running on another thread, it won't necessarily stop immediately, and might in fact be in the middle of a chain of events that will notify that IObserver that it's changed. Depending on how you do the threading, this might be a race condition.

    Does the IObserver get notified on thread A, or does it get notified on whatever thread the task happens to be running on? If it gets notified on thread A, is there a race condition where it might get notified even after you cancel the task?

  • Hi Joe-

    There's an unavoidable race built into the problem statement.  The observable will be asynchronously generating a value/exception, and thus it's racing with any attempt to unsubscribe from the observable.  From a correctness perspective, TPL ensures that nothing is corrupted by this race.  If the unsubscribe request comes in before the task completes (or, more accurately, before the continuation task begins running), the unsubscription will succeed and the data will never propagate.  If the unusubscription request comes in afterwards, it'll simply be a no-op.

Page 1 of 1 (2 items)