All about Async/Await, System.Threading.Tasks, System.Collections.Concurrent, System.Linq, and more…
In many data-parallel scenarios, all of the data to be processed is available immediately. This blog post addresses the opposite scenario: the inputs arrive gradually (as if in a stream), and we want to start producing results even before reading the last element of the input sequence.
There is a variety of scenarios in which inputs become available gradually rather than all at once. The inputs could arrive as requests across the network, inputs entered by a user, data read from an I/O device, results computed from another computation, and so on. We want to process the inputs in parallel, and make the partial results available as they are computed.
Simple PLINQ queries support streaming. A query that only consists of simple Select and Where operators will do streaming:
var q = inputSrc.Where(x => Foo(x)).Select(x => ExpensiveComputation(x));
foreach(var x in q)
In this example, Process(x) will get called on some input elements even before all elements of the input have been read.
On the other hand, not all queries support streaming. In fact, some queries cannot possibly support streaming. Consider this query that contains an OrderBy() operator:
var q = src.AsParallel().Select(x => Foo(x)).OrderBy(x => x);
We need to compute Foo(x) for all input elements, and only then we can yield the smallest element. Thus, a query that contains an OrderBy operator does not (and cannot) run in a streaming fashion.
So, what kinds of PLINQ queries do run in a streaming fashion? Generally, they are simple queries. The query can contain any number of Select and Where operators, except for the special positional variants. By positional overloads, we mean the overloads of Select and Where that accept a delegate that accesses both element values and their positions in the sequence. A positional Select or Where operator does prevents streaming if there is a Where operator anywhere prior to it in the query.
A streaming query can produce both ordered or unordered results. As is typical in PLINQ, the results are unordered by default, but you can opt into ordering by using the AsOrdered() operator:
var q = src.AsParallel().AsOrdered().Select(x => Foo(x)).Where(y => Bar(y));
In the above query, PLINQ will read the elements from the src enumerable, distribute them into different partitions, process them on multiple threads, and rearrange results into a correctly-ordered output sequence. All of these stages are happening concurrently, so first results will start getting produced while the inputs are still getting read (assuming that the input sequence is sufficiently long).
One important point about streaming PLINQ queries is that the algorithms we use are optimized for throughput rather than latency. PLINQ uses buffers internally, so a particular result may sit in an output buffer until a certain number of results have been produced. Obviously, that is undesirable if one of the results needs to be sent back to a client as quickly as possible.
One major benefit of streaming is that all data does not have to be loaded into memory at any one particular time. For example, if the query reads its inputs from one file and writes the outputs to another file, all of the inputs will not necessarily have to get loaded into memory at one time. Keep this use case in mind, and when you come across it in your development, use PLINQ to easily get your code to scale on multi-core machines.
It's great to know this is possible with PLINQ! :)
Thanks for sharing this advice.
"Thus, a query that contains an OrderBy operator does not (and cannot) run in a streaming fashion"
Um, what? You might not _want_ to run it in a stream fashion, but maybe _I_ do. For example, expr.OrderBy(x => x).Take(3) need only partially sort the sequence. Yes, OrderBy reads the whole input stream. But streaming as lazy processing need not sort the whole input before providing the first result.
"the algorithms we use are optimized for throughput rather than latency"
In this context, OrderBy's eager behavior makes sense. Whole-sequence sort is always going to be faster than a complete set of partial sorts, by some constant factor.
I might be missunderstanding something, and if I do please let me know, but as far as my understanding goes, it is correct to say that you cannot run a sorting algorithm without having all values.
Given your example, expr.OrderBy(x => x).Take(3) in a situation where expr would stream its return value in two chunks. Suppose that the OrderBy would:
1) Sort the initial chunk before the second chunk arrives
2) Return the sorted items as soon as it has sorted them (in this case, before receiving the second chunk)
Given that we would get the following two chunks (3,6,5,7) and (8,1,9) the OrderBy-method would sort the first chunk (resulting in 3,5,6,7) and the Take(3) would return 3,5,6. As can be seen, this behaviour would not be correct, since a correct result in the end would have been 1,3,5. Thus, if we would have been handed the 3 as first result, it would have given us the wrong result and thus not successfully sorted the items.
Therefore, OrderBy needs access to all items in order to be able sort correctly.