One thing you need to consider when designing your Azure application is "what is the minimum deployment?" If your application has 5 different queues with a worker role to read each queue, we are going to want at least 2 instances per role (for some redundancy), giving you a minimum deployment of 5 packages and 10 instances. Hmmm, if you have very little traffic that seems a little extravagant; so let's take a look at how we can scale down our requirements and potentially make deployment easier. If you aren't used to multi-threaded programming, the first thing that probably springs to mind is to round-robin all of our queues in one big loop:

    public override void Start()
    {
        while (true)
        {
            ProcessQueue1();
            ProcessQueue2();
            // ....
            ProcessQueueN();
        }
    }

Assume that queues 1 to N have different messages and require different processing – otherwise we would put them all in the same queue – right?

While this would work, we can do better. There are a couple of things we should look to improve on:

  1. Idle CPU time: A lot of the time we are going to do something along the lines of: read from a queue, write to table/blob/queue/SDS, then delete the message from the queue. We might end up doing some processing of our message, but there is a significant amount of network traffic going on. Every time we read/write to the storage system we are making an off-machine call. This means that our application will block until the network call completes - i.e. our node is doing nothing.
  2. Fault tolerance: If there is an error with one of the processors - say a poison message on Queue 2 - then ProcessQueueN is never going to get checked.
  3. Simplicity: We might have a high priority queue that we want to check very frequently and a low priority queue that we don't check that often. Any logic we add to achieve this is going to get complex and won't be very easy to change through configuration.

By multi-threading our worker role, we should be able to solve all these issues. One thing to keep in mind is that there is only 1 CPU in the virtual machine that the CTP of Azure builds for you; so multithreading isn't going to help if we are doing lots of CPU bound processing. Oh, and one proc means that the parallel extensions won't help us here as they will only ever give us one thread - thanks to Daniel for confirming that over a beer at TechEd.

There are 2 types of people who don’t write multi-threaded code; those that don’t understand it and have never tried and those who really understand it and know how difficult it is to get right. Having said that, here are a few steps that will multi-thread our application without us having to step too far into the multi-threading minefield.

If you have followed Part 3, you will know that we currently have a nice class with an instance method that performs all our processing; this positions us very nicely to multi-thread our app. The biggest problem people run into when writing concurrent applications is protecting (or not protecting) access to shared resources (variables, files, queues ...). If multiple threads access the same thing at the same time, there could be problems - a bit like two kids grabbing for the ketchup at the same time - there will be tears and ketchup everywhere. Moving our MessageQueue instance into our processor class and making it private ensures that only that processor instance can access it. If we create one processor per thread and ensure that all the resources that processor needs are contained in that processor object, we no longer have any shared resources - every kid gets their own bottle of ketchup. This neatly lets us step over the first mine.

For every different queue/message type that we are going to be monitoring, we need to create a new IProcessMessages implementation. If we just want several threads monitoring the same queue, then we would just create several instances of our single processor type and execute each one on a separate thread.
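As a sketch of what one of those implementations might look like (the `IProcessMessages` interface and `MessageQueue` wrapper are the ones built up in the earlier parts of this series, so the member names here are illustrative, not the exact API):

    /// <summary>
    /// Illustrative processor: owns a private queue instance so no
    /// other thread can touch it - every kid gets their own ketchup.
    /// </summary>
    class ProcessQueue1 : IProcessMessages
    {
        // Private to this instance - never shared across threads.
        private MessageQueue queue = new MessageQueue("queue1");

        /// <summary>
        /// Returns true if a message was processed, false if the
        /// queue was empty (so the caller knows to sleep).
        /// </summary>
        public bool Process()
        {
            Message msg = queue.GetMessage();
            if (msg == null)
            {
                return false;
            }

            // ... do the real work for this message type here ...

            queue.DeleteMessage(msg);
            return true;
        }
    }

Because everything the processor needs lives inside the instance, giving each thread its own instance means no locking is needed in the processing code itself.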

Because we are going to have several processors executing, we are going to need to keep track of them, so let's create a type to do that:

    class ProcessorRecord
    {
        /// <summary>
        /// The last time this thread completed some work
        /// </summary>
        public DateTime LastThreadTest { get; set; }

        /// <summary>
        /// The type that represents this processor
        /// </summary>
        private Type processorType;

        /// <summary>
        /// The instance of the processor type that we are wrapping
        /// </summary>
        public IProcessMessages Processor { get; private set; }

        public ProcessorRecord(Type type)
        {
            processorType = type;
            ResetProcessor();
        }

        /// <summary>
        /// Recreate the processor instance - used in the event of an error.
        /// </summary>
        public void ResetProcessor()
        {
            Processor = (IProcessMessages)Activator.CreateInstance(processorType);
            LastThreadTest = DateTime.Now;
        }
    }

That is all reasonably straightforward. We store the Type rather than an instance of a processor because we will want to re-create the instance if an error occurs - see part 3 for more details.

Now we come to modifying our main loop. One change that is going to happen here is that we are going to use the main thread to periodically check the status of all our processors. This is better done here rather than in the GetHealthStatus function simply because we have more control over when it gets called. E.g. if GetHealthStatus gets called every 5 seconds and we don't expect to hear from our threads for 20 seconds, then we are executing the health check logic far too often - this wasn't a problem with our single-threaded implementation, but now we have several time stamps to check. The code for GetHealthStatus gets even simpler - it just reports the contents of a class variable:

    private RoleStatus status;

    public override RoleStatus GetHealthStatus()
    {
        return status;
    }

Our Start function is modified so it starts a thread per processor and then spends the rest of the app's lifetime checking their last reporting times:

    public override void Start()
    {
        status = RoleStatus.Healthy;

        ProcessorRecord[] processors = new ProcessorRecord[] {
            new ProcessorRecord(typeof(ProcessQueue1)),
            new ProcessorRecord(typeof(ProcessQueue2)),
        };

        foreach (ProcessorRecord pr in processors)
        {
            Thread th = new Thread(RunProcessor);
            th.Start(pr);
        }

        // Now just monitor the health of the threads that we have created.
        //
        while (true)
        {
            Thread.Sleep(HEALTH_CHECK_SLEEP);
            RoleStatus localStatus = RoleStatus.Healthy;

            lock (dateTimeLock)
            {
                foreach (ProcessorRecord pr in processors)
                {
                    if (pr.LastThreadTest.AddSeconds(DEFAULT_REPORT_TIMEOUT) < DateTime.Now)
                    {
                        localStatus = RoleStatus.Unhealthy;
                        // No need to keep looking.
                        break;
                    }
                }
            }
            status = localStatus;
        }
    }

And finally, the code that was in our Start method at the end of part 3 moves into the RunProcessor method; this is the entry point for each new thread.

    public void RunProcessor(Object state)
    {
        ProcessorRecord pr = (ProcessorRecord)state;
        int sleepTimeIndex = 0;

        while (true)
        {
            try
            {
                if (pr.Processor.Process() == false)
                {
                    Thread.Sleep(DEFAULT_SLEEP_TIME);
                }

                // Only update the timestamp if no error occurred.
                lock (dateTimeLock)
                {
                    pr.LastThreadTest = DateTime.Now;
                }

                // Reset the back-off.
                sleepTimeIndex = 0;
            }
            catch (Exception ex)
            {
                RoleManager.WriteToLog("Error",
                    "Error occurred, trying again: "
                    + ex.ToString());
                pr.ResetProcessor();

                // Sleep using our back-off.
                sleepTimeIndex = Math.Min(sleepTimeIndex + 1, SLEEP_TIMES.Length - 1);
                Thread.Sleep(SLEEP_TIMES[sleepTimeIndex]);
            }
        }
    }
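The constants and the lock object used by Start and RunProcessor are class-level members that never made it into the snippets above; a plausible set of declarations (the actual values here are just illustrative guesses) would be:

    // Shared by Start() and RunProcessor() to guard LastThreadTest.
    private readonly object dateTimeLock = new object();

    // How long the monitoring loop sleeps between health checks (ms).
    private const int HEALTH_CHECK_SLEEP = 10 * 1000;

    // How long a thread may go without reporting before we go unhealthy (s).
    private const int DEFAULT_REPORT_TIMEOUT = 20;

    // How long a processor sleeps when its queue is empty (ms).
    private const int DEFAULT_SLEEP_TIME = 5 * 1000;

    // Back-off schedule used after an error, longest wait last (ms).
    private static readonly int[] SLEEP_TIMES =
        new int[] { 1000, 5000, 15000, 60000 };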

That leaves us with the same reliability that we built into part 3, but enables us to run multiple processors on multiple threads. It isn't too much of a stretch of the imagination to consider storing the information that the ProcessorRecord class needs in table storage: the name of the processor type, and potentially its sleep time and how long the thread is allowed between reports before it is considered unhealthy. This would allow us to configure at runtime the number and types of processors running on our worker roles - very useful if we start to see much more traffic of one message type than another: add some more threads rather than some more machine instances. More interestingly, it also means that we only have to build and deploy one package to Azure. This model might not suit everyone, but it is a great starting point - especially if you have a CTP token that only allows you one type of worker role!
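To illustrate that last idea, if the processor type names were read from configuration or table storage, the hard-coded array in Start could be built dynamically. The helper below is hypothetical - the point is just that Type.GetType plus our existing ProcessorRecord constructor is all we need:

    // Hypothetical: type names loaded from configuration or table storage,
    // e.g. "MyApp.ProcessQueue1, MyApp". One entry per thread, so listing
    // a type twice gives that queue two worker threads.
    private ProcessorRecord[] LoadProcessors(IEnumerable<string> typeNames)
    {
        List<ProcessorRecord> records = new List<ProcessorRecord>();
        foreach (string name in typeNames)
        {
            // Type.GetType resolves "Namespace.Type, Assembly" strings;
            // the second argument makes it throw if the type is missing.
            records.Add(new ProcessorRecord(Type.GetType(name, true)));
        }
        return records.ToArray();
    }

Changing the mix of processors then becomes a configuration change rather than a redeployment.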

Neil.