Our look at the internals of my Azure Photo Mosaics program continues with this coverage of how the application uses Windows Azure Queues. Windows Azure Queues are designed to facilitate inter-role communication within your Windows Azure applications, allowing you to decouple processing and thereby scale portions of the application independently. The Photo Mosaics program makes use of the four Windows Azure queues highlighted in the architecture diagram below:
As you can see, each of the queues is positioned between two of the web or worker roles that handle the processing within the application.
The queues are also paired: imagerequest and imageresponse each contain one message per client request, corresponding, respectively, to the initiation and completion of that request. slicerequest and sliceresponse may contain multiple messages for each client request (depending on the ‘number of slices’ value input by the user), but there is a one-to-one correspondence between messages in slicerequest and sliceresponse.
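Because every slicerequest message eventually produces exactly one sliceresponse message, a role can tell that a request is complete simply by counting responses until there is one per slice. The sketch below is a hypothetical, in-memory illustration of that bookkeeping; the SliceTracker class, and the assumption that each response identifies its request and slice index, are illustrative rather than taken from the Photo Mosaics code. With multiple role instances, this state would have to live in shared storage (a table, for example) rather than in memory:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: tracks slice responses per request (in memory only).
public class SliceTracker
{
    // requestId -> number of slices the request was split into
    private readonly Dictionary<Guid, int> _expected = new Dictionary<Guid, int>();

    // requestId -> slice indexes already reported (a set, so duplicates are not counted twice)
    private readonly Dictionary<Guid, HashSet<int>> _received = new Dictionary<Guid, HashSet<int>>();

    // Called when a client request is fanned out into sliceCount slicerequest messages.
    public void Start(Guid requestId, int sliceCount)
    {
        _expected[requestId] = sliceCount;
        _received[requestId] = new HashSet<int>();
    }

    // Called for each sliceresponse message; returns true once every slice has reported back.
    public bool RecordResponse(Guid requestId, int sliceIndex)
    {
        if (!_expected.TryGetValue(requestId, out int expected))
            throw new InvalidOperationException("Unknown request " + requestId);

        HashSet<int> seen = _received[requestId];
        seen.Add(sliceIndex);
        return seen.Count == expected;
    }
}
```

Recording slice indexes in a set, rather than incrementing a counter, means a response that happens to be delivered twice is not counted twice, which matters for the reasons discussed below.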
Windows Azure Queues are one of the three main types of storage available within a Windows Azure Storage account (in previous blog posts we looked at the use of tables and blobs in the context of the Photo Mosaics application). You can create an unlimited number of queues per storage account, and each queue can store an unlimited number of messages (up to the 100TB limit of a storage account, of course). Additionally, queues have the following restrictions and attributes:
That then raises the question of what happens if a message is processed correctly, but the role processing it crashes immediately before it can explicitly delete the message. Won’t the message reappear on the queue once its visibility timeout expires, and potentially be processed again? Yes, indeed, and that’s something you have to plan for. Essentially, you need to make sure that the operations you perform are idempotent, meaning that the outcome is identical regardless of whether the message is processed once or multiple times.
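To make the distinction concrete, here is a hypothetical sketch contrasting a handler that appends its result (not idempotent) with one that writes its result under a key derived from the message (idempotent). The ResultStore class and the (requestId, sliceIndex) key are illustrative assumptions, not code from the Photo Mosaics application:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical result store used to contrast two ways of handling a redelivered message.
public class ResultStore
{
    // Non-idempotent: every delivery appends another entry.
    public List<string> AppendedResults { get; } = new List<string>();

    // Idempotent: each (requestId, sliceIndex) maps to exactly one result, so processing
    // the same message again just overwrites the entry with the same value.
    public Dictionary<(Guid, int), string> KeyedResults { get; } =
        new Dictionary<(Guid, int), string>();

    public void HandleAppend(Guid requestId, int sliceIndex, string result)
    {
        AppendedResults.Add(requestId + "/" + sliceIndex + ":" + result);
    }

    public void HandleKeyed(Guid requestId, int sliceIndex, string result)
    {
        KeyedResults[(requestId, sliceIndex)] = result;
    }
}
```

Overwriting a blob that has a deterministic name, or replacing a table entity keyed on the request, follows the same principle.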
But what if there’s a bug, and the message is never processed successfully? Won’t it continually reappear on the queue only to be part of a vicious cycle of failure – a poison message if you will? That’s where two additional message attributes come in:
For the level of activity in the Photo Mosaics application, these limits well exceed the expected load; however, 500 messages per second is well within the realm of possibility for a large application, or even a small one that responds to real-time stimuli (like sensors). In scenarios like this, where you need to scale your queue storage, you’ll want to consider load balancing over multiple queues, or even multiple queues in multiple storage accounts, to get the throughput you need.
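One way to spread messages over several queues, sketched here under the assumption that every role knows the same fixed list of queue names, is to choose a queue deterministically from the request ID so that all messages for a given request land on the same queue. The QueuePartitioner class and the queue names used with it are hypothetical:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper that maps a request to one of several queues.
public class QueuePartitioner
{
    private readonly IReadOnlyList<string> _queueNames;

    public QueuePartitioner(IReadOnlyList<string> queueNames)
    {
        if (queueNames == null || queueNames.Count == 0)
            throw new ArgumentException("At least one queue name is required", nameof(queueNames));
        _queueNames = queueNames;
    }

    // Deterministic choice: the same requestId always maps to the same queue,
    // in every role instance, because it depends only on the GUID's bytes.
    public string QueueFor(Guid requestId)
    {
        // Treat the first four bytes of the GUID as an unsigned number so the index is never negative.
        uint bucket = BitConverter.ToUInt32(requestId.ToByteArray(), 0);
        return _queueNames[(int)(bucket % (uint)_queueNames.Count)];
    }
}
```

Hashing on the request ID keeps a request’s messages together; a simple round-robin counter would balance load more evenly but scatter a single request across queues. The same selection could just as well choose among queues in different storage accounts.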
For the Photo Mosaics application, I’ve built a few abstractions over Windows Azure queues and messages that may initially seem overly complex but actually help simplify and standardize their usage within the Web and Worker roles.
In a previous post on the storage architecture, I introduced the QueueAccessor class as part of a data access layer (CloudDAL). QueueAccessor is a static class that maintains references to the four queues used in the application, namely, imagerequest, imageresponse, slicerequest, and sliceresponse. The references are of a new type, ImageProcessingQueue, which I’ve defined in the Queue.cs file within the CloudDAL:
ImageProcessingQueue has the following members:
Similar to my use of metadata for blobs, each queue carries two pieces of metadata that the ImageProcessingQueue constructor uses to set the PoisonThreshold and Timeout properties (falling back to defaults if the metadata is missing or invalid):
internal ImageProcessingQueue(CloudQueueClient queueClient, String queueName)
{
    this.Name = queueName;
    this.RawQueue = queueClient.GetQueueReference(queueName);

    // fetch the queue's attributes (metadata)
    this.RawQueue.FetchAttributes();
    String timeout = this.RawQueue.Metadata["defaulttimeout"];
    String threshold = this.RawQueue.Metadata["poisonthreshold"];

    // pull out queue-specific timeout/poison message retry value (or set to defaults):
    // defaulttimeout is in seconds (default 30 seconds); no poisonthreshold means effectively no limit
    Int32 i;
    this.Timeout = (Int32.TryParse(timeout, out i)) ? new TimeSpan(0, 0, i) : new TimeSpan(0, 0, 30);
    this.PoisonThreshold = (Int32.TryParse(threshold, out i)) ? i : Int32.MaxValue;
}
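The PoisonThreshold only has an effect if something compares it with the number of times a message has been delivered. The storage client exposes that number as CloudQueueMessage.DequeueCount; the sketch below models just the comparison with plain integers so that it runs without the storage SDK. The PoisonDecision enum and the PoisonCheck.Classify method are hypothetical, as is the choice to allow exactly PoisonThreshold delivery attempts:

```csharp
using System;

// Hypothetical outcome of inspecting a dequeued message.
public enum PoisonDecision
{
    Process,    // still within the threshold: handle it normally
    Quarantine  // delivered too many times: set it aside instead of retrying forever
}

public static class PoisonCheck
{
    // dequeueCount: how many times the message has been retrieved (1 on first delivery),
    //               e.g. the value of CloudQueueMessage.DequeueCount.
    // poisonThreshold: the per-queue limit, e.g. ImageProcessingQueue.PoisonThreshold.
    // Assumption: a message is quarantined once it has been delivered more times than the threshold.
    public static PoisonDecision Classify(int dequeueCount, int poisonThreshold)
    {
        return dequeueCount > poisonThreshold ? PoisonDecision.Quarantine : PoisonDecision.Process;
    }
}
```

With the constructor’s default threshold of Int32.MaxValue, a message is effectively never classified as poison. What to do with a quarantined message (log it, copy it elsewhere, delete it) is left open here.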
AcceptMessage and SubmitMessage are essentially inverse operations that wrap the GetMessage and AddMessage APIs, adding some exception handling and deferring message-specific handling to two generic methods defined on QueueMessage, a class we’ll discuss in the next section.
Similar to ImageProcessingQueue’s encapsulation of the CloudQueue reference, a new class, QueueMessage (in Messages.cs of the CloudDAL), wraps a reference to CloudQueueMessage and augments it with application-specific functionality. That class serves as the abstract ancestor of the four distinct message types used in the Photo Mosaics application (ImageRequestMessage, ImageResponseMessage, SliceRequestMessage, and SliceResponseMessage), each of which is handled by exactly one of the similarly named queues.
Each of the four message types shares a common set of three fields that appear at the beginning of every message’s payload: the URI of the queue, the client ID, and the request ID.
The four concrete implementations of QueueMessage add properties that are specific to the task each message represents. Those fields are summarized below:
Each of these classes implements a Parse method that handles the conversion of the message payload (which is just a string) into the fields and properties of the specific message type. If a message payload fails to parse, a custom exception of type QueueMessageFormatException (also defined in Messages.cs) is thrown. The Payload property is the inverse of the Parse method: it formats the actual string payload for the message based on the property values of the specific QueueMessage instance.
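As an illustration of how Parse and Payload mirror each other, here is a stripped-down, SDK-free sketch of one concrete message type. The pipe separator, the SliceIndex and SliceBlobName properties, the stand-in exception class, and the FromPayload helper (which imitates the splitting done in CreateFromMessage, minus the CloudQueueMessage wrapper) are all assumptions; the actual message fields and the value of MSG_SEPARATOR in the Photo Mosaics code may differ:

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

// Stand-in for the QueueMessageFormatException defined in Messages.cs.
public class QueueMessageFormatException : Exception
{
    public QueueMessageFormatException(string message) : base(message) { }
}

// Hypothetical, SDK-free version of a single concrete message type.
public class SliceRequestMessage
{
    public const char MSG_SEPARATOR = '|';   // assumed separator value

    // Shared fields (present in every message), kept as strings.
    internal string _queueUri, _clientId, _requestId;

    // Message-specific values, still as strings, in payload order.
    internal List<string> _components = new List<string>();

    // Hypothetical message-specific properties.
    public int SliceIndex { get; private set; }
    public string SliceBlobName { get; private set; }

    // Convert _components into strongly typed properties; reject malformed input.
    public void Parse()
    {
        if (_components.Count != 2)
            throw new QueueMessageFormatException("Expected 2 components, found " + _components.Count);
        if (!int.TryParse(_components[0], NumberStyles.Integer, CultureInfo.InvariantCulture, out int index))
            throw new QueueMessageFormatException("Slice index is not an integer: " + _components[0]);
        SliceIndex = index;
        SliceBlobName = _components[1];
    }

    // Inverse of Parse: shared fields first, then the message-specific properties.
    public string Payload => string.Join(MSG_SEPARATOR.ToString(), new[] {
        _queueUri, _clientId, _requestId,
        SliceIndex.ToString(CultureInfo.InvariantCulture), SliceBlobName });

    // Mirrors CreateFromMessage: split, peel off the three shared fields, parse the rest.
    public static SliceRequestMessage FromPayload(string payload)
    {
        string[] s = payload.Split(MSG_SEPARATOR);
        if (s.Length < 3)
            throw new QueueMessageFormatException("Missing queueUri, clientId, or requestId");

        var msg = new SliceRequestMessage
        {
            _queueUri = s[0],
            _clientId = s[1],
            _requestId = s[2],
            _components = s.Skip(3).ToList()
        };
        msg.Parse();
        return msg;
    }
}
```

Because Payload writes the fields in the same order Parse reads them, a payload survives the round trip unchanged, and a malformed value surfaces as a QueueMessageFormatException rather than as a half-populated message.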
With this infrastructure, all of the message processing within the application can be handled with two generic methods (AcceptMessage and SubmitMessage), rather than spreading message-specific processing across the implementations of the various Windows Azure roles. Maintenance and enhancements are also simplified, since modifying the structure of a message requires changes only to the appropriate descendant of the QueueMessage class.
ImageProcessingQueue.AcceptMessage<T> and QueueMessage.CreateFromMessage<T> work in tandem to pull a message (of type T) off of the queue and parse the payload into the appropriate descendant of the QueueMessage class.
In the AcceptMessage case below, a message is retrieved from the queue (GetMessage in Line 6) and the payload parsed, via CreateFromMessage, to return a strongly-typed message of the appropriate type.
1: public T AcceptMessage<T>() where T : QueueMessage, new()
2: {
3: T parsedMsg = default(T);
4: if (this.RawQueue.Exists())
5: {
6: CloudQueueMessage rawMsg = this.RawQueue.GetMessage(this.Timeout);
7: if (rawMsg != null)
8: {
9: try
10: {
11: parsedMsg = QueueMessage.CreateFromMessage<T>(rawMsg);
12: }
13: catch (QueueMessageFormatException qfme)
14: {
15: // exception handling elided for brevity
16: }
17: }
18: }
19: return parsedMsg;
20: }
Note that CreateFromMessage assumes that the three fields shared by every message appear in a specific order at the beginning of the payload (Lines 17-19), followed by the message-specific properties (stored in _components in Line 22).
1: internal static T CreateFromMessage<T>(CloudQueueMessage rawMsg) where T : QueueMessage, new()
2: {
3: // check if message parameter is valid
4: if ((rawMsg == null) || String.IsNullOrEmpty(rawMsg.AsString))
5: throw new ArgumentNullException("rawMsg", "No message data to parse");
6:
7: // create a new message instance
8: T newQueueMessage = new T();
9: newQueueMessage.RawMessage = rawMsg;
10:
11: // split message payload into array
12: String[] s = newQueueMessage.RawMessage.AsString.Split(MSG_SEPARATOR);
13:
14: // first element is queue URI
15: if (s.Length >= 3)
16: {
17: newQueueMessage._queueUri = s[0];
18: newQueueMessage._clientId = s[1];
19: newQueueMessage._requestId = s[2];
20:
21: // split payload array into components
22: newQueueMessage._components = s.Skip(3).ToList();
23:
24: // parse into strongly typed message fields
25: newQueueMessage.Parse();
26: }
27: else
28: {
29: throw new QueueMessageFormatException("Message is missing one or more required elements (queueUri, clientId, requestId)");
30: }
31:
32: // return the new message instance
33: return newQueueMessage;
34: }
When a message is placed on a queue, the inverse operation occurs via ImageProcessingQueue.SubmitMessage<T> and QueueMessage.CreateFromArguments<T>. In Line 8 below, you can see that CreateFromArguments accepts the specific queue’s URI as well as the clientId and requestId as parameters; these, again, are the three properties that are part of all messages in the Photo Mosaics application. The message-specific properties are passed in the params array (parms).
1: public void SubmitMessage<T>(String clientId, Guid requestId, params object[] parms) where T : QueueMessage, new()
2: {
3: T parsedMsg = default(T);
4:
5: this.RawQueue.CreateIfNotExist();
6: try
7: {
8: parsedMsg = QueueMessage.CreateFromArguments<T>(this.RawQueue.Uri, clientId, requestId, parms);
9: this.RawQueue.AddMessage(new CloudQueueMessage(parsedMsg.Payload));
10: }
11: catch (QueueMessageFormatException qfme)
12: {
13: // exception handling code elided for brevity
14: }
15: }
internal static T CreateFromArguments<T>(Uri queueUri, String clientId, Guid requestId, params object[] parms) where T : QueueMessage, new()
{
    T newQueueMessage = new T();

    // pull arguments into payload arrays
    newQueueMessage._queueUri = queueUri.ToString();
    newQueueMessage._clientId = clientId;
    newQueueMessage._requestId = requestId.ToString();
    newQueueMessage._components = (from p in parms select p.ToString()).ToList<String>();

    // parse into strongly typed message fields
    newQueueMessage.Parse();

    // return the new message instance
    return newQueueMessage;
}
What we’ll see in the next post is how code in the web and worker roles of the application leverages AcceptMessage, SubmitMessage, and the QueueMessage instances to carry out the workflow of the Photo Mosaics application.
I realize that was quite a lot of technical content to absorb, and if you’re really interested in understanding it, you’re likely poring over the code now. If I lost you as soon as you scrolled past the first screen, no worries. I feel the biggest takeaway from this post is that creating a flexible and fault-tolerant infrastructure for handling messaging between the various roles in your application is paramount. The time you spend doing that will reap rewards later as you build out the processing for the application and as new requirements and unforeseen challenges surface. I don’t suggest that the framework I’ve set up here is ideal; however, some sort of framework is recommended, and hopefully this post has provided some food for thought.
Lastly, we haven’t talked much yet about diagnostics or monitoring, but it should be apparent that queues are one of the more significant barometers of how well the application is running in Windows Azure. If the queue length gets too high, it may mean that you need additional web or worker roles to handle the additional requests. On the other hand, if there’s never any wait time, perhaps the load is so light that you could dial down some of the roles you have spun up (and save some money). We’ll look at Windows Azure diagnostics and tracing in a later blog post, and it’s through that mechanism that you can keep tabs on the application’s health and even respond automatically to bursts or lulls in activity made manifest by changes in queue length.
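As a rough sketch of that idea, the hypothetical helper below turns an approximate queue length into a scale-out, scale-in, or hold recommendation. The thresholds and the ScaleAdvice and QueueScaling names are assumptions; in the 1.x storage client, the approximate length of a queue can be read with CloudQueue.RetrieveApproximateMessageCount():

```csharp
using System;

// Hypothetical recommendation derived from queue length.
public enum ScaleAdvice { ScaleIn, Hold, ScaleOut }

public static class QueueScaling
{
    // approximateMessageCount: current (approximate) backlog on the queue.
    // instanceCount: role instances currently draining that queue.
    // highWaterPerInstance / lowWaterPerInstance: assumed per-instance backlog thresholds.
    public static ScaleAdvice Advise(int approximateMessageCount, int instanceCount,
                                     int highWaterPerInstance, int lowWaterPerInstance)
    {
        if (instanceCount < 1)
            throw new ArgumentOutOfRangeException(nameof(instanceCount));

        double backlogPerInstance = (double)approximateMessageCount / instanceCount;

        if (backlogPerInstance > highWaterPerInstance)
            return ScaleAdvice.ScaleOut;   // messages are piling up faster than they are processed

        if (backlogPerInstance < lowWaterPerInstance && instanceCount > 1)
            return ScaleAdvice.ScaleIn;    // mostly idle, and there is an instance to spare

        return ScaleAdvice.Hold;
    }
}
```

Because the count is approximate and queue length fluctuates, a real implementation would likely act only after a recommendation persists across several samples.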