A while ago I was wrapping my head around parallel merge sort. Since it requires additional O(n) space in practice it is the best choice if available memory is not constrained. Otherwise it is better to consider quick sort. As in merge sort in quick sort we have two phases:
Second phase is naturally parallelized using task parallelism since partitions are independent (partition elements will remain inside partition boundaries when the sort of the whole array is finished). You can find an example of this behavior in parallel quick sort. It is a good start. But the first phase is still contributes O(n) at each recursion level. By parallelizing partition phase we can further speed up quick sort.
Sequential version of partition phase is pretty straightforward.
class PartitionHelper<T> { private readonly T[] m_arr; private readonly IComparer<T> m_comparer; public PartitionHelper(T[] arr, IComparer<T> comparer) { m_arr = arr; m_comparer = comparer; } // Moves elements within range around pivot sequentially and // returns position of the first element equal to the pivot. public int SequentialPartition(T pivot, int from, int to) { var j = from; for (var i = from; i < to; i++) { if (m_comparer.Compare(m_arr[i], pivot) < 0) { SwapElements(i, j++); } } return j; } private void SwapElements(int from, int to) { var tmp = m_arr[from]; m_arr[from] = m_arr[to]; m_arr[to] = tmp; } ...
An interesting point is that we do not know in advance how the partitioning will be done since it is data dependent (position of an element depends on other elements). Still independent pieces can be carved out. Here is the core idea.
Let’s assume an array that looks like below where x denotes some element, p denotes selected pivot, e is equal, l is less and g is greater elements than pivot.
e l l (g l g e e l) x x x x x x x x x (l l g g e l) g e g p left right
e l l (g l g e e l) x x x x x x x x x (l l g g e l) g e g p
left right
Let’s assume we selected two blocks of elements within array called left (containing elements g l g e e l) such that all elements before are less or equal to the pivot and right (that holds elements l l g g e l) such that all elements after it are greater or equal to the pivot from left and right ends respectively. After the partitioning against pivot is done left block will hold elements less than or equal to the pivot and right block will contain elements greater that or equal to the pivot. In our example left block contains two g elements that do not belong there and right block holds three l elements that must not be there. But this means that we can swap two l elements from right block with two g elements from left block and left block will comply with partitioning against pivot.
e l l (l l l e e l) x x x x x x x x x (g g g g e l) g e g p left right
e l l (l l l e e l) x x x x x x x x x (g g g g e l) g e g p
Overall after blocks rearrange operation at least one of them contains correct elements (if the number of elements to be moved in each block is not equal).
... // Enum that indicates which of the blocks are in // in place after arrangment. private enum InPlace { Left, Right, Both } // Tries to rearranges elements of the two blocks such that // right block contains elements greater or equal to the // pivot and/or left block contains elements less than or // equal to the pivot. At least one of the blocks is // correctly reaaranged. private InPlace ArrangeBlocks(T pivot, ref int leftFrom, int leftTo, ref int rightFrom, int rightTo) { while (leftFrom < leftTo && rightFrom < rightTo) { while (m_comparer.Compare(m_arr[leftFrom], pivot) <= 0 && ++leftFrom < leftTo) { } while (m_comparer.Compare(m_arr[rightFrom], pivot) >= 0 && ++rightFrom < rightTo) { } if (leftFrom == leftTo || rightFrom == rightTo) { break; } SwapElements(leftFrom++, rightFrom++); } if (leftFrom == leftTo && rightFrom == rightTo) { return InPlace.Both; } if (leftFrom == leftTo) { return InPlace.Left; } return InPlace.Right; } ...
Then we can select next left block try to do the same. Repeat it until blocks meet piece by piece making left and right parts of the array as partitioning wants it to be. Sequential block based algorithm looks like this:
Interesting bit is that pairs of blocks can be independently rearranged. Workers can pick blocks concurrently from corresponding ends of array and in parallel rearrange elements.
A block once taken by a worker should not be accessible by other workers. When no more blocks left worker must stop. Basically we have two counters (number of blocks taken from left and right ends of the array). In order to take a block we must atomically increment corresponding counter and check that sum of the two counters is less than equal to total number of blocks otherwise all blocks are exhausted and worker must stop. Doing under a lock is simple and acceptable for large arrays and blocks but inefficient for small arrays and blocks.
We will pack two counters into a single 32 bit value where lower 16 bits are for right blocks counter and higher 16 bits are for left blocks. To increment right and left blocks counters 1 and 1<<16 must be added to combined value respectively. Atomically updated combined value allows to extract individual counters and make decision on whether block was successfully taken or not.
Since each worker may attempt to race for the last not taken block care should be taken of overflow. So only 15 bits are used for each counter and so it will require 1<<15 workers to cause overflow that is not realistic.
... // Class that maintains taken blocks in a thread-safe way. private class BlockCounter { private const int c_minBlockSize = 1024; private readonly int m_blockCount; private readonly int m_blockSize; private int m_counter; private const int c_leftBlock = 1 << 16; private const int c_rightBlock = 1; private const int c_lowWordMask = 0x0000FFFF; public BlockCounter(int size) { // Compute block size given that we have only 15 bits // to hold block count. m_blockSize = Math.Max(size/Int16.MaxValue, c_minBlockSize); m_blockCount = size/m_blockSize; } // Gets selected block size based on total number of // elements and minimum block size. public int BlockSize { get { return m_blockSize; } } // Gets total number of blocks that is equal to the // total number devided evenly by the block size. public int BlockCount { get { return m_blockCount; } } // Takes a block from left end and returns a value which // indicates whether taken block is valid since due to // races a block that is beyond allowed range can be // taken. public bool TakeLeftBlock(out int left) { int ignore; return TakeBlock(c_leftBlock, out left, out ignore); } // Takes a block from ringt end and returns its validity. public bool TakeRightBlock(out int right) { int ignore; return TakeBlock(c_rightBlock, out ignore, out right); } // Atomically takes a block either from left or right end // by incrementing higher or lower word of a single // double word and checks that the sum of taken blocks // so far is still within allowed limit. private bool TakeBlock(int block, out int left, out int right) { var counter = unchecked((uint) Interlocked.Add(ref m_counter, block)); // Extract number of taken blocks from left and right // ends. left = (int) (counter >> 16); right = (int) (counter & c_lowWordMask); // Check that the sum of taken blocks is within // allowed range and decrement them to represent // most recently taken blocks indices. return left-- + right-- <= m_blockCount; } } ...
With multiple workers rearranging pairs of blocks we may end up with “wholes”.
(l l e) (l g l) (l e e) (l l l) (g g e) (e l l) x x x (g g e) (l g e) (e e g)
l0 l1 l2 l3 l4 l5 r0 r1 r2
In the example above blocks l1, l4 and r1 are the wholes in left and right partitions of the array meaning they were not completely rearranged. We must compact left and right partitions such that they contain no wholes.
(l l e) (e l l) (l e e) (l l l) (g g e) (l g l) x x x (l g e) (g g e) (e e g)
l0 l5 l2 l3 l4 l1 r1 r0 r2
Now we can do sequential partitioning of range between the end of the left most rearranged block (l3) and beginning of the right most rearranged block (r0).
... // A threshold of range size below which parallel partition // will switch to sequential implementation otherwise // parallelization will not be justified. private const int c_sequentialThreshold = 8192; // Moves elements within range around pivot in parallel and // returns position of the first element equal to the pivot. public int ParallelPartition(T pivot, int from, int to) { var size = to - from; // If range is too narrow resort to sequential // partitioning. if (size < c_sequentialThreshold) { return SequentialPartition(pivot, from, to); } var counter = new BlockCounter(size); var blockCount = counter.BlockCount; var blockSize = counter.BlockSize; // Workers will process pairs of blocks and so number // of workers should be less than half the number of // blocks. var workerCount = Math.Min(Environment.ProcessorCount, blockCount / 2); // After the worker is done it must report blocks that // were not rearranged var leftRemaining = AllocateRemainingArray(workerCount); var rightRemaining = AllocateRemainingArray(workerCount); // and left most and right most rearranged blocks. var leftMostBlocks = AllocateMostArray(workerCount); var rightMostBlocks = AllocateMostArray(workerCount); Action<int> worker = index => { int localLeftMost = -1, localRightMost = -1; var leftBlock = localLeftMost; var rightBlock = localRightMost; int leftFrom = 0, leftTo = 0; int rightFrom = 0, rightTo = 0; var result = InPlace.Both; // Until all blocks are exhausted try to rearrange while (true) { // Depending on the previous step one or two // blocks must taken. if (result == InPlace.Left || result == InPlace.Both) { // Left or both blocks wre successfully // rearranged so we need to update left most // block. localLeftMost = leftBlock; // and try to take block from the left end. if (!counter.TakeLeftBlock(out leftBlock)) { break; } leftFrom = from + leftBlock*blockSize; leftTo = leftFrom + blockSize; } if (result == InPlace.Right || result == InPlace.Both) { // Right or both blocks were successfully // rearranged update right most and take new // right block. localRightMost = rightBlock; if (!counter.TakeRightBlock(out rightBlock)){ break; } rightTo = to - rightBlock*blockSize; rightFrom = rightTo - blockSize; } // Try to rearrange elements of the two blocks // such that elements of the right block are // greater or equal to pivot and left block // contains elements less than or equal to pivot. result = ArrangeBlocks(pivot, ref leftFrom, leftTo, ref rightFrom, rightTo); // At least one of the blocks is correctly // rearranged and if we are lucky - two of them. } // If the end of right block was not rearranged mark // it as remaining to be arranged. if (rightFrom != rightTo) { rightRemaining[index] = rightBlock; } // Same for the left block. if (leftFrom != leftTo) { leftRemaining[index] = leftBlock; } // Update worker local left most and right most // arranged blocks. leftMostBlocks[index] = localLeftMost; rightMostBlocks[index] = localRightMost; }; Parallel.For(0, workerCount, worker); // Compact arranged blocks from both ends so that all non // arranged blocks lie consecutively between arranged // left and right blocks. var leftMostBlock = ArrangeRemainingBlocks(from, blockSize, leftRemaining, leftMostBlocks.Max(), 1); var rightMostBlock = ArrangeRemainingBlocks(to - blockSize, blockSize, rightRemaining, rightMostBlocks.Max(), -1); // Do sequential partitioning of the inner most area. return SequentialPartition(pivot, from + (leftMostBlock + 1) * blockSize, to - (rightMostBlock + 1)*blockSize); } // Moves rearranged blocks to cover holes such that all // rearranged blocks are consecutive. Basically it does // compaction and returns most rearranged block. private int ArrangeRemainingBlocks(int bound, int blockSize, int[] remaining, int mostBlock, int sign) { Array.Sort(remaining); var j = Array.FindLastIndex(remaining, b => b < mostBlock); for (var i = 0; i < remaining.Length && remaining[i] <= mostBlock;) { if (remaining[j] == mostBlock) { j--; } else { SwapBlocks(bound + sign * remaining[i] * blockSize, bound + sign * mostBlock * blockSize, blockSize); i++; } mostBlock--; } return mostBlock; } private static int[] AllocateRemainingArray(int workerCount) { return Enumerable.Repeat(Int32.MaxValue, workerCount).ToArray(); } private static int[] AllocateMostArray(int workerCount) { return Enumerable.Repeat(-1, workerCount).ToArray(); } // Swaps two blocks private void SwapBlocks(int from, int to, int blockSize) { for (var i = 0; i < blockSize; i++) { SwapElements(from + i, to + i); } } }
Now we have parallel implementation of the quick sort partition phase. Experiments with random generated arrays of integer values show that it helps to speed up parallel quick sort by approximately 50% on a 8 way machine.
In part one work stealing range was introduced that allows stealing of work items in contiguous chunks up to half of available work space. Now is the time for the partitioner itself.
If you recall partitioning can be done either statically up front or dynamically on demand. As we are looking at the case of known work space size handing out chunk of work items on demand dynamically is the way to deal with uneven work distribution. Since work stealing is a load balancing mechanism itself static partitioning for initial workload distribution and as computation goes work stealing is used to balance it.
Now as the obvious stuff is out of the way there two important things to consider: how work stealing is done and what is the termination condition of the whole computation.
Initial static partitioning feeds workers with contiguous chunks of work items to process. Each worker wraps obtained chunk into work stealing range so that other workers if needed can steal from it and continues to take one item at a time until work stealing range is drained potentially with help from other workers. At this point worker accumulated number of processed items as it is required to do the proper termination detection. Worker tries to steal from others while returning back processed items (or basically marking them as processed) through partition list that serves as a coordinator of work stealing and termination detection. Stealing is attempted until succeeded or termination is detected. Upon success worker wraps stolen range and continues its processing as from the beginning.
Static partitioning assumes work space size knowledge. As work space doesn't grow over time (deliberate decision and if grow is needed it can be done in phases where work items from current phase result in work items for the next phase) it is the basis for termination detection. Initially work space size is set and every worker that encounters drained work stealing range returns back processed items count by decreasing remaining count. Once count reaches zero the processing must be terminated.
Here is work stealing range partitioner in the flesh.
class WorkStealingIndexRangePartitioner : Partitioner<int> { private readonly int m_fromInclusive; private readonly int m_toExclusive; public WorkStealingIndexRangePartitioner(int fromInclusive, int toExclusive) { if (fromInclusive >= toExclusive) throw new ArgumentException(); m_fromInclusive = fromInclusive; m_toExclusive = toExclusive; } public override IList<IEnumerator<int>> GetPartitions(int partitionCount) { if (partitionCount <= 0) throw new ArgumentException(); // Calculate range and partition size var rangeSize = m_toExclusive - m_fromInclusive; var partitionSize = Math.Max(1, rangeSize / partitionCount); var partitionList = new PartitionList(rangeSize); var partitions = new Partition[partitionCount]; // Create partitiions by statically diving work items // into even sized chunks which is ok even in case // non uniform workload distribution as it will be // balanced out through work stealing var from = m_fromInclusive; for (var i = 0; i < partitionCount; i++) { var to = Math.Min(from + partitionSize, m_toExclusive); partitions[i] = new Partition(partitionList, from, to); from = to; } // Wire them through a coordinator partitionList.SetPartitions(partitions); return partitions; } // Partitioning coordinator class PartitionList { // Holds list of available partitions private List<Partition> m_partitions; // Holds number of remaining items to process private int m_remainingCount; public PartitionList(int count) { m_remainingCount = count; } public void SetPartitions(IEnumerable<Partition> partitions) { m_partitions = new List<Partition>(partitions); } // Return number of items as processed and tries to steal // new work items from other partitions public bool TryReturnAndSteal(Partition to, int count, out Tuple<int, int> range) { // Move toward termination condition Interlocked.Add(ref m_remainingCount, -count); // Until either termination condition is // reached or successful steal attempt range = null; while (true) { // Enumerate through available partitions and try // to steal from them foreach (var from in m_partitions) { // Check if nothing to steal if (m_remainingCount <= 0) return false; // Skip requesting partition as it is empty if (from == to) continue; range = from.TrySteal(); // Keep trying to steal from others if // unsuccessful if (range != null) return true; } } } } // Work stealing partition class Partition : IEnumerator<int> { // Holds range items currently owned private WorkStealingRange m_workStealingRange; // Holds number of processed items private int m_localCount; // Holds reference to partitioning coordinator that // controls in addition termination condition private readonly PartitionList m_list; // Holds current partition element or null if move next // was called or returned false private int? m_current; public Partition(PartitionList list, int fromInclusive, int toExclusive) { m_list = list; m_workStealingRange = new WorkStealingRange(fromInclusive, toExclusive); } public int Current { get { if (m_current != null) return (int)m_current; throw new InvalidOperationException(); } } object IEnumerator.Current { get { return Current; } } // Tries to steal from local steal range public Tuple<int, int> TrySteal() { return m_workStealingRange.TryStealRange(); } public bool MoveNext() { // First try to take item local from available range var local = m_workStealingRange.TryTakeOne(); if (local != null) { // Mark item as processed m_localCount++; // and set up current item m_current = local; return true; } // Otherwise try to steal from others Tuple<int, int> range; if (m_list.TryReturnAndSteal(this, m_localCount, out range)) { // Stolen something var from = range.Item1; var to = range.Item2; // Keep very first element to yourself m_localCount = 1; m_current = from++; // If anything left expose it to allow others // to steal if (to - from > 0) m_workStealingRange = new WorkStealingRange(from, to); return true; } // Termination condition reached so nothing to steal // from others return false; } public void Dispose() { } public void Reset() { throw new NotSupportedException(); } } }
Rough benchmarking shows that on random workload input it performs as well as standard partitioners for indexed collections while providing good performance for worst case scenario in workload distribution.
Data parallelism is a computing parallelization technique where data can be separated into independent pieces and distributed across parallel computing nodes. This technique is the core of the Parallel LINQ that partitions data into segments and executes query on each segment in parallel. Depending on scenarios and expected workload distribution different partitioning schemes (I highly recommend to read linked post before reading further) can be employed that can be essentially divided into two types:
Both types has one thing in common which is once chunk of work is taken by worker it is never given back until processed meaning worker has exclusive responsibility for it. In case of uneven workload distribution it may lead to poor performance which is the performance of the slowest worker. For example, when most of the heavy work is handed out to a single worker even though all other workers are finished in parallel the overall computation is not finished until unlucky worker will finish executing heavy work sequentially.
In order to deal with uneven workload distribution work stealing can be used. Joe Duffy explores in great details of how to build custom thread pool that uses work stealing to balance workload among workers. The approach allows to steal one work item which is in most cases sufficient as work item usually produces other work items and so chances are that idle worker once processed stolen work item will have more work to do (otherwise it can go steal other work item if any).
In data parallelism scenarios stealing a chunk of work items in one shot may be more beneficial (to avoid high synchronization costs). Work stealing benefits from initial work distribution compared to having all work handed over to a single worker and let the others to steal from it. Thus static partitioning with work stealing of chunks of work items is what we are looking for. Static partitioning assumes work space size knowledge which in most cases there.
Parallel LINQ uses partitioner concept to abstract partitioning mechanism. Before developing custom partitioner (this will be part two of the series) work stealing part must be in place:
As work space is known in advance it is represented through a range of integer values. So basically indexes will be the subject rather than elements themselves as it quite easy to map to the actual elements. Range will be accessed from lower bound by the owner of the range and every time will try to take one index. Higher bound of the range is represented basically by other range called steal range. It defines bounds of indexes that are eligible for stealing. Thieves will contend for the steal range with each other and with owner in case the very last item is in the steal range. Essentially work space can be looked at as [low .. [mid .. high)) where [mid .. high) is a steal range and [low .. high) is overall work space.
Stealing is the fate of idle workers and thus they take the burden letting owner be ignorant of their presence until they are too close:
Owner must be able to take one item at a time without heavy synchronization as follows:
Now that algorithm is in place here is the implementation.
class WorkStealingRange { // Holds range that is available for stealing private volatile Tuple<int, int> m_stealRange; // Holds index next to be taken locally private volatile int m_low; public WorkStealingRange(int low, int high) { m_low = low; m_stealRange = CreateStealRange(low, high); } // tries to steal range of items public Tuple<int, int> TryStealRange() { // Contend for available steal range var oldRange = m_stealRange; var mid = oldRange.Item1; var low = m_low; // If steal range is behind lower bound it means no // work items left if (low > mid) // Return null to indicate failed steal attempt return null; // Calculate new steal range that will replace current // in case of success var newRange = CreateStealRange(low, mid); // Contend with other thieves and owner (in case steal // range consists of the single last item) if (Interlocked.CompareExchange(ref m_stealRange, newRange, oldRange) != oldRange) // Lost the race so indicate failed steal attempt return null; // Won contention for the steal range return oldRange; } // Tries to take one item locally public int? TryTakeOne() { var low = m_low; // Reserve item using exchange to avoid legal // reordering with steal range read below Interlocked.Exchange(ref m_low, low + 1); // Now that the lowest element is reserved it is either // not avaible to thieves or it is the last one and // is in steal range var oldRange = m_stealRange; var mid = oldRange.Item1; // If observed non empty steal range that doesn't // contain reserved item it safe to return it as // nobody can reach reserved item now if (low < mid) { var high = oldRange.Item2; // If ahead not enough space in particular at least // two times of observed steal range attempt to // adjust steal range to prevent stealing more than // half of items if (mid - low <= 2 * (high - mid)) { // Try to make steal range 1/4 of available work // space var newRange = CreateStealRange(low, high); // Don't worry if failed as next steal or local // take will fix it Interlocked.CompareExchange(ref m_stealRange, newRange, oldRange); } // Return reserved item as it is not reachable // by thieves return low; } // If observed steal range contains reserved item contend // for it with thieves if (low == mid) { // Create new range that falls behind to indicate // termination var newRange = CreateStealRange(low, low); // Otherwise steal range contains only reserved item // and must contend with the thieves for it if (Interlocked.CompareExchange(ref m_stealRange, newRange, oldRange) != oldRange) // Lost the race, return null to indicate no // more items available return null; // Won contention for the last item return low; } // No luck last item was stolen return null; } private static Tuple<int, int> CreateStealRange(int low, int high) { // If range is not empty create new one that is // 1/4 of the available space if (low != high) return new Tuple<int, int>((low + 3 * high) / 4, high); // Otherwise create empty range that falls behind return new Tuple<int, int>(low - 1, low - 1); } }
Next time custom partitioner that uses work stealing range on the surgical table.
Imagine group of hungry people with spoons sitting around pot of stew. Spoon’s handle long enough to reach the pot but it is longer than the arm and no one can feed himself. People are desperate. This is a picture described in recently found piece of lost chapter of Dante’s Inferno. In order to help them Dante suggested to feed one another.
Only one person can feed another at the same time. While feeding someone else a person cannot eat. People must not starve meaning once hungry a person will be fed. It is assumed that spoon’s handle allows to feed any other person expect for yourself.
We’ll develop algorithm to let unfortunate ones to synchronize with each other and not to starve. It may seems similar to dinning philosophers problem but the latter has a limited choice of selecting the order of taking forks and the degree of contention is low. However in infernal dinner problem choice space and degree of contention is comparable with the problem size which is the number of people (a person may choose to try to feed any other person while potentially contending with all other but the person to be fed).
Here are few important observations:
The first two are quite straightforward. Any person will either be hungry or not and every time a person eats the state changes. At the beginning at least one person is hungry and at least one is not.
The last two are more tricky. As you remember there two types of people those that are hungry and those who do not. Let’s assume there are hungry people that line up and wait to be fed. Then people that are willing to feed come and take one by one from the head of the line hungry people and feed them. If no more hungry people left they also line up and wait for hungry people. They basically switched. This a an idea of how hungry and non-hungry people can pair to feed each other. While a pair of people is outside of the queue nobody else can interfere them.
The queue is represented through linked linked list of the following form h->n0->n1->…->nk where h is a sentinel head node. Head and tail are never equal to null as in case of empty queue they both point to sentinel node. Nodes are added to the tail and removed from the head. In order to remove node from the beginning of the queue head node must be advanced to its successor that must not be non null otherwise the queue is considered empty. Adding is trickier. It is based on the fact that once next of a node is set it is never changed. It is done in two steps. First tail’s next is set to the node to be added and up on success (at this point the node is visible to other threads) tail is advanced to newly added node. Because this process is not atomic other threads may observe the change half way through. In that case a thread may help to finish adding the node by advancing the tail and retry its own operation.
Now to the line up part. Essentially there are two cases:
Based on this rules waiting queue will either be empty or contain nodes of the same type which is equivalent to a line of either hungry or non-hungry people.
Here goes the implementation.
class SyncQueue<T> { private volatile Node m_head; private volatile Node m_tail; public SyncQueue() { // Head is a sentinel node and will never be null m_head = new Node(default(T), false); m_tail = m_head; } public T Exchange(T value, bool mark, CancellationToken cancellationToken) { var node = new Node(value, mark); // Do until exchanged values with thread of // a different type while (true) { cancellationToken.ThrowIfCancellationRequested(); var head = m_head; var tail = m_tail; // If the waiting queue is empty or already contains // same type of items if (head == tail || tail.m_mark == mark) { // Attempt to add current item to the end of the // waiting queue var nextToTail = tail.m_next; // To avoid costly interlocked operations check // if assumtion about the tail is still correct if (tail != m_tail) continue; // If next to what observed to be the tail is // not null then the tail fell behind if (nextToTail != null) { // Help to advance tail to the last node // and do not worry if it will fail as // someone else succeed in making tail up // to date Interlocked.CompareExchange(ref m_tail, nextToTail, tail); // And retry again continue; } // Try to append current node to the end of the // waiting queue by setting next of the tail // This is a linearization point of waiting case // (adding node to the end of the queue) if (Interlocked.CompareExchange(ref tail.m_next, node, null) != null) // Retry again if lost the race continue; // Advance the tail with no check for success as // in case of failure other thread is helping to // advance the tail Interlocked.CompareExchange(ref m_tail, node, tail); // Wait until exchange is complete var spin = new SpinWait(); while (node.m_mark == mark) { spin.SpinOnce(); cancellationToken.ThrowIfCancellationRequested(); } // Correct value will be observed as reading mark // implies load acquire semantics return node.m_value; } // Non empty waiting queue with items of a different // type was observed thus attempt to exchange with // thread waiting at the head of the queue through // dequeueing var nextToHead = head.m_next; // Check if observed head is still consistent and it // has successor if (head != m_head || nextToHead == null) continue; // Observed non-empty queue can either grow which is // fine as we are interested here in the head node // otherwise attempt below will fail and will retry // again. // Attempt to advance head that is a sentinel node // to its successor that holds sought value and is // supposed to be new sentinel. // This is a linearization point of releasing case // (removing node from the beginning of the queue) if (Interlocked.CompareExchange(ref m_head, nextToHead, head) != head) // Retry if lost the race continue; // At this point head's successor is dequeued and no // longer reachable so values can be safely exchanged var local = nextToHead.m_value; nextToHead.m_value = value; // Switch mark to let waiting thread know that // exchange is complete and making it the last store // with release semantics makes sure waiting thread // will observe correct value nextToHead.m_mark = mark; return local; } } class Node { internal volatile Node m_next; internal volatile bool m_mark; internal T m_value; internal Node(T value, bool mark) { m_value = value; m_mark = mark; } } } class Human { private volatile bool m_hungry; public Human(bool hungry) { m_hungry = hungry; } public void WaitAndEat(SyncQueue<Human> waitingQueue, CancellationToken cancellationToken) { var spin = new SpinWait(); while (true) { spin.Reset(); // The hell seems to have frozen =) cancellationToken.ThrowIfCancellationRequested(); // Pair with someone either to feed hungry man // or eat yourself if hungry var pairedWith = waitingQueue.Exchange(this, m_hungry, cancellationToken); if (!m_hungry) // Feed hungry man pairedWith.Feed(); else // Wait to be fed while (m_hungry) { spin.SpinOnce(); cancellationToken.ThrowIfCancellationRequested(); } } } private void Feed() { // Switch to non hungry as just ate m_hungry = !m_hungry; } }
The infernal dinner is served =)
Recently I came across another interesting synchronization problem. Assume there are n rooms. Threads can enter and leave rooms. A room can hold arbitrary number of threads. If a room holds at least one thread it is considered occupied. Only one room can be occupied at a time. With each room exit action is associated that must be executed when the last thread left it. No threads are allowed to enter any room before exit action is executed to the end. Threads that are waiting to enter a room must eventually enter it. It is assumed that threads also at some point leave the room.
There are several cases for a thread attempting to enter a room:
Leaving room requires careful thought as well:
Waiting and waking part can be done using Monitor.Wait and Monitor.PulseAll. Doing so using single sync object is simple but quite inefficient as every pulse all will wakeup all waiting threads (potentially waiting for different rooms) to see that they are not allowed to enter yet as only single room can be occupied at a time. Instead each room will have its own local sync object to wait and pulse on. This will allow to wake up only threads that are now allowed to enter the room they've been waiting on.
But this is where difficulties come. The decision to be made by entering thread spans across rooms. So the decision is still needs to made using global lock. Now the tricky part is that once the decision is made to wait how not to miss wakeup. Attempting to do it like
lock (global) { // Make decision to wait or return bool wait = ...; if (!wait) return; } // Somewhere here wakeup may be missed lock (local) { // Wait on local based on the decision made above Monitor.Wait(local); }
is doomed to suffer from missed wakeups as in between global lock is released and local is acquired the last leaving room thread may try to wakeup waiters. So instead local lock will be acquired while still holding global lock and releasing it only after that.
var locked = false; try { Monitor.Enter(global, ref locked); // Make decision to wait or return bool wait = ...; if (!wait) return; // Acquifre local hiwle holding global lock (local) { // Release global Monitor.Exit(global); locked = false; // Wait on local based on the decision made above Monitor.Wait(local); } } finally { if (locked) Monitor.Exit(global); }
Threads indicate that are willing to enter the room by maintaining wait count for each room. It also used to allow threads enter the room in bulk. When the last leaving thread picks up a room to get occupied next, adjusts the count and wakes up waiting threads by pulsing room local sync object.
In order to make sure that waiting threads enter a room eventually (guaranteeing starvation freedom) the following mechanism is used:
Here goes the thing.
public class SharedRoomsLock { // Holds room local locks private readonly object[] m_locks; // Holds number of threads waiting on to // enter indexed by room numbers private readonly int[] m_waiting; // Holds actions to be excuted for each room // upon exit private readonly Action[] m_actions; // Holds number of threads currently in the // occupied room private int m_count; // Holds index of the room currently occupied private int m_occupied = -1; public SharedRoomsLock(IEnumerable<Action> actions) { m_actions = actions.ToArray(); var count = m_actions.Length; m_waiting = new int[count]; m_locks = Enumerable.Range(0, count) .Select(_ => new object()).ToArray(); } // Lock ownership is omitted for clarity however // can be added using ThreadLocal<T> public void Enter(int i) { var locked = false; try { Monitor.Enter(m_locks, ref locked); if ( // If no room is occupied or m_occupied < 0 || ( // Occupied room that thread is trying // to enter m_occupied == i && // and there is still someone in there m_count > 0 && // and no one waiting to enter other rooms !m_waiting.Any(w => w > 0))) { m_occupied = i; m_count++; return; } // Otherwise indicate desire to enter the room m_waiting[i]++; // Acquire room local lock before releasing main // to avoid missed or incorrect wakeups lock (m_locks[i]) { // Release main lock to allow others to // proceed including waking thread Monitor.Exit(m_locks); locked = false; // Wait to be woken up by some last to leave // room thread, not necessarily immediately Monitor.Wait(m_locks[i]); // Once woken up thread can safely enter // the room as count already adjusted } } finally { if (locked) Monitor.Exit(m_locks); } } public void Exit(int i) { lock (m_locks) { // Indicate that thread left the room if (--m_count > 0) // And leave it if not last return; } // If last execute exit action however not under // the lock as it quite dangerous due to reentracy m_actions[i](); // At this point room is still treated as // occupied as only once exit action is executed // it can be set free var locked = false; try { Monitor.Enter(m_locks, ref locked); // By default set room as not occupied m_occupied = -1; // Run through other rooms in circles to see if any // thread is waiting thus ensuring some sort of // fairness or at least starvation freedom for (var j = 1; j <= m_waiting.Length; j++) { var next = (i + j) % m_waiting.Length; var w = m_waiting[next]; if (w > 0) { // Found room with waiting threads so it // will be occupied next m_occupied = next; break; } } // If no one is waiting if (m_occupied < 0) // There is nothing that can done return; // At this there are threads waiting to enter // the room so they are allowed to enter in one // shot m_count = m_waiting[m_occupied]; // Closing the doors right after them m_waiting[m_occupied] = 0; // Acquire room local lock before releasing main // to avoid missed or incorrect wakeups lock (m_locks[m_occupied]) { // Releasing main lock is safe because the // decision made under main lock will still be // true as no other thread except for those // already wating will be able to wait on room // local lock that is currently held Monitor.Exit(m_locks); locked = false; // Wake up waiting to enter threads Monitor.PulseAll(m_locks[m_occupied]); } } finally { if (locked) Monitor.Exit(m_locks); } } }
Single Responsibility Principle states:
A class should have only one reason to change. Responsibilities are reasons to change.
Violations of this principle leave you face to face with fragile design and all the maintenance nightmares it implies. Unfortunately there is no hundred percent way to prevent it from happening – it is just the nature of design.
Common sense often helps to spot multiple responsibilities mixed within a single class. However some cases are tricky and require careful attention. This is where design assessment comes in. An object can be looked at from two perspectives:
Uncontrolled growth in any of the groups can lead to god class (data or behavior forms respectively).
What it knows?
Keeping related data and behavior in one place is essential to building consistent abstractions. Failing to do so may lead to:
Following Information Expert pattern promotes the idea:
Assign a responsibility to the class that has the information needed to fulfill it.
Consider an example below where publicly visible person name could be incorrectly used to distinguish people (as names are not unique). Thus it makes sense to let Person type to define what equality means as it has all the necessary information (social security number). This will prevent introduction of several inconsistent equality behaviors across the code base.
class Person { private string name; // Keep social security number in private private string ssn; // Names are not unique public string Name { get { return name; } } public override bool Equals(object obj) { if (obj == null || obj.GetType() != typeof(Person)) return false; // Names cannot be used to identify people unlike // social security number return ssn == ((Person) obj).ssn; } }
Keeping related data and behavior in one place is as important as not letting not related sets of data/behavior to be put into the same class. Most of the methods defined on a class should be using most of the data members most of the time.
Many of you started to work still being a student. Employer needs your abilities to work and most likely he isn’t interested in that you still studying.
// Smart student - works and studies class Student { private string name; private int knowledge; private Func<Course, bool> preferences; private int experience; public string Name { get { return name;} } public void Study() { knowledge++; } public void Enlist(IEnumerable<Course> courses) { // Select appropriate courses and enlist foreach (var course in courses.Where(preferences)) course.Enlist(name); } public void Work() { experience++; } public void Sign(Contract contract) { // sign job contract with your name contract.Agree(name); } }
This class clearly has more than one responsibility and the fact that study related methods and work related methods both operate on a subset of data shows that. However separation can solve the problem.
class Student { private Person person; private int knowledge; private Func<Course, bool> preferences; public Student(Person newStudent) { person = newStudent; } public void Study() { knowledge++; } public void Enlist(IEnumerable<Course> courses) { // Select appropriate courses and enlist foreach(var course in courses.Where(preferences)) course.Enlist(person.Name); } } class Employee { private Person person; private int experience; public Employee(Person newEmployee) { person = newEmployee; } public void Work() { experience++; } public void Sign(Contract contract) { // sign job contract with person's name contract.Agree(person.Name); } }
What connections between objects it maintains?
What is connection between objects anyway? For example, a document can be seen a set of glyphs (text, graphics, structural elements like columns and rows). The connection between them is the fact they belong to the same document. So basically the document maintains connection between glyphs. So what must attract our attention in this area to spot Single Responsibility Principle violations?
If the object maintains connections between objects of different abstraction levels it is highly likely that it does someone’s job.
Consider an object model where a man wants to build a house. Man class maintains connection between tasks (got from an architect) and workers to build a house. And this where different abstraction levels (from domain model perspective) meet – a man hired foreman who is supposed to manage construction but it so happens that a man wants to control everything and treats hired foreman as a simple worker giving him tasks to do in not necessarily correct order. Basically a man got foreman’s responsibility (or in other words foreman abstraction isn't consistent).
abstract class Task { public bool Completed { get; private set; } public virtual void Do() { DoCore(); Completed = true; } protected abstract void DoCore(); } class Worker { public virtual bool CanDo(Task task) { // Can do everything worker =) return true; } public virtual void Accept(Task task) { task.Do(); } } class Foreman : Worker { IEnumerable<Worker> workers; Func<Task, bool> next; public override bool CanDo(Task task) { // Can do things only on time or otherwise the house // won't stand long return next(task); } public override void Accept(Task task) { // Foremen looks for a worker who can do the task workers.Where(w => w.CanDo(task)).First().Accept(task); } } // A man who tries to manage his new house construction class Man { IEnumerable<Task> construction; Foreman foremen; public void BuildHouse() { IEnumerable<Task> tasks; // As far as we don't know what to do next we'll ask foreman while ((tasks = construction.Where(t => foremen.CanDo(t) && !t.Completed)).Any()) { foreach (var task in tasks) { foremen.Accept(task); } } } }
A man must let foreman to manage tasks or become a foreman himself (we’ll take first approach or otherwise the house won't stand long).
class Foreman { IEnumerable<Worker> workers; Func<Task, bool> next; public void Manage(IEnumerable<Task> construction) { IEnumerable<Task> tasks; // Foreman selects tasks that are not completed and must be done next while((tasks = construction.Where(t => !t.Completed && next(t))).Any()) { foreach (var task in tasks) { workers.Where(w => w.CanDo(task)).First().Accept(task); } } } } class Man { IEnumerable<Task> construction; Foreman foremen; public void BuildHouse() { // Foreman takes the whole construction to manage foremen.Manage(construction); // Just check that everything is done if (construction.Where(t => !t.Completed).Any()) { throw new InvalidOperationException("Something isn't done!"); } } }
Now abstractions are leveled - man allows construction to be managed by foreman (foreman abstraction is now consistent).
What it decides?
It is quite uncomfortable when someone asks you for your opinion and then makes his own mind and tells you what and how to do. Why don’t tell what to do in the first place and let me decide how to do it. Isn’t that looking like you are doing my job? This is what Tell Don’t Ask Principle is about:
As the caller, you should not make decisions based on the called object’s state and then change the object’s state.
Here is couple of teenager definitions – which do you think makes more sense?
class Teenager { private int age; private List<string> clothes; // Is it age that drives your clothes preferences? public int Age { get { return age; } } // Do you really wan't your parents to dress you // just based on your age? public void TakeNew(string clothing) { clothes.Add(clothing); } } // ... or class Teenager { private List<string> clothes; // Preference is something personal private Func<string, bool> preference; // And you want to able to select clothes based on your // preferences? public bool SelectNew(IEnumerable<string> shop) { var clothing = shop.Where(preference).FirstOrDefault(); if (clothing != null) { // Got something you like clothes.Add(clothing); } // Tell your parents whether you need to go to other shop =) return clothing != null; } }
By violating Tell Don’t Ask principle caller gains responsibility on make decision instead of the called one. Busted!
What services it performs?
Service provider can do the job by itself or ask collaborators for help. Keeping dependencies on collaborators explicit facilitate controlled class growth. Classes should not contain more objects than a developer can fit in his or her short-term memory (5-7). Otherwise it is possible to introduce responsibilities into class which are either duplicated or not related. Whenever number of collaborators growth more than 5-7 you should consider whether all collaborators still help you to form single abstraction or a new one was introduced.
On the other hand it makes sense to look at consumers and in particular on how they consume supplied services. If consumers use different subsets of provided services it may be a sign that class captures more than one responsibility.
Recall the student example. Original class has two consumers: employer and university (for example). Both consumers were interested in a subset of provided methods (working and studying related respectively). And as we discovered the class captured two abstractions: employee and student.
Summary:
Inversion of Control pattern allows to decouple components (consumers) from their dependencies and takes care of dependencies location and lifetime management through delegation of these responsibilities to external (with respect to dependent type) component. This pattern actively used in composite application development.
Inversion of Control comes in two flavors (Unity provides both capabilities):
Service locator holds references to services and knows how to locate them. It is further used by dependent component to obtain necessary services. In other words consumers play active role.
interface IService { void Do(); } class ActiveConsumer { private readonly IUnityContainer m_locator; // Reference to service locator comes from outside public ActiveConsumer(IUnityContainer serviceLocator) { m_locator = serviceLocator; } public void Do() { // In order to fulfill its task active consumer relies // on service implementation that is obtained on demand // from service locator var service = m_locator.Resolve<IService>(); service.Do(); } }
Dependency injection makes dependent components passive (little or no work is done to get its dependencies). The only responsibility consumers still care about is to express their dependencies somehow (the way dependencies are expressed depends on pattern implementation, but for this example we will use single constructor automatic injection supported by Unity).
class PassiveConsumer { private readonly IService m_service; // This constructor is used to inject service dependency public PassiveConsumer(IService svc) { m_service = svc; } public void Do() { // We got this dependency from outside and done nothing // to let it happen - so just use it m_service.Do(); } } ... // At this point container resolves consumer's dependency // and injects it during construction var passiveConsumer = container.Resolve<PassiveConsumer>(); passiveConsumer.Do();
So what is the difference?
First, is dependency from service locator appropriate? If the component in question is supposed to be reused by others you may end up with putting unnecessary constraints (for example you are using some open source service locator but developers that could reuse your component are not allowed to use any open source stuff due to customer’s demand and thus won’t be able to reuse the component).
Second, dependencies visibility. Service locator makes consumer’s “real” dependencies hidden and dependency from service locator itself visible. When dependencies are explicit it is much easier to understand dependent class. Explicit dependencies allows you to assess and control the growth of the component. For example, if your component accepts 10 services in its constructor it may be a sign that it does, or knows or decides too much and it is time to split it. Consider the same thing when using service locator. In order for you to spot number of dependencies you need to look for all unique usage occurrences of service locator. It is not that hard with modern IDE but still it is not that easy as looking at component’s contract.
On the other hand, it makes sense to consider the audience of the component. If it will be reused by others and dependencies are hidden it may require deep knowledge of component’s inner workings in order to use it.
Third, consumer’s relation with dependency. Dependency injection promotes constant relations (from lifetime perspective). Consumer obtains its dependency at construction time and lives with it. On the other hand service locator compasses to temporary relations – get service instance when it is time, call its methods, discard it. Why discard? Because if the component has a constant relation why not use dependency injection otherwise which gives you explicit dependencies?
But anyway, what particular case forces locator usage? When consumer has longer lifetime than its dependency. For example, you are writing smart client application. You organized presentation layer using Model-View-Presenter pattern. Presenter calls remote service in response to user interaction. View controlled by a presenter can be opened for a long time. If presenter gets remote service proxy dependency only once it may happen that proxy will go into faulted state (for example, due to network connectivity problems) and any subsequent calls to it will result in exception. So it is better to dispose proxy every time a particular task accomplished and create new one when new task is on the way or cache it and in response to proxy going faulted create a new one (which is of course harder as long as you need to handle all cases where it used and maintain cache). Thus it seems that service locator is more appropriate in this case.
However we can make short lived dependencies explicit. Let’s assume that IService implementation instance must be disposed every time it is used.
interface IService : IDisposable { void Do(); } // This is still active consumer as it uses service locator to get service instance class ActiveConsumer { private readonly IUnityContainer m_locator; public ActiveConsumer(IUnityContainer serviceLocator) { m_locator = serviceLocator; } public void Do() { using (var service = m_locator.Resolve<IService>()) { service.Do(); } } }
Service locator has wide surface (it terms of services it can provide) and this makes consumer’s contract opaque. What we need to do is narrow the surface but still provide ability to create service instances (as long as we need to dispose them every time). Abstract factory will do the thing. Factory provides clear connection with service it creates. On the other hand we need to make consumer’s dependency from factory explicit. We will use dependency injection.
interface IServiceFactory { IService CreateService(); } // Consumer is no longer active as it gets its dependencies from outside class PassiveConsumer { private readonly IServiceFactory m_factory; // The dependency is now explicit public PassiveConsumer(IServiceFactory serviceFactory) { m_factory = serviceFactory; } public void Do() { // We still can create service instances on demand using (var service = m_factory.CreateService()) { service.Do(); } } }
How about this? That is not all. Factory clearly and explicitly states the relation between dependent component and dependency (service that is created by factory) – it is a temporary relation (as long as it provides ability to create new instances).
Sleeping barber problem is a classic synchronization problem proposed by Dijkstra that goes as follows:
A barbershop consists of a waiting room with n chairs, and the barber room containing the barber chair. If there are no customers to be served, the barber goes to sleep. If a customer enters the barbershop and all chairs are occupied, then the customer leaves the shop. If the barber is busy, but chairs are available, then the customer sits in one of the free chairs. If the barber is asleep, the customer wakes up the barber. Write a program to coordinate the barber and the customers.
Back in the days when Dijkstra proposed this problem (it was in 1965) probably rhythm of life was hasteless and barbers had chance to sleep at work and customers could wait until he woke up.
Most of the solutions use some sort of WaitHandle to do the trick. For example, classic solution is based on semaphores. Waiting on wait handles is not free.
But now let’s assume that analogy for this problem a barber that desperately needs customers so he is running around in circles while waiting impatiently when there are no customers. Customers are also quite busy so if the waiting queue is empty they want to be served immediately otherwise they go from corner to corner while waiting.
I call this problem “crazy barber”. From the problem statement it follows that we should avoid using any “sleeping” mechanisms to do the synchronization.
Haircut we represent through an Action that barber must execute and thus it cannot be null.
Number of chairs in the waiting room is known in advance and never changes. Waiting queue can not overflow because any customer that sees no free chairs turns around and leaves barbershop. So we can represent waiting queue as circular array of fixed size. Two indices are used to represent it. Head points to next request to be serviced if not null. Tail points to next free slot where request can be put.
Barber waits next in line (the one head index points to) non null request to get serviced. To mark slot as free for itself it nullifies it once obtained reference to request. Next head index is advanced to make this slot available for use by customers. Once it completed with execution it notifies waiting customer that it is free to go by changing done flag.
Customer first checks if there are free slots. Then it competes with other customers for a free slot (the one tail index points to). If successful it puts request that combines action and done flag into waiting queue array with the index value of just advanced tail. Once successfully queued request customer waits until value in the done flag is not changed.
Here goes “crazy barber”.
class CrazyBarber { private readonly int m_capacity; // Circular array that holds queued items private volatile Request[] m_queue; // Points to next free slot private volatile int m_tail; // Points to a slot where next item to be executed // is expected private volatile int m_head; public CrazyBarber(int capacity) { m_capacity = capacity; m_queue = new Request[m_capacity]; } // Queues action for execution if there is free slot and // waits for its execution completion public bool TryGetExecuted(Action action) { if (action == null) throw new ArgumentException(); var waitToEnq = new SpinWait(); while (true) { // Load tail first as if it will change compare and // swap below will fail anyway var tail = m_tail; // Now load head and this is the linearization point // of full queue case which results in unsuccessful // attempt var head = m_head; // Check if queue has some free slots if (tail - head >= m_capacity) // The queue is full, no luck return false; // Create request before interlocked operation as // it implies full barrier and thus will prevent // partially initialized request to be visible to // worker loop var request = new Request { m_action = action }; // Compete for the tail slot if (Interlocked.CompareExchange(ref m_tail, tail + 1, tail) != tail) { // We lost due to contention, spin briefly and // retry waitToEnq.SpinOnce(); continue; } var index = tail % m_capacity; // Here is the linearization point of successfull // attempt m_queue[index] = request; var waitToExe = new SpinWait(); // Wait until enqueued action is not executed while (!request.m_done) waitToExe.SpinOnce(); return true; } } // Runs single worker loop that does the execution and // must not be called from multiple threads public void Run(CancellationToken cancellationToken) { var waitToDeq = new SpinWait(); while (true) { var head = m_head; var index = head % m_capacity; waitToDeq.Reset(); // Though array field is marked as volatile access // to its elements are not treated as volatile // however its enough to make sure loop condition // is not optimized // Wait until new item is available or cancellation // is requested while (m_queue[index] == null) { if (cancellationToken.IsCancellationRequested) return; waitToDeq.SpinOnce(); } // Get request to be serviced and nullify it to // mark slot as free for yourself var request = m_queue[index]; m_queue[index] = null; // As there is only one worker advance head without // interlocked and here is the linearization point // of making free slot m_head = head + 1; // Do not call TryGetExecuted from action or it will // deadlock request.m_action(); // Make sure ready notification is not made visible // through reordering before action is completed // and store release guarantees that here request.m_done = true; } } class Request { internal Action m_action; internal volatile bool m_done; } }
Although tail index at some point will overflow let’s assume “crazy” barber won’t try to make around 2 billion haircuts =).
Object pool is a set of initialized objects that are kept ready to use, rather than allocated and destroyed on demand. Wikipedia
Object pool is a set of initialized objects that are kept ready to use, rather than allocated and destroyed on demand.
Wikipedia
Object pool usage may get performance improvement in case pooled object initialization cost and frequency of instantiation are high and at any period in time number of used objects is low (ThreadPool is a good example).
There are many questions to take into account when designing object pools. How to handle acquire request when there are no free objects? In single threaded scenarios you may choose to create new object or let the caller know that the request cannot be fulfilled. In multithreaded scenarios you may choose to wait for free objects until other threads release them. How to organize access to pooled objects? Based on your logic you may choose to use most recently used first strategy, or least recently or even random first. Besides that you need to provide synchronized access to internal object storage.
Let’s assume that we made decision on handling empty pool case (object pool growth strategy) and access to pooled objects (most recently used first approach) and now focused on minimizing synchronization costs to internal object storage.
The simplest way of doing it will be to protect any access with a lock. That will work however under high contention it may result in lock convoys (in most cases time spent within the lock will be pretty short necessary only to take ready to use object). We may use lock-free data structure as storage such as ConcurrentStack<T>. Or we may try to reduce contention.
The idea is somewhat similar to the algorithm used in ThreadPool where each worker thread in addition to global work item queue maintains local work item double ended queue (where one end is exposed to the owner and the other to worker threads that may want to steal work items if global queue is empty). Each worker thread puts newly created work items into its local queue to avoid synchronization costs. When worker thread is ready to process next work item it first tries to dequeue from its local queue, if fails it tries to get work item from global queue and lastly resorts to stealing from other threads (check out cool series of articles from Joe Duffy on building custom thread pool).
In order to build object pool we will maintain local per thread storage of pooled objects in addition to global storage. Pooled objects are stored in segments of a selected size (depends on usage scenarios so it must be specified during object pool creation). Global storage holds set of segments. In order to acquire pooled object thread must:
Returning object look like this:
Thus most of the time thread will work with its own local set of pooled objects. However it has its own cost. If for example a different thread comes and acquires pooled object and then never again use pooled objects we’ll get orphaned segment. So as usual it is a matter of tradeoff. If you have a limited set of threads that will work with object pool for a long time usage of this approach can be justified. If however you have either many threads that work with object pool for short periods of time and then never come back you’d probably better look into other approaches.
// Represents thread safe object pool public class ConcurrentObjectPool<T> { private readonly int m_segmentSize; // Thread local pool used without synchronization // to reduce costs private ThreadLocal<Segment> m_localPool = new ThreadLocal<Segment>(); // Global pool that is used once there is nothing or // too much in local pool private readonly ConcurrentStack<Segment> m_globalPool = new ConcurrentStack<Segment>(); // Factory function that is used from potentionally multiple // threads to produce pooled objects and thus must be thread // safe private readonly Func<T> m_factory; public ConcurrentObjectPool(Func<T> factory, int segmentSize) { m_factory = factory; m_segmentSize = segmentSize; } // Acquires object from pool public PoolObject<T> Acquire() { var local = m_localPool.Value; T item; // Try to acquire pooled object from local pool // first to avoid synchronization penalties if (local != null && local.TryPop(out item)) return new PoolObject<T>(item, this); // If failed (either due to empty or not yet // initialized local pool) try to acquire segment // that will be local pool from global pool if (!m_globalPool.TryPop(out local)) { // If failed create new segment using object // factory var items = Enumerable.Range(0, m_segmentSize) .Select(_ => m_factory()); local = new Segment(m_segmentSize, items); } m_localPool.Value = local; // Eventually get object from local non-empty pool local.TryPop(out item); return new PoolObject<T>(item, this); } // Releases pooled ojbect back to the pool however // it is accessible publicly to avoid multiple releases // of the same object internal void Release(T poolObject) { var local = m_localPool.Value; // Return object back to local pool first var divided = local.Push(poolObject); // If local pool has grown beyond threshold // return extra segment back to global pool if (divided != null) m_globalPool.Push(divided); } // Represents chunk of pooled objects class Segment { private readonly int m_size; // Using stack to store pooled objects assuming // that hot objects (recently used) provide better // locality private readonly Stack<T> m_items; public Segment(int size, IEnumerable<T> items) { m_size = size; m_items = new Stack<T>(items); } public bool TryPop(out T item) { item = default(T); // Pop item if any available if (m_items.Count > 0) { item = m_items.Pop(); return true; } return false; } public Segment Push(T item) { m_items.Push(item); // If current segment size is still smaller // than twice of original size no need to split if (m_items.Count < 2 * m_size) return null; // Otherwise split current segment to get it // pushed into global pool var items = Enumerable.Range(0, m_size) .Select(_ => m_items.Pop()); return new Segment(m_size, items); } } } // Represents disposable wrapper around pooled object // that is used to return object back to the pool public class PoolObject<T> : IDisposable { private readonly ConcurrentObjectPool<T> m_pool; private bool m_disposed; private readonly T m_value; // Get reference to the pool to return value to public PoolObject(T value, ConcurrentObjectPool<T> pool) { m_value = value; m_pool = pool; } public T Value { get { // Make sure value can't be obtained (though we can't // guarantee that it is not used) anymore after it is // released back to the pool ThrowIfDisposed(); return m_value; } } public void Dispose() { if (m_disposed) return; // As we are disposing pooled object disposal basically // equivalent to returning the object back to the pool m_disposed = true; m_pool.Release(m_value); } private void ThrowIfDisposed() { if (m_disposed) throw new ObjectDisposedException("Pool object has been disposed"); } }
And it is used somewhat like this
// Initialized elsewhere ConcurrentObjectPool<T> pool = ... ... using (var obj = pool.Acquire()) { // Use pooled object value Process(obj.Value); }
I omitted IDisposable implementation on the object pool to make code simpler. However in order to implement it we will need to track all segments in a separate collection as otherwise thread local segments won’t be accessible from the thread disposing the pool.
Set is an abstract data structure that can store certain values, without any particular order, and no repeated values. Static sets that do not change with time, and allow only query operations while mutable sets allow also the insertion and/or deletion of elements from the set. Wikipedia
Set is an abstract data structure that can store certain values, without any particular order, and no repeated values. Static sets that do not change with time, and allow only query operations while mutable sets allow also the insertion and/or deletion of elements from the set.
Though set definition says it doesn’t imply particular of values it is referred to how consuming code treats set. One legitimate way (though not the most efficient one) to implement set is to use sorted singly linked list that will allow us to eliminate duplicate values. Other ways include but not limited to self-balancing binary search tree, skip lists or hash table.
As the title says we are about to explore algorithm for concurrent set. Taking this into account we can use ConcurrentDictionary<TKey, TValue> to implement concurrent set. Assuming it is done =) let’s take a look at other options. At this point .NET Framework has to support for concurrent self-balancing binary search tree or skip lists and building those is quite tricky. On the other hand concurrent sorted singly linked list is still feasible solution. This well known algorithm contains useful techniques.
For mutable sets at least the following operations must be supported: add, remove, contains. The simplest way to do this is to wrap any list modifications with lock leading to coarse-grained synchronization. However under high contention this single lock will become a bottleneck taking into account that all operations has O(n) time complexity and thus serializing them will lead to significant performance hit.
Consider list with the following contents: 1->3->4->6->7. Operations Add(2) and Add(5) even when run concurrently do not interfere as they modify distinct areas of the list 1->(2)->3->4->(5)->6->7. Add operation affects two consequent nodes where new node must be added in between. The same is true for remove operation (it affects two consequent nodes: the node to be removed and its predecessor).These nodes will be used as sync roots to make thread safe modifications of the list. In order to prevent deadlocks nodes are locked always in the same order (predecessor first and than its successor).
What if we need to add new node either at the beginning or at the end? In that case we do not have a pair nodes. To work around this the list will contain two sentinel nodes (head and tail sentinels that remain in the list forever and any value is more than value in the head and less than value in the tail). Basically any list looks like h->x0->x1->…->xn->t assuming h contains negative infinity and t contains positive infinity values.
Let’s assume we have a list: h->1->2->5->6->t. Two threads (A and B) execute operations Add(3) and Add(4) respectively concurrently. Both of them need to add new node between nodes that contain 2 and 5 (h->1->2->(x)->5->6->t). One of them will be first who will succeed in locking both nodes. After nodes are successfully locked it will proceed with adding new node (let’s assume thread A succeeded). Once thread A finished with list modifications the list will be h->1->[2]->4->[5]->6->t yet thread B is trying to lock nodes in brackets. Thread B eventually will succeed in locking but his expectations may not be true anymore as it happens in this case (node that holds value 2 no longer points to node that holds 5).
Because between the moment a thread starts his attempt to lock pair of nodes and the moment it eventually succeeds in doing so the list can be modified by other thread as it tries to lock two nodes which is not atomic. Thus after a thread succeeds in locking it must do validation that its expectations are still true.
However dealing with node removal is even more tricky. Assume we have a list h->1->2->5->6->t and thread A attempts to Add(3) concurrently with thread B trying to Remove(2) or Remove(5) and thread B succeeds to be first. In that case once thread A will lock nodes that contain 2 and 5 it may observe list that is now h->1->5->6->t or h->1->2->6->t meaning of the locked nodes is no longer in the list and adding new value in between will lead to lost value (if predecessor was removed) or resurrected value (if successor was removed and now new nodes points to it).
Thus once a thread succeeds in locking both nodes it must check that locked nodes are still not removed from the list and predecessor still points to its observed previously successor. If the validation fails the operation must fallback and start again.
Well if the node is removed from the list how do we know that? One way to make removed node’s next reference null. However this is a bad idea because in that case other thread traverses list without locks (and this is deliberate behavior) may observe unexpected null reference. For example, assume we have a list h->1->2->3->6->t. Thread A tries to Add(4) while thread B tries to Remove(2). Assume that thread A while searching for 3->6 pair of nodes (to add 4 in between) moved its current reference to the node that contains 2 and it was preempted. Then thread B succeeds in Remove(2). If thread B set removed node next reference to null once thread B will wake up it will miserably fail though there are still sought nodes in the list. Thus list node’s next reference must always be non-null.
So how do we remove nodes. We must preserve next pointers even if the node is removed. Thus the node is simply marked as removed and then its predecessor’s next is updated. Thus a node to remove is no longer reachable but still points other list nodes. Thus any traverses in-progress won’t break. From memory perspective we rely on garbage collector to reclaim memory occupied by non reachable nodes.
Because of the mechanism chosen for nodes remove we can safely traverse the list with no locks. And this is true for all three operations. Contains operation just need to validate found nodes.
Now let’s code the thing.
public class ConcurrentSet<T> { private readonly IComparer<T> m_comparer; // Sentinel nodes private readonly Node m_head; private readonly Node m_tail; public ConcurrentSet(IComparer<T> comparer) { m_comparer = comparer; // Sentinel nodes cannot be removed from the // list and logically contain negative and // positive infinity values from T m_tail = new Node(default(T)); m_head = new Node(default(T), m_tail); } // Adds item to the set if no such item // exists public bool Add(T item) { // Continue attempting until succeeded // or failed while (true) { Node pred, curr; // Find where new node must be added Find(item, out pred, out curr); // Locks nodes starting from predecessor // to synchronize concurrent access lock (pred) { lock (curr) { // Check if found nodes still // meet expectations if (!Validate(pred, curr)) continue; // If the value is already in the // set we are done if (Equal(curr, item)) return false; // Otherwise add new node var node = new Node(item, curr); // At this point new node becomes // reachable pred.m_next = node; return true; } } } } // Removes item from the list if such item // exists public bool Remove(T item) { // Continue attempting until succeeded // or failed while (true) { Node pred, curr; // Find node that must be removed and // its predecessor Find(item, out pred, out curr); // Locks nodes starting from predecessor // to synchronize concurrent access lock (pred) { lock (curr) { // Check if found nodes still // meet expectations if (!Validate(pred, curr)) continue; // If the value is not in the set // we are done if (!Equal(curr, item)) return false; // Otherwise mark node as removed curr.m_removed = true; // And make it unreachable pred.m_next = curr.m_next; return true; } } } } // Checks if given item exists in the list public bool Contains(T item) { Node pred, curr; Find(item, out pred, out curr); return !curr.m_removed && Equal(curr, item); } // Searches for pair consequent nodes such that // curr node contains a value equal or greater // than given item void Find(T item, out Node pred, out Node curr) { // Traverse the list without locks as removed // nodes still point to other nodes pred = m_head; curr = m_head.m_next; while (Less(curr, item)) { pred = curr; curr = curr.m_next; } } static bool Validate(Node pred, Node curr) { // Validate that pair of nodes previously // found still meets the expectations // which essentially is checking whether // nodes still point to each other and no one // was removed from the list return !pred.m_removed && !curr.m_removed && pred.m_next == curr; } bool Less(Node node, T item) { return node != m_tail && m_comparer.Compare(node.m_value, item) < 0; } bool Equal(Node node, T item) { return node != m_tail && m_comparer.Compare(node.m_value, item) == 0; } class Node { internal readonly T m_value; internal volatile Node m_next; internal volatile bool m_removed; internal Node(T value, Node next = null) { m_value = value; m_next = next; } } }
This algorithm uses fine grained synchronization that improves concurrency and with lazy nodes removal allows us to traverse list with no locks at all. This is quite important as usually Contains operation is used more frequent than Add and Remove.
Hope this pile of bits makes sense to you =).
Binary search tree is a fundamental data structure that is used for searching and sorting. It also common problem to merge two binary search trees into one.
The simplest solution to do this is to take every element of one tree and insert it into the other tree. This may be really inefficient as it depends on how well target tree is balanced and it doesn’t take into account structure of the source tree.
A more efficient way of doing this is to use insertion into root. Assuming we have two trees A and B we insert root of tree A into tree B and using rotations move inserted root to become new root of tree B. Next we recursively merge left and right sub-trees of trees A and B. This algorithm takes into account both trees structure but insertion still depends on how balanced target tree is.
We can look at the problem from a different perspective. Binary search tree organizes its nodes in sorted order. Merging two trees means organizing nodes from both trees in sorted order. This sounds exactly like merge phase of merge sort. However trees cannot be directly consumed by this algorithm. So we need to convert them into sorted singly linked lists first using tree nodes. Then merge lists into a single sorted linked list. This list gives us sorted order for sought tree. This list must be converted back to tree. We got the plan, let’s go for it.
In order to convert binary search tree into sorted singly linked list we traverse tree in order converting sub-trees into lists and appending them to the resulting one.
// Converts tree to sorted singly linked list and appends it // to the head of the existing list and returns new head. // Left pointers are used as next pointer to form singly // linked list thus basically forming degenerate tree of // single left oriented branch. Head of the list points // to the node with greatest element. static TreeNode<T> ToSortedList<T>(TreeNode<T> tree, TreeNode<T> head) { if (tree == null) // Nothing to convert and append return head; // Do conversion using in order traversal // Convert first left sub-tree and append it to // existing list head = ToSortedList(tree.Left, head); // Append root to the list and use it as new head tree.Left = head; // Convert right sub-tree and append it to list // already containing left sub-tree and root return ToSortedList(tree.Right, tree); }
Merging sorted linked lists is quite straightforward.
// Merges two sorted singly linked lists into one and // calculates the size of merged list. Merged list uses // right pointers to form singly linked list thus forming // degenerate tree of single right oriented branch. // Head points to the node with smallest element. static TreeNode<T> MergeAsSortedLists<T>(TreeNode<T> left, TreeNode<T> right, IComparer<T> comparer, out int size) { TreeNode<T> head = null; size = 0; // See merge phase of merge sort for linked lists // with the only difference in that this implementations // reverts the list during merge while (left != null || right != null) { TreeNode<T> next; if (left == null) next = DetachAndAdvance(ref right); else if (right == null) next = DetachAndAdvance(ref left); else next = comparer.Compare(left.Value, right.Value) > 0 ? DetachAndAdvance(ref left) : DetachAndAdvance(ref right); next.Right = head; head = next; size++; } return head; } static TreeNode<T> DetachAndAdvance<T>(ref TreeNode<T> node) { var tmp = node; node = node.Left; tmp.Left = null; return tmp; }
Rebuilding tree from sorted linked list is quite interesting. To build balanced tree we must know the number of nodes in the final tree. That is why it is calculated during merge phase. Knowing the size allows to uniformly distribute nodes and build optimal tree from height perspective. Optimality depends on usage scenarios and in this case we assume that every element in the tree has the same probability to be sought.
// Converts singly linked list into binary search tree // advancing list head to next unused list node and // returning created tree root static TreeNode<T> ToBinarySearchTree<T>(ref TreeNode<T> head, int size) { if (size == 0) // Zero sized list converts to null return null; TreeNode<T> root; if (size == 1) { // Unit sized list converts to a node with // left and right pointers set to null root = head; // Advance head to next node in list head = head.Right; // Left pointers were so only right needs to // be nullified root.Right = null; return root; } var leftSize = size / 2; var rightSize = size - leftSize - 1; // Create left substree out of half of list nodes var leftRoot = ToBinarySearchTree(ref head, leftSize); // List head now points to the root of the subtree // being created root = head; // Advance list head and the rest of the list will // be used to create right subtree head = head.Right; // Link left subtree to the root root.Left = leftRoot; // Create right subtree and link it to the root root.Right = ToBinarySearchTree(ref head, rightSize); return root; }
Now putting everything together.
public static TreeNode<T> Merge<T>(TreeNode<T> left, TreeNode<T> right, IComparer<T> comparer) { Contract.Requires(comparer != null); if (left == null || right == null) return left ?? right; // Convert both trees to sorted lists using original tree nodes var leftList = ToSortedList(left, null); var rightList = ToSortedList(right, null); int size; // Merge sorted lists and calculate merged list size var list = MergeAsSortedLists(leftList, rightList, comparer, out size); // Convert sorted list into optimal binary search tree return ToBinarySearchTree(ref list, size); }
This solution is O(n + m) time and O(1) space complexity where n and m are sizes of the trees to merge.
Assume you own a bar that have single restroom with n stalls. In order to avoid lawsuits you want to make sure that only people of the same gender can be in the restroom at the same time and no accidents occur (nobody peed their pants). How would you write synchronization algorithm for it?
Translating into concurrency language we want to build a synchronization algorithm that allow no more than n threads of the same kind to enter critical section and it should be starvation free (threads that are trying to enter critical section eventually enter it).
The problem can be divided three parts:
Mutual exclusion algorithms have a good property with respect to starvation freedom. Assume you have two starvation free mutual algorithms A and B. Combined in the following way:
Enter code A
Enter code B
Critical section
Leave code B
Leave Code A
they form another starvation free mutual exclusion algorithm.
Limiting number of threads inside critical section to configured number can be easily solved with SemaphoreSlim (starvation free). Thus we need to solve problem of allowing only threads of the same kind to enter critical section.
Let’s denote two types of threads: black and white. Assume that thread tries to enter critical section. The following case are possible:
Now to prevent starvation we will switch turn to the different color once group of threads of current color leaves critical section. Turn is set to color that is now allowed to enter critical section. However if no threads are in the critical section a thread may enter if its not its turn (basically it captures the turn).
class WhiteBlackLock { private readonly object _sync = new object(); private readonly int[] _waiting = new int[2]; private int _turn; private int _count; public IDisposable EnterWhite() { return Enter(0); } public IDisposable EnterBlack() { return Enter(1); } private IDisposable Enter(int color) { lock (_sync) { if (_waiting[1 - _turn] == 0 && (_count == 0 || _turn == color)) { // Nobody is waiting and either no one is in the // critical section or this thread has the same // color _count++; _turn = color; } else { // Either somebody is waiting to enter critical // section or this thread has a different color // than the ones already in the critical section // and thus wait with the rest of the same color _waiting[color]++; // Wait until current group while (_waiting[color] > 0) Monitor.Wait(_sync); } // Wrap critical section leaving in a disposable to // enable convenient use with using statement return new Disposable(this); } } private void Leave() { lock (_sync) { // Indicate completion if (--_count != 0) return; // If this is the last one of the current group make // way for threads of other color to run by switching // turn _turn = 1 - _turn; // Before threads are awoken count must be set to // waiting group size so that they can properly report // their completion and not change turn too fast _count = _waiting[_turn]; // Indicatet that current group can enter critical // section _waiting[_turn] = 0; // Wake up wating threads Monitor.PulseAll(_sync); } } class Disposable : IDisposable { private readonly WhiteBlackLock _lock; private int _disposed; public Disposable(WhiteBlackLock @lock) { _lock = @lock; } public void Dispose() { // Make sure only the first call of allowed multiple // calls leaves critical section if (Interlocked.Exchange(ref _disposed, 1) == 0) _lock.Leave(); } } }
In order to avoid lock ownership tracking leaving critical section is represented through disposable.
var semaphore = new SemaphoreSlim(n); var whiteBlackLock = new WhiteBlackLock(); // Worker thread code to enter critical section using (whiteBlackLock.EnterWhite()) { semaphore.Wait(); // Critical section goes here semaphore.Release(); }
Now your restroom can safely serve the needs of your customers =).
In practice you may find yourself in a situation when you have several sequences of data that you want to drain in parallel and merge the results into a single sequence. This is what Merge combinator for sequences does. Reactive Extensions had it for enumerable and observable sequences however at some point it was decided to no longer support it for enumerable sequences in Reactive Extensions as it may be expressed through the same combinator for observable sequences.
I find it quite interesting to implement Merge combinator for enumerable sequences but to make it more interesting let’s change behavior a little bit. The original behavior of the EnumerableEx.Merge was to drain source sequences as fast as workers can and buffer results in a resulting queue until it is consumed by merged sequence enumerator. It can be implemented (maybe this is not the most efficient way but still) using
Instead let’s change behavior to allow producer proceed with getting next element from source sequence only once previously “produced” element is consumed (which basically means merged sequence enumerator reached it). This is sort of two way synchronization between producer and consumer where consumer cannot proceed unless there are ready elements and producer cannot proceed unless previous element is consumed. It is kind of similar to blocking collection. However in this case there is only one consumer and consumers aren’t blocked because the queue is full but rather until previously enqueued element is dequeued.Let’s code the thing!
class Consumer<T> { private readonly IEnumerable<IEnumerable<T>> _sources; private readonly object _sync = new object(); private readonly Queue<Producer<T>> _globalQueue = new Queue<Producer<T>>(); private readonly Queue<Producer<T>> _localQueue = new Queue<Producer<T>>(); // No need to mark these fields as volatile as // only consumer thread updates and reads them private bool _done; private int _count; private readonly IList<Exception> _exceptions = new List<Exception>(); public Consumer(IEnumerable<IEnumerable<T>> sources) { _sources = sources; } // Merges sources in parallel public IEnumerable<T> Merge() { Start(); while (true) { Wait(); while (_localQueue.Count > 0) { // Use local queue to yield values var producer = _localQueue.Dequeue(); // Get the actual value out of ready producer // while simultaneously allowing it to proceed // and observe early termination request or get // next value var next = producer.GetNext(!_done); // Using Notificaiton<T> from Rx to simplify // sequence processing if (next.Kind != NotificationKind.OnNext) { _count--; if (next.Kind == NotificationKind.OnError) { // Observed exception leads to early // termination however merge will allow // already running (not waiting) // producers to finish (potentially // bringing more exceptions to be // observed) while requesting waiters // to terminate _done = true; // Store observed exception to be further // thrown as part of the aggregate // exception _exceptions.Add(next.Exception); } } else { yield return next.Value; } } // Loop until all producers finished if (_count == 0) { // Either normally if (_exceptions.Count == 0) yield break; // Or exceptions were observed throw new AggregateException(_exceptions); } } } // Notifies consumer of ready elements public void Enqueue(Producer<T> producer) { // Notify consumer of a ready producer lock (_sync) { // Consumer will either observe non-empty // global queue (if it is not waiting already) // or pulse (if it is waiting) _globalQueue.Enqueue(producer); Monitor.Pulse(_sync); } } // Waits for ready elements private void Wait() { lock (_sync) { // As the only consumer is draining the global queue // once an empty queue is observed and consumer is // notified the queue will be non-empty if (_globalQueue.Count == 0) Monitor.Wait(_sync); // Copy whatever available to local queue to further // drain without bothering taking a lock while (_globalQueue.Count > 0) _localQueue.Enqueue(_globalQueue.Dequeue()); } } // Starts producers private void Start() { try { foreach (var source in _sources) { var producer = new Producer<T>(this, source.GetEnumerator()); Task.Factory.StartNew(producer.Enumerate); _count++; } } catch (Exception ex) { // If none of producers are started successfully // just rethrow the exception if (_count == 0) throw; // Some producers started notify them of early // termination _done = true; // Store observed exception to be further thrown as // part of the aggregate exception _exceptions.Add(ex); } } } class Producer<T> { private readonly object _sync = new object(); private readonly Consumer<T> _consumer; private IEnumerator<T> _enum; private volatile Notification<T> _next; private volatile bool _cont = true; private volatile bool _awake; public Producer(Consumer<T> consumer, IEnumerator<T> enumerator) { _consumer = consumer; _enum = enumerator; } // Drains source sequence public void Enumerate() { try { // Loop always observes non-null value as // it is initially non-null and everytime consumer // awakes producer it is set non-null value while (_cont && _enum.MoveNext()) { // Set continuation flag to null and wait to be // awoken by the consumer _awake = false; // Notify consumer of a ready element _next = new Notification<T>.OnNext(_enum.Current); _consumer.Enqueue(this); lock (_sync) { // If consumer was pretty quick and producer // missed the pulse it will observe the awake // flag and thus won't wait if (!_awake) Monitor.Wait(_sync); } } _next = new Notification<T>.OnCompleted(); } catch (Exception ex) { _next = new Notification<T>.OnError(ex); } finally { _enum.Dispose(); _enum = null; } // Notify consumer that the producer completed _consumer.Enqueue(this); } // Awakes/notifies producer that it can proceed public Notification<T> GetNext(bool cont) { // Store ready element in local variable // because once producer is unleashed below // it may override ready yet not returned element var next = _next; lock (_sync) { // Set awake flag in case producer miss the pulse _awake = true; _cont = cont; Monitor.Pulse(_sync); } return next; } }
With Producer<T> and Consumer<T> in place Merge combinator can be implemented as:
static class EnumerableEx { public static IEnumerable<T> Merge<T>(IEnumerable<IEnumerable<T>> sources) { Contract.Requires(sources != null); return new Consumer<T>(sources).Merge(); } }
At any given moment at most n elements will be in a global queue where n is the number of source sequences.
Great exercise!
A tiny detail that can be uncovered by looking at a problem on a different angle usually is the key to the solution. So many times I looked at a great API design or problem solution saying “how I didn’t see it”. But sometimes I do see. It was a problem of searching for the longest consecutive elements sequence within unsorted array of integers. For example, in {5, 7, 3, 4, 9, 10, 1, 15, 1, 3, 12, 2, 11} sought sequence is {1, 2, 3, 4, 5}.
The first thing that comes mind is to sort given array in O(n log n ) and look for longest consecutive elements sequence. Eh, we can do better than that.
Using bit vector indexed by numbers from original array may not be justified due incomparable solution space and original array size (although time complexity is O(n)).
Let’s look at the problem more closely. The problem may be reduced to problem of effective range manipulation. Disjoint-set structure or interval trees offer O(n log n) building time complexity. But they do not take into consideration fact that we are dealing with integers. Knowing range boundaries we can definitely say what numbers are in there (for example, [1..3] contains 1, 2, 3). O(1) time complexity for range manipulation operations with O(1) space complexity for each range are the things we are looking for. We can do this using two hash tables:
For example, for a range [x..y] the tables will hold low[x] = y and high[y] = x. The algorithm looks the following:
The only question left is how to check if an element already belongs to some range as we are keeping only range boundaries in hash tables. Any number is either processed previously and thus in one of the ranges or a new range created out of it (and potentially merged with others). Thus if a number previously seen it is in some range and we do not need to process it. So before scanning original array we can simply remove any duplicates in O(n) time and space.
class Solver { public static Tuple<int, int> FindMaxRange(IEnumerable<int> seq) { // Generate all ranges and select maximum return EnumerateRanges(seq) .Aggregate((x, y) => Length(x) > Length(y) ? x : y); } public static IEnumerable<Tuple<int, int>> EnumerateRanges(IEnumerable<int> seq) { var low = new Dictionary<int, int>(); var high = new Dictionary<int, int>(); // Remove duplicates foreach (var val in seq.Distinct()) { // Create unit size range low[val] = high[val] = val; // Merge [i..i] with [i+1..y] var endsWith = MergeWithNext(val, low, high, 1); // Merge [i..endsWith] with [x..i-1] MergeWithNext(endsWith, high, low, -1); } return low.Select(p => Tuple.Create(p.Key, p.Value)); } static int MergeWithNext(int currStart, IDictionary<int, int> low, IDictionary<int, int> high, int sign) { var currEnd = low[currStart]; var nextStart = currEnd + sign; if (low.ContainsKey(nextStart)) { low[currStart] = low[nextStart]; high[low[currStart]] = currStart; low.Remove(nextStart); high.Remove(currEnd); } return low[currStart]; } static int Length(Tuple<int, int> t) { return t.Item2 - t.Item1; } }
The solution has O(n) time and space complexity assuming hash table implementation has O(1) time and space complexity for each operation/element.
String matching is about searching for occurrence (first or all occurrences – it makes difference from parallelization point of view as you shall see soon) of a pattern in a given text.
This problem naturally fits into data parallelism scenario although details depend on whether we want to find first or all occurrences of a pattern. Parallel searching for all occurrences is simpler as searching space is static (whole text needs to be looked through) in contrast to searching for the first occurrence that may be anywhere in a text (reducing search space improves performance otherwise the solution is either equal to sequential search till first occurrence or parallel search of all occurrences with getting the first one).
In order to find all pattern occurrences text can be separated into chunks that are processed in parallel. Each chunk overlaps with its immediate neighbor (except for the last one) for no more than length(pattern) - 1 characters to cover the case when pattern occurs in text such that chunk starting position is within pattern.
public static class ParallelAlgorithms { // Find all occurrences of a pattern in a text public static IEnumerable<int> IndexesOf(this string text, string pattern, int startIndex, int count) { var proc = Environment.ProcessorCount; // Do range partitioning var chunk = (count + proc - 1) / proc; var indexes = new IEnumerable<int>[proc]; Parallel.For(0, proc, p => { // Define overlapping with its immediate // neighbor chunk except for the last one var before = p * chunk; var now = Math.Min(chunk + pattern.Length - 1, count - p * chunk); // Sequentially search for patterns occurences // and store local result indexes[p] = SequentialIndexesOf(text, pattern, startIndex + before, now); }); return indexes.SelectMany(p => p); } static IEnumerable<int> SequentialIndexesOf(string text, string pattern, int startIndex, int count) { for (var i = 0; i <= count - pattern.Length;) { var found = text.IndexOf(pattern, startIndex + i, count - i); // No pattern occurrences is found till the end if (found == -1) break; yield return found; // Proceed with next to found position i = found + 1; } } }
Now for the first occurrence scenario search space must be reduced minimizing amount of unnecessary work (it will be non-zero in most cases due speculative processing).
One way to do this is to separate text into chunks and process them in parallel such that if pattern is found in chunk(i) processing of any chunk(j) where j > i is cancelled. Assuming n is the length of a text and p equals to logical processor count in the worst case (when first occurrence of a pattern is at the end of the first chunk) amount of the unnecessary work will be (p - 1)*n/p. On the other hand range partitioning has poor performance when workload is unbalanced.
What we can do instead is dynamic partitioning where whole text is separated into groups of chunks of the same length within group and chunk length in consequent group is two times large than previous group had. Group size should be set to number of logical processors. Thus, if c denotes chunk of unit size text will be separated like c, c, c, …, cc, cc, cc, …, cccc, cccc, cccc,… Now this sequence of chunks can be processed in parallel (respecting order) breaking at first found occurrence. Thus in worst case at most p*c amount of unnecessary work will be done.
This partitioning strategy is used in PLINQ and called chunk partitioning. Although text can be treated as a sequence of characters and chunk partitioning can be used out of the box but that is not what we want as otherwise we will process individual characters rather than text chunks. Instead we’ll produce sequence of chunks manually and using single item partitioner and Parallel.ForEach process them in parallel respecting order.
public static class ParallelAlgorithms { // Find first occurrence of a pattern in a text public static int IndexOf(this string text, string pattern, int startIndex, int count) { var minChunkSize = pattern.Length << 5; var maxChunkSize = minChunkSize << 3; // Create sequence of chunks var chunks = PartitionRangeToChunks(startIndex, count, minChunkSize, maxChunkSize, Environment.ProcessorCount); // Process chunks in parallel respecting order var chunkPartitioner = SingleItemPartitioner.Create(chunks); var concurrentBag = new ConcurrentBag<int>(); Parallel.ForEach(chunkPartitioner, (range, loop) => { var start = range.Item1; var length = Math.Min(startIndex + count - start, range.Item2 + pattern.Length - 1); var index = text.IndexOf(pattern, start, length); // No pattern occurrences in this chunk if (index < 0) return; // Store shared result concurrentBag.Add(index); // Let running parallel iterations complete and // prevent starting new ones loop.Break(); }); // Pick first occurrence or indicate no occurrence return concurrentBag.Count > 0 ? concurrentBag.Min() : -1; } static IEnumerable<Tuple<int, int>> PartitionRangeToChunks(int start, int count, int minChunkSize, int maxChunkSize, int doubleAfter) { var end = start + count; var chunkSize = Math.Min(minChunkSize, count); while (start < end) { for (var i = 0; i < doubleAfter && start < end; i++) { var next = Math.Min(end, start + chunkSize); yield return Tuple.Create(start, next - start); start = next; } chunkSize = Math.Min(maxChunkSize, chunkSize * 2); } } }
Search in parallel! =)
Graph is a concept used to describe relationships between things where vertices (or nodes) represent things and edges represent relationships. Graph is made up of two finite sets (vertices and edges) and denoted by G = (V, E). Nodes usually are denoted by non-negative integers (for example, graph with 3 nodes with have nodes denoted by 0, 1, 2) and edges are pairs of nodes (s, t) where s is a source and t is a target node.
Searching for a simple path (path with no repeated vertices) that connects two nodes is one of the basic problems that are solved using graph search. We need to visit each node once (meaning visited nodes tracking is on) to find out is there any path between two given nodes. Time complexity for graph search is O(V).
Let’s parallelize the thing to speed it up.
Graph representation is irrelevant for our case so let’s assume implementation of the following public API is in place:
public class Graph { // Initializes new graph with given number of nodes and // set of edges public Graph(int nodes, IEnumerable<Tuple<int, int>> edges); // Gets number of nodes in the graph public int Nodes { get; } // Gets list of nodes adjacent to given one public IEnumerable<int> AdjacentTo(int node); }
Starting with a source node for each node being processed search condition is checked and search is expanded to adjacent nodes that are not yet visited. Search continues until condition is met or no more reachable not visited nodes are left.
First part of the solution is to process nodes in parallel until no more is left or early termination is requested. The idea is to maintain two work queues for current and next phases. Items from current phase queue are processed in parallel and new work items are added to next phase queue. At the end of phase queues are switched. While this mechanism covers “while not empty” scenario, cooperative cancellation covers early search termination.
public static class ParallelAlgorithms { public static void DoWhileNotEmpty<T>(IEnumerable<T> seed, Action<T, Action<T>> body, CancellationToken cancellationToken) { // Maintain two work queues for current and next phases var curr = new ConcurrentQueue<T>(seed); var next = new ConcurrentQueue<T>(); var parallelOptions = new ParallelOptions {CancellationToken = cancellationToken}; // Until there is something to do while (!curr.IsEmpty) { // Function to add work for the next phase Action<T> add = next.Enqueue; // Process current work queue in parallel while // populating next one Parallel.ForEach(curr, parallelOptions, t => body(t, add)); // Switch queues and empty next one curr = next; next = new ConcurrentQueue<T>(); } } }
Now that we have processing mechanism we need to bring visited nodes tracking mechanism. As part of our solution we must find a path that connects two given nodes if one exists. Thus instead of just remembering which nodes were visited we’ll keep track of parent nodes used to visit a node. Unvisited nodes are marked with –1. Once search is finished we can reconstruct path backwards between target and source by following parent links.
public static class ParallelGraph { public static int[] Search(this Graph graph, int source, Func<Tuple<int, int>, bool> func) { const int undef = -1; // Start with dummy loop edge var seed = Enumerable.Repeat(Tuple.Create(source, source), 1); // Mark all nodes as not yet visited var tree = Enumerable.Repeat(undef, graph.Nodes).ToArray(); tree[source] = source; // Cancellation token source used to exit graph search // once search condition is met. var cancellationTokenSource = new CancellationTokenSource(); try { // Until there are reachable not visited nodes ParallelAlgorithms .DoWhileNotEmpty(seed, (edge, add) => { var from = edge.Item2; // and search condition is not met if (!func(edge)) cancellationTokenSource.Cancel(); // Try expand search to adjacent nodes foreach (var to in graph.AdjacentTo(from)) { // Expand search to not yet visited nodes // (otherwise if node is expanded and // then checked during processing we may // end up with lots of already visited // nodes in memory which is a problem // for large dense graphs) marking nodes // to visit next if (Interlocked.CompareExchange(ref tree[to], from, undef) != undef) continue; add(Tuple.Create(from, to)); } }, cancellationTokenSource.Token); } catch (OperationCanceledException ex) { // Check if exception originated from parallel // processing cancellation request if (ex.CancellationToken != cancellationTokenSource.Token) throw; } // In the end we have spanning tree that connects // all the nodes reachable from source return tree; } public static IEnumerable<int> FindPath(this Graph graph, int from, int to) { // Search within graph until reached 'to' var tree = Search(graph, from, edge => edge.Item2 != to); // Check if there is a path that connects 'from' and 'to' if (tree[to] == -1) return Enumerable.Empty<int>(); // Reconstruct path that connects 'from' and 'to' var path = new List<int>(); while (tree[to] != to) { path.Add(to); to = tree[to]; } path.Add(to); // Reverse path as it is reconstructed backwards return Enumerable.Reverse(path); } }
Done!
Find a job you like and you add five days to every week. - H. Jackson Brown, Jr.
Find a job you like and you add five days to every week.
- H. Jackson Brown, Jr.
I decided to join Microsoft in attempt to make my week longer. Lot’s of things to learn and challenging problems to solve. But this is where interesting topics come from.
I will continue chasing state of the art soon. Now I need to get materialized in Seattle area =).
Divide and conquer algorithm solves the problem by:
Sub-problem independency makes divide and conquer algorithms natural for dynamic task parallelism. Quicksort is a good example (actually you can find its parallel version here). We’ll focus on parallel merge sort as it better parallelizes. Basically the term parallelism means ratio T1/Ti where T1 is the execution time on a single processor and Ti is the execution on infinite number of processors.
Let’s implement sequential merge sort first as parallel merge sort will use it. In general merge sort in terms of divide and conquer defined as follows:
It is quite straightforward with a small improvement that allows to avoid unnecessary allocations and data copying.
class MergeSortHelper<T> { private readonly IComparer<T> _comparer; public MergeSortHelper() : this(Comparer<T>.Default) { } public MergeSortHelper(IComparer<T> comparer) { _comparer = comparer; } public void MergeSort(T[] array, int low, int high, bool parallel) { // Create a copy of the original array. Switching between // original array and its copy will allow to avoid // additional array allocations and data copying. var copy = (T[]) array.Clone(); if (parallel) ParallelMergeSort(array, copy, low, high, GetMaxDepth()); else SequentialMergeSort(array, copy, low, high); } private void SequentialMergeSort(T[] to, T[] temp, int low, int high) { if (low >= high) return; var mid = (low + high) / 2; // On the way down the recursion tree both arrays have // the same data so we can switch them. Sort two // sub-arrays first so that they are placed into the temp // array. SequentialMergeSort(temp, to, low, mid); SequentialMergeSort(temp, to, mid + 1, high); // Once temp array contains two sorted sub-arrays // they are merged into target array. SequentialMerge(to, temp, low, mid, mid + 1, high, low); // On the way up either we are done as the target array // is the original array and now contains required // sub-array sorted or it is the temp array from previous // step and contains smaller sub-array that will be // merged into the target array from previous step // (which is the temp array of this step and so we // can destroy its contents). } // Although sub-arrays being merged in sequential version // are adjacent that is not the case for parallel version // and thus sub-arrays boundaries must be specified // explicitly. private void SequentialMerge(T[] to, T[] temp, int lowX, int highX, int lowY, int highY, int lowTo) { var highTo = lowTo + highX - lowX + highY - lowY + 1; for (; lowTo <= highTo; lowTo++) { if (lowX > highX) to[lowTo] = temp[lowY++]; else if (lowY > highY) to[lowTo] = temp[lowX++]; else to[lowTo] = Less(temp[lowX], temp[lowY]) ? temp[lowX++] : temp[lowY++]; } } private bool Less(T x, T y) { return _comparer.Compare(x, y) < 0; } ...
Now we need to parallelize it. Let’s get obvious out of the way. Sorting two sub-arrays can be done in parallel just like parallel quicksort. We can proceed to merge step only once both sub-arrays are sorted.
Parallelizing merge is a more interesting task. Sequential version looks like a single indivisible task. We need to find a way to separate merging into independent tasks that can be run in parallel. The idea behind algorithm is the following:
Although algorithm description pretty abstract there will be more detailed comments in code below.
... private const int SEQUENTIAL_THRESHOLD = 2048; // Recursion depth is utilized to limit number of spawned // tasks. private void ParallelMergeSort(T[] to, T[] temp, int low, int high, int depth) { if (high - low + 1 <= SEQUENTIAL_THRESHOLD || depth <= 0) { // Resort to sequential algorithm if either // recursion depth limit is reached or sub-problem // size is not big enough to solve it in parallel. SequentialMergeSort(to, temp, low, high); return; } var mid = (low + high) / 2; // The same target/temp arrays switching technique // as in sequential version applies in parallel // version. sub-arrays are independent and thus can // be sorted in parallel. depth--; Parallel.Invoke( () => ParallelMergeSort(temp, to, low, mid, depth), () => ParallelMergeSort(temp, to, mid + 1, high, depth) ); // Once both taks ran to completion merge sorted // sub-arrays in parallel. ParallelMerge(to, temp, low, mid, mid + 1, high, low, depth); } // As parallel merge is itself recursive the same mechanism // for tasks number limititation is used (recursion depth). private void ParallelMerge(T[] to, T[] temp, int lowX, int highX, int lowY, int highY, int lowTo, int depth) { var lengthX = highX - lowX + 1; var lengthY = highY - lowY + 1; if (lengthX + lengthY <= SEQUENTIAL_THRESHOLD || depth <= 0) { // Resort to sequential algorithm in case of small // sub-problem or deep recursion. SequentialMerge(to, temp, lowX, highX, lowY, highY, lowTo); return; } if (lengthX < lengthY) { // Make sure that X range no less than Y range and // if needed swap them. ParallelMerge(to, temp, lowY, highY, lowX, highX, lowTo, depth); return; } // Get median of the X sub-array. As X sub-array is // sorted it means that X[lowX .. midX - 1] are less // than or equal to median and X[midx + 1 .. highX] // are greater or equal to median. var midX = (lowX + highX) / 2; // Find element in the Y sub-array that is strictly // greater than X[midX]. Again as Y sub-array is // sorted Y[lowY .. midY - 1] are less than or equal // to X[midX] and Y[midY .. highY] are greater than // X[midX]. var midY = BinarySearch(temp, lowY, highY, temp[midX]); // Now we can compute final position in the target // array of median of the X sub-array. var midTo = lowTo + midX - lowX + midY - lowY; to[midTo] = temp[midX]; // The rest is to merge X[lowX .. midX - 1] with // Y[lowY .. midY - 1] and X[midx + 1 .. highX] // with Y[midY .. highY] preceeding and following // median respectively in the target array. As // pairs are idependent from their final position // perspective they can be merged in parallel. depth--; Parallel.Invoke( () => ParallelMerge(to, temp, lowX, midX - 1, lowY, midY - 1, lowTo, depth), () => ParallelMerge(to, temp, midX + 1, highX, midY, highY, midTo + 1, depth) ); } // Searches for index the first element in low to high // range that is strictly greater than provided value // and all elements within specified range are smaller // or equal than index of the element next to range is // returned. private int BinarySearch(T[] from, int low, int high, T lessThanOrEqualTo) { high = Math.Max(low, high + 1); while (low < high) { var mid = (low + high) / 2; if (Less(from[mid], lessThanOrEqualTo)) low = mid + 1; else high = mid; } return low; } private static int GetMaxDepth() { // Although at each step we split unsorted array // into two equal size sub-arrays sorting them // not be perfectly balanced because parallel merge // may not be balanced. So we add some extra space for // task creation and so will keep CPUs busy. return (int) Math.Log(Environment.ProcessorCount, 2) + 4; } }
Now that the thing is ready let’s do some comparison. We’ll use small benchmark helper.
public class Benchmark { public static IEnumerable<long> Run<T>(Func<int, T[]> generator, Action<T[]> action, int times) { var samples = new long[times]; var sw = new Stopwatch(); for (var i = 0; i < times; i++) { var input = generator(i); sw.Restart(); action(input); sw.Stop(); samples[i] = sw.ElapsedMilliseconds; } return samples; } }
We'll calculate and compare average running time for a number of samples for sequential quicksort, parallel quicksort and parallel merge sort.
const int count = 10 * 1000 * 1000; const int iterations = 10; var rnd = new Random(); var input = new int[count]; Func<int, int[]> gen = i => { // Avoid allocating large array for every // run and just fill existing one for (var j = 0; j < input.Length; j++) input[j] = rnd.Next(); return input; }; Action<int[]> seqArraySort, parallelQuickSort, parallelMergeSort; // If Array.Sort<T>(T[] a, IComparer<T> c) sees // Comparer<T>.Default it resorts to unmanaged CLR provided // sorting implementation, so we use here dummy comparer to // force it to use managed quicksort. seqArraySort = a => Array.Sort(a, new IntComparer()); // Parallel quicksort implementation from TPL extras. // We use dummy comparer as well because at some point // parallel quicksort resorts back to Array.Sort parallelQuickSort = a => ParallelAlgorithms.Sort(a, new IntComparer()); // Our parallel merge sort implementation parallelMergeSort = a => new MergeSortHelper<int>().MergeSort(a, 0, a.Length - 1, true); var sq = Benchmark.Run(gen, seqArraySort, iterations).Average(); var pq = Benchmark.Run(gen, parallelQuickSort, iterations).Average(); var pm = Benchmark.Run(gen, parallelMergeSort, iterations).Average();
On a two cores machine I got that parallel merge sort is more than 2x faster than sequential quicksort and up to 25% faster than parallel quicksort but at the cost of additional O(n) space. Still it is a good example of how to use dynamic task parallelism. Enjoy!
Another puzzle is at stake, folks. This time it is binary tree related (not necessarily binary search tree as we are not interested in data relations but rather in binary tree structure). We need to traverse binary tree level by level (level is defined as set of all nodes at the same distance from root) in such a way that traversal direction within level changes from level to level thus forming a spiral. For example, consider binary tree below (it is rotated 90 degrees counter clockwise). Asterisk means NIL node.
* 20 * 19 * 18 * 17 * 13 * 12 * 11 * 10 * 7 * 6 * 5 * 4 * 2 *
Desired traversal for this tree will be: 10, 4, 11, 12, 6, 2, 5, 7, 19, 20, 17, 13, 18. Dissected into levels it looks like:
It may not be obvious how to approach the problem. We’ll start with a well know problem of traversing binary tree level by level (also known as breadth first traversal). It can be done using queue.
static IEnumerable<T> LevelOrderLeftToRight<T>(TreeNode<T> root) { if (root == null) yield break; var next = new Queue<TreeNode<T>>(); next.Enqueue(root); while(next.Count > 0) { var node = next.Dequeue(); yield return node.Data; EnqueueIfNotNull(next, node.Left); EnqueueIfNotNull(next, node.Right); } } static void EnqueueIfNotNull<T>(Queue<T> queue, T value) where T:class { if (value != null) queue.Enqueue(value); }
The queue contains yet to be examined nodes in level by level left to right (as we first enqueue left child and then right) order. At each step node at the front is dequeued, examined and its children are enqueued for further examination thus preserving level by level left to right order. It yields the following traversal: 10, 4, 11, 2, 6, 12, 5, 7, 19, 17, 20, 13, 18. Let’s split node sequence into levels:
That looks close to sought-for order except every odd numbered level has must be reversed. With all nodes in the same container (queue) it is hard to do this. Let’s change code a little bit to make every two adjacent layers are separated into different containers.
static IEnumerable<T> LevelOrderLeftToRight<T>(TreeNode<T> root) { if (root == null) yield break; var curr = new Queue<TreeNode<T>>(); var next = new Queue<TreeNode<T>>(); next.Enqueue(root); do { // Swap level containers Swap(ref curr, ref next); // Examine all nodes at current level while (curr.Count > 0) { var node = curr.Dequeue(); yield return node.Data; // Fill next level preserving order EnqueueIfNotNull(next, node.Left); EnqueueIfNotNull(next, node.Right); } // Continue until next level has nodes } while (next.Count > 0); } static void Swap<T>(ref T a, ref T b) { var tmp = a; a = b; b = tmp; }
With adjacent levels separated we can change order within levels. Next level is a set of child nodes from current level. FIFO container (queue) makes sure that child nodes of earlier examined node will also be examined earlier. But this is opposite to what we are looking for. Change it to LIFO container (stack)! And that’s it. Child nodes of earlier examined node will be examined later.
static IEnumerable<T> LevelOrderBySpiral<T>(TreeNode<T> root) { if (root == null) yield break; var curr = new Stack<TreeNode<T>>(); var next = new Stack<TreeNode<T>>(); // Specifies direction for the next level var leftToRight = true; next.Push(root); do { Swap(ref curr, ref next); while (curr.Count > 0) { var node = curr.Pop(); yield return node.Data; // If next level must be traversed from left to right // we must first push right child node and then left // and in opposite order if next level will be // traversed from right to left PushIfNotNull(next, leftToRight ? node.Right : node.Left); PushIfNotNull(next, leftToRight ? node.Left : node.Right); } // Change direction within level leftToRight = !leftToRight; } while (next.Count > 0); } static void PushIfNotNull<T>(Stack<T> stack, T value) where T : class { if (value != null) stack.Push(value); }
The code yields sought-for order.
If data to be sorted doesn’t fit into main memory external sorting is applicable. External merge sort can be separated into two phases:
To implement this algorithm I will use solutions from my previous posts so it may be helpful for you to look at them:
Let’s assume that M records at the same time are allowed to be loaded into main memory. One of the ways to create initial runs is to successively read M records from original file, sort them in memory and write back to disk. However we will use approach that allows us to create longer runs. It is called replacement selection.
The core structure behind this algorithm is priority queue. Taking one by one current minimum element out of the queue forms ordered sequence. And this is exactly what run stands for. The algorithm can be described as follows:
At any given moment at most M records are loaded into main memory as single written element into current run is replaced with single element from unsorted file if any (depending on comparison it either goes into current or next run).
Next step is to merge created initial runs. For the merge step we will use simplified algorithm (more advanced algorithms work with multiple physical devices to distribute runs, take into account data locality, etc.) based on k-way merge:
Yeap, it is that simple. And let’s code it.
The implementation abstracts file structure and reading/writing details making algorithm more concise and easier to understand.
abstract class ExternalSorter<T> { private readonly IComparer<T> m_comparer; private readonly int m_capacity; private readonly int m_mergeCount; protected ExternalSorter(IComparer<T> comparer, int capacity, int mergeCount) { m_comparer = comparer; m_capacity = capacity; m_mergeCount = mergeCount; } // Sorts unsorted file and returns sorted file name public string Sort(string unsorted) { var runs = Distribute(unsorted); return Merge(runs); } // Write run to disk and return created file name protected abstract string Write(IEnumerable<T> run); // Read run from file with given name protected abstract IEnumerable<T> Read(string name); // Merge step in this implementation is simpler than // the one used in polyphase merge sort - it doesn't // take into account distribution over devices private string Merge(IEnumerable<string> runs) { var queue = new Queue<string>(runs); var runsToMerge = new List<string>(m_mergeCount); // Until single run is left do merge while (queue.Count > 1) { // Priority queue must not contain records more than // required var count = m_mergeCount; while (queue.Count > 0 && count-- > 0) runsToMerge.Add(queue.Dequeue()); // Perform n-way merge on selected runs where n is // equal to number of physical devices with // distributed runs but in our case we do not take // into account them and thus n is equal to capacity var merged = runsToMerge.Select(Read).OrderedMerge(m_comparer); queue.Enqueue(Write(merged)); runsToMerge.Clear(); } // Last run represents source file sorted return queue.Dequeue(); } // Distributes unsorted file into several sorted chunks // called runs (run is a sequence of records that are // in correct relative order) private IEnumerable<string> Distribute(string unsorted) { var source = Read(unsorted); using (var enumerator = source.GetEnumerator()) { var curr = new PriorityQueue<T>(m_comparer); var next = new PriorityQueue<T>(m_comparer); // Prefill priority queue to capacity which is used // to create runs while (curr.Count < m_capacity && enumerator.MoveNext()) curr.Enqueue(enumerator.Current); // Until unsorted source and priority queues are // exhausted while (curr.Count > 0) { // Create next run and write it to disk var sorted = CreateRun(enumerator, curr, next); var run = Write(sorted); yield return run; Swap(ref curr, ref next); } } } private IEnumerable<T> CreateRun(IEnumerator<T> enumerator, PriorityQueue<T> curr, PriorityQueue<T> next) { while (curr.Count > 0) { var min = curr.Dequeue(); yield return min; // Trying to move run to an end enumerator will // result in returning false and thus current // queue will simply be emptied step by step if (!enumerator.MoveNext()) continue; // Check if current run can be extended with // next element from unsorted source if (m_comparer.Compare(enumerator.Current, min) < 0) { // As current element is less than min in // current run it may as well be less than // elements that are already in the current // run and thus from this element goes into // next run next.Enqueue(enumerator.Current); } else { // Extend current run curr.Enqueue(enumerator.Current); } } } private static void Swap<U>(ref U a, ref U b) { var tmp = a; a = b; b = tmp; } }
In the example below I created type that sorts text files containing single number per line.
class TextFileOfNumbersExternalSorter : ExternalSorter<int> { public TextFileOfNumbersExternalSorter(int capacity, int mergeCount) : base(Comparer<int>.Default, capacity, mergeCount) { } protected override string Write(IEnumerable<int> run) { var file = Path.GetTempFileName(); using (var writer = new StreamWriter(file)) { run.Run(writer.WriteLine); } return file; } protected override IEnumerable<int> Read(string name) { using (var reader = new StreamReader(name)) { while (!reader.EndOfStream) yield return Int32.Parse(reader.ReadLine()); } File.Delete(name); } }
That is used like this:
// capacity, mergeCount and unsortedFileName are initialized elsewhere var sorter = new TextFileOfNumbersExternalSorter(capacity, mergeCount); var sortedFileName = sorter.Sort(unsortedFileName);
That’s it folks!
Like most of life's problems, this one can be solved with bending - Bender B.Rodrigues
Like most of life's problems, this one can be solved with bending
- Bender B.Rodrigues
Let’s bend another problem. Given set of characters P and string T find minimum window in T that contains all characters in P. Applicable solution is restricted to O(length(T)) time complexity. For example, given a string T “of characters and as” and set of characters T in a form of a string “aa s” the minimum window will be “and as”.
The problem can be broken into two parts:
Selecting every possible window (all unique pairs (i, j) where 0 <= i <= j < length(T)) will lead to solution worse than O(length(T)^2) because you still need to check if all characters from P are within selected window. Instead we will check every possible window ending position. Thus there are at least length(T) windows to consider.
Any feasible window has length equal to or greater than length(P). Performing recheck for any considered window will result in a solution no better than O(length(T)*length(P)). Instead we need to use check results from previous iteration.
Now we need to make sure that checking if a particular character is in P is done in an optimal way. Taking into account that a particular character may appear more than once and window thus must contain appropriate number of characters. We will use hash table to map unique characters from P to their count for fast lookup.
And now let’s tie all things together.
Code the thing! =)
static string FindMinWindow(string t, string p) { // Create char to count mapping for fast lookup // as some characters may appear more than once var charToCount = new Dictionary<char, int>(); foreach (var c in p) { if (!charToCount.ContainsKey(c)) charToCount.Add(c, 0); charToCount[c]++; } var unmatchesCount = p.Length; int minWindowLength = t.Length + 1, minWindowStart = -1; int currWindowStart = 0, currWindowEnd = 0; for (; currWindowEnd < t.Length; currWindowEnd++) { var c = t[currWindowEnd]; // Skip chars that are not in P if (!charToCount.ContainsKey(c)) continue; // Reduce unmatched characters count charToCount[c]--; if (charToCount[c] >= 0) // But do this only while count is positive // as count may go negative which means // that there are more than required characters unmatchesCount--; // No complete match, so continue searching if (unmatchesCount > 0) continue; // Decrease window as much as possible by removing // either chars that are not in T or those that // are in T but there are too many of them c = t[currWindowStart]; var contains = charToCount.ContainsKey(c); while (!contains || charToCount[c] < 0) { if (contains) // Return character to P charToCount[c]++; c = t[++currWindowStart]; contains = charToCount.ContainsKey(c); } if (minWindowLength > currWindowEnd - currWindowStart + 1) { minWindowLength = currWindowEnd - currWindowStart + 1; minWindowStart = currWindowStart; } // Remove last char from window - it is definitely in a // window because we stopped at it during decrease phase charToCount[c]++; unmatchesCount++; currWindowStart++; } return minWindowStart > -1 ? t.Substring(minWindowStart, minWindowLength) : String.Empty; }
Every character is examined at most twice (during appending to the end and during compaction) so the whole solution has O(length(T)) time complexity assuming hash table lookup is O(1) operation.
Recently I came across simple yet interesting coding problem. So here is the deal. You are given positive integer N. Print first N ^ 2 positive integers in matrix form in a such a way that within matrix numbers form spiral starting from its center and goring clockwise. For example, for N = 5 matrix to be printed is:
Optimize it for speed and space.
One way you can approach it is to create N x N matrix and fill it with numbers that form spiral and then print whole matrix row by row. But this solution will be of N ^ 2 space complexity. Let’s try to reach O(1) space complexity.
The key observation here is how matrix changes when N changes by 1.
N = 1.
N = 2.
N = 3.
N = 4.
Can you see the pattern here? At every step we extend previous matrix (P) with additional column and row (C). If N is even we extend previous matrix of size N – 1 with right column and bottom row
and with left column and top row if it is odd
This leads us to naturally recursive algorithm. We have three cases:
So basically to print a row we need to know matrix size N and row index. Here goes the solution.
static void Print(int n) { for(int i = 0; i < n; i++) { PrintLine(n, i); Console.WriteLine(); } } static void PrintLine(int n, int i) { // Number of integers in current matrix var n2 = n*n; // Number of itegers in previous matrix of size n - 1 var m2 = (n - 1)*(n - 1); if (n % 2 == 0) { if (i == n - 1) { // n is even and we are at the last row so just // print it for(int k = n2; k > n2 - n; k--) { PrintNum(k); } } else { // Print row from previous matrix of size n - 1 // first and then print value that belongs to current // matrix. Previous matrix is at the top left corner // so no need to adjust row index PrintLine(n - 1, i); // Skip all integers from previous matrix and upper // ones in this columnas integers must form clockwise // spiral PrintNum(m2 + 1 + i); } } else { if (i == 0) { // n is odd and we are at the first row so just // print it for(int k = m2 + n; k <= n2; k++) { PrintNum(k); } } else { // Print value that belongs to current matrix and // then print row from previous matrix of size n - 1 // Skip all integers from previous matric and bottom // ones in this column as integers must form clockwise // spiral PrintNum(m2 + n - i); // Previous matrix is at the bottom right corner so // row index must be reduced by 1 PrintLine(n - 1, i - 1); } } } static void PrintNum(int n) { Console.Write("{0, -4} ", n); }
If stack is not considered then this solution has O(1) space complexity otherwise O(N).
Find all occurrences of a pattern (of length m) in a text (of length n) is quite commonly encountered string matching problem. For example, you hit Ctrl-F in your browser and type string you want to find while browser highlights every occurrence of a typed string on a page.
The naive solution is to at each iteration “shift” pattern along the text by 1 position and check if all characters of a pattern match to corresponding characters in text. This solution has O((n – m + 1)*m) complexity.
If either pattern or text is fixed it can be preprocessed to speed up the search. For example, if pattern is fixed we can use Knuth-Morris-Pratt algorithm to preprocess it in O(m) time and make search of its occurrences complexity O(n).
Fixed text that is queried many times can also be preprocessed to support fast patterns search. One way to do this is to build suffix array. The idea behind it pretty simple. It is basically a list of sorted in lexicographical order suffixes (which starts at some position inside the string and runs till the end of the string) of the subject text. For example, for the “mississippi” string we have the following:
i ippi issippi ississippi mississippi pi ppi sippi sissippi ssippi ssissippi
However due to strings immutability in .NET it is not practical to represent each suffix as separate string as it requires O(n^2) space. So instead starting positions of suffixes will be sorted. But why suffixes are selected in the first place? Because searching for every occurrence of a pattern is basically searching for every suffix that starts with the pattern.
Once they are sorted we can use binary search to find lower and upper bounds that enclose all suffixes that start with the pattern. Comparison of a suffix with a pattern during binary search should take into account only m (length of the pattern) characters as we are looking for suffixes that start with the pattern.
// Suffix array represents simple text indexing mechanism. public class SuffixArray : IEnumerable<int> { private const int c_lower = 0; private const int c_upper = -1; private readonly string m_text; private readonly int[] m_pos; private readonly int m_lower; private readonly int m_upper; SuffixArray(string text, int[] pos, int lower, int upper) { m_text = text; m_pos = pos; // Inclusive lower and upper boundaries define search range. m_lower = lower; m_upper = upper; } public static SuffixArray Build(string text) { Contract.Requires<ArgumentException>(!String.IsNullOrEmpty(text)); var length = text.Length; // Sort starting positions of suffixes in lexicographical // order. var pos = Enumerable.Range(0, length).ToArray(); Array.Sort(pos, (x, y) => String.Compare(text, x, text, y, length)); // By default all suffixes are in search range. return new SuffixArray(text, pos, 0, text.Length - 1); } public SuffixArray Search(string str) { Contract.Requires<ArgumentException>(!String.IsNullOrEmpty(str)); // Search range is empty so nothing to narrow. if (m_lower > m_upper) return this; // Otherwise search for boundaries that enclose all // suffixes that start with supplied string. var lower = Search(str, c_lower); var upper = Search(str, c_upper); // Once precomputed sorted suffixes positions don't change // but the boundaries do so that next refinement // can be done within smaller range and thus faster. // For example, you may narrow search range to suffixes // that start with "ab" and then search within this smaller // search range suffixes that start with "abc". return new SuffixArray(m_text, m_pos, lower + 1, upper); } public IEnumerator<int> GetEnumerator() { // Enumerates starting positions of suffixes that fall // into search range. for (var i = m_lower; i <= m_upper; i++) yield return m_pos[i]; } IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } private int Compare(string w, int i) { // Comparison takes into account maximum length(w) // characters. For example, strings "ab" and "abc" // are thus considered equal. return String.Compare(w, 0, m_text, m_pos[i], w.Length); } private int Search(string w, int bound) { // Depending on bound value binary search results // in either lower or upper boundary. int x = m_lower - 1, y = m_upper + 1; if (Compare(w, m_lower) < 0) return x; if (Compare(w, m_upper) > 0) return y; while (y - x > 1) { var m = (x + y)/2; // If bound equals to 0 left boundary andvances to median // only // if subject is strictly greater than median and // thus search results in lower bound (position that // preceeds first suffix equal to or greater than // subject w). Otherwise search results in upper bound // (position that preceeds fisrt suffix that is greater // than subject). if (Compare(w, m) > bound) x = m; else y = m; } return x; } }
This implementation is simple (it has O(n^2 log n) complexity to sort and O(m log n) to search where n stands for text length and m for pattern length) and can be improved. It doesn’t take into account the fact that suffixes not arbitrary strings are sorted. On the other hand suffixes may share common prefixes and that may be used to speed up binary search.
Here an example of narrowing the search.
var str = ...; var sa = SuffixArray.Build(str); string pat; while ((pat = Console.ReadLine()) != String.Empty) { sa = sa.Search(pat); foreach (var pos in sa) { Console.WriteLine(str.Substring(pos)); } }
Happy Easter, folks!
There are cases when you need to select a number of best (according to some definition) elements out of finite sequence (list). For example, select 10 most popular baby names in a particular year or select 10 biggest files on your hard drive.
While selecting single minimum or maximum element can easily be done iteratively in O(n) selecting k smallest or largest elements (k smallest for short) is not that simple.
It makes sense to take advantage of sequences APIs composability. We’ll design an extension method with the signature defined below:
public static IEnumerable<TSource> TakeSmallest<TSource>( this IEnumerable<TSource> source, int count, IComparer<TSource> comparer)
The name originates from the fact that selecting k smallest elements can logically be expressed in terms of Enumerable.TakeWhile supplying predicate that returns true if an element is one of the k smallest. As the logical predicate is not changing only count do it is burned into method’s name (instead of “While” that represents changing predicate we have “Smallest”).
Now let’s find the solution.
If the whole list is sorted first k elements is what we are looking for.
public static IEnumerable<TSource> TakeSmallest<TSource>( this IEnumerable<TSource> source, int count, IComparer<TSource> comparer) { return source.OrderBy(x => x, comparer).Take(count); }
It is O(n log n) solution where n is the number of elements in the source sequence. We can do better.
Priority queue yields better performance characteristics if only subset of sorted sequence is required.
public static IEnumerable<TSource> TakeSmallest<TSource>( this IEnumerable<TSource> source, int count, IComparer<TSource> comparer) { var queue = new PriorityQueue<TSource>(source, comparer); while (count > 0 && queue.Count > 0) { yield return queue.Dequeue(); count--; } }
It requires O(n) to build priority queue based on binary min heap and O(k log n) to retrieve first k elements. Better but we’ll improve more.
Quicksort algorithm picks pivot element, reorders elements such that the ones less than pivot go before it while greater elements go after it (equal can go either way). After that pivot is in its final position. Then both partitions are sorted recursively making whole sequence sorted. In order to prevent worst case scenario pivot selection can be randomized.
Basically we are interested in the k smallest elements themselves and not the ordering relation between them. Assuming partitioning just completed let’s denote set of elements that are before pivot (including pivot itself) by L and set of elements that are after pivot by H. According to partition definition L contains |L| (where |X| denotes number of elements in a set X) smallest elements. If |L| is equal to k we are done. If it is less than k than look for k smallest elements in L. Otherwise as we already have |L| smallest elements look for k - |L| smallest elements in H.
public static IEnumerable<TSource> TakeSmallest<TSource>( this IEnumerable<TSource> source, int count) { return TakeSmallest(source, count, Comparer<TSource>.Default); } public static IEnumerable<TSource> TakeSmallest<TSource>( this IEnumerable<TSource> source, int count, IComparer<TSource> comparer) { Contract.Requires<ArgumentNullException>(source != null); // Sieve handles situation when count >= source.Count() Contract.Requires<ArgumentOutOfRangeException>(count > 0); Contract.Requires<ArgumentNullException>(comparer != null); return new Sieve<TSource>(source, count, comparer); } class Sieve<T> : IEnumerable<T> { private readonly IEnumerable<T> m_source; private readonly IComparer<T> m_comparer; private readonly int m_count; private readonly Random m_random; public Sieve(IEnumerable<T> source, int count, IComparer<T> comparer) { m_source = source; m_count = count; m_comparer = comparer; m_random = new Random(); } public IEnumerator<T> GetEnumerator() { var col = m_source as ICollection<T>; if (col != null && m_count >= col.Count) { // There is not point in copying data return m_source.GetEnumerator(); } var buf = m_source.ToArray(); if (m_count >= buf.Length) { // Buffer already contains exact amount elements return buf.AsEnumerable().GetEnumerator(); } // Find the solution return GetEnumerator(buf); } IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } private IEnumerator<T> GetEnumerator(T[] buf) { var n = buf.Length; var k = m_count; // After rearrange is completed fist k // items are the smallest elements Rearrange(buf, 0, n - 1, k); for (int i = 0; i < k; i++) { yield return buf[i]; } } private void Rearrange(T[] buf, int l, int u, int k) { if (l == u) { return; } // Partition elements around randomly selected pivot var q = RandomizedPartition(buf, l, u); // Compute size of low partition (includes pivot) var s = q - l + 1; // We are done as low partition is what we were looking for if (k == s) { return; } if (k < s) { // Smallest elements group is less than low partition // find it there Rearrange(buf, l, q - 1, k); } else { // Low partition is in smallest elements group, find the // rest in high partition Rearrange(buf, q + 1, u, k - s); } } private int RandomizedPartition(T[] buf, int l, int u) { // Select pivot randomly and swap it with the last element // to prevent worst case scenario where pivot is the // largest remaining element Swap(buf, m_random.Next(l, u + 1), u); // Divides elements into two partitions: // - Low partition where elements that are less than pivot // and pivot itself // - High partition contains the rest var k = l; for (var i = l; i < u; i++) { if (m_comparer.Compare(buf[i], buf[u]) < 0) { Swap(buf, k++, i); } } // Put pivot into its final location Swap(buf, k, u); return k; } private static void Swap(T[] a, int i, int j) { var tmp = a[i]; a[i] = a[j]; a[j] = tmp; } }
The solution is expected O(n) which means quit good performance in practice. Let’s run the thing.
const int count = 100; const int max = 100; var rnd = new Random(); var seq = Enumerable.Range(0, count).Select(_ => rnd.Next(max)).ToArray(); Func<int, int> i = x => x; for(var k = 1; k < count / 2; k++) { var a = seq.TakeSmallest(k).OrderBy(i); var b = seq.OrderBy(i).Take(k); Debug.Assert(a.SequenceEqual(b)); }
Enjoy!
The classic merge (the one used in Merge Sort) takes as input some sorted lists and at each step outputs element with next smallest key thus producing sorted list that contains all the elements of the input lists.
An instance of a list is a computer representation of the mathematical concept of a finite sequence, that is, a tuple.
It is not always practical to have whole sequence in memory because of its considerable size nor to constraint sequence to be finite as only K first elements may be needed. Thus our algorithm must produce monotonically increasing (according to some comparison logic) potentially infinite sequence.
Two-way merge is the simplest variation where two lists are merged (it is named OrderedMerge to avoid confusion with EnumerableEx.Merge).
public static IEnumerable<T> OrderedMerge<T>( this IEnumerable<T> first, IEnumerable<T> second, IComparer<T> comparer) { using (var e1 = first.GetEnumerator()) { using (var e2 = second.GetEnumerator()) { var c1 = e1.MoveNext(); var c2 = e2.MoveNext(); while (c1 && c2) { if (comparer.Compare(e1.Current, e2.Current) < 0) { yield return e1.Current; c1 = e1.MoveNext(); } else { yield return e2.Current; c2 = e2.MoveNext(); } } if (c1 || c2) { var e = c1 ? e1 : e2; do { yield return e.Current; } while (e.MoveNext()); } } } }
This algorithm runs in O(n) where n is a number of merged elements (as we may not measure it by the number of elements in sequences because of their potential infinity).
N-way merge is a more general algorithm that allows to merge N monotonically increasing sequences into one. It is used in external sorting. Here the most general overload signature (others are omitted as you can easily create them).
public static IEnumerable<T> OrderedMerge<T>( this IEnumerable<IEnumerable<T>> sources, IComparer<T> comparer)
Naive implementation will take advantage of existing two-way merge and composition.
public static IEnumerable<T> NaiveOrderedMerge<T>( this IEnumerable<IEnumerable<T>> sources, IComparer<T> comparer) { return sources.Aggregate((seed, curr) => seed.OrderedMerge(curr, comparer)); }
Lets denote merging two sequences Si and Sj where i != j and both within [0, m) (m – is the number of sequences) with (Si, Sj). Then what the code above does is (((S0, S1), S2), … Sm). This implementation is naive because fetching next smallest element takes O(m) making total running time to O(nm). We can do better than that.
Recall that in my previous post we implemented priority queue based on binary heap that allows to get next smallest element in O(log n) where n is the size of the queue. Here is a solution sketch:
Thus we will have queue of size that doesn’t exceed m (number of sequences) and thus making total running time O(n log m).
Other interesting aspect resource management. Each sequence has associated resources that needs to be released once merged sequence is terminated normally or abnormally. Number of sequences is not known in advance. We will use solution that is described in my previous post Disposing sequence of resources.
Now let’s do it.
// Convenience overloads are not included only most general one public static IEnumerable<T> OrderedMerge<T>( this IEnumerable<IEnumerable<T>> sources, IComparer<T> comparer) { // Make sure sequence of ordered sequences is not null Contract.Requires<ArgumentNullException>(sources != null); // and it doesn't contain nulls Contract.Requires(Contract.ForAll(sources, s => s != null)); Contract.Requires<ArgumentNullException>(comparer != null); // Precondition checking is done outside of iterator because // of its lazy nature return OrderedMergeHelper(sources, comparer); } private static IEnumerable<T> OrderedMergeHelper<T>( IEnumerable<IEnumerable<T>> sources, IComparer<T> elementComparer) { // Each sequence is expected to be ordered according to // the same comparison logic as elementComparer provides var enumerators = sources.Select(e => e.GetEnumerator()); // Disposing sequence of lazily acquired resources as // a single resource using (var disposableEnumerators = enumerators.AsDisposable()) { // The code below holds the following loop invariant: // - Priority queue contains enumerators that positioned at // sequence element // - The queue at the top has enumerator that positioned at // the smallest element of the remaining elements of all // sequences // Ensures that only non empty sequences participate in merge var nonEmpty = disposableEnumerators.Where(e => e.MoveNext()); // Current value of enumerator is its priority var comparer = new EnumeratorComparer<T>(elementComparer); // Use priority queue to get enumerator with smallest // priority (current value) var queue = new PriorityQueue<IEnumerator<T>>(nonEmpty, comparer); // The queue is empty when all sequences are empty while (queue.Count > 0) { // Dequeue enumerator that positioned at element that // is next in the merged sequence var min = queue.Dequeue(); yield return min.Current; // Advance enumerator to next value if (min.MoveNext()) { // If it has value that can be merged into resulting // sequence put it into the queue queue.Enqueue(min); } } } } // Provides comparison functionality for enumerators private class EnumeratorComparer<T> : Comparer<IEnumerator<T>> { private readonly IComparer<T> m_comparer; public EnumeratorComparer(IComparer<T> comparer) { m_comparer = comparer; } public override int Compare( IEnumerator<T> x, IEnumerator<T> y) { return m_comparer.Compare(x.Current, y.Current); } }
It works well with infinite sequences and cases where we need only K first elements and it fetches only bare minimum out of source sequences.
Run the thing.
// Function that generates sequence of length k of random numbers Func<int, IEnumerable<int>> gen = k => Enumerable.Range(0, k) .Select(l => rnd.Next(max)); // Generate sequence of random lengths and each length project // to a sequence of that length of random numbers var seqs = gen(count).Select(k => gen(k) .OrderBy(l => l).AsEnumerable()); var p = -1; foreach (var c in seqs.OrderedMerge(Comparer<int>.Default)) { Debug.Assert(p <= c); Console.WriteLine(c); p = c; }
With Reactive Extensions for .NET (Rx) and .NET Framework 4 a new LINQ operator was introduced – Zip (Bart De Smet gives excellent explanation about the idea and implementation details behind Zip operator). In a nutshell it merges two sequences into one sequence by using the selector function.
int[] numbers = { 1, 2, 3, 4 }; string[] words = { "one", "two", "three" }; numbers.Zip(words, (first, second) => first + " " + second) .Run(Console.WriteLine); // 1 one // 2 two // 3 three
While it seems pretty simple and clear there is a subtlety in its behavior. Zip is implemented something like this (omitting arguments checking):
public static IEnumerable<TResult> Zip<TFirst, TSecond, TResult>(IEnumerable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector) { using (IEnumerator<TFirst> fe = first.GetEnumerator()) { using (IEnumerator<TSecond> se = second.GetEnumerator()) { // What if call to fe.MoveNext() throws an exception // returns false or never returns? while (fe.MoveNext() && se.MoveNext()) { yield return resultSelector(fe.Current, se.Current); } } } }
A call to MoveNext on first sequence’s enumerator may prevent from calling MoveNext on second sequence’s enumerator because of exception thrown, terminated or stuck sequence (it is explained in great details in “Zip’em together” section of More LINQ with System.Interactive – More combinators for your Swiss Army Knife and this is where this post topic came from).
As an exercise we will implement Zip operator that could fetch results from both sources simultaneously, combining their results or exceptions into produced results.
Here is a solution sketch:
As we expect that zipping may be done on infinite sequences or fetching next item from a sequence may take infinitely long time or even consumer may wait indefinitely long before fetching next zipped item explicitly spawned threads are used to avoid seizing thread pool threads for indefinitely long time.
For now notification problems do not take into account abnormal notifications (we’ll take a look at it below).
Task Parallel Library provides for both waiting problems very convenient primitives:
Coordinator needs to track remaining work to be completed and be awaken when it is done. CountdownEvent starts with the number of workers (in this case 2) and reset back to this value just before allowing workers to proceed to next iteration.
Things a little bit more trickier with workers wait problem. Barrier allows to setup number of participants. If we will setup number of participants equal to number of worker then once they all arrive at the barrier they will proceed to next iteration. But this not what we want as workers must not proceed without coordinator’s permission. In that case we’ll make him a participant as well. Coordinator will arrive at the barrier once next zipped value (next iteration) is requested. After every one pass the barrier it automatically goes to its initial state so no explicit reset is required.
But what to do in case of normal or abnormal termination of either sequences. Zip sequence must be terminated as well and we cannot wait for the other sequence to fetch next value or terminate because it may take indefinitely long time. Fortunately both primitives (CountdownEvent and Barrier) support new .NET 4 Cancellation Framework and in particular:
Coordinator waits on the CountdownEvent instance with a CancellationToken instance that workers can use to propagate cancellation thus notifying about termination.
Workers wait on the Barrier instance with CancellationToken instance that coordinator can use to notify about outer sequence (zip sequence) termination.
The last thing is how to deal uniformly with values, exceptions and normal sequence termination. Hopefully Reactive Extensions for .NET (Rx) got the right tool for that:
Basically value, exception and termination are represented by a type derived from Notification<T> where T is a sequence element type.
Let’s now put everything together.
public static class ZipEx { public static IEnumerable<TResult> RightHandZip<TFirst, TSecond, TResult>(this IEnumerable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector) { Contract.Requires(first != null); Contract.Requires(second != null); Contract.Requires(resultSelector != null); return new Zipper<TFirst, TSecond>().Zip(first, second, resultSelector); } class Zipper<TFirst, TSecond> { private const int c_sourceCount = 2; private int m_workersCount = c_sourceCount; private CountdownEvent m_zipEvent; private CancellationTokenSource m_zipCancellation; private Barrier m_srcBarrier; private CancellationTokenSource m_srcCancellation; private volatile Notification<TFirst> m_first; private volatile Notification<TSecond> m_second; public IEnumerable<TResult> Zip<TResult>(IEnumerable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector) { // Coordinator tracks remaining work through // count down event m_zipEvent = new CountdownEvent(c_sourceCount); // Workers may use this cancellation token source // to awake coordinator in case of ternimation of // either sequence m_zipCancellation = new CancellationTokenSource(); // Here we basically state that coordinator also // barrier participant m_srcBarrier = new Barrier(c_sourceCount + 1); // Coordinator may use this cancellation token source // to awake workers in case of outer sequence (zip) // termination m_srcCancellation = new CancellationTokenSource(); // Spawn workers that will fetch results from both // sequences simultaneously RunWorker(first, n => { m_first = n; }); RunWorker(second, n => { m_second = n; }); var token = m_zipCancellation.Token; while (true) { try { // Wait until either all workers fetched next // values or any worker notified on completition // (either due to exception or sequence completition) m_zipEvent.Wait(token); } catch (OperationCanceledException) { // Notify workers that zip sequence is terminated m_srcCancellation.Cancel(); // If zip sequence is terminated due to exception(s) // throw either AggregateException if both sequences // resulted in exception or the exception that // terminated either of them ThrowIfError(); // Otherwise sequence is terminated due to one of // the sequences completion yield break; } // Reset count down event for the next round m_zipEvent.Reset(c_sourceCount); // Yield next zipped value yield return resultSelector(m_first.Current, m_second.Current); // Only once consumer asks for the next value we allow // workers to attempt to fetch next value // Zipper is a barrier participant that arrives at the // barrier once next zipped is requested m_srcBarrier.SignalAndWait(); } } private void RunWorker<T>(IEnumerable<T> enumerable, Action<Notification<T>> update) { // In order to fetch results from sequences we will use // manually spawned threads as we do not want to seize // ThreadPool threads for indefinite time. new Thread(() => { var token = m_srcCancellation.Token; foreach (var notification in enumerable.Materialize()) { update(notification); if (notification.Kind == NotificationKind.OnNext) { // Notify on sucessfully fetched value out // of sequence m_zipEvent.Signal(); } else { // Either sequence completition or error // notifications terminate zipped sequence m_zipCancellation.Cancel(); } try { // Wait until next zipped value is requested or // zip sequence is terminated m_srcBarrier.SignalAndWait(token); } catch (OperationCanceledException) { // Last alive worker is responsible for resources // disposal DisposeIfLast(); return; } } }) {IsBackground = true}.Start(); } private void DisposeIfLast() { if (Interlocked.Decrement(ref m_workersCount) == 0) { DisposeAndNullify(ref m_zipEvent); DisposeAndNullify(ref m_srcBarrier); DisposeAndNullify(ref m_zipCancellation); DisposeAndNullify(ref m_srcCancellation); } } private void ThrowIfError() { var ex1 = ExtractErrorOrNothing(m_first); var ex2 = ExtractErrorOrNothing(m_second); if (ex1 != null && ex2 != null) { throw new AggregateException(ex1, ex2); } ThrowIfNotNull(ex1); ThrowIfNotNull(ex2); } private static Exception ExtractErrorOrNothing<T>(Notification<T> n) { if (n != null && n.Kind == NotificationKind.OnError) { return ((Notification<T>.OnError) n).Exception; } return null; } private static void ThrowIfNotNull(Exception ex) { if (ex != null) { throw ex; } } private static void DisposeAndNullify<T>(ref T res) where T : class, IDisposable { if (res != null) { res.Dispose(); res = null; } } } }
Let’s setup examples infrastructure
static IEnumerable<int> Infinite(string id) { Console.Write("{0} -> X ", id); while(true) {} yield break; } static IEnumerable<int> Seq(string id, int start, int count) { return Enumerable.Range(0, count).Select(x => x * 2 + start) .Do(x => Console.Write("{0} -> {1} ", id, x)); } static IEnumerable<int> Abnormal(string id) { return EnumerableEx.Throw<int>(new Exception()) .Finally(() => Console.Write("{0} -> E ", id)); } static void RunTest(IEnumerable<int> first, IEnumerable<int> second) { Func<int, int, string> format = (f, s) => String.Format("[{0}, {1}] ", f, s); try { first.RightHandZip(second, format).Run(Console.Write); } catch (Exception ex) { Console.Write(ex.GetType()); } Console.WriteLine(); }
and run some examples
const int count = 3; // Both sequences terminate RunTest(Seq("F", 0, count), Seq("S", 1, count)); // Terminates normally // F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] F -> 4 S -> 5 [4, 5] // First normally terminates and second continues RunTest(Seq("F", 0, count), Seq("S", 1, count + 1)); // Terminates normally // F -> 0 S -> 1 [0, 1] S -> 3 F -> 2 [2, 3] F -> 4 S -> 5 [4, 5] S -> 7 // First abnormally terminates and second continues RunTest(Seq("F", 0, count).Concat(Abnormal("F")), Seq("S", 1, count + 1)); // Terminates abnormally // F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] S -> 5 F -> 4 [4, 5] S -> 7 F -> E System.Exception // Both terminate abnormally RunTest(Seq("F", 0, count).Concat(Abnormal("F")), Seq("S", 1, count).Concat(Abnormal("S"))); // Terminates abnormally // F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] S -> 5 F -> 4 [4, 5] F -> E S -> E System.AggregateException // First stucks and second terminates abnormally RunTest(Seq("F", 0, count).Concat(Infinite("F")), Seq("S", 1, count).Concat(Abnormal("S"))); // Terminates abnormally // F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] F -> 4 S -> 5 [4, 5] S -> E F -> X System.Exception // First stucks and second terminates normally RunTest(Seq("F", 0, count).Concat(Infinite("F")), Seq("S", 1, count)); // Terminates normally // F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] F -> 4 S -> 5 [4, 5] F -> X // First stucks and second continues RunTest(Seq("F", 0, count).Concat(Infinite("F")), Seq("S", 1, count + 1)); // Stucks // F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] F -> 4 S -> 5 [4, 5] F -> X S -> 7
That’s it.
Design of container that supports items ordering raises lots of interesting design questions to consider. To be more concrete we will design simple priority queue based on binary min heap that supports the following operations:
Good design that solves wrong problems isn’t better than the bad one. So the first step is to identify right problems to solve. Priority queue maintains set of items with associated key (priority). Items get off the queue based on employed ordering mechanism for keys (priorities). Basically the two problems we need to solve (from API design perspective) are the ways to represent:
Association can be either explicit (PriorityQueue<TItem, TKey>, where key type is explicitly stated) or implicit (PriorityQueue<TItem>, where key type is of no interest). Each type parameter must have concrete consumers. Priority queue itself doesn’t care (although priority queues with updateable priority do) about keys but rather about comparing keys. Client code cannot benefit from explicit keys as well because it can easily access associated key as the client code defines what key actually means. So there is no point in cluttering API with irrelevant details (of what keys really are). Thus we will use PriorityQueue<T> (as now we have the only type parameter we will use short name for it) and let consumers provide comparison logic of two items based on whatever consumer defines as keys.
There are several options to represent comparison mechanism.
Item type may be constrained to support comparison through generic type parameter constraint:
class PriorityQueue<T> where T : IComparable<T> { }
Though this approach benefits from clearly stated comparison mechanism it implies significant limitations:
With those limitations in mind we can use comparers – something that knows how to compare two objects:
Comparers are designed for particular usage scenarios and single instance corresponds to items container. Thus limitation mentioned above are not applied to comparers.
Taking into account value of .NET Framework support for IComparer<T> and that it is easy to create wrapper that derives from Comparer<T> and delegates comparison to aggregated function we will use IComparer<T> approach (although it seems costless to add also support for Func<T, T, int> mechanism and create wrapper ourselves in most cases it is best to avoid providing means to do the same thing in multiple ways or otherwise potential confusion may outweigh benefits).
// Unbounded priority queue based on binary min heap public class PriorityQueue<T> { private const int c_initialCapacity = 4; private readonly IComparer<T> m_comparer; private T[] m_items; private int m_count; public PriorityQueue() : this(Comparer<T>.Default) { } public PriorityQueue(IComparer<T> comparer) : this(comparer, c_initialCapacity) { } public PriorityQueue(IComparer<T> comparer, int capacity) { Contract.Requires<ArgumentOutOfRangeException>(capacity >= 0); Contract.Requires<ArgumentNullException>(comparer != null); m_comparer = comparer; m_items = new T[capacity]; } public PriorityQueue(IEnumerable<T> source) : this(source, Comparer<T>.Default) { } public PriorityQueue(IEnumerable<T> source, IComparer<T> comparer) { Contract.Requires<ArgumentNullException>(source != null); Contract.Requires<ArgumentNullException>(comparer != null); m_comparer = comparer; // In most cases queue that is created out of sequence // of items will be emptied step by step rather than // new items added and thus initially the queue is // not expanded but rather left full m_items = source.ToArray(); m_count = m_items.Length; // Restore heap order FixWhole(); } public int Capacity { get { return m_items.Length; } } public int Count { get { return m_count; } } public void Enqueue(T e) { m_items[m_count++] = e; // Restore heap if it was broken FixUp(m_count - 1); // Once items count reaches half of the queue capacity // it is doubled if (m_count >= m_items.Length/2) { Expand(m_items.Length*2); } } public T Dequeue() { Contract.Requires<InvalidOperationException>(m_count > 0); var e = m_items[0]; m_items[0] = m_items[--m_count]; // Restore heap if it was broken FixDown(0); // Once items count reaches one eighth of the queue // capacity it is reduced to half so that items // still occupy one fourth (if it is reduced when // count reaches one fourth after reduce items will // occupy half of queue capacity and next enqueued item // will require queue expand) if (m_count <= m_items.Length/8) { Expand(m_items.Length/2); } return e; } public T Peek() { Contract.Requires<InvalidOperationException>(m_count > 0); return m_items[0]; } private void FixWhole() { // Using bottom-up heap construction method enforce // heap property for (int k = m_items.Length/2 - 1; k >= 0; k--) { FixDown(k); } } private void FixUp(int i) { // Make sure that starting with i-th node up to the root // the tree satisfies the heap property: if B is a child // node of A, then key(A) ≤ key(B) for (int c = i, p = Parent(c); c > 0; c = p, p = Parent(p)) { if (Compare(m_items[p], m_items[c]) < 0) { break; } Swap(m_items, c, p); } } private void FixDown(int i) { // Make sure that starting with i-th node down to the leaf // the tree satisfies the heap property: if B is a child // node of A, then key(A) ≤ key(B) for (int p = i, c = FirstChild(p); c < m_count; p = c, c = FirstChild(c)) { if (c + 1 < m_count && Compare(m_items[c + 1], m_items[c]) < 0) { c++; } if (Compare(m_items[p], m_items[c]) < 0) { break; } Swap(m_items, p, c); } } private static int Parent(int i) { return (i - 1)/2; } private static int FirstChild(int i) { return i*2 + 1; } private int Compare(T a, T b) { return m_comparer.Compare(a, b); } private void Expand(int capacity) { Array.Resize(ref m_items, capacity); } private static void Swap(T[] arr, int i, int j) { var t = arr[i]; arr[i] = arr[j]; arr[j] = t; } }
Example below prints top 200 elements from sequence of mscorlib types ordered by full name (sorting it first and than taking first 200 elements is less efficient).
class TypeNameComparer : Comparer<Type> { public override int Compare(Type x, Type y) { Contract.Requires(x != null); Contract.Requires(y != null); return x.FullName.CompareTo(y.FullName); } } ... const int count = 200; var types = typeof (object).Assembly.GetTypes(); var typesQueue = new PriorityQueue<Type>(types, new TypeNameComparer()); for (int i = 0; i < count && typesQueue.Count > 0; i++) { Console.WriteLine(typesQueue.Dequeue()); }
Looking at things from different perspectives allows to understand them better. On the other hand mind bending practice improves your ability to find solutions.
Previously we were Disposing sequence of resources with Reactive Extensions. This time we will build FIFO (first in, first out) collection based on single LIFO (last in, first out) collection with no additional explicit storage.
It is not that insane as it looks. Assume that items come out of stack in the order they must appear in the queue (FIFO). Choosing the opposite order is also possible however is not practical (see below). To make it happen we simply need to make sure that items in the stack (LIFO) are placed in the opposite order. Items queued first must appear at the top of the stack. This basically means that in order to queue item all items must be popped, the item pushed and then existent items pushed inversely to pop order. But we have no additional explicit storage requirement. Then store items implicitly through recursion.
public class StackBasedQueue<T> : IEnumerable<T> { private readonly Stack<T> m_items; public StackBasedQueue() : this(Enumerable.Empty<T>()) { } public StackBasedQueue(IEnumerable<T> items) { // Items must be reversed as we want first // item to appear on top of stack m_items = new Stack<T>(items.Reverse()); } public int Count { get { return m_items.Count; } } public void Enqueue(T item) { // If stack is empty then simply push item // as it will be the first and the last item // in the queue if (m_items.Count == 0) { m_items.Push(item); return; } // The item must be placed at the bottom of the stack // To do this existent items must be popped, the item // pushed and then existent items pushed inversely to // pop order var tmp = m_items.Pop(); Enqueue(item); m_items.Push(tmp); } public T Dequeue() { ThrowIfEmpy(); // If stack is not empty item on top of it // is next to be dequeued or peeked return m_items.Pop(); } public T Peek() { ThrowIfEmpy(); return m_items.Peek(); } public IEnumerator<T> GetEnumerator() { // As items queued first must appear at the top of the // stack we can enumerate items directly return m_items.GetEnumerator(); } IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } private void ThrowIfEmpy() { if (Count == 0) { throw new InvalidOperationException("The queue is empty."); } } }
Enqueue is a O(n) operation (where n is the number items in the stack). Dequeue and Peek is a O(1) operation. Enumerating through all items is a O(n) operation. Choosing the opposite order will make enumerating through all items O(n^2) operation which is not practical.
It is just an exercise so it must not be used in real world scenarios (otherwise at some point queue size may become big enough so that next attempt to enqueue an item will result in StackOverflowException) but standard Queue<T> instead.
Recall my previous post on Disposing sequence of resources where we were solving imperatively the following problems:
Now with Reactive Extensions for .NET (Rx) is out we will do it in more LINQ-ish manner with the help of interactive features of Reactive Extensions:
A great explanation of how EnumerableEx.Publish works (disregard naming difference) is given in Taming Your Sequence’s Side-Effects Through IEnumerable.Let. The following example illustrates the point.
static Random m_seeder = new Random(); static IEnumerable<int> GenerateRandomSequence(int count) { var rnd = new Random(m_seeder.Next()); for(int i = 0; i < count; i++) { yield return rnd.Next(count); } } ... const int count = 5; var xs = GenerateRandomSequence(count); // Each we iterate xs we may get a different sequence var equals = xs.SequenceEqual(xs); // However it can be solved through memoization xs.Publish(seq => { // Every time we iterate seq we get the same // sequence equals = seq.SequenceEqual(seq); return seq; });
EnumerableEx.Share makes sure that any iteration is made with respect to the same cursor.
var xs = Enumerable.Range(0, count); // Prints every sequence element to console // Without sharing for each of count iterations it will print // first element of a potentially different sequence (recall // random sequence example) var shared = xs.Share(); for(int i = 0; i < count; i++) { shared.Take(1).Run(Console.WriteLine); }
EnumerableEx.Finally does exactly what its description says (see more details here).
static IEnumerable<int> GenerateThrowingSequence(int count) { for(int i = 0; i < count; i++) { if (i > 0 && i % 3 == 0) { throw new Exception(); } yield return i; } } ... // Prints 0, 1, 2, Finally, Caught try { GenerateThrowingSequence(count).Finally(() => Console.WriteLine("Finally")) .Run(Console.WriteLine); } catch (Exception) { Console.WriteLine("Caught"); } // Prints 0, 1, Finally GenerateThrowingSequence(count).Finally(() => Console.WriteLine("Finally")) .Take(2).Run(Console.WriteLine);
Now putting everything together. Publish will help us to defer resources allocation and avoid unnecessary allocations. Share and Finally will take care of disposal.
static class Disposables { // Disposes projected resources once they are no longer needed public static void Using<TSource, TResource>( // Source sequence projected to disposable resources this IEnumerable<TSource> source, // Resource projection Func<TSource, TResource> resourceSelector, // Resources usage action Action<IEnumerable<TResource>> resourcesUsage) where TResource : IDisposable { var rcount = 0; source // At this point resources are not created but // only projection is set .Select( s => { // As we do not want to unnecessarily create // and then immediately dispose potentially expensive // resources we will count created resources // and later dispose only used ones rcount++; return resourceSelector(s); }) .Publish( rs => { // During sequence iteration resources will be created // However not all resources may be iterated through or // an exception may be thrown in the middle and thus // not all resources may be created (therefore not // disposed) try { // Supplied resources sequence can be iterated // multiple times with each of side effects occurs // only once and sequence elements memoized and // reused during each iteration resourcesUsage(rs); return Enumerable.Empty<TResource>(); } finally { // We must dispose only those resources we used // (counted and memoized above during first // iteration) rs = rs.Take(rcount) // Disposing resources must be done in the opposite // order to preserve nested try{}finally{} blocks // semantics .Reverse().Do(r => { rcount--; r.Dispose(); }) // Once resource is disposed it must not be // iterated again and this what Share takes // care of .Share(); Action final = null; final = () => { // Stop once every resource was given // a chance to dispose as Finally is // called even on empty sequences and // otherwise it leads to stack overflow if (rcount > 0) { // Dispose only used resources and // leave untouched the rest rs.Finally(final).Run(); } }; final(); } }) // Evaluate the sequence (triggers resources usage) .Run(); } }
Usage example below illustrates situation where during resource disposal an exception is thrown. In this case we must give chance to preceding (from resource sequence order perspective) resource to be disposed. However if an exception is thrown while disposing preceding resources that exception will hide previous one.
// Fake full of side effects resource =) class Resource : IDisposable { private readonly int m_i; public Resource(int i) { m_i = i; Console.WriteLine("Created {0}", m_i); } public void Use() { Console.WriteLine("Using {0}", m_i); } public void Dispose() { Console.WriteLine("Disposed {0}", m_i); // Simulate resource disposal that results in exception if (m_i % 2 == 1) { throw new Exception(m_i.ToString()); } } } ... try { Enumerable.Range(0, 5) .Using(i => new Resource(i), rs => { // First resources 0, 1 and 2 are created // and used rs.Take(3).Run(r => r.Use()); // then already created resource 2 is used // and resource 3 is created and used rs.Skip(1).Take(3).Run(r => r.Use()); }); } catch (Exception ex) { // As resources are disposed in the opposite order // the latest exception is propagated Console.WriteLine("Exception {0}", ex.Message); }
This produces the following output:
Created 0 // iterating, if not iterated previously resource is created Using 0 Created 1 Using 1 Created 2 Using 2 Using 1 // otherwise reused Using 2 // reused again Created 3 // wasn’t iterated previously, created Using 3 Disposed 3 // disposing in the opposite order, throws exception Disposed 2 // still disposing continues Disposed 1 // throws exception that hides exception thrown earlier Disposed 0 // disposing continues Exception 1 // exception is caught
That’s it! Hopefully you’ve enjoyed.
I hope we’ll meet next year. Happy New Year!
The idea behind Chain of Responsibility pattern is quite simple and powerful:
Avoid coupling the sender of a request to its receiver by giving more than one object a chance to handle the request. Chain the receiving objects and pass the request along the chain until an object handles it.
You can find lots of object oriented implementations out there so as an exercise we will rather try to do it in a more functional way. For simplicity Func<T, R> will be considered as handler contract. The basic idea looks like this:
Func<T, R> h = t => { // Decide whether you can handle request bool canHandle = ...; // Get successor from somewhere Func<T, R> successor = ...; if (canHandle) // Handle request represented by t else // Delegate request to successor return successor(t); };
The first thing to solve is how to get successor. As handler must support composition it cannot simply create closure over successor. On the other hand it can be represented as function that returns actual handler closed over its successor:
Func<Func<T, R>, Func<T, R>> h = successor => t => { bool canHandle = ...; if (canHandle) // Handle request represented by t else // Delegate request to closed over successor return successor(t); };
Now we need to compose handlers into a chain.
// Creates chain of responsibility out of handlers static Func<T, R> Chain<T, R>(IEnumerable<Func<Func<T, R>, Func<T, R>>> handlers) { // By default if none of handlers can handle incoming request an exception is thrown Func<T, R> notSupported = t => { throw new NotSupportedException(); }; return Chain(handlers, notSupported); } // Creates chain of responsibility out of regular and default handlers static Func<T, R> Chain<T, R>(IEnumerable<Func<Func<T, R>, Func<T, R>>> handlers, Func<T, R> def) { // Assuming that order of handlers within the chains must be the same as in handlers sequence return handlers // Handlers needs to be reversed first or otherwise they will appear in the opposite order .Reverse() // Iteratively close each handler over its successor .Aggregate(def, (a, f) => f(a)); }
To make it more clear lets expand chaining of simple two handlers case:
// default handler Func<int, int> d = x => x; // two handlers appear in sequence in order of declaration Func<Func<int, int>, Func<int, int>> h1 = s => t => t < 10 ? t*2 : s(t); Func<Func<int, int>, Func<int, int>> h2 = s => t => t < 5 ? t + 3 : s(t); // 1. Reverse handlers // h2 // h1 // 2. Close h2 over d // tmp1 = t => t < 10 ? t * 2 : d(t); Func<int, int> tmp1 = h2(d); // 3. Close h1 over tmp1 // tmp2 = t => t < 5 ? t + 3 : tmp1(t); Func<int, int> tmp2 = h1(tmp1); // 4. tmp2 is the chain
Now handlers are dynamically composed into chains to address particular scenario.
As a chaining exercise let’s create the following application (a team of developers tries to handle a project):
Prepare
// Staff development team that will do the project static IEnumerable<Func<Func<int, int>, Func<int, int>>> Staff(int teamSize, int maxSkill) { var rnd = new Random(); for (int i = 0; i < teamSize; i++) { int dev = i; // Developers may differ in their skills int skill = rnd.Next(maxSkill); // If developer can handle incoming task he reports by returning his id that he completed the task // If not (not enough skills) he contributes to task and delegates to next developer smaller task yield return c => t => t <= skill ? dev : c(t - skill); } } // Create work break down structure for the project static IEnumerable<int> Work(int projectSize, int maxTaskComplexity) { var rnd = new Random(); for (int i = 0; i < projectSize; i++) { yield return rnd.Next(maxTaskComplexity) + 1; } }
and march to the end.
// Create tasks var work = Work(projectSize, maxTaskComplexity).ToArray(); // If the team cannot handle particular task they ask for help unknown guru Func<int, int> askForHelp = t => -1; // Create chain out of developers to form a team with a backup var team = Chain(Staff(teamSize, maxTaskComplexity), askForHelp); // Hand out each task to the team var project = from w in work select new {Task = w, CompletedBy = team(w)}; foreach(var status in project) { Console.WriteLine("Task {0} completed by {1}", status.Task, status.CompletedBy); }
Have chaining fun!
Recall my previous post Events and Callbacks vs. Explicit Calls that outlined pros and cons of both (events and callbacks on one side and explicit calls on the other side) approaches.
Explicit calls imply that callee plays certain role from caller’s perspective thus making the relationship between the two explicit as well. Consider simple undo/redo example:
interface ICommand { // Not every command can be undone. Returns true if it can be undone, // false - otherwise. bool CanUndo { get; } // Executes the command. void Do(); // Reverts changes. NotSupportedException must be thrown in case // command cannot be undone. void UnDo(); } // Provides undo/redo mechanism. class History { // Makes supplied command part of the history. public void Remember(ICommand cmd) { // implementation is not relevant. } // other members elided. }
Caller (History) clearly communicates its expectations by requiring callee (particular command) to conform with command’s contract (indivisible logical unit).
While being explicit makes code more clear it has its price. It requires consumers to create new types that conform to contract. It is not a problem if the type created will be reused by other parts of the application or it has complex logic. But it is also common that it may have trivial logic that is used only in caller (undo/redo mechanism in this case) scenario. In this case creating a new type sounds like an overhead.
I whish C# has capabilities (“Object Expressions”) similar to F# where I can implement interface “in place” like this:
let cmd = { new ICommand with member x.CanUndo with get() = false member x.Do() = Console.Write("Done") member x.UnDo() = raise (new NotSupportedException()) }
Although we can provide default implementation that uses delegates.
class ActionCommand : ICommand { private readonly Action m_do; private readonly Action m_undo; public ActionCommand(Action doCallback, Action undoCallback) { if (doCallback == null) { throw new ArgumentNullException("doCallback"); } m_do = doCallback; m_undo = undoCallback; } public bool CanUndo { get { return m_undo != null; } } public void Do() { m_do(); } public void UnDo() { if (!CanUndo) { throw new NotSupportedException(); } m_undo(); } }
While conforming to contract ActionCommand eases creation of lightweight scenario dedicated implementations “in place” avoiding types proliferation. But still the type must be discovered first by developers. It is negligible effort but it is still nonzero. In order to level this effort let the original consuming code do the job.
// Provides undo/redo mechanism class History { // Makes supplied command part of the history public void Remember(ICommand cmd) { // implementation is not relevant } // Makes command represented by pair of callbacks part of the history public void Remember(Action doCallback, Action undoCallback) { Remember(new ActionCommand(doCallback, undoCallback)); } // other members elided }
You should not put every possible “shortcut” into overloads but only the most commonly used one.
What benefits does this approach has? It benefits from being close to explicitly defined role making it easier to understand and use callback based API that is useful in case of lightweight single scenario use implementations.
C# “using” statement has several advantages over its expanded equivalent:
Whenever you need to obtain several resources (number is known at compile time), use and then dispose them “using” statement is usually the choice:
using(var aStream = File.Open("a.txt", FileMode.Open)) { using(var bStream = File.Open("b.txt", FileMode.Open)) { // Use both streams } }
However it is not always the case. There may be a case when number of resources to obtain is not known at compile time. For example, basic external merge sorting algorithm separates large file into chunks (total number depends on original file size and available memory) that can be sorted in memory and then written to disk. Sorted chunks iteratively merged until a single chunk is left (which is sorted original file). During merge iteration several files must be opened (number is not known in advance), processed and then disposed. As we cannot use “using” statement directly it might look like this:
IEnumerable<string> files = ...; // Initialized elsewhere var streams = new List<Stream>(); try { // As we may get half way through opening // files and got exception because file doesn't // exist opened streams must be remembered foreach (var file in files) { streams.Add(File.Open(file, FileMode.Open)); } // Use streams } finally { // Dispose opened streams foreach (var stream in streams) { stream.Dispose(); } }
Unfortunately we lost all advantages of “using” statement (looks messy and collection of opened streams or its contents can be modified before “finally” block). It would be nice to have something like this:
using (var streams = ???) { // streams must be IEnumerable<Stream> }
For reference types expansion of “using” statement looks like this (struct types differ in how resource is disposed):
using (ResourceType resource = expression) statement // is expanded to { ResourceType resource = expression; try { statement; } finally { if (resource != null) ((IDisposable)resource).Dispose(); } }
If an exception happens during expression evaluation resource won’t be disposed (as there is nothing to dispose). However any exceptions inside statement are ok. So we need to somehow define how file names are converted into streams but still avoid any exceptions. Lazy evaluation will be handy.
// Projected sequence won’t get evaluated until it is enumerated // and thus file related exceptions (if any) are also postponed files.Select(file => File.Open(file, FileMode.Open))
Still we cannot use it inside “using” statement as it is not IDisposable. So basically what we want is a disposable sequence that takes care of disposing its elements (required to be IDisposable).
interface IDisposableSequence<T> : IEnumerable<T>, IDisposable where T:IDisposable { }
Sequence of disposable elements can be wrapped through
static class Disposable { // Defined as an extension method that augments minimal needed interface public static IDisposableSequence<T> AsDisposable<T>(this IEnumerable<T> seq) where T:IDisposable { return new DisposableSequence<T>(seq); } } class DisposableSequence<T> : IDisposableSequence<T> where T:IDisposable { public DisposableSequence(IEnumerable<T> sequence) { ... // an implementation goes here } ... // Other members elided for now }
We are close. But there is subtle issue. Obtaining resource is a side effect. Enumerating multiple times through projected into resources sequence will result in unwanted side effects which of course must be avoided. In this particular case enumerating (and thus projecting it) through the same element (file name) more than once will attempt to open already opened file and result in exception as File.Open uses FileShare.None by default.
So we need to avoid side effects by memorizing obtained resources.
class DisposableSequence<T> : IDisposableSequence<T> where T : IDisposable { private IEnumerable<T> m_seq; private IEnumerator<T> m_enum; private Node<T> m_head; private bool m_disposed; public DisposableSequence(IEnumerable<T> sequence) { m_seq = sequence; } public IEnumerator<T> GetEnumerator() { ThrowIfDisposed(); // Enumerator is built traversing lazy linked list // and forcing it to expand if possible var n = EnsureHead(); while (n != null) { yield return n.Value; n = n.GetNext(true); } } IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } public void Dispose() { if (!m_disposed) { m_disposed = true; // As sequence creates enumerator it is responsible // for its disposal if (m_enum != null) { m_enum.Dispose(); m_enum = null; } // As it is possible that not all resources were // obtained (for example, inside using statement // only half of lazy evaluated sequence elements // were enumerated and thus only half of resources // obtained) we do not want to obtain them now // as they are going to be disposed immediately. // Thus we traverse only through already created // lazy linked list nodes and dispose obtained // resources Dispose(m_head); m_seq = null; } } private Node<T> EnsureHead() { // Obtain enumerator once if (m_enum == null) { m_enum = m_seq.GetEnumerator(); // Try to expand to first element if (m_enum.MoveNext()) { // Created node caches current element m_head = new Node<T>(m_enum); } } return m_head; } private void ThrowIfDisposed() { if (m_disposed) { throw new ObjectDisposedException("DisposableSequence"); } } private static void Dispose(Node<T> h) { if (h == null) { return; } try { // Disposing resources must be done in the opposite // to usage order. With recursion it will have the // same semantics as nested try{}finally{} blocks. Dispose(h.GetNext(false)); } finally { h.Value.Dispose(); } } class Node<V> { private readonly V m_value; private IEnumerator<V> m_enum; private Node<V> m_next; public Node(IEnumerator<V> enumerator) { m_value = enumerator.Current; m_enum = enumerator; } public V Value { get { return m_value; } } public Node<V> GetNext(bool force) { // Expand only if forced and not expanded before if (force && m_enum != null) { if (m_enum.MoveNext()) { m_next = new Node<V>(m_enum); } m_enum = null; } return m_next; } } }
Once enumerated resources are memorized inside lazy linked list. It expands only more than already memorized resources are requested.
After putting things together our desired “using” statement usage looks like this
using (var streams = files.Select(file => File.Open(file, FileMode.Open)).AsDisposable()) { // streams is IEnumerable<Stream> and IDisposable }
Update.
In general it is a good practice to acquire resource right before its usage and dispose it when it is no longer needed otherwise system may experience resources exhaustion.
Described above approach can be used whenever resources should be acquired and disposed together (they all have the same actual usage time) and you do not know number of resources in advance. Otherwise you must use one or more "using" statements and dispose resources as they are no longer needed.
You must carefully consider that even if grouped under a single "using" statement (using described approach) resources have different actual usage time they won't be disposed (unless done explicitly inside "using" statement assuming that multiple calls to Dispose method are allowed) until processing of all resources is completed (holding some of them unnecessarily).