Process data, commonly accessed from OPC or a process historian such as OSIsoft PI, is well suited for analysis using StreamInsight – being time-series data with data values from a range of sensors.  In this post, I’ll walk through a couple of simple patterns for working with process data (not that these patterns are isolated to process data).

To follow along, the LINQ query for this example is here (if you haven’t used LINQPad with StreamInsight yet, follow the directions here to get up and running).

The general case for collecting process data is that of a range of sensors (temperature, pressure, humidity, flow, etc) connected to process control equipment (Programmable Logic Controllers, or PLC’s, SCADA systems, etc) that in turn relay the information upstream to process historians (or provide a direct data gathering interface, through OPC or a similar protocol). 

The reason I mention this level of detail has to do with how process data typically arrives – in a single or mixed stream.  That is to say as connections to these embedded devices tends to be ‘expensive’ we want to minimize connections and communication and pull multiple data values over a single connection, often emitted as a single CepStream<T> in StreamInsight (rather than having, for example, temperature and humidity information coming from two different data sources, in two different streams).  This is why I have intermingled data types in my examples below, and why the query patterns usually start out with creating different virtual streams based on a data type.

For these examples, let’s consider a representative process control data type with some simple fields

  • string Id
  • DateTime Timestamp
  • double Value

Next, assume that the Id field uses an “asset.sensor” naming convention (i.e. Station1.Temperature, and Station1.Windspeed).  Also assume that the data updates are not guaranteed to be synchronized (i.e. the underlying PLC may not report updates to temperature and humidity at the same time, nor on the same schedule).  For the first query, we want to create a query that answers the question:

“What is the current wind chill factor”  (or is it a dry cold Smile - the opposite side of the coin from the humidex)

Which breaks down to answering these questions:

  • For each station, what is the latest value of both temperature and wind speed?
  • Every time I have a new temperature or humidity value, I need to calculate a new wind chill factor.

Let’s start by examining the first question.  Whenever we see a question involving latest value, we need to take a series of point or interval events and convert them into a signal (refer to Point to Signal conversion on MSDN).


This is a timeline representation of what we’re looking to do.  We need to:

  1. Create separate streams for the temperature values and wind speed values, creating the inferred join key based off of the prefix of the Id.
  2. Convert each stream from a series of Point Events into signal stream using AlterEventLifetime and ClipEventDuration.
  3. For each joint event (i.e. each time we receive a new value from either sub-stream, calculate a new wind chil).

For the purposes of this example, assume that all temperature are in F, and the wind speed is in mph.

  1. /////////////////////////////////////////////////////////////////////////////
  2. // Create some sample values for both temperature and wind sensor
  3. // measurements
  4. /////////////////////////////////////////////////////////////////////////////
  5. var sensorValues = new[]
  6. {
  7.     new { Id = "Station10.Temperature", Timestamp = DateTime.Parse("10/23/2009 9:00:00 AM"), Value = 20.1f },
  8.     new { Id = "Station10.Temperature", Timestamp = DateTime.Parse("10/23/2009 9:00:30 AM"), Value = 18.1f },
  9.     new { Id = "Station10.Temperature", Timestamp = DateTime.Parse("10/23/2009 9:00:45 AM"), Value = 17.1f },
  10.     new { Id = "Station10.Temperature", Timestamp = DateTime.Parse("10/23/2009 9:01:00 AM"), Value = 22.5f },
  12.     new { Id = "Station10.Windspeed", Timestamp = DateTime.Parse("10/23/2009 9:00:05 AM"), Value = 30.1f },
  13.     new { Id = "Station10.Windspeed", Timestamp = DateTime.Parse("10/23/2009 9:00:25 AM"), Value = 40.4f },
  14.     new { Id = "Station10.Windspeed", Timestamp = DateTime.Parse("10/23/2009 9:00:35 AM"), Value = 50.3f },
  15.     new { Id = "Station10.Windspeed", Timestamp = DateTime.Parse("10/23/2009 9:01:55 AM"), Value = 40.6f },
  16. };
  18. /////////////////////////////////////////////////////////////////////////////
  19. // Convert the input set into a temporal stream by using the Timestamp
  20. // field (and ordering the incoming data)
  21. var inputStream = sensorValues.OrderBy(e => e.Timestamp)
  22.     .ToPointStream(Application, e =>
  23.         PointEvent.CreateInsert(e.Timestamp, e),
  24.         AdvanceTimeSettings.StrictlyIncreasingStartTime);
  26. // Create a temperature stream, and assign the ID as the component of the
  27. // ID before the .Temperature
  28. var temperatureStream = from e in inputStream
  29.                         where e.Id.EndsWith(".Temperature")
  30.                         select new
  31.                         {
  32.                             Id = e.Id.Substring(0, e.Id.IndexOf(".")),
  33.                             Timestamp = e.Timestamp,
  34.                             Value = e.Value
  35.                         };
  37. // Create a windspeed stream, and assign the ID as the component of the
  38. // ID before the .Windspeed
  39. var windspeedStream = from e in inputStream
  40.                         where e.Id.EndsWith("Windspeed")
  41.                         select new
  42.                         {
  43.                             Id = e.Id.Substring(0, e.Id.IndexOf(".")),
  44.                             Timestamp = e.Timestamp,
  45.                             Value = e.Value
  46.                         };
  48. // TEMP: dump the raw streams
  49. //temperatureStream.Dump("temps");
  50. //windspeedStream.Dump("wind");
  52. // Convert the temperature and wind speed streams into signals (i.e. remember
  53. // last known value).  Assume that we get at least one update every hour
  54. var temperatureSignal = temperatureStream
  55.     .AlterEventDuration(e => TimeSpan.FromHours(1))
  56.     .ClipEventDuration(temperatureStream, (e1, e2) => (e1.Id == e2.Id));
  58. var windspeedSignal = windspeedStream
  59.     .AlterEventDuration(e => TimeSpan.FromHours(1))
  60.     .ClipEventDuration(windspeedStream, (e1, e2) => (e1.Id == e2.Id));
  62. // TEMP: dump out the signal streams
  63. //var sinkTemp = from p in temperatureSignal.ToIntervalEnumerable()
  64. //            where p.EventKind == EventKind.Insert
  65. //            select new { p.Payload.Value, p.Payload.Id, p.StartTime, p.EndTime };
  66. //sinkTemp.Dump("temp dump");
  67. //
  68. //var windTemp = from p in windspeedSignal.ToIntervalEnumerable()
  69. //            where p.EventKind == EventKind.Insert
  70. //            select new { p.Payload.Value, p.Payload.Id, p.StartTime, p.EndTime };
  71. //windTemp.Dump("wind dump");
  74. // Join the two streams based on ID - the output will be an event containing
  75. // the last known values for both Temperature and Windspeed.
  76. var measurements = from e1 in temperatureSignal
  77.                     join e2 in windspeedSignal on e1.Id equals e2.Id
  78.                     select new
  79.                     {
  80.                         Id = e1.Id,
  81.                         Temperature = e1.Value,
  82.                         Windspeed = e2.Value
  83.                     };
  85. // TEMP - dump out the joint values
  86. var measures = from p in measurements.ToIntervalEnumerable()
  87.                 where p.EventKind == EventKind.Insert
  88.                 select new { p.Payload.Temperature, p.Payload.Windspeed, p.Payload.Id, p.StartTime, p.EndTime };
  90. measures.Dump("measures");

This LINQ snippet implements step one and two, giving us the joined result of the last known values for each sub-stream:


Now that we have the correlated temperature and wind speed in a single event, we can go ahead and calculate the wind chill factor for each data update.  The algorithm we’ll use, as defined by the National Weather Service is


with T being the e.Temperature value, and V being the wind speed factor.  We add this to the projection as

  1. // Join the two streams based on ID - the output will be an event containing
  2.     // the last known values for both Temperature and Windspeed.
  3.     var measurements = from e1 in temperatureSignal
  4.                        join e2 in windspeedSignal on e1.Id equals e2.Id
  5.                        select new
  6.                        {
  7.                             Id = e1.Id,
  8.                             Temperature = e1.Value,
  9.                             Windspeed = e2.Value,
  10.                             Windchill = Math.Round( 35.74
  11.                                 + (0.6215 * e1.Value)
  12.                                 - (35.75 * Math.Pow(e2.Value, 0.16))
  13.                                 + (0.4275 * e1.Value * Math.Pow(e2.Value, 0.16)))
  14.                        };

And observe the calculated results.


This query pattern will work for any data source wherein you need to work off of and synchronize the “last known value” for a given set or subset of data.  Note that if you are guaranteed to have synchronized timestamps on incoming data the point to signal conversion steps are not necessary before performing the join (as the timestamps of the incoming events already have the appropriate overlap).