Over the past few weeks, I’ve spent time building a handful of applications using the new sequence integration APIs in StreamInsight 1.1. I think StreamInsight veterans will be pleasantly surprised at the seamlessness of the experience! If you’re new to StreamInsight, now’s your chance to quickly build a temporally aware application. In this post, I’ll walk through five components of a typical end-to-end StreamInsight query, from event source to event sink.
First, take some time to download the latest version and kick the tires:
Now you're ready to build a StreamInsight application. A few considerations:
Microsoft.ComplexEventProcessing
Microsoft.ComplexEventProcessing.Observable
Microsoft.ComplexEventProcessing.Linq
Now we'll travel downstream, using the SequenceIntegration\Northwind sample as our guide.
Event sources for a query can be based on custom input adapters, other StreamInsight queries, or (new in version 1.1) .NET IObservable<> and IEnumerable<> sequences. The good news: in the .NET world, pull-based IEnumerable<> sequences are pervasive. SQL, OData, SharePoint, you name it. Better news from the perspective of a Complex Event Processing system: asynchronous, push-based sequences can be easily exposed via the IObservable<> interface, particularly if you take advantage of the Reactive Extensions for .NET (Rx).
In this simple example, we use two OData service queries as our event sources:
NorthwindEntities northwind = new NorthwindEntities(
    new Uri("http://services.odata.org/Northwind/Northwind.svc"));

// Issue OData queries to determine start and end times for orders.
// So that the sources behave like temporal streams, we order by the
// corresponding dates.
var ordersWithRegions = from o in northwind.Orders
                        where o.ShipRegion != null
                        select o;

var orderStartTimes = from o in ordersWithRegions
                      where o.OrderDate != null
                      orderby o.OrderDate
                      select new { StartTime = (DateTime)o.OrderDate, o.OrderID, o.ShipRegion };

var orderEndTimes = from o in ordersWithRegions
                    where o.ShippedDate != null
                    orderby o.ShippedDate
                    select new { EndTime = (DateTime)o.ShippedDate, o.OrderID };
The orderStartTimes and orderEndTimes queries implement IEnumerable<>, which makes them suitable event sources for StreamInsight.
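If you don't have the OData service reference handy, the same source shape can be tried against an in-memory collection, since a LINQ to Objects query is also an IEnumerable<>. The following is only a sketch: the orders array and its values are made up for illustration and stand in for northwind.Orders.

```csharp
using System;
using System.Linq;

// Hypothetical sample data standing in for northwind.Orders (values are invented).
var orders = new[]
{
    new { OrderID = 1, ShipRegion = "WA",
          OrderDate = (DateTime?)new DateTime(1997, 1, 3),
          ShippedDate = (DateTime?)new DateTime(1997, 1, 9) },
    new { OrderID = 2, ShipRegion = "WA",
          OrderDate = (DateTime?)new DateTime(1997, 1, 1),
          ShippedDate = (DateTime?)null },              // not shipped yet
    new { OrderID = 3, ShipRegion = (string)null,       // no region: filtered out
          OrderDate = (DateTime?)new DateTime(1997, 1, 2),
          ShippedDate = (DateTime?)new DateTime(1997, 1, 4) },
};

// Same query shapes as in the post, evaluated by LINQ to Objects.
var ordersWithRegions = from o in orders
                        where o.ShipRegion != null
                        select o;

var orderStartTimes = from o in ordersWithRegions
                      where o.OrderDate != null
                      orderby o.OrderDate
                      select new { StartTime = (DateTime)o.OrderDate, o.OrderID, o.ShipRegion };

var orderEndTimes = from o in ordersWithRegions
                    where o.ShippedDate != null
                    orderby o.ShippedDate
                    select new { EndTime = (DateTime)o.ShippedDate, o.OrderID };

foreach (var s in orderStartTimes)
    Console.WriteLine($"start {s.StartTime:yyyy-MM-dd} order {s.OrderID} region {s.ShipRegion}");
foreach (var e in orderEndTimes)
    Console.WriteLine($"end   {e.EndTime:yyyy-MM-dd} order {e.OrderID}");
```

The orderby clauses matter here for the same reason as in the OData version: each source hands out its elements in timestamp order.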
An IObservable<> or IEnumerable<> event source can feed a temporal stream. A temporal stream is a sequence of events annotated with temporal information: timestamps for events, and punctuation indicating when a particular point in time has been committed. In the above example, we have two sources, orderStartTimes and orderEndTimes, each including a timestamp field (StartTime and EndTime respectively) as well as a commitment, based on the orderby clauses, that event timestamps never decrease. We describe these temporal characteristics to StreamInsight using the ToPointStream method:
// Map OData queries to StreamInsight inputs
var startStream = orderStartTimes.ToPointStream(orderApp,
    s => PointEvent.CreateInsert(s.StartTime, s),
    AdvanceTimeSettings.IncreasingStartTime);

var endStream = orderEndTimes.ToPointStream(orderApp,
    e => PointEvent.CreateInsert(e.EndTime, e),
    AdvanceTimeSettings.IncreasingStartTime);
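The arguments to the ToPointStream extension method are, in order: the StreamInsight application that hosts the stream (orderApp); a function that maps each source element s to a point event, here using PointEvent.CreateInsert to stamp the event with s.StartTime and carry s as its payload; and the advance time settings, where AdvanceTimeSettings.IncreasingStartTime tells StreamInsight that start times never decrease so that it can generate punctuation for us.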
Note that there are several variations on the To*Stream method supporting IObservable<> or IEnumerable<> event sources and the shaping of point, interval or edge data.
Take a look at Cip’s Time in StreamInsight series for more information.
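To make the idea of punctuation a little more concrete, here is a conceptual model in plain C#. It is not how StreamInsight is implemented, and WithPunctuation is a hypothetical helper of my own, not a StreamInsight API. It only illustrates the contract: if start times never decrease, then after each insert at time t it is safe to commit time t, because nothing earlier can still arrive.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not a StreamInsight API): interleaves each insert with a
// punctuation marker ("Cti") that commits the insert's own start time.
IEnumerable<(string Kind, DateTime Time)> WithPunctuation(IEnumerable<DateTime> startTimes)
{
    DateTime? previous = null;
    foreach (var t in startTimes)
    {
        // The commitment is only valid if the source really is ordered.
        if (previous.HasValue && t < previous.Value)
            throw new InvalidOperationException("Start times must not decrease.");

        yield return ("Insert", t);
        yield return ("Cti", t);   // nothing earlier than t will follow
        previous = t;
    }
}

// Made-up timestamps; equal start times are allowed, decreasing ones are not.
var times = new[]
{
    new DateTime(1997, 1, 1),
    new DateTime(1997, 1, 3),
    new DateTime(1997, 1, 3),
};

var stream = WithPunctuation(times).ToList();
foreach (var e in stream)
    Console.WriteLine($"{e.Kind,-6} {e.Time:yyyy-MM-dd}");
```

This is also why the orderby clauses in the source queries are important: an out-of-order element would break the promise made by the punctuation that precedes it.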
Now that we have described the temporal characteristics of our event sources, we can compose a StreamInsight query. I’ll simply copy the code here without too much explanation since I’m focused on data ingress and egress in this post:
// Use clip to synthesize events lasting from the start of each order to the end
// of each order.
var clippedStream = startStream
    .AlterEventDuration(e => TimeSpan.MaxValue)
    .ClipEventDuration(endStream, (s, e) => s.OrderID == e.OrderID);

// Count the number of coincident orders per region
var counts = from o in clippedStream
             group o by o.ShipRegion into g
             from win in g.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
             select new { ShipRegion = g.Key, Count = win.Count() };

// Display output whenever there are more than 2 active orders in a region.
const int threshold = 2;
var query = from c in counts
            where c.Count > threshold
            select c;
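For readers who want some intuition for what this query computes, here is a rough batch approximation in LINQ to Objects. It is a sketch under assumptions, not how the StreamInsight engine evaluates the query: it assumes the order intervals are already known (as if the clip had been applied), and the sample data and the Region, Start and End names are made up. Within each region, every start or end time is a point where the number of active orders can change, so the code counts the orders covering each span between consecutive change points.

```csharp
using System;
using System.Linq;

// Made-up order intervals [Start, End), one per order (hypothetical data).
var orders = new[]
{
    new { Region = "WA", Start = new DateTime(1997, 1, 1), End = new DateTime(1997, 1, 10) },
    new { Region = "WA", Start = new DateTime(1997, 1, 3), End = new DateTime(1997, 1, 8) },
    new { Region = "WA", Start = new DateTime(1997, 1, 5), End = new DateTime(1997, 1, 6) },
    new { Region = "OR", Start = new DateTime(1997, 1, 2), End = new DateTime(1997, 1, 4) },
};

const int threshold = 2;

var busy =
    from g in orders.GroupBy(o => o.Region)
    // Change points for this region: all distinct start and end times, in order.
    let edges = g.SelectMany(o => new[] { o.Start, o.End })
                 .Distinct()
                 .OrderBy(t => t)
                 .ToList()
    // Each pair of consecutive change points bounds one span with a constant count.
    from i in Enumerable.Range(0, edges.Count - 1)
    let winStart = edges[i]
    let winEnd = edges[i + 1]
    // An order is active in the span if it covers the whole span.
    let count = g.Count(o => o.Start <= winStart && winEnd <= o.End)
    where count > threshold
    select new { ShipRegion = g.Key, StartTime = winStart, EndTime = winEnd, Count = count };

var results = busy.ToList();
foreach (var r in results)
    Console.WriteLine($"{r.ShipRegion} {r.StartTime:yyyy-MM-dd}..{r.EndTime:yyyy-MM-dd} count={r.Count}");
```

With this sample data, the only span above the threshold is in WA from 5 to 6 January 1997, where three orders are active at once. The StreamInsight version arrives at this kind of answer incrementally as events and punctuation flow in, rather than by scanning a finished data set.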
Creating an event sink is straightforward. Several extension methods support the transformation of a temporal query (CepStream<>) to an event sink, with support for permutations of IObservable or IEnumerable sequences and TPayload, PointEvent<TPayload>, IntervalEvent<TPayload> or EdgeEvent<TPayload> elements. In the following example, we translate the query to a sequence of interval events using ToIntervalEnumerable. We then keep only the insert events, which skips CTI punctuation, and project out the relevant temporal and payload fields:
// Map the query to an IEnumerable sink
var sink = from i in query.ToIntervalEnumerable()
           where i.EventKind == EventKind.Insert
           select new { i.StartTime, i.EndTime, i.Payload.Count, i.Payload.ShipRegion };
Now that we’re back in the world of .NET sequences, there are many possibilities for consuming the results. For now, we’ll just write the event sink contents to the console:
foreach (var r in sink)
{
    Console.WriteLine(r);
}
Interestingly, calling GetEnumerator on sink triggers a sequence of actions:
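The underlying mechanism is ordinary LINQ deferred execution, which can be seen without StreamInsight at all. In this small sketch (Source and log are hypothetical names of my own), defining the sink does no work; the source only starts producing once foreach calls GetEnumerator and begins pulling.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var log = new List<string>();

// Hypothetical source: records the moment a consumer actually starts pulling.
IEnumerable<int> Source()
{
    log.Add("source started");
    yield return 1;
    yield return 2;
    yield return 3;
}

// Composing the query runs none of the iterator body above.
var sink = from x in Source()
           where x > 1
           select x * 10;
log.Add("sink defined");

// foreach calls sink.GetEnumerator() and pulls; only now does Source() run.
foreach (var r in sink)
{
    log.Add($"got {r}");
}

Console.WriteLine(string.Join(", ", log));
// prints: sink defined, source started, got 20, got 30
```

The StreamInsight sink behaves in the same spirit: nothing upstream needs to happen until somebody asks for the first result.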
StreamInsight queries now compose seamlessly with other LINQ providers! In fact, if you review the code above, you can see that we’ve leveraged LINQ to OData, LINQ to Objects, and LINQ to StreamInsight. The SequenceIntegration samples linked above illustrate some other integration possibilities as well:
There – you have the five easy pieces! Comes with a side-order of toast.
What is NorthwindEntities ???? Can't find it.
NorthwindEntities is an Entity Framework data context backed by the SQL Server Northwind database. It is included in the sequence integration sample project, which can be downloaded from streaminsight.codeplex.com/.../46435.