Let’s say there are several agents – e.g. devices – producing temporal streams. It may be interesting to merge these sequences into a single stream that can be processed by StreamInsight. The “union” operator allows you to merge a fixed number of inputs, but what happens when inputs come and go over time? Some policy is needed that can make sense of these dynamic inputs, since they may disagree on how time is progressing. A strictly “correct” policy, one that never propagates a CTI that might later be violated, is no good: a new input can always come online with an earlier CTI value, so such a policy never lets any CTIs through. A “loose” policy that propagates all CTIs is equally useless because it assumes that all inputs are progressing in lockstep – implausible given network latency, clock drift, etc.

Temporal stream: a sequence of timestamped events. An event may be a Current Time Increment (CTI) event. A CTI is a promise that no subsequent events will have timestamps lower than that of the CTI.

A possible Goldilocks policy: allow for some maximum deviation between the most and least advanced inputs (let’s call this timespan the delay). Whenever a CTI event from any input is processed, subtract the delay from its timestamp so that the CTI doesn’t disqualify events from less advanced inputs, and propagate the delayed CTI only if it is greater than any CTI propagated so far. If an incoming event still violates the delayed CTI, drop or adjust the event.

Transform Subject

The ISubject<,> contract allows us to encapsulate the policy described above. Multiple producers can feed the subject (via calls to OnNext, OnError and OnCompleted). Each producer can come and go at its own pace. How can we implement such a subject? Let’s start with a helper factory method that allows us to create a “transform subject”. This subject applies an arbitrary function to an input observable, where that function may represent a stateful computation.

 using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

/// <summary>
/// Factory for subjects whose output is a (possibly stateful) transformation of their input.
/// </summary>
public static class TransformSubject
{
    /// <summary>
    /// Creates a subject that routes every incoming notification through <paramref name="transform"/>
    /// and multicasts the transformed sequence to all current subscribers.
    /// </summary>
    /// <param name="transform">
    /// Applied once, at creation time, to the input sequence; its result becomes the subject's output.
    /// </param>
    /// <returns>
    /// A subject that also implements <see cref="IDisposable"/>; disposing it releases the input subject
    /// and the output connection.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="transform"/> is null.</exception>
    public static ISubject<TIn, TOut> Create<TIn, TOut>(Func<IObservable<TIn>, IObservable<TOut>> transform)
    {
        if (transform == null)
        {
            throw new ArgumentNullException("transform");
        }

        return new TransformingSubject<TIn, TOut>(transform);
    }

    /// <summary>
    /// Observer side forwards to an inner <see cref="Subject{T}"/>; observable side exposes the published
    /// (shared) result of the transform.
    /// </summary>
    sealed class TransformingSubject<TIn, TOut> : ISubject<TIn, TOut>, IDisposable
    {
        readonly IObserver<TIn> sink;
        readonly IObservable<TOut> source;
        readonly IDisposable resources;

        public TransformingSubject(Func<IObservable<TIn>, IObservable<TOut>> transform)
        {
            var entry = new Subject<TIn>();
            IConnectableObservable<TOut> shared = transform(entry).Publish();

            // Connected immediately: the transform is live from construction, so notifications pushed
            // before anyone subscribes are processed but not observed by later subscribers.
            IDisposable connection = shared.Connect();

            this.resources = new CompositeDisposable(entry, connection);
            this.sink = entry;
            this.source = shared;
        }

        public void OnCompleted()
        {
            this.sink.OnCompleted();
        }

        public void OnError(Exception error)
        {
            this.sink.OnError(error);
        }

        public void OnNext(TIn value)
        {
            this.sink.OnNext(value);
        }

        public IDisposable Subscribe(IObserver<TOut> observer)
        {
            return this.source.Subscribe(observer);
        }

        public void Dispose()
        {
            // Disposes the input subject first, then the output connection.
            this.resources.Dispose();
        }
    }
}

Notice that this subject encapsulates two other objects. The input is a plain old Subject<> that allows input events to be passed to the transform logic. The output is a connectable observable (created with Publish and connected immediately) that allows multiple consumers to read the output of the transform. When the subject is disposed, both the input subject and the output connection are released; since Create returns an ISubject<,>, callers must cast the result to IDisposable to dispose it. A simple example of a transform subject that adds one to every incoming integer:

TransformSubject.Create((IObservable<int> xs) => xs.Select(x => x + 1))

CTI Synchronizing Subject (Round 1)

We can create a “CTI synchronizing” subject by constructing a transform subject where the transform logic tracks the highest CTI seen so far and modifies CTIs and drops events as needed:

 using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.ComplexEventProcessing;

/// <summary>
/// The subject takes as input zero or more temporal streams. It outputs a temporal stream by applying
/// a delay to all incoming CTIs and dropping any events that violate the CTI. The delay represents the
/// maximum expected divergence between the most advanced and least advanced input stream.
/// </summary>
/// <remarks>
/// A (delayed) CTI is emitted only if it strictly advances the highest CTI emitted so far. Non-CTI events
/// are emitted unless they precede that CTI. Null input events are ignored.
/// </remarks>
public static class CtiSynchronizingSubject
{
    /// <summary>Creates a CTI synchronizing subject for point events, using each event's start time.</summary>
    /// <param name="delay">Non-negative amount subtracted from every incoming CTI.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="delay"/> is negative.</exception>
    public static ISubject<PointEvent<T>, PointEvent<T>> CreatePoint<T>(TimeSpan delay)
    {
        return Create<PointEvent<T>, T>(delay, p => p.StartTime, t => PointEvent<T>.CreateCti(t));
    }

    /// <summary>Creates a CTI synchronizing subject for interval events, using each event's start time.</summary>
    /// <param name="delay">Non-negative amount subtracted from every incoming CTI.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="delay"/> is negative.</exception>
    public static ISubject<IntervalEvent<T>, IntervalEvent<T>> CreateInterval<T>(TimeSpan delay)
    {
        return Create<IntervalEvent<T>, T>(delay, i => i.StartTime, t => IntervalEvent<T>.CreateCti(t));
    }

    // Edge events are not currently implemented. Care is needed to avoid unmatched start edges in the output
    // stream.

    static ISubject<TEvent, TEvent> Create<TEvent, TPayload>(TimeSpan delay, Func<TEvent, DateTimeOffset> getApplicationTime, Func<DateTimeOffset, TEvent> createCti)
        where TEvent : TypedEvent<TPayload>
    {
        if (delay < TimeSpan.Zero) { throw new ArgumentOutOfRangeException("delay", "Delay must be non-negative."); }

        return TransformSubject.Create((IObservable<TEvent> input) =>
        {
            // Null events carry no timestamp; drop them instead of faulting the whole output stream.
            // Delay is applied to all CTIs to allow less advanced inputs to contribute events without violating
            // CTIs from more advanced inputs.
            var delayed = from e in input
                          where e != null
                          select e.EventKind == EventKind.Cti
                              ? createCti(SafeSubtract(getApplicationTime(e), delay))
                              : e;

            // The highest (delayed) CTI emitted so far is tracked in the Scan accumulator. A CTI is emitted only
            // if it strictly advances that value (so repeated or stale CTIs are suppressed); any other event is
            // emitted only if it does not precede it.
            var decisions = delayed.Synchronize().Scan(
                new
                {
                    HighCti = default(DateTimeOffset?),
                    Evt = default(TEvent),
                    Emit = false,
                },
                (accumulator, evt) =>
                {
                    var appTime = getApplicationTime(evt);
                    var isCti = evt.EventKind == EventKind.Cti;
                    var advances = isCti && (!accumulator.HighCti.HasValue || appTime > accumulator.HighCti.Value);
                    var violates = !isCti && accumulator.HighCti.HasValue && accumulator.HighCti.Value > appTime;
                    return new
                    {
                        HighCti = advances ? appTime : accumulator.HighCti,
                        Evt = evt,
                        Emit = isCti ? advances : !violates,
                    };
                });

            return from d in decisions
                   where d.Emit
                   select d.Evt;
        });
    }

    /// <summary>
    /// Returns <paramref name="x"/> minus <paramref name="y"/>, or <see cref="DateTimeOffset.MinValue"/> when
    /// the result would be unrepresentable.
    /// </summary>
    static DateTimeOffset SafeSubtract(DateTimeOffset x, TimeSpan y)
    {
        // DateTimeOffset subtraction operates on the clock time (which differs from UTC by the offset) and
        // then validates the UTC instant, so both must stay non-negative to avoid ArgumentOutOfRangeException.
        return x.UtcTicks >= y.Ticks && x.Ticks >= y.Ticks ? x - y : DateTimeOffset.MinValue;
    }
}

In case you haven’t encountered them before, a couple of Rx operators are worth calling out. First, we apply the Synchronize operator, which serializes all incoming events; this lets us avoid race conditions when inputs run on different threads. Second, the Scan operator allows us to define an accumulator that tracks the highest CTI seen so far. If an event has a timestamp that precedes the highest CTI seen so far, it must be dropped to avoid a CTI violation.

Let’s look at how the subject behaves given interleaved events from two simulated input sources:

 var startTime = new DateTimeOffset(DateTime.Today);

// create sync subject (and monitor its output)
var ctiSyncSubject = CtiSynchronizingSubject.CreatePoint<double>(TimeSpan.FromMinutes(5));

using (ctiSyncSubject.Subscribe(p =>
{
    // TotalMinutes rather than the Minutes component, so times of an hour or more are not wrapped.
    int t = (int)(p.StartTime - startTime).TotalMinutes;
    if (p.EventKind == EventKind.Cti) Console.WriteLine("OUTPUT CTI: time {0}", t);
    else Console.WriteLine("OUTPUT Insert: time {0}, payload {1}", t, p.Payload);
}))
{
    // helpers feeding input to the subject
    Action<string, int, double> insert = (l, t, p) =>
    {
        Console.WriteLine("INPUT Insert: input {0}, time {1}, payload {2}", l, t, p);
        ctiSyncSubject.OnNext(PointEvent.CreateInsert(startTime.AddMinutes(t), p));
    };
    Action<string, int> cti = (l, t) =>
    {
        Console.WriteLine("INPUT CTI: input {0}, time {1}", l, t);
        ctiSyncSubject.OnNext(PointEvent.CreateCti<double>(startTime.AddMinutes(t)));
    };

    // simulate interleaved input from two sources where input "A" is ahead of input "B"
    insert("A", 5, 1.0);
    insert("B", 0, 2.0);
    cti("A", 5);
    cti("B", 0);
    insert("A", 7, 3.0);
    cti("A", 8);
    insert("B", 1, 4.0);
}

 

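The following output is produced. Note that B’s CTI at time 0 is suppressed (once delayed to time -5 it does not advance past the CTI at time 0 already emitted), and that B’s insert at time 1 is dropped because it precedes the CTI at time 3: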

INPUT Insert: input A, time 5, payload 1
OUTPUT Insert: time 5, payload 1
INPUT Insert: input B, time 0, payload 2
OUTPUT Insert: time 0, payload 2
INPUT CTI: input A, time 5
OUTPUT CTI: time 0
INPUT CTI: input B, time 0
INPUT Insert: input A, time 7, payload 3
OUTPUT Insert: time 7, payload 3
INPUT CTI: input A, time 8
OUTPUT CTI: time 3
INPUT Insert: input B, time 1, payload 4

CTI Synchronizing Subject (Round 2)

Here is a different implementation of the subject, written in a more imperative style. Instead of using the Scan operator to maintain state, a custom accumulator class is used:

 using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.ComplexEventProcessing;

/// <summary>
/// The subject takes as input zero or more temporal streams. It outputs a temporal stream by applying
/// a delay to all incoming CTIs and dropping any events that violate the CTI. The delay represents the
/// maximum expected divergence between the most advanced and least advanced input stream.
/// </summary>
/// <remarks>
/// In this alternate version of the subject, state is maintained using a custom accumulator
/// object rather than using the Rx Scan operator.
/// </remarks>
public static class CtiSynchronizingSubjectAlternate
{
    /// <summary>Creates a CTI synchronizing subject for point events, using each event's start time.</summary>
    /// <param name="delay">Non-negative amount subtracted from every incoming CTI.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="delay"/> is negative.</exception>
    public static ISubject<PointEvent<T>, PointEvent<T>> CreatePoint<T>(TimeSpan delay)
    {
        return Create<PointEvent<T>, T>(delay, p => p.StartTime, t => PointEvent<T>.CreateCti(t));
    }

    /// <summary>Creates a CTI synchronizing subject for interval events, using each event's start time.</summary>
    /// <param name="delay">Non-negative amount subtracted from every incoming CTI.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="delay"/> is negative.</exception>
    public static ISubject<IntervalEvent<T>, IntervalEvent<T>> CreateInterval<T>(TimeSpan delay)
    {
        return Create<IntervalEvent<T>, T>(delay, i => i.StartTime, t => IntervalEvent<T>.CreateCti(t));
    }

    // Edge events are not currently implemented. Care is needed to avoid unmatched start edges in the output
    // stream.

    static ISubject<TEvent, TEvent> Create<TEvent, TPayload>(TimeSpan delay, Func<TEvent, DateTimeOffset> getApplicationTime, Func<DateTimeOffset, TEvent> createCti)
        where TEvent : TypedEvent<TPayload>
    {
        if (delay < TimeSpan.Zero) { throw new ArgumentOutOfRangeException("delay", "Delay must be non-negative."); }

        // One accumulator per subject; TransformSubject invokes the transform once, so all inputs share it.
        var accum = new Accumulator<TEvent, TPayload>(delay, getApplicationTime, createCti);

        // Synchronize serializes calls into the (non-thread-safe) accumulator; null results mean "drop".
        return TransformSubject.Create((IObservable<TEvent> input) =>
            input.Synchronize().Select(accum.Process).Where(e => e != null));
    }

    /// <summary>
    /// Tracks the highest delayed CTI emitted so far and decides, per event, what (if anything) to emit.
    /// Not thread-safe: callers must serialize calls to <see cref="Process"/>.
    /// </summary>
    sealed class Accumulator<TEvent, TPayload>
        where TEvent : TypedEvent<TPayload>
    {
        readonly TimeSpan delay;
        readonly Func<TEvent, DateTimeOffset> getApplicationTime;
        readonly Func<DateTimeOffset, TEvent> createCti;

        // Null until the first CTI has been emitted.
        DateTimeOffset? highCti;

        public Accumulator(TimeSpan delay, Func<TEvent, DateTimeOffset> getApplicationTime, Func<DateTimeOffset, TEvent> createCti)
        {
            this.delay = delay;
            this.getApplicationTime = getApplicationTime;
            this.createCti = createCti;
        }

        /// <summary>
        /// Processes one input event.
        /// </summary>
        /// <returns>
        /// A new delayed CTI if <paramref name="evt"/> is a CTI that advances time; <paramref name="evt"/>
        /// itself if it is a non-CTI event that does not precede the current CTI; otherwise null (drop).
        /// </returns>
        public TEvent Process(TEvent evt)
        {
            if (null == evt)
            {
                return null;
            }

            var appTime = this.getApplicationTime(evt);

            if (evt.EventKind == EventKind.Cti)
            {
                DateTimeOffset delayedStartTime = SafeSubtract(appTime, this.delay);

                // Suppress CTIs that do not strictly advance application time.
                if (this.highCti.HasValue && delayedStartTime <= this.highCti.Value)
                {
                    return null;
                }

                this.highCti = delayedStartTime;
                return this.createCti(delayedStartTime);
            }

            // Drop events that would violate an already emitted CTI.
            return this.highCti.HasValue && this.highCti.Value > appTime ? null : evt;
        }

        /// <summary>
        /// Returns <paramref name="time"/> minus <paramref name="amount"/>, or <see cref="DateTimeOffset.MinValue"/>
        /// when the result would be unrepresentable.
        /// </summary>
        static DateTimeOffset SafeSubtract(DateTimeOffset time, TimeSpan amount)
        {
            // DateTimeOffset subtraction operates on the clock time (which differs from UTC by the offset) and
            // then validates the UTC instant, so both must stay non-negative to avoid ArgumentOutOfRangeException.
            return time.UtcTicks >= amount.Ticks && time.Ticks >= amount.Ticks
                ? time - amount
                : DateTimeOffset.MinValue;
        }
    }
}