As a user of Visual Studio, you might have had a chance to use the concurrent data structures introduced in the Beta 2 release – concurrent_vector<T> and concurrent_queue<T>. A detailed discussion about the design, usage, and methods of concurrent_queue can be found here. A similar discussion of concurrent_vector will be posted in the near future.
In this post, we will go over the inner workings of these two containers, specifically, how they are laid out in memory.
concurrent_vector
The concurrent_vector class can be thought of as a parallel version of std::vector that allows thread-safe growth. However, unlike with std::vector, iterators to a concurrent_vector are not invalidated when the container grows, and concurrent_vector does not support the pop_back() operation.
In std::vector, storage is dynamically allocated, and its elements follow a strict linear sequence. Because the elements are guaranteed to be in contiguous memory, they are all relocated to a new location when the container grows beyond its capacity. concurrent_vector, however, does not guarantee that all elements are stored contiguously. Consequently, existing elements are not relocated when more memory needs to be allocated. Instead, elements are stored in a series of separate contiguous arrays. When an empty concurrent_vector is instantiated, no memory is allocated. As elements are added to the container, arrays of increasing size are allocated as needed. The sizes of the allocated arrays are always powers of 2, and they increase as the container grows.
If you look at the raw view of concurrent_vector in a debugger, you will see that it contains a segment table member called _My_segment, which looks like an array of arrays. (To enable raw view, go to Tools -> Options -> Debugging and check the “Show raw structure of objects in variables windows” checkbox). In the segment table, _My_segment[0] points to the first 2 elements of the container, _My_segment[1] to the next 2 elements, _My_segment[2] to the next 4 elements, _My_segment[3] to the next 8 elements, etc. See Figure 1 (not to scale).
Figure 1
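The index arithmetic behind this logical view can be sketched in a few lines of C++. The helper names below are hypothetical, and this is an illustration of the layout just described, not the code inside concurrent_vector:

```cpp
#include <cassert>
#include <cstddef>

// Sketch of the *logical* segment layout: segment 0 covers elements 0-1,
// and segment k >= 1 covers elements [2^k, 2^(k+1)), giving logical
// sizes 2, 2, 4, 8, 16, ... (illustrative only).

// floor(log2(x)) for x > 0: the index of the highest set bit.
inline std::size_t highest_bit(std::size_t x)
{
    std::size_t k = 0;
    while (x >>= 1)
        ++k;
    return k;
}

// Segment that holds element i. OR-ing with 1 folds elements 0 and 1
// together into segment 0.
inline std::size_t segment_index(std::size_t i)
{
    return highest_bit(i | 1);
}

// First element index covered by segment k: 0, 2, 4, 8, 16, ...
inline std::size_t segment_base(std::size_t k)
{
    return (std::size_t(1) << k) & ~std::size_t(1);
}

// Logical size of segment k: 2, 2, 4, 8, 16, ...
inline std::size_t segment_size(std::size_t k)
{
    return k == 0 ? 2 : (std::size_t(1) << k);
}
```

For example, element 5 falls in segment_index(5) == 2, at offset 5 - segment_base(2) == 1 within that segment.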
This arrangement of the segment table is independent of the sizes of the actual storage arrays contained within concurrent_vector, and is simply a logical view of the elements. In other words, the sizes of the logical arrays as seen by the segment table are always 2, 2, 4, 8, 16, 32, 64, 128, etc. However, the sizes of the actual arrays allocated from the heap may very well be 16, 16, 32, 64, 128, etc. The size of the first array is determined by the first reservation, growth or assignment operation.
For example, take a concurrent_vector<int> v that is instantiated with an initial size of 10. The actual size of the first array is the lowest power of 2 that can fit 10 elements, i.e., 16. As the container grows, the sizes of the subsequent arrays that are allocated are 16, 32, 64, etc. (increasing powers of 2). The segment table of v is populated such that _My_segment[0] points to the first 2 elements of the first array, _My_segment[1] points to the next 2 elements of the same first array, _My_segment[2] points to the next 4 elements, again in the first array, _My_segment[3] points to the next 8 elements, still in the first array, _My_segment[4] points to the second array (of size 16), _My_segment[5] points to the third array (of size 32), etc. See Figure 2.
Figure 2
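Continuing the example, the mapping from logical segments to heap arrays can be sketched as follows. It assumes, as in Figure 2, that the first array is rounded up to a power of 2 and covers the leading logical segments, and that every later logical segment gets its own array. The function names and the minimum first-array size of 2 are assumptions for illustration:

```cpp
#include <cassert>
#include <cstddef>

// Smallest power of 2 that can hold n elements (assumed minimum: 2).
inline std::size_t first_array_size(std::size_t n)
{
    std::size_t size = 2;
    while (size < n)
        size <<= 1;
    return size;
}

// log2 of an exact power of 2.
inline std::size_t log2_exact(std::size_t pow2)
{
    std::size_t k = 0;
    while (pow2 >>= 1)
        ++k;
    return k;
}

// Which heap array backs logical segment k when the first array holds
// first_size elements. The first array covers segments 0 .. m-1.
inline std::size_t physical_array(std::size_t k, std::size_t first_size)
{
    std::size_t m = log2_exact(first_size);
    return k < m ? 0 : k - m + 1;
}

// Size of heap array a: first_size, first_size, 2*first_size, 4*first_size, ...
inline std::size_t physical_array_size(std::size_t a, std::size_t first_size)
{
    return a == 0 ? first_size : first_size << (a - 1);
}
```

With an initial size of 10, first_array_size(10) is 16, so logical segments 0 through 3 live in array 0, segment 4 in array 1 (16 elements), and segment 5 in array 2 (32 elements), matching the description above.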
When shrink_to_fit() is called, the storage is defragmented and the internal layout is optimized. However, it is not guaranteed that all elements of v are in one contiguous array. When clear() is called, all arrays are deallocated.
This, in short, is how the elements of concurrent_vector are laid out in memory.
concurrent_queue
concurrent_queue can be thought of as a parallel version of std::queue that offers thread-safe addition and removal of elements. The elements in a concurrent_queue are held in blocks of memory called pages. As elements are enqueued and dequeued, pages are allocated and freed. The size of a single page and the number of elements a page can hold depend on the size of the element type. See Figure 3.
Figure 3
The concurrent_queue class is implemented as an array of 8 micro-queues, where each micro-queue is a linked list of pages. This layout is illustrated in Figure 4. Take the example of a concurrent_queue<int> q to which we enqueue the integers 0, 1, 2, 3, and so on, so that the ith element enqueued has the value i. When q is instantiated, it is empty and no pages are allocated. As elements are enqueued, each is added to the first available spot in the first available page of a specific micro-queue. The formula used to select the micro-queue for the ith element is (i*3) % 8. Therefore, element 0 is added to the micro-queue at _Array[0], element 1 to _Array[3], element 2 to _Array[6], element 3 to _Array[1], and so on. If there is no available page in the selected micro-queue, a new page is allocated and linked to the end of the micro-queue. The _Page data structure has a member variable that keeps track of the next page in the micro-queue (denoted by N in Figure 4). For a concurrent_queue<int>, each page stores 32 integers (from Figure 3). Since the number of micro-queues in a concurrent_queue is always fixed at 8, the first layer of pages in q holds the first 256 integers that are enqueued (0 through 255). When more integers are added to q, a new layer of pages is allocated.
Figure 4
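The placement of the ith element of q can be computed from these rules. This is a hedged sketch: the constants come from the text (8 micro-queues, 32 ints per page), the slot calculation is derived from the (i*3) % 8 rule rather than taken from the library source, and the names are hypothetical:

```cpp
#include <cassert>
#include <cstddef>

// Layout constants for concurrent_queue<int>, taken from the text.
const std::size_t kMicroQueues = 8;   // size of _Array
const std::size_t kItemsPerPage = 32; // ints per page (Figure 3)

struct Location
{
    std::size_t microQueue; // index into _Array
    std::size_t layer;      // page number within that micro-queue, counted
                            // from the first page ever allocated
    std::size_t slot;       // position within the page
};

// Where the i-th enqueued element lands.
inline Location locate(std::size_t i)
{
    Location loc;
    loc.microQueue = i * 3 % kMicroQueues;
    // 3 and 8 share no common factor, so every block of 8 consecutive
    // elements sends exactly one element to each micro-queue. Element i is
    // therefore element number i / 8 of its micro-queue.
    std::size_t position = i / kMicroQueues;
    loc.layer = position / kItemsPerPage;
    loc.slot = position % kItemsPerPage;
    return loc;
}
```

For instance, element 255 occupies the last slot of a first-layer page, and element 256 starts the second layer in _Array[0].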
There are 2 member variables in concurrent_queue that keep track of the number of elements enqueued and dequeued over its lifetime: _Tail_counter and _Head_counter. When an element is enqueued, _Tail_counter is incremented. When an element is dequeued, _Head_counter is incremented. The number of elements in the queue at any given time is the difference between _Tail_counter and _Head_counter.
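The counter arithmetic can be sketched with std::atomic. QueueCounters is a hypothetical class for illustration only; it does not reflect how concurrent_queue actually declares _Head_counter and _Tail_counter:

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

class QueueCounters
{
public:
    QueueCounters() : head_(0), tail_(0) {}

    // Each call returns the ticket (sequence number) of the element.
    std::size_t on_enqueue() { return tail_.fetch_add(1); }
    std::size_t on_dequeue() { return head_.fetch_add(1); }

    // Elements currently held. Head is read first: since tail only grows
    // and never falls below head, the difference cannot underflow. Under
    // concurrent updates the result is only a snapshot.
    std::size_t size() const
    {
        std::size_t head = head_.load();
        std::size_t tail = tail_.load();
        return tail - head;
    }

private:
    std::atomic<std::size_t> head_; // total dequeued so far
    std::atomic<std::size_t> tail_; // total enqueued so far
};
```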
When enough elements are dequeued from q such that a particular page becomes empty, the page is destroyed and its memory is deallocated. In the above example, when elements 0 – 247 are dequeued, each page in the first layer of pages ends up containing exactly 1 element. When element 248 is dequeued, the first page in _Array[0] becomes empty, and is therefore deallocated. When element 249 is dequeued, the first page in _Array[3] is deallocated. See Figure 5.
Figure 5
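Under the same layout assumptions, and assuming every page is full and elements are dequeued in FIFO order as in the example, whether a dequeue frees a page can be computed directly. The function names are hypothetical:

```cpp
#include <cassert>
#include <cstddef>

const std::size_t kQueues = 8;   // micro-queues
const std::size_t kPerPage = 32; // ints per page

// True if dequeuing element i removes the last remaining element of its
// page, so that the page is deallocated.
inline bool dequeue_frees_page(std::size_t i)
{
    std::size_t position = i / kQueues;         // position in its micro-queue
    return position % kPerPage == kPerPage - 1; // last slot of the page
}

// Micro-queue (index into _Array) that element i belongs to.
inline std::size_t micro_queue_of(std::size_t i)
{
    return i * 3 % kQueues;
}
```

Elements 248 through 255 are exactly the ones that free the 8 first-layer pages, starting with _Array[0] and then _Array[3], as Figure 5 shows.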
As more elements are enqueued and dequeued from q, new pages are added and older pages are deallocated. This, in short, is how concurrent_queue is laid out in memory.
A note on visualizers
So far, we have discussed the internals of concurrent_vector and concurrent_queue. Chances are that when you are using one of them in a program, you are less interested in the details of implementation of the data structure, and more interested in seeing the elements contained within them, similar to how you would see std::vector or std::queue under a debugger in the locals window. In the Beta 2 release, we have included debugger visualizers for concurrent_vector and concurrent_queue so that they appear like their corresponding STL data structures.
That’s it for now. Feel free to post a comment if there’s something more you would like to know about concurrent data structures or visualizers in Visual Studio 2010.
- Raghu Simha.

Figure 1: A screenshot taken on a 16-core Windows 7 machine
This blog is about an end-to-end scenario for the Concurrency Runtime and Visual Studio 2010, involving building a financial application, and measuring and tuning its performance.
The user scenario for this application is that an investor is looking for a subset of instruments in an index like the S&P 500 that best approximates the overall index’s behavior. Because of the almost infinite number of permutations, including which subsets of instruments to consider as well as how much to invest in each instrument, a closed-form solution to this problem is nearly impossible. A heuristic is needed, and in this case the developer implementing a solution uses a form of genetic algorithm called Differential Evolution.
We will take the S&P 500 as an example. To apply differential evolution to our problem, we make these substitutions:
- Complete S&P 500 = chromosomes
- Max contributors = non-zero chromosomes
- Num. of portfolios = num. of individuals
- Num. of iterations = num. of generations
Differential Evolution creates a random population, then continues to evolve that population until a certain number of iterations (generations) is reached or until a satisfactory result is found. An evolution consists of four steps executed on each individual.
- Mutation: select three individuals (donors) from which chromosomes are taken to build a new individual.
- Recombination: randomly copy chromosomes from the current individual to the newly generated one.
- Reparation: re-apply the rules that mutation and recombination may have violated.
- Selection: pick the new individual for the next generation if it is better than the previous one; otherwise, promote the old individual to the next generation unchanged.
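The four steps can be sketched in standard C++ using the classic DE/rand/1/bin scheme. This is a hedged illustration, not the application's Evolve method: the scale factor F and crossover rate CR are assumed values, and Repair (non-negative weights summing to 1) is a hypothetical stand-in for the portfolio's actual rules. The population needs at least 4 individuals so that three distinct donors exist.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <random>
#include <vector>

using Individual = std::vector<double>;
using Fitness = std::function<double(const Individual &)>; // smaller is better

// Hypothetical repair rule: clamp negative weights to 0, then normalize.
void Repair(Individual &y)
{
    double sum = 0.0;
    for (double &w : y)
    {
        if (w < 0.0) w = 0.0;
        sum += w;
    }
    if (sum > 0.0)
        for (double &w : y) w /= sum;
}

// One generation of DE/rand/1/bin. F and CR are assumed parameters.
std::vector<Individual> Evolve(const std::vector<Individual> &p, const Fitness &fitness,
                               std::mt19937 &rng, double F = 0.5, double CR = 0.9)
{
    const std::size_t n = p.size(), dim = p[0].size();
    std::uniform_int_distribution<std::size_t> pickIndividual(0, n - 1);
    std::uniform_int_distribution<std::size_t> pickGene(0, dim - 1);
    std::uniform_real_distribution<double> coin(0.0, 1.0);
    std::vector<Individual> next(n);

    for (std::size_t i = 0; i < n; ++i)
    {
        // Mutation: three distinct donors, all different from i.
        std::size_t a, b, c;
        do { a = pickIndividual(rng); } while (a == i);
        do { b = pickIndividual(rng); } while (b == i || b == a);
        do { c = pickIndividual(rng); } while (c == i || c == a || c == b);

        // Recombination: each gene comes from the mutant with probability
        // CR; gene jrand always does, so the trial differs from p[i].
        Individual y = p[i];
        std::size_t jrand = pickGene(rng);
        for (std::size_t j = 0; j < dim; ++j)
            if (j == jrand || coin(rng) < CR)
                y[j] = p[a][j] + F * (p[b][j] - p[c][j]);

        // Reparation: restore the rules broken by the steps above.
        Repair(y);

        // Selection: keep the trial only if it is at least as good.
        next[i] = fitness(y) <= fitness(p[i]) ? y : p[i];
    }
    return next;
}
```

Because Selection only replaces an individual with one that is at least as fit, no individual's fitness gets worse from one generation to the next.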
Then two measures are taken to evaluate the population:
- Tracking Error (fitness, the smaller the better): the root-mean-square of the differences between the log returns of the index and those of the individual.
- Diversity (the smaller the better): the standard deviation of each chromosome across all individuals, averaged over all chromosomes. It indicates how similar the individuals in the population are: as we get closer to the optimal solution, the variation shrinks, because all individuals converge toward that optimum.
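Both measures translate directly into code. The sketch below follows the definitions above; the function names are hypothetical, and using the population standard deviation (dividing by N rather than N - 1) is an assumption, since the post does not say which one is used.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <vector>

// RMS of the differences between the log returns of the index and of the
// portfolio. Both series must have the same length (at least 2 prices).
double TrackingError(const std::vector<double> &indexPrices,
                     const std::vector<double> &portfolioPrices)
{
    double sumSq = 0.0;
    const std::size_t returns = indexPrices.size() - 1;
    for (std::size_t t = 1; t < indexPrices.size(); ++t)
    {
        double rIndex = std::log(indexPrices[t] / indexPrices[t - 1]);
        double rPortfolio = std::log(portfolioPrices[t] / portfolioPrices[t - 1]);
        double d = rIndex - rPortfolio;
        sumSq += d * d;
    }
    return std::sqrt(sumSq / returns);
}

// Standard deviation of each chromosome across individuals, averaged over
// chromosomes.
double Diversity(const std::vector<std::vector<double>> &population)
{
    const std::size_t n = population.size(), dim = population[0].size();
    double total = 0.0;
    for (std::size_t j = 0; j < dim; ++j)
    {
        double mean = 0.0;
        for (const auto &ind : population) mean += ind[j];
        mean /= n;
        double var = 0.0;
        for (const auto &ind : population) var += (ind[j] - mean) * (ind[j] - mean);
        total += std::sqrt(var / n);
    }
    return total / dim;
}
```

A portfolio whose prices are a constant multiple of the index has identical log returns and therefore a tracking error of zero.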
I also chose to use Excel as the front end to make use of its charting and spreadsheet functions. Excel add-ins that are not carefully designed can make Excel unresponsive, and one of my goals was to keep Excel responsive so that the user can save and cancel. This is why I split the add-in into two parts. The first part, which is coupled to and driven by Excel, creates a concurrency agent that carries out the calculations, and returns an integer handle to that agent to Excel. Every time Excel wants to receive the results or check for completion, it calls the first part with that handle. This decouples background processing from the UI while still allowing good communication between them.
Because each generation depends on the previous one, the loop that evolves the generations can’t be parallelized, but the evolution process itself can, since individuals evolve independently of each other. The first opportunity for parallelizing this application was inside the Evolve method, which had a for loop that executes mutation, recombination, reparation, and selection for each individual. Profiling the serial version with the sampling profiler showed that this loop accounts for 86% of the total work done by the application. Each iteration reads elements of the old generation, but writes only individual i of the new generation during selection. So there is no contention on shared data, and simply changing the serial for loop to parallel_for made the application start to scale. This sample also shows how easy it is to use C++0x lambdas to make the parallel version a near mirror image of the serial one.
if( _asParallel )
{
    parallel_for( 0, _numIndividuals, [&]( size_t i )
    {
        Individual y = Mutate( p, i );

        Recombine( p, i, y );
        Repair( y );
        Select( newPopulation, i, y );
    } );
}
else
{
    for( size_t i = 0 ; i < _numIndividuals ; ++i )
    {
        Individual y = Mutate( p, i );

        Recombine( p, i, y );
        Repair( y );
        Select( newPopulation, i, y );
    }
}
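Outside Visual Studio, the same "iteration i writes only slot i" pattern can be sketched with std::thread. simple_parallel_for is a hypothetical helper, not ConcRT's parallel_for: it splits the range into one contiguous chunk per thread and does no load balancing, so if iterations take uneven amounts of time, some threads finish early and sit idle.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Runs body(i) for every i in [first, last), one contiguous chunk per thread.
void simple_parallel_for(std::size_t first, std::size_t last,
                         const std::function<void(std::size_t)> &body,
                         unsigned numThreads = std::thread::hardware_concurrency())
{
    if (numThreads == 0)
        numThreads = 1; // hardware_concurrency() may return 0
    const std::size_t count = last - first;
    const std::size_t chunk = (count + numThreads - 1) / numThreads;
    std::vector<std::thread> workers;
    for (std::size_t begin = first; begin < last; begin += chunk)
    {
        std::size_t end = begin + chunk < last ? begin + chunk : last;
        workers.emplace_back([begin, end, &body] {
            for (std::size_t i = begin; i < end; ++i)
                body(i);
        });
    }
    for (auto &w : workers)
        w.join(); // like parallel_for, return only when every iteration is done
}
```

Each call to body(i) writes only out[i] in the usage below, mirroring how Select writes only individual i of newPopulation, so no lock is needed.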
Measured against the serial implementation, the scaling efficiency[1] was less than 70%. To debug performance, I did the following:
· Started using the Concurrency Visualizer that comes with Visual Studio 2010.
· Turned on tracing by calling Concurrency::EnableTracing()[2]. This adds markers to my profiler traces that show where my parallel loops start and end.
In the "CPU Utilization" view[3] it was clear that the CPUs were not all fully utilized all the time. So I zoomed in on one interesting point and switched to the "Threads" view[4] to get a better look at what the threads were doing.

Figure 2: CPU Utilization view, in Concurrency Visualizer
Looking at the Threads view, I started to understand why I was unable to utilize all the CPUs. My population was on the order of 200 individuals, and the work associated with each was a few thousand cycles. Each of my 8 cores was processing 25 individuals. Because of randomization, not all cores completed their work at the same time. Near the end, when there was not enough work to keep all CPUs busy, some cores were left idle while other cores finished processing the parallel loop. In the sample image you can see that only 3 threads are executing while 5 CPUs are idle. This suggested that if we added a second, finer-grained level of work, those additional tasks could execute during the idle time of the waiting threads. Another interesting point was that a serial part between the iterations accounted for 25% of the execution time of each evolution loop.

Figure 3: Threads view in Concurrency Visualizer
Now that we know what was limiting performance, let’s see how we fixed each problem.
First, the fine-grained concurrency: we need to find small pieces of work that are executed frequently and try to parallelize them. Going back to the sampling profiler, I found that the tracking error function, which simply calculates the log of the difference between an individual and the original index, accounted for 72% of the inclusive samples, and its exclusive work[5] was 26%. We call it during selection, reparation, creation of a new population, and when reporting the best individual.

Figure 4: Functions view from the Sampling Profiler
Again, I replaced the for loop with parallel_for. On its own, however, this is not safe, because the result vector needs protection. The Concurrency Runtime provides a cooperative reader-writer lock and, even better, a thread-safe concurrent_vector. concurrent_vector is compatible with std::vector except for removing elements, so the change was as simple as changing std::vector<> to Concurrency::concurrent_vector<>. After the change, the parallel and serial code again looked nearly identical, with no major changes. This reduced the exclusive work for that function to less than 1.2%.
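#include <cstddef>              // std::size_t, std::ptrdiff_t
#include <new>                  // placement new
#include <concrt.h>             // Concurrency::Alloc, Concurrency::Free
#include <ppl.h>                // Concurrency::parallel_for
#include <concurrent_vector.h>  // Concurrency::concurrent_vector

using namespace Concurrency;

// Allocator that routes concurrent_vector's memory requests through the
// ConcRT sub-allocator. It must be declared before TrackHistory uses it.
template<typename T>
class UserDefAlloc
{
public:
    // Types required of an allocator by the Visual Studio 2010 library
    typedef T              value_type;
    typedef T*             pointer;
    typedef const T*       const_pointer;
    typedef T&             reference;
    typedef const T&       const_reference;
    typedef std::size_t    size_type;
    typedef std::ptrdiff_t difference_type;

    template<typename U>
    struct rebind
    {
        typedef UserDefAlloc<U> other;
    };

    UserDefAlloc() {}

    template<typename U>
    UserDefAlloc( const UserDefAlloc<U>& ) {}

    pointer allocate( size_type count, const void * )
    {
        return allocate( count );
    }

    pointer allocate( size_type count )
    {
        // Call the ConcRT sub-allocator's Alloc
        return static_cast<pointer>( Concurrency::Alloc( count * sizeof( T ) ) );
    }

    void deallocate( pointer ptr, size_type )
    {
        // Call the ConcRT sub-allocator's Free
        Concurrency::Free( ptr );
    }

    void construct( pointer ptr, const T& val )
    {
        // Copy-construct an object with value val at ptr
        ::new ( static_cast<void*>( ptr ) ) T( val );
    }

    void destroy( pointer ptr )
    {
        // Call the destructor of the object at ptr
        ptr->~T();
    }

    size_type max_size() const
    {
        return static_cast<size_type>( -1 ) / sizeof( T );
    }
};

// The allocator is stateless, so all instances are interchangeable.
template<typename T, typename U>
bool operator==( const UserDefAlloc<T>&, const UserDefAlloc<U>& ) { return true; }

template<typename T, typename U>
bool operator!=( const UserDefAlloc<T>&, const UserDefAlloc<U>& ) { return false; }

PortfolioHistory PortfolioTracker::TrackHistory( Individual &w )
{
    // pricesSize and indexes are defined elsewhere in the application.
    // The vector is sized up front: writing prices[ j ] into an empty
    // vector would be out of range.
    Concurrency::concurrent_vector<
        double,
        UserDefAlloc< double > > prices( pricesSize );

    if( _asParallel )
    {
        parallel_for( 0, pricesSize, [&]( size_t j )
        {
            double newPrice = 0.0;
            for( size_t i = 0 ; i < indexes.size() ; ++i )
            {
                int index = indexes[ i ];
                newPrice += _data._Members[ index ]._Prices[ j ]
                            * w._Chromosomes[ index ];
            }
            prices[ j ] = newPrice;
        } );
    }
    else
    {
        for( size_t j = 0 ; j < pricesSize ; ++j )
        {
            double newPrice = 0.0;
            for( size_t i = 0 ; i < indexes.size() ; ++i )
            {
                int index = indexes[ i ];
                newPrice += _data._Members[ index ]._Prices[ j ]
                            * w._Chromosomes[ index ];
            }
            prices[ j ] = newPrice;
        }
    }

    // ... build and return the PortfolioHistory from prices
    //     (not shown in the original listing)
}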

Figure 5: Functions view from the sampling profiler
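With a C++11 standard library, most of the allocator boilerplate above is supplied by std::allocator_traits. The sketch below shows the minimal form. ForwardingAlloc is a hypothetical name, and std::malloc and std::free stand in for Concurrency::Alloc and Concurrency::Free, which are only available with the Concurrency Runtime:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <new>
#include <vector>

// Minimal C++11 allocator: allocator_traits supplies rebind, construct,
// destroy, and the pointer/size typedefs.
template <typename T>
struct ForwardingAlloc
{
    typedef T value_type;

    ForwardingAlloc() {}
    template <typename U>
    ForwardingAlloc(const ForwardingAlloc<U> &) {}

    T *allocate(std::size_t count)
    {
        void *p = std::malloc(count * sizeof(T)); // stand-in for Concurrency::Alloc
        if (p == nullptr)
            throw std::bad_alloc();
        return static_cast<T *>(p);
    }

    void deallocate(T *ptr, std::size_t)
    {
        std::free(ptr); // stand-in for Concurrency::Free
    }
};

// Stateless: memory from one instance can be freed by any other.
template <typename T, typename U>
bool operator==(const ForwardingAlloc<T> &, const ForwardingAlloc<U> &) { return true; }
template <typename T, typename U>
bool operator!=(const ForwardingAlloc<T> &, const ForwardingAlloc<U> &) { return false; }
```

Any standard container can then use it, for example std::vector<double, ForwardingAlloc<double>>.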
Second, the non-parallelizable part. This part starts once an evolution ends. We need to decide which generation to keep, the new one or the previous one, by comparing the best individual from each. Then we need to format the result and cache it so that Excel can query it later. The serial implementation executes this serially and needs a reader-writer lock to protect the results list until it is updated with the new result. The parallel version instead prepares the result in the background using a task group, then asynchronously sends it to an unbounded_buffer. The part of the add-in that is coupled with Excel uses this unbounded_buffer to retrieve and display the results.
TG.run( [&]()
{
    //
    // do the exhaustive copy in the background
    //
    latestResult->_Population = *best;
} );

TG.run( [&]()
{
    //
    // calculate the track history of the best individual
    //
    if ( best->_Optimum != Population::uninitialized )
    {
        latestResult->_PopulationBest =
            TrackHistory( best->_Individuals[ best->_Optimum ] );
    }
    else
    {
        latestResult->_PopulationBest =
            PortfolioHistory(
                MyVector( double )( _data._Index._Prices.size(), 0.0 ) );
    }
} );

//
// execute other code,
// then at the end of the function, wait for the task_group
//

TG.wait();

if( _latestResultQueue != NULL )
{
    //
    // asynchronously send the result to Excel
    //
    asend<PortfolioTrackerProgressEventArgs *>(
        _latestResultQueue,
        latestResult );
}
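The "run two tasks, wait for both, then publish" pattern can be sketched in standard C++. This is an analogue, not the ConcRT code: std::async stands in for task_group::run, and the hypothetical SimpleBuffer stands in for the unbounded_buffer, with a short locked push in place of asend. Result and PrepareAndPublish are also hypothetical.

```cpp
#include <cassert>
#include <future>
#include <mutex>
#include <queue>
#include <string>
#include <utility>

// Thread-safe FIFO that a producer posts to and a consumer polls.
template <typename T>
class SimpleBuffer
{
public:
    void send(T value)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push(std::move(value));
    }

    // Non-blocking receive, like the Excel side polling for results.
    bool try_receive(T &out)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty())
            return false;
        out = std::move(items_.front());
        items_.pop();
        return true;
    }

private:
    std::mutex mutex_;
    std::queue<T> items_;
};

struct Result
{
    std::string population; // stands in for the copied population
    double bestHistory;     // stands in for the best individual's history
};

void PrepareAndPublish(SimpleBuffer<Result> &buffer)
{
    Result result;
    // Two independent pieces of work writing to different members.
    auto copyTask = std::async(std::launch::async, [&result] { result.population = "copied"; });
    auto historyTask = std::async(std::launch::async, [&result] { result.bestHistory = 42.0; });
    // ... other work could run here ...
    copyTask.get(); // like TG.wait(): both tasks finish before publishing
    historyTask.get();
    buffer.send(result);
}
```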
After fixing these two problems, the program achieves a 3.5x speedup on 4 cores and a 6x speedup on 8 cores.
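Using the definition in footnote [1], these speedups correspond to a scaling efficiency of 87.5% on 4 cores and 75% on 8 cores. The arithmetic, as a trivial helper with a hypothetical name:

```cpp
#include <cassert>

// Scaling efficiency: speedup divided by the number of cores.
inline double ScalingEfficiency(double speedup, int cores)
{
    return speedup / cores;
}
```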
To summarize:
The Concurrency Runtime enables developers to parallelize existing serial applications. Visual Studio 2010 is packed with features that let developers debug and profile these applications, all integrated in one tool.
· The Concurrency Runtime features used
· Visual Studio 2010 features used
[1] Scaling efficiency is the speedup divided by the number of cores: if the parallel implementation is 4x faster on 8 cores, the efficiency is 50%.
[2] Enabling tracing shows markers for parallel_for, parallel_for_each, and parallel_invoke, whether they are called directly or nested inside each other. Each marker spans from the first iteration to the last one executed.
[3] http://msdn.microsoft.com/en-us/library/dd627195(VS.100).aspx
[4] http://msdn.microsoft.com/en-us/library/dd627193(VS.100).aspx
[5] Work done exclusively inside that function, not inside the functions it calls into.
The Asynchronous Agents Library within the Concurrency Runtime provides a set of basic message blocks that can be used to create a message-passing network. In most cases, these blocks are flexible enough and can be composed to provide all the functionality your message-passing network needs. However, there are cases in which the provided blocks do not have exactly the behavior you want, and a custom message block is desired. In this post, I will discuss the process of creating your own custom message block.
What kind of message block are you writing?
The first thing to decide when creating a message block is what kind of block you want. Is it a source block, which only offers messages but does not receive them, or a target block, which only receives messages for processing but does not offer them to anyone else? It could also be a combination of the two: a block that both receives from sources and offers to targets.
The Asynchronous Agents Library contains two interfaces, ISource and ITarget, which provide the API for source and target blocks, respectively. While you are free to create your own block from these interfaces, we highly recommend that you use one of the three base classes we provide: source_block, target_block, and propagator_block. These are explained in the table below:
| Base class | Description |
|---|---|
| source_block | A block that is only a source of messages. This block offers messages to other blocks, but cannot receive messages. |
| target_block | A block that is only a target of messages. This block receives messages from other blocks, but cannot offer them. |
| propagator_block | A block that is both a source and a target of messages. This block can both receive messages from a source and propagate messages out to other blocks. |
We recommend using these base classes because they significantly ease the process of developing a custom message block. They take care of the error checking, safe unlinking, and required locking, which frees you to focus purely on the desired behavior of your message block.
All three base classes are templated on their network link registry type. This defines how many source or target links a block can have. The Asynchronous Agents Library contains two such link registries: single_link_registry and multi_link_registry, which allow a single or multiple links, respectively. For example, if you wanted to create a message block that was purely a source block and can only accept one target, you would declare your message block as:
template<class _Type>
class example : public source_block<single_link_registry<ITarget<_Type>>>
Similarly, you could specify multi_link_registry to allow multiple links. The two are distinguished only because a single_link_registry can be optimized slightly compared to a registry that supports multiple links.
I will now focus on creating source and target blocks, and how they can be customized for different behavior. A propagator block would have the combination of the two.
Writing a source_block
A source_block is a block that offers messages to its targets. Thus, it must deal with target blocks calling back to it to take messages. The following six methods must be defined in your derived class and can be customized to change the behavior of your block:
| Method | Description |
|---|---|
| virtual void propagate_to_any_targets(message<_Type> * _PMessage) | This method is the main propagation routine for a source_block. This is where your block should process the received message and offer messages to its targets. It is called automatically when one of two methods is called: async_send or sync_send, which asynchronously or synchronously propagate a message out, respectively. These two methods are present on every block and initiate either an asynchronous or a synchronous propagation for the block. |
| virtual message<_Type> * accept_message(runtime_object_identity _MsgId) | This method is called from a target block after it is offered a message from this source_block. The call to accept must be made within the source’s call to target::propagate, so this is part of the message-passing handshake. The method is used to take ownership of the offered message. A common decision in accept_message is whether your block returns the actual offered message itself or a copy of it. For example, our unbounded_buffer block passes the specific message offered to its targets, and only one target will receive that message. Our overwrite_buffer, on the other hand, returns copies to each of its targets, so all targets can get a version of the message. |
| virtual bool reserve_message(runtime_object_identity _MsgId) | This method is called from a target block after it is offered a message from this source_block and wants to reserve the message to consume it later. The function should return true if you want to allow this message to be reserved, and false otherwise. |
| virtual message<_Type> * consume_message(runtime_object_identity _MsgId) | This method is called from a target block after it has successfully reserved a message and now wishes to consume it. As with accept, a common decision here is whether to return the specific message or a copy of it. |
| virtual void release_message(runtime_object_identity _MsgId) | This method is called from a target block after it has successfully reserved a message and now does not wish to consume it. Mostly cleanup, and possibly error checking, should be done here. |
| virtual void resume_propagation() | This method is called automatically when a reserved message is consumed or released. Here, you decide how to proceed with propagation. In general, call async_send(NULL), which simply forces the block to restart propagation without a new message. |
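The handshake in the table above can be modeled in a few lines of standard C++. ToySource and Msg are hypothetical and single-threaded: this is not the Agents Library API (the base classes handle locking and identity for you), only an illustration of how accept, reserve, consume, and release relate for a buffer that hands out its actual messages, as unbounded_buffer does.

```cpp
#include <cassert>
#include <deque>
#include <memory>

struct Msg
{
    int id;
    int payload;
};

class ToySource
{
public:
    void post(int id, int payload) { queue_.push_back(Msg{id, payload}); }

    // accept: transfer ownership of the offered head message immediately.
    std::unique_ptr<Msg> accept(int id)
    {
        if (queue_.empty() || queue_.front().id != id || reserved_)
            return nullptr;
        std::unique_ptr<Msg> m(new Msg(queue_.front()));
        queue_.pop_front();
        return m;
    }

    // reserve: hold the head message for one target; others are blocked.
    bool reserve(int id)
    {
        if (queue_.empty() || queue_.front().id != id || reserved_)
            return false;
        reserved_ = true;
        return true;
    }

    // consume: complete a reservation by taking ownership.
    std::unique_ptr<Msg> consume(int id)
    {
        if (!reserved_ || queue_.front().id != id)
            return nullptr;
        reserved_ = false;
        return accept(id);
    }

    // release: give up the reservation; the message stays in the source.
    // A real block would now resume propagation (resume_propagation).
    void release(int id)
    {
        if (reserved_ && queue_.front().id == id)
            reserved_ = false;
    }

private:
    std::deque<Msg> queue_;
    bool reserved_ = false;
};
```

While a message is reserved, accept fails for everyone; after release, the same message can be accepted normally.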
Writing a target_block
A target_block is a block that can accept messages offered by its sources. Thus, it must deal with messages being passed to it from sources. There are two methods that can be overridden to handle the messages: propagate_message and send_message. Only propagate_message must be overridden in the derived class; by default, send_message does not allow synchronous propagation.
| Method | Description |
|---|---|
| virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource) | This method is called after a message, _PMessage, has been asynchronously offered from source _PSource and has successfully passed any filter that you may have added to your block. The message has not yet been accepted, so ownership still resides with the source _PSource. Thus, this target_block can now be customized to do whatever it wants with the message and source. Typically, you want to call _PSource->accept(_PMessage->msg_id(), this); in order to take ownership of this message from the source. However, the implementation of this method is totally dependent on the desired behavior for this block. |
| virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource) | This method is exactly the same as propagate_message except that the message _PMessage has been synchronously offered from source _PSource. |
Combining the two: Writing a propagator_block
The main usage of a propagator_block is its ability to receive a message, like a target_block, then manipulate that message and send it out, like a source_block. Thus, creating a propagator_block is simply the combination of the methods defined above. The propagation of a received message can be done by using either of the two methods: async_send and sync_send.
The async_send(message * _PMessage) method will spin up a light-weight task in the Concurrency Runtime which will asynchronously call propagate_to_any_targets(_PMessage) to manipulate the messages as you wish according to your custom block, and offer the message out to its targets.
Similarly, the sync_send(message * _PMessage) method will also offer the message out to its targets, but does so synchronously, in line.
Example: Squaring block
As a simple example, let’s look at how I would write a simple message block that accepts any message from a source, squares the received message payload, then propagates out a message with the square to its targets.
First, since this block both receives in and sends out messages, I need to make it a propagator_block. I’ll define the class as:
template<class _Type = int>
class squaring_block :
public propagator_block<multi_link_registry<ITarget<_Type>>,
multi_link_registry<ISource<_Type>>>
This class declaration specifies that my block is a propagator_block that can link to any number of targets and be linked from any number of sources.
Next, I’ll need some place to store my messages as they arrive in this block. I’ll need to store them in case, for example, the input throughput exceeds the output throughput, or in case a target reserves one of the offered messages and we need to stall the network pipeline until it is consumed or released. To do this, let’s just include a simple queue that holds our messages:
std::queue<message<_Type> *> messageQueue;
Depending on your block, you may not need to store a queue of messages, or any messages at all. It all depends on the functionality you desire.
Now I’ll specify the constructor and destructor for the block:
squaring_block()
{
initialize_source_and_target();
}
~squaring_block()
{
remove_network_links();
}
The call to initialize_source_and_target is a base class method of propagator_block that initializes the block and allows you to specify which Scheduler or ScheduleGroup the block uses. For simplicity, and in most cases, we can just call this method with no arguments. (The source_block and target_block base classes have initialize_source and initialize_target methods, respectively.) The destructor must call remove_network_links, another base class method. This method is critical because it ensures that the links to and from this block are properly removed before the object is destroyed. Otherwise, asynchronous propagations could continue to access deleted memory.
Now, let’s move on to the interesting aspects about customizing the block. As a target, this squaring_block is going to receive messages offered to it. A source block will do this by calling squaring_block::propagate. Since we are deriving from propagator_block, all the necessary error handling has already been done, and we just need to specify what happens after we get a message that we actually want in the propagate_message function (which is called by propagate):
virtual message_status propagate_message(message<_Type> * _PMessage,
ISource<_Type> * _PSource)
{
message_status result = accepted;
_PMessage = _PSource->accept(_PMessage->msg_id(), this);
if (_PMessage != NULL)
{
async_send(_PMessage);
}
else
{
result = missed;
}
return result;
}
The code here simply tries to accept a message from the source by calling _PSource->accept and, if a message is returned, calls async_send(_PMessage) on it. As soon as the accept call returns, ownership of the message has transferred from the source to our squaring_block; however, our squaring_block has not yet processed or stored the message and prepared it for transfer to any of its targets. That process is initiated by async_send.
Similar to propagate_message, there is a send_message function that can be specified. It is called on the synchronous path, while propagate_message is called on the asynchronous one. For the purposes of this block, we only need to change the implementation to call sync_send instead of async_send; everything else is the same.
Propagation of messages
Now how about processing and offering the message out to the squaring_block’s targets? This is done via the source methods that were outlined earlier. First, let’s specify the propagate_to_any_targets method. This is the method that is called in a light-weight task created by the call to async_send during propagate_message and is where all the main processing should occur.
virtual void propagate_to_any_targets(message<_Type> * _PMessage)
{
if (_PMessage != NULL)
{
// Square the input, delete the original message
_Type outputPayload = _PMessage->payload * _PMessage->payload;
message<_Type> * newMessage = new message<_Type>(outputPayload);
delete _PMessage;
messageQueue.push(newMessage);
if (messageQueue.size() > 1)
return;
}
while (messageQueue.size() > 0)
{
message_status status = declined;
for (target_iterator _Iter = _M_connectedTargets.begin();
*_Iter != NULL; ++_Iter)
{
ITarget<_Target_type> * _PTarget = *_Iter;
status = _PTarget->propagate(messageQueue.front(), this);
// Ownership of message changed. Do not propagate this
// message to any other target.
if (status == accepted)
break;
}
// If status is anything other than accepted, then the current
// message was not propagated out. Nothing after it can be
// propagated out.
if (status != accepted)
break;
}
}
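The propagation policy of propagate_to_any_targets can be modeled without the Agents Library. SquaringModel is a hypothetical, single-threaded sketch in which targets are callbacks that return true for "accepted". It keeps the same rules: square and buffer each input, offer only the head of the buffer, let the first accepting target take it, and stop when no target accepts so that message order is preserved. The model removes an accepted message itself; in a real block, removal is tied to the target's accept call.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <vector>

class SquaringModel
{
public:
    using Target = std::function<bool(int)>; // returns true if accepted

    void link(Target t) { targets_.push_back(t); }

    void receive(int value)
    {
        buffer_.push(value * value);
        // An earlier message is still waiting at the head: just queue this one.
        if (buffer_.size() > 1)
            return;
        propagate();
    }

    // Offer buffered messages in order (cf. resume_propagation for retries).
    void propagate()
    {
        while (!buffer_.empty())
        {
            bool accepted = false;
            for (auto &t : targets_)
            {
                if (t(buffer_.front()))
                {
                    accepted = true; // first acceptor wins
                    break;
                }
            }
            if (!accepted)
                return;    // head declined by everyone: keep it and stop
            buffer_.pop(); // the model removes the accepted message itself
        }
    }

    std::size_t pending() const { return buffer_.size(); }

private:
    std::vector<Target> targets_;
    std::queue<int> buffer_;
};
```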
Let’s step through that code slowly and investigate how I made this block behave how I wanted it to. The first part of the function is focused on the processing of the message:
// Square the input, delete the original message
_Type outputPayload = _PMessage->payload * _PMessage->payload;
message<_Type> * newMessage = new message<_Type>(outputPayload);
delete _PMessage;
messageQueue.push(newMessage);
if (messageQueue.size() > 1)
return;
First, upon receiving a message, I square the input, which provides the desired functionality of this block. Next, I delete the old message. This squaring_block is its current owner, and its lifetime ends here because that specific message will never be propagated out of this block. Then I enqueue the new message and check whether an earlier message is already waiting in the queue, i.e. whether one is currently being offered by this block. If so, I simply return and leave the new message queued behind it. Otherwise, I can begin propagation.
One subtle note about message processing: I technically could have squared the input in propagate_message rather than in propagate_to_any_targets, but it is better practice to do the latter. propagate_message is called from your block’s source, so it runs in a task initiated by your source. propagate_to_any_targets, on the other hand, runs in your block’s own propagation task.
At this point, everything else in the propagate_to_any_targets method has to do with the actual propagation to targets. The message has already been processed and is ready for offering. The main while loop takes each message, one at a time starting with the head, and offers it to each of the targets, also one at a time. By using our standard multi_link_registry and our base classes, I’m able to use our included iterators to search through the targets with a simple for loop and propagate out to my targets:
for (target_iterator _Iter = _M_connectedTargets.begin();
*_Iter != NULL; ++_Iter)
{
ITarget<_Target_type> * _PTarget = *_Iter;
status = _PTarget->propagate(messageQueue.front(), this);
// Ownership of message changed. Do not propagate this
// message to any other target.
if (status == accepted)
break;
}
This takes the message at the head of the queue and propagates it out to the targets. After each propagate call returns, we check the result. If the message was accepted, ownership has changed, so we break out and offer the next message in our queue. Otherwise, we offer it to the next target. If no target accepts the message, propagation should stall until that message is accepted or consumed. We cannot propagate other messages in the queue, because that would send messages out of order.
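To see the ordering rule in isolation, here is a single-threaded model written in standard C++. SquaringStage, receive() and drain() are hypothetical names, not Agents Library APIs. Targets are modelled as plain callbacks that return true to accept a value, and the stage pops the head itself instead of waiting for an accept_message call. drain() corresponds to the part of propagate_to_any_targets that also runs when it is called with a NULL message.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical, single-threaded model of squaring_block's propagation logic.
class SquaringStage {
public:
    // A "target" returns true if it accepts (takes ownership of) the value.
    using Target = std::function<bool(int)>;

    void link_target(Target t) { targets_.push_back(std::move(t)); }

    // Like propagate_to_any_targets(_PMessage): square, enqueue, and only
    // start offering if no earlier value is still waiting in the queue.
    void receive(int value) {
        queue_.push(value * value);
        if (queue_.size() > 1)
            return;  // an older value is still being offered
        drain();
    }

    // Like the loop that runs for propagate_to_any_targets(NULL).
    void drain() {
        while (!queue_.empty()) {
            bool accepted = false;
            for (auto& target : targets_) {
                if (target(queue_.front())) {  // first acceptor wins
                    accepted = true;
                    break;
                }
            }
            if (!accepted)
                return;    // stall: never skip ahead of the head value
            queue_.pop();  // in the real block, accept_message does this
        }
    }

    std::size_t pending() const { return queue_.size(); }

private:
    std::queue<int> queue_;
    std::vector<Target> targets_;
};
```

A declining target leaves both values queued in order; once it starts accepting, drain() releases them head first.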
Now that we’ve created a block that can offer messages out, we need to handle what happens when a target block calls back and tries to take the offered message. This is handled in the accept_message method:
virtual message<_Type> * accept_message(runtime_object_identity _MsgId)
{
message<_Type> * msg = NULL;
// Guard against an empty queue before looking at the head
if (messageQueue.size() > 0 && messageQueue.front()->msg_id() == _MsgId)
{
msg = messageQueue.front();
messageQueue.pop();
}
return msg;
}
This method just checks whether the message the target is trying to accept is the same as the current one that my squaring_block is holding and trying to propagate. If so, it dequeues the message from my block and returns it to the target. Note: as soon as it returns the message, ownership of the message has changed hands. We can no longer safely touch that message pointer.
At this point, we have a block that can participate in both synchronous and asynchronous message passing! For completeness, there are four more functions to handle reserving and consuming of messages. However, as this post is long enough already, I’ll simplify this block by disallowing reserving and consuming of messages. In my next post, I’ll dive into the details of how to handle proper reserving of messages. Disallowing reservations can be done by:
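The ownership hand-off can be made explicit in standard C++ with std::unique_ptr. The sketch below is hypothetical: Message, OfferingQueue and accept() are illustrative names, not Agents Library types. It also declines when the queue is empty, so the head is never read from an empty queue.

```cpp
#include <cassert>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical stand-in for message<T>: a payload plus a unique id.
struct Message {
    long id;
    int payload;
};

// Sketch of accept_message's contract: hand out the head message only if the
// caller asks for exactly that id. Returning std::unique_ptr makes the
// transfer explicit: afterwards the queue no longer holds the pointer.
class OfferingQueue {
public:
    void push(std::unique_ptr<Message> m) { queue_.push(std::move(m)); }

    std::unique_ptr<Message> accept(long id) {
        if (queue_.empty() || queue_.front()->id != id)
            return nullptr;  // empty queue or stale id: decline
        std::unique_ptr<Message> m = std::move(queue_.front());
        queue_.pop();
        return m;            // the caller now owns the message
    }

    bool empty() const { return queue_.empty(); }

private:
    std::queue<std::unique_ptr<Message>> queue_;
};
```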
virtual bool reserve_message(runtime_object_identity _MsgId)
{
return false;
}
virtual message<_Type>* consume_message(runtime_object_identity _MsgId)
{
return NULL;
}
virtual void release_message(runtime_object_identity _MsgId)
{
}
virtual void resume_propagation()
{
}
So that’s it! That’s a rather lengthy but in-depth introduction to writing custom message blocks! Please let me know if anything is still confusing, or if you come up with any interesting custom message blocks! Look for my next post soon on allowing reservations.
Now that Visual Studio Beta2 has been available for a few weeks (see Rick’s earlier blog post) I hope you’ve had a chance to experiment with one or both of the concurrent containers that were introduced: concurrent_queue<T> and concurrent_vector<T>. These two containers are lock-free, and are used to avoid synchronization bottlenecks in parallel algorithms. As with all concurrency features, they come with a set of pitfalls that must be avoided. In this post I will focus on the concurrent_queue, its design, proper usage of its methods, and its use in some real scenarios.
From std::queue to concurrent_queue
It is enlightening to compare the concurrent_queue design against that of the standard C++ std::queue, the latter of which is, of course, not concurrency-safe. The basic API of std::queue’s enqueue/dequeue functionality can be summarized as follows:
template<class T>
class queue
{
public:
void push(const T&);
size_type size() const;
T& front();
void pop();
};
Forgetting for the moment that std::queue, as implemented, is not concurrency-safe, we can first note that the enqueue portion of this API could be concurrency-safe. Enqueuing an item can be accomplished with a single call to push(), and this operation is atomic in that it is performed by a single method call, which means that the enqueued item will be added to the queue by the time push() returns.
However, the dequeue portion of this API is not, and cannot be, concurrency-safe because the dequeue operation on std::queue is not atomic; it requires three separate sub-operations: size(), front(), and pop().
if (q.size() > 0)
{
T x = q.front();
q.pop();
}
To safely dequeue an item we must first check to see if the queue is empty, to avoid dequeueing from an empty queue. Then we have to retrieve the value from the front of the queue. After we have the front item, we need to remove it from the queue by calling pop(). Concurrent enqueue and dequeue operations that occur after the call to size(), or after the call to front(), can result in unexpected behaviour. Here, for example, are three potential races that could play out:
- The internal implementation of std::queue could be corrupted, which would result in undefined behaviour, likely a crash.
- The result of (q.size() > 0) cannot be trusted if a concurrent push() or pop() occurs immediately afterward. If the queue was empty (q.size() == 0), a concurrent push could occur immediately after the check, which would cause the rest of this dequeue operation to be skipped when in fact there is an item on the queue. If the queue had a single element in it (q.size() == 1), a concurrent pop could occur immediately after the check, causing the queue to become empty; the subsequent front() and pop() would then operate on an empty queue, which in debug mode triggers an assert dialog.
- Two threads might dequeue simultaneously. The first thread could retrieve the front item and then get interrupted. The second thread could then run and retrieve the front item, which is the same item the first thread retrieved. Assuming nothing else bad happens (see the first race above), both threads will have dequeued the same item, and because both threads then call pop(), the item behind it is removed without ever being read.
Why is std::queue designed this way? Imagine that front() and pop() were combined into a single method and invoked as:
T x = q.pop(); // Hypothetical API
If the copy-assignment operator in type “T” throws an exception, the value dequeued would be lost forever. By separating the retrieval of the item from the queue from its removal from the queue, users can potentially recover from this situation. This separation makes std::queue’s API exception-safe, but concurrency-unsafe.
As we’ll soon see, concurrent_queue makes the dequeue operation concurrency-safe by making it atomic.
Concurrency-Safe Operations on concurrent_queue
The concurrent_queue is concurrency-safe with respect to the following concurrent operations:
- Enqueues concurrent with enqueues
- Dequeues concurrent with dequeues
- Enqueues concurrent with dequeues
These operations are internally synchronized using a lock-free algorithm. This allows concurrent_queue to perform much better than one implemented using coarse-grained locking.
Enqueue: The enqueue operation on concurrent_queue is straightforward, and identical to that of std::queue:
void push(const T& source)
This will push a source value onto the tail of the queue, synchronizing with other concurrent enqueue/dequeue operations.
Dequeue: Instead of three method calls to achieve a dequeue, the concurrent_queue’s dequeue operation is encapsulated in a single method, try_pop():
bool try_pop(T& destination);
Note that the name of the method is now try_pop(), which as its name implies, attempts to pop an item from the head of the queue. If the dequeue was successful, the dequeued value is stored in the “destination” parameter, and this method returns true.
We’ve seen that a separate check for queue emptiness prior to popping is subject to races. The concurrent_queue try_pop() method solves this issue by simply returning false if we attempted to pop from an empty queue. Note that this is not a failure, nor does it necessarily indicate an error in the program that calls it. Frequently, the correct action for callers of try_pop() is to retry when false is returned. I’ll talk about this further down.
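To make the contract concrete, here is a sketch of a try_pop() built on a single coarse-grained lock, the kind of design the lock-free concurrent_queue is meant to outperform. LockedQueue is a hypothetical class. What it shares with concurrent_queue is only the interface shape: the emptiness check, the read, and the removal happen as one operation, and an empty queue yields false rather than an error.

```cpp
#include <cassert>
#include <mutex>
#include <queue>

// Hypothetical coarse-grained-lock queue. NOT how concurrent_queue works
// internally (that one is lock-free); it only illustrates try_pop semantics.
template <typename T>
class LockedQueue {
public:
    void push(const T& source) {
        std::lock_guard<std::mutex> lock(m_);
        q_.push(source);
    }

    // Returns false (not an error) if the queue is empty at the moment of the
    // call; otherwise copies the head into destination and removes it.
    bool try_pop(T& destination) {
        std::lock_guard<std::mutex> lock(m_);
        if (q_.empty())
            return false;
        destination = q_.front();  // copy first: if it throws, item stays queued
        q_.pop();
        return true;
    }

private:
    std::mutex m_;
    std::queue<T> q_;
};
```

Because the copy in this sketch happens under the lock before pop(), a throwing copy leaves the item in the queue; the lock-free concurrent_queue does not make that guarantee.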
I mentioned that std::queue’s dequeue operation was exception-safe but not concurrency-safe. Here, concurrent_queue’s dequeue operation is concurrency-safe, but not exception-safe. If the assignment operator of type “T” throws an exception, the dequeued value will be lost.
Concurrency-Unsafe Operations on concurrent_queue
The methods discussed here are not safe to call during concurrent push() and try_pop() operations. They are meant to be used only after all concurrent operations have completed.
empty: This method returns true if the container has no elements:
bool empty() const;
While this method is technically thread-safe (it won’t corrupt the state of the concurrent_queue), the value it returns isn’t terribly useful because the returned value might immediately become incorrect if a concurrent push() or try_pop() happens.
unsafe_size: As is clearly apparent from its name, this method is concurrency-unsafe.
size_type unsafe_size() const;
This method can produce incorrect results if it is called concurrently with push() and try_pop() operations. To understand why, it is necessary to understand a bit about how the concurrent_queue operates under the covers. There are 2 member variables in concurrent_queue that demarcate a sliding window that keeps track of the number of elements enqueued and dequeued through its lifetime: _Tail_counter and _Head_counter. When an element is enqueued, _Tail_counter is incremented. When an element is dequeued, _Head_counter is incremented. The number of elements in the queue at any given time is (_Tail_counter - _Head_counter), and indeed this formula expresses exactly how unsafe_size() is computed. Now consider the following case:
- Assume for this example that _Tail_counter is 12, and _Head_counter is 10. The queue has 2 elements.
- Thread 1 calls unsafe_size(). It gets as far as fetching the value of “_Tail_counter” before it gets pre-empted. It has fetched the value 12.
- Thread 2 performs 5 push() operations followed by 5 successful try_pop() operations. _Tail_counter will now be 17, while _Head_counter will now be 15. The queue still has 2 elements.
- Thread 1 resumes and finishes computing unsafe_size(). It fetches the value of _Head_counter, which is 15. It now subtracts the new value of _Head_counter (15) from the old value of _Tail_counter (12) and returns (12 - 15), or -3, as the queue size. But wait! The concurrent_queue’s “size_type” is an unsigned type, which means the actual value will be 4294967293 on a 32-bit build, or, if you prefer, really really huge.
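The wraparound in the last step can be checked directly. This small sketch replays the subtraction with the counter values from the example; racy_size_32 and racy_size_64 are hypothetical helpers, and which width applies depends on the build, assuming size_type is a 32- or 64-bit unsigned type.

```cpp
#include <cassert>
#include <cstdint>

// Replays "stale _Tail_counter minus fresh _Head_counter" from the example.
// Unsigned subtraction wraps modulo 2^32 (or 2^64) instead of going negative.
std::uint32_t racy_size_32(std::uint32_t stale_tail, std::uint32_t fresh_head) {
    return stale_tail - fresh_head;
}

std::uint64_t racy_size_64(std::uint64_t stale_tail, std::uint64_t fresh_head) {
    return stale_tail - fresh_head;
}
```

With consistent counters (17 and 15) the result is the expected 2; with the stale tail (12) it is 2^32 - 3 or 2^64 - 3.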
clear: It is not concurrency-safe to clear out the contents of the concurrent_queue during concurrent operations:
void clear();
If, after all concurrent operations are complete, you wish to ensure that the concurrent_queue is empty and that all its elements are destructed, you can call this method. The concurrent_queue destructor will also implicitly clear out all its elements.
Iteration: Iteration over a concurrent_queue is not thread-safe and all methods that return iterators are explicitly prefixed with the word “unsafe_”.
iterator unsafe_begin();
iterator unsafe_end();
const_iterator unsafe_begin() const;
const_iterator unsafe_end() const;
Iterating while concurrent push() and try_pop() operations are happening will yield undefined results (e.g. crashing). However, for debugging it can be useful to traverse any remaining elements in the concurrent_queue and dump them out, and that’s what these methods are for.
Using concurrent_queue for Producer-Consumer
The concurrent_queue is an ideal data structure for producer-consumer scenarios. In the following simplistic example, we schedule one task that pushes (produces) 1000 integers, and the main thread pops (consumes) those integers:
#include <concrt.h>
#include <concurrent_queue.h>
#include <assert.h>
using namespace Concurrency;
// Task that produces 1000 items
void ProducerTask(void* p) {
concurrent_queue<int>* pq = (concurrent_queue<int>*)p;
for (int i = 0; i < 1000; ++i)
pq->push(i);
}
void Producer_Consumer() {
concurrent_queue<int> q;
// Schedule a task to produce 1000 items
CurrentScheduler::ScheduleTask(ProducerTask, &q);
// Consume 1000 items
for (int i = 0; i < 1000; ++i) {
int result = -1;
while (!q.try_pop(result))
Context::Yield();
assert(result == i);
}
}
Note that the consumer must retry (spin) if the try_pop() operation fails. Since this simple example guarantees that the producer will enqueue 1000 items, a failure in the dequeue operation simply means that the consumer thread is outpacing the producer thread, and eventually the producer thread will catch up and enqueue an item. When retrying in this way, it is very important to call Context::Yield() inside the spin-loop, which cooperatively yields to any other runnable tasks, allowing the producer task an opportunity to run. To see why this is important, imagine running this code on a single-core machine. Without the Context::Yield(), the consumer will busy-wait for an item to appear on the queue, starving out the producer task. If the producer task is starved, the process live-locks.
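The same producer-consumer shape can be sketched in portable C++. In this hypothetical sketch (ProducerConsumerSketch is an illustrative name), std::thread replaces CurrentScheduler::ScheduleTask, and a mutex-protected std::queue replaces concurrent_queue<int>, checking and popping under one lock like a single try_pop(). std::this_thread::yield() plays the role of Context::Yield(), with the caveat that it is only a hint to the OS scheduler rather than a cooperative yield to ConcRT tasks.

```cpp
#include <cassert>
#include <mutex>
#include <queue>
#include <thread>

// Portable sketch of the producer-consumer example; returns the item count.
int ProducerConsumerSketch(int count) {
    std::mutex m;
    std::queue<int> q;

    std::thread producer([&] {
        for (int i = 0; i < count; ++i) {
            std::lock_guard<std::mutex> lock(m);
            q.push(i);
        }
    });

    int consumed = 0;
    for (int i = 0; i < count; ++i) {
        int result = -1;
        for (;;) {
            {
                std::lock_guard<std::mutex> lock(m);
                if (!q.empty()) {        // check and pop under one lock,
                    result = q.front();  // like a single try_pop()
                    q.pop();
                    break;
                }
            }
            std::this_thread::yield();   // let the producer make progress
        }
        assert(result == i);             // FIFO order with a single producer
        ++consumed;
    }
    producer.join();  // the queue must outlive every push()
    return consumed;
}
```

Joining the producer before returning also guarantees that the queue is not destroyed while a push() might still be in progress.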
Debugger Visualizations for concurrent_queue
The internal implementation data structures for concurrent_queue are very complex. If you want to explore the internals of it, have fun. However, when debugging a program, developers simply want to see the contents of the concurrent_queue, not its internal data structures. In Beta2 we have included debugger visualizers for concurrent_queue (and concurrent_vector) so that they appear like their corresponding STL data structures in the debugger’s watch window. Here’s a sample of what the Visual Studio debugger’s watch window would look like for a concurrent_queue<int> that contains the three elements 5, 7, and 9:
Summary
The concurrent_queue container was adapted from the concurrent_queue in Intel’s Threading Building Blocks (and a big “thank you” goes to Intel for collaborating with us and allowing us to modify their implementation). If you’re familiar with the TBB concurrent_queue, you’ll note quite a few API differences, mainly because TBB’s queue supports bounded/blocking operations. Intel and Microsoft collaborated on a new, non-blocking concurrent_queue, and you’ll see TBB’s concurrent_queue start to conform to that of the Concurrency Runtime starting in TBB 2.2.
We will soon follow up with another post that talks about concurrent_vector.
As you know from our previous posts, Visual Studio 2010 comes with great support for the task-based programming model and multi-threaded applications. Even though Daniel uses managed code in his examples, all the content he created for the Parallel Tasks and Parallel Stacks windows applies to C++ code too.
Find out about all the links to debugging material from blog post: Parallel Debugging.
Marko Radmilac, Senior Developer for Microsoft’s Concurrency Runtime, offers the following discussion of parallel iteration in the Concurrency Runtime:
Let me say up front that this is my first blog post, so I apologize if my writing format does not match what people have come to expect from a blog. I do hope, however, that this post contains enough information to serve as an introduction to the Parallel Patterns Library’s parallel_for and parallel_for_each constructs, and to when and how to use them. This post has a Q and A format and contains both basic and advanced information, so feel free to jump to the section that is of most interest to you.
What is parallel_for and where do I find it? As you are probably aware (since you are reading this blog), the native C++ libraries in Visual Studio 2010 were extended to provide rich support for parallel programming. There are different layers at which users can interact with the parallel runtime, the highest one of which is the Parallel Patterns Library (using the header file ppl.h). In it, users can find different constructs that allow them to quickly parallelize their programs without extensive knowledge of scheduling decisions, underlying threads, the surrounding environment, etc. One of these constructs is the parallel_for construct, which allows a user to parallelize a for-loop quickly. Its close cousin is parallel_for_each, which makes it equally easy to parallelize the std::for_each construct.
How do I use parallel_for and when? Parallel for is a construct that takes the body of a user’s for-loop, captured in a functor (for details on functors and lambdas please read Stephan’s excellent blog post), divides the work (the number of iterations) amongst the available computing resources (processors), and executes that work in parallel. Here’s a simple example of a serial to parallel transformation:
for (int i = 0; i < n; i++)
{
iter();
}
becomes
Concurrency::parallel_for(0, n,
[] (int i)
{
iter();
});
A naïve user might decide to go ahead and convert all for-loops in the program into parallel for-loops. That would be a mistake, because without an initial feasibility analysis this conversion might be detrimental to the program. There are two aspects that must be considered before a conversion is made, correctness and performance, and both are explained in the following few paragraphs.
I have converted my for-loops into parallel for-loops. Why doesn’t my code work properly? Converting all for-loops in a program into parallel for-loops may yield an incorrect program. There are quite a few algorithms in which the individual iterations of the loop are not independent. They often require another iteration to have been executed beforehand, so executing them in isolation would likely yield wrong results. A typical example would be a partial sum computation “in place”, on the original array (look at std::partial_sum):
for (int i = 1; i < n; i++)
{
// Array “a” contains both the original sequence
// and the end result; a[0] is already its own prefix sum
a[i] += a[i-1];
}
To compute the kth term of the resulting sequence, the (k-1)th term must already be known. If one were to execute the iterations in parallel, the (k-1)th term might not be populated by the time the kth term is processed, yielding an incorrect result.
Concurrency::parallel_for(1, n,
[&] (int i)
{
a[i] += a[i-1]; // incorrect!
});
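One way to rewrite this loop so that parallel work no longer depends on the previous iteration is a blocked (two-level) prefix sum: compute prefix sums inside independent chunks, compute one running offset per chunk serially, then add each chunk’s offset independently. This is a standard technique shown here as a sketch. It uses one std::thread per chunk instead of Concurrency::parallel_for so that it compiles with the standard library alone, and blocked_prefix_sum is a hypothetical name.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// In-place prefix sum restructured so that, within each pass, chunks are
// independent of one another (hypothetical helper).
void blocked_prefix_sum(std::vector<long long>& a, std::size_t chunks) {
    const std::size_t n = a.size();
    if (n == 0 || chunks == 0) return;
    chunks = std::min(chunks, n);
    const std::size_t len = (n + chunks - 1) / chunks;  // ceil(n / chunks)
    chunks = (n + len - 1) / len;                        // drop empty chunks

    // Runs body(begin, end) for every chunk, one std::thread per chunk.
    auto run_per_chunk = [&](auto body) {
        std::vector<std::thread> workers;
        for (std::size_t c = 0; c < chunks; ++c)
            workers.emplace_back(body, c * len, std::min(n, (c + 1) * len));
        for (auto& w : workers) w.join();
    };

    // Pass 1 (parallel): ordinary prefix sum inside each chunk.
    run_per_chunk([&](std::size_t begin, std::size_t end) {
        for (std::size_t i = begin + 1; i < end; ++i) a[i] += a[i - 1];
    });

    // Pass 2 (serial, one step per chunk): offset[c] = total of chunks 0..c-1.
    std::vector<long long> offset(chunks, 0);
    for (std::size_t c = 1; c < chunks; ++c)
        offset[c] = offset[c - 1] + a[c * len - 1];  // last element of chunk c-1

    // Pass 3 (parallel): shift each chunk by the total of the chunks before it.
    run_per_chunk([&](std::size_t begin, std::size_t end) {
        const long long add = offset[begin / len];
        for (std::size_t i = begin; i < end; ++i) a[i] += add;
    });
}
```

Only pass 2 is serial, and it takes one step per chunk rather than one per element, so the dependency chain shrinks from n to the number of chunks.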
I have converted my for-loops into parallel for-loops. Why does my code run slower than serial code? Converting all for-loops in a program into parallel for-loops may yield performance degradation instead of an improvement. Parallel for, as mentioned before, will divide the work amongst available processors and set it up for parallel execution. The initial division of work, setting up worker threads, and invoking a functor for each iteration, all incur a certain cost. That cost is not big, but it is not negligible either. If the work done in a single iteration and number of iterations itself are both small, this overhead is likely to show up as a less than expected speedup, sometimes even a slowdown.
Concurrency::parallel_for(0, 100,
[&] (int i)
{
a[i] = b[i] + 1; // only a few cycles
});
In the example above, a single iteration of the loop performs one memory read, one memory write and one add, and there are only 100 iterations in the entire loop. In this case, the cost of the function call that performs one iteration is likely to be on the same order of magnitude as the work of the iteration itself, if not greater. Therefore, this loop is not a good candidate for parallelization.
I am frustrated! When should I use parallel_for then? You should analyze your program first and identify “hot spots”, i.e. places in your code that are computationally intensive. Identify the for-loops that control that computation and try to rewrite them in a way that eliminates dependencies between iterations. Also, in the presence of nested for-loops, it is often more beneficial to parallelize at the outermost loop level, in order to increase the amount of work done per iteration. Once this analysis is complete, apply parallel for to those loops and watch your program run faster :)
I was using OpenMP and I noticed that OpenMP executes my workload much faster than parallel_for does. Why is that? OpenMP uses a simple mechanism for scheduling the parallel iterations. It counts the number of iterations, divides them up by the number of processors P, and then schedules P workers to execute individual chunks of work. Each iteration in OpenMP consists of only a function call to the outlined function (in parallel for’s case this is a functor/lambda). On the other hand, parallel for is a more general mechanism; it does much more work while preparing for parallel invocation, and much more during each iteration. This raises the total amount of work (and the number of iterations in the loop) needed for parallel for to be profitable, and that threshold is higher than OpenMP’s. So the difference you observed shows up for a for-loop with just enough work to make OpenMP profitable, but not enough to do the same for parallel for. If you had less work, both would be slower than serial execution; if you had more, both would be equally profitable because the overhead is amortized by the amount of work being done.
Why does it take so much work per iteration for parallel_for to turn profitable? OpenMP does it with far less work per iteration. As mentioned above, parallel for is a much more general construct than any construct in OpenMP. It should be noted that the Concurrency Runtime (ConcRT) is not less performant than the OpenMP runtime, but the parallel for construct tries to address many more issues than a simple OpenMP for-loop, and these come at a cost. It is possible to reduce the generality of parallel for to a minimum, at which point it becomes very close to the performance of the OpenMP counterpart. You can find a less generalized implementation of parallel_for in our sample deck, called parallel_for_fixed (it uses fixed partitioning of iterations, without range stealing). The reasons for the generality of the parallel for implementation are given in the following few paragraphs.
My workload inside the for-loop is uneven. Can parallel for help me distribute it properly? Yes, parallel for by default does range stealing and dynamic redistribution of work. That means that if any worker thread finishes its range, it will attempt to help other worker threads with their workloads. This ensures that all computing resources are fully utilized during parallel for execution. OpenMP does this to an extent with dynamic and guided scheduling, but parallel for does it at a per-iteration level without using interlocked operations to mark the progress.
Can parallel for support both unsigned and signed loop indices? Do I have to worry about overflow? Yes, parallel_for handles both signed and unsigned indices equally well. As long as the type supports the basic operations of an integer type, parallel for will handle it well. It will also make sure that the range does not overflow a signed type. This was not the case with OpenMP versions before 3.0.
Do I have to worry about overloading the system? What if there are other parts of a process using the Concurrency Runtime? Parallel for automatically detects system load and identifies available processors to use. Therefore, it will be cooperative with the rest of the process, and use only available resources. Once other, busy resources become available, parallel for will quickly find use for them (they will join the computation). OpenMP provides this through its dynamic mode, although once a parallel region has started executing, any resources that become available after that point are not used in the computation.
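The static scheme described above can be sketched in a few lines of standard C++. static_parallel_for is a hypothetical helper, not a PPL or OpenMP API, and std::thread stands in for a worker pool (a real runtime would reuse threads rather than create them on each call). It has no range stealing, cancellation or load balancing, which is what keeps its per-iteration cost down to a single function call.

```cpp
#include <algorithm>
#include <cassert>
#include <thread>
#include <vector>

// Static partitioning: split [first, last) into P contiguous chunks and give
// each chunk to one worker. No stealing, no cancellation (hypothetical helper).
template <typename Func>
void static_parallel_for(int first, int last, Func f, unsigned workers = 0) {
    if (last <= first) return;
    if (workers == 0)
        workers = std::max(1u, std::thread::hardware_concurrency());
    const long long n = static_cast<long long>(last) - first;
    const long long p = std::min<long long>(workers, n);
    std::vector<std::thread> pool;
    for (long long w = 0; w < p; ++w) {
        // Chunk w covers [first + n*w/p, first + n*(w+1)/p).
        const long long begin = first + n * w / p;
        const long long end = first + n * (w + 1) / p;
        pool.emplace_back([=] {
            for (long long i = begin; i < end; ++i) f(static_cast<int>(i));
        });
    }
    for (auto& t : pool) t.join();
}
```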
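For comparison, the simplest form of dynamic redistribution hands out chunks from a shared atomic counter, so a worker that finishes early just claims more. This hypothetical sketch (dynamic_parallel_for and its grain parameter are illustrative names) is closer to OpenMP’s dynamic schedule than to parallel for’s range stealing: the fetch_add it uses is exactly the kind of interlocked operation that, per the paragraph above, parallel for avoids.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Dynamic chunk hand-out via a shared atomic counter (hypothetical helper).
// A worker that draws cheap iterations simply comes back for another chunk.
template <typename Func>
void dynamic_parallel_for(int first, int last, Func f,
                          unsigned workers = 4, int grain = 16) {
    if (last <= first) return;
    if (workers == 0) workers = 1;
    if (grain < 1) grain = 1;
    std::atomic<long long> next(first);
    std::vector<std::thread> pool;
    for (unsigned w = 0; w < workers; ++w) {
        pool.emplace_back([&] {
            for (;;) {
                const long long begin = next.fetch_add(grain);  // claim a chunk
                if (begin >= last) break;                       // nothing left
                const long long end = std::min<long long>(begin + grain, last);
                for (long long i = begin; i < end; ++i) f(static_cast<int>(i));
            }
        });
    }
    for (auto& t : pool) t.join();
}
```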
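The overflow concern is easy to reproduce: for an int range such as [INT_MIN, INT_MAX), the expression last - first overflows int, which is undefined behavior. A sketch of one safe way to compute the trip count does the subtraction in a 64-bit type; iteration_count is a hypothetical helper, not the PPL’s code.

```cpp
#include <cassert>
#include <climits>
#include <cstdint>

// Number of iterations in [first, last) without signed overflow: widen both
// bounds to 64 bits before subtracting (hypothetical helper).
std::uint64_t iteration_count(int first, int last) {
    if (last <= first) return 0;
    return static_cast<std::uint64_t>(static_cast<std::int64_t>(last) -
                                      static_cast<std::int64_t>(first));
}
```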
Can I have blocking code in my loop iterations? What happens if I block? Yes, you can have blocking in your code as long as it is cooperative (using ConcRT blocking APIs, or using Win32 blocking APIs on user mode scheduling -- UMS). When blocking happens, parallel for and ConcRT ensure that another worker is scheduled to pick up where the previous one left off. As a result, parallel for enables users to write code that would not work properly as serial code. The prime example is a forward blocking dependency, where the ith iteration is blocked until the jth iteration executes, with j > i. This code would deadlock if run serially. OpenMP does not provide any support for blocking between iterations.
Concurrency::event e;
volatile LONG counter = 0;
Concurrency::parallel_for(0, 64,
[&] (int i)
{
InterlockedIncrement(&counter);
if (i == 63)
{
// all other iterations blocked;
// unblock them
e.set();
}
else
{
// wait for iteration 63 to unblock
e.wait();
}
});
// counter must be at 64 at this point
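The example relies on ConcRT replacing a cooperatively blocked worker. A rough standard C++ analogue is sketched below under two assumptions: ManualResetEvent is a hypothetical stand-in for Concurrency::event built from std::condition_variable, and every iteration gets its own std::thread, so a blocked iteration can never keep the last iteration from running. ConcRT reaches the same outcome by scheduling another worker when one blocks, as described above.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Minimal manual-reset event mimicking the set()/wait() usage above.
class ManualResetEvent {
public:
    void set() {
        {
            std::lock_guard<std::mutex> lock(m_);
            signaled_ = true;
        }
        cv_.notify_all();  // release every waiter
    }
    void wait() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return signaled_; });
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    bool signaled_ = false;
};

// Iterations 0..n-2 wait for iteration n-1; serially this deadlocks at i == 0.
int forward_dependency(int n) {
    ManualResetEvent e;
    std::atomic<int> counter(0);
    std::vector<std::thread> threads;
    for (int i = 0; i < n; ++i) {
        threads.emplace_back([&, i] {
            counter.fetch_add(1);
            if (i == n - 1)
                e.set();   // last iteration unblocks all the others
            else
                e.wait();  // wait for the last iteration
        });
    }
    for (auto& t : threads) t.join();
    return counter.load();
}
```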
Can I cancel parallel for in the middle of computation? Yes, you can cancel parallel for at any point by throwing an exception from one of its iterations, or canceling its parent task group. This technique is very useful in performing long searches where computation can be stopped as soon as the result is found.
Concurrency::parallel_for(0, n,
[] (int i)
{
if (found(i))
{
// cancels work immediately
throw ItemFound();
}
});
This can also be achieved by cancelling the parent task_group, thus enabling cancellation on the entire subtree of work.
Parallel for guarantees that only iterations that have already started will complete after cancellation is initiated. This ensures that resources are not wasted. OpenMP does not have any support for cancellation, although one can be built in using a flag.
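volatile bool flag = false;
#pragma omp parallel for
for (int i = 0; i < n; i++)
{
if (flag)
{
// return and break are not allowed inside an OpenMP
// parallel loop, so skip the remaining work instead
continue;
}
else if (found(i))
{
flag = true; // later iterations will skip their work
}
}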
It should be noted that even with this hand-crafted cancellation scheme OpenMP does not support cancellation of the entire sub-tree of computation, which parallel for does.
Can I throw exceptions inside the parallel for? Will that tear down my application? Yes, you can throw exceptions in the parallel for functor, and that will not result in the termination of your application. An exception on a worker thread will safely be transported to the main thread that initiated the parallel for (if the exception is not caught in the body of the functor, naturally), so the user will have an opportunity to catch the exception outside of parallel for. OpenMP has no exception handling support.
Can I use parallel for with STL iterators? That is what parallel_for_each is for. If your iterator supports random access, parallel_for_each uses the same mechanism as parallel for. If your iterator only supports forward access, we have provided code that reduces this case to the random-access case, at which point the parallel for construct is reused.
How many iterations, exactly, does it take to make parallel for better than serial? How many cycles should the functor contain in order for parallel for to beat the serial implementation? This is a difficult question to answer. It depends on the machine configuration, whether there is other parallel work in the process, etc. Therefore, instead of giving a general and unhelpful answer, I will attempt to give a concrete answer for the 4-core box that I am using, without any other work on it.
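The exception transport described above can be approximated in standard C++ with std::exception_ptr. In this hypothetical sketch (parallel_for_with_exceptions is an illustrative name, and iterations are dealt to workers in a fixed stride), the first exception thrown by any iteration is captured, workers stop starting new iterations, and the exception is rethrown on the calling thread after all workers have joined. It is not how the PPL is implemented.

```cpp
#include <atomic>
#include <cassert>
#include <exception>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

// Runs f(i) for i in [first, last) on several threads; the first exception
// thrown by any iteration is rethrown on the calling thread (hypothetical).
template <typename Func>
void parallel_for_with_exceptions(int first, int last, Func f, int workers = 4) {
    if (workers < 1) workers = 1;
    std::exception_ptr error;
    std::mutex error_lock;
    std::atomic<bool> cancelled(false);
    std::vector<std::thread> pool;
    for (int w = 0; w < workers; ++w) {
        pool.emplace_back([&, w] {
            for (long long i = static_cast<long long>(first) + w; i < last;
                 i += workers) {
                if (cancelled.load()) return;  // don't start new iterations
                try {
                    f(static_cast<int>(i));
                } catch (...) {
                    std::lock_guard<std::mutex> lock(error_lock);
                    if (!error) error = std::current_exception();  // keep first
                    cancelled.store(true);
                    return;
                }
            }
        });
    }
    for (auto& t : pool) t.join();
    if (error) std::rethrow_exception(error);  // surfaces on the caller's thread
}
```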
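The text does not say how parallel_for_each performs the reduction for forward iterators. One simple way to do it, sketched below with hypothetical names, is to walk the range once to record every iterator in a std::vector; after that, workers can reach any element in constant time by index.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <thread>
#include <vector>

// Reduce a forward-iterator range to random access by recording iterators,
// then process the elements by index on several threads (hypothetical helper).
template <typename ForwardIt, typename Func>
void for_each_via_index(ForwardIt first, ForwardIt last, Func f,
                        unsigned workers = 4) {
    std::vector<ForwardIt> its;
    for (ForwardIt it = first; it != last; ++it) its.push_back(it);  // serial walk
    if (workers == 0) workers = 1;
    std::vector<std::thread> pool;
    for (unsigned w = 0; w < workers; ++w) {
        pool.emplace_back([&, w] {
            for (std::size_t k = w; k < its.size(); k += workers) f(*its[k]);
        });
    }
    for (auto& t : pool) t.join();
}

// Example use on a std::list, whose iterators are not random access.
void scale_all(std::list<int>& values, int factor) {
    for_each_via_index(values.begin(), values.end(),
                       [factor](int& v) { v *= factor; });
}
```

The serial walk costs one pass over the range, which is only worthwhile when the per-element work is substantial.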
This is how I decided to measure it:
i) I will have an outside loop with the number of repetitions for the experiment. This loop amortizes the initial spin-up of threads in both OpenMP and ConcRT. It also magnifies the result to a level that is easily measurable (above 100 ms).
ii) I will have a dial that controls the amount of work being done per iteration.
iii) I will have a dial that controls the number of iterations in a loop.
When ii) is small and iii) is big we will measure the overhead per iteration (provided that the initial amount of work per iteration is small); when they are both small we will measure the overhead for the initial setup of threads, dividing work, etc.
The work being done will be a simple call to an empty function, and ii) will control how many times that function is called. This is what a simple version of this program looks like:
#include <ppl.h>
#include <windows.h>
#include <stdio.h>
#include <omp.h>
using namespace Concurrency;
#define RUN_TEST test_parallel_for
#define SIZE_OF_ARRAY 10000
#define REPEAT 10000
#define WORK 2
long long counter() {
LARGE_INTEGER li;
QueryPerformanceCounter(&li);
return li.QuadPart;
}
long long frequency() {
LARGE_INTEGER li;
QueryPerformanceFrequency(&li);
return li.QuadPart;
}
// compiled /Od in a separate object as {}
// to avoid inlining and invariant code motion
void noworkhelper();
void work(int index)
{
for (int i=0; i<WORK; i++)
{
noworkhelper();
}
}
typedef void (*FNPTR) (int i);
FNPTR Func = work;
__declspec(noinline)
void test_parallel_for()
{
parallel_for(0, SIZE_OF_ARRAY, Func);
}
__declspec(noinline)
void test_omp_for()
{
#pragma omp parallel for
for (int i = 0; i < SIZE_OF_ARRAY; i++)
{
Func(i);
}
}
void run_data()
{
for (int i = 0; i < REPEAT; i++)
{
RUN_TEST();
}
}
int main()
{
SchedulerPolicy policy(1,
SchedulerKind, ThreadScheduler);
Scheduler::SetDefaultSchedulerPolicy(policy);
double etime = 0.0;
long long start = counter();
run_data();
long long finish = counter();
etime = (finish - start) * 1000.0 / frequency();
printf("Elapsed time for test: %g ms\n", etime);
Scheduler::ResetDefaultSchedulerPolicy();
}
Before showing the results, it should be noted that these are pure overhead tests, designed to show costs related to creating, running and maintaining parallel regions. Here are the raw tables:
| | R:100000, I: 500, W:5 | R:50000, I: 500, W:10 | R:10000, I: 500, W:50 | R:5000, I: 500, W:100 | R:1000, I: 500, W:500 | R:500, I: 500, W:1000 |
| Serial | 713.2 | 670.9 | 637.1 | 649.4 | 632.9 | 630.2 |
| OpenMP | 295.1 | 265.5 | 197.5 | 189.6 | 173.4 | 184.6 |
| Parallel for | 1142.1 | 671.7 | 293.7 | 241.3 | 202.5 | 200.9 |
| Fixed | 564.3 | 388.9 | 232.7 | 233.6 | 183.3 | 206.7 |
R – the number of repeats, I – the number of iterations, W – the number of units of work; all times are in milliseconds
Chart: Small number of iterations, small work
Now let’s look at what happens with a high number of iterations and small amounts of work. This tells us how costly each iteration is to execute in OpenMP and parallel for.
| | R:500, I: 100000, W:5 | R:100, I: 100000, W:25 | R:50, I: 100000, W:50 | R:10, I: 100000, W:250 | R:5, I: 100000, W:500 | R:1, I: 100000, W:2500 |
| Serial | 712.1 | 645.2 | 638.4 | 637.1 | 632.7 | 630.5 |
| OpenMP | 198.6 | 194.6 | 170.9 | 168.3 | 169 | 168.5 |
| Parallel for | 420.3 | 207.6 | 219.9 | 169.1 | 170.6 | 164.9 |
| Fixed | 226.6 | 213.6 | 196.8 | 169.1 | 172.9 | 179.5 |
R – the number of repeats, I – the number of iterations, W – the number of units of work; all measured times are in milliseconds
Figure: Large number of iterations, small work
Conclusion -- Parallel for is a general-purpose mechanism for executing serial for-loops in parallel. It is up to the user to determine the legality and profitability of such a transformation. The parallel for construct increases the robustness and flexibility of parallelized code through built-in support for features such as cooperative blocking during iterations, immediate cancellation, a signed integer iteration type with arbitrarily large ranges, dynamic redistribution of workload, and support for STL iterators. However, this generality comes at a cost: two types of overhead make our parallel for implementation slower than OpenMP in some cases. The first is the initial setup of the worker threads and the range-stealing mechanism; the second is the additional work done per iteration to support cancellation and range stealing. This can result in poor scaling of the parallel for-loop when the amount of work done per iteration is very small (compare the W:5 columns in the tables above). If your program falls into that category and you would still like to see a speedup from executing in parallel, please use the specialized, fixed-partitioning version of parallel for, parallel_for_fixed, available in our samples collection. It will scale as well as OpenMP in the cases where limiting overhead is essential.
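To make the difference between dynamic range stealing and fixed partitioning concrete, here is a minimal sketch in portable C++. It is not the sample pack’s parallel_for_fixed. The helper name static_parallel_for and the use of std::thread are assumptions made for illustration. The range is cut into one contiguous chunk per worker before any work starts, so no per-iteration bookkeeping for stealing or cancellation remains.

```cpp
#include <algorithm>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical helper (not part of PPL or the sample pack): runs body(i) for
// every i in [first, last), giving each worker one contiguous, fixed chunk.
template <class Func>
void static_parallel_for(int first, int last, Func body,
                         unsigned workers = std::thread::hardware_concurrency())
{
    assert(first <= last);
    if (workers == 0) workers = 1;            // hardware_concurrency may report 0
    const int n = last - first;
    const int w = static_cast<int>(workers);
    const int chunk = (n + w - 1) / w;        // ceiling division

    std::vector<std::thread> threads;
    for (int k = 0; k < w; ++k) {
        const int lo = first + k * chunk;
        const int hi = std::min(last, lo + chunk);
        if (lo >= hi) break;                  // fewer iterations than workers
        // The partition is decided here, once; no range stealing happens later.
        threads.emplace_back([=] {
            for (int i = lo; i < hi; ++i) body(i);
        });
    }
    for (auto& t : threads) t.join();
}
```

The trade-off is that a fixed split cannot rebalance when some iterations are much more expensive than others. That is exactly the case parallel for’s dynamic redistribution is designed to handle.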
Recently, I created 3 new sample message blocks that complement the Agents Library’s existing set and provide additional functionality. I chose these three, priority_buffer, bounded_buffer, and alternator, based on customer feedback and to improve support for certain scenarios.
Each of these sample message blocks behaves similarly to unbounded_buffer and the other message blocks in the Agents Library, so a quick review of one of my previous posts introducing a couple of blocks might be valuable. Taking a look at the MSDN page for asynchronous message blocks would also be helpful.
priority_buffer
Priority_buffer is very similar to unbounded_buffer, except that messages are sorted and distributed to its targets based on priority. To use a priority_buffer, the payload type must implement the comparison operator<. This operation should be cheap, because it will be called often to compare messages. A payload that is less than another is considered to be of higher priority; for example, in a priority_buffer of integers, 1 has a higher priority than 2.
Priority_buffer doesn’t hold messages back in order to sort them. Message order only changes when messages are not consumed as fast as they are produced. Because messages are prioritized, in-order delivery is no longer guaranteed; priority_buffer always tries to deliver the highest-priority message to its targets at any given time.
Scenarios for priority_buffer include providing quality of service by addressing the most important items or tasks first.
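The ordering rule (a payload that is less than another has the higher priority) can be illustrated with std::priority_queue. This is not the sample’s priority_buffer. It is a sketch that assumes a hypothetical WorkItem payload, and it shows how reversing the comparator makes the smallest element come out first.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <vector>

// Hypothetical payload: only operator< is required, mirroring the rule that a
// payload which is "less" than another has the higher priority.
struct WorkItem {
    int priority;            // smaller value = more important
    std::string name;
    bool operator<(const WorkItem& other) const { return priority < other.priority; }
};

// std::priority_queue pops its *largest* element, so the comparison is reversed
// to make the smallest (highest-priority) item surface first.
struct HigherPriorityFirst {
    bool operator()(const WorkItem& a, const WorkItem& b) const { return b < a; }
};

using WorkQueue = std::priority_queue<WorkItem, std::vector<WorkItem>, HigherPriorityFirst>;
```

For example, after pushing items with priorities 3, 1 and 2, top() returns the item with priority 1 first, whatever the insertion order.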
bounded_buffer
Bounded_buffer is like a queue with a maximum capacity. The number of messages a bounded_buffer can hold is specified at construction time. Until it reaches its capacity, bounded_buffer behaves exactly like unbounded_buffer. Once full, bounded_buffer no longer accepts new messages; instead, it postpones any offered messages until it drops back below capacity. Postponing a message is a way for a message block to say it isn’t going to take ownership of the message right now, but might want to at a later point in time. Once back below capacity, bounded_buffer tries to consume the previously postponed messages from each of its sources. Bounded_buffer should be used in conjunction with the send message passing function to throttle data production directly at the source without wasting computation cycles on polling.
Bounded_buffer is great in data-flow networks or producer/consumer scenarios where data is created faster than it can be processed and memory is a concern. Using bounded_buffer, message production can be blocked at the source until previous messages are processed.
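The back-pressure idea can be sketched with a mutex and two condition variables. The BoundedQueue below is hypothetical and is not the sample pack’s bounded_buffer. It blocks the producing thread on a condition variable instead of using the Agents Library’s postpone-and-consume protocol, but the producer sees a similar effect: push waits while the queue is at capacity.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical capacity-limited queue (an illustration, not agents_extras.h).
template <class T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity_ > 0);
    }

    // Blocks the producer while the queue is full, throttling production at the source.
    void push(const T& value) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push_back(value);
        not_empty_.notify_one();
    }

    // Blocks the consumer until an item is available, then frees one slot.
    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !items_.empty(); });
        T value = items_.front();
        items_.pop_front();
        not_full_.notify_one();
        return value;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return items_.size();
    }

private:
    const std::size_t capacity_;
    mutable std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    std::deque<T> items_;
};
```

A producer thread that calls push on a full queue sleeps until a consumer calls pop, so it spends no cycles polling.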
alternator
Many of the blocks in the Agents Library deliver messages to targets based on linking order; i.e., the first linked target gets a chance at each message before all others. Sometimes it is desirable to distribute messages evenly across all targets in a round-robin fashion, and alternator does exactly this. Alternator delivers the first message to its first linked target, the second message to its second linked target, and so on, looping back to the first link once each target has been offered a message. Think of alternator as an unbounded_buffer that distributes messages fairly among its targets.
Alternators are useful for evenly distributing work across multiple pipelines or agents.
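Round-robin delivery comes down to keeping a cursor over the linked targets. The sketch below uses a hypothetical RoundRobin helper and is not the sample’s alternator. It models each target as a std::function and ignores the accept/decline protocol that real message blocks use.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical round-robin distributor: message k goes to target k % N.
template <class T>
class RoundRobin {
public:
    void add_target(std::function<void(const T&)> target) {
        targets_.push_back(std::move(target));
    }

    void offer(const T& message) {
        assert(!targets_.empty());
        targets_[next_](message);                 // deliver to the current target
        next_ = (next_ + 1) % targets_.size();    // then advance, wrapping to the first link
    }

private:
    std::vector<std::function<void(const T&)>> targets_;
    std::size_t next_ = 0;
};
```

With three targets and seven messages, the first target receives messages 0, 3 and 6, and the other two receive two messages each.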
Download the Code
Follow this link to download the source code for priority_buffer, bounded_buffer, alternator, and many other samples from the Concurrency Runtime team. Specifically these blocks are located in SamplePack\ConcRTExtras\agents_extras.h.
Feedback
Please let me know what features you find the most useful and which ones we are lacking. This will help influence what features might be included or discussed in the future.
In my previous blog post, I talked about the dynamic migration concept of the Concurrency Runtime’s (ConcRT) Resource Manager (RM). Today I will be demonstrating that concept in action and will focus on its performance characteristics.
A Demonstration of Dynamic Migration
The scenario that we are going to use to demonstrate the resource manager’s dynamic core migration concept involves two schedulers running different kinds of workloads in phases. The schedulers will be created with default policy on an 8 core machine (MinConcurrency=1, MaxConcurrency=8). The workload of schedulers in each phase will be characterized by one of the following:
i) Both schedulers have an equal amount of work
ii) One scheduler has work, the other one has no work
iii) One scheduler has more work than the other scheduler
Here are the phases and the recorded performance of the Resource Manager in detail:

Phase1: In this phase both schedulers have an equal amount of work. No dynamic migration happens, and the schedulers keep their proportionally allocated resources. Note that both schedulers have 4 resources.
Phase2: Only Scheduler1 has work here. This phase starts at 2.246s; RM reacts at 2.309s by allocating all resources to Scheduler1. Note that Scheduler1 has 8 resources, one of which is shared with Scheduler2. That is because the resource manager cannot take away cores below the MinConcurrency value and has to keep at least 1 core assigned to Scheduler2. However, Scheduler2 has no work and that resource is idle, so Scheduler1 makes use of it without interruption.
Phase3: This is similar to Phase2, but this time only Scheduler2 has work. The phase starts at 4.508s; RM reacts at 4.602s by allocating all resources to Scheduler2.
Phase4: Here both schedulers have work, but Scheduler1 has more work than Scheduler2. The phase starts at 6.833s; RM first balances resource usage between the schedulers at 6.911s, taking into account the workload of each. This continues until Scheduler2 completes all of its work. At 8.112s all resources are allocated back to Scheduler1.
Focusing on Dynamic Migration Reaction Time
In order to give more details on the dynamic migration reaction time, a zoomed in view of Phase4 will be examined.

Here the first core migration happens after 78ms where RM balances the resources between the two schedulers. This reaction time depends on a number of factors:
1) The statistics polling interval in resource manager
All core migration decisions are made after the resource manager polls the schedulers. In the current implementation the polling interval is 100ms, so the wait before the next decision point can be anywhere between 0 and 100ms.
2) The statistics from the schedulers
The statistics represent each scheduler’s workload to the resource manager and include the task completion rate, the incoming task rate and the total task queue size. The faster the statistics grow, the sooner the scheduler gets extra resources. This also implies that with slower growth, the resource manager may defer the allocation to subsequent polls.
3) The availability of resources
The resource manager will only migrate a resource if one is available. Resources are considered available if other schedulers have declared them idle or have declining statistics. In other words, the resource manager’s reaction is delayed until a resource becomes available.
After rebalancing, the statistics don’t change until 8.112s, so the resource manager keeps the current allocation. Once Scheduler2 completes all of its work, all resources are allocated back to Scheduler1.
Next Steps and Feedback
I would love to hear feedback regarding the blog content or areas of interest with respect to Concurrency Runtime to have a guided focus on blogging.
We’ve posted an update to our sample pack at http://code.msdn.com/concrtextras for Visual Studio 2010 Beta2. The newest additions in this drop of the sample pack are three new header files in the ConcRTExtras folder. Here’s what these files contain:
ppl_extras.h contains additional STL-style parallel algorithms such as parallel_accumulate, parallel_partial_sum, parallel_transform, parallel_all_of, parallel_any_of and parallel_none_of, as well as parallel_for_fixed, parallel_accumulate_fixed and parallel_partial_sum_fixed.
agents_extras.h contains additional useful message blocks such as priority_buffer, bounded_buffer, alternator, join_transform and recalculate_buffer.
concrt_extras.h contains useful wrapper classes and functions for the runtime itself: concrt_suballocator, a std::allocator built on Concurrency::Alloc; and task_scheduler, schedule_group and schedule_task, which are simple wrapper classes around Scheduler and ScheduleGroup that offer PPL-like functor support for scheduling tasks.
These are located in the Concurrency::samples namespace and the team will be blogging about many of these over the coming weeks, but please feel free to ask any questions here or in the forums.
-Rick
Last week Visual Studio 2010 Beta 2 was released for download. Since Beta 1, the team has been busy adding functionality that makes you more productive at expressing parallelism in your applications, and improving the quality and performance of our runtime and programming models.
Here’s a guide to what’s new in Beta2: we’ve added 2 new concurrent containers, significantly enhanced our online documentation and improved our debug experience by adding better visualizations and more intuitive views in the parallel debug window. We’ve also modified a small number of APIs which you should be aware of if you are using Beta 1.
Concurrent containers – concurrent_queue and concurrent_vector
We’ve alluded more than once to concurrent_queue and concurrent_vector in our videos and live talks. They are finally in the box, along with two new header files, concurrent_queue.h and concurrent_vector.h.
concurrent_queue<T> is very similar to std::queue<T>. It offers push and try_pop methods, plus ‘unsafe’ iterators and size accessors (these aren’t thread-safe during concurrent pushes and pops).
concurrent_vector<T> is most similar to a std::vector<T> and it offers a push_back method that is internally synchronized across threads and allows efficient thread safe growth of the vector. Like std::vector, concurrent_vector has random access iterators, but unlike std::vector, the guarantee of contiguous storage is removed and there are no insert and erase methods.
The interfaces to concurrent_queue and concurrent_vector will be incredibly familiar if you are a user of Intel’s Threading Building Blocks (the interfaces are identical), and a very big thank you goes out to their team for their assistance with this.
Here’s a brief example that uses both concurrent_queue and concurrent_vector in a parallel loop:
#include <ppl.h>
#include <concurrent_vector.h>
#include <concurrent_queue.h>
#include <iostream>
using namespace Concurrency;
using namespace std;
int main()
{
concurrent_vector<int> odds;
concurrent_queue<int> evens;
parallel_for(0,100,1,[&odds,&evens](int i){
if (i%2 == 0)
evens.push(i);
else
odds.push_back(i);
});
cout << "We expect 100 items: " << evens.unsafe_size() + odds.size() << endl;
}
Debugging enhancements in Beta2
VS 2010 Beta 2 also includes significant enhancements for parallel debugging. The locals window now includes visualizers for all first-class objects in the PPL and the Agents Library. As a result, when you look at an instance of a concurrent_queue or an unbounded_buffer in the locals window, both look a lot like a std::queue instead of exposing implementation details. The Parallel Tasks and Parallel Stacks windows have also been enhanced; in addition to those videos, try the C++ code in the MSDN walkthrough and Daniel Moth’s blog. Finally, as with the rest of the C Runtime, we’ve made the source code for the Concurrency Runtime available as part of the install, so if you need to debug deeper or really want to see how things work internally, you now can.
Documentation updated with more How To’s and walkthroughs
Our offline and online documentation has been significantly updated for Beta2. We’ve added multiple conceptual topics and expanded our How To topics significantly to include information not just on PPL and the Agents Library but on the underlying Concurrency Runtime and how to manage and use scheduler instances. Any feedback on these topics is greatly appreciated.
API updates to task_group and agent
For Beta 2, we’ve made a very small number of API updates to our task_group, structured_task_group and agent classes. The change to agent is simple to describe: we’ve simplified the state management and removed the parameter from the agent::done method, so it’s sufficient to call this->done() instead of this->done(agent_done).
The change to task_group and structured_task_group is additive: we’ve added the method run_and_wait, which takes a functor and runs it inline on the current thread or task. The major benefit is the ability to compose tasks and nest their cancellation. One of the easiest ways to see this in action is to implement a parallel search algorithm, such as a parallel version of the new C++0x library function ‘all_of’:
template<class InIt,class Pr>
inline bool parallel_all_of(InIt first, InIt last, Pr pred)
{
typedef typename iterator_traits<InIt>::value_type Item_type;
//create a structured task group
structured_task_group tasks;
auto for_each_predicate = [&pred,&tasks](const Item_type& cur){
if (!pred(cur))
tasks.cancel();
};
auto task = make_task([&](){
parallel_for_each(first,last,for_each_predicate);
});
return tasks.run_and_wait(task) != canceled;
}
Here we are placing a call to parallel_for_each inside of a structured_task_group and cancelling the work when the predicate is not true. This has the effect of cancelling all nested tasks in the structured_task_group, potentially saving work if the predicate is long running and expensive to compute. We can use this in our example above like this (but don’t expect to see significant speedups over std::all_of for this example):
if (parallel_all_of(odds.begin(),odds.end(),[](int i){return i % 2;}))
cout << "success!" << endl;
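For readers who do not have the PPL at hand, the same short-circuit idea can be approximated in portable C++ by sharing a cancellation flag among workers. The parallel_all_of_sketch below is hypothetical and uses std::thread and std::atomic; it is not the PPL implementation. Unlike structured_task_group::cancel, the flag is only checked between elements, so cancellation happens cooperatively at element granularity.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <thread>
#include <vector>

// Hypothetical sketch: splits a random-access range across workers and stops
// all of them as soon as any element fails the predicate.
template <class RanIt, class Pred>
bool parallel_all_of_sketch(RanIt first, RanIt last, Pred pred, unsigned workers = 4)
{
    assert(workers > 0);
    const std::ptrdiff_t n = std::distance(first, last);
    const std::ptrdiff_t w = static_cast<std::ptrdiff_t>(workers);
    const std::ptrdiff_t chunk = (n + w - 1) / w;
    std::atomic<bool> failed(false);   // shared cancellation flag

    std::vector<std::thread> threads;
    for (std::ptrdiff_t k = 0; k < w; ++k) {
        const std::ptrdiff_t lo = std::min(n, k * chunk);
        const std::ptrdiff_t hi = std::min(n, lo + chunk);
        threads.emplace_back([&, lo, hi] {
            for (std::ptrdiff_t i = lo; i < hi; ++i) {
                if (failed.load(std::memory_order_relaxed)) return;  // another worker failed
                if (!pred(first[i])) {
                    failed.store(true, std::memory_order_relaxed);   // cancel the others
                    return;
                }
            }
        });
    }
    for (auto& t : threads) t.join();   // join makes every store visible here
    return !failed.load();
}
```

As with the PPL version, the predicate is called concurrently from several threads, so it must be safe to call that way.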
So try Beta 2 if you haven’t already.
-Rick
In my previous blog post, I talked about key concepts of the Concurrency Runtime’s (ConcRT) resource manager, starting with a definition of a resource. I then explained why an application might be composed of a number of scheduler instances. Finally, I described how the resource manager allocates resources to the schedulers in order to improve performance and increase hardware utilization, specifically through the ‘initial allocation’ algorithm, which defines the resource allocation performed when a scheduler is initialized.
Today I will be talking about ‘dynamic migration’, another key concept of the resource manager. I will provide details on how the resource manager handles resource utilization dynamically as the workload of the schedulers changes.
Dynamic Migration
Dynamic migration can be defined as moving resources from one scheduler to another in order to improve CPU utilization.
As an example, consider a scenario on an 8-way machine with two schedulers (let them be S1 and S2) created with MinConcurrency = 1 and MaxConcurrency = 8 (please refer to this for a description of the scheduler policy values that involve resource management). As described for the initial allocation algorithm, after S1 and S2 are created the resources are shared in proportion to their MaxConcurrency values, so each has 4 resources:
Up to this point, everything is initial allocation. From here on, the resource manager collects information about the workload of the schedulers by polling them periodically. At each poll the resource manager captures the state of each scheduler instance with the IScheduler::Statistics() method. Note that when a scheduler instance registers with the resource manager, it has to provide an implementation of the IScheduler interface so that the resource manager can call into it. The IScheduler::Statistics() method is expected to return three measurements calculated by the scheduler instance itself:
- Number of completed tasks since last statistics call
- Number of incoming tasks since last statistics call
- The total size of all work queues
The resource manager keeps a history of these measurements and derives a workload metric for each scheduler. A statistically significant change in that metric (roughly speaking, a positive or negative change measured as a multiple of the standard deviation) signals to the resource manager that the scheduler either needs one or more additional resources or can give one or more away. The bigger the change, the more resources are added or removed.
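The post does not spell out the exact formula, so the following is only a hypothetical illustration of a change measured in standard deviations. It keeps a history of a workload metric and reports whether the newest sample lies more than k standard deviations from the historical mean. The metric, the history length and the threshold k are all assumptions, not the resource manager’s actual values.

```cpp
#include <cassert>
#include <cmath>
#include <vector>

// Hypothetical detector: returns +1 if the newest sample is more than k standard
// deviations above the history's mean (needs resources), -1 if it is more than
// k below (can give resources away), and 0 otherwise.
int workload_trend(const std::vector<double>& history, double newest, double k)
{
    assert(history.size() >= 2);
    double mean = 0.0;
    for (double x : history) mean += x;
    mean /= history.size();

    double variance = 0.0;
    for (double x : history) variance += (x - mean) * (x - mean);
    variance /= history.size() - 1;               // sample variance
    const double stddev = std::sqrt(variance);

    if (newest > mean + k * stddev) return +1;
    if (newest < mean - k * stddev) return -1;
    return 0;
}
```

For the history {10, 12, 11, 9, 10, 12} the mean is about 10.67 and the standard deviation about 1.21. With k = 2, a new sample of 20 signals growth, 2 signals decline, and 11 signals no change.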
After polling each scheduler, the resource manager has a list of schedulers (L1) that can give away resources and a list of schedulers (L2) that need resources. Next, for each scheduler in L1, starting with the one with the lowest resource allocation priority, it removes the extra resources and reallocates them to the schedulers in L2, starting with the one with the highest priority. It is important to note that if L2 is empty, the schedulers in L1 keep their resources.
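The redistribution step can be sketched with two priority-ordered lists. Everything here is hypothetical, including the SchedulerState struct, its priority field and the rebalance function. The sketch only mirrors the order described for L1 and L2: donors are drained starting with the lowest allocation priority, receivers are served starting with the highest, and nothing moves if there are no receivers. It ignores MinConcurrency and locality.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical per-scheduler bookkeeping for one polling cycle.
struct SchedulerState {
    int priority;     // allocation priority (higher = served first)
    int allocated;    // resources currently held
    int delta;        // > 0: resources wanted (L2); < 0: resources to give away (L1)
};

// Moves resources from donors (L1) to receivers (L2); returns the number migrated.
int rebalance(std::vector<SchedulerState*> donors, std::vector<SchedulerState*> receivers)
{
    // L1: the lowest allocation priority gives resources away first.
    std::sort(donors.begin(), donors.end(),
              [](const SchedulerState* a, const SchedulerState* b) { return a->priority < b->priority; });
    // L2: the highest allocation priority receives resources first.
    std::sort(receivers.begin(), receivers.end(),
              [](const SchedulerState* a, const SchedulerState* b) { return a->priority > b->priority; });

    int moved = 0;
    std::size_t r = 0;
    for (SchedulerState* d : donors) {
        assert(d->delta <= 0);
        while (d->delta < 0 && r < receivers.size()) {
            SchedulerState* recv = receivers[r];
            const int n = std::min(-d->delta, recv->delta);
            d->allocated -= n;     d->delta += n;
            recv->allocated += n;  recv->delta -= n;
            moved += n;
            if (recv->delta <= 0) ++r;   // this receiver is satisfied
        }
    }
    return moved;   // with no receivers, donors keep their resources
}
```

Under these assumptions, the S1/S2 example that follows (S1 gives away 2 and S2 needs 2, starting from 4 each) ends with 2 and 6 resources.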
Let’s return back to the example above and assume that the resource manager has decided S1 can give away 2 resources and S2 can make use of 2 resources. The resource manager will inform S1 to give away 2 resources, oversubscribe the hardware threads with 2 more resources and inform S2 that it can now use the extra resources. It is expected that S1 will obey the protocol and release the resources as soon as possible. After this protocol completes, the allocation will be as follows:

More Details on Dynamic Migration
We covered the general idea of dynamic migration. Let us walk through some what-if scenarios.
1. What happens if S1 has no work at all and S2 is still in need of resources, does S1 fall back to zero resources?
As a general rule of resource allocation, the resource manager will never allocate fewer resources to a scheduler than its MinConcurrency policy value. Therefore S1 will at least have 1 resource assigned. However since there is no work for S1, S1 will put that resource into inactive mode indicating to the resource manager that the resource is idle. Knowing this, the resource manager will share the underlying hardware thread by oversubscribing and providing another resource to S2.
2. What would happen if S1 shuts down and S2 is still in need of more?
All resources of S1 will become available. Resources will be allocated to S2 (not more than MaxConcurrency of S2) soon after.
3. On a machine with NUMA topology, is locality considered?
Locality information is used after the resource manager decides which scheduler is going to give away resources and which scheduler is going to take more (note that schedulers are selected from the L1 and L2 lists according to their allocation priority, so locality is less important than allocation priority). Let S1 (an element of L1) have R1 resources to give away and S2 (an element of L2) need R2 resources. If R1 is greater than R2, the subset of resources allocated to S2 is selected with respect to locality. Here is an example:

Assume this is the allocation layout before dynamic migration, where Node1 and Node2 are NUMA nodes with 4 cores each. Let S1 be able to give away 4 resources and S2 be in need of 2. The resource manager will select the two resources of S1 that are closest to the ones S2 already has. Since S2’s resources (Core1 and Core3) are closer to Core2 and Core4 (cores 1 to 4 are in the same node), the resource manager will migrate Core2 and Core4 to S2. After dynamic migration, this would be the layout:

Next Steps and Feedback
Next I will demonstrate dynamic migration and focus on its performance characteristics. I would love to hear feedback regarding the blog content or areas of interest with respect to Concurrency Runtime to have a guided focus on blogging.
In my previous post I talked about the agent class. Now I will introduce the Agents Library’s message blocks, how to use them, and the fundamentals of what they do. I will cover some basics that apply to all of the message blocks, introduce the messaging APIs, and then specifically explain three blocks: unbounded_buffer, overwrite_buffer, and single_assignment.
Message Blocks
In the Agents Library we have created a set of interfaces and defined a protocol that message blocks use to communicate and exchange messages. Message blocks are intended to establish well-defined communication protocols between isolated components and to support concurrent applications based on data flow. The message blocks provided by the Agents Library can be used in conjunction with the agent class itself or separately.
For more information on data flow - http://en.wikipedia.org/wiki/Dataflow
ISource and ITarget Interfaces
ISource and ITarget are the base interfaces that message blocks work with. They declare functions for linking and exchanging messages. A source block implements ISource, a target block implements ITarget, and a propagator block (a block that is both a source and a target) implements both. In this post I will only discuss the functions needed to use the message blocks provided by the Agents Library. A more comprehensive knowledge of the functions in the ISource and ITarget interfaces is required when creating your own message blocks; this will be discussed in a later post.
Linking/Unlinking and Creating Messaging Networks
Often it is desirable to link message blocks together to create a network. One such reason for building messaging networks is to create data flow pipelines. The following three functions, declared on the ISource interface, are used to connect source blocks to target blocks:
void link_target(ITarget<_Type> * _PTarget); – adds a link to the specified target. This causes the source block to offer any of its messages to the target until it is unlinked.
void unlink_target(ITarget<_Type> * _PTarget); – removes an existing link with the specified target, once unlinked no more messages will be presented to the target.
void unlink_targets(); – removes all links with any target blocks.
Here is a simple example connecting and then disconnecting two unbounded_buffers:
unbounded_buffer<int> buffer1, buffer2;
buffer1.link_target(&buffer2);
...
buffer1.unlink_target(&buffer2);
Some blocks restrict the number of targets allowed; an invalid_link_target exception is thrown in these cases.
Basic Message Propagation
Messages are exchanged between blocks using lightweight tasks (LWTs) on a scheduler; because of this, message propagation works cooperatively with any other work in the same scheduler. This means long-running tasks that never block or yield can slow down or halt the forward progress of message delivery. Such is the nature of working in a cooperative environment.
All of the message blocks built into the Agents Library guarantee in-order message delivery. It is possible to create your own blocks that do not preserve order, but all of the built-in ones do.
Messaging APIs - send, asend, receive, and try_receive
Once you understand how to create messaging networks and how messages propagate, the only other thing you need to start programming is how to insert messages into individual blocks and remove them.
Two global functions are used to create and insert messages:
template <class _Type>
bool send(ITarget<_Type> &_Trg, const _Type &_Data);
template <class _Type>
bool asend(ITarget<_Type> &_Trg, const _Type &_Data);
Each of these takes a target and the data to transmit. Send originates a message with the target synchronously, whereas asend does so asynchronously. This means a send call blocks until the target either takes the message or declines it; send returns true if the message was delivered and false otherwise. Asend does not wait for the target to take the message; it offers the message and returns immediately. A return value of true means the target has accepted the message and will eventually take it, while false means the target either declined the message or postponed the decision on whether to take it.
Likewise here are the two global functions for removing or extracting messages:
template <class _Type>
_Type receive(ISource<_Type> & _Src, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE);
template <class _Type>
bool try_receive(ISource<_Type> & _Src, _Type & _value);
Receive takes a source block to extract a message from and an optional timeout; the extracted value is the return value. If no message is currently available in the source, receive blocks until one is, or until the optional timeout expires. Correspondingly, try_receive only obtains a message if the source has one at that instant; otherwise it returns immediately. A return value of true from try_receive indicates that a message was received, and false means one was not.
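The difference between a blocking receive with a timeout and a non-blocking try_receive can be sketched on a plain mutex-protected queue. This hypothetical MessageQueue is not an Agents Library block, and it blocks the OS thread instead of cooperating with the Concurrency Runtime. The Agents Library’s receive returns the message itself, so the bool-returning receive_for with an out-parameter is a deliberate simplification.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical queue illustrating blocking vs. non-blocking extraction.
template <class T>
class MessageQueue {
public:
    void send(const T& value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(value);
        }
        available_.notify_one();
    }

    // Like try_receive: takes a message only if one is there right now.
    bool try_receive(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return false;
        out = items_.front();
        items_.pop_front();
        return true;
    }

    // Like receive with a timeout: waits up to `timeout` for a message.
    bool receive_for(T& out, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (!available_.wait_for(lock, timeout, [this] { return !items_.empty(); }))
            return false;                      // timed out with nothing to take
        out = items_.front();
        items_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable available_;
    std::deque<T> items_;
};
```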
All of these functions when blocking do so cooperatively with the Concurrency Runtime. To learn more about working cooperatively with the Concurrency Runtime take a look at the series of posts on Synchronization with the Concurrency Runtime.
unbounded_buffer
Unbounded_buffer is one of the most basic messaging blocks; it acts very much like a queue. As its name suggests, unbounded_buffer can store any number of messages, limited only by memory, collected from its source links (links to blocks that have the unbounded_buffer as a target). Unbounded_buffer always accepts all messages offered to it. Messages propagated to an unbounded_buffer are collected into a queue and then offered one at a time to each of its targets. Each message in an unbounded_buffer is given to only one of its targets, based on link ordering. This means the targets of an unbounded_buffer compete for messages.
Unbounded_buffer provides two utility functions:
bool enqueue(_Type const& _Item);
_Type dequeue();
These are equivalent to send and receive, respectively; they are essentially wrappers around them.
unbounded_buffer<int> buffer;
// These are equivalent.
buffer.enqueue(1);
send(buffer, 1);
// And so are these.
int value1 = buffer.dequeue();
int value2 = receive(buffer);
Unbounded_buffers are excellent for producer/consumer patterns. In a previous post Introduction to Asynchronous Agents Library the FindString agents sample makes use of unbounded_buffers to communicate between the individual agents.
overwrite_buffer
Essentially, overwrite_buffer is a simple broadcaster. Overwrite_buffer is a message block that holds one message at a time, very much like a variable. Every time an overwrite_buffer receives a message, it offers a copy of it to each of its targets and then stores the message internally, replacing any previously stored message. The important thing to note here is that there is no competition for data. Every message that comes into an overwrite_buffer is offered to all of its targets, and afterwards it can be overwritten at any point.
Overwrite_buffer also provides two utility functions:
bool has_value() const ;
_Type value();
Has_value returns true or false indicating whether or not the overwrite_buffer has received its first message. Value is a wrapper around receive. Has_value can be used to check if overwrite_buffer has a message, if overwrite_buffer does then calling value or receive will not be a blocking call.
Uses of overwrite_buffer include tracking state, continuously monitoring status, and broadcasting messages. In the Agents Library, the agent class takes advantage of this by using an overwrite_buffer internally to track its state. The start, cancel, done, and wait functions all work with the agent’s internal overwrite_buffer.
single_assignment
Single_assignment behaves very much like overwrite_buffer, except that it accepts only one message from any of its sources. Once single_assignment accepts a message, all subsequently offered messages are declined. Just like overwrite_buffer, single_assignment gives a copy of the message to each of its targets; there is no competition for data.
Single_assignment provides the following two utility functions:
bool has_value() const ;
_Type const & value();
These behave exactly as they do in overwrite_buffer.
Single_assignments are useful when a single value is read by many, similar to a const variable. A single_assignment can also be used to pick the first available message from a group of blocks: the first offered message is accepted and all others are declined. Used in this way, single_assignment acts as a chooser among a group of blocks.
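The contrast between overwrite_buffer and single_assignment comes down to which write wins. The single-threaded sketch below uses two hypothetical classes, LatestValue and FirstValue, to model only that rule. It omits linking, propagation to targets, thread safety and blocking reads, all of which the real blocks provide. It assumes C++17 for std::optional.

```cpp
#include <cassert>
#include <optional>

// Models overwrite_buffer's rule: every new message replaces the stored one.
template <class T>
class LatestValue {
public:
    bool offer(const T& msg) { value_ = msg; return true; }   // always accepted
    bool has_value() const { return value_.has_value(); }
    const T& value() const { assert(value_.has_value()); return *value_; }
private:
    std::optional<T> value_;
};

// Models single_assignment's rule: the first message is kept, later ones declined.
template <class T>
class FirstValue {
public:
    bool offer(const T& msg) {
        if (value_.has_value()) return false;   // already assigned: decline
        value_ = msg;
        return true;
    }
    bool has_value() const { return value_.has_value(); }
    const T& value() const { assert(value_.has_value()); return *value_; }
private:
    std::optional<T> value_;
};
```

When the messages 1, 2 and 3 are offered to both classes, LatestValue ends up holding 3 and FirstValue keeps 1.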
In following posts I will introduce more of the message blocks and provide sample applications.
In my previous posts, I addressed the motivation behind using the Concurrency Runtime’s synchronization primitives and introduced the critical section and the reader writer lock. In this post, I will cover the Concurrency Runtime’s event.
Event
This is a two-state class which, unlike a critical section or a reader writer lock, does not protect access to shared data. Events synchronize the flow of execution and use the Concurrency Runtime’s facilities to enable cooperative scheduling of work. They behave like a manual-reset event.
Example:
This example creates a scheduler that uses two threads and then calls the DemoEvent function twice: once with the Concurrency Runtime’s event class as the template parameter, and once with a wrapper around a Win32 manual-reset event. DemoEvent first creates several tasks that simulate some work and then wait for a shared event to become signaled.
Special thanks to Rick Molloy for sharing this example.
// event.cpp : Defines the entry point for the console application.
//
// compile with: /EHsc
#include <windows.h>
#include <stdio.h>
#include <concrt.h>
#include <concrtrm.h>
#include <ppl.h>
using namespace Concurrency;
using namespace std;
class WindowsEvent
{
HANDLE m_event;
public:
WindowsEvent()
:m_event(CreateEvent(NULL,TRUE,FALSE,TEXT("WindowsEvent")))
{
}
~WindowsEvent()
{
CloseHandle(m_event);
}
void set()
{
SetEvent(m_event);
}
void wait(DWORD timeout = INFINITE)
{
WaitForSingleObject(m_event,timeout);
}
};
template<class EventClass>
void DemoEvent()
{
EventClass e;
LONG volatile taskCtr = 0;
//create a taskgroup and schedule multiple copies of the task
task_group tg;
for(int i = 0;i < 8; ++i)
tg.run([&e, &taskCtr]{
//Simulate some work
Sleep(100);
printf_s("\tTask %d waiting for the event\n", InterlockedIncrement(&taskCtr));
e.wait();
});
Sleep(1000);
printf_s(" Setting the event\n");
//Set the event
e.set();
//wait for the tasks
tg.wait();
}
int main()
{
// Create a scheduler that uses two threads.
CurrentScheduler::Create(SchedulerPolicy(2, MinConcurrency, 2, MaxConcurrency, 2));
printf_s("Cooperative Event\n");
DemoEvent<event>();
printf_s("Windows Event\n");
DemoEvent<WindowsEvent>();
}
Sample Output:
Cooperative Event
Task 1 waiting for the event
Task 2 waiting for the event
Task 3 waiting for the event
Task 4 waiting for the event
Task 5 waiting for the event
Setting the event
Windows Event
Task 1 waiting for the event
Task 2 waiting for the event
Setting the event
Task 3 waiting for the event
Task 4 waiting for the event
Task 5 waiting for the event
With the cooperative event, all 5 tasks start before the event is set: each task that waits on the unset event cooperatively yields, so the scheduler can run another task on the same thread. With the Windows event, the first 2 tasks block both scheduler threads until the event is set, so the remaining tasks cannot start until then.
One of the native concurrency components coming in Visual Studio 2010 is the Asynchronous Agents Library. The goal of the Agents Library is to improve developer productivity and enable developers to take advantage of concurrency, with an agent-based message passing model, to build isolated and composable components. In this post, I will provide an introduction to the features available in the Asynchronous Agents Library and go into more detail about the agent class itself with an example program.
An Agent Based Model
Programming with an agent-based model allows concurrency to be built into a program from the start. Separating a program into composable, reusable pieces that have well-defined boundaries and communicate through message passing helps tolerate latency and use parallel resources effectively. By avoiding shared memory where possible and focusing on data dependencies, you can obtain scalable performance from higher-level abstractions for parallelism, such as agents. For more information on agent-based models, take a look here:
http://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts
Overview of Agents Library Features
The Agents Library can be broken down into three basic parts:
The Agent Class – The agent class itself is intended for coarse-grained parallelism: components that handle larger, computationally intensive tasks or collections of smaller tasks. Fundamentally, agents are tasks that have an observable lifecycle and communicate with other agents by using message passing. Agents are NOT intended to be used for fine-grained parallelism; for that, the patterns and constructs in the Parallel Patterns Library are better suited.
Message Blocks – To enable mechanisms for communicating between asynchronous agents and isolated program components, we provide a collection of various message blocks:
- unbounded_buffer
- overwrite_buffer
- single_assignment
- call
- transformer
- timer
- join
- multitype_join
- choice
For example, an unbounded_buffer works much like a queue. It collects messages from each of its source blocks and offers them, in order, to its target blocks.
In addition, infrastructure for easily creating your own message blocks is included.
Coordination Primitives – For inserting and removing messages from a group of messaging blocks (a messaging network), we include a set of APIs:
- send
- asend
- receive
- try_receive
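To illustrate the queue-like behavior of unbounded_buffer together with the receive and try_receive primitives, here is a small sketch in standard C++. The class name UnboundedBuffer and its members are hypothetical and are not the Agents Library API. It also simplifies two things: it does not distinguish send (which waits for the target to accept or decline the message) from asend (which returns immediately), and its receive blocks the OS thread rather than blocking cooperatively.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical unbounded FIFO buffer sketching the roles of unbounded_buffer
// and the send / receive / try_receive coordination primitives.
template <typename T>
class UnboundedBuffer {
public:
    // Like send/asend: enqueue a message. Never blocks, since there is no bound.
    void send(T message) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push_back(std::move(message));
        }
        m_cv.notify_one();
    }

    // Like receive: block until a message is available, then remove it.
    T receive() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return !m_queue.empty(); });
        T message = std::move(m_queue.front());
        m_queue.pop_front();
        return message;
    }

    // Like try_receive: take a message if one is ready, otherwise return nothing.
    std::optional<T> try_receive() {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_queue.empty()) return std::nullopt;
        T message = std::move(m_queue.front());
        m_queue.pop_front();
        return message;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::deque<T> m_queue;   // messages are handed out in arrival order
};
```

A consumer that must not stall would poll with try_receive; a dedicated consumer, like the FileReader agents later in this post, simply loops on receive.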
In later posts, I will dive into the specifics of each message block and coordination primitive, explaining how and when to use them.
Agent Class in More Detail
Essentially, an agent is a lightweight task (LWT) with observable state and support for cancellation. To see how an agent moves through the states of its lifecycle, let's look at the simplest case: creating, starting, and waiting on an agent:
MyAgent myagent;
myagent is now in the ‘agent_created’ state.
myagent.start();
Calling start() moves the agent to ‘agent_runnable’.
agent::wait(&myagent);
Wait blocks until myagent reaches one of the final states (agent_cancelled, agent_done, or agent_faulted).
The agent class contains a protected pure virtual run() method which is called when the agent is executed:
virtual void run() = 0;
When start() is called, a lightweight task is scheduled and the function returns immediately; once resources are available, the scheduler starts executing the agent's task, which calls the run() method. When an agent is finished, it must call the done() function to signal completion of its work. Commonly, done() is called at the end of the run() method; however, in some circumstances it may be desirable to decouple the lifetime of the agent from the execution of run(), for example if the agent does its work asynchronously.
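The start/run/done/wait contract can be sketched with standard C++ facilities. This is a heavily simplified analogue under stated assumptions, not the agent class itself: the name MiniAgent is hypothetical, start() launches run() on a dedicated std::thread (real agents run as lightweight tasks on the Concurrency Runtime scheduler), and wait() is a member function rather than the static agent::wait.

```cpp
#include <future>
#include <thread>

// Hypothetical, simplified agent analogue:
//  - start() launches run() asynchronously and returns immediately;
//  - the derived class calls done() to signal completion, which need not
//    coincide with run() returning;
//  - wait() blocks until done() has been called.
class MiniAgent {
public:
    virtual ~MiniAgent() {
        // Destroy an agent only after wait() has returned; run() should not
        // touch members after calling done().
        if (m_thread.joinable()) m_thread.join();
    }

    // Call start() exactly once.
    void start() { m_thread = std::thread([this] { run(); }); }

    void wait() {
        std::shared_future<void> finished = m_finished;  // per-call copy is thread-safe
        finished.wait();
    }

protected:
    virtual void run() = 0;
    void done() { m_done.set_value(); }

private:
    std::promise<void> m_done;
    std::shared_future<void> m_finished{m_done.get_future().share()};
    std::thread m_thread;
};
```

For example, a derived class whose run() stores a result and then calls done() can be started and waited on, after which the result is safely visible to the waiting thread, because set_value synchronizes with the future's wait.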
A single agent can be waited on by using the wait function, and multiple agents by using the wait_for_one and wait_for_all functions on an array of agents. Waiting on an agent is one way of observing an agent's state. The following two methods allow you to query the status directly or to obtain a message block to receive status updates from:
agent_status status();
ISource<agent_status> * status_port();
Agents also can be canceled:
bool cancel();
If cancel is called before an agent has started, the agent will not be run and its status becomes canceled. If the agent has already started, calling cancel has no effect: a started agent cannot be stopped.
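The cancellation rule above is easiest to see as a small state machine. The sketch below is a hypothetical model, not the Agents Library implementation: the enum and class names are made up, the state names mirror the statuses mentioned in this post (the faulted state is omitted), and begin_execution stands in for the scheduler picking up the agent's task.

```cpp
#include <mutex>

// Hypothetical model of the agent lifecycle described in this post.
enum class AgentStatus { created, runnable, started, done, canceled };

class AgentLifecycle {
public:
    // start(): created -> runnable (work is scheduled but not yet executing).
    bool start() { return transition(AgentStatus::created, AgentStatus::runnable); }

    // The scheduler begins executing the task: runnable -> started.
    // Fails if the agent was canceled while it was still waiting to run.
    bool begin_execution() { return transition(AgentStatus::runnable, AgentStatus::started); }

    // done(): started -> done.
    bool done() { return transition(AgentStatus::started, AgentStatus::done); }

    // cancel(): succeeds only if the agent has not started executing yet.
    bool cancel() {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_status == AgentStatus::created || m_status == AgentStatus::runnable) {
            m_status = AgentStatus::canceled;
            return true;
        }
        return false;   // already started (or finished): no effect
    }

    AgentStatus status() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_status;
    }

private:
    bool transition(AgentStatus from, AgentStatus to) {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_status != from) return false;
        m_status = to;
        return true;
    }

    mutable std::mutex m_mutex;
    AgentStatus m_status = AgentStatus::created;
};
```

Guarding every transition with the same mutex means a cancel and a begin_execution that race each other cannot both succeed: whichever runs first decides the agent's fate.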
Agents Sample:
To better illustrate agents, consider this implementation of a string finder. The program searches a directory recursively for files matching a pattern and then searches each file for a given string, much like 'findstr'. The program is separated into several agents with specific jobs: one agent locates matching files, several agents parse the files looking for the specified string, and one agent writes found matches to the console. This sample also uses unbounded_buffer, asend, and receive.

FileFinder.h:
This agent takes a string containing the type of files to search for. Each matching file found is sent to an unbounded_buffer m_pFileNames which the FileReader agents receive from.
Here is the run() method. It searches each directory recursively for matching files. A special end message is also used at the end to signal there are no more files. Notice at the end of run() the done() function is called.
void run()
{
    wchar_t currentDirectory[MAX_FILE_NAME] = L".\\";
    FindFilesRecursively(currentDirectory);
    // Send END_MSG to signal done processing.
    asend(m_pFileNames, (wchar_t *)END_MSG);
    done(agent_done);
}
Here is the code where matching file names are sent to the unbounded_buffer:
while(moreFiles)
{
    // Copy file name, receiver will be responsible for cleanup.
    pFileName = new wchar_t[MAX_FILE_NAME];
    wcscpy_s(pFileName, MAX_FILE_NAME, pDirectory);
    wcscat_s(pFileName, MAX_FILE_NAME, findFileData.cFileName);
    //
    // Send the filename to the unbounded_buffer.
    //
    asend(m_pFileNames, pFileName);
    moreFiles = FindNextFile(hFind, &findFileData);
}
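In the sample, FindFirstFile/FindNextFile do the wildcard matching against a pattern such as *.txt. For readers curious how that matching works, here is a portable sketch of a greedy wildcard matcher with single-star backtracking. The function name wildcard_match is hypothetical and not part of the sample; it is case-sensitive, unlike Windows file-name matching, and it ignores Windows-specific quirks such as the special handling of "*.*".

```cpp
#include <string>

// Hypothetical helper: does `name` match a wildcard `pattern` such as L"*.txt"?
// '*' matches any run of characters (including none); '?' matches exactly one.
bool wildcard_match(const std::wstring& pattern, const std::wstring& name) {
    std::size_t p = 0, n = 0;
    std::size_t starP = std::wstring::npos;  // position of the last '*' seen
    std::size_t starN = 0;                   // name position that '*' matched up to
    while (n < name.size()) {
        if (p < pattern.size() && (pattern[p] == L'?' || pattern[p] == name[n])) {
            ++p;                 // literal or '?' match: advance both
            ++n;
        } else if (p < pattern.size() && pattern[p] == L'*') {
            starP = p++;         // remember the star; first try matching zero chars
            starN = n;
        } else if (starP != std::wstring::npos) {
            p = starP + 1;       // backtrack: let the last '*' absorb one more char
            n = ++starN;
        } else {
            return false;        // mismatch with no star to fall back on
        }
    }
    while (p < pattern.size() && pattern[p] == L'*') ++p;  // trailing stars match empty
    return p == pattern.size();
}
```

Backtracking only to the most recent star is enough here, because any earlier star's match can always be extended by the later one; this keeps the matcher linear-ish in practice without recursion.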
FileReader.h:
The FileReader agent keeps receiving file names to parse from an input unbounded_buffer and sends any matches to another unbounded_buffer which the ConsoleWriter will receive from.
Here is the run() method. Notice the while loop, which keeps receiving messages from the same unbounded_buffer the FileFinder agent sends to, until the end message is reached.
void run()
{
    wchar_t *pFileName;
    //
    // Repeatedly pull filename messages out of the
    // unbounded_buffer to parse until the END_MSG is
    // reached. Note that receive blocks cooperatively
    // (it is ConcRT-aware) until a message is available
    // in the buffer.
    //
    size_t currentLineNum;
    const size_t lineSize = 2000;
    wchar_t line[lineSize];
    // INT_PTR (rather than int) avoids truncating the pointer on 64-bit builds.
    while((INT_PTR)(pFileName = receive(m_pFileNames)) != END_MSG)
    {
        currentLineNum = 1;
        wifstream inputFile;
        inputFile.open(pFileName, ifstream::in);
        while(inputFile.good()
            && inputFile.getline(line, lineSize))
        {
            if(wcsstr(line, m_pSearchString) != NULL)
            {
                //
                // Create a new message payload and send it to
                // the unbounded_buffer for the ConsoleWriter
                // to receive from.
                //
                asend(m_pFoundBuffer, new Payload(pFileName,
                    wcsnlen(pFileName, MAX_FILE_NAME)+1,
                    currentLineNum,
                    line,
                    wcsnlen(line, lineSize)+1));
            }
            ++currentLineNum;
            line[0] = '\0';
        }
        inputFile.close();
        // The file name was allocated with new[], so release it with delete[].
        delete [] pFileName;
    }
    // Resend the END_MSG for any other FileReaders.
    asend(m_pFileNames, (wchar_t *)END_MSG);
    done(agent_done);
}
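The heart of FileReader is a line-by-line substring search. As a portable sketch of just that step, here is a hypothetical helper, find_matching_lines (not part of the sample), that collects 1-based line numbers of matching lines. Using std::getline with std::wstring avoids the fixed 2000-character buffer: with istream::getline, a line longer than the buffer sets failbit and ends the loop early, silently skipping the rest of the file.

```cpp
#include <cstddef>
#include <istream>
#include <sstream>   // std::wistringstream, handy for trying the helper on in-memory text
#include <string>
#include <vector>

// Hypothetical helper mirroring FileReader's inner loop: read a wide stream
// line by line and return the 1-based numbers of lines containing `needle`.
std::vector<std::size_t> find_matching_lines(std::wistream& input,
                                             const std::wstring& needle) {
    std::vector<std::size_t> matches;
    std::wstring line;               // grows as needed; no fixed line limit
    std::size_t lineNumber = 1;
    while (std::getline(input, line)) {
        if (line.find(needle) != std::wstring::npos)
            matches.push_back(lineNumber);
        ++lineNumber;
    }
    return matches;
}
```

Passing a std::wistringstream makes the helper easy to exercise without touching the file system; in the agent it would be handed the opened wifstream instead.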
ConsoleWriter.h:
This agent receives messages containing information about found matches and prints the information to the console.
Here is a snippet from its run method. It loops receiving messages from the unbounded_buffer the FileReaders send to.
//
// Keep receiving messages to print to the console until
// the END_MSG is received. Note that receive blocks
// cooperatively (it is ConcRT-aware) until a message is
// available in the buffer.
//
while((INT_PTR)(pPayload = receive(m_pFoundBuffer)) != END_MSG)
{
    SetConsoleTextAttribute(hConsole, FOREGROUND_BLUE
        | FOREGROUND_GREEN | FOREGROUND_RED | FOREGROUND_INTENSITY);
    printf("%ls:%lu:", pPayload->m_pFileName,
        (unsigned long)pPayload->m_lineNumber);
    SetConsoleTextAttribute(hConsole, FOREGROUND_BLUE
        | FOREGROUND_GREEN | FOREGROUND_RED);
    printf(":%ls\n", pPayload->m_pLine);
    delete pPayload;
}
Payload.h:
Defines a class used to pass messages between FileReaders and the ConsoleWriter. It holds the file name a match was found in, the line number, and the line itself.
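Payload.h itself is not shown here, but one requirement follows from FileReader: it deletes pFileName after the loop and reuses its line buffer, while the ConsoleWriter may print the payload later. The payload must therefore own copies of both strings. Here is a hypothetical sketch of such a message type, named MatchPayload to make clear it is not the sample's Payload class, using owning std::wstring members instead of raw buffers and lengths.

```cpp
#include <cstddef>
#include <string>
#include <utility>

// Hypothetical Payload-like message. The members are owning copies, so the
// message stays valid after the sender frees or reuses its own buffers.
struct MatchPayload {
    MatchPayload(std::wstring fileName, std::size_t lineNumber, std::wstring line)
        : m_fileName(std::move(fileName)),
          m_lineNumber(lineNumber),
          m_line(std::move(line)) {}

    std::wstring m_fileName;    // file in which the match was found
    std::size_t m_lineNumber;   // 1-based line number of the match
    std::wstring m_line;        // text of the matching line
};
```

With value members there is also nothing to clean up by hand beyond deleting the heap-allocated message itself, which the ConsoleWriter already does.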
FindString.cpp
This is the driver of the program. It creates, starts, and then waits on all the agents.
Here is the code to create the unbounded_buffers:
unbounded_buffer<wchar_t *> fileBuffer;
unbounded_buffer<Payload *> foundBuffer;
Creation of the agents:
// Agent to find files.
FileFinder fileFinder(pFilePattern, &fileBuffer);
fileFinder.start();
// Agents to handle parsing files.
FileReader **ppFileReaderAgents = new FileReader*[numAgents];
for(unsigned int i = 0; i < numAgents; ++i)
{
    ppFileReaderAgents[i] = new FileReader(&fileBuffer,
        pStringPattern, &foundBuffer);
    ppFileReaderAgents[i]->start();
}
// Agent to handle writing to the console.
ConsoleWriter consoleWriter(&foundBuffer);
consoleWriter.start();
Waiting on the agents to finish:
// Wait for agents to finish.
agent::wait(&fileFinder);
for(unsigned int i = 0; i < numAgents; ++i)
{
    agent::wait(ppFileReaderAgents[i]);
    delete ppFileReaderAgents[i];
}
delete [] ppFileReaderAgents;
// Now that all other agents have finished, signal the ConsoleWriter
// that it can finish.
send(foundBuffer, (Payload *)END_MSG);
agent::wait(&consoleWriter);
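The shutdown protocol is the subtle part of this pipeline: several readers share one input buffer, so each reader re-posts the end marker for its peers, and the writer gets its own end marker only after every reader has finished. Here is a hypothetical end-to-end sketch of that protocol using std::thread and a small blocking queue instead of agents and unbounded_buffer. The names BlockingQueue and run_pipeline are made up for illustration, an empty string stands in for END_MSG (so real items must be non-empty), and plain strings stand in for file contents.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Minimal blocking queue standing in for unbounded_buffer in this sketch.
template <typename T>
class BlockingQueue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_items.push_back(std::move(value));
        }
        m_cv.notify_one();
    }
    T pop() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return !m_items.empty(); });
        T value = std::move(m_items.front());
        m_items.pop_front();
        return value;
    }
private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::deque<T> m_items;
};

// Hypothetical finder -> readers -> writer pipeline. Items must be non-empty,
// because the empty string plays the role of END_MSG.
std::vector<std::wstring> run_pipeline(const std::vector<std::wstring>& items,
                                       const std::wstring& needle, int numReaders) {
    BlockingQueue<std::wstring> work;    // like fileBuffer
    BlockingQueue<std::wstring> found;   // like foundBuffer
    const std::wstring endMsg;           // empty string = END_MSG

    std::vector<std::wstring> output;    // touched only by the writer thread
    std::thread writer([&] {
        for (std::wstring msg; (msg = found.pop()) != endMsg; )
            output.push_back(msg);
    });

    std::vector<std::thread> readers;
    for (int i = 0; i < numReaders; ++i)
        readers.emplace_back([&] {
            for (std::wstring item; (item = work.pop()) != endMsg; )
                if (item.find(needle) != std::wstring::npos) found.push(item);
            work.push(endMsg);           // resend END_MSG for the other readers
        });

    for (const auto& item : items) work.push(item);  // the "finder"
    work.push(endMsg);

    for (auto& r : readers) r.join();
    found.push(endMsg);                  // every reader is done: let the writer finish
    writer.join();
    return output;                       // order of matches is not deterministic
}
```

Sending the writer's end marker only after joining all readers is what guarantees that no match arrives after the writer has stopped, mirroring the order of agent::wait and send in the driver above.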
In this program, I created agents that each handle large chunks of isolated work and I/O. Notice how parallelism is built into the program from the beginning by using higher-level abstractions. I created a pipeline of agents that work concurrently with almost no knowledge of each other: each agent takes in messages, performs some action, and then sends a message accordingly. Because the agents can all run at the same time, the program can exploit any available cores on the system; since there are many FileReader agents, the program parses multiple files concurrently. By designing the program this way, I can easily add, remove, or modify individual agents without needing to restructure anything else. I also avoided having to deal explicitly with locks and other difficult forms of synchronization.
To see all the code and play around with this sample download the project and source files here:
Code Samples
Example command line execution of this program is:
FindString.exe agents *.txt
In future posts, I will go into more detail about each message block the Asynchronous Agents Library provides, how to use them, the coordination primitives, and how to take advantage of our infrastructure and easily construct your own message blocks.
Following the release of Visual Studio 2010 Beta 1, we've just published a set of sample applications for the Parallel Patterns Library, the Agents Library, and the Concurrency Runtime on MSDN Code Gallery. These supplement the documentation and samples provided on MSDN for Beta 1.
The samples include task parallelism with the PPL, a few basic examples of using the Agents Library (Dining Philosophers and an example using choice), and a find-in-files example (which will be blogged about here in the next day or so). There are also some examples of using the Concurrency Runtime itself: a simple event-based application demonstrating cooperative blocking and UMS threads, and a task-based example (Fibonacci) that shows the Concurrent Suballocator.
As always your feedback here or in the forums is welcome.
-Rick