Multiple thread-local state elements in a loop

The Parallel.For/ForEach loop constructs included in Parallel Extensions support a variant of thread-local state to aid in efficiently passing data between loop iterations.  Consider one such overload of Parallel.For:

public static void For<TLocal>(
int fromInclusive, int toExclusive,
Func<TLocal> threadLocalInit,
Action<int, ParallelState<TLocal>> body,
Action<TLocal> threadLocalFinally);

The threadLocalInit delegate is called once per thread for each thread involved in processing loop iterations, and it's called on the thread before the thread processes any iterations of the loop.  Each iteration is then provided with the result through the ParallelState<TLocal> instance that's passed to the body delegate.  As this instance is mutable, the body delegate can update the value in the ParallelState<TLocal>'s ThreadLocalState property for the next iteration to see, though it can also treat the value as read-only if updates aren't relevant.  Finally, after a thread is done executing iterations, the threadLocalFinally delegate is called, provided with the ThreadLocalState value.

As an example of using one of these methods:

Parallel.For(0, N, () => new NonThreadSafeData(), (i,loop)=>
{
    UseData(loop.ThreadLocalState);
});

Here, an instance of NonThreadSafeData is constructed once per thread that's used to process iterations.  In this fashion, the loop body can be sure that no other thread is currently accessing the same instance (providing thread-safety through isolation), and at the same time we create a minimal number of these instances: just enough for each involved thread to have its own.
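To make the init/body/finally lifecycle concrete, here is a minimal sketch that counts how often each delegate runs. It assumes the overload shape that shipped in .NET Framework 4, which differs from the CTP signature shown in this post: the local value is passed to the body as a parameter and the body returns the value for the next iteration, rather than going through a ParallelState<TLocal> instance. The class and method names are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LocalStateLifecycle
{
    // Runs a loop of n iterations and reports how many times the init and
    // finally delegates ran, and how many iterations were seen in total.
    public static (int InitCalls, int FinallyCalls, int Iterations) Run(int n)
    {
        int initCalls = 0, finallyCalls = 0, iterations = 0;

        Parallel.For(0, n,
            // Init: runs before a worker processes its first iteration.
            () => { Interlocked.Increment(ref initCalls); return 0; },
            // Body: receives the current local value and returns the value
            // that the next iteration on the same worker will see.
            (i, loopState, seenSoFar) => seenSoFar + 1,
            // Finally: runs once a worker is done, with its last local value.
            seenSoFar =>
            {
                Interlocked.Increment(ref finallyCalls);
                Interlocked.Add(ref iterations, seenSoFar);
            });

        return (initCalls, finallyCalls, iterations);
    }
}
```

Calling LocalStateLifecycle.Run(1000) should report 1000 iterations and matching init and finally counts. How many init calls there are depends on the scheduler, but it is typically far fewer than the number of iterations.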

Another common use for this thread-local state is to support aggregations.  Rather than incurring the cost of an interlocked or other expensive synchronization operation for each element, each thread aggregates locally, and the partial values are combined only once per thread:

int total = 0;
Parallel.ForEach(data,
    ()=>0,
    (elem,i,loop)=> { loop.ThreadLocalState += Process(elem); },
    partial => Interlocked.Add(ref total, partial));
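For readers on the released framework, the same aggregation can be sketched against the Parallel.ForEach overload that shipped in .NET Framework 4, where the body returns the updated partial sum instead of assigning to ThreadLocalState. The Process method here is a hypothetical stand-in (it just doubles its input), and the class name is an assumption for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class LocalAggregation
{
    // Hypothetical stand-in for the post's Process method.
    public static int Process(int elem) => elem * 2;

    public static int Sum(IEnumerable<int> data)
    {
        int total = 0;
        Parallel.ForEach(data,
            // Each worker starts with a partial sum of zero.
            () => 0,
            // No synchronization here: the partial sum is private to the worker.
            (elem, loopState, partial) => partial + Process(elem),
            // One interlocked add per worker, not one per element.
            partial => Interlocked.Add(ref total, partial));
        return total;
    }
}
```

The only synchronized operation is the Interlocked.Add in the last delegate, so its cost scales with the number of workers rather than with the number of elements.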

Several developers have now asked me whether there's any way to pass multiple pieces of data, rather than just one, between iterations of the loop.  The answer is yes, but doing so requires a bit of extra code.  For example, in the previous code snippet, I'm aggregating the values that result from calling the Process method on each element in the data set.  What if I also wanted to track how many elements there were?  To do so, I can create a small type that serves merely to store multiple values:

class MultipleValues { public int Total, Count; }

With this type in hand, I can now make my thread-local state an instance of this type:

int total = 0, count = 0;
Parallel.ForEach(data,
    ()=>new MultipleValues { Total=0, Count=0 },
    (elem,i,loop)=>
    {
        loop.ThreadLocalState.Total += Process(elem);
        loop.ThreadLocalState.Count++;
    },
    partial =>
    {
        Interlocked.Add(ref total, partial.Total);
        Interlocked.Add(ref count, partial.Count);
    });

It would be nice for these situations if we didn't have to declare such a type and if we could instead take advantage of anonymous types in C# and Visual Basic.  Unfortunately for this situation, in C# anonymous types generate read-only properties, which means they're not appropriate if you need to update the generated properties, as I'm doing here by incrementing them.  Of course, if you don't need to modify the property values (as was the case with my initial NonThreadSafeData example), or if you're using a language (like Visual Basic) where anonymous types aren't read-only by default, anonymous types could be quite beneficial in this regard.
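As a point of comparison, in the API that shipped in .NET Framework 4 the body returns the new local value rather than mutating it in place, so a read-only C# anonymous type can carry several values as long as each iteration builds a fresh instance. The following is a sketch under that assumption; Process is a hypothetical stand-in and the class name is illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class MultiValueAggregation
{
    // Hypothetical stand-in for the post's Process method.
    public static int Process(int elem) => elem * 2;

    public static (int Total, int Count) Run(IEnumerable<int> data)
    {
        int total = 0, count = 0;
        Parallel.ForEach(data,
            // The anonymous type is read-only, which is fine here because
            // the body never mutates it.
            () => new { Total = 0, Count = 0 },
            // Build a new instance carrying both updated values.
            (elem, loopState, local) =>
                new { Total = local.Total + Process(elem), Count = local.Count + 1 },
            // Combine both values once per worker.
            local =>
            {
                Interlocked.Add(ref total, local.Total);
                Interlocked.Add(ref count, local.Count);
            });
        return (total, count);
    }
}
```

The trade-off is one small allocation per iteration. A mutable class like MultipleValues, or a value type, avoids that cost if it matters for the workload.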

  • I would like to point out that anonymous types in VB are NOT read-only, unless you specifically ask for it.

  • Excellent point, thanks.

  • Excellent post. Regarding the last sample, how would I compute a max, min, or average value within Parallel.For? Or, more simply, how do I compare values between threads?

  • In the June 2008 CTP, PLINQ aggregations are more powerful than they were in the December 2007 CTP. The

  • Easiest way would just be to use PLINQ.  For example, to compute a max of some computation on a set of values:

    var max = (from value in values.AsParallel()
                    select SomeComputation(value)).Max();

    You can do it with TPL, but it takes more code, for example something like:

    var max = Int32.MinValue;
    object obj = new object();
    Parallel.ForEach(
       values, ()=>Int32.MinValue, (value,i,loop) =>
    {
       var c = SomeComputation(value);
       if (c > loop.ThreadLocalState)
           loop.ThreadLocalState = c;
    },
    c => { lock(obj) if (c > max) max = c; });
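The TPL snippet above uses the CTP-era ThreadLocalState property. A rough equivalent against the Parallel.ForEach overload that shipped in .NET Framework 4 might look like the following sketch, in which SomeComputation is a hypothetical placeholder and the class name is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ParallelMax
{
    // Hypothetical placeholder for the computation in the reply above.
    public static int SomeComputation(int value) => value % 97;

    public static int Max(IEnumerable<int> values)
    {
        int max = int.MinValue;
        object gate = new object();
        Parallel.ForEach(values,
            // Each worker starts from the smallest possible value.
            () => int.MinValue,
            // Track the largest result this worker has seen so far.
            (value, loopState, localMax) => Math.Max(localMax, SomeComputation(value)),
            // Compare each worker's maximum against the shared one under a lock.
            localMax => { lock (gate) { if (localMax > max) max = localMax; } });
        return max;
    }
}
```

A minimum follows the same pattern with int.MaxValue and Math.Min, and an average can be derived from a total and a count aggregated together as in the post's MultipleValues example.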

  • Can you help me out with using multiple threads to access multiple functions at the same time?

  • Samir, can you provide more details on what you're trying to accomplish? An example?

  • Need help with this code. If password is incorrect it loops all three times without allowing user to enter anything different.

    Private Sub btnOk_Click(ByVal sender As System.Object, ByVal e As System.EventArgs) Handles btnOk.Click
        Dim intcounter As Integer = 1
        'Validate password
        Do Until intcounter > 3
            If txtPassword.Text.ToUpper = "PASSWORD" Then
                Main.ShowDialog()
                Me.Close()
            Else
                MessageBox.Show("Re-Enter Password!", "Incorrect Password " & intcounter, MessageBoxButtons.OK)
            End If
            intcounter = intcounter + 1
        Loop
        MessageBox.Show("Invalid Password....Contact Your Administrator", "Incorrect Password", MessageBoxButtons.OK, MessageBoxIcon.Error)
        Me.Close()
    End Sub

  • nevermind too slow I got it already

  • I have an interesting problem that I can't seem to find any info on.  I'm doing a Parallel.ForEach and trying to use TLocal to manage my thread state.  My problem is that my thread state is a decent-sized object and my parallel body is slow.  So, TPL is starting up hundreds of Tasks until I run out of memory from all the TLocal instances that are being initialized.  I have MaxDegreeOfParallelism set to 10 but it's running hundreds of Tasks inside of those threads.  How can I prevent this?  My original implementation managed the threads explicitly but I was hoping to get away from that and let .NET decide how many threads my machines can handle.

    Thanks!

  • Hi Wes-

    As you've discovered, Parallel.For/ForEach won't use more concurrency than the MaxDegreeOfParallelism, but that doesn't mean they're limited to a specific number of tasks.  In fact, these constructs will recycle tasks every now and then in order to give the underlying scheduler a chance to reclaim the threads, so that the overall number of threads available in the system may be decreased, or so that the threads may be reused elsewhere, such as in another AppDomain.

    One way to solve your problem is to not use the local state support that's part of the loop, but to instead use a type like ThreadLocal<T> to maintain the state, e.g.

    var state = new ThreadLocal<DecentSizedObject>(() => new DecentSizedObject());
    Parallel.For(..., i=>
    {
       // ThreadLocal<T> exposes the per-thread instance through its Value property
       DecentSizedObject dso = state.Value;
       ... // use dso here
    });

    If you need to clean up the objects after the loop, you can keep track of the objects created by the ThreadLocal<T> constructor, e.g.

    var list = new ConcurrentQueue<DecentSizedObject>();
    var state = new ThreadLocal<DecentSizedObject>(()=> {
       var obj = new DecentSizedObject();
       list.Enqueue(obj);
       return obj;
    });
    Parallel.For(..., i=>
    {
       ...
    });
    foreach(var obj in list) { Cleanup(obj); }

    Or with .NET 4.5 you can construct the ThreadLocal<T> with trackAllValues set to true and then use the new Values property to enumerate all of the created objects.
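As a sketch of the .NET 4.5 approach, the following uses the ThreadLocal<T> constructor that takes a trackAllValues flag so that Values can be enumerated after the loop. DecentSizedObject is a hypothetical stand-in with two illustrative fields, and setting CleanedUp stands in for whatever cleanup the real object needs.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ThreadLocalCleanup
{
    // Hypothetical stand-in for the DecentSizedObject in the reply above.
    public sealed class DecentSizedObject
    {
        public int Uses;
        public bool CleanedUp;
    }

    public static (int Created, int TotalUses) Run(int n)
    {
        // trackAllValues: true is required for the Values property to work.
        using (var state = new ThreadLocal<DecentSizedObject>(
            () => new DecentSizedObject(), trackAllValues: true))
        {
            Parallel.For(0, n, i =>
            {
                // One instance per thread, no matter how many tasks run on it.
                DecentSizedObject dso = state.Value;
                dso.Uses++;
            });

            int created = 0, totalUses = 0;
            foreach (DecentSizedObject obj in state.Values)
            {
                created++;
                totalUses += obj.Uses;
                obj.CleanedUp = true; // stand-in for real cleanup
            }
            return (created, totalUses);
        }
    }
}
```

Because the state is keyed by thread rather than by task, the number of instances created is bounded by the number of threads that participate, which is the property the question was after.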

  • How do I write logging for Parallel.ForEach? For each thread I want to write to a file with a different name, and the name comes dynamically from the code. In the case below, site will be the file name: [site1, site2, site3, site4]. I need 4 files [site1, site2, site3, site4], but I am getting 2 or 3 files at random, because before one log file is written the value of site changes and that file gets skipped.

    For example:

    static void Main(string[] args)
    {
        Program p=new Program();
        // LogEntry log = new LogEntry();
        string requestid = "SQ1234";
        Constants.AppConfigData.QuoteId = requestid;
        Array site = new Array[10];
        List<string> lst = new List<string> {"site1","site2","site3","site4" };
        var abc=(from obj in lst select obj);
        //  for (int site = 0; site < 10; site++)
        Parallel.ForEach(abc, site1 =>
        {
            Constants.AppConfigData.siteid = site1;
            Task[] Taskab = new Task[2];
            Taskab[0] = Task.Factory.StartNew(() => p.gretings(site1, requestid));
            Taskab[1] = Task.Factory.StartNew(() => p.welcome(site1, requestid));
            Log.WriteToLog(site1, requestid, Constants.LogLevel.Sitelevel, Constants.ComponentType.sqe);
            // Task.WaitAll(Taskab);
        });
        // Task.Factory.StartNew(() => p.logent());
        // p.gretings();
        // p.logent();
        //  Logger.Write(log);
        Log.WriteToLog("main message", requestid,Constants.LogLevel.Quotelevel,Constants.ComponentType.sqe);
    }

  • @prasad s g: Questions like this are better asked on the forum at social.msdn.microsoft.com/.../threads . That said, with your parallel loop, multiple threads will be executing the body delegate, each with a different input.  But each of those concurrent invocations is modifying and reading from shared state, e.g. Constants.AppConfigData.  I expect that's the cause of your problem.
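To illustrate the point about shared state, here is a minimal sketch in which each iteration derives everything it needs from its own input instead of writing to a shared static field. All names are hypothetical, and an in-memory ConcurrentDictionary stands in for the per-site log files so the example has no file I/O.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PerSiteLogging
{
    // Returns one log entry per site, keyed by a per-site file name.
    public static IDictionary<string, string> Run(IEnumerable<string> sites, string requestId)
    {
        var logs = new ConcurrentDictionary<string, string>();
        Parallel.ForEach(sites, site =>
        {
            // The file name is a local computed from this iteration's input,
            // so a concurrent iteration cannot overwrite it.
            string fileName = site + ".log";
            logs[fileName] = requestId + ": processed " + site;
        });
        return logs;
    }
}
```

With four distinct site names this produces four entries every time, because no iteration reads a value that another iteration may have changed.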
