Cooperatively pausing async methods

Cooperatively pausing async methods

Rate This
  • Comments 27

Recently I was writing an app that processed a bunch of files asynchronously.  As with the Windows copy file dialog, I wanted to be able to provide the user with a button that would pause the processing operation.

To achieve that, I implemented a simple mechanism that would allow me to pass a “pause token” into the async method, which the async method could asynchronous wait on at appropriate points. 

public async Task ProcessFiles(
    IEnumerable<StorageFile> files, PauseToken pauseToken)
{
    foreach(var file in files)
    {
        await pauseToken.WaitWhilePausedAsync();
        await ProcessAsync(file);
    }
}

My pause token follows a similar design to that of CancellationToken and CancellationTokenSource.  I have a PauseToken instance that I can pass to any number of operations (synchronous or asynchronous), and those operations can monitor that token to be alerted to pause requests.  Separately, a PauseTokenSource is responsible for creating the PauseToken to be handed out and for issuing the pause requests.

public class PauseTokenSource
{
    public bool IsPaused { get; set; }
    public PauseToken Token { get; }
}

public struct PauseToken
{
    public bool IsPaused { get; }
    public Task WaitWhilePausedAsync();
}

We’ll start by implementing PauseTokenSource, which is the meat of the implementation; as with CancellationToken and CancellationTokenSource, PauseToken is just a thin value-type veneer on top of PauseTokenSource that just delegates most calls to the underlying reference type.  PauseTokenSource has one instance field:

private volatile TaskCompletionSource<bool> m_paused;

The m_paused field is the TaskCompletionSource<bool> that can be used to complete the Task we’ll hand out to waiters when the instance is paused (such that when we’re un-paused, we’ll set the Task to wake up all the waiters): if m_paused is null, we’re not paused, and if it’s non-null, we’re currently paused.

The bulk of the implementation is then in PauseTokenSource.IsPaused.  Its getter just returns whether m_paused is not null, but its setter is more complicated:

public bool IsPaused
{
    get { return m_paused != null; }
    set
    {
        if (value)
        {
            Interlocked.CompareExchange(
                ref m_paused, new TaskCompletionSource<bool>(), null);
        }
        else
        {
            while (true)
            {
                var tcs = m_paused;
                if (tcs == null) return;
                if (Interlocked.CompareExchange(ref m_paused, null, tcs) == tcs)
                {
                    tcs.SetResult(true);
                    break;
                }
            }
        }
    }
}

If IsPaused is being set to true, then we simply need to transition m_paused from null to a new TaskCompletionSource<bool>; we do this with an interlocked compare-exchange so that we only do the transition if m_paused is null, regardless of what other threads we might be competing with.

If IsPaused is being set to false, we need to do two things: transition m_paused from non-null to null, and complete the Task from the TaskCompletionSource<bool> that was stored in m_paused.  We do this with another Interlocked.CompareExchange, and as we need to tell the CompareExchange operation what exact value we expect to find in m_paused, and as another thread could be changing it out from under us, we need to do this in a standard compare-exchange loop: grab the current value, do the compare exchange assuming that value, and if the value actually changed between the time we grabbed it and the time we did the compare-exchange, repeat.

To shield these implementation details from PauseToken, we’ll add an internal WaitWhilePauseAsync method to PauseTokenSource that PauseToken can then access.

internal Task WaitWhilePausedAsync()
{
    var cur = m_paused;
    return cur != null ? cur.Task : s_completedTask;
}

This method just grabs m_paused, and if it’s non-null returns its Task.  If it is null, then we’re not paused, so we can hand back an already completed Task in order to avoid unnecessary allocations (since we should expect it to be very common that WaitWhilePauseAsync is called when not actually paused):

internal static readonly Task s_completedTask = Task.FromResult(true);

The last member we need on PauseTokenSource is the Token property that will return the associated PauseToken:

public PauseToken Token { get { return new PauseToken(this); } }

Now for implementing PauseToken.  Its implementation is very simple, as it’s just a wrapper over the PauseTokenSource from which its constructed:

public struct PauseToken
{
    private readonly PauseTokenSource m_source;
    internal PauseToken(PauseTokenSource source) { m_source = source; }

    public bool IsPaused { get { return m_source != null && m_source.IsPaused; } }

    public Task WaitWhilePausedAsync()
    {
        return IsPaused ?
            m_source.WaitWhilePausedAsync() :
            PauseTokenSource.s_completedTask;
    }
}

PauseToken’s IsPaused property only has a getter and not a setter, since our design requires that all transitioning from un-paused to paused, and vice versa, is done via the PauseTokenSource (that way, only someone with access to the source can cause the transition).  PauseToken’s IsPaused getter just delegates to the source’s IsPaused; of course, as this PauseToken is a struct, it’s possible it could have been default initialized such that m_source would be null… in that case, we’ll just return false from IsPaused.

Finally, we have our PauseToken’s WaitWhilePauseAsync method.  If we’re paused, we simply delegate to the source’s WaitWhilePausedAsync implementation we already saw.  If we’re not paused (which could include not having a source), we just return our cached already-completed Task.

That’s it: our implementation is now complete, and we can start using it to pause asynchronous operations.  Here’s a basic console-based example of using our new PauseToken type:

class Program
{
    static void Main()
    {
        var pts = new PauseTokenSource();
        Task.Run(() =>
        {
            while (true)
            {
                Console.ReadLine();
                pts.IsPaused = !pts.IsPaused;
            }
        });
        SomeMethodAsync(pts.Token).Wait();
    }

    public static async Task SomeMethodAsync(PauseToken pause)
    {
        for (int i = 0; i < 100; i++)
        {
            Console.WriteLine(i);
            await Task.Delay(100);
            await pause.WaitWhilePausedAsync();
        }
    }
}

As a final thought, for those of you familiar with various kinds of synchronization primitives, PauseTokenSource might remind you of one in particular: manual reset events.  In fact, that’s basically what it is, just with a different API set (for comparison, see this blog post on building an AsyncManualResetEvent).  Setting IsPaused to false is like setting/signaling a manual reset event, and setting it to true is like resetting one.

Enjoy!

Leave a Comment
  • Please add 5 and 3 and type the answer here:
  • Post
  • Great article as usual. As we know async is hard but I think you must of been hit with an axe in the exact centre of the head as a child or something else!!! ie. Gollum

    Now my n00b question : can you explain or point me to a source that explains "PauseToken is just a thin value-type veneer on top of PauseTokenSource" What does this wrapping give us?

  • Re: "you must of been hit with an axe in the exact centre of the head as a child or something else!!!"

    I'm not sure what that means, but I think it's a compliment, so thanks :-)

    Re: "What does this wrapping give us?"

    It gives separation of concerns between the ability to consume requests and the ability to produce them. You might want to pass a token to a method that should be able to observe the request, but you don't necessarily want it to be able issue one. This is particularly true if you pass the token to multiple methods: you don't want one peer to be able to interfere with another.

  • I was confused for a moment by the name s_runningTask, because I thought it's a Task in the running state, which doesn't make any sense. But you meant (I think) that it's a Task that's used when the token is not paused, i.e. running.

    I think the name could be improved, but I'm not sure how exactly.

  • @svick: yes, that's what I meant: it's used when not paused, hence"running". To try to avoid such confusion, I've renamed it to be s_completedTask.

    @tobi:  Thanks for your nice comments; unfortunately, it looks like I accidentally deleted your post somehow while responding to it.  Please feel free to repost.

  • Doesn't the setter for IsPaused create a TaskCompletionSource instance everytime the value is true even if that value is never used?

  • @Paulo Morgado: Yes, in my implementation in the post, a TaskCompletionSource<T> is created when IsPaused is being set to true, even if it's already true.  You could certainly add a check first thing in the setter that did "if (value == IsPaused) return;"  I just didn't bother, figuring that in most cases, folks aren't going to be setting IsPaused to the value it already is, and even in the cases where someone did, the chances that it's a hot path that needs to alleviate all possible allocations is very unlikely.  So, for brevity, I left it out.

  • Reading your blog with pleasure! Will use this with .NET 4.

    Maybe I'm missing something but I will rename IsPause to IsPauseRequested and WaitWhilePausedAsync to WaitWhilePauseRequestedAsync. I guess these names reflect more clear what the code does.

  • @Tom: I'm glad you've enjoyed the blog.  And it sounds like you're not missing anything... your names are fine.

  • In the WaitWhilePausedAsync method you don't seem to care about read introduction described here  msdn.microsoft.com/.../jj883956.aspx

    Do you happen to know when exactly the compiler decides to introduce reads? If you do, would you share the knowledge?

  • @Gebb: Which read are you concerned about? m_paused is volatile, such that any read on it has acquire semantics and acts as a downward fence on subsequent reads.

  • @Stephen: I'm worried about this particular piece.

    var cur = m_paused;

    return cur != null ? cur.Task : s_completedTask;

    Igor's article states that the compiler may introduce a read from the field where you do cur.Task and it doesn't mention if voilatileness of the field can influence read introduction. Here is a piece of code from the article:

    public class ReadIntro {

     private Object _obj = new Object();

     void PrintObj() {

       Object obj = _obj;

       if (obj != null) {

         Console.WriteLine(obj.ToString());

       // May throw a NullReferenceException

       }

     }

     void Uninitialize() {

       _obj = null;

     }

    }

    Notice the comment. Isn't it basically the same as your code?

  • @Gebb: It's not the same, in particular because _obj in that example isn't marked as volatile.  The article you're referring to is the second of a two-article series.  See the first article at msdn.microsoft.com/.../jj863136.aspx, specifically the section titled "Volatile Fields".

  • @Stephen: As I understand volatile fields prevent certain kinds of memory operations reordering. But does this concept of "read introduction" have anything to do with reordering?

  • @Gebb: Yes, "read introduction" is one mechanism by which a memory reordering might be introduced.

  • Thank you, Stephen! The article doesn't state explicitly that volatileness prevents read introduction, but I take it from your replies that it does. Good news.

Page 1 of 2 (27 items) 12