ParallelExtensionsExtras Tour - #10 - Pipeline

(The full set of ParallelExtensionsExtras Tour posts is available here.) 

Producer/consumer is a fundamental pattern employed in many parallel applications.  With producer/consumer, one or more producer threads generate data that is consumed by one or more consumer threads.  These consumers can themselves also be producers of further data, typically based on the data they were consuming, which is then consumed again by one or more consumers.  And so on. This forms a pipeline, where each level of consumers represents another “stage” in the pipeline, like workers on an assembly line.
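Before looking at Pipeline itself, the raw pattern is easy to see with BlockingCollection&lt;T&gt;, the same type the Pipeline sample uses internally between stages. This is just a sketch with made-up data: one producer task feeding one consumer task through a bounded channel.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class ProducerConsumerSketch
{
    public static void Main()
    {
        // Bounded channel: the producer blocks if it gets more than 4 items ahead.
        var channel = new BlockingCollection<int>(boundedCapacity: 4);

        // Producer: generates data, then signals that no more items are coming.
        var producer = Task.Run(() =>
        {
            for (int i = 0; i < 10; i++) channel.Add(i);
            channel.CompleteAdding();
        });

        // Consumer: blocks waiting for items; the enumeration ends once the
        // producer has called CompleteAdding and the channel drains.
        var consumer = Task.Run(() =>
        {
            int sum = 0;
            foreach (int item in channel.GetConsumingEnumerable()) sum += item;
            Console.WriteLine(sum);
        });

        Task.WaitAll(producer, consumer);
    }
}
```

The CompleteAdding call is the important detail: without it, GetConsumingEnumerable would block forever waiting for more data instead of letting the consumer finish.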

Consider an algorithm that will compress and then encrypt blocks of data.  The nature of the employed algorithms is such that there is carry-over data from one block to the next, which means we must compress and encrypt blocks in order.  We can’t run the compression and encryption algorithms on independent blocks concurrently, but we can run the compression concurrently with the encryption: while we’re encrypting compressed block N, we can be compressing block N+1.  This is naturally modeled as a pipeline, e.g.:

var compressAndEncrypt =
    Pipeline.Create(rawChunk => Compress(rawChunk))
            .Next(compressedChunk => Encrypt(compressedChunk));

IEnumerable<byte[]> inputChunks = ...;
IEnumerable<byte[]> results =
    compressAndEncrypt.Process(inputChunks);

There are many ways pipelines can be implemented, and in some cases you want to dedicate one or more threads to processing each stage of the pipeline.  The Pipeline class in the Pipeline.cs file in ParallelExtensionsExtras provides this, wrapping a simple API around relatively powerful functionality.

Pipeline uses a Task to represent each stage of the pipeline.  This Task is executed on a ThreadPerTaskScheduler so that each stage gets its own dedicated thread.  Further, each stage supports a configurable degree of parallelism, so that multiple dedicated threads may be used to process a single stage.  To handle this, the Task uses a Parallel.ForEach to process the enumerable input to that stage, with the Parallel.ForEach also targeted at the ThreadPerTaskScheduler instance.  A BlockingCollection<T> serves as the channel between each pair of adjacent stages, passing data from producer to consumer, and the entire pipeline is cancelable with a CancellationToken.
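As a rough sketch of that design (not the actual Pipeline.cs source: RunStage is a hypothetical name, TaskCreationOptions.LongRunning stands in for the ThreadPerTaskScheduler, and the degree-of-parallelism / Parallel.ForEach handling is omitted), a single stage might look like:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

static class StageSketch
{
    // One pipeline stage: a dedicated task drains 'input', applies
    // 'transform' to each item, and feeds 'output', which the next
    // stage (or the final consumer) enumerates.
    public static BlockingCollection<TOutput> RunStage<TInput, TOutput>(
        BlockingCollection<TInput> input,
        Func<TInput, TOutput> transform,
        CancellationToken cancellationToken)
    {
        var output = new BlockingCollection<TOutput>();
        Task.Factory.StartNew(() =>
        {
            try
            {
                foreach (TInput item in input.GetConsumingEnumerable(cancellationToken))
                    output.Add(transform(item), cancellationToken);
            }
            finally
            {
                // Signal completion so the next stage's enumeration can end,
                // even if this stage faulted or was canceled.
                output.CompleteAdding();
            }
        }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        return output;
    }
}
```

Chaining stages is then just a matter of feeding one stage's returned collection in as the next stage's input, which is essentially what Pipeline.Create(...).Next(...) builds up.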

Comments
  • Your library is of great beauty.

  • Hello,

    Could someone explain to me why this does not seem to execute?

    Thanks

       class Program
       {
           static void Main(string[] args)
           {
               var generateAndArchive =
                   Pipeline.Create<int, long>(i => Generate(i))
                           .Next(i => Archive(i));

               IEnumerable<int> inputChunks = Enumerable.Range(1, 10000);
               IEnumerable<long> results = generateAndArchive.Process(inputChunks);

               Console.Read();
           }

           private static long Generate(int i)
           {
               Console.WriteLine("Generate" + i.ToString());
               return i;
           }

           private static long Archive(long i)
           {
               Console.WriteLine("Archive" + i.ToString());
               return i;
           }
       }

  • Hi Lombaers-

    As the Pipeline component is currently implemented, it won't start processing until you begin enumerating the results, e.g. foreach (var item in results) { }.  This behavior could certainly be changed; that's just how it currently exists.
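    Concretely, replacing the Console.Read() in the question's Main with a loop that drains results is enough to make the stages run (this fragment assumes the results variable from the code above):

```csharp
// Enumerating 'results' is what pulls data through the pipeline;
// until this loop runs, neither Generate nor Archive executes.
foreach (long result in results)
{
    Console.WriteLine("Result " + result);
}
```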

  • Hi, is it recommended to use TPL Dataflow instead of Pipelines, since it seems the former does everything the latter can?

  • Hi Huseyn-

    Moving forward, yes, we would likely recommend folks use TPL Dataflow instead of a custom pipelining solution like the one above.  That said, there is a difference between them: this sample uses blocking whereas TPL Dataflow strives to be as asynchronous and non-blocking as possible.  In general, the latter is what you want, only utilizing threading resources when there's actually work to be done.  However, there may be some niche cases where you'd prefer to block or spin, in which case a custom solution like the one presented here might be more appropriate.  My recommendation: use TPL Dataflow for such pipelining scenarios unless you find there's something you need that it doesn't provide.
