Tasks, Monads, and LINQ

Tasks, Monads, and LINQ

Rate This
  • Comments 13

A few years back, Wes Dyer wrote a great post on monads, and more recently, Eric Lippert wrote a terrific blog series exploring monads and C#. In that series, Eric alluded to Task<TResult> several times, so I thought I’d share a few related thoughts on Task<TResult> and the async/await keywords.

As both Wes and Eric highlight, a monad is a triple consisting of a type, a Unit function (often called Return), and a Bind function. If the type in question is Task<T>, what are its Unit and Bind functions?

The Unit operator takes a T and “amplifies” it into an instance of the type:

public static M<T> Unit<T>(this T value);

That’s exactly what Task.FromResult does, producing a Task<T> from a T, so our Unit method can easily be implemented as a call to FromResult:

public static Task<T> Unit<T>(this T value)
{
    return Task.FromResult(value);
}

What about Bind? The Bind operator takes the instance of our type, extracts the value from it, runs a function over that value to get a new amplified value, and returns that amplified value:

public static M<V> Bind<U, V>(
    this M<U> m, Func<U, M<V>> k);

If you squint at this, and if you’ve read my previous blog post Implementing Then with Await, the structure of this declaration should look eerily like the last overload of Then discussed:

public static Task<TNewResult> Then<TResult, TNewResult>(
    this Task<TResult> task, Func<TResult, Task<TNewResult>> continuation);

In fact, other than the symbols chosen, they’re identical, and we can implement Bind just as we implemented Then:

public static async Task<V> Bind<U, V>(
    this Task<U> m, Func<U, Task<V>> k)
{
    return await k(await m);
}

This is possible so concisely because await and async are so close in nature to the monadic operators. When you write an async function that returns Task<V>, the compiler expects the body of the method to return a V, and it lifts (or amplifies) that V into the Task<V> that’s returned from the method call; this is basically Unit (async, of course, also handles the creation and completion of the returned Task for the case where an exception propagates out of the body of the async method). Further, a core piece of the Bind operator is in extracting the value from the supplied instance, and that’s nicely handled by await.

In Eric’s last post on monads, he talks about some of the C# LINQ operators, and how they can easily be implemented on top of types that correctly implement a Unit and a Bind method:

static M<C> SelectMany<A, B, C>(
    this M<A> monad,
    Func<A, M<B>> function,
    Func<A, B, C> projection)
{
    return monad.Bind(
        outer => function(outer).Bind(
            inner => projection(outer, inner).Unit()));
}

Sure enough, with our Bind and Unit implementations around Task<T>, we can substitute in “Task” for “M”, and it “just works”:

static Task<C> SelectMany<A, B, C>(
    this Task<A> monad,
    Func<A, Task<B>> function,
    Func<A, B, C> projection)
{
    return monad.Bind(
        outer => function(outer).Bind(
            inner => projection(outer, inner).Unit()));
}

What does it mean here to “just work”? It means we can start writing LINQ queries using the C# query comprehension syntax with operators that rely on SelectMany, e.g.

int c = await (from first in Task.Run(() => 1)
               from second in Task.Run(() => 2)
               select first + second);
Console.WriteLine(c == 3); // will output True

Of course, we can implement SelectMany without Bind and Unit, just using async/await directly:

static async Task<C> SelectMany<A, B, C>(
    this Task<A> task,
    Func<A, Task<B>> function,
    Func<A, B, C> projection)
{
    A a = await task;
    B b = await function(a);
    return projection(a, b);
}

In fact, we can implement many of the LINQ query operators simply and efficiently using async/await. The C# specification section 7.16.3 lists which operators we need to implement to support all of the C# query comprehension syntax, i.e. all of the LINQ contextual keywords in C#, such as select and where. Some of these operators, like OrderBy, make little sense when dealing with singular values as we have with Task<T>, but we can easily implement the others. This enables using most of the C# LINQ query comprehension syntax with Tasks:

public static async Task<V> SelectMany<T, U, V>(
    this Task<T> source, Func<T, Task<U>> selector, Func<T, U, V> resultSelector)
{
    T t = await source;
    U u = await selector(t);
    return resultSelector(t, u);
}

public static async Task<U> Select<T, U>(
    this Task<T> source, Func<T, U> selector)
{
    T t = await source;
    return selector(t);
}

public static async Task<T> Where<T>(
    this Task<T> source, Func<T, bool> predicate)
{
    T t = await source;
    if (!predicate(t)) throw new OperationCanceledException();
    return t;
}

public static async Task<V> Join<T, U, K, V>(
    this Task<T> source, Task<U> inner,
    Func<T, K> outerKeySelector, Func<U, K> innerKeySelector,
    Func<T, U, V> resultSelector)
{
    await Task.WhenAll(source, inner);
    if (!EqualityComparer<K>.Default.Equals(
        outerKeySelector(source.Result), innerKeySelector(inner.Result)))
            throw new OperationCanceledException();
    return resultSelector(source.Result, inner.Result);
}

public static async Task<V> GroupJoin<T, U, K, V>(
    this Task<T> source, Task<U> inner,
    Func<T, K> outerKeySelector, Func<U, K> innerKeySelector,
    Func<T, Task<U>, V> resultSelector)
{
    T t = await source;
   
return resultSelector(t,
        inner.Where(u => EqualityComparer<K>.Default.Equals(
            outerKeySelector(t), innerKeySelector(u))));
}

public static async Task<T> Cast<T>(this Task source)
{
    await source;
    return (T)((dynamic)source).Result;
}

Interestingly, Task<TResult> already has the members necessary to be considered “comonadic.” As Brian Beckman discusses in his precis, a comonad is the dual of a monad, a triple consisting of the type and two operators: Extract (the flip of Unit/Return) and Extend (the flip of Bind). Here I’ve taken a few liberties with the signatures from what Brian outlines, such as swapping the order of some of the parameters:

public T Extract<T>(this W<T> self);
public W<U> Extend<T, U>(this W<T> self, Func<W<T>, U> func);

Task<TResult> already supports Extract, it’s just called Result:

public TResult Result;

And it already supports Extend, it’s just called ContinueWith:

public Task<TNewResult> ContinueWith<TNewResult>(
    Func<Task<TResult>, TNewResult> func);

(In truth, to correctly implement all of the comonadic laws Brian outlines, we’d likely want to tweak both of these with a thin layer of additional code to modify some corner cases around exceptions and cancellation, due to how Result propagates exceptions wrapped in AggregateException and how ContinueWith tries to match thrown OperationCanceledExceptions against the CancellationToken supplied to ContinueWith. But the basic idea stands.)

Most of the posts I write on this blog are about practical things. So, is any of this really useful in everyday coding with Tasks and async/await? Would you actually want to implement and use the LINQ surface area directly for Tasks? Eh, probably not. But it’s a fun to see how all of these things relate.

(Update 4/3/2013: Thanks to Aaron Lahman for pointing out the bug I had in my GroupJoin implementation.  Fixed.)

Leave a Comment
  • Please add 2 and 1 and type the answer here:
  • Post
  • Really like your posts, keep it up! And thanks to Eric I now know what a monad is.

    I have aquestion concerning exceptions thrown in a Task returning method. Should all exceptions of a Task returning method be thrown inside the returned task (so that the caller does not need to guide the method call itself inside a try catch block) or is it ok to throw some exceptions (for example input validation or invalid state) before the returned task is set up (fail fast) whereas the caller needs to be aware to protect the method call itself (besides checking the returned task's result)?

    (In the latter case the caller's code might get a little verbose especially when chaining tasks...)

    Or is it just a matter of taste and you should document the method call accordingly? I'd really apprechiate your thoughts or guideline on that.

    Example:

    public Task<int> CalcSthAsync(int x, int y)

    {

     // is it ok to throw exceptions here?

     // for example concerning state (object already disposed etc.) or

     // input validation:

     if(x < 0)

       throw new InvalidOperationException(...);

     return Task.Factury.StartNew(() =>

     {

       // or should I throw all exceptions inside the task

       // so that caller just needs to ckeck the returned task for exceptions

       // and not guide the method call itself inside a try catch block?

       if(x < 0)

         throw new InvalidOperationException(...);

       ...    

     });

    }

  • @Stephen

    Bit off topic, but still related to languages.

    What about Roslyn ? Can you say something new ?

  • Very awesome stuff. Should've figured it'd be this easy with await.

    By the way, you missed 'async' in the method declaration:

    static Task<C> SelectMany<A, B, C>(

       this Task<A> task,

       Func<A, Task<B>> function,

       Func<A, B, C> projection)

    {

       A a = await task;

       B b = await function(a);

       return projection(a, b);

    }

  • « Would you actually want to implement and use the LINQ surface area directly for Tasks? Eh, probably not.»

    Actually, this is essentially the point of Reactive Extensions (Rx) (and libraries conceptually near to Rx like LinqToAwait): using monads and LINQ operators and LINQ-like operators you can do some very complex asynchronous programming in very simple-appearing "query" structures.

  • @Tom: Please see the document at www.microsoft.com/.../details.aspx, and specifically the end of page 3 / beginning of page 4 where it talks about this.

    @Tristan: The "Roslyn" forum at social.msdn.microsoft.com/.../roslyn is a good place to ask questions about "Roslyn".

    @shoftee: Thanks for the catch.  Fixed.

    @Max Battcher: Rx is about working with collections, and these kinds of query operations are very useful for working with collections; they're less useful for types with at most one value, like Task<T>.  Do you have an example in mind where the query operators I've implemented in this post make using C# query comprehension syntax much better for working with Task<T> than does async/await?  I've not thought of a good one.

  • Presumably all of the "await" subjects should have ".ConfigureAwait(false)" added, since none of the continuations rely on the synchronization context?

  • @Richard: It's a good question.  I debated doing so.  Normally my answer would be a definitive "yes!" but because most of these methods take user-supplied delegates that are invoked after an await, using ConfigureAwait(false) on all of the awaits would mean that many of these delegates would not be run back on the original context, and it's not clear whether that would be desirable or not.  I'll leave it up to someone using this code to decide whether to augment it appropriately, if anyone actually uses the code (as I mention in the post, this was more thought experiment than anything else).  In general, yes, try to use ConfigureAwait(false) on all awaits in libraries unless there's a good reason not to.

  • Thanks for your response, Stephen, I was not aware of that document, it helped a lot. (Before commenting on your blog I searched the web and actually assumed to find my question answered at stackoverflow - but I did not find some answer.)

    So I will put the "usage" checks (like argument and state validation) before the task creating block of the method. Thanks again.

  • @Tom: Great, glad you found the doc helpful.

  • Thanks so much for this post Stephen. I have been trying to get my head around how Tasks are related to continuations, coroutines, monads and this is extremely helpful.

    -Kurt

  • Dear Stephen, sorry for asking here (just didn't found a way to PM you directly), but can you answer on my question please?

    http://pastebin.com/tXbsVYaH

    Here is an example of my situation in WP7 project + BCL.Async. It's just a dummy WP7 project with one page. So you can test too.

    I have a situation where I have object who has TaskCompletionSource in it. And it has a GetResult method which will return tcs's task to be awaited (not in this example). But it is possible that GetResult will never be called, and task will never be awaited. In this case I need to dispose a tcs.

    Here I am using tcs.TrySetException as I do it in my project. And thanks to TaskScheduler.UnobservedTaskException I can receive that exception. Exception bubbles up as expected. BUT! When I am using tcs.TrySetCanceled - exception will never bubble up after object's finalization (you can test it - just comment TrySetException and uncomment TrySetCanceled in Dispose method), but if you await tcs.Task and then call TrySetCanceled you will receive TaskCanceledException.

    In case of TrySetCanceled, is exception swallowed during finalization or its expected behaviour for tcs that it gracefully dispose a task without exception if it was cancelled and never been awaited?

    Sorry for my English, I was tried to write clear, hope you understand text I written :)

    Thanks for reply!

  • Hi Vladimir-

    UnobservedTaskException is only relevant to tasks that end in the Faulted state, and TrySetException does put the task into a faulted state.  In contrast, TrySetCanceled puts a task into a canceled state.  Canceled tasks are not faulted and don't cause UnobservedTaskException to be invoked, don't cause a program to crash, etc.  This is by design.

    I hope that helps.

  • Stephen, thank you so much!

Page 1 of 1 (13 items)