Introducing ConcurrentStack<T>

A common problem users run into when writing parallel applications is the lack of thread-safety support in the .NET collection classes. Users typically need to implement their own synchronization mechanism to safely read and write data in the same shared collection.

One solution, now largely discouraged, was to use the synchronized collections introduced in .NET Framework 1.0 (obtained through their static Synchronized methods), or to lock on the SyncRoot object exposed through the ICollection interface.

However, neither approach is recommended, for a variety of reasons: performance is less than ideal, and using them can also set developers up for a multitude of race conditions. See http://blogs.msdn.com/bclteam/archive/2005/03/15/396399.aspx for a discussion of these design issues.

For these reasons, the generic collections introduced in .NET Framework 2.0 no longer offer a synchronization mechanism (that is, they don’t provide a static Synchronized method), which forces developers to synchronize access manually.
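Synchronizing manually usually means wrapping every access to the collection in a lock. The C# sketch below is one minimal way to do that; LockedStack<T> is a hypothetical name, not a framework type. The important detail is that a compound operation such as "check Count, then Pop" is performed under a single lock acquisition, so no other thread can slip in between the two steps.

```csharp
using System.Collections.Generic;

// Hypothetical wrapper (not a framework type) that guards a Stack<T> with a private lock.
public sealed class LockedStack<T>
{
    private readonly Stack<T> _stack = new Stack<T>();
    private readonly object _gate = new object();

    public void Push(T item)
    {
        lock (_gate) { _stack.Push(item); }
    }

    // The emptiness check and the pop run under the same lock,
    // so another thread cannot empty the stack between them.
    public bool TryPop(out T item)
    {
        lock (_gate)
        {
            if (_stack.Count > 0)
            {
                item = _stack.Pop();
                return true;
            }
            item = default(T);
            return false;
        }
    }

    // Only a momentary value: it may be stale as soon as the lock is released.
    public int Count
    {
        get { lock (_gate) { return _stack.Count; } }
    }
}
```

Callers still have to use TryPop rather than checking Count and popping separately, because Count can already be out of date by the time the caller acts on it.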

Parallel Extensions to the .NET Framework aims to fill this gap by introducing new thread-safe generic collections that don’t suffer from the same issues as their antiquated “Synchronized” relatives. ConcurrentStack<T> is one of them.

Currently, ConcurrentStack<T> is implemented as a LIFO linked list and uses interlocked compare-exchange operations for Push and Pop. However, users should not rely on these internal implementation details.
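For readers curious about the general technique, here is a minimal sketch of a lock-free LIFO linked list (often called a Treiber stack) built on Interlocked.CompareExchange. It is an illustration of the technique, not ConcurrentStack<T>'s actual source, and SimpleLockFreeStack<T> is a hypothetical name. It assumes .NET 4.5 or later for Volatile.Read.

```csharp
using System.Threading;

// Hypothetical illustration of a lock-free stack; NOT ConcurrentStack<T>'s real implementation.
// Omits refinements such as backoff under contention.
public sealed class SimpleLockFreeStack<T>
{
    private sealed class Node
    {
        public readonly T Value;
        public Node Next; // set once before the node is published, never changed afterwards
        public Node(T value) { Value = value; }
    }

    private Node _head; // top of the stack; null when empty

    public void Push(T item)
    {
        Node node = new Node(item);
        while (true)
        {
            Node head = Volatile.Read(ref _head);
            node.Next = head;
            // Publish the new node only if the head is still the one we read; otherwise retry.
            if (Interlocked.CompareExchange(ref _head, node, head) == head)
                return;
        }
    }

    public bool TryPop(out T item)
    {
        while (true)
        {
            Node head = Volatile.Read(ref _head);
            if (head == null)
            {
                item = default(T);
                return false;
            }
            // Move the head to the next node only if no other thread changed it first.
            if (Interlocked.CompareExchange(ref _head, head.Next, head) == head)
            {
                item = head.Value;
                return true;
            }
        }
    }
}
```

Each operation reads the current head, prepares the change, and publishes it with a single compare-exchange; if another thread changed the head in the meantime, the loop retries. Because Push always allocates a fresh node and the garbage collector keeps a node alive while any thread still references it, this managed sketch avoids the classic ABA problem that complicates the same design in unmanaged code.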

The main and most-used methods of a stack data structure are usually Push, Pop, and Peek. A quick look at ConcurrentStack<T> will show that it provides Push, but not Pop and Peek: instead, it provides TryPop and TryPeek. This is quite intentional. One of the most common patterns when using a Stack<T> is to check the stack’s Count and, if it’s greater than 0, pop an item from it and use that item, e.g.:

T item;
if (s.Count > 0)
{
    item = s.Pop();
    UseData(item);
}

But in a world where this stack is being accessed by multiple threads concurrently, even if the individual Count and Pop methods were thread-safe, we still run into the issue that the stack could be emptied between a successful non-emptiness check and the attempt to pop. ConcurrentStack<T> takes this into account in the APIs it provides. To safely attempt to pop an item from the stack, we can instead write the code:

T item;
if (s.TryPop(out item))
{
    UseData(item);
}
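The following runnable sketch shows how the Try* methods behave. It assumes .NET 4 or later, where ConcurrentStack<T> lives in System.Collections.Concurrent (in the June 2008 CTP it was in System.Threading.Collections); TryApiDemo is a hypothetical helper.

```csharp
using System.Collections.Concurrent; // .NET 4+; the June 2008 CTP used System.Threading.Collections

public static class TryApiDemo
{
    // Returns a short trace of what TryPeek and TryPop report.
    public static string Run()
    {
        var s = new ConcurrentStack<string>();
        string item, top, popped;

        // On an empty stack both methods return false instead of throwing.
        bool peekedEmpty = s.TryPeek(out item);
        bool poppedEmpty = s.TryPop(out item);

        s.Push("a");
        s.Push("b");

        // TryPeek reads the top element without removing it...
        s.TryPeek(out top);
        // ...while TryPop removes it.
        s.TryPop(out popped);

        return string.Format("{0},{1},{2},{3},{4}", peekedEmpty, poppedEmpty, top, popped, s.Count);
    }
}
```

Run() returns "False,False,b,b,1": both methods report an empty stack by returning false, TryPeek leaves the top item in place, and TryPop removes it, leaving one item behind.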

Using ConcurrentStack<T> in this way, a producer/consumer scenario could be implemented as shown below:

//initialize an empty concurrent stack
ConcurrentStack<string> m_myConcurrentStack = new ConcurrentStack<string>();

//every producer will produce this number of elements
const int COUNT = 10;

public void ConsumeDataFromStack()
{
    //create and start the producers
    Thread[] producers = new Thread[COUNT];
    for (int i = 0; i < COUNT; i++)
    {
        producers[i] = new Thread(new ThreadStart(delegate()
        {
            for (int currentIndex = COUNT; currentIndex > 0; currentIndex--)
            {
                m_myConcurrentStack.Push(
                    Thread.CurrentThread.ManagedThreadId.ToString() + "_" +
                    currentIndex.ToString());
            }
        }));
        producers[i].Start();
    }

    //consume data while the producers run; TryPop never blocks, so keep
    //polling until every producer has finished and the stack is drained
    //(a fixed Thread.Sleep could stop consuming before all items are pushed)
    string currentData;
    bool producersRunning = true;
    while (producersRunning)
    {
        //check the producers before draining, so no late item is missed
        producersRunning = Array.Exists(producers, t => t.IsAlive);
        while (m_myConcurrentStack.TryPop(out currentData))
        {
            //ConsumeData(currentData);
        }
    }
}
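For comparison, here is a compact variant of the same producer/consumer pattern in which the consumer drains the stack concurrently with the producers and stops only once every producer has finished and the stack is empty. It is a sketch assuming .NET 4.5 or later (Task.Run, Task.WhenAll, System.Collections.Concurrent); ProducerConsumerSketch and its parameters are hypothetical names. Returning the number of consumed items makes it easy to check that nothing is lost.

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class ProducerConsumerSketch
{
    // Starts 'producerCount' producers that each push 'itemsPerProducer' items
    // while the calling thread concurrently drains the stack. Returns the number consumed.
    public static int Run(int producerCount, int itemsPerProducer)
    {
        var stack = new ConcurrentStack<string>();

        Task[] producers = Enumerable.Range(0, producerCount)
            .Select(p => Task.Run(() =>
            {
                for (int i = itemsPerProducer; i > 0; i--)
                    stack.Push(p + "_" + i);
            }))
            .ToArray();

        Task allProduced = Task.WhenAll(producers);
        int consumed = 0;
        while (true)
        {
            // Read the completion state BEFORE draining: if the producers were already
            // done, every push happened before this drain, so nothing can be missed.
            bool done = allProduced.IsCompleted;
            string item;
            while (stack.TryPop(out item))
                consumed++; // UseData(item) would go here
            if (done)
                break;
        }
        return consumed;
    }
}
```

The consumer spins while waiting, which keeps the sketch short; when a consumer should block until data arrives, BlockingCollection<T> (mentioned in the comments below) is a better fit.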

PLINQ queries over a concurrent stack

Because ConcurrentStack<T> implements IEnumerable<T>, a concurrent stack can also be used as a data source in PLINQ queries.

Below is a sample of such usage.

volatile bool m_producersEnded = false;

const int COUNT = 10;

public void PLinqOverConcurrentStack()
{
    //create the producers
    Thread[] producers = new Thread[COUNT];
    for (int i = 0; i < COUNT; i++)
    {
        producers[i] = new Thread(new ThreadStart(delegate()
        {
            for (int currentIndex = COUNT; currentIndex > 0; currentIndex--)
            {
                m_myConcurrentStack.Push(currentIndex.ToString());
            }
        }));
    }

    Thread pLinqConsumer = new Thread(new ThreadStart(delegate()
    {
        while (!m_producersEnded)
        {
            //AsParallel() makes this a PLINQ query; each pass
            //enumerates the stack's current contents
            var currentValues = from data in m_myConcurrentStack.AsParallel()
                                where data.Contains("9")
                                select data;
            foreach (string currentData in currentValues)
            {
                //consume data
            }
        }
    }));

    //start the consumer
    pLinqConsumer.Start();

    //start the producers
    foreach (Thread producer in producers)
    {
        producer.Start();
    }

    //join the producers, then signal and join the consumer
    foreach (Thread producer in producers)
    {
        producer.Join();
    }
    m_producersEnded = true;
    pLinqConsumer.Join();
}
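Below is a sketch of the same idea against the .NET 4+ API (System.Collections.Concurrent and System.Linq; an assumption, since the post targets the CTP). PlinqOverStackSketch is a hypothetical name. PLINQ is opted into by calling AsParallel() on the source; without it, a query over the stack runs as ordinary sequential LINQ to Objects. The second method illustrates the snapshot behavior discussed in the comments: an enumeration does not observe items pushed after its enumerator was obtained.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

public static class PlinqOverStackSketch
{
    // Runs a PLINQ query over the stack's current contents.
    public static int CountContainingNine(ConcurrentStack<string> stack)
    {
        return stack.AsParallel().Count(data => data.Contains("9"));
    }

    // Enumeration works on a moment-in-time snapshot: an item pushed after
    // GetEnumerator() is called is not seen by that enumerator.
    public static int CountSeenBySnapshot()
    {
        var stack = new ConcurrentStack<int>(new[] { 1, 2, 3, 4, 5 });
        int seen = 0;
        using (IEnumerator<int> e = stack.GetEnumerator())
        {
            stack.Push(6); // update made after the snapshot was taken
            while (e.MoveNext())
                seen++;
        }
        return seen; // 5, even though the stack now holds 6 items
    }
}
```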

Comments
  • How about really mixing it up...

    using System;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Collections;

    class ConcurrentStackWithTaskCreate
    {
       const int COUNT = 10;
       static ConcurrentStack<string> _stack;

       public static void ConsumeDataFromStack()
       {
           Action<int> worker = (j) => { _stack.Push(Thread.CurrentThread.ManagedThreadId.ToString() + "_" + j); };
           Parallel.For(0, COUNT, (i) => Parallel.For(0, COUNT, j => worker(j)));

           // Parallel.For returns only after all of its iterations have completed,
           // so every push is already done here and no Thread.Sleep is needed

           // consume data
           string data;
           while (_stack.TryPop(out data))
           {
               Console.WriteLine(data);
           }
       }

       public static void Main()
       {
           _stack = new ConcurrentStack<string>();
           ConsumeDataFromStack();
       }
    }

  • awesome! should we expect ConcurrentQueue soon? ;)

  • Hi Cristina,

    Just to clarify, this ConcurrentStack<T> class does not allow simultaneous readers and writers, correct? One cannot iterate over the collection (e.g. evaluate a LINQ query) while another thread is modifying the collection, right?

    Another question. Since shared state is one of the big concurrency blockers to using LINQ, have you considered making ConcurrentStack<T> an immutable data structure? That way you wouldn't have to do any interlocked exchanges, and consumers wouldn't have to worry about reading the collection on one thread while writing to it on another.

  • Nice article. I hope there are plans to include the TPL in the reference source section of .NET. Developers could learn a lot from the large quantity of comments in the TPL code :)

    Kevin

  • My favorite quote from the post above:

    "However, it not recommended as the user to rely on the internal implantations details. "

    Judah, ConcurrentStack<T> allows the stack to be updated from multiple threads. It also allows iterating over it while other threads concurrently update it; however, an enumeration only sees a snapshot of the current collection. Please find below an example of such usage. Parallel.ForEach is used to enumerate over the current collection.

    const int COUNT = 300;

    public void EnumeratingOverConcurrentStack()
    {
        ConcurrentStack<string> enumStack = new ConcurrentStack<string>();
        int internalCount = 20;

        //create the worker threads
        Thread[] workers = new Thread[COUNT * 2 + 1];
        for (int i = 0; i < COUNT; i++)
        {
            Thread currentWorker = new Thread(new ThreadStart(delegate()
            {
                int currentIndex = COUNT;
                for (int k = 0; k < internalCount; k++)
                {
                    m_myConcurrentStack.Push(currentIndex.ToString());
                }
            }));
            workers[i] = currentWorker;
        }

        Thread iterator = new Thread(new ThreadStart(delegate()
        {
            Parallel.ForEach<string>(m_myConcurrentStack, (s) => enumStack.Push(s));
        }));
        workers[COUNT] = iterator;

        for (int i = 0; i < COUNT; i++)
        {
            Thread currentWorker = new Thread(new ThreadStart(delegate()
            {
                int currentIndex = COUNT;
                for (int k = 0; k < internalCount; k++)
                {
                    m_myConcurrentStack.Push(currentIndex.ToString());
                }
            }));
            workers[COUNT + i + 1] = currentWorker;
        }

        //start the worker threads
        foreach (Thread worker in workers)
        {
            worker.Start();
        }

        //join the threads
        foreach (Thread worker in workers)
        {
            worker.Join();
        }

        //we can safely get the count - all the worker threads are finished
        Console.WriteLine("Stack count {0}", m_myConcurrentStack.Count);
        Console.WriteLine("Snapshot Stack count {0}", enumStack.Count);
    }

    Regarding your question about using immutable data structures: we decided that the current implementation is more useful at the moment, since its functionality is closer to that provided by the existing synchronized collections.

    Hope that this helps.

    I have a plea for clarity in the TryXXX patterns of the concurrent collections.

    In the post you use the out parameter pattern, which of course is a normal idiom of C#...

    T item;
    if (s.TryPop(out item))
    {
        UseData(item);
    }

    Would you please consider adding TryXXX methods that return Nullable<T> to the types included in System.Threading.Collections?

    They would support the following signature.

    public Nullable<T> TryXXX(...);

    To support the following, and IMHO more readable pattern:

    T? item = s.TryPop();
    if (item != null)
    {
        UseData(item);
    }

    I know for sure that I am not the only one who wants this feature and doesn't want to keep writing extension methods on these types.

    Thanks,

    Anthony

  • Is it possible to send a ManualResetEvent or set a timeout to the ConcurrentStack<T> methods TryPop or TryPush, so we're able to bail out (for example if there is a lengthy enumeration taking place and the application is being shut down)?

    I ask this because I guess that an enumeration over the structure locks it. Or does it support simultaneous readers and writers?

  • Interesting, so when we do iteration, we're actually iterating over a copy! Cool. Well, that is kind of functional/immutable then, isn't it? I still suggest that a completely immutable data structure may be a better fit for PLINQ, but hey, you guys know your stuff.

    Thanks for the info, that was helpful.

  • Judah wrote:

    "Interesting, so when we do iteration, we're actually iterating over a copy! Cool. Well, that is kind of functional/immutable then, isn't it?"

    Well, it's not really an immutability issue, but rather an approach to managing state via isolation, since the underlying collection could change its state after you made your copy.

  • Patrik, TryPop/Push are non-blocking operations.  There's no need to set a timeout, since there's nothing to timeout.  If you want blocking, you can use BlockingCollection<T> (which works with both ConcurrentStack<T> and ConcurrentQueue<T>), and its methods do support timeouts.

  • Chris, the June 2008 CTP also includes a ConcurrentQueue<T> in addition to ConcurrentStack<T> :)
