In part one, a work-stealing range was introduced that allows work items to be stolen in contiguous chunks of up to half of the available work space. Now it is time for the partitioner itself.
As you may recall, partitioning can be done either statically up front or dynamically on demand. Since we are looking at the case where the work space size is known, handing out chunks of work items dynamically on demand is one way to deal with uneven work distribution. However, work stealing is itself a load-balancing mechanism. So static partitioning is used for the initial workload distribution, and work stealing rebalances it as the computation progresses.
Now that the obvious stuff is out of the way, there are two important things to consider: how work stealing is done, and what the termination condition of the whole computation is.
Initial static partitioning feeds workers with contiguous chunks of work items to process. Each worker wraps the chunk it obtained in a work-stealing range, so that other workers can steal from it if needed. It then keeps taking one item at a time until the range is drained, potentially with help from other workers. At this point the worker has accumulated the number of items it processed, which is required for proper termination detection. The worker then returns the processed items (essentially marking them as processed) and tries to steal from others through the partition list, which serves as the coordinator of work stealing and termination detection. Stealing is attempted until it succeeds or termination is detected. On success, the worker wraps the stolen range and continues processing as from the beginning.
Static partitioning assumes that the work space size is known. The work space does not grow over time, so its size is the basis for termination detection. This is a deliberate decision: if growth is needed, it can be done in phases, where work items from the current phase produce work items for the next phase. Initially the remaining count is set to the work space size. Every worker that encounters a drained work-stealing range reports its processed-items count by decreasing the remaining count. Once the count reaches zero, processing must terminate.
Here is the work-stealing range partitioner in the flesh.
// Partitions the index range [fromInclusive, toExclusive) statically into one
// contiguous chunk per partition, then balances the load at run time by letting
// partitions that ran out of work steal chunks from the others.
class WorkStealingIndexRangePartitioner : Partitioner<int>
{
    private readonly int m_fromInclusive;
    private readonly int m_toExclusive;

    // Throws ArgumentException when the range is empty or inverted, and
    // ArgumentOutOfRangeException (an ArgumentException) when the range holds
    // more than int.MaxValue items, because item counts are tracked as int.
    public WorkStealingIndexRangePartitioner(int fromInclusive, int toExclusive)
    {
        if (fromInclusive >= toExclusive)
            throw new ArgumentException("fromInclusive must be less than toExclusive.", "toExclusive");
        if ((long)toExclusive - fromInclusive > int.MaxValue)
            throw new ArgumentOutOfRangeException("toExclusive", "The range must not contain more than int.MaxValue items.");

        m_fromInclusive = fromInclusive;
        m_toExclusive = toExclusive;
    }

    // Creates partitionCount enumerators over disjoint contiguous chunks that
    // together cover the whole range, wired through a shared coordinator.
    // Throws ArgumentOutOfRangeException when partitionCount is not positive.
    public override IList<IEnumerator<int>> GetPartitions(int partitionCount)
    {
        if (partitionCount <= 0)
            throw new ArgumentOutOfRangeException("partitionCount");

        // Cannot overflow: the constructor guarantees the size fits in an int
        var rangeSize = m_toExclusive - m_fromInclusive;

        // Every item must be assigned to exactly one partition. Otherwise the
        // remaining-items counter never reaches zero and MoveNext spins
        // forever. The first (rangeSize % partitionCount) partitions get one
        // extra item. With more partitions than items, the trailing partitions
        // are empty and live purely off stealing.
        var baseSize = rangeSize / partitionCount;
        var remainder = rangeSize % partitionCount;

        var partitionList = new PartitionList(rangeSize);
        var partitions = new Partition[partitionCount];

        // Even-sized static chunks are fine even for a non-uniform workload,
        // because work stealing balances it out later
        var from = m_fromInclusive;
        for (var i = 0; i < partitionCount; i++)
        {
            // Never exceeds m_toExclusive, so this cannot overflow
            var to = from + baseSize + (i < remainder ? 1 : 0);
            partitions[i] = new Partition(partitionList, from, to);
            from = to;
        }

        // Wire the partitions through the coordinator
        partitionList.SetPartitions(partitions);
        return partitions;
    }

    // Coordinator: lets drained partitions steal from the others and detects
    // termination by counting how many items of the whole range remain
    // unprocessed.
    class PartitionList
    {
        // All partitions that can be stolen from
        private List<Partition> m_partitions;
        // Items not yet reported as processed. Volatile because idle
        // partitions poll it while spinning.
        private volatile int m_remainingCount;

        public PartitionList(int count)
        {
            m_remainingCount = count;
        }

        public void SetPartitions(IEnumerable<Partition> partitions)
        {
            m_partitions = new List<Partition>(partitions);
        }

        // Reports count items as processed, moving toward termination
        public void Return(int count)
        {
            if (count != 0)
                Interlocked.Add(ref m_remainingCount, -count);
        }

        // Reports count items as processed, then keeps trying to steal a range
        // from another partition. Returns true with the stolen
        // (fromInclusive, toExclusive) range, or false with null once every
        // item of the whole range has been reported as processed.
        public bool TryReturnAndSteal(Partition to, int count, out Tuple<int, int> range)
        {
            Return(count);

            range = null;
            var spinner = new SpinWait();
            while (true)
            {
                foreach (var from in m_partitions)
                {
                    // Termination: everything has been processed
                    if (m_remainingCount <= 0)
                        return false;

                    // The requesting partition is drained, so skip it
                    if (from == to)
                        continue;

                    range = from.TrySteal();
                    if (range != null)
                        return true;
                }

                // Nothing was stealable this round (owners are still busy with
                // their unstealable parts), so back off instead of hot-spinning
                spinner.SpinOnce();
            }
        }
    }

    // Enumerator over one partition's items, stealing from other partitions
    // once its own range is drained
    class Partition : IEnumerator<int>
    {
        // Range of items currently owned. It is replaced after a successful
        // steal, and other partitions' threads read it through TrySteal.
        private volatile WorkStealingRange m_workStealingRange;
        // Items taken since the count was last reported to the coordinator
        private int m_localCount;
        // Coordinator that also controls the termination condition
        private readonly PartitionList m_list;
        // Current element, or null before the first MoveNext and after
        // MoveNext returned false
        private int? m_current;

        public Partition(PartitionList list, int fromInclusive, int toExclusive)
        {
            m_list = list;
            m_workStealingRange = new WorkStealingRange(fromInclusive, toExclusive);
        }

        public int Current
        {
            get
            {
                if (m_current.HasValue)
                    return m_current.Value;
                throw new InvalidOperationException();
            }
        }

        object IEnumerator.Current
        {
            get { return Current; }
        }

        // Called by other partitions: tries to steal from the local range
        public Tuple<int, int> TrySteal()
        {
            return m_workStealingRange.TryStealRange();
        }

        public bool MoveNext()
        {
            // First try to take the next item from the locally owned range
            var local = m_workStealingRange.TryTakeOne();
            if (local != null)
            {
                m_localCount++;
                m_current = local;
                return true;
            }

            // Local range is drained: report progress and steal from others
            Tuple<int, int> range;
            if (m_list.TryReturnAndSteal(this, m_localCount, out range))
            {
                var from = range.Item1;
                var to = range.Item2;
                // Keep the very first stolen element for ourselves. The earlier
                // count was already reported, so restart counting at 1.
                m_localCount = 1;
                m_current = from++;
                // Expose whatever is left so that others can steal it
                if (to - from > 0)
                    m_workStealingRange = new WorkStealingRange(from, to);
                return true;
            }

            // Termination reached. The count was already reported, so clear it
            // so that a repeated MoveNext or Dispose does not report it twice.
            m_localCount = 0;
            m_current = null;
            return false;
        }

        // If the enumerator is abandoned before draining, report the items it
        // processed. Its untaken items remain stealable by the other
        // partitions, so they can still reach the termination condition
        // instead of spinning forever.
        public void Dispose()
        {
            var count = m_localCount;
            m_localCount = 0;
            m_list.Return(count);
        }

        public void Reset()
        {
            throw new NotSupportedException();
        }
    }
}
Rough benchmarking shows that on random workload input it performs as well as the standard partitioners for indexed collections, while also providing good performance for the worst-case workload distribution.
Data parallelism is a parallelization technique in which data is separated into independent pieces that are distributed across parallel computing nodes. This technique is at the core of Parallel LINQ, which partitions data into segments and executes the query on each segment in parallel. Depending on the scenario and the expected workload distribution, different partitioning schemes can be employed (I highly recommend reading the linked post before going further). They can essentially be divided into two types: static partitioning, where the data is split into chunks up front, and dynamic partitioning, where chunks are handed out to workers on demand.
Both types have one thing in common: once a worker takes a chunk of work, it never gives it back until it is processed, so the worker has exclusive responsibility for it. With an uneven workload distribution this may lead to poor performance, because overall performance is that of the slowest worker. For example, suppose most of the heavy work is handed out to a single worker. The overall computation is then not finished until that unlucky worker has executed the heavy work sequentially, even though all the other workers have already finished.
To deal with an uneven workload distribution, work stealing can be used. Joe Duffy explores in great detail how to build a custom thread pool that uses work stealing to balance the workload among workers. That approach steals one work item at a time, which in most cases is sufficient. A work item usually produces other work items, so an idle worker that has processed a stolen item will likely have more work to do. Otherwise it can go and steal another work item, if there is one.
In data parallelism scenarios, stealing a chunk of work items in one shot may be more beneficial, because it avoids high synchronization costs. Work stealing also benefits from an initial work distribution, compared to handing all the work to a single worker and letting the others steal from it. Thus static partitioning combined with stealing chunks of work items is what we are looking for. Static partitioning assumes that the work space size is known, which is the case most of the time.
Parallel LINQ uses the partitioner concept to abstract the partitioning mechanism. Before developing a custom partitioner (this will be part two of the series), the work-stealing part must be in place:
As the work space is known in advance, it is represented by a range of integer values. So indexes, rather than the elements themselves, are the subject, as it is quite easy to map indexes to the actual elements. The owner of the range accesses it from the lower bound and takes one index at a time. The upper part of the range is represented by another range, called the steal range, which defines the bounds of the indexes that are eligible for stealing. Thieves contend for the steal range with each other. They also contend with the owner when the very last item is in the steal range. Essentially the work space can be viewed as [low .. [mid .. high)), where [mid .. high) is the steal range and [low .. high) is the overall work space.
Stealing is the fate of idle workers, so they bear the burden and let the owner stay unaware of their presence until they get too close:
The owner must be able to take one item at a time without heavy synchronization, as follows:
Now that the algorithm is in place, here is the implementation.
// A range of integer indexes [low, high) owned by one worker. The owner takes
// indexes one at a time from the low end. The upper part [mid, high), called
// the steal range, may be taken in one piece by other workers (thieves).
class WorkStealingRange
{
    // Current steal range as (mid, high). A range whose mid lies below m_low
    // has nothing left to steal.
    private volatile Tuple<int, int> m_stealRange;
    // Index the owner will take next
    private volatile int m_low;

    public WorkStealingRange(int low, int high)
    {
        m_low = low;
        m_stealRange = CreateStealRange(low, high);
    }

    // Called by thieves: tries to take the whole current steal range.
    // Returns it as (fromInclusive, toExclusive), or null if no work items are
    // left or another thread replaced the steal range first.
    public Tuple<int, int> TryStealRange()
    {
        // Contend for the steal range observed right now
        var oldRange = m_stealRange;
        var mid = oldRange.Item1;
        var low = m_low;

        // A steal range behind the owner's lower bound means no items are left
        if (low > mid)
            return null;

        // What remains after a successful steal: [low, mid) with its own
        // steal range
        var newRange = CreateStealRange(low, mid);

        // Contend with other thieves, and with the owner when the steal range
        // consists of the single last item
        if (Interlocked.CompareExchange(ref m_stealRange, newRange, oldRange) != oldRange)
            return null;

        return oldRange;
    }

    // Called by the owner: takes the next index, or returns null when the range
    // is drained (including when thieves took the last item).
    public int? TryTakeOne()
    {
        var low = m_low;
        // Reserve the item with a full-fence exchange so the store cannot be
        // reordered after the steal range read below
        Interlocked.Exchange(ref m_low, low + 1);

        // Now that the item is reserved, it is either unreachable by thieves
        // or it is the last one and lies in the steal range
        var oldRange = m_stealRange;
        var mid = oldRange.Item1;

        if (low < mid)
        {
            var high = oldRange.Item2;
            // If the local part is no longer at least twice the steal range,
            // shrink the steal range so that a thief cannot take more than
            // about half of what remains. Compared in 64-bit arithmetic so
            // huge ranges cannot overflow.
            if ((long)mid - low <= 2L * ((long)high - mid))
            {
                // Make the steal range about 1/4 of the remaining space. A
                // failed CAS is fine: the next steal or take fixes it up.
                var newRange = CreateStealRange(low, high);
                Interlocked.CompareExchange(ref m_stealRange, newRange, oldRange);
            }
            // The reserved item is below the steal range, so no thief can reach it
            return low;
        }

        if (low == mid)
        {
            // The steal range holds only the reserved item, so contend for it
            // with thieves. On success, leave a range that falls behind m_low,
            // which marks the range as drained.
            var newRange = CreateStealRange(low, low);
            if (Interlocked.CompareExchange(ref m_stealRange, newRange, oldRange) != oldRange)
                return null;
            return low;
        }

        // The last item was already stolen
        return null;
    }

    // Builds the steal range for the space [low, high). A non-empty space gets
    // its upper ceil((high - low) / 4) items. An empty space gets (low - 1, low - 1),
    // which lies behind low and therefore reads as "nothing left".
    private static Tuple<int, int> CreateStealRange(int low, int high)
    {
        if (low != high)
        {
            // Computed in 64-bit arithmetic by subtracting from high: the old
            // (low + 3 * high) / 4 overflowed for high above ~715M, and its
            // truncation toward zero mis-rounded negative indexes (e.g.
            // [-2, 0) produced the empty, not-behind range (0, 0)).
            // For non-negative, non-overflowing input the result is identical.
            var stealSize = ((long)high - low + 3) / 4;
            return new Tuple<int, int>((int)(high - stealSize), high);
        }

        return new Tuple<int, int>(low - 1, low - 1);
    }
}
Next time, the custom partitioner that uses the work-stealing range goes on the surgical table.