Parallel Programming: Task Schedulers and Synchronization Context

Parallel Programming: Task Schedulers and Synchronization Context

Rate This
  • Comments 22

Thanks to everyone who provided feedback on my previous post Parallel Programming in .NET Framework 4: Getting Started. As promised, I am continuing the series. This time, let’s go a little bit deeper and talk about task schedulers, synchronization context, tasks that return values, and some other cool features of the Task Parallel Library (TPL).

By the way, here’s the full list of posts in this series:

This time I’m not going to add any new functionality to my sample application, but rather I will show how you can communicate with the UI thread a little easier and make your application less dependent on a particular UI framework.

Here is the final code from my last post:

using System;
using System.Collections.Generic;
using System.Windows;
using System.Windows.Documents;
using System.Threading.Tasks;
using System.Diagnostics;

namespace ParallelApplication
{
    public partial class MainWindow : Window
    {
        public MainWindow()
        {
            InitializeComponent();
        }
        public static double SumRootN(int root)
        {
            double result = 0;
            for (int i = 1; i < 10000000; i++)
            {
                result += Math.Exp(Math.Log(i) / root);
            }
            return result;
        }
        private void start_Click(object sender, RoutedEventArgs e)
        {
            textBlock1.Text = "";
            label1.Content = "Milliseconds: ";

            var watch = Stopwatch.StartNew();
            List<Task> tasks = new List<Task>();
            for (int i = 2; i < 20; i++)
            {
                int j = i;
                var t = Task.Factory.StartNew(() =>
                {
                    var result = SumRootN(j);
                    this.Dispatcher.BeginInvoke(new Action(() =>
                         textBlock1.Text += "root " + j.ToString() + " " +
                                             result.ToString() +
                                             Environment.NewLine)
                    , null);
                });
                tasks.Add(t);
            }
            Task.Factory.ContinueWhenAll(tasks.ToArray(),
                  result =>
                  {
                      var time = watch.ElapsedMilliseconds;
                      this.Dispatcher.BeginInvoke(new Action(() =>
                          label1.Content += time.ToString()));
                  });
        }
    }
}

Yes, it does everything that I wanted, but it uses the WPF API extensively. Although it is perfectly fine to use WPF as shown above, with the TPL you can make your code much less dependent on the particular UI framework. Right now, if you copy and paste the button event handler and the SumRootN method to a Windows Forms application with almost identical UI, you would need to do a lot of work, because Windows Forms doesn’t have the Dispatcher object and uses different API for managing interactions with the UI thread.

Let’s go back to the previous post and remember why I added the Dispatcher object in the first place. Well, I needed it to communicate with the UI thread, because all the computation results were in background threads created by the TPL. However, the TPL provides a different way of handling interactions between threads. It has task schedulers: very useful objects that are responsible for queuing and executing tasks.

My application already uses a default task scheduler, because this is how the TPL works with the ThreadPool. The TPL has other schedulers in addition to the default one and also allows you to create custom schedulers. One of the schedulers that TPL provides is based on the current synchronization context, and it can be used to ensure that my task executes on the UI thread. For example, let’s take a look at this code:

Task.Factory.ContinueWhenAll(tasks.ToArray(),
      result =>
      {
          var time = watch.ElapsedMilliseconds;
          this.Dispatcher.BeginInvoke(new Action(() =>
              label1.Content += time.ToString()));
      });

The time computation is very simple and fast, so it doesn’t require a background thread. The next line is about displaying the result, which is pure UI work.

I need to somehow get a reference to the UI thread, so I can run a task on it. In this case, it’s quite easy. The code above is from the button event handler, so before I start the task I am in fact operating on the UI thread. I just need to remember the current context and then pass it to the TaskFactory.ContinueWhenAll method.

Here is how I can do this (plus I need to add System.Threading to the list of namespaces):

var ui = TaskScheduler.FromCurrentSynchronizationContext();
Task.Factory.ContinueWhenAll(tasks.ToArray(),
    result =>
    {
        var time = watch.ElapsedMilliseconds;
        label1.Content += time.ToString();
    }, CancellationToken.None, TaskContinuationOptions.None, ui);

The TaskScheduler.FromCurrentSynchronizationContext method returns a task scheduler for the current context, in this case, the UI thread. The ContinueWhenAll method has an overload that accepts the task scheduler parameter. This overload requires some other parameters as well, but since I don’t need them, I use corresponding None properties. I got rid of the inner delegate, so it’s much easier to see what the task is doing and there is no Dispatcher any more.

Now let’s take a look at a more complicated case:

for (int i = 2; i < 20; i++)
{
    int j = i;
    var t = Task.Factory.StartNew(() =>
    {
        var result = SumRootN(j);
        this.Dispatcher.BeginInvoke(new Action(() =>
             textBlock1.Text += "root " + j.ToString() + " " +
                                 result.ToString() +
                                 Environment.NewLine)
        , null);
    });
    tasks.Add(t);
}

This one requires more thorough refactoring. I can’t run all the tasks on the UI thread, because they perform long-running operations and this will make my UI freeze. Furthermore, it will cancel all parallelization benefits, because there is only one UI thread.

What I can do is to split each task into two: one will compute the results and another one will display information to the UI thread. I have already used the ContinueWhenAll method that waits for an array of tasks to finish. It’s not surprising that TPL also allows you to wait for a certain task to finish and then to perform some operation. The method that does the job is Task.ContinueWith.

var ui = TaskScheduler.FromCurrentSynchronizationContext();
for (int i = 2; i < 20; i++)
{
    int j = i;
    var compute = Task.Factory.StartNew(() =>
    {
        var result = SumRootN(j);
    });
    tasks.Add(compute);

    var display = compute.ContinueWith(resultTask =>
                     textBlock1.Text += "root " + j.ToString() + " " +
                                         result.ToString() +
                                         Environment.NewLine,
                     ui);
}

OK, the code above doesn’t compile for an obvious reason. The result variable is local for the task compute and the task display doesn’t know anything about it. How can I pass the result from one task to another one? Here is one more trick from the TPL: tasks can return values. To make the task compute return the value, all I need is to change “var result =” to “return” in the compute task.

var compute = Task.Factory.StartNew(() =>
{
    return SumRootN(j);
});

This makes the compiler change the type of the compute object from Task to Task<TResult>. In my case, the type of compute is now Task<double>. Objects of the Task<TResult> type save the return value in the Result property.

var display = compute.ContinueWith(resultTask =>
                  textBlock1.Text += "root " + j.ToString() + " " +
                                      compute.Result.ToString() +
                                      Environment.NewLine,
                  ui);

That’s it. Here is my final code for the event handler:

private void start_Click(object sender, RoutedEventArgs e)
{
    textBlock1.Text = "";
    label1.Content = "Milliseconds: ";

    var watch = Stopwatch.StartNew();
    List<Task> tasks = new List<Task>();
    var ui = TaskScheduler.FromCurrentSynchronizationContext();
    for (int i = 2; i < 20; i++)
    {
        int j = i;
        var compute = Task.Factory.StartNew(() =>
        {
            return SumRootN(j);
        });
        tasks.Add(compute);

        var display = compute.ContinueWith(resultTask =>
                          textBlock1.Text += "root " + j.ToString() + " " +
                                              compute.Result.ToString() +
                                              Environment.NewLine,
                          ui);

    }

    Task.Factory.ContinueWhenAll(tasks.ToArray(),
        result =>
        {
            var time = watch.ElapsedMilliseconds;
            label1.Content += time.ToString();
        }, CancellationToken.None, TaskContinuationOptions.None, ui);

}

If you copy the code from this event handler plus the SumRootN method to the Windows Forms application, you will need to change the code only slightly, mostly because the UI elements are a little bit different. (Windows Forms applications do not have TextBlock control and their labels do not have the Content property.) Just for fun, I did it myself and highlighted the changes I had to make in the event handler.

private void start_Click(object sender, EventArgs e)
{
    label2.Text = "";
    label1.Text = "Milliseconds: ";

    var watch = Stopwatch.StartNew();
    List<Task> tasks = new List<Task>();
    var ui = TaskScheduler.FromCurrentSynchronizationContext();
    for (int i = 2; i < 20; i++)
    {
        int j = i;
        var compute = Task.Factory.StartNew(() =>
        {
            return SumRootN(j);
        });
        tasks.Add(compute);

        var display = compute.ContinueWith(resultTask =>
               label2.Text += "root " + j.ToString() + " " +
                               compute.Result.ToString() +
                               Environment.NewLine,
                ui);
    }

    Task.Factory.ContinueWhenAll(tasks.ToArray(),
         result =>
         {
             var time = watch.ElapsedMilliseconds;
             label1.Text += time.ToString();
         }, CancellationToken.None, TaskContinuationOptions.None, ui);
}

Now I have a parallel Windows Forms application, with responsive UI. The migration was really easy. So, my advice is to stick to the TPL way of managing UI thread instead of the UI framework API. It makes your code much easier to migrate and allows you to write essentially the same code, no matter what UI framework you use.

I’m going to talk about task cancellation next time. For now, if you want to know more about the features used in this post, here are some interesting links:

P.S.

Thanks to Dmitry Lomov, Michael Blome, and Danny Shih for reviewing this and providing helpful comments, to Robin Reynolds-Haertle for editing.

Leave a Comment
  • Please add 6 and 6 and type the answer here:
  • Post
  • Nice job, keep it coming!!

  • I love your articles!  Please continue!

  • Please, any sample code in Windows Forms application, with all complete source code ??

    Thanks in advanced

  • @ae

    Create a WIndows Forms app with two labels and one button, copy the event handler from the article (the very last version) plus add this method:

           public static double SumRootN(int root)

           {

               double result = 0;

               for (int i = 1; i < 10000000; i++)

               {

                   result += Math.Exp(Math.Log(i) / root);

               }

               return result;

           }

    And yes, don't forget to add all the "using" statements. This should be a complete source code.

  • Greatly explained! Waiting for the next article of the series.

  • OK... is it me, or will this method fail unless you also copy over the synchronization context? Or do you mean to say this works from the U/I thread exactly once, but dies if you try it from a continuation (recursively)?

  • @Hal

    I don't understand what you are trying to do.

    You can execute a task in a certain context, in this case, on a UI thread. You can have a loop within a task, it's OK. If you try to start several tasks in a loop on the UI thread, you have to remember, that there is still just ONE UI thread, so there will be no parallelization. They will execute, yes, but one after another and the UI will freeze if these tasks perform long-running operations.

  • If you fork a background task and then 'ContinueWith' a task using the 'FromCurrentSynchronizationContext()' scheduler, you will wind up continuing on a thread with no sync context. Try that twice in a row, and all heck will break loose.

  • @Hal

    Can you please copy your code here?

  • Alexandra, I may owe you an apology. I was emailed that the Scheduler takes care of restoring the sync context. I will post a counter-example (or an apology) this afternoon. (I suspect the programmer put the scheduler fetch in a closure.)

  • This is is excellent I think and could return brilliant results for the end user.

  • This is excellent I think and could return brilliant results for the end user (depending on the type of user).

  • Per your request, here is the code. I wanted to be concise, but, unfortunately, simple cases seem to work.

    1) The process begins when a WCF channel says there is new data. From the WCF message handler, a routine is started on the form's U/I thread via BeginInvoke().

    2) The Invoked routine locks a queue and puts a message there to tell the U/I to show the page. If the queue was empty when the lock was satisfied, it calls StartQueueReader() [Line 1]

    3) StartQueueReader() forks GetMovePage() to read the database [Line 3] and adds a continuation on the U/I thread [Line 6] to show the results.

    4)  GetMovePage() [Line 11] reads the latest page [Lines 19 - 21], looping back if a new request came in during the page query (which can last quite awhile), and finally returns the results [Line 25] to the continuation.

    5) The continuation routine [BindMoves(), Line 41] checks the queue once again [Line 47], and if there is a fresh message, calls StartQueueReader() again [Line 49]

    6) THE DAMAGE IS DONE... an unhandle exception occurs in FromCurrentSynchronizationContext() [Line 8] because Thread.CurrentContext is null

           /// <summary>

           /// Start a spare time task to get move pages

           /// </summary>

    1       private void StartQueueReader()

    2       {

    3          var queuedGetMovePage = Task.Factory.StartNew<GetMovePageResults>(

    4              GetMovePage

    5              );

    6          queuedGetMovePage.ContinueWith(

    7              BindMoves,

    8              TaskScheduler.FromCurrentSynchronizationContext()

    9              );

    10      }

           /// <summary>

           /// Spare time routine to read data for queued page reqests

           /// </summary>

           /// <returns>Last-requested page results or <value>null</value></returns>

           [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]

    11      private GetMovePageResults GetMovePage()

    12      {

    13          GetMovePageArgs args;

    14          while ((args = MostRecentMoveArgs()) != null)

    15          {

    16              try

    17              {

                       // Get the page of moves.

    18                  int moveTotal;

    19                  var moveList = _moveManager.GetPage(

    20                      args.StartDate, DateTime.UtcNow, args.PageSize, args.Page, out moveTotal

    21                      );

    22                  // Loop if a new reqest came in while we were busy

    23                  if (args != MostRecentMoveArgs())

    24                      continue;

    25                  return new GetMovePageResults

    26                  {

    27                      MoveList = moveList,

    28                      MoveTotal = moveTotal

    29                  };

    30              }

    31              catch (Exception ex)

    32              {

    33                  Logger.Log(ex);

    34                  // On error, don't reprocess the bad request

    35                  if (PopGetMovePageArgs())

    36                      return null;

    37              }

    38          }

    39          return null;

    40      }

           /// <summary>

           /// Continue on U/I thread after we get the page

           /// </summary>

           /// <param name="queuedGetMovePage"></param>

    41      private void BindMoves(Task<GetMovePageResults> queuedGetMovePage)

    42      {

               // If we emptied the queue without result, exit

    43          var getMovePageResults = queuedGetMovePage.Result;

    44          queuedGetMovePage.Dispose();

    45          if (getMovePageResults == null)

    46              return;

               // If we are unlucky and don't have the last requested page, go around again for the right one

    47          if (!PopGetMovePageArgs())

    48          {

    49              StartQueueReader();

    50              return;

    51          }

               // Clear out the move dataset, we are going to reget them.

    52          _moves.Clear();

    53          SummaryGrid.Clear();

               // Add each move we get.

    54          var moveList = getMovePageResults.MoveList;

    55          if (moveList != null && moveList.Count != 0)

    56              foreach (var m in moveList)

    57                  AddMove(m);

               // Bind the moves.

    58          SummaryGrid.Bind();

    59          _moveTotal = getMovePageResults.MoveTotal;

    60          SetMoveLabel();

    61      }

  • @Hal

    I hoped to take a look at this today, but didn't have time. And I'll be on vacation next two weeks. Sorry.

    You can try to post your question here: social.msdn.microsoft.com/.../threads

    This is the official MSDN forum for TPL.

  • great post, thanks for sharing. it's really great to learn something everyday.

Page 1 of 2 (22 items) 12