ParallelExtensionsExtras Tour - #4 - BlockingCollectionExtensions


(The full set of ParallelExtensionsExtras Tour posts is available here.)  

BlockingCollection<T> encapsulates the core synchronization and coordination necessary to enable classic producer/consumer patterns.  ParallelExtensionsExtras provides the BlockingCollectionExtensions.cs file, which contains several extension methods for BlockingCollection<T> that make common solutions easier to implement.

GetConsumingPartitioner

It’s common to want to use BlockingCollection<T> in conjunction with either Parallel.ForEach or PLINQ in streaming scenarios: while a producer pumps events into the BlockingCollection<T>, ForEach or PLINQ are used to process those events as quickly as possible.  The connection between BlockingCollection<T> and the consuming parallelization construct can be easily achieved using BlockingCollection<T>’s GetConsumingEnumerable method, which returns an enumerable that continually takes from the collection; this enumerable can then be passed as input to ForEach or PLINQ, e.g.

Parallel.ForEach(_bc.GetConsumingEnumerable(), item =>
{
    … // process item here
});

 

However, there are some potentially important flaws with this simple approach.  First, BlockingCollection<T>’s GetConsumingEnumerable implementation uses BlockingCollection<T>’s internal synchronization, which already supports multiple consumers concurrently; ForEach doesn’t know that, however, and its enumerable-partitioning logic also takes a lock while accessing the enumerable.  As such, there’s more synchronization here than is actually necessary, resulting in a potentially non-negligible performance hit.  Second, the partitioning algorithms employed by default by both Parallel.ForEach and PLINQ use chunking in order to minimize synchronization costs: rather than taking the lock once per element, each will take the lock, grab a group of elements (a chunk), and then release the lock.  While this design can help with overall throughput, for scenarios that are focused more on low latency, that chunking can be prohibitive.

We can solve both of these problems by writing a custom partitioner for BlockingCollection<T>.  Our goal for this partitioner is to address both of the drawbacks mentioned previously, and as it turns out doing so is easy.  A Partitioner<T> represents each partition with an IEnumerator<T>, and the object it uses to generate these partitions is an IEnumerable<T>.  We already have an appropriate generator of such an IEnumerable<T> in the form of GetConsumingEnumerable<T>, so to support a dynamic number of partitions in the partitioner, all we need to do is return the result of GetConsumingEnumerable<T> from the GetDynamicPartitions override.  Supporting a static number of partitions then just requires delegating to the dynamic support to retrieve a fixed number of partitions.  Here is the implementation:

public static Partitioner<T> GetConsumingPartitioner<T>(
    this BlockingCollection<T> collection)
{
    return new BlockingCollectionPartitioner<T>(collection);
}

private class BlockingCollectionPartitioner<T> : Partitioner<T>
{
    private BlockingCollection<T> _collection;

    internal BlockingCollectionPartitioner(
        BlockingCollection<T> collection)
    {
        if (collection == null)
            throw new ArgumentNullException("collection");
        _collection = collection;
    }

    public override bool SupportsDynamicPartitions {
        get { return true; }
    }

    public override IList<IEnumerator<T>> GetPartitions(
        int partitionCount)
    {
        if (partitionCount < 1)
            throw new ArgumentOutOfRangeException("partitionCount");
        var dynamicPartitioner = GetDynamicPartitions();
        return Enumerable.Range(0, partitionCount).Select(_ =>
            dynamicPartitioner.GetEnumerator()).ToArray();
    }

    public override IEnumerable<T> GetDynamicPartitions()
    {
        return _collection.GetConsumingEnumerable();
    }
}

 

With this in place, we can implement our streaming with Parallel.ForEach just as we did previously, except by using GetConsumingPartitioner instead of GetConsumingEnumerable.  This relies on the fact that both Parallel.ForEach and PLINQ’s AsParallel have overloads that work with partitioners:

Parallel.ForEach(_bc.GetConsumingPartitioner(), item =>
{
    … // process item here
});
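To see the whole pattern in context, here is a minimal end-to-end sketch (names and item counts are illustrative): a producer task pumps items into a BlockingCollection<int> and then calls CompleteAdding, which is what allows the consuming ForEach to terminate once the remaining items have been processed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class StreamingExample
{
    static void Main()
    {
        var bc = new BlockingCollection<int>();

        // Producer: pump items in, then signal that no more are coming.
        var producer = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 100; i++) bc.Add(i);
            bc.CompleteAdding(); // lets the consuming ForEach terminate
        });

        // Consumer: each item is handed off as soon as it's available,
        // with no per-element locking in the partitioning logic.
        Parallel.ForEach(bc.GetConsumingPartitioner(), item =>
        {
            Console.WriteLine(item);
        });

        producer.Wait();
    }
}
```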

 

AddFromEnumerable and AddFromObservable

In producer/consumer scenarios, it’s sometimes the case that the producer is presenting the data in the form of an IEnumerable<T> already filled with the relevant data.  To get that data into a BlockingCollection<T>, we can utilize a simple utility method to transfer the contents into the BlockingCollection<T>:

public static void AddFromEnumerable<T>(
    this BlockingCollection<T> target, IEnumerable<T> source,
    bool completeAddingWhenDone)
{
    try { foreach (var item in source) target.Add(item); }
    finally { if (completeAddingWhenDone) target.CompleteAdding(); }
}
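As a hypothetical usage sketch, a background task can transfer a pre-existing sequence into the collection while a consumer drains it concurrently; passing true for completeAddingWhenDone ensures the consuming loop below terminates:

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

var bc = new BlockingCollection<int>();
var source = Enumerable.Range(0, 1000); // any IEnumerable<T> producer

// Feed the collection on a background task, completing it when done.
Task.Factory.StartNew(() =>
    bc.AddFromEnumerable(source, completeAddingWhenDone: true));

// Meanwhile, consume items as they arrive.
foreach (var item in bc.GetConsumingEnumerable())
{
    // process item here
}
```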

 

Similarly, the data may be arriving asynchronously in the form of an IObservable<T>.  We can use a simple DelegateBasedObserver class which just implements the IObserver<T> interface to marshal calls to OnNext, OnError, and OnCompleted to the respective delegates passed into the observer instance.  Then, when someone calls AddFromObservable, we subscribe a delegate that takes any data from the observable and adds it to the BlockingCollection<T>:

public static IDisposable AddFromObservable<T>(
    this BlockingCollection<T> target, IObservable<T> source,
    bool completeAddingWhenDone)
{
    return source.Subscribe(new DelegateBasedObserver<T>
    (
        onNext: item => target.Add(item),
        onError: error => {
            if (completeAddingWhenDone) target.CompleteAdding(); },
        onCompleted: () => {
            if (completeAddingWhenDone) target.CompleteAdding(); }
    ));
}
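For reference, a minimal sketch of what such a DelegateBasedObserver<T> might look like; the actual class in ParallelExtensionsExtras may differ in its details, but the idea is simply to forward each IObserver<T> member to a delegate supplied at construction:

```csharp
using System;

internal sealed class DelegateBasedObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    internal DelegateBasedObserver(
        Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }

    // Each interface member simply invokes the corresponding delegate.
    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { _onError(error); }
    public void OnCompleted() { _onCompleted(); }
}
```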

 

ToProducerConsumerCollection

The IProducerConsumerCollection<T> interface was introduced in the .NET Framework 4 to represent concurrent collections that support add/take operations and that are meant to be used in producer/consumer scenarios.  .NET 4 ships three such implementations in the form of ConcurrentQueue<T>, ConcurrentStack<T>, and ConcurrentBag<T>.  BlockingCollection<T> is also thread-safe and supports add/take operations, but it notably does not implement IProducerConsumerCollection<T>.  This is a decision we struggled with, and in the end we decided not to implement the interface because there didn’t seem to be one right answer on how adds and takes should behave.  Should they be non-blocking or blocking?  If blocking, should they incorporate timeouts?  And so on.  Instead, we verified that it’s easy to create an IProducerConsumerCollection<T> wrapper for BlockingCollection<T> so that developers can specify whatever semantics they desire, and we added such an implementation into BlockingCollectionExtensions.cs in ParallelExtensionsExtras.

The provided solution includes several overloads of ToProducerConsumerCollection; here is the largest overload:

public static IProducerConsumerCollection<T> ToProducerConsumerCollection<T>(
    this BlockingCollection<T> collection, int millisecondsTimeout,
    CancellationToken cancellationToken)
{
    return new ProducerConsumerWrapper<T>(
        collection, millisecondsTimeout, cancellationToken);
}

 

In addition to the target BlockingCollection<T>, both a timeout and a CancellationToken are also provided: these serve to enable blocking operations with timeouts as well as with cancellation semantics, and these two values are used when calling BlockingCollection<T>’s TryAdd and TryTake methods.

The rest of the implementation is straightforward:

internal sealed class ProducerConsumerWrapper<T> :
    IProducerConsumerCollection<T>
{
    private readonly BlockingCollection<T> _collection;
    private readonly int _millisecondsTimeout;
    private readonly CancellationToken _cancellationToken;

    public ProducerConsumerWrapper(
        BlockingCollection<T> collection, int millisecondsTimeout,
        CancellationToken cancellationToken)
    {
        if (collection == null)
            throw new ArgumentNullException("collection");
        if (millisecondsTimeout < -1)
            throw new ArgumentOutOfRangeException(
                "millisecondsTimeout");
        _collection = collection;
        _millisecondsTimeout = millisecondsTimeout;
        _cancellationToken = cancellationToken;
    }

    public bool TryAdd(T item)
    {
        return _collection.TryAdd(
            item, _millisecondsTimeout,
            _cancellationToken);
    }

    public bool TryTake(out T item)
    {
        return _collection.TryTake(
            out item, _millisecondsTimeout,
            _cancellationToken);
    }

    public void CopyTo(T[] array, int index)
    {
        _collection.CopyTo(array, index);
    }

    public T[] ToArray()
    {
        return _collection.ToArray();
    }

    ... // the rest of ICollection’s implementation
}
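A hypothetical usage sketch (the capacity, timeout, and item values are illustrative): once wrapped, the collection can be handed to any API that expects an IProducerConsumerCollection<T>, with each TryAdd/TryTake blocking for up to the specified timeout.

```csharp
using System.Collections.Concurrent;
using System.Threading;

var bc = new BlockingCollection<int>(boundedCapacity: 10);

// Wrap with a 100ms timeout per operation and no cancellation.
IProducerConsumerCollection<int> pcc =
    bc.ToProducerConsumerCollection(100, CancellationToken.None);

bool added = pcc.TryAdd(42);        // blocks up to 100ms if the collection is full
int item;
bool taken = pcc.TryTake(out item); // blocks up to 100ms if the collection is empty
```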

 

Comments
  • Just wanted to say thanks for the BlockingCollectionPartitioner<T> example. I was running into some wonky issues using Parallel.ForEach and BlockingCollection that this seems to have fixed. I don't normally like coding by accident, so I have a bit of research to do in order to fully understand what was happening and why this fixes it.

    Basically I was running into intermittent issues where the last item or couple of items would never be read from the BlockingCollection even though they were added. From what you have described I'll bet it was waiting indefinitely for a certain "Chunk" size but never getting there.

    Anyway, this fixed it.

    Cheers,

    Josh

  • I ran into the same issue as Josh. I'm convinced it's a core bug, but this seems to fix it.

  • Josh and Mike, thanks for the info!  This issue you were hitting... was the Parallel.ForEach call returning, or did the ForEach call block forever?  If the latter, it would make sense that it was waiting for enough items to fill the chunk, but if those items were never arriving, it would wait forever for them.  Or, more specifically, it would wait until either enough elements arrived or until the blocking collection's CompleteAdding method was called, which would inform the Parallel.ForEach that no more data would arrive and so it should stop waiting to fill the chunk.

  • My comment didn't seem to post so here it is again:

    It was blocking forever. I wrote it up on Connect, and they referred me back here. It wouldn't be obvious from reading the entry that this would be a side effect. Maybe it would be nice to have a small update or new post about this behavior.

  • Hi,

    I wonder if using the new "EnumerablePartitionerOptions.NoBuffering" that comes in v4.5 it may solve these issues. Something like this perhaps?

    Partitioner.Create<T>(blockingCollection.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering)

  • @Hernan: Yes, EnumerablePartitionerOptions.NoBuffering is meant to address such situations, where the consumer must only pull one item at a time, returning one as soon as it's available rather than waiting for multiple items before returning any.

  • Thank you, your BlockingCollectionPartitioner helped me a lot!

  • Stephen, regarding GetConsumingPartitioner(), would this also work if the GetDynamicPartitions() override was implemented not with GetConsumingEnumerable(), but with a while loop (that happens to use a timeout and a cancellation token) as follows?

    public override IEnumerable<T> GetDynamicPartitions()
    {
        while (!blockingCollection.IsCompleted)
        {
            T item;
            if (blockingCollection.TryTake(out item, _consumerTimeout, cancellationToken))
            {
                yield return item;
            }
            else
            {
                blockingCollection.CompleteAdding();
            }
        }
    }

  • @Whitney: Using TryTake in a loop like that should be fine.  But I wouldn't recommend calling blockingCollection.CompleteAdding() like that; if the TryTake returns false, I assume you'd just want to yield break.

  • thx
