Chasing state of the art

Exploring best practices in problem solving


    Parallel partition phase for quick sort


    A while ago I was wrapping my head around parallel merge sort. Since it requires additional O(n) space, in practice it is the best choice only if available memory is not constrained. Otherwise it is better to consider quick sort. As in merge sort, in quick sort we have two phases:

    • rearrange elements into two partitions such that the left one contains elements less than or equal to the selected pivot and the right one contains elements greater than or equal to the pivot
    • recursively sort (independent) partitions

    The second phase is naturally parallelized using task parallelism since the partitions are independent (partition elements will remain inside partition boundaries when the sort of the whole array is finished). You can find an example of this behavior in parallel quick sort. It is a good start. But the first phase still contributes O(n) at each recursion level. By parallelizing the partition phase we can further speed up quick sort.
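
    For context, here is a minimal sketch of that second phase parallelism. It is illustrative only: it uses a simple Lomuto partition for brevity and omits the sequential recursion cutoff that production code needs; the rest of this post is about replacing the partition step itself.

    using System.Threading.Tasks;
    
    static class NaiveParallelQuickSort
    {
        // Sorts arr[from, to) recursing on independent 
        // partitions in parallel
        public static void Sort(int[] arr, int from, int to)
        {
            if (to - from <= 1)
                return;
            // Lomuto partition around the last element
            var pivot = arr[to - 1];
            var mid = from;
            for (var i = from; i < to - 1; i++)
            {
                if (arr[i] < pivot)
                {
                    var tmp = arr[i]; arr[i] = arr[mid]; arr[mid] = tmp;
                    mid++;
                }
            }
            var t = arr[to - 1]; arr[to - 1] = arr[mid]; arr[mid] = t;
            // The partitions are independent so they can be 
            // sorted in parallel
            Parallel.Invoke(
                () => Sort(arr, from, mid),
                () => Sort(arr, mid + 1, to));
        }
    }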

    The sequential version of the partition phase is pretty straightforward.

    class PartitionHelper<T>
    {
        private readonly T[] m_arr;
        private readonly IComparer<T> m_comparer;
    
        public PartitionHelper(T[] arr, IComparer<T> comparer)
        {
            m_arr = arr;
            m_comparer = comparer;
        }
    
        // Moves elements within range around pivot sequentially and
        // returns position of the first element equal to the pivot.
        public int SequentialPartition(T pivot, int from, int to)
        {
            var j = from;
            for (var i = from; i < to; i++) {
                if (m_comparer.Compare(m_arr[i], pivot) < 0) {
                    SwapElements(i, j++);
                }
            }
            return j;
        }
    
        private void SwapElements(int from, int to)
        {
            var tmp = m_arr[from];
            m_arr[from] = m_arr[to];
            m_arr[to] = tmp;
        }
        
    ...

    An interesting point is that we do not know in advance how the partitioning will go since it is data dependent (the position of an element depends on the other elements). Still, independent pieces can be carved out. Here is the core idea.

    Let’s assume an array that looks like below, where x denotes some element, p denotes the selected pivot, and e, l and g denote elements equal to, less than and greater than the pivot respectively.

    e l l (g l g e e l) x x x x x x x x x (l l g g e l) g e g p

              left                           right

    Let’s assume we selected two blocks of elements within the array, called left (containing elements g l g e e l) such that all elements before it are less than or equal to the pivot, and right (holding elements l l g g e l) such that all elements after it are greater than or equal to the pivot, from the left and right ends respectively. After the partitioning against the pivot is done, the left block must hold elements less than or equal to the pivot and the right block must contain elements greater than or equal to the pivot. In our example the left block contains two g elements that do not belong there and the right block holds three l elements that must not be there. But this means that we can swap two l elements from the right block with the two g elements from the left block, and the left block will comply with the partitioning against the pivot.

    e l l (l l l e e l) x x x x x x x x x (g g g g e l) g e g p

              left                           right

    Overall, after the block rearrange operation at least one of the blocks contains only correct elements (when the numbers of elements to be moved in the two blocks are not equal, the other block remains only partially rearranged).

    ...
    
        // Enum that indicates which of the blocks are 
        // in place after the arrangement.
        private enum InPlace
        {
            Left,
            Right,
            Both
        }
    
        // Tries to rearrange elements of the two blocks such 
        // that the right block contains elements greater than 
        // or equal to the pivot and/or the left block contains 
        // elements less than or equal to the pivot. At least 
        // one of the blocks is correctly rearranged.
        private InPlace ArrangeBlocks(T pivot, ref int leftFrom, int leftTo, ref int rightFrom, int rightTo)
        {
            while (leftFrom < leftTo && rightFrom < rightTo) {
                while (m_comparer.Compare(m_arr[leftFrom], pivot) <= 0 && ++leftFrom < leftTo) {
                }
    
                while (m_comparer.Compare(m_arr[rightFrom], pivot) >= 0 && ++rightFrom < rightTo) {
                }
    
                if (leftFrom == leftTo || rightFrom == rightTo) {
                    break;
                }
    
                SwapElements(leftFrom++, rightFrom++);
            }
    
            if (leftFrom == leftTo && rightFrom == rightTo) {
                return InPlace.Both;
            }
    
            if (leftFrom == leftTo) {
                return InPlace.Left;
            }
    
            return InPlace.Right;
        }
    
    ...

    Then we can select the next left block and try to do the same. Repeat until the blocks meet, piece by piece making the left and right parts of the array look the way the partitioning wants them to. The sequential block based algorithm looks like this:

    • select block size and pick block from left and right ends of the array
    • rearrange elements of the two blocks
    • pick next block from the same end if all elements of the block are in place (as the partitioning wants them to be)
    • repeat until all blocks are processed
    • do sequential partitioning of the remaining block (since from a pair of blocks at most one block may remain not rearranged) if one exists

    The interesting bit is that pairs of blocks can be rearranged independently. Workers can pick blocks concurrently from the corresponding ends of the array and rearrange elements in parallel.

    A block once taken by a worker must not be accessible to other workers. When no blocks are left a worker must stop. Basically we have two counters (the number of blocks taken from the left and right ends of the array). In order to take a block we must atomically increment the corresponding counter and check that the sum of the two counters is less than or equal to the total number of blocks; otherwise all blocks are exhausted and the worker must stop. Doing this under a lock is simple and acceptable for large arrays and blocks, but inefficient for small ones.

    We will pack the two counters into a single 32 bit value where the lower 16 bits hold the right blocks counter and the higher 16 bits hold the left blocks counter. To increment the right and left counters, 1 and 1<<16 must be added to the combined value respectively. An atomically updated combined value allows us to extract the individual counters and decide whether a block was successfully taken or not.

    Since each worker may race for the last remaining block, care must be taken of overflow. So only 15 bits' worth of blocks are allowed per 16 bit counter, and it would take 1<<15 racing workers to cause an overflow, which is not realistic.
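
    To make the packing concrete, here is the arithmetic in isolation (a toy illustration, not part of the class below):

    // Say 3 left blocks and 2 right blocks were taken so far
    var combined = (3u << 16) | 2u;
    var left = (int) (combined >> 16);     // 3
    var right = (int) (combined & 0xFFFF); // 2
    // Taking one more left block is a single atomic add:
    // Interlocked.Add(ref m_counter, 1 << 16)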

    ...
    
        // Class that maintains taken blocks in a thread-safe way.
        private class BlockCounter
        {
            private const int c_minBlockSize = 1024;
    
            private readonly int m_blockCount;
            private readonly int m_blockSize;
            private int m_counter;
            private const int c_leftBlock = 1 << 16;
            private const int c_rightBlock = 1;
            private const int c_lowWordMask = 0x0000FFFF;
    
            public BlockCounter(int size)
            {
                // Compute block size given that we have only 15 bits
                // to hold block count.
                m_blockSize = Math.Max(size/Int16.MaxValue, c_minBlockSize);
                m_blockCount = size/m_blockSize;
            }
    
            // Gets selected block size based on total number of 
            // elements and minimum block size.
            public int BlockSize
            {
                get { return m_blockSize; }
            }
    
            // Gets the total number of blocks, which is the 
            // total number of elements divided evenly by the 
            // block size.
            public int BlockCount
            {
                get { return m_blockCount; }
            }
    
            // Takes a block from left end and returns a value which 
            // indicates whether taken block is valid since due to 
            // races a block that is beyond allowed range can be 
            // taken.
            public bool TakeLeftBlock(out int left)
            {
                int ignore;
                return TakeBlock(c_leftBlock, out left, out ignore);
            }
    
            // Takes a block from the right end and returns its validity. 
            public bool TakeRightBlock(out int right)
            {
                int ignore;
                return TakeBlock(c_rightBlock, out ignore, out right);
            }
    
            // Atomically takes a block either from left or right end 
            // by incrementing higher or lower word of a single 
            // double word and checks that the sum of taken blocks 
            // so far is still within allowed limit.
            private bool TakeBlock(int block, out int left, out int right)
            {
                var counter = unchecked((uint) Interlocked.Add(ref m_counter, block));
                // Extract number of taken blocks from left and right 
                // ends.
                left = (int) (counter >> 16);
                right = (int) (counter & c_lowWordMask);
                // Check that the sum of taken blocks is within 
                // allowed range and decrement them to represent 
                // most recently taken blocks indices.
                return left-- + right-- <= m_blockCount;
            }
        }
    
    ...

    With multiple workers rearranging pairs of blocks we may end up with “holes”.

    (l l e) (l g l) (l e e) (l l l) (g g e) (e l l) x x x (g g e) (l g e) (e e g)

       l0      l1      l2      l3      l4      l5            r0      r1      r2

    In the example above blocks l1, l4 and r1 are the holes in the left and right partitions of the array, meaning they were not completely rearranged. We must compact the left and right partitions so that they contain no holes.

    (l l e) (e l l) (l e e) (l l l) (g g e) (l g l) x x x (l g e) (g g e) (e e g)

       l0      l5      l2      l3      l4      l1            r1      r0      r2

    Now we can do sequential partitioning of the range between the end of the left most rearranged block (l3) and the beginning of the right most rearranged block (r0).

    ...
    
        // A threshold of range size below which parallel partition
        // will switch to sequential implementation otherwise 
        // parallelization will not be justified.
        private const int c_sequentialThreshold = 8192;
    
        // Moves elements within range around pivot in parallel and 
        // returns position of the first element equal to the pivot.
        public int ParallelPartition(T pivot, int from, int to)
        {
            var size = to - from;
            // If range is too narrow resort to sequential 
            // partitioning.
            if (size < c_sequentialThreshold) {
                return SequentialPartition(pivot, from, to);
            }
    
            var counter = new BlockCounter(size);
            var blockCount = counter.BlockCount;
            var blockSize = counter.BlockSize;
            // Workers will process pairs of blocks and so number 
            // of workers should be less than half the number of 
            // blocks.
            var workerCount = Math.Min(Environment.ProcessorCount, blockCount / 2);
            // After the worker is done it must report blocks that 
            // were not rearranged
            var leftRemaining = AllocateRemainingArray(workerCount);
            var rightRemaining = AllocateRemainingArray(workerCount);
            // and left most and right most rearranged blocks.
            var leftMostBlocks = AllocateMostArray(workerCount);
            var rightMostBlocks = AllocateMostArray(workerCount);
    
            Action<int> worker = index =>
            {
                int localLeftMost = -1, localRightMost = -1;
    
                var leftBlock = localLeftMost;
                var rightBlock = localRightMost;
                int leftFrom = 0, leftTo = 0;
                int rightFrom = 0, rightTo = 0;
                var result = InPlace.Both;
                // Until all blocks are exhausted try to rearrange 
                while (true) {
                // Depending on the previous step one or two 
                // blocks must be taken.
                    if (result == InPlace.Left || 
                        result == InPlace.Both) {
                    // Left or both blocks were successfully 
                    // rearranged so we need to update left most 
                    // block.
                        localLeftMost = leftBlock;
                        // and try to take block from the left end.
                        if (!counter.TakeLeftBlock(out leftBlock)) {
                            break;
                        }
    
                        leftFrom = from + leftBlock*blockSize;
                        leftTo = leftFrom + blockSize;
                    }
    
                    if (result == InPlace.Right || 
                        result == InPlace.Both) {
                        // Right or both blocks were successfully 
                        // rearranged update right most and take new 
                        // right block.
                        localRightMost = rightBlock;
                        if (!counter.TakeRightBlock(out rightBlock)){
                            break;
                        }
    
                        rightTo = to - rightBlock*blockSize;
                        rightFrom = rightTo - blockSize;
                    }
                    // Try to rearrange elements of the two blocks 
                    // such that elements of the right block are 
                    // greater or equal to pivot and left block 
                    // contains elements less than or equal to pivot.
                    result = ArrangeBlocks(pivot, ref leftFrom, leftTo, ref rightFrom, rightTo);
                    // At least one of the blocks is correctly 
                    // rearranged and if we are lucky - two of them.
                }
            // If the right block was not completely rearranged 
            // mark it as remaining to be arranged.
                if (rightFrom != rightTo) {
                    rightRemaining[index] = rightBlock;
                }
                // Same for the left block.
                if (leftFrom != leftTo) {
                    leftRemaining[index] = leftBlock;
                }
                // Update worker local left most and right most 
                // arranged blocks.
                leftMostBlocks[index] = localLeftMost;
                rightMostBlocks[index] = localRightMost;
            };
    
            Parallel.For(0, workerCount, worker);
            // Compact arranged blocks from both ends so that all non
            // arranged blocks lie consecutively between arranged 
            // left and right blocks.
            var leftMostBlock = ArrangeRemainingBlocks(from, blockSize, leftRemaining, leftMostBlocks.Max(), 1);
            var rightMostBlock = ArrangeRemainingBlocks(to - blockSize, blockSize, rightRemaining, rightMostBlocks.Max(), -1);
            // Do sequential partitioning of the inner most area.
            return SequentialPartition(pivot, from + (leftMostBlock + 1) * blockSize, to - (rightMostBlock + 1)*blockSize);
        }
    
        // Moves rearranged blocks to cover holes so that all 
        // rearranged blocks are consecutive. Basically it does 
        // compaction and returns the index of the last 
        // rearranged block.
        private int ArrangeRemainingBlocks(int bound, int blockSize, int[] remaining, int mostBlock, int sign)
        {
            Array.Sort(remaining);
            var j = Array.FindLastIndex(remaining, b => b < mostBlock);
            for (var i = 0; i < remaining.Length && remaining[i] <= mostBlock;) {
                if (remaining[j] == mostBlock) {
                    j--;
                }
                else {
                    SwapBlocks(bound + sign * remaining[i] * blockSize, bound + sign * mostBlock * blockSize, blockSize);
                    i++;
                }
                mostBlock--;
            }
            return mostBlock;
        }
    
        private static int[] AllocateRemainingArray(int workerCount)
        {
            return Enumerable.Repeat(Int32.MaxValue, workerCount).ToArray();
        }
    
        private static int[] AllocateMostArray(int workerCount)
        {
            return Enumerable.Repeat(-1, workerCount).ToArray();
        }
    
        // Swaps two blocks
        private void SwapBlocks(int from, int to, int blockSize)
        {
            for (var i = 0; i < blockSize; i++) {
                SwapElements(from + i, to + i);
            }
        }
    }

    Now we have a parallel implementation of the quick sort partition phase. Experiments with randomly generated arrays of integer values show that it speeds up parallel quick sort by approximately 50% on an 8 way machine.


    Load-balancing partitioner with work stealing, part two


    In part one a work stealing range was introduced that allows stealing work items in contiguous chunks of up to half of the available work space. Now it is time for the partitioner itself.

    If you recall, partitioning can be done either statically up front or dynamically on demand. Handing chunks of work items out on demand is the usual way to deal with uneven workload distribution. But since work stealing is itself a load balancing mechanism, and we are looking at the case of known work space size, we can use static partitioning for the initial workload distribution and rely on work stealing to balance the load as the computation goes.

    Now that the obvious stuff is out of the way, there are two important things to consider: how work stealing is done and what the termination condition of the whole computation is.

    Initial static partitioning feeds workers with contiguous chunks of work items to process. Each worker wraps the obtained chunk into a work stealing range, so that other workers can steal from it if needed, and keeps taking one item at a time until the range is drained, potentially with help from other workers. At that point the worker has accumulated the number of items it processed, which is required for proper termination detection. The worker then returns the processed items (basically marking them as processed) through the partition list, which serves as a coordinator of work stealing and termination detection, and tries to steal from others. Stealing is attempted until it either succeeds or termination is detected. Upon success the worker wraps the stolen range and continues processing as from the beginning.

    Static partitioning assumes knowledge of the work space size. As the work space doesn't grow over time (a deliberate decision; if growth is needed it can be done in phases, where work items of the current phase produce work items for the next phase), that size is the basis for termination detection. Initially the remaining count is set to the work space size, and every worker that encounters a drained work stealing range returns its processed items count by decreasing the remaining count. Once the count reaches zero the processing must be terminated.

    Here is work stealing range partitioner in the flesh.

    class WorkStealingIndexRangePartitioner : Partitioner<int>
    {
        private readonly int m_fromInclusive;
        private readonly int m_toExclusive;
    
        public WorkStealingIndexRangePartitioner(int fromInclusive, int toExclusive)
        {
            if (fromInclusive >= toExclusive)
                throw new ArgumentException();
    
            m_fromInclusive = fromInclusive;
            m_toExclusive = toExclusive;
        }
    
        public override IList<IEnumerator<int>> GetPartitions(int partitionCount)
        {
            if (partitionCount <= 0)
                throw new ArgumentException();
            // Calculate range and partition size
            var rangeSize = m_toExclusive - m_fromInclusive;
            var partitionSize = Math.Max(1, rangeSize / partitionCount);
    
            var partitionList = new PartitionList(rangeSize);
            var partitions = new Partition[partitionCount];
        // Create partitions by statically dividing work items 
        // into even sized chunks which is ok even in case of 
        // non uniform workload distribution as it will be 
        // balanced out through work stealing
            var from = m_fromInclusive;
            for (var i = 0; i < partitionCount; i++)
            {
            // Give the remainder to the last partition so that 
            // the whole range is covered
            var to = i == partitionCount - 1 ? m_toExclusive : Math.Min(from + partitionSize, m_toExclusive);
                partitions[i] = new Partition(partitionList, from, to);
                from = to;
            }
            // Wire them through a coordinator
            partitionList.SetPartitions(partitions);
    
            return partitions;
        }
    
        // Partitioning coordinator
        class PartitionList
        {
            // Holds list of available partitions
            private List<Partition> m_partitions;
            // Holds number of remaining items to process
            private int m_remainingCount;
    
            public PartitionList(int count)
            {
                m_remainingCount = count;
            }
    
            public void SetPartitions(IEnumerable<Partition> partitions)
            {
                m_partitions = new List<Partition>(partitions);
            }
    
        // Returns count items as processed and tries to steal 
        // new work items from other partitions
            public bool TryReturnAndSteal(Partition to, int count, out Tuple<int, int> range)
            {
                // Move toward termination condition 
                Interlocked.Add(ref m_remainingCount, -count);
                // Until either termination condition is 
                // reached or successful steal attempt
                range = null;
                while (true)
                {
                    // Enumerate through available partitions and try 
                    // to steal from them
                    foreach (var from in m_partitions)
                    {
                        // Check if nothing to steal
                        if (m_remainingCount <= 0)
                            return false;
                    // Skip the requesting partition as it is drained
                        if (from == to)
                            continue;
    
                        range = from.TrySteal();
                        // Keep trying to steal from others if 
                        // unsuccessful
                        if (range != null)
                            return true;
                    }
                }
            }
        }
    
        // Work stealing partition 
        class Partition : IEnumerator<int>
        {
            // Holds range items currently owned
            private WorkStealingRange m_workStealingRange;
            // Holds number of processed items
            private int m_localCount;
            // Holds reference to partitioning coordinator that 
            // controls in addition termination condition
            private readonly PartitionList m_list;
        // Holds current partition element or null if MoveNext
        // was not called yet or returned false
            private int? m_current;
    
            public Partition(PartitionList list, int fromInclusive, int toExclusive)
            {
                m_list = list;
                m_workStealingRange = new WorkStealingRange(fromInclusive, toExclusive);
            }
    
            public int Current
            {
                get
                {
                    if (m_current != null)
                        return (int)m_current;
    
                    throw new InvalidOperationException();
                }
            }
    
            object IEnumerator.Current
            {
                get { return Current; }
            }
    
            // Tries to steal from local steal range
            public Tuple<int, int> TrySteal()
            {
                return m_workStealingRange.TryStealRange();
            }
    
            public bool MoveNext()
            {
            // First try to take an item locally from the 
            // available range
                var local = m_workStealingRange.TryTakeOne();
                if (local != null)
                {
                    // Mark item as processed 
                    m_localCount++;
                    // and set up current item
                    m_current = local;
                    return true;
                }
                // Otherwise try to steal from others
                Tuple<int, int> range;
                if (m_list.TryReturnAndSteal(this, m_localCount, out range))
                {
                    // Stolen something
                    var from = range.Item1;
                    var to = range.Item2;
                    // Keep very first element to yourself
                    m_localCount = 1;
                    m_current = from++;
                    // If anything left expose it to allow others
                    // to steal
                    if (to - from > 0)
                        m_workStealingRange = new WorkStealingRange(from, to);
    
                    return true;
                }
                // Termination condition reached so nothing to steal 
                // from others
                return false;
            }
    
            public void Dispose()
            {
            }
    
            public void Reset()
            {
                throw new NotSupportedException();
            }
        }
    }

    Rough benchmarking shows that on random workload input it performs as well as the standard partitioners for indexed collections, while providing good performance for the worst case workload distribution.
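
    For illustration, here is how the partitioner might be plugged into Parallel.ForEach (a sketch; the skewed workload is made up to show where stealing helps):

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    
    static class Demo
    {
        static void Main()
        {
            // Skewed workload: the first items are much more 
            // expensive, so without stealing the first worker 
            // would dominate the running time
            var partitioner = new WorkStealingIndexRangePartitioner(0, 100000);
            Parallel.ForEach(partitioner, i =>
            {
                Thread.SpinWait(i < 1000 ? 100000 : 10);
            });
            Console.WriteLine("done");
        }
    }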


    Load-balancing partitioner with work stealing, part one


    Data parallelism is a parallelization technique where data can be separated into independent pieces and distributed across parallel computing nodes. This technique is the core of Parallel LINQ, which partitions data into segments and executes the query on each segment in parallel. Depending on the scenario and the expected workload distribution, different partitioning schemes (I highly recommend reading the linked post before reading further) can be employed. They can essentially be divided into two types:

    • Static, where partitioning is done up front, which is quite simple but may perform poorly when the workload distribution is not even
    • Dynamic, where partitioning is done on demand by handing out chunks of work to idle workers, which deals better with uneven workload distribution

    Both types have one thing in common: once a chunk of work is taken by a worker it is never given back until processed, meaning the worker has exclusive responsibility for it. In case of uneven workload distribution this may lead to poor performance, bounded by the performance of the slowest worker. For example, when most of the heavy work is handed out to a single worker, the overall computation is not finished until the unlucky worker finishes executing the heavy work sequentially, even though all the other workers finished long ago.

    In order to deal with uneven workload distribution work stealing can be used. Joe Duffy explores in great detail how to build a custom thread pool that uses work stealing to balance workload among workers. That approach steals one work item at a time, which is in most cases sufficient, as a work item usually produces other work items, so chances are an idle worker, once it has processed a stolen work item, will have more work to do (otherwise it can go steal another work item if any).

    In data parallelism scenarios stealing a chunk of work items in one shot may be more beneficial (to avoid high synchronization costs). Work stealing also benefits from initial work distribution, compared to handing all work to a single worker and letting the others steal from it. Thus static partitioning with work stealing of chunks of work items is what we are looking for. Static partitioning assumes knowledge of the work space size, which in most cases is there.

    Parallel LINQ uses the partitioner concept to abstract the partitioning mechanism. Before developing a custom partitioner (that will be part two of the series) the work stealing part must be in place:

    • Work space is known in advance, it is not growing over time and provides indexed access
    • Thieves must be able to steal work items in contiguous chunks ideally half of the available items

    As the work space is known in advance it is represented through a range of integer values. So basically indexes rather than elements themselves will be the subject, as it is quite easy to map them to the actual elements. The range is consumed from the lower bound by its owner, which every time tries to take one index. The higher part of the range is represented by another range called the steal range. It defines the bounds of indexes that are eligible for stealing. Thieves contend for the steal range with each other, and with the owner in case the very last item is in the steal range. Essentially the work space can be looked at as [low .. [mid .. high)), where [mid .. high) is the steal range and [low .. high) is the overall remaining work space.

    Stealing is the fate of idle workers, and thus they take on the burden, letting the owner stay ignorant of their presence until they get too close:

    • Load the available steal range and the lower bound
    • If the steal range falls behind, meaning no more items are left, return a value that indicates an unsuccessful steal attempt
    • Otherwise construct a new steal range and attempt to atomically compare and swap it in
      • If succeeded, the observed range was stolen
      • Otherwise either another thief succeeded; or the owner did, if the range contained the very last item; or the owner updated the steal range because, between the moment the thief observed it and now, it consumed a lot of items and got too close to the steal range

    The owner must be able to take one item at a time without heavy synchronization, as follows:

    • Reserve the item at the lower bound by advancing the bound by one; once reserved the item cannot be reached unless it is in the steal range
    • Load the available steal range; the order is really important, otherwise due to reordering the same item could be both stolen and taken by the owner
    • If the reserved lower bound item is not in the observed steal range
      • If the steal range is too close, try to update it to a smaller one and don't worry if that fails, as the next steal or local take will make it right
      • Return the reserved item as successfully taken
    • If the reserved item is in the steal range it is the very last one, so contend for it with the thieves
      • Try to atomically compare and swap in a steal range that falls behind, to indicate that no more items are left
      • If succeeded, return the reserved item as successfully taken
      • Otherwise we lost the race, so return a value that indicates an unsuccessful take attempt
    • Otherwise the last item was stolen before the owner even contended for it

    Now that algorithm is in place here is the implementation.

    class WorkStealingRange
    {
        // Holds range that is available for stealing
        private volatile Tuple<int, int> m_stealRange;
        // Holds index next to be taken locally
        private volatile int m_low;
    
        public WorkStealingRange(int low, int high)
        {
            m_low = low;
            m_stealRange = CreateStealRange(low, high);
        }
    
    // Tries to steal a range of items 
        public Tuple<int, int> TryStealRange()
        {
            // Contend for available steal range
            var oldRange = m_stealRange;
            var mid = oldRange.Item1;
            var low = m_low;
            // If steal range is behind lower bound it means no 
            // work items left 
            if (low > mid)
                // Return null to indicate failed steal attempt
                return null;
            // Calculate new steal range that will replace current
            // in case of success
            var newRange = CreateStealRange(low, mid);
            // Contend with other thieves and owner (in case steal 
            // range consists of the single last item)
            if (Interlocked.CompareExchange(ref m_stealRange, newRange, oldRange) != oldRange)
                // Lost the race so indicate failed steal attempt
                return null;
            // Won contention for the steal range
            return oldRange;
        }
    
        // Tries to take one item locally
        public int? TryTakeOne()
        {
            var low = m_low;
            // Reserve item using exchange to avoid legal 
            // reordering with steal range read below
            Interlocked.Exchange(ref m_low, low + 1);
        // Now that the lowest element is reserved it is either
        // not available to thieves or it is the last one and
        // is in the steal range
            var oldRange = m_stealRange;
            var mid = oldRange.Item1;
        // If the observed steal range doesn't contain the 
        // reserved item it is safe to return it as nobody 
        // can reach the reserved item now
            if (low < mid)
            {
                var high = oldRange.Item2;
            // If not enough space is left ahead (less than 
            // two times the observed steal range) attempt to 
            // adjust the steal range to prevent stealing more 
            // than half of the remaining items
                if (mid - low <= 2 * (high - mid))
                {
                    // Try to make steal range 1/4 of available work
                    // space
                    var newRange = CreateStealRange(low, high);
                    // Don't worry if failed as next steal or local 
                    // take will fix it
                    Interlocked.CompareExchange(ref m_stealRange, newRange, oldRange);
                }
                // Return reserved item as it is not reachable 
                // by thieves
                return low;
            }
            // If observed steal range contains reserved item contend
            // for it with thieves
            if (low == mid)
            {
                // Create new range that falls behind to indicate 
                // termination
                var newRange = CreateStealRange(low, low);
                // Otherwise steal range contains only reserved item 
                // and must contend with the thieves for it
                if (Interlocked.CompareExchange(ref m_stealRange, newRange, oldRange) != oldRange)
                    // Lost the race, return null to indicate no 
                    // more items available
                    return null;
                // Won contention for the last item
                return low;
            }
            // No luck last item was stolen
            return null;
        }
    
        private static Tuple<int, int> CreateStealRange(int low, int high)
        {
            // If range is not empty create new one that is 
            // 1/4 of the available space
            if (low != high)
                return new Tuple<int, int>((low + 3 * high) / 4, high);
            // Otherwise create empty range that falls behind
            return new Tuple<int, int>(low - 1, low - 1);
        }
    }
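
    Before moving on, here is a small sketch of the intended interplay (illustrative only; in part two the thieves will be other workers):

    using System;
    using System.Threading.Tasks;
    
    static class Demo
    {
        static void Main()
        {
            var range = new WorkStealingRange(0, 1000);
            // A thief grabs a contiguous chunk from the high end
            var thief = Task.Run(() =>
            {
                var stolen = range.TryStealRange();
                if (stolen != null)
                    Console.WriteLine("stole [{0}, {1})", stolen.Item1, stolen.Item2);
            });
            // Meanwhile the owner drains items one at a time 
            // from the low end
            var taken = 0;
            for (var i = range.TryTakeOne(); i != null; i = range.TryTakeOne())
                taken++;
            thief.Wait();
            Console.WriteLine("owner took {0} items", taken);
        }
    }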
    

    Next time, the custom partitioner that uses the work stealing range goes on the surgical table.


    Infernal dinner synchronization problem


    Imagine a group of hungry people with spoons sitting around a pot of stew. The spoon’s handle is long enough to reach the pot, but it is longer than the arm, so no one can feed himself. People are desperate. This is the picture described in a recently found piece of a lost chapter of Dante’s Inferno. In order to help them Dante suggested that they feed one another.

    Only one person can feed another at a time. While feeding someone else a person cannot eat. People must not starve, meaning that once hungry a person will eventually be fed. It is assumed that the spoon’s handle allows feeding any other person except for yourself.

    We’ll develop an algorithm to let the unfortunate ones synchronize with each other and not starve. It may seem similar to the dining philosophers problem, but the latter has a limited choice in selecting the order of taking forks and the degree of contention is low. In the infernal dinner problem the choice space and the degree of contention are comparable with the problem size, which is the number of people (a person may choose to try to feed any other person, potentially contending with everyone but the person to be fed).

    Here are few important observations:

    • If everyone wants to eat, or everyone wants to feed others, at the same time, they are doomed to deadlock. So at any given point in time at least one person must be willing to eat and at least one person willing to feed.
    • If a person fed someone, next time he must eat; and if a person ate, next time he must feed. Thus they won’t get into the everyone-wants-to-do-the-same situation, which is a deadlock.
    • In order to prevent starvation some sort of fairness must be guaranteed. One person must not be able to get fed infinitely many times while other people are waiting.
    • People must somehow agree in pairs (a hungry one and a not hungry one) to feed each other, and while doing so others must not be able to pair with them.

    The first two are quite straightforward. Any person will either be hungry or not, and every time a person eats or feeds the state changes. At the beginning at least one person is hungry and at least one is not.

    The last two are more tricky. As you remember, there are two types of people: those that are hungry and those that are not. Let’s assume hungry people line up and wait to be fed. Then people that are willing to feed come and take hungry people one by one from the head of the line and feed them. If no hungry people are left, the feeders also line up and wait for hungry people. The two types basically switch. This is the idea of how hungry and non-hungry people can pair to feed each other. While a pair of people is outside of the queue nobody else can interfere with them.

    The queue is represented through a linked list of the following form h->n0->n1->…->nk where h is a sentinel head node. Head and tail are never null, as in case of an empty queue they both point to the sentinel node. Nodes are added at the tail and removed from the head. In order to remove a node from the beginning of the queue the head must be advanced to its successor, which must be non null; otherwise the queue is considered empty. Adding is trickier. It is based on the fact that once the next of a node is set it never changes. It is done in two steps. First the tail’s next is set to the node to be added, and upon success (at this point the node is visible to other threads) the tail is advanced to the newly added node. Because this process is not atomic other threads may observe the change halfway through. In that case a thread may help to finish adding the node by advancing the tail, and then retry its own operation.

    Now to the line up part. Essentially there are two cases:

    • When the queue is empty or already contains nodes of the same type
      • a new node must be added to the end of the queue and waited upon until paired with someone else
    • Otherwise the node at the beginning must be removed and the waiting thread notified of the formed pair

    Based on these rules the waiting queue will either be empty or contain nodes of the same type, which is equivalent to a line of either hungry or non-hungry people.

    Here goes the implementation.

    class SyncQueue<T>
    {
        private volatile Node m_head;
        private volatile Node m_tail;
    
        public SyncQueue()
        {
            // Head is a sentinel node and will never be null
            m_head = new Node(default(T), false);
            m_tail = m_head;
        }
    
        public T Exchange(T value, bool mark, CancellationToken cancellationToken)
        {
            var node = new Node(value, mark);
            // Do until exchanged values with thread of 
            // a different type
            while (true)
            {
                cancellationToken.ThrowIfCancellationRequested();
    
                var head = m_head;
                var tail = m_tail;
    
                // If the waiting queue is empty or already contains
                // same type of items
                if (head == tail || tail.m_mark == mark)
                {
                    // Attempt to add current item to the end of the 
                    // waiting queue
                    var nextToTail = tail.m_next;
                // To avoid costly interlocked operations check 
                // if the assumption about the tail still holds
                    if (tail != m_tail)
                        continue;
                    // If next to what observed to be the tail is 
                    // not null then the tail fell behind
                    if (nextToTail != null)
                    {
                        // Help to advance tail to the last node 
                        // and do not worry if it will fail as 
                        // someone else succeed in making tail up 
                        // to date
                        Interlocked.CompareExchange(ref m_tail, nextToTail, tail);
                        // And retry again
                        continue;
                    }
                    // Try to append current node to the end of the 
                    // waiting queue by setting next of the tail
                    // This is a linearization point of waiting case
                    // (adding node to the end of the queue)
                    if (Interlocked.CompareExchange(ref tail.m_next, node, null) != null) 
                        // Retry again if lost the race
                        continue;
                    // Advance the tail with no check for success as 
                    // in case of failure other thread is helping to
                    // advance the tail
                    Interlocked.CompareExchange(ref m_tail, node, tail);
                    // Wait until exchange is complete
                    var spin = new SpinWait();
                    while (node.m_mark == mark)
                    {
                        spin.SpinOnce();
                        cancellationToken.ThrowIfCancellationRequested();
                    }
                    // Correct value will be observed as reading mark 
                    // implies load acquire semantics
                    return node.m_value;
                }
                // Non empty waiting queue with items of a different 
                // type was observed thus attempt to exchange with 
                // thread waiting at the head of the queue through 
                // dequeueing
                var nextToHead = head.m_next;
                // Check if observed head is still consistent and it
                // has successor
                if (head != m_head || nextToHead == null)
                    continue;
                // Observed non-empty queue can either grow which is
                // fine as we are interested here in the head node 
                // otherwise attempt below will fail and will retry 
                // again.
                // Attempt to advance head that is a sentinel node 
                // to its successor that holds sought value and is 
                // supposed to be new sentinel.
                // This is a linearization point of releasing case 
                // (removing node from the beginning of the queue)
                if (Interlocked.CompareExchange(ref m_head, nextToHead, head) != head) 
                    // Retry if lost the race
                    continue;
                // At this point head's successor is dequeued and no
                // longer reachable so values can be safely exchanged
                var local = nextToHead.m_value;
                nextToHead.m_value = value;
                // Switch mark to let waiting thread know that 
                // exchange is complete and making it the last store
                // with release semantics makes sure waiting thread 
                // will observe correct value
                nextToHead.m_mark = mark;
    
                return local;
            }
        }
    
        class Node
        {
            internal volatile Node m_next;
            internal volatile bool m_mark;
            internal T m_value;
    
            internal Node(T value, bool mark)
            {
                m_value = value;
                m_mark = mark;
            }
        }
    }
    
    class Human
    {
        private volatile bool m_hungry;
    
        public Human(bool hungry)
        {
            m_hungry = hungry;
        }
    
        public void WaitAndEat(SyncQueue<Human> waitingQueue, CancellationToken cancellationToken)
        {
            var spin = new SpinWait();
            while (true)
            {
                spin.Reset();
                // The hell seems to have frozen =)
                cancellationToken.ThrowIfCancellationRequested();
    
                // Pair with someone either to feed hungry man 
                // or eat yourself if hungry
                var pairedWith = waitingQueue.Exchange(this, m_hungry, cancellationToken);
            if (!m_hungry)
            {
                // Feed hungry man
                pairedWith.Feed();
                // Having just fed someone the person becomes 
                // hungry (observation two), otherwise everyone 
                // eventually turns into a feeder and the 
                // dinner deadlocks
                m_hungry = true;
            }
                else
                    // Wait to be fed
                    while (m_hungry)
                    {
                        spin.SpinOnce();
                        cancellationToken.ThrowIfCancellationRequested();
                    }
            }
        }
    
        private void Feed()
        {
            // Switch to non hungry as just ate
            m_hungry = !m_hungry;
        }
    }
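
    To see the parts together, here is a minimal simulation sketch (the person count, hungry split and timeout are arbitrary):

    using System;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    
    static class InfernalDinner
    {
        static void Main()
        {
            var queue = new SyncQueue<Human>();
            // Stop the simulation after a few seconds
            var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
            // Start with both hungry and non-hungry people so 
            // that observation one holds from the very beginning
            var tasks = Enumerable.Range(0, 8)
                .Select(i => new Human(i % 2 == 0))
                .Select(h => Task.Run(() => h.WaitAndEat(queue, cts.Token)))
                .ToArray();
            try
            {
                Task.WaitAll(tasks);
            }
            catch (AggregateException)
            {
                // Expected: workers exit via cancellation
            }
        }
    }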

    The infernal dinner is served =)


    Shared rooms synchronization problem


    Recently I came across another interesting synchronization problem. Assume there are n rooms. Threads can enter and leave rooms. A room can hold an arbitrary number of threads. If a room holds at least one thread it is considered occupied. Only one room can be occupied at a time. With each room an exit action is associated that must be executed when the last thread leaves it. No threads are allowed to enter any room before the exit action has run to completion. Threads that are waiting to enter a room must eventually enter it. It is also assumed that a thread that entered a room at some point leaves it.

    There are several cases for a thread attempting to enter a room:

    • If no room is occupied, the thread can enter the required room
    • If the occupied room is the same as the required room, the thread can enter it provided the room is not empty (otherwise the exit action may be executing at this very moment, and entering is not allowed) and no other threads are waiting (otherwise others may starve, as a continuous stream of threads coming to the currently occupied room can prevent it from ever being set free)
    • Otherwise the thread must wait

    Leaving room requires careful thought as well:

    • If the thread is not the last one it is free to go
    • Otherwise it must leave the room and execute the exit action while keeping the room occupied to prevent others from entering any room
      • Once the exit action is executed, if no other thread is waiting, the last one is free to go
      • Otherwise it must wake up the waiting ones

    The waiting and waking part can be done using Monitor.Wait and Monitor.PulseAll. Doing so with a single sync object is simple but quite inefficient, as every PulseAll wakes up all waiting threads (potentially waiting for different rooms) only for them to see that they are not allowed to enter yet, since only a single room can be occupied at a time. Instead each room will have its own local sync object to wait and pulse on. This allows waking up only the threads that are now allowed to enter the room they've been waiting on.

    But this is where difficulties come in. The decision an entering thread makes spans across rooms, so it still needs to be made under a global lock. The tricky part is, once the decision to wait is made, how not to miss the wakeup. Attempting to do it like

    lock (global)
    {
        // Make decision to wait or return
        bool wait = ...;
        if (!wait)
            return;
    }
    // Somewhere here wakeup may be missed
    lock (local)
    {
        // Wait on local based on the decision made above
        Monitor.Wait(local);
    }

    is doomed to suffer from missed wakeups: between the moment the global lock is released and the local one is acquired, the last thread leaving a room may try to wake up the waiters. So instead the local lock is acquired while still holding the global one, which is released only after that.

    var locked = false;
    try
    {
        Monitor.Enter(global, ref locked);
        // Make decision to wait or return
        bool wait = ...;
        if (!wait)
            return;
        
        // Acquire local while holding global
        lock (local)
        {
            // Release global
            Monitor.Exit(global);
            locked = false;
            // Wait on local based on the decision made above
            Monitor.Wait(local);
        }
    }
    finally
    {
        if (locked)
            Monitor.Exit(global);
    }
    

    Threads indicate that they are willing to enter a room by maintaining a wait count for each room. It is also used to let threads enter the room in bulk: the last leaving thread picks the room to be occupied next, adjusts the count and wakes up the waiting threads by pulsing the room's local sync object.

    In order to make sure that waiting threads enter a room eventually (guaranteeing starvation freedom) the following mechanism is used:

    • If some room is occupied, a thread can enter that room only if no other threads are waiting to enter.
    • The last leaving thread runs through the other rooms in a circle, starting from the room right next to the currently occupied one, to see if any thread is waiting; if such a room is found it lets the waiting threads enter in bulk.

    Here goes the thing.

    public class SharedRoomsLock
    {
        // Holds room local locks
        private readonly object[] m_locks;
        // Holds number of threads waiting on to 
        // enter indexed by room numbers
        private readonly int[] m_waiting;
        // Holds actions to be executed for each room 
        // upon exit
        private readonly Action[] m_actions;
        // Holds number of threads currently in the 
        // occupied room
        private int m_count;
        // Holds index of the room currently occupied
        private int m_occupied = -1;
    
        public SharedRoomsLock(IEnumerable<Action> actions)
        {
            m_actions = actions.ToArray();
    
            var count = m_actions.Length;
    
            m_waiting = new int[count];
            m_locks = Enumerable.Range(0, count)
                .Select(_ => new object()).ToArray();
        }
    
        // Lock ownership is omitted for clarity however
        // can be added using ThreadLocal<T>
        public void Enter(int i)
        {
            var locked = false;
            try
            {
                Monitor.Enter(m_locks, ref locked);
    
                if (
                    // If no room is occupied or
                    m_occupied < 0 ||
                    (
                    // Occupied room that thread is trying 
                    // to enter
                        m_occupied == i &&
                    // and there is still someone in there
                        m_count > 0 &&
                    // and no one waiting to enter other rooms
                        !m_waiting.Any(w => w > 0)))
                {
                    m_occupied = i;
                    m_count++;
                    return;
                }
                // Otherwise indicate desire to enter the room
                m_waiting[i]++;
                // Acquire room local lock before releasing main
                // to avoid missed or incorrect wakeups
                lock (m_locks[i])
                {
                    // Release main lock to allow others to 
                    // proceed including waking thread
                    Monitor.Exit(m_locks);
                    locked = false;
                    // Wait to be woken up by some last to leave
                    // room thread, not necessarily immediately
                    Monitor.Wait(m_locks[i]);
                    // Once woken up thread can safely enter 
                    // the room as count already adjusted 
                }
            }
            finally
            {
                if (locked)
                    Monitor.Exit(m_locks);
            }
        }
    
        public void Exit(int i)
        {
            lock (m_locks)
            {
                // Indicate that thread left the room 
                if (--m_count > 0)
                    // And leave it if not last
                    return;
            }
        // If last, execute the exit action, however not under 
        // the lock as that is quite dangerous due to reentrancy 
            m_actions[i]();
            // At this point room is still treated as 
            // occupied as only once exit action is executed 
            // it can be set free
            var locked = false;
            try
            {
                Monitor.Enter(m_locks, ref locked);
                // By default set room as not occupied
                m_occupied = -1;
                // Run through other rooms in circles to see if any 
                // thread is waiting thus ensuring some sort of 
                // fairness or at least starvation freedom
                for (var j = 1; j <= m_waiting.Length; j++)
                {
                    var next = (i + j) % m_waiting.Length;
                    var w = m_waiting[next];
                    if (w > 0)
                    {
                        // Found room with waiting threads so it 
                        // will be occupied next
                        m_occupied = next;
                        break;
                    }
                }
                // If no one is waiting 
                if (m_occupied < 0)
                    // There is nothing that can done
                    return;
            // At this point there are threads waiting to enter
            // the room so they are allowed to enter in one
            // shot 
                m_count = m_waiting[m_occupied];
                // Closing the doors right after them
                m_waiting[m_occupied] = 0;
                // Acquire room local lock before releasing main
                // to avoid missed or incorrect wakeups
                lock (m_locks[m_occupied])
                {
                    // Releasing main lock is safe because the 
                    // decision made under main lock will still be 
                    // true as no other thread except for those 
                // already waiting will be able to wait on room
                    // local lock that is currently held
                    Monitor.Exit(m_locks);
                    locked = false;
                    // Wake up waiting to enter threads
                    Monitor.PulseAll(m_locks[m_occupied]);
                }
            }
            finally
            {
                if (locked)
                    Monitor.Exit(m_locks);
            }
        }
    }
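
    Usage might look like this (a sketch; the exit actions and room numbers are illustrative):

    var rooms = new SharedRoomsLock(new Action[]
    {
        () => Console.WriteLine("room 0 is now free"),
        () => Console.WriteLine("room 1 is now free")
    });
    // Every thread brackets its work with Enter/Exit for 
    // the room it needs
    rooms.Enter(0);
    try
    {
        // Work inside room 0, possibly alongside other 
        // threads that entered room 0 as well
    }
    finally
    {
        rooms.Exit(0);
    }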
    

    Single Responsibility Principle – discovering violations


    Single Responsibility Principle states:

    A class should have only one reason to change. Responsibilities are reasons to change.

    Violations of this principle leave you face to face with a fragile design and all the maintenance nightmares it implies. Unfortunately there is no foolproof way to prevent them – that is just the nature of design.

    Common sense often helps to spot multiple responsibilities mixed within a single class. However some cases are tricky and require careful attention. This is where design assessment comes in. An object can be looked at from two perspectives:

    • Data
      • What does it know?
      • What connections between objects does it maintain?
    • Behavior
      • What does it decide?
      • What services does it perform?

    Uncontrolled growth in either group can lead to a god class (of the data or behavior form respectively).

    What does it know?

    Keeping related data and behavior in one place is essential to building consistent abstractions. Failing to do so may lead to:

    • unnecessary disclosure of details (by exposing all the data needed to implement the behavior)
    • duplicated or inconsistent behavior (due to poor discoverability)
    • uncontrolled growth (as scattered behavior makes it harder to see the big picture)

    The Information Expert pattern promotes this idea:

    Assign a responsibility to the class that has the information needed to fulfill it.

    Consider the example below where the publicly visible person name could be incorrectly used to distinguish people (names are not unique). It makes sense to let the Person type define what equality means, as it has all the necessary information (the social security number). This prevents several inconsistent equality behaviors from being introduced across the code base.

    class Person
    {
        private string name;
        // Keep social security number in private
        private string ssn;
        
        // Names are not unique
        public string Name { get { return name; } }
    
        public override bool Equals(object obj)
        {
            if (obj == null || obj.GetType() != typeof(Person))
                return false;
            // Names cannot be used to identify people unlike 
            // social security number
            return ssn == ((Person) obj).ssn;
        }

        // Keep GetHashCode consistent with Equals by hashing
        // the same identifying field
        public override int GetHashCode()
        {
            return ssn.GetHashCode();
        }
    }

    Keeping related data and behavior in one place is as important as not letting unrelated sets of data and behavior end up in the same class. Most of the methods defined on a class should use most of the data members most of the time.

    Many of you started working while still a student. An employer needs your ability to work and most likely isn’t interested in the fact that you are still studying.

    // Smart student - works and studies
    class Student
    {
        private string name;
        private int knowledge;
        private Func<Course, bool> preferences;
        private int experience;
    
        public string Name
        {
            get { return name;}
        }
    
        public void Study()
        {
            knowledge++;
        }
    
        public void Enlist(IEnumerable<Course> courses)
        {
            // Select appropriate courses and enlist
            foreach (var course in courses.Where(preferences))
                course.Enlist(name);
        }
    
        public void Work()
        {
            experience++;
        }
    
        public void Sign(Contract contract)
        {
            // sign job contract with your name
            contract.Agree(name);
        }
    }

    This class clearly has more than one responsibility, and the fact that study-related methods and work-related methods each operate on their own subset of the data shows it. Separation solves the problem.

    class Student
    {
        private Person person;
        private int knowledge;
        private Func<Course, bool> preferences;
    
        public Student(Person newStudent)
        {
            person = newStudent;
        }
    
        public void Study()
        {
            knowledge++;
        }
    
        public void Enlist(IEnumerable<Course> courses)
        {
            // Select appropriate courses and enlist
            foreach(var course in courses.Where(preferences))
                course.Enlist(person.Name);
        }
    }
    
    class Employee
    {
        private Person person;
        private int experience;
    
        public Employee(Person newEmployee)
        {
            person = newEmployee;
        }
    
        public void Work()
        {
            experience++;
        }
    
        public void Sign(Contract contract)
        {
            // sign job contract with person's name
            contract.Agree(person.Name);
        }
    }

    What connections between objects does it maintain?

    What is a connection between objects anyway? For example, a document can be seen as a set of glyphs (text, graphics, structural elements like columns and rows). The connection between them is the fact that they belong to the same document, so basically the document maintains connections between glyphs. What should attract our attention in this area to spot Single Responsibility Principle violations?

    If the object maintains connections between objects of different abstraction levels it is highly likely that it does someone’s job.

    Consider an object model where a man wants to build a house. The Man class maintains connections between tasks (received from an architect) and workers who build the house. And this is where different abstraction levels (from the domain model perspective) meet: the man hired a foreman who is supposed to manage construction, but it so happens that the man wants to control everything and treats the hired foreman as a simple worker, giving him tasks to do in a not necessarily correct order. Basically the man took over the foreman’s responsibility (in other words, the foreman abstraction isn't consistent).

    abstract class Task
    {
        public bool Completed { get; private set; }
    
        public virtual void Do()
        {
            DoCore();
            Completed = true;
        }
    
        protected abstract void DoCore();
    }
    
    class Worker
    {
        public virtual bool CanDo(Task task)
        {
            // Can do everything worker =)
            return true;
        }
    
        public virtual void Accept(Task task)
        {
            task.Do();
        }
    }
    
    class Foreman : Worker
    {
        IEnumerable<Worker> workers;
        Func<Task, bool> next;
    
        public override bool CanDo(Task task)
        {
            // Can do things only on time or otherwise the house 
            // won't stand long
            return next(task);
        }  
    
        public override void Accept(Task task)
        {
            // Foreman looks for a worker who can do the task
            workers.Where(w => w.CanDo(task)).First().Accept(task);
        }
    }
    
    // A man who tries to manage his new house construction
    class Man
    {
        IEnumerable<Task> construction;
        Foreman foreman;
    
        public void BuildHouse()
        {
            IEnumerable<Task> tasks;
            // Since we don't know what to do next we ask the foreman
            while ((tasks = construction.Where(t => foreman.CanDo(t) && !t.Completed)).Any())
            {
                foreach (var task in tasks)
                {
                    foreman.Accept(task);
                }
            }
        }
    }

    The man must let the foreman manage the tasks or become a foreman himself (we’ll take the first approach, or otherwise the house won't stand long).

    class Foreman
    {
        IEnumerable<Worker> workers;
        Func<Task, bool> next;
        
        public void Manage(IEnumerable<Task> construction)
        {
            IEnumerable<Task> tasks;
            // Foreman selects tasks that are not completed and must be done next
            while((tasks = construction.Where(t => !t.Completed && next(t))).Any())
            {
                foreach (var task in tasks)
                {
                    workers.Where(w => w.CanDo(task)).First().Accept(task);
                }
            }
        }
    }
    
    class Man
    {
        IEnumerable<Task> construction;
        Foreman foreman;
    
        public void BuildHouse()
        {
            // Foreman takes the whole construction to manage
            foreman.Manage(construction);
            // Just check that everything is done
            if (construction.Where(t => !t.Completed).Any())
            {
                throw new InvalidOperationException("Something isn't done!");
            }
        }
    }

    Now the abstractions are leveled – the man lets the construction be managed by the foreman (the foreman abstraction is now consistent).

    What does it decide?

    It is quite uncomfortable when someone asks for your opinion, then makes up his own mind and tells you what to do and how to do it. Why not tell me what to do in the first place and let me decide how to do it? Doesn’t that look like you are doing my job? This is what the Tell Don’t Ask principle is about:

    As the caller, you should not make decisions based on the called object’s state and then change the object’s state.

    Here are a couple of Teenager definitions – which one do you think makes more sense?

    class Teenager
    {
        private int age;
        private List<string> clothes;
    
        // Is it age that drives your clothes preferences?
        public int Age { get { return age; } }
    
        // Do you really want your parents to dress you 
        // just based on your age?
        public void TakeNew(string clothing)
        {
            clothes.Add(clothing);
        }
    }
    
    // ... or
    
    class Teenager
    {
        private List<string> clothes;
        // Preference is something personal
        private Func<string, bool> preference;
    
        // And you want to be able to select clothes based 
        // on your own preferences?
        public bool SelectNew(IEnumerable<string> shop)
        {
            var clothing = shop.Where(preference).FirstOrDefault();
            if (clothing != null)
            {
                // Got something you like
                clothes.Add(clothing);
            }
            // Tell your parents whether you need to go to other shop =)
            return clothing != null;
        }
    }

    When Tell Don’t Ask is violated, the caller takes over decision-making responsibility that belongs to the called object. Busted!

    What services does it perform?

    A service provider can do the job by itself or ask collaborators for help. Keeping dependencies on collaborators explicit facilitates controlled class growth. Classes should not contain more collaborators than a developer can fit in his or her short-term memory (5-7); otherwise it is possible to introduce responsibilities into the class which are either duplicated or unrelated. Whenever the number of collaborators grows beyond 5-7 you should consider whether all collaborators still help you form a single abstraction or a new one has been introduced, as the sketch below illustrates.
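    Too many collaborators are easiest to spot when dependencies are explicit. The hypothetical OrderProcessor below (all names are illustrative, not from the original example) is a sketch of such a smell:

    interface IInventory { }
    interface IPricing { }
    interface IShipping { }
    interface IPayments { }
    interface INotifications { }
    interface IAuditing { }
    
    class OrderProcessor
    {
        // Six collaborators already; if this list keeps growing
        // beyond what fits in short-term memory it is time to
        // look for a hidden abstraction to extract
        public OrderProcessor(IInventory inventory, IPricing pricing,
            IShipping shipping, IPayments payments,
            INotifications notifications, IAuditing auditing)
        {
        }
    }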

    On the other hand it makes sense to look at consumers, and in particular at how they consume the supplied services. If consumers use different subsets of the provided services it may be a sign that the class captures more than one responsibility.

    Recall the student example. The original class has two consumers: an employer and a university (for example). Each consumer was interested in its own subset of the provided methods (work-related and study-related respectively). And as we discovered, the class captured two abstractions: employee and student.

    Summary:

    • DO pay careful attention to responsibilities assignment
    • CONSIDER decomposing a class into areas (what it knows, maintains, does and decides) and performing an assessment in each area
  • Chasing state of the art

    Inject or locate dependencies?

    • 0 Comments

    The Inversion of Control pattern decouples components (consumers) from their dependencies and takes care of dependency location and lifetime management by delegating these responsibilities to a component external to the dependent type. This pattern is actively used in composite application development.

    Inversion of Control comes in two flavors (Unity provides both capabilities):

    A service locator holds references to services and knows how to locate them. It is then used by dependent components to obtain the services they need. In other words, consumers play an active role.

    interface IService
    {
        void Do();
    }
    
    class ActiveConsumer
    {
        private readonly IUnityContainer m_locator;
    
        // Reference to service locator comes from outside
        public ActiveConsumer(IUnityContainer serviceLocator)
        {
            m_locator = serviceLocator;
        }
    
        public void Do()
        {
            // In order to fulfill its task active consumer relies 
            // on service implementation that is obtained on demand 
            // from service locator
            var service = m_locator.Resolve<IService>();
            service.Do();
        }
    }

    Dependency injection makes dependent components passive (little or no work is done to get their dependencies). The only responsibility consumers still carry is to express their dependencies somehow (the way dependencies are expressed depends on the pattern implementation; in this example we will use the single constructor automatic injection supported by Unity).

    class PassiveConsumer
    {
        private readonly IService m_service;
    
        // This constructor is used to inject service dependency
        public PassiveConsumer(IService svc)
        {
            m_service = svc;
        }
    
        public void Do()
        {
            // We got this dependency from outside and done nothing 
            // to let it happen - so just use it
            m_service.Do();
        }
    }
    
    ...
    
    // At this point container resolves consumer's dependency 
    // and injects it during construction 
    var passiveConsumer = container.Resolve<PassiveConsumer>();
    passiveConsumer.Do();

    So what is the difference?

    First, is a dependency on the service locator itself appropriate? If the component in question is supposed to be reused by others you may end up putting unnecessary constraints on them (for example, you use some open source service locator, but developers who could reuse your component are not allowed to use any open source software due to a customer’s demand and thus won’t be able to reuse the component).

    Second, dependency visibility. A service locator makes the consumer’s “real” dependencies hidden and the dependency on the service locator itself visible. When dependencies are explicit it is much easier to understand the dependent class. Explicit dependencies allow you to assess and control the growth of the component: for example, if your component accepts 10 services in its constructor it may be a sign that it does, knows or decides too much and it is time to split it. Now consider the same assessment with a service locator: in order to count the dependencies you need to look for all unique usage occurrences of the service locator. That is not hard with a modern IDE, but still not as easy as looking at the component’s contract.

    On the other hand, it makes sense to consider the audience of the component. If it will be reused by others and its dependencies are hidden, using it may require deep knowledge of the component’s inner workings.

    Third, the consumer’s relation with its dependency. Dependency injection promotes constant relations (from a lifetime perspective): the consumer obtains its dependency at construction time and lives with it. A service locator, on the other hand, encourages temporary relations: get a service instance when it is time, call its methods, discard it. Why discard? Because if the component has a constant relation, why not use dependency injection, which gives you explicit dependencies?

    But what particular case forces locator usage? When the consumer has a longer lifetime than its dependency. For example, you are writing a smart client application with the presentation layer organized using the Model-View-Presenter pattern. A presenter calls a remote service in response to user interaction, and the view controlled by the presenter can stay open for a long time. If the presenter gets the remote service proxy dependency only once, the proxy may go into a faulted state (for example, due to network connectivity problems), and any subsequent calls to it will result in an exception. So it is better to dispose of the proxy every time a particular task is accomplished and create a new one when the next task is on the way (or cache it and create a new one when the proxy goes faulted, which is harder as you need to handle all the places where it is used and maintain the cache). Thus a service locator seems more appropriate in this case.

    However, we can make short lived dependencies explicit. Let’s assume that an IService implementation instance must be disposed after every use.

    interface IService : IDisposable
    {
        void Do();
    }
    
    // This is still an active consumer as it uses the service locator to get service instances
    class ActiveConsumer
    {
        private readonly IUnityContainer m_locator;
    
        public ActiveConsumer(IUnityContainer serviceLocator)
        {
            m_locator = serviceLocator;
        }
    
        public void Do()
        {
            using (var service = m_locator.Resolve<IService>())
            {
                service.Do();
            }
        }
    }

    A service locator has a wide surface (in terms of the services it can provide) and this makes the consumer’s contract opaque. What we need is to narrow the surface while still providing the ability to create service instances (as we need to dispose of them every time). An abstract factory will do the trick. A factory provides a clear connection with the service it creates. On the other hand, we need to make the consumer’s dependency on the factory explicit, so we will use dependency injection.

    interface IServiceFactory
    {
        IService CreateService();
    }
    
    // Consumer is no longer active as it gets its dependencies from outside
    class PassiveConsumer
    {
        private readonly IServiceFactory m_factory;
    
        // The dependency is now explicit
        public PassiveConsumer(IServiceFactory serviceFactory)
        {
            m_factory = serviceFactory;
        }
    
        public void Do()
        {
            // We still can create service instances on demand
            using (var service = m_factory.CreateService())
            {
                service.Do();
            }
        }
    }

    How about that? And that is not all: the factory clearly and explicitly states the relation between the dependent component and its dependency (the service created by the factory) – it is a temporary relation (since the factory provides the ability to create new instances).
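    For illustration, the wiring could look roughly like the sketch below; Service and ServiceFactory are hypothetical implementations introduced only for this example (the registration calls are standard Unity API):

    // Hypothetical service implementation
    class Service : IService
    {
        public void Do() { }
        public void Dispose() { }
    }
    
    // Hypothetical factory that still uses the container internally
    // but keeps that detail at the composition root
    class ServiceFactory : IServiceFactory
    {
        private readonly IUnityContainer m_container;
    
        public ServiceFactory(IUnityContainer container)
        {
            m_container = container;
        }
    
        public IService CreateService()
        {
            return m_container.Resolve<IService>();
        }
    }
    
    ...
    
    var container = new UnityContainer();
    container.RegisterType<IService, Service>();
    container.RegisterInstance<IServiceFactory>(new ServiceFactory(container));
    
    var consumer = container.Resolve<PassiveConsumer>();
    consumer.Do();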

    Summary:

    • DO make dependencies explicit
    • DO use dependencies to assess and control growth of the dependent component
    • CONSIDER component audience when choosing between dependency injection and service locator
    • CONSIDER using abstract factory pattern and dependency injection to make short lived dependencies (in comparison with dependent component lifetime) explicit
  • Chasing state of the art

    Sleeping barber synchronization problem

    The sleeping barber problem is a classic synchronization problem proposed by Dijkstra that goes as follows:

    A barbershop consists of a waiting room with n chairs, and the
    barber room containing the barber chair. If there are no customers
    to be served, the barber goes to sleep. If a customer enters the
    barbershop and all chairs are occupied, then the customer leaves
    the shop. If the barber is busy, but chairs are available, then the
    customer sits in one of the free chairs. If the barber is asleep, the
    customer wakes up the barber. Write a program to coordinate the
    barber and the customers.

    Back when Dijkstra proposed this problem (in 1965) the rhythm of life was presumably less hectic: barbers had a chance to sleep at work and customers could wait until the barber woke up.

    Most solutions use some sort of WaitHandle to do the trick; the classic solution, for example, is based on semaphores. But waiting on wait handles is not free.
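    For reference, a minimal sketch of that classic approach might look like this (a single barber; SleepingBarber and its members are illustrative names, and SemaphoreSlim stands in for classic semaphores):

    class SleepingBarber
    {
        private readonly object m_sync = new object();
        private readonly int m_chairs;
        private int m_waiting;
        // Signaled when a customer arrives; the barber sleeps here
        private readonly SemaphoreSlim m_customers = new SemaphoreSlim(0);
        // Signaled when the barber is ready to serve next customer
        private readonly SemaphoreSlim m_barber = new SemaphoreSlim(0);
    
        public SleepingBarber(int chairs)
        {
            m_chairs = chairs;
        }
    
        // Returns false if all chairs are taken and customer leaves
        public bool TryGetHaircut()
        {
            lock (m_sync)
            {
                if (m_waiting == m_chairs)
                    return false;
                m_waiting++;
            }
            // Wake up the barber
            m_customers.Release();
            // Sleep until the barber is ready
            m_barber.Wait();
            return true;
        }
    
        // Barber loop
        public void ServeCustomers()
        {
            while (true)
            {
                // Sleep until a customer arrives
                m_customers.Wait();
                lock (m_sync)
                {
                    m_waiting--;
                }
                // Invite the next customer in
                m_barber.Release();
                CutHair();
            }
        }
    
        private static void CutHair() { /* haircut goes here */ }
    }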

    But now let’s put a different spin on the problem: a barber desperately needs customers, so when there are none he runs around in circles waiting impatiently. Customers are also quite busy: if the waiting queue is empty they want to be served immediately, otherwise they pace from corner to corner while waiting.

    I call this problem the “crazy barber”. From the problem statement it follows that we should avoid any “sleeping” mechanisms and do the synchronization with busy waiting instead.

    A haircut is represented by an Action that the barber must execute, and thus it cannot be null.

    The number of chairs in the waiting room is known in advance and never changes. The waiting queue cannot overflow because any customer that sees no free chairs turns around and leaves the barbershop. So we can represent the waiting queue as a circular array of fixed size with two indices: head points to the next request to be serviced (if not null), and tail points to the next free slot where a request can be put.

    The barber waits for the slot next in line (the one the head index points to) to hold a non-null request to be serviced. It nullifies the slot once it has obtained a reference to the request, marking the slot as free for itself. Next, the head index is advanced to make this slot available for use by customers. Once the barber completes the execution, it notifies the waiting customer that it is free to go by changing the done flag.

    A customer first checks if there are free slots, then competes with other customers for the free slot the tail index points to. If successful, it puts a request that combines the action and the done flag into the waiting queue array at the index of the just advanced tail. Once the request is queued, the customer waits until the value of the done flag changes.

    Here goes “crazy barber”.

    class CrazyBarber
    {
        private readonly int m_capacity;
        // Circular array that holds queued items
        private volatile Request[] m_queue;
        // Points to next free slot
        private volatile int m_tail;
        // Points to a slot where next item to be executed 
        // is expected
        private volatile int m_head;
    
        public CrazyBarber(int capacity)
        {
            m_capacity = capacity;
            m_queue = new Request[m_capacity];
        }
    
        // Queues action for execution if there is free slot and 
        // waits for its execution completion
        public bool TryGetExecuted(Action action)
        {
            if (action == null)
                throw new ArgumentNullException("action");
    
            var waitToEnq = new SpinWait();
            while (true)
            {
                // Load tail first as if it will change compare and 
                // swap below will fail anyway
                var tail = m_tail;
                // Now load head and this is the linearization point 
                // of full queue case which results in unsuccessful 
                // attempt
                var head = m_head;
                // Check if queue has some free slots
                if (tail - head >= m_capacity)
                    // The queue is full, no luck
                    return false;
                // Create request before interlocked operation as 
                // it implies full barrier and thus will prevent
                // partially initialized request to be visible to 
                // worker loop
                var request = new Request { m_action = action };
                // Compete for the tail slot
                if (Interlocked.CompareExchange(ref m_tail, tail + 1, tail) != tail)
                {
                    // We lost due to contention, spin briefly and 
                    // retry
                    waitToEnq.SpinOnce();
                    continue;
                }
                var index = tail % m_capacity;
                // Here is the linearization point of a successful 
                // attempt
                m_queue[index] = request;
                
                var waitToExe = new SpinWait();
                // Wait until enqueued action is not executed
                while (!request.m_done)
                    waitToExe.SpinOnce();
    
                return true;
            }
        }
    
        // Runs single worker loop that does the execution and
        // must not be called from multiple threads
        public void Run(CancellationToken cancellationToken)
        {
            var waitToDeq = new SpinWait();
            while (true)
            {
                var head = m_head;
                var index = head % m_capacity;
                waitToDeq.Reset();
                // Though the array field is marked as volatile, 
                // access to its elements is not treated as volatile;
                // however it is enough to make sure the loop 
                // condition is not optimized away
                // Wait until new item is available or cancellation 
                // is requested
                while (m_queue[index] == null)
                {
                    if (cancellationToken.IsCancellationRequested)
                        return;
                    waitToDeq.SpinOnce();
                }
                // Get request to be serviced and nullify it to 
                // mark slot as free for yourself
                var request = m_queue[index];
                m_queue[index] = null;
                // As there is only one worker advance head without
                // interlocked and here is the linearization point 
                // of making free slot
                m_head = head + 1;
                // Do not call TryGetExecuted from action or it will 
                // deadlock
                request.m_action();
                // Make sure ready notification is not made visible 
                // through reordering before action is completed
                // and store release guarantees that here
                request.m_done = true;
            }
        }
    
        class Request
        {
            internal Action m_action;
            internal volatile bool m_done;
        }
    }

    Although the tail index will overflow at some point, let’s assume the “crazy” barber won’t try to make around 2 billion haircuts =).
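    Usage could look roughly like this (hypothetical setup code, not from the original post):

    var barber = new CrazyBarber(3);
    var cts = new CancellationTokenSource();
    // Single barber thread runs the worker loop
    var barberThread = new Thread(() => barber.Run(cts.Token));
    barberThread.Start();
    
    // A customer either gets serviced or leaves if all slots are taken
    if (!barber.TryGetExecuted(() => Console.WriteLine("Haircut!")))
        Console.WriteLine("No free chairs, leaving");
    
    cts.Cancel();
    barberThread.Join();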

  • Chasing state of the art

    Concurrent Object Pool

    Object pool is a set of initialized objects that are kept ready to use, rather than allocated and destroyed on demand.

    Wikipedia

    Object pool usage may yield a performance improvement when pooled object initialization cost and instantiation frequency are high while the number of objects in use at any moment is low (ThreadPool is a good example).

    There are many questions to take into account when designing an object pool. How to handle an acquire request when there are no free objects? In single threaded scenarios you may choose to create a new object or let the caller know that the request cannot be fulfilled. In multithreaded scenarios you may choose to wait for free objects until other threads release them. How to organize access to pooled objects? Based on your logic you may choose a most recently used first strategy, a least recently used first strategy, or even random first. Besides that, you need to provide synchronized access to the internal object storage.

    Let’s assume we have made decisions on handling the empty pool case (the object pool growth strategy) and on access to pooled objects (most recently used first) and are now focused on minimizing the synchronization costs of the internal object storage.

    The simplest approach is to protect any access with a lock. That will work, however under high contention it may result in lock convoys (in most cases the time spent within the lock is pretty short, just enough to take a ready to use object). We may use a lock-free data structure such as ConcurrentStack<T> as the storage, or we may try to reduce contention.
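    For comparison, such a minimal lock-free baseline could look like the sketch below (an illustrative SimpleObjectPool, not the pool built in this post); every acquire and release still contends on the same shared stack:

    class SimpleObjectPool<T>
    {
        private readonly ConcurrentStack<T> m_items = new ConcurrentStack<T>();
        private readonly Func<T> m_factory;
    
        public SimpleObjectPool(Func<T> factory)
        {
            m_factory = factory;
        }
    
        public T Acquire()
        {
            T item;
            // Most recently used first; create new object when empty
            return m_items.TryPop(out item) ? item : m_factory();
        }
    
        public void Release(T item)
        {
            m_items.Push(item);
        }
    }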

    The idea for reducing contention is somewhat similar to the algorithm used in ThreadPool, where each worker thread, in addition to the global work item queue, maintains a local double ended work item queue (one end is exposed to the owner and the other to worker threads that may want to steal work items when the global queue is empty). Each worker thread puts newly created work items into its local queue to avoid synchronization costs. When a worker thread is ready to process the next work item it first tries to dequeue from its local queue; if that fails it tries to get a work item from the global queue and lastly resorts to stealing from other threads (check out the cool series of articles from Joe Duffy on building a custom thread pool).

    In order to build the object pool we will maintain local per-thread storage of pooled objects in addition to the global storage. Pooled objects are stored in segments of a selected size (it depends on usage scenarios, so it must be specified during object pool creation). The global storage holds a set of segments. In order to acquire a pooled object a thread must:

    1. Get local segment (if any)
    2. If no local segment present or it is empty try to get segment out of global storage (if any)
    3. If global storage is empty create new segment
    4. Update local segment
    5. Get item from local segment and return it

    Returning an object looks like this:

    1. Put item into local segment
    2. If the segment has grown beyond a threshold, split it into two parts and put one of them back into the global storage.

    Thus most of the time a thread works with its own local set of pooled objects. However this has its own cost: if, for example, a thread comes once, acquires a pooled object and then never uses the pool again, we get an orphaned segment. So as usual it is a matter of tradeoff. If you have a limited set of threads that work with the object pool for a long time, this approach can be justified. If instead you have many threads that work with the object pool for short periods of time and never come back, you’d probably better look into other approaches.

    // Represents thread safe object pool
    public class ConcurrentObjectPool<T>
    {
        private readonly int m_segmentSize;
        // Thread local pool used without synchronization 
        // to reduce costs
        private ThreadLocal<Segment> m_localPool = new ThreadLocal<Segment>();
        // Global pool that is used once there is nothing or 
        // too much in local pool
        private readonly ConcurrentStack<Segment> m_globalPool = new ConcurrentStack<Segment>();
        // Factory function that is used from potentially multiple 
        // threads to produce pooled objects and thus must be thread 
        // safe
        private readonly Func<T> m_factory;
    
        public ConcurrentObjectPool(Func<T> factory, int segmentSize)
        {
            m_factory = factory;
            m_segmentSize = segmentSize;
        }
    
        // Acquires object from pool
        public PoolObject<T> Acquire()
        {
            var local = m_localPool.Value;
                
            T item;
            // Try to acquire pooled object from local pool
            // first to avoid synchronization penalties
            if (local != null && local.TryPop(out item))
                return new PoolObject<T>(item, this);
            // If failed (either due to an empty or not yet 
            // initialized local pool) try to acquire a segment 
            // from the global pool to become the new local pool
            if (!m_globalPool.TryPop(out local))
            {
                // If failed create new segment using object 
                // factory
                var items = Enumerable.Range(0, m_segmentSize)
                    .Select(_ => m_factory());
                local = new Segment(m_segmentSize, items);
            }
            m_localPool.Value = local;
            // Eventually get object from local non-empty pool    
            local.TryPop(out item);
            return new PoolObject<T>(item, this);
        }
    
        // Releases pooled object back to the pool; it is internal 
        // rather than public to avoid multiple releases of the 
        // same object
        internal void Release(T poolObject)
        {
            var local = m_localPool.Value;
            // Return object back to local pool first 
            var divided = local.Push(poolObject);
            // If local pool has grown beyond threshold 
            // return extra segment back to global pool
            if (divided != null)
                m_globalPool.Push(divided);
        }
    
        // Represents chunk of pooled objects
        class Segment
        {
            private readonly int m_size;
            // Using stack to store pooled objects assuming 
            // that hot objects (recently used) provide better 
            // locality
            private readonly Stack<T> m_items;
    
            public Segment(int size, IEnumerable<T> items)
            {
                m_size = size;
                m_items = new Stack<T>(items);
            }
    
            public bool TryPop(out T item)
            {
                item = default(T);
                // Pop item if any available
                if (m_items.Count > 0)
                {
                    item = m_items.Pop();
                    return true;
                }
                return false;
            }
    
            public Segment Push(T item)
            {
                m_items.Push(item);
                // If current segment size is still smaller
                // than twice of original size no need to split
                if (m_items.Count < 2 * m_size)
                    return null;
                // Otherwise split current segment to get it 
                // pushed into global pool
                var items = Enumerable.Range(0, m_size)
                    .Select(_ => m_items.Pop());
                return new Segment(m_size, items);
            }
        }
    }
    
    // Represents disposable wrapper around pooled object 
    // that is used to return object back to the pool
    public class PoolObject<T> : IDisposable
    {
        private readonly ConcurrentObjectPool<T> m_pool;
        private bool m_disposed;
    
        private readonly T m_value;
    
        // Get reference to the pool to return value to
        public PoolObject(T value, ConcurrentObjectPool<T> pool)
        {
            m_value = value;
            m_pool = pool;
        }
    
        public T Value
        {
            get
            {
                // Make sure value can't be obtained (though we can't 
                // guarantee that it is not used) anymore after it is
                // released back to the pool
                ThrowIfDisposed();
                return m_value;
            }
        }
    
        public void Dispose()
        {
            if (m_disposed)
                return;
            // As we are disposing pooled object disposal basically 
            // equivalent to returning the object back to the pool
            m_disposed = true;
            m_pool.Release(m_value);
        }
    
        private void ThrowIfDisposed()
        {
            if (m_disposed)
                throw new ObjectDisposedException("Pool object has been disposed");
        }
    }

    And it is used somewhat like this

    // Initialized elsewhere
    ConcurrentObjectPool<T> pool = ...
    ...
    using (var obj = pool.Acquire())
    {
        // Use pooled object value
        Process(obj.Value);
    }

    I omitted the IDisposable implementation on the object pool to keep the code simpler. However, in order to implement it we would need to track all segments in a separate collection, as otherwise thread local segments wouldn’t be accessible from the thread disposing the pool.
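    One way to picture the tracking idea (sketched here at the granularity of pooled objects rather than segments; TrackingPool is purely illustrative) is to remember everything the factory ever created so that disposal can reach objects parked in thread local storage:

    class TrackingPool<T> : IDisposable where T : IDisposable
    {
        // Remembers every object ever created, including those that
        // end up in thread local segments of a more elaborate pool
        private readonly ConcurrentBag<T> m_created = new ConcurrentBag<T>();
        private readonly Func<T> m_factory;
    
        public TrackingPool(Func<T> factory)
        {
            m_factory = factory;
        }
    
        public T Create()
        {
            var item = m_factory();
            m_created.Add(item);
            return item;
        }
    
        public void Dispose()
        {
            // Assumes no objects are in use at this point
            foreach (var item in m_created)
                item.Dispose();
        }
    }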

  • Chasing state of the art

    Concurrent set based on sorted singly linked list

    Set is an abstract data structure that can store certain values, without any particular order, and no repeated values. Static sets do not change with time and allow only query operations, while mutable sets also allow insertion and/or deletion of elements.

    Wikipedia

    Though the set definition says values are stored without any particular order, that refers to how consuming code treats the set. One legitimate way (though not the most efficient one) to implement a set is to use a sorted singly linked list, which allows us to eliminate duplicate values. Other ways include, but are not limited to, self-balancing binary search trees, skip lists and hash tables.

    As the title says, we are about to explore an algorithm for a concurrent set. We could simply use ConcurrentDictionary<TKey, TValue> to implement a concurrent set. Assuming that is done =) let’s take a look at other options. At this point the .NET Framework has no support for a concurrent self-balancing binary search tree or skip list, and building those is quite tricky. On the other hand a concurrent sorted singly linked list is still a feasible solution, and this well known algorithm contains useful techniques.
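    For completeness, here is a minimal sketch of that dictionary-based option (DictionarySet is an illustrative name; ConcurrentDictionary lives in System.Collections.Concurrent), keeping only the keys and using throwaway byte values:

    class DictionarySet<T>
    {
        private readonly ConcurrentDictionary<T, byte> m_items =
            new ConcurrentDictionary<T, byte>();
    
        public bool Add(T item)
        {
            return m_items.TryAdd(item, 0);
        }
    
        public bool Remove(T item)
        {
            byte value;
            return m_items.TryRemove(item, out value);
        }
    
        public bool Contains(T item)
        {
            return m_items.ContainsKey(item);
        }
    }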

    For mutable sets at least the following operations must be supported: add, remove, contains. The simplest way to provide them is to wrap any list modification with a lock, leading to coarse-grained synchronization. However, under high contention this single lock becomes a bottleneck; taking into account that all operations have O(n) time complexity, serializing them leads to a significant performance hit.

    Consider a list with the following contents: 1->3->4->6->7. Operations Add(2) and Add(5), even when run concurrently, do not interfere as they modify distinct areas of the list: 1->(2)->3->4->(5)->6->7. The add operation affects two consecutive nodes where the new node must be added in between. The same is true for the remove operation (it affects two consecutive nodes: the node to be removed and its predecessor). These nodes are used as sync roots to make thread safe modifications of the list. In order to prevent deadlocks, nodes are always locked in the same order (predecessor first and then its successor).

    What if we need to add a new node at the beginning or at the end? In that case we do not have a pair of nodes. To work around this the list contains two sentinel nodes (head and tail sentinels that remain in the list forever; any value is greater than the value in the head and less than the value in the tail). Basically any list looks like h->x0->x1->…->xn->t, where h contains the negative infinity and t the positive infinity value.

    Let’s assume we have a list h->1->2->5->6->t. Two threads (A and B) concurrently execute Add(3) and Add(4) respectively. Both of them need to add a new node between the nodes that contain 2 and 5 (h->1->2->(x)->5->6->t). One of them will be the first to succeed in locking both nodes, and after the nodes are successfully locked it will proceed with adding the new node (let’s assume thread A succeeded). Once thread A has finished with the list modifications the list is h->1->[2]->3->[5]->6->t, yet thread B is still trying to lock the nodes in brackets. Thread B will eventually succeed in locking, but its expectations may no longer be true, as happens in this case (the node that holds value 2 no longer points to the node that holds 5).

    Between the moment a thread starts its attempt to lock a pair of nodes and the moment it eventually succeeds, the list can be modified by another thread, since locking two nodes is not atomic. Thus after a thread succeeds in locking it must validate that its expectations still hold.

    However, dealing with node removal is even more tricky. Assume we have a list h->1->2->5->6->t and thread A attempts Add(3) concurrently with thread B trying Remove(2) or Remove(5), and thread B succeeds first. In that case once thread A locks the nodes that contain 2 and 5 it may observe a list that is now h->1->5->6->t or h->1->2->6->t, meaning one of the locked nodes is no longer in the list, and adding a new value in between would lead to a lost value (if the predecessor was removed) or a resurrected value (if the successor was removed and the new node now points to it).

    Thus once a thread succeeds in locking both nodes it must check that the locked nodes have not been removed from the list and that the predecessor still points to its previously observed successor. If the validation fails the operation must fall back and start again.

    But if a node is removed from the list, how do we know that? One way is to set the removed node’s next reference to null. However this is a bad idea, because another thread traversing the list without locks (and this is deliberate behavior) may observe an unexpected null reference. For example, assume we have a list h->1->2->3->6->t. Thread A tries Add(4) while thread B tries Remove(2). Assume that thread A, while searching for the 3->6 pair of nodes (to add 4 in between), moved its current reference to the node that contains 2 and was preempted. Then thread B succeeds in Remove(2). If thread B set the removed node’s next reference to null, thread A would fail miserably once it woke up, even though the sought nodes are still in the list. Thus a list node’s next reference must always be non-null.

    So how do we remove nodes? We must preserve next pointers even when a node is removed. The node is simply marked as removed and then its predecessor’s next is updated. The removed node is no longer reachable but still points to other list nodes, so any traversals in progress won’t break. From the memory perspective we rely on the garbage collector to reclaim memory occupied by unreachable nodes.

    Because of the mechanism chosen for node removal we can safely traverse the list with no locks, and this is true for all three operations. The Contains operation just needs to validate the found node.

    Now let’s code the thing.

    public class ConcurrentSet<T>
    {
        private readonly IComparer<T> m_comparer;
        // Sentinel nodes
        private readonly Node m_head;
        private readonly Node m_tail;
    
        public ConcurrentSet(IComparer<T> comparer)
        {
            m_comparer = comparer;
            // Sentinel nodes cannot be removed from the
            // list and logically contain negative and 
            // positive infinity values from T
            m_tail = new Node(default(T));
            m_head = new Node(default(T), m_tail);
        }
    
        // Adds item to the set if no such item 
        // exists
        public bool Add(T item)
        {
            // Continue attempting until succeeded 
            // or failed
            while (true)
            {
                Node pred, curr;
                // Find where new node must be added
                Find(item, out pred, out curr);
                // Locks nodes starting from predecessor
                // to synchronize concurrent access
                lock (pred)
                {
                    lock (curr)
                    {
                        // Check if found nodes still
                        // meet expectations
                        if (!Validate(pred, curr))
                            continue;
                        // If the value is already in the
                        // set we are done
                        if (Equal(curr, item))
                            return false;
                        // Otherwise add new node
                        var node = new Node(item, curr);
                        // At this point new node becomes
                        // reachable
                        pred.m_next = node;
                        return true;
                    }
                }
            }
        }
    
        // Removes item from the list if such item
        // exists
        public bool Remove(T item)
        {
            // Continue attempting until succeeded 
            // or failed
            while (true)
            {
                Node pred, curr;
                // Find node that must be removed and 
                // its predecessor
                Find(item, out pred, out curr);
                // Locks nodes starting from predecessor
                // to synchronize concurrent access
                lock (pred)
                {
                    lock (curr)
                    {
                        // Check if found nodes still
                        // meet expectations
                        if (!Validate(pred, curr))
                            continue;
                        // If the value is not in the set 
                        // we are done
                        if (!Equal(curr, item))
                            return false;
                        // Otherwise mark node as removed 
                        curr.m_removed = true;
                        // And make it unreachable
                        pred.m_next = curr.m_next;
                        return true;
                    }
                }
            }
        }
    
        // Checks if given item exists in the list
        public bool Contains(T item)
        {
            Node pred, curr;
            Find(item, out pred, out curr);
    
            return !curr.m_removed && Equal(curr, item);
        }
    
        // Searches for pair consequent nodes such that 
        // curr node contains a value equal or greater 
        // than given item
        void Find(T item, out Node pred, out Node curr)
        {
            // Traverse the list without locks as removed
            // nodes still point to other nodes
            pred = m_head;
            curr = m_head.m_next;
            while (Less(curr, item))
            {
                pred = curr;
                curr = curr.m_next;
            }
        }
    
        static bool Validate(Node pred, Node curr)
        {
            // Validate that pair of nodes previously 
            // found still meets the expectations 
            // which essentially is checking whether 
            // nodes still point to each other and no one
            // was removed from the list
            return !pred.m_removed &&
                    !curr.m_removed &&
                    pred.m_next == curr;
        }
    
        bool Less(Node node, T item)
        {
            return node != m_tail &&
                    m_comparer.Compare(node.m_value, item) < 0;
        }
    
        bool Equal(Node node, T item)
        {
            return node != m_tail &&
                    m_comparer.Compare(node.m_value, item) == 0;
        }
    
        class Node
        {
            internal readonly T m_value;
            internal volatile Node m_next;
            internal volatile bool m_removed;
    
            internal Node(T value, Node next = null)
            {
                m_value = value;
                m_next = next;
            }
        }
    }

    This algorithm uses fine grained synchronization that improves concurrency, and lazy node removal allows us to traverse the list with no locks at all. This is quite important, as the Contains operation is usually used more frequently than Add and Remove.
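    Hypothetical usage might look like this (assuming the default integer comparer):

    var set = new ConcurrentSet<int>(Comparer<int>.Default);
    Parallel.Invoke(
        () => set.Add(1),
        () => set.Add(2),
        () => set.Remove(1));
    // Prints True; whether 1 is present depends on the interleaving
    // of Add(1) and Remove(1)
    Console.WriteLine(set.Contains(2));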

    Hope this pile of bits makes sense to you =).

  • Chasing state of the art

    Merge binary search trees in place

    Binary search tree is a fundamental data structure used for searching and sorting. It is also a common problem to merge two binary search trees into one.

    The simplest solution is to take every element of one tree and insert it into the other. This may be really inefficient: it depends on how well the target tree is balanced, and it doesn’t take the structure of the source tree into account.

    A more efficient way is to use insertion into the root. Assuming we have two trees A and B, we insert the root of tree A into tree B and, using rotations, move the inserted root to become the new root of tree B. Next we recursively merge the left and right sub-trees of trees A and B. This algorithm takes the structure of both trees into account, but insertion still depends on how balanced the target tree is.

    We can look at the problem from a different perspective. A binary search tree organizes its nodes in sorted order, so merging two trees means organizing the nodes from both trees in sorted order. This sounds exactly like the merge phase of merge sort. However, trees cannot be consumed directly by that algorithm, so first we convert them into sorted singly linked lists built out of the tree nodes, then merge the lists into a single sorted linked list. This list gives us the sorted order of the sought tree and must then be converted back to a tree. We’ve got the plan, let’s go for it.
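    The snippets below assume a minimal mutable node type along these lines (its definition is not shown in the original post):

    public class TreeNode<T>
    {
        public T Value { get; set; }
        public TreeNode<T> Left { get; set; }
        public TreeNode<T> Right { get; set; }
    }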

    In order to convert a binary search tree into a sorted singly linked list we traverse the tree in order, converting sub-trees into lists and appending them to the resulting one.

    // Converts tree to sorted singly linked list and appends it 
    // to the head of the existing list and returns new head.
    // Left pointers are used as next pointer to form singly
    // linked list thus basically forming degenerate tree of 
    // single left oriented branch. Head of the list points 
    // to the node with greatest element.
    static TreeNode<T> ToSortedList<T>(TreeNode<T> tree, TreeNode<T> head)
    {
        if (tree == null)
            // Nothing to convert and append
            return head;
        // Do conversion using in order traversal
        // Convert first left sub-tree and append it to 
        // existing list
        head = ToSortedList(tree.Left, head);
        // Append root to the list and use it as new head
        tree.Left = head;
        // Convert right sub-tree and append it to list 
        // already containing left sub-tree and root
        return ToSortedList(tree.Right, tree);
    }

    Merging sorted linked lists is quite straightforward.

    // Merges two sorted singly linked lists into one and 
    // calculates the size of merged list. Merged list uses 
    // right pointers to form singly linked list thus forming 
    // degenerate tree of single right oriented branch. 
    // Head points to the node with smallest element.
    static TreeNode<T> MergeAsSortedLists<T>(TreeNode<T> left, TreeNode<T> right, IComparer<T> comparer, out int size)
    {
        TreeNode<T> head = null;
        size = 0;
        // See merge phase of merge sort for linked lists
        // with the only difference that this implementation
        // reverses the list during merge
        while (left != null || right != null)
        {
            TreeNode<T> next;
            if (left == null)
                next = DetachAndAdvance(ref right);
            else if (right == null)
                next = DetachAndAdvance(ref left);
            else
                next = comparer.Compare(left.Value, right.Value) > 0
                            ? DetachAndAdvance(ref left)
                            : DetachAndAdvance(ref right);
            next.Right = head;
            head = next;
            size++;
        }
        return head;
    }
    
    static TreeNode<T> DetachAndAdvance<T>(ref TreeNode<T> node)
    {
        var tmp = node;
        node = node.Left;
        tmp.Left = null;
        return tmp;
    }

    Rebuilding a tree from the sorted linked list is quite interesting. To build a balanced tree we must know the number of nodes in the final tree; that is why it is calculated during the merge phase. Knowing the size allows us to distribute nodes uniformly and build an optimal tree from the height perspective. Optimality depends on usage scenarios, and in this case we assume that every element in the tree has the same probability of being sought.

    // Converts singly linked list into binary search tree 
    // advancing list head to next unused list node and 
    // returning created tree root
    static TreeNode<T> ToBinarySearchTree<T>(ref TreeNode<T> head, int size)
    {
        if (size == 0)
            // Zero sized list converts to null 
            return null;
    
        TreeNode<T> root;
        if (size == 1)
        {
            // Unit sized list converts to a node with 
            // left and right pointers set to null
            root = head;
            // Advance head to next node in list
            head = head.Right;
        // Left pointers were already nullified during the 
        // merge so only right needs to be nullified
            root.Right = null;
            return root;
        }
    
        var leftSize = size / 2;
        var rightSize = size - leftSize - 1;
        // Create left subtree out of half of list nodes
        var leftRoot = ToBinarySearchTree(ref head, leftSize);
        // List head now points to the root of the subtree
        // being created
        root = head;
        // Advance list head and the rest of the list will 
        // be used to create right subtree
        head = head.Right;
        // Link left subtree to the root
        root.Left = leftRoot;
        // Create right subtree and link it to the root
        root.Right = ToBinarySearchTree(ref head, rightSize);
        return root;
    }

    Now putting everything together.

    public static TreeNode<T> Merge<T>(TreeNode<T> left, TreeNode<T> right, IComparer<T> comparer)
    {
        Contract.Requires(comparer != null);
    
        if (left == null || right == null)
            return left ?? right;
        // Convert both trees to sorted lists using original tree nodes 
        var leftList = ToSortedList(left, null);
        var rightList = ToSortedList(right, null);
        int size;
        // Merge sorted lists and calculate merged list size
        var list = MergeAsSortedLists(leftList, rightList, comparer, out size);
        // Convert sorted list into optimal binary search tree
        return ToBinarySearchTree(ref list, size);
    }

    This solution is O(n + m) in time and O(1) in additional space (not counting the recursion stack), where n and m are the sizes of the trees to merge.

  • Chasing state of the art

    Shared restroom synchronization problem

    Assume you own a bar that has a single restroom with n stalls. In order to avoid lawsuits you want to make sure that only people of the same gender can be in the restroom at the same time and that no accidents occur (nobody peed their pants). How would you write a synchronization algorithm for it?

    Translating into concurrency language, we want to build a synchronization algorithm that allows no more than n threads of the same kind inside the critical section and is starvation free (threads that are trying to enter the critical section eventually enter it).

    The problem can be divided into three parts:

    • How to allow only threads of the same kind to enter critical section
    • How to limit number of threads inside critical section to configured number
    • How to make the whole thing starvation free

    Mutual exclusion algorithms have a nice property with respect to starvation freedom. Assume you have two starvation free mutual exclusion algorithms A and B, combined in the following way:

    Enter code A

    Enter code B

    Critical section

    Leave code B

    Leave Code A

    they form another starvation free mutual exclusion algorithm.

    Limiting the number of threads inside the critical section to a configured number is easily solved with SemaphoreSlim (which is starvation free). Thus we need to solve the problem of allowing only threads of the same kind to enter the critical section.

    Let’s denote two types of threads: black and white. Assume a thread tries to enter the critical section. The following cases are possible:

    • No other threads are in the critical section, so it can enter
    • There are threads of the same color in the critical section
      • no threads of the different color are waiting, so it can enter
      • there are waiting threads of the different color, so it cannot enter to prevent starvation of the waiting threads and must wait
    • There are threads of the different color in the critical section so it must wait

    Now, to prevent starvation we switch the turn to the other color once a group of threads of the current color leaves the critical section. The turn is set to the color that is now allowed to enter the critical section. However, if no threads are in the critical section a thread may enter even when it is not its turn (basically it captures the turn).

    class WhiteBlackLock
    {
        private readonly object _sync = new object();
        private readonly int[] _waiting = new int[2];
        private int _turn;
        private int _count;
    
        public IDisposable EnterWhite()
        {
            return Enter(0);
        }
    
        public IDisposable EnterBlack()
        {
            return Enter(1);
        }
    
        private IDisposable Enter(int color)
        {
            lock (_sync)
            {
                if (_waiting[1 - _turn] == 0 && 
                    (_count == 0 || _turn == color))
                {
                    // Nobody is waiting and either no one is in the 
                    // critical section or this thread has the same 
                    // color
                    _count++;
                    _turn = color;
                }
                else
                {
                    // Either somebody is waiting to enter critical 
                    // section or this thread has a different color 
                    // than the ones already in the critical section 
                    // and thus it waits with the rest of its color
                    _waiting[color]++;
                    // Wait until this thread's group is allowed 
                    // to enter the critical section
                    while (_waiting[color] > 0)
                        Monitor.Wait(_sync);
                }
                // Wrap critical section leaving in a disposable to 
                // enable convenient use with using statement
                return new Disposable(this);
            }
        }
    
        private void Leave()
        {
            lock (_sync)
            {
                // Indicate completion
                if (--_count != 0) 
                    return;
                // If this is the last one of the current group make 
                // way for threads of other color to run by switching 
                // turn
                _turn = 1 - _turn;
                // Before threads are awoken count must be set to 
                // waiting group size so that they can properly report 
                // their completion and not change turn too fast
                _count = _waiting[_turn];
                // Indicate that current group can enter critical 
                // section
                _waiting[_turn] = 0;
                // Wake up waiting threads
                Monitor.PulseAll(_sync);
            }
        }
    
        class Disposable : IDisposable
        {
            private readonly WhiteBlackLock _lock;
            private int _disposed;
    
            public Disposable(WhiteBlackLock @lock)
            {
                _lock = @lock;
            }
    
            public void Dispose()
            {
                // Make sure that out of possibly multiple calls 
                // only the first one leaves the critical section
                if (Interlocked.Exchange(ref _disposed, 1) == 0)
                    _lock.Leave();
            }
        }
    }

    In order to avoid lock ownership tracking, leaving the critical section is represented through a disposable.

    var semaphore = new SemaphoreSlim(n);
    var whiteBlackLock = new WhiteBlackLock();
    
    // Worker thread code to enter critical section
    using (whiteBlackLock.EnterWhite())
    {
        semaphore.Wait();
        // Critical section goes here
        semaphore.Release();
    }

    Now your restroom can safely serve the needs of your customers =).

  • Chasing state of the art

    Merge sequences in parallel

    In practice you may find yourself in a situation where you have several sequences of data that you want to drain in parallel and merge the results into a single sequence. This is what the Merge combinator for sequences does. Reactive Extensions had it for both enumerable and observable sequences; however, at some point it was decided to no longer support it for enumerable sequences as it may be expressed through the same combinator for observable sequences.

    I find it quite interesting to implement the Merge combinator for enumerable sequences, but to make it more interesting let’s change the behavior a little bit. The original behavior of EnumerableEx.Merge was to drain source sequences as fast as workers can and buffer results in a resulting queue until they are consumed by the merged sequence enumerator. It can be implemented (maybe this is not the most efficient way, but still) using

    • BlockingCollection<T> to synchronize producers that drain original sequences and consumer that merges elements from all sequences into resulting sequence
    • CancellationTokenSource to notify producers that early termination is requested

    Instead let’s change the behavior to allow a producer to proceed with getting the next element from its source sequence only once the previously “produced” element is consumed (which basically means the merged sequence enumerator reached it). This is a sort of two-way synchronization between producer and consumer where the consumer cannot proceed unless there are ready elements and a producer cannot proceed unless its previous element is consumed. It is similar to a blocking collection; however, in this case there is only one consumer, and a producer is blocked not because the queue is full but until its previously enqueued element is dequeued. Let’s code the thing!

    class Consumer<T>
    {
        private readonly IEnumerable<IEnumerable<T>> _sources;
    
        private readonly object _sync = new object();
    
        private readonly Queue<Producer<T>> _globalQueue = new Queue<Producer<T>>();
        private readonly Queue<Producer<T>> _localQueue = new Queue<Producer<T>>();
    
        // No need to mark these fields as volatile as 
        // only consumer thread updates and reads them
        private bool _done;
        private int _count;
    
        private readonly IList<Exception> _exceptions = new List<Exception>();
    
        public Consumer(IEnumerable<IEnumerable<T>> sources)
        {
            _sources = sources;
        }
    
        // Merges sources in parallel
        public IEnumerable<T> Merge()
        {
            Start();
            while (true)
            {
                Wait();
                while (_localQueue.Count > 0)
                {
                    // Use local queue to yield values
                    var producer = _localQueue.Dequeue();
                    // Get the actual value out of ready producer
                    // while simultaneously allowing it to proceed 
                    // and observe early termination request or get 
                    // next value
                    var next = producer.GetNext(!_done);
                    // Using Notification<T> from Rx to simplify
                    // sequence processing
                    if (next.Kind != NotificationKind.OnNext)
                    {
                        _count--;
                        if (next.Kind == NotificationKind.OnError)
                        {
                            // Observed exception leads to early 
                            // termination however merge will allow 
                            // already running (not waiting) 
                            // producers to finish (potentially 
                            // bringing more exceptions to be 
                            // observed) while requesting waiters 
                            // to terminate
                            _done = true;
                            // Store observed exception to be further 
                            // thrown as part of the aggregate 
                            // exception
                            _exceptions.Add(next.Exception);
                        }
                    }
                    else
                    {
                        yield return next.Value;
                    }
                }
                // Loop until all producers finished
                if (_count == 0)
                {
                    // Either normally
                    if (_exceptions.Count == 0)
                        yield break;
                    // Or exceptions were observed
                    throw new AggregateException(_exceptions);
                }
            }
        }
    
        // Notifies consumer of ready elements
        public void Enqueue(Producer<T> producer)
        {
            // Notify consumer of a ready producer
            lock (_sync)
            {
                // Consumer will either observe non-empty 
                // global queue (if it is not waiting already) 
                // or pulse (if it is waiting)
                _globalQueue.Enqueue(producer);
                Monitor.Pulse(_sync);
            }
        }
    
        // Waits for ready elements
        private void Wait()
        {
            lock (_sync)
            {
                // As the only consumer is draining the global 
                // queue, once an empty queue is observed the 
                // consumer waits and by the time it is notified 
                // the queue is non-empty again
                if (_globalQueue.Count == 0)
                    Monitor.Wait(_sync);
                // Copy whatever is available to the local queue 
                // to drain it further without taking a lock
                while (_globalQueue.Count > 0)
                    _localQueue.Enqueue(_globalQueue.Dequeue());
            }
        }
    
        // Starts producers
        private void Start()
        {
            try
            {
                foreach (var source in _sources)
                {
                    var producer = new Producer<T>(this, source.GetEnumerator());
                    Task.Factory.StartNew(producer.Enumerate);
                    _count++;
                }
            }
            catch (Exception ex)
            {
                // If none of the producers started successfully 
                // just rethrow the exception
                if (_count == 0)
                    throw;
                // Some producers have started; notify them of 
                // early termination
                _done = true;
                // Store observed exception to be further thrown as 
                // part of the aggregate exception
                _exceptions.Add(ex);
            }
        }
    }
    
    class Producer<T>
    {
        private readonly object _sync = new object();
        private readonly Consumer<T> _consumer;
    
        private IEnumerator<T> _enum;
    
        private volatile Notification<T> _next;
        private volatile bool _cont = true;
        private volatile bool _awake;
    
        public Producer(Consumer<T> consumer, IEnumerator<T> enumerator)
        {
            _consumer = consumer;
            _enum = enumerator;
        }
    
        // Drains source sequence
        public void Enumerate()
        {
            try
            {
                // Loop until the consumer requests early 
                // termination or the source sequence is exhausted
                while (_cont && _enum.MoveNext())
                {
                    // Reset the awake flag so that the producer 
                    // waits until the consumer picks up the element
                    _awake = false;
                    // Notify consumer of a ready element
                    _next = new Notification<T>.OnNext(_enum.Current);
                    _consumer.Enqueue(this);
    
                    lock (_sync)
                    {
                        // If consumer was pretty quick and producer 
                        // missed the pulse it will observe the awake 
                        // flag and thus won't wait
                        if (!_awake)
                            Monitor.Wait(_sync);
                    }
                }
                _next = new Notification<T>.OnCompleted();
            }
            catch (Exception ex)
            {
                _next = new Notification<T>.OnError(ex);
            }
            finally
            {
                _enum.Dispose();
                _enum = null;
            }
            // Notify consumer that the producer completed 
            _consumer.Enqueue(this);
        }
    
        // Awakes/notifies producer that it can proceed
        public Notification<T> GetNext(bool cont)
        {
            // Store the ready element in a local variable 
            // because once the producer is unleashed below 
            // it may overwrite the ready but not yet 
            // returned element
            var next = _next;
    
            lock (_sync)
            {
                // Set awake flag in case the producer misses the pulse 
                _awake = true;
                _cont = cont;
                Monitor.Pulse(_sync);
            }
    
            return next;
        }
    }

    With Producer<T> and Consumer<T> in place Merge combinator can be implemented as:

    static class EnumerableEx
    {
        public static IEnumerable<T> Merge<T>(IEnumerable<IEnumerable<T>> sources)
        {
            Contract.Requires(sources != null);
            return new Consumer<T>(sources).Merge();
        }
    }

    At any given moment at most n elements will be in the global queue where n is the number of source sequences.
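
    A quick usage sketch (the sequences are illustrative): elements of each source appear in their original relative order, while interleaving between sources is nondeterministic.

    var sources = new[]
    {
        Enumerable.Range(0, 3),
        Enumerable.Range(10, 3),
        Enumerable.Range(20, 3)
    };

    foreach (var item in EnumerableEx.Merge(sources))
        Console.WriteLine(item);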

    Great exercise!

  • Chasing state of the art

    Longest consecutive elements sequence

    A tiny detail uncovered by looking at a problem from a different angle is usually the key to the solution. So many times I have looked at a great API design or problem solution saying “how did I not see it”. But sometimes I do see. It was the problem of searching for the longest consecutive elements sequence within an unsorted array of integers. For example, in {5, 7, 3, 4, 9, 10, 1, 15, 1, 3, 12, 2, 11} the sought sequence is {1, 2, 3, 4, 5}.

    The first thing that comes to mind is to sort the given array in O(n log n) and look for the longest consecutive elements sequence. Eh, we can do better than that.

    Using a bit vector indexed by the numbers from the original array may not be justified as the value range may be incomparably larger than the array size (although time complexity is O(n)).

    Let’s look at the problem more closely. It may be reduced to a problem of efficient range manipulation. A disjoint-set structure or interval trees offer O(n log n) building time complexity, but they do not take into consideration the fact that we are dealing with integers. Knowing range boundaries we can definitely say what numbers are inside (for example, [1..3] contains 1, 2, 3). O(1) time complexity for range manipulation operations with O(1) space complexity per range is what we are looking for. We can do this using two hash tables:

    • ‘low’ with range start numbers as keys and range end numbers as values
    • ‘high’ with range end numbers as keys and range start numbers as values

    For example, for a range [x..y] the tables will hold low[x] = y and high[y] = x. The algorithm looks as follows:

    • Scan original array and for each element:
      • If it already belongs to any range, skip it
      • Otherwise create a unit size range [i..i]
      • If there is a range [i+1..y] next to the right, merge with it to produce range [i..y]
      • If there is a range [x..i-1] next to the left, merge with it as well (either [i..i] or the merged [i..y] will be merged with [x..i-1])
    • Scan through created ranges to find out the longest

    The only question left is how to check if an element already belongs to some range, as we keep only range boundaries in the hash tables. Any number is either processed previously (and thus is in one of the ranges) or a new range is created out of it (and potentially merged with others). Thus if a number was previously seen it is in some range and we do not need to process it again. So before scanning the original array we can simply remove any duplicates in O(n) time and space.

    class Solver
    {
        public static Tuple<int, int> FindMaxRange(IEnumerable<int> seq)
        {
            // Generate all ranges and select maximum
            return EnumerateRanges(seq)
                .Aggregate((x, y) => Length(x) > Length(y) ? x : y);
        }
    
        public static IEnumerable<Tuple<int, int>> EnumerateRanges(IEnumerable<int> seq)
        {
            var low = new Dictionary<int, int>();
            var high = new Dictionary<int, int>();
    
            // Remove duplicates
            foreach (var val in seq.Distinct())
            {
                // Create unit size range
                low[val] = high[val] = val;
                // Merge [i..i] with [i+1..y]
                var endsWith = MergeWithNext(val, low, high, 1);
                // Merge [i..endsWith] with [x..i-1]
                MergeWithNext(endsWith, high, low, -1);
            }
    
            return low.Select(p => Tuple.Create(p.Key, p.Value));
        }
    
        static int MergeWithNext(int currStart, IDictionary<int, int> low, IDictionary<int, int> high, int sign)
        {
            var currEnd = low[currStart];
            var nextStart = currEnd + sign;
            if (low.ContainsKey(nextStart))
            {
                low[currStart] = low[nextStart];
                high[low[currStart]] = currStart;
                low.Remove(nextStart);
                high.Remove(currEnd);
            }
            return low[currStart];
        }
    
        static int Length(Tuple<int, int> t)
        {
            return t.Item2 - t.Item1;
        }
    }

    The solution has O(n) time and space complexity assuming the hash table implementation has O(1) time and space complexity per operation/element.
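
    A quick usage sketch with the example from the beginning of the post:

    var input = new[] { 5, 7, 3, 4, 9, 10, 1, 15, 1, 3, 12, 2, 11 };
    // Prints (1, 5) which corresponds to the sought
    // sequence {1, 2, 3, 4, 5}
    Console.WriteLine(Solver.FindMaxRange(input));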

  • Chasing state of the art

    Parallel string matching

    String matching is about searching for an occurrence (the first or all occurrences – it makes a difference from a parallelization point of view as you shall see soon) of a pattern in a given text.

    This problem naturally fits the data parallelism scenario, although details depend on whether we want to find the first or all occurrences of a pattern. Parallel searching for all occurrences is simpler as the search space is static (the whole text needs to be looked through), in contrast to searching for the first occurrence, which may be anywhere in the text (reducing the search space improves performance; otherwise the solution is either equal to sequential search up to the first occurrence or to parallel search of all occurrences followed by taking the first one).

    In order to find all pattern occurrences the text can be separated into chunks that are processed in parallel. Each chunk overlaps with its immediate neighbor (except for the last one) by no more than length(pattern) - 1 characters to cover the case when the pattern spans a chunk boundary (that is, a chunk starting position falls within a pattern occurrence).

    public static class ParallelAlgorithms
    {
        // Find all occurrences of a pattern in a text
        public static IEnumerable<int> IndexesOf(this string text, string pattern, int startIndex, int count)
        {
            var proc = Environment.ProcessorCount;
            // Do range partitioning
            var chunk = (count + proc - 1) / proc;
            var indexes = new IEnumerable<int>[proc];
    
            Parallel.For(0, proc, 
                p => 
                {
                    // Define chunk that overlaps with its 
                    // immediate neighbor except for the last one
                    var before = p * chunk;
                    var now = Math.Min(chunk + pattern.Length - 1, count - p * chunk);
                    // Sequentially search for pattern occurrences 
                    // and store the local result; ToList forces 
                    // eager evaluation inside the parallel loop 
                    // as iterator methods are lazy
                    indexes[p] = SequentialIndexesOf(text, pattern, startIndex + before, now).ToList();
                });
            return indexes.SelectMany(p => p);
        }
    
        static IEnumerable<int> SequentialIndexesOf(string text, string pattern, int startIndex, int count)
        {
            for (var i = 0; i <= count - pattern.Length;)
            {
                var found = text.IndexOf(pattern, startIndex + i, count - i);
                // No pattern occurrence is found till the end
                if (found == -1)
                    break;
                yield return found;
                // Proceed with next to found position
                i = found + 1;
            }
        }
    }

    Now, for the first occurrence scenario the search space must be reduced, minimizing the amount of unnecessary work (it will be non-zero in most cases due to speculative processing).

    One way to do this is to separate the text into chunks and process them in parallel such that if the pattern is found in chunk(i), processing of any chunk(j) where j > i is cancelled. Assuming n is the length of the text and p equals the logical processor count, in the worst case (when the first occurrence of the pattern is at the end of the first chunk) the amount of unnecessary work will be (p - 1)*n/p. On the other hand, range partitioning has poor performance when workload is unbalanced.

    What we can do instead is dynamic partitioning, where the whole text is separated into groups of chunks of the same length within a group, and the chunk length in each subsequent group is two times larger than in the previous group. Group size is set to the number of logical processors. Thus, if c denotes a chunk of unit size the text will be separated like c, c, c, …, cc, cc, cc, …, cccc, cccc, cccc, … Now this sequence of chunks can be processed in parallel (respecting order) breaking at the first found occurrence. Thus in the worst case at most p*c amount of unnecessary work will be done.

    This partitioning strategy is used in PLINQ and is called chunk partitioning. Although the text can be treated as a sequence of characters and chunk partitioning can be used out of the box, that is not what we want, as otherwise we would process individual characters rather than text chunks. Instead we’ll produce the sequence of chunks manually and, using a single item partitioner and Parallel.ForEach, process them in parallel respecting order.

    public static class ParallelAlgorithms
    {
        // Find first occurrence of a pattern in a text
        public static int IndexOf(this string text, string pattern, int startIndex, int count)
        {
            var minChunkSize = pattern.Length << 5;
            var maxChunkSize = minChunkSize << 3;
            // Create sequence of chunks
            var chunks = PartitionRangeToChunks(startIndex, count, minChunkSize, maxChunkSize, Environment.ProcessorCount);
            // Process chunks in parallel respecting order
            var chunkPartitioner = SingleItemPartitioner.Create(chunks);
            var concurrentBag = new ConcurrentBag<int>();
            Parallel.ForEach(chunkPartitioner,
                (range, loop) =>
                {
                    var start = range.Item1;
                    var length = Math.Min(startIndex + count - start, range.Item2 + pattern.Length - 1);
                    var index = text.IndexOf(pattern, start, length);
                    // No pattern occurrences in this chunk
                    if (index < 0)
                        return;
                    // Store shared result
                    concurrentBag.Add(index);
                    // Let running parallel iterations complete and 
                    // prevent starting new ones
                    loop.Break();
                });
            // Pick first occurrence or indicate no occurrence
            return concurrentBag.Count > 0 ? concurrentBag.Min() : -1;
        }
    
        static IEnumerable<Tuple<int, int>> PartitionRangeToChunks(int start, int count, int minChunkSize, int maxChunkSize, int doubleAfter)
        {
            var end = start + count;
            var chunkSize = Math.Min(minChunkSize, count);
            while (start < end)
            {
                for (var i = 0; i < doubleAfter && start < end; i++)
                {
                    var next = Math.Min(end, start + chunkSize);
                    yield return Tuple.Create(start, next - start);
                    start = next;
                }
    
                chunkSize = Math.Min(maxChunkSize, chunkSize * 2);
            }
        }
    }
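
    A quick usage sketch (the text and pattern are illustrative). Note that the instance string.IndexOf(string, int, int) overload takes precedence over the extension method, so IndexOf is invoked through the class name here:

    var text = string.Concat(Enumerable.Repeat("abcab", 1000));
    var pattern = "ab";

    // All occurrences, in ascending order as chunks are ordered
    var all = ParallelAlgorithms.IndexesOf(text, pattern, 0, text.Length);
    // First occurrence; 0 for this text
    var first = ParallelAlgorithms.IndexOf(text, pattern, 0, text.Length);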

    Search in parallel! =)

  • Chasing state of the art

    Parallel graph search

    A graph is a concept used to describe relationships between things, where vertices (or nodes) represent things and edges represent relationships. A graph is made up of two finite sets (vertices and edges) and is denoted by G = (V, E). Nodes usually are denoted by non-negative integers (for example, a graph with 3 nodes will have nodes denoted by 0, 1, 2) and edges are pairs of nodes (s, t) where s is a source and t is a target node.

    Searching for a simple path (a path with no repeated vertices) that connects two nodes is one of the basic problems solved using graph search. We need to visit each node at most once (meaning visited nodes tracking is on) to find out whether there is any path between two given nodes. Time complexity for graph search is O(V + E).

    Let’s parallelize the thing to speed it up.

    Graph representation is irrelevant for our case, so let’s assume an implementation of the following public API is in place:

    public class Graph
    {
        // Initializes new graph with given number of nodes and 
        // set of edges
        public Graph(int nodes, IEnumerable<Tuple<int, int>> edges);
        // Gets number of nodes in the graph
        public int Nodes { get; }
        // Gets list of nodes adjacent to given one
        public IEnumerable<int> AdjacentTo(int node);
    }
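
    The representation is deliberately left open; just to make the samples below runnable, a minimal adjacency list based sketch (directed edges, no input validation) could look like:

    public class Graph
    {
        private readonly List<int>[] _adjacent;

        public Graph(int nodes, IEnumerable<Tuple<int, int>> edges)
        {
            _adjacent = new List<int>[nodes];
            for (var i = 0; i < nodes; i++)
                _adjacent[i] = new List<int>();
            // Edges are treated as directed (s, t) pairs
            foreach (var edge in edges)
                _adjacent[edge.Item1].Add(edge.Item2);
        }

        public int Nodes
        {
            get { return _adjacent.Length; }
        }

        public IEnumerable<int> AdjacentTo(int node)
        {
            return _adjacent[node];
        }
    }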

    Starting with a source node, for each node being processed the search condition is checked and the search is expanded to adjacent nodes that are not yet visited. Search continues until the condition is met or no more reachable unvisited nodes are left.

    The first part of the solution is to process nodes in parallel until none are left or early termination is requested. The idea is to maintain two work queues, for the current and the next phase. Items from the current phase queue are processed in parallel and new work items are added to the next phase queue. At the end of a phase the queues are switched. While this mechanism covers the “while not empty” scenario, cooperative cancellation covers early search termination.

    public static class ParallelAlgorithms
    {
        public static void DoWhileNotEmpty<T>(IEnumerable<T> seed, Action<T, Action<T>> body, CancellationToken cancellationToken)
        {
            // Maintain two work queues for current and next phases
            var curr = new ConcurrentQueue<T>(seed);
            var next = new ConcurrentQueue<T>();
            var parallelOptions = new ParallelOptions {CancellationToken = cancellationToken};
            // Until there is something to do
            while (!curr.IsEmpty)
            {
                // Function to add work for the next phase
                Action<T> add = next.Enqueue;
                // Process current work queue in parallel while 
                // populating next one
                Parallel.ForEach(curr, parallelOptions, t => body(t, add));
                // Switch queues and empty next one
                curr = next;
                next = new ConcurrentQueue<T>();
            }
        }
    }
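
    As a standalone usage sketch (the path is illustrative and hypothetical), the same helper can walk a directory tree in parallel, level by level:

    ParallelAlgorithms.DoWhileNotEmpty(new[] { @"C:\Temp" }, (dir, add) =>
    {
        // Schedule sub-directories for the next phase
        foreach (var sub in Directory.EnumerateDirectories(dir))
            add(sub);
        Console.WriteLine(dir);
    }, CancellationToken.None);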

    Now that we have the processing mechanism we need to bring in the visited nodes tracking mechanism. As part of our solution we must find a path that connects two given nodes if one exists. Thus instead of just remembering which nodes were visited, we’ll keep track of the parent node used to visit each node. Unvisited nodes are marked with -1. Once the search is finished we can reconstruct the path between target and source backwards by following parent links.

    public static class ParallelGraph
    {
        public static int[] Search(this Graph graph, int source, Func<Tuple<int, int>, bool> func)
        {
            const int undef = -1;
            // Start with dummy loop edge
            var seed = Enumerable.Repeat(Tuple.Create(source, source), 1);
            // Mark all nodes as not yet visited
            var tree = Enumerable.Repeat(undef, graph.Nodes).ToArray();
            tree[source] = source;
            // Cancellation token source used to exit graph search 
            // once search condition is met.
            var cancellationTokenSource = new CancellationTokenSource();
            try
            {
                // Until there are reachable not visited nodes 
                ParallelAlgorithms
                    .DoWhileNotEmpty(seed, (edge, add) =>
                {
                    var from = edge.Item2;
    
                    // and search condition is not met
                    if (!func(edge))
                        cancellationTokenSource.Cancel();
    
                    // Try expand search to adjacent nodes
                    foreach (var to in graph.AdjacentTo(from))
                    {
                        // Expand search to not yet visited nodes 
                        // (otherwise if node is expanded and 
                        // then checked during processing we may 
                        // end up with lots of already visited 
                        // nodes in memory which is a problem 
                        // for large dense graphs) marking nodes 
                        // to visit next
                        if (Interlocked.CompareExchange(ref tree[to], from, undef) != undef)
                            continue;
                        add(Tuple.Create(from, to));
                    }
                }, cancellationTokenSource.Token);
            }
            catch (OperationCanceledException ex)
            {
                // Check if exception originated from parallel 
                // processing cancellation request
                if (ex.CancellationToken != cancellationTokenSource.Token)
                    throw;
            }
            // In the end we have spanning tree that connects
            // all the nodes reachable from source
            return tree;
        }
    
        public static IEnumerable<int> FindPath(this Graph graph, int from, int to)
        {
            // Search within graph until reached 'to'
            var tree = Search(graph, from, edge => edge.Item2 != to);
            // Check if there is a path that connects 'from' and 'to'
            if (tree[to] == -1)
                return Enumerable.Empty<int>();
            // Reconstruct path that connects 'from' and 'to'
            var path = new List<int>();
            while (tree[to] != to)
            {
                path.Add(to);
                to = tree[to];
            }
            path.Add(to);
            // Reverse path as it is reconstructed backwards
            return Enumerable.Reverse(path);
        }
    }
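
    A quick usage sketch (the graph is illustrative), assuming a Graph implementation like the sketch above:

    // 0 -> 1 -> 2
    //      |
    //      v
    //      3
    var graph = new Graph(4, new[]
    {
        Tuple.Create(0, 1),
        Tuple.Create(1, 2),
        Tuple.Create(1, 3)
    });

    // Prints 0 1 3
    foreach (var node in graph.FindPath(0, 3))
        Console.Write("{0} ", node);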

    Done!

  • Chasing state of the art

    Joining Microsoft

    Find a job you like and you add five days to every week.

    - H. Jackson Brown, Jr.

    I decided to join Microsoft in an attempt to make my week longer. Lots of things to learn and challenging problems to solve. But this is where interesting topics come from.

    I will continue chasing state of the art soon. Now I need to get materialized in the Seattle area =).

  • Chasing state of the art

    Parallel merge sort

    A divide and conquer algorithm solves a problem by:

    1. dividing the problem into two or more smaller independent sub-problems of the same type
    2. solving each sub-problem recursively
    3. and combining their results.

    Sub-problem independence makes divide and conquer algorithms natural for dynamic task parallelism. Quicksort is a good example (actually you can find its parallel version here). We’ll focus on parallel merge sort as it parallelizes better. Basically the term parallelism means the ratio T1/T∞ where T1 is the execution time on a single processor and T∞ is the execution time on an infinite number of processors.

    Let’s implement sequential merge sort first as parallel merge sort will use it. In general, merge sort in terms of divide and conquer is defined as follows:

    1. If the array is of size 0 or 1 it is already sorted; otherwise divide it into two sub-arrays of about half the size
    2. recursively sort each sub-array
    3. and merge sorted sub-arrays into one sorted array.

    It is quite straightforward, with a small improvement that allows us to avoid unnecessary allocations and data copying.

    class MergeSortHelper<T>
    {
        private readonly IComparer<T> _comparer;
    
        public MergeSortHelper()
            : this(Comparer<T>.Default)
        {
        }
    
        public MergeSortHelper(IComparer<T> comparer)
        {
            _comparer = comparer;
        }
    
        public void MergeSort(T[] array, int low, int high, bool parallel)
        {
            // Create a copy of the original array. Switching between
            // original array and its copy will allow to avoid 
            // additional array allocations and data copying.
            var copy = (T[]) array.Clone();
            if (parallel)
                ParallelMergeSort(array, copy, low, high, GetMaxDepth());
            else
                SequentialMergeSort(array, copy, low, high);
        }
    
        private void SequentialMergeSort(T[] to, T[] temp, int low, int high)
        {
            if (low >= high)
                return;
            var mid = (low + high) / 2;
            // On the way down the recursion tree both arrays have 
            // the same data so we can switch them. Sort two 
            // sub-arrays first so that they are placed into the temp 
            // array.
            SequentialMergeSort(temp, to, low, mid);
            SequentialMergeSort(temp, to, mid + 1, high);
            // Once temp array contains two sorted sub-arrays
            // they are merged into target array.
            SequentialMerge(to, temp, low, mid, mid + 1, high, low);
            // On the way up either we are done as the target array
            // is the original array and now contains required 
            // sub-array sorted or it is the temp array from previous 
            // step and contains smaller sub-array that will be 
            // merged into the target array from previous step 
            // (which is the temp array of this step and so we 
            // can destroy its contents).
        }
    
        // Although sub-arrays being merged in sequential version 
        // are adjacent that is not the case for parallel version 
        // and thus sub-arrays boundaries must be specified 
        // explicitly.
        private void SequentialMerge(T[] to, T[] temp, int lowX, int highX, int lowY, int highY, int lowTo)
        {
            var highTo = lowTo + highX - lowX + highY - lowY + 1;
            for (; lowTo <= highTo; lowTo++)
            {
                if (lowX > highX)
                    to[lowTo] = temp[lowY++];
                else if (lowY > highY)
                    to[lowTo] = temp[lowX++];
                else
                    to[lowTo] = Less(temp[lowX], temp[lowY])
                                    ? temp[lowX++]
                                    : temp[lowY++];
            }
        }
    
        private bool Less(T x, T y)
        {
            return _comparer.Compare(x, y) < 0;
        }
        
        ...

    Now we need to parallelize it. Let’s get the obvious out of the way: sorting the two sub-arrays can be done in parallel just like in parallel quicksort. We can proceed to the merge step only once both sub-arrays are sorted.

    Parallelizing merge is a more interesting task. The sequential version looks like a single indivisible task. We need to find a way to separate merging into independent tasks that can run in parallel. The idea behind the algorithm is the following:

    1. Let’s assume we want to merge sorted arrays X and Y. Select the median element X[m] of X. Elements in X[ .. m-1] are less than or equal to X[m]. Using binary search find the index k of the first element in Y greater than X[m]. Thus Y[ .. k-1] are less than or equal to X[m] as well. Elements in X[m+1 .. ] are greater than or equal to X[m] and Y[k .. ] are greater. So merge(X, Y) can be defined as concat(merge(X[ .. m-1], Y[ .. k-1]), X[m], merge(X[m+1 .. ], Y[k .. ]))
    2. now we can recursively and in parallel do merge(X[ .. m-1], Y[ .. k-1]) and merge(X[m+1 .. ], Y[k .. ])
    3. and then concat the results.

    Although the algorithm description is pretty abstract, there are more detailed comments in the code below.

        ...
    
        private const int SEQUENTIAL_THRESHOLD = 2048;
    
        // Recursion depth is utilized to limit number of spawned 
        // tasks. 
        private void ParallelMergeSort(T[] to, T[] temp, int low, int high, int depth)
        {
            if (high - low + 1 <= SEQUENTIAL_THRESHOLD || depth <= 0)
            {
                // Resort to sequential algorithm if either 
                // recursion depth limit is reached or sub-problem 
                // size is not big enough to solve it in parallel.
                SequentialMergeSort(to, temp, low, high);
                return;
            }
    
            var mid = (low + high) / 2;
            // The same target/temp arrays switching technique 
            // as in sequential version applies in parallel 
            // version. sub-arrays are independent and thus can 
            // be sorted in parallel.
            depth--;
            Parallel.Invoke(
                () => ParallelMergeSort(temp, to, low, mid, depth),
                () => ParallelMergeSort(temp, to, mid + 1, high, depth)
                );
            // Once both tasks ran to completion merge sorted 
            // sub-arrays in parallel.
            ParallelMerge(to, temp, low, mid, mid + 1, high, low, depth);
        }
    
        // As parallel merge is itself recursive the same mechanism
        // for task number limitation is used (recursion depth).
        private void ParallelMerge(T[] to, T[] temp, int lowX, int highX, int lowY, int highY, int lowTo, int depth)
        {
            var lengthX = highX - lowX + 1;
            var lengthY = highY - lowY + 1;
    
            if (lengthX + lengthY <= SEQUENTIAL_THRESHOLD || depth <= 0)
            {
                // Resort to sequential algorithm in case of small 
                // sub-problem or deep recursion.
                SequentialMerge(to, temp, lowX, highX, lowY, highY, lowTo);
                return;
            }
    
            if (lengthX < lengthY)
            {
                // Make sure that the X range is no less than the 
                // Y range and if needed swap them.
                ParallelMerge(to, temp, lowY, highY, lowX, highX, lowTo, depth);
                return;
            }
    
            // Get median of the X sub-array. As X sub-array is 
            // sorted it means that X[lowX .. midX - 1] are less 
            // than or equal to median and X[midX + 1 .. highX] 
            // are greater or equal to median.
            var midX = (lowX + highX) / 2;
            // Find element in the Y sub-array that is strictly 
            // greater than X[midX]. Again as Y sub-array is 
            // sorted Y[lowY .. midY - 1] are less than or equal 
            // to X[midX] and Y[midY .. highY] are greater than 
            // X[midX].
            var midY = BinarySearch(temp, lowY, highY, temp[midX]);
            // Now we can compute final position in the target 
            // array of median of the X sub-array.
            var midTo = lowTo + midX - lowX + midY - lowY;
            to[midTo] = temp[midX];
            // The rest is to merge X[lowX .. midX - 1] with 
            // Y[lowY .. midY - 1] and X[midX + 1 .. highX] 
            // with Y[midY .. highY] preceding and following 
            // the median respectively in the target array. As 
            // the pairs are independent from the final position 
            // perspective they can be merged in parallel.
            depth--;
            Parallel.Invoke(
                () => ParallelMerge(to, temp, lowX, midX - 1, lowY, midY - 1, lowTo, depth),
                () => ParallelMerge(to, temp, midX + 1, highX, midY, highY, midTo + 1, depth)
                );
        }
    
        // Searches for the index of the first element in the 
        // low to high range that is strictly greater than the 
        // provided value. If all elements within the range are 
        // less than or equal to the value the index of the 
        // element next to the range is returned.
        private int BinarySearch(T[] from, int low, int high, T lessThanOrEqualTo)
        {
            high = Math.Max(low, high + 1);
            while (low < high)
            {
                var mid = (low + high) / 2;
                if (Less(from[mid], lessThanOrEqualTo))
                    low = mid + 1;
                else
                    high = mid;
            }
            return low;
        }
    
        private static int GetMaxDepth()
        {
            // Although at each step we split the unsorted array 
            // into two equal size sub-arrays, sorting may not be 
            // perfectly balanced because parallel merge may not 
            // be balanced. So we add some extra depth for task 
            // creation to keep all CPUs busy.
            return (int) Math.Log(Environment.ProcessorCount, 2) + 4;
        }
    }

    Now that the thing is ready let’s do some comparison. We’ll use a small benchmark helper.

    public class Benchmark
    {
        public static IEnumerable<long> Run<T>(Func<int, T[]> generator, Action<T[]> action, int times)
        {
            var samples = new long[times];
            var sw = new Stopwatch();
            for (var i = 0; i < times; i++)
            {
                var input = generator(i);
                sw.Restart();
                action(input);
                sw.Stop();
                samples[i] = sw.ElapsedMilliseconds;
            }
            return samples;
        }
    }
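
    The IntComparer used in the driver below is not shown in the post; a minimal stand-in (hypothetical, named only for the sake of the sample) simply delegates to the built-in comparison:

    class IntComparer : IComparer<int>
    {
        // Dummy comparer that forces Array.Sort to take the
        // managed code path instead of the unmanaged CLR one
        public int Compare(int x, int y)
        {
            return x.CompareTo(y);
        }
    }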

    We'll calculate and compare the average running time over a number of samples for sequential quicksort, parallel quicksort and parallel merge sort.

    const int count = 10 * 1000 * 1000;
    const int iterations = 10;
    var rnd = new Random();
    
    var input = new int[count];
    Func<int, int[]> gen = i =>
                    {
                        // Avoid allocating large array for every
                        // run and just fill existing one 
                        for (var j = 0; j < input.Length; j++)
                            input[j] = rnd.Next();
    
                        return input;
                    };
    
    Action<int[]> seqArraySort, parallelQuickSort, parallelMergeSort;
    // If Array.Sort<T>(T[] a, IComparer<T> c) sees 
    // Comparer<T>.Default it resorts to unmanaged CLR provided 
    // sorting implementation, so we use a dummy comparer here to 
    // force it to use managed quicksort.
    seqArraySort = a => Array.Sort(a, new IntComparer());
    // Parallel quicksort implementation from TPL extras.
    // We use a dummy comparer as well because at some point
    // parallel quicksort falls back to Array.Sort
    parallelQuickSort = a => ParallelAlgorithms.Sort(a, new IntComparer());
    // Our parallel merge sort implementation
    parallelMergeSort = a => new MergeSortHelper<int>().MergeSort(a, 0, a.Length - 1, true);
    
    var sq = Benchmark.Run(gen, seqArraySort, iterations).Average();
    var pq = Benchmark.Run(gen, parallelQuickSort, iterations).Average();
    var pm = Benchmark.Run(gen, parallelMergeSort, iterations).Average();

    On a two-core machine I got that parallel merge sort is more than 2x faster than sequential quicksort and up to 25% faster than parallel quicksort, but at the cost of additional O(n) space. Still it is a good example of how to use dynamic task parallelism. Enjoy!

  • Chasing state of the art

    Traverse binary tree in level order by spiral

    Another puzzle is at stake, folks. This time it is binary tree related (not necessarily a binary search tree, as we are not interested in data relations but rather in the binary tree structure). We need to traverse a binary tree level by level (a level is defined as the set of all nodes at the same distance from the root) in such a way that traversal direction within a level changes from level to level, thus forming a spiral. For example, consider the binary tree below (it is rotated 90 degrees counterclockwise). An asterisk means a NIL node.

                    *
                20
                    *
             19
                       *
                   18
                       *
                17
                       *
                   13
                       *
          12
              *
       11
           *
    10
                 *
              7
                 *
           6
                 *
              5
                 *
        4
              *
           2
              *

    The desired traversal for this tree is: 10, 4, 11, 12, 6, 2, 5, 7, 19, 20, 17, 13, 18. Dissected into levels it looks like:

    1. 10
    2. 4, 11
    3. 12, 6, 2
    4. 5, 7, 19
    5. 20, 17
    6. 13, 18

    It may not be obvious how to approach the problem. We’ll start with the well-known problem of traversing a binary tree level by level (also known as breadth first traversal). It can be done using a queue.

    static IEnumerable<T> LevelOrderLeftToRight<T>(TreeNode<T> root)
    {
        if (root == null)
           yield break;
    
        var next = new Queue<TreeNode<T>>();
        next.Enqueue(root);
    
        while(next.Count > 0)
        {
           var node = next.Dequeue();
           yield return node.Data;
    
           EnqueueIfNotNull(next, node.Left);
           EnqueueIfNotNull(next, node.Right);
        }
    }
    
    static void EnqueueIfNotNull<T>(Queue<T> queue, T value)
       where T:class
    {
        if (value != null)
           queue.Enqueue(value);
    }

    The queue contains yet to be examined nodes in level by level, left to right (as we first enqueue the left child and then the right one) order. At each step the node at the front is dequeued, examined and its children are enqueued for further examination, thus preserving level by level, left to right order. It yields the following traversal: 10, 4, 11, 2, 6, 12, 5, 7, 19, 17, 20, 13, 18. Let’s split the node sequence into levels:

    1. 10
    2. 4, 11
    3. 2, 6, 12
    4. 5, 7, 19
    5. 17, 20
    6. 13, 18

    That looks close to the sought-for order except that every odd-numbered level must be reversed. With all nodes in the same container (queue) it is hard to do this. Let’s change the code a little bit so that every two adjacent levels are separated into different containers.

    static IEnumerable<T> LevelOrderLeftToRight<T>(TreeNode<T> root)
    {
        if (root == null)
           yield break;
    
        var curr = new Queue<TreeNode<T>>();
        var next = new Queue<TreeNode<T>>();
        next.Enqueue(root);
    
       do
       {
          // Swap level containers
          Swap(ref curr, ref next);
          // Examine all nodes at current level
          while (curr.Count > 0)
          {
             var node = curr.Dequeue();
             yield return node.Data;
             // Fill next level preserving order
             EnqueueIfNotNull(next, node.Left);
             EnqueueIfNotNull(next, node.Right);
          }
          // Continue until next level has nodes
       } while (next.Count > 0);
    }
    
    static void Swap<T>(ref T a, ref T b)
    {
       var tmp = a;
       a = b;
       b = tmp;
    }

    With adjacent levels separated we can change the order within levels. The next level is the set of child nodes from the current level. A FIFO container (queue) makes sure that child nodes of an earlier examined node will also be examined earlier. But this is the opposite of what we are looking for. Change it to a LIFO container (stack)! And that’s it. Child nodes of an earlier examined node will be examined later.

    static IEnumerable<T> LevelOrderBySpiral<T>(TreeNode<T> root)
    {
       if (root == null)
          yield break;
    
       var curr = new Stack<TreeNode<T>>();
       var next = new Stack<TreeNode<T>>();
       // Specifies direction for the next level
       var leftToRight = true;
       next.Push(root);
    
       do
       {
          Swap(ref curr, ref next);
          while (curr.Count > 0)
          {
             var node = curr.Pop();
             yield return node.Data;
             // If next level must be traversed from left to right
             // we must first push right child node and then left
             // and in opposite order if next level will be 
             // traversed from right to left
             PushIfNotNull(next, leftToRight ? node.Right : node.Left);
             PushIfNotNull(next, leftToRight ? node.Left : node.Right);
          }
          // Change direction within level
          leftToRight = !leftToRight;
       } while (next.Count > 0);
    }
    
    static void PushIfNotNull<T>(Stack<T> stack, T value)
       where T : class
    {
       if (value != null)
          stack.Push(value);
    }

    The code yields the sought-for order.
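
    A quick usage sketch (the tree is illustrative), assuming the TreeNode<T> shape used throughout these posts and that the methods above are in scope:

    var root = new TreeNode<int>
    {
        Data = 10,
        Left = new TreeNode<int>
        {
            Data = 4,
            Right = new TreeNode<int> { Data = 6 }
        },
        Right = new TreeNode<int>
        {
            Data = 11,
            Right = new TreeNode<int> { Data = 12 }
        }
    };

    // Prints 10, 4, 11, 12, 6
    Console.WriteLine(string.Join(", ", LevelOrderBySpiral(root)));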

  • Chasing state of the art

    External merge sort

    If the data to be sorted doesn’t fit into main memory, external sorting is applicable. External merge sort can be separated into two phases:

    1. Create initial runs (a run is a sequence of records that are in correct relative order or, in other words, a sorted chunk of the original file).
    2. Merge the created runs into a single sorted file.

    To implement this algorithm I will use solutions from my previous posts (a priority queue and an ordered merge of sequences), so it may be helpful for you to look at them.

    Let’s assume that M records at a time are allowed to be loaded into main memory. One of the ways to create initial runs is to successively read M records from the original file, sort them in memory and write them back to disk. However, we will use an approach that allows us to create longer runs. It is called replacement selection.

    The core structure behind this algorithm is a priority queue. Taking the current minimum element out of the queue one by one forms an ordered sequence, and this is exactly what a run stands for. The algorithm can be described as follows:

    1. Create two priority queues (that will contain items for current and next runs respectively) with capacity of M records.
    2. Prefill current run priority queue from unsorted file with M records.
    3. Create current run if there are elements in the current run priority queue:
      1. Take the minimum element out of the current run priority queue and append it to the current run (basically write it to disk).
      2. Take the next element from the unsorted file (this is the replacement part of the algorithm) and compare it with the just appended element.
      3. If it is less, it cannot be part of the current run (otherwise order would be destroyed) and thus it is queued into the next run priority queue.
      4. Otherwise it is part of the current run and it is queued into the current run priority queue.
      5. Continue steps 1 through 4 until the current run priority queue is empty.
    4. Switch current and next runs priority queues and repeat step 3.

    At any given moment at most M records are loaded into main memory, as a single element written into the current run is replaced with a single element from the unsorted file, if any (depending on comparison it goes either into the current or the next run). For random input replacement selection produces runs of expected length 2M, twice as long as plainly sorting M records in memory yields.

    The next step is to merge the created initial runs. For the merge step we will use a simplified algorithm (more advanced algorithms work with multiple physical devices to distribute runs, take into account data locality, etc.) based on k-way merge:

    1. Append created runs into a queue.
    2. While there is more than one run in the queue:
      1. Dequeue and merge K runs into a single run and put it into the queue.
    3. The remaining run represents the sorted original file.

    Yeap, it is that simple. And let’s code it.

    The implementation abstracts away file structure and reading/writing details, making the algorithm more concise and easier to understand.

    abstract class ExternalSorter<T>
    {
    	private readonly IComparer<T> m_comparer;
    	private readonly int m_capacity;
    	private readonly int m_mergeCount;
    
    	protected ExternalSorter(IComparer<T> comparer, int capacity, int mergeCount)
    	{
    		m_comparer = comparer;
    		m_capacity = capacity;
    		m_mergeCount = mergeCount;
    	}
    
    	// Sorts unsorted file and returns sorted file name
    	public string Sort(string unsorted)
    	{
    		var runs = Distribute(unsorted);
    		return Merge(runs);
    	}
    
    	// Write run to disk and return created file name
    	protected abstract string Write(IEnumerable<T> run);
    	// Read run from file with given name
    	protected abstract IEnumerable<T> Read(string name);
    
    	// Merge step in this implementation is simpler than 
    	// the one used in polyphase merge sort - it doesn't
    	// take into account distribution over devices
    	private string Merge(IEnumerable<string> runs)
    	{
    		var queue = new Queue<string>(runs);
    		var runsToMerge = new List<string>(m_mergeCount);
    		// Until single run is left do merge
    		while (queue.Count > 1)
    		{
    			// Select no more runs for merging than the 
    			// configured merge count
    			var count = m_mergeCount;
    			while (queue.Count > 0 && count-- > 0)
    				runsToMerge.Add(queue.Dequeue());
    			// Perform n-way merge on the selected runs; in 
    			// polyphase merge sort n would equal the number 
    			// of physical devices holding distributed runs, 
    			// but here devices are not modeled and n is 
    			// simply the merge count
    			var merged = runsToMerge.Select(Read).OrderedMerge(m_comparer);
    			queue.Enqueue(Write(merged));
    
    			runsToMerge.Clear();
    		}
    		// Last run represents source file sorted
    		return queue.Dequeue();
    	}
    
    	// Distributes unsorted file into several sorted chunks
    	// called runs (run is a sequence of records that are 
    	// in correct relative order)
    	private IEnumerable<string> Distribute(string unsorted)
    	{
    		var source = Read(unsorted);
    		using (var enumerator = source.GetEnumerator())
    		{
    			var curr = new PriorityQueue<T>(m_comparer);
    			var next = new PriorityQueue<T>(m_comparer);
    			// Prefill priority queue to capacity which is used 
    			// to create runs
    			while (curr.Count < m_capacity && enumerator.MoveNext())
    				curr.Enqueue(enumerator.Current);
    			// Until unsorted source and priority queues are 
    			// exhausted
    			while (curr.Count > 0)
    			{
    				// Create next run and write it to disk
    				var sorted = CreateRun(enumerator, curr, next);
    				var run = Write(sorted);
    
    				yield return run;
    
    				Swap(ref curr, ref next);
    			}
    		}
    	}
    
    	private IEnumerable<T> CreateRun(IEnumerator<T> enumerator, PriorityQueue<T> curr, PriorityQueue<T> next)
    	{
    		while (curr.Count > 0)
    		{
    			var min = curr.Dequeue();
    			yield return min;
    			// Once the source is exhausted MoveNext returns 
    			// false and the current queue is simply emptied 
    			// step by step
    			if (!enumerator.MoveNext())
    				continue;
    
    			// Check if current run can be extended with 
    			// next element from unsorted source
    			if (m_comparer.Compare(enumerator.Current, min) < 0)
    			{
    				// As the current element is less than the 
    				// minimum of the current run it may as well 
    				// be less than elements that are already in 
    				// the current run, so it goes into the next run
    				next.Enqueue(enumerator.Current);
    			}
    			else
    			{
    				// Extend current run
    				curr.Enqueue(enumerator.Current);
    			}
    		}
    	}
    
    	private static void Swap<U>(ref U a, ref U b)
    	{
    		var tmp = a;
    		a = b;
    		b = tmp;
    	}
    }
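
    The implementation depends on PriorityQueue<T> and the OrderedMerge combinator from the earlier posts referenced above. For completeness, a minimal OrderedMerge sketch (using a linear scan for the minimum instead of a priority queue, which is enough for small merge counts) could look like:

    static class EnumerableExtensions
    {
        // Lazily merges several sorted sequences into one
        // sorted sequence
        public static IEnumerable<T> OrderedMerge<T>(this IEnumerable<IEnumerable<T>> sources, IComparer<T> comparer)
        {
            var cursors = new List<IEnumerator<T>>();
            try
            {
                // Start every source and keep only non-empty ones
                foreach (var source in sources)
                {
                    var cursor = source.GetEnumerator();
                    if (cursor.MoveNext())
                        cursors.Add(cursor);
                    else
                        cursor.Dispose();
                }
                while (cursors.Count > 0)
                {
                    // Linear scan for the smallest current element;
                    // a priority queue would make this logarithmic
                    var min = 0;
                    for (var i = 1; i < cursors.Count; i++)
                        if (comparer.Compare(cursors[i].Current, cursors[min].Current) < 0)
                            min = i;
                    yield return cursors[min].Current;
                    if (!cursors[min].MoveNext())
                    {
                        cursors[min].Dispose();
                        cursors.RemoveAt(min);
                    }
                }
            }
            finally
            {
                foreach (var cursor in cursors)
                    cursor.Dispose();
            }
        }
    }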

    In the example below I created a type that sorts text files containing a single number per line.

    class TextFileOfNumbersExternalSorter : ExternalSorter<int>
    {
    	public TextFileOfNumbersExternalSorter(int capacity, int mergeCount)
    		: base(Comparer<int>.Default, capacity, mergeCount)
    	{
    	}
    
    	protected override string Write(IEnumerable<int> run)
    	{
    		var file = Path.GetTempFileName();
    		using (var writer = new StreamWriter(file))
    		{
    			run.Run(writer.WriteLine);
    		}
    		return file;
    	}
    
    	protected override IEnumerable<int> Read(string name)
    	{
    		using (var reader = new StreamReader(name))
    		{
    			while (!reader.EndOfStream)
    				yield return Int32.Parse(reader.ReadLine());
    		}
    		File.Delete(name);
    	}
    }

    That is used like this:

    // capacity, mergeCount and unsortedFileName are initialized elsewhere
    var sorter = new TextFileOfNumbersExternalSorter(capacity, mergeCount);
    var sortedFileName = sorter.Sort(unsortedFileName);

    That’s it folks!

  • Chasing state of the art

    Minimum window that contains all characters

    Like most of life's problems, this one can be solved with bending

    - Bender B.Rodrigues

    Let’s bend another problem. Given a set of characters P and a string T, find the minimum window in T that contains all characters from P. The solution is restricted to O(length(T)) time complexity. For example, given the string T “of characters and as” and the set of characters P in the form of a string “aa s”, the minimum window is “and as”.

    The problem can be broken into two parts:

    • How to select window?
    • How to check that selected window contains all characters from P?

    Selecting every possible window (all unique pairs (i, j) where 0 <= i <= j < length(T)) leads to a solution no better than O(length(T)^2) because you still need to check that all characters from P are within the selected window. Instead we will check every possible window ending position, so there are only length(T) windows to consider.

    Any feasible window has length equal to or greater than length(P). Rechecking every considered window from scratch results in a solution no better than O(length(T)*length(P)). Instead we need to reuse the check results from the previous iteration.

    Now we need to make sure that checking whether a particular character is in P is done in an optimal way, taking into account that a particular character may appear more than once so the window must contain the appropriate number of occurrences. We will use a hash table that maps unique characters from P to their counts for fast lookup.

    And now let’s tie all things together.

    • Until the end of T is reached, move the current window ending position by one.
    • Append the next character to the end of the previous window, which up to this moment doesn’t contain all the necessary characters. The char-to-count map is used to track the number of characters left to find: if the character is in P its count is decremented. The count may become negative, meaning the window holds more occurrences of the character than required.
    • If the unmatched character count drops to zero the window contains all the required characters. However, it may contain redundant characters, so we try to compact the current window. It is ok to do this as we are looking for the minimum window and any window extended from this one won’t be better.
    • Once the window is compacted, compare it with the minimum one and update the minimum if needed.
    • If the current window contains all the characters, remove its first one simply by moving the starting position by one to make sure that at each iteration the previous window doesn’t contain all the characters (there is no point in appending new characters to a window that already contains all of the required ones).

    Code the thing! =)

    static string FindMinWindow(string t, string p)
    {
    	// Create char to count mapping for fast lookup
    	// as some characters may appear more than once
    	var charToCount = new Dictionary<char, int>();
    	foreach (var c in p)
    	{
    		if (!charToCount.ContainsKey(c))
    			charToCount.Add(c, 0);
    		charToCount[c]++;
    	}
    
    	var unmatchesCount = p.Length;
    	int minWindowLength = t.Length + 1, minWindowStart = -1;
    	int currWindowStart = 0, currWindowEnd = 0;
    	for (; currWindowEnd < t.Length; currWindowEnd++)
    	{
    		var c = t[currWindowEnd];
    		// Skip chars that are not in P
    		if (!charToCount.ContainsKey(c))
    			continue;
    		// Reduce unmatched characters count
    		charToCount[c]--;
    		if (charToCount[c] >= 0)
    			// But do this only while count is positive
    			// as count may go negative which means 
    			// that there are more than required characters
    			unmatchesCount--;
    
    		// No complete match, so continue searching
    		if (unmatchesCount > 0)
    			continue;
    
    		// Shrink the window as much as possible by removing 
    		// either chars that are not in P or those that 
    		// are in P but occur more times than required
    		c = t[currWindowStart];
    		var contains = charToCount.ContainsKey(c);
    		while (!contains || charToCount[c] < 0)
    		{
    			if (contains)
    				// Return character to P
    				charToCount[c]++;
    
    			c = t[++currWindowStart];
    			contains = charToCount.ContainsKey(c);
    		}
    
    		if (minWindowLength > currWindowEnd - currWindowStart + 1)
    		{
    			minWindowLength = currWindowEnd - currWindowStart + 1;
    			minWindowStart = currWindowStart;
    		}
    
    	// Remove the first char from the window - it is definitely 
    	// in P because the shrink loop stopped at it
    		charToCount[c]++;
    		unmatchesCount++;
    		currWindowStart++;
    	}
    
    	return minWindowStart > -1 ?
    	       t.Substring(minWindowStart, minWindowLength) :
    	       String.Empty;
    }

    Every character is examined at most twice (once when appended to the end of a window and once during compaction) so the whole solution has O(length(T)) time complexity, assuming a hash table lookup is an O(1) operation.
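
    Here is a quick usage sketch that reproduces the example from the beginning of the post:

    var t = "of characters and as";
    var p = "aa s";
    // Prints "and as" - the minimum window of T that
    // contains two 'a' characters, an 's' and a space
    Console.WriteLine(FindMinWindow(t, p));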

  • Chasing state of the art

    Print numbers by spiral

    Recently I came across a simple yet interesting coding problem. So here is the deal. You are given a positive integer N. Print the first N ^ 2 positive integers in matrix form in such a way that within the matrix the numbers form a spiral starting from its center and going clockwise. For example, for N = 5 the matrix to be printed is:

    21 22 23 24 25
    20  7  8  9 10
    19  6  1  2 11
    18  5  4  3 12
    17 16 15 14 13

    Optimize it for speed and space.

    One way you can approach it is to create an N x N matrix, fill it with numbers that form the spiral and then print the whole matrix row by row. But this solution has O(N ^ 2) space complexity. Let’s try to reach O(1) space complexity.

    The key observation here is how matrix changes when N changes by 1.

    N = 1.

    1

    N = 2.

    1 2
    4 3

    N = 3.

    7 8 9
    6 1 2
    5 4 3

    N = 4.

     7  8  9 10
     6  1  2 11
     5  4  3 12
    16 15 14 13

    Can you see the pattern here? At every step we extend previous matrix (P) with additional column and row (C). If N is even we extend previous matrix of size N – 1 with right column and bottom row

    P C
    C C

    and with left column and top row if it is odd

    C C
    C P

    This leads us to a naturally recursive algorithm. The matrix is printed line by line, and for each line we have three cases:

    1. Print a whole row of the current matrix (top when N is odd or bottom when N is even).
    2. Print a row from the previous matrix of size N - 1 first and then print the value that belongs to the current matrix (when N is even).
    3. Print the value that belongs to the current matrix and then print a row from the previous matrix of size N - 1 (when N is odd).

    So basically to print a row we need to know matrix size N and row index. Here goes the solution.

    static void Print(int n)
    {
    	for(int i = 0; i < n; i++)
    	{
    		PrintLine(n, i);
    		Console.WriteLine();
    	}
    }
    
    static void PrintLine(int n, int i)
    {
    	// Number of integers in current matrix
    	var n2 = n*n;
    	// Number of integers in previous matrix of size n - 1
    	var m2 = (n - 1)*(n - 1);
    
    	if (n % 2 == 0)
    	{
    		if (i == n - 1)
    		{
    			// n is even and we are at the last row so just 
    			// print it
    			for(int k = n2; k > n2 - n; k--)
    			{
    				PrintNum(k);
    			}
    		}
    		else
    		{
    			// Print row from previous matrix of size n - 1 
    			// first and then print value that belongs to current 
    			// matrix. Previous matrix is at the top left corner 
    			// so no need to adjust row index
    			PrintLine(n - 1, i);
    			// Skip all integers from previous matrix and upper 
    			// ones in this column as integers must form clockwise 
    			// spiral
    			PrintNum(m2 + 1 + i);
    		}
    	}
    	else
    	{
    		if (i == 0)
    		{
    			// n is odd and we are at the first row so just 
    			// print it
    			for(int k = m2 + n; k <= n2; k++)
    			{
    				PrintNum(k);
    			}
    		}
    		else
    		{
    			// Print value that belongs to current matrix and
    			// then print row from previous matrix of size n - 1
    			// Skip all integers from previous matrix and bottom
    			// ones in this column as integers must form clockwise
    			// spiral
    			PrintNum(m2 + n - i);
    			// Previous matrix is at the bottom right corner so
    			// row index must be reduced by 1
    			PrintLine(n - 1, i - 1);
    		}
    	}
    }
    
    static void PrintNum(int n)
    {
    	Console.Write("{0, -4}  ", n);
    }
    

    If the call stack is not counted then this solution has O(1) space complexity, otherwise O(N) due to the recursion depth.

  • Chasing state of the art

    Suffix array

    Finding all occurrences of a pattern (of length m) in a text (of length n) is a quite commonly encountered string matching problem. For example, you hit Ctrl-F in your browser and type the string you want to find while the browser highlights every occurrence of the typed string on a page.

    The naive solution is at each iteration to “shift” the pattern along the text by 1 position and check if all characters of the pattern match the corresponding characters in the text. This solution has O((n – m + 1)*m) complexity.
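
    Here is a minimal sketch of the naive matcher (the method name is mine, for illustration only):

    // Yields every position in text at which pattern occurs by
    // checking all n - m + 1 shifts, up to m characters each
    static IEnumerable<int> NaiveSearch(string text, string pattern)
    {
      for (var shift = 0; shift <= text.Length - pattern.Length; shift++)
      {
        var match = true;
        for (var i = 0; i < pattern.Length && match; i++)
          match = text[shift + i] == pattern[i];
        if (match)
          yield return shift;
      }
    }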

    If either the pattern or the text is fixed it can be preprocessed to speed up the search. For example, if the pattern is fixed we can use the Knuth-Morris-Pratt algorithm to preprocess it in O(m) time and then search for its occurrences in O(n).

    A fixed text that is queried many times can also be preprocessed to support fast pattern search. One way to do this is to build a suffix array. The idea behind it is pretty simple. It is basically a list of the suffixes of the subject text (a suffix starts at some position inside the string and runs till the end of the string) sorted in lexicographical order. For example, for the “mississippi” string we have the following:

    i
    ippi
    issippi
    ississippi
    mississippi
    pi
    ppi
    sippi
    sissippi
    ssippi
    ssissippi

    However, due to string immutability in .NET it is not practical to represent each suffix as a separate string as that requires O(n^2) space. So instead the starting positions of suffixes will be sorted. But why are suffixes selected in the first place? Because searching for every occurrence of a pattern is basically searching for every suffix that starts with the pattern.

    Once they are sorted we can use binary search to find lower and upper bounds that enclose all suffixes that start with the pattern. Comparison of a suffix with a pattern during binary search should take into account only m (length of the pattern) characters as we are looking for suffixes that start with the pattern.

    // Suffix array represents simple text indexing mechanism.
    public class SuffixArray : IEnumerable<int>
    {
      private const int c_lower = 0;
      private const int c_upper = -1;
    
      private readonly string m_text;
      private readonly int[] m_pos;
      private readonly int m_lower;
      private readonly int m_upper;
    
      SuffixArray(string text, int[] pos, int lower, int upper)
      {
        m_text = text;
        m_pos = pos;
        // Inclusive lower and upper boundaries define search range.
        m_lower = lower;
        m_upper = upper;
      }
    
      public static SuffixArray Build(string text)
      {
        Contract.Requires<ArgumentException>(!String.IsNullOrEmpty(text));
    
        var length = text.Length;
        // Sort starting positions of suffixes in lexicographical 
        // order.
        var pos = Enumerable.Range(0, length).ToArray();
        Array.Sort(pos, (x, y) => String.Compare(text, x, text, y, length));
        // By default all suffixes are in search range.
        return new SuffixArray(text, pos, 0, text.Length - 1);
      }
    
      public SuffixArray Search(string str)
      {
        Contract.Requires<ArgumentException>(!String.IsNullOrEmpty(str));
    
        // Search range is empty so nothing to narrow.
        if (m_lower > m_upper)
          return this;
        // Otherwise search for boundaries that enclose all 
        // suffixes that start with supplied string.
        var lower = Search(str, c_lower);
        var upper = Search(str, c_upper);
        // Once precomputed sorted suffixes positions don't change
        // but the boundaries do so that next refinement 
        // can be done within smaller range and thus faster.
        // For example, you may narrow search range to suffixes 
        // that start with "ab" and then search within this smaller
        // search range suffixes that start with "abc".
        return new SuffixArray(m_text, m_pos, lower + 1, upper);
      }
    
      public IEnumerator<int> GetEnumerator()
      {
        // Enumerates starting positions of suffixes that fall 
        // into search range.
        for (var i = m_lower; i <= m_upper; i++)
          yield return m_pos[i];
      }
    
      IEnumerator IEnumerable.GetEnumerator()
      {
        return GetEnumerator();
      }
    
      private int Compare(string w, int i)
      {
        // Comparison takes into account maximum length(w) 
        // characters. For example, strings "ab" and "abc" 
        // are thus considered equal.
        return String.Compare(w, 0, m_text, m_pos[i], w.Length);
      }
    
      private int Search(string w, int bound)
      {
        // Depending on bound value binary search results 
        // in either lower or upper boundary.
        int x = m_lower - 1, y = m_upper + 1;
        if (Compare(w, m_lower) < 0)
          return x;
        if (Compare(w, m_upper) > 0)
          return y;
        while (y - x > 1)
        {
          var m = (x + y)/2;
          // If bound equals 0 the left boundary advances to the 
          // median only if subject is strictly greater than the 
          // median and thus search results in lower bound 
          // (position that precedes first suffix equal to or 
          // greater than subject w). Otherwise search results 
          // in upper bound (position that precedes first suffix 
          // that is greater than subject).
          if (Compare(w, m) > bound)
            x = m;
          else
            y = m;
        }
        return x;
      }
    }

    This implementation is simple (it has O(n^2 log n) complexity to sort and O(m log n) to search, where n stands for the text length and m for the pattern length) and can be improved. It doesn’t take into account the fact that it is suffixes of the same text, not arbitrary strings, that are sorted. On the other hand, suffixes may share common prefixes and that may be used to speed up binary search.

    Here is an example of narrowing the search.

    var str = ...;
    var sa = SuffixArray.Build(str);
    string pat;
    while ((pat = Console.ReadLine()) != String.Empty)
    {
      sa = sa.Search(pat);
      foreach (var pos in sa)
      {
        Console.WriteLine(str.Substring(pos));
      }
    }

    Happy Easter, folks!

  • Chasing state of the art

    Selecting k smallest or largest elements

    There are cases when you need to select a number of best (according to some definition) elements out of finite sequence (list). For example, select 10 most popular baby names in a particular year or select 10 biggest files on your hard drive.  

    While selecting a single minimum or maximum element can easily be done iteratively in O(n), selecting the k smallest or largest elements (k smallest for short) is not that simple.

    It makes sense to take advantage of the sequence API’s composability. We’ll design an extension method with the signature defined below:

    public static IEnumerable<TSource> TakeSmallest<TSource>(
     this IEnumerable<TSource> source, int count, IComparer<TSource> comparer)

    The name originates from the fact that selecting the k smallest elements can logically be expressed in terms of Enumerable.TakeWhile supplying a predicate that returns true if an element is one of the k smallest. As the logical predicate doesn’t change and only the count does, it is baked into the method’s name (instead of “While” that represents a changing predicate we have “Smallest”).

    Now let’s find the solution.

    If the whole list is sorted first k elements is what we are looking for.

    public static IEnumerable<TSource> TakeSmallest<TSource>(
     this IEnumerable<TSource> source, int count, IComparer<TSource> comparer)
    {
     return source.OrderBy(x => x, comparer).Take(count);
    }

    It is O(n log n) solution where n is the number of elements in the source sequence. We can do better.

    A priority queue yields better performance characteristics if only a subset of the sorted sequence is required.

    public static IEnumerable<TSource> TakeSmallest<TSource>(
     this IEnumerable<TSource> source, int count, IComparer<TSource> comparer)
    {
     var queue = new PriorityQueue<TSource>(source, comparer);
     while (count > 0 && queue.Count > 0)
     {
      yield return queue.Dequeue();
      count--;
     }
    }

    It requires O(n) to build a priority queue based on a binary min heap and O(k log n) to retrieve the first k elements. Better, but we can improve more.

    Quicksort algorithm picks pivot element, reorders elements such that the ones less than pivot go before it while greater elements go after it (equal can go either way). After that pivot is in its final position. Then both partitions are sorted recursively making whole sequence sorted. In order to prevent worst case scenario pivot selection can be randomized.

    Basically we are interested in the k smallest elements themselves and not the ordering relation between them. Assuming partitioning just completed, let’s denote the set of elements that are before the pivot (including the pivot itself) by L and the set of elements that are after the pivot by H. According to the partition definition L contains the |L| (where |X| denotes the number of elements in a set X) smallest elements. If |L| is equal to k we are done. If it is greater than k then look for the k smallest elements in L. Otherwise, as we already have |L| smallest elements, look for the k - |L| smallest elements in H.

    public static IEnumerable<TSource> TakeSmallest<TSource>(
      this IEnumerable<TSource> source, int count)
    {
      return TakeSmallest(source, count, Comparer<TSource>.Default);
    }
    
    public static IEnumerable<TSource> TakeSmallest<TSource>(
      this IEnumerable<TSource> source, int count, IComparer<TSource> comparer)
    {
      Contract.Requires<ArgumentNullException>(source != null);
      // Sieve handles situation when count >= source.Count()
      Contract.Requires<ArgumentOutOfRangeException>(count > 0);
      Contract.Requires<ArgumentNullException>(comparer != null);
    
      return new Sieve<TSource>(source, count, comparer);
    }
    
    class Sieve<T> : IEnumerable<T>
    {
      private readonly IEnumerable<T> m_source;
      private readonly IComparer<T> m_comparer;
      private readonly int m_count;
    
      private readonly Random m_random;
    
      public Sieve(IEnumerable<T> source, int count, IComparer<T> comparer)
      {
        m_source = source;
        m_count = count;
        m_comparer = comparer;
        m_random = new Random();
      }
    
      public IEnumerator<T> GetEnumerator()
      {
        var col = m_source as ICollection<T>;
        if (col != null && m_count >= col.Count)
        {
          // There is no point in copying data
          return m_source.GetEnumerator();
        }
        var buf = m_source.ToArray();
        if (m_count >= buf.Length)
        {
          // Buffer already contains the exact number of elements
          return buf.AsEnumerable().GetEnumerator();
        }
        // Find the solution
        return GetEnumerator(buf);
      }
    
      IEnumerator IEnumerable.GetEnumerator()
      {
        return GetEnumerator();
      }
    
      private IEnumerator<T> GetEnumerator(T[] buf)
      {
        var n = buf.Length;
        var k = m_count;
        // After rearrange is completed the first k 
        // items are the smallest elements
        Rearrange(buf, 0, n - 1, k);
        for (int i = 0; i < k; i++)
        {
          yield return buf[i];
        }
      }
    
      private void Rearrange(T[] buf, int l, int u, int k)
      {
        if (l == u)
        {
          return;
        }
        // Partition elements around randomly selected pivot
        var q = RandomizedPartition(buf, l, u);
        // Compute size of low partition (includes pivot)
        var s = q - l + 1;
        // We are done as low partition is what we were looking for
        if (k == s)
        {
          return;
        }
    
        if (k < s)
        {
          // Smallest elements group is less than low partition
          // find it there
          Rearrange(buf, l, q - 1, k);
        }
        else
        {
          // Low partition is in smallest elements group, find the 
          // rest in high partition
          Rearrange(buf, q + 1, u, k - s);
        }
      }
    
      private int RandomizedPartition(T[] buf, int l, int u)
      {
        // Select pivot randomly and swap it with the last element
        // to prevent worst case scenario where pivot is the 
        // largest remaining element
        Swap(buf, m_random.Next(l, u + 1), u);
        // Divides elements into two partitions:
        // - Low partition where elements that are less than pivot 
        // and pivot itself
        // - High partition contains the rest 
        var k = l;
        for (var i = l; i < u; i++)
        {
          if (m_comparer.Compare(buf[i], buf[u]) < 0)
          {
            Swap(buf, k++, i);
          }
        }
        // Put pivot into its final location
        Swap(buf, k, u);
        return k;
      }
    
      private static void Swap(T[] a, int i, int j)
      {
        var tmp = a[i];
        a[i] = a[j];
        a[j] = tmp;
      }
    }

    The solution runs in expected O(n) time, which means quite good performance in practice. Let’s run the thing.

    const int count = 100;
    const int max = 100;
    var rnd = new Random();
    var seq = Enumerable.Range(0, count).Select(_ => rnd.Next(max)).ToArray();
    Func<int, int> i = x => x;
    
    for(var k = 1; k < count / 2; k++)
    {
      var a = seq.TakeSmallest(k).OrderBy(i);
      var b = seq.OrderBy(i).Take(k);
    
      Debug.Assert(a.SequenceEqual(b));
    }

    Enjoy!

  • Chasing state of the art

    K-way merge

    The classic merge (the one used in Merge Sort) takes as input some sorted lists and at each step outputs the element with the next smallest key, thus producing a sorted list that contains all the elements of the input lists.

    An instance of a list is a computer representation of the mathematical concept of a finite sequence, that is, a tuple.

    It is not always practical to have the whole sequence in memory because of its considerable size, nor to constrain the sequence to be finite as only the first K elements may be needed. Thus our algorithm must produce a monotonically increasing (according to some comparison logic) potentially infinite sequence.

    Two-way merge is the simplest variation where two lists are merged (it is named OrderedMerge to avoid confusion with EnumerableEx.Merge).

    public static IEnumerable<T> OrderedMerge<T>(
      this IEnumerable<T> first,
      IEnumerable<T> second,
      IComparer<T> comparer)
    {
      using (var e1 = first.GetEnumerator())
      {
        using (var e2 = second.GetEnumerator())
        {
          var c1 = e1.MoveNext();
          var c2 = e2.MoveNext();
          while (c1 && c2)
          {
            if (comparer.Compare(e1.Current, e2.Current) < 0)
            {
              yield return e1.Current;
              c1 = e1.MoveNext();
            }
            else
            {
              yield return e2.Current;
              c2 = e2.MoveNext();
            }
          }
          if (c1 || c2)
          {
            var e = c1 ? e1 : e2;
            do
            {
              yield return e.Current;
            } while (e.MoveNext());
          }
        }
      }
    }

    This algorithm runs in O(n) where n is the number of merged elements (we may not measure it by the number of elements in the sequences because of their potential infinity).
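
    A quick usage sketch of this two-way overload (the input arrays are made up for illustration):

    var evens = new[] { 0, 2, 4, 6 };
    var odds = new[] { 1, 3, 5 };
    // Prints 0 1 2 3 4 5 6
    foreach (var x in evens.OrderedMerge(odds, Comparer<int>.Default))
      Console.Write("{0} ", x);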

    N-way merge is a more general algorithm that allows merging N monotonically increasing sequences into one. It is used in external sorting. Here is the most general overload signature (others are omitted as you can easily create them).

    public static IEnumerable<T> OrderedMerge<T>(
      this IEnumerable<IEnumerable<T>> sources,
      IComparer<T> comparer)

    Naive implementation will take advantage of existing two-way merge and composition.

    public static IEnumerable<T> NaiveOrderedMerge<T>(
      this IEnumerable<IEnumerable<T>> sources,
      IComparer<T> comparer)
    {
      return sources.Aggregate((seed, curr) => seed.OrderedMerge(curr, comparer));
    }

    Let’s denote merging two sequences Si and Sj, where i != j and both are within [0, m) (m is the number of sequences), with (Si, Sj). Then what the code above does is (((S0, S1), S2), … Sm). This implementation is naive because fetching the next smallest element takes O(m), making the total running time O(nm). We can do better than that.

    Recall that in my previous post we implemented a priority queue based on a binary heap that allows getting the next smallest element in O(log n) where n is the size of the queue. Here is a solution sketch:

    • The queue will hold non-empty sequences.
    • The priority of a sequence is its next element.
    • At each step we dequeue the sequence that has the smallest next element. This element is next in the merged sequence.
    • If the dequeued sequence is not empty it must be enqueued back because it may contain the next smallest element in the merged sequence.

    Thus the queue size doesn’t exceed m (the number of sequences), making the total running time O(n log m).

    Another interesting aspect is resource management. Each sequence has associated resources that need to be released once the merged sequence terminates, normally or abnormally. The number of sequences is not known in advance. We will use the solution that is described in my previous post Disposing sequence of resources.

    Now let’s do it.

    // Convenience overloads are not included only most general one
    public static IEnumerable<T> OrderedMerge<T>(
      this IEnumerable<IEnumerable<T>> sources,
      IComparer<T> comparer)
    {
      // Make sure sequence of ordered sequences is not null
      Contract.Requires<ArgumentNullException>(sources != null);
      // and it doesn't contain nulls
      Contract.Requires(Contract.ForAll(sources, s => s != null));
      Contract.Requires<ArgumentNullException>(comparer != null);
      // Precondition checking is done outside of iterator because
      // of its lazy nature
      return OrderedMergeHelper(sources, comparer);
    }
    
    private static IEnumerable<T> OrderedMergeHelper<T>(
      IEnumerable<IEnumerable<T>> sources,
      IComparer<T> elementComparer)
    {
      // Each sequence is expected to be ordered according to 
      // the same comparison logic as elementComparer provides
      var enumerators = sources.Select(e => e.GetEnumerator());
      // Disposing sequence of lazily acquired resources as 
      // a single resource
      using (var disposableEnumerators = enumerators.AsDisposable())
      {
        // The code below holds the following loop invariant:
        // - Priority queue contains enumerators that positioned at 
        // sequence element
        // - The queue at the top has enumerator that positioned at 
        // the smallest element of the remaining elements of all 
        // sequences
    
        // Ensures that only non empty sequences participate  in merge
        var nonEmpty = disposableEnumerators.Where(e => e.MoveNext());
        // Current value of enumerator is its priority 
        var comparer = new EnumeratorComparer<T>(elementComparer);
        // Use priority queue to get enumerator with smallest 
        // priority (current value)
        var queue = new PriorityQueue<IEnumerator<T>>(nonEmpty, comparer);
    
        // The queue is empty when all sequences are empty
        while (queue.Count > 0)
        {
          // Dequeue enumerator that positioned at element that 
          // is next in the merged sequence
          var min = queue.Dequeue();
          yield return min.Current;
          // Advance enumerator to next value
          if (min.MoveNext())
          {
            // If it has value that can be merged into resulting
            // sequence put it into the queue
            queue.Enqueue(min);
          }
        }
      }
    }
    
    // Provides comparison functionality for enumerators
    private class EnumeratorComparer<T> : Comparer<IEnumerator<T>>
    {
      private readonly IComparer<T> m_comparer;
    
      public EnumeratorComparer(IComparer<T> comparer)
      {
        m_comparer = comparer;
      }
    
      public override int Compare(
         IEnumerator<T> x, IEnumerator<T> y)
      {
        return m_comparer.Compare(x.Current, y.Current);
      }
    }

    It works well with infinite sequences and cases where we need only the K first elements, and it fetches only the bare minimum out of the source sequences.
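
    To see the laziness in action, here is a sketch that merges two infinite increasing sequences and takes only the first few elements (the Multiples generator is made up for illustration):

    static IEnumerable<int> Multiples(int step)
    {
      // Infinite monotonically increasing sequence
      for (var i = 0; ; i++)
        yield return i * step;
    }
    
    ...
    
    var merged = new[] { Multiples(2), Multiples(3) }
      .OrderedMerge(Comparer<int>.Default)
      .Take(10);
    // Prints 0 0 2 3 4 6 6 8 9 10
    foreach (var x in merged)
      Console.Write("{0} ", x);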

    Run the thing.

    // count, max and rnd are assumed to be initialized, e.g.
    const int count = 10;
    const int max = 100;
    var rnd = new Random();
    // Function that generates sequence of length k of random numbers
    Func<int, IEnumerable<int>> gen = k => Enumerable.Range(0, k)
      .Select(l => rnd.Next(max));
    // Generate sequence of random lengths and each length project
    // to a sequence of that length of random numbers
    var seqs = gen(count).Select(k => gen(k)
      .OrderBy(l => l).AsEnumerable());
    
    var p = -1;
    foreach (var c in seqs.OrderedMerge(Comparer<int>.Default))
    {
      Debug.Assert(p <= c);
      Console.WriteLine(c);
      p = c;
    }

    Enjoy!

  • Chasing state of the art

    Right-hand side Enumerable.Zip

    With Reactive Extensions for .NET (Rx) and .NET Framework 4 a new LINQ operator was introduced – Zip (Bart De Smet gives excellent explanation about the idea and implementation details behind Zip operator). In a nutshell it merges two sequences into one sequence by using the selector function.

    int[] numbers = { 1, 2, 3, 4 };
    string[] words = { "one", "two", "three" };
    
    numbers.Zip(words, (first, second) => first + " " + second)
     .Run(Console.WriteLine); 
    
    // 1 one
    // 2 two
    // 3 three

    While it seems pretty simple and clear there is a subtlety in its behavior. Zip is implemented something like this (omitting arguments checking):

    public static IEnumerable<TResult> Zip<TFirst, TSecond, TResult>(IEnumerable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
      {
       using (IEnumerator<TFirst> fe = first.GetEnumerator())
       {
        using (IEnumerator<TSecond> se = second.GetEnumerator())
        {
         // What if call to fe.MoveNext() throws an exception
         // returns false or never returns?
         while (fe.MoveNext() && se.MoveNext())
         {
          yield return resultSelector(fe.Current, se.Current);
         }
        }
       }
      }

    A call to MoveNext on the first sequence’s enumerator may prevent calling MoveNext on the second sequence’s enumerator because of a thrown exception or a terminated or stuck sequence (it is explained in great detail in the “Zip’em together” section of More LINQ with System.Interactive – More combinators for your Swiss Army Knife, and this is where this post’s topic came from).

    As an exercise we will implement a Zip operator that fetches results from both sources simultaneously, combining their values or exceptions into the produced results.

    Here is a solution sketch:

    • As either enumerator can get stuck, two worker threads are used to fetch results out of the sequences and one coordination thread (actually the caller’s thread) combines results, terminations or exceptions.
    • Coordinator waits for notification that
      • either both workers successfully fetched next value out of sequences
      • or either of them normally or abnormally (exception is thrown) terminated.
    • Coordinator allows workers to proceed only if both workers successfully fetched the next value. Before signaling workers to proceed the coordinator resets the primitive it waits on so that it is ready for the next iteration.
    • Workers notify coordinator on fetched values and termination.
    • Once they have notified the coordinator, workers wait until
      • either all workers produced results and coordinator allows them to fetch next results from sequences (in order to prevent fetching unnecessary results and potentially producing unnecessary side effects)
      • or coordinator notified them on outer sequence termination due to either of sequences termination (normal or abnormal).
    • When all workers proceed, the primitive must be reset to support the next iteration.
    • Once outer sequence is terminated the last alive worker must dispose shared resources.

    As zipping may be done on infinite sequences, fetching the next item from a sequence may take indefinitely long, or the consumer may wait indefinitely long before fetching the next zipped item, explicitly spawned threads are used to avoid seizing thread pool threads for an indefinitely long time.

    For now the waiting problems do not take abnormal notifications into account (we’ll take a look at that below).

    Task Parallel Library provides very convenient primitives for both waiting problems:

    • CountdownEvent - represents a synchronization primitive that is signaled when its count reaches zero.
    • Barrier - enables multiple tasks to cooperatively work on an algorithm in parallel through multiple phases.

    Coordinator needs to track the remaining work and be awakened when it is done. CountdownEvent starts with the number of workers (in this case 2) and is reset back to this value just before allowing workers to proceed to the next iteration.

    Things are a little bit trickier with the workers’ wait problem. Barrier allows setting up the number of participants. If we set the number of participants equal to the number of workers then once they all arrive at the barrier they will proceed to the next iteration. But this is not what we want as workers must not proceed without the coordinator’s permission. So we’ll make the coordinator a participant as well. The coordinator arrives at the barrier once the next zipped value (next iteration) is requested. After everyone passes the barrier it automatically returns to its initial state so no explicit reset is required.

    But what to do in case of normal or abnormal termination of either sequence? The zip sequence must be terminated as well and we cannot wait for the other sequence to fetch the next value or terminate because that may take indefinitely long. Fortunately both primitives (CountdownEvent and Barrier) support the new .NET 4 Cancellation Framework and in particular:

    Coordinator waits on the CountdownEvent instance with a CancellationToken instance that workers can use to propagate cancellation thus notifying about termination.

    Workers wait on the Barrier instance with CancellationToken instance that coordinator can use to notify about outer sequence (zip sequence) termination.

    The last thing is how to deal uniformly with values, exceptions and normal sequence termination. Fortunately Reactive Extensions for .NET (Rx) has the right tool for that:

    Basically value, exception and termination are represented by a type derived from Notification<T> where T is a sequence element type.
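
    For example, here is a small sketch of how Materialize turns a sequence, including its abnormal termination, into notifications that can be handled uniformly (the expected output is my reading of the API, not taken from the post):

    new[] { 1, 2 }
      .Concat(EnumerableEx.Throw<int>(new Exception()))
      .Materialize()
      .Run(n => Console.WriteLine(n.Kind));
    // OnNext
    // OnNext
    // OnError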

    Let’s now put everything together.

    public static class ZipEx
    {
     public static IEnumerable<TResult> RightHandZip<TFirst, TSecond, TResult>(this IEnumerable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
     {
      Contract.Requires(first != null);
      Contract.Requires(second != null);
      Contract.Requires(resultSelector != null);
    
      return new Zipper<TFirst, TSecond>().Zip(first, second, resultSelector);
     }
    
     class Zipper<TFirst, TSecond>
     {
      private const int c_sourceCount = 2;
      private int m_workersCount = c_sourceCount;
    
      private CountdownEvent m_zipEvent;
      private CancellationTokenSource m_zipCancellation;
    
      private Barrier m_srcBarrier;
      private CancellationTokenSource m_srcCancellation;
    
      private volatile Notification<TFirst> m_first;
      private volatile Notification<TSecond> m_second;
    
      public IEnumerable<TResult> Zip<TResult>(IEnumerable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
      {
       // Coordinator tracks remaining work through
          // count down event
       m_zipEvent = new CountdownEvent(c_sourceCount);
       // Workers may use this cancellation token source
       // to awake coordinator in case of termination of
       // either sequence
       m_zipCancellation = new CancellationTokenSource();
       // Here we basically state that the coordinator is
       // also a barrier participant
       m_srcBarrier = new Barrier(c_sourceCount + 1);
          // Coordinator may use this cancellation token source
          // to awake workers in case of outer sequence (zip)
          // termination
       m_srcCancellation = new CancellationTokenSource();
    
       // Spawn workers that will fetch results from both 
       // sequences simultaneously
       RunWorker(first, n => { m_first = n; });
       RunWorker(second, n => { m_second = n; });
    
       var token = m_zipCancellation.Token;
       while (true)
       {
        try
        {
         // Wait until either all workers fetched next 
         // values or any worker notified on completion 
         // (either due to exception or sequence completion)
         m_zipEvent.Wait(token);
        }
        catch (OperationCanceledException)
        {
         // Notify workers that zip sequence is terminated
         m_srcCancellation.Cancel();
         // If zip sequence is terminated due to exception(s) 
         // throw either AggregateException if both sequences 
         // resulted in exception or the exception that 
         // terminated either of them
         ThrowIfError();
         // Otherwise sequence is terminated due to one of 
         // the sequences completion
         yield break;
        }
        // Reset count down event for the next round
        m_zipEvent.Reset(c_sourceCount);
        // Yield next zipped value
        yield return resultSelector(m_first.Current, m_second.Current);
        // Only once consumer asks for the next value we allow 
        // workers to attempt to fetch next value
        // Zipper is a barrier participant that arrives at the 
        // barrier once the next zipped value is requested
        m_srcBarrier.SignalAndWait();
       }
      }
    
      private void RunWorker<T>(IEnumerable<T> enumerable, Action<Notification<T>> update)
      {
       // In order to fetch results from sequences we will use 
       // manually spawned threads as we do not want to seize 
       // ThreadPool threads for indefinite time. 
       new Thread(() =>
          {
           var token = m_srcCancellation.Token;
           foreach (var notification in enumerable.Materialize())
           {
            update(notification);
    
            if (notification.Kind == NotificationKind.OnNext)
            {
             // Notify on successfully fetched value out
             // of sequence
             m_zipEvent.Signal();
            }
            else
            {
          // Either sequence completion or error 
          // notifications terminate zipped sequence
             m_zipCancellation.Cancel();
            }
    
            try
            {
          // Wait until next zipped value is requested or 
          // zip sequence is terminated
             m_srcBarrier.SignalAndWait(token);
            }
            catch (OperationCanceledException)
            {
          // Last alive worker is responsible for resources 
          // disposal
             DisposeIfLast();
             return;
            }
           }
          }) {IsBackground = true}.Start();
      }
    
      private void DisposeIfLast()
      {
       if (Interlocked.Decrement(ref m_workersCount) == 0)
       {
        DisposeAndNullify(ref m_zipEvent);
        DisposeAndNullify(ref m_srcBarrier);
        DisposeAndNullify(ref m_zipCancellation);
        DisposeAndNullify(ref m_srcCancellation);
       }
      }
    
      private void ThrowIfError()
      {
       var ex1 = ExtractErrorOrNothing(m_first);
       var ex2 = ExtractErrorOrNothing(m_second);
    
       if (ex1 != null && ex2 != null)
       {
        throw new AggregateException(ex1, ex2);
       }
       ThrowIfNotNull(ex1);
       ThrowIfNotNull(ex2);
      }
    
      private static Exception ExtractErrorOrNothing<T>(Notification<T> n)
      {
       if (n != null && n.Kind == NotificationKind.OnError)
       {
        return ((Notification<T>.OnError) n).Exception;
       }
       return null;
      }
    
      private static void ThrowIfNotNull(Exception ex)
      {
       if (ex != null)
       {
        throw ex;
       }
      }
    
      private static void DisposeAndNullify<T>(ref T res)
       where T : class, IDisposable
      {
       if (res != null)
       {
        res.Dispose();
        res = null;
       }
      }
     }
    }

    Let’s set up the examples infrastructure:

    static IEnumerable<int> Infinite(string id)
    {
      Console.Write("{0} -> X ", id);
      while(true) {}
      yield break;
    }
    
    static IEnumerable<int> Seq(string id, int start, int count)
    {
      return Enumerable.Range(0, count).Select(x => x * 2 + start)
        .Do(x => Console.Write("{0} -> {1} ", id, x));
    }
    
    static IEnumerable<int> Abnormal(string id)
    {
      return EnumerableEx.Throw<int>(new Exception())
        .Finally(() => Console.Write("{0} -> E ", id));
    }
    
    static void RunTest(IEnumerable<int> first, IEnumerable<int> second)
    {
      Func<int, int, string> format = (f, s) => String.Format("[{0}, {1}] ", f, s);
      try
      {
        first.RightHandZip(second, format).Run(Console.Write);
      }
      catch (Exception ex)
      {
        Console.Write(ex.GetType());
      }
      Console.WriteLine();
    }

    and run some examples:

    const int count = 3;
    // Both sequences terminate
    RunTest(Seq("F", 0, count), Seq("S", 1, count));
    // Terminates normally
    // F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] F -> 4 S -> 5 [4, 5]
    
    // First normally terminates and second continues
    RunTest(Seq("F", 0, count), Seq("S", 1, count + 1));
    // Terminates normally
    // F -> 0 S -> 1 [0, 1] S -> 3 F -> 2 [2, 3] F -> 4 S -> 5 [4, 5] S -> 7
    
    // First abnormally terminates and second continues
    RunTest(Seq("F", 0, count).Concat(Abnormal("F")), Seq("S", 1, count + 1));
    // Terminates abnormally
    // F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] S -> 5 F -> 4 [4, 5] S -> 7 F -> E System.Exception
    
    // Both terminate abnormally
    RunTest(Seq("F", 0, count).Concat(Abnormal("F")), Seq("S", 1, count).Concat(Abnormal("S")));
    // Terminates abnormally
    // F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] S -> 5 F -> 4 [4, 5] F -> E S -> E System.AggregateException
    
    // First stucks and second terminates abnormally
    RunTest(Seq("F", 0, count).Concat(Infinite("F")), Seq("S", 1, count).Concat(Abnormal("S")));
    // Terminates abnormally
    // F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] F -> 4 S -> 5 [4, 5] S -> E F -> X System.Exception
    
    // First stucks and second terminates normally
    RunTest(Seq("F", 0, count).Concat(Infinite("F")), Seq("S", 1, count));
    // Terminates normally
    // F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] F -> 4 S -> 5 [4, 5] F -> X
    
    // First stucks and second continues
    RunTest(Seq("F", 0, count).Concat(Infinite("F")), Seq("S", 1, count + 1));
    // Stucks
    // F -> 0 S -> 1 [0, 1] F -> 2 S -> 3 [2, 3] F -> 4 S -> 5 [4, 5] F -> X S -> 7

    That’s it.

  • Chasing state of the art

    Binary heap based priority queue

    Design of a container that supports items ordering raises lots of interesting design questions. To be more concrete we will design a simple priority queue based on a binary min heap that supports the following operations:

    • Enqueue – add an item to the queue with an associated priority.
    • Dequeue - remove the element from the queue that has the highest priority and return it.
    • Peek – look at the highest priority element without removing it.

    Good design that solves the wrong problems isn’t better than a bad one. So the first step is to identify the right problems to solve. A priority queue maintains a set of items with associated keys (priorities). Items get off the queue based on the employed ordering mechanism for keys (priorities). Basically the two problems we need to solve (from an API design perspective) are the ways to represent:

    • association of a key (priority) and corresponding item
    • ordering mechanism for keys (priorities)

    Association can be either explicit (PriorityQueue<TItem, TKey>, where the key type is explicitly stated) or implicit (PriorityQueue<TItem>, where the key type is of no interest). Each type parameter must have concrete consumers. The priority queue itself doesn’t care about keys (although priority queues with updateable priority do) but rather about comparing keys. Client code cannot benefit from explicit keys either because it can easily access the associated key, as the client code defines what a key actually means. So there is no point in cluttering the API with irrelevant details (of what keys really are). Thus we will use PriorityQueue<T> (as now we have a single type parameter we will use a short name for it) and let consumers provide comparison logic for two items based on whatever the consumer defines as keys.

    There are several options to represent comparison mechanism.

    Item type may be constrained to support comparison through generic type parameter constraint:

    class PriorityQueue<T> 
      where T : IComparable<T>
    {
    }

    Though this approach benefits from clearly stated comparison mechanism it implies significant limitations:

    • It doesn’t naturally support comparing items of the same type using different aspects (for example, in one case objects of Customer type are compared using the number of orders and in the other using the date of the last order). Of course a consumer can create a lightweight wrapper that aggregates the object to compare and does the actual comparison, but that is not feasible from the additional memory consumption and usage complexity perspectives.
    • It doesn’t naturally support changing order direction (ascending <-> descending) and thus may require adding such support into the data structure itself.

    With those limitations in mind we can use comparers – something that knows how to compare two objects:

    • A type that implements IComparer<T> which benefits from .NET Framework support (it provides great documentation support and default implementation).
    • or a delegate Func<T, T, int> that accepts two parameters of type T and returns integer value indicating whether one is less than, equal to, or greater than the other. It benefits from anonymous functions conveniences.

    Comparers are designed for particular usage scenarios and a single instance corresponds to a single items container. Thus the limitations mentioned above do not apply to comparers.
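
    To illustrate the aspect-based comparison point, here is a sketch with a hypothetical Customer type (the type and its properties are made up):

    class Customer
    {
      public int OrdersCount { get; set; }
      public DateTime LastOrderDate { get; set; }
    }
    
    // Two different orderings for the same item type; the queue
    // doesn't care what the key is, only how two items compare
    class OrdersCountComparer : Comparer<Customer>
    {
      public override int Compare(Customer x, Customer y)
      {
        return x.OrdersCount.CompareTo(y.OrdersCount);
      }
    }
    
    class LastOrderDateComparer : Comparer<Customer>
    {
      public override int Compare(Customer x, Customer y)
      {
        return x.LastOrderDate.CompareTo(y.LastOrderDate);
      }
    }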

    Taking into account the value of .NET Framework support for IComparer<T> and that it is easy to create a wrapper that derives from Comparer<T> and delegates comparison to an aggregated function, we will use the IComparer<T> approach (although it seems costless to also add support for the Func<T, T, int> mechanism and create the wrapper ourselves, in most cases it is best to avoid providing means to do the same thing in multiple ways or otherwise potential confusion may outweigh the benefits).
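
    A minimal sketch of such a Func-to-comparer wrapper (for illustration only, it is not part of the queue’s API):

    // Adapts a comparison function to the IComparer<T> interface
    class FunctionComparer<T> : Comparer<T>
    {
      private readonly Func<T, T, int> m_compare;
    
      public FunctionComparer(Func<T, T, int> compare)
      {
        m_compare = compare;
      }
    
      public override int Compare(T x, T y)
      {
        return m_compare(x, y);
      }
    }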

    Now putting everything together.

    // Unbounded priority queue based on binary min heap
    public class PriorityQueue<T>
    {
      private const int c_initialCapacity = 4;
      private readonly IComparer<T> m_comparer;
      private T[] m_items;
      private int m_count;
    
      public PriorityQueue()
        : this(Comparer<T>.Default)
      {
      }
    
      public PriorityQueue(IComparer<T> comparer)
        : this(comparer, c_initialCapacity)
      {
      }
    
      public PriorityQueue(IComparer<T> comparer, int capacity)
      {
        Contract.Requires<ArgumentOutOfRangeException>(capacity >= 0);
        Contract.Requires<ArgumentNullException>(comparer != null);
    
        m_comparer = comparer;
        m_items = new T[capacity];
      }
    
      public PriorityQueue(IEnumerable<T> source)
        : this(source, Comparer<T>.Default)
      {
      }
    
      public PriorityQueue(IEnumerable<T> source, IComparer<T> comparer)
      {
        Contract.Requires<ArgumentNullException>(source != null);
        Contract.Requires<ArgumentNullException>(comparer != null);
    
        m_comparer = comparer;
        // In most cases queue that is created out of sequence 
        // of items will be emptied step by step rather than 
        // new items added and thus initially the queue is 
        // not expanded but rather left full
        m_items = source.ToArray();
        m_count = m_items.Length;
        // Restore heap order
        FixWhole();
      }
    
      public int Capacity
      {
        get { return m_items.Length; }
      }
    
      public int Count
      {
        get { return m_count; }
      }
    
      public void Enqueue(T e)
      {
        // Guard against zero initial capacity
        if (m_count == m_items.Length)
        {
          Expand(Math.Max(m_items.Length*2, c_initialCapacity));
        }
        m_items[m_count++] = e;
        // Restore heap if it was broken
        FixUp(m_count - 1);
        // Once items count reaches half of the queue capacity 
        // it is doubled 
        if (m_count >= m_items.Length/2)
        {
          Expand(m_items.Length*2);
        }
      }
    
      public T Dequeue()
      {
        Contract.Requires<InvalidOperationException>(m_count > 0);
    
        var e = m_items[0];
        m_items[0] = m_items[--m_count];
        // Restore heap if it was broken
        FixDown(0);
        // Once items count reaches one eighth  of the queue 
        // capacity it is reduced to half so that items
        // still occupy one fourth (if it is reduced when 
        // count reaches one fourth after reduce items will
        // occupy half of queue capacity and next enqueued item
        // will require queue expand)
        if (m_count <= m_items.Length/8)
        {
          Expand(m_items.Length/2);
        }
    
        return e;
      }
    
      public T Peek()
      {
        Contract.Requires<InvalidOperationException>(m_count > 0);
    
        return m_items[0];
      }
    
      private void FixWhole()
      {
        // Using bottom-up heap construction method enforce
        // heap property
        for (int k = m_items.Length/2 - 1; k >= 0; k--)
        {
          FixDown(k);
        }
      }
    
      private void FixUp(int i)
      {
        // Make sure that starting with i-th node up to the root
        // the tree satisfies the heap property: if B is a child 
        // node of A, then key(A) ≤ key(B)
        for (int c = i, p = Parent(c); c > 0; c = p, p = Parent(p))
        {
          if (Compare(m_items[p], m_items[c]) < 0)
          {
            break;
          }
          Swap(m_items, c, p);
        }
      }
    
      private void FixDown(int i)
      {
        // Make sure that starting with i-th node down to the leaf 
        // the tree satisfies the heap property: if B is a child 
        // node of A, then key(A) ≤ key(B)
        for (int p = i, c = FirstChild(p); c < m_count; p = c, c = FirstChild(c))
        {
          if (c + 1 < m_count && Compare(m_items[c + 1], m_items[c]) < 0)
          {
            c++;
          }
          if (Compare(m_items[p], m_items[c]) < 0)
          {
            break;
          }
          Swap(m_items, p, c);
        }
      }
    
      private static int Parent(int i)
      {
        return (i - 1)/2;
      }
    
      private static int FirstChild(int i)
      {
        return i*2 + 1;
      }
    
      private int Compare(T a, T b)
      {
        return m_comparer.Compare(a, b);
      }
    
      private void Expand(int capacity)
      {
        Array.Resize(ref m_items, capacity);
      }
    
      private static void Swap(T[] arr, int i, int j)
      {
        var t = arr[i];
        arr[i] = arr[j];
        arr[j] = t;
      }
    }

    The example below prints the top 200 elements from a sequence of mscorlib types ordered by full name (sorting it first and then taking the first 200 elements is less efficient).

    class TypeNameComparer : Comparer<Type>
    {
      public override int Compare(Type x, Type y)
      {
        Contract.Requires(x != null);
        Contract.Requires(y != null);
    
        return x.FullName.CompareTo(y.FullName);
      }
    }
    
    ...
    
    const int count = 200;
    var types = typeof (object).Assembly.GetTypes();
    var typesQueue = new PriorityQueue<Type>(types, new TypeNameComparer());
    
    for (int i = 0; i < count && typesQueue.Count > 0; i++)
    {
      Console.WriteLine(typesQueue.Dequeue());
    }

    That’s it.

  • Chasing state of the art

    Queue based on a single stack

    Looking at things from different perspectives allows us to understand them better. On the other hand, mind bending practice improves your ability to find solutions.

    Previously we were Disposing sequence of resources with Reactive Extensions. This time we will build FIFO (first in, first out) collection based on single LIFO (last in, first out) collection with no additional explicit storage.

    It is not as insane as it looks. Assume that items come out of the stack in the order they must appear in the queue (FIFO). Choosing the opposite order is also possible but not practical (see below). To make this happen we simply need to make sure that items in the stack (LIFO) are placed in the opposite order: items queued first must appear at the top of the stack. This basically means that in order to queue an item all items must be popped, the item pushed and then the existing items pushed inversely to the pop order. But we have a no-additional-explicit-storage requirement. So we store the items implicitly through recursion.

    public class StackBasedQueue<T> : IEnumerable<T>
    {
      private readonly Stack<T> m_items;
    
      public StackBasedQueue()
        : this(Enumerable.Empty<T>())
      {
      }
    
      public StackBasedQueue(IEnumerable<T> items)
      {
        // Items must be reversed as we want first 
        // item to appear on top of stack
        m_items = new Stack<T>(items.Reverse());
      }
    
      public int Count
      {
        get { return m_items.Count; }
      }
    
      public void Enqueue(T item)
      {
        // If stack is empty then simply push item
        // as it will be the first and the last item 
        // in the queue
        if (m_items.Count == 0)
        {
          m_items.Push(item);
          return;
        }
    
        // The item must be placed at the bottom of the stack
        // To do this existent items must be popped, the item  
        // pushed and then existent items pushed inversely to 
        // pop order
        var tmp = m_items.Pop();
        Enqueue(item);
        m_items.Push(tmp);
      }
    
      public T Dequeue()
      {
        ThrowIfEmpty();
        // If stack is not empty item on top of it 
        // is next to be dequeued or peeked
        return m_items.Pop();
      }
    
      public T Peek()
      {
        ThrowIfEmpty();
        return m_items.Peek();
      }
    
      public IEnumerator<T> GetEnumerator()
      {
        // As items queued first must appear at the top of the
        // stack we can enumerate items directly
        return m_items.GetEnumerator();
      }
    
      IEnumerator IEnumerable.GetEnumerator()
      {
        return GetEnumerator();
      }
    
      private void ThrowIfEmpty()
      {
        if (Count == 0)
        {
          throw new InvalidOperationException("The queue is empty.");
        }
      }
    }

    Enqueue is an O(n) operation (where n is the number of items in the stack). Dequeue and Peek are O(1) operations. Enumerating through all items is an O(n) operation. Choosing the opposite order would make enumerating through all items an O(n^2) operation, which is not practical.
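
    A quick usage sketch (relying only on the class above): each Enqueue recursively sinks the new item to the bottom of the stack, so items pop out in arrival order.

    var queue = new StackBasedQueue<int>();
    queue.Enqueue(1); // stack (top to bottom): 1
    queue.Enqueue(2); // 1 popped, 2 pushed, 1 pushed back: 1, 2
    queue.Enqueue(3); // stack (top to bottom): 1, 2, 3
    
    // Prints 1, 2 and 3 - FIFO order despite the LIFO backing store
    while (queue.Count > 0)
    {
      Console.WriteLine(queue.Dequeue());
    }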

    It is just an exercise and must not be used in real-world scenarios (at some point the queue may grow large enough that the next attempt to enqueue an item results in a StackOverflowException); use the standard Queue<T> instead.

  • Chasing state of the art

    Disposing sequence of resources with Reactive Extensions

    Recall my previous post on Disposing sequence of resources where we solved the following problems imperatively:

    • Create a single disposable representation for a sequence of disposable resources
    • Defer resource allocation to avoid exception propagation until cleanup can be guaranteed, and avoid unnecessary allocations (resources that are allocated but never used)
    • Dispose only consumed resources and preserve nested try{}finally{} blocks semantics (resources allocated first, in sequence order, are disposed last; any exception thrown from a finally block hides the exception being propagated)

    Now that Reactive Extensions for .NET (Rx) is out we will do it in a more LINQ-ish manner with the help of the interactive features of Reactive Extensions:

    • EnumerableEx.Publish – publishes the values of the source to each use of the bound parameter.
    • EnumerableEx.Share – shares a single cursor among all enumerators of the sequence.
    • EnumerableEx.Finally – invokes finallyAction after the source sequence terminates normally or by an exception.

    A great explanation of how EnumerableEx.Publish works (disregard naming difference) is given in Taming Your Sequence’s Side-Effects Through IEnumerable.Let. The following example illustrates the point.

    static Random m_seeder = new Random();
    
    static IEnumerable<int> GenerateRandomSequence(int count)
    {
      var rnd = new Random(m_seeder.Next());
      for(int i = 0; i < count; i++)
      {
        yield return rnd.Next(count);
      }
    }
    
    ...
    
    const int count = 5;
    var xs = GenerateRandomSequence(count);
    // Each time we iterate xs we may get a different sequence
    var equals = xs.SequenceEqual(xs);
    // However it can be solved through memoization
    xs.Publish(seq =>
                 {
                   // Every time we iterate seq we get the same 
                   // sequence
                   equals = seq.SequenceEqual(seq);
                   return seq;
                 });

    EnumerableEx.Share makes sure that any iteration is made with respect to the same cursor.

    var xs = Enumerable.Range(0, count);
    // Prints every sequence element to console
    // Without sharing, each of the count iterations below would
    // print the first element of a potentially different sequence
    // (recall the random sequence example)
    var shared = xs.Share(); 
    for(int i = 0; i < count; i++)
    {
      shared.Take(1).Run(Console.WriteLine);
    }

    EnumerableEx.Finally does exactly what its description says.

    static IEnumerable<int> GenerateThrowingSequence(int count)
    {
      for(int i = 0; i < count; i++)
      {
        if (i > 0 && i % 3 == 0)
        {
          throw new Exception();
        }
        yield return i;
      }
    }
    
    ...
    
    // Prints 0, 1, 2, Finally, Caught
    try
    {
      GenerateThrowingSequence(count).Finally(() => Console.WriteLine("Finally"))
        .Run(Console.WriteLine);
    }
    catch (Exception)
    {
      Console.WriteLine("Caught");
    }
    
    // Prints 0, 1, Finally
    GenerateThrowingSequence(count).Finally(() => Console.WriteLine("Finally"))
      .Take(2).Run(Console.WriteLine);

    Now let’s put everything together. Publish will help us defer resource allocation and avoid unnecessary allocations. Share and Finally will take care of disposal.

    static class Disposables
    {
      // Disposes projected resources once they are no longer needed
      public static void Using<TSource, TResource>(
        // Source sequence projected to disposable resources
        this IEnumerable<TSource> source,
        // Resource projection
        Func<TSource, TResource> resourceSelector,
        // Resources usage action
        Action<IEnumerable<TResource>> resourcesUsage)
          where TResource : IDisposable
      {
        var rcount = 0;
        source
          // At this point resources are not created but 
          // only projection is set
          .Select(
          s =>
            {
              // As we do not want to unnecessarily create 
              // and then immediately dispose potentially expensive
              // resources we will count created resources
              // and later dispose only used ones
              rcount++;
              return resourceSelector(s);
            })
          .Publish(
          rs =>
            {
              // During sequence iteration resources will be created
              // However not all resources may be iterated through or 
              // an exception may be thrown in the middle and thus 
              // not all resources may be created (therefore not 
              // disposed)
              try
              {
                // The supplied resources sequence can be iterated
                // multiple times; each side effect occurs only once
                // because sequence elements are memoized and reused
                // during each iteration
                resourcesUsage(rs);
                return Enumerable.Empty<TResource>();
              }
              finally
              {
                // We must dispose only those resources we used
                // (counted and memoized above during first 
                // iteration)
                rs = rs.Take(rcount)
                  // Disposing resources must be done in the opposite 
                  // order to preserve nested try{}finally{} blocks 
                  // semantics
                  .Reverse().Do(r =>
                                  {
                                    rcount--;
                                    r.Dispose();
                                  })
                  // Once resource is disposed it must not be 
                  // iterated again and this what Share takes 
                  // care of
                  .Share();
    
                Action final = null;
                final = () =>
                          {
                            // Stop once every resource was given 
                            // a chance to dispose as Finally is 
                            // called even on empty sequences and 
                            // otherwise it leads to stack overflow
                            if (rcount > 0)
                            {
                              // Dispose only used resources and 
                              // leave untouched the rest
                              rs.Finally(final).Run();
                            }
                          };
                final();
              }
            })
          // Evaluate the sequence (triggers resources usage)
          .Run();
      }
    }

    The usage example below illustrates a situation where an exception is thrown during resource disposal. In this case we must give the preceding resources (from the resource sequence order perspective) a chance to be disposed. However, if an exception is thrown while disposing a preceding resource, that exception will hide the previous one.

    // Fake full of side effects resource =)
    class Resource : IDisposable
    {
      private readonly int m_i;
    
      public Resource(int i)
      {
        m_i = i;
        Console.WriteLine("Created {0}", m_i);
      }
    
      public void Use()
      {
        Console.WriteLine("Using {0}", m_i);
      }
    
      public void Dispose()
      {
        Console.WriteLine("Disposed {0}", m_i);
        // Simulate resource disposal that results in exception
        if (m_i % 2 == 1)
        {
          throw new Exception(m_i.ToString());
        }
      }
    }
    
    ...
    
    try
    {
      Enumerable.Range(0, 5)
        .Using(i => new Resource(i),
               rs =>
                 {
                   // First resources 0, 1 and 2 are created 
                   // and used
                   rs.Take(3).Run(r => r.Use());
                   // then already created resource 2 is used 
                   // and resource 3 is created and used
                   rs.Skip(1).Take(3).Run(r => r.Use());
                 });
    }
    catch (Exception ex)
    {
      // As resources are disposed in the opposite order
      // the latest exception is propagated
      Console.WriteLine("Exception {0}", ex.Message);
    }

    This produces the following output:

    Created 0 // iterating, if not iterated previously resource is created
    Using 0
    Created 1
    Using 1
    Created 2
    Using 2
    Using 1   // otherwise reused
    Using 2   // reused again
    Created 3 // wasn’t iterated previously, created
    Using 3
    Disposed 3 // disposing in the opposite order, throws exception
    Disposed 2 // still disposing continues
    Disposed 1 // throws exception that hides exception thrown earlier
    Disposed 0 // disposing continues
    Exception 1 // exception is caught

    That’s it! Hopefully you’ve enjoyed it.

    I hope we’ll meet next year. Happy New Year!

  • Chasing state of the art

    Chaining responsibilities

    The idea behind Chain of Responsibility pattern is quite simple and powerful:

    Avoid coupling the sender of a request to its receiver by giving more than one object a chance to handle the request. Chain the receiving objects and pass the request along the chain until an object handles it.

    You can find lots of object-oriented implementations out there, so as an exercise we will try to do it in a more functional way. For simplicity, Func<T, R> will be considered the handler contract. The basic idea looks like this:

    Func<T, R> h = t =>
       {
           // Decide whether this handler can handle the request
           bool canHandle = ...;
           // Get successor from somewhere
           Func<T, R> successor = ...;
           if (canHandle)
               return ...; // Handle request represented by t
           // Delegate request to successor
           return successor(t);
       };

    The first thing to solve is how to get the successor. As a handler must support composition, it cannot simply create a closure over its successor. It can, however, be represented as a function that returns the actual handler closed over its successor:

    Func<Func<T, R>, Func<T, R>> h = successor => t =>
       {
           bool canHandle = ...;
           if (canHandle)
               return ...; // Handle request represented by t
           // Delegate request to the closed-over successor
           return successor(t);
       };

    Now we need to compose handlers into a chain.

    // Creates chain of responsibility out of handlers
    static Func<T, R> Chain<T, R>(IEnumerable<Func<Func<T, R>, Func<T, R>>> handlers)
    {
        // By default if none of handlers can handle incoming request an exception is thrown
        Func<T, R> notSupported = t => { throw new NotSupportedException(); };
        return Chain(handlers, notSupported);
    }
    
    // Creates chain of responsibility out of regular and default handlers
    static Func<T, R> Chain<T, R>(IEnumerable<Func<Func<T, R>, Func<T, R>>> handlers, Func<T, R> def)
    {
        // Assuming that the order of handlers within the chain must be the same as in the handlers sequence
        return handlers
            // Handlers need to be reversed first, otherwise they would appear in the opposite order
            .Reverse()
            // Iteratively close each handler over its successor
            .Aggregate(def, (a, f) => f(a));
    }

    To make it clearer, let’s expand the chaining of a simple two-handler case:

    // default handler
    Func<int, int> d = x => x;
    // two handlers appear in sequence in order of declaration
    Func<Func<int, int>, Func<int, int>> h1 = s => t => t < 10 ? t*2 : s(t);
    Func<Func<int, int>, Func<int, int>> h2 = s => t => t < 5 ? t + 3 : s(t);
    
    // 1. Reverse handlers
    // h2
    // h1
    
    // 2. Close h2 over d
    // tmp1 = t => t < 5 ? t + 3 : d(t);
    Func<int, int> tmp1 = h2(d); 
    
    // 3. Close h1 over tmp1
    // tmp2 = t => t < 10 ? t * 2 : tmp1(t);
    Func<int, int> tmp2 = h1(tmp1); 
    
    // 4. tmp2 is the chain

    Now handlers can be dynamically composed into chains to address a particular scenario.

    As a chaining exercise let’s create the following application (a team of developers tries to handle a project):

    • The project is divided into a number of tasks whose complexity doesn’t exceed a particular threshold.
    • In order to handle the project a development team is staffed. A developer with skill X can handle a task of complexity C when C <= X; otherwise he contributes to task completion, reducing the task’s complexity by X, and delegates the rest of the task to the next developer. A completed task is linked to the developer who completed it.
    • If the team cannot handle a particular task they ask an external expert for help.

    Prepare

    // Staff development team that will do the project
    static IEnumerable<Func<Func<int, int>, Func<int, int>>> Staff(int teamSize, int maxSkill)
    {
        var rnd = new Random();
        for (int i = 0; i < teamSize; i++)
        {
            int dev = i;
            // Developers may differ in their skills
            int skill = rnd.Next(maxSkill);
            // If a developer can handle the incoming task he reports completion by returning his id
            // If not (not enough skill) he contributes to the task and delegates the smaller remaining task to the next developer
            yield return c => t => t <= skill ? dev : c(t - skill);
        }
    }
    
    // Create work break down structure for the project
    static IEnumerable<int> Work(int projectSize, int maxTaskComplexity)
    {
        var rnd = new Random();
        for (int i = 0; i < projectSize; i++)
        {
            yield return rnd.Next(maxTaskComplexity) + 1;
        }
    }

    and march to the end.

    // Create tasks
    var work = Work(projectSize, maxTaskComplexity).ToArray();
    // If the team cannot handle a particular task they ask an unknown guru for help
    Func<int, int> askForHelp = t => -1;
    // Create chain out of developers to form a team with a backup
    var team = Chain(Staff(teamSize, maxTaskComplexity), askForHelp);
    // Hand out each task to the team
    var project = from w in work
                  select new {Task = w, CompletedBy = team(w)};
    
    foreach(var status in project)
    {
        Console.WriteLine("Task {0} completed by {1}", status.Task, status.CompletedBy);
    }

    Have chaining fun!

  • Chasing state of the art

    Making callbacks more explicit

    Recall my previous post Events and Callbacks vs. Explicit Calls that outlined the pros and cons of both approaches (events and callbacks on one side, explicit calls on the other).

    Explicit calls imply that the callee plays a certain role from the caller’s perspective, thus making the relationship between the two explicit as well. Consider a simple undo/redo example:

    interface ICommand
    {
        // Not every command can be undone. Returns true if it can be undone,
        // false - otherwise.
        bool CanUndo { get; }
        // Executes the command.
        void Do();
        // Reverts changes. NotSupportedException must be thrown in case 
        // command cannot be undone.
        void UnDo();
    }
    
    // Provides undo/redo mechanism.
    class History
    {
        // Makes supplied command part of the history.
        public void Remember(ICommand cmd)
        {
            // implementation is not relevant.
        }
    
        // other members elided.
    }

    The caller (History) clearly communicates its expectations by requiring the callee (a particular command) to conform to the command’s contract (an indivisible logical unit).

    While being explicit makes code clearer, it has its price: it requires consumers to create new types that conform to the contract. That is not a problem if the type will be reused by other parts of the application or has complex logic. But it is also common for the logic to be trivial and used only in the caller’s scenario (the undo/redo mechanism in this case). In that case creating a new type sounds like overhead.

    I wish C# had capabilities (“Object Expressions”) similar to F#, where I can implement an interface “in place” like this:

    let cmd = 
        { 
            new ICommand with 
                member x.CanUndo 
                    with get() = false
                member x.Do() = Console.Write("Done") 
                member x.UnDo() = raise (new NotSupportedException())
        }

    We can, however, provide a default implementation that uses delegates.

    class ActionCommand : ICommand
    {
        private readonly Action m_do;
        private readonly Action m_undo;
    
        public ActionCommand(Action doCallback, Action undoCallback)
        {
            if (doCallback == null)
            {
                throw new ArgumentNullException("doCallback");
            }
            m_do = doCallback;
            m_undo = undoCallback;
        }
    
        public bool CanUndo
        {
            get { return m_undo != null; }
        }
    
        public void Do()
        {
            m_do();
        }
    
        public void UnDo()
        {
            if (!CanUndo)
            {
                throw new NotSupportedException();
            }
            m_undo();
        }
    }

    While conforming to the contract, ActionCommand eases the creation of lightweight, scenario-dedicated implementations “in place”, avoiding type proliferation. But the type must still be discovered by developers first. That is negligible effort, but it is still nonzero. To remove even this effort, let the original consuming code do the job.

    // Provides undo/redo mechanism
    class History
    {
        // Makes supplied command part of the history
        public void Remember(ICommand cmd)
        {
            // implementation is not relevant
        }
    
        // Makes command represented by pair of callbacks part of the history
        public void Remember(Action doCallback, Action undoCallback)
        {
            Remember(new ActionCommand(doCallback, undoCallback));
        }
    
        // other members elided
    }

    You should not put every possible “shortcut” into overloads, only the most commonly used ones.

    What benefits does this approach have? It stays close to an explicitly defined role while making the callback-based API easier to discover and use, which is valuable for lightweight, single-scenario implementations.
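
    For illustration, a sketch of how the overload reads at the call site (the history instance and the counter are hypothetical):

    var history = new History();
    var counter = 0;
    
    // A trivial, single-scenario command: no dedicated type needed
    history.Remember(
      () => counter++,   // do
      () => counter--);  // undo
    
    // Passing null for the undo callback yields a command whose
    // CanUndo returns false
    history.Remember(() => Console.WriteLine("Done"), null);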

    Summary:

    • CONSIDER creating a callback-based overload next to the overload that consumes the corresponding abstraction

  • Chasing state of the art

    Disposing sequence of resources

    C# “using” statement has several advantages over its expanded equivalent:

    • Shortcut is more readable
    • If the local variable form of resource acquisition is used, the variable is read-only inside the using statement, which prevents you from spoiling resource disposal

    Whenever you need to obtain, use and then dispose several resources whose number is known at compile time, the “using” statement is usually the choice:

    using(var aStream = File.Open("a.txt", FileMode.Open))
    {
        using(var bStream = File.Open("b.txt", FileMode.Open))
        {
            // Use both streams
        }
    }

    However that is not always the case. Sometimes the number of resources to obtain is not known at compile time. For example, the basic external merge sort algorithm separates a large file into chunks (the total number depends on the original file size and available memory) that can be sorted in memory and then written to disk. Sorted chunks are iteratively merged until a single chunk is left (which is the sorted original file). During a merge iteration several files must be opened (the number is not known in advance), processed and then disposed. As we cannot use the “using” statement directly, the code might look like this:

    IEnumerable<string> files = ...; // Initialized elsewhere
    
    var streams = new List<Stream>();
    try
    {
        // As we may get halfway through opening
        // files and get an exception because a file doesn't
        // exist, opened streams must be remembered
        foreach (var file in files)
        {
            streams.Add(File.Open(file, FileMode.Open));
        }
    
        // Use streams 
    }
    finally
    {
        // Dispose opened streams
        foreach (var stream in streams)
        {
            stream.Dispose();
        }
    }

    Unfortunately we lost all the advantages of the “using” statement (it looks messy, and the collection of opened streams or its contents can be modified before the “finally” block runs). It would be nice to have something like this:

    using (var streams = ???)
    {
        // streams must be IEnumerable<Stream>
    }

    For reference types expansion of “using” statement looks like this (struct types differ in how resource is disposed):

    using (ResourceType resource = expression) statement 
    
    // is expanded to
    
    {
        ResourceType resource = expression;
        try
        {
            statement;
        }
        finally
        {
            if (resource != null) ((IDisposable)resource).Dispose();
        }
    }
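
    To see why the placement of the expression matters, consider a sketch where the expression itself throws (the file name missing.txt is hypothetical):

    // If "missing.txt" does not exist, File.Open throws while the
    // expression is evaluated - before the try/finally is entered -
    // so there is no stream to dispose and the exception propagates
    using (var stream = File.Open("missing.txt", FileMode.Open))
    {
      // Exceptions thrown in here are fine: the finally block
      // disposes the stream
    }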

    If an exception happens during expression evaluation the resource won’t be disposed (as there is nothing to dispose); any exceptions inside the statement, however, are fine. So we need to somehow define how file names are converted into streams while avoiding any exceptions at definition time. Lazy evaluation will be handy.

    // Projected sequence won’t get evaluated until it is enumerated
    // and thus file related exceptions (if any) are also postponed
    files.Select(file => File.Open(file, FileMode.Open))

    Still, we cannot use it inside a “using” statement as it is not IDisposable. So basically what we want is a disposable sequence that takes care of disposing its elements (which are required to be IDisposable).

    interface IDisposableSequence<T> : IEnumerable<T>, IDisposable
        where T:IDisposable
    { }

    A sequence of disposable elements can be wrapped through

    static class Disposable
    {
        // Defined as an extension method that augments minimal needed interface
        public static IDisposableSequence<T> AsDisposable<T>(this IEnumerable<T> seq)
            where T:IDisposable
        {
             return new DisposableSequence<T>(seq);
        }
    }
    
    class DisposableSequence<T> : IDisposableSequence<T>
        where T:IDisposable
    {
        public DisposableSequence(IEnumerable<T> sequence)
        {
           ... // an implementation goes here
        }
        
        ... // Other members elided for now
    }

    We are close, but there is a subtle issue: obtaining a resource is a side effect. Enumerating the sequence projected into resources multiple times will repeat those side effects, which of course must be avoided. In this particular case, enumerating (and thus projecting) the same element (file name) more than once will attempt to open an already opened file and fail, as File.Open uses FileShare.None by default.
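
    A sketch of the failure mode (files as above; the second enumeration re-runs the projection):

    var streams = files.Select(file => File.Open(file, FileMode.Open));
    
    // First enumeration opens the first file
    var first = streams.First();
    // Enumerating again re-executes File.Open for the same file name;
    // since that file is still open with FileShare.None, this throws
    var firstAgain = streams.First();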

    So we need to avoid repeated side effects by memoizing obtained resources.

    class DisposableSequence<T> : IDisposableSequence<T>
        where T : IDisposable
    {
        private IEnumerable<T> m_seq;
        private IEnumerator<T> m_enum;
        private Node<T> m_head;
        private bool m_disposed;
    
        public DisposableSequence(IEnumerable<T> sequence)
        {
            m_seq = sequence;
        }
    
        public IEnumerator<T> GetEnumerator()
        {
            ThrowIfDisposed();
    
            // Enumerator is built traversing lazy linked list 
            // and forcing it to expand if possible
            var n = EnsureHead();
            while (n != null)
            {
                yield return n.Value;
                n = n.GetNext(true);
            }
        }
    
        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }
    
        public void Dispose()
        {
            if (!m_disposed)
            {
                m_disposed = true;
    
                // As sequence creates enumerator it is responsible 
                // for its disposal
                if (m_enum != null)
                {
                    m_enum.Dispose();
                    m_enum = null;
                }
    
                // As it is possible that not all resources were 
                // obtained (for example, inside using statement 
                // only half of lazy evaluated sequence elements 
                // were enumerated and thus only half of resources 
                // obtained) we do not want to obtain them now
                // as they are going to be disposed immediately. 
                // Thus we traverse only through already created 
                // lazy linked list nodes and dispose obtained 
                // resources
                Dispose(m_head);
    
                m_seq = null;
            }
        }
    
        private Node<T> EnsureHead()
        {
            // Obtain enumerator once
            if (m_enum == null)
            {
                m_enum = m_seq.GetEnumerator();
                // Try to expand to first element
                if (m_enum.MoveNext())
                {
                    // Created node caches current element
                    m_head = new Node<T>(m_enum);
                }
            }
            return m_head;
        }
    
        private void ThrowIfDisposed()
        {
            if (m_disposed)
            {
                throw new ObjectDisposedException("DisposableSequence");
            }
        }
    
        private static void Dispose(Node<T> h)
        {
            if (h == null)
            {
                return;
            }
    
            try
            {
                // Disposing resources must be done in the opposite 
                // to usage order. With recursion it will have the 
                // same semantics as nested try{}finally{} blocks.
                Dispose(h.GetNext(false));
            }
            finally
            {
                h.Value.Dispose();
            }
        }
    
        class Node<V>
        {
            private readonly V m_value;
            private IEnumerator<V> m_enum;
            private Node<V> m_next;
    
            public Node(IEnumerator<V> enumerator)
            {
                m_value = enumerator.Current;
                m_enum = enumerator;
            }
    
            public V Value
            {
                get { return m_value; }
            }
    
            public Node<V> GetNext(bool force)
            {
                // Expand only if forced and not expanded before
                if (force && m_enum != null)
                {
                    if (m_enum.MoveNext())
                    {
                        m_next = new Node<V>(m_enum);
                    }
                    m_enum = null;
                }
                return m_next;
            }
        }
    }

    Once enumerated, resources are memoized inside a lazy linked list, which expands only when more resources than already memoized are requested.

    After putting things together, our desired “using” statement usage looks like this:

    using (var streams = files.Select(file => File.Open(file, FileMode.Open)).AsDisposable())
    {
        // streams is IEnumerable<Stream> and IDisposable
    }

    Enjoy!

    Update.

    In general it is good practice to acquire a resource right before it is used and dispose of it as soon as it is no longer needed; otherwise the system may experience resource exhaustion.

    The approach described above can be used whenever resources should be acquired and disposed together (they all have the same actual usage time) and the number of resources is not known in advance. Otherwise you should use one or more "using" statements and dispose of resources as soon as they are no longer needed.

    Carefully consider that even when grouped under a single "using" statement (using the described approach), resources with different actual usage times won’t be disposed until processing of all of them is completed (holding some of them unnecessarily), unless disposal is done explicitly inside the "using" statement (assuming multiple calls to Dispose are allowed).
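
    For example, a resource that is finished with early can be released explicitly inside the "using" statement; the final disposal pass will then call Dispose a second time, which IDisposable implementations such as FileStream are expected to tolerate (a sketch, assuming each stream is fully processed before the next one is opened):

    using (var streams = files.Select(file => File.Open(file, FileMode.Open)).AsDisposable())
    {
      foreach (var stream in streams)
      {
        // ... use the stream ...
    
        // Release the file as soon as it is no longer needed;
        // the second Dispose performed by the wrapper is a no-op
        stream.Dispose();
      }
    }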
