FR - Sous le capot : Etude de la Task Parallel Library - Eternal Coding - HTML5 / Windows / Kinect / 3D development - Site Home - MSDN Blogs

FR - Sous le capot : Etude de la Task Parallel Library


 

FR - Sous le capot : Etude de la Task Parallel Library

  • Comments 3

Le but de cet article est de découvrir les capacités de la Task Parallel Libray (TPL : Technologie de gestion des traitements parallèles introduite avec le framework .Net 4.0) d’une manière un peu différente. En effet, rien de tel pour comprendre une technologie que de s’amuser à la refaire depuis le zéro.

Bien évidemment, nous ne reproduirons pas à l’identique la TPL. Nous nous attacherons plutôt à reprendre les concepts en les implémentant à notre sauce afin d’avoir un petit framework utilisable dans un cas classique.

Nous en profiterons pour développer notre librairie en mode portable c’est à dire utilisable avec plusieurs frameworks. Pour se faire il nous suffit d’installer les Portable Library Tools. Une fois ces derniers installés, un nouveau projet apparait dans Visual Studio qui va nous permettre de développer notre TPL maison:

image

Grâce à ce projet nous allons développer une assembly directement référençable en .Net 4.0, en Silverlight 4, en Silverlight pour Windows Phone 7 et même en XNA 4.0 pour XBox 360 !

Bien entendu cette portabilité aura un cout en fonctionnalités disponibles mais nous en ferons notre affaire.

Vous pouvez d’ores et déjà retrouver le code de cette TPL portable ici : http://portabletpl.codeplex.com.

Vous pouvez également l’installer via un package NuGet : http://www.nuget.org/List/Packages/PortableTPL.

Toutefois avant de nous lancer dans la partie code, nous allons un peu étudier la partie de TPL qui nous intéresse pour voir comment architecturer notre solution.

Task et TaskFactory

La base de la TPL est constituée par la classe Task qui représente une unité de travail à réaliser.

C’est un peu l’équivalent d’un ThreadPool.QueueUserWorkItem(). Pour réaliser une tâche, il suffit de lancer une instance de Task avec l’action à réaliser en paramètre:

var task = new Task(state => { foreach (int value in data) { Thread.Sleep(value * (int)state); } }, 100); task.Start(); task.Wait();

Une autre manière de lancer une Task peut aussi être de passer par la méthode TaskFactory.StartNew() présente sur la classe statique Task.Factory :

Task task = Task.Factory.StartNew(() => { foreach (int value in data) { ... } }); task.Wait();

On peut noter l’appel à la méthode Task.Wait() qui va permettre de se synchroniser avec notre tâche et d’attendre qu’elle ait fini son travail.

De plus, il faut savoir qu’il existe une autre classe pour les tâches : Task<T>. Cette classe fille de Task rajoute la notion de valeur de résultat via la propriété Result qui est bien entendu de type T.

Task<int> task = Task.Factory.StartNew<int>( (limit) => { int sum = 0; for (int i = 0; i < (int)limit; i++) { sum += i; } return sum; }, 100); task.Wait(); Console.WriteLine(task.Result);

Ainsi Task<T> a pour but de récupérer un résultat la ou Task effectue une opération qui ne retourne rien.

La continuation de tâches

La continuation de tâches est une autre fonctionnalité que nous allons couvrir avec notre solution. Il s’agit en effet de permettre le lancement d’une tâche secondaire lorsqu’une tâche primaire prend fin.

Pour se faire il suffit d’utiliser la méthode ContinueWith() :

Task task = Task.Factory.StartNew(() => { foreach (int value in data) { ... } }); task.ContinueWith(t => { // Code de la seconde tâche... });

La seconde tâche recevra en paramètre la première tâche afin de pouvoir traiter les éventuels cas d’erreurs.

Les schedulers et le work stealing

Les tâches sont gérées et organisées par des schedulers (ordonnanceurs) dont le rôle est de les répartir sur les threads de travail. En effet, en fonction du nombre de processeurs, la TPL va créer des threads de travail (worker threads). Chaque tâche à exécuter serait alors traiter par un de ces worker threads.

En fonction du type de scheduler, l’attribution des tâches sur les worker threads sera faite selon des algorithmes différents. Dans notre cas nos gèrerons le scheduler standard (qui répartit la charge de manière uniforme) et le scheduler du contexte de synchronisation (qui fait tout exécuter sur le thread de l’interface afin justement de permettre à ses tâches de piloter l’interface).

Ainsi, ces derniers possèdent chacun une file d’attente des tâches qui leur sont attribués. A fur et à mesure chaque worker thread va donc dépiler ses travaux.

En plus de ce fonctionnement, la TPL gère une fonctionnalité supplémentaire qui est le work stealing (ou vol de travail). En effet, si un worker thread vient à terminer sa file d’attente et que ses collègues ont encore du travail à faire, il va gentiment venir leur prendre une partie de ce qu’ils leur restent à faire. Tout ceci afin de garantir une constante répartition de la charge sur chaque processeur.

Si l’on ne précise rien, les tâches sont lancées sur le scheduler standard. Toutefois si l’on souhaite lancer notre tâche sur le scheduler du contexte de synchronisation, il nous suffit de passer par ce code:

Task task = Task.Factory.StartNew(() => { foreach (int value in data) { Thread.Sleep(value * 100); } },CancellationToken.None, TaskCreationOptions.None, TaskScheduler.FromCurrentSynchronizationContext());

Un exemple de l’utilisation de ce genre de continuation peut être la gestion de la mise à jour d’une interface après l’exécution d’une tâche. Avant le lancement de la tâche nous pouvons afficher un sablier ou une animation d’attente puis lancer notre tâche. Lorsque la tâche se termine, sur une seconde tâche en continuation de la première et en exécution sur le scheduler du contexte de synchronisation nous pouvons masquer notre contrôle pour indiquer à l’utilisateur la fin du traitement.

Gestion de l’état

Nos tâches présentent un propriété Status qui va nous indiquer leur état courant :

  • TaskStatus.Created : Premier état d’une tâche. Son constructeur vient d’être appelé et elle n’a pas encore été lancée ni même schedulée.
  • TaskStatus.WaitingForActivation : La tâche a été lancée et elle attend d’être affectée par un scheduler à un worker thread.
  • TaskStatus.WaitingToRun : La tâche a été affectée à un worker thread mais ce dernier ne l’a pas encore exécutée.
  • TaskStatus.Running : La tâche est en cours d’exécution par un worker thread.
  • TaskStatus.RanToCompletion : Tout va bien! La tâche a été exécutée complètement.
  • TaskStatus.Faulted : Une exception a eu lieu dans la tâche et son exécution a été stoppée.
  • TaskStatus.Canceled : L’utilisateur a annulé l’exécution d’une tâche.

On peut donc suivre tout la vie d’une tâche via son statut. De plus, les tâches portent plusieurs propriétés booléennes qui rassemblent certains états:

  • Task.IsCanceled : Equivaut au status Canceled
  • Task.IsFaulted : Equivaut au status Faulted
  • Task.IsCompleted : Equivaut aux status RanToCompletion/Faulted/Canceled

Le support de l’annulation

La TPL permet au développeur de gérer simplement la notion d’annulation au sein des tâches. En effet, depuis le framework .Net 4.0, la classe CancellationTokenSource a été ajoutée afin de servir de service de base pour l’annulation en général. Dans le cadre de la TPL, cette classe va émettre un jeton qui sera copié vers la tâche et qui permet de savoir si une annulation a été demandée au niveau de la classe CancellationTokenSource. Si c’est le cas, le jeton peut lever une exception d’annulation :

var tokenSource = new CancellationTokenSource(); var token = tokenSource.Token; var task = Task.Factory.StartNew(limit => { for (int i = 0; i < (int)limit; i++) { token.ThrowIfCancellationRequested(); Console.WriteLine("Processing {0}", i); } }, 2000, token); Thread.Sleep(200); Console.WriteLine("Let's cut..."); tokenSource.Cancel();

Nous pouvons bien sur faire le choix de ne pas lever une exception mais plutôt de sortir proprement de l’exécution si nous détectons que le token a sa propriété IsCancellationRequested positionnée sur True.

Le lancement de l’annulation se fait sur le CancellationTokenSource auquel sont liés tous les tokens émis. Ceci peut bien entendu permettre d’annuler plusieurs tâches d’un coup. Il est même envisageable de s’enregistrer auprès du token via la méthode Register pour être prévenu lorsqu’une annulation est demandée.

La classe Parallel

En plus de la notion de tâches, la TPL ajoute le support de la classe Parallel qui va nous permettre de faire des boucles parallèles de manière vraiment élégante:

Parallel.For(0, 10, i => { Console.WriteLine(i); });

De même avec un ForEach :

Parallel.ForEach(new[]{"toto", "titi"}, Console.WriteLine);

Bien entendu, dans le cadre de l’utilisation de ces méthodes, il faut bien faire attention que chaque élément de la boucle soit bien autonome afin de ne pas provoquer d’erreurs d’accès croisés.

Gestion des exceptions

TPL propose une solution élégante pour la gestion des exceptions. En effet, si dans le corps d’un tâche, un exception venait à être levée, TPL se chargerait de la récupérer.

Elle sera alors par la suite transmise à l’utilisateur soit lors de la sortie d’un Parallel.For/ForEach soit lors de l’appel à la méthode Wait ou à la propriété Result des Task<T>.

Le point important à souligner ici est que la TPL va agréger les exceptions qui pourraient avoir lieu dans une tâche ou dans un Parallel.For/ForEach pour les présenter d’un coup sur le thread appelant :

try { Parallel.For(0, 10, i => { Console.WriteLine(i); if (i == 5) throw new Exception("erf..."); }); } catch (AggregateException aggEx) { foreach (Exception ex in aggEx.InnerExceptions) { Console.WriteLine(string.Format("Caught exception '{0}'", ex.Message)); } } Console.ReadLine();

Dans cet exemple, le système écrira bien les 10 valeurs sur la console puis à la sortie du Parallel.For, la TPL lévera l’exception d’agrégation et nous verrons donc aussi apparaitre le superbe message “Caught exception erf…”.

De la même manière, nous pouvons récupérer cette exception sur un Task.Wait:

Task<int> task = Task.Factory.StartNew( limit => { int sum = 0; for (int i = 0; i < (int)limit; i++) { sum += i; if (i == 12) throw new Exception("Bad trip..."); } return sum; }, 100); try { task.Wait(); Console.WriteLine(task.Result); } catch (AggregateException aggregateException) { foreach (Exception ex in aggregateException.InnerExceptions) { Console.WriteLine(string.Format("Caught exception '{0}'", ex.Message)); } }

Si nous ne faisons pas appel à une méthode ou une propriété qui lève l’exception d’agrégation, cette dernière restera alors non observée. La seule manière par la suite de savoir si une erreur a effectivement eu lieu sera de regarder la valeur de Task.Status ou celle de Task.IsFaulted.

Notre architecture

C’est donc maintenant que les choses sérieuses commencent! Nous savons de quoi nous parlons, nous avons le vocabulaire, il ne nous reste plus qu’à refaire pareil avec notre framework limité par la Portable Library.

Et avant toute chose, posons le schéma des classes principales à traiter :

image

On reconnaitra facilement une bonne partie des classes de la TPL et c’est tant mieux car c’est ce que l’on cherche à faire : avoir une API qui mimique celle de TPL.

Mise en place

Pour mémoire, la solution exposée n’est certainement pas celle retenue dans la vraie TPL et à pour seul but d’être didactique.

Les worker threads

Les worker threads constituent les vrais travailleurs dans notre affaire. C’est eux qui vont à la mine pour exécuter les tâches. Leur travail leur est affecté par les schedulers.

En interne, ils disposent d’une file d’attente (qui dont être protégée contre les accès concurrents par des locks) de tâches à réaliser. Leur boucle principale s’architecture donc autour d’un ManualResetEvent qui sera signalé dès qu’une nouvelle tâche sera ajoutée :

internal void AddTask(Task task) { task.SetIsScheduled(); lock (workQueue) { workQueue.Enqueue(task); } workEvent.Set(); } internal void AddTasks(Task[] tasks) { lock (workQueue) { foreach (Task task in tasks) { workQueue.Enqueue(task); } } workEvent.Set(); }

La boucle de travail principale va donc attendre de manière passive sur ce ManualResetEvent et traitera par la suite sa file d’attente jusqu’à ce que cette dernière soit vide :

void ThreadJob() { while (true) { workEvent.WaitOne(); workEvent.Reset(); if (quitCondition) return; Task task; bool noWork = false; do { bool emptyQueue = !workQueue.TryDequeue(out task); if (emptyQueue && !scheduler.StealWork(this)) { noWork = true; } if (task != null) task.DoWork(); if (quitCondition) return; } while (!noWork); } }

Notons au passage la présence d’une condition de sortie (QuitCondition) et de l’algorithme de vol de travail (scheduler.StealWork) que nous verrons plus tard.

La tâche (pour exécuter son code) fournit une méthode DoWork qui appelle le délégué qu’elle a reçu à sa construction :

protected virtual void ExecuteDelegate() { if (ActionDelegate.Method.GetParameters().Length > 0) ActionDelegate.DynamicInvoke(AsyncState); else ActionDelegate.DynamicInvoke(); }

L’algorithme des schedulers

Un scheduler doit à minima fournir une méthode EnqueueTask afin d’ajouter une nouvelle tâche à sa liste de travaux à dispatcher sur ses worker threads. Pour se faire nous allons avoir une classe abstraite TaskScheduler qui marquera cette méthode en tant qu’abstraite pure.

De plus cette classe portera une propriété statique ProcessorCount qui (hélas) devra être remplie par l’utilisateur. En effet, la Portable Library nous contraint sur l’accès à la propriété Environment.ProcessorCount car cette dernière n’est pas présente sur tous les frameworks que nous visons.

Le scheduler standard

Il s’agit du principal scheduler. Dès son lancement, il va construire un nombre de worker threads égal au nombre de processeurs définis par la propriété TaskScheduler.ProcessorCount.

Pour implémenter la méthode EnqueueTask, ce scheduler va chercher le worker thread dont la file de travail sera la plus petite :

WorkerThread FindLowestQueue() { WorkerThread result = null; int currentLevel = int.MaxValue; foreach (WorkerThread workerThread in workerThreads) { if (workerThread.QueueLength < currentLevel) { currentLevel = workerThread.QueueLength; result = workerThread; } } return result; }

Une fois le worker thread trouvé, le scheduler lui rajoutera juste la tâche en question.

Le scheduler du contexte de synchronisation

Ce scheduler est assez simple puisqu’il envoie toutes les tâches que l’on lui donne sur le contexte de synchronisation (le Dispatcher par exemple en WPF) :

public override void EnqueueTask(Task task) { context.Post(state=> task.DoWork(), null); }

Ce scheduler est construit par une méthode statique de la classe TaskScheduler :

public static TaskScheduler FromCurrentSynchronizationContext() { return new SynchronisationContextTaskScheduler(SynchronizationContext.Current); }

Work stealing

Nous l'avons vu plus haut, le scheduler peut à la demande d’un worker thread qui n’a plus rien à faire aller faire un transfert de travaux entre un worker thread chargé et un worker thread inoccupé.

Pour se faire nous avons la méthode StealWork :

internal bool StealWork(WorkerThread source) { foreach (WorkerThread workerThread in workerThreads) { if (workerThread == source) continue; if (workerThread.QueueLength > 1) { int total = workerThread.QueueLength / 2; List<Task> stolenWork = new List<Task>(); for (int index = 0; index < total; index++) { Task task; if (workerThread.TryDequeue(out task)) { stolenWork.Add(task); } } source.AddTasks(stolenWork.ToArray()); return true; } } return false; }

Cette dernière parcourt la liste des worker threads et dès qu’elle en trouve un qui a encore du travail (plus de une tâche), elle lui “vole” la moitié de son travail.

Gestion de la classe Parallel

La classe statique Parallel n’est finalement qu’un point d’entrée différent pour générer des tâches. En effet, cette dernière va faire appel aux services d’un scheduler standard créée pour l’occasion qui exécutera le code suivant:

void Dispatch(int stackSize, int excludedEnd, Func<int, Task> produceTask) { List<Task> totalTasks = new List<Task>(); int index = 0; foreach (WorkerThread workerThread in workerThreads) { int start = index * stackSize; int end = start + stackSize; if (index == ProcessorCount - 1) end = excludedEnd; Task[] tasks = new Task[end - start]; int taskID = 0; for (int local = start; local < end; local++) { Task task = produceTask(local); tasks[taskID++] = task; } totalTasks.AddRange(tasks); workerThread.AddTasks(tasks); index++; } Task.WaitAll(totalTasks.ToArray()); }

Au sein de cette méthode Dispatch, nous segmentons le nombre d’éléments à traiter par le nombre de worker threads disponibles. Il suffit ensuite d’attendre la fin de toutes ces tâches. L’énorme avantage en procédant ainsi est que l’on profite de tout ce qu’offre le scheduler et notamment le work stealing.

Gestion des exceptions

La gestion des exceptions va se faire au niveau des tâches qui devront protéger l’appel de leur délégué par un try/catch pour attraper toutes les exceptions remontées par le délégué. Il faut de plus faire attention au fait que l’exception qui remontera sera de type TargetInvocationException puisque l’on passe par la réflexion pour lancer notre action. Il faudra donc prendre l’exception encapsulée et la stocker dans notre exception d’agrégation :

try { ExecuteDelegate(); } catch (TargetInvocationException exception) { if (aggregateException == null) { aggregateException = new AggregateException(); } aggregateException.InnerExceptions.Add(exception.InnerException); status = TaskStatus.Faulted; return; }

Gestion des annulations

La gestion des annulations se fait au travers d’un CancellationToken qui lèvera une exception si il y a eu une demande d’annulation :

public void ThrowIfCancellationRequested() { if (isCancellationRequested) { throw new OperationCanceledException(this); } }

Par la suite, il va falloir modifier notre gestion des exceptions pour adopter un comportement différent lorsque une exception de type OperationCanceledException est levée :

try { WaitEvent.Reset(); ExecuteDelegate(); } catch (TargetInvocationException exception) { if (aggregateException == null) { aggregateException = new AggregateException(); } if (exception.InnerException is OperationCanceledException) { aggregateException.InnerExceptions.Add(new TaskCanceledException()); } if (status != TaskStatus.Canceled) { aggregateException.InnerExceptions.Add(exception.InnerException); status = TaskStatus.Faulted; } return; }

Ainsi, si nous rencontrons une annulation au niveau des exceptions, nous ne devons pas positionner le statut sur Faulted. En effet, la tâche lors de sa construction s’est abonnée au niveau du token et positionnera alors son statut sur Canceled.

Résultats et benchmarks

Pour la petite histoire, je me suis amusé à remplacer la TPL dans le code de rendu de mon raytracer maison (Bolas). En effet, via la TPL je demande sur un Parallel.For de traiter chaque ligne de mon image en parallèle pour accélérer le rendu  final:

image

Le code concerné est le suivant :

Task task = Task.Factory.StartNew(() => { if (scene == null || scene.Camera == null) return; if (IsParallel) { Parallel.For(0, RenderHeight, y => ProcessLine(scene, y)); return; } for (int y = 0; y < RenderHeight; y++) { ProcessLine(scene, y); } });

Comme on peut le voir, il s’agit d’un simple Parallel.For qui appelle pour chaque ligne la méthode ProcessLine.

En utilisant la TPL, il faut 47s sur mon Intel Core I7, pour générer une image de 800x800 avec anti-aliasing.
En utilisant notre version, il faut 45s pour générer la même image. Le résultat est donc tout à fait acceptable Sourire.

Alors bien évidemment, la TPL gère tout ça bien mieux (tant en stabilité qu’en fiabilité) et rajoute de très nombreuses autres fonctionnalités (notamment la notion de tâches filles attachées) mais ce qu’il est important de saisir ici c’est qu’il n’y a rien de magique. Comprendre comment fonctionne la TPL permet même de mieux architecturer son code en prenant les bonnes décisions en termes de design de sa gestion parallèle.

Ressources supplémentaires

Dev center MSDN sur Silverlight : http://msdn.microsoft.com/fr-fr/silverlight/default.aspx

Dev center MSDN sur Windows Phone 7: http://msdn.microsoft.com/fr-fr/windowsphone/

Dev center MSDN sur le framework .NET : http://msdn.microsoft.com/fr-fr/netframework/default.aspx

Leave a Comment
  • Please add 4 and 7 and type the answer here:
  • Post
  • Félicitation, tu viens de me faire perdre 15minutes de taf, mais elles n'ont pas été gâchées. Merci pour ton article !

  • Moi ça va j'étais à la maison en attente de préparer le BBQ ... donc on peut pas dire que j'ai vraiment perdu du temps ;)

  • super article.

    Juste au passage : Vive les ribbon :P

Page 1 of 1 (3 items)