TPL Dataflow preview available for download

As mentioned here, the Visual Studio Async CTP is now available for download from http://msdn.com/vstudio/async.  Not only does this download add language support to C# and Visual Basic for writing asynchronous methods (in which you can easily “await” tasks), it also includes a new .NET library we lovingly refer to as “TPL Dataflow”, which enables building concurrent systems based on dataflow concepts, on in-process message passing, and on asynchronous pipelines.  This new library, System.Threading.Tasks.Dataflow.dll, is heavily inspired by the Visual C++ Asynchronous Agents Library, by the CCR from Microsoft Robotics, by the Axum language, and more. It’s built on top of a multitude of constructs introduced in .NET 4, internally using types like Task and ConcurrentQueue<T>, to provide solutions for buffering and processing data, for building systems that need high-throughput and low-latency processing of data, and for building agent/actor-based systems.  TPL Dataflow was also designed to integrate very well with the new language support for tasks, such that you can easily use TPL Dataflow constructs within asynchronous methods, and such that you can harness asynchronous methods within “dataflow blocks.”

We’ll be discussing TPL Dataflow more in coming posts; in the meantime, for more information on this DLL, check out the whitepaper available at http://www.microsoft.com/downloads/details.aspx?FamilyID=d5b3e1f8-c672-48e8-baf8-94f05b431f5c; better yet, download the Async CTP and start playing with the bits today! (Note, too, that we’re extremely interested in your feedback, so please share any thoughts, suggestions, praises, or criticisms you may have.)
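
To make the "dataflow blocks plus asynchronous methods" idea concrete, here is a minimal sketch of a two-stage pipeline. It is an illustration rather than code from the CTP: it assumes the API shape of the later released System.Threading.Tasks.Dataflow library (for example DataflowLinkOptions and the Completion property), and the class and method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineSketch
{
    // Squares each input in an asynchronous delegate, then collects the results.
    public static async Task<List<int>> RunAsync(IEnumerable<int> inputs)
    {
        var results = new List<int>();

        // A dataflow block whose body is itself an asynchronous method.
        var square = new TransformBlock<int, int>(async n =>
        {
            await Task.Yield(); // stands in for real asynchronous work
            return n * n;
        });

        // ActionBlock processes one message at a time by default,
        // so the plain List<int> is only touched by one delegate at a time.
        var collect = new ActionBlock<int>(n => results.Add(n));

        square.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (int n in inputs) square.Post(n);
        square.Complete();

        // A dataflow construct awaited from inside an asynchronous method.
        await collect.Completion;
        return results;
    }

    public static async Task Main()
    {
        List<int> results = await RunAsync(new[] { 1, 2, 3 });
        Console.WriteLine(string.Join(", ", results)); // prints "1, 4, 9"
    }
}
```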

  • I'm just getting started with the CTP; I'm planning to use the dataflow blocks to replace an existing codebase I developed using the CCR a while back.

    Could you provide some guidance on migrating projects from the "sunsetting" CCR over to TPL Dataflow?  You went into it a little bit in your Channel9 interview, but I'd love to see something more comprehensive.

    Thanks!

  • I'm having trouble using the basic BufferBlock type; I'm not seeing the expected behavior in the following:

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    namespace TDF_Intro
    {
        class Program
        {
            static BufferBlock<int> m_buffer = new BufferBlock<int>();

            static void Main(string[] args)
            {
                var ab = new ActionBlock<int>((i) => { Console.WriteLine(i); });
                ab.Post(1);
                ab.Post(2);
                ab.Post(3);
                Console.WriteLine("Done!");
                Console.ReadLine();

                new Thread(async () =>
                {
                    for (int i = 0; i < 10; i++)
                    {
                        m_buffer.Post(i);
                        await TaskEx.Delay(1000);
                    }
                }).Start();

                new Thread(async () =>
                {
                    for (int i = 0; i < 10; i++)
                        Console.WriteLine(await m_buffer.ReceiveAsync());
                }).Start();

                Console.WriteLine("Done!");
                Console.ReadLine();
            }
        }
    }

    In the first part, I see "Done" followed by "1, 2, 3" as expected.  However, in the second part, I see "Done" followed by "0, 1, 1, 2, 2, 3, 3, 4, 4, 5" instead of "0, 1, 2, 3, 4, 5, 6, 7, 8, 9".  When I remove the call to "await TaskEx.Delay(1000)", however, I get the expected result.  Also, the "1, 1" pairs always show up right after each other, without a delay in between.

    Any help would be greatly appreciated!  (Even a "don't use await TaskEx.Delay at the end of a loop for the time being" would help. :))  Thanks in advance!

  • Lars, great request.  We'll try to get some guidance like that out soon.

  • Lars, regarding the BufferBlock, this is an unfortunate bug in the initial preview release, stemming from an interaction between TPL Dataflow and the await language binding for TPL.  It has already been fixed (the fix was easy), and I'm hoping we'll have another release in the near future that contains the fix.  In the meantime, you can work around the problem by adding an "await TaskEx.Yield();" after the await on the ReceiveAsync().  For more details on the nature of the bug (and other workarounds), see the forum thread at social.msdn.microsoft.com/.../3164dfeb-36a1-4dfd-8256-28a14c5cc539.  Thanks for your interest and patience.
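
    As a rough sketch of where that extra yield goes, the producer/consumer pair from the question can be written as two asynchronous methods. This assumes the later released API, where the CTP's TaskEx.Delay and TaskEx.Yield became Task.Delay and Task.Yield; the class and method names are hypothetical, and on releases without the bug the yield is simply harmless.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static class ReceiveWorkaroundSketch
    {
        // Producer: posts 0..count-1 with a short pause between items.
        static async Task ProduceAsync(BufferBlock<int> buffer, int count)
        {
            for (int i = 0; i < count; i++)
            {
                buffer.Post(i);
                await Task.Delay(10);
            }
        }

        // Consumer: the yield right after ReceiveAsync is the workaround.
        static async Task<List<int>> ConsumeAsync(BufferBlock<int> buffer, int count)
        {
            var received = new List<int>();
            for (int i = 0; i < count; i++)
            {
                received.Add(await buffer.ReceiveAsync());
                await Task.Yield(); // TaskEx.Yield() in the Async CTP
            }
            return received;
        }

        public static async Task<List<int>> RunAsync(int count)
        {
            var buffer = new BufferBlock<int>();
            Task producer = ProduceAsync(buffer, count);
            List<int> received = await ConsumeAsync(buffer, count);
            await producer;
            return received;
        }

        public static async Task Main()
        {
            // Each item is received exactly once, in posting order.
            Console.WriteLine(string.Join(", ", await RunAsync(10)));
        }
    }
    ```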

  • Thanks a lot Stephen, that did the trick!  And I've been patiently waiting for this ever since I saw the first video on the CCR four years ago; a few bugs in a CTP aren't going to bother me. :)

    Now I'm trying to port an interleaved agent from my CCR codebase to TPL Dataflow (the interleave arbiter was arguably one of the most useful CCR features, especially for robotics).  Below is the sample I've written up; it looks more like CCR code than TPL Dataflow code, however.  How would I clean this up?  Also, I don't think it works properly; my (very simplistic) testing indicates that the Read() and Write() calls execute simultaneously, instead of having a reader/writer lock relationship.  Am I doing something wrong?

    class InterleavedAgent
    {
        private ConcurrentExclusiveSchedulerPair m_scheduler;
        private int state;

        public BufferBlock<Tuple<WriteOnceBlock<int>, WriteOnceBlock<Exception>>> ReadPort { get; set; }
        public BufferBlock<Tuple<WriteOnceBlock<int>, WriteOnceBlock<bool>, WriteOnceBlock<Exception>>> WritePort { get; set; }

        public InterleavedAgent()
        {
            m_scheduler = new ConcurrentExclusiveSchedulerPair();
            ReadPort = new BufferBlock<Tuple<WriteOnceBlock<int>, WriteOnceBlock<Exception>>>();
            WritePort = new BufferBlock<Tuple<WriteOnceBlock<int>, WriteOnceBlock<bool>, WriteOnceBlock<Exception>>>();

            ReadPort.LinkTo(new ActionBlock<Tuple<WriteOnceBlock<int>, WriteOnceBlock<Exception>>>(
                (request) => { Read(request); }, new DataflowBlockOptions(m_scheduler.ConcurrentScheduler)));
            WritePort.LinkTo(new ActionBlock<Tuple<WriteOnceBlock<int>, WriteOnceBlock<bool>, WriteOnceBlock<Exception>>>(
                (request) => { Write(request); }, new DataflowBlockOptions(m_scheduler.ExclusiveScheduler)));

            state = 0;
        }

        private async void Read(Tuple<WriteOnceBlock<int>, WriteOnceBlock<Exception>> request)
        {
            Console.WriteLine("start...");
            await TaskEx.Delay(500);
            request.Item1.Post(state);
            Console.WriteLine("end...");
        }

        private async void Write(Tuple<WriteOnceBlock<int>, WriteOnceBlock<bool>, WriteOnceBlock<Exception>> request)
        {
            Console.WriteLine("START...");
            await TaskEx.Delay(2000);
            state = request.Item1.Receive();
            request.Item2.Post(true);
            Console.WriteLine("END...");
        }
    }

    private static void TestInterleavedAgent()
    {
        var agent = new InterleavedAgent();

        new Thread(() =>
        {
            for (int i = 0; i < 10; i++)
            {
                var response = new WriteOnceBlock<int>(x => x);
                var exception = new WriteOnceBlock<Exception>(x => x);
                agent.ReadPort.Post(new Tuple<WriteOnceBlock<int>, WriteOnceBlock<Exception>>(response, exception));
                response.LinkTo(new ActionBlock<int>((x) => { Console.WriteLine("Read: " + x); }));
                exception.LinkTo(new ActionBlock<Exception>((x) => { Console.WriteLine("Read: ERROR"); }));
                Thread.Sleep(500);
            }
        }).Start();

        new Thread(() =>
        {
            for (int i = 1; i <= 3; i++)
            {
                var request = new WriteOnceBlock<int>(x => x);
                request.Post(i);
                var response = new WriteOnceBlock<bool>(x => x);
                var exception = new WriteOnceBlock<Exception>(x => x);
                agent.WritePort.Post(new Tuple<WriteOnceBlock<int>, WriteOnceBlock<bool>, WriteOnceBlock<Exception>>(request, response, exception));
                response.LinkTo(new ActionBlock<bool>((x) => { Console.WriteLine("Write: " + x); }));
                exception.LinkTo(new ActionBlock<Exception>((x) => { Console.WriteLine("Write: ERROR"); }));
                Thread.Sleep(1000);
            }
        }).Start();

        Console.WriteLine("Done!");
        Console.ReadLine();
    }

    I would expect to never see "start" or "end" (from Read()) in between a "START" and an "END" (from Write()).  But in fact they're all mixed together.

  • Hi Lars-

    ConcurrentExclusiveSchedulerPair's schedulers ensure concurrent/exclusive execution for the code that's actually executing; they don't provide such guarantees while that code is asynchronously waiting for something.  In your read function, for example, when you "await TaskEx.Delay(500);" you're temporarily exiting your read function to allow other work to happen, and thus at that point the CESP is able to execute other work, in particular your exclusive work, because nothing else is running on the scheduler at that point.  The primary scenario we've seen people need interleaves/CESP for is protecting shared data structures without needing to use locks.

    Do you have a real need to protect even the asynchronous delays, or was this just a test case?
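
    The guarantee that does hold can be sketched as follows: as long as the scheduled work is synchronous, code on the exclusive scheduler never overlaps with code on the concurrent scheduler. This is an illustrative sketch against the released .NET ConcurrentExclusiveSchedulerPair type; the class name, counters, and timings are hypothetical.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;

    public static class CespSketch
    {
        static int activeReaders, activeWriters, violations;

        static void Read()
        {
            Interlocked.Increment(ref activeReaders);
            // A reader must never observe a writer that is still running.
            if (Volatile.Read(ref activeWriters) != 0) Interlocked.Increment(ref violations);
            Thread.Sleep(5); // synchronous work: the task keeps its slot on the scheduler
            Interlocked.Decrement(ref activeReaders);
        }

        static void Write()
        {
            Interlocked.Increment(ref activeWriters);
            // A writer must never observe a reader that is still running.
            if (Volatile.Read(ref activeReaders) != 0) Interlocked.Increment(ref violations);
            Thread.Sleep(5);
            Interlocked.Decrement(ref activeWriters);
        }

        // Returns the number of observed reader/writer overlaps.
        public static int Run()
        {
            activeReaders = activeWriters = violations = 0;
            var pair = new ConcurrentExclusiveSchedulerPair();
            var readers = new TaskFactory(pair.ConcurrentScheduler);
            var writers = new TaskFactory(pair.ExclusiveScheduler);

            var tasks = new List<Task>();
            for (int i = 0; i < 20; i++)
            {
                tasks.Add(readers.StartNew(Read));
                if (i % 5 == 0) tasks.Add(writers.StartNew(Write));
            }
            Task.WaitAll(tasks.ToArray());
            return violations;
        }

        public static void Main()
        {
            Console.WriteLine("Overlaps observed: " + Run());
        }
    }
    ```

    If Read or Write awaited something instead of sleeping, the task would return to the scheduler at the await, and the pair would be free to start other work in the gap, which is the behavior described above.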

  • This was just a test case.  And your explanation makes perfect sense; I'd been thinking about CESP as if it was a CCR Interleave arbiter, but that's not the case.  Hence, I was trying to protect the entire Read/Write method.

    Maybe my problem is that my original design was too coarse-grained, trying to use the same tool for different jobs.  Here's the scenario I'm working on, and I think I've figured out what I need to do:

    The ModBus protocol uses RS-485 serial communication (via System.IO.Ports.SerialPort) to communicate in request-reply fashion with addressable peripherals.  I've created a CCR/DSS service to represent ModBus communication over a given COM port (i.e., the service "owns" all ModBus requests/replies made on that port).  Obviously, requests (SendBytes) and replies (ReadBytes) need to be paired and isolated properly; that's one aspect of the synchronization.  It seems like the best way to handle that would actually be a BufferBlock, rather than using CESP, with an async Run() method that performs the paired writes and reads for each request that is enqueued.

    One layer up in the application, I have a CCR/DSS service that represents a peripheral I/O device (effectively an array of digital output and input coils).  Consumers of this service should be able to concurrently read the state of the input coils but only write the state of the output coils exclusively.  This is the part where I would think CESP comes in.  If the read and write calls were nothing more than a direct access to in-memory arrays, no problem; I could effectively isolate the reads and writes properly by not 'awaiting' in those units of work.  However, the writes need to make asynchronous calls to the ModBus service/agent to actually make the state change happen; moreover, there is a background timer which makes asynchronous ModBus calls to read the state of the input coils.  This is where CESP, from what I understand, will not be sufficient.  But it sounds as though using a queue (BufferBlock) for the ModBus agent will solve that problem, since it will effectively prevent the I/O agent's read/write calls from being incorrectly interleaved?

  • Hi Lars-

    Thanks for the details. Without understanding your situation better, it sounds like an asynchronous method that pulls from a buffer block and does the processing would work, or alternatively an action block that uses an asynchronous method as its function (noting that, by default, action block will only process one message at a time, though that can be overridden using a DataflowBlockOptions provided to it).
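
    As a sketch of the second option, an action block with an asynchronous delegate keeps each request's work (including its awaits) from overlapping with the next request's. This assumes the released API, in which ActionBlock accepts a Func<T, Task>; the class name and the Task.Delay stand-in for the paired serial write and read are hypothetical.

    ```csharp
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public sealed class SerializedAgentSketch
    {
        private readonly ActionBlock<int> _requests;
        private int _inFlight;
        private int _maxInFlight;

        public SerializedAgentSketch()
        {
            // Default options: one message at a time. The block waits for the
            // returned Task to finish before it starts the next message.
            _requests = new ActionBlock<int>(async request =>
            {
                int now = Interlocked.Increment(ref _inFlight);
                if (now > _maxInFlight) _maxInFlight = now;
                await Task.Delay(10); // hypothetical paired write and read
                Interlocked.Decrement(ref _inFlight);
            });
        }

        // Posts the requests, waits for them all, and reports the highest
        // number of requests that were ever in progress at the same time.
        public async Task<int> RunAsync(int requestCount)
        {
            for (int i = 0; i < requestCount; i++) _requests.Post(i);
            _requests.Complete();
            await _requests.Completion;
            return _maxInFlight;
        }

        public static async Task Main()
        {
            int max = await new SerializedAgentSketch().RunAsync(5);
            Console.WriteLine("Max requests in flight: " + max); // prints 1
        }
    }
    ```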

    Regarding CESP, we've considered augmenting CESP to support both kinds of functionality (i.e. considering asynchronous waits as part of the processing or not as part of the processing), where you could potentially do something like:

    await cesp.EnterExclusiveAsync();
    ... // exclusive code here, which could include awaits
    cesp.ExitExclusive();

    You would do that in any asynchronous method; the "..." code would be scheduled when the exclusive rights were obtained, and then when done they would be released so that other tasks scheduled to the scheduler could run. Would that be useful to you?  In effect, this would be augmenting the CESP with imperative lock-like functionality.  You could achieve this without CESP of course by using an asynchronous lock, such as the AsyncSemaphore in our .NET 4 samples at code.msdn.microsoft.com/ParExtSamples.  With that, you can write:

    await sem.Wait();
    ... // exclusive code here, which could include awaits
    sem.Release();

    The benefit to having it integrated with CESP is that it would play nicely with other tasks scheduled to the pair of schedulers, but for most cases I believe the above should suffice, too.
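
    A minimal sketch of the asynchronous-lock pattern follows. Note the swap: rather than the AsyncSemaphore sample type mentioned above, it uses SemaphoreSlim.WaitAsync, which ships in later .NET releases and plays the same role here. The Counter class is hypothetical.

    ```csharp
    using System;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    public sealed class Counter
    {
        // One permit: at most one caller is inside the protected region.
        private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
        private int _value;

        public int Value { get { return _value; } }

        public async Task IncrementSlowlyAsync()
        {
            await _gate.WaitAsync();
            try
            {
                int snapshot = _value;
                await Task.Delay(1);   // an await while the lock is still held
                _value = snapshot + 1; // safe: no other caller ran in between
            }
            finally
            {
                _gate.Release();
            }
        }

        public static async Task Main()
        {
            var counter = new Counter();
            await Task.WhenAll(Enumerable.Range(0, 20).Select(_ => counter.IncrementSlowlyAsync()));
            Console.WriteLine(counter.Value); // prints 20
        }
    }
    ```

    The try/finally matters more than with an ordinary lock statement, since nothing releases the permit automatically if the protected code throws.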

  • Also, I should mention that BufferBlock can itself be used as an asynchronous semaphore lock.  Just create a BufferBlock and post a dummy item to it, e.g.

       private readonly BufferBlock<int> m_lock = new BufferBlock<int>();
       ...
       m_lock.Post(0);

    Then, you can "acquire" the lock by receiving the item from the buffer, e.g.

       await m_lock.ReceiveAsync(); // subject to the CTP bug referred to earlier, so for now you'll want to follow this with an "await TaskEx.Yield();"

    and then to release the "lock" you just post back to the buffer, e.g.

       m_lock.Post(0);

    If you want to use it as a semaphore, you can post multiple items to the buffer.
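
    Put together, the semaphore variant might look like the sketch below, which posts two tokens so that at most two workers hold a permit at once. It assumes the released API (ReceiveAsync as an extension method on the buffer); the class name, worker count, and delay are hypothetical.

    ```csharp
    using System;
    using System.Linq;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public sealed class BufferBlockSemaphoreSketch
    {
        private readonly BufferBlock<int> _permits = new BufferBlock<int>();
        private readonly object _statsLock = new object();
        private int _holders;
        private int _maxHolders;

        public BufferBlockSemaphoreSketch(int permitCount)
        {
            // Each posted item is one permit.
            for (int i = 0; i < permitCount; i++) _permits.Post(0);
        }

        private async Task WorkAsync()
        {
            await _permits.ReceiveAsync(); // acquire: take a token out of the buffer
            try
            {
                lock (_statsLock)
                {
                    _holders++;
                    if (_holders > _maxHolders) _maxHolders = _holders;
                }
                await Task.Delay(20); // protected work, which may itself await
                lock (_statsLock) { _holders--; }
            }
            finally
            {
                _permits.Post(0); // release: put the token back
            }
        }

        // Runs the workers and returns the largest number that held a permit at once.
        public async Task<int> RunAsync(int workerCount)
        {
            await Task.WhenAll(Enumerable.Range(0, workerCount).Select(_ => WorkAsync()));
            return _maxHolders;
        }

        public static async Task Main()
        {
            int max = await new BufferBlockSemaphoreSketch(2).RunAsync(6);
            Console.WriteLine("Max concurrent holders: " + max); // never more than 2
        }
    }
    ```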

  • A great start; thanks for doing this.  I've been using CCR and look forward to porting to the Dataflow model.

    It's much easier to understand than CCR.  The terminology and class names make more sense.  It's easier to configure, because the Dispatcher/DispatcherQueue part is gone; all of that is already handled by the Scheduler.  It's easier to support, because the component is part of the .NET Framework and not some other add-on.

    Having experience with CCR was good and bad.  Bad because I had to "unlearn" some CCR concepts.  Good because understanding CCR meant understanding the problem space.  NOTE: Unlearning CCR did not take long.  I think the guidance mentioned in the blog comments would be sufficient.

    I'm not sure I understand how to use the CompletionTask property.  If I attach a continuation, what does that mean?  For example, will the continuation execute every time data is posted, or what?  Wouldn't the job of exception handling happen in the delegate attached to the Target?  When/how should exceptions from the Task be propagated?

    Could a "Choice" be configured to receive an Exception?  Perhaps even configuring to receive sub-classed exceptions and then the default Exception base class?

    It would be useful to constrain the size of the internal data structure storing the data and/or have a revolving collection.  Say, for example, you always want the most recent data.  It looks like it could be achieved using a combination of Blocks, but a revolving collection seems like a common scenario.  If this isn't available how about a Clear method.

    I saw the blog comments about using IDisposable to do the unlinking, but what about suspending for a moment?  For example, how do you just pause or wait until some future activation?  Think of a Windows Service that reads configuration, fully configures itself, validates the configuration, then systematically switches to "on" mode.  Something like the CCR's Arbiter.Activate method.

  • Hi Juday-

    Thanks for the feedback.

    Regarding the CompletionTask property, which is now just named "Completion", this task represents the overall lifecycle of the dataflow block.  For example, on an ActionBlock, this task will not complete until all data has been processed by the ActionBlock and it won't receive any more data because its Complete method was called.  Alternatively, it'll complete if the delegate passed to the ActionBlock lets an exception go unhandled, in which case the block will cease further processing, and the Completion task will end in a Faulted state.  Or, if you cancel the CancellationToken provided to the block, the task will end in the Canceled state (assuming no exceptions went unhandled in the interim).  So, it's not per datum to process, but rather the all-up processing of the block.  You could use this, for example, to achieve fork/join semantics, e.g.

    var ab = new ActionBlock<int>(i => { ... });
    for(int i=0; i<1000; i++) ab.Post(i);
    ab.Complete();
    ab.Completion.Wait();
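
    The faulted case can be sketched the same way: when the delegate lets an exception escape, the block stops processing and its Completion task ends as Faulted. This assumes the released API; the class name and the failing input are hypothetical.

    ```csharp
    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static class CompletionSketch
    {
        public static async Task<TaskStatus> RunAsync()
        {
            var block = new ActionBlock<int>(i =>
            {
                if (i == 2) throw new InvalidOperationException("failed on " + i);
                Console.WriteLine("processed " + i);
            });

            block.Post(1);
            block.Post(2); // the delegate throws here, faulting the block
            block.Post(3); // never processed once the block has faulted
            block.Complete();

            try
            {
                // Completion covers the block's whole lifetime, not a single item.
                await block.Completion;
            }
            catch (InvalidOperationException ex)
            {
                Console.WriteLine("block faulted: " + ex.Message);
            }
            return block.Completion.Status;
        }

        public static async Task Main()
        {
            Console.WriteLine(await RunAsync()); // prints "Faulted"
        }
    }
    ```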

    Regarding Choice, sure, you can configure it to work with data sources over any T, including Exception.

    Regarding a revolving collection, it sounds like you're asking for a buffer that overwrites contents to only store the most recent N.  We currently have that behavior in BroadcastBlock, but only for where N == 1.  Good suggestion for us to consider.

    You can suspend/resume simply by unlinking and then relinking.  For example, if you had a BufferBlock linked to an ActionBlock with a bounded capacity of 1, then you could unlink the buffer from the action when you want to suspend processing, and then re-link to the action when you want the action block to continue processing.
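
    A minimal sketch of that suspend/resume pattern, assuming the released API in which LinkTo returns an IDisposable that removes the link when disposed (the class and variable names are hypothetical):

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static class SuspendResumeSketch
    {
        public static async Task<(int Pending, int[] Processed)> RunAsync()
        {
            var processed = new List<int>();
            var buffer = new BufferBlock<int>();

            // BoundedCapacity = 1 keeps the action block from pulling the whole
            // backlog up front, so unlinking really does stop the flow.
            var worker = new ActionBlock<int>(
                i => processed.Add(i),
                new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

            IDisposable link = buffer.LinkTo(worker);

            link.Dispose();             // suspend: remove the link
            buffer.Post(1);
            buffer.Post(2);
            int pending = buffer.Count; // both items wait in the buffer

            // Resume: link again; completion now flows through to the worker.
            buffer.LinkTo(worker, new DataflowLinkOptions { PropagateCompletion = true });
            buffer.Complete();
            await worker.Completion;

            return (pending, processed.ToArray());
        }

        public static async Task Main()
        {
            var result = await RunAsync();
            Console.WriteLine("pending while suspended: " + result.Pending);
            Console.WriteLine("processed after resume: " + string.Join(", ", result.Processed));
        }
    }
    ```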

    Thanks!
