There is an event processing pattern that seems to be essential for many scenarios, so I thought I should write about it from the StreamInsight perspective. It’s pretty simple: Looking at a series of point events, I’d like to analyze each event with respect to its predecessor. That could, for instance, mean to simply calculate the time between each event’s value and the next one, or the delta between them:

image

Now, there are a few details that we need to clarify. First, there is the question how long we want to wait for the second event of each such pair to arrive, so that we can compare it to the previous one and apply the desired computation. We might wait indefinitely, or have a timeout. Further down, I’ll talk about the practical implications of this decision. Second, we need to ask ourselves what the temporal properties of the output should be. Well, the first time we can carry out our pair-wise computation is as soon as we have received the second event of the pair. This suggests to create a result stream which contains point events with the same time stamps as the original input events, but those results points now express information about the current and the previous event:

image

Alright, so we have defined our semantics in terms of computation and temporal shape, and we have identified the timeout-parameter. How does that look as a StreamInsight query? Calculating a result based on a set (here: two) of events can be done in two ways:

  • applying a set-based operator (usually an aggregate) on a window that defines the set, or
  • joining two streams that contain the input events in a way so that they overlap.

Using the first approach for our purpose here means to define a window that contains each event pair, and that can be accomplished with the StreamInsight count window, using a count of two. However, then we’d need to implement our computation programmatically as a user-defined aggregate. I’d like to avoid that, as long as I have a way to express my query purely with built-in operators (easier to maintain, cleaner to express, no separate compilation necessary, possibly better performance). So let’s look at the second option: how can I turn my original stream into a shape so that each event joins exactly with the next one, but nothing else? In other words: how can we make each event overlap with just the next one?

We are actually almost there: Evidently, we need to extend each event’s life time to the end of the following event, but not further. This can be accomplished by three simple steps:

  • “sufficiently” extending each event’s life time to at least overlap the next one,
  • clipping these extended events to the respective next event, and
  • since the clipping will cause the events to end right before their successors, we just move them one tick ahead.

The following diagram shows the result of the first two steps:

image

And here is the effect of the third step:

image

The join will now “see” the event pairs as intended so that we can carry out the desired computation in the join’s projection. The overlap of the modified and the original stream corresponds exactly to the second event of each pair in the original stream, according to the desired semantics as discussed above:

image

There you have your event couples!

Here is the corresponding LINQ query:

  1. var pairs = from s in source
  2.                .AlterEventDuration(x => TimeSpan.MaxValue)
  3.                .ClipEventDuration(source,
                                      (x1, x2) => x1.SourceId == x2.SourceId)
  4.                .ShiftEventTime(x => x.StartTime + TimeSpan.FromTicks(1))
  5.             join e in source
  6.             on s.SourceId equals e.SourceId
  7.             select new
  8.             {
  9.                 e.SourceId,
  10.                 Delta = e.Value - s.Value,
  11.             };


The timeout that we used to expand each of the original events (infinite timeout in the above sample) determines for how long we are willing to keep each of these expanded events in StreamInsight’s memory while waiting for the next event. If the timeout is finite, the event might not reach up to the next one and hence not join. We could, for instance, assume that if I didn’t get a sensor reading for more than 24h, something is wrong with the device and I don’t care about the result of this query anymore, but have a bigger problem.

Attached is an end-to-end sample for LINQPad (StreamInsight plugin required – see here).

Regards,
Roman