Note: Cross-posted from Sajay.

Most common data structures are meant for single-threaded access, and queues are no exception. When multiple producers write to a queue, we need to make sure each writer is handed a unique index into which it can put its element. If our operations are already synchronized and thread safe, we can do something like this:

public void Enqueue(T item) 
{  
    ...  
    tail = (tail+1) % size;  
    items[tail] = item;  
}

On the other hand, if two threads simultaneously reach the `tail = (tail+1) % size;` line, they could overwrite each other if they read the value of tail at the same time. So what are the issues we face?

  1. The read of tail might be stale.
  2. Two threads might read the same old value and write into the same slot, corrupting state.
  3. The value of tail gets corrupted and is not actually incremented once per thread.
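
To make the race concrete, here is a sketch (my own illustration, not from the original post) that replays the bad interleaving sequentially: two "threads" both read tail = 5, both compute 6, and both write to slot 6, so one item is silently lost.

```csharp
using System;

class RaceDemo
{
    static int tail = 5;
    static readonly string[] items = new string[10];

    static void Main()
    {
        // Both threads read tail before either writes it back.
        int readByThreadA = tail;   // 5
        int readByThreadB = tail;   // 5 — already stale by the time A finishes

        // Thread A enqueues first.
        tail = (readByThreadA + 1) % items.Length;   // tail = 6
        items[tail] = "from A";

        // Thread B computed its index from the same stale read,
        // so it lands on the same slot and overwrites A's item.
        tail = (readByThreadB + 1) % items.Length;   // tail = 6 again
        items[tail] = "from B";

        Console.WriteLine(items[6]);   // prints "from B" — A's item is lost
    }
}
```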

We could consider using Interlocked.Increment and get away with something like this:

public void Enqueue(T item) 
{ 
    int next = Interlocked.Increment(ref tail) % size; 
    ... 
    items[next] = item; 
} 

Hmm, this doesn't seem right. The issue is the wrap-around behavior of Interlocked.Increment. Once the counter reaches Int32.MaxValue it wraps around to Int32.MinValue, and from then on it is pretty much useless: the next index would be computed as (Int32.MinValue % size), which is quite an odd value, because in C# the % operator yields a negative result for a negative dividend. For example, with size 100 we could end up with indexes like this:

... 46, 47, -48, -47 ...
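
You can verify the wrap-around arithmetic directly (a small sketch of my own): once the counter overflows from Int32.MaxValue to Int32.MinValue, the computed indexes go negative.

```csharp
using System;

class WrapDemo
{
    static void Main()
    {
        const int size = 100;
        int counter = int.MaxValue - 1;     // 2147483646

        Console.WriteLine(counter % size);  // 46
        counter++;                          // int.MaxValue = 2147483647
        Console.WriteLine(counter % size);  // 47
        unchecked { counter++; }            // wraps to int.MinValue = -2147483648
        Console.WriteLine(counter % size);  // -48: useless as an array index
        unchecked { counter++; }            // -2147483647
        Console.WriteLine(counter % size);  // -47
    }
}
```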

Let's look at another approach: Interlocked.CompareExchange. Some fundamentals about Interlocked.CompareExchange:

  1. It returns the value that was at the location before the call.
  2. It atomically exchanges the value for the new one if the old value matches the comparand.
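
A single-threaded sketch of those two rules (my own illustration): the call always returns what was in the location before the call, and only writes the new value when that old value equals the comparand.

```csharp
using System;
using System.Threading;

class CasDemo
{
    static void Main()
    {
        int location = 10;

        // Comparand matches: the exchange happens, and the *old* value comes back.
        int before = Interlocked.CompareExchange(ref location, 11, 10);
        Console.WriteLine($"{before} {location}");   // "10 11"

        // Comparand does not match (location is now 11, not 10):
        // nothing is written, and we learn the current value instead.
        before = Interlocked.CompareExchange(ref location, 99, 10);
        Console.WriteLine($"{before} {location}");   // "11 11"
    }
}
```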

From this we can deduce that if a thread tried to increment the value and failed, someone else already updated it. This also means that we might not succeed in a single try and hence need to retry until we successfully claim a slot that we will own. The fact that we were able to increment it also means that no other thread will get this slot. So now let's look at the implementation.

class LockFreeCircularIterator 
{
  int index = 0;          // the next slot to hand out
  readonly int limit;     // size of the circular buffer

  public LockFreeCircularIterator(int limit)
  {
    this.limit = limit; 
  }

  public int GetNext()
  {
    int slot = this.index;
    while(true)
    {
      // CompareExchange writes (slot+1) % limit only if index still equals slot,
      // and always returns the value index held before the call. The left operand
      // (the old slot) is evaluated before the assignment, so the comparison
      // checks that returned value against the slot our increment was based on:
      // equal means our exchange won and we own that slot.
      if(slot == (slot = Interlocked.CompareExchange(ref this.index, (slot+1) % limit, slot)))
      {
        return slot;
      }
      // Otherwise slot now holds the fresh value of index, and we retry with it.
    }
  }
}

Here we loop until the index has been updated with the incremented value. Each time the exchange fails, the new value of the index is read and the attempt is retried. This is one possible implementation of the iterator; it allows multiple threads to write to a circular buffer. I'm omitting the usage details as there are quite a few places this could come in handy.
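
That said, here is a minimal sketch of one possible usage (the buffer class and names are mine, not from the post): a multi-producer enqueue asks the iterator for a slot it now owns and writes without further coordination. Note the iterator only hands out unique indexes; it does nothing to stop a fast producer from lapping a slow consumer.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// The iterator from the post.
class LockFreeCircularIterator
{
    int index = 0;
    readonly int limit;

    public LockFreeCircularIterator(int limit) { this.limit = limit; }

    public int GetNext()
    {
        int slot = this.index;
        while (true)
        {
            if (slot == (slot = Interlocked.CompareExchange(ref this.index, (slot + 1) % limit, slot)))
                return slot;
        }
    }
}

class MultiProducerBuffer
{
    readonly string[] items;
    readonly LockFreeCircularIterator iterator;

    public MultiProducerBuffer(int size)
    {
        items = new string[size];
        iterator = new LockFreeCircularIterator(size);
    }

    public void Enqueue(string item)
    {
        // GetNext hands this thread a slot no other thread will receive.
        items[iterator.GetNext()] = item;
    }

    static void Main()
    {
        var buffer = new MultiProducerBuffer(128);
        // 4 producers x 32 items = 128 writes, each landing in a distinct slot.
        Parallel.For(0, 4, p =>
        {
            for (int i = 0; i < 32; i++)
                buffer.Enqueue($"producer {p} item {i}");
        });
        Console.WriteLine(Array.FindAll(buffer.items, s => s != null).Length); // 128
    }
}
```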