Cancellation in Parallel Extensions

One of the great features that spans all of the Parallel Extensions types is a consistent approach to cancellation (see http://blogs.msdn.com/pfxteam/archive/2009/05/22/9635790.aspx). In this post we explore some of the ways cancellation is used in Parallel Extensions and explain the guidance we developed.

The new cancellation system is a cooperative approach based on two new types: CancellationTokenSource, which initiates cancellation requests, and CancellationToken, which communicates a cancellation request to asynchronous operations and to long-running and blocking method calls.

If you are experimenting with Parallel Extensions, you might like to keep an eye out for the methods that accept a CancellationToken and test them out.

Blocking calls

Some of the Parallel Extensions types introduce blocking methods, for example BlockingCollection.Take() and Task.Wait(). The default overloads for these methods will block indefinitely if the condition they are waiting for never occurs. One solution used by these APIs, and many others like them, is to provide a timeout overload so that they may return after some duration and report that the condition did not occur. However, a timeout isn't particularly convenient if you only want to stop waiting when a specific event occurs, such as a user clicking a 'cancel' button. One possibility is to wait for some amount of time such as 100 milliseconds, check whether the button was pressed, and if not go back to waiting. This puts the burden on the call site, must be re-implemented at each one, and, depending on the polling frequency, may add unnecessary cost. The types in Parallel Extensions use the new cancellation system and provide overloads that accept a CancellationToken, which can cause early termination of blocking methods in response to a specific cancellation request.
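To make the cost of the timeout-and-poll workaround concrete, here is a minimal sketch of what each call site would have to write. The class name PollingWaitSketch, the method WaitWithPolling and the cancelRequested delegate are hypothetical names used only for illustration; the 100 ms interval is the figure mentioned above.

```csharp
using System;
using System.Threading;

public static class PollingWaitSketch
{
    // Returns true if the event was set, false if the caller gave up
    // because cancelRequested() reported true.
    public static bool WaitWithPolling(ManualResetEventSlim evt, Func<bool> cancelRequested)
    {
        while (!cancelRequested())
        {
            // Wake up every 100 ms purely to re-check the cancel flag.
            if (evt.Wait(100))
                return true;
        }
        return false;
    }
}
```

Every blocking call that needs to be interruptible would need a loop like this, and a cancel request can go unnoticed for up to one polling interval.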

Here are some examples of the blocking calls in Parallel Extensions that accept a CancellationToken:

BlockingCollection:
void Add(T item, CancellationToken cancellationToken)
T Take(CancellationToken cancellationToken)

ManualResetEventSlim:
void Wait(CancellationToken cancellationToken)

Task:
void Wait(CancellationToken cancellationToken)
static void WaitAll(Task[] tasks, CancellationToken cancellationToken)

In each case, you supply a CancellationToken to the method, and if the CancellationToken is signaled via a call to the associated CancellationTokenSource.Cancel(), then the method will wake up and throw an OperationCanceledException. This frees you from the need to use timeouts if you are happy to wait until either the condition is true or a specific cancellation request occurs. Many of these methods also have overloads that accept both a CancellationToken and a timeout, so that both needs may be accommodated.
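As a rough sketch of this pattern, the following blocks on an empty BlockingCollection until another thread requests cancellation. The class and method names (CancellableTakeSketch, TakeUntilCanceled) are hypothetical, and the helper thread merely stands in for a user clicking 'cancel'.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class CancellableTakeSketch
{
    // Blocks in Take() on a collection that never receives an item.
    // Returns true if the wait ended with an OperationCanceledException.
    public static bool TakeUntilCanceled(int cancelAfterMs)
    {
        using (var queue = new BlockingCollection<int>())
        using (var cts = new CancellationTokenSource())
        {
            // Stand-in for a cancel request arriving from another thread.
            var canceller = new Thread(() =>
            {
                Thread.Sleep(cancelAfterMs);
                cts.Cancel();
            });
            canceller.Start();
            try
            {
                queue.Take(cts.Token);   // nothing is ever added, so this blocks
                return false;
            }
            catch (OperationCanceledException)
            {
                return true;             // woken up by cts.Cancel()
            }
            finally
            {
                canceller.Join();        // finish before the source is disposed
            }
        }
    }
}
```

No polling loop or timeout is needed at the call site; the blocked call wakes up when the token is signaled.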

Long-running calls

There are also a variety of methods that may simply take a while to complete, particularly if large amounts of data or complex processing are involved. For example, the following call is likely to take some time, even if individual calls to DoFunc(x) are relatively fast:

Parallel.For(1, 1000000000, (x) => DoFunc(x));

To assist with this, the Parallel APIs have overloads that accept a ParallelOptions instance that holds a CancellationToken. The machinery inside Parallel.For(...) observes the supplied CancellationToken and will exit early if it sees that the token has been signaled (for more information on exiting loops early, see http://blogs.msdn.com/pfxteam/archive/2009/05/27/9645023.aspx). For example,

CancellationTokenSource cts = new CancellationTokenSource();
ParallelOptions options = new ParallelOptions
                                 {CancellationToken = cts.Token};
Parallel.For(1, 100, options, (x) => DoSlowFunc(x));
// from another thread, call cts.Cancel() to request cancellation.

As you can see, the approach for canceling blocking calls and long-running calls is identical. In both cases you supply a CancellationToken to a specific method call and use the associated CancellationTokenSource to signal a cancellation request.
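For the long-running case, the caller sees the early exit as an OperationCanceledException thrown from Parallel.For. The sketch below assumes that behavior from the released .NET 4 API; CancellableLoopSketch and RunLoop are hypothetical names, and the loop body requests cancellation itself only to keep the example self-contained (normally another thread would do so).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableLoopSketch
{
    // Returns true if the loop ended early with an OperationCanceledException.
    public static bool RunLoop(bool requestCancel)
    {
        using (var cts = new CancellationTokenSource())
        {
            var options = new ParallelOptions { CancellationToken = cts.Token };
            try
            {
                Parallel.For(0, 100000, options, i =>
                {
                    // Stand-in for a cancel request made while the loop runs.
                    if (requestCancel) cts.Cancel();
                    Thread.SpinWait(100);   // placeholder for real work
                });
                return false;               // ran to completion
            }
            catch (OperationCanceledException)
            {
                return true;                // loop observed the token and exited
            }
        }
    }
}
```

RunLoop(true) is expected to report cancellation, while RunLoop(false) runs every iteration and returns false.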

Digression: Internal cancellation

In certain situations in Parallel Extensions and in other systems, it is necessary to wake up a blocked method for reasons that aren't due to explicit cancellation by a user. For example, if one thread is blocked on blockingCollection.Take() due to the collection being empty and another thread subsequently calls blockingCollection.CompleteAdding(), then the first call should wake up and throw an InvalidOperationException to represent an incorrect usage. A great way to implement this is to use an internal CancellationTokenSource that can wake things up due to internal concerns, and to link it to a second, external CancellationTokenSource that has been supplied by the user. For example, the following captures the essential details of how CompleteAdding() and Take(CancellationToken) are implemented:

class BlockingCollection<T>
{
   CancellationTokenSource internalCTS = new CancellationTokenSource();

   public void CompleteAdding()
   {
      //...
      // Wake up any blocked Take() calls.
      internalCTS.Cancel();
   }

   public T Take(CancellationToken externalToken)
   {
      using (CancellationTokenSource linkedTokenSource =
         CancellationTokenSource.CreateLinkedTokenSource(
            internalCTS.Token, externalToken))
      {
         try
         {
            // InternalTake (not shown) blocks while observing a single token.
            return InternalTake(linkedTokenSource.Token);
         }
         catch (OperationCanceledException oce)
         {
            // Inspect the original tokens to find out why we were woken up.
            if (internalCTS.Token.IsCancellationRequested)
               throw new InvalidOperationException("..msg..");
            else if (externalToken.IsCancellationRequested)
               throw new OperationCanceledException(externalToken);
            else throw;
         }
      }
   }
}

In this fragment, a linked CancellationTokenSource is created and its token passed to the InternalTake() method. This simplifies the implementation for InternalTake() but still allows it to be woken up due to either the external token being signaled or an incorrect concurrent call to CompleteAdding(). If the InternalTake() method throws an OperationCanceledException, we can test the individual tokens to determine what the cause was and take appropriate action. Note that oce.CancellationToken will be equal to linkedTokenSource.Token as this is the token that was actually being observed, but we can look at the original tokens to determine the cause for the linked token being signaled. There is a minor risk of confusion if both sources are signaled simultaneously, but this confusion is benign as it is reasonable to behave as though either source was responsible, and a particular implementation may choose to prioritize one mechanism over the other.
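The cause-detection step can be tried in isolation. The following is a small sketch, not the BlockingCollection implementation: LinkedTokenSketch and Diagnose are hypothetical names, and ThrowIfCancellationRequested() is assumed to be available (it is in the released .NET 4 API).

```csharp
using System;
using System.Threading;

public static class LinkedTokenSketch
{
    // Cancels one of two sources, observes the linked token, and reports
    // which original source was responsible.
    public static string Diagnose(bool cancelInternal)
    {
        using (var internalCts = new CancellationTokenSource())
        using (var externalCts = new CancellationTokenSource())
        using (var linked = CancellationTokenSource.CreateLinkedTokenSource(
                   internalCts.Token, externalCts.Token))
        {
            if (cancelInternal) internalCts.Cancel();
            else externalCts.Cancel();

            try
            {
                // Only the linked token is observed, as in InternalTake().
                linked.Token.ThrowIfCancellationRequested();
                return "not canceled";
            }
            catch (OperationCanceledException oce)
            {
                // The exception carries the linked token...
                bool sawLinked = oce.CancellationToken == linked.Token;
                // ...but the original sources reveal the actual cause.
                string cause = internalCts.IsCancellationRequested ? "internal" : "external";
                return cause + (sawLinked ? "/linked" : "/other");
            }
        }
    }
}
```

Checking the internal source first, as here, is one way of prioritizing one mechanism over the other when both happen to be signaled.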

Cooperative cancellation with user-code

Both TPL and PLINQ provide infrastructure that calls back to user code. If the user code is long-running or blocking, then it is very useful for the user code to be able to respond to cancellation in a cooperative fashion. TPL and PLINQ achieve this by tracking a supplied CancellationToken, and if they see some user code throw an OperationCanceledException that mentions this specific token, then it is treated as cooperative acknowledgement of cancellation. This means that both TPL and PLINQ understand that the user code did not suffer some catastrophic exceptional condition, but rather that it is simply responding to cancellation. As a result, TPL and PLINQ constructs will simply exit with an OperationCanceledException of their own, rather than aggregating exceptions from all the participating worker threads and throwing an AggregateException.

PLINQ follows this basic plan; for example, the following query will throw a single OperationCanceledException if the CancellationTokenSource becomes signaled. It doesn't matter whether PLINQ or the user delegate sees the cancellation request first, as the user-delegate exception will be correctly understood by the PLINQ execution engine.

CancellationTokenSource cts = new CancellationTokenSource();
var resultArray =
   data.AsParallel()
       .WithCancellation(cts.Token)
       .Select((x) =>
          {
             if (cts.Token.IsCancellationRequested)
                throw new OperationCanceledException(cts.Token);
             // ...
          })
       .ToArray();

In both PLINQ and TPL, it is interesting to note that the external cancellation token is not supplied back to the user-delegate via a parameter as may be expected. This would have bloated the number of method overloads considerably and, as shown above, the external token can be conveniently accessed via closures or equivalent techniques.

If the user code throws an OperationCanceledException but one or more "normal" exceptions occur on other threads, then all of the exceptions will be collated into an AggregateException, including the OperationCanceledException. Hence, if only cancellation occurred then a single OperationCanceledException is thrown, but an actual failure will always result in an AggregateException.
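The two outcomes can be told apart with separate catch blocks. The sketch below is illustrative only: PlinqOutcomeSketch and Run are hypothetical names, the delegate cancels the source itself purely to keep the example self-contained, and ThrowIfCancellationRequested() is assumed to be available.

```csharp
using System;
using System.Linq;
using System.Threading;

public static class PlinqOutcomeSketch
{
    // Runs a query whose delegate either acknowledges cancellation or fails,
    // and reports which exception type the query surfaced.
    public static string Run(bool failInsteadOfCancel)
    {
        using (var cts = new CancellationTokenSource())
        {
            CancellationToken token = cts.Token;
            try
            {
                Enumerable.Range(0, 10000)
                    .AsParallel()
                    .WithCancellation(token)
                    .Select(x =>
                    {
                        if (failInsteadOfCancel)
                            throw new InvalidOperationException("simulated failure");
                        cts.Cancel();                          // stand-in for an external request
                        token.ThrowIfCancellationRequested();  // cooperative acknowledgement
                        return x;
                    })
                    .ToArray();
                return "completed";
            }
            catch (OperationCanceledException)
            {
                return "OperationCanceledException";   // only cancellation occurred
            }
            catch (AggregateException)
            {
                return "AggregateException";           // at least one real failure
            }
        }
    }
}
```

Because the delegate throws an OperationCanceledException that mentions the token passed to WithCancellation(), the cancel-only case should surface as a single OperationCanceledException rather than an AggregateException.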

PLINQ Queries

PLINQ offers deep support for cancellation and endeavors to ensure that cancellation will be enacted swiftly for any query. For queries that involve only simple and fast user delegates, simply supply a CancellationToken via the WithCancellation() method and PLINQ takes care of the rest. If your query involves long-running or blocking user code, then you should use the cooperative cancellation pattern described in the preceding section. PLINQ may not check for cancellation every time it calls a user delegate (as a working rule of thumb, it checks approximately once per one hundred calls), and so long-running delegates should check for cancellation themselves. For example, if a delegate call runs for 50ms but does not perform any cancellation checks of its own, then PLINQ may not detect the cancellation request for up to 5 seconds (100 calls at 50ms each), and this could significantly affect a user's experience. For this reason, we recommend that long-running delegates called from PLINQ queries check for cancellation frequently. A reasonable lower bound on the interval between checks is a few thousand IL instructions executed, and the upper bound is 10ms; once per 1ms is preferred to ensure snappy response.

The following example demonstrates a user delegate that may run for a while, and so checks cancellation itself every few thousand IL instructions. This provides for timely cancellation support with a negligible cost to performance.

// Assume an external CancellationToken 'token' has been supplied.
int[] data = ...;
var query =
   data.AsParallel()
       .WithCancellation(token)
       .Select((x) =>
          {
             int result = 0;
             for ( int i = 0; i < x; i++ ) {
                for (int j = 0; j < 1000 * x; j++)
                {
                   result += SimpleFunc(i,j);
                }
                token.ThrowIfCancellationRequested();
             }
             return result;
          })
       .ToArray();

[Update: the call to token.ThrowIfCancellationRequested shown above is not available in Beta1 (but will appear in subsequent releases).  This behavior can be achieved via a manual test-and-throw as shown in earlier examples, or via the extension-method approach described in the comments.  -Mike]

Summary

The Parallel Extensions types use the new cancellation system to provide consistent and rich cancellation features. All of the long-running and blocking methods in Parallel Extensions provide overloads that take a CancellationToken, and any callbacks made to user code can participate in the process. Application code can also make use of the same ideas and facilities to ensure that cancellation can be a fully supported feature in any situation.

Comments

  • I see that you delete comments that you dislike. If you don't want comments then don't put this pretty form on your blog!!!

  • Pter, we don't delete comments (unless they're obscene), and we value any and all feedback, whether positive or critical.  Are you referring to a comment that you made that is no longer showing up?  I see you commented on another post we have on cancellation, at http://blogs.msdn.com/pfxteam/archive/2009/05/22/9635790.aspx... is it possible you thought you posted on this post rather than on that one?

  • Hi Stephen

    Tell me, where is that ThrowIfCancellationRequested() method defined? Is it an extension method?

    It would certainly make sense to define such a method on CancellationToken (I actually went *looking* for something like this the first time I used CancellationToken!)

    Joe

  • Sorry for the confusion..and thanks for bringing it to our attention.

    Unfortunately that method slipped through and was not included in Beta1, but it will ship in the next release.

    In the meantime, you can use your own extension method to achieve the same behavior that we will ship.

    public static class CancellationTokenHelpers
    {
       public static void ThrowIfCancellationRequested(this CancellationToken ct)
       {
          if (ct.IsCancellationRequested)
             throw new OperationCanceledException("The operation was canceled.", ct);
       }
    }

    -Mike.

  • I'm trying to understand the cancellation framework machinery. I know that it is important to cancel operations in a cooperative manner, and I've understood the CancellationTokenSource concept. But I have trouble understanding the behavior of Task.WaitAll(...). I have the following example:

    public void Cancellation()
    {
       CancellationTokenSource cts = new CancellationTokenSource();

       Task t1 = new Task(() =>
       {
           while (true)
           {
               if (Task.Current.CancellationToken.IsCancellationRequested)
               {
                   Console.WriteLine("Canceled");
                   break;
               }
               Console.WriteLine("Task is running.");
               Thread.Sleep(1000);
           }
       });

       Task t2 = new Task(() =>
       {
           while (true)
           {
               if (Task.Current.CancellationToken.IsCancellationRequested)
               {
                   Console.WriteLine("Canceled");
                   break;
               }
               Console.WriteLine("Task is running.");
               Thread.Sleep(1000);
           }
       });

       t1.Start();
       t2.Start();

       new Task(() =>
       {
           Thread.Sleep(1000);
           cts.Cancel();
       }, TaskCreationOptions.None).Start();

       Task.WaitAll(new Task[] { t1, t2}, cts.Token);
    }

    I assumed that the cancellation request would be propagated by the cancellation framework to the tasks being waited on (in this example t1 and t2). But in my example an OperationCanceledException is thrown from the wait instead. In this case it is not possible to handle the cancellation request within t1 and t2. With this behavior, the still-running tasks may continue to hold shared resources (risking deadlock), or shared state may be left invalid after cancellation.

  • The issue here is that you are canceling two distinct types of operation: you are asking for cancellation of the tasks, and you are also asking for cancellation of the wait operation itself.  Sometimes this may be appropriate, but most often you will want to genuinely wait until the tasks have completed (with the understanding that the tasks may complete by observing a request for cancellation).

    Furthermore, because waiting for task completion is the common approach to marshal any exceptions the tasks may have suffered, allowing the Wait() operation itself to be canceled prevents the marshalling from occurring and may be counterproductive.

    i.e., use Task.WaitAll(tasks) rather than Task.WaitAll(tasks, ct).

    -mike.

  • Thanks Mike for your comment.

    I thought I had made a mistake and used the new API in the wrong way ... OK, I understand that the CancellationToken passed to a WaitXXX operation is only responsible for canceling the wait itself, like a timeout.

    But this seems like a gap in the cancellation framework (CF): the framework doesn't meet the requirements for a cancellation-"aware" framework. Maybe in a future API version the given CancellationToken will be transferred to all participants.
