
Concurrently Speaking

A blog by Niklas Gustafsson on topics loosely related to concurrency and manycore
PDC 2009, Day Minus One

This year’s PDC promises to be an exciting one with VS 2010 in Beta2 and lots of other technologies either just shipped or about to ship. I’m in Los Angeles to speak about Axum, as part of a new thing we’re doing this year: talking about topics that are somewhat longer-lead than the usual stuff.

I’m really looking forward to being able to talk more in-depth about it and get lots of feedback from everybody listening in. My presentation is on Thursday morning.

Tomorrow, Monday, is the day for the pre-conference workshops and for setting up the booths on the main show floor. The Parallel Computing team has two booths this year, one in the HPC area and one in the Visual Studio area, so it should be really easy to find us.

I’ve been thinking about discrete event simulation a bunch lately; I’ve been fascinated by that particular use for software since way back in college, when I used SIMULA and was exposed to object-oriented programming for the first time.

While discrete time is basically a very simple concept, parallelizing discrete event systems is non-trivial, since the introduction of asynchrony leads to all kinds of races between things operating in discrete time and things operating in real time. It seems to me that it should be fairly straightforward to do with an actor-based framework, but I have yet to prove that to myself.

The auction sample I posted earlier this year parallelized nicely, but was in many ways a cheat: it used wall-clock time throughout the code, not discrete time. That meant that the course of the simulation was affected by the runtime resources available, with simulation objects competing for processing power and thus being subject to non-determinism. Not being able to repeat simulations is an extremely bad thing.
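For contrast, here is a minimal, sequential sketch of what discrete time means in practice: every event carries a virtual timestamp, events are processed in timestamp order from a priority queue, and the simulated clock moves only when the next event is dequeued, so the outcome cannot depend on how fast the machine is. The `simulator` class and its `schedule`/`run_until` members are hypothetical names for illustration. Ties between events at the same virtual time are broken by insertion order, which keeps repeated runs identical. The sketch deliberately does not attempt the hard part, parallelizing the event loop.

```cpp
#include <cstdint>
#include <functional>
#include <queue>
#include <vector>

// Hypothetical minimal discrete event simulator: virtual time advances
// only when the next event is dequeued, never with the wall clock.
class simulator
{
public:
    using action = std::function<void(simulator&)>;

    // Schedule 'act' to run 'delay' virtual time units from now.
    void schedule(std::int64_t delay, action act)
    {
        events_.push(event{ now_ + delay, next_seq_++, std::move(act) });
    }

    // Process events in (time, insertion order) until none remain
    // or the next one lies beyond 'limit'.
    void run_until(std::int64_t limit)
    {
        while (!events_.empty() && events_.top().time <= limit)
        {
            event e = events_.top();
            events_.pop();
            now_ = e.time;      // jump the virtual clock forward
            e.act(*this);       // an action may schedule further events
        }
    }

    std::int64_t now() const { return now_; }

private:
    struct event
    {
        std::int64_t time;
        std::uint64_t seq;      // tie-breaker: FIFO among equal times
        action act;
    };

    // Orders the heap so the earliest (then oldest) event is on top.
    struct later
    {
        bool operator()(const event& a, const event& b) const
        {
            return a.time != b.time ? a.time > b.time : a.seq > b.seq;
        }
    };

    std::priority_queue<event, std::vector<event>, later> events_;
    std::int64_t now_ = 0;
    std::uint64_t next_seq_ = 0;
};
```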

Anyway, the next few days promise to be exciting, and I’ll keep posting each day of the conference.

The Perils of Lock-Freeness & Getting Tasks onto the UI Thread

In my last post, I was looking for someone to tell me about a race condition in the cancel() code path, but to my embarrassment, Krishnan Varadarajan, one of the many talented developers on the ConcRT team, pointed out a bad race on the run/wait code path!

In the original code, the following interleaving was possible: the last task finishes and the worker decrements the task counter to zero; before the worker gets around to calling evnt.set(), a new run() call increments the count and resets the event; the calling code then calls wait(), which returns as soon as the old worker calls evnt.set(), even though the newly scheduled task hasn’t run yet. Ouch!

My mistake was trying too hard to avoid using locks in my code, which should always raise a red flag. Lock-free code is great if you can pull it off, but it almost always introduces more complexity than it is worth, and more often than not it is incorrect.

What we are typically worried about when we dabble in lock-free code is scalability, and lock-based code scales fine as long as locks aren’t held for very long (long hold times are what create contention).

The serial task group exists to avoid holding a lock for the duration of each scheduled task, not to avoid locks around a few individual instructions, which is what I was trying to do.

To address this, I decided to write a simple counting event class, which sets the event whenever its count reaches zero and resets it whenever the count is no longer zero. We can use it in a revised version of the serial_task_group by changing run(), wait(), cancel() and _Task_proc():

template <class Func>
void run(const Func& fn)
{
    __int64 tasks = taskCount++;
    _Schedule_task(fn, tasks == 1);
}
void wait()
{
    taskCount.wait();
}
void cancel()
{
    InterlockedIncrement(&this->canceling);
    taskCount.wait();
    InterlockedDecrement(&this->canceling);
}
template<class Func>
static void __cdecl _Task_proc(void* data)
{
    [ No changes before the loop ]

    // Loop until there is no more work to perform.
    do
    {
        [ No changes until the bottom of the loop ]

        tasks = pTaskGroup->taskCount--;
    } while (tasks > 0);
}
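The counting event class itself isn’t shown in this post. As a rough, portable sketch of the idea (not the actual class), here is one built on std::mutex and std::condition_variable instead of a ConcRT event; the class and member names are assumptions. One further assumption: run() compares the result of taskCount++ with 1, and _Task_proc() loops while taskCount-- is positive, so this sketch’s postfix operators return the count after the update, unlike the built-in operators.

```cpp
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical counting event: "set" while the count is zero and
// "reset" while it is non-zero. The count and the signaled state
// change together under one mutex, so no thread can observe one
// without the other.
class counting_event
{
public:
    // Returns the count AFTER incrementing (assumed to match the
    // 'tasks == 1' test in run()).
    std::int64_t operator++(int)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return ++count_;
    }

    // Returns the count AFTER decrementing; wakes waiters at zero.
    std::int64_t operator--(int)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (--count_ == 0)
            zero_.notify_all();
        return count_;
    }

    // Blocks until the count is zero.
    void wait()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        zero_.wait(lock, [this] { return count_ == 0; });
    }

    std::int64_t count()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return count_;
    }

private:
    std::mutex mutex_;
    std::condition_variable zero_;
    std::int64_t count_ = 0;
};
```

Because the worker’s decrement-to-zero and the wake-up happen under the same lock as run()’s increment, there is no longer a window in which the count has reached zero but the event has not yet been signaled, which is the window the race exploited. A waiter that wakes after another run() has pushed the count back above zero simply goes back to sleep.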

I also replaced the use of QueueUserWorkItem with calls to the ConcRT scheduler. This lets the serial task group work with any scheduler: its tasks are still serialized by the group, but they share resources with the tasks of another scheduler, such as the one running the task that called run() in the first place.

Now, a variant of the serial task group is one that executes its tasks on the UI thread of some window in your application. In fact, it’s a trivial modification to the serial_task_group, and the source is available here.

It will get a little hacky, since we need to define a new window message and handle it in our window procedures:

#define WM_TASK (WM_USER + 1)

The actual message value would, of course, have to be chosen so that it doesn’t collide with any other user-defined messages in your specific application. Alternatively, you could use RegisterWindowMessage() to get a unique message identifier at runtime, then pass that in to the task group constructor.

First, we need a window handle in order to schedule tasks, so the task group constructor takes both a handle and a message identifier:

ui_task_group(HWND hWnd, int msg) : canceling(0), hWnd(hWnd), msg(msg)      
{            
}

Second, the code to schedule a task on a scheduler is replaced by a call to PostMessage:

// Ask the window to run the task procedure if it isn't already draining the queue.
if (createWorker)
    PostMessage(hWnd, msg, (WPARAM)&_Task_proc<Func>, (LPARAM)&queue);

Then, as a last step, add this case to the big switch statement of every window procedure in your code that should be able to run tasks (so that the task group can be used with any window you need it for):

case WM_TASK:
    {
        // wParam carries the __cdecl task procedure, lParam the queue it drains.
        Concurrency::TaskProc task = (Concurrency::TaskProc)wParam;
        task((void*)lParam);
    }
    break;

With this, you have a task group that will move all its tasks onto a UI thread, with support for wait and cancellation using APIs that look just like the task groups defined in PPL. Here’s an example:

    ui_task_group tg(hWnd, WM_TASK);

    for (int i = 0; i < 5; i++)
    {
        tg.run( [=] { /* ... do something on the UI thread ... */ } );
    }
    tg.wait();

wait() is just as dangerous here as it is with the serial task group: if you call wait() from the thread that owns the window handle, and that includes every task scheduled by the task group, the program will promptly lock up, because that thread can no longer process the WM_TASK messages the wait depends on. So don’t.
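Outside Win32, the same pattern (a queue of type-erased callbacks drained only by the thread that owns it) can be sketched portably. The message_loop class below is hypothetical: post() plays the role of PostMessage with WM_TASK, and pump() plays the role of the message loop dispatching the WM_TASK case on the owning thread. It also makes the wait() hazard easy to see: if the owning thread blocked waiting for posted tasks to finish, nobody would ever call pump().

```cpp
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical stand-in for a window's message queue: any thread may
// post(), but only the owning thread calls pump(), so every posted
// task runs on that one thread, in posting order.
class message_loop
{
public:
    void post(std::function<void()> task)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push_back(std::move(task));
    }

    // Dispatch everything queued so far; returns the number of tasks run.
    int pump()
    {
        int ran = 0;
        for (;;)
        {
            std::function<void()> task;
            {
                std::lock_guard<std::mutex> lock(mutex_);
                if (queue_.empty())
                    return ran;
                task = std::move(queue_.front());
                queue_.pop_front();
            }
            task();   // run outside the lock, like a window procedure
            ++ran;
        }
    }

private:
    std::mutex mutex_;
    std::deque<std::function<void()>> queue_;
};
```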

Grandiosely Serialized Tasks

No one can accuse me of spamming the MSDN blog site with too many posts, but after months of writing nothing, I recently and suddenly felt inspired to get something posted again. Maybe it had something to do with the release of Beta 2 of VS 2010 and .NET Framework 4?

Apple recently shipped the latest rev of its Mac OS X operating system, version 10.6. Among other features, it contains a thread pool called Grand Central Dispatch. While there is significant feature overlap between GCD and the Vista thread pool, GCD has made some serious improvements in usability, similar in some ways to what we’re doing with PPL and ConcRT.

The most significant one, in my opinion, is the addition of code blocks to the C, C++ and Objective-C languages. Code blocks are similar to C++ lambdas, but since they are also available in C and Objective-C, the OS APIs can take full advantage of them, and GCD does. Just as lambdas add tremendous usability to PPL, code blocks make it much easier to define tasks that take advantage of the features offered by GCD.

Here’s an example lifted from the GCD documentation:

  int x = 123;
  int y = 456;

  void (^aBlock)(int) = ^(int z){ printf("%d %d %d\n", x, y, z); };

  aBlock(789);

This is very similar to how you would do it with C++ lambdas:

  int x = 123;
  int y = 456;

  auto aBlock = [=](int z){ printf("%d %d %d\n", x, y, z); };

  aBlock(789);

GCD combines blocks with functions that dispatch them onto queues, just like any thread pool would, only with more ease.

What I wanted to explore, though, was one feature in particular: serial queues. A serial queue guarantees that the tasks added to it run one at a time, in the order they were added, so there is never more than one worker executing them at any moment. Sometimes referred to as turn-based scheduling, this approach allows code to avoid using locks for mutual exclusion, which turns out to be especially advantageous in high-contention situations, where the chance of having to block in the kernel (a very expensive operation) is high.
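For readers without a Mac handy, here is a rough portable sketch of serial-queue semantics: tasks run one at a time, in FIFO order, on a single background thread. The serial_queue class and its async() method are hypothetical and are not GCD’s API (dispatch_async on a serial queue is the real thing). It uses a mutex internally to guard the queue, but the state touched by the tasks themselves needs no lock, because only one task ever runs at a time.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical serial queue: one worker thread drains tasks in order.
class serial_queue
{
public:
    serial_queue() : worker_([this] { loop(); }) {}

    ~serial_queue()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        ready_.notify_one();
        worker_.join();          // remaining tasks are drained first
    }

    void async(std::function<void()> task)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push_back(std::move(task));
        }
        ready_.notify_one();
    }

private:
    void loop()
    {
        for (;;)
        {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                ready_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
                if (tasks_.empty())
                    return;      // stopping, and nothing left to run
                task = std::move(tasks_.front());
                tasks_.pop_front();
            }
            task();              // only this thread ever runs tasks
        }
    }

    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<std::function<void()>> tasks_;
    bool stopping_ = false;
    std::thread worker_;         // declared last so it starts after the rest
};
```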

In Axum, we use a similar approach to protect domain state, although we go a step farther and separate the state readers from the state writers. GCD’s serial queues effectively treat every block as a writer, which allows for a simpler implementation with lower per-task overhead, but also less opportunity for parallelism between readers.

The Vista thread pool has this capability, too, but there’s nothing in PPL that supports this scenario: even using a thread-based scheduler, setting both the min and max concurrency to ‘1,’ and using a task_group still allows a task-creating thread to execute tasks when waiting for the group to complete. Going directly to the scheduler interfaces to call ScheduleTask doesn’t help – any call to Oversubscribe(true) will increase the thread count temporarily.

Luckily, the functionality is simple enough that it only takes a few lines to implement our own solution on top of the Windows thread pool. For good measure, we’ll throw in cancellation support, too, and make the API conform to PPL’s style. In fact, we can rely on the older thread pool API, which has been available since Windows 2000, so the approach itself is not Vista-only. ConcRT, which the code also depends on, does not support Windows 2000, however; it requires XP or later.

The full source code is available here. It was built with Beta2 and will not work with earlier versions. Specifically, concurrent_queue is new in Beta2.

Let’s go through it step by step:

The class we’re creating is called ‘serial_task_group,’ as it is meant to be used in the same way that the PPL ‘task_group’ and ‘structured_task_group’ are used. It is a non-template class with template methods.

The constructor is very simple as we are not associating with a ConcRT scheduler or defining scheduler policies. The event is set initially, so that any code calling wait or cancel will immediately return since the task group is initially empty.

  serial_task_group() : taskCount(0), canceling(0) { evnt.set(); }

‘taskCount’ and ‘canceling’ are volatile LONG (32-bit) integers, the type that InterlockedIncrement, InterlockedDecrement and InterlockedXor operate on; they will be used for wait and cancellation support, respectively. Together with a ConcRT event and a concurrent queue, they form the data members of the class:

  volatile LONG taskCount;
  volatile LONG canceling;

  ::Concurrency::event evnt;
  ::Concurrency::concurrent_queue<_task_info *> queue;

Each time a task is added, ‘taskCount’ is incremented; each time one completes, it is decremented. ‘canceling’ is incremented for each cancellation request, and decremented when the request has been met.

Just like with task_group, tasks are scheduled using a template ‘run’ method, which is how the lambda magic is leveraged:

  template <class Func>
  void run(const Func& fn)
  {
      long tasks = InterlockedIncrement(&this->taskCount);
      if (tasks > 0)
          evnt.reset();
      _Schedule_task(fn, tasks == 1);
  }

‘wait’ and ‘cancel’ (which should really be called ‘cancel-and-wait’) are both implemented synchronously, waiting for the event to be set before returning. Cancellation works by letting the worker know that a cancellation request is pending: while ‘canceling’ is non-zero, the worker dequeues tasks and discards them without running them, until the task count reaches 0 and the event is set.

  void wait()
  {
      evnt.wait();
  }
  void cancel()
  {
      InterlockedIncrement(&this->canceling);
      evnt.wait();
      InterlockedDecrement(&this->canceling);
  }

Those methods form the public interface of the class. The more interesting parts of the implementation are in the private members.

First, there is the _Schedule_task method, which places the newly submitted task at the end of a queue and, when asked to, creates a worker by calling QueueUserWorkItem. The actual decision to create a worker is made by the run method: it asks for one only when the task count is incremented from 0 to 1.

Second, since Win32 doesn’t understand C++ functors, we need to supply something else as the callback passed to QueueUserWorkItem, which is what the static _Task_proc method is for. Its being static complicates things, as we have to pass it the ‘this’ pointer together with the functor. Therefore, we place not just the functor on the queue, but a _task_info record that contains both of the pieces we need.

  template <class Func>
  void _Schedule_task(const Func& fn, bool createWorker)
  {
      // Create a task info record and queue it up.
      _task_info *pInfo = new _task_info;
      pInfo->pFunc = new Func(fn);
      pInfo->pTaskGroup = this; 
      queue.push(pInfo);
            
      // Request a new worker thread if necessary.
      if (createWorker)
          QueueUserWorkItem((LPTHREAD_START_ROUTINE)_Task_proc<Func>, &queue, 0);
  }
  template<class Func>
  static DWORD WINAPI _Task_proc(LPVOID data)   // matches LPTHREAD_START_ROUTINE
  {
      concurrent_queue<_task_info *> *queue = (concurrent_queue<_task_info *> *)data;
      _task_info *pInfo;

      long tasks = 0;
      serial_task_group *pTaskGroup = NULL;
            
      // Loop until there is no more work to perform.
      do
      {
          // Spin until the task counted by run() shows up in the queue.
          while (!queue->try_pop(pInfo))
          {
              Context::CurrentContext()->Yield();
          }

          Func* pFunc = (Func*) pInfo->pFunc;
          pTaskGroup = ((_task_info*) pInfo)->pTaskGroup;
            
          // Discard the task without running it while a cancellation is pending.
          if (InterlockedXor(&pTaskGroup->canceling, 0L) == 0)
              (*pFunc)();

          delete pFunc;
          delete pInfo;

          tasks = InterlockedDecrement(&pTaskGroup->taskCount);

      } while (tasks > 0);

      pTaskGroup->evnt.set();
      return 0;
  }
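Stripped of the queue and the thread pool, the trick _Task_proc relies on is a template trampoline: a static function instantiated once per functor type, whose address can be handed to a C API that expects a plain function pointer plus a void* context. Here is a standalone sketch under those assumptions; c_style_run is a hypothetical stand-in for QueueUserWorkItem (it just calls the callback synchronously), and task_record is a simplified, hypothetical version of _task_info that carries only the functor pointer.

```cpp
#include <utility>

using c_callback = void (*)(void*);

// Hypothetical C-style API standing in for QueueUserWorkItem:
// it only knows about a function pointer and an opaque context.
inline void c_style_run(c_callback cb, void* context) { cb(context); }

// Simplified, hypothetical _task_info: the functor is type-erased to void*.
struct task_record
{
    void* pFunc;
};

// One instantiation per functor type recovers the type and invokes it.
template <class Func>
void trampoline(void* data)
{
    task_record* rec = static_cast<task_record*>(data);
    Func* fn = static_cast<Func*>(rec->pFunc);
    (*fn)();
    delete fn;      // ownership ends here, as in _Task_proc
    delete rec;
}

// Copies the functor to the heap and hands the C API a matching trampoline.
template <class Func>
void schedule(const Func& fn)
{
    task_record* rec = new task_record{ new Func(fn) };
    c_style_run(&trampoline<Func>, rec);
}
```

Because trampoline<Func> already has exactly the signature the C API expects, no function-pointer cast is needed to pass it.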

Creating a worker only when we do not already have one is important for performance: calling QueueUserWorkItem for each task would mean possibly having more than one worker running, so we would then need locks to protect the functor invocations. As taking locks is what we’re trying to avoid in the first place, that would be a shame. Besides, if we’re using serial task groups to protect access to some data structure, that structure is more likely to stay in cache if as many tasks as possible are run by the same worker thread.

Unfortunately, a new task can be scheduled in such a way that the task-creating thread increments the task count before the worker’s decrement, but the worker reaches its try_pop call before the task has been pushed onto the queue:

  1. Task-executing thread: finish the last task (taskCount = 1)
  2. Task-creating thread:  increment taskCount (taskCount = 2)
  3. Task-executing thread: decrement taskCount (taskCount = 1)
  4. Task-executing thread: test ‘tasks’ and go to the top of the loop
  5. Task-executing thread: call try_pop on an empty queue
  6. Task-creating thread:  push the task onto the queue

Therefore, if the remaining taskCount is greater than zero, but the queue is empty, we have to spin until the two agree. The right way to do this with ConcRT is to call ‘Yield’ on the current context, which (arggh!) forces us to include this undefine directive at the beginning of the file:

  #include <windows.h>
  #undef Yield

Everything else in the implementation is pretty straightforward. No locks are taken, but there is going to be some contention on the interlocked operations, which is less of a performance concern than lock contention. The cost of calling QueueUserWorkItem is amortized over the number of tasks that are batched together in the queue at any point in time, but in low-contention situations, where the task count keeps dropping back to zero and each new task needs a new worker, there is still a significant cost.

Ready to use it? Except for the task-group declaration, the code looks like any other PPL-based code:

  serial_task_group tg;

  for (int i = 0; i < 5; i++)
  {
      tg.run( [=] { 
          printf("running task %d from thread %lu\n", i, GetCurrentThreadId()); } );
  }
  tg.wait();

Serial task groups are not particularly expensive: a couple of object allocations (one for the task group, one for the internals of the concurrent queue), for a total just above 300 bytes. If the task group is on the stack, it’s even cheaper, of course. Scheduling a task requires one push and one pop, as well as two object allocations (the _task_info record and the copy of the functor).

Here’s a challenge for you: I have this sneaking suspicion that the cancellation functionality has a race in it, but I’m not sure. I haven’t had time to prove its absence, nor have I established the exact sequence of events that would cause it. Gold stars galore to anyone who manages to convince me one way or the other! Remember that with cancellation, it’s OK to miss cancelling some work – it’s impossible to guarantee it. What would be of concern, however, is if we get stuck waiting for the cancellation when it will never be signaled.

Auction Simulation in C++

My fellow programming language enthusiast Matthew Podwysocki just posted another F#-based actor sample in his blog; it is a simulation of an auction that was first written in Scala. Last time, I followed his Axum Ping-Pong example with my own F#-based version, but this time, I’ll use his example to point to some of the new features available in Visual Studio 2010, which is available in beta form here. I also suggest reading Rick Molloy’s excellent MSDN article on agent programming in C++.

On to the sample code! I tried to follow the F# code as closely as possible, but there are obviously going to be big differences between the two. For no particular reason, I’ll use a strange mix of PascalCasing and stl_casing in the code below.

First, after the obligatory file inclusions, we have to deal with the declaration of message payloads; unfortunately, whereas F# has discriminated unions and their declaration is extremely concise, we have to roll our own in C++. Thus, the message declarations look like this:

enum MessageId { None, Inquire, Offer, Status, BestOffer, 
                 BeatenOffer, AuctionConcluded, AuctionFailed, AuctionOver };

struct AuctionReply {
    AuctionReply() : msg_id(None) 
    { }
    AuctionReply(MessageId msg_id) : msg_id(msg_id) 
    { }
    AuctionReply(MessageId msg_id, 
                 int price) : msg_id(msg_id), best_offer(price)
    { }
    AuctionReply(MessageId msg_id,
                 ITarget<AuctionReply> *seller,
                 ITarget<AuctionReply> *buyer) : msg_id(msg_id) 
    { parties.buyer = buyer; parties.seller = seller;}

    MessageId msg_id;
    union {
        int best_offer;    // Status, BeatenOffer
        struct { ITarget<AuctionReply> *seller; 
                 ITarget<AuctionReply> *buyer; } parties; // AuctionConcluded
    };
};
struct AuctionMessage {
    AuctionMessage() : msg_id(None) 
    { }
    AuctionMessage(MessageId msg_id, ITarget<AuctionReply> *reply_to) : 
        msg_id(msg_id), reply_to(reply_to)
    { }
    AuctionMessage(MessageId msg_id, int price, ITarget<AuctionReply> *reply_to) :
        msg_id(msg_id), price(price), reply_to(reply_to)
    { }

    MessageId msg_id;
    
    ITarget<AuctionReply> * reply_to; // Offer, Inquire
    int price;                        // Offer
};

We anticipate that each actor will be using a single mailbox for all incoming messages, and these two struct types define the payloads. The auction actor will use AuctionMessage, while the seller and client will both use AuctionReply, as is the case in the F# version.

The second thing we come to is a definition of the actor class. The VS2010 runtime comes with a built-in class ‘agent’, but it’s a much more general concept than the traditional actor, which is expected to have a mailbox for communication. Agents define an isolation pattern, but do not go into whether a single mailbox should be used, or whether an Axum-like collection of channels is preferable. In this case, we’ll try to stay close to the F# original and use a mailbox (actually, it will be more like Erlang’s than an F# mailbox, but that’s of secondary relevance here).

This is what actor<T> looks like (in the code, the template parameter T is named Payload; it is the mailbox payload type):

template<class Payload>
class actor : public agent
{
public:
    operator ITarget<Payload> *() { return &inbox; }
    operator ITarget<Payload> &() { return inbox; }
protected:
    Payload receive() { return Concurrency::receive(inbox);}
    bool tryreceive(Payload &msg, unsigned int timeout)
    {
        try 
        {
            msg = Concurrency::receive(inbox, timeout);
            return true;
        } 
        catch (const operation_timed_out&)
        {
            return false; 
        } 
    }

private:
    unbounded_buffer<Payload> inbox;
};

Note that the only public members are the conversion operators: this is how we force all outsiders to interact with the actor only by sending messages. We also define two custom receive operations that actor implementations can use to access the inbox, which is otherwise hidden (this is where our solution departs from F# and looks more like Erlang).
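The receive/tryreceive pair maps directly onto a blocking queue with a timed wait. As a portable sketch, here is a hypothetical mailbox<Payload> built on std::condition_variable::wait_for; it stands in for unbounded_buffer plus Concurrency::receive, and, unlike the agents-library version, it reports a timeout by returning false rather than by catching operation_timed_out.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical mailbox: unbounded FIFO with blocking and timed receive.
template <class Payload>
class mailbox
{
public:
    void send(Payload msg)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(msg));
        }
        ready_.notify_one();
    }

    // Blocks until a message arrives.
    Payload receive()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !queue_.empty(); });
        return pop_front_locked();
    }

    // Waits at most 'timeout' milliseconds; returns false on timeout.
    bool tryreceive(Payload& msg, unsigned int timeout)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        if (!ready_.wait_for(lock, std::chrono::milliseconds(timeout),
                             [this] { return !queue_.empty(); }))
            return false;
        msg = pop_front_locked();
        return true;
    }

private:
    // Caller must hold mutex_ and the queue must be non-empty.
    Payload pop_front_locked()
    {
        Payload msg = std::move(queue_.front());
        queue_.pop_front();
        return msg;
    }

    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<Payload> queue_;
};
```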

There’s no implementation of the run() method; that is left to the actual actors. Let’s take a look at the simplest one, the one representing sellers:

class Seller : public actor<AuctionReply>
{
protected:
    void run()
    {
        while (true)
        switch (receive().msg_id)
        {
        case AuctionConcluded:
        case AuctionFailed:
            done(agent_done);
            return;
        }

    }
};

We don’t actually do much in the seller (in this simplified example), just wait around for the auction to finish. In the positive case, we might more realistically try to contact the buyer to arrange for product delivery and payment, but that’s ignored here. When the auction is over, we set the agent state to ‘agent_done’ and exit. C++ agents can return from their run() method without the agent itself being considered “dead,” so done() is used to signal the end of its lifetime and allow anyone waiting for it to finish to proceed.

The auction actor is a lot more complex, so let us examine its run() method in segments. The body is generally speaking an infinite loop, from which we break when the auction is over and all sellers and clients are notified.

At the top of the loop, we calculate how much time remains before the auction is over, measured in milliseconds. tryreceive(), one of the methods we just defined in actor<T>, allows us to wait for a message but time out if it doesn’t arrive within a certain interval. It returns ‘true’ if a message was received:

clock_t remaining = end_at - clock();
auto msg = AuctionMessage();

if (tryreceive(msg, remaining))
{
    switch (msg.msg_id)
    {

Where F# uses pattern matching to distinguish the messages, we use a switch statement. The Inquire case is the simplest: we just need to send back the current highest bid:

case Inquire:
    {
        asend(msg.reply_to, AuctionReply(Status, max_bid));
        break;
    }

The mailbox to send the reply to came with the message, as an ITarget<AuctionReply> pointer. Generally speaking, it is not safe to send pointers in messages between agents, but ITarget and ISource pointers are safe to use in this fashion.

Handling an offer is somewhat more complex:

case Offer:
    {
        if (msg.price >= max_bid + min_increment)
        {
            if (max_bidder != NULL) asend(max_bidder, AuctionReply(BeatenOffer, msg.price));
            max_bidder = msg.reply_to;
            max_bid = msg.price;
            asend(msg.reply_to, AuctionReply(BestOffer));
        }
        else
        {
            asend(msg.reply_to, AuctionReply(BeatenOffer, max_bid));
        }
        break;
    }

The auction agent has a minimum increment to discourage one-cent increases. Thus, if the new offer is at least the current highest bid plus the increment, we have a new highest bid, and the code notifies the previous high bidder that his offer has been beaten before notifying the new bidder that he has the highest bid. If the new bid is too low, we just tell the bidder that it was not good enough, along with the current highest bid.
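Because the Offer case only reads and updates the auction’s own state, its decision logic can be pulled out and checked without any agents. In this sketch, bidders are identified by a hypothetical integer id instead of an ITarget pointer, and the replies are returned as a list instead of being sent with asend; auction_state, reply and handle_offer are illustrative names, not part of the sample.

```cpp
#include <vector>

enum class reply_kind { BestOffer, BeatenOffer };

struct reply
{
    int to;             // bidder id
    reply_kind kind;
    int price;          // meaningful for BeatenOffer only
};

struct auction_state
{
    int max_bid;
    int max_bidder;     // -1 until the first accepted offer
    int min_increment;
};

// Mirrors the Offer case: accept only bids at least min_increment above
// the current maximum, telling the previous leader it has been beaten.
std::vector<reply> handle_offer(auction_state& s, int bidder, int price)
{
    std::vector<reply> out;
    if (price >= s.max_bid + s.min_increment)
    {
        if (s.max_bidder != -1)
            out.push_back({ s.max_bidder, reply_kind::BeatenOffer, price });
        s.max_bidder = bidder;
        s.max_bid = price;
        out.push_back({ bidder, reply_kind::BestOffer, 0 });
    }
    else
    {
        out.push_back({ bidder, reply_kind::BeatenOffer, s.max_bid });
    }
    return out;
}
```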

Once the auction has timed out, the code notifies the seller (and the highest bidder, if any) of the auction result; the auction fails if and only if the reserve price is not met. It then keeps answering offers that come in after the close. If 3 seconds ever pass without an after-hours offer, the auction agent shuts down. There’s a potential bug in the application here, which I leave for you to find and correct.

if (max_bid >= min_bid)
{
    auto reply = AuctionReply(AuctionConcluded, &seller, max_bidder);
    asend(max_bidder, reply);
    asend(seller, reply);
    printf("auction succeeded, highest bid was %d\n", max_bid);
}
else
{
    printf("auction failed, highest bid was %d\n", max_bid);
    if (max_bidder != NULL) asend(max_bidder, AuctionReply(AuctionOver));
    asend(seller, AuctionReply(AuctionFailed));
}

while (true)
{
    if (tryreceive(msg, 3000))
    {
        switch (msg.msg_id)
        {
        case Offer:
            asend(msg.reply_to, AuctionReply(AuctionOver));
            break;
        }
    }
    else
    {
        done(agent_done);
        return;
    }

}

There is a little bit of state in the auction agent (whether or not the auction has closed), but the bidder client is a lot more stateful. There are three distinct states: i) it has not yet received status information from the auction, ii) it has not made the highest bid, and iii) it has made the highest bid.

The first state is coded at the top of the run() method:

asend(auction, AuctionMessage(Inquire, *this));

auto initial_status = receive();

Once we’re past these two lines of code, we’re in ii) and are going to attempt to make a bid until we’re notified that ours is the highest (or that the auction is over). Where F# uses recursion, we’ll take the normal C++ approach and loop:

bool is_done = false;
int max = initial_status.best_offer;
int current;

while (!is_done)
{
    if (max > top) { Log("Too high for me!"); done(agent_done); return; }

    current = max + increment;
    Concurrency::wait(rand_in_range(1, 1000));
    Log("making an offer at $", current);
    asend(auction, AuctionMessage(Offer, current, *this));

The code injects a random delay just to make the simulation more interesting. This would correspond to the reaction time of a bidder in a real auction.

Once the offer is made, all we can do is wait to be notified whether we’re in state ii or iii:

bool has_best;

do
{
    has_best = false;

    auto reply = receive();
    switch(reply.msg_id)
    {
    case BestOffer:
        has_best = true; max = current;
        break;
    case BeatenOffer:
        max = reply.best_offer;
        break;
    case AuctionConcluded:
    case AuctionOver:
        is_done = true;
        break;
    }

} while (has_best);

The code could probably be simpler, but the key is to not go back and make another offer after we have received ‘BestOffer,’ or the bidder would be bidding against itself. Instead, after a best offer, the code waits for another message. In reality, it should never be getting ‘BestOffer’ twice in a row, so the loop is perhaps confusing in its attempt to reduce code size.

Once the auction is over, and the bidder has been notified of it, the only thing that remains is to exit:

done(agent_done);

In a more realistic scenario, the winning buyer would also want to contact the seller to arrange for exchange of goods and money.

The code is almost done – we just have to get all these agents started and make sure the program doesn’t terminate until they all have a chance to finish:

int _tmain(int argc, _TCHAR* argv[])
{
    const int CLIENT_COUNT = 15;

    agent * agents[CLIENT_COUNT+2];

    Seller seller;
    seller.start();

    Auction auction(seller, 100, clock()+6000);
    auction.start();

    agents[0] = &seller;
    agents[1] = &auction;
    for (int i = 0; i < CLIENT_COUNT; i++)
    {
        agents[i+2] = new Client(auction, i, rand_in_range(5, 25), rand_in_range(80, 450));
        agents[i+2]->start();
    }

    agent::wait_for_all(CLIENT_COUNT+2, agents);

    return 0;
}

We need to collect a reference to all our agents in an array and wait for them, because we don’t know in what order they will finish. My code is a little bit sloppy in that it doesn’t delete the client instances that are created on the heap, but the process is going away, so in this particular case, it doesn’t matter. As you can see, the auction is run for 6,000 ms, and the min acceptable bid is $100.

One really irritating fact about this sample is that it won’t scale to large numbers of agents: unlike F# async workflows and Axum’s asynchronous methods, C++ has no mechanism that lets the compiler pause a control-flow construct (such as a loop blocked in receive()) and resume it later. That means that each of our agents will eat up a thread until it finishes, putting a ceiling on how many we can have. An Axum-based version of this that I wrote had no trouble with 10,000 bidders, but that won’t work so easily with the C++ version.

In an earlier post, I discussed what to do about it using networks, which are just as scalable as F# workflows or Axum async methods, but won’t result in code that is nearly as pretty.

Anyway, I hoped to show an example of how to leverage the agents library to build components that are stateful. For more stateless situations, building networks and putting the logic in transform and call nodes is a more scalable and intuitive solution.

As a parting thought, I want to draw your attention to one simple fact: we just wrote an application with 17 agents working together on a problem in parallel, interacting in non-trivial patterns updating state and arriving at a shared understanding of “their world” without a single lock, mutex, or critical section.

Actors in F#

It’s been a while since I posted anything here, mainly because we’ve been busy on the Axum team blog over the last few weeks and months. Inspired by this post by Matthew Podwysocki, I thought it would be interesting to show actors in F#, which are closely related to the Axum model.

PingPong is a fairly common micro-benchmark for message passing: it measures the pure overhead of passing messages, since the logic in the actors does no interesting work. It is a trivial piece of code, but very educational in that it shows a very simple framework for creating actors that go back and forth synchronizing their work, and it is easy to build on.

Matthew’s blog shows the Erlang code and then the Axum code. Let’s also consider F#, which like Erlang has built-in support for actors, and comes from a functional tradition. The PingPong example looks like this in F#:

open System

type message = Finished | Msg of int * MailboxProcessor<message>

let ping iters (outbox : MailboxProcessor<message>) =
    MailboxProcessor.Start(fun inbox -> 
        let rec loop n = async { 
            if n > 0 then
                outbox.Post( Msg(n, inbox) )
                let! msg = inbox.Receive()
                Console.WriteLine("ping received pong")
                return! loop(n-1)
            else
                outbox.Post(Finished)
                Console.WriteLine("ping finished")
                return ()}
        loop iters)
            
let pong () =
    MailboxProcessor.Start(fun inbox -> 
        let rec loop () = async { 
            let! msg = inbox.Receive()
            match msg with
            | Finished -> 
                Console.WriteLine("pong finished")
                return ()
            | Msg(n, outbox) -> 
                Console.WriteLine("pong received ping")
                outbox.Post(Msg(n, inbox))
                return! loop() }
        loop())

let ponger = pong()
do (ping 10 ponger) |> ignore

The structure and amount of code in this example is very similar to the Erlang code and is similarly hard/easy to read, depending on your perspective.

The mailbox processor design follows the traditional actor-oriented pattern and is therefore a bit different from the approach we're taking in Axum. Notice that rather than establishing a channel between ping and pong, we're sending the mailbox to the other side with each message.
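For readers who don't use F#, the same reply-to-mailbox pattern can be sketched in portable C++. This is a minimal sketch under stated assumptions: `Mailbox`, `Message` and `play_ping_pong` are hypothetical names, built on `std::thread` and a condition variable rather than on any actor library. Note that pong never knows who its clients are; it simply replies on whatever mailbox arrived with each request.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical minimal mailbox: an unbounded, thread-safe FIFO queue.
template <typename T>
class Mailbox {
public:
    void post(T msg) {
        {
            std::lock_guard<std::mutex> lock(m_);
            q_.push(std::move(msg));
        }
        cv_.notify_one();
    }
    // Blocks the calling thread until a message is available.
    T receive() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !q_.empty(); });
        T msg = std::move(q_.front());
        q_.pop();
        return msg;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<T> q_;
};

// Mirrors the F# 'message' type: either Finished, or a counter plus the
// mailbox that the receiver should reply to.
struct Message {
    bool finished;
    int n;
    Mailbox<Message>* replyTo;
};

// Plays one game and returns the number of replies ping received.
inline int play_ping_pong(int iters) {
    Mailbox<Message> pingBox, pongBox;
    int repliesReceived = 0;  // touched only by the ping thread until join

    std::thread pong([&pongBox] {
        for (;;) {
            Message msg = pongBox.receive();
            if (msg.finished) return;
            // Reply on whatever mailbox arrived with the request.
            msg.replyTo->post(Message{false, msg.n, &pongBox});
        }
    });
    std::thread ping([&] {
        for (int n = iters; n > 0; --n) {
            pongBox.post(Message{false, n, &pingBox});  // send our mailbox along
            pingBox.receive();                          // wait for pong's reply
            ++repliesReceived;
        }
        pongBox.post(Message{true, 0, nullptr});        // tell pong to stop
    });
    ping.join();
    pong.join();
    return repliesReceived;
}
```

Because the reply address travels with each message, a second pinger could share the same pong without any extra setup.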

One of the advantages of this model over the Axum model is that any number of clients can communicate with an actor; all that is needed is access to the mailbox reference. This means that a single pong actor can service the requests of many ping actors, at least if the implementation is completely stateless.

The mailbox model also has some disadvantages: it is hard to reason about and orchestrate the interactions between n client actors when you cannot distinguish them, and any hint of statefulness ruins the scenario. In the pong case, the state we have is whether the 'Finished' message has been received. Once one pinger sends it, pong will stop servicing the others, and they will never know. In the channel model, there's an explicit agreement between only two parties on how they both will behave, an agreement that can be checked at runtime and, in many cases, at compile time.

Another annoyance is that mailboxes do not have clear endpoints: the same mailbox reference is used both to post messages and to receive them. With the channel model, each side of the channel is unique and the type system takes care of separating them: you send from one and receive from the other, and ne'er the twain shall be confused.
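To make the endpoint argument concrete, here is a hypothetical C++ sketch (not Axum, and not any shipping library) in which a channel is split into a `SendEnd` and a `ReceiveEnd` type. The names `ChannelState`, `SendEnd`, `ReceiveEnd` and `make_channel` are made up for illustration. Each end exposes only one operation and can be moved but not copied, so the compiler rejects code that tries to receive on the sending side. It captures only the separation of endpoints, not Axum's protocol checking.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

// Shared state behind one channel; user code only ever sees the two ends.
template <typename T>
struct ChannelState {
    std::mutex m;
    std::condition_variable cv;
    std::queue<T> q;
};

// The sending end: it has send() but deliberately no receive().
template <typename T>
class SendEnd {
public:
    explicit SendEnd(std::shared_ptr<ChannelState<T>> s) : s_(std::move(s)) {}
    SendEnd(SendEnd&&) = default;                 // move-only: one owner per end
    SendEnd& operator=(SendEnd&&) = default;
    SendEnd(const SendEnd&) = delete;
    SendEnd& operator=(const SendEnd&) = delete;
    void send(T v) {
        {
            std::lock_guard<std::mutex> lock(s_->m);
            s_->q.push(std::move(v));
        }
        s_->cv.notify_one();
    }
private:
    std::shared_ptr<ChannelState<T>> s_;
};

// The receiving end: it has receive() but deliberately no send().
template <typename T>
class ReceiveEnd {
public:
    explicit ReceiveEnd(std::shared_ptr<ChannelState<T>> s) : s_(std::move(s)) {}
    ReceiveEnd(ReceiveEnd&&) = default;
    ReceiveEnd& operator=(ReceiveEnd&&) = default;
    ReceiveEnd(const ReceiveEnd&) = delete;
    ReceiveEnd& operator=(const ReceiveEnd&) = delete;
    T receive() {  // blocks until a value is available
        std::unique_lock<std::mutex> lock(s_->m);
        s_->cv.wait(lock, [this] { return !s_->q.empty(); });
        T v = std::move(s_->q.front());
        s_->q.pop();
        return v;
    }
private:
    std::shared_ptr<ChannelState<T>> s_;
};

// Creates a channel and hands back its two distinct, differently typed ends.
template <typename T>
std::pair<SendEnd<T>, ReceiveEnd<T>> make_channel() {
    auto state = std::make_shared<ChannelState<T>>();
    return {SendEnd<T>(state), ReceiveEnd<T>(state)};
}
```

A line such as `ch.first.receive()` simply does not compile, which is the kind of mix-up the channel model is meant to rule out.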

As you can tell, I have a certain bias, but that doesn’t mean I’m down on the F# model. In fact, I love F# and hope there is a way we can combine its conciseness with the, in my humble opinion, safer Axum model for channel-based communication.

You should download and try Axum, but do also play with F# and its actor-based API. I’d love to hear your analysis of the relative strengths and weaknesses of the two models.

Niklas

Maestro Blog Available

In my last post, I lamented that there was no Maestro-specific blog, but no sooner had I spoken…

Josh Phillips got us set up with something specific to Maestro, where we’ll be posting most things related to this incubation effort.

Isolation in Maestro

As noted in the Dr. Dobb's article, Maestro is primarily about establishing isolation domains so that we can cut down on the number of undocumented dependencies between components. With a language like C# or VB, any two references of type T could be referring to the same object, and if you consider a whole object graph, you have to keep track of all the references within the graph. In C++, you don't quite know where your pointers have been, or what type they started out as, so the problem is even worse there.

Sometimes, keeping track of references in your own ad hoc way is easy to do, for example when you have a very small program that doesn't call into libraries you don't know much about and you're the only developer working on it. Or, maybe you have really spent a lot of time on data design and carefully architected your application to avoid concurrency issues. If so, what happens when you start on the next version of the software, suddenly under customer pressure to quickly provide ever-increasing value?

It is more or less against the nature of object-oriented languages to restrict access to objects in the ways that would make programming parallel programs easier and safer, so we need to look elsewhere for inspiration.

There are several places to look -- functional languages, for example, offer a great solution by not allowing side-effects. Without side-effects, there's no reader/writer competition and data races cease to exist as a concern. Of course, most interesting computer activities are all about side-effects, so we need to escape the model from time to time. That doesn't diminish the value of the functional approach to programming -- you have significantly restricted the number of areas in your application that you have to manage yourself, which is very valuable in itself unless you are a theorist for whom only a completely pure model is acceptable.

Inspiration from the Web

This need to escape the model is not what causes us to look elsewhere; rather, it is the fact that all the mainstream platforms are unsuitable for functional programming, as they have been designed with imperative languages in mind. With Maestro, we instead looked at the web for inspiration -- it also offers an isolation model, based on separating address spaces. Simply put, if a pointer isn't valid in your address space, and you cannot send it to another, you don't have to worry as much about aliasing.

Of course, separate address spaces have a very high overhead, so we're trying to use the model rather than the implementation, letting a compiler enforce the constraints rather than the OS (compilers are particularly good at that).

Domains


In Maestro, the key isolation concept is a domain, which limits the runtime scope of data to its compile-time scope. In other words, objects that are created within a particular domain don't escape it. The only things that may escape a domain are copies of its data or instances of immutable types (which .NET doesn't have a lot of, but String is an example).

A domain looks like this:

domain D1
{

    object obj = new object();
    string str = "Hello!";
}

You cannot call a method on a domain from outside it -- all its methods are either private or protected; the only thing you can do from the outside is create the domain:

var d = new D1();

Agents

So how do you manipulate the state? After all, data we cannot reach is just a waste of memory. We give you access to domain data via agents, which run on a thread that is different from the "caller." Agents are active components, while domains are inactive. This means that agents may have their own control flow and act independently of the client that created them.

Agents also cannot have their methods called from outside the body of the agent. In fact, agent instances are not created using a constructor, nor do we ever have the opportunity to hold a reference to an agent (thus, reflection-based invocations are harder). Instead, when we create an agent instance, the Maestro runtime establishes a communication channel for us to use when talking to the agent. This is called the agent's primary channel, which is explicitly typed in the agent declaration:

domain D1
{

    object obj = new object();
    string str = "Hello!";

    agent A1 : channel C1
    {
        A1()
        {
            var startWith = receive(PrimaryChannel::FirstMessage);
            ...
            PrimaryChannel::Result <-- 10;
        }
    }

}

As you can see, this agent has a channel type called 'C1' and starts its work by receiving a message from its primary channel. Receive is a built-in function of Maestro and is one of three ways to receive messages coming from outside a domain. The agent ends by sending the value '10' as the result of its work.
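For readers more at home in C++, the primary-channel idea can be loosely approximated with standard futures and promises. This is a hypothetical sketch, not how Maestro is implemented: `C1` and `start_a1` are made-up names, each "port" here is one-shot rather than a real channel, and `startWith + 10` merely stands in for the elided work. The point it shows is that the caller gets back only the channel, never a reference to the agent.

```cpp
#include <cassert>
#include <future>
#include <utility>

// Hypothetical analogue of channel type C1: one port in, one port out.
// 'result' is declared first so that 'firstMessage' is destroyed first;
// an unsent FirstMessage then surfaces as a broken promise in the agent
// instead of leaving the agent (and the future's destructor) waiting forever.
struct C1 {
    std::future<int> result;         // agent -> client (like PrimaryChannel::Result)
    std::promise<int> firstMessage;  // client -> agent (like PrimaryChannel::FirstMessage)
};

// Starts an A1-like agent and returns only its channel: the caller never
// gets a reference to the agent, so it cannot call the agent's methods.
inline C1 start_a1() {
    C1 channel;
    std::future<int> in = channel.firstMessage.get_future();
    channel.result = std::async(std::launch::async, [in = std::move(in)]() mutable {
        int startWith = in.get();  // like receive(PrimaryChannel::FirstMessage)
        return startWith + 10;     // hypothetical stand-in for the elided work
    });
    return channel;
}
```

A client would call `start_a1()`, send a value with `firstMessage.set_value(...)`, and then read the answer with `result.get()`.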

As declared, A1 instances only have access to immutable domain state. While the string instance is immutable, the reference 'str' itself is not, so A1 does not have access to anything in D1. Because they don't, A1 instances can safely run in parallel with all other agent instances inside or outside the domain.

We can give it access to domain state by adding a keyword to the agent declaration:

domain D1
{

    object obj = new object();
    string str = "Hello!";

    agent A1 : channel C1
    {
        A1()
        {
            var startWith = receive(PrimaryChannel::FirstMessage);
            ...
            PrimaryChannel::Result <-- 10;
        }

    }

    reader agent A2 : channel C1
    {
        A2()
        {
            var startWith = receive(PrimaryChannel::FirstMessage);
            ...
            String myStr = str;  // can read
            str = "new string";  // error: but cannot write
            PrimaryChannel::Result <-- 10;
        }
    }
}

Unlike instances of A1, instances of A2 may read the domain fields and the instances they refer to and may use them in their work. They may, however, not modify either the fields or the instances they refer to. To do so, the agent has to be declared a 'writer':

domain D1
{

    object obj = new object();
    string str = "Hello!";

    agent A1 : channel C1
    {
        A1()
        {
            var startWith = receive(PrimaryChannel::FirstMessage);
            ...
            PrimaryChannel::Result <-- 10;
        }
    }

    reader agent A2 : channel C1
    {
        A2()
        {
            var startWith = receive(PrimaryChannel::FirstMessage);
            ...
            String myStr = str;  // can read
            str = "new string";  // error: but cannot write
            PrimaryChannel::Result <-- 10;
        }
    }

    writer agent A3 : channel C1
    {
        A3()
        {
            var startWith = receive(PrimaryChannel::FirstMessage);
            ...
            String myStr = str;  // can read
            str = "new string";  // and write
            PrimaryChannel::Result <-- 10;
        }
    }
}

Here, A3 may change the values of 'obj' and 'str' or modify the instances they refer to (in this case, both are immutable, but 'obj' could point to something that isn't later on). All instances of A1 can still run without coordination with other agents, but instances of A2 and A3 must coordinate their executions.

The reader / writer attribution is used to do this -- as many A2 instances as are available may run in parallel as long as no A3 instance is running. Only one instance of A3 may be executing code at any given point in time.
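The reader/writer rule above is the classic shared/exclusive locking discipline, so it can be imitated in C++17 with `std::shared_mutex`. This is a hypothetical sketch, not Maestro's implementation: `DomainD1`, `reader_agent`, `writer_agent` and `run_agents` are illustrative names, read-only access is a convention here rather than something the compiler enforces, and each agent holds its lock for its whole body (a simplification, since Maestro agents give up their execution rights whenever they wait for a message). The atomic counters only exist to check that the exclusion rules hold.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for domain D1: its state plus a reader/writer gate.
struct DomainD1 {
    std::shared_mutex gate;              // shared by readers, exclusive for writers
    std::string str = "Hello!";
    std::atomic<int> activeReaders{0};
    std::atomic<int> activeWriters{0};
    std::atomic<bool> violation{false};  // set if the rules are ever broken
};

// Like 'reader agent A2': may overlap other readers, never a writer.
inline void reader_agent(DomainD1& d) {
    std::shared_lock<std::shared_mutex> lock(d.gate);
    ++d.activeReaders;
    if (d.activeWriters.load() != 0) d.violation = true;
    std::string myStr = d.str;           // can read
    (void)myStr;
    --d.activeReaders;
}

// Like 'writer agent A3': runs alone.
inline void writer_agent(DomainD1& d, const std::string& value) {
    std::unique_lock<std::shared_mutex> lock(d.gate);
    if (++d.activeWriters != 1 || d.activeReaders.load() != 0) d.violation = true;
    d.str = value;                       // can write
    --d.activeWriters;
}

// Runs a mix of readers and writers; returns true if exclusion always held.
inline bool run_agents(int readers, int writers) {
    DomainD1 d;
    std::vector<std::thread> threads;
    for (int i = 0; i < readers; ++i)
        threads.emplace_back([&d] { reader_agent(d); });
    for (int i = 0; i < writers; ++i)
        threads.emplace_back([&d] { writer_agent(d, "new string"); });
    for (auto& t : threads) t.join();
    return !d.violation.load();
}
```

An A1-style agent would not touch `gate` at all, which is why it needs no coordination with the others.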

How do agents yield to each other, then? They do so by receiving messages. Waiting for a message means giving up your execution rights until the message is available. Thus, all coordination between agents is achieved via message-passing.


The Maestro agents concept is very much related to C++ agents, which I discussed at and after PDC. In managed code, we have a lot more infrastructure at our disposal to enforce constraints. For example, creating a new domain language is much more reasonable for .NET than for Win32.


In this post, I didn't go into detail on how to define the channels that agents use to coordinate their work, nor how Maestro interacts with the rest of .NET in a safe manner. There are a couple of other concepts that also need explanation, such as message-passing, data-flow, failure models, protocols and payload schema, but they will have to wait until another time.

 There's currently no Maestro-specific blog, so look for posts here and on the PFX team blog.

Maestro

I had expected that the first word on Maestro would come on this blog, but that's what happens when you take time between posts.

We first discussed it during PDC at the Thursday panel on parallel programming, we discussed it on Channel 9, and then Josh Phillips posted his excellent article about isolation and message-passing on the PFX team blog. It was really exciting that Dr. Dobb's contacted me yesterday to learn a bit more about Maestro, and it was really impressive to see how fast it was published. It was particularly fun to get the word out on the same day I held a web-based seminar on Maestro at the University of Illinois. Don't know if that's been published on their site yet.

As I pointed out in the DDJ interview and have said on this blog, I really think we can learn some things from how the web is programmed when we think about how we are going to scale manycore applications to (possibly) hundreds of hardware threads, or try to mix CPU-based code with GPU-based code with a non-consistent memory model between them, or integrate with cloud-based services under the software + services model. For raw performance, message-passing models are much too expensive, but to scale from local to server to cloud, we need a programming model that does not assume that pointers are valid beyond component boundaries. An actor-based model can, with the right runtime bindings underneath, provide exactly that.

If things go our way, we should be able to have some exciting things to tell about Maestro within the next couple of months. Until then, I'll try to introduce a few concepts of Maestro here on this blog just to keep things interesting...

C++ Agents


At PDC 2008, we talked a lot about agents-based programming, in particular during the native C++ talks on parallel programming. We showed how the agents library that we plan to ship in VS 2010 can be used to overlap I/O with computationally intensive processing to raise the throughput of a program. I thought I should spend a couple of posts talking a little about what that means and why it's a good thing.

Agent-based programming, more often referred to as actor-based programming, is a pattern that has been in use for quite some time. We use the term 'agent.'

http://en.wikipedia.org/wiki/Actor_model

The basic idea is to remove dependencies between the active components of a program by having them communicate solely via asynchronous message-passing. While truly avoiding shared memory is hard, given current programming languages (i.e. C++), there are things we can do to help get close to the benefits of avoiding shared memory (robustness, and sometimes, performance). If you follow the pattern with discipline, you should be able to avoid having to take locks or worry about data races. (There are many other concurrency problems to worry about, though, such as deadlocks...)

The data-flow messaging blocks we showed at PDC may seem to have little to do with agents, but they are essential. The agent class itself, though, wasn't discussed much.

As always, let's start with "hello world". Here's what it would look like with our agents API:

#include "stdafx.h"
#include <agents.h>

using namespace ::Concurrency;

class agent1 : public agent
{
protected:
    void run()
    {
        wprintf(L"Hello World!\n");
        done(agent_done);
    }
};

int _tmain(int argc, _TCHAR* argv[])
{
    agent1 a1;
    a1.start();

    agent::wait(&a1);

    return 0;
}

The agent class is simple: it has a run() method which does all the work and sets its own status to 'done'. We create the agent from _tmain(), start it and wait for it to finish. That's it. Notice that the agent class doesn't have any public members. That would be typical: most agents will only have a few public members, for example the constructors and accessor functions for the message-passing channels.
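If you don't have the agents library at hand, the lifecycle just described (start, run, mark yourself done, wait) can be imitated in standard C++. This is a hypothetical sketch: `MiniAgent` and `HelloAgent` are made-up names and this is NOT the `Concurrency::agent` class; for simplicity, `wait()` here joins the agent's thread rather than waiting on a status value, and the greeting is recorded instead of printed so it can be checked.

```cpp
#include <atomic>
#include <cassert>
#include <string>
#include <thread>

// Hypothetical, portable imitation of the agent lifecycle (start, run, done, wait).
class MiniAgent {
public:
    virtual ~MiniAgent() = default;
    void start() { thread_ = std::thread([this] { run(); }); }
    // Blocks until run() has returned. Call it before the agent is destroyed:
    // destroying a still-joinable std::thread calls std::terminate.
    static void wait(MiniAgent* a) {
        if (a->thread_.joinable()) a->thread_.join();
    }
    bool is_done() const { return done_.load(); }
protected:
    virtual void run() = 0;        // all of the agent's work happens here
    void done() { done_ = true; }  // the agent sets its own status
private:
    std::thread thread_;
    std::atomic<bool> done_{false};
};

// The "hello world" agent; it records its greeting rather than printing it.
class HelloAgent : public MiniAgent {
public:
    std::string greeting;
protected:
    void run() override {
        greeting = "Hello World!";
        done();
    }
};
```

As in the original, the only public surface is start/wait plus whatever accessors the agent chooses to expose.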

As cool as it was to see something printed to the screen, a single agent isn't very interesting, in all honesty. We have to have someone for it to communicate with. We'll add a second agent, and send some data between them. To do so, we have to add some form of communication channel...


class agent1 : public agent
{
public:
    unbounded_buffer<int> &GetOutput() { return m_output; }

    void SetInput(ISource<int> &input) { m_input = &input; }

protected:
    void run()
    {
        wprintf(L"Hello World!\n");
        asend(m_output, 10);
       
        int reply = receive(m_input);
        wprintf(L"Received (1): %d\n", reply);

        done(agent_done);
    }

private:
    ISource<int> *m_input;
    unbounded_buffer<int> m_output;
};

class agent2 : public agent
{
public:
    unbounded_buffer<int> &GetOutput() { return m_output; }

    void SetInput(ISource<int> &input) { m_input = &input; }

protected:
    void run()
    {
        int request = receive(m_input);
        wprintf(L"Received (2): %d\n", request);

        asend(m_output, request + 7);
       
        done(agent_done);
    }

private:
    ISource<int> *m_input;
    unbounded_buffer<int> m_output;
};

int _tmain(int argc, _TCHAR* argv[])
{
    agent1 a1;
    agent2 a2;
    a1.SetInput(a2.GetOutput());
    a2.SetInput(a1.GetOutput());
    a1.start();
    a2.start();

    agent::wait(&a1);

    return 0;
}

Which results in:

Hello World!
Received (2): 10
Received (1): 17

Note that the separate start of the agent makes more sense here, given that we need to link the two agents together before either of them should run.

Some may worry about the fact that waiting for data with a receive call surely must be holding the stack up. That is a valid concern, especially if you have many, many agents. In fact, because of the 'receive' calls, this little example uses three threads. Ouch!

That is why the data-flow network block 'call' is so important -- it allows us to route data to methods (in this case, a C++ lambda) and not hold on to any stack while we're waiting.

Let's look at how that works for our agent sample. The _tmain() method remains identical, but the agents are slightly larger:

class agent1 : public agent
{
public:
    unbounded_buffer<int> &GetOutput() { return m_output; }

    void SetInput(ISource<int> &input) { m_input = &input; }

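    // Initialize the pointers so the destructor is safe even if run() never ran.
    agent1() : m_reply(nullptr), m_input(nullptr) {}
    ~agent1() { delete m_reply; }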

protected:
    void run()
    {
        wprintf(L"Hello World!\n");
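        asend(m_output, 10);

        // Instead of blocking in receive(), link a heap-allocated call block
        // that runs the lambda when the reply arrives.
        m_input->link_target(m_reply = new call<int>([&] (int reply)
        {
            wprintf(L"Received (1): %d\n", reply);
            done(agent_done);
        }));
    }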

private:
    call<int> *m_reply;
    ISource<int> *m_input;
    unbounded_buffer<int> m_output;
};

class agent2 : public agent
{
public:
    unbounded_buffer<int> &GetOutput() { return m_output; }

    void SetInput(ISource<int> &input) { m_input = &input; }

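    // Initialize the pointers so the destructor is safe even if run() never ran.
    agent2() : m_request(nullptr), m_input(nullptr) {}
    ~agent2() { delete m_request; }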

protected:
    void run()
    {
        // agent2 also gives up its stack: the call block replies when a
        // request arrives.
        m_input->link_target(m_request = new call<int>([&] (int request)
        {
            wprintf(L"Received (2): %d\n", request);
            asend(m_output, request + 7);
            done(agent_done);
        }));
    }

private:
    call<int> *m_request;
    ISource<int> *m_input;
    unbounded_buffer<int> m_output;
};

Instead of executing the receive call, we link up a call block that is allocated on the heap (it can't be on the stack, since that's what we're trying to get rid of here). Once the linking is done, we return from run() and give up the stack so it may be used for other things...

Since we have it on the heap, we also need to create a destructor to delete the call block. Note: once you get your hands on this, some of you may discover through experimentation that it is actually safe to delete the call block before returning from the lambda, but that's taking advantage of an implementation detail and I strongly discourage you from doing so.
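To illustrate the idea behind call blocks without the Concurrency Runtime, here is a hypothetical standard C++ sketch in which a handler is linked to a buffer and runs when data arrives, so no thread ever sits in a blocking receive. `LinkableBuffer`, `link` and `exchange` are made-up names, not the ConcRT API. One deliberate simplification: handlers run synchronously on the posting thread, whereas a real runtime would schedule them as tasks.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical buffer: messages go to a linked handler, or wait until one is linked.
template <typename T>
class LinkableBuffer {
public:
    void post(T v) {
        std::function<void(T)> h;
        {
            std::lock_guard<std::mutex> lock(m_);
            if (!handler_) {                 // nobody linked yet: queue it
                pending_.push(std::move(v));
                return;
            }
            h = handler_;
        }
        h(std::move(v));                     // invoke outside the lock
    }
    // Links a handler and delivers anything queued before it was linked.
    // (This sketch does not order those deliveries against concurrent posts.)
    void link(std::function<void(T)> h) {
        std::queue<T> backlog;
        {
            std::lock_guard<std::mutex> lock(m_);
            handler_ = h;
            std::swap(backlog, pending_);
        }
        while (!backlog.empty()) {
            h(std::move(backlog.front()));
            backlog.pop();
        }
    }
private:
    std::mutex m_;
    std::function<void(T)> handler_;
    std::queue<T> pending_;
};

// The two-agent exchange from the post, with no thread waiting on anything.
inline int exchange() {
    LinkableBuffer<int> toAgent2, toAgent1;
    int reply = 0;
    // agent2: answer each request with request + 7
    toAgent2.link([&toAgent1](int request) { toAgent1.post(request + 7); });
    // agent1: record the reply when it arrives
    toAgent1.link([&reply](int r) { reply = r; });
    toAgent2.post(10);  // agent1's opening message
    return reply;
}
```

Here the "agents" are nothing more than handlers, which is the point: no stack is held while a message is outstanding.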

That's it for right now, I think. Back to work...

My Wednesday PDC talk on Channel 9

If anyone is interested, I just found the Channel 9 link to my Wednesday talk on the Concurrency Runtime. As I said earlier, the topic is a deep dive, so it's rather dry...

http://channel9.msdn.com/pdc2008/TL22/

 

Last Day at PDC

The symposium panel Thursday went off pretty well. A much smaller crowd than at the other talks, but it was late in the program and people were starting to head home. It's not easy to compete with lunch, either.

Not a lot of controversy on the panel, we were mostly talking about futures in our different areas of interest. There were some really good questions from the audience, including some challenging the wisdom of message-passing.

Then, it was time to head home. PDC felt like a success, and I personally was really encouraged by all the interest in our technologies. It was really nice to see how much interest there still is in our investments into native code and C++.

PDC Day 3

I did my first talk Wednesday, "A Deep Dive Into the Concurrency Runtime," where I talked about how to extend the PPL and Agents native libraries and how to build your own scheduler with our runtime. A rather dry topic, I almost managed to bore myself. However, the room was filled to capacity and more, and it seemed to go over well. Lots of good deep questions followed.

"Ask the Experts" was the evening event where we met with customers over dinner (chili dogs and beer) and discussed hard problems related to concurrency. It was really a treat to meet so many customers directly and help them think through how and if they could leverage parallelism in their apps. Not every app should, and there were definitely some instances where I had to just say "stick with what you're already doing."

Many have commented on our agents and messaging APIs for C++ and been really excited about getting these capabilities so easily. It's hard to imagine it being a good experience if it weren't for C++ lambdas being added, so that's really the pivotal new feature.

Today is the last day, and we have a panel on "The Future of Parallel Programming" at noon. I'll be talking about avoiding shared state with agents-based programming and an incubation project we have going on at this time, 'Maestro.' It is a language for programming agents, partitioning state and leveraging data-flow for concurrency driven by asynchronous operations. More about that later.

PDC 2008 Day 1

It's been a long time since I last attended PDC, but now it's in full swing.

Last night, I met with a group of customers and partners from Sweden and gave an impromptu presentation about manycore futures. Sweden is not a big country, but it seems the group is the third largest at PDC, after the US and Canada. Who'd have thought...

Today's an exciting day with Ray Ozzie's keynote and the show floor opening in just a couple of hours. I look forward to all the customer interactions. We'll see how it turns out. My talks are Wednesday and Thursday, which seem an eternity away right now.

PDC 2008

Next week is PDC 2008. Presentations are done, demos are being rehearsed, and I'm excited to go to my first PDC in 15 years. Last time, I worked for a partner, not for Microsoft.

Anyway, my presentation "Deep Dive into the Concurrency Runtime" is Wednesday, and I'm talking about how to add functionality to our concurrency runtime. It's going to be a very technical talk, writing code rather than doing lots of fancy demos. It's all about what you can do with our runtime in native code, i.e. C++. Apart from the concurrency support, there are some really nice new features being added to C++ (all standards-based) that will make a ton of difference. Come to the native code sessions at PDC and see for yourself.

Oh, yes, if you come to the Thursday symposium on parallelism, I'm on the panel and will be talking a bit about agent-based programming. I'll expand on that later.

CCR and DevDiv's Concurrency Runtime

With the news about Intel's support for our concurrency runtime and my post about it yesterday, some may wonder how it relates to the Concurrency and Coordination Runtime (CCR), published as part of the MS Robotics' development kit.

The CCR, which I have blogged about before in my rather sparse column, currently offers both a specific programming model and the infrastructure to support it. Its programming model is compelling to some audiences while not fitting the needs of others. For example, it is only available for managed code, leaving C++ programmers without its capabilities. Likewise, the CCR doesn't help someone who wants to parallelize queries over objects or XML, while PLINQ is great at it.

The developer division concurrency runtime, on the other hand, is designed to serve as low-level infrastructure for a variety of programming models surfaced in libraries and languages.

Therefore, we are looking into how we can integrate the CCR functionality on top of the concurrency runtime once we have shipped the latter; it's important to us that it work well with other libraries that may also be used in an application, such as OpenMP, MPI, PLINQ, etc.
