Debugging PPL in Visual Studio 2010

As you know from our previous posts, Visual Studio 2010 comes with great support for the task-based programming model and multi-threaded applications. Even though Daniel uses managed code in his examples, all the content he created for the Parallel Tasks and Parallel Stacks windows applies to C++ code too.

For links to all of our debugging material, see the blog post Parallel Debugging.

Concurrency::parallel_for and Concurrency::parallel_for_each

Marko Radmilac, Senior Developer for Microsoft’s Concurrency Runtime, offers the following discussion of parallel iteration in the Concurrency Runtime:

Let me say up front that this is my first blog post, so I apologize if my writing format does not match what people have come to expect from a blog. I do hope, however, that this post will provide enough introductory information on the Parallel Patterns Library’s parallel_for and parallel_for_each constructs, and on when and how to use them. This post has a Q and A format and contains both basic and advanced information, so feel free to jump to the section that is of most interest to you.

What is parallel_for and where do I find it? As you are probably aware (since you are reading this blog), the native C++ libraries in Visual Studio 2010 were extended to provide rich support for parallel programming. There are different layers at which users can interact with the parallel runtime, the highest of which is the Parallel Patterns Library (header file ppl.h). In it, users can find different constructs that allow them to quickly parallelize their programs without extensive knowledge of scheduling decisions, underlying threads, the surrounding environment, etc. One of these constructs is parallel_for, which allows a user to parallelize a for-loop quickly. Its close cousin, parallel_for_each, makes parallelizing a std::for_each construct just as easy.

How do I use parallel_for and when? Parallel for is a construct that takes the body of a for-loop written by a user and captured in a functor (for details on functors and lambdas please read Stephan’s excellent blog), divides the work (the number of iterations) amongst the available computing resources (processors), and executes that work in parallel. Here’s a simple example of a serial-to-parallel transformation:

for (int i = 0; i < n; i++)
{
    iter();
}

becomes

Concurrency::parallel_for(0, n,
    [] (int i)
    {
        iter();
    });

A naïve user might decide to go ahead and convert all for-loops in the program into parallel for-loops. That would be a mistake, because without an initial feasibility analysis this conversion might be detrimental to the program. There are two aspects that must be considered before a conversion is made, correctness and performance, and both are explained in the following few paragraphs.

I have converted my for-loops into parallel for-loops. Why doesn’t my code work properly? Converting all for-loops in a program into parallel for-loops may yield an incorrect program. There are quite a few algorithms in which the individual iterations of the loop are not independent. They often require another iteration to have been executed beforehand, so executing them in isolation would likely yield wrong results. A typical example would be a partial sum computation “in place”, on the original array (look at std::partial_sum):

for (int i = 1; i < n; i++)
{
    // Array "a" contains both the original sequence
    // and the end result; start at 1 so that a[i-1] exists
    a[i] += a[i-1];
}

In order to compute the kth term of the resulting sequence, the (k-1)th term must already be known. If the iterations were executed in parallel, the (k-1)th term might not be populated by the time the kth term is processed, yielding an incorrect result.

Concurrency::parallel_for(1, n,
    [&] (int i)
    {
        a[i] += a[i-1]; // incorrect! a[i-1] may not be final yet
    });

I have converted my for-loops into parallel for-loops. Why does my code run slower than serial code? Converting all for-loops in a program into parallel for-loops may yield performance degradation instead of an improvement. Parallel for, as mentioned before, will divide the work amongst available processors and set it up for parallel execution. The initial division of work, setting up worker threads, and invoking a functor for each iteration, all incur a certain cost. That cost is not big, but it is not negligible either. If the work done in a single iteration and number of iterations itself are both small, this overhead is likely to show up as a less than expected speedup, sometimes even a slowdown.

Concurrency::parallel_for(0, 100,
    [&] (int i)
    {
        a[i] = b[i] + 1; // only a few cycles
    });

In the example above, there is one memory read, one memory write and one add to be performed in a single iteration of the loop. On top of that, there are only 100 iterations in the entire loop. In this case, the cost of the function call that performs one iteration is likely to be of the same order of magnitude as the work of the iteration itself, if not greater. Therefore, this loop is not a good candidate for parallelization.

I am frustrated! When should I use parallel_for then? You should analyze your program first and identify “hot spots”, i.e. places in your code that are computationally intensive. Identify the for-loops that control that computation and try to rewrite them in a way that eliminates dependencies between iterations. Also, in the presence of nested for-loops, it is often more beneficial to parallelize at the outermost loop level, in order to increase the amount of work done per iteration. Once this analysis is complete, apply parallel for to those loops and watch your program run faster :)
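As one illustration of rewriting a loop to remove dependencies between iterations, the in-place partial sum from the earlier question can be restructured into three phases, two of which have fully independent iterations. This is a sketch under my own assumptions: it uses plain std::thread instead of PPL so it can run anywhere, and blocked_partial_sum and its blocking scheme are hypothetical, not code from the Concurrency Runtime or its samples.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <thread>
#include <vector>

// Hypothetical helper: inclusive prefix sum over 'a', computed in place with
// P worker threads. Only the short middle phase is serial.
void blocked_partial_sum(std::vector<long long>& a, unsigned P)
{
    const std::size_t n = a.size();
    if (n == 0 || P == 0) return;
    const std::size_t chunk = (n + P - 1) / P;   // block size, rounded up
    std::vector<long long> block_total(P, 0);
    std::vector<long long> offset(P, 0);

    // Run body(b, lo, hi) for every non-empty block [lo, hi) on its own thread.
    auto run_blocks = [&](auto body) {
        std::vector<std::thread> workers;
        for (unsigned b = 0; b < P; ++b) {
            const std::size_t lo = b * chunk;
            const std::size_t hi = std::min(n, lo + chunk);
            if (lo < hi) workers.emplace_back(body, b, lo, hi);
        }
        for (auto& t : workers) t.join();
    };

    // Phase 1 (parallel): local prefix sum inside each block; blocks are independent.
    run_blocks([&](unsigned b, std::size_t lo, std::size_t hi) {
        for (std::size_t i = lo + 1; i < hi; ++i) a[i] += a[i - 1];
        block_total[b] = a[hi - 1];
    });

    // Phase 2 (serial, only P steps): offset of a block = sum of all earlier block totals.
    for (unsigned b = 1; b < P; ++b) offset[b] = offset[b - 1] + block_total[b - 1];

    // Phase 3 (parallel): add each block's offset; again no cross-block dependency.
    run_blocks([&](unsigned b, std::size_t lo, std::size_t hi) {
        for (std::size_t i = lo; i < hi; ++i) a[i] += offset[b];
    });
}
```

The result can be checked against std::partial_sum. Note that the rewrite does roughly twice the additions of the serial loop, so it only pays off when the array is large enough to amortize that extra work.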

I was using OpenMP and I noticed that OpenMP executes my workload much faster than parallel_for does. Why is that? OpenMP uses a simple mechanism for scheduling the parallel iterations. It counts the number of iterations, divides them by the number of processors P, and then schedules P workers to execute the individual chunks of work. Each iteration in OpenMP consists only of a call to the outlined function (in parallel for’s case, this is the functor/lambda). Parallel for, on the other hand, is a more general mechanism: it does much more work while preparing for parallel invocation, and more work during each iteration. This raises the total amount of work (and the number of iterations in the loop) needed for parallel for to become profitable, and that threshold is higher than OpenMP’s. Your loop therefore most likely has just enough work to make OpenMP profitable, but not enough to do the same for parallel for. With less work, both would be slower than serial execution; with more, both would be equally profitable, because the overhead is amortized by the amount of work being done.
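The up-front division of work described above can be sketched as a small function that splits [0, n) into P contiguous chunks whose sizes differ by at most one. static_partition is a hypothetical helper written for illustration; real OpenMP runtimes may distribute the remainder differently.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper illustrating static (fixed) scheduling:
// split [0, n) into P contiguous [begin, end) ranges, decided once, up front.
std::vector<std::pair<std::size_t, std::size_t>>
static_partition(std::size_t n, std::size_t P)
{
    assert(P > 0);
    std::vector<std::pair<std::size_t, std::size_t>> ranges;
    const std::size_t base = n / P;    // every worker gets at least this many
    const std::size_t extra = n % P;   // the first 'extra' workers get one more
    std::size_t begin = 0;
    for (std::size_t w = 0; w < P; ++w) {
        const std::size_t len = base + (w < extra ? 1 : 0);
        ranges.emplace_back(begin, begin + len);
        begin += len;
    }
    return ranges;
}
```

For example, 10 iterations on 4 workers yield the ranges [0,3), [3,6), [6,8) and [8,10). Once the ranges are handed out, no further bookkeeping is needed per iteration, which is why this scheme is cheap, and also why it cannot react to uneven work.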

Why does it take so much work per iteration for parallel_for to turn profitable? OpenMP does it with far less work per iteration. As mentioned above, parallel for is a much more general construct than any construct in OpenMP. It should be noted that the Concurrency Runtime (ConcRT) is not less performant than the OpenMP runtime, but the parallel for construct tries to address many more issues than a simple OpenMP for-loop, and these come at a cost. It is possible to reduce the generality of parallel for to a minimum, at which point its performance becomes very close to that of its OpenMP counterpart. You can find a less generalized implementation of parallel_for in our sample pack, called parallel_for_fixed (it uses fixed partitioning of iterations, without range stealing). The reasons for the generality of the parallel for implementation are given in the following few paragraphs.

My workload inside the for-loop is uneven. Can parallel for help me distribute it properly? Yes, parallel for by default does range stealing and dynamic redistribution of work. That means that if any worker thread finishes its range, it will attempt to help other worker threads with their workloads. This ensures that all computing resources are fully utilized during parallel for execution. OpenMP does this to an extent with dynamic and guided scheduling, but parallel for does it on a per iteration level without using interlocked operations to mark the progress.
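For contrast, here is a sketch of a simpler form of dynamic load balancing, in which workers claim fixed-size chunks from a shared atomic counter. To be clear, this is not range stealing: it performs one interlocked (atomic) operation per chunk, which is the kind of progress bookkeeping the text says parallel for avoids, and it is closer in spirit to OpenMP’s dynamic scheduling. dynamic_for is a hypothetical helper written with std::thread, not part of PPL.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical helper: workers repeatedly claim the next chunk of iterations
// from a shared atomic counter until [0, n) is exhausted, so a worker that
// finishes cheap iterations early simply claims more work.
void dynamic_for(std::size_t n, unsigned workers, std::size_t chunk,
                 const std::function<void(std::size_t)>& body)
{
    assert(chunk > 0);
    std::atomic<std::size_t> next{0};
    std::vector<std::thread> pool;
    for (unsigned w = 0; w < workers; ++w) {
        pool.emplace_back([&] {
            for (;;) {
                const std::size_t begin = next.fetch_add(chunk); // one atomic op per chunk
                if (begin >= n) break;
                const std::size_t end = begin + chunk < n ? begin + chunk : n;
                for (std::size_t i = begin; i < end; ++i) body(i);
            }
        });
    }
    for (auto& t : pool) t.join();
}
```

Smaller chunks balance uneven work better but pay for more atomic operations; larger chunks do the opposite. Range stealing aims to get the balancing without paying that per-chunk synchronization cost.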

Can parallel for support both unsigned and signed loop indices? Do I have to worry about overflow? Yes, parallel_for handles both signed and unsigned indices equally well. As long as the type supports the basic operations of an integer type, parallel for will handle it. It will also make sure that the range does not overflow a signed type. This was not the case with OpenMP versions prior to 3.0.
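The overflow concern is easy to see: the number of iterations in [INT_MIN, INT_MAX) does not fit in an int. One common way to compute such a count safely, shown here as an assumption rather than as the PPL’s actual code, is to subtract in the corresponding unsigned type, where wrap-around is well defined. iteration_count is a hypothetical name.

```cpp
#include <cassert>
#include <climits>
#include <type_traits>

// Hypothetical helper: number of iterations in [first, last) for a signed or
// unsigned integer index type. Subtracting in the signed type could overflow
// (e.g. INT_MAX - INT_MIN), so the difference is taken in the matching
// unsigned type, where modular arithmetic yields the exact count.
template <typename T>
typename std::make_unsigned<T>::type iteration_count(T first, T last)
{
    static_assert(std::is_integral<T>::value, "integer index required");
    using U = typename std::make_unsigned<T>::type;
    if (!(first < last)) return 0;   // empty range
    return static_cast<U>(static_cast<U>(last) - static_cast<U>(first));
}
```

For example, iteration_count(INT_MIN, INT_MAX) is UINT_MAX (2^32 - 1 iterations), and iteration_count(-3, 4) is 7.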

Do I have to worry about overloading the system? What if there are other parts of a process using the Concurrency Runtime? Parallel for automatically detects system load and identifies available processors to use. Therefore, it will cooperate with the rest of the process and use only available resources. Once other, busy resources become available, parallel for will quickly find use for them (they will join the computation). OpenMP provides this through its dynamic mode, although once a parallel region has started executing, any resources that become available after that point are not used in the computation.

Can I have blocking code in my loop iterations? What happens if I block? Yes, you can have blocking in your code as long as it is cooperative (using ConcRT blocking APIs, or using Win32 blocking APIs with user-mode scheduling, UMS). When blocking happens, parallel for and ConcRT ensure that another worker is scheduled to pick up where the previous one left off. As a result, parallel for enables users to write code that would not work properly as serial code. The prime example is a forward blocking dependency, where the ith iteration is blocked until the jth iteration executes, with j > i. This code would deadlock if run serially. OpenMP does not provide any support for blocking between iterations.

Concurrency::event e;
volatile LONG counter = 0;

Concurrency::parallel_for(0, 64,
    [&] (int i)
    {
        InterlockedIncrement(&counter);

        if (i == 63)
        {
            // all other iterations blocked;
            // unblock them
            e.set();
        }
        else
        {
            // wait for iteration 63 to unblock
            e.wait(); 
        }
    });

// counter must be at 64 at this point

Can I cancel parallel for in the middle of computation? Yes, you can cancel parallel for at any point by throwing an exception from one of its iterations, or canceling its parent task group. This technique is very useful in performing long searches where computation can be stopped as soon as the result is found.

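try
{
    Concurrency::parallel_for(0, n,
        [] (int i)
        {
            if (found(i))
            {
                // cancels the remaining work immediately
                throw ItemFound();
            }
        });
}
catch (const ItemFound&)
{
    // parallel_for rethrows the exception on this thread; handle the result here
}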

This can also be achieved by cancelling the parent task_group, thus enabling cancellation on the entire subtree of work.

Parallel for guarantees that, once cancellation is initiated, only iterations that have already started will run to completion. This ensures that resources are not wasted. OpenMP does not have any support for cancellation, although it can be built using a flag.

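volatile bool flag = false; // volatile so every thread re-reads the flag

#pragma omp parallel for
for (int i = 0; i < n; i++)
{
    if (flag)
    {
        continue; // skip the rest; 'return' may not leave an OpenMP loop
    }
    else if (found(i))
    {
        flag = true; // cancels work
    }
}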

It should be noted that even with this hand-crafted cancellation scheme OpenMP does not support cancellation of the entire sub-tree of computation, which parallel for does.
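The same flag-based idea can be written portably with std::thread and std::atomic<bool>, which avoids relying on volatile for visibility between threads. parallel_find is a hypothetical helper for illustration: each worker scans a strided slice of the index range and checks the flag before every iteration, so all workers stop soon after a match is found, while an iteration already in progress still completes.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical search with cooperative cancellation via a shared flag.
// Returns an index for which found(i) is true, or -1 if there is none.
template <typename Pred>
long long parallel_find(long long n, unsigned workers, Pred found)
{
    std::atomic<bool> stop{false};
    std::atomic<long long> result{-1};
    std::vector<std::thread> pool;
    for (unsigned w = 0; w < workers; ++w) {
        pool.emplace_back([&, w] {
            // worker w visits w, w + workers, w + 2*workers, ...
            for (long long i = w; i < n && !stop.load(); i += workers) {
                if (found(i)) {
                    result.store(i);
                    stop.store(true);   // ask the other workers to stop
                }
            }
        });
    }
    for (auto& t : pool) t.join();
    return result.load();
}
```

If several indices match, whichever worker stores last wins, so the returned index is not necessarily the smallest one. Like the OpenMP version, this only stops a single flat loop; it does not cancel nested work.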

Can I throw exceptions inside parallel for? Will that tear down my application? Yes, you can throw exceptions in the parallel for functor, and that will not terminate your application. An exception thrown on a worker thread (and not caught in the body of the functor, naturally) is safely transported to the thread that initiated the parallel for, so you have an opportunity to catch it outside of parallel for. OpenMP has no exception handling support.
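The general technique for moving an exception from a worker thread to the thread that started the work can be sketched in standard C++ with std::exception_ptr: each worker catches whatever its body throws, and the caller rethrows after joining. This is an illustration under that assumption, not a description of how ConcRT transports exceptions internally; run_and_rethrow is a hypothetical name.

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <thread>
#include <vector>

// Hypothetical helper: run body(i) for i in [0, n) on 'workers' threads and
// rethrow a captured exception on the calling thread after all workers finish.
template <typename Body>
void run_and_rethrow(int n, unsigned workers, Body body)
{
    std::vector<std::exception_ptr> errors(workers);   // one slot per worker
    std::vector<std::thread> pool;
    for (unsigned w = 0; w < workers; ++w) {
        pool.emplace_back([&, w] {
            try {
                for (int i = static_cast<int>(w); i < n; i += static_cast<int>(workers))
                    body(i);
            } catch (...) {
                errors[w] = std::current_exception();  // capture instead of terminating
            }
        });
    }
    for (auto& t : pool) t.join();
    for (const auto& e : errors)
        if (e) std::rethrow_exception(e);  // transport to the caller
}
```

Unlike parallel for, this sketch does not cancel the remaining iterations when one of them throws; the other workers simply run to completion before the exception is rethrown.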

Can I use parallel for with STL iterators? That is what parallel_for_each is for. If your iterator supports random access, parallel_for_each uses the same mechanism as parallel for. If your iterator only supports forward access, we have provided code that reduces this case to random access, at which point the parallel for construct is reused.
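One simple way to reduce a forward-iterator range to random access, offered as an assumption rather than as a description of the code in ppl.h, is to record every iterator in a vector during a single serial pass. Element k is then reachable in constant time as positions[k], and the index range [0, size) can be divided like any parallel for-loop. index_positions is a hypothetical helper.

```cpp
#include <cassert>
#include <cstddef>
#include <iterator>
#include <vector>

// Hypothetical helper: one serial walk over [first, last) that remembers
// where each element lives, turning a forward range into an indexable one.
template <typename FwdIt>
std::vector<FwdIt> index_positions(FwdIt first, FwdIt last)
{
    std::vector<FwdIt> positions;
    for (; first != last; ++first)
        positions.push_back(first);
    return positions;
}
```

For a std::list, index_positions(l.begin(), l.end()) costs one serial walk of the list plus one stored iterator per element, which only pays off when the work done per element dominates that overhead.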

How many iterations, exactly, does it take to make parallel for better than serial? How many cycles should the functor contain in order for parallel for to beat the serial implementation? This is a difficult question to answer. It depends on the machine configuration, on whether there is other parallel work in the process, etc. Therefore, instead of giving a general and not very useful answer, I will attempt to give a concrete answer for the 4-core box I am using, with no other work running on it.

This is how I decided to measure it:

i) I will have an outer loop that controls the number of repetitions of the experiment. This loop amortizes the initial spin-up of threads in both OpenMP and ConcRT. It also magnifies the result to a level that is easily measurable (above 100 ms).

ii) I will have a dial that controls the amount of work being done per iteration.

iii) I will have a dial that controls the number of iterations in a loop.

When ii) is small and iii) is big we will measure the overhead per iteration (provided that the initial amount of work per iteration is small); when they are both small we will measure the overhead for the initial setup of threads, dividing work, etc.

The work being done will be a simple call to a function with an empty body, and ii) will control how many times that function is called. This is what a simple version of this program looks like:

#include <ppl.h>
#include <windows.h>
#include <stdio.h>
#include <omp.h>

using namespace Concurrency;

// Select the test to run: test_parallel_for or test_omp_for
#define RUN_TEST test_parallel_for

#define SIZE_OF_ARRAY 10000   // dial iii): iterations per loop
#define REPEAT 10000          // i): repetitions of the whole experiment
#define WORK 2                // dial ii): calls to noworkhelper per iteration

long long counter()
{
    LARGE_INTEGER li;
    QueryPerformanceCounter(&li);
    return li.QuadPart;
}

long long frequency()
{
    LARGE_INTEGER li;
    QueryPerformanceFrequency(&li);
    return li.QuadPart;
}

// Defined as an empty function, void noworkhelper() {}, in a separate
// object file compiled with /Od, to avoid inlining and invariant code motion
void noworkhelper();

void work(int index)
{
    for (int i = 0; i < WORK; i++)
    {
        noworkhelper();
    }
}

typedef void (*FNPTR) (int i);

FNPTR Func = work;

__declspec(noinline)
void test_parallel_for()
{
    parallel_for(0, SIZE_OF_ARRAY, Func);
}

__declspec(noinline)
void test_omp_for()
{
    #pragma omp parallel for
    for (int i = 0; i < SIZE_OF_ARRAY; i++)
    {
        Func(i);
    }
}

void run_data()
{
    for (int i = 0; i < REPEAT; i++)
    {
        RUN_TEST();
    }
}

int main()
{
    // Use the thread scheduler (not UMS) for the default ConcRT scheduler
    SchedulerPolicy policy(1, SchedulerKind, ThreadScheduler);
    Scheduler::SetDefaultSchedulerPolicy(policy);

    double etime = 0.0;

    long long start = counter();
    run_data();
    long long finish = counter();

    etime = (finish - start) * 1000.0 / frequency();
    printf("Elapsed time for test: %g ms\n", etime);

    Scheduler::ResetDefaultSchedulerPolicy();
    return 0;
}
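The benchmark relies on the Windows QueryPerformanceCounter/QueryPerformanceFrequency pair. If you want to reproduce this kind of measurement elsewhere, a portable equivalent (my assumption, not part of the original benchmark) is std::chrono::steady_clock, which is monotonic and therefore suitable for measuring elapsed time. elapsed_ms is a hypothetical helper.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical portable stand-in for the counter()/frequency() pair:
// times one call of 'run' and returns the elapsed time in milliseconds.
template <typename F>
double elapsed_ms(F&& run)
{
    const auto start = std::chrono::steady_clock::now();
    run();
    const auto finish = std::chrono::steady_clock::now();
    return std::chrono::duration<double, std::milli>(finish - start).count();
}
```

For example, elapsed_ms(run_data) would time the same repetition loop that main() times with the performance counter.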

Before showing the results, it should be noted that these are pure overhead tests, designed to show costs related to creating, running and maintaining parallel regions. Here are the raw tables:

 

R        I       W      Serial   OpenMP   Parallel for   Fixed
100000   500     5      713.2    295.1    1142.1         564.3
50000    500     10     670.9    265.5    671.7          388.9
10000    500     50     637.1    197.5    293.7          232.7
5000     500     100    649.4    189.6    241.3          233.6
1000     500     500    632.9    173.4    202.5          183.3
500      500     1000   630.2    184.6    200.9          206.7

R = the number of repeats, I = the number of iterations, W = the number of units of work; all measured times are in milliseconds.

[Figure: Small number of iterations, small work]
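To read these numbers as speedups, divide the serial time by the parallel time in the same row; values below 1.0 are slowdowns. The snippet below does that for the first table, using the values copied from it (the helper name speedups and the array names are mine).

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Speedup = serial time / parallel time, element by element.
std::vector<double> speedups(const std::vector<double>& serial,
                             const std::vector<double>& parallel)
{
    std::vector<double> s;
    for (std::size_t k = 0; k < serial.size() && k < parallel.size(); ++k)
        s.push_back(serial[k] / parallel[k]);
    return s;
}

// Times in ms from the first table, rows W = 5, 10, 50, 100, 500, 1000 (I = 500).
const std::vector<double> serial_ms = {713.2, 670.9, 637.1, 649.4, 632.9, 630.2};
const std::vector<double> openmp_ms = {295.1, 265.5, 197.5, 189.6, 173.4, 184.6};
const std::vector<double> parfor_ms = {1142.1, 671.7, 293.7, 241.3, 202.5, 200.9};
const std::vector<double> fixed_ms  = {564.3, 388.9, 232.7, 233.6, 183.3, 206.7};
```

For W = 5, parallel for comes out at roughly 0.62 (a slowdown), while OpenMP reaches about 2.4 and the fixed-partitioning version about 1.26; at W = 1000, parallel for reaches about 3.1, close to OpenMP’s 3.4.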

Now let’s look at what happens with a high number of iterations and a small amount of work. This will tell us how costly each iteration is to execute in OpenMP and parallel for.

 

R        I       W      Serial   OpenMP   Parallel for   Fixed
500      100000  5      712.1    198.6    420.3          226.6
100      100000  25     645.2    194.6    207.6          213.6
50       100000  50     638.4    170.9    219.9          196.8
10       100000  250    637.1    168.3    169.1          169.1
5        100000  500    632.7    169      170.6          172.9
1        100000  2500   630.5    168.5    164.9          179.5

R = the number of repeats, I = the number of iterations, W = the number of units of work; all measured times are in milliseconds.

[Figure: Large number of iterations, small work]

Conclusion: Parallel for is a general-purpose mechanism for executing serial for-loops in parallel. It is up to the user to determine the legality and profitability of such a transformation. The parallel for construct increases the robustness and flexibility of parallelized code by having built-in support for features such as cooperative blocking during iterations, immediate cancellation, signed integer iteration types with arbitrarily large ranges, dynamic redistribution of workload, support for STL iterators, etc. However, the generality of parallel for comes at a cost, and there are two types of overhead that make our parallel for implementation slower than OpenMP in some cases. The first is the initial setup of the worker threads and the range stealing mechanism, and the second is the additional work done per iteration to support cancellation and range stealing. This may result in poor scaling of the parallel for-loop if the amount of work done per iteration is very small. If your program falls into that category and you would still like to see a speedup from executing in parallel, please use the specialized, fixed-partitioning version of parallel for, parallel_for_fixed, available in our samples collection. It will scale as well as OpenMP in the cases where limiting overhead is essential.

Sample Message Blocks priority_buffer, bounded_buffer, and alternator

Recently, I created three new sample message blocks that complement the Agents Library’s existing set and provide additional functionality. I chose to write these three blocks, priority_buffer, bounded_buffer, and alternator, based on customer feedback and to improve support for certain scenarios.

Each of these sample message blocks behaves similarly to unbounded_buffer and the other message blocks in the Agents Library, so a quick review of one of my previous posts introducing a couple of blocks might be valuable. Taking a look at the MSDN page on asynchronous message blocks would also be helpful.

 

priority_buffer

Priority_buffer is very similar to unbounded_buffer, except that messages are sorted and distributed to its targets based on priority. To use a priority_buffer, the payload must implement the comparison operator<. This should not be an expensive operation, as it will be called often to compare messages. A payload that is less than another is considered to be of higher priority. For example, in a priority_buffer of integers, 1 is of higher priority than 2.
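This convention is the opposite of std::priority_queue, which by default serves the largest element first. The ordering rule can be illustrated with std::priority_queue and std::greater, so that the smallest (highest-priority) value comes out first. This is only a sketch of the ordering semantics, not of priority_buffer itself, and min_first_queue and drain are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <vector>

// Smallest value first: "less than" means "higher priority".
using min_first_queue =
    std::priority_queue<int, std::vector<int>, std::greater<int>>;

// Pops everything (from a copy of the queue) and returns the delivery order.
std::vector<int> drain(min_first_queue q)
{
    std::vector<int> order;
    while (!q.empty()) {
        order.push_back(q.top());
        q.pop();
    }
    return order;
}
```

Pushing 3, 1 and 2 and then draining yields 1, 2, 3, which matches the rule that 1 is of higher priority than 2.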

Priority_buffer does not hold messages back in order to sort them. Message order changes only if messages are not consumed as fast as they are produced. Because messages are prioritized, the order in which they arrive is no longer guaranteed to be preserved; priority_buffer will always try to deliver the highest-priority message to its targets at any given time.

Scenarios for priority_buffer include providing quality of service by addressing the most important items or tasks first.

bounded_buffer

Bounded_buffer is like a queue with a maximum capacity. The number of messages a bounded_buffer can hold is specified at construction time. Until it reaches its capacity, bounded_buffer behaves exactly like unbounded_buffer. Once full, bounded_buffer no longer accepts new messages; instead, it postpones any offered messages until it drops back below capacity. Postponing a message is a way for a message block to say that it isn’t going to take ownership of the message right now, but might want to at a later point in time. Once back below capacity, bounded_buffer tries to consume the previously postponed messages from each of its sources. Bounded_buffer should be used in conjunction with the send message-passing function to quench data production directly at the source, without wasting computation cycles on polling.

Bounded_buffer is great in data-flow networks or producer/consumer scenarios where data creation occurs at a faster rate than can be processed and memory is a concern. Using bounded_buffer, message production can be blocked at the source of creation until previous messages are processed.
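The producer-side effect of a full bounded_buffer can be approximated with a classic bounded queue built on std::mutex and std::condition_variable. This is a swapped-in technique for illustration only: the sample bounded_buffer postpones offered messages rather than blocking a thread, and bounded_queue is a hypothetical name. In both cases, though, a producer using a blocking call waits once the capacity is reached instead of piling up messages.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical bounded producer/consumer queue.
template <typename T>
class bounded_queue {
public:
    explicit bounded_queue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity_ > 0);
    }

    void push(T value) {   // blocks while the queue is full
        std::unique_lock<std::mutex> lock(m_);
        not_full_.wait(lock, [&] { return q_.size() < capacity_; });
        q_.push(std::move(value));
        not_empty_.notify_one();
    }

    T pop() {              // blocks while the queue is empty
        std::unique_lock<std::mutex> lock(m_);
        not_empty_.wait(lock, [&] { return !q_.empty(); });
        T value = std::move(q_.front());
        q_.pop();
        not_full_.notify_one();
        return value;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(m_);
        return q_.size();
    }

private:
    std::size_t capacity_;
    mutable std::mutex m_;
    std::condition_variable not_full_, not_empty_;
    std::queue<T> q_;
};
```

With a capacity of 2, a producer thread calling push a third time waits until a consumer pops an item, so memory use stays bounded no matter how fast the producer runs.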

alternator

Many of the blocks in the Agents Library deliver messages to targets based on linking order; i.e., the first linked target gets a chance at each message before all others. Sometimes it is desirable to distribute messages evenly across all targets in a round-robin fashion, and alternator does exactly this. Alternator delivers the first message to its first linked target, the second message to its second linked target, and so on, looping back to the first link once each target has been offered a message. Think of alternator as an unbounded_buffer that distributes messages fairly amongst its targets.

Alternators are useful for evenly distributing work across multiple pipelines or agents.
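Alternator’s delivery order is plain round-robin: message k goes to target k mod N. The sketch below computes that assignment for a list of messages. round_robin is a hypothetical helper, and unlike the real block it ignores the case where a target declines or postpones an offered message.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: assign messages to 'targets' inboxes in round-robin order.
template <typename T>
std::vector<std::vector<T>> round_robin(const std::vector<T>& messages,
                                        std::size_t targets)
{
    assert(targets > 0);
    std::vector<std::vector<T>> inbox(targets);
    for (std::size_t k = 0; k < messages.size(); ++k)
        inbox[k % targets].push_back(messages[k]);   // message k -> target k mod N
    return inbox;
}
```

With five messages and two targets, the first target receives messages 1, 3 and 5 and the second receives 2 and 4.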

Download the Code

Follow this link to download the source code for priority_buffer, bounded_buffer, alternator, and many other samples from the Concurrency Runtime team. Specifically, these blocks are located in SamplePack\ConcRTExtras\agents_extras.h.

 

Feedback

Please let me know what features you find the most useful and which ones we are lacking. This will help influence what features might be included or discussed in the future.

Resource Management in Concurrency Runtime – Part 3

In my previous blog post, I talked about the dynamic migration concept of the Concurrency Runtime’s (ConcRT) Resource Manager (RM). Today I will be demonstrating that concept in action and will focus on its performance characteristics.

A Demonstration of Dynamic Migration

The scenario that we are going to use to demonstrate the resource manager’s dynamic core migration concept involves two schedulers running different kinds of workloads in phases. The schedulers will be created with the default policy on an 8-core machine (MinConcurrency=1, MaxConcurrency=8). The workload of the schedulers in each phase will be characterized by one of the following:

i) Both schedulers have an equal amount of work

ii) One scheduler has work, the other one has no work

iii) One scheduler has more work than the other scheduler

Here are the phases and the recorded performance of the Resource Manager in detail:

[Figure: Depicting Dynamic Migration]

Phase1: In this phase both schedulers have an equal amount of work. No dynamic migration happens, and the schedulers keep their proportionally allocated resources. Note that both schedulers have 4 resources.

Phase2: Only Scheduler1 has work here. This phase starts at 2.246s; RM reacts at 2.309s by allocating all resources to Scheduler1. Note that Scheduler1 has 8 resources, one of which is shared with Scheduler2. That is because the resource manager cannot take away cores below the MinConcurrency value and has to keep at least 1 core assigned to Scheduler2. However, Scheduler2 has no work and that resource is idle, so Scheduler1 makes use of it without interruption.

Phase3: This is similar to Phase2, this time only Scheduler2 has work. The phase starts at 4.508s; RM reacts at 4.602s by allocating all resources to Scheduler2.

Phase4: Here both schedulers have work, but Scheduler1 has more work than Scheduler2. The phase starts at 6.833s; RM first balances resource usage among the schedulers at 6.911s, taking into account the workload of each. This continues until Scheduler2 completes all of its work. At 8.112s all resources are allocated back to Scheduler1.

Focusing on Dynamic Migration Reaction Time

In order to give more details on the dynamic migration reaction time, a zoomed-in view of Phase4 will be examined.

[Figure: Focusing on Reaction Time]

Here the first core migration happens after 78ms where RM balances the resources between the two schedulers. This reaction time depends on a number of factors:

1) The statistics polling interval in the resource manager

All core migration decisions are made after the resource manager polls the schedulers. In the current implementation the polling interval is 100ms, so the reaction time can be anywhere between 0 and 100ms.

2) The statistics from the schedulers

Statistics are the resource manager’s representation of a scheduler’s workload, including the task completion rate, the incoming task rate and the total task queue size. The faster the statistics grow, the sooner the scheduler gets extra resources. This also implies that with slower growth, the resource manager may defer allocation to later polls.

3) The availability of resources

The resource manager will only migrate if there is an available resource. Resources are considered available if other schedulers have declared their resources idle or have declining statistics. In other words, the resource manager’s reaction will be delayed until there is an available resource.
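The reaction times quoted in the phase descriptions can be checked with simple arithmetic: subtract the phase start time from the time RM reacted (seconds converted to milliseconds) and compare the result with the 100ms polling interval. This is a worked check of the numbers in this post, not a model of the resource manager’s algorithm; the names are mine.

```cpp
#include <cassert>

// Polling interval quoted in the text for the current implementation.
constexpr int poll_interval_ms = 100;

// Reaction time = moment RM reacted minus moment the phase started (both in ms).
constexpr int reaction_ms(int phase_start_ms, int rm_reacted_ms)
{
    return rm_reacted_ms - phase_start_ms;
}

// Timestamps from the phase descriptions (2.246s -> 2246 ms, and so on).
static_assert(reaction_ms(2246, 2309) == 63, "Phase2");
static_assert(reaction_ms(4508, 4602) == 94, "Phase3");
static_assert(reaction_ms(6833, 6911) == 78, "Phase4: the 78ms quoted above");
static_assert(reaction_ms(6833, 6911) < poll_interval_ms, "within one polling interval");
```

All three measured reactions (63ms, 94ms and 78ms) fall inside a single 100ms polling interval, consistent with the first factor above.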

 

After rebalancing, the statistics don’t change until 8.112s, so the resource manager keeps the current allocation. When Scheduler2 completes all of its work, all resources are allocated back to Scheduler1.

Next Steps and Feedback

I would love to hear feedback on the blog content, or on the areas of the Concurrency Runtime you are most interested in, to help focus future posts.

Code Samples for the Concurrency Runtime, Agents Library and Parallel Pattern Library updated for Beta2

We’ve posted an update to our sample pack at http://code.msdn.com/concrtextras for Visual Studio 2010 Beta2. The newest additions in this drop of the sample pack are three new header files in the ConcRTExtras folder. Here’s what these files contain:

ppl_extras.h contains additional STL-style parallel algorithms like parallel_accumulate, parallel_partial_sum, parallel_transform, parallel_all_of, parallel_any_of, parallel_none_of, and also parallel_for_fixed, parallel_accumulate_fixed and parallel_partial_sum_fixed.

agents_extras.h contains additional useful message blocks like priority_buffer, bounded_buffer, alternator, join_transform and a recalculate_buffer.

concrt_extras.h contains useful wrapper classes and functions for the runtime itself: concrt_suballocator, a std::allocator built on Concurrency::Alloc; and task_scheduler, schedule_group and schedule_task, simple wrapper classes around Scheduler and ScheduleGroup that offer PPL-like functor support for scheduling tasks.

These are located in the Concurrency::samples namespace and the team will be blogging about many of these over the coming weeks, but please feel free to ask any questions here or in the forums.

-Rick

What’s new in Beta 2 for the Concurrency Runtime, Parallel Pattern Library and Asynchronous Agents Library

Last week Visual Studio 2010 Beta 2 was released for download. Since Beta1, the team has been pretty busy adding enhanced functionality to make you more productive at expressing parallelism in your applications and improving the quality and performance of our runtime and programming models.

Here’s a guide to what’s new in Beta2: we’ve added 2 new concurrent containers, significantly enhanced our online documentation and improved our debug experience by adding better visualizations and more intuitive views in the parallel debug window. We’ve also modified a small number of APIs which you should be aware of if you are using Beta 1.

Concurrent containers – concurrent_queue and concurrent_vector

We’ve alluded to concurrent_queue and concurrent_vector more than once in our videos and live talks; these are finally in the box, and with them come two new header files, concurrent_queue.h and concurrent_vector.h.

concurrent_queue<T> is very similar to std::queue<T> and it offers push, try_pop interfaces and ‘unsafe’ iterators and size accessors (these aren’t threadsafe during concurrent pushes and pops).

concurrent_vector<T> is most similar to a std::vector<T> and it offers a push_back method that is internally synchronized across threads and allows efficient thread safe growth of the vector. Like std::vector, concurrent_vector has random access iterators, but unlike std::vector, the guarantee of contiguous storage is removed and there are no insert and erase methods.

The interfaces to concurrent_queue and concurrent_vector will be incredibly familiar if you are a user of Intel’s Threading Building Blocks (the interfaces are identical), and a very big thank you goes out to their team for their assistance with this.

Here’s a brief example that uses both concurrent_queue and concurrent_vector in a parallel loop:

#include <ppl.h>
#include <concurrent_vector.h>
#include <concurrent_queue.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

int main()
{
    concurrent_vector<int> odds;
    concurrent_queue<int> evens;

    parallel_for(0, 100, 1, [&odds, &evens](int i) {
        if (i % 2 == 0)
            evens.push(i);
        else
            odds.push_back(i);
    });

    cout << "We expect 100 items: " << evens.unsafe_size() + odds.size() << endl;
}

Debugging enhancements in Beta2

VS 2010 Beta 2 also includes significant enhancements for parallel debugging. The locals window now includes visualizers for all first-class objects in the PPL and the Agents Library, so when you look at an instance of a concurrent_queue or an unbounded_buffer in the locals window, both look a lot like a std::queue instead of exposing implementation details. The Parallel Tasks and Parallel Stacks windows have also been enhanced; in addition to those videos, try the C++ code in the MSDN walkthrough and Daniel Moth’s blog. Finally, as with the rest of the C Runtime, we’ve made the source code for the Concurrency Runtime available as part of the install, so if you need to debug deeper or really want to see how things work internally, you now can.

Documentation updated with more How To’s and walkthroughs

Our offline and online documentation has been significantly updated for Beta2. We’ve added multiple conceptual topics and expanded our How To topics significantly to include information not just on PPL and the Agents Library but on the underlying Concurrency Runtime and how to manage and use scheduler instances. Any feedback on these topics is greatly appreciated.

API updates to task_group and agent

For Beta 2, we’ve made a very small number of API updates to our task_group, structured_task_group and agent classes. The change to agent is simple to describe: we’ve simplified the state management and removed the parameter from the agent::done method, so it’s sufficient to call this->done() instead of this->done(agent_done).

The change to task_group and structured_task_group is additive, we’ve added the method run_and_wait which takes a functor and runs it inline on the current thread or task. This offers the major benefit of being able to compose tasks and nest their cancellation. One of the easiest ways to see this in action is through implementing a parallel search algorithm, like a parallel version of the new C++0x library function ‘all_of’:

template<class InIt, class Pr>
inline bool parallel_all_of(InIt first, InIt last, Pr pred)
{
    typedef typename iterator_traits<InIt>::value_type Item_type;

    // create a structured task group
    structured_task_group tasks;

    // cancel the whole group as soon as one item fails the predicate
    auto for_each_predicate = [&pred, &tasks](const Item_type& cur) {
        if (!pred(cur))
            tasks.cancel();
    };

    auto task = make_task([&]() {
        parallel_for_each(first, last, for_each_predicate);
    });

    // run_and_wait reports 'canceled' if any iteration cancelled the group
    return tasks.run_and_wait(task) != canceled;
}

Here we are placing a call to parallel_for_each inside of a structured_task_group and cancelling the work when the predicate is not true. This has the effect of cancelling all nested tasks in the structured_task_group, potentially saving work if the predicate is long running and expensive to compute. We can use this in our example above like this (but don’t expect to see significant speedups over std::all_of for this example):

if (parallel_all_of(odds.begin(),odds.end(),[](int i){return i % 2;}))
cout << "success!" << endl;
So try Beta 2 if you haven’t already
That’s about it for now, so download VS 2010 Beta 2 if you haven’t already, and if you’re at PDC or TechEd Europe next month, stop by our booth or come to one of our talks.

-Rick

Resource Management in Concurrency Runtime – Part 2

In my previous blog post, I talked about key concepts of the Concurrency Runtime’s (ConcRT) resource manager, starting with a definition of a resource. I then explained why an application might be composed of a number of scheduler instances. Eventually I mentioned how the resource manager helps in resource allocation to the schedulers in order to improve performance and increase utilization of hardware. Specifically, the ‘initial allocation’ algorithm was mentioned, which defines the resource allocation done at the time of scheduler initialization.

Today I will talk about ‘dynamic migration’, another key concept of the resource manager, and explain how the resource manager adjusts resource utilization dynamically as the schedulers’ workloads change.

Dynamic Migration

Dynamic migration can be defined as moving resources from one scheduler to another in order to improve CPU utilization.

As an example, consider a scenario on an 8-way machine with two schedulers, S1 and S2, both created with MinConcurrency = 1 and MaxConcurrency = 8 (please refer to this for a description of the scheduler policy values that involve resource management). As described for the initial allocation algorithm, once S1 and S2 are created the resources are shared in proportion to their MaxConcurrency values, so each will have 4 resources.

 

Up to this point, everything has been initial allocation. From here on, the resource manager collects information about the workload of the schedulers by polling them periodically. At each poll, the resource manager captures the state of each scheduler instance with the IScheduler::Statistics() method. Note that when a scheduler instance registers with the resource manager, it has to provide an implementation of the IScheduler interface so that the resource manager can call into it. The IScheduler::Statistics() method is expected to return three measurements calculated by the scheduler instance itself:

  1. Number of completed tasks since last statistics call
  2. Number of incoming tasks since last statistics call
  3. The total size of all work queues

The resource manager will keep a history of these measurements and derive a metric for scheduler workload for each scheduler. A statistically significant change in that metric (roughly speaking a positive or negative change as a factor of the standard deviation) will signify to the resource manager that the scheduler either needs one or more additional resources or can give away one or more. The bigger the change the more resources will be added or removed.
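The resource manager’s exact heuristics are internal to ConcRT, so the following is only a hypothetical sketch of the idea in standard C++: keep a bounded history of a workload metric and translate a deviation from the historical mean, measured in standard deviations, into a suggested number of resources to add or remove. The names SchedulerStats, WorkloadMetric and WorkloadHistory, the metric formula, and the "one resource per standard deviation" rule are all assumptions made for illustration.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <deque>
#include <numeric>

// Hypothetical snapshot of the three values a scheduler reports per poll.
struct SchedulerStats {
    unsigned completedTasks;  // tasks completed since the last poll
    unsigned incomingTasks;   // tasks that arrived since the last poll
    unsigned queuedTasks;     // total size of all work queues
};

// Illustrative workload metric (an assumption, not ConcRT's formula):
// net task growth during the interval plus the current backlog.
inline double WorkloadMetric(const SchedulerStats& s) {
    return static_cast<double>(s.incomingTasks) -
           static_cast<double>(s.completedTasks) +
           static_cast<double>(s.queuedTasks);
}

// Keeps a bounded history of the metric. Observe() returns a suggested
// resource change: positive to add, negative to remove, 0 for no change.
// The suggestion is the deviation from the historical mean expressed in
// whole standard deviations (truncated toward zero).
class WorkloadHistory {
public:
    explicit WorkloadHistory(std::size_t capacity) : capacity_(capacity) {
        assert(capacity >= 2 && "need at least two samples for a deviation");
    }

    int Observe(const SchedulerStats& stats) {
        const double x = WorkloadMetric(stats);
        int delta = 0;
        if (history_.size() >= 2) {
            const double n = static_cast<double>(history_.size());
            const double mean =
                std::accumulate(history_.begin(), history_.end(), 0.0) / n;
            double variance = 0.0;
            for (double h : history_) variance += (h - mean) * (h - mean);
            const double stddev = std::sqrt(variance / n);
            if (stddev > 0.0)
                delta = static_cast<int>((x - mean) / stddev);
        }
        history_.push_back(x);
        if (history_.size() > capacity_) history_.pop_front();
        return delta;
    }

private:
    std::size_t capacity_;
    std::deque<double> history_;
};
```

For example, after metric values 10 and 12 (mean 11, standard deviation 1), a new value of 15 suggests adding 4 resources; a large drop would instead produce a negative suggestion.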

After polling each scheduler, the resource manager will have a list of schedulers (L1) that can give away resources and a list of schedulers (L2) that need more. It then removes the extra resources from the schedulers in L1, starting with the one that has the lowest resource allocation priority, and reallocates them to the schedulers in L2, starting with the one that has the highest priority. Note that if L2 is empty, the schedulers in L1 keep their resources.
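The L1/L2 pass can be sketched as a simple matching loop. This is a hypothetical standard-C++ model, not the resource manager’s code: SchedulerDemand, Transfer and PlanMigration are invented names, and a higher number is assumed to mean a higher allocation priority.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical record for one scheduler taking part in a migration round.
struct SchedulerDemand {
    std::string name;
    int priority;  // allocation priority (higher = more important)
    int count;     // resources it can give away (L1) or needs (L2)
};

struct Transfer {
    std::string from;
    std::string to;
    int count;
};

// Moves resources from givers (L1), lowest priority first, to receivers
// (L2), highest priority first. With an empty L2 nothing moves, so the
// givers keep their resources.
inline std::vector<Transfer> PlanMigration(std::vector<SchedulerDemand> givers,
                                           std::vector<SchedulerDemand> receivers) {
    std::stable_sort(givers.begin(), givers.end(),
        [](const SchedulerDemand& a, const SchedulerDemand& b) {
            return a.priority < b.priority;
        });
    std::stable_sort(receivers.begin(), receivers.end(),
        [](const SchedulerDemand& a, const SchedulerDemand& b) {
            return a.priority > b.priority;
        });

    std::vector<Transfer> plan;
    std::size_t g = 0, r = 0;
    while (g < givers.size() && r < receivers.size()) {
        const int n = std::min(givers[g].count, receivers[r].count);
        if (n > 0) plan.push_back({givers[g].name, receivers[r].name, n});
        givers[g].count -= n;
        receivers[r].count -= n;
        if (givers[g].count <= 0) ++g;     // this giver is exhausted
        if (receivers[r].count <= 0) ++r;  // this receiver is satisfied
    }
    return plan;
}
```

For the S1/S2 example in this post (S1 can give away 2, S2 needs 2), the plan is a single transfer of 2 resources from S1 to S2; with an empty L2 the plan is empty.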

Let’s return to the example above and assume that the resource manager has decided that S1 can give away 2 resources and S2 can make use of 2 more. The resource manager will inform S1 that it should give away 2 resources, oversubscribe the hardware threads with 2 additional resources, and inform S2 that it can now use the extra resources. S1 is expected to obey the protocol and release the resources as soon as possible. After this protocol completes, S1 will have 2 resources and S2 will have 6.

More Details on Dynamic Migration

We covered the general idea of dynamic migration. Let us walk through some what-if scenarios. 

1. What happens if S1 has no work at all and S2 is still in need of resources, does S1 fall back to zero resources?

As a general rule of resource allocation, the resource manager will never allocate fewer resources to a scheduler than its MinConcurrency policy value. Therefore S1 will have at least 1 resource assigned. However, since S1 has no work, it will put that resource into inactive mode, indicating to the resource manager that the resource is idle. Knowing this, the resource manager will share the underlying hardware thread by oversubscribing it and providing another resource to S2.

2. What would happen if S1 shuts down and S2 is still in need of more?

All resources of S1 will become available. Resources will be allocated to S2 (not more than MaxConcurrency of S2) soon after.

3. On a machine with NUMA topology, is locality considered?

Locality information is used after the resource manager decides which scheduler is going to give away resources and which is going to receive more. (Schedulers are selected from the L1 and L2 lists according to their allocation priority, so locality is less important than allocation priority.) Let S1 (an element of L1) have R1 resources to give away and S2 (an element of L2) need R2 resources. If R1 is greater than R2, the subset of resources allocated to S2 is selected with respect to locality. Here is an example:

Assume an allocation layout before dynamic migration in which Node1 and Node2 are NUMA nodes with 4 cores each and S2 owns Core1 and Core3. Let S2 need 2 resources and S1 be able to give away 4. The resource manager will select two of S1’s resources that are close to the ones S2 already owns. Since Core2 and Core4 are on the same node as S2’s Core1 and Core3 (cores 1 to 4 are all on Node1), the resource manager will migrate Core2 and Core4 to S2. After dynamic migration, S2 owns Core1 through Core4, all on Node1.
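A hypothetical sketch of locality-aware selection: among the resources the giver can release, prefer those on NUMA nodes where the receiver already owns cores. Core, PickCoresByLocality and the "same node first" rule are illustrative assumptions; the real resource manager may weigh locality differently.

```cpp
#include <algorithm>
#include <cstddef>
#include <set>
#include <vector>

// Hypothetical description of one hardware core.
struct Core {
    int id;
    int node;  // NUMA node the core belongs to
};

// Picks up to `needed` cores from the giver's cores, preferring cores on
// NUMA nodes where the receiver already owns cores. Ties keep the giver's
// original order.
inline std::vector<int> PickCoresByLocality(const std::vector<Core>& giverCores,
                                            const std::vector<Core>& receiverCores,
                                            std::size_t needed) {
    std::set<int> receiverNodes;
    for (const Core& c : receiverCores) receiverNodes.insert(c.node);

    std::vector<Core> candidates = giverCores;
    // Cores on one of the receiver's nodes sort before all other cores.
    std::stable_sort(candidates.begin(), candidates.end(),
        [&](const Core& a, const Core& b) {
            return receiverNodes.count(a.node) > receiverNodes.count(b.node);
        });

    std::vector<int> picked;
    for (std::size_t i = 0; i < candidates.size() && picked.size() < needed; ++i)
        picked.push_back(candidates[i].id);
    return picked;
}
```

For the example above, if S1 offers Core5, Core2, Core6 and Core4 (the two Node2 cores are an assumption) and S2 owns Core1 and Core3 on Node1, the function picks Core2 and Core4.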

 

Next Steps and Feedback

Next, I will demonstrate dynamic migration and focus on its performance characteristics. I would love to hear your feedback on this post, and on which areas of the Concurrency Runtime you would like future posts to focus on.

Asynchronous Agents Library - Intro to Message Blocks

In my previous post I talked about the agent class. Now I will introduce the Agents Library’s message blocks, how to use them, and the fundamentals of what they do. I will cover some basics that apply to all of the message blocks, introduce the messaging APIs, and then specifically explain three blocks: unbounded_buffer, overwrite_buffer, and single_assignment.

Message Blocks

In the Agents Library we have created a set of interfaces and defined a protocol for message blocks to communicate and exchange messages. Message blocks are intended to be used to establish defined communication protocols between isolated components and to develop concurrent applications based on data flow. The message blocks provided by the Agents Library can be used in conjunction with the agent class itself or separately.

For more information on data flow - http://en.wikipedia.org/wiki/Dataflow

ISource and ITarget Interfaces

ISource and ITarget are the base interfaces that message blocks work with. They declare functions for linking blocks and exchanging messages. A source block implements ISource, a target block implements ITarget, and a propagator block (a block that is both a source and a target) implements both. In this post I will only discuss the functions needed to use the message blocks provided by the Agents Library. A more comprehensive knowledge of the functions in the ISource and ITarget interfaces is required to create your own message blocks; this will be discussed in a later post.

Linking/Unlinking and Creating Messaging Networks

Often it is desirable to link message blocks together to create a network. One such reason for building messaging networks is to create data flow pipelines.  The following three functions, declared on the ISource interface, are used to connect source blocks to target blocks:

void link_target(ITarget<_Type> * _PTarget); – adds a link to the specified target. This causes the source block to offer any of its messages to the target until it is unlinked.

void unlink_target(ITarget<_Type> * _PTarget); – removes an existing link with the specified target. Once unlinked, no more messages will be presented to the target.

void unlink_targets(); – removes all links with any target blocks.

Here is a simple example connecting and then disconnecting two unbounded_buffers:

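    unbounded_buffer<int> buffer1, buffer2;

    // Link buffer1 to buffer2: messages in buffer1 are offered to buffer2.
    buffer1.link_target(&buffer2);

    ...

    // Remove the link from buffer1 to buffer2.
    buffer1.unlink_target(&buffer2);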

Some blocks restrict the number of targets allowed; an invalid_link_target exception is thrown in these cases.

Basic Message Propagation

Messages are exchanged between blocks using lightweight tasks (LWTs) on a scheduler. Because of this, message propagation works cooperatively with any other work in the same scheduler, which means long-running tasks that never block or yield can slow down or even halt the delivery of messages. Such is the nature of working in a cooperative environment.

All of the message blocks built into the Agents Library guarantee in-order message delivery. It is possible to create your own blocks that do not preserve order, but all of the built-in ones do.

Messaging APIs - send, asend, receive, and try_receive

Once you understand how to create messaging networks and how messages propagate, the only other thing you need to start programming is how to insert messages into, and remove messages from, individual blocks.

Two global functions are used to create and insert messages:

template <class _Type>
bool send(ITarget<_Type> &_Trg, const _Type &_Data);

template <class _Type>
bool asend(ITarget<_Type> &_Trg, const _Type &_Data);

Each of these takes a target and the data to transmit. Send originates a message with the target synchronously, whereas asend does so asynchronously. This means a send call blocks until the target either takes the message or declines it; send returns true if the message was delivered and false otherwise. Asend does not wait for the target to take the message: it offers the message and returns immediately. A return value of true means the target has accepted the message and will eventually take it; false means the target either declined the message or postponed the decision on whether to take it until later.

Likewise here are the two global functions for removing or extracting messages:

template <class _Type>
_Type receive(ISource<_Type> & _Src, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE);

template <class _Type>
bool try_receive(ISource<_Type> & _Src, _Type & _value);

Receive takes a source block to extract a message from and an optional timeout; the extracted value is the return value. If no message is currently available in the source, receive blocks until one is; if a timeout is specified and it elapses first, receive throws an operation_timed_out exception. Correspondingly, try_receive obtains a message only if the source has one at that instant; otherwise it returns immediately. A return value of true from try_receive indicates that a message was received; false means one was not.

All of these functions when blocking do so cooperatively with the Concurrency Runtime. To learn more about working cooperatively with the Concurrency Runtime take a look at the series of posts on Synchronization with the Concurrency Runtime.
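To make the blocking and non-blocking semantics concrete, here is a toy, standard-C++ stand-in for a buffer that supports send, try_receive and receive. ToyBuffer is a hypothetical class written only for illustration: it is not the Agents Library, it has no timeout support, and unlike the real functions its receive blocks the OS thread instead of cooperating with the Concurrency Runtime.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>

// Toy, FIFO stand-in for an unbounded message buffer (hypothetical).
template <class T>
class ToyBuffer {
public:
    // Like send to an unbounded_buffer: every message is accepted.
    bool send(const T& value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(value);
        }
        ready_.notify_one();
        return true;
    }

    // Like try_receive: takes a message only if one is available now.
    bool try_receive(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) return false;
        out = queue_.front();
        queue_.pop();
        return true;
    }

    // Like receive with an infinite timeout: waits until a message arrives.
    // (This wait blocks the thread; it is not cooperative.)
    T receive() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !queue_.empty(); });
        T value = queue_.front();
        queue_.pop();
        return value;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<T> queue_;
};
```

Messages come out in the order they were sent, matching the in-order delivery guarantee described earlier.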

unbounded_buffer

Unbounded_buffer is one of the most basic messaging blocks; it acts very much like a queue. As its name suggests, unbounded_buffer can store any number of messages (limited only by memory) collected from its source links (links from blocks that have the unbounded_buffer as a target). Unbounded_buffer always accepts all messages offered to it. Messages propagated to an unbounded_buffer are collected into a queue and then offered one at a time to each of its targets. Each message in an unbounded_buffer is given to only one of its targets, based on link ordering. This means the targets of an unbounded_buffer compete for messages.

Unbounded_buffer provides two utility functions:

bool enqueue(_Type const& _Item);

_Type dequeue();

These are equivalent to send and receive, respectively; they are essentially wrappers around them.

    unbounded_buffer<int> buffer;

    // These are equivalent.
    buffer.enqueue(1);
    send(buffer, 1);

    // And so are these.
    int value1 = buffer.dequeue();
    int value2 = receive(buffer);

Unbounded_buffers are excellent for producer/consumer patterns. In a previous post, Introduction to Asynchronous Agents Library, the FindString agents sample uses unbounded_buffers to communicate between the individual agents.

overwrite_buffer

Essentially, overwrite_buffer is a simple broadcaster: a message block that holds one message at a time, very much like a variable. Every time an overwrite_buffer receives a message, it offers a copy of it to each of its targets and then stores the message internally, replacing any previously stored message. The important thing to note here is that there is no competition for data: every message that comes into an overwrite_buffer is offered to all of its targets, and afterwards it can be overwritten at any point.

Overwrite_buffer also provides two utility functions:

bool has_value() const;

_Type value();

Has_value returns true or false, indicating whether or not the overwrite_buffer has received its first message. Value is a wrapper around receive. Has_value can be used to check whether the overwrite_buffer holds a message; if it does, calling value or receive will not block.

Uses of overwrite_buffer include tracking state, continuously monitoring status, and broadcasting messages. The Agents Library’s agent class itself uses an overwrite_buffer internally to track its state: the start, cancel, done, and wait functions all work with the agent’s internal overwrite_buffer.
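The contrast with unbounded_buffer, where targets compete for each message, can be shown with a toy model. ToyOverwriteBuffer is a hypothetical standard-C++ class, not the Agents Library type: targets are modeled as plain callbacks that always accept, and value() does not block.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <utility>
#include <vector>

// Toy model of overwrite_buffer semantics (hypothetical): every incoming
// message is offered as a copy to each linked target, then stored,
// replacing the previous message.
template <class T>
class ToyOverwriteBuffer {
public:
    using Target = std::function<void(const T&)>;

    void link_target(Target target) { targets_.push_back(std::move(target)); }

    void send(const T& value) {
        for (auto& target : targets_) target(value);  // broadcast: no competition
        latest_ = value;                               // overwrite the old value
    }

    bool has_value() const { return latest_.has_value(); }

    // Unlike the real value(), this toy never blocks; check has_value() first.
    const T& value() const {
        assert(latest_.has_value());
        return *latest_;
    }

private:
    std::vector<Target> targets_;
    std::optional<T> latest_;
};
```

Sending 1 and then 2 delivers both values to every linked target, while value() afterwards returns only the latest one, 2.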

single_assignment

Single_assignment behaves very much like overwrite_buffer, except that it accepts only one message from any of its sources. Once a single_assignment accepts a message, all subsequently offered messages are declined. Just like overwrite_buffer, single_assignment gives a copy of the message to each of its targets; there is no competition for data.

Single_assignment provides the following two utility functions:

bool has_value() const;

_Type const & value();

These behave exactly as they do in overwrite_buffer.

Single_assignments are useful when a single value is read by many, similar to a const variable. A single_assignment can also be used to pick the first available message from a group of blocks: the first offered message is accepted and all others are declined. Used this way, single_assignment acts as a choice, or chooser, among a group of blocks.
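A toy model of the accept-once rule: ToySingleAssignment is a hypothetical standard-C++ class (not the library type), and offer() stands in for a source offering a message. Used as a chooser, whichever source offers first wins.

```cpp
#include <cassert>
#include <optional>

// Toy model of single_assignment semantics (hypothetical): the first
// offered message is accepted and every later offer is declined.
template <class T>
class ToySingleAssignment {
public:
    // Returns true if the message was accepted, false if it was declined.
    bool offer(const T& value) {
        if (value_) return false;  // already assigned: decline
        value_ = value;
        return true;
    }

    bool has_value() const { return value_.has_value(); }

    const T& value() const {
        assert(value_.has_value());
        return *value_;
    }

private:
    std::optional<T> value_;
};
```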

In following posts I will introduce more of the message blocks and provide sample applications.

Synchronization with the Concurrency Runtime - Part 3

In my previous posts, I addressed the motivation behind using the Concurrency Runtime’s synchronization primitives and introduced Critical Section and Reader Writer Lock. In this post, I will cover the Concurrency Runtime’s event.

Event

Event is a two-state class that, unlike Critical Section or Reader Writer Lock, does not protect access to shared data. Events synchronize the flow of execution and use the Concurrency Runtime’s facilities to enable cooperative scheduling of work. They behave like a Win32 manual-reset event. The main difference is that, when blocked, the Concurrency Runtime’s event is designed to cooperatively yield to other tasks in the runtime in addition to being preempted, whereas Win32 events are, by design, purely preemptive.
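For comparison, manual-reset semantics (two states; once set, every waiter is released and the event stays signaled until reset) can be sketched with standard C++ primitives. ManualResetEvent and its is_set() helper are hypothetical names; its wait() blocks the OS thread, so it behaves like the Win32 event described above, not like the cooperative Concurrency Runtime event.

```cpp
#include <condition_variable>
#include <mutex>

// Standard-C++ sketch of a manual-reset event (signaled / non-signaled).
// Waiting blocks the thread; it does NOT yield cooperatively.
class ManualResetEvent {
public:
    void set() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            signaled_ = true;
        }
        cv_.notify_all();  // release every waiter, not just one
    }

    void reset() {
        std::lock_guard<std::mutex> lock(mutex_);
        signaled_ = false;
    }

    // Returns immediately while the event stays signaled.
    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return signaled_; });
    }

    bool is_set() {
        std::lock_guard<std::mutex> lock(mutex_);
        return signaled_;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    bool signaled_ = false;
};
```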

 

Example:

This example creates a scheduler that uses two threads and then calls the DemoEvent function, which takes either the Concurrency Runtime’s event class or a Win32 manual-reset event wrapper as its template parameter. DemoEvent first creates several tasks that simulate some work and then wait for a shared event to become signaled.

Special thanks to Rick Molloy for sharing this example.

// event.cpp : Defines the entry point for the console application.
//
// compile with: /EHsc
#include <windows.h>
#include <stdio.h>
#include <concrt.h>
#include <concrtrm.h>
#include <ppl.h>

using namespace Concurrency;

// Thin RAII wrapper around a Win32 manual-reset event.
class WindowsEvent
{
    HANDLE m_event;
public:
    WindowsEvent()
        :m_event(CreateEvent(NULL, TRUE, FALSE, TEXT("WindowsEvent")))
    {
    }

    ~WindowsEvent()
    {
        CloseHandle(m_event);
    }

    void set()
    {
        SetEvent(m_event);
    }

    void wait(DWORD timeout = INFINITE)
    {
        WaitForSingleObject(m_event, timeout);
    }
};

template<class EventClass>
void DemoEvent()
{
    EventClass e;
    LONG volatile taskCtr = 0;

    // Create a task_group and schedule multiple copies of the task.
    task_group tg;
    for (int i = 0; i < 8; ++i)
        tg.run([&e, &taskCtr]{
            // Simulate some work.
            Sleep(100);

            printf_s("\tTask %ld waiting for the event\n", InterlockedIncrement(&taskCtr));

            e.wait();
        });

    Sleep(1000);

    printf_s("  Setting the event\n");

    // Set the event.
    e.set();

    // Wait for the tasks.
    tg.wait();
}

int main()
{
    // Create a scheduler that uses two threads.
    CurrentScheduler::Create(SchedulerPolicy(2, MinConcurrency, 2, MaxConcurrency, 2));

    printf_s("Cooperative Event\n");
    DemoEvent<event>();

    printf_s("Windows Event\n");
    DemoEvent<WindowsEvent>();

    // Detach the scheduler created above from the current context.
    CurrentScheduler::Detach();
    return 0;
}

 

Sample Output:

Cooperative Event
        Task 1 waiting for the event
        Task 2 waiting for the event
        Task 3 waiting for the event
        Task 4 waiting for the event
        Task 5 waiting for the event
  Setting the event
Windows Event
        Task 1 waiting for the event
        Task 2 waiting for the event
  Setting the event
        Task 3 waiting for the event
        Task 4 waiting for the event
        Task 5 waiting for the event

With the cooperative event, all 5 tasks run before the event is set: each task that waits on the unset event cooperatively yields, so another task can be scheduled and run on the thread. With the Windows event, the 2 tasks that were scheduled block their threads until the event is set.

Introduction to Asynchronous Agents Library

One of the native concurrency components coming in Visual Studio 2010 is the Asynchronous Agents Library. The goal of the Agents Library is to improve developer productivity and enable developers to take advantage of concurrency with an agent-based message passing model to build isolated and composable components. In this post, I will provide an introduction to the features available in the Asynchronous Agents Library and go into more detail about the agent class itself with an example program.

An Agent Based Model

Programming with an agent-based model allows concurrency to be baked into the program from the start. Separating parts of a program into composable and reusable pieces that follow well-defined boundaries and communicate with message passing can tolerate latencies and effectively use parallel resources. By avoiding shared memory where possible and focusing on data dependencies, you can obtain scalable performance using higher-level abstractions for parallelism, such as agents. For more information on agent-based models, take a look here:

http://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts

Overview of Agents Library Features

The Agents Library can be broken down into three basic parts:

The Agent Class – The agent class itself is intended for coarse-grained parallelism: components that handle larger computationally intensive tasks or collections of smaller tasks. Fundamentally, agents are tasks that have an observable lifecycle and communicate with other agents by using message passing. Agents are NOT intended to be used for fine-grained parallelism; for that, the patterns and constructs in the Parallel Patterns Library are better suited.

Message Blocks – To enable mechanisms for communicating between asynchronous agents and isolated program components, we provide a collection of various message blocks:

  • unbounded_buffer
  • overwrite_buffer
  • single_assignment
  • call
  • transformer
  • timer
  • join
  • multitype_join
  • choice

For example, an unbounded_buffer works very much like a queue. It collects messages from each of its source blocks and offers them, in order, to its target blocks.

In addition, infrastructure for easily creating your own message blocks is included.

Coordination Primitives – For inserting and removing messages from a group of messaging blocks (a messaging network), we include a set of APIs:

  • send
  • asend
  • receive
  • try_receive

In later blogs, I will dive into the specifics of each message block and coordination primitive explaining how and when to use them.

Agent Class in More Detail

Essentially an agent is a light weight task (LWT) with observable state and cancellation. The lifecycle of an agent is as follows:

[Figure: agent state diagram]

To see how an agent passes through the states, let’s look at the simplest case creating, starting, and waiting on an agent:

    MyAgent myagent;

myagent is now in the ‘agent_created’ state.

    myagent.start();

Calling start() moves the agent to ‘agent_runnable’.

    agent::wait(&myagent);

Wait blocks until myagent reaches one of the final states (agent_cancelled, agent_done, or agent_faulted).

The agent class contains a protected pure virtual run() method which is called when the agent is executed:

    virtual void run() = 0;

When start() is called a light weight task is scheduled and the function will immediately return; once there are available resources the scheduler will start executing the agent task, which will call the run() method when it executes. When an agent is finished, it must call the done() function to signal completion of its work. Commonly, done() is called at the end of the run() function; however, in some circumstances, it may be desirable to decouple the lifetime of the agent from the execution of the run method, such as if the agent does work asynchronously.

A single agent can be waited on using the wait function, and multiple agents using the wait_for_one and wait_for_all functions on an array of agents. Waiting on an agent is one way of observing an agent’s state. The following two methods allow you to query the status directly or to obtain a message block to receive status from:

    agent_status status(); 
    ISource<agent_status> * status_port();

Agents also can be canceled:

    bool cancel();

If cancel is called before an agent has started, the agent will not be run and its status will be canceled. If the agent has already started, however, calling cancel has no effect: agents that have already started can’t be stopped.
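The lifecycle rules above (start moves an agent from created to runnable; cancel only succeeds before the agent begins executing) can be modeled as a small single-threaded state machine. ToyAgent, ToyAgentStatus and execute() are hypothetical; execute() stands in for the scheduler running the agent’s task and for run() calling done(). This is not the Agents Library’s agent class.

```cpp
// Toy model of the agent lifecycle (hypothetical; single-threaded).
enum class ToyAgentStatus { created, runnable, started, done, canceled };

class ToyAgent {
public:
    ToyAgentStatus status() const { return status_; }

    // created -> runnable; starting a second time has no effect.
    bool start() {
        if (status_ != ToyAgentStatus::created) return false;
        status_ = ToyAgentStatus::runnable;
        return true;
    }

    // Succeeds only while the agent has not begun executing.
    bool cancel() {
        if (status_ == ToyAgentStatus::created ||
            status_ == ToyAgentStatus::runnable) {
            status_ = ToyAgentStatus::canceled;
            return true;
        }
        return false;
    }

    // Simulates the scheduler picking up the agent's task.
    void execute() {
        if (status_ != ToyAgentStatus::runnable) return;  // canceled agents never run
        status_ = ToyAgentStatus::started;
        run();
        status_ = ToyAgentStatus::done;  // stands in for run() calling done()
    }

    int runCount = 0;  // how many times run() has executed

private:
    void run() { ++runCount; }

    ToyAgentStatus status_ = ToyAgentStatus::created;
};
```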

Agents Sample:

To better illustrate agents, consider this implementation of a find-string utility. The program searches a directory recursively for files matching an expression and then searches each file for a given string, very similar to ‘findstr’. The program is separated into several agents with specific jobs: one agent locates matching files, several parse the files looking for the specified string, and one writes the matches it receives to the console. This sample also uses unbounded_buffer, asend, and receive.

[Figure: FindString agent pipeline diagram]

FileFinder.h:

This agent takes a string containing the type of files to search for. Each matching file found is sent to an unbounded_buffer m_pFileNames which the FileReader agents receive from.

Here is the run() method. It searches each directory recursively for matching files. A special end message is also used at the end to signal there are no more files. Notice at the end of run() the done() function is called.

void run()
{
    wchar_t currentDirectory[MAX_FILE_NAME] = L".\\";
    FindFilesRecursively(currentDirectory);

    // Send END_MSG to signal done processing.
    asend(m_pFileNames, (wchar_t *)END_MSG);
    done(agent_done);
}

Here is the code where matching file names are sent to the unbounded_buffer:

while(moreFiles)
{
    // Copy file name, receiver will be responsible for cleanup.
    pFileName = new wchar_t[MAX_FILE_NAME];
    wcscpy_s(pFileName, MAX_FILE_NAME, pDirectory);
    wcscat_s(pFileName, MAX_FILE_NAME, findFileData.cFileName);

    //
    // Send the filename to the unbounded_buffer.
    //
    asend(m_pFileNames, pFileName);
    moreFiles = FindNextFile(hFind, &findFileData);
}

FileReader.h:

The FileReader agent keeps receiving file names to parse from an input unbounded_buffer and sends any matches to another unbounded_buffer which the ConsoleWriter will receive from.

Here is the run() method. Notice the while loop, which keeps receiving messages from the same unbounded_buffer the FileFinder agent sends to, until the end message is reached.

void run()
{
    wchar_t *pFileName;

    //
    // Repeatedly pull filename messages out of the
    // unbounded_buffer to parse until the END_MSG is
    // reached. Note that receive blocks in a ConcRT-aware
    // (cooperative) way until a message is available.
    //
    size_t currentLineNum;
    const size_t lineSize = 2000;
    wchar_t line[lineSize];
    while((int)(pFileName = receive(m_pFileNames)) != END_MSG)
    {
        currentLineNum = 1;
        wifstream inputFile;
        inputFile.open(pFileName, ifstream::in);
        while(inputFile.good() 
            && inputFile.getline(line, lineSize))
        {
            if(wcsstr(line, m_pSearchString) != NULL)
            {
                //
                // Create a new message payload and send it to 
                // the unbounded_buffer for the ConsoleWriter 
                // to receive from.
                //
                asend(m_pFoundBuffer, new Payload(pFileName, 
                    wcsnlen(pFileName, MAX_FILE_NAME)+1,
                    currentLineNum,
                    line,
                    wcsnlen(line, lineSize)+1));
            }

            ++currentLineNum;
            line[0] = '\0';
        }
        inputFile.close();
        // The name was allocated with new[] by FileFinder.
        delete [] pFileName;
    }

    // Resend the END_MSG for any other FileReaders.
    asend(m_pFileNames, (wchar_t *)END_MSG);
    done(agent_done);
}
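The END_MSG handling above (one sentinel from the producer, re-posted by each reader so that every other reader also sees it) is a general pattern for shutting down several consumers that share one queue. Here is a hypothetical standard-C++ sketch that uses std::thread and a mutex-based queue in place of agents and unbounded_buffer; BlockingQueue, kEndMsg and RunPipeline are invented names, and unlike receive, BlockingQueue::pop blocks the OS thread rather than cooperating with ConcRT.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Minimal blocking queue standing in for unbounded_buffer (hypothetical).
class BlockingQueue {
public:
    void push(int v) {
        { std::lock_guard<std::mutex> lock(m_); q_.push(v); }
        cv_.notify_one();
    }
    int pop() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !q_.empty(); });
        int v = q_.front();
        q_.pop();
        return v;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<int> q_;
};

constexpr int kEndMsg = -1;  // sentinel, like END_MSG in the sample

// One producer posts `items` work items (0..items-1) and then a single
// sentinel. Each consumer re-posts the sentinel before exiting so the
// remaining consumers also see it. Returns the number of items processed.
inline int RunPipeline(int items, int consumers) {
    BlockingQueue work;
    std::mutex countMutex;
    int processed = 0;

    std::vector<std::thread> readers;
    for (int i = 0; i < consumers; ++i) {
        readers.emplace_back([&] {
            for (int v; (v = work.pop()) != kEndMsg; ) {
                std::lock_guard<std::mutex> lock(countMutex);
                ++processed;
            }
            work.push(kEndMsg);  // pass the sentinel on to the next reader
        });
    }

    for (int i = 0; i < items; ++i) work.push(i);
    work.push(kEndMsg);

    for (auto& t : readers) t.join();
    return processed;
}
```

Because each reader passes the single sentinel along before exiting, RunPipeline(100, 4) returns only after all four readers have stopped, having processed all 100 items.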

ConsoleWriter.h:

This agent receives messages containing information about found matches and prints the information to the console.

Here is a snippet from its run method. It loops receiving messages from the unbounded_buffer the FileReaders send to.

//
// Keep receiving messages to print to the console until
// the END_MSG is received. Note that receive blocks in a
// ConcRT-aware (cooperative) way until a message is available.
//
while((int)(pPayload = receive(m_pFoundBuffer)) != END_MSG)
{
    SetConsoleTextAttribute(hConsole, FOREGROUND_BLUE 
        | FOREGROUND_GREEN | FOREGROUND_RED | FOREGROUND_INTENSITY); 
    printf("%ls:%lu:", pPayload->m_pFileName, 
        (unsigned long)pPayload->m_lineNumber);
    SetConsoleTextAttribute(hConsole, FOREGROUND_BLUE 
        | FOREGROUND_GREEN | FOREGROUND_RED);
    printf(":%ls\n", pPayload->m_pLine);
    delete pPayload;
}

Payload.h:

Defines a class used to pass messages between FileReaders and the ConsoleWriter. It holds the file name a match was found in, the line number, and the line itself.

FindString.cpp

The driver of the program creates, starts, and then waits on all the agents.

Here is the code to create the unbounded_buffers:

    unbounded_buffer<wchar_t *> fileBuffer;
    unbounded_buffer<Payload *> foundBuffer;

Creation of the agents:

// Agent to find files.
FileFinder fileFinder(pFilePattern, &fileBuffer);
fileFinder.start();

// Agents to handle parsing files.
FileReader **ppFileReaderAgents = new FileReader*[numAgents];
for(unsigned int i = 0; i < numAgents; ++i)
{
    ppFileReaderAgents[i] = new FileReader(&fileBuffer, 
        pStringPattern, &foundBuffer);
    ppFileReaderAgents[i]->start();
}

// Agent to handle writing to the console.
ConsoleWriter consoleWriter(&foundBuffer);
consoleWriter.start();

Waiting on the agents to finish:

// Wait for agents to finish.
agent::wait(&fileFinder);
for(unsigned int i = 0; i < numAgents; ++i)
{
    agent::wait(ppFileReaderAgents[i]);
    delete ppFileReaderAgents[i];
}
delete [] ppFileReaderAgents;

// Now that all other agents have finished signal the ConsoleWriter
// it can finish.
send(foundBuffer, (Payload *)END_MSG);
agent::wait(&consoleWriter);

In this program, I created agents that each handle large chunks of isolated work and I/O. Notice how parallelism is built into the program from the beginning by using higher levels of abstraction. I created a pipeline of agents that work concurrently with almost no knowledge of each other. Each agent takes in messages, performs some action, and then sends a message accordingly. Since the agents can all run at the same time, the program can exploit any available cores on the system; notice that many FileReader agents are used, which means the program parses multiple files concurrently. By designing my program this way, I can easily add, remove, or modify individual agents without needing to restructure anything else. I also avoided having to deal explicitly with locks and other difficult forms of synchronization.

To see all the code and play around with this sample download the project and source files here:

Code Samples

Example command line execution of this program is:

FindString.exe agents *.txt

In future posts, I will go into more detail about each message block the Asynchronous Agents Library provides, how to use them, the coordination primitives, and how to take advantage of our infrastructure and easily construct your own message blocks.

Samples posted for the Parallel Pattern Library and Concurrency Runtime

Following the release of Visual Studio 2010, we've just published a set of sample applications for using the Parallel Pattern Library, the Agents Library and the Concurrency Runtime on code gallery.  These supplement the documentation and samples provided on msdn for Beta1.

The samples include using task parallelism with PPL, a few basic examples of using the Agents Library (Dining Philosophers, an example using choice) and a find in files example (which will be blogged about here in the next day or so). There are also some examples of using the Concurrency Runtime itself: a simple event-based application demonstrating cooperative blocking and UMS threads, and a task-based example (fibonacci) that shows the Concurrent Suballocator.

As always your feedback here or in the forums is welcome.

-Rick

Auction Simulation written in the Asynchronous Agents Library

One of our architects, Niklas Gustafsson, has just posted an actor-based auction simulation written in the Asynchronous Agents Library. If you follow the blog trail back, you'll see that this was originally implemented in Scala and that Matthew Podwysocki has implemented it in F#. Niklas also alluded to, but hasn't yet shared, an implementation built in Axum. I find it fascinating to compare and contrast the implementations for similarities and differences.

It’s also worth stressing his concluding remarks, which highlight why we’re providing an asynchronous, message-passing, actor-based programming model:

As a parting thought, I want to draw your attention to one simple fact: we just wrote an application with 17 agents working together on a problem in parallel, interacting in non-trivial patterns updating state and arriving at a shared understanding of “their world” without a single lock, mutex, or critical section.

The ability to write such complex and concurrent code without dealing with the headaches of managing synchronization primitives is incredibly exciting (at least to me it is), so go check this out.

-Rick

Implementing Dining Philosophers with the Agents Library

The latest issue of MSDN Magazine includes an article I wrote that illustrates implementing the Dining Philosophers problem purely with message passing and without any explicit locking. If you have Visual Studio 2010 Beta1 installed, you can download the code and build the project; even if you don't, you can browse the code online.

For those curious about how the locks are avoided: the chopsticks themselves are messages. Rather than using semaphores, as in the classic solution, the philosopher uses the join message block to receive both chopstick messages from the table when it is time to pick them up. The code looks like this:

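vector<Chopstick*> PickupChopsticks()
{
    // Create a non-greedy join over two chopstick messages: it does not
    // consume either chopstick until both are available
    join<Chopstick*, non_greedy> j(2);
    m_LeftChopstickProvider->link_target(&j);
    m_RightChopstickProvider->link_target(&j);

    // Pick up the chopsticks: receive blocks until the join has taken
    // both messages, then returns them together as a vector
    return receive(j);
}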
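The non-greedy join is what keeps a philosopher from holding one chopstick while waiting on the other. As a rough illustration of that all-or-nothing behavior outside the Agents Library, here is a portable C++17 sketch; it is not the article's code. A hypothetical Table keeps one single-slot mailbox per chopstick, and take_pair emulates a non-greedy join by taking both chopstick messages only when both are present. The mutex and condition variable inside Table stand in for the join's internal protocol; Table, take_pair, put_back, and dine are illustrative names.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <thread>
#include <vector>

struct Chopstick {
    int id;
};

// Hypothetical table: one single-slot "mailbox" per chopstick.
class Table {
public:
    explicit Table(int n) : slots_(n) {
        for (int i = 0; i < n; ++i) slots_[i] = Chopstick{i};
    }

    // Emulates a non-greedy join: waits until BOTH chopsticks are present,
    // then takes them together. A waiting philosopher holds nothing.
    std::vector<Chopstick> take_pair(int left, int right) {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [&] { return slots_[left] && slots_[right]; });
        std::vector<Chopstick> both{*slots_[left], *slots_[right]};
        slots_[left].reset();
        slots_[right].reset();
        return both;
    }

    // Sends the chopstick messages back to the table.
    void put_back(const std::vector<Chopstick>& sticks) {
        {
            std::lock_guard<std::mutex> lock(m_);
            for (const auto& c : sticks) {
                assert(!slots_[c.id] && "chopstick returned twice");
                slots_[c.id] = c;
            }
        }
        cv_.notify_all();
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::vector<std::optional<Chopstick>> slots_;
};

// Runs n philosophers, each eating `meals` times; returns total meals eaten.
int dine(int n, int meals) {
    assert(n >= 2 && "with one philosopher, left and right are the same chopstick");
    Table table(n);
    std::atomic<int> total{0};
    std::vector<std::thread> philosophers;
    for (int i = 0; i < n; ++i) {
        philosophers.emplace_back([&table, &total, i, n, meals] {
            for (int m = 0; m < meals; ++m) {
                auto sticks = table.take_pair(i, (i + 1) % n);
                ++total;                 // "eat"
                table.put_back(sticks);  // release both chopsticks
            }
        });
    }
    for (auto& p : philosophers) p.join();
    return total.load();
}
```

Because acquisition is all-or-nothing, the classic circular wait cannot form, so a call such as dine(5, 100) runs to completion with every philosopher finishing all of its meals.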

I'd encourage you to read the article and provide feedback either here or in our forums.

-Rick

What's new in the Concurrency Runtime and the Parallel Patterns and Asynchronous Agents Libraries

Visual Studio 2010 Beta1 has been released, and it is a full install version. The team has been busy, busy, busy since the CTP release last fall delivering most of the APIs and objects we’ve blogged about here into Beta1, so I wanted to take a moment to summarize what’s new in the Beta for the Parallel Patterns Library, the Asynchronous Agents Library, and the Concurrency Runtime.

You can find the official reference documentation and walkthroughs in the MSDN Library here.

What’s new in the Parallel Patterns Library

The Parallel Patterns Library (PPL) provides an imperative programming model that promotes scalability and ease of use for developing concurrent applications. In general, the library’s surface area remains largely unchanged, with a few high-impact additions. You will also find it more stable, more robust, faster, and more scalable: lots of goodness!

What’s new in the Asynchronous Agents Library

The Asynchronous Agents Library (or just Agents Library) provides a programming model that enables you to increase the robustness of concurrency-enabled application development. The Agents Library is a C++ template library that promotes an actor-based programming model and in-process message passing for fine-grained dataflow and pipelining tasks. The Agents Library builds upon the scheduling and resource management components of the Concurrency Runtime.

Here’s a summary of what we’ve added and revised in the Agents Library:

  • Added the choice, join and multitype_join message blocks to allow users to wait on a set of messages; choice waits for any message, join waits for all messages of a single type, and multitype_join waits for all messages and allows for multiple different types.
  • Renamed the transform message block to transformer (we didn’t want Concurrency::transform to conflict with std::transform).
  • Added functor support to the call and transformer pipeline message blocks.
  • Building custom message blocks is now easier due to some refactoring. 
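To make the functor bullet concrete, here is a synchronous, portable sketch rather than Agents Library code. CallSink, Transformer, and WordCounter are hypothetical names. Unlike Concurrency::call and Concurrency::transformer, which are designed to process messages asynchronously on the Concurrency Runtime, these stand-ins run the function inline on the sender's thread. The point is only how a functor object, rather than a lambda, can supply a block's work.

```cpp
#include <cassert>
#include <functional>
#include <sstream>
#include <string>
#include <utility>

// Hypothetical synchronous stand-in for a call block: runs a function
// on every message it receives.
template <typename T>
class CallSink {
public:
    explicit CallSink(std::function<void(const T&)> fn) : fn_(std::move(fn)) {}
    void send(const T& value) { fn_(value); }

private:
    std::function<void(const T&)> fn_;
};

// Hypothetical synchronous stand-in for a transformer block: applies a
// function (lambda, function pointer, or functor) and forwards the result.
template <typename In, typename Out>
class Transformer {
public:
    explicit Transformer(std::function<Out(const In&)> fn) : fn_(std::move(fn)) {}
    void link_target(CallSink<Out>* target) { target_ = target; }
    void send(const In& value) {
        assert(target_ && "link_target() must be called before send()");
        target_->send(fn_(value));
    }

private:
    std::function<Out(const In&)> fn_;
    CallSink<Out>* target_ = nullptr;
};

// A functor: counts the whitespace-separated words in a line.
struct WordCounter {
    int operator()(const std::string& line) const {
        std::istringstream in(line);
        std::string word;
        int count = 0;
        while (in >> word) ++count;
        return count;
    }
};
```

For example, linking a Transformer<std::string, int> built from WordCounter{} to a CallSink<int> that adds into a running total lets each line sent through the pipeline contribute its word count. A functor is convenient when the work needs configuration or helper methods that would clutter a lambda.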

What’s new and improved in the Concurrency Runtime itself

The underlying Concurrency Runtime has undergone significant feature improvements and performance enhancements, many of which we have already discussed on this blog. These changes benefit everything built on top of the runtime, including the Parallel Patterns Library and the Agents Library.

Call to Action

Download and install Microsoft Visual Studio 2010 Beta 1 today. Impress your friends and coworkers by being among the first to learn and use our libraries. Provide early and critical feedback on our forum to help shape the way these libraries are ultimately released.

 

See this blog post on the value of your feedback:  On Achieving Perfection –or– Why We Love Your Feedback (and Why You Can Love Giving It).

 

Happy coding!

 

Debugging PPL Tasks

You have read several articles here about how the Microsoft Concurrency Runtime enables task-based parallelism. Now Daniel Moth has an article on his blog that describes the debugging functionality offered by the Parallel Tasks window in VS 2010. The Parallel Tasks window works with both native PPL tasks and .NET TPL tasks. Check it out.
