This is the third (and final) post of my Service Bus mini-series, which is more or less an in-depth follow-up to my guest appearance on Mike Benkovich’s Cloud Computing Soup to Nuts webcast series. In Part 1, we looked at using the WCF relay services of the Service Bus, and in Part 2 we introduced the use of Service Bus queues and the brokered messaging concept. This time we’ll take it a step further and see how you can use brokered messaging in a pub/sub scenario using Service Bus topics and subscriptions.

The overall concept of the sample application hasn’t changed.  There is a public website through which anyone can craft a message and send that message to one or more consumers.  In this particular case, the consumer application resides on my local machine, behind a firewall. Well, actually we’ll make use of TWO consuming applications for this post.

Topics and Subscriptions Defined

With Service Bus queues, you have the ability to publish a message and have a number of consumers compete to handle that message, with the additional benefit that messages can be stored (indefinitely), duplicate message detection can be enabled, and session-based messaging can be used to ensure ordered delivery. The one limitation is that each message can be consumed only once, by a single consumer.

Topics and subscriptions extend the queue model by enabling publication/subscription and multicast delivery scenarios. You can think of a topic as the queue to which a publisher sends a message; as you’ll soon see that part of the code is practically identical to using queues.   A subscription is how the queue is viewed and accessed by a message consumer. Think of a subscription as a virtual queue associated with a given consumer. A subscription can expose all of the messages sent to the topic or just a subset that matches a filter applied to the message properties. Additionally, the subscription can modify, add, or delete properties of the brokered message made visible to its subscribers.  

In the depiction below there is a single publisher that sends messages to a topic.  Messages may have different properties, represented here by the shape and color of the elements in the queue.  There is a one to many relationship between topics and subscriptions, and here three subscriptions are defined, each being consumed by individual subscribers. 

Schematic of Topics and Subscriptions

The three subscriptions provide different views of all of the messages published to the topic, and those views are established by setting a filter (applied to the brokered message properties) when the subscription is defined:

The top subscription uses the TrueFilter, which means every message sent to the topic will be seen by Subscriber 1,

The second subscription specifies a filter that checks the ‘color’ property of the messages of the topic and exposes only those that have a value of ‘blue’ to Subscriber 2, and

The bottom subscription has a filter that exposes only the messages with a ‘shape’ property set to ‘square.’  Additionally, note that the message objects in the subscription have been transformed, representing the ability to apply rules to the subset of messages comprising a given subscription. Here the rule sets two hypothetical border properties of each message to ‘red’ and ‘thick.’

Creating a Topic

As with creating queues, you can create a topic programmatically (via the NamespaceManager) or via the Windows Azure Portal:

Creating a Topic in the Windows Azure Portal

As you can see from the interface above, topics also include some of the properties we saw on queues (documented in my previous blog post).

  • Default message time to live (TTL) indicates how long the message is available before it is deleted from the topic.
  • Topics support duplicate message detection where a message’s uniqueness is defined by the MessageId property. To detect duplicates, you’d check the Requires Duplicate Detection checkbox and set the time window during which you want duplicates to be detected (Duplicate Detection History Time Window). The default is 10 minutes, which means that a second message with the same MessageId that arrives within 10 minutes of the first occurrence of that message will automatically be deleted.
  • The Maximum Topic Size can be specified in increments of 1GB up to a maximum of 5GB; each message can be up to 256KB in size.

Although not exposed via the portal, topics (like queues) support batched operations. By programmatically setting the EnableBatchedOperations property on the TopicDescription class, a Service Bus identity with the Manage claim can specify - upon topic creation - that Send and Complete requests are batched (for performance). Note that batched operations only apply (and do apply by default) when using the asynchronous methods of the .NET managed client.
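For reference, here is a minimal sketch of creating the topic in code with the same settings the portal exposes, plus EnableBatchedOperations. It assumes the same Microsoft.ServiceBus assembly used throughout this series; the namespace name and key are placeholders, and the specific values (a 7-day TTL, a 1GB topic) are illustrative assumptions rather than recommendations.

```csharp
using System;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

class CreateTopicSample
{
    static void Main()
    {
        // Placeholder namespace and key; the identity needs the Manage claim.
        var nsMgr = new NamespaceManager(
            ServiceBusEnvironment.CreateServiceUri("sb", "yournamespace", String.Empty),
            TokenProvider.CreateSharedSecretTokenProvider("owner", "...your key..."));

        var description = new TopicDescription("thetopic")
        {
            // Illustrative values only
            DefaultMessageTimeToLive = TimeSpan.FromDays(7),
            RequiresDuplicateDetection = true,
            DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(10),
            MaxSizeInMegabytes = 1024,
            EnableBatchedOperations = true
        };

        // Creating a topic that already exists throws, so check first.
        if (!nsMgr.TopicExists(description.Path))
        {
            nsMgr.CreateTopic(description);
        }
    }
}
```

Duplicate detection has to be chosen when the topic is created, which is one reason to script topic creation rather than rely on defaults.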

Coding the Publisher

Topic publisher interface

When implementing the topics and subscription model, the publisher sends messages to a topic. For all intents and purposes, this is semantically equivalent to sending a message to a queue (as we covered in the last post). The subtle difference is the use of the CreateTopicClient method (Line 19) versus CreateQueueClient.

For this specific example, instead of specifying a color, I’ve modified the web user interface to have the user select a priority for the message.  That will come into play a bit later, but for now it’s just another property that becomes part of the brokered message sent to the topic.

   1:  protected void btnSend_Click(object sender, EventArgs e)
   2:  {
   3:      if (txtMessage.Text.Trim().Length == 0) return;
   4:      String userName = (txtUser.Text.Trim().Length == 0) ? "guest" : txtUser.Text;
   5:   
   6:      // create the message and set properties
   7:      BrokeredMessage message = new BrokeredMessage(txtMessage.Text);
   8:      message.Properties["Sender"] = txtUser.Text;
   9:      message.Properties["Priority"] = Int32.Parse(ddlPriority.SelectedValue);
  10:              
  11:      // send the message
  12:      MessagingFactory factory = MessagingFactory.Create(
  13:          ServiceBusEnvironment.CreateServiceUri("sb",
  14:          ConfigurationManager.AppSettings["SBNamespace"],
  15:          String.Empty),
  16:          TokenProvider.CreateSharedSecretTokenProvider(
  17:              userName,
  18:              ConfigurationManager.AppSettings["SBGuestCredentials"]));
  19:      factory.CreateTopicClient("thetopic").Send(message);
  20:  }

In Lines 7-9, a new message is created from the text provided in the Web form, and two properties set, one corresponding to the name of the sender and the other corresponding to the message priority.

Then in Lines 12-18, a MessagingFactory is likewise instantiated; however, here the Service Bus identity (guest, by default) needs the Send claim.

In Line 19, the message is sent via a TopicClient instance. It’s done synchronously here, but asynchronous versions of Send are also available (and preferred in most cases for scalability and user experience reasons).
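As a rough sketch of the asynchronous alternative, the Begin/End pair on TopicClient can be wrapped as below. The helper name SendInBackground and the onError callback are my own inventions for illustration; in a Web Forms page you would also need to think about the page lifecycle, which this sketch ignores.

```csharp
using System;
using Microsoft.ServiceBus.Messaging;

static class AsyncSendSample
{
    // Hypothetical helper: starts the send and returns immediately.
    // onError is called if the send fails; it is not part of the Service Bus API.
    public static void SendInBackground(
        TopicClient client, BrokeredMessage message, Action<Exception> onError)
    {
        client.BeginSend(message, asyncResult =>
        {
            try
            {
                // EndSend completes the operation and rethrows any send failure.
                client.EndSend(asyncResult);
            }
            catch (Exception ex)
            {
                onError(ex);
            }
            finally
            {
                message.Dispose();
            }
        }, null);
    }
}
```

With this in place, Line 19 of the snippet above could become something like AsyncSendSample.SendInBackground(factory.CreateTopicClient("thetopic"), message, ex => { /* log it */ });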

Creating a Subscription

Consumers of a topic subscribe to, well, a subscription, which is essentially a view of some subset of the messages sent to the topic. A topic can have multiple subscriptions, each exposing a different subset of messages to different consuming applications. In our case, we’re going to set up two applications to respond to messages sent to the topic created above.

One of the applications is similar to what we’ve used in the previous two blog posts: it displays a message box containing the string supplied via the Web client above. In this case though, the application only displays messages with Medium or High priority.

A second application echoes all of the messages sent to the topic in a simple list box.

When executed (all on the same machine in my test setup), it looks something like below. In this screen capture, three messages with priorities Low, Medium, and High were sent. All three messages appear in the list box for Subscriber 1, but only the Medium and High priority messages are processed by Subscriber 2, which displays color-coded popup windows in response.

PubSub example output

Before looking at the implementation of the two subscriber applications, we have to create the subscriptions they tap into. Just as with a topic, you can create a subscription via the Windows Azure Portal:

Creating a new Subscription

You’ll note there are properties on the subscription that apply to the receiving end of the pub/sub relationship, properties that (with one exception) also are found on Service Bus queues.

  • Default message time to live (TTL) indicates how long the message is available before it is deleted from the subscription. If you also check the Enable Dead Lettering on Message Expiration box, the message will be moved to a special dead letter queue.
  • Lock Duration specifies the length of time (with a max of five minutes) that a message is hidden from other consumers when the subscription is accessed in PeekLock mode (versus ReceiveAndDelete mode).
  • If Requires Session is checked, messages that must be processed together by the same recipient can be accommodated. A session is defined at message creation time using the SessionId property. 
  • Enable Dead Lettering on Filter Evaluation Exceptions is a property unique to subscriptions and indicates that a message will be moved to the dead letter queue if the filter criterion that is applied to the topic to copy messages to the subscription causes an exception.  If you had trouble parsing or processing that statement, bear with me: filters will be described shortly!

As with queues, there is also a MaxDeliveryCount (default value: 10) property that indicates the maximum number of times a message can be read from the subscription, but this value is not settable via the Windows Azure portal. The property applies only when the SubscriptionClient is in PeekLock mode, in which a message is locked for a period of time (LockDuration) during which it must be marked complete, or it will again be available for processing. MaxDeliveryCount then figures into strategies for poison message processing, since a doomed message would never be marked complete and would otherwise reappear in the subscription ad infinitum.
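To make that concrete, here is a sketch that sets these subscription properties (including MaxDeliveryCount) in code and then handles a single message in PeekLock mode. The class and method names are mine, and the property values are arbitrary assumptions chosen for illustration.

```csharp
using System;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

static class SubscriptionSetupSample
{
    // Requires a NamespaceManager created with the Manage claim.
    public static void EnsureSubscription(NamespaceManager nsMgr)
    {
        var description = new SubscriptionDescription("thetopic", "allmessages")
        {
            // Illustrative values only
            DefaultMessageTimeToLive = TimeSpan.FromDays(1),
            EnableDeadLetteringOnMessageExpiration = true,
            EnableDeadLetteringOnFilterEvaluationExceptions = true,
            LockDuration = TimeSpan.FromSeconds(30),
            RequiresSession = false,
            MaxDeliveryCount = 5
        };

        if (!nsMgr.SubscriptionExists(description.TopicPath, description.Name))
        {
            nsMgr.CreateSubscription(description);
        }
    }

    // Receives at most one message from a PeekLock client and runs the handler.
    public static void ReceiveOne(SubscriptionClient client, Action<BrokeredMessage> handler)
    {
        BrokeredMessage message = client.Receive(TimeSpan.FromSeconds(2));
        if (message == null) return; // timed out, nothing to do

        try
        {
            handler(message);
            message.Complete(); // removes the message from the subscription
        }
        catch (Exception)
        {
            // Release the lock so the message can be redelivered. Each delivery
            // increments DeliveryCount; once MaxDeliveryCount is exceeded,
            // Service Bus moves the message to the dead letter queue.
            message.Abandon();
        }
    }
}
```

Abandoning on failure (rather than letting the lock time out) makes the retry immediate, so a poison message reaches the dead letter queue sooner.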

While you can create multiple subscriptions for a topic within the Windows Azure portal, you cannot set the properties that uniquely define it as a subscription, namely the filter that is applied to the message properties within the topic to determine when a copy is made to the subscription’s virtual queue. Nor can you specify the optional rule that can modify, add, or delete properties pertinent to the given subscription.  You can, of course, set these via code, or you can set them via the Service Bus Explorer, an open source project that currently exposes more of the properties of Service Bus entities than the Windows Azure Portal.

Subscriber 1 – Consuming all the Messages within a Topic

For the application that displays all messages, we already set up a subscription in the Windows Azure Portal named allmessages, and if you view that in the Service Bus Explorer, you’ll see that there is a default rule already specified:

Subscription within Service Bus Explorer

The filter for that rule, 1=1, always evaluates to true, which means all messages in the topic will be seen by this subscription.  Since the application tied to this subscription is interested in all of the messages sent to the topic, the default behavior is just what we want.  To create this subscription, along with the default rule, in code, you’d invoke methods on a NamespaceManager instance, which requires a Service Bus identity with the Manage claim (like the default owner):

var nsMgr = new NamespaceManager(
    ServiceBusEnvironment.CreateServiceUri(
        "sb", "heyjim", String.Empty),
    TokenProvider.CreateSharedSecretTokenProvider(
        "owner", 
        "...redacted...")
);

nsMgr.CreateSubscription("thetopic", "allmessages");

allmessages subscription client

The code for the application subscribing to allmessages looks nearly identical to that of the queue consumer covered in the previous blog post. The gist of the processing is:

  • Create a persistent message factory with credentials that have the Receive claim (Lines 3-8),
  • Create a persistent subscription client pointing to the allmessages subscription of thetopic (Lines 10-11),
  • Until the end user signifies processing should halt, get the next message on the subscription, timing out after two seconds (Line 15),
  • Process the message (Lines 23++) by displaying it in a list box within the WPF application. High priority messages are marked with an “!” after the message sender name, and medium priority messages are marked with a “*.”

   1:  internal void ProcessMessages()
   2:  {
   3:      MessagingFactory factory = MessagingFactory.Create(
   4:          ServiceBusEnvironment.CreateServiceUri("sb",
   5:              Properties.Settings.Default.SBNamespace,
   6:              String.Empty),
   7:          TokenProvider.CreateSharedSecretTokenProvider("wpfsample",
   8:                  Properties.Settings.Default.SBListenerCredentials));
   9:   
  10:      SubscriptionClient allMessageClient =
  11:          factory.CreateSubscriptionClient("thetopic", "allmessages");
  12:   
  13:      while (isProcessing)
  14:      {
  15:          BrokeredMessage message = allMessageClient.Receive(new TimeSpan(0, 0, 2));
  16:          if (message != null)
  17:          {
  18:              Dispatcher.Invoke((System.Action)(()
  19:                  =>
  20:              {
  21:                  try
  22:                  {
  23:                      lbMessages.Items.Add(String.Format("{0}{1}: {2}",
  24:                          message.Properties["Sender"].ToString(),
  25:                          new[] { String.Empty, "!", "*" }
  26:                              [Int32.Parse(message.Properties["Priority"].ToString())],
  27:                          message.GetBody<String>()));
  28:   
  29:                  }
  30:                  catch (Exception ex)
  31:                  {
  32:                      lbMessages.Items.Add(String.Format("Exception: {0} {1}",
  33:                          message.MessageId, ex.Message));
  34:                      this.isProcessing = false;
  35:                  }
  36:              }));
  37:   
  38:              message.Complete();
  39:          }
  40:      }
  41:  }

Subscriber 2 – Consuming High and Medium Priority Messages

The other client application in this scenario is very similar to the client used in the previous two posts, one that displays messages within popup windows on the client machine. Instead of displaying all of the messages, however, we want to display only those messages with High or Medium priority (as set by the user of the Web application that sent the message to the topic).

For this application we need a second subscription that we’ll call urgentmessages.  As with the allmessages subscription we can create it via the Windows Azure Portal and get the default rule associated with it – the rule that copies all of the messages to the subscription. We don’t want to process all of the messages, however, so we need to create a different rule, one that exposes only the messages with a priority of High (1) or Medium (2), where the priority of the message is included in the BrokeredMessage property bag set when the message was sent to the topic (Line 9 of the first code snippet in this post). 

But let’s go a step beyond that. If the message is high priority, I want the window that is displayed to have a red background and white text, and if it’s a medium priority message, I want the window to be yellow with black text.  I could certainly have coded the subscriber application to apply that logic based on the message priority property, but to demonstrate the flexibility of Service Bus subscriptions, let’s do it via two subscription rules defined in the Service Bus Explorer:

MediumPriority with a filter of Priority = 2 and an action of SET BackColor='Yellow'; SET ForeColor='Black'

HighPriority with a filter of Priority = 1 and an action of SET BackColor='Red'; SET ForeColor='White'

Adding a Subscription rule

Whenever a rule filter expression evaluates to true, the message is copied from the topic to the subscription. In fact, a copy is made for every rule that succeeds. That means in the configuration above we’d see two copies of each High or Medium priority message (and still receive one copy of each Low priority message), because the $Default rule is still in play. You can’t modify rules; you can only add or delete them. We’ll therefore need to delete the $Default rule before proceeding.

Once a rule is matched, the message properties can be modified via an action, which consists of one or more SQL-92 style expressions operating on property values.  In the case above, we’re actually adding two new properties (BackColor and ForeColor) to the BrokeredMessage that is copied to the subscription, but we could also modify an existing property (again with a SET) or remove a property with a REMOVE.
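If the copy-per-rule behavior seems abstract, the following toy model may help. To be clear, this is not the Service Bus API: ToyRule and ToySubscription are hypothetical types that mimic, in memory, how each matching rule yields its own copy of the message properties and applies its action to that copy only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy model only: these types are NOT part of the Service Bus API.
public class ToyRule
{
    public string Name;
    public Func<IDictionary<string, object>, bool> Matches; // stands in for the filter
    public Action<IDictionary<string, object>> Apply;       // stands in for the action; may be null
}

public static class ToySubscription
{
    // Returns one copy of the properties per matching rule,
    // with that rule's action applied to its own copy.
    public static List<Dictionary<string, object>> Deliver(
        IDictionary<string, object> properties, IEnumerable<ToyRule> rules)
    {
        var copies = new List<Dictionary<string, object>>();
        foreach (var rule in rules.Where(r => r.Matches(properties)))
        {
            var copy = new Dictionary<string, object>(properties);
            if (rule.Apply != null) rule.Apply(copy);
            copies.Add(copy);
        }
        return copies;
    }

    // The two priority rules from this post, optionally with the catch-all rule.
    public static List<ToyRule> UrgentRules(bool includeDefault)
    {
        var rules = new List<ToyRule>
        {
            new ToyRule
            {
                Name = "HighPriority",
                Matches = p => (int)p["Priority"] == 1,
                Apply = p => { p["BackColor"] = "Red"; p["ForeColor"] = "White"; }
            },
            new ToyRule
            {
                Name = "MediumPriority",
                Matches = p => (int)p["Priority"] == 2,
                Apply = p => { p["BackColor"] = "Yellow"; p["ForeColor"] = "Black"; }
            }
        };
        if (includeDefault)
        {
            rules.Insert(0, new ToyRule { Name = "$Default", Matches = p => true });
        }
        return rules;
    }
}
```

Calling ToySubscription.Deliver with a Priority of 1 and UrgentRules(true) yields two copies (one untouched, one with the colors set), while UrgentRules(false) yields just the colored copy, and a Priority of 0 then yields none.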

Were we to set these same rules (on an existing subscription) in code, it would look like this:

MessagingFactory factory = MessagingFactory.Create(
    ServiceBusEnvironment.CreateServiceUri("sb",
        Properties.Settings.Default.SBNamespace,
        String.Empty),
    TokenProvider.CreateSharedSecretTokenProvider("wpfsample",
        Properties.Settings.Default.SBListenerCredentials));

SubscriptionClient urgentMessageClient =
    factory.CreateSubscriptionClient("thetopic", "urgentmessages");

// remove the catch-all rule so only the priority rules copy messages
urgentMessageClient.RemoveRule(RuleDescription.DefaultRuleName);

// high priority: red window with white text
var rdHiPriority = new RuleDescription("HighPriority");
rdHiPriority.Filter = new SqlFilter("Priority = 1");
rdHiPriority.Action = new SqlRuleAction("SET BackColor='Red'; SET ForeColor='White'");
urgentMessageClient.AddRule(rdHiPriority);

// medium priority: yellow window with black text
var rdMedPriority = new RuleDescription("MediumPriority");
rdMedPriority.Filter = new SqlFilter("Priority = 2");
rdMedPriority.Action = new SqlRuleAction("SET BackColor='Yellow'; SET ForeColor='Black'");
urgentMessageClient.AddRule(rdMedPriority);

One of the more interesting things to note here is that the subscription rules can be modified by the subscriber.  The rules are accessed via a SubscriptionClient, which requires only the Listen claim, whereas creating the subscription itself requires the Manage claim.

Note too that SqlFilter and SqlRuleAction are the concrete implementations of their base classes used to specify selection criteria and property modification via SQL-92 syntax.  There is also a CorrelationFilter which is a more efficient shortcut for a SQL filter that might operate on the CorrelationId of the associated BrokeredMessage. Check out Alejandro Jezierski’s blog post for a concise overview and use-case for the correlation filter.
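As a quick illustration of the correlation filter, the sketch below creates a subscription that receives only messages whose CorrelationId matches a given value. The subscription name order42messages and the correlation value order-42 are made up for this example, as are the class and method names.

```csharp
using System;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

static class CorrelationFilterSample
{
    // Requires a NamespaceManager created with the Manage claim.
    // The subscription name and correlation value are hypothetical.
    public static void CreateOrderSubscription(NamespaceManager nsMgr)
    {
        nsMgr.CreateSubscription(
            "thetopic",
            "order42messages",
            new CorrelationFilter("order-42"));
    }

    // On the sending side, the publisher stamps the message with the same value.
    public static BrokeredMessage CreateOrderMessage(string body)
    {
        var message = new BrokeredMessage(body);
        message.CorrelationId = "order-42";
        return message;
    }
}
```

The filter compares against the CorrelationId header of the message rather than an entry in its Properties bag, so the publisher has to set that header explicitly.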

Following is the complete code for processing the messages that appear on the subscription. The implementation, as you might expect, is quite similar to the other subscription client covered earlier in this post, with the exception that it’s associated with a different subscription and displays its messages through a different UI.  Note that in Lines 26-27, we reference the BackColor and ForeColor properties to style the notification window; these properties were not part of the original message sent to the topic, but rather added via the rule transformation associated with the subscription.

   1:  internal void ProcessMessages()
   2:  {
   3:      MessagingFactory factory = MessagingFactory.Create(
   4:          ServiceBusEnvironment.CreateServiceUri("sb",
   5:              Properties.Settings.Default.SBNamespace,
   6:              String.Empty),
   7:          TokenProvider.CreateSharedSecretTokenProvider("wpfsample",
   8:              Properties.Settings.Default.SBListenerCredentials));
   9:   
  10:      SubscriptionClient urgentMessageClient = 
  11:          factory.CreateSubscriptionClient("thetopic", "urgentmessages");
  12:   
  13:      while (isProcessing)
  14:      {
  15:          BrokeredMessage message = urgentMessageClient.Receive(new TimeSpan(0, 0, 2));
  16:          if (message != null)
  17:          {
  18:   
  19:              Dispatcher.Invoke((System.Action)(() =>
  20:                  {
  21:                      try
  22:                      {
  23:                          var w = new NotificationWindow(
  24:                              message.Properties["Sender"].ToString(),
  25:                              message.GetBody<String>(),
  26:                              message.Properties["BackColor"].ToString(),
  27:                              message.Properties["ForeColor"].ToString()
  28:                          );
  29:                          WindowRegistry.Add(w);
  30:                          w.Show();
  31:                      }
  32:   
  33:                      catch (Exception ex)
  34:                      {
  35:   
  36:                          btnServiceControl.Content = "Start Responding";
  37:                          this.Background = new SolidColorBrush(Colors.Orange);
  38:                          this.isProcessing = false;
  39:   
  40:   
  41:                          MessageBox.Show(ex.Message, "Processing halted", 
                                        MessageBoxButton.OK, MessageBoxImage.Stop);
  42:                      }
  43:   
  44:                  }
  45:              ));
  46:              message.Complete();
  47:          }
  48:      }
  49:  }

 

Wrap-up

And that’s about it! Over the three blog posts in this mini-series, we’ve covered the lion’s share of the current functionality of the Windows Azure Service Bus. It should be clear that the Service Bus is a feature core to many interoperability and orchestration scenarios both in pure cloud and hybrid environments, and the real challenge, if you will, is molding its basic features into capabilities and messaging patterns that meet your application’s needs. 

Please feel free to grab the source code and experiment. It’s by no means production code, but should be enough to get you started. I’d also like to recommend a few other resources as you explore the Windows Azure Service Bus in more depth.