StreamInsight Query Pattern: Find the top category (using Order By, Take and ApplyWithUnion)

This question has come up in a few customer engagements and in several forum posts: how do I find the top category in a stream for a given group?  Some examples of this pattern of question are:

  • Given a stream of Product View events from a website, each containing a user id and product category
    • How do I determine the most popular category for each user in the past 8 hours?
    • How do I determine the top 2 most popular categories in the past 8 hours?
  • Given a stream of Sensor Data events from a process data collection system, each containing a sensor id and a sensor reading (for example, a power meter reading)
    • How do I determine the highest reading for each meter in the past hour?
    • How do I determine the meters with the three highest readings in the past hour?

Writing a StreamInsight query to answer this question typically follows one of two patterns:

  • Top X by grouping
  • Top X

Note: all of the source code, sample data, queries, etc. for this blog post may be found here.  Please download and follow along.

Find Top X

The two-step process for finding the Top X is detailed below.  For the purposes of this example, we’ll use the question “How do I determine the top 2 most popular categories in the past 8 hours?” with some sample data and results.

  • Create the initial aggregate. In the case of the product view question, this would be a window of events containing count by category in the past 8 hours.
  • Order by, Take X.  Sort the initial aggregates and take the top 2 ranked results.

Ok, sounds reasonable – what does this look like in terms of a StreamInsight query?

Code Snippet

    // Determine the page view count (by category)
    // in the past eight hours
    var categoryCount = from e in pageViews
        group e by e.SkuCategory into skuGroups
        from win in skuGroups.TumblingWindow(
            TimeSpan.FromHours(8),
            HoppingWindowOutputPolicy.ClipToWindowEnd)
        select new
        {
            SkuCategory = skuGroups.Key,
            Count = win.Count()
        };

This query creates the initial aggregates: for each window of time (start time / end time), a set of events containing the SKU category and its view count.

User Counts  Start Time  End Time  SkuCategory  Count
User Counts 12:00:00 AM 8:00:00 AM Zune 1
User Counts 12:00:00 AM 8:00:00 AM XBox 5
User Counts 12:00:00 AM 8:00:00 AM DVD 6
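
To make the windowing step more concrete, here is a minimal in-memory sketch in plain C# (LINQ to Objects, not StreamInsight) of what a tumbling-window count by category computes. The TimedView record, the TumblingWindowSketch class and the choice to align windows to midnight are assumptions made for illustration only; they are not part of the StreamInsight API or of the sample download.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for a page view event with a timestamp.
public record TimedView(DateTime Timestamp, string SkuCategory);

public static class TumblingWindowSketch
{
    // Start of the fixed-size, non-overlapping window that contains the timestamp.
    // Assumption: windows are aligned to tick 0, so 8-hour windows start at
    // 00:00, 08:00 and 16:00.
    public static DateTime WindowStart(DateTime timestamp, TimeSpan size)
    {
        long ticks = timestamp.Ticks - (timestamp.Ticks % size.Ticks);
        return new DateTime(ticks, timestamp.Kind);
    }

    // One result row per (window, category): window start, window end, category, count.
    public static List<(DateTime Start, DateTime End, string SkuCategory, int Count)>
        CountByCategory(IEnumerable<TimedView> views, TimeSpan size)
    {
        return views
            .GroupBy(v => (Start: WindowStart(v.Timestamp, size), Category: v.SkuCategory))
            .Select(g => (Start: g.Key.Start,
                          End: g.Key.Start + size,
                          SkuCategory: g.Key.Category,
                          Count: g.Count()))
            .OrderBy(r => r.Start)
            .ToList();
    }
}
```

Each view lands in exactly one window, which is why the sample output shows a single start time / end time pair per category.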

Then we’ll take a snapshot of the initial aggregates window, sort by the count and take the top 2 results.

Code Snippet

    // From the output of categoryCount, order by the Count
    // field and take the top 2 ranked events
    var topCategories = (from win in categoryCount
            .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
        from e in win
        orderby e.Count descending
        select e).Take(2);

Resulting in the answer to our question:

 Start Time  End Time  SkuCategory  Count
12:00:00 AM 8:00:00 AM  XBox 5
12:00:00 AM 8:00:00 AM  DVD 6

Putting it all together:

Code Snippet

    // Determine the page view count (by category)
    // in the past eight hours
    var categoryCount = from e in pageViews
        group e by e.SkuCategory into skuGroups
        from win in skuGroups.TumblingWindow(
            TimeSpan.FromHours(8),
            HoppingWindowOutputPolicy.ClipToWindowEnd)
        select new
        {
            SkuCategory = skuGroups.Key,
            Count = win.Count()
        };

    // From the output of categoryCount, order by the Count
    // field and take the top 2 ranked events
    var topCategories = (from win in categoryCount
            .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
        from e in win
        orderby e.Count descending
        select e).Take(2);
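
For readers who want to check the expected result without running the engine, the same two steps can be sketched over a single window's worth of events with ordinary LINQ to Objects. This is only an approximation under stated assumptions: PageView and TopCategoriesSketch are hypothetical names, the sample data is reconstructed from the counts shown in this post, and Enumerable.Take keeps the first x elements rather than the first x ranks, so ties are not handled here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for a page view event payload.
public record PageView(int UserId, string SkuCategory);

public static class TopCategoriesSketch
{
    // Step 1: count the views per category.
    // Step 2: sort by count, largest first, and keep the top x.
    public static List<(string SkuCategory, int Count)> TopCategories(
        IEnumerable<PageView> window, int x)
    {
        return window
            .GroupBy(e => e.SkuCategory)
            .Select(g => (SkuCategory: g.Key, Count: g.Count()))
            .OrderByDescending(c => c.Count)
            .Take(x)
            .ToList();
    }

    // One 8-hour window of sample views, reconstructed from the counts in this
    // post: user 1 viewed DVD 4, XBox 1, Zune 1; user 2 viewed DVD 2, XBox 4.
    public static List<PageView> SampleWindow()
    {
        return Enumerable.Repeat(new PageView(1, "DVD"), 4)
            .Concat(Enumerable.Repeat(new PageView(1, "XBox"), 1))
            .Concat(Enumerable.Repeat(new PageView(1, "Zune"), 1))
            .Concat(Enumerable.Repeat(new PageView(2, "DVD"), 2))
            .Concat(Enumerable.Repeat(new PageView(2, "XBox"), 4))
            .ToList();
    }
}
```

Calling TopCategoriesSketch.TopCategories(TopCategoriesSketch.SampleWindow(), 2) yields DVD with a count of 6 and XBox with a count of 5, the same two categories as the query output above.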

Find Top X by Grouping

The three-step process for finding the Top X by Grouping is detailed below.  For the purposes of this example, we’ll use the question “How do I determine the most popular category for each user in the past 8 hours?” with some sample data and results.

  • Create the initial aggregate. In the case of the product view question, this would be a window of events containing count by category and user id in the past 8 hours.
  • Group the initial aggregates.  In order to determine the top X by group, we need to re-group the aggregate results.  For the question “How do I determine the most popular category for each user in the past 8 hours?”, we need to group by user before sorting and selecting the top X.
  • Order by, Take X.  Once we have our sub-groups created, we’ll sort each of them and take the top X ranked results.

Ok, sounds reasonable – what does this look like in terms of a StreamInsight query?

Code Snippet

    // Determine the page view count (by user and by category)
    // in the past eight hours
    var categoryCountByUser = from e in pageViews
        group e by new { e.UserId, e.SkuCategory } into skuGroups
        from win in skuGroups.TumblingWindow(
            TimeSpan.FromHours(8),
            HoppingWindowOutputPolicy.ClipToWindowEnd)
        select new
        {
            UserId = skuGroups.Key.UserId,
            SkuCategory = skuGroups.Key.SkuCategory,
            Count = win.Count()
        };

This query creates the initial aggregates: for each window of time (start time / end time), a set of events containing the user id, the SKU category and the view count.

User Counts  Start Time  End Time  UserId  SkuCategory  Count
User Counts 12:00:00 AM 8:00:00 AM 1  DVD 4
User Counts 12:00:00 AM 8:00:00 AM 1  XBox 1
User Counts 12:00:00 AM 8:00:00 AM 1  Zune 1
User Counts 12:00:00 AM 8:00:00 AM 2  DVD 2
User Counts 12:00:00 AM 8:00:00 AM 2  XBox 4

Next we’ll re-group this window of events before performing the order by / take X:

Code Snippet

    // Take these events and re-group them into user groups
    var userGroups = from e in categoryCountByUser
                     group e by e.UserId;

This logically splits the window of events up into groups based on the user ID.

User Counts  Start Time  End Time  UserId  SkuCategory  Count
User Counts 12:00:00 AM 8:00:00 AM 1  DVD 4
User Counts 12:00:00 AM 8:00:00 AM 1  XBox 1
User Counts 12:00:00 AM 8:00:00 AM 1  Zune 1
           
User Counts 12:00:00 AM 8:00:00 AM 2  DVD 2
User Counts 12:00:00 AM 8:00:00 AM 2  XBox 4

Finally, we use the ApplyWithUnion operator to perform an order by / take operation on each individual group, then union the results back together.

Code Snippet

    // (a) Apply this query to each group
    var topCategoriesByUser = userGroups.ApplyWithUnion(
        // (b) Create a snapshot window over the results of the previous tumbling window
        applyIn => (from win in applyIn
                        .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                    // (c) for the events in the window
                    from e in win
                    // (d) sort by the count field, largest to smallest
                    orderby e.Count descending
                    // (e) take the top ranked event(s)
                    select e).Take(1),
        // (f) the new event type will consist of the grouping key (user id),
        // the sku category and the event count
        e => new { UserId = e.Key, SkuCategory = e.Payload.SkuCategory,
            Count = e.Payload.Count } );

Resulting in the answer to our question:

User Counts  Start Time  End Time  UserId  SkuCategory  Count
User Counts 12:00:00 AM 8:00:00 AM 1  DVD 4
User Counts 12:00:00 AM 8:00:00 AM 2  XBox 4

This syntax can be a little overwhelming at first glance, so let’s break it down and examine what’s going on.

  • (a) Apply this query to each group.  The ApplyWithUnion operator runs the ‘apply’ query against each group, then unions the results back into a single stream.  This can be thought of as the StreamInsight equivalent of foreach (var g in group) { do stuff }
  • (b) Create a snapshot window over the results of the previous tumbling window.  We use windows when we want to look at sets of events (as defined by a window of time).  In this case, we want to look at each window of events from the Tumbling window in the previous query (which has been grouped by user id) to perform some sorting.
  • (c) For the events in the window, (d) sort by the count field, largest to smallest.  Here we look at the events in the group, and sort them by the e.Count field in descending order.
  • (e) Take the top ranked event(s).  Remember that Take doesn’t mean “take the first event”; it means “take the first rank of events”.  For example, if the DVD and XBox view counts were both four (4) for a given user in the same time window, which one of them is “largest”?  Since they would both occupy the top rank, both events would be returned from the query.
    • It is the responsibility of the consuming system to decide how to handle situations where more than one event occupies the top rank.  Depending on the business logic, this could be as simple as taking the first event, displaying both, etc.
  • (f) The new event type will consist of the grouping key (user id), the SKU category and the event count.  The events emitted from the group and apply branch have two fields: Key and Payload.  Attempting to pass such an event directly out of the stream would mean passing a nested event, which causes an exception, hence the e => new {} projection.
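
The foreach analogy in (a) and the tie behaviour in (e) can be sketched in plain C# over one window of aggregate events. This is an in-memory illustration of the semantics described above, not the engine's implementation; CategoryCount, TakeTopRank and TopPerUser are hypothetical names, and only the top-rank case (the equivalent of Take(1)) is modelled.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for one aggregate event
// (user id, category, view count) within a single window.
public record CategoryCount(int UserId, string SkuCategory, int Count);

public static class TopRankPerGroupSketch
{
    // Keep every event that shares the highest count, so that events
    // tied for the top rank are all returned.
    public static List<CategoryCount> TakeTopRank(IEnumerable<CategoryCount> events)
    {
        var list = events.ToList();
        if (list.Count == 0) return list;
        int best = list.Max(e => e.Count);
        return list.Where(e => e.Count == best).ToList();
    }

    // Analogue of group + apply + union: run the same query against each
    // user's group, then concatenate the per-group results into one list.
    public static List<CategoryCount> TopPerUser(IEnumerable<CategoryCount> window)
    {
        var results = new List<CategoryCount>();
        foreach (var userGroup in window.GroupBy(e => e.UserId))
        {
            results.AddRange(TakeTopRank(userGroup));
        }
        return results;
    }
}
```

With the sample counts from this post, TopPerUser returns DVD (4) for user 1 and XBox (4) for user 2. If user 1 had also viewed XBox four times, both of that user's events would come back, and the consumer would need to decide how to handle them.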

Putting it all together:

Code Snippet

    // Determine the page view count (by user and by category)
    // in the past eight hours
    var categoryCountByUser = from e in pageViews
        group e by new { e.UserId, e.SkuCategory } into skuGroups
        from win in skuGroups.TumblingWindow(
            TimeSpan.FromHours(8),
            HoppingWindowOutputPolicy.ClipToWindowEnd)
        select new
        {
            UserId = skuGroups.Key.UserId,
            SkuCategory = skuGroups.Key.SkuCategory,
            Count = win.Count()
        };

    // Take these events and re-group them into user groups
    var userGroups = from e in categoryCountByUser
                     group e by e.UserId;

    // Determine the top ranked category for each user
    var topCategoriesByUser = userGroups.ApplyWithUnion(
        applyIn => (from win in applyIn
                        .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                    from e in win
                    orderby e.Count descending
                    select e).Take(1),
        e => new { UserId = e.Key, SkuCategory = e.Payload.SkuCategory,
            Count = e.Payload.Count } );

Summary

StreamInsight provides rich capabilities for ranking (TopK) events within a window (or groups within a window) by using the OrderBy and TopK (Take) methods.  These can be used, in conjunction with the appropriate windows and aggregates, to find trends, “hot” patterns and other interesting occurrences within event streams.

Key References