Recursion and Concurrency
When teaching recursion in an introductory computer science course, one of the most common examples involves a tree data structure. Trees are useful in this regard because they are simple and recursive in nature (a tree's children are also trees), and they allow for teaching different kinds of traversals (in-order, pre-order, post-order, level-order, and so forth). But when these introductory concepts meet multithreading, things are no longer so simple, at least not with the tools available in mainstream languages today like C#, C++, and Java. Consider a simple Tree data structure:
class Tree<T>
{
public Tree<T> Left, Right; // children
public T Data; // data for this node
}
Let's say we want to execute an Action<T> for each datum stored in the tree, and let's assume I don't care about order (introducing parallelism could be questionable if I did). That's straightforward to do sequentially and recursively:
public static void Process<T>(Tree<T> tree, Action<T> action)
{
if (tree == null) return;
// Process the current node, then the left, then the right
action(tree.Data);
Process(tree.Left, action);
Process(tree.Right, action);
}
I could also do so without recursion by maintaining an explicit stack (or queue, or some other data structure with different ordering guarantees):
public static void Process<T>(Tree<T> tree, Action<T> action)
{
if (tree == null) return;
var toExplore = new Stack<Tree<T>>();
// Start with the root node
toExplore.Push(tree);
while (toExplore.Count > 0)
{
// Grab the next node, process it, and push its children
var current = toExplore.Pop();
action(current.Data);
if (current.Left != null)
toExplore.Push(current.Left);
if (current.Right != null)
toExplore.Push(current.Right);
}
}
Now, let's assume the action we're performing on each node of the tree is independent and relatively expensive, and/or that the tree is relatively large. As such, we want to process the tree in parallel, meaning that we want multiple threads each running the action delegate on distinct tree nodes (we're of course also assuming that the action delegate is thread-safe). How do we do this with what we have in .NET today?
There are multiple approaches, some more valid than others. The first thing someone might try is to follow the original recursive implementation but using the ThreadPool, which could look something like this:
public static void Process<T>(Tree<T> tree, Action<T> action)
{
if (tree == null) return;
// Use an event to prevent this method from
// returning until its children have completed
using (var mre = new ManualResetEvent(false))
{
// Process the left child asynchronously
ThreadPool.QueueUserWorkItem(delegate
{
Process(tree.Left, action);
mre.Set();
});
// Process current node and right child synchronously
action(tree.Data);
Process(tree.Right, action);
// Wait for the left child
mre.WaitOne();
}
}
The idea behind this implementation is, given a node, to spin up a work item to process that node's left child in parallel with the current node, while the current thread processes the current node's data as well as its right child. Of course, I could be losing out on some parallelism here, as I delay processing of the right child until I'm done processing the current data. So we modify it slightly:
public static void Process<T>(Tree<T> tree, Action<T> action)
{
if (tree == null) return;
// Use an event to wait for the children
using (var mre = new ManualResetEvent(false))
{
int count = 2;
// Process the left child asynchronously
ThreadPool.QueueUserWorkItem(delegate
{
Process(tree.Left, action);
if (Interlocked.Decrement(ref count) == 0)
mre.Set();
});
// Process the right child asynchronously
ThreadPool.QueueUserWorkItem(delegate
{
Process(tree.Right, action);
if (Interlocked.Decrement(ref count) == 0)
mre.Set();
});
// Process the current node synchronously
action(tree.Data);
// Wait for the children
mre.WaitOne();
}
}
I've now fixed that issue, such that both the left and the right children could potentially be processed in parallel with the current node, but that was far from the worst problem. For starters, I'm creating a ManualResetEvent for every node in the tree... that's expensive. ManualResetEvent is a thin wrapper around a Win32 kernel event primitive, so creating one of these things requires kernel transitions, as does setting and waiting on one.
Next, every time I process a node, I block waiting for its children to complete. And as the processing of every node but the root is happening on a thread from the ThreadPool, I'm blocking ThreadPool threads. If a ThreadPool thread gets blocked, the ThreadPool will need to inject additional threads in order to process the remaining work items, and thus this implementation will require approximately one thread from the pool per node in the tree. That's a lot of threads! And that carries with it some serious problems. By default, a thread in .NET has a megabyte of stack space committed for it, so each thread burns a megabyte of (virtual) memory. The ThreadPool also throttles the creation of additional threads, such that introducing a new thread (once the number of pool threads equals the number of processors) will take 500 ms. For a tree of 250 nodes, that means its processing will take close to 2 minutes (roughly 250 x 0.5 seconds = 125 seconds), purely for the overhead of creating threads, never mind the actual processing of the nodes.
And worse, there is a maximum number of threads in the pool: by default 25 per processor in .NET 1.x/2.0 and 250 per processor in .NET 2.0 SP1. If the pool reaches the maximum, no new threads will be created, and thus this implementation could deadlock. Parent nodes will be waiting for their child nodes to complete, but the child nodes can't be processed until their parent nodes complete and relinquish a thread from the pool to execute.
Obviously, we need a better implementation. Next up, we can walk the tree sequentially, queuing up a work item in the pool for each node in the tree. Here we take that approach based on the recursive implementation of a tree walk:
public static void Process<T>(Tree<T> tree, Action<T> action)
{
if (tree == null) return;
// Use an event to wait for all of the nodes to complete
using (var mre = new ManualResetEvent(false))
{
int count = 1;
// Recursive delegate to walk the tree
Action<Tree<T>> processNode = null;
processNode = node =>
{
if (node == null) return;
// Asynchronously run the action on the current node
Interlocked.Increment(ref count);
ThreadPool.QueueUserWorkItem(delegate
{
action(node.Data);
if (Interlocked.Decrement(ref count) == 0)
mre.Set();
});
// Process the children
processNode(node.Left);
processNode(node.Right);
};
// Start off with the root node
processNode(tree);
// Signal that no more work items will be created
if (Interlocked.Decrement(ref count) == 0) mre.Set();
// Wait for all of the work to complete
mre.WaitOne();
}
}
This implementation is better from a resources perspective and won't lead to the same kind of deadlocks as in the previous example. We're still creating a work item for each node, but now the work item simply executes the action on the node's data asynchronously, whereas the tree walk itself is still happening sequentially on the main thread (taking advantage of a recursive delegate). The blocking behavior is similar to the previous examples, in that the call to Process won't return until all of the work has completed, though it is not identical, since the processing of child nodes doesn't block the parent. To get that behavior, a counter is incremented before each work item is created and is decremented when each work item finishes; only when that counter reaches 0 will the main thread continue. Note how we need to start the counter at 1 and then decrement it after all of the work items have been created... this is to ensure that a work item completing before all of the work has been kicked off doesn't set the event prematurely. We can take the same approach with a variant of the iterative implementation shown earlier:
public static void Process<T>(Tree<T> tree, Action<T> action)
{
if (tree == null) return;
// Use an event to wait for all of the nodes
// to complete
using (var mre = new ManualResetEvent(false))
{
int count = 1;
// Start with the root node
var toExplore = new Stack<Tree<T>>();
toExplore.Push(tree);
// Process all of the nodes
while (toExplore.Count > 0)
{
// Get the current node and push its children
var current = toExplore.Pop();
if (current.Left != null)
toExplore.Push(current.Left);
if (current.Right != null)
toExplore.Push(current.Right);
// Asynchronously process the data
Interlocked.Increment(ref count);
ThreadPool.QueueUserWorkItem(delegate
{
action(current.Data);
if (Interlocked.Decrement(ref count) == 0)
mre.Set();
});
}
// Signal that no more work items will be created
if (Interlocked.Decrement(ref count) == 0) mre.Set();
// Wait for all work items to be completed
mre.WaitOne();
}
}
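The start-at-1 counter plus event used in the last two listings is a general pattern. As a minimal sketch, and assuming .NET Framework 4 or later is available (it is not part of the .NET version discussed above), the same bookkeeping can be expressed with System.Threading.CountdownEvent, whose AddCount, Signal, and Wait methods stand in for the manual Interlocked calls and the ManualResetEvent. The class name CountdownTreeProcessor is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class Tree<T>
{
    public Tree<T> Left, Right; // children
    public T Data;              // data for this node
}

public static class CountdownTreeProcessor
{
    public static void Process<T>(Tree<T> tree, Action<T> action)
    {
        if (tree == null) return;

        // The initial count of 1 belongs to the walking thread, so the
        // event cannot become signaled while work is still being queued.
        using (var pending = new CountdownEvent(1))
        {
            var toExplore = new Stack<Tree<T>>();
            toExplore.Push(tree);
            while (toExplore.Count > 0)
            {
                Tree<T> current = toExplore.Pop();
                if (current.Left != null) toExplore.Push(current.Left);
                if (current.Right != null) toExplore.Push(current.Right);

                // One count per queued work item
                pending.AddCount();
                ThreadPool.QueueUserWorkItem(delegate
                {
                    action(current.Data);
                    pending.Signal();
                });
            }

            // Give up the walking thread's own count, then wait for the rest
            pending.Signal();
            pending.Wait();
        }
    }
}
```

As in the original listings, an exception thrown by the action on a pool thread is not handled here.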
These approaches work by queueing a work item per node in the tree. There are, of course, other approaches. Rather than adding a work item per node, we can create N work items, and have each work item process approximately 1/Nth of the nodes in the tree, where N is the number of threads. For example, we could store all of the nodes into a list and then split that list into N pieces and have a work item process each piece:
public static void Process<T>(Tree<T> tree, Action<T> action)
{
if (tree == null) return;
// Create a list of all nodes in the tree
var nodes = new List<Tree<T>>();
var toExplore = new Stack<Tree<T>>();
toExplore.Push(tree);
while (toExplore.Count > 0)
{
var current = toExplore.Pop();
nodes.Add(current);
if (current.Left != null)
toExplore.Push(current.Left);
if (current.Right != null)
toExplore.Push(current.Right);
}
// Divide the list up into chunks
int workItems = Environment.ProcessorCount;
int chunkSize = Math.Max(
nodes.Count / workItems, 1);
int count = workItems;
// Use an event to wait for all work items
using (var mre = new ManualResetEvent(false))
{
// Each work item processes appx 1/Nth of the data items
WaitCallback callback = state =>
{
int iteration = (int)state;
int from = chunkSize * iteration;
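// Clamp the upper bound so that a tree with fewer nodes than
// work items doesn't index past the end of the list
int to = iteration == workItems - 1 ?
nodes.Count : Math.Min(chunkSize * (iteration + 1), nodes.Count);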
while (from < to) action(nodes[from++].Data);
if (Interlocked.Decrement(ref count) == 0)
mre.Set();
};
// The ThreadPool is used to process all but one of the
// chunks; the current thread is used for that chunk,
// rather than just blocking.
for (int i = 0; i<workItems; i++)
{
if (i < workItems-1)
ThreadPool.QueueUserWorkItem(callback, i);
else
callback(i);
}
// Wait for all work to complete
mre.WaitOne();
}
}
Rather than statically dividing the work up, I could also rewrite the code such that each of the work items themselves is responsible for pulling work to handle:
public static void Process<T>(Tree<T> tree, Action<T> action)
{
if (tree == null) return;
// Get an enumerator for the tree
IEnumerator<T> enumerator =
GetNodes(tree).GetEnumerator();
int workItems = Environment.ProcessorCount;
int count = workItems;
// Use an event to wait for all work items to complete
using (var mre = new ManualResetEvent(false))
{
// Each work item will continually pull data from the
// enumerator and process it until there is no more data
// to process
WaitCallback callback = delegate
{
while (true)
{
T data;
lock (enumerator)
{
if (!enumerator.MoveNext()) break;
data = enumerator.Current;
}
action(data);
}
if (Interlocked.Decrement(ref count) == 0)
mre.Set();
};
// The ThreadPool is used to run all but one of the
// work items; the current thread runs the last one,
// rather than just blocking.
for (int i = 0; i < workItems; i++)
{
if (i < workItems-1)
ThreadPool.QueueUserWorkItem(callback, i);
else
callback(i);
}
// Wait for all work to complete
mre.WaitOne();
}
}
// An enumerator for a tree
public static IEnumerable<T> GetNodes<T>(Tree<T> tree)
{
if (tree != null)
{
yield return tree.Data;
foreach (var data in GetNodes(tree.Left))
yield return data;
foreach (var data in GetNodes(tree.Right))
yield return data;
}
}
Here I use a C# iterator to create an enumerator for all of the nodes in the tree. A work item is queued for each processor, and each of those work items retrieves an item from the enumerator, processes it, and goes back for more. Since multiple threads are accessing the enumerator (which isn't thread-safe), retrieving an item from it must be done under lock. Unfortunately, that's expensive; a better implementation would shift to grabbing more items from the enumerator so that the lock could be taken fewer times.
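To make that last point concrete, here is a minimal sketch of a batched variant, in which each work item pulls several items per lock acquisition. The batchSize parameter and its default of 16 are hypothetical tuning choices, not something measured, and the class name BatchedTreeProcessor is made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class Tree<T>
{
    public Tree<T> Left, Right; // children
    public T Data;              // data for this node
}

public static class BatchedTreeProcessor
{
    // batchSize is a hypothetical tuning knob for this sketch
    public static void Process<T>(
        Tree<T> tree, Action<T> action, int batchSize = 16)
    {
        if (tree == null) return;
        if (batchSize < 1)
            throw new ArgumentOutOfRangeException("batchSize");

        IEnumerator<T> enumerator = GetNodes(tree).GetEnumerator();
        int workItems = Environment.ProcessorCount;
        int count = workItems;

        using (var mre = new ManualResetEvent(false))
        {
            WaitCallback callback = delegate
            {
                var batch = new List<T>(batchSize);
                while (true)
                {
                    batch.Clear();
                    // Take the lock once per batch instead of once per item
                    lock (enumerator)
                    {
                        while (batch.Count < batchSize &&
                               enumerator.MoveNext())
                            batch.Add(enumerator.Current);
                    }
                    if (batch.Count == 0) break; // enumerator is exhausted

                    // Run the actions outside of the lock
                    foreach (T data in batch) action(data);
                }
                if (Interlocked.Decrement(ref count) == 0) mre.Set();
            };

            // Queue all but one work item; run the last one on this thread
            for (int i = 0; i < workItems - 1; i++)
                ThreadPool.QueueUserWorkItem(callback);
            callback(null);

            mre.WaitOne();
        }
    }

    public static IEnumerable<T> GetNodes<T>(Tree<T> tree)
    {
        if (tree != null)
        {
            yield return tree.Data;
            foreach (var data in GetNodes(tree.Left)) yield return data;
            foreach (var data in GetNodes(tree.Right)) yield return data;
        }
    }
}
```

Larger batches mean fewer lock acquisitions but coarser load balancing, since a thread that grabs the final batch may leave the others idle.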
There are still further possible approaches. For example, we could walk the tree until we get to nodes of a certain depth (such as the log of the number of cores) and then process all of the subtrees at that depth in parallel with each other.
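A minimal sketch of that depth-based split might look like the following. It assumes a cutoff of the smallest depth d for which 2^d is at least the number of cores, handles the nodes above the cutoff sequentially on the calling thread, and then queues one work item per subtree rooted at the cutoff. The names DepthSplitProcessor, Collect, and ProcessSequential are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class Tree<T>
{
    public Tree<T> Left, Right; // children
    public T Data;              // data for this node
}

public static class DepthSplitProcessor
{
    public static void Process<T>(Tree<T> tree, Action<T> action)
    {
        if (tree == null) return;

        // Smallest depth with at least one subtree per core
        // (assuming a reasonably balanced tree)
        int cutoff = 0;
        while ((1 << cutoff) < Environment.ProcessorCount) cutoff++;

        // Process nodes above the cutoff here; gather subtrees at the cutoff
        var subtrees = new List<Tree<T>>();
        Collect(tree, action, 0, cutoff, subtrees);
        if (subtrees.Count == 0) return;

        int count = subtrees.Count;
        using (var mre = new ManualResetEvent(false))
        {
            // Queue all but the last subtree to the pool
            for (int i = 0; i < subtrees.Count - 1; i++)
            {
                Tree<T> subtree = subtrees[i];
                ThreadPool.QueueUserWorkItem(delegate
                {
                    ProcessSequential(subtree, action);
                    if (Interlocked.Decrement(ref count) == 0) mre.Set();
                });
            }

            // Process the last subtree on the current thread
            ProcessSequential(subtrees[subtrees.Count - 1], action);
            if (Interlocked.Decrement(ref count) == 0) mre.Set();

            mre.WaitOne();
        }
    }

    private static void Collect<T>(Tree<T> node, Action<T> action,
        int depth, int cutoff, List<Tree<T>> subtrees)
    {
        if (node == null) return;
        if (depth == cutoff) { subtrees.Add(node); return; }
        action(node.Data);
        Collect(node.Left, action, depth + 1, cutoff, subtrees);
        Collect(node.Right, action, depth + 1, cutoff, subtrees);
    }

    private static void ProcessSequential<T>(Tree<T> node, Action<T> action)
    {
        if (node == null) return;
        action(node.Data);
        ProcessSequential(node.Left, action);
        ProcessSequential(node.Right, action);
    }
}
```

With an unbalanced tree the subtrees can differ wildly in size, so this split only balances the load well when the tree is roughly balanced.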
All of these implementations also have potential locality issues. It's reasonable to assume (though certainly not guaranteed) that nodes in the tree near to each other were created near to each other in time and are likely near to each other in memory (this assumption may be more applicable to other recursive problems, such as ones to be discussed shortly). For best cache performance, we'd like to try to have nodes near each other processed by the same thread and near to each other in time so that cache hits are optimized. But all of these solutions have been, in effect, randomly assigning nodes to threads.
One of the nice things about the tree walk example is that it's very simple; there are no pre- or post-processing steps necessary for a node, which means I don't actually need to block at the end of a parent in order to wait for its children as I did in some of the previous examples. The problem also allows me to start processing children of a node before I've finished processing its parent. Many recursive problems are not like that, however. Consider a typical recursive sort, like quick sort:
public static void Quicksort<T>(T[] arr, int left, int right)
where T : IComparable<T>
{
if (right > left)
{
int pivot = Partition(arr, left, right);
Quicksort(arr, left, pivot - 1);
Quicksort(arr, pivot + 1, right);
}
}
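The Partition helper isn't shown in the listing above. As a minimal sketch, assuming a Lomuto-style partition that uses the last element of the range as the pivot and returns the pivot's final index (one of several schemes that fit this Quicksort signature, and not necessarily the one originally intended), it could look like this. The class name QuicksortSketch is hypothetical.

```csharp
using System;

public static class QuicksortSketch
{
    public static void Quicksort<T>(T[] arr, int left, int right)
        where T : IComparable<T>
    {
        if (right > left)
        {
            int pivot = Partition(arr, left, right);
            Quicksort(arr, left, pivot - 1);
            Quicksort(arr, pivot + 1, right);
        }
    }

    // Hypothetical helper: Lomuto partition over arr[left..right], inclusive
    public static int Partition<T>(T[] arr, int left, int right)
        where T : IComparable<T>
    {
        T pivotValue = arr[right];
        int store = left; // next slot for an element smaller than the pivot

        for (int i = left; i < right; i++)
        {
            if (arr[i].CompareTo(pivotValue) < 0)
            {
                T tmp = arr[i]; arr[i] = arr[store]; arr[store] = tmp;
                store++;
            }
        }

        // Move the pivot between the smaller and the not-smaller elements
        T last = arr[right]; arr[right] = arr[store]; arr[store] = last;
        return store;
    }
}
```

Picking the last element as the pivot keeps the sketch short, but it degrades to quadratic time and deep recursion on already-sorted input.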
Order here definitely makes a difference. If we think of the Partition step like the execution of the Action in the tree walk example, I'm not able to do the Partition in parallel with the processing of the children, since the processing of the children is dependent on the outcome of the call to Partition. That's a pretty easy problem to work around, of course; I just wouldn't start the children executing asynchronously until after the Partition step is complete. But now consider a recursive sort like merge sort:
public static void Mergesort<T>(T[] arr, int left, int right)
where T : IComparable<T>
{
if (right > left)
{
int mid = (right + left) / 2;
Mergesort(arr, left, mid);
Mergesort(arr, mid + 1, right);
Merge(arr, left, mid + 1, right);
}
}
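The Merge helper is likewise not shown. A minimal sketch, assuming its arguments mean "merge the sorted run arr[left..rightStart-1] with the sorted run arr[rightStart..right]" (which matches the call Merge(arr, left, mid + 1, right) above) and using a temporary buffer, might be the following. The class name MergesortSketch and the parameter name rightStart are hypothetical.

```csharp
using System;

public static class MergesortSketch
{
    public static void Mergesort<T>(T[] arr, int left, int right)
        where T : IComparable<T>
    {
        if (right > left)
        {
            int mid = (right + left) / 2;
            Mergesort(arr, left, mid);
            Mergesort(arr, mid + 1, right);
            Merge(arr, left, mid + 1, right);
        }
    }

    // Hypothetical helper: merges two adjacent sorted runs in place,
    // going through a temporary buffer
    public static void Merge<T>(T[] arr, int left, int rightStart, int right)
        where T : IComparable<T>
    {
        var temp = new T[right - left + 1];
        int i = left, j = rightStart, k = 0;

        // Repeatedly take the smaller head element; ties go to the
        // left run, which keeps the sort stable
        while (i < rightStart && j <= right)
            temp[k++] = arr[i].CompareTo(arr[j]) <= 0 ? arr[i++] : arr[j++];

        // Copy whatever remains of either run
        while (i < rightStart) temp[k++] = arr[i++];
        while (j <= right) temp[k++] = arr[j++];

        Array.Copy(temp, 0, arr, left, temp.Length);
    }
}
```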
The merge step at a particular node must occur after the recursive calls, which means in a parallel implementation either we'd need to block waiting for the two recursive calls to complete before starting the call to Merge, or we'd have to use a form of continuation-passing style to join on the two recursive calls and have their completion start the execution of the merge operation. Either way, this kind of processing with the ThreadPool is incredibly tricky to implement efficiently.
One of our goals when designing Parallel Extensions to the .NET Framework was to make such recursive parallel operations much easier to implement, and much easier to implement efficiently.
Let's start with PLINQ and our tree walk. Taking the Tree<T> iterator we already implemented, processing the tree in parallel is now a cakewalk:
public static void Process<T>(Tree<T> tree, Action<T> action)
{
if (tree == null) return;
GetNodes(tree).AsParallel().ForAll(action);
}
Under the covers, this is very much like our implementation where we created several work items, and each of those work items pulled items (under lock) from the enumerator. PLINQ is more efficient, however, and uses varying chunk sizes (rather than always pulling just one element at a time) in an attempt to minimize the number of times the lock must be taken, hopefully reducing contention on the lock. As with that earlier implementation, of course, this is doing a sequential tree walk but running the actions in parallel. If we actually want to walk the tree in parallel, we can use the Task Parallel Library:
public static void Process<T>(Tree<T> tree, Action<T> action)
{
if (tree == null) return;
Parallel.Do(
() => action(tree.Data),
() => Process(tree.Left, action),
() => Process(tree.Right, action));
}
It's almost scary how simple that is. The Parallel.Do statement potentially runs the three provided operations in parallel, blocking until all three have completed. A really cool thing about this though is that it will attempt to reuse the current thread as much as possible, so we don't run into the same kinds of issues that we had with the ThreadPool where we were blocking a whole slew of ThreadPool threads. The underlying work-stealing nature of the implementation also goes a long way towards helping to address the cache locality issues we previously mentioned. If we want more control, we can dive down below the Parallel class and explicitly use Tasks:
public static void Process<T>(Tree<T> tree, Action<T> action)
{
if (tree == null) return;
var t1 = Task.Create(
delegate { Process(tree.Left, action); });
var t2 = Task.Create(
delegate { Process(tree.Right, action); });
action(tree.Data);
Task.WaitAll(new Task[] { t1, t2 });
}
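Parallel.Do and Task.Create are the names used in the CTP. For readers on the released .NET Framework 4 or later, the corresponding calls are Parallel.Invoke and Task.Factory.StartNew (or Task.Run from .NET Framework 4.5 on). A minimal sketch of the two tree walks above under that assumption follows; the class and method names are hypothetical.

```csharp
using System;
using System.Threading.Tasks;

public class Tree<T>
{
    public Tree<T> Left, Right; // children
    public T Data;              // data for this node
}

public static class ReleasedApiTreeProcessor
{
    // Counterpart of the Parallel.Do version
    public static void ProcessWithInvoke<T>(Tree<T> tree, Action<T> action)
    {
        if (tree == null) return;
        Parallel.Invoke(
            () => action(tree.Data),
            () => ProcessWithInvoke(tree.Left, action),
            () => ProcessWithInvoke(tree.Right, action));
    }

    // Counterpart of the explicit Task version
    public static void ProcessWithTasks<T>(Tree<T> tree, Action<T> action)
    {
        if (tree == null) return;
        Task t1 = Task.Run(() => ProcessWithTasks(tree.Left, action));
        Task t2 = Task.Run(() => ProcessWithTasks(tree.Right, action));
        action(tree.Data);
        Task.WaitAll(t1, t2);
    }
}
```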
The same techniques can apply to the quicksort implementation:
public static void Quicksort<T>(T[] arr, int left, int right)
where T : IComparable<T>
{
if (right > left)
{
int pivot = Partition(arr, left, right);
Parallel.Do(
() => Quicksort(arr, left, pivot - 1),
() => Quicksort(arr, pivot + 1, right));
}
}
as well as to the mergesort implementation:
public static void Mergesort<T>(
T[] arr, int left, int right) where T : IComparable<T>
{
if (right > left)
{
int mid = (right + left) / 2;
Parallel.Do(
() => Mergesort(arr, left, mid),
() => Mergesort(arr, mid + 1, right));
Merge(arr, left, mid + 1, right);
}
}
I love how simple the parallelization of this code becomes. However, this code still has some issues. There is necessary overhead to an operation like Parallel.Do, which under the covers creates Tasks for the individual operations and waits on those Tasks; that overhead may not be a big deal if the work being done is significant, but for fine-grain concurrency that overhead can end up dominating the computation, potentially causing the parallel implementation to run slower than the sequential implementation. For now, the best way to work around that is to complicate the code slightly with thresholds. The idea behind a threshold is that you introduce enough parallelism to keep the system saturated, but once enough has been expressed, you switch over to a sequential implementation so as to avoid the extra overhead. With the tree walk, using a depth as a threshold might look like this:
private static void Process<T>(
Tree<T> tree, Action<T> action, int depth)
{
if (tree == null) return;
if (depth > 5)
{
action(tree.Data);
Process(tree.Left, action, depth + 1);
Process(tree.Right, action, depth + 1);
}
else
{
Parallel.Do(
() => action(tree.Data),
() => Process(tree.Left, action, depth + 1),
() => Process(tree.Right, action, depth + 1));
}
}
I have both a sequential and a parallel implementation here, and I switch from one to the other based on how deep in the tree I am. Depth may not always be the right gating factor to use as a threshold, especially if the tree isn't balanced. We're working through ways to reduce the overhead we currently see for these operations in the early CTP we released in December, and we're testing out some approaches that may eliminate the need to use thresholds in certain cases. For now, though, as you're playing around with the bits, keep thresholds in mind. The same technique could be used with both sorting examples, as well.
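For the sorts, the size of the range is a more natural gating factor than depth. Here is a minimal sketch for quicksort, assuming the released Parallel.Invoke in place of the CTP's Parallel.Do, a hypothetical cutoff of 2048 elements that would need to be tuned by measurement, and a hypothetical Lomuto-style Partition helper.

```csharp
using System;
using System.Threading.Tasks;

public static class ThresholdQuicksort
{
    // Hypothetical cutoff; the right value depends on the element type,
    // the comparison cost, and the machine
    private const int SequentialThreshold = 2048;

    public static void Quicksort<T>(T[] arr, int left, int right)
        where T : IComparable<T>
    {
        if (right <= left) return;

        int pivot = Partition(arr, left, right);
        if (right - left < SequentialThreshold)
        {
            // Small range: the parallel overhead would likely dominate
            Quicksort(arr, left, pivot - 1);
            Quicksort(arr, pivot + 1, right);
        }
        else
        {
            // Large range: sort the two halves potentially in parallel
            Parallel.Invoke(
                () => Quicksort(arr, left, pivot - 1),
                () => Quicksort(arr, pivot + 1, right));
        }
    }

    // Hypothetical helper: Lomuto partition with the last element as pivot
    private static int Partition<T>(T[] arr, int left, int right)
        where T : IComparable<T>
    {
        T pivotValue = arr[right];
        int store = left;
        for (int i = left; i < right; i++)
        {
            if (arr[i].CompareTo(pivotValue) < 0)
            {
                T tmp = arr[i]; arr[i] = arr[store]; arr[store] = tmp;
                store++;
            }
        }
        T last = arr[right]; arr[right] = arr[store]; arr[store] = last;
        return store;
    }
}
```

Unlike a depth threshold, a size threshold adapts to uneven partitions: a lopsided split still sends the large side down the parallel path and the small side down the sequential one.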
All in all, parallelism is a very interesting problem when it comes to recursion, and we hope the Parallel Extensions to the .NET Framework go a long way towards making the merging of these two concepts significantly easier.