TPL : Build your own TPL for Silverlight, WP7 and XBox 360 - Eternal Coding - HTML5 / JavaScript / 3D development - Site Home - MSDN Blogs

TPL : Build your own TPL for Silverlight, WP7 and XBox 360


 

TPL : Build your own TPL for Silverlight, WP7 and XBox 360

  • Comments 4

This blog is a translation of http://blogs.msdn.com/b/eternalcoding/archive/2011/05/16/fr-sous-le-capot-etude-de-la-task-parallel-library.aspx

With this post, we will discover the Task Parallel Library (a library to simply manage your parallel developments with the .Net framework 4.0). But we will do this in a slightly different way. Indeed, we will totally rewrite it from scratch Sourire.

Of course, we will not do as well as the original TPL. We will focus on key concepts in order to understand how the TPL works.

We shall take advantage of writing our version to make it usable with multiple frameworks. To do that, we only need to install the Portable Library Tools. Once the latter installed, a new project appears in Visual Studio that will allow us to develop our TPL:

image_thumb[1]

With this project type, we will develop an assembly usable with .Net 4.0, Silverlight 4 (and 5), Silverlight for Windows Phone 7 and even with XNA 4.0 for XBox 360 !

Of course, this portability will have a price in terms of functionalities.

You can already download the Portable TPL just here : http://portabletpl.codeplex.com.

You can also install it with a NuGet package : http://www.nuget.org/List/Packages/PortableTPL.

However before writing our code, we will study the original TPL to see how we can build our solution.

Task & TaskFactory

TPL is built on a class called Task which is an unity of work to realize.

It is like a ThreadPool.QueueUserWorkItem(). So to realize a work, just create a Task with the action to realize in parameter :

var task = new Task(state => { foreach (int value in data) { Thread.Sleep(value * (int)state); } }, 100); task.Start(); task.Wait();

Another solution to start a Task is to use the TaskFactory.StartNew() method which is on the static class Task.Factory :

Task task = Task.Factory.StartNew(() => { foreach (int value in data) { ... } }); task.Wait();

We can remark the use of Task.Wait() which allows the synchronization between the task and our current thread.

In addition, you need to know there is another class for tasks: Task<T>. This daughter of Task  adds the concept of return value:

Task<int> task = Task.Factory.StartNew<int>( (limit) => { int sum = 0; for (int i = 0; i < (int)limit; i++) { sum += i; } return sum; }, 100); task.Wait(); Console.WriteLine(task.Result);

Thus Task<T> is designed to retrieve a result where Task executes only an action.

Task continuation

Task continuation is another feature we will cover with our solution. Indeed, the task continuation allows the launch of a secondary task when a primary one is terminated.

To do so, we only need to use the ContinueWith() method:

Task task = Task.Factory.StartNew(() => { foreach (int value in data) { ... } }); task.ContinueWith(t => { // Code de la seconde tâche... });

The secondary task will receive the primary task in parameter in order to process errors (if any).

Schedulers and work stealing

The tasks are managed and organized by schedulers whose role is to dispatch them on worker threads. Indeed, according to the number of processors, TPL will create the right count of working threads. Each task to run would be then treated by one of these worker threads.

The allocation of tasks on the worker threads will be made according to different algorithms. In our case we will cover the standard scheduler (which distributes the workload in a uniform manner) and the scheduler of the synchronization context (where all tasks run on the UI thread to allow them to drive the interface).

Thus, schedulers each have a queue of the tasks assigned to them.

In addition, the TPL handles additional functionality: the work stealing. Indeed, if a worker thread has an empty queue and his colleagues still have work to do, it will gently steal a portion of their remaining work. All of this is done to ensure a constant distribution of the load on each processor.

By default, the tasks are launched on the standard scheduler. However if you want to start a task on the scheduler of the synchronization context, you simply need to execute this code:

Task task = Task.Factory.StartNew(() => { foreach (int value in data) { Thread.Sleep(value * 100); } },CancellationToken.None, TaskCreationOptions.None, TaskScheduler.FromCurrentSynchronizationContext());

A sample of the use of this type of continuation can be the update of an interface after the execution of a task. Before the launch of the task we can display an hourglass or a wait animation and then start our task. When the task completes, on a second task running on the scheduler of the synchronization context and in continuation of the first we can hide our control to indicate to the user the end of the treatment.

State management

Tasks have a status property to indicate their current state :

  • TaskStatus.Created : First state of a task. Task is not yet launched or even scheduled.
  • TaskStatus.WaitingForActivation : Task was launched but is not yet assigned to a worker thread by a scheduler.
  • TaskStatus.WaitingToRun : Task was assigned to a worker thread but is not yet executed.
  • TaskStatus.Running : Task is executed by a worker thread.
  • TaskStatus.RanToCompletion : Everything is ok!!! The task was executed successfully.
  • TaskStatus.Faulted : An exception was thrown.
  • TaskStatus.Canceled : User has cancelled the execution of the task.

So we can follow the complete life of the task through its status.

In addition, tasks have several properties that bring together some status:

  • Task.IsCanceled : Like Status == Canceled
  • Task.IsFaulted : Like Status == Faulted
  • Task.IsCompleted : Like Status == RanToCompletion or Faulted or Canceled

Cancellation

TPL allows the developer to simply manage the concept of cancellation within the tasks. Indeed, since the framework.NET 4.0, the CancellationTokenSource class has been added to serve as a basis for cancellation. In the TPL, this class will emit a token that will be copied to the task. It allows to know if a cancel has been requested at the level of the CancellationTokenSource class. If this is the case, the token can throw a cancellation exception:

var tokenSource = new CancellationTokenSource(); var token = tokenSource.Token; var task = Task.Factory.StartNew(limit => { for (int i = 0; i < (int)limit; i++) { token.ThrowIfCancellationRequested(); Console.WriteLine("Processing {0}", i); } }, 2000, token); Thread.Sleep(200); Console.WriteLine("Let's cut..."); tokenSource.Cancel();

We can of course make the choice to not throw an exception but rather to quit the task if we detect the token's IsCancellationRequested property set to True.

The launch of the cancellation is done on the CancellationTokenSource that issued tokens. This can cancel several tasks at once. It is even possible to register with the token via the Register method to be notified when a cancellation is requested.

The Parallel class

In addition to the notion of tasks, TPL adds support for Parallel class which will allow us to do parallel loops in a really elegant way:

Parallel.For(0, 10, i => { Console.WriteLine(i); });

The same thing with ForEach :

Parallel.ForEach(new[]{"toto", "titi"}, Console.WriteLine);

Of course, with the use of these methods, we must be careful that each loop is really independent to not cause cross-threads access violations.

Exception handling

TPL offers an elegant solution for the management of exceptions. Indeed, if in the body of a task, an exception should be thrown, TPL will catch it for us.

It will be subsequently transmitted to the user when:

  • Parallel.For/ForEach returns
  • Calling the Wait method of a Task
  • Calling the Result property of Task <T>.

The important point to note here is that TPL will aggregate the exceptions that might occur in a task or a Parallel.For/ForEach to present them on the calling thread:

try { Parallel.For(0, 10, i => { Console.WriteLine(i); if (i == 5) throw new Exception("erf..."); }); } catch (AggregateException aggEx) { foreach (Exception ex in aggEx.InnerExceptions) { Console.WriteLine(string.Format("Caught exception '{0}'", ex.Message)); } } Console.ReadLine();

In this sample, the system write 10 values to the console and then at the exit of the Parallel.For, TPL threw the aggregate exception. We will therefore see appear the superb "caught exception erf…" message.

In the same way, we can retrieve the exception on a Task.Wait:

Task<int> task = Task.Factory.StartNew( limit => { int sum = 0; for (int i = 0; i < (int)limit; i++) { sum += i; if (i == 12) throw new Exception("Bad trip..."); } return sum; }, 100); try { task.Wait(); Console.WriteLine(task.Result); } catch (AggregateException aggregateException) { foreach (Exception ex in aggregateException.InnerExceptions) { Console.WriteLine(string.Format("Caught exception '{0}'", ex.Message)); } }

If we do not call a method or a property that throws the aggregation exception, then the latter will not be observed. The only way then to know if an error has actually occurred will be watching the value of Task.Status or Task.IsFaulted.

Our architecture

It is therefore now that serious things start! We know what we are talking about, we have the vocabulary, we are now ready to reproduce the same thing with our framework limited by the Portable Library.

And foremost, here is our class diagram:

image_thumb3

Implementation

As a reminder, the current solution is certainly not the true TPL and is only intended to be educational.

Worker threads

The worker threads are THE real workers in our case. They go to the mine to perform tasks. Work is assigned to them by the schedulers.

Internally, they have a queue (that must be protected against concurrent access by using locks) of tasks. Their main loop is built around a ManualResetEvent (that will be signaled as soon as a new task is added) :

internal void AddTask(Task task) { task.SetIsScheduled(); lock (workQueue) { workQueue.Enqueue(task); } workEvent.Set(); } internal void AddTasks(Task[] tasks) { lock (workQueue) { foreach (Task task in tasks) { workQueue.Enqueue(task); } } workEvent.Set(); }

The main work loop will therefore wait passively on the ManualResetEvent and later will process its queue until it is empty:

void ThreadJob() { while (true) { workEvent.WaitOne(); workEvent.Reset(); if (quitCondition) return; Task task; bool noWork = false; do { bool emptyQueue = !workQueue.TryDequeue(out task); if (emptyQueue && !scheduler.StealWork(this)) { noWork = true; } if (task != null) task.DoWork(); if (quitCondition) return; } while (!noWork); } }

Here we can see the presence of a condition of output (QuitCondition) and the work stealing algorithm (scheduler.StealWork) that we will see later.

The task (in order to run its code) provides a DoWork method that calls the delegate defined in its constructor:

protected virtual void ExecuteDelegate() { if (ActionDelegate.Method.GetParameters().Length > 0) ActionDelegate.DynamicInvoke(AsyncState); else ActionDelegate.DynamicInvoke(); }

Schedulers

A scheduler must at least provide an EnqueueTask method to add a new task to its list of tasks to dispatch on its worker threads.

Thus, we have an abstract TaskScheduler class to define this behavior.

In addition this class will have a static property ProcessorCount that must be defined by the user. Indeed, the Portable Library prevents us to access Environment.ProcessorCount because the latter is not present on all the targeted frameworks.

Standard Scheduler

It is the main scheduler. It allocates a number of worker threads equal to the number of processors defined by the TaskScheduler.ProcessorCount property.

To implement the EnqueueTask method, the scheduler will look for the worker thread whose work queue is the smallest:

WorkerThread FindLowestQueue() { WorkerThread result = null; int currentLevel = int.MaxValue; foreach (WorkerThread workerThread in workerThreads) { if (workerThread.QueueLength < currentLevel) { currentLevel = workerThread.QueueLength; result = workerThread; } } return result; }

Once a worker thread is found, the scheduler will just add it the task to execute.

Synchronization context scheduler

This scheduler is quite simple since it sends all its tasks on the synchronization context (for instance the Dispatcher for WPF):

public override void EnqueueTask(Task task) { context.Post(state=> task.DoWork(), null); }

This scheduler is built by a static method of the TaskScheduler class:

public static TaskScheduler FromCurrentSynchronizationContext() { return new SynchronisationContextTaskScheduler(SynchronizationContext.Current); }

Work stealing

As we have seen above, the scheduler can balance the load of worker threads by moving tasks for one worker thread to another.

So the StealWork method is :

internal bool StealWork(WorkerThread source) { foreach (WorkerThread workerThread in workerThreads) { if (workerThread == source) continue; if (workerThread.QueueLength > 1) { int total = workerThread.QueueLength / 2; List<Task> stolenWork = new List<Task>(); for (int index = 0; index < total; index++) { Task task; if (workerThread.TryDequeue(out task)) { stolenWork.Add(task); } } source.AddTasks(stolenWork.ToArray()); return true; } } return false; }

It iterates through the list of the worker threads and as soon as it finds one that still has work (more than a task), it will"steals" half of his work.

The Parallel Class

The static class Parallel is finally an different entry point to generate tasks. Indeed, Parallel will appeal to the services of a standard scheduler which will execute the following code :

void Dispatch(int stackSize, int excludedEnd, Func<int, Task> produceTask) { List<Task> totalTasks = new List<Task>(); int index = 0; foreach (WorkerThread workerThread in workerThreads) { int start = index * stackSize; int end = start + stackSize; if (index == ProcessorCount - 1) end = excludedEnd; Task[] tasks = new Task[end - start]; int taskID = 0; for (int local = start; local < end; local++) { Task task = produceTask(local); tasks[taskID++] = task; } totalTasks.AddRange(tasks); workerThread.AddTasks(tasks); index++; } Task.WaitAll(totalTasks.ToArray()); }

In this method, we divide the number of items to be treated by the number of worker threads available. A bunch of tasks are then created and we just have to wait for their completion.

Exception handling

The management of exceptions will happen at the level of the tasks. We must protect the call to the delegate of a task by using a try/catch block. We must then take the encapsulated exception and store it in our aggregate exception:

try { ExecuteDelegate(); } catch (TargetInvocationException exception) { if (aggregateException == null) { aggregateException = new AggregateException(); } aggregateException.InnerExceptions.Add(exception.InnerException); status = TaskStatus.Faulted; return; }

Cancellation

Cancellation management is done through a CancellationToken that will throw an exception if there is a cancellation request:

public void ThrowIfCancellationRequested() { if (isCancellationRequested) { throw new OperationCanceledException(this); } }

Later, we will have to change our management of exceptions to a different behavior when an exception of type OperationCanceledException is thrown:

try { WaitEvent.Reset(); ExecuteDelegate(); } catch (TargetInvocationException exception) { if (aggregateException == null) { aggregateException = new AggregateException(); } if (exception.InnerException is OperationCanceledException) { aggregateException.InnerExceptions.Add(new TaskCanceledException()); } if (status != TaskStatus.Canceled) { aggregateException.InnerExceptions.Add(exception.InnerException); status = TaskStatus.Faulted; } return; }

Thus, if we encounter a cancellation at the level of exceptions, we must not setting the status to Faulted because the task will set it to Canceled.

Results & benchmarks

Just for fun, I tried to replace TPL in my own raytracer (Bolas) code. Indeed, through a Parallel.For, I treat each line of my image in parallel to speed up the final rendering:

image_thumb1

The relevant code is the following:

Task task = Task.Factory.StartNew(() => { if (scene == null || scene.Camera == null) return; if (IsParallel) { Parallel.For(0, RenderHeight, y => ProcessLine(scene, y)); return; } for (int y = 0; y < RenderHeight; y++) { ProcessLine(scene, y); } });

As you can see, it is a simple Parallel.For which calls the ProcessLine method for each row.

Using the TPL, an image of 800 x 800 with anti-aliasing required 47s to generate.

By using our version, it takes 45s to generate the same image. The result is therefore quite acceptable.

Then of course, TPL is far better both in terms of stability and reliability and adds many other features.

But what it is important to understand here is that there is no magic. By understanding how the TPL works, you will be able to better architect your code by taking the right decisions in terms of parallel management design.

Leave a Comment
  • Please add 6 and 6 and type the answer here:
  • Post
  • Nice complete article.  Can you post the ray trace program?

  • Hi Donald, thanks for your feedback.

    I will post a blog soon about the ray tracer program :) Full source will be available of course.

  • When you post the RayTrace program, could you please include the code before as well as the code after conversion to your TPL?

  • yes of course

Page 1 of 1 (4 items)