I recently discussed a StreamInsight scenario on the OSIsoft vCampus discussion forum, which included a great and instructive query. Their forum is only available to OSIsoft users, so I’ll walk through the use case here.

Let’s assume we have a number of event sources, each producing data at arbitrary intervals. We would like to compare the current value of each such stream with its average over a period of time (here: 24h). We want to produce a result event if this comparison fulfills some condition for all streams at the same time. The last piece is important here, and we can actually separate it from the rest when we design the query. So we will start with the average and the condition. I’ll work with code snippets from an end-to-end LINQPad sample that you can download at the end of the article.
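
(For reference: the queries below only rely on the input events exposing an Id and a Value field. Purely as an assumption about the sample, not a verbatim excerpt from it, an input stream along these lines would do in LINQPad:)

// assumed shape of the input stream; the downloadable sample may construct it differently
var start = new DateTime(2011, 1, 1, 0, 0, 0, DateTimeKind.Utc);
var input = new[]
   {
       new { Time = start,             Id = 1, Value = 5.0 },
       new { Time = start.AddHours(1), Id = 2, Value = 7.5 },
       new { Time = start.AddHours(2), Id = 1, Value = 6.0 }
   }
   .ToPointStream(
       Application,
       e => PointEvent.CreateInsert(e.Time, e),
       AdvanceTimeSettings.StrictlyIncreasingStartTime);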

We start with the computation of the average over the last 24h of each event source (identified by field Id) in our incoming stream:

// compute the 24h average for each Id separately
var avgStream = from e in input
               group e by e.Id into g
               from win in g
                   .AlterEventDuration(e => TimeSpan.FromHours(24))
                   .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
               select new
               {
                   Id = g.Key,
                   Avg = win.Avg(e => e.Value)
               };

No rocket science here. This is a sliding time window, moving along with every new event, so we don’t want to use a Hopping or Tumbling window. Setting the events’ lifetimes to the size of the window and applying the snapshot window implements such a sliding behavior. The window is inside a group, because we want the average per Id. Let’s look at how this result behaves in time (for one specific Id):

[Diagram: the sliding 24h average over time for one Id]

Next is the check for the condition. In our example, we want to know whether the current value differs from the current average by more than 10%:

// for each Id, compare the average with the real-time value
var spikeTest = from a in avgStream
               join b in input
               on a.Id equals b.Id
               select new
               {
                   a.Id,
                   Spike = Math.Abs(b.Value - a.Avg) > a.Avg * 0.1
               };

We are comparing the raw input stream with the average stream, hence we need a join. Using the Id field for the equality predicate ensures that we are comparing the right things. Note that the snapshot window already produced a continuous stream of intervals, so there is always something to join with the raw point events. Also, the result of this join will again be point events, with a boolean field Spike that tells us, for each original input event, whether the condition was fulfilled:

[Diagram: point events with the Spike flag for one Id]

Again, in this diagram we are just considering one specific Id. Notice something interesting: the average value that we use in our comparison already includes the original value! The fourth event (value 7) is compared with 4.5, which is based on the last 24h including the 7 itself. If this is not desired, we could simply move the average stream one tick forward, or the original input one tick back in the join.
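
If that adjustment were needed, a minimal sketch of the second option (assuming the AlterEventLifetime method, which takes a new start time and a new duration) could look like this; spikeTest would then join avgStream with shiftedInput instead of input:

// sketch: move the raw input back by one tick, so each point event is compared
// against the average of the window that ends just before it
var shiftedInput = input.AlterEventLifetime(
   e => e.StartTime - TimeSpan.FromTicks(1),  // new start time: one tick earlier
   e => e.EndTime - e.StartTime);             // keep the original (point) duration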

Now comes the second part of the query, where we want to correlate between Ids. Remember, we want to know at which point in time all sources fulfill the condition. Here is a possible intermediate result for three sources with our query so far:

[Diagram: spike-test results over time for three Ids]

As you can see, there is a point in time when all three Ids report true – not at the same instant, but there is a period during which true is the most recent result from each of them. This is when we want output! So first we have to take care of the sources not being synchronized (at least that’s our assumption). In order to look across substreams, we need to turn the series of point events into a continuous signal, using a lifetime extension combined with a clip against the next event with the same Id:

// turn the result into a continuous signal for each Id:
var spikeSignal = spikeTest
                     .AlterEventDuration(e => TimeSpan.MaxValue)
                     .ClipEventDuration(spikeTest, (e1, e2) => e1.Id == e2.Id);

Now the stream looks like this:

[Diagram: the per-Id results as continuous true/false signals]

The periods when the last value for all three substreams is true are clearly visible now. And I just gave away the query for it: count the events with value true over a snapshot window, and produce an output when the count is 3:

// compute the periods where condition is met over all Ids:
var result = from win in spikeSignal
                .Where(e => e.Spike)
                .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
            select new { AllStreamsSpike = win.Count() == 3 };

(I am using the Where extension method to make this expression a little more concise.)
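
For completeness, the same statement without the extension method would simply nest the filter as a query expression; this is a purely stylistic variant:

// equivalent formulation with the filter nested as a query expression
var result = from win in (from e in spikeSignal
                          where e.Spike
                          select e)
                         .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
             select new { AllStreamsSpike = win.Count() == 3 };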

As a diagram:

[Diagram: output periods where all three signals are true]

But not so fast. I want to discuss an alternative to the last LINQ statement, one that does not need the number of sources hardcoded. The alternative is based on the equivalence between all:true and !exists:false (excuse my mathematical pseudo-notation). In other words, our desired result can equally be obtained by producing output wherever none of our inputs has the value false:

[Diagram: the same result obtained via the absence of false values]

Easy to see. And you know what that means in terms of StreamInsight LINQ: LeftAntiSemiJoin FTW! However, what’s on the left side of the join? After all, StreamInsight cannot invent something (an event) out of nothing. So we need another event that covers the period of investigation. For demonstration purposes I will just use a quick IEnumerable:

var fillstream = new[] { new { dummy = true } }
   .ToIntervalStream(
       Application,
       ev => IntervalEvent.CreateInsert(starttimestamp, endtimestamp, ev),
       AdvanceTimeSettings.StrictlyIncreasingStartTime);

This stream contains a single interval event lasting from a specific start time to a specific end time, with a dummy payload. The fillstream can now serve as the left side of the LASJ:

var result = from f in fillstream
            where (from s in spikeSignal
                   where !s.Spike
                   select s).IsEmpty()
            select new { AllStreamsSpike = true };

Also known as: The principle of double negation in StreamInsight.

(btw: the last projection produces a constant payload, which does not work in V1.1 but will in our upcoming release, so hang in there!)
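
(Until then, one possible workaround, which is my own assumption rather than part of the original sample, is to carry the flag in the fillstream’s payload and project that field instead of a literal:)

// V1.1-friendly variant: project a payload field rather than a constant
var result = from f in fillstream
            where (from s in spikeSignal
                   where !s.Spike
                   select s).IsEmpty()
            select new { AllStreamsSpike = f.dummy };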

Note that the two solutions for the final statement have something in common: they need an additional piece of information to produce the desired result: either the total number of sources, or the additional filling stream. The actual data streams alone are not enough – imagine the case where one of them doesn’t have any values for some time in the beginning. During that time, that stream won’t contribute any events to the query, but we still want to take it into consideration. The overall number of sources is external knowledge; it can’t be derived from the events in general.

Another aspect to think about is how we handle the notion of “the most recent result” here. Notice that our true/false events are accurate when they occur, but not necessarily for the entire time until the next one, because the average changes over that time: old events fall out of the window without new ones entering. What we could have done in the spikeTest statement is to use a signal derived from the original stream as input instead of the point events (a sketch follows below). Then again, it all depends on your exact semantics. It just shows that time is tricky and that you need to think through your scenario and requirements thoroughly.
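
Here is that sketch (my own variation, not part of the downloadable sample): turn the raw input into a continuous per-Id signal with the same extend-and-clip pattern as above, and run the spike test against that signal instead of the point events. Whether the later extend-and-clip step is then still needed depends on how you want to treat the edges of the signals.

// turn the raw input into a continuous signal per Id (same pattern as spikeSignal)
var inputSignal = input
                     .AlterEventDuration(e => TimeSpan.MaxValue)
                     .ClipEventDuration(input, (e1, e2) => e1.Id == e2.Id);

// spike test against the continuous input signal, so the comparison stays
// defined between the original point events as well
var spikeTestSignal = from a in avgStream
                      join b in inputSignal
                      on a.Id equals b.Id
                      select new
                      {
                          a.Id,
                          Spike = Math.Abs(b.Value - a.Avg) > a.Avg * 0.1
                      };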

Regards,
Roman