So you’ve heard about this shiny StreamInsight thing, perhaps from my last post, and want to build something for yourself and understand how all of this stuff fits together. At this point you should have your Visual Studio environment up and running, along with StreamInsight installed (if you don’t, have a look at this post). Every StreamInsight application consists of the following core components:
In this post we’ll go through building a very simple end to end StreamInsight application consisting of the following components:
Readers familiar with StreamInsight may be thinking “those adapters are part of the standard StreamInsight samples on codeplex”. You’d be absolutely right; I’d like to walk you through how those are built (having been one of the authors :) and talk about some of the design choices.
I’ll first go through a simple end to end query example, using some of the components that we’ll walk through developing later in this post.
1: using System;
2: using System.Collections.Generic;
3: using System.Linq;
4: using System.Text;
5: using Microsoft.ComplexEventProcessing.Linq;
6: using IntroHost.SimulatedInputAdapter;
7: using Microsoft.ComplexEventProcessing;
8: using StreamInsight.Samples.Adapters.OutputTracer;
9:
10: namespace IntroHost
11: {
12: class Program
13: {
14: static void Main(string[] args)
15: {
16: // Initialize the StreamInsight log using the default settings from
17: // the app.config file
18: StreamInsightLog.Init();
19:
20: // Use the StreamInsight "Default" instance
21: string instanceName = "Default";
22:
23: // Embed the StreamInsight engine
24: using (Server cepServer = Server.Create(instanceName))
25: {
26: // Create an application in which to host our queries and adapters
27: Application cepApplication = cepServer.CreateApplication("simple");
28:
29: // Create an input data stream using the SimulatedInput adapter,
30: // raising events of the SimpleEventType type, and using the
31: // SimpleEventTypeFiller class to fill in the payload fields. We
32: // will raise a Point event every 100 ms (10 every second).
33: var input = CepStream<SimpleEventType>.Create("inputStream",
34: typeof(SimulatedInputFactory), new SimulatedInputAdapterConfig()
35: {
36: CtiFrequency = 1,
37: EventPeriod = 100,
38: EventPeriodRandomOffset = 0,
39: TypeInitializer = typeof(SimpleEventTypeFiller).AssemblyQualifiedName
40: },
41: EventShape.Point);
42:
43: // Simple pass through query. Grab the input values, pass to the
44: // output adapter
45: //var query = from e in input select e;
46:
47: // Aggregating query. Average the meter values by meter over a
48: // 3 second window, along with the min, max and number of events
49: // for that meter
50: var query = from e in input
51: group e by e.MeterId into meterGroups
52: from win in meterGroups.HoppingWindow(
53: TimeSpan.FromSeconds(3),
54: TimeSpan.FromSeconds(2),
55: HoppingWindowOutputPolicy.ClipToWindowEnd)
56: select new
57: {
58: meterId = meterGroups.Key,
59: avg = win.Avg(e => e.Value),
60: max = win.Max(e => e.Value),
61: min = win.Min(e => e.Value),
62: count = win.Count()
63: };
64:
65:
66: // Bind the query to an output adapter, writing events to the
67: // console
68: var output = query.ToQuery(cepApplication, "simpleQuery", "A Simple Query",
69: typeof(TracerFactory), new TracerConfig()
70: {
71: DisplayCtiEvents = true,
72: SingleLine = true,
73: TracerKind = TracerKind.Console
74: }, EventShape.Point, StreamEventOrder.FullyOrdered);
75:
76: output.Start();
77:
78: Console.WriteLine("Query active - press <enter> to shut down");
79: Console.ReadLine();
80:
81: output.Stop();
82: }
83: }
84: }
85: }
All of the magic is in lines 50 to 63, which define our query. This takes the incoming simulated events, groups them by meter ID, and creates a window in time that is 3 seconds wide and advances (creates output) every 2 seconds. The output of this query is the meter ID, along with the average, min and max values and the number of events in the window.
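As an aside, the windowing behavior can be mimicked outside the engine with plain LINQ to Objects. This is only a rough sketch and not how StreamInsight evaluates the query: the Reading and WindowResult types are hypothetical, event times are plain seconds, and it assumes windows start at 0, hop, 2 * hop and so on, each covering [start, start + size).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class HoppingWindowSketch
{
    // Hypothetical stand-in for an event payload plus its timestamp
    public sealed class Reading
    {
        public string MeterId { get; set; }
        public double Value { get; set; }
        public double Seconds { get; set; } // event time, seconds from an arbitrary origin
    }

    public sealed class WindowResult
    {
        public string MeterId { get; set; }
        public double WindowStart { get; set; }
        public double Avg { get; set; }
        public double Min { get; set; }
        public double Max { get; set; }
        public int Count { get; set; }
    }

    // Assumption: windows start at 0, hop, 2 * hop, ... and cover [start, start + size)
    public static List<WindowResult> Aggregate(
        IEnumerable<Reading> readings, double size, double hop)
    {
        if (hop <= 0) throw new ArgumentOutOfRangeException("hop");

        var all = readings.ToList();
        var results = new List<WindowResult>();
        if (all.Count == 0) return results;

        double last = all.Max(r => r.Seconds);
        for (int i = 0; i * hop <= last; i++)
        {
            double start = i * hop;

            // Events inside this window, grouped by meter
            var perMeter = all
                .Where(r => r.Seconds >= start && r.Seconds < start + size)
                .GroupBy(r => r.MeterId);

            foreach (var g in perMeter)
            {
                results.Add(new WindowResult
                {
                    MeterId = g.Key,
                    WindowStart = start,
                    Avg = g.Average(r => r.Value),
                    Min = g.Min(r => r.Value),
                    Max = g.Max(r => r.Value),
                    Count = g.Count()
                });
            }
        }
        return results;
    }

    public static void Main()
    {
        var readings = new[]
        {
            new Reading { MeterId = "ValveOne", Value = 10, Seconds = 0.5 },
            new Reading { MeterId = "ValveOne", Value = 20, Seconds = 2.5 },
            new Reading { MeterId = "ValveTwo", Value = 5, Seconds = 1.0 },
            new Reading { MeterId = "ValveOne", Value = 30, Seconds = 3.5 }
        };

        foreach (var r in Aggregate(readings, 3, 2))
        {
            Console.WriteLine("{0} @ {1}s: avg={2} min={3} max={4} count={5}",
                r.MeterId, r.WindowStart, r.Avg, r.Min, r.Max, r.Count);
        }
    }
}
```

Because the window (3 seconds) is wider than the hop (2 seconds), consecutive windows overlap by one second, so an event that falls in the overlap is counted in both.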
Executing this application will result in output similar to the following (since the values are randomly generated the details will differ):
2010-08-09 17:12:09,314 INFO - Advance time policy: CTI f 1, CTI offset -00:00:00.0000001, time policy: Adjust
Query active - press <enter> to shut down
CTI - 12:12:08.000
POINT(12:12:08.000) avg = 57.7530399389036, count = 3, max = 93.6887342453416, meterId = ValveThree, min = 7.61713502351992,
POINT(12:12:08.000) avg = 60.2433371172488, count = 2, max = 77.736259381164, meterId = ValveTwo, min = 42.7504148533337,
POINT(12:12:08.000) avg = 60.1549154078378, count = 4, max = 84.4917737340982, meterId = ValveOne, min = 13.812115236098,
CTI - 12:12:10.000
POINT(12:12:10.000) avg = 46.1892059753599, count = 8, max = 93.6887342453416, meterId = ValveThree, min = 7.61713502351992,
POINT(12:12:10.000) avg = 50.7219083455338, count = 6, max = 77.736259381164, meterId = ValveTwo, min = 8.38052840362328,
POINT(12:12:10.000) avg = 62.5471921749812, count = 13, max = 94.2385576638573, meterId = ValveOne, min = 3.48205389617107,
CTI - 12:12:12.000
Let’s change the SingleLine = true to SingleLine = false, and run the application again:
Query active - press <enter> to shut down
CTI - 12:12:54.000
POINT(12:12:54.000)
    avg: 29.9929076712545
    count: 2
    max: 37.6554307703187
    meterId: ValveTwo
    min: 22.3303845721904
POINT(12:12:54.000)
    avg: 58.6186501121235
    count: 4
    max: 91.9514771047754
    meterId: ValveThree
    min: 10.0425148895208
POINT(12:12:54.000)
    avg: 42.7088354447432
    count: 4
    max: 71.3766915590394
    meterId: ValveOne
    min: 23.3763407093363
CTI - 12:12:56.000
There we go.. much more intuitive. We see the result of grouping by the meter ID, then aggregating the simulated data looking for average, min and max values. To put it in a more tabular sense (through the magic of Excel using the row centric data from the first run):
Ok, that wasn’t too crazy – what all went into building this out? Funny you should ask… In general, the above code is what you'll spend the majority of your time doing in StreamInsight: defining and executing queries. For some situations you'll need to dig a bit deeper, define your own adapters, etc. The rest of this post is intended as a from-the-ground-up walkthrough of building out all of the low-level components. This won't be an everyday activity.
Here are the sections in this post:
If you don’t have this snazzy Add Reference dialog (for VS2010 users) you’ll definitely want to install these amazing add-ons from the Extension Manager (Tools –> Extension Manager):
Before writing any awesome queries (or even any mildly cool queries) we need some data to stream through our application. This data can come from many sources; for getting up to speed it’s generally easiest to work with simulated or canned data. This can be replayed from a canned data set (such as a .CSV file), or dynamically generated. In this walkthrough let’s go ahead and implement an input adapter that can:
For more background on developing adapters, check out the Creating Input and Output Adapters topic on MSDN. Having laid out the baseline requirements, let’s dive into the process of building this out.
The first step in creating an adapter is to define the configuration interface (this is used by all of the rest of the classes that make up an adapter). Our adapter will have three configuration properties:
I, the adapter, say that I have received all incoming data up to time X. I will not expect you to account for any data that arrives after time X.
The effect of CTIs is to inform the StreamInsight engine that it’s OK to go ahead and process incoming events. Without CTIs advancing time in the engine, no output will ever be generated (as we haven’t informed the engine that it has enough data to do some work). The CtiFrequency setting controls how often they are sent: a frequency of 1 sends a CTI with every data event, while a frequency of 5 sends a CTI every 5 data events. (It would be more accurate to call this the CTI period, but the advance time settings object uses frequency, so that's what we use.) More CTIs (i.e. a lower frequency value) mean a more responsive output stream; fewer CTIs provide more throughput at the expense of higher latency (as the engine can perform work in chunks, and won’t need to process as many CTIs).
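To make the frequency setting concrete, here is a tiny stand-alone sketch that interleaves CTI markers with data events. It does not use the StreamInsight API at all; CtiFrequencySketch and Interleave are hypothetical names, and the assumption is simply that a CTI follows every Nth data event.

```csharp
using System;
using System.Collections.Generic;

public static class CtiFrequencySketch
{
    // Returns the order in which data events and CTIs would appear in the
    // stream, assuming one CTI is issued after every ctiFrequency data events.
    public static List<string> Interleave(int eventCount, uint ctiFrequency)
    {
        if (ctiFrequency == 0) throw new ArgumentOutOfRangeException("ctiFrequency");

        var stream = new List<string>();
        for (int i = 1; i <= eventCount; i++)
        {
            stream.Add("EVENT " + i);
            if (i % ctiFrequency == 0)
                stream.Add("CTI");
        }
        return stream;
    }

    public static void Main()
    {
        // Frequency 2: a CTI after every second data event
        Console.WriteLine(string.Join(", ", Interleave(5, 2)));
    }
}
```

With a frequency of 1 every data event is immediately followed by a CTI, which is the setting used by the host program at the top of this post.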
The other configuration variables are related to the rate at which events are raised by the adapter. The basic formula will be to wait a certain period in milliseconds between raising events as defined by:
EventPeriod + Random(0 –> EventPeriodRandomOffset)
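In C# terms the delay calculation is a one-liner. A minimal sketch, where EventDelaySketch and NextDelayMs are hypothetical names used only for illustration:

```csharp
using System;

public static class EventDelaySketch
{
    // Milliseconds to wait before raising the next event
    public static int NextDelayMs(Random rand, int eventPeriod, int eventPeriodRandomOffset)
    {
        // Random.Next(max) returns a value in [0, max), so the random part
        // never quite reaches the configured offset; Next(0) always returns 0.
        return eventPeriod + rand.Next(eventPeriodRandomOffset);
    }

    public static void Main()
    {
        var rand = new Random();
        for (int i = 0; i < 5; i++)
        {
            Console.WriteLine(NextDelayMs(rand, 100, 25));
        }
    }
}
```

With EventPeriodRandomOffset = 0, as in the host program above, the adapter raises events at a fixed EventPeriod.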
public interface ITypeInitializer<TPayload>
{
    void FillValues(TPayload obj);
}
Create a solution folder called SimulatedInputAdapter. Inside that folder create a new class SimulatedInputAdapterConfig with the following content:
namespace IntroHost.SimulatedInputAdapter
{
    /// <summary>
    /// This is the configuration type for the SimulatedInputFactory. Use instances
    /// of this type to configure how often the simulated data input adapter raises events.
    /// </summary>
    public struct SimulatedInputAdapterConfig
    {
        /// <summary>
        /// How often to send a CTI event (1 = send a CTI event with every data event)
        /// </summary>
        public uint CtiFrequency { get; set; }

        /// <summary>
        /// The baseline period at which the adapter will generate simulated
        /// events in milliseconds (i.e. a value of 500 means that the adapter
        /// will raise an event twice a second, every 500 milliseconds).
        /// </summary>
        public int EventPeriod { get; set; }

        /// <summary>
        /// The maximum random offset on top of the event period.
        /// </summary>
        public int EventPeriodRandomOffset { get; set; }

        /// <summary>
        /// Assembly-qualified name of an ITypeInitializer implementation used to
        /// fill in event payloads (set by the host program, read by the adapters).
        /// </summary>
        public string TypeInitializer { get; set; }
    }
}
Adapters are not directly instantiated in your application, but rather created by the StreamInsight engine in response to a query starting up, based on the binding configuration. This has a couple of implications for developing adapters:
Adapter factories must be public classes, implementing one of the adapter factory interfaces (depending on the type of adapter):
Optionally, factories can implement declarative CTI behavior by implementing the ITypedDeclareAdvanceTimeProperties interface. This is the recommended option for when an adapter does not need to implement fine-grained control of advancing time.
Future posts will delve into more detail on creating various flavors of adapters; for now, we’ll simply create a basic typed input adapter that lets us raise random events (i.e. simulated data). The responsibility of the adapter factory is to handle any cross-adapter responsibilities, and create an instance of the appropriate adapter type (Point, Interval, Edge) when requested.
public class SimulatedInputFactory : ITypedInputAdapterFactory<SimulatedInputAdapterConfig>,
    ITypedDeclareAdvanceTimeProperties<SimulatedInputAdapterConfig>
{
    public InputAdapterBase Create<TPayload>(SimulatedInputAdapterConfig configInfo,
        EventShape eventShape)
    {
        throw new NotImplementedException();
    }

    public void Dispose()
    {
        throw new NotImplementedException();
    }

    public AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties<TPayload>(
        SimulatedInputAdapterConfig configInfo, EventShape eventShape)
    {
        throw new NotImplementedException();
    }
}
We’re implementing two interfaces here – one to declare that we’re an adapter factory, the other to declaratively inject CTI events into the stream based on a policy (as defined in the configuration class).
Let’s go ahead and implement each of the interfaces for generating the appropriate adapter instances.
using System;
using System.Diagnostics;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Adapters;

namespace IntroHost.SimulatedInputAdapter
{
    public class SimulatedInputFactory : ITypedInputAdapterFactory<SimulatedInputAdapterConfig>,
        ITypedDeclareAdvanceTimeProperties<SimulatedInputAdapterConfig>
    {
        // Internal rather than private: the adapter classes reuse this name
        // as their log category
        internal static readonly string ADAPTER_NAME = "SimulatedInput";
        private static readonly StreamInsightLog trace = new StreamInsightLog(ADAPTER_NAME);

        /// <summary>
        /// Based on a configuration and an event type generate an adapter reference
        /// </summary>
        public InputAdapterBase Create<TPayload>(SimulatedInputAdapterConfig configInfo,
            EventShape eventShape)
        {
            InputAdapterBase ret = default(InputAdapterBase);
            switch (eventShape)
            {
                case EventShape.Point:
                    ret = new SimulatedInputPointAdapter<TPayload>(configInfo);
                    break;

                case EventShape.Interval:
                    ret = new SimulatedInputIntervalAdapter<TPayload>(configInfo);
                    break;

                case EventShape.Edge:
                    ret = new SimulatedInputEdgeAdapter<TPayload>(configInfo);
                    break;
            }

            return ret;
        }

        /// <summary>
        /// No shared resources in the adapter - empty Dispose
        /// </summary>
        public void Dispose()
        { }

        /// <summary>
        /// Declaratively advance application time (i.e. inject CTI's)
        /// </summary>
        /// <returns></returns>
        public AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties<TPayload>(
            SimulatedInputAdapterConfig configInfo, EventShape eventShape)
        {
            trace.LogMsg(TraceEventType.Information,
                "Advance time policy: CTI f {0}, CTI offset {1}, time policy: {2}",
                configInfo.CtiFrequency, TimeSpan.FromTicks(-1), AdvanceTimePolicy.Adjust);

            var timeGenSettings = new AdvanceTimeGenerationSettings(configInfo.CtiFrequency,
                TimeSpan.FromTicks(-1), true);

            return new AdapterAdvanceTimeSettings(timeGenSettings, AdvanceTimePolicy.Adjust);
        }
    }
}
Adapter factories that don’t need to manage shared resources or context tend to be very simple. This adapter factory only does two core things:
From this baseline, let’s go ahead and add the rest of the adapter instances. Each of these adapters will, on a timer, create, fill and enqueue an event of the appropriate type. The filling of the event (i.e. filling in the payload fields) will be the responsibility of an external pluggable class. Ordinarily we’d go ahead and use a lambda expression or two to inject the logic. To avoid the complication of setting up such an expression that can round trip through the serialization experience, let’s use an interface such as ITypeInitializer<TPayload>, and define a payload class along with a class that implements ITypeInitializer<TPayload> for it:
Coupled with an event type class that we’ll use in our application:
public class SimpleEventType
{
    public string MeterId { get; set; }
    public double Value { get; set; }
    public DateTime Timestamp { get; set; }
}

public class SimpleEventTypeFiller : ITypeInitializer<SimpleEventType>
{
    private Random rand = new Random();

    private string[] Meters = new string[]
    {
        "ValveOne",
        "ValveTwo",
        "ValveThree"
    };

    // Pick a random meter and a random value between 0 and 100
    public void FillValues(SimpleEventType obj)
    {
        obj.MeterId = Meters[rand.Next(Meters.Length)];
        obj.Value = rand.NextDouble() * 100;
        obj.Timestamp = DateTime.Now;
    }
}
This is a pretty basic setup that will let us raise SimpleEventType events, and randomly fill their values. Using this interface to drive simulated input into Point, Interval and Edge adapters will look like this (Point input shown; Interval and Edge are in the downloadable project, with the differences explained below):
using System;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Adapters;
using System.Diagnostics;

namespace IntroHost.SimulatedInputAdapter
{
    /// <summary>
    /// Simulated input adapter for point events. Will periodically raise
    /// simulated event data. If a class implementing the
    /// ITypeInitializer<TPayload> interface is specified in the configuration
    /// the events will be filled in by an instance of that class.
    /// </summary>
    /// <typeparam name="TPayload"></typeparam>
    public class SimulatedInputPointAdapter<TPayload> :
        TypedPointInputAdapter<TPayload>
    {
        /// <summary>
        /// Store the simulated input configuration
        /// </summary>
        private SimulatedInputAdapterConfig config;

        /// <summary>
        /// Create a log object using the category name from the factory
        /// </summary>
        private StreamInsightLog trace = new StreamInsightLog(
            SimulatedInputFactory.ADAPTER_NAME);

        /// <summary>
        /// The timer object responsible for periodically raising events
        /// </summary>
        private System.Threading.Timer myTimer;

        /// <summary>
        /// Reference to the type initializer if one is specified
        /// </summary>
        private ITypeInitializer<TPayload> init;

        /// <summary>
        /// Lock object used to synchronize access to raising events. This is
        /// primarily for use during debugging when stepping through the
        /// adapter code (as the timer will continue to fire, resulting in
        /// multiple threads trying to raise an event concurrently).
        /// </summary>
        private object lockObj = new object();

        /// <summary>
        /// Random object used to create the offset to when the next event
        /// should be raised
        /// </summary>
        private Random rand;

        public SimulatedInputPointAdapter(SimulatedInputAdapterConfig config)
        {
            // Hold onto the configuration, and generate a type initializer if
            // one has been configured.
            this.config = config;
            if (this.config.TypeInitializer != null)
            {
                init = (ITypeInitializer<TPayload>)Activator.CreateInstance(
                    Type.GetType(config.TypeInitializer));
            }
        }

        /// <summary>
        /// All events are raised asynchronously. If the timer fires while
        /// the adapter is paused, the RaiseEvent function will exit
        /// immediately, so we don't need to account for that here
        /// </summary>
        public override void Resume()
        {
        }

        /// <summary>
        /// Create the random time interval generator and the thread timer
        /// used to schedule events being raised. Start it after 500 ms to give
        /// everything time to settle out
        /// </summary>
        public override void Start()
        {
            rand = new Random();

            myTimer = new System.Threading.Timer(
                new System.Threading.TimerCallback(RaiseEvent),
                null, 500, config.EventPeriod);
        }

        /// <summary>
        /// When the timer fires, check for the state of the adapter; if running
        /// raise a new simulated event
        /// </summary>
        /// <param name="state"></param>
        private void RaiseEvent(object state)
        {
            // Ensure that the adapter is in the running state. If we're
            // shutting down, kill the timer and signal Stopped()
            if (AdapterState.Stopping == AdapterState)
            {
                myTimer.Dispose();
                Stopped();
            }
            if (AdapterState.Running != AdapterState)
                return;

            // Allocate a point event to hold the data for the incoming message.
            // If the event could not be allocated, exit the function
            lock (lockObj)
            {
                PointEvent<TPayload> currEvent = CreateInsertEvent();
                if (currEvent == null)
                    return;
                currEvent.StartTime = DateTime.Now;

                // Create a payload object, and fill with values if we have
                // an initializer defined
                currEvent.Payload = (TPayload)Activator.CreateInstance(typeof(TPayload));
                if (init != null)
                    init.FillValues(currEvent.Payload);

                if (trace.ShouldLog(TraceEventType.Verbose))
                {
                    trace.LogMsg(TraceEventType.Verbose, "INSERT - {0}",
                        currEvent.FormatEventForDisplay(false));
                }

                // If the event cannot be enqueued, release the memory and signal that
                // the adapter is ready to process more events (via Ready())
                if (EnqueueOperationResult.Full == Enqueue(ref currEvent))
                {
                    ReleaseEvent(ref currEvent);
                    Ready();
                }
            }

            // The next event will be raised at now + event period ms, plus a
            // random offset
            int nextEventInterval = config.EventPeriod +
                rand.Next(config.EventPeriodRandomOffset);
            myTimer.Change(nextEventInterval, nextEventInterval);
        }
    }
}
The key concept in this class is raising events from an asynchronous (push) source. Many of the StreamInsight examples use a thread that polls a resource. This is a perfectly suitable pattern for pull-based sources; however, our Timer is a push based source so a lot of the thread/loop code isn’t necessary. All of the state management code (checking for stop, etc) happens in the asynchronous delegate (i.e. RaiseEvent).
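Stripped of the StreamInsight base class, the push pattern looks roughly like the sketch below. PushSourceSketch is a hypothetical stand-in for an adapter and the onEvent callback stands in for Enqueue; it also re-arms the timer as a one-shot after each tick, which is a small variation on the periodic timer used above.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for an adapter: raises "events" from a timer callback
public sealed class PushSourceSketch : IDisposable
{
    private readonly object gate = new object();
    private readonly Random rand = new Random();
    private readonly int periodMs;
    private readonly int randomOffsetMs;
    private readonly Action<DateTimeOffset> onEvent;
    private readonly Timer timer;
    private bool stopped;

    public PushSourceSketch(int periodMs, int randomOffsetMs, Action<DateTimeOffset> onEvent)
    {
        this.periodMs = periodMs;
        this.randomOffsetMs = randomOffsetMs;
        this.onEvent = onEvent;

        // Created disarmed; Start() schedules the first callback
        timer = new Timer(Tick, null, Timeout.Infinite, Timeout.Infinite);
    }

    public void Start()
    {
        lock (gate)
        {
            if (!stopped)
                timer.Change(periodMs, Timeout.Infinite);
        }
    }

    private void Tick(object state)
    {
        lock (gate)
        {
            // All state checks live in the callback: a tick that arrives
            // after Dispose() simply exits
            if (stopped)
                return;

            onEvent(DateTimeOffset.Now);

            // Re-arm as a one-shot, so callbacks never overlap
            timer.Change(periodMs + rand.Next(randomOffsetMs), Timeout.Infinite);
        }
    }

    public void Dispose()
    {
        lock (gate)
        {
            if (stopped)
                return;
            stopped = true;
            timer.Dispose();
        }
    }
}

public static class PushSourceDemo
{
    public static void Main()
    {
        using (var source = new PushSourceSketch(100, 50,
            t => Console.WriteLine("event at {0:HH:mm:ss.fff}", t)))
        {
            source.Start();
            Thread.Sleep(1000);
        }
    }
}
```

There is no worker thread or polling loop here; the timer's thread pool callbacks do all of the work, and the lock only guards against a tick racing with shutdown.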
TypedPointInputAdapter<TPayload> where TPayload: class, new()
Error 1 'TPayload' must be a non-abstract type with a public parameterless constructor in order to use it as parameter 'TPayload' in the generic type or method 'IntroHost.SimulatedInputAdapter.SimulatedInputPointAdapter<TPayload>'
Error 2 The type 'TPayload' must be a reference type in order to use it as parameter 'TPayload' in the generic type or method 'IntroHost.SimulatedInputAdapter.SimulatedInputPointAdapter<TPayload>'
Ok, no big deal – we’ll add the constraint to the factory class as well, with a declaration like this:
public InputAdapterBase Create<TPayload>(SimulatedInputAdapterConfig configInfo, EventShape eventShape) where TPayload : class, new()
Now here’s where things get interesting – the factory implements an interface whose Create<TPayload> method declares no constraints, so the compiler objects:
The constraints for type parameter 'TPayload' of method 'IntroHost.SimulatedInputAdapter.SimulatedInputFactory.Create<TPayload>(IntroHost.SimulatedInputAdapter.SimulatedInputAdapterConfig, Microsoft.ComplexEventProcessing.EventShape)' must match the constraints for type parameter 'TPayload' of interface method 'Microsoft.ComplexEventProcessing.Adapters.ITypedInputAdapterFactory< IntroHost.SimulatedInputAdapter.SimulatedInputAdapterConfig>.Create<TPayload>(IntroHost.SimulatedInputAdapter.SimulatedInputAdapterConfig, Microsoft.ComplexEventProcessing.EventShape)'. Consider using an explicit interface implementation instead.
A long way of saying we can’t really impose constraints on a non-generic interface further up the inheritance chain, hence the usage of Activator.
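The same situation can be reproduced in a few lines without any StreamInsight types. In this sketch IPayloadFactory and ActivatorPayloadFactory are hypothetical names; the interface method has no constraints on T, so the implementation falls back to Activator, and the second helper shows a type being created from its assembly-qualified name, which is how the TypeInitializer setting is consumed.

```csharp
using System;
using System.Text;

// Hypothetical interface: like the factory's Create<TPayload> method,
// the generic method declares no constraints on T
public interface IPayloadFactory
{
    object Create<T>();
}

public sealed class ActivatorPayloadFactory : IPayloadFactory
{
    // Adding "where T : class, new()" here would produce the constraint
    // mismatch error quoted above, so "new T()" is not available
    public object Create<T>()
    {
        // Throws MissingMethodException at run time if T has no public
        // parameterless constructor
        return Activator.CreateInstance(typeof(T));
    }
}

public static class ActivatorDemo
{
    // A type travels as a string and is instantiated on the other side
    public static object CreateFromName(string assemblyQualifiedName)
    {
        // true: throw if the type cannot be found
        Type type = Type.GetType(assemblyQualifiedName, true);
        return Activator.CreateInstance(type);
    }

    public static void Main()
    {
        object viaGeneric = new ActivatorPayloadFactory().Create<StringBuilder>();
        object viaName = CreateFromName(typeof(StringBuilder).AssemblyQualifiedName);

        Console.WriteLine(viaGeneric.GetType().Name);
        Console.WriteLine(viaName.GetType().Name);
    }
}
```

The trade-off is that a payload type without a parameterless constructor is only caught at run time rather than by the compiler.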
The interval and edge adapters are almost exactly the same, save for how they define the StartTime and EndTime. The Interval event adapter defines EndTime based on the random interval (on line 15):
1: // Allocate an interval event to hold the data for the incoming message.
2: // If the event could not be allocated, exit the function
3: lock (lockObj)
4: {
5: // The next event will be raised at now + event period ms, plus a
6: // random offset
7: int nextEventInterval = config.EventPeriod +
8: rand.Next(config.EventPeriodRandomOffset);
9: myTimer.Change(nextEventInterval, nextEventInterval);
10:
11: IntervalEvent<TPayload> currEvent = CreateInsertEvent();
12: if (currEvent == null)
13: return;
14: currEvent.StartTime = DateTime.Now;
15: currEvent.EndTime = currEvent.StartTime.AddMilliseconds(nextEventInterval);
16:
17: // Create a payload object, and fill with values if we have a
18: // an initializer defined
19: currEvent.Payload = (TPayload)Activator.CreateInstance(typeof(TPayload));
20: if (init != null)
21: init.FillValues(currEvent.Payload);
23: if (trace.ShouldLog(TraceEventType.Verbose))
24: {
25: trace.LogMsg(TraceEventType.Verbose, "INSERT - {0}",
26: currEvent.FormatEventForDisplay(false));
27: }
28:
29: // If the event cannot be enqueued, release the memory and signal that
30: // the adapter is ready to process more events (via. Ready())
31: if (EnqueueOperationResult.Full == Enqueue(ref currEvent))
32: {
33: ReleaseEvent(ref currEvent);
34: Ready();
35: }
36: }
The edge adapter remembers the previous payload, and also inserts an End event to close out the previous start event. In this case, every edge event is closed when a new edge shows up. This is not a realistic scenario, as the more typical case would be that edge events are closed by specific conditions (such as a new reading for a given meter), not by any new event. I’ll cover techniques for addressing this in a future post on integrating reference data.
private EdgeEvent<TPayload> lastEvent = null;

/// <summary>
/// When the timer fires, check for the state of the adapter; if running
/// raise a new simulated event
/// </summary>
/// <param name="state"></param>
private void RaiseEvent(object state)
{
    // Ensure that the adapter is in the running state. If we're
    // shutting down, kill the timer and signal Stopped()
    if (AdapterState.Stopping == AdapterState)
    {
        myTimer.Dispose();
        Stopped();
    }
    if (AdapterState.Running != AdapterState)
        return;

    // Allocate edge events to hold the data for the incoming message.
    // If an event could not be allocated, exit the function
    lock (lockObj)
    {
        // The next event will be raised at now + event period ms, plus a
        // random offset
        int nextEventInterval = config.EventPeriod +
            rand.Next(config.EventPeriodRandomOffset);
        myTimer.Change(nextEventInterval, nextEventInterval);

        if (lastEvent != null)
        {
            // Close out the previous start edge with a matching end edge
            EdgeEvent<TPayload> closeEvent = CreateInsertEvent(EdgeType.End);
            if (closeEvent == null)
                return;
            closeEvent.StartTime = lastEvent.StartTime;
            closeEvent.EndTime = DateTimeOffset.Now;
            closeEvent.Payload = lastEvent.Payload;

            // If the event cannot be enqueued, release the memory and signal that
            // the adapter is ready to process more events (via Ready())
            if (EnqueueOperationResult.Full == Enqueue(ref closeEvent))
            {
                ReleaseEvent(ref closeEvent);
                Ready();
            }

            lastEvent = null;
        }

        EdgeEvent<TPayload> currEvent = CreateInsertEvent(EdgeType.Start);
        if (currEvent == null)
            return;
        currEvent.StartTime = DateTime.Now;
        currEvent.EndTime = currEvent.StartTime.AddMilliseconds(nextEventInterval);

        // Create a payload object, and fill with values if we have
        // an initializer defined
        currEvent.Payload = (TPayload)Activator.CreateInstance(typeof(TPayload));
        if (init != null)
            init.FillValues(currEvent.Payload);

        if (trace.ShouldLog(TraceEventType.Verbose))
        {
            trace.LogMsg(TraceEventType.Verbose, "INSERT - {0}",
                currEvent.FormatEventForDisplay(false));
        }

        // If the event cannot be enqueued, release the memory and signal that
        // the adapter is ready to process more events (via Ready())
        if (EnqueueOperationResult.Full == Enqueue(ref currEvent))
        {
            ReleaseEvent(ref currEvent);
            Ready();
        }

        // Remember the start edge
        lastEvent = currEvent;
    }
}
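For comparison, here is one way the more typical per-meter behavior mentioned above could be modeled, where a start edge is closed only by a new reading for the same meter. This is a stand-alone sketch with hypothetical Edge and PerMeterEdgeTracker types rather than StreamInsight's EdgeEvent, and it is not the approach used in the sample project.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified edge record (not the StreamInsight EdgeEvent type)
public sealed class Edge
{
    public string Kind { get; set; }          // "Start" or "End"
    public string MeterId { get; set; }
    public double Value { get; set; }
    public DateTimeOffset StartTime { get; set; }
    public DateTimeOffset? EndTime { get; set; }
}

public sealed class PerMeterEdgeTracker
{
    // The currently open start edge for each meter
    private readonly Dictionary<string, Edge> open = new Dictionary<string, Edge>();

    // Returns the edges to emit for a new reading: an End edge for the
    // meter's previous reading (if any), followed by a new Start edge
    public List<Edge> OnReading(string meterId, double value, DateTimeOffset now)
    {
        var emitted = new List<Edge>();

        Edge previous;
        if (open.TryGetValue(meterId, out previous))
        {
            emitted.Add(new Edge
            {
                Kind = "End",
                MeterId = meterId,
                Value = previous.Value,
                StartTime = previous.StartTime,
                EndTime = now
            });
        }

        var start = new Edge
        {
            Kind = "Start",
            MeterId = meterId,
            Value = value,
            StartTime = now,
            EndTime = null
        };
        open[meterId] = start;
        emitted.Add(start);
        return emitted;
    }
}

public static class EdgeDemo
{
    public static void Main()
    {
        var tracker = new PerMeterEdgeTracker();
        DateTimeOffset t0 = DateTimeOffset.Now;

        tracker.OnReading("ValveOne", 1.0, t0);
        tracker.OnReading("ValveTwo", 2.0, t0.AddSeconds(1));

        // Only ValveOne's open edge is closed by this reading
        foreach (Edge e in tracker.OnReading("ValveOne", 3.0, t0.AddSeconds(2)))
        {
            Console.WriteLine("{0} {1} value={2}", e.Kind, e.MeterId, e.Value);
        }
    }
}
```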
That’s it – your first input adapter raising simulated data. Seems like a lot of work just to raise a few simple events, but this scaffolding can be reused to create very complex, rich adapters. It gets easier from here :)
Hey – where did that magical FormatEventForDisplay function come from? If you look in the sample project, you’ll see a StreamInsightUtils.cs file that contains a number of helper functions:
public static string FormatEventForDisplay<TPayload>(this TypedEvent<TPayload> evt, bool verbose)
public static void AddPayloadDetailsList<TPayload>(StringBuilder sb, TypedEvent<TPayload> evt)
public static void AddPayloadDetailsRow<TPayload>(StringBuilder sb, TypedEvent<TPayload> evt)
public static void AddHeaderRow<TPayload>(StringBuilder sb)
Along with their UntypedEvent equivalents. I won’t dive into the details here; each of these functions iterates over the contents of the payload object (using reflection for TypedEvents and the CepEventType for UntypedEvents) and pumps out the details. This handles the differences between Point, Interval and Edge events automatically.
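To give a feel for the reflection approach, here is a minimal sketch that formats any payload object as name/value pairs. It is not the code from StreamInsightUtils.cs: PayloadFormatterSketch and FormatPayload are hypothetical names, and sorting the properties by name is an assumption made only because the sample output above lists fields alphabetically.

```csharp
using System;
using System.Globalization;
using System.Linq;
using System.Reflection;

public static class PayloadFormatterSketch
{
    // Formats the public instance properties of a payload as "name = value"
    // pairs, either on a single line or one pair per line
    public static string FormatPayload(object payload, bool singleLine)
    {
        var parts = payload.GetType()
            .GetProperties(BindingFlags.Public | BindingFlags.Instance)
            .OrderBy(p => p.Name, StringComparer.Ordinal)
            .Select(p => p.Name + " = " + Convert.ToString(
                p.GetValue(payload, null), CultureInfo.InvariantCulture));

        return string.Join(singleLine ? ", " : Environment.NewLine, parts);
    }

    public static void Main()
    {
        // Anonymous types work too, which is what the aggregating query produces
        var payload = new { meterId = "ValveOne", avg = 1.5, count = 2 };

        Console.WriteLine(FormatPayload(payload, true));
        Console.WriteLine(FormatPayload(payload, false));
    }
}
```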
Now that we’ve built an input adapter, it’s time to build the corresponding output adapter. This particular adapter will take untyped events coming from a query and write them to the console or to the debug output. The steps are very much the same as for building an input adapter, with the exception of not having to manage CTIs (as the output adapter will be receiving CTIs from the engine, as opposed to being responsible for creating them).
In our configuration file we’ll define three key properties:
/// <summary>
/// Possible trace types.
/// </summary>
public enum TraceTarget
{
    /// <summary>
    /// Write messages to the debug output.
    /// </summary>
    Debug,

    /// <summary>
    /// Write messages to the console.
    /// </summary>
    Console,
}

/// <summary>
/// Configuration class for console output adapters.
/// </summary>
public class ConsoleAdapterConfig
{
    /// <summary>
    /// Gets or sets a value indicating whether or not to display CTI events in the output stream.
    /// </summary>
    public bool DisplayCtiEvents { get; set; }

    /// <summary>
    /// Gets or sets a value indicating which output stream to use.
    /// </summary>
    public TraceTarget Target { get; set; }

    /// <summary>
    /// Gets or sets a value indicating whether to output each event in
    /// a single line or use a more verbose output format.
    /// </summary>
    public bool SingleLine { get; set; }
}
Note – this is a stripped down version from the sample on codeplex, and doesn’t include writing to files or to the .NET Trace facility.
The adapter factory is very similar to the input adapter factory; note the differences for using an Untyped set of adapters. The event type is passed into the Create method via a CepEventType class (which has the key/value pairs and data types) rather than being defined through a generic.
public class ConsoleAdapterFactory : IOutputAdapterFactory<ConsoleAdapterConfig>
{
    internal static readonly string APP_NAME = "ConsoleOutput";

    /// <summary>
    /// Create an instance of a console output adapter that dumps received
    /// events to the .NET Debug or Console window.
    /// </summary>
    public OutputAdapterBase Create(ConsoleAdapterConfig configInfo,
        EventShape eventShape, CepEventType cepEventType)
    {
        OutputAdapterBase ret = default(OutputAdapterBase);
        switch (eventShape)
        {
            case EventShape.Point:
                ret = new ConsolePointOutputAdapter(configInfo, cepEventType);
                break;

            case EventShape.Interval:
                ret = new ConsoleIntervalOutputAdapter(configInfo, cepEventType);
                break;

            case EventShape.Edge:
                ret = new ConsoleEdgeOutputAdapter(configInfo, cepEventType);
                break;
        }

        return ret;
    }

    // No shared resources to clean up
    public void Dispose()
    {
    }
}
To implement each of the untyped adapter instances we need to extend the Point/Interval/Edge AdapterBase class, and handle the various Start/Resume aspects of pulling data out of a queue then posting it to the destination data sink (in this case the Console or the Debug streams).
internal sealed class ConsolePointOutputAdapter : PointOutputAdapter
{
    private StreamInsightLog trace;
    private ConsoleAdapterConfig config;
    private CepEventType eventType;

    public ConsolePointOutputAdapter(ConsoleAdapterConfig config,
        CepEventType type)
    {
        trace = new StreamInsightLog(ConsoleAdapterFactory.APP_NAME);
        this.config = config;
        this.eventType = type;
    }

    /// <summary>
    /// Start() is called when the engine wants to let the adapter start consuming events.
    /// This method is called on a threadpool thread, which should be released as soon as possible.
    /// </summary>
    public override void Start()
    {
        new Thread(this.ConsumeEvents).Start();
    }

    /// <summary>
    /// Resume() is called when the engine is able to produce further events after having been emptied
    /// by Dequeue() calls. Resume() will only be called after the adapter called Ready().
    /// This method is called on a threadpool thread, which should be released as soon as possible.
    /// </summary>
    public override void Resume()
    {
        new Thread(this.ConsumeEvents).Start();
    }

    /// <summary>
    /// Main worker thread function responsible for dequeueing events and
    /// posting them to the output stream.
    /// </summary>
    private void ConsumeEvents()
    {
        PointEvent currentEvent = default(PointEvent);

        try
        {
            while (true)
            {
                if (AdapterState.Stopping == AdapterState)
                {
                    Stopped();
                    return;
                }

                // Dequeue the event. If the dequeue fails, then the adapter state is suspended
                // or stopping. Assume the former and call Ready() to indicate
                // readiness to be resumed, and exit the thread.
                if (DequeueOperationResult.Empty == Dequeue(out currentEvent))
                {
                    Ready();
                    return;
                }

                string writeMsg = String.Empty;

                if (currentEvent.EventKind == EventKind.Insert)
                {
                    writeMsg = currentEvent.FormatEventForDisplay(eventType,
                        !config.SingleLine);
                }
                else if (currentEvent.EventKind == EventKind.Cti)
                {
                    writeMsg = String.Format("CTI - {0}",
                        currentEvent.StartTime.ToString("hh:mm:ss.fff"));
                }

                if (config.Target == TraceTarget.Console)
                    Console.WriteLine(writeMsg);
                else if (config.Target == TraceTarget.Debug)
                    Debug.Write(writeMsg);

                // Every received event needs to be released.
                ReleaseEvent(ref currentEvent);
            }
        }
        catch (Exception e)
        {
            trace.LogException(e, "Error in console adapter dequeue");
        }
    }
}
Very similar to what you’ve seen before in the input adapter, leveraging a few more of the formatting display helper classes to clean up the message for output. The edge and interval events are identical to this (save for the different base class) as each leverages the helper classes to abstract away the differences.
Wow.. little bit of code thrown around there. Ok, deep breath. This is not the normal, run of the mill StreamInsight experience of connecting and configuring adapters, then focusing on writing queries. This is a from the ground up, end to end project. For many of your applications, adapters can be pulled (or adapted, pun intended) from the codeplex samples. Having been through this exercise, all of the code therein should make a lot more sense.
Key Takeaways: