Session data can be used by both DevOps for diagnostics and monitoring and by the business to identify and respond to trends in customer behavior.   Proper collation of session data from disparate sources is thus crucial to a successful service.  Here I start my adventure in learning Reactive Extensions (Rx) by applying Rx with Azure Service Bus to batch together logging operations for persistence into Azure Table Storage.

There is an abundance of documentation on how to use service bus for scale and redundancy so my example will be slightly simplified and I focus mainly on the consumption of Service Bus Brokered Messages using Rx.  I use subscriptions and topics in this example but the code could be adapted for another queuing system.  My example will simulate the following architecture:

 

 

Here we see servers in multiple data centers are partitioning their log messages across several Service Bus Topics.  A deployment of worker roles reads the messages from the associated subscriptions, aggregates them into batches partitioned by a SessionId using Rx and logs them into Azure Table Storage.  A good practice would be to provide a fail over service bus in another data center but our example will omit that.

First let’s define an interface for log messages:

 

 public interface ILogMessage
{
string SessionId { get; set; }
}

 

We will use this interface to define a helper class to read messages from the service bus.  First let’s design a wrapper class to wrap our log message and the underlying service bus BrokeredMessage object:

 public class WrappedLoggedMessage<T> where T:ILogMessage
{
public WrappedLoggedMessage(BrokeredMessage brokeredMessage)
{
_brokeredMessage = brokeredMessage;
_t = Newtonsoft.Json.JsonConvert.DeserializeObject<T>(brokeredMessage.GetBody<string>());
}


private readonly BrokeredMessage _brokeredMessage;
private readonly T _t;
public T Message { get { return _t; } }
public string SessionId { get { return _t.SessionId; } }

public async Task Abandon()
{

await _brokeredMessage.AbandonAsync();
}

public async Task Complete()
{
await _brokeredMessage.CompleteAsync();
}

}

 

With our wrapped log object we look at a simple way to turn a stream of brokered messages from service bus subscription clients into an IObservable stream:

 public class SubscriptionClientsToObservableHelper<T> where T:ILogMessage 
{


private readonly int _batchSize;
private const int MinBackOff = 10;
private const int MaxBackOff = 60000;

private SubscriptionClient[] _subscriptionClients;

public SubscriptionClientsToObservableHelper(IEnumerable<SubscriptionClient> subscriptionClients, int batchSize)
{

_subscriptionClients = subscriptionClients.ToArray();
_batchSize = batchSize;
}


public IObservable<WrappedLoggedMessage<T>> PollMessages()
{
return Observable.Create<WrappedLoggedMessage<T>>(observer =>
{
var isCancelled = false;
((Action)async delegate
{
try
{
var query = from client in _subscriptionClients
select Task.Run(async () =>
{
var delay = MinBackOff;
while (true)
{
IEnumerable<BrokeredMessage> result = null;
try
{
result = await client.ReceiveBatchAsync(_batchSize);
}
catch (Exception e)
{
Trace.TraceError(e.ToString());
}
int messageCount = 0;
if (result != null)
{

foreach (var msg in result)
{
try
{
messageCount++;
var wrapper = new WrappedLoggedMessage<T>(msg);

lock (observer)
{
observer.OnNext(wrapper);
}
}
catch (Exception e)
{
Trace.TraceError(e.ToString());
}
}

}

if (messageCount > 0)
{
delay = MinBackOff;
}

if (messageCount == 0)
{
await Task.Delay(delay);
delay *= 2;
delay = Math.Min(delay, MaxBackOff);
}

if (isCancelled)
{
return;
}

}

});

await Task.WhenAll(query);

}
catch (Exception e)
{
observer.OnError(e);
Trace.TraceError(e.ToString());

}
finally
{
observer.OnCompleted();
}
})();

return () => isCancelled = true;
});
}




}

 

This was a fairly straightforward exercise although I did go down a wrong turn when I read all messages from the clients and then processed them.  This is because ReceiveBatchAsync() was not returning immediately if no messages were received.  Switching to a simple Task per subscription client and then synchronizing the OnNext() call was much more efficient.

We now focus on consuming our messaging stream.  First we define an abstract class:

 public abstract class LogMessageObserver<T> where T:ILogMessage
{
private IObservable<WrappedLoggedMessage<T>> _observable;
private readonly int _observableGroupBufferTimeMilliSeconds;
private readonly int _observableGroupBufferSize;
private readonly object locker = new object();
public LogMessageObserver(int observableGroupBufferTimeMilliSeconds,
int observableGroupBufferSize)
{
_observableGroupBufferTimeMilliSeconds = observableGroupBufferTimeMilliSeconds;
_observableGroupBufferSize = observableGroupBufferSize;
}


public long Logged { get; private set; }

public void Start(SubscriptionClientsToObservableHelper<T> helper)
{
_observable = helper.PollMessages();

var groupedLogs = _observable.GroupByUntil(c => c.SessionId,
g => Observable.Timer(TimeSpan.FromSeconds(60)).Take(1));

groupedLogs.Subscribe((groupedObs) =>
{

var buffered = groupedObs.Buffer(TimeSpan.FromMilliseconds(_observableGroupBufferTimeMilliSeconds), _observableGroupBufferSize);

var query = buffered.Select(logs => Observable.FromAsync(async () =>
{
bool exeptionOccurred = false;
int loggedCount = 0;

if (logs == null || logs.Count == 0)
return;

try
{
loggedCount = await Log(from l in logs select l.Message);
}
catch (Exception logEx)
{
exeptionOccurred = true;
// could end up with double messages
Trace.TraceError(logEx.ToString());
}

try
{
// currently use all or nothing approach to marking messages as abandon/completed
if (loggedCount != logs.Count)
{
// attempt to release messages
List<Task> abandonTasks = new List<Task>();
foreach (var m in logs)
{
var t = m.Abandon();
abandonTasks.Add(t);
}
await Task.WhenAll(abandonTasks);
}
else
{
List<Task> completeTasks = new List<Task>();

foreach (var m in logs)
{
var t = m.Complete();
completeTasks.Add(t);
}

await Task.WhenAll(completeTasks);

var complete =
(from t in completeTasks where t.IsCompleted && !t.IsFaulted select t).ToList();
lock (locker)
{
int count = complete.Count();
Logged += count;
}
}
}
catch (Exception ex)
{
exeptionOccurred = true;
if (ex is TimeoutException)
{

}
else
{
Trace.TraceError(ex.ToString());
}

}

if (exeptionOccurred)
{
// throttle if exception...
await Task.Delay(2000);
}

return;
})).Concat();

query.Subscribe();
});

return;
}
/// <summary>
/// Log method to permanent storage.
/// </summary>
/// <param name="messages">wrapped messages batch from same partition (sessionId) </param>
/// <returns> 0 or number messages logged</returns>
public abstract Task<int> Log(IEnumerable<T> messages);
}

 

There are a few points of interest in this class’s Start method.  First, I used the Rx GroupByUntil() extension to partition the stream by the ILogger.SessionID property.  Notice the use of the Timer() and Take() call to provide a group expiration mechanism to prevent memory leaks.  Second, we use the Rx Buffer() method to group messages from the Session together.  This allows me to batch together messages for storage to the same partition or file.  Third, I used Observable.FromAsync() in conjunction with the Concat() method to serially log the messages.  In my slightly simplified example we use an all or nothing approach to decide whether to abandon or complete the underlying brokered messages.  This approach introduces the possibility of duplicates if some of the logs were inserted or if a Complete() call fails.  Your own implementations may stress throughput over reliability and thus use a different approach. 

For our example we’ll use a simple log message object that we will persist in Azure Table storage:

 public class LogMessage :ILogMessage
{
public string SessionId { get; set; }
public string LogMessageId { get; set; }
public DateTime TimeStamp { get; set; }
public string LogData { get; set; }
}
 public class TableLogMessage : TableEntity
{
public TableLogMessage()
{

}
public TableLogMessage(LogMessage log)
{
this.PartitionKey = log.SessionId;
this.RowKey = Guid.NewGuid().ToString();
this.LogData = log.LogData;
this.LogMessageId = log.LogMessageId;
this.LogTimeStamp = log.TimeStamp;
}

// this is just to confirm the partitioning feel free to omit
public string RoleId { get; set; }


public string LogSessionId { get { return this.PartitionKey; } }
public string LogMessageId { get; set; }
public DateTime LogTimeStamp { get; set; }
public string LogData { get; set; }



}

 

 

We will implement the Log method as follows:

 public class ConcreteMessageObserver : LogMessageObserver<LogMessage>
{
private const int MaxTableInsertBatchSize = 100;
private string _connectionString;
private string _logTableName;
private string _roleId;
public ConcreteMessageObserver( int observableGroupBufferTimeMilliSeconds,
int observableGroupBufferSize,string connectionString, string logTableName = "logmessagetable") : base(observableGroupBufferTimeMilliSeconds, observableGroupBufferSize)
{
_connectionString = connectionString;
_logTableName = logTableName;
_roleId = RoleEnvironment.CurrentRoleInstance.Id;
}

public void Initialize()
{
try
{
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(_connectionString);

// Create the table client.
CloudTableClient tableClient = storageAccount.CreateCloudTableClient();

// Create the CloudTable object
CloudTable table = tableClient.GetTableReference(_logTableName);
table.CreateIfNotExists();
}
catch (Exception ex)
{
Trace.TraceError(ex.ToString());
}
}

public override async Task<int> Log(IEnumerable<LogMessage> messages)
{
int totalCounter = 0;
try
{

CloudStorageAccount storageAccount = CloudStorageAccount.Parse(_connectionString);

// Create the table client.
CloudTableClient tableClient = storageAccount.CreateCloudTableClient();

// Create the CloudTable object that represents the "people" table.
CloudTable logTable = tableClient.GetTableReference(_logTableName);

var batchOperation = new TableBatchOperation();
int counter = 0;
foreach (var message in messages)
{
var tableLogMessage = new TableLogMessage(message) {RoleId = _roleId};
batchOperation.Insert(tableLogMessage);
counter++;

if (counter < MaxTableInsertBatchSize)
continue;

await logTable.ExecuteBatchAsync(batchOperation);
totalCounter += counter;
batchOperation = new TableBatchOperation();
counter = 0;
}

if (counter > 0)
{
await logTable.ExecuteBatchAsync(batchOperation);
totalCounter += counter;
}





}
catch (Exception ex)
{
Trace.TraceError(ex.ToString());

if (ex is Microsoft.WindowsAzure.Storage.StorageException)
{
var storageEx = ex as Microsoft.WindowsAzure.Storage.StorageException;
if (storageEx.RequestInformation.HttpStatusCode == 404)
{
Initialize();
}

}
// our example uses all or nothing with risk of getting duplicates
return 0;
}

return totalCounter;
}
}

Let’s demonstrate how to use it in a simple project.  First create a new WorkerRole cloud project.  Use NuGet to add the Rx-Main and Azure Storage packages.

Add the following Configuration Settings to ServiceDefinition.csdef:

 <ConfigurationSettings>
<Setting name="ServiceBus1"/>
<Setting name="ServiceBus2"/>
<Setting name="AzureStorage"/>
</ConfigurationSettings>

Next in your Cloud.cscfg file fill in the new settings:

 <Instances count="4" />
<ConfigurationSettings>
<Setting name="AzureStorage" value="[Your Value]" />
<Setting name="ServiceBus1" value="[Your Value]" />
<Setting name="ServiceBus2" value="[Your Value]" />
</ConfigurationSettings>

Now let’s configure our topics and subscriptions:

   

 public class SimplifiedServiceBusHelper
{
private int messageLockDuration = 300000;
private string _serviceBusConnectionString;
private string[] _topics;
public SimplifiedServiceBusHelper(string serviceBusConnectionString, IEnumerable<string> topics)
{
_serviceBusConnectionString = serviceBusConnectionString;
_topics = topics.ToArray();
}

public TopicClient[] TopicClients { get; private set; }
public SubscriptionClient[] SubscriptionClients { get; private set; }

public void Setup(string template = "subscriptionontopic{0}")
{
TopicClients = new TopicClient[_topics.Length];
SubscriptionClients = new SubscriptionClient[_topics.Length];

for(int i=0; i< _topics.Length;i++)
{
var topic = _topics[i];
var namespaceManager = NamespaceManager.CreateFromConnectionString(_serviceBusConnectionString);
if (!namespaceManager.TopicExists(topic))
{
namespaceManager.CreateTopic(
new TopicDescription(topic)
{
DefaultMessageTimeToLive = TimeSpan.MaxValue,
EnableBatchedOperations = true,

RequiresDuplicateDetection = true,
MaxSizeInMegabytes = 5120,
EnablePartitioning = true
});
}
TopicClients[i] = TopicClient.CreateFromConnectionString(_serviceBusConnectionString, topic);

var subscription = string.Format(template, topic);
if (!namespaceManager.SubscriptionExists(topic, subscription))
{
namespaceManager.CreateSubscription(new SubscriptionDescription(topic, subscription)
{
DefaultMessageTimeToLive = TimeSpan.MaxValue,
MaxDeliveryCount = 10000,
EnableBatchedOperations = true,
EnableDeadLetteringOnFilterEvaluationExceptions = false,
EnableDeadLetteringOnMessageExpiration = false,
LockDuration = TimeSpan.FromMilliseconds(messageLockDuration),
});
}
SubscriptionClients[i] = SubscriptionClient.CreateFromConnectionString(_serviceBusConnectionString,
topic, subscription);


var subClient = SubscriptionClient.CreateFromConnectionString(_serviceBusConnectionString,
topic, subscription);



}
}



}
}

 

That we can modify our WorkerRole.cs class as follows:

 

 public class WorkerRole : RoleEntryPoint
{
private ConcreteMessageObserver _observer;

public override void Run()
{
// This is a sample worker implementation. Replace with your logic.
Trace.TraceInformation("WorkerRole1 entry point called");
Setup();

while (true)
{
Thread.Sleep(10000);
if (_observer != null && _observer.Logged > 0)
{
Trace.TraceInformation("observer.Logged: {0}", _observer.Logged);
}
}
}

private void Setup()
{
string serviceBusConnectionString = RoleEnvironment.GetConfigurationSettingValue("ServiceBus1");
string serviceBusConnectionString2 = RoleEnvironment.GetConfigurationSettingValue("ServiceBus2");

var allTopics = new string[] { "topic0", "topic1", "topic2", "topic3", "topic4", "topic5", "topic6", "topic7", };
int num = RoleEnvironment.CurrentRoleInstance.Id.Last() - '0';
var topics = new string[] { allTopics[num * 2], allTopics[num * 2 + 1] };
var sbHelper1 = new SimplifiedServiceBusHelper(serviceBusConnectionString, topics);
var sbHelper2 = new SimplifiedServiceBusHelper(serviceBusConnectionString2, topics);
sbHelper1.Setup();
sbHelper2.Setup();
IEnumerable<SubscriptionClient> subClients = sbHelper1.SubscriptionClients.Union(sbHelper2.SubscriptionClients);
SubscriptionClientsToObservableHelper<LogMessage> helper = new SubscriptionClientsToObservableHelper<LogMessage>(subClients, 100);

string storageConnectionString = RoleEnvironment.GetConfigurationSettingValue("AzureStorage");
_observer = new ConcreteMessageObserver(15000, 100, storageConnectionString);
_observer.Initialize();
_observer.Start(helper);
}


public override bool OnStart()
{
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 5000;

// For information on handling configuration changes
// see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.

return base.OnStart();
}
}

Let’s create a simple console program to populate the Service Bus:

 

 class Program
{

private static string _sb1 = "[Fill Me]";
private static string _sb2 = "[Fill Me]";
private static string[] allTopics = new string[] { "topic0", "topic1", "topic2", "topic3", "topic4", "topic5", "topic6", "topic7", };
static void Main(string[] args)
{
int numSessions = 10000;
int numMessages = 20;

var sbHelper1 = new SimplifiedServiceBusHelper(_sb1, allTopics);
var sbHelper2 = new SimplifiedServiceBusHelper(_sb2, allTopics);
sbHelper1.Setup();
sbHelper2.Setup();

var topicClients = sbHelper1.TopicClients.Union(sbHelper2.TopicClients).ToArray();
var logMessages = new List<LogMessage>();

for (int i = 0; i < numSessions; i++)
{
var sessionId = Guid.NewGuid().ToString();
for (int j = 0; j < numMessages; j++)
{


var msg = new LogMessage()
{
SessionId = sessionId,
LogData = "Message #"+j,
LogMessageId = Guid.NewGuid().ToString(),
TimeStamp = DateTime.Now.AddSeconds(j),
};

logMessages.Add(msg);
}
}

var countQuery = from m in logMessages
group m by (Math.Abs(m.SessionId.GetHashCode()) % topicClients.Length)
into g
select new { Client = topicClients[g.Key], Count = g.Count() };


var query = from m in logMessages
group m by ( Math.Abs(m.SessionId.GetHashCode()) % topicClients.Length)
into g
select new {Client = topicClients[g.Key], Messages = g.ToList()};

var t = from g in query
select Task.Run(async () =>
{
int counter = 0;
var batch = new List<BrokeredMessage>();
foreach (var msg in g.Messages)
{
var json = Newtonsoft.Json.JsonConvert.SerializeObject(msg);
var brokeredMessage = new BrokeredMessage(json);
batch.Add(brokeredMessage);
counter++;

if (counter == 100)
{
while (true)
{
try
{
await g.Client.SendBatchAsync(batch);
break;
}
catch (Exception ex)
{
Trace.TraceError(ex.ToString());
}
await Task.Delay(2000);
}
counter = 0;
batch = new List<BrokeredMessage>();
}
}

if (counter > 0)
{
while (true)
{
try
{
await g.Client.SendBatchAsync(batch);
break;
}
catch (Exception ex)
{
Trace.TraceError(ex.ToString());
}
await Task.Delay(2000);
}
}


});

Task.WhenAll(t).Wait();

return;



}
}

Run your console program and then you can monitor the subscription.

 

 

Publish your cloud service and then watch the numbers go down!  You can verify that the log messages end up in table storage using storage explorer:

 

 

Final Thoughts:

This was my first adventure with reactive extensions and the only thing I am sure of is that I still have a lot to learn.  I will likely revisit this post as my experience with them matures.  I certainly could have used Rx in the console app and I need to better understand Rx error handling.  Despite my neophyte skill level I am still quite impressed with their flexibility.