My fellow programming language enthusiast Matthew Podwysocki just posted another F#-based actor sample in his blog; it is a simulation of an auction that was first written in Scala. Last time, I followed his Axum Ping-Pong example with my own F#-based version, but this time, I’ll use his example to point to some of the new features available in Visual Studio 2010, which is available in beta form here. I also suggest reading Rick Molloy’s excellent MSDN article on agent programming in C++.

On to the sample code! I tried to follow the F# code as closely as possible, but there are obviously going to be big differences between the two. For no particular reason, I’ll use a strange mix of PascalCasing and stl_casing in the code below.

First, after the obligatory file inclusions, we have to deal with the declaration of message payloads; unfortunately, whereas F# has discriminated unions and their declaration is extremely concise, we have to roll our own in C++. Thus, the message declarations look like this:

enum MessageId { None, Inquire, Offer, Status, BestOffer, 
                 BeatenOffer, AuctionConcluded, AuctionFailed, AuctionOver };

struct AuctionReply {
    AuctionReply() : msg_id(None) 
    { }
    AuctionReply(MessageId msg_id) : msg_id(msg_id) 
    { }
    AuctionReply(MessageId msg_id, 
                 int price) : msg_id(msg_id), best_offer(price)
    { }
    AuctionReply(MessageId msg_id,
                 ITarget<AuctionReply> *seller,
                 ITarget<AuctionReply> *buyer) : msg_id(msg_id) 
    { parties.buyer = buyer; parties.seller = seller;}

    MessageId msg_id;
    union {
        int best_offer;    // Status, BeatenOffer
        struct { ITarget<AuctionReply> *seller; 
                 ITarget<AuctionReply> *buyer; } parties; // AuctionConcluded
    };
};
struct AuctionMessage {
    AuctionMessage() : msg_id(None) 
    { }
    AuctionMessage(MessageId msg_id, ITarget<AuctionReply> *reply_to) : 
        msg_id(msg_id), reply_to(reply_to)
    { }
    AuctionMessage(MessageId msg_id, int price, ITarget<AuctionReply> *reply_to) :
        msg_id(msg_id), price(price), reply_to(reply_to)
    { }

    MessageId msg_id;
    
    ITarget<AuctionReply> * reply_to; // Offer, Inquire
    int price;                        // Offer
};

We anticipate that each actor will be using a single mailbox for all incoming messages, and these two struct types define the payloads. The auction actor will use AuctionMessage, while the seller and client will both use AuctionReply, as is the case in the F# version.
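For comparison, if a C++17 compiler is available (an assumption; VS2010 predates it), std::variant gets closer to what F# offers. The sketch below is not part of the sample: the reply type names and describe() are hypothetical, and only four of the reply cases are modeled.

```cpp
#include <string>
#include <type_traits>
#include <variant>

// Hypothetical reply types; each struct carries only the fields its case needs.
struct StatusReply      { int best_offer; };
struct BestOfferReply   { };
struct BeatenOfferReply { int best_offer; };
struct AuctionOverReply { };

// The variant plays the role of the F# discriminated union.
using Reply = std::variant<StatusReply, BestOfferReply,
                           BeatenOfferReply, AuctionOverReply>;

// Dispatch on the active alternative, much like an F# match expression.
std::string describe(const Reply &reply)
{
    return std::visit([](const auto &r) -> std::string {
        using T = std::decay_t<decltype(r)>;
        if constexpr (std::is_same_v<T, StatusReply>)
            return "status " + std::to_string(r.best_offer);
        else if constexpr (std::is_same_v<T, BestOfferReply>)
            return "best offer";
        else if constexpr (std::is_same_v<T, BeatenOfferReply>)
            return "beaten by " + std::to_string(r.best_offer);
        else
            return "auction over";
    }, reply);
}
```

The compiler now tracks which fields are valid for which case, which the hand-rolled union with a msg_id tag leaves to the programmer.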

The second thing we come to is the definition of the actor class. The VS2010 runtime comes with a built-in ‘agent’ class, but it’s a much more general concept than the traditional actor, which is expected to have a mailbox for communication. Agents define an isolation pattern, but do not dictate whether a single mailbox should be used or whether an Axum-like collection of channels is preferable. In this case, we’ll try to stay close to the F# original and use a mailbox (actually, it will be more like Erlang’s than an F# mailbox, but that’s of secondary relevance here).

This is what actor<T> looks like (T is the mailbox payload type):

template<class Payload>
class actor : public agent
{
public:
    operator ITarget<Payload> *() { return &inbox; }
    operator ITarget<Payload> &() { return inbox; }
protected:
    Payload receive() { return Concurrency::receive(inbox);}
    bool tryreceive(Payload &msg, unsigned int timeout)
    {
        try 
        {
            msg = Concurrency::receive(inbox, timeout);
            return true;
        } 
        catch (const operation_timed_out &)
        {
            return false; 
        } 
    }

private:
    unbounded_buffer<Payload> inbox;
};

Note that the only public members are the conversion operators: this is how we force all outsiders to interact with the actor only by sending messages. We also define two custom receive operations that actor implementations use to access the inbox, which is otherwise hidden (this is where our solution departs from F# and looks more like Erlang).
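If you want to experiment with the same receive-or-time-out behavior without the VS2010 runtime, a mailbox can be approximated with the C++11 standard library. This is only a sketch under that assumption: the mailbox class and its member names are hypothetical, and it is not how the runtime implements unbounded_buffer.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the actor's inbox, written against the C++11
// standard library instead of the Concurrency Runtime.
template <class Payload>
class mailbox
{
public:
    // Enqueue a message and wake one waiting receiver.
    void send(Payload msg)
    {
        {
            std::lock_guard<std::mutex> lock(guard_);
            queue_.push_back(std::move(msg));
        }
        ready_.notify_one();
    }

    // Block until a message arrives, then remove and return it.
    Payload receive()
    {
        std::unique_lock<std::mutex> lock(guard_);
        ready_.wait(lock, [this] { return !queue_.empty(); });
        return pop_front();
    }

    // Wait up to timeout_ms milliseconds; return false if nothing arrived.
    bool try_receive(Payload &msg, unsigned int timeout_ms)
    {
        std::unique_lock<std::mutex> lock(guard_);
        if (!ready_.wait_for(lock, std::chrono::milliseconds(timeout_ms),
                             [this] { return !queue_.empty(); }))
            return false;
        msg = pop_front();
        return true;
    }

private:
    // Caller must hold guard_ and the queue must be non-empty.
    Payload pop_front()
    {
        Payload msg = std::move(queue_.front());
        queue_.pop_front();
        return msg;
    }

    std::mutex guard_;
    std::condition_variable ready_;
    std::deque<Payload> queue_;
};
```

All the locking lives inside the mailbox, so code that only calls send(), receive(), and try_receive() never touches a mutex directly. That is the same division of labor the agents library gives us.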

There’s no implementation of the run() method; that is still left to the actual actors. Let’s take a look at the simplest one, the one representing sellers:

class Seller : public actor<AuctionReply>
{
protected:
    void run()
    {
        while (true)
        switch (receive().msg_id)
        {
        case AuctionConcluded:
        case AuctionFailed:
            done(agent_done);
            return;
        }

    }
};

We don’t actually do much in the seller (in this simplified example), just wait around for the auction to finish. In the positive case, we might more realistically try to contact the buyer to arrange for product delivery and payment, but that’s ignored here. When the auction is over, we set the agent state to ‘agent_done’ and exit. C++ agents can return from their run() method without the agent itself being considered “dead,” so done() is used to signal the end of its lifetime and allow anyone waiting for it to finish to proceed.

The auction actor is a lot more complex, so let us examine its run() method in segments. The body is generally speaking an infinite loop, from which we break when the auction is over and all sellers and clients are notified.

At the top of the loop, we calculate how much time remains before the auction is over, measured in milliseconds. tryreceive(), which is the method we just defined in actor<T>, allows us to wait for a message but time out if it doesn’t arrive within a certain interval. It returns ‘true’ if a message was received:

clock_t remaining = end_at - clock();
auto msg = AuctionMessage();

if (tryreceive(msg, remaining))
{
    switch (msg.msg_id)
    {

Where F# uses pattern matching to distinguish the messages, we use a switch statement. The Inquire case is the simplest, since we just need to send back the current highest bid:

case Inquire:
    {
        asend(msg.reply_to, AuctionReply(Status, max_bid));
        break;
    }

The mailbox to send the reply to came with the message, as an ITarget<AuctionReply> pointer. Generally speaking, it is not safe to send pointers in messages between agents, but ITarget and ISource pointers are safe to use in this fashion.

Handling an offer is somewhat more complex:

case Offer:
    {
        if (msg.price >= max_bid + min_increment)
        {
            if (max_bidder != NULL) asend(max_bidder, AuctionReply(BeatenOffer, msg.price));
            max_bidder = msg.reply_to;
            max_bid = msg.price;
            asend(msg.reply_to, AuctionReply(BestOffer));
        }
        else
        {
            asend(msg.reply_to, AuctionReply(BeatenOffer, max_bid));
        }
        break;
    }

The auction agent enforces a minimum increment to discourage one-cent increases. Thus, if the new offer is at least the current highest bid plus the increment, we have a new highest bid, and the code notifies the previous high bidder that his offer has been beaten before notifying the new bidder that he has the highest bid. If the new bid is too low, we just tell the bidder that it was not good enough.
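Stripped of the messaging, the acceptance rule is small enough to isolate and test on its own. The following is a sketch only: AuctionState and accept_offer() are hypothetical names that do not appear in the sample, and the notifications to the bidders are left out.

```cpp
// Hypothetical summary of the auction's bookkeeping.
struct AuctionState
{
    int max_bid;        // highest accepted bid so far
    int min_increment;  // smallest raise the auction accepts
};

// Returns true and records the bid if it raises the current maximum by at
// least min_increment; otherwise leaves the state unchanged.
bool accept_offer(AuctionState &state, int price)
{
    if (price < state.max_bid + state.min_increment)
        return false;
    state.max_bid = price;
    return true;
}
```

With a current bid of 100 and an increment of 10, an offer of 109 is rejected and an offer of 110 is accepted, matching the >= comparison in the Offer case.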

Once the auction has timed out, the code notifies the seller and the highest bidder of the result (the auction fails if and only if the reserve price is not met) and then waits for any further offers coming in after the close. If 3 seconds ever pass without a message arriving after the close, the auction agent shuts down. Here’s a potential bug in the application, which I leave for you to correct.

if (max_bid >= min_bid)
{
    auto reply = AuctionReply(AuctionConcluded, &seller, max_bidder);
    asend(max_bidder, reply);
    asend(seller, reply);
    printf("auction succeeded, highest bid was %d\n", max_bid);
}
else
{
    printf("auction failed, highest bid was %d\n", max_bid);
    if (max_bidder != NULL) asend(max_bidder, AuctionReply(AuctionOver));
    asend(seller, AuctionReply(AuctionFailed));
}

while (true)
{
    if (tryreceive(msg, 3000))
    {
        switch (msg.msg_id)
        {
        case Offer:
            asend(msg.reply_to, AuctionReply(AuctionOver));
            break;
        }
    }
    else
    {
        done(agent_done);
        return;
    }

}

There is a little bit of state in the auction agent (whether or not the auction has closed), but the bidder client is a lot more stateful. There are three distinct states: i) before it gets the status information from the auction, ii) it has not made the highest bid, and iii) it has made the highest bid.

The first state is coded at the top of the run() method:

asend(auction, AuctionMessage(Inquire, *this));

auto initial_status = receive();

Once we’re past these two lines of code, we’re in ii) and are going to attempt to make a bid until we’re notified that ours is the highest (or that the auction is over). Where F# uses recursion, we’ll take the normal C++ approach and loop:

bool is_done = false;
int max = initial_status.best_offer;
int current;

while (!is_done)
{
    if (max > top) { Log("Too high for me!"); done(agent_done); return; }

    current = max + increment;
    Concurrency::wait(rand_in_range(1, 1000));
    Log("making an offer at $", current);
    asend(auction, AuctionMessage(Offer, current, *this));

The code injects a random delay just to make the simulation more interesting. This would correspond to the reaction time of a bidder in a real auction.

Once the offer is made, all we can do is wait to be notified whether we’re in state ii or iii:

bool has_best;

do
{
    has_best = false;

    auto reply = receive();
    switch(reply.msg_id)
    {
    case BestOffer:
        has_best = true; max = current;
        break;
    case BeatenOffer:
        max = reply.best_offer;
        break;
    case AuctionConcluded:
    case AuctionOver:
        is_done = true;
        break;
    }

} while (has_best);

The code could probably be simpler, but the key is to not go back and make another offer after we have received ‘BestOffer,’ or the bidder would be bidding against itself. Instead, after a best offer, the code waits for another message. In reality, it should never be getting ‘BestOffer’ twice in a row, so the loop is perhaps confusing in its attempt to reduce code size.
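One way to make the states explicit is to write the transitions as a pure function of the reply. This is a sketch, not the code from the sample: the enum and function names are hypothetical, and only the replies a bidder handles in the loop above are modeled.

```cpp
// Hypothetical names for the bidder's states after its first inquiry.
enum class BidderState { Outbid, Leading, Finished };

// Hypothetical subset of the reply identifiers a bidder can receive.
enum class ReplyKind { BestOffer, BeatenOffer, AuctionConcluded, AuctionOver };

// Pure transition function: the next state depends only on the reply kind.
BidderState next_state(ReplyKind reply)
{
    switch (reply)
    {
    case ReplyKind::BestOffer:   return BidderState::Leading;   // keep waiting
    case ReplyKind::BeatenOffer: return BidderState::Outbid;    // bid again
    case ReplyKind::AuctionConcluded:
    case ReplyKind::AuctionOver: return BidderState::Finished;  // stop
    }
    return BidderState::Finished;  // unreachable for valid enumerators
}

// A bidder should place a new offer only while it is outbid.
bool should_bid(BidderState state) { return state == BidderState::Outbid; }
```

Written this way, the rule that a leading bidder must not bid against itself becomes a single check on the state rather than a property of the loop structure.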

Once the auction is over, and the bidder has been notified of it, the only thing that remains is to exit:

done(agent_done);

In a more realistic scenario, the winning buyer would also want to contact the seller to arrange for exchange of goods and money.

The code is almost done – we just have to get all these agents started and make sure the program doesn’t terminate until they all have a chance to finish:

int _tmain(int argc, _TCHAR* argv[])
{
    const int CLIENT_COUNT = 15;

    agent * agents[CLIENT_COUNT+2];

    Seller seller;
    seller.start();

    Auction auction(seller, 100, clock()+6000);
    auction.start();

    agents[0] = &seller;
    agents[1] = &auction;
    for (int i = 0; i < CLIENT_COUNT; i++)
    {
        agents[i+2] = new Client(auction, i, rand_in_range(5, 25), rand_in_range(80, 450));
        agents[i+2]->start();
    }

    agent::wait_for_all(CLIENT_COUNT+2, agents);

    return 0;
}

We need to collect pointers to all our agents in an array and wait for them, because we don’t know in what order they will finish. My code is a little bit sloppy in that it doesn’t delete the client instances that are created on the heap, but the process is going away, so in this particular case, it doesn’t matter. As you can see, the auction runs for 6,000 ms, and the minimum acceptable bid is $100.
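If the cleanup did matter, one option would be to let std::unique_ptr own the heap-allocated clients. The sketch below uses a hypothetical Bidder type as a stand-in for the client agent, with an instance counter added purely so the cleanup can be observed; it does not use the agents library.

```cpp
#include <memory>
#include <vector>

// Hypothetical stand-in for a heap-allocated client agent; it counts live
// instances so the cleanup can be observed.
struct Bidder
{
    static int live;
    Bidder()  { ++live; }
    ~Bidder() { --live; }
    Bidder(const Bidder &) = delete;
    Bidder &operator=(const Bidder &) = delete;
};
int Bidder::live = 0;

// Create `count` bidders owned by the returned vector; every one is deleted
// when the vector goes out of scope.
std::vector<std::unique_ptr<Bidder>> make_bidders(int count)
{
    std::vector<std::unique_ptr<Bidder>> bidders;
    for (int i = 0; i < count; i++)
        bidders.push_back(std::unique_ptr<Bidder>(new Bidder()));
    return bidders;
}
```

The raw pointers needed for an array of agents can still be obtained with get() on each element, as long as the owning vector outlives the wait.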

One really irritating fact about this sample is that it won’t scale to large numbers of agents. Unlike F# async workflows and Axum’s asynchronous methods, C++ does not have a mechanism for the compiler to make control-flow constructs pause. That means that each of our agents will eat up a thread until it finishes, putting a ceiling on how many we can have. An Axum-based version of this that I wrote had no trouble with 10,000 bidders, but that won’t work so easily with the C++ version.

In an earlier post, I discussed what to do about it using networks, which are just as scalable as F# workflows or Axum async methods, but won’t result in code that is nearly as pretty.

Anyway, I hoped to show an example of how to leverage the agents library to build components that are stateful. For more stateless situations, building networks and putting the logic in transform and call nodes is a more scalable and intuitive solution.

As a parting thought, I want to draw your attention to one simple fact: we just wrote an application with 17 agents working together on a problem in parallel, interacting in non-trivial patterns updating state and arriving at a shared understanding of “their world” without a single lock, mutex, or critical section.