In many data-parallel scenarios, all of the data to be processed is available immediately. This blog post addresses the opposite scenario: the inputs arrive gradually (as if in a stream), and we want to start producing results even before reading the last element of the input sequence.
There is a variety of scenarios in which inputs become available gradually rather than all at once. The inputs could arrive as requests across the network, inputs entered by a user, data read from an I/O device, results computed from another computation, and so on. We want to process the inputs in parallel, and make the partial results available as they are computed.
Simple PLINQ queries support streaming. A query that only consists of simple Select and Where operators will do streaming:
var q = inputSrc.AsParallel().Where(x => Foo(x)).Select(x => ExpensiveComputation(x));
foreach(var x in q)
{
Process(x);
}
In this example, Process(x) will get called on some input elements even before all elements of the input have been read.
On the other hand, not all queries support streaming. In fact, some queries cannot possibly support streaming. Consider this query that contains an OrderBy() operator:
var q = src.AsParallel().Select(x => Foo(x)).OrderBy(x => x);
We need to compute Foo(x) for all input elements, and only then can we yield the smallest element. Thus, a query that contains an OrderBy operator does not (and cannot) run in a streaming fashion.
So, what kinds of PLINQ queries do run in a streaming fashion? Generally, they are simple queries. The query can contain any number of Select and Where operators, with one exception involving the positional overloads. By positional overloads, we mean the overloads of Select and Where that accept a delegate that accesses both element values and their positions in the sequence. A positional Select or Where operator prevents streaming if there is a Where operator anywhere prior to it in the query.
A streaming query can produce either ordered or unordered results. As is typical in PLINQ, the results are unordered by default, but you can opt into ordering by using the AsOrdered() operator:
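For reference, a positional overload is one whose delegate receives the element's index as a second argument. The following is a minimal sketch of that overload only; the class and method names are illustrative and not part of PLINQ, and ToArray() is used just to keep the demo compact.

```csharp
using System.Linq;

public static class PositionalOverloadDemo
{
    // Pairs each element with its index using the positional Select overload.
    // AsOrdered() makes the indices match the positions in the source array.
    public static string[] LabelWithPosition(string[] source)
    {
        return source.AsParallel()
                     .AsOrdered()
                     .Select((value, index) => index + ":" + value)
                     .ToArray();
    }
}
```

Calling PositionalOverloadDemo.LabelWithPosition on a three-element array yields one labeled string per element, in source order.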
var q = src.AsParallel().AsOrdered().Select(x => Foo(x)).Where(y => Bar(y));
In the above query, PLINQ will read the elements from the src enumerable, distribute them into different partitions, process them on multiple threads, and rearrange results into a correctly-ordered output sequence. All of these stages happen concurrently, so the first results will start being produced while the inputs are still being read (assuming that the input sequence is sufficiently long).
One important point about streaming PLINQ queries is that the algorithms we use are optimized for throughput rather than latency. PLINQ uses buffers internally, so a particular result may sit in an output buffer until a certain number of results have been produced. Obviously, that is undesirable if one of the results needs to be sent back to a client as quickly as possible.
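The released .NET Framework 4 exposes a knob for this trade-off: WithMergeOptions with ParallelMergeOptions.NotBuffered asks PLINQ to hand each result to the consumer as soon as it is computed. The sketch below assumes that released API (it may not match the pre-release bits discussed here), and the class and method names are made up for illustration.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class LowLatencyStreamingDemo
{
    // Squares each input and hands results to the consumer as soon as they
    // are computed, rather than letting them accumulate in output buffers.
    public static List<int> SquareAll(IEnumerable<int> source)
    {
        var query = source.AsParallel()
                          .WithMergeOptions(ParallelMergeOptions.NotBuffered)
                          .Select(x => x * x);

        var results = new List<int>();
        foreach (int r in query)
        {
            results.Add(r); // results arrive in no particular order
        }
        return results;
    }
}
```

Disabling buffering generally costs some throughput, so it is probably only worth it when the first results really are latency-sensitive.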
One major benefit of streaming is that the data does not all have to be loaded into memory at once. For example, if the query reads its inputs from one file and writes the outputs to another file, the inputs will not necessarily all have to be in memory at the same time. Keep this use case in mind, and when you come across it in your development, use PLINQ to easily get your code to scale on multi-core machines.
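The file-to-file case might look something like the sketch below. It assumes the File.ReadLines and File.WriteAllLines(string, IEnumerable<string>) methods from the released .NET Framework 4; the class name and the upper-casing transform are placeholders for whatever per-line work your application does.

```csharp
using System.IO;
using System.Linq;

public static class FileStreamingDemo
{
    // Upper-cases every line of inputPath into outputPath. File.ReadLines
    // enumerates the file lazily, so lines flow through the query without
    // the whole file having to be held in memory at once.
    public static void UpperCaseLines(string inputPath, string outputPath)
    {
        var transformed = File.ReadLines(inputPath)
                              .AsParallel()
                              .AsOrdered() // keep output lines in input order
                              .Select(line => line.ToUpperInvariant());

        File.WriteAllLines(outputPath, transformed);
    }
}
```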
More and more, developers are realizing the significant scalability advantages that asynchronous programming can provide, especially as it relates to I/O.
Consider an application that needs to copy data from one stream to another stream, such as is being done in the following synchronous implementation:
static void CopyStreamToStream(Stream input, Stream output)
{
// Buffer space for the data to be read and written
byte [] buffer = new byte[0x2000];
// While there’s data to be read and written
while(true)
{
// Read data. If we weren’t able to read any, bail.
// Otherwise, write it out and start over again.
int numRead = input.Read(buffer, 0, buffer.Length);
if (numRead == 0) break;
output.Write(buffer, 0, numRead);
}
}
In many cases like this, such as if the Streams are FileStream instances representing files on disk, or NetworkStream instances representing remote data, the operations being performed require little-to-no computational power, as the majority of the time is spent waiting on the I/O subsystems and devices. If one such operation is being performed at a time, that waiting isn’t such a big deal. But in a synchronous implementation like the one above, that waiting ends up blocking a thread, rendering the thread useless to do anything else while waiting. Threads by default take up a sizeable chunk of memory as well as kernel resources. Thus, if multiple concurrent calls to CopyStreamToStream are executed, multiple threads may be wasted. Since threads consume a non-negligible amount of resources, we try to limit the number of threads in an application at any one time, such as by using a thread pool. As a result, this synchronous style of programming I/O can lead to scalability bottlenecks, especially in server components where we want to process as many user requests concurrently as the machine’s resources allow.
One solution to this problem is through compiler support. A compiler could recognize this synchronous pattern and translate it into an asynchronous one. Such a transformation of that same snippet might look like the following (note that this is hand-generated and is not the actual output of any particular compiler; it is rather my attempt to write this out concisely while keeping it human-readable):
static void CopyStreamToStreamAsync(
Stream input, Stream output, Action<Exception> completed)
{
// Buffer space for the data to be read and written
byte[] buffer = new byte[0x2000];
// The read/write loop. The parameter is the IAsyncResult of the
// last read operation that still needs to be completed with a call
// to EndRead. If the parameter is null, that means a new read needs
// to be started.
Action<IAsyncResult> readWriteLoop = null;
readWriteLoop = iar =>
{
try
{
// Determine whether to start with a BeginRead or
// an EndRead/BeginWrite, based on whether iar is null.
// Then, as long as the loop continues, alternate between
// reading and writing.
for (bool isRead = iar == null; ; isRead = !isRead)
{
switch (isRead)
{
// Do BeginRead(...)
case true:
// Start the asynchronous read
iar = input.BeginRead(buffer, 0, buffer.Length, readResult =>
{
// If the read completed synchronously, immediately
// return from the callback, as the processing of the
// read will be handled synchronously
// from the same thread that called BeginRead
if (readResult.CompletedSynchronously) return;
// The read completed asynchronously, so we need
// to run the processing loop,
// starting with EndRead/BeginWrite.
readWriteLoop(readResult);
}, null);
// If the read is completing asynchronously, bail, as
// there's nothing more to do.
// If it completed synchronously, loop around to do
// the EndRead/BeginWrite synchronously.
if (!iar.CompletedSynchronously) return;
break;
// Do BeginWrite(...)
case false:
// Complete the previous read. If there's no more data
// to be read/written, bail.
int numRead = input.EndRead(iar);
if (numRead == 0)
{
completed(null);
return;
}
// Now that we know how much data was read, write it
// out asynchronously
iar = output.BeginWrite(buffer, 0, numRead, writeResult =>
{
// If the write completed synchronously, allow
// the thread that called BeginWrite
// to handle it.
if (writeResult.CompletedSynchronously) return;
// Otherwise, complete the asynchronous write
// and launch the read/write loop
// to continue all over again.
output.EndWrite(writeResult);
readWriteLoop(null);
}, null);
// If the write is completing asynchronously, bail,
// as there's nothing more to do.
// Otherwise, complete the write synchronously and
// loop around.
if (!iar.CompletedSynchronously) return;
output.EndWrite(iar);
break;
}
}
} catch(Exception e) { completed(e); }
};
// Start the whole process off with a read.
readWriteLoop(null);
}
The Axum compiler, available on DevLabs, is actually capable of these kinds of transformations for asynchronous programming, and you could imagine such functionality being baked into a mainstream language like C#. Here’s how the code could be written asynchronously with Axum:
static asynchronous void CopyStreamToStream(Stream input, Stream output)
{
byte [] buffer = new byte[0x2000];
while(true)
{
int numRead = input.Read(buffer, 0, buffer.Length);
if (numRead == 0) break;
output.Write(buffer, 0, numRead);
}
}
That’s quite lovely. Comparatively, the hand-written code is mind-numbing, and it’s not something you want to have to write each and every time you need to perform some kind of repeating operation like this asynchronously.
As such, some developers have started to take advantage of C# iterators for writing asynchronous code. While not originally designed for this purpose, the compiler transformations employed for C# iterators are similar to what’s necessary for writing asynchronous code, and thus with a bit of library-based support, it’s possible to write an iterator that looks sequential but that takes advantage of asynchrony. Several libraries have been based on this approach, including the Concurrency & Coordination Runtime (CCR) from Microsoft Robotics, Jeffrey Richter’s AsyncEnumerator, and others.
The key to taking advantage of this pattern is yielding something from an iterator that can invoke a callback when an operation completes. The pattern then becomes:
IEnumerable<ThingThatHasCallbackWhenCompletes> AsyncMethod()
{
...
yield return SomethingThatReturnsThingThatHasCallbackWhenCompletes();
... // code here executes when the
// yielded ThingThatHasCallbackWhenCompletes completes
}
The idea is that the iterator method returns an IEnumerable of instances that represent a piece of an asynchronous operation. A utility function is used to invoke the iterator method, and iterates over the resulting enumerator. Each time the utility function gets the next instance from the enumerator, it registers some code to monitor the operation for completion, and when the operation completes, it moves next on the enumerator. Moving next on the enumerator results in re-entering the iterator method at the code location after the last yield point, thus allowing another asynchronous operation to be yielded. In this fashion, the iterator method can in effect yield asynchronous operations, and by using “yield return” to instrument the code with those async points, you as the developer can write an asynchronous method in a manner that looks largely sequential.
Now, think about the description above for the kind of object that needs to be yielded: “something that can invoke a callback when an operation completes.” Sound familiar? The System.Threading.Tasks.Task class in .NET 4 provides this exact functionality. A Task represents an asynchronous operation, and it has a ContinueWith method that enables a callback to be invoked when that asynchronous operation completes. Thus, we should be able to yield Task instances from an iterator in order to write an asynchronous method. Here’s the same CopyStreamToStream example implemented asynchronously in this fashion:
static IEnumerable<Task> CopyStreamToStreamAsync(
Stream input, Stream output)
{
// Buffer space for the data to be read and written
byte [] buffer = new byte[0x2000];
// While there’s data to be read and written
while(true)
{
// Read data asynchronously. When the operation completes,
// if no data could be read, we’re done.
var read = Task<int>.Factory.FromAsync(
input.BeginRead, input.EndRead, buffer, 0, buffer.Length, null,
TaskCreationOptions.DetachedFromParent);
yield return read;
if (read.Result == 0) break;
// Write the data asynchronously
yield return Task.Factory.FromAsync(
output.BeginWrite, output.EndWrite, buffer, 0, read.Result, null,
TaskCreationOptions.DetachedFromParent);
}
}
Much simpler. Where we were previously doing synchronous reads and writes, now we’re yielding the result of calling the built-in Task.Factory.FromAsync method, which creates Tasks that represent asynchronous reads and writes following the APM pattern. We could of course simplify this code further by using a few helper extension methods to hide some of the asynchronous details (these helpers are part of the Beta 1 samples at http://code.msdn.microsoft.com/ParExtSamples):
public static Task<int> ReadTask(this Stream stream,
byte [] buffer, int offset, int count)
{
return Task<int>.Factory.FromAsync(stream.BeginRead, stream.EndRead,
buffer, offset, count, null, TaskCreationOptions.DetachedFromParent);
}
public static Task WriteTask(this Stream stream,
byte [] buffer, int offset, int count)
{
return Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite,
buffer, offset, count, null, TaskCreationOptions.DetachedFromParent);
}
This then enables the previous code to be simplified to:
static IEnumerable<Task> CopyStreamToStreamAsync(
Stream input, Stream output)
{
byte [] buffer = new byte[0x2000];
while(true)
{
var read = input.ReadTask(buffer, 0, buffer.Length);
yield return read;
if (read.Result == 0) break;
yield return output.WriteTask(buffer, 0, read.Result);
}
}
That looks a lot like the synchronous version, and is *much* easier and less error-prone to write than the manual version shown earlier.
Of course, now we need a mechanism for iterating over the asynchronous iterator. As mentioned, we can take advantage of ContinueWith for the main body of the operation (as with the earlier helpers, the following method and several variants of it are available in the Beta 1 samples):
public static Task Iterate(this TaskFactory factory,
IEnumerable<Task> asyncIterator)
{
// Validate parameters
if (factory == null) throw new ArgumentNullException("factory");
if (asyncIterator == null)
throw new ArgumentNullException("asyncIterator");
// Get the scheduler to use, either the one provided by the factory
// or the current one if the factory didn’t have one specified.
var scheduler = factory.TaskScheduler ?? TaskScheduler.Current;
// Get an enumerator from the enumerable
var enumerator = asyncIterator.GetEnumerator();
if (enumerator == null) throw new InvalidOperationException();
// Create the task to be returned to the caller. And ensure
// that when everything is done, the enumerator is cleaned up.
var trs = new TaskCompletionSource<object>(factory.CreationOptions);
trs.Task.ContinueWith(_ => enumerator.Dispose(),
TaskContinuationOptions.DetachedFromParent, scheduler);
// This will be called every time more work can be done.
Action<Task> recursiveBody = null;
recursiveBody = antecedent =>
{
try
{
// If the previous task completed with any exceptions, bail
if (antecedent != null && antecedent.IsFaulted)
trs.TrySetException(antecedent.Exception);
// If the user requested cancellation, bail.
else if (trs.Task.IsCancellationRequested) trs.TrySetCanceled();
// If we should continue iterating and there's more to iterate
// over, create a continuation to continue processing. We only
// want to continue processing once the current Task (as yielded
// from the enumerator) is complete.
else if (enumerator.MoveNext())
enumerator.Current.ContinueWith(recursiveBody,
TaskContinuationOptions.DetachedFromParent, scheduler).
IgnoreExceptions();
// Otherwise, we're done!
else trs.TrySetResult(null);
}
// If MoveNext throws an exception, propagate that to the user
catch (Exception exc) { trs.TrySetException(exc); }
};
// Get things started by launching the first task
factory.StartNew(() => recursiveBody(null),
TaskCreationOptions.DetachedFromParent, scheduler).
IgnoreExceptions();
// Return the representative task to the user
return trs.Task;
}
Using this implementation, we can now run “asynchronous methods” that return IEnumerable<Task>, as did our CopyStreamToStreamAsync method:
var asyncOperation = Task.Factory.Iterate(
CopyStreamToStreamAsync(input, output));
Note that the Iterate implementation shown includes a few handy additions on top of the previous hand-coded solution. First, the Iterate method returns a Task, which can be used to track the entire asynchronous operation. Second, it supports cancellation, meaning a caller can request that the asynchronous iteration shut down early, even if it hasn’t completed yet. Third, we’ve now separated out the run logic into a separate method, which means that we no longer need all of that goop in the actual target asynchronous method.
On top of all that, this implementation is now based on TaskFactory, which means we can do things like ensure that the code runs on a certain scheduler, such as a scheduler that targets the UI. As an example of where that is handy, consider a method that asynchronously reads from a long, remote stream and stores the resulting data into a TextBox as it’s available:
static IEnumerable<Task> ReadStreamIntoTextBox(Stream stream)
{
byte [] buffer = new byte[0x2000];
Encoding enc = new UTF8Encoding();
while(true)
{
var read = stream.ReadTask(buffer, 0, buffer.Length);
yield return read;
if (read.Result == 0) break;
myTextBox.Text += enc.GetString(buffer, 0, read.Result);
}
}
As previously shown, I could invoke this method as follows:
public void button1_Click(object sender, EventArgs e)
{
Task.Factory.Iterate(ReadStreamIntoTextBox(inputStream)); // buggy
}
However, accessing myTextBox.Text from a thread other than the thread that created myTextBox is a no-no, and yet that’s potentially what will happen in the above. To address that, I want to ensure that the actual code from the iterator is executed on the UI thread (but I still don’t want the asynchronous operations to block the UI thread). To accomplish that, I can create a TaskFactory that will run tasks on the UI thread, as I do in the following code:
public void button1_Click(object sender, EventArgs e)
{
var uiFactory = new TaskFactory(
TaskScheduler.FromCurrentSynchronizationContext());
uiFactory.Iterate(ReadStreamIntoTextBox(inputStream));
}
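The Iterate helper shown earlier is written against the Beta 1 API surface, and some of what it uses (for example TaskCreationOptions.DetachedFromParent and Task.IsCancellationRequested) did not survive into the final .NET 4 release in that form. The following is a simplified sketch of the same idea against released APIs. It is an assumption-laden approximation rather than the samples' implementation: it takes the scheduler directly instead of extending TaskFactory, it omits caller-requested cancellation, and the AppendThree iterator is a hypothetical stand-in used only to exercise it (Task.Delay needs .NET 4.5 or later).

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class TaskIterationSketch
{
    // Drives an IEnumerable<Task> iterator: each yielded task must complete
    // before the iterator is resumed. The returned task completes when the
    // iterator finishes, and faults or cancels if a yielded task does.
    public static Task Iterate(IEnumerable<Task> asyncIterator, TaskScheduler scheduler)
    {
        if (asyncIterator == null) throw new ArgumentNullException("asyncIterator");
        if (scheduler == null) throw new ArgumentNullException("scheduler");

        IEnumerator<Task> enumerator = asyncIterator.GetEnumerator();
        var tcs = new TaskCompletionSource<object>();

        Action<Task> step = null;
        step = antecedent =>
        {
            try
            {
                if (antecedent != null && antecedent.IsFaulted)
                {
                    enumerator.Dispose();
                    tcs.TrySetException(antecedent.Exception.InnerExceptions);
                }
                else if (antecedent != null && antecedent.IsCanceled)
                {
                    enumerator.Dispose();
                    tcs.TrySetCanceled();
                }
                else if (enumerator.MoveNext())
                {
                    // Resume the iterator only after the yielded task completes.
                    enumerator.Current.ContinueWith(step, scheduler);
                }
                else
                {
                    enumerator.Dispose();
                    tcs.TrySetResult(null);
                }
            }
            catch (Exception exc)
            {
                // The iterator body (MoveNext) or Dispose threw.
                tcs.TrySetException(exc);
            }
        };

        // Run the first step on the requested scheduler as well.
        Task.Factory.StartNew(() => step(null),
            CancellationToken.None, TaskCreationOptions.None, scheduler);
        return tcs.Task;
    }

    // Hypothetical iterator used only to exercise Iterate.
    public static IEnumerable<Task> AppendThree(List<int> log)
    {
        for (int i = 0; i < 3; i++)
        {
            yield return Task.Delay(10);
            log.Add(i); // runs only after the delay task has completed
        }
    }
}
```

Passing TaskScheduler.FromCurrentSynchronizationContext() as the scheduler plays the same role as the UI-targeted TaskFactory above: every resumption of the iterator body is queued to the UI thread.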
Of course, while just being able to yield individual operations is useful, things get more interesting when you start considering multi-task continuations, as exposed through ContinueWhenAny and ContinueWhenAll. For example, in our previous copy stream example, we’re reading, then writing, then reading, then writing, and so forth. But we should be able to write the previously read bits while reading the next chunk, thereby achieving better speeds by overlapping latencies. Writing the code to do that using manual asynchrony would be a nightmare… with iterators and tasks, it’s manageable, almost fun:
static IEnumerable<Task> CopyStreamToStreamAsync(
Stream input, Stream output)
{
byte[][] buffers = new byte[2][] {
new byte[BUFFER_SIZE], new byte[BUFFER_SIZE] };
int filledBufferNum = 0;
Task writeTask = null;
while (true)
{
var readTask = input.ReadTask(
buffers[filledBufferNum], 0, buffers[filledBufferNum].Length);
yield return writeTask == null ?
readTask :
Task.Factory.ContinueWhenAll(new[] { readTask, writeTask },
tasks => Task.WaitAll(tasks), // to propagate exceptions
TaskContinuationOptions.DetachedFromParent);
if (readTask.Result == 0) break;
writeTask = output.WriteTask(
buffers[filledBufferNum], 0, readTask.Result);
filledBufferNum = filledBufferNum == 0 ? 1 : 0;
}
}
Here, ContinueWhenAll is used to ensure that the iterator isn’t re-entered until the pending read and any pending write have both completed.
There are of course many variations to this code that you could implement. At the end of the day, I find it quite interesting to see how the Task primitive can be used to enable such scenarios.
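One variation worth noting for readers on newer platforms: compiler support of the kind imagined earlier did arrive in C# 5.0 as the async and await keywords, and .NET Framework 4.5 added Stream.ReadAsync and Stream.WriteAsync. The sketch below assumes those later APIs, so it does not apply to the Beta 1 bits discussed in this post. The class and method names are illustrative; CopyOverlappedAsync mirrors the double-buffered iterator above.

```csharp
using System.IO;
using System.Threading.Tasks;

public static class AsyncAwaitCopySketch
{
    // Sequential copy: read a chunk, write it, repeat.
    public static async Task CopyAsync(Stream input, Stream output)
    {
        byte[] buffer = new byte[0x2000];
        while (true)
        {
            int numRead = await input.ReadAsync(buffer, 0, buffer.Length);
            if (numRead == 0) break;
            await output.WriteAsync(buffer, 0, numRead);
        }
    }

    // Overlapped copy: the next read is started while the previous chunk
    // is still being written, using two alternating buffers.
    public static async Task CopyOverlappedAsync(Stream input, Stream output)
    {
        byte[][] buffers = { new byte[0x2000], new byte[0x2000] };
        int current = 0;
        Task pendingWrite = null;
        while (true)
        {
            // Start reading into the buffer that is not being written.
            Task<int> read = input.ReadAsync(
                buffers[current], 0, buffers[current].Length);

            // Both operations are now in flight; wait for each in turn.
            if (pendingWrite != null) await pendingWrite;
            int numRead = await read;
            if (numRead == 0) break;

            pendingWrite = output.WriteAsync(buffers[current], 0, numRead);
            current = 1 - current; // switch buffers for the next read
        }
    }
}
```

The state machine the compiler generates for these methods plays the role that the iterator plus the Iterate helper play above.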
In concurrent programs, race conditions are a fact of life but they aren’t all bad. Sometimes, race conditions are benign, as is often the case with lazy initialization.
The problem with racing to set a value, however, is that it can result in multiple objects being instantiated when only one is needed. Take the LazyInitializer class that shipped in Visual Studio 2010 Beta 1, for example. The method EnsureInitialized(ref T target, Func<T> valueFactory) accepts a ByRef value and a function that produces a value. If target is null, EnsureInitialized will execute valueFactory and set target to the return value. If multiple threads happen to overlap calls to EnsureInitialized, which is likely to be a rare occurrence, these threads may both initially see target as null and then execute valueFactory. One thread will win the race and will get to set target.
In Beta 1, we thought we’d be really smart by disposing of the objects that were created by threads that lost the race, if they implemented IDisposable. Turns out that’s a pretty bad thing to do. With lazy initialization, we assumed that most of the time valueFactory would be creating a new value, but that isn’t always the case. It’s quite possible that a valueFactory could return an object that has already been created, in which case we’d be disposing of an object we didn’t own.
We’re making a couple of changes in the Parallel Extensions to ensure that we generally don’t dispose of objects we aren’t sure we own and, moving forward, any new APIs that do eagerly instantiate objects that might not be used will not dispose of said objects.
Now you might say, “why? Ninety-percent of the time I am creating a new object with my value factory and it might be a really heavy object like a file or wait handle!” We hear you, but typically, once an object is disposed, there is no bringing it back, and so by disposing of these objects we don’t allow anyone to opt out of that behavior if need be. If we don’t dispose, the GC will still clean up your unused objects, and if you really need to dispose of an object manually, there is a workaround, i.e.
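ManualResetEvent theEvent = null;
ManualResetEvent tmpEvent = null; // set only if our factory actually runs
LazyInitializer.EnsureInitialized(ref theEvent,
() => { return tmpEvent = new ManualResetEvent(false); });
// Close our instance only if we created one and it lost the race.
if (tmpEvent != null && tmpEvent != theEvent) tmpEvent.Close();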
In fact, you could easily create a small wrapper function to do the work for you, i.e.
public static T EnsureInitializedWithDispose<T>(ref T target, Func<T> function)
where T : class, IDisposable
{
T tmp = null; // stays null if function is never invoked
T actual = LazyInitializer.EnsureInitialized(ref target, () => tmp = function());
if (tmp != null && !ReferenceEquals(actual, tmp)) tmp.Dispose();
return actual;
}
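To see such a wrapper at work under contention, here is a small self-contained sketch. The Tracked class and the RaceAndCountSurvivors method are hypothetical, introduced only to count instances; the wrapper is repeated, with a null check for callers whose factory never ran, so that the block stands on its own.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LazyDisposeDemo
{
    // Hypothetical resource that counts constructions and disposals.
    public sealed class Tracked : IDisposable
    {
        public static int Created;
        public static int Disposed;
        public Tracked() { Interlocked.Increment(ref Created); }
        public void Dispose() { Interlocked.Increment(ref Disposed); }
    }

    public static T EnsureInitializedWithDispose<T>(ref T target, Func<T> factory)
        where T : class, IDisposable
    {
        T tmp = null; // stays null if this caller's factory never runs
        T actual = LazyInitializer.EnsureInitialized(ref target, () => tmp = factory());
        // Dispose only an instance that this caller created and that lost the race.
        if (tmp != null && !ReferenceEquals(tmp, actual)) tmp.Dispose();
        return actual;
    }

    private static Tracked shared;

    // Races many callers on one field and returns how many created
    // instances were left undisposed afterwards.
    public static int RaceAndCountSurvivors()
    {
        Parallel.For(0, 64, i =>
        {
            EnsureInitializedWithDispose(ref shared, () => new Tracked());
        });
        return Tracked.Created - Tracked.Disposed;
    }
}
```

However many threads end up running the factory, only the winning instance should remain undisposed, so RaceAndCountSurvivors is expected to return 1.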
And the issue isn’t just about race conditions. In general, we should never dispose of an object we didn’t explicitly create. BlockingCollection<T> in Beta 1, for example, will dispose of its underlying collection if the BlockingCollection<T> itself is disposed. Again, this is bad news if you were only using the BlockingCollection<T> as a temporary wrapper. In future releases of Visual Studio 2010, this will be corrected.
So when using these APIs, and any other API that may consume or create an IDisposable object, think about what that API is doing with the object and if you need to clean up resources manually. If you’re not sure, the finalizer should take care of most of your problems. In the rare situations where immediate cleanup is important, make sure you keep track of the objects and dispose of them properly. Also, when designing your own libraries, don’t dispose of objects you don’t own!
In a previous post, it was demonstrated how for loops with very small loop bodies could be parallelized by creating an iterator over ranges, and then using Parallel.ForEach over those ranges. A similar technique can be used to write parallel loops over iteration spaces of non-integers. For example, let’s say I wanted to parallelize the following loop, where the iteration range is based on doubles:
for(double d = 0.0; d < 1.0; d += .001)
{
Process(d);
}
Parallel.For only contains overloads where the iteration variable is an Int32 or an Int64. To accommodate doubles, one approach would be to translate the range into an integer-based range in order to use Parallel.For, and then within the body of the loop translate it into a double. As an example, the previously shown loop could be rewritten as:
Parallel.For(0, 1000, i =>
{
double d = i / 1000.0;
Process(d);
});
Due to floating point arithmetic, this may not be exactly the same, but it may be close enough. Another approach is to implement an iterator like the following:
private static IEnumerable<double> Iterate(
double fromInclusive, double toExclusive, double step)
{
for(double d = fromInclusive; d < toExclusive; d += step) yield return d;
}
With that Iterate method, now I can parallelize the sequential loop using Parallel.ForEach:
Parallel.ForEach(Iterate(0.0, 1.0, .001), d =>
{
Process(d);
});
This same technique can be applied to a wide variety of scenarios. Keep in mind, however, that the IEnumerator<T> interface isn’t thread-safe, which means that Parallel.ForEach needs to take locks when accessing the data source. While ForEach internally uses some smarts to try to amortize the cost of such locks over the processing, this is still overhead that needs to be overcome by more work in the body of the ForEach in order for good speedups to be achieved.
Parallel.ForEach has optimizations used when working on indexable data sources, such as lists and arrays, and in those cases the need for locking is decreased. Thus, performance may actually be improved in some cases by transforming the iteration space into a list or an array, which can be done using LINQ, even though there is both time and memory cost associated with creating an array from an enumerable. For example:
Parallel.ForEach(Iterate(0.0, 1.0, .001).ToArray(), d =>
{
Process(d);
});
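The integer-translation approach shown earlier can also be wrapped into a reusable helper, which avoids both the enumerator locking and the intermediate array. The following is a sketch under stated assumptions: SteppedParallel is a made-up name, the step must be positive, and the iteration count is derived by division, so for steps that are not exactly representable it may differ by one from the sequential loop at the upper boundary.

```csharp
using System;
using System.Threading.Tasks;

public static class SteppedParallel
{
    // Runs body(d) for d = from, from + step, from + 2 * step, ... while
    // d < toExclusive. Each value is computed as from + i * step rather than
    // by repeated addition, so rounding error does not accumulate.
    public static void For(double fromInclusive, double toExclusive,
        double step, Action<double> body)
    {
        if (step <= 0) throw new ArgumentOutOfRangeException("step");
        if (body == null) throw new ArgumentNullException("body");

        // Number of iterations, clamped to zero for empty ranges.
        int count = (int)Math.Max(0.0,
            Math.Ceiling((toExclusive - fromInclusive) / step));

        Parallel.For(0, count, i => body(fromInclusive + i * step));
    }
}
```

With this in place, the loop from the start of this post could be written as SteppedParallel.For(0.0, 1.0, .001, d => Process(d)).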
Happy coding.
One of the great features that crosses all of Parallel Extensions types is a consistent approach to cancellation (see http://blogs.msdn.com/pfxteam/archive/2009/05/22/9635790.aspx). In this post we explore some of the ways cancellation is used in Parallel Extensions and explain the guidance we developed.
The new cancellation system is a cooperative approach based on two new types: CancellationTokenSource, which initiates cancellation requests, and CancellationToken, which communicates a cancellation request to asynchronous operations and to long-running and blocking method calls.
If you are experimenting with Parallel Extensions, you might like to keep an eye out for the methods that accept a CancellationToken and test them out.
Blocking calls
Some of the Parallel Extensions types introduce blocking methods, for example BlockingCollection.Take() and Task.Wait(). The default overloads for these methods will block indefinitely if the condition they are waiting for never occurs. One solution used by these APIs, and many others like them, is to provide a timeout overload so that they may return after some duration and report that the condition did not occur. However, a timeout isn't particularly convenient if you only want to stop waiting when a specific activity occurs, such as a user clicking a 'cancel' button. One possibility is to wait for some amount of time such as 100 milliseconds, check if the button was pressed, and if not then go back to waiting. But this puts the burden on the call-site and must be re-implemented repeatedly, and depending on the frequency of the polling, it may also add unnecessary cost. The types in Parallel Extensions use the new cancellation system and provide overloads that accept a CancellationToken, which can cause early termination of blocking methods in response to a specific cancellation request.
Here are some examples of the blocking calls in Parallel Extensions that accept a CancellationToken:
BlockingCollection:
void Add(T item, CancellationToken cancellationToken)
T Take(CancellationToken cancellationToken)
ManualResetEventSlim:
void Wait(CancellationToken cancellationToken)
Task:
void Wait(CancellationToken cancellationToken)
void WaitAll(Task[] tasks, CancellationToken cancellationToken)
In each case, you supply a CancellationToken to the method, and if the CancellationToken is signaled via a call to the associated CancellationTokenSource.Cancel(), then the method will wake up and throw an OperationCanceledException. This frees you from the need to use the timeouts if you are happy to wait until the condition is true or a specific cancellation request occurs. Many of these methods also have overloads that accept both a CancellationToken and a timeout, so that both needs may be accommodated.
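As a concrete illustration, the sketch below blocks in Take on an empty collection and lets a cancellation request end the wait. The class and method names are invented for the example, and CancellationTokenSource.CancelAfter is assumed to be available (it was added in .NET 4.5); on earlier versions another thread would call Cancel() instead.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class BlockingCancellationDemo
{
    // Blocks on an empty collection and returns true if the wait was ended
    // by the cancellation request rather than by an item arriving.
    public static bool TakeWasCanceled(int cancelAfterMilliseconds)
    {
        using (var collection = new BlockingCollection<int>())
        using (var cts = new CancellationTokenSource())
        {
            // Request cancellation after a delay, standing in for a user
            // clicking a 'cancel' button.
            cts.CancelAfter(cancelAfterMilliseconds);
            try
            {
                collection.Take(cts.Token); // nothing is ever added
                return false;
            }
            catch (OperationCanceledException)
            {
                return true;
            }
        }
    }
}
```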
Long-running calls
There are also a variety of methods that may simply take a while to complete, particularly if large amounts of data or complex processing is taking place. For example, the following call is likely to take some time, even if individual calls to DoFunc(x) are relatively fast:
Parallel.For(1, 1000000000, (x) => DoFunc(x));
To assist with this, the Parallel APIs have overloads that accept a ParallelOptions class instance that holds a CancellationToken. The machinery inside Parallel.For(...) observes the supplied CancellationToken and will exit early if it sees that the token has been signaled (for more information on exiting loops early, see http://blogs.msdn.com/pfxteam/archive/2009/05/27/9645023.aspx). For example,
CancellationTokenSource cts = new CancellationTokenSource();
ParallelOptions options = new ParallelOptions
{CancellationToken = cts.Token};
Parallel.For(1, 100, options, (x) => DoSlowFunc(x));
// from another thread, call cts.Cancel() to request cancellation.
As you can see, the approach for canceling blocking calls and long-running calls is identical. In both cases you supply a CancellationToken to a specific method call and use the associated CancellationTokenSource to signal a cancellation request.
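A fuller version of the loop example, showing what the caller observes, might look like the following sketch. The names are illustrative, the loop body is a placeholder sleep, and CancelAfter (available from .NET 4.5) stands in for a cancellation request issued from another thread.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LoopCancellationDemo
{
    // Starts a loop far too long to finish and returns true if it ended
    // because of the cancellation request.
    public static bool LoopWasCanceled(int cancelAfterMilliseconds)
    {
        using (var cts = new CancellationTokenSource())
        {
            var options = new ParallelOptions { CancellationToken = cts.Token };
            cts.CancelAfter(cancelAfterMilliseconds);
            try
            {
                // Placeholder body; each iteration is fast, but there are
                // far too many iterations to complete before cancellation.
                Parallel.For(0, int.MaxValue, options, i => Thread.Sleep(1));
                return false;
            }
            catch (OperationCanceledException)
            {
                // Thrown by Parallel.For once it observes the token.
                return true;
            }
        }
    }
}
```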
Digression: Internal cancellation
In certain situations in Parallel Extensions and in other systems, it is necessary to wake up a blocked method for reasons that aren't due to explicit cancellation by a user. For example, if one thread is blocked on blockingCollection.Take() due to the collection being empty and another thread subsequently calls blockingCollection.CompleteAdding(), then the first call should wake up and throw an InvalidOperationException to represent an incorrect usage. A great way to implement this is to use an internal CancellationTokenSource that can wake things up due to internal concerns, and to link it to a second, external CancellationTokenSource that has been supplied by the user. For example, the following captures the essential details of how CompleteAdding() and Take(CancellationToken) are implemented:
class BlockingCollection<T>
{
CancellationTokenSource internalCTS = new CancellationTokenSource();
public void CompleteAdding()
{
//...
internalCTS.Cancel();
}
public T Take(CancellationToken externalToken)
{
using(CancellationTokenSource linkedTokenSource =
CancellationTokenSource.CreateLinkedTokenSource(
internalCTS.Token, externalToken)){
try
{
return InternalTake(linkedTokenSource.Token);
}
catch (OperationCanceledException oce)
{
if(internalCTS.Token.IsCancellationRequested)
throw new InvalidOperationException("..msg..");
else if(externalToken.IsCancellationRequested)
throw new OperationCanceledException(externalToken);
else throw;
}
}
}
}
In this fragment, a linked CancellationTokenSource is created and its token passed to the InternalTake() method. This simplifies the implementation for InternalTake() but still allows it to be woken up due to either the external token being signaled or an incorrect concurrent call to CompleteAdding(). If the InternalTake() method throws an OperationCanceledException, we can test the individual tokens to determine what the cause was and take appropriate action. Note that oce.CancellationToken will be equal to linkedTokenSource.Token as this is the token that was actually being observed, but we can look at the original tokens to determine the cause for the linked token being signaled. There is a minor risk of confusion if both sources are signaled simultaneously, but this confusion is benign as it is reasonable to behave as though either source was responsible, and a particular implementation may choose to prioritize one mechanism over the other.
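The cause-detection logic can be tried in isolation. The sketch below is not BlockingCollection's implementation; it is a hypothetical stand-in that waits on an event which is never set, using a linked token, and then reports which underlying source was signaled. As in the fragment above, the internal source is checked first.

```csharp
using System;
using System.Threading;

public static class LinkedTokenDemo
{
    // Waits on an event that is never set, observing a token linked to both
    // an "internal" and an "external" source, then reports which one fired.
    public static string WaitAndReportCause(
        CancellationTokenSource internalCts, CancellationToken externalToken)
    {
        using (var never = new ManualResetEventSlim(false))
        using (var linked = CancellationTokenSource.CreateLinkedTokenSource(
            internalCts.Token, externalToken))
        {
            try
            {
                never.Wait(linked.Token);
                return "signaled"; // not reached: the event is never set
            }
            catch (OperationCanceledException)
            {
                // Inspect the original tokens to find the cause.
                if (internalCts.IsCancellationRequested) return "internal";
                if (externalToken.IsCancellationRequested) return "external";
                throw;
            }
        }
    }
}
```

Canceling the internal source makes the method return "internal"; canceling only the external source makes it return "external".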
Cooperative cancellation with user-code
Both TPL and PLINQ provide infrastructure that calls back to user code. If the user code is long running or blocking, then it is very useful for the user code to be able to respond to cancellation in a cooperative fashion. TPL and PLINQ achieve this by tracking a supplied CancellationToken, and if they see some user code throw an OperationCanceledException that mentions this specific token, then it is treated as cooperative acknowledgement of cancellation. This means that both TPL and PLINQ understand that the user code did not suffer some catastrophic exceptional condition, but rather that it is simply responding to cancellation. As a result, TPL and PLINQ constructs will simply exit with an OperationCanceledException of their own, rather than aggregating exceptions from all the participating worker threads and throwing an AggregateException.
PLINQ follows this basic plan; for example, the following query will throw a single OperationCanceledException if the CancellationTokenSource becomes signaled. It doesn't matter whether PLINQ or the user delegate sees the cancellation request first, as the user-delegate exception will be correctly understood by the PLINQ execution engine.
CancellationTokenSource cts = new CancellationTokenSource();
var resultArray =
data.AsParallel()
.WithCancellation(cts.Token)
.Select((x) =>
{
if (cts.Token.IsCancellationRequested)
throw new OperationCanceledException(cts.Token);
// ...
})
.ToArray();
In both PLINQ and TPL, it is interesting to note that the external cancellation token is not supplied back to the user-delegate via a parameter as may be expected. This would have bloated the number of method overloads considerably and, as shown above, the external token can be conveniently accessed via closures or equivalent techniques.
If the user code throws an OperationCanceledException but one or more "normal" exceptions occur on other threads, then all of the exceptions will be collated into an AggregateException, including the OperationCanceledException. Hence, if only cancellation occurred then a single OperationCanceledException is thrown, but an actual failure will always result in an AggregateException.
PLINQ Queries
PLINQ offers deep support for cancellation and endeavors to ensure that cancellation will be enacted swiftly for any query. For queries that involve only simple and fast user delegates, simply supply a CancellationToken via the WithCancellation() method and PLINQ takes care of the rest. If your query involves long-running or blocking user code, then you should use the cooperative cancellation pattern described in the preceding section. For reasons that are discussed in the next section, PLINQ may not check for cancellation every time it calls a user-delegate, and so long running delegates should check cancellation frequently (as a working rule of thumb, PLINQ will check for cancellation approximately once per one hundred calls to a user delegate). For example, if a delegate call runs for 50ms but does not perform any cancellation checks of its own, then PLINQ may not detect the cancellation request itself for up to 5 seconds, and this could significantly affect a user’s experience. For this reason, we recommend that long-running delegates called from PLINQ queries should check for cancellation as frequently as possible. A good lower bound is to check cancellation once per few thousand IL instructions executed, and an upper bound is to check no less frequently than once per 10ms, but once per 1ms is preferred to ensure snappy response.
The following example demonstrates a user delegate that may run for a while, and so checks cancellation itself every few thousand IL instructions. This provides for timely cancellation support with a negligible cost to performance.
// Assume an external CancellationToken 'token' has been supplied.
int[] data = ...;
var query =
data.AsParallel()
.WithCancellation(token)
.Select((x) =>
{
int result = 0;
for ( int i = 0; i < x; i++ ) {
for (int j = 0; j < 1000 * x; j++)
{
result += SimpleFunc(i,j);
}
token.ThrowIfCancellationRequested();
}
return result;
})
.ToArray();
[Update: the call to token.ThrowIfCancellationRequested shown above is not available in Beta1 (but will appear in subsequent releases). This behavior can be achieved via a manual test-and-throw as shown in earlier examples, or via the extension-method approach described in the comments. -Mike]
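For reference, the manual test-and-throw can be packaged as a small extension method along these lines. This is only a sketch: the class and method names are hypothetical, chosen so they don't clash with the ThrowIfCancellationRequested method in later releases.

```csharp
using System;
using System.Threading;

public static class CancellationTokenCompat
{
    // Hypothetical name; equivalent to the manual test-and-throw shown earlier.
    public static void ThrowIfCanceled(this CancellationToken token)
    {
        if (token.IsCancellationRequested)
            throw new OperationCanceledException(token);
    }
}
```

A delegate would then call token.ThrowIfCanceled() at the points where the example above calls token.ThrowIfCancellationRequested().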
Summary
The Parallel Extension types use the new cancellation system to provide consistent and rich cancellation features. All of the long-running and blocking methods in Parallel Extensions provide overloads that take a CancellationToken, and any callbacks made to user-code can participate in the process. Application code can also make use of the same ideas and facilities to ensure that cancellation can be a fully supported feature in any situation.
As has been discussed previously, one of the new features in the Task Parallel Library is TaskCompletionSource<TResult>, which enables the creation of a Task<TResult> that represents any other asynchronous operation.
There are a wide variety of sources in the .NET Framework for asynchronous work. One comes from components that implement the Asynchronous Programming Model (APM) pattern, which we discussed in a previous post. Another includes types that implement the Event-based Asynchronous Pattern (EAP). For some synchronous method Xyz, the EAP provides an asynchronous counterpart XyzAsync. Calling this method launches the asynchronous work, and when the work completes, a corresponding XyzCompleted event is raised. (That’s an oversimplification of the pattern, but it provides a grounding.)
For many situations, the EAP is quite straightforward and simple to use. However, there are cases where you’d like to be able to do things like join across multiple EAP asynchronous invocations, such as to download three different web pages asynchronously, and only when all three have completed do something else. Tasks in .NET 4.0 make this kind of operation easy, through Task.Factory.ContinueWhenAll (or ContinueWhenAny if you want to do something when any one of the items completes rather than when all of them do). However, in order to use these methods, you need Tasks, thus to use them with components that implement the EAP, you need to create Tasks from EAP. TaskCompletionSource<TResult> can be used to do exactly that.
First, let’s create two small helper functions that we can reuse over and over for multiple EAP-to-Task implementations:
private static TaskCompletionSource<T> CreateSource<T>(object state)
{
return new TaskCompletionSource<T>(
state, TaskCreationOptions.DetachedFromParent);
}
private static void TransferCompletion<T>(
TaskCompletionSource<T> tcs, AsyncCompletedEventArgs e,
Func<T> getResult)
{
if (e.UserState == tcs)
{
if (e.Cancelled) tcs.TrySetCanceled();
else if (e.Error != null) tcs.TrySetException(e.Error);
else tcs.TrySetResult(getResult());
}
}
The first helper, CreateSource, simply creates an instance of TaskCompletionSource<T> with the right type and settings. I’ve included TaskCreationOptions.DetachedFromParent under the assumption that tasks created for the EAP pattern aren’t necessarily meant to participate in parent/child relationships, but you could certainly change this if your scenarios needed that functionality.
The second helper, TransferCompletion, will be used whenever an EAP event signals completion of an asynchronous operation. It takes the AsyncCompletedEventArgs provided through the XyzCompleted event and uses it to determine whether the operation completed due to cancellation or due to an unhandled exception. Both of those pieces of data are standard to the base AsyncCompletedEventArgs class. However, each EAP operation typically comes with its own type derived from AsyncCompletedEventArgs: to be able to use this one helper function with any of those to mine the result of the operation, TransferCompletion also accepts a Func<T> that will return the results from the derived instance.
With those two helpers, writing a Task wrapper for an EAP operation is a cinch. Consider System.Net.WebClient, which provides support for downloading and uploading to and from URIs, with methods like DownloadData. We can write a Task-based wrapper for DownloadData in just a few lines (in this case, as an extension method):
public static Task<byte[]> DownloadDataTask(
this WebClient webClient, Uri address)
{
var tcs = CreateSource<byte[]>(address);
webClient.DownloadDataCompleted +=
(sender, e) => TransferCompletion(tcs, e, () => e.Result);
webClient.DownloadDataAsync(address, tcs);
return tcs.Task;
}
We first create a TaskCompletionSource<byte[]> whose Task<byte[]> will be returned from the method. We then register with the DownloadDataCompleted event handler, such that when the event is raised, the downloaded data (or exception or cancellation information) will be transferred to the returned Task. And then we start the operation. Piece of cake.
One interesting thing to note about the EAP is that some implementations support multiple asynchronous operations concurrently on the same instance. In such cases, a user-supplied token is needed to correlate the asynchronous invocations to the asynchronous completions (something that’s provided in Task implicitly by having a Task reference returned from and serving as a reference for an asynchronous operation). For that purpose, we pass the created TaskCompletionSource<TResult> as the user-supplied token, and in TransferCompletion, we only transfer the results if the token received matches the target completion source. If it doesn’t match, this is an event completion for another operation and should be ignored.
One potential issue with the previously shown implementation (and with the EAP pattern in general) is that, if they’re used over and over, the XyzCompleted event handlers will start to build up. Delegates are being registered with the event but not released. To fix that, we can also remove the event handler in the delegate handling the event. For example, DownloadDataTask can be rewritten as:
public static Task<byte[]> DownloadDataTask(
this WebClient webClient, Uri address)
{
var tcs = CreateSource<byte[]>(address);
DownloadDataCompletedEventHandler handler = null;
handler = (sender, e) => {
if (e.UserState != tcs) return; // completion for a different operation; stay subscribed
TransferCompletion(tcs, e, () => e.Result);
webClient.DownloadDataCompleted -= handler;
};
webClient.DownloadDataCompleted += handler;
try { webClient.DownloadDataAsync(address, tcs); }
catch {
webClient.DownloadDataCompleted -= handler;
tcs.TrySetCanceled();
throw;
}
return tcs.Task;
}
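To try the whole pattern without touching the network, here is a self-contained sketch that wraps a made-up EAP component and then joins several invocations with ContinueWhenAll. Squarer, SquareCompletedEventArgs, SquareTask, and SumOfSquares are all hypothetical names invented for this illustration, and the TaskCompletionSource<TResult> is created with its default constructor rather than through the CreateSource helper.

```csharp
using System;
using System.ComponentModel;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical EAP-style event args, used only for illustration.
public class SquareCompletedEventArgs : AsyncCompletedEventArgs
{
    private readonly int result;
    public SquareCompletedEventArgs(
        int result, Exception error, bool cancelled, object userState)
        : base(error, cancelled, userState) { this.result = result; }
    public int Result { get { RaiseExceptionIfNecessary(); return result; } }
}

// Hypothetical EAP component: SquareAsync starts work, SquareCompleted reports it.
public class Squarer
{
    public event EventHandler<SquareCompletedEventArgs> SquareCompleted;

    public void SquareAsync(int value, object userState)
    {
        ThreadPool.QueueUserWorkItem(_ =>
        {
            var handler = SquareCompleted;
            if (handler != null)
                handler(this, new SquareCompletedEventArgs(
                    value * value, null, false, userState));
        });
    }
}

public static class SquarerExtensions
{
    // Task-based wrapper following the same shape as DownloadDataTask.
    public static Task<int> SquareTask(this Squarer squarer, int value)
    {
        var tcs = new TaskCompletionSource<int>();
        EventHandler<SquareCompletedEventArgs> handler = null;
        handler = (sender, e) =>
        {
            if (e.UserState != tcs) return; // completion of another invocation
            squarer.SquareCompleted -= handler;
            if (e.Cancelled) tcs.TrySetCanceled();
            else if (e.Error != null) tcs.TrySetException(e.Error);
            else tcs.TrySetResult(e.Result);
        };
        squarer.SquareCompleted += handler;
        squarer.SquareAsync(value, tcs);
        return tcs.Task;
    }

    // Joins across several EAP invocations once all of them have completed.
    public static Task<int> SumOfSquares(Squarer squarer, params int[] values)
    {
        Task<int>[] tasks = Array.ConvertAll(values, v => squarer.SquareTask(v));
        return Task.Factory.ContinueWhenAll<int, int>(tasks, done =>
        {
            int sum = 0;
            foreach (var t in done) sum += t.Result;
            return sum;
        });
    }
}
```

Since several invocations run concurrently on the same Squarer instance, every handler sees every completion event; the user-state comparison is what routes each completion to the right task.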
The Beta 1 samples available at http://code.msdn.microsoft.com/ParExtSamples already include Task-based extensions for WebClient, as well as extensions for other EAP implementations like SmtpClient and Ping. Download and enjoy!
As Ed Essey explained in Partitioning in PLINQ, partitioning is an important step in PLINQ execution. Partitioning splits up a single input sequence into multiple sequences that can be processed in parallel. This post further explains chunk partitioning, the most general partitioning scheme that works on any IEnumerable<T>.
Chunk partitioning appears in two places in Parallel Extensions. First, it is one of the algorithms that PLINQ uses under the hood to execute queries in parallel. Second, chunk partitioning is available as a standalone algorithm through the Partitioner.Create() method.
To explain the design of the chunk partitioning algorithm, let's walk through the possible ways of processing an IEnumerable<T> with multiple worker threads, finally arriving at the solution used in PLINQ (approach 4).
Approach 1: Load the input sequence into an intermediate array
As a simple solution, we could walk over the input sequence and store all elements into an array. Then, we can split up the array into ranges, and assign each range to a different worker.
The disadvantage of this approach is that we need to allocate an array large enough to store all input elements. If the input sequence is long, this algorithm leads to unnecessarily large memory consumption. Also, we need to wait until the entire input sequence is ready before the workers can start executing.
Approach 2: Hand out elements to threads on demand
An entirely different approach is to have all worker threads share one input enumerator. When a worker is ready to process the next input element, it takes a shared lock, gets the next element from the input enumerator, and releases the lock.
This algorithm has a fairly large overhead because processing every element requires locking. Also, handing out elements individually is prone to poor cache behavior.
This approach does have an interesting advantage over Approach 1, though: since workers receive data on demand, the workers that finish faster will come back to request more work. In contrast, Approach 1 splits up all work ahead of time, and a worker that is done early simply goes away.
Approach 3: Hand out elements in chunks
To mitigate the two drawbacks of Approach 2 (synchronization cost and cache behavior), we can hand out elements to threads in "chunks". When a thread is ready to process more inputs, it will take, say, 64 elements from the input enumerator.
Unfortunately, while this approach nicely amortizes the synchronization cost over multiple elements, it does not work well for short inputs. For example, if the input contains 50 elements and the chunk size is 64, all inputs will go into a single partition. Even if the work per element is large, we will not be able to benefit from parallelism, since one worker gets all the work.
And since IEnumerable<T> in general does not declare its length, we cannot simply tune the chunk size based on the input sequence length.
Approach 4: Hand out elements in chunks of increasing size
A solution to the problem with small inputs is to use chunks of a growing size. The first chunk assigned to each thread is of size 1 and subsequent chunks are gradually larger, until a specific threshold is reached.
Our solution doubles the chunk size every few chunks. So, each thread first receives a few chunks of size 1, then a few chunks of size 2, then 4, and so forth. Once the chunk size reaches a certain threshold, it remains constant.
This chunking strategy ensures that if the input is short, it will still get split up fairly among the cores. But, the chunk size also grows fairly quickly, and the per-chunk overheads are small for large inputs. Also, the algorithm is quite good at load-balancing, so if one worker is taking longer to process its inputs, other workers will process more elements to decrease the overall processing time.
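The idea can be sketched in a few dozen lines. To be clear, this is an illustration of approach 4 and not the PLINQ source: GrowingChunkSource, Worker, chunksPerSize, and maxChunkSize are hypothetical names, and the real thresholds are not stated in this post.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration of approach 4; not the PLINQ implementation.
public sealed class GrowingChunkSource<T>
{
    private readonly IEnumerator<T> source;
    private readonly object gate = new object();
    private readonly int chunksPerSize;
    private readonly int maxChunkSize;

    public GrowingChunkSource(IEnumerable<T> input, int chunksPerSize, int maxChunkSize)
    {
        source = input.GetEnumerator();
        this.chunksPerSize = chunksPerSize;
        this.maxChunkSize = maxChunkSize;
    }

    public Worker CreateWorker() { return new Worker(this); }

    // Per-worker state: each worker tracks its own current chunk size.
    public sealed class Worker
    {
        private readonly GrowingChunkSource<T> owner;
        private int chunkSize = 1;
        private int chunksAtThisSize = 0;

        internal Worker(GrowingChunkSource<T> owner) { this.owner = owner; }

        // Returns the next chunk; an empty list means the input is exhausted.
        public List<T> NextChunk()
        {
            var chunk = new List<T>(chunkSize);
            lock (owner.gate) // one lock per chunk rather than per element
            {
                while (chunk.Count < chunkSize && owner.source.MoveNext())
                    chunk.Add(owner.source.Current);
            }
            // Double the chunk size every few chunks, up to the threshold.
            if (++chunksAtThisSize == owner.chunksPerSize && chunkSize < owner.maxChunkSize)
            {
                chunkSize = Math.Min(chunkSize * 2, owner.maxChunkSize);
                chunksAtThisSize = 0;
            }
            return chunk;
        }
    }
}
```

Each worker thread would create its own Worker and loop on NextChunk() until it gets an empty list. With chunksPerSize of 2 and maxChunkSize of 4, a single worker draining 20 elements receives chunks of 1, 1, 2, 2, 4, 4, 4, and finally the 2 remaining elements.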
One interesting consequence of the chunk partitioning algorithm is that multiple threads will call MoveNext() on the input enumerator. The worker threads will use a lock to ensure mutual exclusion, but the enumerator must not assume that MoveNext() will be called from a particular thread (e.g., it should not use thread-local storage, manipulate UI, etc).
The current implementation of both PLINQ chunk partitioning and Partitioner.Create() follows approach 4 fairly closely. Now you know how it behaves and why!
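As a quick usage sketch of the standalone form, the following feeds an iterator (whose length is not known up front) through Partitioner.Create() into Parallel.ForEach. ChunkPartitionerDemo, Numbers, and SumInParallel are hypothetical names for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ChunkPartitionerDemo
{
    // An IEnumerable<T> that does not declare its length.
    private static IEnumerable<int> Numbers(int count)
    {
        for (int i = 1; i <= count; i++) yield return i;
    }

    public static long SumInParallel(int count)
    {
        long total = 0;
        // Worker threads pull chunks from the shared enumerator on demand.
        Parallel.ForEach(Partitioner.Create(Numbers(count)), n =>
        {
            Interlocked.Add(ref total, n);
        });
        return total;
    }
}
```

Note that the iterator's MoveNext() may be called from several different threads here, so it must not depend on thread affinity.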
The Asynchronous Programming Model (APM) in the .NET Framework has been around since .NET 1.0 and is the most common pattern for asynchrony in the Framework. Even if you’re not familiar with the name, you’re likely familiar with the core of the pattern. For a given synchronous operation Xyz, the asynchronous version manifests as BeginXyz and EndXyz: BeginXyz starts the operation and the EndXyz method joins with it (completes it). There are several mechanisms for joining with the operation: polling for completion using the IsCompleted property on the IAsyncResult returned from BeginXyz, blocking until the operation has completed by waiting on the IAsyncResult’s AsyncWaitHandle, simply calling EndXyz and passing it the IAsyncResult (which will block until the operation has completed), or passing to BeginXyz a callback which will be invoked when the operation has completed. That callback should then call EndXyz to retrieve the results.
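As a concrete illustration, here are two of those join mechanisms applied to Stream's BeginRead/EndRead pair. This is just a sketch; ApmDemo, ReadByBlocking, and ReadByCallback are hypothetical names.

```csharp
using System;
using System.IO;
using System.Threading;

public static class ApmDemo
{
    // Joins by calling EndRead directly, which blocks until the read completes.
    public static int ReadByBlocking(Stream stream, byte[] buffer)
    {
        IAsyncResult ar = stream.BeginRead(buffer, 0, buffer.Length, null, null);
        return stream.EndRead(ar);
    }

    // Joins through a callback that calls EndRead to retrieve the result.
    public static int ReadByCallback(Stream stream, byte[] buffer)
    {
        int bytesRead = -1;
        using (var done = new ManualResetEvent(false))
        {
            stream.BeginRead(buffer, 0, buffer.Length, ar =>
            {
                bytesRead = stream.EndRead(ar);
                done.Set();
            }, null);
            done.WaitOne(); // only so this demo can return the value synchronously
        }
        return bytesRead;
    }
}
```

In real code the callback version would carry on with its work inside the callback rather than blocking the caller on an event; the wait is there only to keep the sketch easy to test.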
This pattern is so common that we’ve opted to incorporate it as a first-class citizen into the Task Parallel Library. One way we’ve done this is by having the Task class itself implement IAsyncResult: this means that Task can be used as the core of a Begin/End implementation, easing the implementation for common scenarios. One feature new to .NET 4 Beta 1 since previous CTPs, however, is that we now support the inverse: easy creation of tasks from an implementation of the APM pattern. This new functionality shows up through the FromAsync methods on TaskFactory and TaskFactory<TResult>.
Under the covers, FromAsync utilizes the TaskCompletionSource<TResult> type. Let’s say you wanted to create a Task that represents an asynchronous read on a Stream. You could write something like the following:
public static Task<int> ReadTask(this Stream stream,
byte [] buffer, int offset, int count, object state)
{
var tcs = new TaskCompletionSource<int>();
stream.BeginRead(buffer, offset, count, ar =>
{
try { tcs.SetResult(stream.EndRead(ar)); }
catch(Exception exc) { tcs.SetException(exc); }
}, state);
return tcs.Task;
}
You could of course write a similar wrapper for every Begin/End pair you wanted to utilize, but that could get tedious and error-prone. To solve that, we’ve provided the FromAsync method, which takes advantage of the common structure of the APM pattern, along with delegate inference, to provide generic overloads that work with most APM implementations.
For example, let’s say I have a Stream and a byte buffer, and I want to read from that stream into the buffer. Synchronously, I could do something like:
int bytesRead = stream.Read(buffer, 0, buffer.Length);
Creating a Task that does the same thing asynchronously:
Task<int> bytesRead = Task<int>.Factory.FromAsync(
stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);
Under the covers, we follow a pattern very similar to the one shown earlier as a specific implementation for Stream.Read. Combine this support with the ability to do Task.WaitAll, Task.WaitAny, Task.Factory.ContinueWhenAll, and Task.Factory.ContinueWhenAny, and you can achieve some very useful coordination functionality in very little code.
Of course, we don’t have any magic at our disposal; all of this functionality is available through standard .NET libraries. That means that we achieve the above with a method with the following signature:
public Task<TResult> FromAsync<TArg1, TArg2, TArg3>(
Func<TArg1, TArg2, TArg3, AsyncCallback,
object, IAsyncResult> beginMethod,
Func<IAsyncResult, TResult> endMethod,
TArg1 arg1, TArg2 arg2, TArg3 arg3,
object state);
You’ll notice then that this overload is coded specifically for APM implementations that take three input parameters (of types TArg1, TArg2, and TArg3). The vast majority of the APM implementations in the .NET Framework take three or fewer input parameters, with only a trickling accepting more than that. As such, we’ve included overloads in the Task Parallel Library that follow this pattern for up to and including three parameters. Of course, as with all corner cases, there will certainly be scenarios that require usage with more than three parameters, or with slightly different forms. For those cases, we’ve also added overloads that accept an IAsyncResult as the first parameter, rather than accepting a beginMethod delegate. This way, you can pass in any IAsyncResult you want, and we’ll call back to the endMethod when we find that the IAsyncResult has completed. This approach typically isn’t as efficient as the beginMethod approach, but it can be quite handy in a pinch.
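Here is a small sketch that puts the two flavors of FromAsync side by side and then joins the resulting tasks with ContinueWhenAll. The class and method names (FromAsyncDemo, ReadWithDelegates, ReadWithAsyncResult, TotalBytesRead) are hypothetical and exist only for this illustration.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class FromAsyncDemo
{
    // beginMethod/endMethod overload: FromAsync starts the read itself.
    public static Task<int> ReadWithDelegates(Stream stream, byte[] buffer)
    {
        return Task<int>.Factory.FromAsync(
            stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);
    }

    // IAsyncResult overload: we start the read, and FromAsync calls EndRead
    // once the IAsyncResult reports completion.
    public static Task<int> ReadWithAsyncResult(Stream stream, byte[] buffer)
    {
        IAsyncResult ar = stream.BeginRead(buffer, 0, buffer.Length, null, null);
        return Task<int>.Factory.FromAsync(ar, stream.EndRead);
    }

    // Joins both reads and produces the total number of bytes read.
    public static Task<int> TotalBytesRead(Stream first, Stream second, int bufferSize)
    {
        Task<int> read1 = ReadWithDelegates(first, new byte[bufferSize]);
        Task<int> read2 = ReadWithAsyncResult(second, new byte[bufferSize]);
        return Task.Factory.ContinueWhenAll<int, int>(
            new[] { read1, read2 },
            reads => reads[0].Result + reads[1].Result);
    }
}
```

The IAsyncResult form is the one to reach for when the Begin method takes more than three arguments, since you make the Begin call yourself and FromAsync only needs the End method.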
The Parallel class represents a significant advancement in parallelizing managed loops. For many common scenarios, it just works, resulting in terrific speedups. However, while ideally Parallel.For could be all things to all people, such things rarely work out, and we’ve had to prioritize certain scenarios over others.
One area where Parallel.For may fall a bit short is in attempts to use it with very small loop bodies, such as:
int [] array = new int[100000000];
Parallel.For(0, array.Length, i=>
{
array[i] = i*i*i;
});
Such an operation should be readily parallelizable; after all, every iteration is completely independent of every other iteration, and we’re dealing with a big data parallel problem. The devil is in the details, however. To handle typical scenarios where the time it takes to complete each iteration ends up being non-uniform, Parallel.For’s implementation takes a lot of care to load balance across all threads participating in the loop, and that load balancing comes at a small performance cost. This cost is typically trumped by the performance benefits that come from doing the load balancing; however, when the body of the loop is as tiny as it is in the example above, even small overheads add up. Another overhead that also contributes is the delegate invocation required to invoke the loop body. It can be easy to forget when looking at a Parallel.For call that Parallel.For is really just a method, accepting as a parameter a delegate to be invoked for every iteration. That invocation isn’t free, and in the above case may even be more expensive than the body of the loop itself.
Fear not, however, as there exist ways to still achieve good speedups on such cases. One way is based on creating larger chunks of work for Parallel to operate on: as the chunk size increases, the overhead costs start to pale in comparison, and speedups are realized.
Consider a new ForRange method you could implement:
public static ParallelLoopResult ForRange(
int fromInclusive, int toExclusive, Action<int, int> body);
Unlike For, which invokes the body once per iteration, ForRange will invoke the body with a start and end of a range. Thus, given an initial sequential loop like the following:
for(int i=0; i<N; i++)
{
DoWork(i);
}
with For it would be parallelized by replacing the for loop with a Parallel.For:
Parallel.For(0, N, i=>
{
DoWork(i);
});
and with ForRange, it would be parallelized by wrapping the for loop with a ForRange:
ForRange(0, N, (from,to) =>
{
for(int i=from; i<to; i++)
{
DoWork(i);
}
});
There are several ways we can now implement ForRange. The first is simply by doing a little math. We can calculate the boundaries of each range and use a Parallel.For to run the user-supplied body action for each range, e.g.
public static ParallelLoopResult ForRange(
int fromInclusive, int toExclusive, Action<int, int> body)
{
int numberOfRanges = Environment.ProcessorCount;
int range = toExclusive - fromInclusive;
int stride = range / numberOfRanges;
if (range <= 0) numberOfRanges = 0;
return Parallel.For(0, numberOfRanges, i => {
int start = fromInclusive + i * stride;
int end = (i == numberOfRanges - 1) ? toExclusive : start + stride;
body(start, end);
});
}
Another way is actually by using Parallel.ForEach under the covers. Rather than doing the math as was done above, we can write an iterator in C# that yields the ranges, and then Parallel.ForEach over those ranges, e.g.
public static ParallelLoopResult ForRange(
int fromInclusive, int toExclusive, Action<int, int> body)
{
int rangeSize = (toExclusive - fromInclusive) / Environment.ProcessorCount;
if (rangeSize == 0) rangeSize = 1;
return Parallel.ForEach(
CreateRanges(fromInclusive, toExclusive, rangeSize), range =>
{
body(range.Item1, range.Item2);
});
}
private static IEnumerable<Tuple<int,int>> CreateRanges(
int fromInclusive, int toExclusive, int rangeSize)
{
// Enumerate all of the ranges
for (int i = fromInclusive; i < toExclusive; i += rangeSize)
{
int from = i, to = i + rangeSize;
if (to > toExclusive) to = toExclusive;
yield return Tuple.Create(from, to);
}
}
(You can download an implementation of ForRange as part of the Beta 1 samples at http://code.msdn.microsoft.com/ParExtSamples.)
In general, we expect the design and implementation of Parallel.For will be right for the vast majority of scenarios. However, solutions like those above can be used to accommodate cases that don’t quite fit the mold.
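A related option, not covered in this post, is the range-based Partitioner.Create(fromInclusive, toExclusive) overload in System.Collections.Concurrent, which hands Parallel.ForEach a sequence of Tuple<int,int> ranges much like CreateRanges does. The sketch below swaps that overload in for the hand-written ForRange; RangePartitionerDemo and Cubes are hypothetical names, and the length must be greater than zero because the overload rejects empty ranges.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class RangePartitionerDemo
{
    // Fills an array with cubes, invoking the delegate once per range
    // instead of once per element. Requires length > 0.
    public static int[] Cubes(int length)
    {
        int[] array = new int[length];
        Parallel.ForEach(Partitioner.Create(0, length), range =>
        {
            // The inner loop is a plain sequential for loop over one range.
            for (int i = range.Item1; i < range.Item2; i++)
                array[i] = i * i * i;
        });
        return array;
    }
}
```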
The core entity in the Task Parallel Library around which everything else revolves is System.Threading.Tasks.Task. The most common way of creating a Task will be through the StartNew method on the TaskFactory class, a default instance of which is exposed through a static property on Task, e.g.
var t = Task.Factory.StartNew(() =>
{
… // body of the task goes here
});
There are, however, other ways of creating tasks. For example, while using StartNew is the preferred mechanism to create a Task and schedule/start it, we do support separating those two operations into two discrete actions, e.g.
var t = new Task(() =>
{
… // body of the task goes here
});
… // the task has been created but not scheduled
t.Start(); // now schedule it
Moreover, StartNew isn’t alone on TaskFactory; other methods like ContinueWhenAll, ContinueWhenAny, and FromAsync may all be used to create Task instances. Task also exposes a ContinueWith mechanism that can be used to create a Task that will be scheduled when the antecedent task (the Task on which ContinueWith is being called) completes.
Finally, the TaskCompletionSource<TResult> type can be used to create a Task<TResult> completely controlled by the completion source instance, through its SetResult, SetException, and SetCanceled methods (and their TrySet* variants).
Each of these different ways of creating a Task has different behaviors associated with it. The differences between them may not be obvious at first, but with a little thought, it should be clear why things behave the way they do. For example, calling Start on a Task created by StartNew is invalid (i.e. results in an exception)… you can’t start an already started Task. In contrast, a Task created by a Task’s constructor won’t have been scheduled, so it’s perfectly valid to call Start on it. Calling Start on a Task returned by a TaskCompletionSource<TResult> makes little sense, as there’s nothing to “start”, so that’s invalid. It’s invalid to call Start on a continuation task (e.g. one created by ContinueWith, ContinueWhenAny, or ContinueWhenAll) because the work should only be scheduled when the antecedent(s) has completed. And it’s invalid to call Start on a task created by FromAsync, because the work being done has already been initiated through a call to the beginMethod passed to FromAsync.
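These rules are easy to check for yourself. The following sketch (StartRulesDemo, StartIsRejected, and Check are hypothetical names) calls Start on tasks created in four different ways and records which calls are rejected with an InvalidOperationException.

```csharp
using System;
using System.Threading.Tasks;

public static class StartRulesDemo
{
    // Returns true if calling Start() on the task throws InvalidOperationException.
    public static bool StartIsRejected(Task task)
    {
        try { task.Start(); return false; }
        catch (InvalidOperationException) { return true; }
    }

    public static bool[] Check()
    {
        Task constructed = new Task(() => { });
        Task started = Task.Factory.StartNew(() => { });
        Task promise = new TaskCompletionSource<object>().Task;
        Task continuation = promise.ContinueWith(_ => { });
        return new[]
        {
            StartIsRejected(constructed),  // false: created but not yet scheduled
            StartIsRejected(started),      // true: already started
            StartIsRejected(promise),      // true: nothing to start
            StartIsRejected(continuation)  // true: scheduled by its antecedent
        };
    }
}
```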
These kinds of behavioral differences can be quite useful when building up abstractions on top of tasks. For example, let’s say I want to implement a factory method for creating “delayed” tasks, ones that won’t actually be scheduled until some user-supplied timeout has occurred. One way to write this would be as follows:
public static Task StartNewDelayed(int millisecondsDelay, Action action)
{
// Validate arguments
if (millisecondsDelay < 0)
throw new ArgumentOutOfRangeException("millisecondsDelay");
if (action == null) throw new ArgumentNullException("action");
// Create the task
var t = new Task(action);
// Start a timer that will trigger it
var timer = new Timer(
_ => t.Start(), null, millisecondsDelay, Timeout.Infinite);
t.ContinueWith(_ => timer.Dispose());
return t;
}
This implementation creates a new Task to run the provided action, but doesn’t immediately start it. Instead, it creates a Timer with the user-supplied delay, and when the timer expires, the Timer’s callback starts the task. Once the timer has been started, the Task is returned to the user.
One problem with this implementation, which you might have guessed based on earlier paragraphs, is that the Task returned to the user was created using the Task’s constructor. This means it can be explicitly Start’d. And that means the Task returned from StartNewDelayed could be started by the consumer prior to the Timer firing. That’s bad for two reasons: one, it breaks expectations about the behavior of the Task and the associated delay, and two, a Task may only be started once. If the Task is explicitly started and then the timer’s callback tries to Start the Task, kaboom: Start will throw an exception (since the Task was already started), the exception will go unhandled, and the app will come crumbling down.
Given what we now know about behaviors associated with creating tasks, we can use a different mechanism for creating a task that doesn’t allow the Task to be explicitly started.
public static Task StartNewDelayed(int millisecondsDelay, Action action)
{
// Validate arguments
if (millisecondsDelay < 0)
throw new ArgumentOutOfRangeException("millisecondsDelay");
if (action == null) throw new ArgumentNullException("action");
// Create a trigger used to start the task
var tcs = new TaskCompletionSource<object>();
// Start a timer that will trigger it
var timer = new Timer(
_ => tcs.SetResult(null), null, millisecondsDelay, Timeout.Infinite);
// Create and return a task that will be scheduled when the trigger fires.
return tcs.Task.ContinueWith(_ =>
{
timer.Dispose();
action();
});
}
In this new implementation, I’ve taken advantage of the fact that a continuation task can’t be explicitly started and can be used to run arbitrary user code. The timer is used to resolve a TaskCompletionSource<TResult>, and a continuation off of that completion source is used to run the action. It’s that continuation that’s returned.
The Task Parallel Library is centered around the Task class and its derived Task<TResult>. The main purpose of these types is to represent the execution of an asynchronous workload and to provide an object with a means to operate on that workload, whether it be to wait for it, to continue from it, or the like. The primary type of asynchronous workload supported by Task is the execution of a delegate, either an Action or a Func<T>, such that the delegate’s execution in the underlying scheduler is represented by the Task. But in any compositional system that wants to use Task as its centerpiece, just support for asynchronous delegate execution isn’t enough: support must be provided for other asynchronous operations as well. For example, there are a variety of asynchronous operations already implemented in the .NET Framework and exposed through the Asynchronous Programming Model (APM) pattern or the Event-Based Asynchronous Pattern (EAP). In both of these cases, we’d like to be able to refer to these asynchronous operations as Tasks and operate on them as Tasks, even though the underlying work isn’t necessarily being performed by scheduling and executing a delegate. (More to come on both of those in future posts.)
To support such a paradigm with Tasks, we need a way to retain the Task façade and the ability to refer to an arbitrary asynchronous operation as a Task, but to control the lifetime of that Task according to the rules of the underlying infrastructure that’s providing the asynchrony, and to do so in a manner that doesn’t cost significantly. This is the purpose of TaskCompletionSource<TResult>.
The TaskCompletionSource<TResult> type serves two related purposes, both alluded to by its name: it is a source for creating a task, and the source for that task’s completion. In essence, a TaskCompletionSource<TResult> acts as the producer for a Task<TResult> and its completion. You create a TaskCompletionSource<TResult> and hand out the underlying Task<TResult> it has created, accessible from its Task property. Unlike Tasks created by Task.Factory.StartNew, the Task handed out by TaskCompletionSource<TResult> does not have any scheduled delegate associated with it. Rather, TaskCompletionSource<TResult> provides methods that allow you as the developer to control the lifetime and completion of the associated Task. This includes SetResult, SetException, and SetCanceled, as well as TrySet* variants of each of those. (A Task may only be completed once, thus attempting to set a Task into a completed state when it’s already in a completed state is an error, and the Set* methods will throw. However, as we’re dealing with concurrency here, and there are some situations where races may be expected between multiple threads trying to resolve the completion source, the TrySet* variants return Booleans indicating success rather than throwing an exception.)
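The difference between the Set* and TrySet* methods is easiest to see on a completion source that has already been resolved. This is a minimal sketch; CompletionRaceDemo and Describe are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;

public static class CompletionRaceDemo
{
    public static string Describe()
    {
        var tcs = new TaskCompletionSource<int>();
        bool first = tcs.TrySetResult(1);   // true: this call completes the task
        bool second = tcs.TrySetResult(2);  // false: already completed, no throw
        bool setThrew = false;
        try { tcs.SetResult(3); }           // Set* on a completed task throws
        catch (InvalidOperationException) { setThrew = true; }
        return first + "," + second + "," + setThrew + "," + tcs.Task.Result;
    }
}
```

The task keeps the first result that was set; later attempts either report failure (TrySetResult) or throw (SetResult) and never overwrite it.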
As a simple example, imagine for a moment that you didn’t have Task.Factory.StartNew, and thus you needed a way to execute a Func<T> asynchronously and have a Task<T> to represent that operation. This could be done with a TaskCompletionSource<T> as follows:
public static Task<T> RunAsync<T>(Func<T> function)
{
    if (function == null) throw new ArgumentNullException("function");
    var tcs = new TaskCompletionSource<T>();
    ThreadPool.QueueUserWorkItem(_ =>
    {
        try
        {
            // Run the delegate and publish its result through the Task.
            T result = function();
            tcs.SetResult(result);
        }
        // An exception thrown by the delegate faults the Task instead.
        catch (Exception exc) { tcs.SetException(exc); }
    });
    return tcs.Task;
}
The operation is being performed asynchronously through a mechanism unknown to the TaskCompletionSource<T>. All it knows is that at some point, its SetResult or SetException method will be called to complete the Task<T> exposed through its Task property.
Note, too, that because Task<TResult> derives from Task, we can use the generic TaskCompletionSource<TResult> under the covers for methods that work in terms of Task rather than in terms of Task<TResult>. For example, consider the same RunAsync method just shown, but accepting an Action (which returns void) rather than a Func<T> (which returns T).
public static Task RunAsync(Action action)
{
    var tcs = new TaskCompletionSource<Object>();
    ThreadPool.QueueUserWorkItem(_ =>
    {
        try
        {
            action();
            tcs.SetResult(null);
        }
        catch (Exception exc) { tcs.SetException(exc); }
    });
    return tcs.Task;
}
Since we no longer care what the type of T is, I’ve defaulted to using Object. Then, when the Action is executed successfully, SetResult is still used to transition the Task into the RanToCompletion final state; however, since the actual result value is irrelevant, null is used. Finally, RunAsync returns Task rather than Task<Object>. Of course, the instantiated task’s type is still Task<Object>, but we need not refer to it as such, and the consumer of this method need not care about those implementation details.
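The same pattern applies when there is no delegate to run at all. As a sketch of that idea, the following helper returns a Task that completes when a timer fires. TimerTasks and CompleteAfter are hypothetical names chosen for this illustration, not library APIs, and the approach is just one possible way to do it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimerTasks
{
    // Hypothetical helper: returns a Task that completes after the given delay.
    // No delegate is scheduled for the Task itself; the timer callback completes it.
    public static Task CompleteAfter(int milliseconds)
    {
        var tcs = new TaskCompletionSource<object>();
        Timer timer = null;

        // Create the timer disabled so that 'timer' is assigned before it can fire.
        timer = new Timer(_ =>
        {
            timer.Dispose();
            tcs.TrySetResult(null);   // the result value is irrelevant, as above
        }, null, Timeout.Infinite, Timeout.Infinite);

        timer.Change(milliseconds, Timeout.Infinite);   // fire once after the delay
        return tcs.Task;                                // exposed as Task, not Task<object>
    }
}
```

A caller could then write TimerTasks.CompleteAfter(100).Wait(), or attach a continuation, without knowing that a timer rather than a scheduled delegate is behind the Task.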
In future posts, we’ll look at how TaskCompletionSource<TResult> is a staple in a developer’s toolbox, including the developers of the Task Parallel Library itself, where TaskCompletionSource<TResult> is used liberally internally.
As we’ve mentioned previously, the .NET ThreadPool has undergone some serious renovations in .NET 4, improvements on which the Task Parallel Library and PLINQ both rely. Erika Parsons and Eric Eilebrecht are the PM and developer on the CLR team for the ThreadPool, and they’re featured in a great new Channel9 video covering the .NET 4 ThreadPool and what’s new. Check it out.
Prior to the .NET Framework 2.0, unhandled exceptions were largely ignored by the runtime. For example, if a work item queued to the ThreadPool threw an exception that went unhandled by that work item, the ThreadPool would eat that exception and continue on its merry way. Similarly, if a finalizer running on the finalizer thread threw an exception, the system would eat the exception and continue on executing other finalizers.
That changed for .NET 2.0. Exceptions are meant to indicate a problem, and automatically eating unhandled exceptions often hides significant errors and reliability problems in an application. As such, for .NET 2.0, this unhandled exception behavior was changed by default to more closely match how exceptions are handled by Windows in general: if an exception goes unhandled, the process comes crashing down. While this may seem harsh, allowing the process to crash typically makes it easier to catch an underlying problem, but more importantly, it prevents the application from continuing to hobble along in a potentially very bad state, with corrupted data, etc. Thus, since .NET 2.0, if a work item running on the ThreadPool throws an unhandled exception, by default the process crashes.
There’s a notable exemption to this behavior, even after 2.0: the Asynchronous Programming Model (APM) pattern. With the APM pattern, work is started asynchronously with a BeginXx method, and at some point later the results of the work are retrieved with a corresponding EndXx method. If the asynchronous work throws an exception, that exception is then propagated out of the call to the EndXx method when the EndXx method is invoked. This, of course, counts on the EndXx method being invoked. If the developer makes a mistake such that End is never invoked, any exception that occurred as part of the asynchronous invocation will likely go unnoticed, as would have happened in more situations in .NET 1.x.
With Tasks in .NET 4.0, we face a situation similar to that of the APM pattern. A Task represents an asynchronous operation that may throw an unhandled exception, but unlike work items generated by ThreadPool.QueueUserWorkItem, a Task instance is used to later join with the asynchronous work. As such, any unhandled exceptions will be stored into the relevant Task instance and thrown any time that Task is waited on (they are also available through the Task’s Exception property). In fire-and-forget scenarios, where the developer has no intention of joining with the Task, or if the developer simply forgets to join with the Task, the exception may never be observed, and thus if we did nothing special, we’d be in a bad situation very much like that which the APM pattern faces with unhandled exceptions.
To address this, Tasks keep track of whether an unhandled exception has been “observed.” In this context, “observed” means that code has joined with the Task in some fashion in order to at least be made aware of the exception. This could be calling Wait/WaitAll on the Task. It could be checking the Task’s Exception property after the Task has completed. Or it could be using a Task<TResult>’s Result property. If a Task sees that its exception has been observed in some manner, life is good. If, however, all references to a Task are removed (making the Task available for garbage collection), and if its exception hasn’t yet been observed, the Task knows that its exception will never be observed. In such a case, the Task takes advantage of finalization, and uses a helper object to propagate the unhandled exception on the finalizer thread. With the behavior described earlier, that exception on the finalizer thread will go unhandled and invoke the default unhandled exception logic, which is to log the issue and crash the process.
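Two of the ways of observing an exception mentioned above can be sketched as follows. ObservationDemo and its method names are invented for this example; the Task members used (Wait, Exception, ContinueWith) are the library’s own.

```csharp
using System;
using System.Threading.Tasks;

public static class ObservationDemo
{
    // Starts a Task whose delegate throws.
    static Task StartFaultingTask()
    {
        Action work = () => { throw new InvalidOperationException("boom"); };
        return Task.Factory.StartNew(work);
    }

    // Observes the exception by waiting: Wait rethrows it inside an AggregateException.
    public static string ObserveByWaiting()
    {
        Task t = StartFaultingTask();
        try
        {
            t.Wait();
            return "no exception";
        }
        catch (AggregateException ae)
        {
            return ae.InnerException.Message;
        }
    }

    // Observes the exception by reading the Exception property after completion.
    public static string ObserveByProperty()
    {
        Task t = StartFaultingTask();
        t.ContinueWith(_ => { }).Wait();   // waits for t to finish without rethrowing
        return t.Exception.InnerException.Message;
    }
}
```

Both methods return "boom", and in both cases the Task has seen its exception observed.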
In this manner, we can have our cake and eat it, too. As with the APM, we make any exceptions that occurred asynchronously available later for the app to retrieve. And as with work items on the ThreadPool, if an exception goes unhandled, it will cause the process to be torn down.
Of course, there may still be situations where you do want a fire-and-forget task, but where you want to automatically “observe” any unhandled exception, either to log it or something similar, or just to prevent the process from crashing in a situation where you know the exception will be benign. In that case, you can take advantage of continuations to address your needs.
Consider a Task t. After creating the Task, I can run code like the following:
t.ContinueWith(c => { var ignored = c.Exception; },
    TaskContinuationOptions.OnlyOnFaulted |
    TaskContinuationOptions.ExecuteSynchronously |
    TaskContinuationOptions.DetachedFromParent);
This code creates a continuation off of Task t that will only be scheduled if Task t completes in a Faulted state, meaning that it completed due to an exception going unhandled. When this continuation runs, it will observe the Task’s exception, preventing it from getting finalized. You could even wrap this kind of logic up into an extension method, such as:
public static Task IgnoreExceptions(this Task task)
{
    task.ContinueWith(c => { var ignored = c.Exception; },
        TaskContinuationOptions.OnlyOnFaulted |
        TaskContinuationOptions.ExecuteSynchronously |
        TaskContinuationOptions.DetachedFromParent);
    return task;
}
With this in hand, any time I create a fire-and-forget Task where I want an unhandled exception to be ignored, I can simply tag on IgnoreExceptions, e.g. instead of:
var t = Task.Factory.StartNew(…);
I could write:
var t = Task.Factory.StartNew(…).IgnoreExceptions();
Of course, the exception handling logic in Task exists for good reasons, to prevent unhandled exceptions from going unnoticed, and as such it’s not a great idea to liberally sprinkle use of an IgnoreExceptions extension method like this. But in some situations, it can be quite useful.
There’s a corollary to IgnoreExceptions which may also be useful. One of the downsides to the logic we use to tear down the process is that it relies on finalization. Finalization isn’t guaranteed to occur in a timely fashion, so an exception may go unhandled and it may be some time before the app then crashes; some time is better than never, but it’s still not ideal. If you want a more timely crash, you could use an extension method like the following:
public static Task FailFastOnException(this Task task)
{
    task.ContinueWith(c => Environment.FailFast("Task faulted", c.Exception),
        TaskContinuationOptions.OnlyOnFaulted |
        TaskContinuationOptions.ExecuteSynchronously |
        TaskContinuationOptions.DetachedFromParent);
    return task;
}
Rather than ignoring the exception, this uses Environment.FailFast to immediately crash the process. In this manner, I could write code like the following:
var t = Task.Factory.StartNew(…).FailFastOnException();
and as soon as Task t faults, the process will crash. This could be beneficial when debugging, as you’ll know immediately when something goes wrong. There are lots of variations on this as well. Debugger.Break could be used to immediately break into an attached debugger, you could raise an event of your choosing, the exception could be logged to an application log file, and so forth. In this manner, continuations are quite powerful.
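One of those variations, handing the exception to a caller-supplied handler such as a logger, might look like the sketch below. TaskFaultExtensions and OnFaulted are hypothetical names, and this version passes only the OnlyOnFaulted and ExecuteSynchronously options.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskFaultExtensions
{
    // Hypothetical helper, not part of the library: if the Task faults, invokes
    // 'handler' with the Task's exception. Reading c.Exception also marks the
    // exception as observed.
    public static Task OnFaulted(this Task task, Action<AggregateException> handler)
    {
        if (task == null) throw new ArgumentNullException("task");
        if (handler == null) throw new ArgumentNullException("handler");

        task.ContinueWith(c => handler(c.Exception),
            TaskContinuationOptions.OnlyOnFaulted |
            TaskContinuationOptions.ExecuteSynchronously);
        return task;
    }
}
```

With this in place, a fire-and-forget Task could be created with a handler attached, for example one that writes the exception to the console with Console.Error.WriteLine.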
(For anyone interested in extending Parallel Extensions with extension methods like this, the Parallel Extensions Extras project in the Beta 1 samples available at http://code.msdn.microsoft.com/ParExtSamples includes a plethora of interesting and useful examples. Enjoy!)
We exert a good deal of effort ensuring that the APIs we provide are consistent within Parallel Extensions as well as with the rest of the .NET Framework. This is from many angles, including behavior and general design, but also naming. So when there are slight differences in naming, it raises questions.
One occurrence of such a slight naming difference is between the MaxDegreeOfParallelism property on ParallelOptions, used by Parallel.For/ForEach/Invoke, and the WithDegreeOfParallelism PLINQ extension method. Why isn’t it ParallelOptions.DegreeOfParallelism, or WithMaxDegreeOfParallelism?
The reason has to do with requirements imposed by the design of Parallel and PLINQ, the latter of which is a bit more restrictive in some ways. Parallel works using an under-the-covers concept we refer to as replicating tasks. The concept is that a loop will start with one task for processing the loop, but if more threads become available to assist in the processing, additional tasks will be created to run on those threads. This enables minimization of resource consumption, e.g. if most of the threads in the ThreadPool are busy processing other work, a parallel loop that starts executing will execute with a smaller number of threads, until such time as the threads doing the rest of that processing are freed up and can assist in the execution of the loop. Given this, it would be inaccurate to state that ParallelOptions enables the specification of a DegreeOfParallelism, because it’s really a maximum degree: the loop starts with a degree of 1, and may work its way up to any maximum that’s specified as resources become available.
PLINQ is different. Some important Standard Query Operators in PLINQ require communication between the threads involved in the processing of the query, including some that rely on a Barrier to enable threads to operate in lock-step. The PLINQ design requires that a specific number of threads be actively involved for the query to make any progress. Thus when you specify a DegreeOfParallelism for PLINQ, you’re specifying the actual number of threads that will be involved, rather than just a maximum.
It’s conceivable that in the future, Parallel could be augmented to guarantee a certain number of threads involved, at which point a DegreeOfParallelism could be added, and it’s conceivable that in the future, PLINQ could be augmented to support a varying number of threads over the lifetime of a query, at which point a WithMaxDegreeOfParallelism could be added. In the end, the names were chosen to better convey exactly what’s happening in the systems, so that developers can set their expectations correctly.
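For reference, here is how the two settings appear side by side in code. This is only a usage sketch: DegreeDemo and its methods are made-up names, and the value 2 is arbitrary.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class DegreeDemo
{
    // Parallel.For: the option is an upper bound on the parallelism used.
    public static long SumWithParallelFor(int[] data)
    {
        long total = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
        Parallel.For(0, data.Length, options, i =>
        {
            Interlocked.Add(ref total, data[i]);   // thread-safe accumulation
        });
        return total;
    }

    // PLINQ: the degree of parallelism is set on the query itself.
    public static long SumWithPlinq(int[] data)
    {
        return data.AsParallel()
                   .WithDegreeOfParallelism(2)
                   .Sum(x => (long)x);
    }
}
```

Both methods compute the same sum; the difference is only in how the degree of parallelism is expressed and, as described above, in what that number means to each system.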
Partitioning in PLINQ
Every PLINQ query that can be parallelized starts with the same step: partitioning. Some queries may even need to repartition in the middle. Partitioning is a fairly simple concept at the high level: PLINQ takes a lock on the input data source, breaks it into multiple pieces, and then distributes these to the available processing cores on the machine. Each of the cores will then process the data as appropriate and either merge, aggregate, or execute a function over the results from the partitions.
Example query: (from x in D.AsParallel() where p(x) select x*x*x).Sum();

Here’s a simple way to look at it. On a 4-core machine, take 4 million elements, divide them into 4 partitions of 1 million elements each, and give each of the 4 cores one partition to process. Assuming that the data and the processing of the data are uniform, that all of the cores operate with the same effectiveness, that nothing else is using the cores, and that we can access all of the elements directly rather than only being able to access element N after accessing element N-1 (i.e. indexing rather than iterating), this is an efficient algorithm for some straightforward types of queries. If you counted those assumptions, you can see that there are a lot of factors that PLINQ must take into account when processing a query. It’s not as simple as it appears to be, and some of the factors did not even show up as assumptions in that example. How is the data being ordered? Is there contention within the query itself? Do we know how much data is in the query? On and on, the considerations continue. Meanwhile, we’re trying to design a multi-purpose system that can handle any of an infinite number of query shapes that you can throw at it. (You might wonder how we test all of that. Ask us about SLUG someday.)
Needless to say, we need to take a look over the query and the data source to make these decisions. And we want to make these decisions fast, because the more time we spend making the decisions, the more performance we have wasted that could be spent processing the data. There are a lot of things to balance here as we try to decide what best to do.
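To make the example concrete, the sketch below shows the example query next to a hand-written version that splits the input into contiguous ranges, in the spirit of the 4-partition description above. It is an illustration only and not how PLINQ is implemented internally; CubeSumDemo, IsEven (standing in for the predicate p), and the method names are assumptions made for this example.

```csharp
using System.Linq;
using System.Threading.Tasks;

public static class CubeSumDemo
{
    // Stands in for the predicate p in the example query.
    static bool IsEven(long x) { return x % 2 == 0; }

    // The example query, with a long[] playing the role of D.
    public static long SumOfCubes(long[] d)
    {
        return (from x in d.AsParallel() where IsEven(x) select x * x * x).Sum();
    }

    // A naive manual equivalent: one contiguous range of indices per partition.
    public static long SumOfCubesByRanges(long[] d, int partitions)
    {
        long[] partial = new long[partitions];
        Parallel.For(0, partitions, p =>
        {
            // Compute this partition's [start, end) range with simple arithmetic.
            int start = (int)((long)d.Length * p / partitions);
            int end = (int)((long)d.Length * (p + 1) / partitions);

            long sum = 0;
            for (int i = start; i < end; i++)
            {
                if (IsEven(d[i])) sum += d[i] * d[i] * d[i];
            }
            partial[p] = sum;   // each partition writes only its own slot
        });
        return partial.Sum();   // merge step: combine the per-partition results
    }
}
```

The manual version bakes in every assumption listed above, such as uniform work per element and an indexable source, which is exactly what a general-purpose system cannot take for granted.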
Based on many factors, we have 4 primary algorithms that we use for partitioning alone. They’re worth getting to know, because we’ll talk more about them and tweaks that we make to them in future technology discussions.
1. Range Partitioning – This is a pretty common partitioning scheme, similar to the one that I described in the example above. This is amenable to many query shapes, though it only works with indexable data sources such as lists and arrays (i.e. IList<T> and T[]). If you give PLINQ something typed as IEnumerable or IEnumerable<T>, PLINQ will query for the IList<T> interface, and if it’s found, will use that interface implementation with range partitioning. The benefit of these data sources is that we know the exact length and can access any of the elements directly. For the majority of cases, there are large performance benefits to using this type of partitioning.

2. Chunk Partitioning – This is a general purpose partitioning scheme that works for any data source, and is the main partitioning type for non-indexable data sources. In this scheme, worker threads request data, and it is served up to the thread in chunks. IEnumerables and IEnumerable<T>s do not have fixed Count properties (there is a LINQ extension method for this, but that is not the same), so there’s no way to know when or if the data source will enumerate completely. It could be 3 elements, it could be 3 million elements, it could be infinite. A single system needs to take all of these possibilities into account and factor in different delegate sizes, uneven delegate sizes, selectivity, etc. The chunk partitioning algorithm is quite general, and PLINQ’s algorithm had to be tuned for good performance on a wide range of queries. We’ve experimented with many different growth patterns and currently use a plan in which the chunk size doubles after a certain number of requests. This is subject to change as we tune for performance, so don’t depend on this. Another important optimization is that chunk partitioning balances the load among cores, as the tasks per core dynamically request more work as needed. This ensures that all cores are utilized throughout the query and can all cross the finish line at the same time vs. a ragged, sequential entry to the end.

3. Striped Partitioning – This scheme is used for SkipWhile and TakeWhile and is optimized for processing items at the head of a data source (which obviously suits the needs of SkipWhile and TakeWhile). In striped partitioning, each of the n worker threads is allocated a small number of items (sometimes 1) from each block of n items. The set of items belonging to a single thread is called a ‘stripe’, hence the name. A useful feature of this scheme is that there is no inter-thread synchronization required as each worker thread can determine its data via simple arithmetic. This is really a special case of range partitioning and only works on arrays and types that implement IList<T>.

4. Hash Partitioning – Hash partitioning is a special type of partitioning that is used by the query operators that must compare data elements (these operators are: Join, GroupJoin, GroupBy, Distinct, Except, Union, Intersect). When hash-partitioning occurs (which is just prior to any of the operators mentioned), all of the data is processed and channeled to threads such that items with identical hash-codes will be handled by the same thread. This hash-partitioning work is costly, but it means that all the comparison work can then be performed without further synchronization. Hash partitioning assigns every element to an output partition based on a hash computed from each element’s key. This can be an efficient way of building a hash-table on the fly concurrently, and can be used to accomplish partitioning and hashing for the hash join algorithm. The benefit is that PLINQ can now use the same hash partitioning scheme for the data source used for probing; this way all possible matches end up in the same partition, meaning less shared data and smaller hash table sizes (each partition has its own hash table). There’s a lot going on with hash-partitioning, so it’s not as speedy as the other types, especially when ordering is involved in the query. As a result the query operators that rely upon it have additional overheads compared to simpler operators.
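The index arithmetic behind striped and hash partitioning can be sketched in a few lines. This is an illustration of the ideas described in the list above, not PLINQ’s internal code; PartitionSketch and its methods are hypothetical, and the striped version assumes a stripe width of one item.

```csharp
using System;
using System.Collections.Generic;

public static class PartitionSketch
{
    // Striped partitioning with a stripe width of 1: out of every block of
    // 'workerCount' items, worker w takes item w. Each worker can compute its
    // indices on its own, with no synchronization.
    public static List<int> StripeIndices(int itemCount, int workerCount, int worker)
    {
        var indices = new List<int>();
        for (int i = worker; i < itemCount; i += workerCount)
        {
            indices.Add(i);
        }
        return indices;
    }

    // Hash partitioning: equal keys have equal hash codes, so they always map
    // to the same partition index.
    public static int HashPartition<TKey>(TKey key, int partitionCount)
    {
        int hash = key == null ? 0 : key.GetHashCode();
        return (hash & 0x7FFFFFFF) % partitionCount;   // mask keeps the value non-negative
    }
}
```

For example, with 10 items and 4 workers, StripeIndices(10, 4, 1) yields the indices 1, 5, and 9, and two elements with the same key always receive the same HashPartition value.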

Hopefully that gives you a little better understanding of what goes on under the covers. As we write more about the system, including performance tips and tricks, here’s a basis for understanding how things work. If you can structure your applications to use specific types of partitioning, you may be able to receive greater performance gains by leveraging the more efficient partitioning algorithms.
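As a sketch of what structuring an application around a partitioning type could look like, the example below runs the same sum over an iterator, over an array, and over an array wrapped in a load-balancing partitioner. PartitioningChoices and its methods are invented names; which internal scheme PLINQ actually picks in each case is as described above, and the code makes no claim beyond showing the source shapes involved.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

public static class PartitioningChoices
{
    // An iterator: PLINQ sees only IEnumerable<int>, with no indexer or length.
    static IEnumerable<int> Numbers(int count)
    {
        for (int i = 0; i < count; i++) yield return i;
    }

    // Non-indexable source.
    public static long SumFromIterator(int count)
    {
        return Numbers(count).AsParallel().Sum(x => (long)x);
    }

    // Indexable source: materialize into an array first. This costs an extra
    // pass and extra memory, so it is a trade-off rather than a free win.
    public static long SumFromArray(int count)
    {
        int[] data = Numbers(count).ToArray();
        return data.AsParallel().Sum(x => (long)x);
    }

    // Indexable source with load balancing requested explicitly through
    // Partitioner.Create, for workloads where the cost per element is uneven.
    public static long SumLoadBalanced(int count)
    {
        int[] data = Numbers(count).ToArray();
        return Partitioner.Create(data, true).AsParallel().Sum(x => (long)x);
    }
}
```

All three methods return the same result. Whether one is faster than another depends on the data and the work done per element, so measuring is the only reliable guide.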