Welcome to MSDN Blogs Sign in | Join | Help

Blocking queues

In many concurrent systems, one thread performs some work, the result of which another thread consumes. This producer/consumer pattern is frequently implemented on top of blocking queues.

If you examine the behavior of System.Collections.Queue and System.Collections.Generic.Queue<T>, you’ll find that they both throw InvalidOperationException from the Dequeue method (which attempts to remove and return an item from the queue) if there are no items in the Queue (and, thus, there’s nothing to be removed). For a producer/consumer scenario, that behavior is typically suboptimal. A producer creating results can store those results into a queue, and a consumer can retrieve those results from the queue in order to process them. But for a concurrent application, where the producer and consumer are executing on separate threads (and where there might be multiple instances of both the producers and consumers executing simultaneously), synchronization mechanisms are necessary on top of queue. More specifically, typically a design calls for a consumer to block until an item is available for it to consume. Rather than attempting to reimplement such blocking functionality every time we need it, we can wrap it up into a new type:

class BlockingQueue<T> : IDisposable
{
    private Queue<T> _queue = new Queue<T>();
    private Semaphore _semaphore = new Semaphore(0, int.MaxValue);

    public void Enqueue(T data)
    {
        if (data == null) throw new ArgumentNullException(“data”);
        lock (_queue) _queue.Enqueue(data);
        _semaphore.Release();
    }

    public T Dequeue()
    {
        _semaphore.WaitOne();
        lock (_queue) return _queue.Dequeue();
    }

    void IDisposable.Dispose()
    {
        if (_semaphore != null)
        {
            _semaphore.Close();
            _semaphore = null;
        }
    }
}

BlockingQueue<T> contains two private fields and exposes two public methods (plus an implementation of IDisposable to allow for timely cleanup). The public methods mimic those of Queue<T>: Enqueue, accepting the data of type T to be enqueued, and Dequeue, returning an item of type T. Enqueue has the same semantics as Queue<T>, except BlockingQueue<T>.Enqueue is thread-safe (Queue<T>.Enqueue is not). Dequeue, however, has very different semantics from both Queue.Dequeue and Queue<T>.Dequeue. In addition to being thread-safe (which the other Dequeue methods are not), it also blocks until it knows for sure it'll be able to dequeue an item from the queue.

The first of the two private methods is fairly self-explanatory. The _queue member of type Queue<T> is the actual queue that's storing the data. Any calls to BlockingQueue<T>.Enqueue or BlockingQueue<T>.Dequeue delegate to the corresponding methods on _queue. However, as you can see a few additional things are taking place. First, a monitor is used to synchronize access to the queue. But more importantly, a counting semaphore is used to keep track of the number of items that are available to be removed from the queue. The semaphore starts with a count of 0, which means that any calls to Dequeue will block. Every call to Enqueue increments the semaphore's counter, allowing another non-blocking call to Dequeue.

This, of course, isn't the only way to build a BlockingQueue, nor do we absolutely require the System.Threading.Semaphore class. For example, you can implement a semaphore in purely managed code with a simple class like the following:

public class ManagedSemaphore
{
    private int _count;
    private object _lock;

    public ManagedSemaphore(int initialCount)
    {
        if (initialCount < 0) throw new ArgumentOutOfRangeException("initialCount", "Initial count must be >= 0.");
        _count = initialCount;
        _lock = new object();
    }

    public void WaitOne()
    {
        lock (_lock)
        {
            while (_count <= 0) Monitor.Wait(_lock);
            _count--;
        }
    }

    public void Release()
    {
        lock (_lock)
        {
            _count++;
            Monitor.Pulse(_lock);
        }
    }
}

Like System.Threading.Semaphore, this ManagedSemaphore class exposes a WaitOne and a Release method. Internally, it has two member fields, an integer count and an object used for a monitor lock. The Release method aquires the lock, and while holding it, increments the internal count and then signals a single thread that might be waiting on the same lock that it should wake up and attempt to retrieve an item (if no thread is waiting, the next time one does call WaitOne, it'll see a positive count). The WaitOne method similarly takes the lock. While holding the lock, it checks the internal count. If the count is positive, it simply decrements the count and returns. However, if the count is 0, it uses the Monitor.Wait method to temporarily release the lock and wait for it to be signaled by another thread (which will happen when another thread calls Release).

We could substitute this ManagedSemaphore for Semaphore in our BlockingQueue<T>. However, given that this is all managed code, we could also incorporate the semaphore's implementation directly into BlockingQueue<T>:

class BlockingQueue<T> : IEnumerable<T>
{
    private int _count = 0;
    private Queue<T> _queue = new Queue<T>();

    public T Dequeue()
    {
        lock (_queue)
        {
            while (_count <= 0) Monitor.Wait(_queue);
            _count--;
            return _queue.Dequeue();
        }
    }

    public void Enqueue(T data)
    {
        if (data == null) throw new ArgumentNullException("data");
        lock (_queue)
        {
            _queue.Enqueue(data);
            _count++;
            Monitor.Pulse(_queue);
        }
    }
}

While nothing official, in a few basic tests this does show some performance improvements over the original BlockingQueue<T> implemented using System.Threading.Semaphore, clocking in at around 3x the speed.

We can further extend BlockingQueue<T> to implement IEnumerable<T>, making it easy to consume all of the items "in" the queue from a foreach loop:

class BlockingQueue<T> : IEnumerable<T>
{
    private int _count = 0;
    private Queue<T> _queue = new Queue<T>();

    public T Dequeue()
    {
        lock (_queue)
        {
            while (_count <= 0) Monitor.Wait(_queue);
            _count--;
            return _queue.Dequeue();
        }
    }

    public void Enqueue(T data)
    {
        if (data == null) throw new ArgumentNullException("data");
        lock (_queue)
        {
            _queue.Enqueue(data);
            _count++;
            Monitor.Pulse(_queue);
        }
    }

    IEnumerator<T> IEnumerable<T>.GetEnumerator()
    {
        while (true) yield return Dequeue();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return ((IEnumerable<T>)this).GetEnumerator();
    }
}

This allows code like the following:

BlockingQueue<int> _queue = new BlockingQueue<int>();
...
foreach(int item in _queue)
{
    ...
}
Note that, as written, any code after the foreach loop won't actually execute, since the foreach is, for all intents and purposes, an infinite loop. The implementation of the enumerator (as implemented by the C# compiler) will block on _queue.Dequeue until another thread Enqueues an item into _queue. If you want to follow a pattern like this, unless you actually do want an infinite loop, you can use a break statement to get out of it if some condition is true. For example, if you only wanted the first five items, you could modify the loop as follows:
int count=0;
foreach(int item in _queue)
{
    ...
    if (5 == ++count) break;
}
Published Wednesday, April 12, 2006 1:19 PM by toub
Filed under: ,

Comment Notification

If you would like to receive an email when updates are made to this post, please register here

Subscribe to this post's comments using RSS

Comments

Thursday, April 13, 2006 1:41 AM by Stephen Toub

# Bounded blocking queues

In my last post, I took a look at implementing blocking queues in .NET using semaphores (both System.Threading.Semaphore...
Thursday, April 13, 2006 7:16 AM by Jason Haley

# Interesting Finds

Monday, August 28, 2006 9:34 PM by Arun

# re: Blocking queues

Hi,

I tried using the Blocking Queue example using semaphore, I just converted the code from C# to VB.NET , It works fine but with frequent Enqueue and Dequeue there is a block during Dequeuing the data from the queue which consumes CPU Cycle and Performance is hit badly.I am new to Threading and Queuing stuff and dont understand whats going wrong

Thanks and Regards
Arun
You can reply me back at arunv.vijay@gmail.com
Tuesday, August 29, 2006 11:03 AM by toub

# re: Blocking queues

If there are no items in the queue, it's meant to block during the dequeue operation (that's the point of this implementation).  But a monitor wait should not consume CPU resources; how did you come to the conclusion that it is doing so?
Thursday, August 16, 2007 6:51 AM by Sean

# re: Blocking queues

Very cool, just wondered how you would implement Teardown?

e.g

A thread is blocked within the foreach, there are no items in the queue. Some Business logic what's the thread to quit, but its bloked in the foreach?

Thursday, August 16, 2007 1:17 PM by toub

# re: Blocking queues

It'd be a bit more difficult to do with the version that uses a managed-only semaphore implementation, but it'd be relatively easy to do with the version that uses System.Threading.Semaphore.  Basically, you'd have an additional internal WaitHandle, probably a ManualResetEvent, that would be initialized to non-signaled; let's call it _cancel.  Then, in Dequeue, instead of calling _semaphore.WaitOne, you'd call WaitHandle.WaitAny(new WaitHandle[]{_semaphore, _cancel}). You'd add a Cancel method or Teardown or whatever you want to call it, and all it does is signal the _cancel WaitHandle.  Then, when WaitAny returns, you check to see whether it was the semaphore that was signaled or the cancel wait handle, based on the return value from WaitAny.  If it was the semaphore, you do the logic shown; if it was the cancel, you bail out using whatever programming model you prefer (an exception, or changing the signature to return a boolean with an out parameter for the data, etc.)  You could be fancier than this, but that's the basic idea.

Wednesday, November 28, 2007 11:48 AM by Stephen Toub

# Bounded blocking queues

In my last post , I took a look at implementing blocking queues in .NET using semaphores (both System.Threading.Semaphore

Wednesday, March 26, 2008 12:35 PM by jdsvri

# re: Blocking queues

does this work in the Compact Framework 2.0?

Tuesday, February 03, 2009 2:01 PM by Matt

# re: Blocking queues

Won't your implementation of ManagedSemaphore cause a deadlock when one thread is waiting and another releases?

Tuesday, February 03, 2009 3:47 PM by toub

# re: Blocking queues

Why do you believe it will cause a deadlock?  Can you outline the steps that would cause that to happen?

Leave a Comment

(required) 
required 
(required) 
 
Page view tracker