For a while now I have been using a class that wraps and adds extra functionality to the queue in the StorageLib sample in the Windows Azure SDK. There are a few benefits that this wrapper provides, so I thought it might be time to share:
- Strongly typed access to the queue.
- Warning when you forget to remove a message from the queue.
- Automatic serialisation/deserialisation of the message content.
- Hooks to provide poison message detection and handling.
So let’s take a look at each of these; if I’m going too slow and you can’t wait to get to the code then there is a link at the bottom of this post.
Strongly Typed Access
If you are only storing one type of message on your queue, then using generics we can get rid of all the casting code that you would normally have to write. If we define a .NET type to represent the message that we are going to place on the queue, then we can create an API that is cleaner, type-safe and more self-describing.
Creating our queue:
TypedQueue<NewUserRequest> newUserQueue = new TypedQueue<NewUserRequest>("new-users");
Writing to the queue:
NewUserRequest nur = new NewUserRequest( ... );
newUserQueue.PushMessage(nur);
And reading from the queue:
DequeuedMessage<NewUserRequest> deNur = newUserQueue.PopMessage ();
ProcessNewUserRequest (deNur.MessageContents);
newUserQueue.RemoveMessage (deNur);
OK, there is quite a bit of important code missing from these snippets, but hopefully you get the idea. Using generics to define the TypedQueue type gives us a very clean way of working with our queues.
Warning when you forget to Remove a Message
It is all too easy to forget to call RemoveMessage when you have finished processing the message you just read from the queue. This is where the DequeuedMessage<> type comes in useful. It wraps the message that comes back from the queue, but in debug builds it has a finalizer that will trigger an Assert if the message was never passed to RemoveMessage. So any missing calls to RemoveMessage should show up during your release cycle. This code won’t make it into a release build, so there is no perf hit when you get to production.
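To make the idea concrete, here is a minimal sketch of a debug-only finalizer check. It is not the code from the download: apart from MessageContents, the member names (Removed, MarkRemoved) are assumptions invented for this illustration, and the real DequeuedMessage<> may track removal quite differently.

```csharp
using System;
using System.Diagnostics;

// Hypothetical stand-in for DequeuedMessage<T>. Only MessageContents appears
// in the post; Removed and MarkRemoved are assumed names for this sketch.
public class DequeuedMessage<T>
{
    public T MessageContents { get; private set; }

    // True once the queue wrapper has removed the message from the queue.
    public bool Removed { get; private set; }

    public DequeuedMessage(T contents)
    {
        MessageContents = contents;
    }

    // Assumed to be called by the queue wrapper from RemoveMessage.
    public void MarkRemoved()
    {
        Removed = true;
    }

#if DEBUG
    // Compiled into debug builds only, so release builds have no finalizer
    // and pay no finalization cost.
    ~DequeuedMessage()
    {
        Debug.Assert(Removed, "A dequeued message was never passed to RemoveMessage.");
    }
#endif
}
```

Because finalizers run whenever the garbage collector gets round to them, the assert may fire some time after the offending code has run, so treat it as a hint that a RemoveMessage call is missing rather than a pointer to the exact line.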
Automatic Serialisation/Deserialisation
The TypedQueue class uses the XmlSerializer to turn the .NET object into a form that can be written to the queue. This does mean that any properties you want serialised need to be public and that your type has to have a default constructor. This isn’t too much of a restriction, and because the current CTP runs code in partial trust, we can’t use the binary formatter anyway. One thing to be aware of is that we only have 8 KB to serialise our message into, so if you are writing largish objects to the queue, you might want to use the XmlAttributeAttribute to help control the serialised form of your type.
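As a rough illustration of the XmlAttributeAttribute point, the sketch below serialises a message type whose properties are written as short-named attributes rather than child elements. The properties on NewUserRequest and the MessageXml helper are invented for this example; they are not taken from the attached code, which may do its serialisation differently.

```csharp
using System;
using System.IO;
using System.Xml.Serialization;

// Hypothetical message type: the property names are assumptions for this sketch.
public class NewUserRequest
{
    // Written as a short attribute instead of a child element to save space.
    [XmlAttribute("n")]
    public string UserName { get; set; }

    [XmlAttribute("e")]
    public string Email { get; set; }

    // XmlSerializer needs a parameterless constructor.
    public NewUserRequest() { }
}

// Hypothetical helper showing an XmlSerializer round trip through a string.
public static class MessageXml
{
    public static string Serialise<T>(T message)
    {
        XmlSerializer serializer = new XmlSerializer(typeof(T));
        using (StringWriter writer = new StringWriter())
        {
            serializer.Serialize(writer, message);
            return writer.ToString();
        }
    }

    public static T Deserialise<T>(string xml)
    {
        XmlSerializer serializer = new XmlSerializer(typeof(T));
        using (StringReader reader = new StringReader(xml))
        {
            return (T)serializer.Deserialize(reader);
        }
    }
}
```

Short attribute names trade readability of the raw XML for size, which is usually a fair swap when the message has to fit in a small queue entry.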
I know people who are using WCF’s DataContractSerializer, but I have to confess that I have a much better understanding of how the XmlSerializer works, hence I used the thing I know. If the only tool you have is a hammer, everything starts to look like a nail ;-) If you don’t like my old school serialisation it should be an easy change.
Hooks for poison message detection
The TypedQueue class allows you to specify a couple of delegates that will be used to detect and process poison messages. I have provided a couple of implementations of these, but nothing more sophisticated than what I blogged about in a previous post.
If you define a PoisonMessgeCheck function, the TypedQueue class will use it to determine whether a message is potentially poisonous before it tries to deserialise it. If this check says that the message is bad, the PoisonMessageHandler delegate is called. I’ve only written very trivial implementations (look in PoisonMessageHelpers.cs), but it wouldn’t be too hard to write one that wrote the message contents to a poison message table. After this call, the TypedQueue will remove the message from the queue, effectively deleting it.
To set up the TypedQueue to treat messages as poisonous if they have been on the queue for 10 times the visibility timeout, your code would look like:
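The check-then-handle flow described above might be sketched roughly as follows. The delegate signatures and every name in this block (PoisonCheck, PoisonHandler, PoisonSketch) are assumptions made for illustration; the real delegates on TypedQueue and the helpers in PoisonMessageHelpers.cs may take different arguments.

```csharp
using System;

// Hypothetical delegate shapes: the real TypedQueue delegates may differ.
public delegate bool PoisonCheck(DateTime insertionTimeUtc, DateTime nowUtc);
public delegate void PoisonHandler(string rawMessageBody);

public static class PoisonSketch
{
    // Builds a check that flags any message older than maxAge.
    public static PoisonCheck PoisonIfOlderThan(TimeSpan maxAge)
    {
        return delegate(DateTime insertionTimeUtc, DateTime nowUtc)
        {
            return nowUtc - insertionTimeUtc > maxAge;
        };
    }

    // Runs the check before any deserialisation is attempted. Returns true
    // when the message was poisonous and has been passed to the handler, in
    // which case the caller would delete it from the queue.
    public static bool HandleIfPoisonous(
        PoisonCheck check, PoisonHandler handler,
        string rawMessageBody, DateTime insertionTimeUtc, DateTime nowUtc)
    {
        if (check == null || !check(insertionTimeUtc, nowUtc))
        {
            return false;
        }
        if (handler != null)
        {
            handler(rawMessageBody);
        }
        return true;
    }
}
```

Passing the current time in as a parameter is a choice made for this sketch so that the check is easy to test; a handler that writes the raw body to a poison message table would slot in as the PoisonHandler.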
TimeSpan poisonTime = TimeSpan.FromSeconds (10 * newUserQueue.MessageVisabilityTimeOut);
newUserQueue.PoisonMessgeCheck =
    PoisonMessageHelpers.PoisonIfOnQueueForLongerThan (poisonTime);
This check is performed from the PopMessage function. If a message is poisonous, the API will behave exactly the same as if there had been no message on the queue, i.e. it returns null. This means we don’t need to include our poison message code in our main queue processing loop, allowing it to look like:
while (true) {
    DequeuedMessage<NewUserRequest> deNur = newUserQueue.PopMessage ();
    if (deNur != null) {
        ProcessNewUserRequest (deNur.MessageContents);
        newUserQueue.RemoveMessage (deNur);
    } else {
        Thread.Sleep (NO_MESSAGE_SLEEP_TIME);
    }
}
If you haven’t seen my earlier posts and the Thread.Sleep looks a little strange, best have a quick read of this.
The same pattern can be applied if you want to pull messages off the queue in batches: just pass the maximum number of messages you want to PopMessages. Any messages that are poisonous will be removed from the queue before any valid messages are returned in a List.
Disclaimer
The code I’ve attached has been ripped out of a couple of Proof of Concept projects; as a result, you shouldn’t regard it as well tested. The concepts have been proved to work, but this implementation hasn’t.
Oh, and the TypedQueue should be as thread safe as the StorageLib.Queue it is wrapping (no, that wasn’t a statement that it is thread safe), so it is probably best to create a separate instance per thread ;-)
The code is here and bonus points if you notice where the file is stored ;-)
Hope you find this as useful as I have.
Neil.