Blocking Collection and the Producer-Consumer Problem

This time I want to discuss features that belong to the new System.Collections.Concurrent namespace in the .NET Framework 4. When you design parallel applications, you often need thread-safe data storage as well as some mechanism for sending messages between tasks. Once again, this post will touch on just the basics and the most common problems a beginner might encounter, but I'll provide links for further reading.

This is the fourth post in the parallel programming series.

To keep things short, I’ll start with the code that I have at the end of the Task Schedulers and Synchronization Context post. This is a small parallel WPF application with a responsive UI that has one Start button and displays the results of long-running operations in a text box.

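using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;

public partial class MainWindow : Window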
{
    public MainWindow()
    {
        InitializeComponent();
    }

    public static double SumRootN(int root)
    {
        double result = 0;
        for (int i = 1; i < 10000000; i++)
        {
            result += Math.Exp(Math.Log(i) / root);
        }
        return result;
    }

    private void start_Click(object sender, RoutedEventArgs e)
    {
        textBlock1.Text = "";
        label1.Content = "Milliseconds: ";

        var watch = Stopwatch.StartNew();
        List<Task> tasks = new List<Task>();
        var ui = TaskScheduler.FromCurrentSynchronizationContext();
        for (int i = 2; i < 20; i++)
        {
            int j = i;
            var compute = Task.Factory.StartNew(() =>
            {
                return SumRootN(j);
            });
            tasks.Add(compute);

            var display = compute.ContinueWith(resultTask =>
                            textBlock1.Text += "root " + j.ToString() + " " +
                                               resultTask.Result.ToString() +
                                               Environment.NewLine,
                                ui);
        }

        Task.Factory.ContinueWhenAll(tasks.ToArray(),
            result =>
            {
                var time = watch.ElapsedMilliseconds;
                label1.Content += time.ToString();
            }, CancellationToken.None, TaskContinuationOptions.None, ui);
    }
}

But imagine that I’m designing a larger application and I need to store the results of the long-running parallel operations somewhere. (I don’t do this in the current version at all.)

Until .NET Framework 4, this task was challenging for C# developers: the collections in the System.Collections and System.Collections.Generic namespaces do not guarantee thread safety, and developers needed to design the locking and synchronization mechanisms themselves. But now generic thread-safe collections are a part of the .NET Framework. So, let me introduce the new namespace: System.Collections.Concurrent.
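For comparison, here is a minimal sketch of the kind of locking code developers wrote by hand: a blocking queue built on lock plus Monitor.Wait and Monitor.Pulse. The class SimpleBlockingQueue<T> and its WaitAndTake method are hypothetical, made up for this illustration; the sketch is unbounded and ignores cancellation, timeouts, and disposal, all of which a production version would need.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical hand-rolled blocking queue in the style of pre-.NET 4 code.
// Unbounded; no cancellation, timeouts, or disposal.
public sealed class SimpleBlockingQueue<T>
{
    private readonly Queue<T> items = new Queue<T>();
    private readonly object gate = new object();
    private bool addingCompleted;

    public void Add(T item)
    {
        lock (gate)
        {
            if (addingCompleted)
                throw new InvalidOperationException("Adding has been completed.");
            items.Enqueue(item);
            Monitor.Pulse(gate); // wake one consumer waiting in WaitAndTake
        }
    }

    public void CompleteAdding()
    {
        lock (gate)
        {
            addingCompleted = true;
            Monitor.PulseAll(gate); // wake every waiting consumer so it can observe completion
        }
    }

    // Blocks until an item is available. Returns false once adding has
    // been completed and the queue is empty.
    public bool WaitAndTake(out T item)
    {
        lock (gate)
        {
            // A woken thread must re-check the condition, hence the loop.
            while (items.Count == 0 && !addingCompleted)
                Monitor.Wait(gate); // releases the lock while waiting
            if (items.Count > 0)
            {
                item = items.Dequeue();
                return true;
            }
            item = default(T);
            return false;
        }
    }
}
```

Every detail here (the wait loop, pulsing on add, waking everyone on completion) is something BlockingCollection<T> takes care of for you.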

I’m going to use the BlockingCollection<T> class. This class can help you implement the well-known producer-consumer pattern, where items are produced and consumed by different operations at different rates. I will update my application to imitate the producer-consumer scenario, so that the compute task will become a producer, and the display task will become a consumer.

var results = new BlockingCollection<double>();

I’ll also update the compute task, so that instead of returning the result, it will add it to the results collection.

var compute = Task.Factory.StartNew(() =>
{
    results.Add(SumRootN(j));
});

When all the results are ready, I'll mark the collection as complete for adding, so the consumer will know that no more items are coming. For this purpose, I'll call the CompleteAdding method, which sets the IsAddingCompleted property of the results collection to true. (The IsCompleted property becomes true only when adding is completed and the collection is also empty.) A good place to perform this operation is the task that calculates the total time: it runs only after all the compute tasks have finished, which is exactly what I need.

Task.Factory.ContinueWhenAll(tasks.ToArray(),
    result =>
    {
        results.CompleteAdding();
        var time = watch.ElapsedMilliseconds;
        label1.Content += time.ToString();
    }, CancellationToken.None, TaskContinuationOptions.None, ui);
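The difference between IsAddingCompleted and IsCompleted is easy to see in isolation. In this sketch (CompletionStates and Observe are hypothetical names), CompleteAdding is called while two items are still in the collection: IsAddingCompleted becomes true immediately, but IsCompleted stays false until both items have been taken.

```csharp
using System.Collections.Concurrent;

// Hypothetical helper that records (IsAddingCompleted, IsCompleted)
// at three points in a collection's life.
public static class CompletionStates
{
    public static (bool AddingCompleted, bool Completed)[] Observe()
    {
        var results = new BlockingCollection<double>();
        results.Add(1.0);
        results.Add(2.0);

        var states = new (bool AddingCompleted, bool Completed)[3];

        results.CompleteAdding();
        // Closed for adding, but two items are still waiting to be consumed.
        states[0] = (results.IsAddingCompleted, results.IsCompleted);

        results.Take(); // Take still succeeds while items remain
        states[1] = (results.IsAddingCompleted, results.IsCompleted);

        results.Take();
        // Only now is the collection both closed for adding and empty.
        states[2] = (results.IsAddingCompleted, results.IsCompleted);

        return states;
    }
}
```

This is why it's the consumer, not the producer, that eventually observes IsCompleted becoming true.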

As you can see, the producer part is easy: all tasks can safely write to the results collection, and all locking and synchronization issues are managed by the .NET Framework and TPL.

Now let’s move to the consumer side. I’ll do a small refactoring: I’ll convert the display task into a consume task that will run the display method:

var consume = Task.Factory.StartNew(() => display(results));

I want to start the consume task before I start any of the compute tasks so that the consumer can wait for the producer and I can see the real-time results. That’s why I put the above line right before the main for loop in the button’s event handler.

This is what the naïve first version of the display method might look like. (Don’t forget to convert the ui task scheduler into a field. It’s a local variable in the original code.)

public void display(BlockingCollection<double> results)
{
    double item;
    while (!results.IsCompleted)
    {
        while (!results.TryTake(out item));
        double currentItem = item;
        Task.Factory.StartNew(new Action(() =>
                  textBlock1.Text += currentItem.ToString() + Environment.NewLine),
             CancellationToken.None, TaskCreationOptions.None, ui);
    }
}

This method keeps polling the collection until IsCompleted returns true, that is, until adding has been completed and every item has been taken. Each successful TryTake call removes an item from the collection, and the method then posts its value to the UI thread. Did you notice that I copied item to currentItem? It's all about closures again: the lambda runs later on the UI thread, and if it captured item directly, it would read whatever item holds at that moment, so you'd most likely get a list of zeros.
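Here's a small, UI-free illustration of that closure issue. It's a sketch under assumptions: ClosureDemo is a hypothetical name, and setting item to 0.0 after the loop stands in for a failed TryTake call, which sets its out argument to default(double), that is, 0. Lambdas that capture item all see its latest value, while lambdas that capture a per-iteration copy keep the value from their own iteration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical demo of capturing a shared variable vs. a per-iteration copy.
public static class ClosureDemo
{
    public static (List<double> shared, List<double> copied) Run()
    {
        double item = 0;
        var readShared = new List<Func<double>>();
        var readCopied = new List<Func<double>>();

        foreach (var value in new[] { 1.0, 2.0, 3.0 })
        {
            item = value;
            readShared.Add(() => item);        // captures the variable, not its current value
            double currentItem = item;         // a fresh variable on every iteration
            readCopied.Add(() => currentItem); // captures this iteration's copy
        }

        // Simulates a later failed TryTake resetting item before the lambdas run.
        item = 0.0;

        var shared = new List<double>();
        var copied = new List<double>();
        foreach (var read in readShared) shared.Add(read());
        foreach (var read in readCopied) copied.Add(read());
        return (shared, copied);
    }
}
```

Here shared ends up as 0, 0, 0 and copied as 1, 2, 3.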

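This version displays the results, and you won't get any exceptions. (It does hide a race, though: if the consumer takes the last item before CompleteAdding is called, it checks IsCompleted, which is still false, and enters the inner loop again; once adding is completed, TryTake keeps failing on the empty collection, so the loop never exits and the consume task never finishes.) But if you run it on a dual-core computer like I did, you'll discover that it takes twice as long as the version that doesn't use the collection. In fact, it runs as if the application weren't parallelized at all! This is just one of the problems you might run into, so always measure the performance of your parallel applications: it's easy to cancel out the benefits of parallelization.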

Of course, the problem is in this line:

while (!results.TryTake(out item));

An empty loop is rarely a good idea. This was an attempt to implement some kind of messaging between the threads – the consumer is constantly checking the collection and starts working only if it can retrieve a value from it. And it does this over and over again, so one of my processors is fully occupied with this work and can’t compute the values anymore.

One simple trick is to make this loop consume less processing power. It can be as easy as this (this is not a recommended approach, just an illustration of the principle):

while (!results.TryTake(out item)) Thread.Sleep(200);

Now after each attempt the task simply waits for 200 milliseconds before trying again. And during those 200 milliseconds the processor can compute the results this task is actually waiting for. You can compile and run the code to make sure that the performance indeed improved.

However, finding the right wait time can be tricky. Ideally, I need some kind of message from the collection notifying me that a value was added.
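One way to get closer to that without guessing a sleep interval is to let TryTake do the waiting: its overloads that take a timeout block until an item arrives or the timeout expires. This is a sketch, not the approach this post ends up using; TimedTakeConsumer, Drain, Demo, and the 100 ms timeout are illustrative choices. Because the loop re-checks IsCompleted after every timed attempt, it can't keep spinning on a collection that was completed while empty.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// A polling consumer that waits inside TryTake instead of calling Thread.Sleep.
// TimedTakeConsumer, Drain, Demo, and the 100 ms timeout are illustrative choices.
public static class TimedTakeConsumer
{
    public static List<double> Drain(BlockingCollection<double> results)
    {
        var consumed = new List<double>();
        while (!results.IsCompleted)
        {
            // Waits up to 100 ms and returns true as soon as an item can be taken.
            // If nothing arrives (for example, because adding was completed on an
            // empty collection), it returns false and IsCompleted is checked again.
            if (results.TryTake(out double item, 100))
            {
                consumed.Add(item);
            }
        }
        return consumed;
    }

    // Single producer (the calling thread) and a single consumer task.
    public static List<double> Demo()
    {
        var results = new BlockingCollection<double>();
        var consumer = Task.Factory.StartNew(() => Drain(results));
        for (int i = 1; i <= 5; i++)
        {
            results.Add(i);
        }
        results.CompleteAdding();
        return consumer.Result;
    }
}
```

With one producer and one consumer, Demo returns the values 1 through 5 in the order they were added, since the default BlockingCollection<T> is backed by a FIFO queue.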

A blocking collection lets you do this with a foreach loop. The BlockingCollection class has the GetConsumingEnumerable method, which returns an enumerable that removes the collection's elements as you iterate over it. Each iteration blocks until an item is available, and the loop ends once the collection is marked as complete for adding and is empty. It might look like this:

public void display(BlockingCollection<double> results)
{
    foreach (var item in results.GetConsumingEnumerable())
    {
        double currentItem = item;
        Task.Factory.StartNew(new Action(() =>
             textBlock1.Text += currentItem.ToString() + Environment.NewLine),
        CancellationToken.None, TaskCreationOptions.None, ui);
    }
}

Now the display method blocks until an item is available in the results collection and then consumes it. When the collection is marked as complete for adding and is empty, execution exits the loop. All the locking, synchronization, and messaging between the tasks are handled by BlockingCollection<T> and the TPL.
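BlockingCollection<T> also has a Take method, which blocks until an item is available and throws InvalidOperationException once the collection is marked complete for adding and is empty. A hand-written loop over Take behaves roughly like the foreach above; this sketch (TakeLoopConsumer and Drain are hypothetical names) shows what the consuming enumerable saves you from writing, not how the .NET Framework implements it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// A hand-written consuming loop built on Take(). TakeLoopConsumer and Drain
// are hypothetical names; this is not how GetConsumingEnumerable is implemented.
public static class TakeLoopConsumer
{
    public static List<double> Drain(BlockingCollection<double> results)
    {
        var consumed = new List<double>();
        while (true)
        {
            double item;
            try
            {
                // Blocks until an item is available.
                item = results.Take();
            }
            catch (InvalidOperationException)
            {
                // Thrown once the collection is marked complete for adding and is
                // empty, including when that happens while Take is waiting.
                break;
            }
            consumed.Add(item);
        }
        return consumed;
    }
}
```

The foreach version keeps the normal end-of-data case free of exception handling, which is one reason it reads more naturally.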

The resulting application will probably still be a little slower than the version that didn't use the collection at all, but that's expected: writing to and reading from thread-safe data storage adds some overhead.

If you got lost in all the changes, here’s the full code:

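using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;

public partial class MainWindow : Window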
{
    TaskScheduler ui = TaskScheduler.FromCurrentSynchronizationContext();

    public MainWindow()
    {
        InitializeComponent();
    }

    public static double SumRootN(int root)
    {
        double result = 0;
        for (int i = 1; i < 10000000; i++)
        {
            result += Math.Exp(Math.Log(i) / root);
        }
        return result;
    }

    private void start_Click(object sender, RoutedEventArgs e)
    {
        textBlock1.Text = "";
        label1.Content = "Milliseconds: ";

        var results = new BlockingCollection<double>();
        var watch = Stopwatch.StartNew();
        List<Task> tasks = new List<Task>();

        var consume = Task.Factory.StartNew(() => display(results));

        for (int i = 2; i < 20; i++)
        {
            int j = i;
            var compute = Task.Factory.StartNew(() =>
            {
                results.Add(SumRootN(j));
            });
            tasks.Add(compute);
        }

        Task.Factory.ContinueWhenAll(tasks.ToArray(),
            result =>
            {
                results.CompleteAdding();
                var time = watch.ElapsedMilliseconds;
                label1.Content += time.ToString();
            }, CancellationToken.None, TaskContinuationOptions.None, ui);
    }

    public void display(BlockingCollection<double> results)
    {
        foreach (var item in results.GetConsumingEnumerable())
        {
            double currentItem = item;
            Task.Factory.StartNew(new Action(() =>
                 textBlock1.Text += currentItem.ToString() + Environment.NewLine),
            CancellationToken.None, TaskCreationOptions.None, ui);
        }
    }
}
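If you want to experiment without WPF, here's a console-style sketch of the same structure. It rests on a few assumptions: ProducerConsumerSketch and Run are hypothetical names, each producer adds j * 1.5 as a cheap stand-in for SumRootN, the consumer collects items into a list instead of updating a text box, and Task.WaitAll followed by CompleteAdding takes the place of the ContinueWhenAll continuation.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Console-style sketch of the producer-consumer structure used in MainWindow.
// ProducerConsumerSketch and Run are hypothetical; j * 1.5 stands in for SumRootN(j).
public static class ProducerConsumerSketch
{
    public static List<double> Run(int producerCount)
    {
        var results = new BlockingCollection<double>();
        var consumed = new List<double>();

        // Start the consumer first; GetConsumingEnumerable blocks while the collection is empty.
        var consumer = Task.Factory.StartNew(() =>
        {
            foreach (var item in results.GetConsumingEnumerable())
            {
                consumed.Add(item); // only the consumer task touches this list
            }
        });

        // Each producer computes one value and adds it to the shared collection.
        var producers = new List<Task>();
        for (int i = 0; i < producerCount; i++)
        {
            int j = i; // copy the loop variable so each closure gets its own value
            producers.Add(Task.Factory.StartNew(() => results.Add(j * 1.5)));
        }

        // Once every producer has finished, signal that no more items will be added.
        Task.WaitAll(producers.ToArray());
        results.CompleteAdding();

        // The consumer's foreach ends when the collection is completed and empty.
        consumer.Wait();
        return consumed;
    }
}
```

ProducerConsumerSketch.Run(18) returns 18 values; their order depends on which producers finish first.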

For now, this is the last post in my parallel programming series. I hope that I’ve provided enough information and examples and bumped into and recovered from enough problems to enable even beginners to continue on their own. (At least I asked fewer questions while writing this post than I did for the first one!)

If you want to learn more about blocking collections, check out BlockingCollection Overview in the MSDN Library and BlockingCollection Extensions on the Parallel Programming with .NET blog.

I could not cover all the features provided by the TPL. If you want to see what else is available, here are some links:

  • Data Structures for Parallel Programming. This MSDN topic lists .NET 4 classes that are useful for parallel programming, such as thread-safe collections, synchronization primitives, and lazy initialization classes.
  • Introduction to PLINQ. Parallel Language Integrated Query, or PLINQ, enables quick and easy parallelization of LINQ queries.

Two more links that I used a lot in this series are Parallel Programming in the .NET Framework on MSDN and the Parallel Programming with .NET team blog.

P.S.

Thanks to Dmitry Lomov, Michael Blome, and Danny Shih for reviewing this and providing helpful comments, and to Mick Alberts for editing.

Comments
  • Alexandra,

    Is there any thread-safe Observable collection in the System.Collections.Concurrent namespace, so that the consumer doesn't need to query (pull) the collection but is notified (push) instead?

    Cheers.

  • -1 for treating Thread.Sleep as an acceptable option. Other than that, this is quite a good post.

  • @Luciano

    As far as I know, not yet.

    @Stephen

    It was just an attempt to illustrate that parallel programming is quite different from sequential programming (you pause the execution and everything works faster). I would not say it's a recommended way of doing things. But I probably was not clear enough in the original post. I tried to make it more explicit.

  • It's probably also worth noting that IsCompleted will only be true once no more items will be added *and* the collection is empty.

  • Please please tell me that we're going to get (at least) some of these on the Compact Framework (NETCF), or whatever framework is going to target mobile devices...  It's a real pain having to implement them oneself, particularly as lots of the event methods are missing SignalAndWait, WaitAll, etc.

    (hmm did my last [Post] work??)

  • Interesting sample. I have created a small framework in .NET 3.5 to implement producer/consumer flows (code.google.com/.../batchflow). I feel that this code could probably also be implemented much more easily using the Parallel FX, but I haven't really seen a convincing example yet. This is a nice step 1.

    Have you checked what the memory use is when you start not 20, but thousands of these small Tasks using Task.Factory.StartNew()? Producer/Consumer is normally implemented to avoid having to load all of the items into memory at the same time. It seems that your reader does this nicely, but the way you start it up still loads everything into memory and does not use the blocking feature. Or does the factory do some throttling?

  • If the producer calls a non-blocking async method whose handler adds the result to a BlockingCollection, when can you call the collection's CompleteAdding() method?

  • Alexandra, you have a talent for making complex things seem simple. Please write a book on C# and publish it on lulu.com or similar. I promise I will buy it.

  • Hello. Can you check this thread? I am having real difficulties. There is no example of how to pass parameters at runtime and how to keep a certain number of tasks alive all the time.

    social.msdn.microsoft.com/.../6fa6fd12-85c5-4489-81b1-25bc0126d406

  • Hello. plz i need answer for this problem .....

    Rewrite producer and consumer (Bounded Buffer Problem) codes to accommodate producing and consuming items concurrently when producer and consumer processes are dealing with items at different buffer locations

  • plz send answer to  ((   eng_gazzar_fcih@yahoo.com   ))  thx

  • The code above (listing 6): might it run into a deadlock?

    My reasoning:

    public void display(BlockingCollection<double> results)
    {
        double item;
        while (!results.IsCompleted)  // 1. Checks and sees that (IsCompleted == false), so enters the loop's body.
        {
            //2a, Meantime, producer marks collection as Completed and other threads are faster to consume items.
            //2b, The collection gets emptied and blocked for adding.
            while (!results.TryTake(out item));
            //3, "while" blocks forever as nothing will ever be added to already empty collection
            double currentItem = item;
            Task.Factory.StartNew(new Action(() =>
                      textBlock1.Text += currentItem.ToString() + Environment.NewLine),
                 CancellationToken.None, TaskCreationOptions.None, ui);
        }
    }

  • Is it problem that you never wait on the consume task?

    What if it throws an exception?

  • Sorry, but isn't the point of a BlockingCollection that it has a Take() method, which will block until an item is available? Which would mean that all the while (TryTake...) stuff, and the GetConsumingEnumerable stuff, is unnecessary.

    In fact, the whole point of the BlockingCollection is to *make* it unnecessary. Isn't it? What am I missing?
