ParallelExtensionsExtras Tour - #8 - ReductionVariable<T>

ParallelExtensionsExtras Tour - #8 - ReductionVariable<T>

  • Comments 4

(The full set of ParallelExtensionsExtras Tour posts is available here.) 

The new .NET 4 System.Threading.ThreadLocal<T> is quite useful when you need per-thread, per-instance storage.  This is in contrast to the fast ThreadStaticAttribute, which supports only per-thread storage (in .NET 4, ThreadLocal<T> actually layers on top of ThreadStaticAttribute to provide the additional per-instance behavior).

ThreadLocal<T> exposes through its Value property the thread-local value of the variable for the current thread.  However, it doesn’t expose in any way a collection of all of the thread-local values that have been created on that ThreadLocal<T> instance.  Having that functionality can be useful in a variety of circumstances.  For example, if you use ThreadLocal<T> to store per-thread instances of some heavyweight and disposable object, you might want to dispose of all of the instances when you’re done using them.  Or if you were implementing a reduction, doing a local reduction on each thread involved in the computation, you would need access to all of the thread-local values at the end in order to do the final cross-thread reduction.

This extra functionality is quite similar to the combinable type provided in Visual C++ 10.  And although ThreadLocal<T> doesn’t provide this functionality out-of-the-box, we can easily layer it on top of it.  This is precisely what the ReductionVariable<T> type in ParallelExtensionsExtras provides (see the ReductionVariable.cs file).

The ReductionVariable<T> type is very similar to ThreadLocal<T> in surface area, the key difference being that it adds a Values property (plural) that’s an IEnumerable<T> in addition to exposing the familiar Value property (singular) that returns a T:

public sealed class ReductionVariable<T>

{

    public ReductionVariable(Func<T> seedFactory);

    public T Value { get; }

    public IEnumerable<T> Values { get; }

    ... // a few additional public helper methods

}

 

The ReductionVariable<T> instance maintains three private fields:

private readonly Func<T> _seedFactory;

private readonly ThreadLocal<StrongBox<T>> _threadLocal;

private readonly ConcurrentQueue<StrongBox<T>> _values;

 

The Func<> is the user-provided delegate used to initialize a thread’s local value when one hasn’t been initialized yet (if the seed factory is null, the default value of T will be used).  The ThreadLocal<> is the thread-local storage, and the ConcurrentQueue<> is used to store all of the values from all of the threads.

Because we need to maintain a list of all of the thread-local values, and because the thread-local values might change over time, we need to wrap the values with an object we can always use to reference each of them.  For that purpose, we utilize the System.Runtime.CompilerServices.StrongBox<T> type, which is simply an object that contains a field of type T.  Then, instead of using a ThreadLocal<T>, we utilize a ThreadLocal<StrongBox<T>>, and instead of using a ConcurrentQueue<T>, we use a ConcurrentQueue<StrongBox<T>>.  In this manner, the Value field of the StrongBox instances may change repeatedly, but the StrongBox<T> object reference will remain constant.  From this, the rest of the implementation falls into place.

The constructor initializes the ThreadLocal<StrongBox<T>> to create a StrongBox<T> that wraps a newly created seed value and adds that new StrongBox<T> instance to the queue of values.

public ReductionVariable(Func<T> seedFactory)

{

    _seedFactory = seedFactory;

    _threadLocal = new ThreadLocal<StrongBox<T>>(CreateValue);

}

 

private StrongBox<T> CreateValue()

{

    var s = new StrongBox<T>(

        _seedFactory != null ? _seedFactory() : default(T));

    _values.Enqueue(s);

    return s;

}

 

The Value property then need only get and set the boxed value’s value:

public T Value

{

    get { return _threadLocal.Value.Value; }

    set { _threadLocal.Value.Value = value; }

}

 

And the Values property need only return the contents of the list.  Of course, we don’t want to return the StrongBox<T> instances, but rather the values they contain, so we’ll use LINQ's Select method to select just the values:

public IEnumerable<T> Values

{

    get { return _values.Select(s => s.Value); }

}

 

With our ReductionVariable<T> in place, we can now utilize it to perform reductions, to track and clean up after thread-local state, and more.  Here’s an example:

var edos = new ReductionVariable<ExpensiveDisposableObject>(

    () => new ExpensiveDisposableObject());

var q = (from d in dataSource.AsParallel

         select ProcessValueWithExpensiveObject(d, edos.Value)).ToArray();

foreach(var edo in edos) edo.Dispose();

Leave a Comment
  • Please add 5 and 4 and type the answer here:
  • Post
  • Its in the title, but pray tell what do you mean by "a reduction" or "performing a reduction".

    I can follow this to a certain point , then I get lost; I suspect because I do not understand what the desired end result is.

    From what I can see a StrongBox creates a piece of immovable memory shared between threads.

    As the assignment to q will not complete until all the calls to ProcessValueWithExpensiveObject have finished.

    Then surely with have finished with the edos; and can safely dispose them.

    What do we gain / what is the benefit of accessing them via a ReductionVariable<> ?

  • "For example, if you use ThreadLocal<T> to store per-thread instances of some heavyweight and disposable object, you might want to dispose of all of the instances when you’re done using them."

    Ahah, so this if the "Process" is really complicated and each thread has (ThreadLocal) private storage inside the ExpensiveDisposable object then this method ensures that one thread can clean up after all the other threads.

    To answer my own question....

  • can you provide a link explaining what is cross-thread reduction?

  • Hi Michael-

    A reduction, sometimes called aggregation, is when you take a bunch of values and combine them together into a new value.  For example, you might have a list of numbers you want to sum.  You could sum each quarter of the list into a partial sum, and then sum those partial sums into the final answer.  That's a reduction.  Each of those partial sums could be computed on a separate thread of execution, such that you'd be summing in parallel, and then at the end you'll have one value per thread that needs to be summed.

Page 1 of 1 (4 items)