Bonjour, 

Jusqu’à présent pour manipuler la notion de tâche, nous utilisions le type task_handle<> (ou directement le type task_group<>) comme suit :

Code Snippet
  1. task_group task;   
  2.     int i=0;   
  3.     auto  t1=make_task([&i]()
  4.         {
  5.            
  6.             i=42;
  7.         });
  8.     int z=0;
  9.     auto t2=make_task([&z]()
  10.         {
  11.            
  12.             z=42;
  13.         });
  14.  
  15.     task.run_and_wait (t1);   
  16.     task.run_and_wait (t2);   
  17.     std::wcout << i+z << std::endl;
  18.     std::wcout << "Fini" << std::endl;

Ici pour pouvoir faire le calcul de i+z nous devons capturer par référence les variables associées dans l’expression lambda (&i, et &z), puis attendre que les deux tâches est finie leur travail.

Désormais, avec Visual Studio 11, nous pouvons utiliser directement le type task<> et la notion de future comme suit :

Code Snippet
  1. task<int> t1=task<int>([]()->int
  2.         {
  3.             return 42;
  4.         });
  5.    
  6.     task<int> t2=task<int>([]()->int
  7.         {
  8.             return 42;
  9.         });
  10.    
  11.     std::wcout << t1.get() +t2.get () << std::endl;

La syntaxe ce simplifie, plus besoin de capturer les variables par référence, et la méthode get() de chaque tâche, à pour rôle d’attendre en appelant la méthode wait() de chaque tâche et de retourner la valeur.

Avec type task<> il est possible également de mettre en place la notion de continuation. Cette notion permet d’enchainer des tâches asynchrones qui se suivent, c’est à dire que le résultat de la tâche Tn dépend de la tâche Tn-1 et ceci sans se préoccuper de savoir si la tâche précédente a terminée son travail ou pas. Dans un monde de plus en plus connecté, les opérations asynchrones vont se généraliser afin d’éviter de figer l’interface utilisateur, il est donc important de simplifier le développement.

Pour mettre en place cette notion de continuation, le type task<> implémente la méthode then() qui sera exécutée une fois que la tâche a finie son travail.

Code Snippet
  1. task<int> t1=task<int>([]()->int
  2.     {
  3.         return 42;
  4.     }).then ([](int i)->int
  5.         {
  6.        
  7.             return i+42;   
  8.         });
  9.  
  10. task<int> t2=task<int>([]()->int
  11.     {
  12.         //simule une charge
  13.         std::this_thread::sleep_for (chrono::seconds (1));       
  14.         return 42;
  15.     }).then ([](int i)->int
  16.         {
  17.            
  18.             return i+42;   
  19.         });           
  20. std::wcout << t1.get() + t2.get() << std::endl;

ici la valeur de retour de chaque tâche, est passée comme paramètre à la méthode then() dans l’expression lambda.

Dans cette version nous avons également ajouté des algorithmes paralleles aux algorithmes de bases (parallel_for, parallel_for_each, et parallel_invoke)

Les algorithmes parallèles de tries

parallel_sort , parallel_radixsort, parallel_buffered_sort ces algorithmes s’utilisent de la même manière que le sort standard si ce n’est que l’exécution se fait en mode parallèle.

Code Snippet
  1. vector<unsigned int> v3;
  2.    parallel_sort(begin(v3),end(v3))

Bien que dans la majorité des cas cela soit suffisant pour obtenir des performances accrues sur un trie simple d’entiers (Exemple sur un vecteur de 40 000 000 d’entiers, j’obtiens une score de 2745 ms avec std::sort() et 711 avec parallel_sort(), il est des cas ou une meilleur maitrise permet peut être d’améliorer les performances.

Chaque algorithme, possèdent pour ce faire des méthodes surchargées, qui permettent de définir le prédicat de trie ainsi que la taille de la partition à utiliser.

Code Snippet
  1. vector<unsigned int> v2;
  2.       parallel_sort(begin(v2),end(v2),[](const unsigned int& l,const unsigned int& r)->bool   
  3.    {
  4.       
  5.        return (l<r);
  6.    },4096);  

La taille de la partition par défaut est de 2048 et est suffisante pour la majorité des cas. Mail il est  possible de jouer sur la taille de a partition pour éventuellement améliorer les performances. Néanmoins, si la taille de la partition est inférieure ou égale à la taille du vecteur, c’est de toute manière std::sort() qui sera utilisé. (Par défaut si le nombre d’éléments du vecteur est <= à 2048 c’est toujours la version std::sort() qui sera utilisée).

Code Snippet
  1. inline void parallel_sort(const _Random_iterator &_Begin, const _Random_iterator &_End, const _Function &_Func, const size_t _Chunk_size = 2048)
  2. {
  3.     _CONCRT_ASSERT(_Chunk_size > 0);
  4.  
  5.     // We make the guarantee that if the sort is part of a tree that has been canceled before starting, it will
  6.     // not begin at all.
  7.     if (is_current_task_group_canceling())
  8.     {
  9.         return;
  10.     }
  11.  
  12.     size_t _Size = _End - _Begin;
  13.     size_t _Core_num = Concurrency::details::_CurrentScheduler::_GetNumberOfVirtualProcessors();
  14.  
  15.     if (_Size <= _Chunk_size || _Core_num < 2)
  16.     {
  17.         return std::sort(_Begin, _End, _Func);
  18.     }
  19.  
  20.     _Parallel_quicksort_impl(_Begin, _Size, _Func, _Core_num * _MAX_NUM_TASKS_PER_CORE, _Chunk_size, 0);
  21. }

Vous remarquerez également que std::sort() est utilisé de toute manière si le système ne possède pas au moins de processeurs virtuels.

D'autres algorithmes de tries sont disponibles et sont a utiliser en fonction des besoins. Par exemple dans notre cas le trie radix

Code Snippet
  1. vector<unsigned int> v4;
  2.    parallel_radixsort(begin(v4),end(v4));

Donne un meilleur résultat 197.09 sur 40 000 000 d’éléments. La taille de la partition par défaut étant de 256*256

et le parallel_buffered_sort donne 631

Code Snippet
  1. parallel_buffered_sort(begin(v5),end(v5));

Si vous souhaitez plus d’informations sur les différents tries, je ne ne peut que vous encourager à aller voir le post sur le blog de l’équipe en charge du parallélisme chez Microsoft.

 

Réduction

Appliquer une somme, un minimum, un maximum sur tous les membres d’un groupe est appelé en générale une réduction. Appliquer une réduction en parallèle peut soit impliquer des problèmes de race condition, soit des problèmes d’arrondie dans une opération d’addition de float par exemple.

Avec Visual Studio 2010 lorsqu’on souhaite faire une réduction, par exemple la somme d’un double dans une boucle parallèle, une des méthodes consiste à utiliser l’objet concurrency::combinable<T>

Code Snippet
  1. combinable<double> sumReduction;   
  2.    parallel_for_each(begin(v1),end(v1),[&sumReduction](double i)
  3.    {    
  4.        sumReduction.local ()+=i;      
  5.    });
  6.    sum=sumReduction.combine (std::plus<double>());

 

Avec Visual Studio 11, il est désormais possible d’utiliser l’algorithme parallel_reduce() qui semble plus performant que l’utilisation du type concurrency::combinable().

Son utilisation est relativement identique à std::accumulate()

Code Snippet
  1. vector<double>::const_iterator deb=begin(v1);
  2.    vector<double>::const_iterator fin=end(v1);
  3.    auto identity=std::iterator_traits<vector<double>::const_iterator>::value_type();  
  4.    sum= parallel_reduce(deb,fin,identity,std::plus<double>());

 

Transformation

Enfin le dernier algorithme est l’algorithme de transformation parallel_transformation()

Code Snippet
  1. parallel_transform(begin(v1),end(v1),begin(v2),[](double value)->double
  2. {
  3.     return value*2;
  4. });

Eric Vernié