Following on the heels of my last query pattern blog post, I started to dig into creating a custom data context for use with StreamInsight and LINQPad.  The default contexts supplied out of the box with the StreamInsight driver (including the Hitchhiker’s Guide context) are great, but don’t contain a large volume of data.  I wanted to put together a much larger context to have a baseline to play with larger and more complicated query patterns.  My goals for the context/data streams were:

  • Contain “real” process data, around a million “real” data records with accompanying metadata.
  • Be easily transferable / downloadable
  • Be (relatively) human readable
  • Be performant (sustain at least 60k events read / sec on my desktop)

Building on the fun I had putting together the windchill sample, here’s what I decided on:

  • Use weather data from Environment Canada, who has kindly made a great deal of information available for download in XML format.
    • This includes both the “live” weather and reference data about the producing weather stations.  I chose to use airport weather information, as it is a lot more granular than your average weather station.
  • Create a custom data context which can pull information from compressed CSV files, and expose it as a CepStream.

 


If you’re already comfortable with exposing data via IEnumerable interface, check out this blog post that provides a concise summary of creating data contexts.

 


Why did I choose to store the canned data set as a compressed CSV file?  Based on a series of performance explorations, this turned out to be the best choice between file size (being downloadable), and performance (throughput).

For all of the gory performance exploration, check out this blog post.

This blog post walks through the various steps of creating the custom data context:

  • Preparing a canned data set (Downloading, parsing and formatting).
  • Reading back the canned data set
  • Creating a LINQPad data context which exposes the data set as a CepStream<T>.

If you’re comfortable with creating and accessing canned data sets, and you just want to create a custom LINQPad context for use with StreamInsight, skip directly to that section.


Before you start reading through the blog, download and follow along with the code samples:

 

Preparing a canned data set – Downloading

Environment Canada rocks.  They really do (caveat: I am Canadian).  They’ve made a great deal of climatic data available for download in their National Climate Archives On-line.  From the National Climate Archives site, I chose to download ten years’ worth of information for ten airports in Canada (airport weather stations tend to record and archive data in 30 minute or hourly increments, while most “other” weather stations seem to record 2-3x per day).  Since the goal is lots of time-series data, airport weather stations it is: hourly records give us 8,760 records per year per weather station, so 10 stations times 10 years gives us 876k records, which is close enough to the million-record goal.

You can see the available stations in a given area and timeframe by looking at their customized search.  Being familiar with Canadian airport geography, I selected them by:

  • Searching by province for a specific date.
  • Selecting the data for an airport (I happened to know what they were).
  • From the specific screen, there’s a navigation option to download data in either CSV or XML format.  I chose to download in XML (this was before I had settled on using CSV as the store format of choice).
    • The station code and station metadata are also available from the URL for this view (for example, the station code for St. John’s, Newfoundland is 6720).

The web URL for downloading data was of the form:

    var urlQuery = "http://www.climate.weatheroffice.gc.ca/climateData/bulkdata_e.html?timeframe=1&Prov=XX&StationID={0}&Year={1}&Month={2}&Day={3}&format={4}&type=hly";

With the following variables:

  • ( 0 ) Station ID
  • ( 1 ) Year
  • ( 2 ) Month
  • ( 3 ) Day
  • ( 4 ) Format (XML, CSV)

Next step was to script up a downloader that pulled down data for 2010 for each of my target weather stations.  Threw something quick together in LINQPad to accomplish this.  Each invocation of this URL will return data for the given month (i.e. the day value is ignored).
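The downloadable script is not reproduced here, so the following is only a minimal sketch of the URL-generation half of such a downloader. The class and method names (WeatherUrls, MonthlyUrls) are hypothetical, and the lowercase "xml" format value is an assumption; only the URL template itself comes from this post.

```csharp
using System;
using System.Collections.Generic;

static class WeatherUrls
{
    // URL template from this post; the placeholders are
    // {0} station ID, {1} year, {2} month, {3} day, {4} format.
    const string UrlTemplate =
        "http://www.climate.weatheroffice.gc.ca/climateData/bulkdata_e.html" +
        "?timeframe=1&Prov=XX&StationID={0}&Year={1}&Month={2}&Day={3}&format={4}&type=hly";

    // Yields one URL per station per month. The day is fixed at 1 because
    // each request returns the whole month regardless of the day value.
    public static IEnumerable<string> MonthlyUrls(
        IEnumerable<int> stationIds, int year, string format)
    {
        foreach (var id in stationIds)
            for (var month = 1; month <= 12; month++)
                yield return String.Format(UrlTemplate, id, year, month, 1, format);
    }
}
```

Each generated URL could then be handed to something like System.Net.WebClient.DownloadFile to save the month's data to disk.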

Download the LINQ script Download_WeatherData.

After running this script, I had available to me roughly 1.0 GB of raw XML weather data.  The contents took the form:

    <stationinformation>
      <name>OTTAWA MACDONALD-CARTIER INT&apos;L  A</name>
      <province>ONTARIO</province>
      <latitude>45.32</latitude>
      <longitude>-75.67</longitude>
      <elevation>114.00</elevation>
      <climate_identifier>6106000</climate_identifier>
      <wmo_identifier>71628</wmo_identifier>
      <tc_identifier>YOW</tc_identifier>
      <note>All times are specified in Local Standard Time (LST). Add 1 hour to adjust for Daylight Saving Time where and when it is observed.</note>
    </stationinformation>
    <stationdata day="1" hour="0" minute="0" month="1" year="2010" quality=" ">
      <temp description="Temperature" units="°C">-5.30</temp>
      <dptemp description="Dew Point Temperature" units="°C">-6.00</dptemp>
      <visibility description="Visibility" units="km">4.80</visibility>
      <relhum description="Relative Humidity" units="%">95.00</relhum>
      <winddir description="Wind Direction" units="10&apos;s deg">9.00</winddir>
      <windspd description="Wind Speed" units="km/h">15.00</windspd>
      <stnpress description="Station Pressure" units="kPa">100.03</stnpress>
      <humidex description="Humidex"></humidex>
      <windchill description="Wind Chill">-11.00</windchill>
      <weather description="Weather">Snow</weather>
    </stationdata>

Note that we have both the station metadata and the “live” weather data available to us.  The next step will be to render this data into two separate data sets:

  • Sorted list of weather data across all of the weather stations, saved as a CSV file with headers.
  • List of weather station information (one record for each station), saved as a CSV file with headers.

Note that we will also have to shift the timestamp on each data record to the appropriate DateTimeOffset.  We’ll use another handy little LINQ nugget to do this (I’ve really gotten into using LINQPad as my general purpose C# scratchpad).

Download the LINQ script Parse_WeatherData.

Let’s walk through the main method before diving into the two key functions, ParseWeatherData and CsvSequenceStore.WriteEvents<T>.  We grab the list of files from the download directory (in this case we just take 5 to run quickly and work with a smaller amount of data for test purposes).  Next, iterate over the list of files, parsing each and inserting the results into the weatherData and stations lists.  The weatherData list is then sorted before being written into the output file via the CsvSequenceStore.  Note how we use the GZipStream to transparently write out a compressed stream. 


Note that the entire 1 GB worth of data is read into memory before being written out to the destination file.  This isn’t the world’s best coding practice.  What I should have done was:

  • Written out each weather data file to a common output .CSV file
  • Sorted the .CSV file.
  • Compressed the .CSV file.

However, as this was a one-shot, and I have lots of memory on my desktop, I brute-forced it.

  1. void Main()
  2. {            
  3.     // Get a list of all of the downloaded XML files.
  4.     var fileList = Directory.GetFiles(@"C:\temp\weatherdata\")
  5.         .Take(5) /* Take the first 5 for testing purposes */
  6.         .Where(d => d.EndsWith(".xml"));
  7.  
  8.     // Define the output file
  9.     var outputFile = @"C:\temp\weather_data_out.csv.gz";
  10.     var stationFile = @"C:\temp\station_data_out.csv.gz";
  11.  
  12.     // Create holding lists for weather data and stations        
  13.     var weatherData = new List<WeatherReading>();
  14.     var stations = new List<StationData>();
  15.  
  16.     // Iterate through the file list and parse for weather data
  17.     var fileCount = (double)fileList.ToList().Count();            
  18.     var progress = 0.0;
  19.     fileList.ToList().ForEach(fl =>
  20.     {                                
  21.         Console.WriteLine("{0:00.0} Parsing file {1}",
  22.             (progress / fileCount * 100.0), fl);
  23.         ParseWeatherData(fl, weatherData, stations);
  24.         progress++;
  25.     });
  26.  
  27.     // Sort the weather data into strict ascending time order
  28.     weatherData = weatherData.OrderBy(d => d.Timestamp).ToList();
  29.  
  30.     // Use the CSV store
  31.     CsvSequenceStore store = new CsvSequenceStore();
  32.  
  33.     // Write the data to a compressed file using the GZipStream class.  Way cool.
  34.     Console.WriteLine("Writing {0} events to file {1}",
  35.         weatherData.Count, outputFile);
  36.  
  37.     using (FileStream fs = new FileStream(outputFile, FileMode.Create))
  38.     {
  39.         using (GZipStream zipStream = new GZipStream(
  40.           fs, CompressionMode.Compress))
  41.         {
  42.             // Write out the weather data set to the file, using the Timestamp field
  43.             // as the event timestamp
  44.             store.WriteEvents<WeatherReading>(zipStream, weatherData,
  45.                 e => e.Timestamp);
  46.         }
  47.     }
  48.  
  49.     using (FileStream fs = new FileStream(stationFile, FileMode.Create))
  50.     {
  51.         using (GZipStream zipStream = new GZipStream(
  52.           fs, CompressionMode.Compress))
  53.         {
  54.             // Write out the station data set to the file, using a hard coded
  55.             // timestamp
  56.             store.WriteEvents<StationData>(zipStream, stations,
  57.                 e => DateTime.Parse("2010-01-01 00:00:00"));
  58.         }
  59.     }
  60. }

Now to dive into the first of the two key functions being called – ParseWeatherData.

  1.  
  2. /// Parse through the file, extracting weather and station data      
  3. void ParseWeatherData(string fileName,
  4.     List<WeatherReading> weatherData, List<StationData> stations)
  5. {
  6.     if (!System.IO.File.Exists(fileName))
  7.         throw new ArgumentException("Could not load file " + fileName);
  8.  
  9.     using (StreamReader sr = new StreamReader(fileName))
  10.     {
  11.         var xml = XElement.Load(sr);
  12.  
  13.         // Parse the station information
  14.         var q = from e in xml.Elements() where e.Name == "stationinformation" select e;
  15.         var stationInfo = q.First();
  16.         var stationName = GetValue<string>(stationInfo, "name");
  17.         var stationCode = GetValue<int>(stationInfo, "wmo_identifier");
  18.  
  19.         var newStation = new StationData()
  20.         {
  21.             Name = stationName,
  22.             Elevation = GetValue<double>(stationInfo, "elevation"),
  23.             Latitude = GetValue<double>(stationInfo, "latitude"),
  24.             Longitude = GetValue<double>(stationInfo, "longitude"),
  25.             TCID = GetValue<string>(stationInfo, "tc_identifier"),
  26.             StationCode = GetValue<int>(stationInfo, "wmo_identifier"),
  27.             Province = GetValue<string>(stationInfo, "province")
  28.         };
  29.  
  30.         // If this is a new station, add it to the list
  31.         if (!stations.Where(s => s.Name == stationName).Any())
  32.             stations.Add(newStation);
  33.  
  34.         // Parse the station data
  35.         var sds = from e in xml.Elements() where e.Name == "stationdata" select e;
  36.         sds.ToList().ForEach((e) =>
  37.         {
  38.             if (!String.IsNullOrEmpty(GetValue<string>(e, "temp")))
  39.             {
  40.                 // Assign the core values
  41.                 var wr = new WeatherReading()
  42.                 {
  43.                     StationCode = stationCode,
  44.                     Temperature = GetValue<double>(e, "temp"),
  45.                     DewpointTemperature = GetValue<double>(e, "dptemp"),
  46.                     Humidex = GetValue<double>(e, "humidex"),
  47.                     RelativeHumidity = GetValue<double>(e, "relhum"),
  48.                     StationPressure = GetValue<double>(e, "stnpress"),
  49.                     Visibility = GetValue<double>(e, "visibility"),
  50.                     WindChill = GetValue<double>(e, "windchill"),
  51.                     WindDirection = GetValue<double>(e, "winddir"),
  52.                     WindSpeed = GetValue<double>(e, "windspd"),
  53.                 };
  54.  
  55.                 // Convert the timestamp into universal time based
  56.                 // on the current station information (lookup based on
  57.                 // province)
  58.                 var tzi = tzMap[newStation.Province];
  59.                 
  60.                 // Assemble the timestamp
  61.                  var dt = new DateTimeOffset(
  62.                     GetAttribute<int>(e, "year"),
  63.                     GetAttribute<int>(e, "month"),
  64.                     GetAttribute<int>(e, "day"),
  65.                     GetAttribute<int>(e, "hour"),
  66.                     GetAttribute<int>(e, "minute"),
  67.                     0, tzi.BaseUtcOffset);
  68.                 wr.Timestamp = dt.UtcDateTime;
  69.                                           
  70.                 // Parse weather conditions - WeatherConditions is an enum
  71.                 // flags field
  72.                 string[] conditions = GetValue<string>(e, "weather")
  73.                     .Replace(' ', '_')
  74.                     .Split(new char[] { ',' });
  75.  
  76.                 WeatherConditions cond = WeatherConditions.None;
  77.                 WeatherConditions total = WeatherConditions.None;
  78.                 foreach (var s in conditions)
  79.                 {
  80.                     if (Enum.TryParse<WeatherConditions>(s, out cond))
  81.                     {
  82.                         total |= cond;
  83.                     }
  84.                 }
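  85.                 wr.WeatherConditions = (int)total; // store the combined flags (assumes the int property used by the read code)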
  86.                 // Add the data to the list                    
  87.                 weatherData.Add(wr);
  88.             }
  89.         });
  90.     }
  91. }

This looks like a lot, but it’s simply an XML parser, leveraging the XElement class.  After opening up the file, we pull the file into an XElement object, then pluck out the station and weather (station data) information.  The GetValue<T> and GetAttribute<T> functions are simple helpers to ease extracting typed data from the various nodes and coercing the types.


Note that I use Convert.ChangeType in the GetValue<T> and GetAttribute<T> functions.  This is a pretty slow method – I would have been better off using a switch() statement on the data type and XmlConvert for performance reasons. 

However, as this is code that only runs once, I didn’t bother to add the performance optimization.  I did spend a lot of time on performance optimizations for reading the data, which we’ll cover in the next section.
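The GetValue<T> and GetAttribute<T> helpers live in the downloadable script and are not shown in this post, so here is a hedged sketch of what Convert.ChangeType-based helpers of this kind might look like. The handling of missing or empty nodes (returning default(T)) and the use of the invariant culture are assumptions on my part, not necessarily what the script does.

```csharp
using System;
using System.Globalization;
using System.Xml.Linq;

static class XmlHelpers
{
    // Hypothetical stand-in for GetValue<T>: reads a child element's text
    // and coerces it to T. Missing or empty elements yield default(T).
    public static T GetValue<T>(XElement parent, string name)
    {
        var child = parent.Element(name);
        if (child == null || String.IsNullOrWhiteSpace(child.Value))
            return default(T);
        return (T)Convert.ChangeType(child.Value, typeof(T), CultureInfo.InvariantCulture);
    }

    // Hypothetical stand-in for GetAttribute<T>: same idea, for attributes.
    public static T GetAttribute<T>(XElement element, string name)
    {
        var attr = element.Attribute(name);
        if (attr == null || String.IsNullOrWhiteSpace(attr.Value))
            return default(T);
        return (T)Convert.ChangeType(attr.Value, typeof(T), CultureInfo.InvariantCulture);
    }
}
```

Passing the invariant culture keeps values such as "-5.30" parsing the same way regardless of the machine's regional settings.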

There are only two “custom” operations in ParseWeatherData, to handle two custom data parsing conditions:

  • Timezone offset.  Each of the timestamps in the file is in local time.  In order to adjust to UTC time we need to determine the time zone (based on the province field in the stations information, which we use to look up a TimeZoneInfo object from a static dictionary I defined at the bottom of the linq file).  This is used as an argument to DateTimeOffset to shift the date to UTC.
  • Weather conditions.  The input XML file can define weather conditions as having multiple values.  To support this in StreamInsight in a relatively straightforward fashion, I defined a bitwise [Flags] enum (WeatherConditions) that I fill in on lines 72-83.
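To make the two custom operations concrete, here is a small self-contained sketch. The enum members and the province-to-offset dictionary are hypothetical stand-ins (the real WeatherConditions enum and the tzMap of TimeZoneInfo objects are defined in the downloadable script), so treat this as an illustration of the technique rather than the script's actual code.

```csharp
using System;
using System.Collections.Generic;

[Flags]
enum WeatherConditions   // hypothetical subset of the real enum
{
    None = 0,
    Snow = 1,
    Fog = 2,
    Rain = 4,
    Freezing_Rain = 8
}

static class ReadingConversions
{
    // Hypothetical map of province to standard-time UTC offset, standing in
    // for the script's dictionary of TimeZoneInfo objects.
    static readonly Dictionary<string, TimeSpan> StandardOffsets =
        new Dictionary<string, TimeSpan>
        {
            { "ONTARIO", TimeSpan.FromHours(-5) },
            { "BRITISH COLUMBIA", TimeSpan.FromHours(-8) }
        };

    // Builds a local-standard-time timestamp and shifts it to UTC.
    public static DateTime ToUtc(string province,
        int year, int month, int day, int hour, int minute)
    {
        var local = new DateTimeOffset(year, month, day, hour, minute, 0,
            StandardOffsets[province]);
        return local.UtcDateTime;
    }

    // Folds a comma-separated condition list into a single flags value.
    // Names that are not part of the enum are skipped.
    public static WeatherConditions ParseConditions(string weather)
    {
        var total = WeatherConditions.None;
        foreach (var token in weather.Split(','))
        {
            WeatherConditions cond;
            var name = token.Trim().Replace(' ', '_');
            if (Enum.TryParse<WeatherConditions>(name, out cond))
                total |= cond;
        }
        return total;
    }
}
```

With the Ottawa record shown earlier (midnight local standard time on January 1, 2010), ToUtc("ONTARIO", 2010, 1, 1, 0, 0) gives 05:00 UTC, which is the timestamp that shows up in the CSV output later in this post.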

Finally, we have our friendly neighborhood public void WriteEvents<T>(Stream stream, IEnumerable<T> events, Func<T, DateTimeOffset> dtFunc) method, which writes out each event to a .CSV file.  It takes as arguments:

  • Stream stream.  The destination stream – taking a stream rather than a file name made it easy to substitute the GZipStream writer to easily add in compression.
  • IEnumerable<T> events.  The list of events to write out to the file.
  • Func<T,DateTimeOffset> dtFunc.  In order to easily stream events into StreamInsight, we need to know the temporal properties of the event (i.e. the StartTime).  Since the CsvSequenceStore is generic, we hand in a function that identifies the temporal properties (in our case, returning e.Timestamp).

    public void WriteEvents<T>(Stream stream, IEnumerable<T> events,
        Func<T, DateTimeOffset> dtFunc)
    {
        var encodingArray = new byte[16000];
        var fields = typeof(T).GetProperties();
        var lastField = fields.Last();

        var sb = new StringBuilder();
        var str = String.Empty;
        var byteLen = 0;

        foreach (var e in events)
        {
            sb.Clear(); str = String.Empty; byteLen = 0;

            // Write out the event kind, start time and end time
            sb.Append("Point,");
            sb.Append(dtFunc(e).Ticks);
            sb.Append(",,");

            foreach (var o in fields)
            {
                sb.Append(o.GetValue(e, null).ToString());
                if (o != lastField)
                    sb.Append(",");
            }
            sb.AppendLine();
            str = sb.ToString();

            byteLen = Encoding.ASCII.GetBytes(str, 0, str.Length, encodingArray, 0);
            stream.Write(encodingArray, 0, byteLen);
        }
    }

This is a very straightforward function.  Using reflection, we obtain the list of public properties for the type (via GetProperties()), and use a StringBuilder to assemble the text.  Wanted to avoid using String.Join(), hence remembering lastField.  Then iterate through the list of properties, and use the reflection method GetValue to write out the string representation to the line.

As a base Stream class only understands writing out bytes, we convert the line into a byte array and write it out.


Note that I use ASCII encoding here.  Really should be using Unicode encoding, but I happen to know that the data is pure ASCII (at least it is THIS time).  Should a future blog feature Unicode data, this line will change.

After executing the script, I have a compressed .CSV file containing weather data.  Let’s have a look at the contents.  First step will be to unzip it into a plain .csv file (via the gzip.exe utility – I could write a script to do this, but we’ll get to using GZipStream to uncompress in the next section).
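For anyone without gzip.exe handy, the same round trip can be done in a few lines of C#. This is a minimal in-memory sketch (the GzipRoundTrip class is a made-up name, and it uses UTF-8 rather than the ASCII encoding used in WriteEvents); swapping the MemoryStream for a FileStream gives the file-based version.

```csharp
using System.IO;
using System.IO.Compression;
using System.Text;

static class GzipRoundTrip
{
    // Compresses a string into a gzip-formatted byte array.
    public static byte[] Compress(string text)
    {
        using (var buffer = new MemoryStream())
        {
            // The GZipStream must be closed before reading the buffer,
            // otherwise the final compressed block is never flushed.
            using (var gz = new GZipStream(buffer, CompressionMode.Compress))
            {
                var bytes = Encoding.UTF8.GetBytes(text);
                gz.Write(bytes, 0, bytes.Length);
            }
            return buffer.ToArray();
        }
    }

    // Decompresses a gzip-formatted byte array back into a string.
    public static string Decompress(byte[] compressed)
    {
        using (var gz = new GZipStream(
            new MemoryStream(compressed), CompressionMode.Decompress))
        using (var reader = new StreamReader(gz, Encoding.UTF8))
        {
            return reader.ReadToEnd();
        }
    }
}
```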


Now, we crack open our .CSV files with Notepad and have a look (note that these only have 5 weather files in them, as I left the .Take() method uncommented on line 5).

Point,633979188000000000,,71628,1/1/2010 5:00:00 AM,-5.3,-6,95,9,15,4.8,100.03,0,-11,0

Excellent – we now have (mostly) human-readable data in a downloadable (i.e. small) format that we can extract quickly.  On to extraction!

Reading back the canned data set

We’ll stay in LINQPad for the time being (will switch over to Visual Studio when it comes time to create the actual data context – feel free to work through this code in VS.NET if it’s more comfortable).  Now that we have the canned data set (as our compressed .CSV file), we need to be able to read it back out again in a form that can be turned into a CepStream<T>.

Download the LINQ script Read_WeatherData.

This is very similar to, yet a little simpler than, the code which we used in the last section to write the data into the compressed CSV file.  The parsing code is the same (also using the CsvSequenceStore class, only now using the ReadEvents<T> method in place of the WriteEvents<T> method).  The new piece that has been introduced is the assign function, which defines how we map from the CSV format into POCO (plain old CLR object) format.


This doesn’t seem like a terribly generic way of implementing this, and it’s not.  The reason I’m using a function delegate, not reflection (via PropertyInfo.SetValue() and Convert.ChangeType()), is that we’re now in performance critical code and those two methods were killing performance.

I’ll write the code for dynamically generating this function in a later blog, but for now if you’re interested in the performance numbers, head on over to my performance blog on this topic.
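For comparison, here is a rough sketch of the generic, reflection-based assignment that the hand-written delegate replaces. The SampleReading type and the ReflectionAssign class are hypothetical and exist only for this illustration. It relies on GetProperties() returning properties in declaration order, which is what happens in practice but is not guaranteed by the documentation.

```csharp
using System;
using System.Globalization;
using System.Reflection;

class SampleReading   // hypothetical two-field payload for illustration
{
    public int StationCode { get; set; }
    public double Temperature { get; set; }
}

static class ReflectionAssign
{
    // Maps CSV tokens onto the public properties of T by position.
    // Generic, but every call pays for Convert.ChangeType and
    // PropertyInfo.SetValue on each field.
    public static void Assign<T>(string[] tokens, T target) where T : class
    {
        PropertyInfo[] props = typeof(T).GetProperties();
        for (var i = 0; i < props.Length && i < tokens.Length; i++)
        {
            object value = Convert.ChangeType(
                tokens[i], props[i].PropertyType, CultureInfo.InvariantCulture);
            props[i].SetValue(target, value, null);
        }
    }
}
```

The delegate version trades this generality for direct, strongly typed Parse calls, which is where the throughput gain comes from.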

  1. void Main()
  2. {
  3.     CsvSequenceStore store = new CsvSequenceStore();
  4.      
  5.     long count = 0;
  6.  
  7.     Action<string[], WeatherReading> assign = (str, e) =>
  8.     {
  9.         e.StationCode = Int32.Parse(str[0]);
  10.         e.Timestamp = DateTime.Parse(str[1]);
  11.         e.Temperature = Double.Parse(str[2]);
  12.         e.DewpointTemperature = Double.Parse(str[3]);
  13.         e.RelativeHumidity = Double.Parse(str[4]);
  14.         e.WindDirection = Double.Parse(str[5]);
  15.         e.WindSpeed = Double.Parse(str[6]);
  16.         e.Visibility = Double.Parse(str[7]);
  17.         e.StationPressure = Double.Parse(str[8]);
  18.         e.Humidex = Double.Parse(str[9]);
  19.         e.WindChill = Double.Parse(str[10]);
  20.         e.WeatherConditions = Int32.Parse(str[11]);
  21.     };
  22.         
  23.     using (FileStream fs = new FileStream(@"C:\temp\weather_data_out.csv.gz", FileMode.Open))
  24.     {
  25.         using (GZipStream zipStream = new GZipStream(
  26.             fs, CompressionMode.Decompress))
  27.         {
  28.             var evts2 = store.ReadEvents<WeatherReading>(zipStream, assign);
  29.             var weatherStream = evts2.Select (e => e.Payload)
  30.                     .Take(50);
  31.             weatherStream.Dump("weather events");                
  32.         }
  33.     }
  34. }

Executing this simple main function gives us the shiny weather events (50 of them, as limited by the Take() method on line 30):


The core of this approach lies in the ReadEvents<T> method, which is remarkably simple given the amount of code we’ve written to get to this point.  Remember that each line of CSV consists of the event shape (Point, Interval), the start and end times, then the payload.  We use a wrapper object, TypedEventPayload<T>, to encapsulate the payload along with the event characteristics.

    public IEnumerable<TypedEventPayload<T>> ReadEvents<T>(Stream stream,
        Action<string[], T> assign) where T : new()
    {
        StreamReader sr = new StreamReader(stream);
        String line = null;
        var fields = typeof(T).GetProperties();
        var lastField = fields.Last();

        while ((line = sr.ReadLine()) != null)
        {
            // Skip the header row
            if (line.StartsWith("EventShape,"))
                continue;
            string[] tokens = line.Split(new char[] { ',' });
            TypedEventPayload<T> ep = new TypedEventPayload<T>();
            ep.Payload = new T();

            ep.Shape = (EventShape)Enum.Parse(typeof(EventShape), tokens[0]);
            ep.StartTime = new DateTimeOffset(Int64.Parse(tokens[1]), TimeSpan.FromSeconds(0));
            if (String.IsNullOrEmpty(tokens[2]))
                ep.EndTime = ep.StartTime.AddTicks(1);
            else
                ep.EndTime = new DateTimeOffset(Int64.Parse(tokens[2]), TimeSpan.FromSeconds(0));

            // The first three tokens are shape, start and end; the rest is payload
            assign(tokens.Skip(3).ToArray(), ep.Payload);

            yield return ep;
        }
    }

We now have an IEnumerable<T> which we can convert into a CepStream<T> via use of the ToPointStream method. 


The really critical piece here is returning the events as an IEnumerable<T>, not as an array or list.  If we had to load the entire file into memory before sending a single event to StreamInsight, that wouldn’t be performant from either a memory or a latency perspective.  Instead, as we pull and deserialize events from the CSV file we yield them back.

This will allow the CepStream<T> to consume events as they are raised (which we’ll do in the next step).
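The deferred behavior is easy to see in isolation. In this minimal sketch (LazyLines is a made-up class, and the static counter is there purely for demonstration), nothing is read from the underlying reader until the consumer actually asks for an item.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

static class LazyLines
{
    // Counts how many lines have actually been pulled from the reader.
    public static int LinesRead;

    // Because of the yield statement, none of this body runs until the
    // caller starts enumerating, and it only advances one line per request.
    public static IEnumerable<string> ReadLines(TextReader reader)
    {
        string line;
        while ((line = reader.ReadLine()) != null)
        {
            LinesRead++;
            yield return line;
        }
    }
}
```

Calling ReadLines returns immediately with LinesRead still at zero; the count only goes up as a foreach loop (or a downstream consumer such as a stream adapter) pulls values.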

Finally, we convert the IEnumerable<T> into a CepStream<T> and run some basic queries over it:

    var evts2 = store.ReadEvents<WeatherReading>(zipStream, assign);
    var weatherStream = evts2.Select(e => e.Payload)
        .ToPointStream(Application, t =>
            PointEvent.CreateInsert(t.Timestamp, t),
            AdvanceTimeSettings.IncreasingStartTime);

    var query = from e in weatherStream
                    .TumblingWindow(TimeSpan.FromDays(1000), HoppingWindowOutputPolicy.ClipToWindowEnd)
                select new
                {
                    Count = e.Count()
                };

Excellent – we now have a CepStream<T> – the only remaining step to having a canned data set is simple packaging.


Wait!  You’re probably looking at the IEnumerable<TypedEventPayload<T>> ReadEvents method and thinking to yourself “Mark.. that’s dumb.  That won’t work, because of the IEnumerable state machine, and how Dispose is handled!”.  And you’d be right.

Here’s the problem:

  • The method ReadEvents assumes that someone else is responsible for initializing and disposing of the stream (and any underlying resources – such as the file handle). 
  • IEnumerable, and the corresponding yield statement, create an internal state machine when compiled (i.e. the enumeration uses deferred execution – the behavior we want).  The IEnumerable is not finished with the underlying stream until it has enumerated through all of the available values (in our case read to the end of the file).
  • As such, the lifetime of the Stream needs to be tied to the lifetime of the enumerator (as only the code that manages the enumerator knows when it is complete).
  • Since this design has the stream managed separately from the enumerator, the underlying stream could be closed before the enumerator even begins to evaluate.

Our current code base works, as we force the entire query to complete before exiting the using block that creates the stream.  In a LINQPad context, where we need to return the stream from a function and have no direct control over the lifetime of a query, this will have the effect of closing the underlying file before any events are read.
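The failure mode can be reproduced without any files or StreamInsight at all. In this hedged sketch (DisposePitfall is a made-up class, with a StringReader standing in for the file stream), the lazy sequence escapes the using block, so the reader is already disposed by the time anyone enumerates it.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

static class DisposePitfall
{
    static IEnumerable<string> ReadLines(TextReader reader)
    {
        string line;
        while ((line = reader.ReadLine()) != null)
            yield return line;
    }

    // Returns a lazy sequence, but the reader is disposed as soon as the
    // using block exits, before the caller has pulled a single line.
    public static IEnumerable<string> Broken(string text)
    {
        using (var reader = new StringReader(text))
        {
            return ReadLines(reader);
        }
    }

    // Enumerates the broken sequence and reports whether it blew up.
    public static bool ThrowsOnEnumeration(string text)
    {
        try
        {
            foreach (var line in Broken(text)) { }
            return false;
        }
        catch (ObjectDisposedException)
        {
            return true;
        }
    }
}
```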

We need to redesign our code to encapsulate the file reading into the same class that exposes IEnumerable, and handles stream cleanup correctly.

Reading back the canned data set – the Right Way!

Now, to take the prototype we built out in the last section, and put it in a form suitable for embedding into a library (which the LINQPad context will leverage).  We’ll create a base class, SequenceBase, which encapsulates creating streams from files (including compressed files).  This will allow us to more easily plug in different types of “serializers”.


SequenceBase is very straightforward, simply providing a helper function to create a new stream upon demand (to allow the enumerator to wrap it in a using() statement).

    public abstract class SequenceBase
    {
        private string _fileName;
        private bool _compress;
        private bool _open;

        protected Stream GetStream()
        {
            // If opening the file,     Open/read data/allow others to read
            // If writing a new file,   Create/write data/deny others access
            FileMode fMode = (_open) ? FileMode.Open : FileMode.Create;
            FileAccess aMode = (_open) ? FileAccess.Read : FileAccess.Write;
            FileShare fShare = (_open) ? FileShare.Read : FileShare.None;

            // Set the compression mode
            CompressionMode cMode = (_open) ? CompressionMode.Decompress
                : CompressionMode.Compress;

            // Create the base file stream
            var fs = new FileStream(_fileName, fMode, aMode, fShare);
            Stream _stream = fs;

            // If using Gzip compression, wrap the file stream
            if (_compress)
            {
                var zs = new System.IO.Compression.GZipStream(fs, cMode);
                _stream = zs;
            }
            return _stream;
        }

        public SequenceBase(string fileName, bool compress, bool open)
        {
            _fileName = fileName;
            _compress = compress;
            _open = open;
        }
    }

With this as a foundation, our Csv reader class becomes very straightforward to implement – essentially taking the contents of the ReadEvents<T> method from the previous section, and wrapping them up in a class derived from SequenceBase.

    public class CsvSequenceReaderTyped<T> :
        SequenceBase, IEnumerable<IntervalEvent<T>>
        where T : new()
    {
        protected Action<string[], T> _assign;

        public CsvSequenceReaderTyped(string fileName, bool compress,
            Action<string[], T> assign) : base(fileName, compress, true)
        {
            _assign = assign;
        }

        public IEnumerator<IntervalEvent<T>> GetEnumerator()
        {
            // The stream is created and disposed inside the enumerator,
            // so its lifetime matches the lifetime of the enumeration
            using (var stream = GetStream())
            {
                using (var sr = new StreamReader(stream))
                {
                    String line = null;
                    var fields = typeof(T).GetProperties();
                    var lastField = fields.Last();

                    while ((line = sr.ReadLine()) != null)
                    {
                        if (line.StartsWith("EventShape,"))
                            continue;
                        string[] tokens = line.Split(new char[] { ',' });

                        var shape = (EventShape)Enum.Parse(typeof(EventShape), tokens[0]);
                        var startTime = new DateTimeOffset(Int64.Parse(tokens[1]), TimeSpan.FromSeconds(0));
                        DateTimeOffset endTime;

                        if (String.IsNullOrEmpty(tokens[2]))
                            endTime = startTime.AddTicks(1);
                        else
                            endTime = new DateTimeOffset(Int64.Parse(tokens[2]), TimeSpan.FromSeconds(0));

                        var ep = IntervalEvent<T>.CreateInsert(
                            startTime, endTime, new T());

                        _assign(tokens.Skip(3).ToArray(), ep.Payload);

                        yield return ep;
                    }
                }
            }
        }

        System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }
    }

Note that we assume that the assignment Action<string[], T> is handed in when the reader is created.

The really critical piece here is that the underlying file resources are created (via GetStream()) and disposed of (via the using statements) each time the sequence is enumerated. That is why the resource management code (opening and closing files) had to be merged with the code that uses the resource.

That’s it – now that we have our IEnumerable<T> wrapped up in a class that can close out the underlying file stream on Dispose(), we are ready to wrap it up in a LINQPad data context.
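To see why keeping the using statement inside the iterator matters, here is a minimal sketch that does not depend on StreamInsight. TrackedResource and IteratorDisposalDemo are hypothetical names standing in for the file stream and the reader; they are not part of the project. The point is only that abandoning a foreach early still disposes the resource, because foreach disposes the enumerator.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the stream returned by GetStream().
sealed class TrackedResource : IDisposable
{
    public static int OpenCount;
    public static int DisposeCount;

    public TrackedResource() { OpenCount++; }
    public void Dispose() { DisposeCount++; }
}

static class IteratorDisposalDemo
{
    // Same shape as GetEnumerator() in the reader: the resource is opened
    // inside the iterator, so every enumeration gets its own instance.
    public static IEnumerable<int> ReadValues()
    {
        using (var resource = new TrackedResource())
        {
            for (int i = 0; i < 1000; i++)
                yield return i;
        }
    }

    public static void Main()
    {
        foreach (var value in ReadValues())
        {
            if (value == 2)
                break; // abandon the sequence early, much like Take(50) does
        }

        // foreach disposed the enumerator, which ran the using block's cleanup.
        Console.WriteLine("opened={0}, disposed={1}",
            TrackedResource.OpenCount, TrackedResource.DisposeCount);
    }
}
```

Running Main prints opened=1, disposed=1, even though only three of the thousand values were read.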

Creating a LINQPad data context which exposes the data set as a CepStream<>

Creating the custom data context will be the shortest section of this blog post.  With a data source (an IEnumerable<T>, an IObservable<T> or a StreamInsight adapter) ready to go, all that stands between you and LINQPad data context goodness is exposing the CepStream<T> object in a class.

Really – that’s it.  Here are the steps:

  • Create a class that derives from StreamInsightContext.
    • StreamInsightContext is defined in StreamInsightLinqPad.Samples.dll, which is typically found in C:\ProgramData\LINQPad\Drivers\DataContext\4.0\StreamInsightLinqPad (3d3a4b0768c9178e)
  • Expose the two streams (weather data and station data), based on the code we explored in the reading back the canned data section.

Or, to look at the code:

    using System;
    using System.IO;
    using System.Linq;
    using System.Reflection;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Linq;

    // StreamInsightContext comes from StreamInsightLinqPad.Samples.dll.
    public class WeatherContext : StreamInsightContext
    {
        private static readonly string weatherFile;
        private static readonly string stationFile;

        static WeatherContext()
        {
            // Look for the data files in the same directory as this assembly
            var assemblyDir = Path.GetDirectoryName(
                Assembly.GetExecutingAssembly().Location);
            weatherFile = Path.Combine(assemblyDir, @"weather_data_out.csv.gz");
            stationFile = Path.Combine(assemblyDir, @"station_data_out.csv.gz");
        }

        public WeatherContext(Server server)
            : base(server)
        { }

        // First 50 weather readings only, for quick experiments.
        public CepStream<WeatherReading> SmallWeather
        {
            get
            {
                return new CsvSequenceReaderTyped<WeatherReading>
                    (weatherFile, true, WeatherReading.GetFill()).Take(50)
                        .ToStream(Application, AdvanceTimeSettings.IncreasingStartTime);
            }
        }

        // The full weather data set.
        public CepStream<WeatherReading> LargeWeather
        {
            get
            {
                return new CsvSequenceReaderTyped<WeatherReading>
                    (weatherFile, true, WeatherReading.GetFill())
                    .ToStream(Application, AdvanceTimeSettings.IncreasingStartTime);
            }
        }

        // Station metadata, stretched into a reference stream.
        public CepStream<StationData> Stations
        {
            get
            {
                return new CsvSequenceReaderTyped<StationData>
                    (stationFile, true, StationData.GetFill())
                        .ToStream(Application, AdvanceTimeSettings.IncreasingStartTime)
                        .AlterEventLifetime(e => new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc),
                            e => TimeSpan.MaxValue);
            }
        }
    }

There are a few changes from the last section, to clean things up a bit in a project:

  • The assignment functions are returned by static functions on the data types themselves, as opposed to public properties (I didn’t want to have a non-StreamInsight compatible data type in the event type definitions).
  • Instead of raising a generic wrapper class and then converting into PointEvent<T>, we raise a TypedEvent<T> (here, an IntervalEvent<T>) directly from the file, i.e. our enumerator is now IEnumerator<IntervalEvent<T>> GetEnumerator().
  • The file names are inferred to be in the same directory as the context assembly.
  • In the Stations stream, we perform an AlterEventLifetime to convert the list of stations into a reference stream by extending the time from January 1st 2000 (the timestamp of the first record in the data set), to max time.
    • Ordinarily, this would be done with query logic in your application, but for ease of use in LINQPad (i.e. removing an extra step) I did it in the context.
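As a rough illustration of the first point, a static GetFill() function might look something like the sketch below. SampleReading and its two properties are hypothetical (the real WeatherReading and StationData fields are defined elsewhere in the project), and the column order is an assumption made only for this example.

```csharp
using System;
using System.Globalization;

// Hypothetical payload type, used only for illustration.
public class SampleReading
{
    public string StationCode { get; set; }
    public double Temperature { get; set; }

    // Returns the delegate the reader calls with the payload columns,
    // i.e. everything after the shape, start time and end time columns.
    // Exposing it as a static function keeps the event type itself limited
    // to plain payload properties.
    public static Action<string[], SampleReading> GetFill()
    {
        return (tokens, reading) =>
        {
            reading.StationCode = tokens[0];
            reading.Temperature = double.Parse(tokens[1], CultureInfo.InvariantCulture);
        };
    }
}

public static class GetFillDemo
{
    public static void Main()
    {
        var fill = SampleReading.GetFill();
        var reading = new SampleReading();

        // The reader would pass tokens.Skip(3).ToArray() here.
        fill(new[] { "YVR", "12.5" }, reading);

        Console.WriteLine("{0}: {1}", reading.StationCode,
            reading.Temperature.ToString(CultureInfo.InvariantCulture));
    }
}
```

A reader for this type would then be created the same way as in the context above, passing SampleReading.GetFill() as the assign argument.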

Go ahead and compile the project to obtain WeatherData.LinqpadContext.dll.  We’ll load this into LINQPad as follows:

  • Add Connection, then select Microsoft StreamInsight from the list of drivers.
  • Select Custom Context from the Context Kind dropdown.
  • Click on the ellipsis for Assembly Path (...), then browse to the directory containing WeatherData.LinqpadContext.dll.
  • Click OK to finish selecting the new context.

image

Our weather context should now be available on the connections list.  Before we can start querying these streams, we will need to take our data files (weather_data_out.csv.gz and station_data_out.csv.gz from our first step) and copy them to the assembly directory (in this case, the bin directory for our project).

If you’re not sure which directory you need to place the files in, simply run a query against the data context.  The exception will show you the correct directory. 

Now that we have our custom context ready to go, let’s run some queries.

Running some Queries

With the query context loaded up, let’s go ahead and run some queries against it.

  • From LINQPad, click File, New Query.
  • From the Language drop-down, select C# Statements.
  • From the Database drop-down, select StreamInsight: WeatherContext

image

Now we’re ready to run some queries.  Let’s start with a couple of basic ones to look at the SmallWeather and Stations streams.

image

image 

Excellent!  One last little query to close out our walkthrough of creating a data context – join the reference stream with the small data stream and filter for a specific airport.

image

Remember – you can execute just the selected text in LINQPad by highlighting it, then pressing F5 (or clicking Query –> Execute), which is why all of the screenshots show highlighted text.