As I’m growing into my role at Microsoft I am discovering that the wealth of new technologies is almost overwhelming. In many ways, keeping up with the latest material is harder than just sticking with known things and growing slowly. With the fast release cadence at Microsoft, there is literally something new every single day. As part of my role, I get to demonstrate these new features to customers who may be interested in them to solve real world business problems. Sometimes I just get to create fun demos that engage customers and let me learn the new technology – this is one of these demonstrations.

I spoke at VS Live! Redmond a couple of weeks ago and Jason Zander, a Corporate Vice President in the Azure group did just an amazing demo of the new Event Hub feature and I wanted to duplicate it from the ground up. Here’s the basic setup. The demo consists of a web client that sends a message (and the browser being used) to an event hub. The event hub passes it on to a queue (this is mostly because it makes the demo much more fun) and then a client reads out of the queue. The best part is that it happens almost instantaneously.

The main thing to note is that Queues are really designed to be read from sequentially (although you can certainly peek at any message and Event Hubs can have a message processed by many systems at once. The differences between Queues, Topics and Event Hubs can be found here: http://msdn.microsoft.com/en-us/library/azure/dn789972.aspx. This example doesn’t show these differences but instead shows the basic techniques of writing and reading to/from hubs and queues. Because Event Hubs are so new, the documentation is extremely limited right now (it’s still in Preview right now in fact).

With all of this said, hopefully this will be helpful and you’ll be able to adapt it for your own purposes… Before beginning I would also like to call out Jeff Wilcox for helping provide a couple of key pieces/ideas for the solution.

Solution Overview

The solution consists of an Azure Website which allows users to enter a message. The message is passed to an Event Hub. An Azure Cloud Service Worker processes the Event Hub and passes the message to an Azure Queue. Finally a Wpf application reads the Azure Queue and displays an aggregation of the data to the end user. Conceptually this is similar to a messaging app where 500 people send one person a message at the same time. In more practical terms thing of an Internet of Things (IoT) where thousands of devices send messages every second and that data has to be processed and stored and displayed. So there is a potential real word application for this type of structure.

Creating the Event Hub

In the Azure management portal select New > Service Bus > Event Hub as follows:

image

Provide a name (DemoApp in this case):

image

If you don’t already have an existing service bus namespace, it will create one for you (it is automatically appended with “-ns”). This is a mistake that I made originally – my Event Hub and my Queue are in two different namespaces. This is not required – I just made it more complicated for myself.

Next, provide a partition count and days for retention:

SNAGHTML3e38e9e1

The partition count is a value between 1 and 32. Click the check mark and the hub will be created. After the hub is created, make sure to click the Connection Information at the bottom of the hub page to grab the string to allow the client applications to send data to and read data from the hub.

The Web Client – Writing to the Hub

Before attempting to use an Event Hub you need to add a reference to the right package. To do this, right-click the solution and select Manage NuGet Packages… Search for the Microsoft Azure Service Bus Event Hub – EventProcessorHost. Once you add that you just need to update or add a key with the Event Hub connection data to the web.config file. The page that takes the message is very simple and looks like the following:

image

This is a standard MVC application which I added an EventHubs page to. The code in the EventHub.cshtml page is as follows:

@{
    ViewBag.Title = "Event Hubs";
}
<p style="padding-top:20px">
    Enter a message to send: <input type="text" id="message" name="eventMessage" /><input type="button" id="sendMessageButton" value="Send Message" />
</p>
<script type="text/javascript">
    $("#sendMessageButton").click(function() {
        $.getJSON(getHomeUrl() + "home/SendMessage/?" + $.param({ eventMessage: $("#message").val(), browserInfo: $.browser.name }), "", function () {
            $("#message").val("");
        })
    })
</script>

The main work on this page is done by the AJAX call which sends the data to the server. There’s nothing related to the Event Hubs in this page – it’s just standard jQuery.

The Home controller gets this call and does the real work. The main piece is the SendMessage function (be sure to including using Microsoft.ServiceBus.Messaging).

public JsonResult SendMessage(string eventMessage, string browserInfo)
{
  //Put all of the data into a single class
  EventHubData ehd = new EventHubData() { Message = eventMessage, BrowserInfo = browserInfo };
  //serialize it
  var serializedString = JsonConvert.SerializeObject(ehd);
  //Create the Event Data object so the data and partition are known
  EventData data = new EventData(Encoding.Unicode.GetBytes(serializedString))
  {
    PartitionKey = "0"
  };
  //Get a reference to the event hub
  EventHubClient client = EventHubClient.CreateFromConnectionString(ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"], "DemoApp");
  //Send the data
  client.SendAsync(data);
  //Return to the client
  return Json("", JsonRequestBehavior.AllowGet);
}

There is a single class which backs the data and it simply encapsulate the information to send in the message so it can be easily deserialized on the back end. Next the object is serialized and an EventData object is created. The PartitionKey can be whatever you want to call the partition – there are a potential of 32 partitions and you can set this value when you create your event hub – they are referenced by ordinal number so I just lacked incredible creativity by naming it “0”. The next line gets the reference to the event hub in azure and then finally the message is send asynchronously.

That’s all there is to it. As you start sending messages you’ll see the following type of view in Azure (note that this doesn’t update instantaneously):

image

So, we can now add data to the event hub! Obviously that in itself isn’t particularly useful so without further ado, moving on!

Creating the Queue

This is where you can do it better than I did it. Select the DemoApp-ns namespace from the Azure Management Portal and click the Queues tab.

image

Click Create a new Queue. Provide a queue name and select an appropriate region.

image

Click the next arrow and set any additional settings needed then click the check mark.

image

These settings are discussed more in this great tutorial on Queues: http://social.technet.microsoft.com/wiki/contents/articles/2173.azure-and-sql-database-tutorials-tutorial-4-using-windows-azure-worker-role-and-windows-azure-queue-service.aspx.

Remember to grab your connection string!

Now here’s the tricky thing – having a queue isn’t enough. A queue has to store its data somewhere. That somewhere is in a storage service. And not only in a storage service – but in a container in a storage service. Creating this is trivial but if you aren’t aware of it (like I wasn’t because I tend to “do” before I “read” – isn’t that how it is with everyone in the tech industry?) it’s hard to figure it. To create a storage account, go to the storage management page and select New. Create the storage service then click the Containers tab.

image

Select new at the bottom and provide a name and click the check mark – you’re done! For this you’re going to need a bit more information to connect to it. On the dashboard page for the storage account you’ll see the following:

image

Grab not only the URL for Queues but also click the Manage Access Keys at the bottom and grab the storage account name and the primary access key as you’ll need these to write to the queue.

Cloud Service Worker – Reading from the Hub

At this point we have data in the hub, now we need to process it. For this we’ll create a cloud service which constantly monitors the hub and processes any data that is received.

This took me a while to figure out as it isn’t obvious – you really do need to do it the way it is described in this post – I spent hours trying to work around what I thought was an overly involved process. It turns out that the separation of concerns really is the smartest way to manage this.

The key to reading from an Event Hub is the IEventProcessor interface. After you create a new Cloud Service worker role, add a reference to the event hub (as in the above sample – and also add a reference to the Azure Service Bus because you’ll need that to access the queue). Then add the following class:

  1: using Microsoft.ServiceBus.Messaging;
  2: using Microsoft.WindowsAzure.Storage;
  3: using Microsoft.WindowsAzure.Storage.Queue;
  4: using System;
  5: using System.Collections.Generic;
  6: using System.Diagnostics;
  7: using System.Linq;
  8: using System.Text;
  9: using System.Threading.Tasks;
 10: using Newtonsoft.Json.Linq;
 11: using Microsoft.ServiceBus;
 12: 
 13: namespace WorkerRole1
 14: {
 15:     public class EventProcessor : IEventProcessor
 16:     {
 17:         PartitionContext partitionContext;
 18:         Stopwatch checkpointStopWatch;
 19: 
 20:         QueueClient queue;
 21: 
 22:         public EventProcessor()
 23:         {
 24:             queue = QueueClient.CreateFromConnectionString("[your connection string here]","[your queue name here]");
 25:         }
 26: 
 27:         public async Task CloseAsync(PartitionContext context, CloseReason reason)
 28:         {
 29:             Console.WriteLine(string.Format("Processor Shuting Down.  Partition '{0}', Reason: '{1}'.", this.partitionContext.Lease.PartitionId, reason.ToString()));
 30:             if (reason == CloseReason.Shutdown)
 31:             {
 32:                 await context.CheckpointAsync();
 33:             }
 34:         }
 35: 
 36:         public Task OpenAsync(PartitionContext context)
 37:         {
 38:             Console.WriteLine(string.Format("SimpleEventProcessor initialize.  Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset));
 39:             this.partitionContext = context;
 40:             this.checkpointStopWatch = new Stopwatch();
 41:             this.checkpointStopWatch.Start();
 42:             return Task.FromResult<object>(null);
 43:         }
 44: 
 45:         public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
 46:         {
 47:             try
 48:             {
 49:                 foreach (EventData eventData in messages)
 50:                 {
 51:                     Console.WriteLine("Processing event hub data...");
 52: 
 53:                     string key = eventData.PartitionKey;
 54: 
 55:                     string data = System.Text.Encoding.Unicode.GetString(eventData.GetBytes());
 56:                     try
 57:                     {
 58:                         var json = JObject.Parse(data);
 59:                         string text = json["Message"].ToString();
 60:                         string agent = json["BrowserInfo"].ToString();
 61: 
 62:                         if (queue != null)
 63:                         {
 64:                             
 65:                             await queue.SendAsync(new BrokeredMessage((agent + "##" + text)));
 66: 
 67:                             Trace.TraceInformation("Added to queue: " + agent);
 68:                         }
 69:                     }
 70:                     catch(Exception exx)
 71:                     {
 72:                         Console.WriteLine(exx.Message);
 73:                     }
 74: 
 75:                     Console.WriteLine(string.Format("Message received.  Partition: '{0}', Device: '{1}'",
 76:                         this.partitionContext.Lease.PartitionId, key));
 77:                 }
 78: 
 79:                 //Call checkpoint every 5 minutes, so that worker can resume processing from the 1 minutes back if it restarts.
 80:                 // Doing every ONE MINUTE now.
 81:                 if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(1))
 82:                 {
 83:                     await context.CheckpointAsync();
 84:                     lock (this)
 85:                     {
 86:                         this.checkpointStopWatch.Reset();
 87:                     }
 88:                 }
 89:             }
 90:             catch (Exception exp)
 91:             {
 92:                 Console.WriteLine("Error in processing: " + exp.Message);
 93:             }
 94:         }
 95:     }
 96: }
 97: 

Much of this is boilerplate code so I’ll cover the parts that require the most explanation as it relates to event hubs and queues.

All Console statements are for debugging purposes only – I left them in because you may need them. Being that this is an Azure worker role, these can be made visible in many different ways including using AppInsights.

Line 24 creates the connection to the queue where messages will be passed from the Event Hub. Obviously it’s better to read this from the web.config file. No, I don’t know why I did it this way – it was late :)

Line 45-77, the ProcessEventsAsync method does the bulk of the work and it monitors the event hub (the next code block shows where the connection to this comes from). The EventData contains the message from the client. The first task is to get the data out of the message (Line 55) and the next task is to deserialize it – in this case converting it from JSON to variables we can use (Lines 58-60). Finally it is written to the Queue using a BrokeredMessage object (Line 62-68).

This is all that’s required to read data out of the event hub and place it into the queue. The worker role class manages the lifetime of this object and passes in the necessary configuration information. This class is shown in its entirety here – but most of it is boilerplate – I only changed the OnStart method.

  1: using System;
  2: using System.Collections.Generic;
  3: using System.Diagnostics;
  4: using System.Linq;
  5: using System.Net;
  6: using System.Threading;
  7: using System.Threading.Tasks;
  8: using Microsoft.WindowsAzure;
  9: using Microsoft.WindowsAzure.Diagnostics;
 10: using Microsoft.WindowsAzure.ServiceRuntime;
 11: using Microsoft.WindowsAzure.Storage;
 12: using Microsoft.ServiceBus.Messaging;
 13: 
 14: namespace WorkerRole1
 15: {
 16:     public class WorkerRole : RoleEntryPoint
 17:     {
 18:         private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
 19:         private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
 20: 
 21:         public override void Run()
 22:         {
 23:             Trace.TraceInformation("WorkerRole1 is running");
 24: 
 25:             try
 26:             {
 27:                 this.RunAsync(this.cancellationTokenSource.Token).Wait();
 28:             }
 29:             finally
 30:             {
 31:                 this.runCompleteEvent.Set();
 32:             }
 33:         }
 34: 
 35:         public override bool OnStart()
 36:         {
 37:             // Set the maximum number of concurrent connections
 38:             ServicePointManager.DefaultConnectionLimit = 12;
 39: 
 40:             // For information on handling configuration changes
 41:             // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.
 42: 
 43:             bool result = base.OnStart();
 44: 
 45:             Trace.TraceInformation("WorkerRole1 has been started");
 46: 
 47:             string storage = "DefaultEndpointsProtocol=https;AccountName=[your account name here];AccountKey=[your key here";
 48:             string serviceBus = "Endpoint=sb://[your namespace here].servicebus.windows.net/;SharedAccessKeyName=[your key name here];SharedAccessKey=[your key here]";
 49:             string eventHubName = "demoapp";
 50: 
 51:             EventHubClient client = EventHubClient.CreateFromConnectionString(serviceBus, eventHubName);
 52:             Trace.TraceInformation("Consumer group is: " + client.GetDefaultConsumerGroup().GroupName);
 53: 
 54:             _host = new EventProcessorHost("singleworker", eventHubName, client.GetDefaultConsumerGroup().GroupName, serviceBus, storage);
 55: 
 56:             Trace.TraceInformation("Created event processor host...");
 57: 
 58:             return result;
 59:         }
 60: 
 61:         private EventProcessorHost _host;
 62: 
 63:         public override void OnStop()
 64:         {
 65:             Trace.TraceInformation("WorkerRole1 is stopping");
 66: 
 67:             _host.UnregisterEventProcessorAsync().Wait();
 68: 
 69:             this.cancellationTokenSource.Cancel();
 70:             this.runCompleteEvent.WaitOne();
 71: 
 72:             base.OnStop();
 73: 
 74:             Trace.TraceInformation("WorkerRole1 has stopped");
 75:         }
 76: 
 77:         private async Task RunAsync(CancellationToken cancellationToken)
 78:         {
 79:             await _host.RegisterEventProcessorAsync<EventProcessor>();
 80: 
 81:             // TODO: Replace the following with your own logic.
 82:             while (!cancellationToken.IsCancellationRequested)
 83:             {
 84:                 //Trace.TraceInformation("Working");
 85:                 await Task.Delay(1000);
 86:             }
 87:         }
 88:     }
 89: }
 90: 

Lines 47-49 set the account information and identifying names for the storage, service bus and queue which are required by the EventProcessor class. Line 51 creates the EventHub client and line 54 starts the process. Everything else is there to manage the lifetime of the EventProcessor object. This can be published to Azure and it will start reading data from the EventHub immediately upon deployment. The queue dashboard should start to look like the following:

image

WPF Client – Reading from the Queue

Finally we get to reading data from the queue. It’s important to note that you can do this a million different ways. You can use Node.js or SignalR to do real-time web page applications, pass it to other applications for additional processing (which you can do from the event hubs as well) or virtually anything else you want. You can read it with an Android or iOS app, put it into a tabular model for use with PowerView or hundreds of other applications.

I chose WPF because it showed a mix of different technologies and I’m not as comfortable with Node or SignalR (I’m still learning both of them).

The key to this application is a class called QueueReader – everything is used for the presentation of data.

  1: using Microsoft.ServiceBus.Messaging;
  2: using System;
  3: using System.Collections.Generic;
  4: using System.Linq;
  5: using System.Text;
  6: using System.Threading;
  7: using System.Threading.Tasks;
  8: using System.Windows;
  9: 
 10: namespace WpfQueueReader
 11: {
 12:     class QueueReader
 13:     {
 14:         QueueClient _queue;
 15:         Thread _thread;
 16:         QueueData _persistentData = new QueueData();
 17: 
 18:         public delegate void MessageReceivedEventHandler(object sender, MessageEventArgs e);
 19:         public event MessageReceivedEventHandler MessageReceived;
 20: 
 21:         public void MonitorQueue()
 22:         {
 23:             _queue = QueueClient.CreateFromConnectionString("[your connection string here]", "[your queue name here]");
 24: 
 25:             BrokeredMessage message = null;
 26:             while (true)
 27:             {
 28:                 try
 29:                 {
 30:                     //receive messages from Queue
 31:                     message = _queue.Receive(TimeSpan.FromSeconds(5));
 32:                     if (message != null)
 33:                     {
 34:                         string m = message.GetBody<string>();
 35:                         string[] values = m.Split(new string[] { "##" }, 2, StringSplitOptions.None);
 36: 
 37:                         _persistentData.Message = values[1];
 38:                         _persistentData.Browsers.Add(values[0]);
 39: 
 40:                         Application.Current.Dispatcher.BeginInvoke(new ThreadStart(() => OnMessageReceived(_persistentData)), null);
 41: 
 42:                         // Further custom message processing could go here...
 43:                         message.Complete();
 44:                     }
 45:                     else
 46:                     {
 47:                         Thread.Sleep(TimeSpan.FromSeconds(5));
 48:                     }
 49:                 }
 50:                 catch(OperationCanceledException)
 51:                 {
 52:                     _queue.Close();
 53:                     
 54:                 }
 55:             }
 56:         }
 57: 
 58:         public void EndMonitoring()
 59:         {
 60:             _queue.Close();
 61:         }
 62: 
 63:         protected void OnMessageReceived(QueueData data)
 64:         {
 65:             if (MessageReceived != null)
 66:             {
 67:                 MessageReceived(this, new MessageEventArgs(data));
 68:             }
 69:         }
 70: 
 71:         public void StartTask()
 72:         {
 73:             ThreadStart threadStart = new ThreadStart(MonitorQueue);
 74:             _thread = new Thread(threadStart);
 75:             _thread.Start();
 76:         }
 77: 
 78:         public void EndTask()
 79:         {
 80:             _thread.Abort();
 81:         }
 82:     }
 83: }

Most of the functionality in this class deals with the fact that it’s running on a background thread. The main portion of this class is the MonitorQueue method. Remember to replace the connection string with your own connection information.

The MonitorQueue method does the following:

  1. Creates a connection to the queue
  2. Starts an endless loop to check the queue (note that this is a cheap way of doing it and I didn’t want to go crazy just for demo purposes – an excellent article on a better way to do this can be found here: http://www.developerfusion.com/article/120619/advanced-scenarios-with-windows-azure-queues/)
  3. Check the queue
  4. If there is a message,
    1. Get the data from the message
    2. Add it to a class
    3. Raise an event to the UI thread with the data
    4. Check the queue again
  5. If there isn’t a message,
    1. Go back to sleep for five seconds before checking the queue again

In the UI, the entire class for the single window looks like the following:

  1: using System;
  2: using System.Collections.Generic;
  3: using System.Linq;
  4: using System.Text;
  5: using System.Threading.Tasks;
  6: using System.Windows;
  7: using System.Windows.Controls;
  8: using System.Windows.Data;
  9: using System.Windows.Documents;
 10: using System.Windows.Input;
 11: using System.Windows.Media;
 12: using System.Windows.Media.Imaging;
 13: using System.Windows.Navigation;
 14: using System.Windows.Shapes;
 15: 
 16: namespace WpfQueueReader
 17: {
 18:     /// <summary>
 19:     /// Interaction logic for MainWindow.xaml
 20:     /// </summary>
 21:     public partial class MainWindow : Window
 22:     {
 23:         QueueReader _reader = new QueueReader();
 24: 
 25:         public MainWindow()
 26:         {
 27:             InitializeComponent();
 28:             _reader.MessageReceived += _reader_MessageReceived;
 29:             _reader.StartTask();
 30:         }
 31: 
 32:         void _reader_MessageReceived(object sender, MessageEventArgs e)
 33:         {
 34:             TextBlock block1 = new TextBlock();
 35:             
 36:             block1.Text = e.Data.Message;
 37: 
 38:             spMessages.Children.Add(block1);
 39:             spBrowsers.DataContext = null;
 40:             spBrowsers.DataContext = e.Data.Browsers;
 41:         }
 42: 
 43:         private void btnExit_Click(object sender, RoutedEventArgs e)
 44:         {
 45:             _reader.EndMonitoring();
 46:             _reader.EndTask();
 47:             this.Close();
 48:         }
 49: 
 50:         private void Window_Closed(object sender, EventArgs e)
 51:         {
 52:             _reader.EndMonitoring();
 53:             _reader.EndTask();
 54:         }
 55:     }
 56: }
 57: 

The constructor here (Lines 25-30) start the background process executing and wires up the event to the UI. The _reader_MessageRecieved method gets the data and simply appends values and re-binds the a collection of browsers to display. The end result of all of this looks like the following:

image

In a real time fashion, data is transmitted from the browser, to Azure and back to the WpfQueueReader.

Now, set your imaginations to work because this has real world applications! And it’s never been this easy to build a scalable, distributed application.