Scripting StreamInsight queries


Over the past couple of weeks, a handful of people have asked for help dynamically creating StreamInsight queries. I usually scrawl some boxes and arrows on the whiteboard and say “you could try something like this…” My hand-waving hasn’t been very helpful. I’ll write some code instead…

A StreamInsight query includes a “template” definition that essentially describes the operator tree. Normally, a developer describes a template using a LINQ query (StreamInsight understands a LINQ dialect for streaming/temporal queries). The LINQ query is then translated into an XML document (schema details are available here), which is compiled and executed by the StreamInsight engine. In one sense, StreamInsight already supports dynamic queries, since an application can construct a query operator tree using the XML specification language directly. A visual design surface or DSL could potentially be used to generate the XML. My preferred general-purpose stream query language is LINQ, however, so I’ll instead consider what it takes to dynamically generate a LINQ query using StreamInsight.

Anders Hejlsberg’s talk on The Future of C# gives a possible answer. Using the proposed C# eval facility, I could write something like:

// initialize an evaluator
var ev = new CSharpEvaluator
{
    References = { typeof(CepStream).Assembly, typeof(Expression).Assembly, typeof(EventType1).Assembly, },
    Usings = { "MyNamespace", "Microsoft.ComplexEventProcessing.Linq", },
};

// add input streams
ev.Evaluate(@"var input1 = CepStream<EventType1>.Create(""input1"")");

// script query
var queryDefinition = (CepStream<EventType1>)ev.Evaluate("from i in input1 where i.X > 10 select i");

 

Since I don’t want to wait for C# 5.0, I’ll use a more specialized “scripter” tool designed specifically for StreamInsight query templates:

// initialize a scripter
var scripter = new QueryTemplateScripter();

// add input streams
scripter.AddStream("input1", typeof(EventType1));

// script query
var template = scripter.CreateQueryTemplate(app, "MyTemplate", null,
    "from i in input1 where i.X > 10 select i");

 

The scripter allows you to:

  • reference assemblies (the required StreamInsight assemblies are referenced by default);
  • register input streams for your query using the AddStream method;
  • add “using” namespaces (the expected namespaces are included by default); and
  • create a query template based on a LINQ query specification.

The scripter essentially inlines the LINQ query into a program containing any event definitions (in the above example, we are merely referencing an existing type so no definition is required) and a query “context” that passes the LINQ query definition to an existing Application.CreateQueryTemplate method. An example of a generated program is shown below:

//------------------------------------------------------------------------------
// <auto-generated>
//     This code was generated by a tool.
//     Runtime Version:2.0.50727.4952
//
//     Changes to this file may cause incorrect behavior and will be lost if
//     the code is regenerated.
// </auto-generated>
//------------------------------------------------------------------------------

namespace @__QueryTemplateScripter
{
    using System;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Linq;

    public class @__QueryTemplateScripterContext
    {
        public static Microsoft.ComplexEventProcessing.QueryTemplate @__CreateQueryTemplate(Microsoft.ComplexEventProcessing.Application application, string name, string description)
        {
            CepStream<ConsoleApplication1.EventType1> input1 = CepStream<ConsoleApplication1.EventType1>.Create("input1");
            var definition =

#line 1 "__LinqQueryDefinition"
 from i in input1 where i.X > 10 select i

#line default
#line hidden
;
            return application.CreateQueryTemplate(name, description, definition);
        }
    }
}

 

Each such program is generated and compiled, and its query template creation method is then dynamically invoked to register a new query template in StreamInsight. This approach has some limitations:

  • Compiler error reporting may or may not be useful. I use #line pragmas in the generated code to pinpoint the location of errors within the user’s query, but nothing guarantees that the LINQ query definition is a well-formed expression. Consider what would happen if you were to call scripter.CreateQueryTemplate(app, "q", null, "input1; return null }");
  • Related to the above point, there is an injection risk. The usual common sense security rules apply: do not evaluate code from untrusted sources.
  • Every call to CreateQueryTemplate results in the creation of a new assembly that remains loaded for the lifetime of the app domain.
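
As a rough illustration of the generate, compile, and invoke cycle (not the attached scripter's actual code), here is a minimal sketch that assumes the .NET Framework CodeDOM compiler (CSharpCodeProvider). The ExpressionScripter class, the GeneratedScripter namespace, and the __UserExpression file name are hypothetical names chosen for the example. To avoid any StreamInsight dependency, it inlines an arbitrary C# expression rather than a LINQ query template.

```csharp
using System;
using System.CodeDom.Compiler;
using System.Reflection;
using Microsoft.CSharp;

// Hypothetical helper: not part of StreamInsight or the attached scripter.
public static class ExpressionScripter
{
    // Inlines 'expression' into a generated program, compiles the program
    // in memory, and invokes the generated static method via reflection.
    public static object Evaluate(string expression)
    {
        // The #line pragma makes compiler errors inside the user's text
        // report positions relative to the expression, not the wrapper.
        string source =
            "namespace GeneratedScripter {\n" +
            "  public static class Context {\n" +
            "    public static object Evaluate() {\n" +
            "      object definition =\n" +
            "#line 1 \"__UserExpression\"\n" +
            expression + "\n" +
            "#line default\n" +
            ";\n" +
            "      return definition;\n" +
            "    }\n" +
            "  }\n" +
            "}\n";

        var parameters = new CompilerParameters { GenerateInMemory = true };
        parameters.ReferencedAssemblies.Add("System.dll");
        parameters.ReferencedAssemblies.Add("System.Core.dll");

        using (var provider = new CSharpCodeProvider())
        {
            CompilerResults results =
                provider.CompileAssemblyFromSource(parameters, source);

            if (results.Errors.HasErrors)
            {
                // Surface the first diagnostic; a real tool would report all of them.
                throw new InvalidOperationException(results.Errors[0].ToString());
            }

            // Every successful call loads one more assembly into the app domain.
            MethodInfo method = results.CompiledAssembly
                .GetType("GeneratedScripter.Context")
                .GetMethod("Evaluate");

            return method.Invoke(null, null);
        }
    }
}
```

Calling ExpressionScripter.Evaluate("6 * 7") compiles a fresh in-memory assembly and returns the boxed result of the expression. The limitations listed above carry over directly: the expression string is pasted into the program verbatim, so malformed or malicious input can escape the intended expression, and each compiled assembly stays loaded for the lifetime of the app domain.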

Full source code for the scripter is attached. The event definition code is intentionally extensible to support specification of stream event types using something other than System.Type. Possible alternatives: CepEventType or some other representation of the event record layout.

Attachment: ScriptingStreamInsight.zip

Comments

  • Cool! Thanks for sharing.

    In related work, you might be aware of the C# REPL recently released by the Mono team as well:

    www.mono-project.com/CsharpRepl

    Also, I ran across this post by Ayende recently:

    ayende.com/.../runtime-code-compilation-amp-collectible-assemblies-are-no-go.aspx

    He found that he could not unload the compiled queries, which became a memory leak. The workaround was to compile in a separate app domain and dump that process when the size got too big. Any thoughts?

    David

  • @David: Thanks for the pointers! C# REPL looks pretty cool.

    The app domain solution works well for 'remote' StreamInsight server instances since we can connect to the same server instance from the separate app domain. It doesn't work as smoothly for an 'embedded' StreamInsight server since the server and the query expression cannot cross the boundary -- they aren't serializable. I've been toying with an alternate solution for the embedded case that exploits the query expression serialization support buried in the StreamInsight client SDK... I'll let you know if I can figure it out.

    -Colin

  • Hello,

    Do you also have an example of how to use the provided class "QueryTemplateScripter"?

    Thanks for your help

    Klaus Aschenbrenner

    http://www.csharp.at | twitter.com/Aschenbrenner

  • Thanks for the great technique. I just came across it recently.

    One question though. I found a few useful query patterns in LINQPad's sample StreamInsight queries. One of them is DetectSpikes(), for example, and its signature is...

    CepStream<SpikeEvent> DetectSpikes<TInput>(
        CepStream<TInput> inputStream, int threshold, double ratio,
        TimeSpan windowSize, TimeSpan hopSize,
        Expression<Func<TInput, int>> fieldSelector)

    If I wanted to try to parse something like this...

    "from e in DetectSpikes(inputstream, 40, 0.5, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1), w => double.Parse(w.strValue)) select e"

    No surprise, I get an error: "The name 'DetectSpikes' does not exist in the current context". Is there any way to get this to work?

    Thanks for your help,

  • Hello,

    I have the same problem as Cormac. I want to use the following query:

    from i in input1 where isEqualTo50(i.X) select i

    where isEqualTo50 is a static function, but I get the following error:

    The name isEqualTo50 does not exist in the current context

    Is there a way to do this?

    Thank you for your help,
