Joint Forces

Joint Forces

  • Comments 1

Finally – after way too much time, I am starting my own blog about StreamInsight. As some of you might know, I already participate in the StreamInsight Team Blog, comfortably hiding behind its relative anonymity. Here and there, I replied to threads in the StreamInsight forum, realizing that similar questions about the product arise repeatedly. Now I want to join the ranks of our team members Colin and Mark, two brilliant sources of StreamInsight knowledge, and thus encourage others in the team (or even the broader user community) to unleash their StreamInsight wisdom to the world as well.

“Join” was the keyword – that’s going to be my first topic. More precisely, using the join operator for a simple pattern matching scenario. How can I detect whether event A was followed by event B within a certain time period? The scenario doesn’t sound like rocket science, and neither is its StreamInsight LINQ implementation. I’ll demonstrate how to use a timestamp modification and a join to make this work.

First of all, we need to precisely define the semantics of the scenario. This necessity is of course not specific to StreamInsight, but due to the declarative nature of its query language the benefits of a precise specification are even more apparent. Say, we are expecting a stream of point events, some of which we identify as events of type A, and others of type B. Let’s assume we have a “Value” field of some measurement in our events’ payloads, and we define A as those events with a Value < 30 and B as those with Value > 60. And we define the time period as one minute. We are now looking for events B that occur within 1 minute of event A. In other words, with each such occurrence of B, we want to generate some output event (maybe interpreted as an alarm, or a notification). Let’s draw this as a timeline diagram:


The first input event, having a value of 22, qualifies as an event of type A in our use case, but no event of type B follows within 1 minute. But the pattern is fulfilled later, when the value 71 follows value 7 within one minute. So there is our result event. We now have defined the desired output stream against the input stream. Onwards to the query!

The first thing we will write in LINQ are the substreams for A and B:

var streamA = from e in source
              where e.Value < 30
              select e;

var streamB = from e in source
              where e.Value > 60
              select e;

Apparently, our result event with always be collocated with our events B, but only for those that had an event A within 1 minute in their “past”. And here comes the power of timestamp modifications: We can change the lifetime of all events A and then find out where these modified events overlap with events B (here, the event end time gets modified, resulting in a changed event lifetime). Of course, we use a new lifetime of 1 minute:



As you can easily see now, the extended “22” event does not reach up to any event in streamB, while the extended “7” event does. And this overlap can be determined by the join operator, whose semantics prescribe that output is produced only for those periods where both inputs overlap (in addition to the optional join predicate over the event payloads). The diagram shows the join of streamB with the modified streamA:


One benefit of the join operation is that we can produce a new payload based on both join inputs, hence we can include both payload values there.

Here is the according StreamInsight LINQ statement:

var result = from a in streamA.AlterEventDuration(e => TimeSpan.FromMinutes(1))
             from b in streamB select new
             { ValueA = a.Value, ValueB = b.Value }; 

We simply added the lifetime modification to the according input stream as part of the entire join statement. Done!

Couple of aspects are worthwhile to discuss here. First of all, we could have changed the syntax and included the filters right into the join statement, to make the expression more concise. The entire scenario can be expressed as follows:

var result = from a in source
                 .Where(e => e.Value < 30) .AlterEventDuration(e => TimeSpan.FromMinutes(1))
             from b in source
                 .Where(e => e.Value > 60)
             select new
             { ValueA = a.Value, ValueB = b.Value }; 

Second, those of you who know LINQ will ask, but where is the join keyword? Justified question: we are using a cross-join here, since we don’t have any further predicate than just the lifetime overlap, which is implied by the StreamInsight join semantics anyways. And, like “regular” LINQ, a cross-join simply uses two from clauses, but neither a “join”, nor an “on”, which you would need for an equi-join. Similarly, predicates on the join result other than equality can be expressed using the cross-join syntax (two from-clauses) with a where-clause included. This lets us rewrite the previous expression as follows:

var result = from a in source .AlterEventDuration(e => TimeSpan.FromMinutes(1))
             from b in source
             where a.Value < 30 && b.Value > 60
             select new
             { ValueA = a.Value, ValueB = b.Value }; 

Note the subtle difference: Previously, we have filtered the inputs to the join, now we are filtering the output. The difference at runtime could become apparent when my input streams deliver high throughout, but only few events qualify for my filters. Doing the join before the filter could then increase the latency of the query because of the (potentially unnecessary) work the join has to do.

Hope this article gave you some insight into the application of StreamInsight to a typical query pattern. The modification of event timestamps together with the join operator is very powerful and versatile. I will use it in various other query patterns to be discussed here.


  • Glad you started your own blog!  Now you can't blame any bad content on Tiho.

Page 1 of 1 (1 items)
Leave a Comment
  • Please add 6 and 1 and type the answer here:
  • Post