I’ve been fielding some questions this week on checkpoints in StreamInsight. I’ll share my way of thinking about checkpoints in the hopes that it will help others build applications leveraging the feature. First, I’ll define what a StreamInsight checkpoint represents. Then I’ll explain how to interpret the high-water mark information StreamInsight provides to input and output adapters when a query is resumed after StreamInsight downtime (planned or unplanned). There’s an important piece of trivia here that may surprise even veteran users!
The MSDN documentation outlines three levels of resiliency. In this post I will focus on the strictest resiliency level, but the concepts outlined are relevant for all three.
Think of a StreamInsight query as a black box. It takes one or more sequences of events as inputs. It produces a sequence of events as output (in the upcoming StreamInsight 2.1 release, multiple output sequences are supported as well)*. It probably comes as no surprise that the black box contains some state. If your query is computing averages over windows, incoming events will contribute to some number of windows. Until the average over a particular window can be committed to the output, sums and counts need to be maintained internally. When you checkpoint a query, you’re really just saving the internal state of the query. It’s that simple!
Well, almost. In addition to the query state, a checkpoint captures the position of the input and output sequences as of the checkpoint**. For example, if a checkpoint could speak, it might say “between enqueuing input events x2 and x3, and between dequeuing output events y2 and y3, this was the state of the query”.
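To make the "state plus positions" idea concrete, here is a minimal sketch written as a C# top-level program. The windowed-average "query", its sum/count state, and the `Enqueue` and `TakeCheckpoint` helpers are hypothetical stand-ins, not StreamInsight APIs; in StreamInsight the engine manages this state for you.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical toy query: averages each tumbling window of 3 input events.
// Its only internal state is the running sum and count of the open window.
const int WindowSize = 3;
double sum = 0;
int count = 0;
int inputPosition = 0;             // input events enqueued so far
var outputs = new List<double>();  // output events dequeued so far

void Enqueue(double x)
{
    inputPosition++;
    sum += x;
    count++;
    if (count == WindowSize)
    {
        // The window is complete: commit its average and reset the state.
        outputs.Add(sum / count);
        sum = 0;
        count = 0;
    }
}

// A checkpoint pairs a copy of the internal state with the positions
// of the input and output sequences at the moment it is taken.
(double Sum, int Count, int InputPosition, int OutputPosition) TakeCheckpoint() =>
    (sum, count, inputPosition, outputs.Count);

Enqueue(1); Enqueue(2); Enqueue(3); // first window complete: emits 2
Enqueue(10);                        // second window open: sum 10, count 1
var checkpoint = TakeCheckpoint();

// Reads as: "between enqueuing x4 and x5, and between dequeuing y1 and y2,
// the state was sum 10, count 1".
Console.WriteLine(checkpoint);      // (10, 1, 4, 1)
```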
After a StreamInsight server instance has been stopped, a query can be resumed using the checkpoint state. Input and output adapters need to do some work as well. After the checkpoint was taken (but before the server instance was stopped), the input sequence may have progressed (let’s say x3 was enqueued) and the output sequence may have progressed as well (let’s say y3 was dequeued). Ideally, the input adapter would then replay x3 and subsequent events. And the ideal output adapter would forget that it had ever seen y3 or anything after it.
Instead of forgetting (by, say, deleting rows from a table or removing lines from a log file), the output adapter may choose to suppress output events it knows have already been emitted. The latter approach is relatively difficult to get right, however:
In any case, the rule of thumb when resuming a query:
Input adapters must replay events after the checkpoint; output adapters must forget events after the checkpoint.
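The rule of thumb can be sketched with plain positions, assuming (hypothetically) that a checkpoint simply recorded how many input events had been enqueued and how many output events had been dequeued. StreamInsight actually reports positions as HWM and offset values; the `Replay` and `Forget` helpers here are illustrative only, written as a C# top-level program.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical positional checkpoint: inputPosition events had been enqueued and
// outputPosition events had been dequeued when the checkpoint was taken.
IEnumerable<T> Replay<T>(IEnumerable<T> inputs, int inputPosition) =>
    inputs.Skip(inputPosition);  // replay everything after the checkpoint

void Forget<T>(List<T> outputs, int outputPosition)
{
    // Forget everything after the checkpoint.
    if (outputPosition < outputs.Count)
        outputs.RemoveRange(outputPosition, outputs.Count - outputPosition);
}

var xs = new[] { "x1", "x2", "x3", "x4" };
var ys = new List<string> { "y1", "y2", "y3", "y4" };

// Checkpoint taken between x2 and x3 on input, and between y2 and y3 on output.
var replayed = Replay(xs, 2).ToList();
Forget(ys, 2);

Console.WriteLine(string.Join(", ", replayed) + " | " + string.Join(", ", ys));
// x3, x4 | y1, y2
```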
StreamInsight provides a high-water mark (HWM, pronounced huh-wim I think) value to input adapters that can be used to determine where in the input sequence a checkpoint occurred (see IHighWaterMarkInputAdapterFactory and IHighWaterMarkTypedInputAdapterFactory). Output adapters get both an HWM value and an offset (see IHighWaterMarkOutputAdapterFactory and IHighWaterMarkTypedOutputAdapterFactory). How can these values be used to identify an element of a sequence? First, let’s define HWM. An event x_i has high-water mark value h if its timestamp is h and, for all j < i, x_j has a lower timestamp. In other words, whenever an event in a sequence has a higher timestamp than every preceding event, it has an HWM value. Assuming that your input sequence is conveniently described by an IEnumerable<PointEvent<T>> sequence xs, you can find the event corresponding to an hwm as follows:
var inputCheckpointEvent = xs.FirstOrDefault(x => x.StartTime == hwm);
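The HWM definition can be checked with a small sketch. It assumes a `(StartTime, Name)` tuple with `long` timestamps as a stand-in for `PointEvent<T>`, and `HighWaterMarkEvents` is a hypothetical helper, not part of StreamInsight. It lists the events that carry an HWM value, then applies the same `FirstOrDefault` lookup to find the input checkpoint event.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: yields the events that carry an HWM value, i.e. those whose
// timestamp is strictly greater than the timestamp of every preceding event.
IEnumerable<(long StartTime, string Name)> HighWaterMarkEvents(
    IEnumerable<(long StartTime, string Name)> events)
{
    long? max = null;
    foreach (var e in events)
    {
        if (max is null || e.StartTime > max)
        {
            max = e.StartTime;
            yield return e;
        }
    }
}

// (StartTime, Name) tuples stand in for PointEvent<T>.
var xs = new List<(long StartTime, string Name)>
{
    (10, "x1"), (20, "x2"), (15, "x3"), (30, "x4"), (30, "x5"),
};

// x3 is older than x2, and x5 only ties x4, so neither carries an HWM value.
var hwmEvents = HighWaterMarkEvents(xs).Select(e => e.Name).ToList();

// The lookup from the post: the first event stamped hwm is the one carrying it.
long? hwm = 30;
var inputCheckpointEvent = xs.FirstOrDefault(x => x.StartTime == hwm);

Console.WriteLine(string.Join(", ", hwmEvents) + " | " + inputCheckpointEvent.Name);
// x1, x2, x4 | x4
```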
Assuming that the output from the query is captured in a (resilient and persistent) List<PointEvent<T>> ys, and given hwm and offset values for an output sequence, you just need to skip ahead offset positions among the events stamped at or after hwm to find the checkpoint event:
var outputCheckpointEvent = ys
    .Where(x => x.StartTime >= hwm)
    .Skip(offset)
    .FirstOrDefault();
I promised a piece of trivia. While I’ve shown you how to find the “checkpoint event”, I haven’t told you where the checkpoint occurred: did it occur before the checkpoint event, or after? It turns out that the answer is before for an input checkpoint event but after for an output checkpoint event. Returning to the earlier example, if the checkpoint occurred between input events x2 and x3, the checkpoint event will be x3, but if the checkpoint occurred between output events y2 and y3, the checkpoint event will be y2! Again using the simple IEnumerable<> contract, I can demonstrate how replay works:
// replay all events after and including the checkpoint event
var replayXs = hwm.HasValue
    ? xs.SkipWhile(x => x.StartTime < hwm.Value)
    : xs;
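Here is the replay logic wrapped in a runnable sketch (a C# top-level program), again with a hypothetical `(StartTime, Name)` tuple standing in for `PointEvent<T>`. The sample sequence follows the example used in this post: x2 does not raise the HWM, x3 does, and x4 arrives with an earlier timestamp than x3.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for PointEvent<T>: a (StartTime, Name) tuple with long timestamps.
// Replays every event from the input checkpoint event onward. By the HWM definition,
// every event before the checkpoint event has a smaller timestamp, so SkipWhile stops
// exactly at the checkpoint event. With no hwm, the whole sequence is replayed.
IEnumerable<(long StartTime, string Name)> Replay(
    IEnumerable<(long StartTime, string Name)> events, long? hwm) =>
    hwm.HasValue ? events.SkipWhile(e => e.StartTime < hwm.Value) : events;

var xs = new List<(long StartTime, string Name)>
{
    (10, "x1"), (10, "x2"), (20, "x3"), (18, "x4"),
};

var fromCheckpoint = Replay(xs, 20).Select(x => x.Name).ToList();
var noCheckpoint = Replay(xs, null).Select(x => x.Name).ToList();

Console.WriteLine(string.Join(", ", fromCheckpoint)); // x3, x4
Console.WriteLine(string.Join(", ", noCheckpoint));   // x1, x2, x3, x4
```

Note that x4 is replayed even though its timestamp (18) is below the HWM (20): `SkipWhile` only drops the leading run of events before the checkpoint event.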
When forgetting output events and again assuming (somewhat unrealistically) that a List<> is the external reliable authority on which events have been emitted:
// remove all output events after but not including the checkpoint event
if (hwm.HasValue)
{
    var hwmEvent = ys
        .Select((y, i) => new { y.StartTime, Index = i })
        .FirstOrDefault(y => y.StartTime == hwm.Value);
    if (hwmEvent != null)
    {
        int forgetIndex = hwmEvent.Index + offset + 1;
        if (forgetIndex < ys.Count)
        {
            ys.RemoveRange(forgetIndex, ys.Count - forgetIndex);
        }
    }
}
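The forgetting logic can be exercised the same way. This sketch (a C# top-level program) uses the same hypothetical tuple stand-in for `PointEvent<T>` and swaps the anonymous-type index lookup for `List<T>.FindIndex`, which locates the same first event stamped `hwm`. Two offsets show how a tie at the HWM timestamp moves the output checkpoint event.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for PointEvent<T>: a (StartTime, Name) tuple with long timestamps.
// Removes every output event after, but not including, the output checkpoint event,
// i.e. the event `offset` positions past the first event stamped hwm.
// Like the code above, it removes nothing when there is no hwm.
void Forget(List<(long StartTime, string Name)> outputs, long? hwm, int offset)
{
    if (!hwm.HasValue) return;
    int hwmIndex = outputs.FindIndex(o => o.StartTime == hwm.Value);
    if (hwmIndex < 0) return;
    int forgetIndex = hwmIndex + offset + 1;
    if (forgetIndex < outputs.Count)
        outputs.RemoveRange(forgetIndex, outputs.Count - forgetIndex);
}

var ys = new List<(long StartTime, string Name)>
{
    (10, "y1"), (20, "y2"), (20, "y3"), (30, "y4"),
};
var ysCopy = new List<(long StartTime, string Name)>(ys);

Forget(ys, 20, 0);      // y2 is the checkpoint event
Forget(ysCopy, 20, 1);  // y3 is the checkpoint event

Console.WriteLine(string.Join(", ", ys.Select(y => y.Name)));     // y1, y2
Console.WriteLine(string.Join(", ", ysCopy.Select(y => y.Name))); // y1, y2, y3
```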
Let’s tie all of this information together with a simple walkthrough. In this example, the query state is described by the input events contributing to the state.
| Action | Query state | Output list | Notes |
|---|---|---|---|
| Enqueue x1 | { x1 } | { y1 } | |
| Begin checkpoint | | | Waiting for HWM event on input |
| Enqueue x2 | { x1, x2 } | { y1, y2 } | Let’s assume x2 does not increase the HWM, so the checkpoint cannot be initiated yet |
| Enqueue x3 | | | x3 is the input checkpoint event; current query state is captured before processing x3 |
| (continued) | { x1, x2, x3 } | { y1, y2, y3 } | y2 is the output checkpoint event, as it is the last event before the checkpoint |
| End checkpoint | | | Checkpoint <{ x1, x2 }, x3, y2> committed to disk |
| Enqueue x4 | { x1, x2, x3, x4 } | { y1, y2, y3, y4 } | |
| Downtime! | | | |
| Resume | | | StreamInsight recovers query state; output adapter forgets all output events after y2 |
| Input replays from x3 | | | |
| … | | | |
Notice that by replaying input events and forgetting output events, we see correct output after resuming the query!
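As a sanity check, the walkthrough can be re-enacted with a toy query to confirm that replaying and forgetting reproduce the uninterrupted output. Everything in this C# top-level program is hypothetical: the "query" just keeps its inputs as state and emits a running sum, and the checkpoint is simulated by copying that state rather than by anything StreamInsight does. For simplicity, the checkpoint is taken right after x2 is processed, so y2 is the last output before it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical toy query: keeps the input events it has seen as its state and
// emits their running sum as each output event.
List<int> Run(List<int> queryState, IEnumerable<int> toProcess, List<int> outputs)
{
    foreach (var x in toProcess)
    {
        queryState.Add(x);
        outputs.Add(queryState.Sum()); // each y_i depends on all inputs in the state
    }
    return outputs;
}

var inputs = new List<int> { 1, 2, 3, 4 }; // x1..x4

// Reference run without downtime.
var expected = Run(new List<int>(), inputs, new List<int>());

// Process x1 and x2, then take the checkpoint <{ x1, x2 }, x3, y2>.
var state = new List<int>();
var ys = Run(state, inputs.Take(2), new List<int>());
var checkpointState = new List<int>(state); // { x1, x2 }
int keep = ys.Count;                         // outputs up to and including y2

// Process x3 and x4 (producing y3 and y4), then the server goes down.
Run(state, inputs.Skip(2), ys);

// Resume: restore the state, forget outputs after y2, and replay from x3.
var restored = new List<int>(checkpointState);
ys.RemoveRange(keep, ys.Count - keep);
Run(restored, inputs.Skip(2), ys);

Console.WriteLine(string.Join(", ", ys));      // 1, 3, 6, 10
Console.WriteLine(ys.SequenceEqual(expected)); // True
```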
* Notice that I’m talking about “sequences of events” rather than “temporal streams”. While StreamInsight operators are defined in terms of temporal streams – sequences of timestamped events with ordering constraints imposed by common time increments (or CTIs) – a checkpoint can be understood relative to generic sequences.
** StreamInsight doesn’t actually freeze the query operators, inputs and outputs in order to take a snapshot of the internal state, but this is a useful and accurate way of characterizing the logical contract for checkpoints. Details of the internal mechanism aren’t covered in this blog post.