Cooperatively pausing async methods

Recently I was writing an app that processed a bunch of files asynchronously.  As with the Windows copy file dialog, I wanted to be able to provide the user with a button that would pause the processing operation.

To achieve that, I implemented a simple mechanism that would allow me to pass a “pause token” into the async method, which the async method could asynchronously wait on at appropriate points.

public async Task ProcessFiles(
    IEnumerable<StorageFile> files, PauseToken pauseToken)
{
    foreach(var file in files)
    {
        await pauseToken.WaitWhilePausedAsync();
        await ProcessAsync(file);
    }
}

My pause token follows a similar design to that of CancellationToken and CancellationTokenSource.  I have a PauseToken instance that I can pass to any number of operations (synchronous or asynchronous), and those operations can monitor that token to be alerted to pause requests.  Separately, a PauseTokenSource is responsible for creating the PauseToken to be handed out and for issuing the pause requests.

public class PauseTokenSource
{
    public bool IsPaused { get; set; }
    public PauseToken Token { get; }
}

public struct PauseToken
{
    public bool IsPaused { get; }
    public Task WaitWhilePausedAsync();
}
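
For comparison, here is a small sketch of the source/token split in the cancellation types this design mirrors. It uses only the standard CancellationTokenSource and CancellationToken APIs; the CancellationAnalogy class name is just a placeholder for the demo.

```csharp
using System;
using System.Threading;

// Placeholder demo class; only the CancellationTokenSource/CancellationToken
// calls are real framework APIs.
public static class CancellationAnalogy
{
    public static void Main()
    {
        // The source owns the state and is the only way to change it.
        var cts = new CancellationTokenSource();

        // The token is a lightweight struct that observers can only read.
        CancellationToken token = cts.Token;
        Console.WriteLine(token.IsCancellationRequested); // False

        cts.Cancel();
        Console.WriteLine(token.IsCancellationRequested); // True

        // A default token has no source and never reports a request.
        Console.WriteLine(default(CancellationToken).IsCancellationRequested); // False
    }
}
```

PauseTokenSource and PauseToken divide responsibilities the same way: the source mutates, the token observes.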

We’ll start by implementing PauseTokenSource, which is the meat of the implementation; as with CancellationToken and CancellationTokenSource, PauseToken is just a thin value-type veneer on top of PauseTokenSource that just delegates most calls to the underlying reference type.  PauseTokenSource has one instance field:

private volatile TaskCompletionSource<bool> m_paused;

The m_paused field is the TaskCompletionSource<bool> that can be used to complete the Task we’ll hand out to waiters when the instance is paused (such that when we’re un-paused, we’ll set the Task to wake up all the waiters): if m_paused is null, we’re not paused, and if it’s non-null, we’re currently paused.

The bulk of the implementation is then in PauseTokenSource.IsPaused.  Its getter just returns whether m_paused is not null, but its setter is more complicated:

public bool IsPaused
{
    get { return m_paused != null; }
    set
    {
        if (value)
        {
            Interlocked.CompareExchange(
                ref m_paused, new TaskCompletionSource<bool>(), null);
        }
        else
        {
            while (true)
            {
                var tcs = m_paused;
                if (tcs == null) return;
                if (Interlocked.CompareExchange(ref m_paused, null, tcs) == tcs)
                {
                    tcs.SetResult(true);
                    break;
                }
            }
        }
    }
}

If IsPaused is being set to true, then we simply need to transition m_paused from null to a new TaskCompletionSource<bool>; we do this with an interlocked compare-exchange so that we only do the transition if m_paused is null, regardless of what other threads we might be competing with.

If IsPaused is being set to false, we need to do two things: transition m_paused from non-null to null, and complete the Task from the TaskCompletionSource<bool> that was stored in m_paused.  We do this with another Interlocked.CompareExchange, and as we need to tell the CompareExchange operation what exact value we expect to find in m_paused, and as another thread could be changing it out from under us, we need to do this in a standard compare-exchange loop: grab the current value, do the compare exchange assuming that value, and if the value actually changed between the time we grabbed it and the time we did the compare-exchange, repeat.
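
The compare-exchange loop described above is a general pattern rather than something specific to pausing. As a minimal sketch, here is the same "read, compare-exchange, retry" shape applied to tracking a maximum value. The MaxTracker and MaxTrackerDemo types are hypothetical and exist only for this illustration; they are not part of the pause token implementation.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical type used only to illustrate a compare-exchange retry loop.
public sealed class MaxTracker
{
    private int m_max = int.MinValue;

    public int Max { get { return Volatile.Read(ref m_max); } }

    public void Report(int candidate)
    {
        while (true)
        {
            // Grab the current value.
            int seen = Volatile.Read(ref m_max);

            // Nothing to publish if the stored value is already at least as large.
            if (candidate <= seen) return;

            // Store the candidate only if the field still holds the value we read.
            if (Interlocked.CompareExchange(ref m_max, candidate, seen) == seen) return;

            // Another thread changed the field in between; go around again.
        }
    }
}

public static class MaxTrackerDemo
{
    public static void Main()
    {
        var tracker = new MaxTracker();
        Parallel.For(0, 1000, i => tracker.Report(i));
        Console.WriteLine(tracker.Max); // 999
    }
}
```

The IsPaused setter has the same structure: the value read into tcs plays the role of seen, and the loop repeats only when another thread swapped m_paused in between.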

To shield these implementation details from PauseToken, we’ll add an internal WaitWhilePausedAsync method to PauseTokenSource that PauseToken can then access.

internal Task WaitWhilePausedAsync()
{
    var cur = m_paused;
    return cur != null ? cur.Task : s_completedTask;
}

This method just grabs m_paused, and if it’s non-null returns its Task.  If it is null, then we’re not paused, so we can hand back an already completed Task in order to avoid unnecessary allocations (since we should expect it to be very common that WaitWhilePausedAsync is called when not actually paused):

internal static readonly Task s_completedTask = Task.FromResult(true);

The last member we need on PauseTokenSource is the Token property that will return the associated PauseToken:

public PauseToken Token { get { return new PauseToken(this); } }

Now for implementing PauseToken.  Its implementation is very simple, as it’s just a wrapper over the PauseTokenSource from which it’s constructed:

public struct PauseToken
{
    private readonly PauseTokenSource m_source;
    internal PauseToken(PauseTokenSource source) { m_source = source; }

    public bool IsPaused { get { return m_source != null && m_source.IsPaused; } }

    public Task WaitWhilePausedAsync()
    {
        return IsPaused ?
            m_source.WaitWhilePausedAsync() :
            PauseTokenSource.s_completedTask;
    }
}

PauseToken’s IsPaused property only has a getter and not a setter, since our design requires that all transitioning from un-paused to paused, and vice versa, is done via the PauseTokenSource (that way, only someone with access to the source can cause the transition).  PauseToken’s IsPaused getter just delegates to the source’s IsPaused; of course, as this PauseToken is a struct, it’s possible it could have been default initialized such that m_source would be null… in that case, we’ll just return false from IsPaused.

Finally, we have our PauseToken’s WaitWhilePausedAsync method.  If we’re paused, we simply delegate to the source’s WaitWhilePausedAsync implementation we already saw.  If we’re not paused (which could include not having a source), we just return our cached already-completed Task.
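
Since the pieces above were introduced one at a time, it may help to see them assembled into a single listing. The two types below are put together from the fragments shown so far; the PauseTokenDemo class is an added, hypothetical harness that simply checks the observable behavior from a single thread.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public class PauseTokenSource
{
    // Non-null while paused; completing its Task releases the current waiters.
    private volatile TaskCompletionSource<bool> m_paused;

    // Shared, already-completed Task handed out when there is nothing to wait for.
    internal static readonly Task s_completedTask = Task.FromResult(true);

    public bool IsPaused
    {
        get { return m_paused != null; }
        set
        {
            if (value)
            {
                // Transition null -> new source; a no-op if already paused.
                Interlocked.CompareExchange(
                    ref m_paused, new TaskCompletionSource<bool>(), null);
            }
            else
            {
                while (true)
                {
                    var tcs = m_paused;
                    if (tcs == null) return; // already un-paused
                    if (Interlocked.CompareExchange(ref m_paused, null, tcs) == tcs)
                    {
                        tcs.SetResult(true); // wake the waiters
                        break;
                    }
                }
            }
        }
    }

    public PauseToken Token { get { return new PauseToken(this); } }

    internal Task WaitWhilePausedAsync()
    {
        var cur = m_paused;
        return cur != null ? cur.Task : s_completedTask;
    }
}

public struct PauseToken
{
    private readonly PauseTokenSource m_source;
    internal PauseToken(PauseTokenSource source) { m_source = source; }

    // m_source is null for default(PauseToken), which is never paused.
    public bool IsPaused { get { return m_source != null && m_source.IsPaused; } }

    public Task WaitWhilePausedAsync()
    {
        return IsPaused ?
            m_source.WaitWhilePausedAsync() :
            PauseTokenSource.s_completedTask;
    }
}

// Hypothetical harness, not part of the implementation.
public static class PauseTokenDemo
{
    public static void Main()
    {
        var pts = new PauseTokenSource();
        PauseToken token = pts.Token;
        Console.WriteLine(token.WaitWhilePausedAsync().IsCompleted); // True

        pts.IsPaused = true;
        Task waiter = token.WaitWhilePausedAsync();
        Console.WriteLine(waiter.IsCompleted); // False

        pts.IsPaused = false;
        Console.WriteLine(waiter.IsCompleted); // True
        Console.WriteLine(default(PauseToken).IsPaused); // False
    }
}
```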

That’s it: our implementation is now complete, and we can start using it to pause asynchronous operations.  Here’s a basic console-based example of using our new PauseToken type:

class Program
{
    static void Main()
    {
        var pts = new PauseTokenSource();
        Task.Run(() =>
        {
            while (true)
            {
                Console.ReadLine();
                pts.IsPaused = !pts.IsPaused;
            }
        });
        SomeMethodAsync(pts.Token).Wait();
    }

    public static async Task SomeMethodAsync(PauseToken pause)
    {
        for (int i = 0; i < 100; i++)
        {
            Console.WriteLine(i);
            await Task.Delay(100);
            await pause.WaitWhilePausedAsync();
        }
    }
}

As a final thought, for those of you familiar with various kinds of synchronization primitives, PauseTokenSource might remind you of one in particular: manual reset events.  In fact, that’s basically what it is, just with a different API set (for comparison, see this blog post on building an AsyncManualResetEvent).  Setting IsPaused to false is like setting/signaling a manual reset event, and setting it to true is like resetting one.
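
To make the comparison concrete, here is a minimal sketch of what an asynchronous manual reset event can look like. It is an illustration written for this comparison and may differ in detail from the linked post; the member names Set, Reset, and WaitAsync are chosen to mirror ManualResetEvent.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Illustrative sketch only; member names mirror ManualResetEvent.
public class AsyncManualResetEvent
{
    // An uncompleted Task means "not signaled"; a completed Task means "signaled".
    private volatile TaskCompletionSource<bool> m_tcs = new TaskCompletionSource<bool>();

    public Task WaitAsync() { return m_tcs.Task; }

    // Signal: comparable to setting IsPaused to false.
    public void Set() { m_tcs.TrySetResult(true); }

    // Unsignal: comparable to setting IsPaused to true.
    public void Reset()
    {
        while (true)
        {
            var tcs = m_tcs;
            // Done if the event is already unsignaled, or if we managed to
            // swap in a fresh, uncompleted source.
            if (!tcs.Task.IsCompleted ||
                Interlocked.CompareExchange(
                    ref m_tcs, new TaskCompletionSource<bool>(), tcs) == tcs)
            {
                return;
            }
        }
    }
}

public static class AsyncManualResetEventDemo
{
    public static void Main()
    {
        var mre = new AsyncManualResetEvent();
        Task waiter = mre.WaitAsync();
        Console.WriteLine(waiter.IsCompleted); // False: starts unsignaled ("paused")

        mre.Set();
        Console.WriteLine(waiter.IsCompleted); // True: signaled ("un-paused")

        mre.Reset();
        Console.WriteLine(mre.WaitAsync().IsCompleted); // False: unsignaled again
    }
}
```

One difference in this sketch is that the event always holds a TaskCompletionSource, whereas PauseTokenSource uses null for the un-paused state so that the common un-paused path can return a shared completed Task without allocating.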

Enjoy!

  • Like Gebb I would like to understand how to prevent read introductions but I'm not satisfied yet. I searched the web but couldn't find a definitive, precise answer.

    This is unfortunate because it's critical to write correct concurrent code. For instance your method WaitWhilePausedAsync would not be correct if a read introduction was possible.

    Maybe you could write a blog post on the topic?

    You addressed the issue in the following comment above:

    "read introduction is one mechanism by which a memory reordering might be introduced"

    With the implicit argument that because a volatile variable prevents reordering, the read introduction is prevented.

    But!

    Part 1 of this article, which you've linked to, explicitly disagrees with you. It explicitly describes "read introduction" as a "NON-reordering optimization". I quote the relevant paragraph:

    "*Non-Reordering Optimizations* Some compiler optimizations may introduce or eliminate certain memory operations. For example, the compiler might replace repeated reads of a field with a single read. Similarly, if code reads a field and stores the value in a local variable and then repeatedly reads the variable, the compiler could choose to repeatedly read the field instead."

    If a "read introduction" is not a reordering optimization -- which makes sense -- I see nothing in the definition of a volatile variable that prevents it. Quoting the volatile explanation from the same article:

    "*Volatile Fields* The C# programming language provides volatile fields that constrain how memory operations can be reordered. The ECMA specification states that volatile fields provide acquire-­release semantics (bit.ly/NArSlt).

    A read of a volatile field has acquire semantics, which means it can’t be reordered with subsequent operations. The volatile read forms a one-way fence: preceding operations can pass it, but subsequent operations can’t. [...]"

    Can you please shed some light on this issue? What are the precise rules regarding read introduction, when is code safe, when is it not?

    Thanks!

  • jods: See section "Publication via Volatile Field" of the Part 1 of my article (msdn.microsoft.com/.../jj863136.aspx). No read may be introduced that would break the acquire-release semantics of a volatile field.

  • @Igor: I read it again and it doesn't explicitly talk about "read introductions", only reorderings.

    The rule "no read is introduced that would break semantics of a volatile field" doesn't seem sufficient to me.

    From my understanding of the C# standard, volatile means:

    [1] volatile reads have acquire semantics, i.e. no memory operation can be moved before them;

    [2] volatile writes have release semantics, i.e. no memory operation can be moved after them.

    Add to that the rule that [3] no optimization should change the behavior of a single-threaded program.

    Now consider Stephen's code above:

       var cur = m_paused;
       return cur != null ? cur.Task : s_completedTask;

    Can you explain which rule above has been broken if "cur" in the expression "cur.Task" is read from m_paused again rather than from a temporary?

    Clearly rule [1] has not been violated because no memory operation has been moved before the statement "var cur = m_paused".

    Rule [3] is fine as well: as long as there's no other thread, m_paused can't have changed.

    Rule [2] doesn't matter as there is no volatile write.

    Maybe the missing part of the standard is simply: no read/write can be introduced/eliminated on a volatile variable?

    Bonus chatter: how does all this relate to non-volatile variables? What if I rely on a few MemoryBarrier or Interlocked operations rather than an always volatile field?

    I think the confusion arose from the vague wording of the second article that basically states "read introductions -- or eliminations -- can happen in some circumstances" without providing the precise rules that dictate when such introduction/elimination is actually allowed or when it is forbidden.

    I couldn't find a precise definition anywhere; information on the web only seems to focus on the ordering issues and acquire/release semantics.

    Or maybe it's just me who doesn't understand how acquire semantics and a read introduction are related?

  • @jods: The reason I mentioned the "Publication via Volatile Field" section is that it describes how to safely use volatile fields. For example, the WaitWhilePausedAsync method in this article is an instance of the Publication via Volatile Fields pattern. Also take a look at the "Lazy Initialization" pattern.

    > Can you explain which rule above has been broken if "cur" in the expression "cur.Task" is read from m_paused again rather than from a temporary?

    Elsewhere in the C# specification, a volatile read is defined to be a "side effect". As a result, repeating the read of m_paused would be equivalent to adding another side effect, which is not allowed.

    > Bonus chatter: how does all this relate to non-volatile variable? What if I rely on a few MemoryBarrier or Interlocked operations rather than an always volatile field?

    That can work too.

  • Thanks Igor!

    I read that section (§3.10 Execution Order) before but completely missed this important aspect: volatile reads and writes are side effects.

    Hence they can't be introduced or removed!

    That explains everything.

    Just to be sure that I understood your answer about MemoryBarrier: introducing a read can be seen as moving a memory operation across the barrier, and this is why it's forbidden, even though it's not a side effect if the variable is not volatile? E.g.

    var cur = memory_read;
    Thread.MemoryBarrier();
    // Accessing memory_read here is moving a memory operation across the barrier?
    return cur != null ? cur.IsPaused : false;

    Of course Interlocked or Thread.VolatileRead are more obvious as they can't be "repeated" or "introduced".

    Thanks for the explanation!

  • @Igor, Stephen:

    Sorry to come again on this topic but I was surprised when reading the MSDN Magazine (Oct 05) article 'Understand the Impact of Low-Lock Techniques in Multithreaded Apps'.

    When describing the .net 2.0 memory model, it explicitly says:

    "Reads and writes cannot be introduced"

    Which seems to be the exact opposite of Igor's statement in his article.

    I also found that Joe Duffy makes the same statement in his book "Concurrent Programming on Windows", pp517-8:

    "the .NET memory model prohibits it [read introduction] for ordinary variables referring to GC heap memory".

    So what should I conclude here?

    1. The memory model was weakened in .net 4? (unlikely I guess)

    2. The JIT optimizations may violate the CLR memory model? (which would be very dangerous)

    3. Igor's article is incorrect about read introductions?

    4. Joe Duffy and Vance Morrison are mistaken about read introductions?

    Thanks!

  • Note: there have been several questions on StackOverflow regarding the MSDN article and specifically the read introduction section, e.g.:

    stackoverflow.com/.../16789697

    or

    stackoverflow.com/.../read-introduction-in-c-sharp-how-to-protect-against-it

    Maybe it's best to end this discussion there.

  • Hi Stephen, thanks for the great article. There's one point I'd like to clarify. It appears your implementation of "IsPaused = true" is a "fire-and-continue" request - by design, I suppose. That is, when the consumer-side code - which has requested the pause - continues, the producer-side code may not yet have reached the paused (awaiting) state.  In a weird scenario, we may have updated the UI and suspended the progress bar, but the files are still being copied in the background. Here's what I mean: http://pastebin.com/1ezQUHA5. Am I missing something?

  • There are two points I don't understand:

    a) Why do you check m_source for null - the only place where PauseToken is constructed is in

           public PauseToken Token { get { return new PauseToken(this); } }

      and 'this' refers per definition always to an existing instance of PauseTokenSource?

      It would be even better to let PauseToken throw an exception in cases it is constructed elsewhere as it indicates a wrong usage.

    b) Why do you check for 'IsPaused' in

           return IsPaused ?
               m_source.WaitWhilePausedAsync() :
               PauseTokenSource.s_completedTask;

      - m_source.WaitWhilePausedAsync() already performs the same check and returns the same result

    Thanks for your answer.

  • @Andreas:

    a) PauseToken is a struct, so if a developer writes "new PauseToken()" or "default(PauseToken)", m_source will be null.

    b) It could just call WaitWhilePausedAsync.

  • I have a question about setting IsPaused to "false": The code uses a compare exchange loop and I understand why the loop is necessary. However, I don't understand why a compare-exchange is needed in the first place: Couldn't we just replace it with Interlocked.Exchange():

    public bool IsPaused
    {
       ...
       set
       {
           if (value)
           {
               ...
           }
           else
           {
               var tcs = Interlocked.Exchange(ref m_paused, null);
               if (tcs != null)
               {
                   tcs.SetResult(true);
               }
           }
       }
    }

    Would this be a valid alternative or am I missing something? Thanks!

  • @alecor: I haven't worked through that in detail, but at first glance it seems like an Interlocked.Exchange would be sufficient here.

  • It seems not to work when I remove the await Task.Delay(100) instruction and set IsPaused = true|false twice by code.

  • Take PauseTokenSource class and PauseToken struct as designed.

    Broken code (why?):

    class Program
    {
        static void Main()
        {
            var pts = new PauseTokenSource();
            Task.Run(() =>
            {
                Stopwatch sw = new Stopwatch();
                sw.Start();
                Thread.Sleep(1000);
                while (true)
                {
                    pts.IsPaused = true;
                    Console.WriteLine("Pause at " + sw.ElapsedMilliseconds);
                    Thread.Sleep(1000);
                    pts.IsPaused = false;
                    Thread.Sleep(500);
                }
            });
            ProcessAsync(pts.Token).Wait();
            Console.ReadLine();
        }

        public static async Task ProcessAsync(PauseToken pause)
        {
            for (long i = 0; true; i++)
            {
                Console.WriteLine(i);
                //await Task.Delay(1); // does not work with "await Task.Delay(TimeSpan.FromTicks(9999))" but with "await Task.Delay(TimeSpan.FromTicks(10000))" it is Ok
                await pause.WaitWhilePausedAsync();
            }
        }
    }

  • @D_: The design is such that it only pauses if IsPaused is true when WaitWhilePausedAsync is called; the implementation doesn't remember a previous change of IsPaused to true if you set it to false again before WaitWhilePausedAsync is called, and even when it is called while paused, as soon as you set IsPaused to false, the Task returned from WaitWhilePausedAsync will complete and the await continuation will be allowed to run.  If in the code in the blog post you remove the "await Task.Delay(100)", then you're just sitting in a tight loop accessing "await pause.WaitWhilePausedAsync()" and it's just going to be returning an already completed task because it's not paused.  In your example, when you use TimeSpan.FromTicks(9999), that's less than a millisecond, so it's effectively TimeSpan.FromMilliseconds(0), which means it's basically no different than doing "await Task.Delay(0)" or removing the delay entirely.
