All about Async/Await, System.Threading.Tasks, System.Collections.Concurrent, System.Linq, and more…
In order to explain the issues we encounter when parallelizing aggregations in PLINQ, let's first take a quick look at how aggregations work in LINQ.
Aggregation is an operation that iterates over a sequence of input elements, maintaining an accumulator that contains the intermediate result. At each step, a reduction function takes the current element and accumulator value as inputs, and returns a value that will overwrite the accumulator. The final accumulator value is the result of the computation. A variety of interesting operations can be expressed as aggregations: sum, average, min, max, sum of squares, variance, concatenation, count, count of elements matching a predicate, and so on.
LINQ provides several overloads of Aggregate. A possible implementation (without error checking) of the most general of them is given below:
public static TResult Aggregate<TSource, TAccumulate, TResult>(
    this IEnumerable<TSource> source,
    TAccumulate seed,
    Func<TAccumulate, TSource, TAccumulate> func,
    Func<TAccumulate, TResult> resultSelector)
{
    TAccumulate accumulator = seed;
    foreach (TSource elem in source)
    {
        accumulator = func(accumulator, elem);
    }
    return resultSelector(accumulator);
}
To compute a particular aggregation, the user provides the input sequence (as method parameter source), the initial accumulator value (seed), the reduction function (func), and a function to convert the final accumulator to the result (resultSelector). As a usage example, consider the method below that computes the sum of squares of integers:
public static int SumSquares(IEnumerable<int> source)
{
    return source.Aggregate(0, (sum, x) => sum + x * x, (sum) => sum);
}
LINQ also exposes a number of predefined aggregations, such as Sum, Average, Max, Min, etc. Even though each one can be implemented using the Aggregate operator, a direct implementation is likely to be more efficient (for example, to avoid a delegate call for each input element).
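To make the efficiency point concrete, here is a minimal sketch contrasting the two styles for sum of squares. The class and method names (DirectAggregations, SumSquaresViaAggregate, SumSquaresDirect) are hypothetical; the direct version simply inlines the reduction step in a loop instead of invoking a delegate for every element.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class DirectAggregations
{
    // Expressed through the general Aggregate operator: one delegate call per element.
    public static int SumSquaresViaAggregate(IEnumerable<int> source) =>
        source.Aggregate(0, (sum, x) => sum + x * x, sum => sum);

    // Hypothetical hand-written equivalent: the reduction step is inlined in the loop,
    // so no delegate is invoked per element.
    public static int SumSquaresDirect(IEnumerable<int> source)
    {
        int sum = 0;
        foreach (int x in source)
            sum += x * x;
        return sum;
    }
}
```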
Let's say that we call SumSquares(Enumerable.Range(1,4)) on a dual-core machine. How can we split up the computation between two threads? We could distribute the elements of the input among the threads. For example, Thread 1 could compute the sum of squares of {1,4} and Thread 2 could compute the sum of squares of {3,2}*. Then, as a last step, we combine the results (add them, in this case) and get the final answer.
Sequential answer = ((((0 + 1^2) + 2^2) + 3^2) + 4^2) = 30
Parallel answer = (((0 + 1^2) + 4^2) + ((0 + 3^2) + 2^2)) = 30
* Note: The elements within each partition do not necessarily appear in the same order as in the input. The reason for this may not be apparent, but it has to do with the presence of other operators in the query.
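The two-thread split above can be simulated on a single thread as a sketch. ManualPartitionDemo and its methods are hypothetical names; each simulated "thread" folds its partition from the seed 0 with the squaring step, and the two partial results are then combined by plain addition.

```csharp
using System.Linq;

public static class ManualPartitionDemo
{
    // Sequential: ((((0 + 1^2) + 2^2) + 3^2) + 4^2)
    public static int Sequential() =>
        new[] { 1, 2, 3, 4 }.Aggregate(0, (acc, x) => acc + x * x);

    // Simulated two-thread split: each partition is folded from the seed 0,
    // squaring each element, and the two accumulators are then simply added.
    public static int TwoPartitions()
    {
        int thread1 = new[] { 1, 4 }.Aggregate(0, (acc, x) => acc + x * x); // 0 + 1 + 16 = 17
        int thread2 = new[] { 3, 2 }.Aggregate(0, (acc, x) => acc + x * x); // 0 + 9 + 4 = 13
        return thread1 + thread2; // combining accumulators: no squaring here
    }
}
```

Both methods return 30; the only extra step in the parallel version is the final addition of the two accumulators.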
In the parallel aggregation, we need to do something that we didn't need to do in the sequential aggregation: combine the intermediate results (i.e. accumulators). Notice that combining two accumulators may be a different operation from combining an accumulator with an input element. In the SumSquares example, to combine the accumulator with an input element, we square the element and add it to the accumulator. But to combine two accumulators, we simply add them, without squaring the second one.
In cases where the accumulator type is different from the element type, it is even more obvious that combining two accumulators and combining an accumulator with an element are different operations: even their input argument types differ!
Therefore, the most general PLINQ Aggregate overload accepts an intermediate reduce function as well as a final reduce function, while the most general LINQ Aggregate only needs the intermediate reduce function. The signature of the most general PLINQ Aggregate overload is below (compare with the most general LINQ Aggregate overload shown above):
public static TResult Aggregate<TSource, TAccumulate, TResult>(
    this IParallelEnumerable<TSource> source,
    TAccumulate seed,
    Func<TAccumulate, TSource, TAccumulate> intermediateReduceFunc,
    Func<TAccumulate, TAccumulate, TAccumulate> finalReduceFunc,
    Func<TAccumulate, TResult> resultSelector)
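For comparison, in the released .NET Framework 4 version of PLINQ the CTP's IParallelEnumerable<T> became ParallelQuery<T>, and the corresponding ParallelEnumerable.Aggregate overload names the two reduce functions updateAccumulatorFunc and combineAccumulatorsFunc. A sketch of SumSquares written against that later API, assuming .NET 4 or later (the class name is hypothetical):

```csharp
using System.Collections.Generic;
using System.Linq;

public static class PlinqSumSquares
{
    public static int SumSquares(IEnumerable<int> source)
    {
        return source.AsParallel().Aggregate(
            0,                           // seed, used by every partition
            (acc, x) => acc + x * x,     // intermediate reduce: accumulator + element
            (acc1, acc2) => acc1 + acc2, // final reduce: accumulator + accumulator
            acc => acc);                 // result selector
    }
}
```

Note how the two reduce functions differ exactly as described above: only the intermediate one squares its element argument.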
So, how can we tell whether a particular aggregation can be parallelized with PLINQ? A simple approach is to imagine the parallelization process described above. The input sequence will be reordered and split up into several partitions. Each partition will be accumulated separately on its own thread, with its accumulator initialized to the seed. Then, all accumulators will be combined using the final reduce function. Does this process produce the correct answer? If it does, then the aggregation can be parallelized using PLINQ.
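The thought experiment can also be mechanized. Below is a minimal single-threaded sketch with hypothetical names (ParallelAggregationCheck, Simulate, AgreesWithSequential) that shuffles the input, deals it round-robin into one to four partitions, folds each partition from the seed with the intermediate reduce function, and merges the partial accumulators with the final reduce function. It only mimics the process; it is not how PLINQ actually partitions data, and agreement on one input is evidence rather than proof.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ParallelAggregationCheck
{
    // Hypothetical helper: mimics the parallelization process on a single thread.
    public static TResult Simulate<TSource, TAccumulate, TResult>(
        IList<TSource> source,
        TAccumulate seed,
        Func<TAccumulate, TSource, TAccumulate> intermediateReduceFunc,
        Func<TAccumulate, TAccumulate, TAccumulate> finalReduceFunc,
        Func<TAccumulate, TResult> resultSelector,
        int partitionCount,
        Random random)
    {
        // Reorder the input: shuffle by sorting on random keys.
        TSource[] shuffled = source.OrderBy(_ => random.Next()).ToArray();

        // Deal elements round-robin into partitions; each partition starts from the seed.
        var partials = new TAccumulate[partitionCount];
        for (int p = 0; p < partitionCount; p++)
        {
            TAccumulate acc = seed;
            for (int i = p; i < shuffled.Length; i += partitionCount)
                acc = intermediateReduceFunc(acc, shuffled[i]);
            partials[p] = acc;
        }

        // Merge the partial accumulators with the final reduce function.
        TAccumulate total = partials[0];
        for (int p = 1; p < partitionCount; p++)
            total = finalReduceFunc(total, partials[p]);
        return resultSelector(total);
    }

    // Compares several randomized simulations with the sequential answer.
    public static bool AgreesWithSequential<TSource, TAccumulate, TResult>(
        IList<TSource> source,
        TAccumulate seed,
        Func<TAccumulate, TSource, TAccumulate> intermediateReduceFunc,
        Func<TAccumulate, TAccumulate, TAccumulate> finalReduceFunc,
        Func<TAccumulate, TResult> resultSelector,
        int trials = 20)
    {
        TResult expected = resultSelector(source.Aggregate(seed, intermediateReduceFunc));
        var random = new Random(12345); // fixed seed so runs are repeatable
        for (int t = 0; t < trials; t++)
        {
            int partitionCount = random.Next(1, 5); // 1 to 4 partitions
            TResult actual = Simulate(source, seed, intermediateReduceFunc,
                                      finalReduceFunc, resultSelector, partitionCount, random);
            if (!EqualityComparer<TResult>.Default.Equals(expected, actual))
                return false;
        }
        return true;
    }
}
```

With seed 0, squaring intermediate reduce, and plain addition as the final reduce, the check passes for SumSquares; with seed 5 it fails as soon as a trial uses more than one partition.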
In the rest of this posting, I will describe in more depth the properties that an aggregation must have in order to parallelize correctly. In typical cases, though, imagining the parallelization process is the easiest way to find out whether an aggregation will produce the correct answer when run on PLINQ.
Just as in other types of PLINQ queries, delegates that form a part of the query must be pure, or at least observationally pure. So, if any shared state is accessed, appropriate synchronization must be used.
The parallel version of an aggregation does not necessarily apply the reduction functions in the same order as the sequential computation. In the SumSquares example, the sequential result is computed in a different order than the parallel result. Of course, the two results will be equal because of the special properties of the + operator: associativity and commutativity.
Operator F(x,y) is associative if F(F(x,y),z) = F(x,F(y,z)), and commutative if F(x,y) = F(y,x), for all valid inputs x,y,z. For example, operator Max is commutative because Max(x,y) = Max(y,x) and also associative because Max(Max(x,y),z) = Max(x,Max(y,z)). Operator - (subtraction) is not commutative because it is not true in general that x-y = y-x, and it is not associative because it is not true in general that x-(y-z) = (x-y)-z.
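The two definitions translate directly into spot checks over sample inputs. A minimal sketch with hypothetical names (OperatorProperties, LooksCommutative, LooksAssociative), restricted to int operators for brevity; a sample can refute a property but cannot prove it for all inputs.

```csharp
using System;
using System.Linq;

public static class OperatorProperties
{
    // Spot-checks F(x,y) == F(y,x) over every pair of sample values.
    public static bool LooksCommutative(Func<int, int, int> f, int[] samples) =>
        samples.All(x => samples.All(y => f(x, y) == f(y, x)));

    // Spot-checks F(F(x,y),z) == F(x,F(y,z)) over every triple of sample values.
    public static bool LooksAssociative(Func<int, int, int> f, int[] samples) =>
        samples.All(x => samples.All(y => samples.All(z => f(f(x, y), z) == f(x, f(y, z)))));
}
```

For samples such as { -3, 0, 1, 5, 8 }, Max passes both checks, while subtraction fails both.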
The following table gives examples of operations that fall into different categories with respect to associativity and commutativity:
Not associative, not commutative: (a, b) => a / b; (a, b) => a - b; (a, b) => 2 * a + b
Associative, not commutative: (string a, string b) => string.Concat(a, b); (a, b) => a; (a, b) => b
Commutative, not associative: (float a, float b) => a + b; (float a, float b) => a * b; (bool a, bool b) => !(a && b); (int a, int b) => 2 + a * b; (int a, int b) => (a + b) / 2
Associative and commutative: (int a, int b) => a + b; (int a, int b) => a * b; (a, b) => Min(a, b); (a, b) => Max(a, b)
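The float entries may be surprising, since addition of real numbers is associative. Because every floating-point operation rounds its result, regrouping can change the answer. A small sketch (the class name is hypothetical) that evaluates both groupings in single precision:

```csharp
public static class FloatAssociativity
{
    // Returns ((a + b) + c, a + (b + c)) computed in single precision.
    // The explicit (float) casts force each intermediate result to be rounded to float,
    // since the CLI otherwise permits wider intermediate precision.
    public static (float Left, float Right) Groupings(float a, float b, float c)
    {
        float left = (float)((float)(a + b) + c);
        float right = (float)(a + (float)(b + c));
        return (left, right);
    }
}
```

With a = 1e8f, b = -1e8f and c = 1f, the left grouping gives 1 while the right gives 0: adjacent floats near 1e8 are 8 apart, so -1e8f + 1f rounds back to -1e8f.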
An operation must be both associative and commutative in order for the PLINQ parallelization to work correctly. The good news is that many of the interesting aggregations turn out to be both associative and commutative.
Note: For simplicity, this section only considers aggregations where the type of the accumulator is the same as the type of the element (not only the .NET type, but also the “logical” type). After all, if the accumulator type is different from the element type, the intermediate reduction function cannot possibly be commutative because its two arguments are of different types! In the general case, the final reduction function must be associative and commutative, and the intermediate reduction function must be related to the final reduction function in a specific way. See section “Constraints on Reduce Function and Seed” for details.
LINQ allows the user to initialize the accumulator to an arbitrary seed value. In the following example, the user sets the seed to 5, and thus computes 5 + the sum of squares of integers in a sequence.
public static int SumSquaresPlus5(IEnumerable<int> source)
{
    return source.Aggregate(5, (sum, x) => sum + x * x, (sum) => sum);
}
Unfortunately, if we parallelize this query, several threads will split up the input, and each will initialize its accumulator to 5. As a result, 5 will be added to the result as many times as there are threads, and the computed answer will be incorrect.
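The effect can be reproduced with the same two partitions as before, simulated on one thread. A minimal sketch (hypothetical class name): sequentially the seed contributes 5 once, but when each of the two partitions starts from 5, it is counted twice.

```csharp
using System.Linq;

public static class SeedCountedPerPartition
{
    // Sequential: the seed 5 is applied exactly once, giving 5 + 30 = 35.
    public static int Sequential() =>
        new[] { 1, 2, 3, 4 }.Aggregate(5, (acc, x) => acc + x * x);

    // Simulated two-partition run: each partition starts from the seed 5.
    public static int TwoPartitions()
    {
        int p1 = new[] { 1, 4 }.Aggregate(5, (acc, x) => acc + x * x); // 5 + 1 + 16 = 22
        int p2 = new[] { 3, 2 }.Aggregate(5, (acc, x) => acc + x * x); // 5 + 9 + 4 = 18
        return p1 + p2; // 40: the seed has been added twice
    }
}
```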
Can PLINQ do something to fix this problem?
Non-solution 1: Initialize one accumulator to the seed and the rest of them to the default value of TAccumulate.
For example, if the input contains integers, why not initialize one thread's accumulator to the user-provided seed, and the other accumulators to 0? The problem is that while 0 is a great initial accumulator value for some aggregations, such as sum, it does not work at all for other aggregations. One such operation is product: if we initialize the accumulator to 0 and then multiply by all elements in the input, the result will be 0, rather than the actual product of the input elements.
Non-solution 2: Initialize the accumulator to be the first element in the input.
This seems like another reasonable solution, and it works for both sum and product operations. However, this approach clearly does not work if the type of the accumulator is different from the type of the input elements. For example, if the aggregation is to compute the sum of lengths of strings, we cannot initialize the integer accumulator to the first string value in the input. Even if the .NET type of the accumulator and the input elements is the same, the first element still may not work as a seed. For example, in order to compute a sum of squares of integers, we cannot initialize the accumulator to be the first integer in the input. It would have to be the square of the first element.
Note: While non-solution 2 does not work in general, it works in many interesting cases. In fact, both LINQ and PLINQ provide an Aggregate() overload that does not take a seed as a parameter, requires that the accumulator type be the same as the element type, and uses the first element in the input (or in each partition, in the PLINQ case) as the seed.
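LINQ's seedless Enumerable.Aggregate overload illustrates both halves of this argument. A short sketch (hypothetical class name): for sum of squares the unsquared first element leaks into the result, while for product the first element is a perfectly good seed.

```csharp
using System.Linq;

public static class FirstElementAsSeed
{
    // Seedless overload: the first element (2) becomes the accumulator unsquared,
    // so the result is 2 + 3*3 = 11 instead of 13.
    public static int SeedlessSumSquares() =>
        new[] { 2, 3 }.Aggregate((acc, x) => acc + x * x);

    // Explicit identity seed: 0 + 2*2 + 3*3 = 13.
    public static int SeededSumSquares() =>
        new[] { 2, 3 }.Aggregate(0, (acc, x) => acc + x * x);

    // For product the first element is a valid seed: 2 * 3 * 4 = 24.
    public static int SeedlessProduct() =>
        new[] { 2, 3, 4 }.Aggregate((acc, x) => acc * x);
}
```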
Solution
So, it is up to the user to ensure that the seed is an "identity" with respect to the aggregation. In other words, it can be folded into the computation multiple times without affecting the result. Since the aggregation is associative and commutative, the user can always apply one more reduction step to the final answer to get the same effect as if the seed had been set to a non-identity value (e.g. if the user needs to compute 5 + sum of squares, they can simply compute the sum of squares with seed 0 and then add 5 to the result, instead of initializing the seed to 5).
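A sketch of this workaround, assuming the .NET 4 PLINQ API (ParallelQuery<T> and its four-argument Aggregate overload) rather than the CTP; the class name is hypothetical. The identity seed 0 may be used by every partition, and the non-identity 5 is applied exactly once at the end.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class IdentitySeed
{
    public static int SumSquaresPlus5(IEnumerable<int> source)
    {
        int sumSquares = source.AsParallel().Aggregate(
            0,                           // identity for addition: safe to use once per partition
            (acc, x) => acc + x * x,     // intermediate reduce
            (acc1, acc2) => acc1 + acc2, // final reduce
            acc => acc);
        return sumSquares + 5;           // the non-identity part, applied exactly once
    }
}
```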
PLINQ could potentially provide overloads that accept two seeds: a seed that will be used by exactly one thread, and another seed that will be used by all other threads. Currently, we don't have plans to include such an overload, since the user can already achieve the same effect (as described above).
Constraints on Reduce Function and Seed
In the previous sections, I loosely described the constraints on the intermediate reduction function, the final reduction function and the seed that ensure a parallel aggregation returns the same answer as its sequential version.
Let's state the rules a bit more explicitly. In the rule descriptions below, F() represents the intermediate reduction function, G() the final reduction function, and S the user-provided seed.
Rule 1:
G(a, b) = G(b, a)
for all possible accumulator values of a, b
Rule 2:
G(G(a, b), c) = G(a, G(b, c))
for all possible accumulator values of a, b, c
Rule 3:
F(a, x) = G(a, F(S, x))
for all possible accumulator values of a
and all possible element values of x
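The three rules can also be spot-checked mechanically on sample values. A minimal sketch with a hypothetical helper (AggregationRules.SatisfiesRules), restricted to int accumulators and elements for brevity; passing is evidence, not proof, because only the sampled values are checked.

```csharp
using System;
using System.Linq;

public static class AggregationRules
{
    // f = intermediate reduction F, g = final reduction G, s = seed S.
    public static bool SatisfiesRules(
        Func<int, int, int> f, Func<int, int, int> g, int s,
        int[] accumulators, int[] elements)
    {
        // Rule 1: G(a, b) = G(b, a)
        bool rule1 = accumulators.All(a => accumulators.All(b => g(a, b) == g(b, a)));
        // Rule 2: G(G(a, b), c) = G(a, G(b, c))
        bool rule2 = accumulators.All(a => accumulators.All(b => accumulators.All(c =>
            g(g(a, b), c) == g(a, g(b, c)))));
        // Rule 3: F(a, x) = G(a, F(S, x))
        bool rule3 = accumulators.All(a => elements.All(x => f(a, x) == g(a, f(s, x))));
        return rule1 && rule2 && rule3;
    }
}
```

For sum of squares (F(a, x) = a + x*x, G = +), seed 0 satisfies all three rules, while seed 5 violates rule 3.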
If rules 1-3 hold, then it is guaranteed that G is the right final reduction function for F, and also F, G and S satisfy the constraints on associativity, commutativity and seed identity.
Finally, let’s reiterate that the rules above (and this entire blog posting) assume that the reduction functions do not access shared state.
Note: It is possible to show that the 3 rules above are sufficient to guarantee that the parallel aggregation produces the same answer as the sequential aggregation. The argument describes a process by which an expression representing the parallel evaluation (e.g. G(G(F(F(F(S,x5),x2),x3),F(F(S,x1),x6)),F(S,x4))) can be translated into an expression representing the corresponding sequential evaluation (e.g. F(F(F(F(F(F(S,x1),x2),x3),x4),x5),x6)) by applying rules 1-3.
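As a small worked instance of that translation, suppose one partition processed x2 and then x1, and another partition processed x3. Rules 1 and 3 are enough to rewrite this particular parallel expression into the sequential one; rule 2 is not needed here and comes into play when partial accumulators have to be regrouped.

```latex
\begin{aligned}
G\big(F(F(S,x_2),x_1),\,F(S,x_3)\big)
  &= G\big(G(F(S,x_2),F(S,x_1)),\,F(S,x_3)\big) && \text{rule 3 with } a = F(S,x_2),\ x = x_1\\
  &= G\big(G(F(S,x_1),F(S,x_2)),\,F(S,x_3)\big) && \text{rule 1}\\
  &= G\big(F(F(S,x_1),x_2),\,F(S,x_3)\big)      && \text{rule 3 with } a = F(S,x_1),\ x = x_2\\
  &= F\big(F(F(S,x_1),x_2),\,x_3\big)            && \text{rule 3 with } a = F(F(S,x_1),x_2),\ x = x_3
\end{aligned}
```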
This blog posting describes the PLINQ aggregation semantics in the CTP release. Of course, things could change in the future. For example, PLINQ could potentially support associative non-commutative aggregations. In many cases, that would have a performance impact, so the user would likely have to specify whether the aggregation is commutative via a flag.
If you have any feedback on the aggregation semantics or API, we would love to hear from you.
I don't understand why you're making the Aggregate method so complicated. If you make func a binary function of the form TAccumulate x TAccumulate -> TAccumulate, you get an algebraic structure.
So, (TAccumulate, func) has to be a commutative monoid, which implies all the properties you described.
This can be easily achieved by introducing a valueSelector: TSource -> TAccumulate.
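You get the following method:
public static TResult Aggregate<TSource, TAccumulate, TResult>(
    this IEnumerable<TSource> source,
    Func<TSource, TAccumulate> valueSelector,
    TAccumulate identityElement,
    Func<TAccumulate, TAccumulate, TAccumulate> func,
    Func<TAccumulate, TResult> resultSelector)
{
    TAccumulate accumulator = identityElement;
    foreach (TSource elem in source)
        accumulator = func(accumulator, valueSelector(elem));
    return resultSelector(accumulator);
}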
Now you can split up the source, compute thread-local accumulators, and also use func(accumulator1, accumulator2) to combine them.
I do not like having to provide two different accumulator functions. It also makes little sense, because the second function must be essentially the same as the first because of the nondeterminism (i.e. every value could be processed by a separate thread).
With this version, you can implement the Aggregate method like a binary tree where the two children are always independent of each other:
func(
    func(
        valueSelector(elem0),
        valueSelector(elem1)),
    func(
        valueSelector(elem2),
        valueSelector(elem3)))
The leaves of the tree are just calls to valueSelector(elem). In this tree you always have the option to run a child in a separate thread.
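A sketch of the tree-shaped evaluation the commenter describes, assuming an indexable IList<TSource> input and that (TAccumulate, func) is a commutative monoid with the given identity element. The class name, the sequentialThreshold parameter, and the use of Parallel.Invoke for the two independent halves are illustrative choices, not part of the proposal.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class TreeAggregate
{
    public static TAccumulate Aggregate<TSource, TAccumulate>(
        IList<TSource> source,
        Func<TSource, TAccumulate> valueSelector,
        TAccumulate identityElement,
        Func<TAccumulate, TAccumulate, TAccumulate> func,
        int sequentialThreshold = 1024)
    {
        return Reduce(source, 0, source.Count, valueSelector, identityElement, func, sequentialThreshold);
    }

    static TAccumulate Reduce<TSource, TAccumulate>(
        IList<TSource> source, int start, int end,
        Func<TSource, TAccumulate> valueSelector,
        TAccumulate identityElement,
        Func<TAccumulate, TAccumulate, TAccumulate> func,
        int threshold)
    {
        // Small ranges: fold sequentially from the identity element.
        if (end - start <= threshold)
        {
            TAccumulate acc = identityElement;
            for (int i = start; i < end; i++)
                acc = func(acc, valueSelector(source[i]));
            return acc;
        }

        // Larger ranges: the two halves are independent, so evaluate them in parallel
        // and combine the results with the same func.
        int mid = start + (end - start) / 2;
        TAccumulate left = identityElement, right = identityElement;
        Parallel.Invoke(
            () => left = Reduce(source, start, mid, valueSelector, identityElement, func, threshold),
            () => right = Reduce(source, mid, end, valueSelector, identityElement, func, threshold));
        return func(left, right);
    }
}
```

The threshold keeps the leaves from being single elements, since spawning work for every valueSelector call would cost far more than the calls themselves.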
Thanks for your suggestion, Thomas.
We have thought about the approach you are proposing previously. I agree that it is logically cleaner, and has all the desired algebraic properties. Unfortunately, it also comes with a price: an extra delegate invocation for each element in the input.
Another possible issue is that your proposed approach would rule out aggregations with mutable accumulators. For example, say that you have an aggregation over an IEnumerable<char>, and you want to compute how many times each letter (A-Z) appears. TAccumulate might be an integer array with 26 elements, representing the count of each letter. The value selector would have to return an array of 26 integers, so we'd end up constructing an array for each input element. On the other hand, a (TAccumulate, TElement)->TAccumulate reduce function could simply increment one element in the array and return the same array.
Note that as of the CTP release, PLINQ does not support mutable accumulators, but it is an extension we are considering. We would need an overload where the user provides a seedConstructor delegate instead of a seed instance, so that we can have multiple independent accumulators.
So, that's my take on this issue. However, the Aggregate design is far from closed, and we are open to considering other solutions.
Igor
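The letter-count example with a per-partition mutable accumulator can be sketched against the later .NET 4 API, which (assuming .NET 4 or later, not the CTP discussed here) has a ParallelEnumerable.Aggregate overload taking a Func<TAccumulate> seed factory, much like the seedConstructor delegate mentioned above. The class name is hypothetical.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class LetterCounts
{
    public static int[] CountLetters(IEnumerable<char> text)
    {
        return text.AsParallel().Aggregate(
            () => new int[26],            // seed factory: a fresh array for each partition
            (counts, c) =>                // intermediate reduce: mutate the array in place
            {
                char upper = char.ToUpperInvariant(c);
                if (upper >= 'A' && upper <= 'Z')
                    counts[upper - 'A']++;
                return counts;
            },
            (left, right) =>              // final reduce: add one partition's counts into the other
            {
                for (int i = 0; i < 26; i++)
                    left[i] += right[i];
                return left;
            },
            counts => counts);            // result selector
    }
}
```

For "Hello, World" this yields 3 for L, 2 for O, and 10 letters in total; no array is allocated per input element.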
Another issue with simply mapping TSource => TAccumulate is that the operation of combining with TSource values may be significantly cheaper than combining TAccumulate values (think matrix-matrix multiplication instead of matrix-vector multiplication).
A different use of a function mapping TSource -> TAccumulate is to avoid changing "seed" to "identity". Seed is nice because it gives semantics to zero-length sequences. A mapping from source to accumulator can be used to handle sequences of length 1 and avoid the need for a separate zero element or a change in the semantics of the seed element.
Late comment here... We have used the mutable version of Aggregate to prepare hash sets of values and then union them into a single set, effectively input.SelectMany(e => e).Distinct(). After tweaking the degree of parallelism to suit the number of input arrays, the PLINQ Aggregate outperforms exactly the same algorithm written as two parallel for loops with an equivalent batch size. The reason appears to be that each batch in the parallel for incurs a larger overhead than each task created by the PLINQ Aggregate. Can this be correct, and if so, how and why?