More Powerful Aggregations in PLINQ


In the June 2008 CTP, PLINQ aggregations are more powerful than they were in the December 2007 CTP. The reason is a bit subtle, but the new power enables many useful scenarios, so the explanation is worth following.

To understand where the difference is coming from, let’s quickly review how aggregations work in LINQ to Objects. As an example, consider a possible implementation of the Average operator, sans error handling:

    public static double Average(this IEnumerable<int> source)
    {
        return source.Aggregate(
            new double[2],
            (acc, elem) => {
                acc[0] += elem; acc[1]++; return acc;
            },
            acc => acc[0] / acc[1]);
    }

 

The call to Aggregate takes three arguments:

-          The initial accumulator value.

-          A fold function that updates the accumulator for each element.

-          A result conversion function that converts the final accumulator value to the actual answer.

 

Aggregate will initialize an accumulator to the value passed in the first argument: an array containing two doubles. Then, it will iterate over all elements in the input sequence, and maintain the sum of the elements seen so far in the first element of the accumulator array, and their count in the second element.  Finally, to produce the desired answer, Aggregate will call the result conversion function, which divides the sum by the count, and returns the result.
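To make the three roles concrete, here is a rough sketch of the same computation written as a plain loop. The helper name AverageByHand is made up for illustration and is not part of LINQ; the snippet is written as a small top-level program so it can be run on its own.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of LINQ): spells out the three steps that
// the Aggregate call performs for the Average example.
static double AverageByHand(IEnumerable<int> source)
{
    // 1. Initial accumulator: acc[0] holds the sum, acc[1] holds the count.
    double[] acc = new double[2];

    // 2. Fold: update the accumulator once per element.
    foreach (int elem in source)
    {
        acc[0] += elem;
        acc[1]++;
    }

    // 3. Result conversion: turn the final accumulator into the answer.
    return acc[0] / acc[1];
}

// The average of 1, 2, 3, 4 is 2.5.
Console.WriteLine(AverageByHand(new[] { 1, 2, 3, 4 }));
```

Like the original, this sketch does no error handling, so an empty input divides zero by zero.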

 

Unfortunately, to parallelize the above example in PLINQ, it is not sufficient to simply add AsParallel on the data source. The problem is with the first argument in the Aggregate call. The user only gives us one array of doubles, and then reads and writes to it in the fold function. That is all fine if the fold function is always called from the same thread (and thus sequentially), but breaks down if we have multiple threads calling it concurrently.

 

In the December CTP, the aggregation APIs assumed that the user would never mutate the accumulator in place. We always expected the fold function to return a new instance of the accumulator, like this:

 

    public static double Average(this IEnumerable<int> source)
    {
        return source.AsParallel().Aggregate(
            new double[2],
            (acc, elem) => {
                return new double[] { acc[0] + elem, acc[1] + 1 };
            },
            (acc1, acc2) => {
                return new double[] { acc1[0] + acc2[0], acc1[1] + acc2[1] };
            },
            acc => acc[0] / acc[1]);
    }


Notice that to help parallelize the aggregation, the user also gives us a function to combine two accumulators. That way, after each thread has iterated over a subset of the input, we can combine the partial answers to get the final result.
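The division of labor between the fold function and the combine function can be sketched without any threads at all. The snippet below is only an illustration: the helper AverageInTwoHalves is hypothetical, and the split into two halves is an assumption made for the example, since PLINQ chooses its own partitioning.

```csharp
using System;
using System.Linq;

// Hypothetical illustration only: splits the input into two halves by hand,
// folds each half separately, then merges the two partial accumulators.
static double AverageInTwoHalves(int[] source)
{
    // Fold: returns a fresh accumulator that includes one more element.
    Func<double[], int, double[]> fold =
        (acc, elem) => new double[] { acc[0] + elem, acc[1] + 1 };

    // Combine: merges two partial accumulators into a new one.
    Func<double[], double[], double[]> combine =
        (acc1, acc2) => new double[] { acc1[0] + acc2[0], acc1[1] + acc2[1] };

    int mid = source.Length / 2;

    // Each "thread" folds its own subset of the input.
    double[] left = source.Take(mid).Aggregate(new double[2], fold);
    double[] right = source.Skip(mid).Aggregate(new double[2], fold);

    // The partial answers are combined to get the final result.
    double[] total = combine(left, right);
    return total[0] / total[1];
}

// The average of 1..5 is 3.
Console.WriteLine(AverageInTwoHalves(new[] { 1, 2, 3, 4, 5 }));
```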

 

However, the main point of this example is to illustrate the inefficiency of the above code: a small array has to be allocated for each element in the input. That is very likely to hurt the performance of the aggregation significantly, especially for aggregations like this one, where the work involved in the actual aggregation operation (in this case, a few additions) is minimal.

 

To address this situation, the June 2008 CTP of PLINQ supports a new overload of Aggregate. Instead of a value to initialize the accumulator with, the user gives us a factory function that generates that value:

 

    public static double Average(this IEnumerable<int> source)
    {
        return source.AsParallel().Aggregate(
            () => new double[2],
            (acc, elem) => { acc[0] += elem; acc[1]++; return acc; },
            (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
            acc => acc[0] / acc[1]);
    }

 

Now PLINQ can initialize an independent accumulator for each thread. Because each thread gets its own accumulator, both the fold function and the accumulator-combining function are free to mutate the accumulators. PLINQ guarantees that an accumulator will not be accessed concurrently from multiple threads.
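One way to watch the factory at work is to count how often it is invoked. The sketch below assumes the seed-factory overload of ParallelEnumerable.Aggregate that ships in released versions of .NET, which may differ in detail from the CTP overload described here; the factoryCalls counter is added purely for illustration.

```csharp
using System;
using System.Linq;
using System.Threading;

// Illustration-only counter: how many accumulators the factory created.
int factoryCalls = 0;

double average = Enumerable.Range(1, 100000).AsParallel().Aggregate(
    // Seed factory: called whenever a fresh accumulator is needed.
    () => { Interlocked.Increment(ref factoryCalls); return new double[2]; },
    // Fold: mutates the accumulator it is handed, with no locking.
    (acc, elem) => { acc[0] += elem; acc[1]++; return acc; },
    // Combine: merges one partial accumulator into another.
    (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
    // Result conversion: sum divided by count.
    acc => acc[0] / acc[1]);

// The average of 1..100000 is 50000.5.
Console.WriteLine(average);

// The number of accumulators depends on the machine and on how PLINQ
// partitions the input, so no particular value should be expected here.
Console.WriteLine(factoryCalls);
```

The factory itself has to be safe to call from several threads, which is why the counter is updated with Interlocked.Increment rather than a plain increment.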

 

Here are several algorithms that can be efficiently implemented with the new Aggregate overload:

 

-          Randomly choose N elements from the input.

-          Aggregate statistics over the input sequence. E.g., compute the letter frequencies in a string.

-          Find the N smallest or largest elements in the input, for a small N.
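As an example of the second item, here is a minimal sketch of a letter-frequency count. The helper LetterFrequencies is hypothetical, it only counts the letters a to z (ignoring case), and it assumes the seed-factory Aggregate overload as it exists in released versions of .NET.

```csharp
using System;
using System.Linq;

// Hypothetical helper: counts occurrences of 'a'..'z', ignoring case.
static int[] LetterFrequencies(string text)
{
    return text.AsParallel().Aggregate(
        // Each accumulator is its own array of 26 counters.
        () => new int[26],
        // Fold: bump the counter for this character, if it is a letter.
        (counts, c) =>
        {
            char lower = char.ToLowerInvariant(c);
            if (lower >= 'a' && lower <= 'z') counts[lower - 'a']++;
            return counts;
        },
        // Combine: add the second array into the first, slot by slot.
        (counts1, counts2) =>
        {
            for (int i = 0; i < counts1.Length; i++) counts1[i] += counts2[i];
            return counts1;
        },
        // Result conversion: the merged array is already the answer.
        counts => counts);
}

int[] freq = LetterFrequencies("Hello, PLINQ");

// 'l' occurs three times in "Hello, PLINQ" (two lowercase, one uppercase).
Console.WriteLine(freq['l' - 'a']);
```

With the older value-seeded overload, the fold would have to allocate a new 26-element array per input character, which is the kind of cost the factory overload avoids.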

 

Also, it is worth pointing out that there is a strong similarity between the new Aggregate overload and the Parallel.For overloads that manipulate thread-local state. Both address the same issue: letting each thread mutate its own local state.
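For comparison, here is a small sketch of the Parallel.For pattern. It uses the Parallel.For<TLocal> overload from released versions of .NET (with localInit, body, and localFinally delegates), which is an assumption on my part and may not match the CTP signature exactly; the data and variable names are made up for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

int[] data = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
long total = 0;

Parallel.For<long>(
    0, data.Length,
    // localInit: each worker starts with its own partial sum.
    () => 0L,
    // body: fold one element into the worker's local state, without locking.
    (i, loopState, subtotal) => subtotal + data[i],
    // localFinally: merge the local state into the shared total. Finalizers
    // may run on different threads, so the merge itself is synchronized.
    subtotal => Interlocked.Add(ref total, subtotal));

// The sum of 1..10 is 55.
Console.WriteLine(total);
```

The init and body delegates play the same roles as the seed factory and the fold function in Aggregate. The difference is in the last step: here the user synchronizes the merge, whereas Aggregate takes a combine function and does the merging itself.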

 

Hope you enjoyed reading about the new capabilities of PLINQ aggregations. If you have comments on the API, or ideas on how aggregations apply to problems in your domain, make sure to let us know in the comments!

  • Great stuff.

    This approach of per-thread non-locking aggregation with combining of results from all threads is something I've been looking for in Parallel.For(). This seems akin to the approach Cilk++ is taking with its Hyperobjects (http://www.cilk.com/multicore-blog/bid/5672/Global-Variable-Reconsidered). This approach is attractive to me, as it is flexible and easily understood.

    I could of course build this approach on top of Parallel.For with its threadLocalInit and threadLocalFinally arguments. But then I have to deal with locking the final result because I'm not sure what context the threadLocalFinally is called from, and whether those will be called reentrantly from the various threads that stole some of the work of the iteration. I suspect (hope?) that with intimate knowledge of the task scheduler you can manage to call finalReduceFunc non-reentrantly without locks.

    I think it would be great if Parallel.For had an overload that took the factory and reducer functions like Aggregate.

    Going back to something Cilk++ is doing with its HyperObjects, they define a base interface that combines the seedFactory and reduceFuncs. Once that is done, objects can implement that interface, for example a "ParallelList" that you could just define and then pass to Parallel.For() to aggregate all the results. Could be easier to use than passing in four separate delegates, because you can define aggregation classes that package up all of the delegates as member functions.

    I have one final thought regarding naming of the arguments to Aggregate. I was initially confusing the "finalReduceFunc" with the "resultSelector". Probably because we are at the intersection of parallel programming (e.g. mapReduce) and SQL queries (resultSelector). Here's one idea for clearer names:

    seedFactory --> initialValueFactory or identityFactory

    intermediateReduceFunc --> accumulateElementFunc

    finalReduceFunc --> combineAccumulatorsFunc

    resultSelector --> produceResultFunc

    This technology looks like something that will be very useful in our product and overall I like the direction you are taking. I hope to meet some of the team at PDC.

    Regards,

    Roy Procops

  • Roy,

    Thanks for your insightful comments. I agree that a per-thread non-locking aggregation is a common parallel programming pattern, and exposing it more clearly is definitely a worthy goal.

    One way to achieve that, as you suggest, is to add overloads of Parallel.For that match this pattern. That would undoubtedly be useful, but the concern here is keeping the number of overloads of Parallel.For manageable.

    Packaging the four delegates into an object would make sense if we were to use the pattern across our API. It is certainly an interesting concept to think about.

    Also, thanks for your naming suggestions. I will bring up this suggestion internally. Personally, I like the names that you are suggesting better than the original ones, but I have to check whether there are particular reasons why we picked the original names.

    Thanks again for your insightful feedback,

    Igor Ostrovsky
