How PLINQ processes an IEnumerable<T> on multiple cores


As Ed Essey explained in Partitioning in PLINQ, partitioning is an important step in PLINQ execution. Partitioning splits up a single input sequence into multiple sequences that can be processed in parallel. This post further explains chunk partitioning, the most general partitioning scheme that works on any IEnumerable<T>.

Chunk partitioning appears in two places in Parallel Extensions. First, it is one of the algorithms that PLINQ uses under the hood to execute queries in parallel. Second, chunk partitioning is available as a standalone algorithm through the Partitioner.Create() method.
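
To make this concrete, here is a small usage sketch (assuming .NET 4; the data source and the per-element work are made up for illustration) showing how a chunk partitioner created with Partitioner.Create() can drive both Parallel.ForEach and a PLINQ query:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class ChunkPartitioningDemo
{
    static void Main()
    {
        // Any IEnumerable<T> works here; Enumerable.Range stands in for a real data source.
        var source = Enumerable.Range(1, 1000);

        // Partitioner.Create wraps the sequence in a chunk partitioner,
        // which can drive Parallel.ForEach...
        long sum = 0;
        Parallel.ForEach(Partitioner.Create(source), item =>
        {
            Interlocked.Add(ref sum, item);   // simulated per-element work
        });

        // ...or a PLINQ query (a fresh partitioner for the second pass over the source).
        int[] squares = Partitioner.Create(source).AsParallel()
                                   .Select(x => x * x)
                                   .ToArray();

        Console.WriteLine("Sum = {0}, squares computed = {1}", sum, squares.Length);
    }
}
```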

To explain the design of the chunk partitioning algorithm, let's walk through the possible ways of processing an IEnumerable<T> with multiple worker threads, finally arriving at the solution used in PLINQ (Approach 4).


Approach 1: Load the input sequence into an intermediate array

As a simple solution, we could walk over the input sequence and store all elements into an array. Then, we can split up the array into ranges, and assign each range to a different worker.

The disadvantage of this approach is that we need to allocate an array large enough to hold all input elements. If the input sequence is long, this algorithm leads to unnecessarily large memory consumption. Also, we need to wait until the entire input sequence is ready before the workers can start executing.
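
As an illustration only (this is a hypothetical helper, not the PLINQ implementation), a naive version of this approach might look like the following:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class ArrayPartitioningSketch
{
    // Hypothetical helper: buffer the whole sequence, then give each worker a contiguous range.
    public static void ProcessByRanges<T>(IEnumerable<T> source, int workerCount, Action<T> body)
    {
        T[] buffer = source.ToArray();                  // the entire input must fit in memory
        int rangeSize = (buffer.Length + workerCount - 1) / workerCount;

        var tasks = new List<Task>();
        for (int w = 0; w < workerCount; w++)
        {
            int start = w * rangeSize;
            int end = Math.Min(start + rangeSize, buffer.Length);

            tasks.Add(Task.Factory.StartNew(() =>
            {
                // Each worker owns a fixed, precomputed range of the array.
                for (int i = start; i < end; i++)
                    body(buffer[i]);
            }));
        }
        Task.WaitAll(tasks.ToArray());
    }
}
```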


Approach 2: Hand out elements to threads on demand

An entirely different approach is to have all worker threads share one input enumerator. When a worker is ready to process the next input element, it takes a shared lock, gets the next element from the input enumerator, and releases the lock.

This algorithm has a fairly large overhead because processing every element requires locking. Also, handing out elements individually is prone to poor cache behavior.

This approach does have an interesting advantage over Approach 1, though: since workers receive data on demand, the workers that finish faster will come back to request more work. In contrast, Approach 1 splits up all work ahead of time, and a worker that is done early simply goes away.
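
A minimal sketch of this shared-enumerator scheme (again a hypothetical helper, not the actual PLINQ code) could look like this:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class SharedEnumeratorSketch
{
    // Hypothetical helper: all workers pull single elements from one shared enumerator.
    public static void ProcessOneByOne<T>(IEnumerable<T> source, int workerCount, Action<T> body)
    {
        using (IEnumerator<T> enumerator = source.GetEnumerator())
        {
            object enumeratorLock = new object();
            var tasks = new Task[workerCount];

            for (int w = 0; w < workerCount; w++)
            {
                tasks[w] = Task.Factory.StartNew(() =>
                {
                    while (true)
                    {
                        T item;
                        lock (enumeratorLock)            // every single element pays the lock cost
                        {
                            if (!enumerator.MoveNext())
                                return;
                            item = enumerator.Current;
                        }
                        body(item);                      // the actual work happens outside the lock
                    }
                });
            }

            Task.WaitAll(tasks);
        }
    }
}
```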


Approach 3: Hand out elements in chunks

To mitigate the two drawbacks of Approach 2 (synchronization cost and cache behavior), we can hand out elements to threads in "chunks". When a thread is ready to process more inputs, it will take, say, 64 elements from the input enumerator.

Unfortunately, while this approach nicely amortizes the synchronization cost over multiple elements, it does not work well for short inputs. For example, if the input contains 50 elements and the chunk size is 64, all inputs will go into a single partition. Even if the work per element is large, we will not be able to benefit from parallelism, since one worker gets all the work.

And since IEnumerable<T> in general does not declare its length, we cannot simply tune the chunk size based on the input sequence length.
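
Here is a hypothetical sketch of fixed-size chunking, with the chunk size passed in as a parameter:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class FixedChunkSketch
{
    // Hypothetical helper: each worker grabs a fixed-size chunk of elements per lock acquisition.
    public static void ProcessInChunks<T>(IEnumerable<T> source, int workerCount,
                                          int chunkSize, Action<T> body)
    {
        using (IEnumerator<T> enumerator = source.GetEnumerator())
        {
            object enumeratorLock = new object();
            var tasks = new Task[workerCount];

            for (int w = 0; w < workerCount; w++)
            {
                tasks[w] = Task.Factory.StartNew(() =>
                {
                    var chunk = new List<T>(chunkSize);
                    while (true)
                    {
                        chunk.Clear();
                        lock (enumeratorLock)            // one lock acquisition per chunk, not per element
                        {
                            while (chunk.Count < chunkSize && enumerator.MoveNext())
                                chunk.Add(enumerator.Current);
                        }
                        if (chunk.Count == 0)
                            return;                      // the input is exhausted
                        foreach (T item in chunk)
                            body(item);
                    }
                });
            }

            Task.WaitAll(tasks);
        }
    }
}
```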


Approach 4: Hand out elements in chunks of increasing size

A solution to the problem with small inputs is to use chunks of growing size. The first chunk assigned to each thread has size 1, and subsequent chunks grow gradually larger until a specific threshold is reached.

Our solution doubles the chunk size every few chunks. So, each thread first receives a few chunks of size 1, then a few chunks of size 2, then 4, and so forth. Once the chunk size reaches a certain threshold, it remains constant.

This chunking strategy ensures that if the input is short, it will still get split up fairly among the cores. But, the chunk size also grows fairly quickly, and the per-chunk overheads are small for large inputs. Also, the algorithm is quite good at load-balancing, so if one worker is taking longer to process its inputs, other workers will process more elements to decrease the overall processing time.

One interesting consequence of the chunk partitioning algorithm is that multiple threads will call MoveNext() on the input enumerator. The worker threads will use a lock to ensure mutual exclusion, but the enumerator must not assume that MoveNext() will be called from a particular thread (e.g., it should not use thread-local storage, manipulate UI, etc).
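
Putting the pieces together, here is a rough sketch of growing-chunk partitioning. The growth schedule and the cap in the sketch are illustrative guesses, not the exact constants that PLINQ uses:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class GrowingChunkSketch
{
    // Hypothetical helper: each worker starts with chunks of size 1 and doubles its
    // chunk size every few chunks, up to a cap.
    public static void ProcessWithGrowingChunks<T>(IEnumerable<T> source, int workerCount,
                                                   Action<T> body)
    {
        const int ChunksPerSize = 3;     // assumed: how many chunks are taken before doubling
        const int MaxChunkSize = 512;    // assumed: the "roof" on the chunk size

        using (IEnumerator<T> enumerator = source.GetEnumerator())
        {
            object enumeratorLock = new object();
            var tasks = new Task[workerCount];

            for (int w = 0; w < workerCount; w++)
            {
                tasks[w] = Task.Factory.StartNew(() =>
                {
                    int chunkSize = 1, chunksTaken = 0;
                    var chunk = new List<T>();

                    while (true)
                    {
                        chunk.Clear();
                        lock (enumeratorLock)    // MoveNext may run on any worker thread, but never concurrently
                        {
                            while (chunk.Count < chunkSize && enumerator.MoveNext())
                                chunk.Add(enumerator.Current);
                        }
                        if (chunk.Count == 0)
                            return;

                        foreach (T item in chunk)
                            body(item);

                        // Grow the chunk size every few chunks until it reaches the cap.
                        chunksTaken++;
                        if (chunksTaken % ChunksPerSize == 0 && chunkSize < MaxChunkSize)
                            chunkSize *= 2;
                    }
                });
            }

            Task.WaitAll(tasks);
        }
    }
}
```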

The current implementation of both PLINQ chunk partitioning and Partitioner.Create() follows Approach 4 fairly closely. Now you know how it behaves and why!

Comments
  • Approach 1: Load the input sequence into an intermediate array

    That approach is not just inefficient, it is plainly wrong, actually.

    It is perfectly valid for an enumerator to be infinite, and not only academically valid, but practically too. Say you want to find the offset at which your phone number occurs in the 'stream' of decimal digits of pi. You would create an infinite iterator returning the digits of pi and feed it to PLINQ to perform the search.

  • Our solution doubles the chunk size every few chunks. So, each thread first receives a few chunks of size 1, then a few chunks of size 2, then 4, and so forth. Once the chunk size reaches a certain threshold, it remains constant.

    ---

    The obvious improvement is to take the time of chunk processing into account.

    Time measurement is easy and inexpensive.

    Knowing the time, you would neither need that silly initial wobbling of 1,1,1,2,2,2,4,4,4,8,8,8 steps, nor would you need to apply an arbitrary 'roof'. After 1-2 initial chunks you would know exactly how large your next chunks should be.

    The initial wobbling of small chunk sizes is a huge waste of CPU, because all the cores will waste cycles and cache lines fighting for those few initial elements. It's not 1 redundant lock, it's more like 30 of them. A painful, unneeded up-front hit for no apparent reason.

    And the 'roof' constant is arbitrary, which means it will work well for certain loads but end up either unfair for smaller collections or causing expensive racing for large numbers of inexpensive calculations.


  • Thanks for your suggestions, Oleg.

    Using time measurement data to decide chunk sizes has both advantages and disadvantages. One disadvantage is that the work per element doesn't have to be the same for all elements. If we only time a few delegate executions, we don't necessarily get a good picture of how long delegate executions take in general. Another disadvantage is that high-resolution time measurements are fairly expensive relative to the other costs involved.

    The algorithm discussed in this article performs well on a number of real-world workloads that we tested. The chunk size grows exponentially, and thus very quickly. The amortized cost per element is low for long inputs, and we still get decent load-balancing even for short inputs.

    But no single algorithm is best for every scenario. If our algorithm does not work well for you, you can implement your own (see the Partitioner thread: http://social.msdn.microsoft.com/Forums/en-US/netfxgeneralprerelease/thread/df4b1306-7c38-4887-8211-13be6a681d4a). For example, you can implement a time-based partitioning algorithm by deriving from the Partitioner abstract class and plugging it into PLINQ.

    Thanks again for taking the time to share your thoughts.


  • Several objections:

    1) You're saying time measurement does not guard against uneven per-element workload. That may be correct in the short run, but it is easily workable in the long run. On the other hand, the exponential growth algorithm can't account for it at all, and is surely worse in that sense.

    2) Exponential growth tends to overshoot or undershoot at the end of the sequence. Obviously, no algorithm can guess where the sequence ends, but the bigger your chunk, the worse and the more likely your miss. With an exponential algorithm each new chunk is much bigger, so the bigger the sequence, the more work will be shared unfairly at the end.

    Time-balanced logic doesn't need to grow chunks beyond the size the real workload justifies, so the final unfair share of work will be manageable.

    3) There is no need for precise time measurements, because user-mode scheduling is approximate by definition. The ideal time quantum for a chunk should be quite long compared to the precision that DateTime.UtcNow gives.

    I am not saying you must switch to time measurement, but what if that IS the best approach for the majority of cases? Would you rather miss a chance to make your product better?

    Anyway, the insight you gave into this area is really interesting and I hope you'll crack this one to perfection.

    Cheers

    Oleg.

  • Oleg,

    Algorithms that use time measurements to make decisions are somewhat less predictable than other algorithms, and they have a higher degree of hardware dependence.

    Our current algorithm performs well across a wide range of scenarios, and it has proven itself to be a good general-case approach.

    Igor Ostrovsky


  • Your contact form is not working, so I'll post it here:

    You folks implemented the .None flag for the tasks...

    http://msdn.microsoft.com/en-us/library/ms182149(VS.80).aspx

    However, what you are doing is contrary to the meaning of .None: "If an enumeration that has the FlagsAttribute applied defines a zero-valued member, its name should be 'None' to indicate that no values have been set in the enumeration. Using a zero-valued member for any other purpose is contrary to the use of the FlagsAttribute in that the AND and OR bitwise operators are useless with the member."

    Really, you should have both .None and .Default: the behavior you are using, currently called .None, should be .Default.

    That's all :)

  • Hi James-

    Thanks for the feedback.  I'm not following the objection, though.  Assuming you're referring to TaskCreationOptions, None really does mean that no values have been set in the enumeration.  The options provided in Beta 1 are DetachedFromParent, LongRunning, PreferFairness, and RespectParentCancellation... none of those are used when None is selected.  Could you clarify?  We do strive to follow .NET guidelines, and we work closely with the folks that create the guidelines to ensure that we're doing so appropriately.

    Thanks!

  • Hmm... Ok... the version I was reading about had 'None' defaulting to ExecuteSynchronously and two others. A bit vague in my mind. It may have changed by now. Apologies if so. :)

    The version I am actively using (the CTP) has none of these, so I may be referring to an intermediate version.

  • Here we go:

    http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskcontinuationoptions(VS.100).aspx

    If .None = 0 but includes .ExecuteSynchronously, which is not zero, then .None is not a proper name for the flag. FxCop likes having a .None, but it's not the same as having a .Default or whatnot. Anyway, that's the link I was looking at.

  • But the default behavior is not ExecuteSynchronously. Ahh!  I see, this is a documentation defect... None means continue on any, no task options, and to execute asynchronously, not synchronously.  We'll get the docs fixed. Thanks for letting us know.


  • >>"Once the chunk size reaches a certain threshold, it remains constant"

    Is the value of this threshold a constant? Or is it calculated based on some parameters? If so, which parameters are they?

    Cheers!
