Following on the heels of my last query pattern blog post, I started to dig into creating a custom data context for use with StreamInsight and LINQPad. The default contexts supplied out of the box with the StreamInsight driver (including the Hitchhiker’s Guide context) are great, but don’t contain a large volume of data. I wanted to put together a much larger context as a baseline for playing with larger and more complicated query patterns. My goals for the context/data streams were:
Building on the fun I had putting together the windchill sample, here’s what I decided on:
This blog post walks through the various steps of creating the custom data context:
If you’re comfortable with creating and accessing canned data sets, and you just want to create a custom LINQPad context for use with StreamInsight, skip directly to that section.
Before you start reading through the blog, download and follow along with the code samples:
Environment Canada rocks. They really do (caveat: I am Canadian). They’ve made a great deal of climatic data available for download in their National Climate Archives On-line. From the National Climate Archives site, I chose to download ten years’ worth of information for ten airports in Canada (airport weather stations tend to record and archive data in 30 minute or hourly increments, while most “other” weather stations seem to record 2-3x per day). Since the goal is lots of time-series data, airport weather stations it is: hourly records give us 8,760 records per year per weather station, and 10 stations times 10 years gives us 876k records (close enough).
You can see the available stations in a given area and timeframe by looking at their customized search. Being familiar with Canadian airport geography, I selected them by:
The web URL for downloading data was of the form:
With the following variables:
The next step was to script up a downloader that pulled down data for 2010 for each of my target weather stations. I threw something quick together in LINQPad to accomplish this. Each invocation of this URL returns data for the whole of the given month (i.e. the day value is ignored).
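Since each request returns a whole month, the downloader only needs one request per station per month. Here is a minimal sketch of that loop. The URL template below is a hypothetical placeholder on an invalid domain (it is not the real Environment Canada format), and `DownloadPlan` and `MonthlyUrls` are names made up for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class DownloadPlan
{
    // ASSUMPTION: placeholder template only; substitute the real download URL format.
    public const string UrlTemplate =
        "http://example.invalid/climate?station={0}&year={1}&month={2}&day=1";

    // Yields one URL per station per month. The day is pinned to 1 because
    // the service ignores the day value and returns the full month.
    public static IEnumerable<string> MonthlyUrls(IEnumerable<int> stationIds, int year)
    {
        foreach (var stationId in stationIds)
        {
            for (var month = 1; month <= 12; month++)
            {
                yield return string.Format(UrlTemplate, stationId, year, month);
            }
        }
    }
}
```

Each URL can then be handed to something like `WebClient.DownloadFile(url, localPath)` to save the month’s XML to disk.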
After running this script, I had available to me roughly 1.0 GB of raw XML weather data. The contents took the form:
Note that we have both the station metadata and the “live” weather data available to us. The next step will be to render this data into two separate data sets:
Note that we will also have to shift the timestamp on each data record to the appropriate DateTimeOffset. We’ll use another handy little LINQ nugget to do this (I’ve really gotten into using LINQPad as my general purpose C# scratchpad).
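As a rough illustration of the timestamp shift, the sketch below attaches a station’s UTC offset to a local timestamp to produce a `DateTimeOffset`. The helper name `ToOffsetTime` and the idea of passing the offset in hours are assumptions for this example, and the -5 hour offset in the usage note is purely illustrative.

```csharp
using System;
using System.Globalization;

public static class TimestampShift
{
    // ASSUMPTION: the station's UTC offset is known and supplied by the caller.
    public static DateTimeOffset ToOffsetTime(string localText, double utcOffsetHours)
    {
        // DateTime.Parse yields Kind == Unspecified here, which lets us attach
        // an arbitrary offset without the constructor rejecting it.
        DateTime local = DateTime.Parse(localText, CultureInfo.InvariantCulture);
        return new DateTimeOffset(local, TimeSpan.FromHours(utcOffsetHours));
    }
}
```

For example, `TimestampShift.ToOffsetTime("2010-01-01 00:00", -5)` represents midnight local time at an offset of -5 hours, which is 05:00 UTC on the same day.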
Let’s walk through the main method before diving into the two key functions, ParseWeatherData and CsvSequenceStore.WriteEvents<T>. We grab the list of files from the download directory (in this case we just take 5 to run quickly and work with a smaller amount of data for test purposes). Next, iterate over the list of files, parsing each and inserting the results into the weatherData and stations lists. The weatherData list is then sorted before being written into the output file via the CsvSequenceStore. Note how we use the GZipStream to transparently write out a compressed stream.
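To show what “transparently” means here, the following minimal sketch wraps a file stream in a `GZipStream` so the writing code only ever sees an ordinary `Stream`. The `GzipCsv` class and its method names are made up for this example; this is not the `CsvSequenceStore` code itself.

```csharp
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Text;

public static class GzipCsv
{
    // Writes each line through a GZipStream layered over the file stream.
    public static void WriteLines(string path, IEnumerable<string> lines)
    {
        using (var file = File.Create(path))
        using (var gzip = new GZipStream(file, CompressionMode.Compress))
        {
            foreach (var line in lines)
            {
                byte[] bytes = Encoding.ASCII.GetBytes(line + "\r\n");
                gzip.Write(bytes, 0, bytes.Length);
            }
        }
    }

    // Reads the lines back by layering the streams in the opposite direction.
    public static List<string> ReadLines(string path)
    {
        var result = new List<string>();
        using (var file = File.OpenRead(path))
        using (var gzip = new GZipStream(file, CompressionMode.Decompress))
        using (var reader = new StreamReader(gzip, Encoding.ASCII))
        {
            string line;
            while ((line = reader.ReadLine()) != null)
            {
                result.Add(line);
            }
        }
        return result;
    }
}
```

Because the compression lives entirely in the stream layering, the same writer code works unchanged against an uncompressed `FileStream`.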
Note that the entire 1 GB worth of data is read into memory before being written out to the destination file. This isn’t the world’s best coding practice. What I should have done was:
However, as this was a one-shot, and I have lots of memory on my desktop, I brute-forced it.
Now to dive into the first of the two key functions being called – ParseWeatherData.
This looks like a lot, but it’s simply an XML parser, leveraging the XElement class. After opening up the file, we pull the file into an XElement object, then pluck out the station and weather (station data) information. The GetValue<T> and GetAttribute<T> functions are simple helpers to ease extracting typed data from the various nodes and coercing the types.
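For a sense of what such helpers might look like, here is a minimal sketch built on `XElement` and `Convert.ChangeType`. The signatures (in particular the `fallback` parameter) and the element and attribute names in the usage comment are assumptions for illustration; the original helpers and the real XML schema may differ.

```csharp
using System;
using System.Globalization;
using System.Xml.Linq;

public static class XmlHelpers
{
    // Reads a child element's text and coerces it to T.
    // Returns the fallback when the element is missing or empty.
    public static T GetValue<T>(XElement parent, string name, T fallback)
    {
        XElement child = parent.Element(name);
        if (child == null || string.IsNullOrEmpty(child.Value)) return fallback;
        return (T)Convert.ChangeType(child.Value, typeof(T), CultureInfo.InvariantCulture);
    }

    // Same idea for attributes on the element itself.
    public static T GetAttribute<T>(XElement element, string name, T fallback)
    {
        XAttribute attr = element.Attribute(name);
        if (attr == null || string.IsNullOrEmpty(attr.Value)) return fallback;
        return (T)Convert.ChangeType(attr.Value, typeof(T), CultureInfo.InvariantCulture);
    }
}

// Usage with hypothetical element names:
//   var node = XElement.Parse("<stationdata hour=\"5\"><temp>-5.3</temp></stationdata>");
//   double temp = XmlHelpers.GetValue<double>(node, "temp", 0.0);
//   int hour = XmlHelpers.GetAttribute<int>(node, "hour", 0);
```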
Note that I use Convert.ChangeType in the GetValue<T> and GetAttribute<T> functions. This is a pretty slow method – I would have been better off using a switch() statement on the data type and XmlConvert for performance reasons.
However, as this is code that only runs once, I didn’t bother to add the performance optimization. I did spend a lot of time on performance optimizations for reading the data, which we’ll cover in the next section.
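The switch-based alternative mentioned above could look roughly like the sketch below. `FastConvert.Parse<T>` is a hypothetical helper name, only a few types are handled, and I have not measured this version, so treat any speed-up as something to verify rather than a given.

```csharp
using System;
using System.Globalization;
using System.Xml;

public static class FastConvert
{
    // Dispatches on the target type and uses XmlConvert for the common cases,
    // falling back to Convert.ChangeType for everything else.
    public static T Parse<T>(string text)
    {
        switch (Type.GetTypeCode(typeof(T)))
        {
            case TypeCode.Int32:
                return (T)(object)XmlConvert.ToInt32(text);
            case TypeCode.Double:
                return (T)(object)XmlConvert.ToDouble(text);
            case TypeCode.String:
                return (T)(object)text;
            default:
                return (T)Convert.ChangeType(text, typeof(T), CultureInfo.InvariantCulture);
        }
    }
}
```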
There are only two “custom” operations in ParseWeatherData, to handle two custom data parsing conditions:
Finally, we have our friendly neighborhood public void WriteEvents<T>(Stream stream, IEnumerable<T> events, Func<T, DateTimeOffset> dtFunc) method, which writes out each event to a .CSV file. It takes as arguments:
This is a very straightforward function. Using reflection, we obtain the list of public properties for the type (via GetProperties()), and use a StringBuilder to assemble the text. I wanted to avoid using String.Join(), hence remembering lastField. We then iterate through the list of properties, and use the reflection method GetValue to write the string representation of each one to the line.
As a base Stream class only understands writing out bytes, we convert the line into a byte array and write it out.
Note that I use ASCII encoding here. I really should be using Unicode encoding, but I happen to know that the data is pure ASCII (at least it is THIS time). Should a future blog feature Unicode data, this line will change.
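Putting the last few paragraphs together, a reflection-based writer along these lines might look like the sketch below. It follows the `WriteEvents<T>` signature given above, but the body is my reconstruction, not the original code: the use of `UtcTicks` for the start time, the invariant-culture formatting, and the `SampleReading` payload class are all assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Reflection;
using System.Text;

// Hypothetical payload type, used only to exercise the writer.
public class SampleReading
{
    public int StationId { get; set; }
    public double Temperature { get; set; }
}

public class CsvWriterSketch
{
    public void WriteEvents<T>(Stream stream, IEnumerable<T> events, Func<T, DateTimeOffset> dtFunc)
    {
        // GetProperties() typically returns declaration order, but the
        // framework does not guarantee it.
        PropertyInfo[] props = typeof(T).GetProperties();
        foreach (T e in events)
        {
            var sb = new StringBuilder();
            // Event shape, start time in ticks, then an empty end time (point events).
            sb.Append("Point,").Append(dtFunc(e).UtcTicks).Append(",,");
            for (int i = 0; i < props.Length; i++)
            {
                object value = props[i].GetValue(e, null);
                sb.Append(Convert.ToString(value, CultureInfo.InvariantCulture));
                if (i < props.Length - 1) sb.Append(',');
            }
            sb.Append("\r\n");
            // A Stream only understands bytes, so encode the line first.
            byte[] bytes = Encoding.ASCII.GetBytes(sb.ToString());
            stream.Write(bytes, 0, bytes.Length);
        }
    }
}
```

Note that this sketch does no escaping, so a property value containing a comma would corrupt the line. That is fine for purely numeric weather data but worth handling for anything else.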
After executing the script, I have a compressed .CSV file containing weather data. Let’s have a look at the contents. The first step is to unzip it into a plain .csv file (via the gzip.exe utility; I could write a script to do this, but we’ll get to using GZipStream to uncompress in the next section).
Now we crack open our .CSV files with Notepad and have a look (note that these only contain data from 5 weather files, as I left the .Take() method uncommented on line 5).
Point,633979188000000000,,71628,1/1/2010 5:00:00 AM,-5.3,-6,95,9,15,4.8,100.03,0,-11,0
Excellent – we now have (mostly) human-readable data in a downloadable (i.e. small) format that we can extract quickly. On to extraction!
We’ll stay in LINQPad for the time being (will switch over to Visual Studio when it comes time to create the actual data context – feel free to work through this code in VS.NET if it’s more comfortable). Now that we have the canned data set (as our compressed .CSV file), we need to be able to read it back out again in a form that can be turned into a CepStream<T>.
This is very similar to, yet a little simpler than, the code we used in the last section to write the data into the compressed CSV file. The parsing code is the same (also using the CsvSequenceStore class, only now using the ReadEvents<T> method in place of the WriteEvents<T> method). The new piece that has been introduced is the assign function, which defines how we map from the CSV format into POCO (plain old CLR object) format.
This doesn’t seem like a terribly generic way of implementing this, and it’s not. The reason I’m using a function delegate, not reflection (via PropertyInfo.SetValue() and Convert.ChangeType()), is that we’re now in performance-critical code and those two methods were killing performance.
I’ll write the code for dynamically generating this function in a later blog, but for now if you’re interested in the performance numbers, head on over to my performance blog on this topic.
Executing this simple main function gives us the shiny weather events (50 of them, as limited by the Take() method on line 30):
The core of this approach lies in the ReadEvents<T> method, which is remarkably simple given the amount of code we’ve written to get to this point. Remember that each line of CSV consists of the event shape (Point, Interval), the Start and End times, then the payload. We use a wrapper object, TypedEventPayload<T>, to encapsulate the payload along with the event characteristics.
We now have an IEnumerable<T> which we can convert into a CepStream<T> via use of the ToPointStream method.
The really critical piece here is returning the events as an IEnumerable<T>, not as an array or list. If we had to load the entire file into memory before sending a single event to StreamInsight, that wouldn’t be performant from either a memory or a latency perspective. Instead, as we pull and deserialize events from the CSV file we yield them back.
This will allow the CepStream<T> to consume events as they are raised (which we’ll do in the next step).
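To make the yield-as-you-go idea concrete, here is a minimal sketch of a lazy CSV reader. The member names on `TypedEventPayload<T>`, the `WeatherReading` payload class, and the convention that `assign` receives the full field array (with payload fields starting at index 3) are assumptions for this example, not the original implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;

// Hypothetical wrapper shape; the original class may expose different members.
public class TypedEventPayload<T>
{
    public string Shape { get; set; }
    public DateTimeOffset StartTime { get; set; }
    public DateTimeOffset? EndTime { get; set; }
    public T Payload { get; set; }
}

// Hypothetical payload type for illustration.
public class WeatherReading
{
    public int StationId { get; set; }
    public double Temperature { get; set; }
}

public static class CsvReaderSketch
{
    // Deserializes one event per CSV line and yields it immediately, so the
    // consumer sees the first event without the whole file being loaded.
    public static IEnumerable<TypedEventPayload<T>> ReadEvents<T>(
        TextReader reader, Action<string[], T> assign) where T : class, new()
    {
        string line;
        while ((line = reader.ReadLine()) != null)
        {
            if (line.Length == 0) continue;
            string[] fields = line.Split(',');
            var payload = new T();
            assign(fields, payload); // caller maps CSV fields onto the POCO

            long start = long.Parse(fields[1], CultureInfo.InvariantCulture);
            DateTimeOffset? end = null;
            if (fields[2].Length > 0)
            {
                end = new DateTimeOffset(
                    long.Parse(fields[2], CultureInfo.InvariantCulture), TimeSpan.Zero);
            }

            yield return new TypedEventPayload<T>
            {
                Shape = fields[0],
                StartTime = new DateTimeOffset(start, TimeSpan.Zero),
                EndTime = end,
                Payload = payload
            };
        }
    }
}
```

A hand-written `assign` delegate for this payload could be as simple as `(f, r) => { r.StationId = int.Parse(f[3]); r.Temperature = double.Parse(f[5], CultureInfo.InvariantCulture); }`, which is the kind of direct mapping that avoids reflection on the hot path.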
Finally, we convert the IEnumerable<T> into a CepStream<T> and run some basic queries over it:
Excellent! We now have a CepStream<T>, and the only remaining step to having a canned data set is some simple packaging.
Here’s the problem:
Our current code base works because we force the entire query to complete before exiting the using block that creates the stream. In a LINQPad context, where we need to return the stream from a function and have no direct control over the lifetime of a query, the using block will close the underlying file before any events are read.
We need to redesign our code to encapsulate the file reading into the same class that exposes IEnumerable, and handles stream cleanup correctly.
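The problem is easy to reproduce without StreamInsight at all. In the sketch below (all names are made up for the demonstration, and a `StringReader` stands in for the file), `Broken` returns a lazy sequence from inside a using block, so the reader is disposed before anyone enumerates it. `Working` moves the using block inside the iterator, so the reader lives exactly as long as the enumeration.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class LifetimeDemo
{
    private static IEnumerable<string> Lines(TextReader reader)
    {
        string line;
        while ((line = reader.ReadLine()) != null) yield return line;
    }

    // Broken: Lines() has not read anything when we return, and leaving the
    // using block disposes the reader. Enumerating the result later throws
    // ObjectDisposedException.
    public static IEnumerable<string> Broken(string text)
    {
        using (var reader = new StringReader(text))
        {
            return Lines(reader);
        }
    }

    // Working: the iterator opens the reader itself, and the using block is
    // part of the iterator body, so disposal happens when enumeration ends
    // (or when the enumerator is disposed early).
    public static IEnumerable<string> Working(Func<TextReader> open)
    {
        using (TextReader reader = open())
        {
            string line;
            while ((line = reader.ReadLine()) != null) yield return line;
        }
    }
}
```

The key design point is that the sequence takes a factory (`Func<TextReader>`) instead of an already-open reader, so every enumeration opens and closes its own resource.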
Now, to take the prototype we built out in the last section, and put it in a form suitable for embedding into a library (which the LINQPad context will leverage). We’ll create a base class, SequenceBase, which encapsulates creating streams from files (including compressed files). This will allow us to more easily plug in different types of “serializers”.
SequenceBase is very straightforward, simply providing a helper function to create a new stream upon demand (to allow the enumerator to wrap it in a using() statement).
With this as a foundation, our Csv reader class becomes very straightforward to implement – essentially taking the contents of the ReadEvents<T> method from the previous section, and wrapping them up in a class derived from SequenceBase.
Note that we assume that the assignment Action<string[], T> is handed in when the reader is created.
The really critical piece here is creating (via GetStream()) and disposing of (via the using statement) our underlying file resources each time, which required that the resource management code (opening and closing files) be merged with the resource using code.
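As a rough picture of how the two classes fit together, here is a minimal sketch. The class names follow the text, but the bodies are my own reconstruction: detecting compression by the `.gz` extension, the constructor shapes, and yielding bare payloads (rather than a wrapper type) are all assumptions.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;

public abstract class SequenceBase
{
    private readonly string _path;

    protected SequenceBase(string path) { _path = path; }

    // Opens a fresh stream on every call so each enumeration owns its own
    // file handle. ASSUMPTION: compressed files are detected by extension.
    protected Stream GetStream()
    {
        Stream file = File.OpenRead(_path);
        return _path.EndsWith(".gz", StringComparison.OrdinalIgnoreCase)
            ? new GZipStream(file, CompressionMode.Decompress)
            : file;
    }
}

public class CsvSequenceReader<T> : SequenceBase, IEnumerable<T> where T : class, new()
{
    private readonly Action<string[], T> _assign;

    // The assignment delegate is handed in when the reader is created.
    public CsvSequenceReader(string path, Action<string[], T> assign) : base(path)
    {
        _assign = assign;
    }

    public IEnumerator<T> GetEnumerator()
    {
        // The using block lives inside the iterator, so the file is closed
        // when enumeration completes or the enumerator is disposed.
        using (var reader = new StreamReader(GetStream()))
        {
            string line;
            while ((line = reader.ReadLine()) != null)
            {
                var item = new T();
                _assign(line.Split(','), item);
                yield return item;
            }
        }
    }

    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
}
```

Because `GetStream()` is called inside `GetEnumerator()`, the same reader instance can be enumerated more than once, and no file handle stays open between enumerations.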
That’s it – now that we have our IEnumerable<T> wrapped up in a class that can close out the underlying file stream on Dispose(), we are ready to wrap it up in a LINQPad data context.
Creating the custom data context will be the shortest section of this blog post. With a data source (either an IEnumerable<T>, IObservable<T> or a StreamInsight adapter) ready to go, all that stands between you and LINQPad data context goodness is exposing the CepStream<T> object in a class.
Really – that’s it. Here are the steps:
Or, to look at the code:
There are a few changes from the last section, to clean things up a bit in a project:
Go ahead and compile the project, to obtain the WeatherData.LinqpadContext.dll. We’ll load this into LINQPad by
Our weather context should now be available on the connections list. Before we can start querying these streams, we will need to take our data files (weather_data_out.csv.gz and station_data_out.csv.gz from our first step) and copy them to the assembly directory (in this case, the bin directory for our project).
If you’re not sure which directory you need to place the files in, simply run a query against the data context. The exception will show you the correct directory.
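One way to get that kind of helpful exception is to resolve data files against the assembly’s own directory and include the full path in the error. The sketch below shows the idea; `DataFileLocator` and its methods are hypothetical names, and the actual context may locate its files differently.

```csharp
using System;
using System.IO;
using System.Reflection;

public static class DataFileLocator
{
    // Directory containing the assembly that holds this code.
    public static string AssemblyDirectory()
    {
        return Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);
    }

    // Builds the full path and fails loudly, naming the exact location that
    // was searched, if the file is not there.
    public static string Resolve(string directory, string fileName)
    {
        string fullPath = Path.Combine(directory, fileName);
        if (!File.Exists(fullPath))
        {
            throw new FileNotFoundException("Data file not found: " + fullPath, fullPath);
        }
        return fullPath;
    }
}
```

A call such as `DataFileLocator.Resolve(DataFileLocator.AssemblyDirectory(), "weather_data_out.csv.gz")` then either returns a usable path or tells you exactly where the file needs to go.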
Now that we have our custom context loaded up and ready to go, let’s run some queries against it. Let’s start with a couple of basic ones to look at the SmallWeather and Stations streams.
Excellent! One last little query to close out our walkthrough of creating a data context: join the reference stream with the small data stream and filter for a specific airport.
Remember – you can execute just the selected text in LINQPad by highlighting it, then pressing F5 (or clicking Query –> Execute), which is why all of the screenshots show highlighted text.