TPL Dataflow preview available for download

As mentioned here, the Visual Studio Async CTP is now available for download from http://msdn.com/vstudio/async.  Not only does this download add language support to C# and Visual Basic for writing asynchronous methods (in which you can easily “await” tasks), it also includes a new .NET library we lovingly refer to as “TPL Dataflow”, which enables building concurrent systems based on dataflow concepts, in-process message passing, and asynchronous pipelines.  This new library, System.Threading.Tasks.Dataflow.dll, is heavily inspired by the Visual C++ Asynchronous Agents Library, by the CCR from Microsoft Robotics, by the Axum language, and more.  It’s built on top of a multitude of constructs introduced in .NET 4, internally using types like Task and ConcurrentQueue<T>, to provide solutions for buffering and processing data, for building systems that need high-throughput and low-latency processing of data, and for building agent/actor-based systems.  TPL Dataflow was also designed to integrate very well with the new language support for tasks, so that you can easily use TPL Dataflow constructs within asynchronous methods, and you can harness asynchronous methods within “dataflow blocks.”  We’ll be discussing TPL Dataflow more in coming posts; in the meantime, for more information on this DLL, check out the whitepaper available at http://www.microsoft.com/downloads/details.aspx?FamilyID=d5b3e1f8-c672-48e8-baf8-94f05b431f5c; better yet, download the Async CTP and start playing with the bits today! (Note, too, that we’re extremely interested in your feedback, so please share any thoughts, suggestions, praise, or criticism you may have.)
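
Here's a minimal sketch of the async integration described above: an async lambda running inside a dataflow block, and an async method that feeds a small pipeline and awaits its completion. It's written against the later released System.Threading.Tasks.Dataflow package rather than the CTP bits, so some member names differ from those used in the comments on this post (for example, Complete() and Completion rather than DeclinePermanently() and CompletionTask). PipelineSketch and RunAsync are illustrative names, and Task.Delay merely stands in for real asynchronous work.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineSketch
{
    // Feeds `inputs` through a two-stage pipeline and returns the collected outputs.
    public static async Task<List<int>> RunAsync(IEnumerable<int> inputs)
    {
        var results = new List<int>();

        // Stage 1: an async lambda inside a dataflow block; Task.Delay stands in for real I/O.
        var square = new TransformBlock<int, int>(async n =>
        {
            await Task.Delay(10);
            return n * n;
        });

        // Stage 2: an ActionBlock processes one message at a time by default,
        // so the List<int> needs no extra locking.
        var collect = new ActionBlock<int>(n => results.Add(n));

        // Forward completion (and any fault) from the first stage to the second.
        square.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        // Dataflow blocks used from inside an async method.
        foreach (var n in inputs)
        {
            await square.SendAsync(n);
        }

        square.Complete();        // signal that no more input is coming
        await collect.Completion; // every item has been squared and collected
        return results;
    }
}
```

Calling PipelineSketch.RunAsync(new[] { 1, 2, 3 }) should yield 1, 4, 9 in that order, since the transform block processes one message at a time by default and preserves input order.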

  • Hi Toub.

    Thanks for this awesome work. I have a concrete question on how to use the Dataflow library with async agents.

    Suppose I want to create an Agent class, which takes as its input a number of DocumentInputs, for each DocumentInput internally produces a DocumentResult, and as its output creates an aggregated/combined result of the different DocumentResults - the AggregatedResult.

    The way I would do this using Dataflow would be to declare the DocumentInputs property as a BufferBlock<DocumentInput>. As the BufferBlock is both a Target and a Source, the caller of the Agent can use its ITargetBlock<DocumentInput>.Post(...) method to provide input data. The Agent itself can then in its constructor link this input to an ActionBlock<DocumentInput>, which is responsible for processing each Document. The output from this will be posted by the ActionBlock to a BufferBlock<DocumentResult>, which in turn can then be linked to an exclusively running ActionBlock<DocumentResult>, which aggregates the DocumentResult to an internal AggregatedResult property.

    Once all DocumentInputs have been processed, and all DocumentResults aggregated to the AggregatedResult, the Agent can post this to, say, an AggregatedResultBlock property (of type BroadcastBlock<AggregatedResult>), on which the user of the Agent could be awaiting.

    My question is related to this last part. How can I make sure that all DocumentInputs have been processed and all DocumentResults aggregated? How should this be implemented, following best practices?

    Right now this is most of the code in my Agent's constructor:

    {
         // Some setup of two ConcurrentExclusiveSchedulerPairs, because the processing can happen
         // concurrently, but the aggregation must happen exclusively (not excluding the processing,
         // only excluding other aggregation requests).

         // Set up data blocks
         _documentInputs = new BufferBlock<IDocumentInputData>();
         _documentOutputs = new BroadcastBlock<Tuple<IDocumentInputData, DocumentOutputData>>(o => o);
         _output = new WriteOnceBlock<OutputData>(o => o);
         _outputData = new OutputData();

         // Set up action blocks
         var processDocumentAction = new ActionBlock<IDocumentInputData>(async input => ProcessDocument(input), concurrentProcessing);
         var aggregateAction = new ActionBlock<Tuple<IDocumentInputData, DocumentOutputData>>(async io => AggregateDocumentResult(io.Item2), exclusiveAggregation);

         // FinishAggregation is the method that posts the AggregatedResult to the AggregatedBlock...
         _documentInputs.CompletionTask.ContinueWith(delegate { FinishAggregation(); });

         // Create links
         _documentInputs.LinkTo(processDocumentAction);
         _documentOutputs.LinkTo(aggregateAction);
    }

    My idea was then to create the following method in the Agent. The user could call it both to signal that they are done posting DocumentInputs and to request the result of the agent.

       public async Task<OutputData> GetResultAsync()
       {
           // Mark input complete: if our user queries for the result, they must be done posting DocumentInputs
           _documentInputs.DeclinePermanently();
           await _documentInputs.CompletionTask;
           await _documentOutputs.CompletionTask;
           return await Output.ReceiveAsync();
       }

    You would then use the Agent like this:

      var agent = new MyAgent();
      agent.DocumentInputs.Post(...);
      agent.DocumentInputs.Post(...);
      var result = await agent.GetResultAsync();

    Besides not working, this also doesn't seem to make full use of the possibilities provided by async methods. What might I be missing? Could you give me a hint in the right direction as to how to best combine the best of both worlds (Dataflow and async methods)?

    Thanks in advance,

    Christian

  • Hi Christian-

    Just to make sure I understand...

    You want to allow any number of inputs to be posted, and as those are posted, they should be processed and then merged into an aggregate.  Once all of the inputs have been posted, you want to retrieve the aggregated result.  Further, you need the aggregation processing to run sequentially, but the input processing can be run concurrently.  Did I get that right?

    If so, I might structure this something like the following:

    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    class DocumentProcessor
    {
       public DocumentProcessor()
       {
           // Set up the task to represent the final result
           var finalResult = new TaskCompletionSource<AggregatedResult>();
           Result = finalResult.Task;

           // Process the inputs with as much parallelism as possible.
           var transformInput = new TransformBlock<DocumentInput, DocumentResult>(
               input => ProcessDocument(input),
               new DataflowBlockOptions(TaskScheduler.Default, DataflowBlockOptions.UnboundedDegreeOfParallelism));
           Inputs = transformInput;

           // Aggregate the results.  This runs sequentially by default.
           AggregatedResult finalAggregatedResult = new AggregatedResult();
           var aggregatingAction = new ActionBlock<DocumentResult>(result =>
           {
               finalAggregatedResult = AggregateResults(finalAggregatedResult, result);
           });
           transformInput.LinkTo(aggregatingAction);
           transformInput.CompletionTask.ContinueWith(t => aggregatingAction.DeclinePermanently());

           // When everything is done, the action block will complete, and we can store the final result.
           aggregatingAction.CompletionTask.ContinueWith(delegate { finalResult.SetResult(finalAggregatedResult); });
       }

       public ITargetBlock<DocumentInput> Inputs { get; private set; }
       public Task<AggregatedResult> Result { get; private set; }

       // Placeholder implementations; ProcessDocument spins to simulate expensive per-document work.
       private AggregatedResult AggregateResults(AggregatedResult currentAggregate, DocumentResult newResult) { return new AggregatedResult(); }
       private DocumentResult ProcessDocument(DocumentInput di) { Thread.SpinWait(100000000); return new DocumentResult(); }
    }

    The Inputs property is a TransformBlock that goes from DocumentInput to DocumentResult.  It's configured to run on the default scheduler but with an unbounded concurrency level, meaning that it'll process as many items concurrently as it can (in effect with a task per element), while also maintaining the order in which the elements were posted (such that they'll be output to the next block in the same order they were posted to the inputs).  These results are then sent along to an ActionBlock which runs sequentially (the default for a dataflow block), processing one result at a time and aggregating that result into the overall result.  When the user calls Inputs.DeclinePermanently, that causes the TransformBlock to complete as soon as it's done with its processing, and its continuation then declines on the ActionBlock.  The ActionBlock will finish processing all of the data, and its continuation will then complete the Result task using whatever finalAggregatedResult ended up as.
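
    For comparison, here is a rough sketch of the same structure against the later released System.Threading.Tasks.Dataflow package, where the CTP's DeclinePermanently() and CompletionTask correspond to Complete() and Completion, and where linking with PropagateCompletion takes the place of the manual ContinueWith that forwards completion. DocumentInput, DocumentResult, and AggregatedResult are hypothetical stand-ins with made-up fields; the "aggregation" here just counts documents and sums text lengths.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-ins for the document types discussed above.
public class DocumentInput { public string Text = ""; }
public class DocumentResult { public int Length; }
public class AggregatedResult { public int DocumentCount; public int TotalLength; }

public class DocumentProcessor
{
    private readonly TransformBlock<DocumentInput, DocumentResult> _transform;

    public DocumentProcessor()
    {
        // Concurrent stage: unbounded parallelism, but outputs keep the posting order.
        _transform = new TransformBlock<DocumentInput, DocumentResult>(
            input => ProcessDocument(input),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        // Sequential stage: an ActionBlock's default MaxDegreeOfParallelism is 1,
        // so only one message at a time touches the aggregate.
        var aggregate = new AggregatedResult();
        var aggregator = new ActionBlock<DocumentResult>(r =>
        {
            aggregate.DocumentCount++;
            aggregate.TotalLength += r.Length;
        });

        // Calling Inputs.Complete() eventually completes the aggregator as well.
        _transform.LinkTo(aggregator, new DataflowLinkOptions { PropagateCompletion = true });

        Result = FinishAsync(aggregator, aggregate);
    }

    public ITargetBlock<DocumentInput> Inputs => _transform;

    public Task<AggregatedResult> Result { get; }

    // Completes once the aggregator has drained; a fault in either block flows through to Result.
    private static async Task<AggregatedResult> FinishAsync(
        IDataflowBlock aggregator, AggregatedResult aggregate)
    {
        await aggregator.Completion;
        return aggregate;
    }

    // Placeholder for real per-document work.
    private static DocumentResult ProcessDocument(DocumentInput input) =>
        new DocumentResult { Length = input.Text.Length };
}
```

    Usage mirrors the original: post inputs, call Inputs.Complete(), then await Result. Awaiting the aggregator's Completion (rather than setting the result from an unconditional continuation) means a failure surfaces through Result instead of producing a partial aggregate.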

    Note that I've made a few further assumptions here.  I've assumed that it's ok for the aggregation phase to run concurrently with processing of incoming documents, as long as the aggregation phase itself only processes one document at a time.  If that's not a valid assumption, then a ConcurrentExclusiveSchedulerPair could be used as you suggested.  I've also left out exception handling, because there are various semantics you might want to utilize in such a case... adding in exception support is only a few more lines, based on what you desire in terms of behavior (e.g. if the transform function fails, should we stop doing further transforms but allow the aggregation to continue to produce a result based on the results thus far? etc.)

    In any event, this class can now be used like:

    var processor = new DocumentProcessor();
    processor.Inputs.Post(...);
    processor.Inputs.Post(...);
    ...
    processor.Inputs.DeclinePermanently();
    var finalResult = await processor.Result;

    I hope that helps and that I captured your problem correctly.  If I didn't, please let me know.

    Thanks.

  • It might be useful to have a message buffer that stops accepting messages when the amount of available memory is low, to help prevent OutOfMemoryExceptions.

  • Hi Aaron-

    Thanks for the good suggestion!  We've had several requests now for bounding functionality of some kind, and this is very related to that.  We'll look into it.
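
    For reference, the released System.Threading.Tasks.Dataflow package exposes bounding as DataflowBlockOptions.BoundedCapacity. Note that it bounds by message count, not by available memory, so it only approximates the suggestion above. A minimal sketch under that assumption: Post returns false when the block is full, while SendAsync waits for room. BoundedBufferSketch is an illustrative name.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedBufferSketch
{
    public static async Task<(bool FirstAccepted, bool OverflowAccepted, int Received, bool SendSucceeded)> RunAsync()
    {
        // Count-based bound: at most two messages may be buffered at once.
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 2 });

        bool firstAccepted = buffer.Post(1);    // accepted
        buffer.Post(2);                          // accepted; the buffer is now full
        bool overflowAccepted = buffer.Post(3); // Post never waits, so this is declined

        // SendAsync does not fail on a full buffer; its task stays pending until there is room.
        Task<bool> pendingSend = buffer.SendAsync(4);

        int received = await buffer.ReceiveAsync(); // removes 1, freeing a slot
        bool sendSucceeded = await pendingSend;     // 4 is now accepted

        return (firstAccepted, overflowAccepted, received, sendSucceeded);
    }
}
```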

  • Hi,

    First of all, great work! This made it so much easier for me to make an agent-based architecture.

    A few questions:

    1) Can I distribute the TDF assembly with my program?

    and

    2) In agent/actor design, it's often useful to make sure that all messages handed to an agent are processed sequentially (i.e. in a thread-safe manner) within that agent. But I can't (at first glance) seem to find a way to synchronize processing between the blocks an agent exposes. Any ideas for how I could achieve this?

    Thanks in advance!

  • I'm looking for a BHO in VB.NET (preferably in VB 2005), but I can also find my way around a bit in C++ or C#... I need to redirect access to sites like Facebook, Orkut, etc. to about:blank...

    I'd appreciate any help with this...

  • Hi Alex-

    re: distribution

    I wouldn't recommend it right now.  First and foremost, this release was really about getting feedback on the APIs, and there are some known bugs (many of which are already fixed in our internal builds and which I hope to have available soon in an updated public build) that you wouldn't want in a production app.  For that and related reasons, the EULA dictates that the Async CTP is not for production usage.

    re: synchronizing processing

    Yes.  In the System.Threading.Tasks.Dataflow.dll, in the System.Threading.Tasks namespace, there's a new type called ConcurrentExclusiveSchedulerPair.  An instance of this type provides two TaskScheduler properties, one called ExclusiveScheduler and the other called ConcurrentScheduler.  All tasks scheduled to the ExclusiveScheduler will run serially, and you can configure all of your agent's work to run on this scheduler.  You do that by creating a new DataflowBlockOptions that targets that scheduler and then passing that options instance into your dataflow blocks at construction time, e.g.

    var cesp = new ConcurrentExclusiveSchedulerPair();
    var exclusive = new DataflowBlockOptions(cesp.ExclusiveScheduler);
    var ab1 = new ActionBlock<int>(i => ..., exclusive);
    var ab2 = new ActionBlock<int>(i => ..., exclusive);
    var ab3 = new ActionBlock<int>(i => ..., exclusive);

    Now all of the work on ab1, ab2, and ab3 will be executed serially and not concurrently.  You can further control how fairly these blocks treat each other.  By default, the tasks a block schedules will keep running while there's more work to do, but if, for example, you wanted to dedicate a task per message to achieve more fairness between the blocks, you could set MaxMessagesPerTask to 1 in the DataflowBlockOptions.
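
    A minimal, self-checking sketch of that setup, written against the later released package (where the scheduler is set through ExecutionDataflowBlockOptions.TaskScheduler and MaxMessagesPerTask is a settable property rather than a constructor argument): three ActionBlocks share one exclusive scheduler, and the handler records any call that finds another handler already running. ExclusiveAgentSketch and its counters are illustrative names.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ExclusiveAgentSketch
{
    // Runs three blocks on one exclusive scheduler; returns how many handler calls
    // overlapped with another one (expected 0) and how many messages were handled.
    public static async Task<(int Overlaps, int Processed)> RunAsync(int messagesPerBlock)
    {
        var cesp = new ConcurrentExclusiveSchedulerPair();
        var exclusive = new ExecutionDataflowBlockOptions
        {
            TaskScheduler = cesp.ExclusiveScheduler,
            MaxMessagesPerTask = 1 // one task per message, for fairer interleaving between blocks
        };

        int active = 0, overlaps = 0, processed = 0;

        void Handle(int message)
        {
            // If another handler is already running, exclusivity was violated.
            if (Interlocked.Increment(ref active) > 1)
                Interlocked.Increment(ref overlaps);
            Thread.SpinWait(1000); // simulate a little work
            Interlocked.Increment(ref processed);
            Interlocked.Decrement(ref active);
        }

        var blocks = new[]
        {
            new ActionBlock<int>(m => Handle(m), exclusive),
            new ActionBlock<int>(m => Handle(m), exclusive),
            new ActionBlock<int>(m => Handle(m), exclusive),
        };

        for (int i = 0; i < messagesPerBlock; i++)
            foreach (var block in blocks)
                block.Post(i);

        foreach (var block in blocks)
            block.Complete();

        await Task.WhenAll(blocks.Select(b => b.Completion));
        return (overlaps, processed);
    }
}
```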

    I hope that helps.

  • Hi,

    Yes, that answered the question exactly.

    One more thing: Is there some recommended way to schedule delayed messages? I'm considering simply going with Task.Factory.StartNewDelayed (from ParallelExtensionsExtras), but if there's a better (and perhaps more efficient?) way to do this, I'd love to know. :-)

    Thanks!

  • Hi Alex-

    Great.

    Regarding a recommended way to schedule delayed tasks, the StartNewDelayed from ParallelExtensionsExtras is a fine way to go.  As you can see from the Async CTP, we're planning on a Task.Delay method in the future that will be very similar.
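
    A minimal sketch of a delayed post built on Task.Delay (which shipped in .NET 4.5, after the .NET 4 bits discussed here). PostDelayedAsync is a hypothetical helper name; it awaits the delay without tying up a thread, then offers the message with SendAsync, which waits if the target temporarily can't accept it.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DelayedPostSketch
{
    // Hypothetical helper: offers `item` to `target` once `delay` has elapsed.
    // Task.Delay is timer-based, so no thread is blocked while waiting.
    public static async Task<bool> PostDelayedAsync<T>(ITargetBlock<T> target, T item, TimeSpan delay)
    {
        await Task.Delay(delay);
        // SendAsync waits if the target postpones the message (e.g. a full bounded block);
        // the result is false if the target declines it.
        return await target.SendAsync(item);
    }
}
```

    For example, PostDelayedAsync(buffer, "tick", TimeSpan.FromSeconds(1)) delivers "tick" to a BufferBlock<string> roughly a second later, while the caller is free to continue or to await the returned task.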

  • How about separating the TPL Dataflow stuff from the Async CTP? I believe Dataflow is a library, whereas the Async CTP is compiler stuff. It would be very cool to get TPL Dataflow earlier than C# 5! Thanks for the great work!

  • Hi Shrike-

    Thanks for the suggestion.  We're looking into providing a separate CTP download for just the TPL Dataflow DLL.  Stay tuned...

  • Given that you can link a SourceBlock to a TargetBlock, it would be really nice to be able to unlink as well. Also, some way to 'insert' a block into a 'chain' of blocks would be really nice. It would really help the composability of flows.

  • Hi David-

    Thanks for the feedback.

    Regarding unlinking, that capability is actually there.  The LinkTo method returns an IDisposable representing the link; if you dispose of it, the source will be unlinked from the target.  That's the explicit way of doing it, and I believe it's what you are asking for.  There are also implicit ways.  If you specify in the call to LinkTo that it should be a one-time-only link, then after the first message is successfully propagated to the target, the source will unlink from the target.  Additionally, if a target returns DeclinePermanently from its OfferMessage method, the source should automatically unlink.
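
    A minimal sketch of both unlinking styles against the later released package, where the one-time link is expressed through DataflowLinkOptions.MaxMessages rather than the CTP's LinkTo option described above. UnlinkSketch is an illustrative name.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class UnlinkSketch
{
    public static async Task<(int ViaFirstLink, int ViaOneTimeLink, int LeftInSource)> RunAsync()
    {
        var source = new BufferBlock<int>();
        var first = new BufferBlock<int>();
        var second = new BufferBlock<int>();

        // Explicit unlinking: LinkTo returns an IDisposable representing the link.
        IDisposable link = source.LinkTo(first);
        source.Post(1);
        int viaFirst = await first.ReceiveAsync(); // 1 flowed through the link
        link.Dispose();                            // source is no longer linked to first

        // One-time link: after one message propagates, the link removes itself.
        source.LinkTo(second, new DataflowLinkOptions { MaxMessages = 1 });
        source.Post(2);
        source.Post(3);
        int viaOneTime = await second.ReceiveAsync(); // 2

        // 3 has no linked target left, so it stays buffered in the source.
        return (viaFirst, viaOneTime, source.Count);
    }
}
```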

    Regarding inserting a block into a chain, can you elaborate on what you're looking for?  e.g. what code would you want to be able to write?

  • Hi Toub,

    It's great to see tasks ported to Silverlight in the CTP.

    How about making System.Threading.Tasks.Dataflow.dll available for both Silverlight and the phone in future CTPs?

    It would be good to see some F# samples covering dataflow.

  • Hi Paul-

    Thanks for the suggestion!  We can certainly investigate doing so, though at the moment we don't have any concrete plans to.
