Something that has cropped up a few times — certainly enough times for me to want to do something about it — is the lack of some basic utilities in CCR, not things that necessarily need to find their way into the CCR package, but more some small utility classes that should be in any CCR developers back pocket when they need them.

The first one that I will address is logging. A number of times I've been shown some code and asked "why is my code not as fast as I would like?", and the core algorithm and implementation is just fine, but scattered around the code are lines like…

Log(string.Format("{0}: {1}: Starting New Task", Thread.CurrentThread.
    ManagedThreadId, DateTime.UtcNow));

Which is entirely reasonable, a little investigation into the implementation of Log can shows the real problem

static object _logLock = new object();

static void Log(string msg)
{
    lock(_logLock)
    {
        using (var stream = new StreamWriter("log.txt", true))
        {
            stream.WriteLine(msg);
        }
    }
}

There are two big problems here, and they go hand in hand, one is opening and closing a file everytime a message needs to be logged, and the second, probably introduced by the first decision is to call lock. This of course, ensures that the access to the file is safe, but at the cost of introducing lock contention. Potentially every thread in the dispatcher can be stuck in the one place.

Sometimes on the robotics team we don't realise that this is an issue, in DSS if you want to log there are convenient helper functions on the DsspServiceBase, we forget that CCR doesn't have these.

But CCR is all about solving this problem! A very simple class can encapsulate most basic logging requirements. All we need to do is post messages to a port, and have write them out in an exclusive handler…

We start by creating a small CCR service. This is a common CCR pattern where a class is created to perform some task, but its creator uses a factory function (a static public method commonly called Create) which returns the PortSet that it can communicate with the service on, rather than a classic reference to the class.

The Service

public sealed class StreamLogger : CcrServiceBase
{
    StreamLoggerOperations _port = new StreamLoggerOperations();
    Port<EmptyValue> _tokenPort = new Port<EmptyValue>();
    StreamWriter _writer;

    private StreamLogger(StreamWriter writer, DispatcherQueue taskQueue)
        : base(taskQueue)
    {
        _writer = writer;
    }

In this case the Create method takes the StreamWriter on which logging is to occur, and a DispatcherQueue to use for its own arbiters. Create returns a StreamLoggerOperations object, which we'll look at below

    public static StreamLoggerOperations Create(StreamWriter writer, 
    DispatcherQueue taskQueue)
    {
        if (writer == null)
        {
            throw new ArgumentNullException("writer");
        }
        if (taskQueue == null)
        {
            throw new ArgumentNullException("taskQueue");
        }

        var logger = new StreamLogger(writer, taskQueue);

        logger.Init();

        return logger._port;
    }

Initialization shows us the only "trick" to this. I could have used an Interleave, and defined one Receiver in the Teardown group (for shutdown) and one Receiver in the Exclusive group (for logging messages). That's a great approach, but here I'm doing something a little different which will give me a slightly more explicit behavior on shutdown.

You may have noticed above that one of the private fields of the service as _tokenPort, what I am doing is creating a Join over that token port and the string port within the main operations portset. This allows me (by being careful to ensure that only one message is on the token port at a time) to control the way in which string messages posted to the operations port are handled. Each time a the Join fires (when there is at least one message on each port in the Join) that removes the only message on the token port, and a new message is only posted when the handler completes.

Of course, in order to get the ball rolling I have to post one token onto the token port.

    private void Init()
    {
        Activate<ITask>(
            Arbiter.JoinedReceive<string, EmptyValue>(true, _port, _tokenPort, 
    OnMsg),
            Arbiter.ReceiveWithIterator<Shutdown>(false, _port, OnShutdown)
        );

        //
        // Prime the token port
        //
        _tokenPort.Post(EmptyValue.SharedInstance);
    }

The message handler is simplicity itself, Only two things to note here,

  1. If the message is null or empty, I treat this as a command to flush the writer.
  2. The entire functionality of this handler is wrapped in a try...finally construct, and it is in the finally that the new token is posted, so regardless of what happens in handler there is always a token available to that the join can fire for the next message.
    private void OnMsg(string msg, EmptyValue token)
    {
        try
        {
            if (string.IsNullOrEmpty(msg))
            {
                _writer.Flush();
            }
            else
            {
                _writer.WriteLine(msg);
            }
        }
        finally
        {
            _tokenPort.Post(token);
        }
    }

The only other handler is for Shutdown. This is were reason for the token port becomes apparent. The first thing that the handler does is yield to a receiver on the token port, this does two things, firstly it waits for any message currently being logged to be completed and, secondly it prevents any more message from being logged.

Once the main logging handler is starved, the shutdown handler then removes any messages currently waiting, closes the writer and reports success back to its caller.

    private IEnumerator<ITask> OnShutdown(Shutdown shutdown)
    {
        try
        {
            //
            // steal the serialization token, this stops the OnMsg handler from
            // ever being called again.
            //
            yield return Arbiter.Receive(false, _tokenPort, EmptyHandler);

            //
            // drain the currently outstanding messages, any messages posted
            // after this point will be discarded.
            //
            DrainMsgPort();

            _writer.Flush();
            _writer.Close();
        }
        finally
        {
            shutdown.ResultPort.Post(SuccessResult.Instance);
        }
    }

This drains messages from the port, only draining the number that were on the port when the function was first called, it is easy to see how not having this kind of limit can create code that may not terminate.

    private void DrainMsgPort()
    {
        //
        // drain the number of messages that were available at the start of this 
        // method, anything that arrives later is left for the GC, otherwise
        // this may never terminate.
        //
        var max = _port.P0.ItemCount;
        for (int count = 0; count < max; count++)
        {
            var msg = _port.Test<string>();
            if (!string.IsNullOrEmpty(msg))
            {
                _writer.WriteLine(msg);
            }
        }
    }
}

The Operations Port

The operations port is every bit as important as the service code, it is the interface that you use to communicate with the service.

At its most basic level the  port is merely derived from PortSet<string, Shutdown> and that is all that is needed, but adding some helper functions makes it enormously more useable

public sealed class StreamLoggerOperations : PortSet<string, Shutdown>
{
    public void Log(string msg)
    {
        Post(msg);
    }

    public void Log(string format, params object[] args)
    {
        Post(string.Format(format, args));
    }

    public void Log(IFormatProvider provider, string format, params object[] 
    args)
    {
        Post(string.Format(provider, format, args));
    }

    public void Flush()
    {
        Post(string.Empty);
    }

    public SuccessFailurePort Shutdown()
    {
        var s = new Shutdown();

        Post(s);

        return s.ResultPort;
    }
}