Official blog of the development team of StreamInsight.
In the previous blog posting, we showed how to construct and deploy query fragments to a StreamInsight server, and how to re-use them later. In today’s posting we’ll integrate this pattern into a method of dynamically composing a new query with an existing one.
The construct that enables this scenario in StreamInsight V2.1 is a Subject. A Subject lets me create a junction element in an existing query that I can tap into while the query is running.
To set this up as an end-to-end example, let’s first define a stream simulator as our data source:
This ‘generator’ produces a new instance of SourcePayload with a period of t (system time) as an IObservable. SourcePayload happens to have a property of type double as its payload data.
Let’s also define a sink for our example—an IObserver of double values that writes to the console:
The observer takes a string as parameter which is used as a label on the console, so that we can distinguish the output of different sink instances. Note that we also deploy this observer, so that we can retrieve it later from the server from a different process.
Remember how we defined the aggregation as an IQStreamable function in the previous article? We will use that as well:
Then we define the Subject, which acts as an observable sequence as well as an observer. Thus, we can feed a single source into the Subject and have multiple consumers—that can come and go at runtime—on the other side:
Subject are always deployed automatically. Their name is used to retrieve them from a (potentially) different process (see below).
Note that the Subject as we defined it here doesn’t know anything about temporal streams. It is merely a sequence of SourcePayloads, without any notion of StreamInsight point events or CTIs. So in order to compose a temporal query on top of the Subject, we need to 'promote' the sequence of SourcePayloads into an IQStreamable of point events, including CTIs:
In a later posting we will show how to use Subjects that have more awareness of time and can be used as a junction between QStreamables instead of IQbservables.
Having turned the Subject into a temporal stream, we can now define the aggregate on this stream. We will use the IQStreamable entity avg that we defined above:
In order to run the query, we need to bind it to a sink, and bind the subject to the source:
Lastly, we start the process:
Now we have a simple query running end-to-end, producing results. What follows next is the crucial part of tapping into the Subject and adding another query that runs in parallel, using the same query definition (the “AverageQuery”) but with a different window length. We are assuming that we connected to the same StreamInsight server from a different process or even client, and thus have to retrieve the previously deployed entities through their names:
The attached solution demonstrates the sample end-to-end.
Regards, The StreamInsight Team
Very useful example. Thanks!