MSDN UK Team blog

Get all the latest Microsoft developer news, tools, downloads, tutorials and tips right here, every day.

The Reactive Extensions (Rx) for .NET


The Reactive Extensions (Rx) library shipped its first release just a couple of weeks ago and so I thought I’d put together a quick, introductory post on it.

Rx is a library that helps with event-based and asynchronous programming by providing framework classes for modelling and processing sequences of data.

We already have support for sequences of data in .NET – if we can get hold of an IEnumerable<T> then it’s easy to grab an IEnumerator<T> in order to pull data from a sequence;

      foreach (var number in Enumerable.Range(1, 10))
      {
        Console.WriteLine("The number is {0}", number);
      }

and we’ve got the whole “power of LINQ!” to play with around these enumerables;

      foreach (var number in 
        (
          from i in Enumerable.Range(1, 10)
          where i % 2 == 0 
          orderby i descending 
          select new { EvenNumber = i, Square = i * i }
        ))        
      {
        Console.WriteLine("The even number is {0} and the square is {1}",
          number.EvenNumber, number.Square);
      }

and life’s good :-) Where it gets a little tricky, though, is if the production of the sequence of values is in any way asynchronous. There’s nothing in the IEnumerator<T> interface to model the notion of “come back later”.
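To make that “come back later” problem concrete, here is a minimal sketch of roughly what foreach does under the covers. SlowNumbers is a made-up iterator for illustration (it is not part of any library) and Thread.Sleep simply stands in for slow work such as I/O;

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class PullExample
{
  // Hypothetical slow source: each value takes a while to produce.
  public static IEnumerable<int> SlowNumbers(int count, int delayMilliseconds)
  {
    for (int i = 1; i <= count; i++)
    {
      Thread.Sleep(delayMilliseconds); // stand-in for slow I/O
      yield return i;
    }
  }

  // Roughly what foreach expands to: the consumer pulls via MoveNext().
  public static List<int> PullAll(IEnumerable<int> source)
  {
    var results = new List<int>();

    using (IEnumerator<int> enumerator = source.GetEnumerator())
    {
      // MoveNext() can only answer true or false, so while the source
      // is busy the calling thread has no option but to block here.
      while (enumerator.MoveNext())
      {
        results.Add(enumerator.Current);
      }
    }
    return results;
  }

  public static void Main()
  {
    foreach (int number in PullAll(SlowNumbers(3, 500)))
    {
      Console.WriteLine("The number is {0}", number);
    }
  }
}
```

The caller gets the right values in the end but it is stuck inside MoveNext() for the whole time that the source is working.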

That’s where two new interfaces in .NET 4.0, IObservable<T> and IObserver<T>, come in. They model the inverse of the IEnumerable situation in that this is about push rather than pull. The first of them is IObservable<T>;

  public interface IObservable<out T>
  {
    IDisposable Subscribe(IObserver<T> observer);
  }

This allows an observer to subscribe to an observable, and the IDisposable return type indicates that unsubscribing is a matter of disposing the subscription. An observer is an implementation of;

  public interface IObserver<in T>
  {
    void OnCompleted();
    void OnError(Exception error);
    void OnNext(T value);
  }

and while it’s not possible to convey the information in a .NET interface, the sequence that’s being modelled here is zero or more OnNext calls followed by either a single call to OnError or a single call to OnCompleted;

OnNext* ( OnError | OnCompleted )
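As a minimal sketch of that contract using nothing but the two interfaces, the following hand-written source pushes a fixed set of values and then completes. FixedValues and RecordingObserver are hypothetical names invented for this illustration; Rx provides far richer ways of doing the same thing;

```csharp
using System;
using System.Collections.Generic;

// Records every notification it receives so that the order can be inspected.
public class RecordingObserver<T> : IObserver<T>
{
  public readonly List<string> Log = new List<string>();

  public void OnNext(T value) { Log.Add("OnNext(" + value + ")"); }
  public void OnError(Exception error) { Log.Add("OnError(" + error.Message + ")"); }
  public void OnCompleted() { Log.Add("OnCompleted"); }
}

// Hypothetical source: pushes a fixed set of values and then completes.
public class FixedValues<T> : IObservable<T>
{
  private readonly T[] values;

  public FixedValues(params T[] values) { this.values = values; }

  public IDisposable Subscribe(IObserver<T> observer)
  {
    foreach (T value in values)
    {
      observer.OnNext(value);   // zero or more OnNext calls...
    }
    observer.OnCompleted();     // ...then exactly one terminating call

    // Everything has already been pushed, so there is nothing to cancel.
    return new NothingToDispose();
  }

  private sealed class NothingToDispose : IDisposable
  {
    public void Dispose() { }
  }
}

public static class GrammarExample
{
  public static List<string> Run()
  {
    var observer = new RecordingObserver<int>();
    new FixedValues<int>(1, 2, 3).Subscribe(observer);
    return observer.Log;
  }

  public static void Main()
  {
    foreach (string entry in Run())
    {
      Console.WriteLine(entry);
    }
  }
}
```

Running it logs OnNext(1), OnNext(2), OnNext(3) and then OnCompleted, which is one valid "sentence" in the grammar above.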

To those fundamental interfaces, Rx adds framework support around these observable sequences of data: support for creating, combining, querying, transforming and grouping them, along with temporal capabilities like delaying, buffering and so on.

Let’s imagine that we want a simple sequence that produces an arbitrary value every 3 seconds. We can create an observable sequence with;

      IObservable<long> sequence = 
        Observable
          .Timer(
            TimeSpan.FromSeconds(3),  // start time
            TimeSpan.FromSeconds(3)); // interval time

Now, if we want to subscribe to that sequence, we can use an extension method on IObservable<T> to subscribe by passing a lambda that handles the OnNext messages rather than having to completely implement IObserver<T> ourselves;

      IDisposable subscription =
        sequence.Subscribe(
          value => Console.WriteLine("Value produced is {0}", value));

and that subscription is now going to receive that sequence of long values (0, 1, 2, 3, … ) until the subscription is cancelled with;

subscription.Dispose();
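To illustrate what disposing a subscription means, here is a small hand-rolled sketch built only on the two .NET interfaces. Broadcaster and CollectingObserver are hypothetical types made up for this example, the code is not thread-safe, and it is not how Rx itself is implemented;

```csharp
using System;
using System.Collections.Generic;

// Hypothetical source that pushes whatever is handed to Publish().
public class Broadcaster<T> : IObservable<T>
{
  private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

  public IDisposable Subscribe(IObserver<T> observer)
  {
    observers.Add(observer);
    return new Unsubscriber(observers, observer);
  }

  public void Publish(T value)
  {
    // Iterate over a copy so that an observer can unsubscribe mid-notification.
    foreach (IObserver<T> observer in observers.ToArray())
    {
      observer.OnNext(value);
    }
  }

  // Disposing the subscription just removes the observer from the list.
  private sealed class Unsubscriber : IDisposable
  {
    private readonly List<IObserver<T>> observers;
    private readonly IObserver<T> observer;

    public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
    {
      this.observers = observers;
      this.observer = observer;
    }

    public void Dispose() { observers.Remove(observer); }
  }
}

public class CollectingObserver<T> : IObserver<T>
{
  public readonly List<T> Received = new List<T>();

  public void OnNext(T value) { Received.Add(value); }
  public void OnError(Exception error) { }
  public void OnCompleted() { }
}

public static class UnsubscribeExample
{
  public static List<long> Run()
  {
    var source = new Broadcaster<long>();
    var observer = new CollectingObserver<long>();

    IDisposable subscription = source.Subscribe(observer);
    source.Publish(0);
    source.Publish(1);

    subscription.Dispose();
    source.Publish(2); // pushed after unsubscribing, so never received

    return observer.Received;
  }

  public static void Main()
  {
    Console.WriteLine(string.Join(", ", Run())); // prints "0, 1"
  }
}
```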

Now, that timer sequence is not a particularly useful example but it does have just enough to allow for some illustration. What about some LINQ style operators?

      var sequence =
        Observable
          .Timer(
            TimeSpan.FromSeconds(3),  // start time
            TimeSpan.FromSeconds(3)) // interval time
          .Where(
            value => value % 2 == 0)
          .Skip(2)
          .Take(5)
          .Select(
            value => new
            {
              Number = value,
              Square = value * value
            });
 
      IDisposable subscription =
        sequence.Subscribe(
          value => Console.WriteLine("Even number is {0}, Square is {1}",
            value.Number, value.Square),
          () => Console.WriteLine("Completed"));
 
      Console.ReadLine();
      subscription.Dispose();

Notice how the operators nicely compose to filter out any odd numbers, ignore the first 2 results and then take the next 5 results and project them into an anonymous type yielding the output;

Even number is 4, Square is 16

Even number is 6, Square is 36

Even number is 8, Square is 64

Even number is 10, Square is 100

Even number is 12, Square is 144

Completed

although no output at all arrives for the first 15 seconds (4 is the fifth value that the timer produces, with one value every 3 seconds) and then output occurs every 6 seconds until the sequence completes (limited by the Take() operator).
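The timing can be checked with a little arithmetic. The sketch below is not Rx code; it models the timer with plain LINQ to Objects on the assumption that Timer(3s, 3s) delivers the value k at 3 + 3k seconds after subscription, and then applies the same Where/Skip/Take chain. TimingExample and Schedule are names made up for this illustration;

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TimingExample
{
  // Assumption: the timer's value k arrives 3 + 3k seconds after subscribing.
  public static List<string> Schedule()
  {
    return Enumerable.Range(0, 20)
      .Select(k => new { Value = (long)k, Seconds = 3 + 3 * k })
      .Where(tick => tick.Value % 2 == 0)   // keep the even values
      .Skip(2)                              // drop 0 and 2
      .Take(5)                              // keep 4, 6, 8, 10, 12
      .Select(tick => string.Format(
        "t = {0}s: Even number is {1}, Square is {2}",
        tick.Seconds, tick.Value, tick.Value * tick.Value))
      .ToList();
  }

  public static void Main()
  {
    foreach (string line in Schedule())
    {
      Console.WriteLine(line);
    }
  }
}
```

Under that assumption the five lines are stamped 15s, 21s, 27s, 33s and 39s, so the first value turns up 15 seconds in and the rest follow at 6 second intervals.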

Notice also that we passed a second lambda here to Subscribe() in order to handle the OnCompleted method call along with the OnNext method call.

These kinds of operators bring an instant, LINQ-like feel to Rx and mean that you can also use query expression syntax for queries. The duality between IEnumerable and IObservable is very much apparent in Rx. We can take any enumerable and make it observable;

      IEnumerable<char> enumerableChars = "abc";
 
      IObservable<char> observableChars = enumerableChars.ToObservable();
 
      observableChars.Subscribe(
        value => Console.WriteLine(value),
        () => Console.WriteLine("Completed"));
 
 
      Console.ReadLine();

With output;

a

b

c

Completed

Equally, we can travel in the other direction and convert an observable to an enumerable;

      IObservable<long> observableLongs = 
        Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
 
      IEnumerable<long> enumerableLongs = observableLongs.ToEnumerable();
 
      foreach (long value in enumerableLongs)
      {
        Console.WriteLine(value);
      }

Which outputs the values 0, 1, 2, 3, … with a 1 second interval between each item although you could perhaps argue that this particular example is not a very friendly enumerable.

Once you start to think in terms of sequences, Rx really opens up. There are operators for combining sequences such as the Zip operator which pairs up values from sequences as they become available;

      IObservable<int> seq1 = new int[] { 1, 2, 4 }.ToObservable();
      IObservable<int> seq2 = new int[] { 3, 6, 9 }.ToObservable();
 
      IObservable<int> zipped =
        seq1.Zip(seq2,
          (value1, value2) => value1 * value2);
 
      zipped.Subscribe(
        value => Console.WriteLine(value));

produces the output;

3

12

36

but, equally, Rx would give us operators to Concat or Merge those sequences and many other ways to combine them.
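As a rough sketch of those two operators on the same pair of sequences, and assuming that the Rx assemblies are referenced so that System.Reactive.Linq is available, something like the following should work. CombineExample and its methods are names invented for this illustration, and ToEnumerable() is only used to block until each sequence has finished;

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class CombineExample
{
  // Concat: everything from the first sequence, then everything
  // from the second.
  public static List<int> RunConcat()
  {
    IObservable<int> seq1 = new int[] { 1, 2, 4 }.ToObservable();
    IObservable<int> seq2 = new int[] { 3, 6, 9 }.ToObservable();

    return seq1.Concat(seq2).ToEnumerable().ToList();
  }

  // Merge: values from both sequences as they arrive. The exact
  // interleaving depends on how the sources are scheduled, so no
  // particular order is assumed here.
  public static List<int> RunMerge()
  {
    IObservable<int> seq1 = new int[] { 1, 2, 4 }.ToObservable();
    IObservable<int> seq2 = new int[] { 3, 6, 9 }.ToObservable();

    return seq1.Merge(seq2).ToEnumerable().ToList();
  }

  public static void Main()
  {
    Console.WriteLine("Concat: {0}", string.Join(", ", RunConcat()));
    Console.WriteLine("Merge:  {0}", string.Join(", ", RunMerge()));
  }
}
```

Concat yields 1, 2, 4, 3, 6, 9 because the second sequence is only subscribed to once the first has completed, whereas Merge delivers the same six values in whatever order the two sources happen to produce them.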

What about manipulating the timeliness of the data? What if I have a sequence producing values every second but I can only handle those values every 5 seconds, so I need to buffer them? Rx steps in;

      IObservable<long> sequence =
        Observable.Timer(
          TimeSpan.FromSeconds(1),
          TimeSpan.FromSeconds(1));
 
      IObservable<IList<long>> buffered =
        sequence.Buffer(TimeSpan.FromSeconds(5));
 
      IDisposable subscription =
        buffered.Subscribe(
          values =>
          {
            Console.WriteLine("5 second batch of data produced");
 
            foreach (long value in values)
            {
              Console.WriteLine("Value {0} produced", value);
            }
          });

making it pretty easy to batch the data until my code is ready to handle it. What if I wanted to buffer by count rather than by time? What if I need 15 values at a time but I can’t wait more than 10 seconds for them? This is doomed to failure in this particular example but it illustrates the technique;

      IObservable<long> sequence =
        Observable.Timer(
          TimeSpan.FromSeconds(1),
          TimeSpan.FromSeconds(1));
 
      sequence =
        sequence
          .Buffer(15)
          .SelectMany(values => values);
 
      sequence =
        sequence
          .Timeout(TimeSpan.FromSeconds(10));
 
      IDisposable subscription =
        sequence.Subscribe(
          value => Console.WriteLine("Value produced is {0}", value));

Here, we produce one value per second but buffer them into groups of 15 and then we demand via Timeout() that a value arrives within 10 seconds. The first group of 15 can’t be ready until 15 seconds have passed, so that’s not going to happen and this particular example fails with an unhandled exception because of that Timeout.

If we had dealt with the OnError notification by subscribing more completely to OnNext/OnCompleted/OnError then we could have dealt with that exception;

      IDisposable subscription =
        sequence.Subscribe(
          value => Console.WriteLine("Value produced is {0}", value),
          error => Console.WriteLine("Error occurred [{0}]", error.Message));

and so avoided it being left unhandled. Alternatively, I might want to plan for exceptions in sequences and handle them in different ways. For example;

      IObservable<long> sequence =
        Observable.Timer(
          TimeSpan.FromSeconds(1),
          TimeSpan.FromSeconds(1));
 
      sequence =
        sequence
          .Buffer(15)
          .SelectMany(values => values);
 
      sequence =
        sequence
          .Timeout(TimeSpan.FromSeconds(10))
          .Catch(
            new long[] { 1, 2, 3, 4, 5 }.ToObservable());
 
      IDisposable subscription =
        sequence.Subscribe(
          value => Console.WriteLine("Value produced is {0}", value),
          () => Console.WriteLine("Sequence completed normally"));

Notice that in this example, the Rx Catch operator is used to detect any OnError “messages” that are produced in the sequence. What’s really interesting is that the operator deals with the error by producing a new sequence which in this case is the values 1, 2, 3, 4, 5. This sample then produces the output;

Value produced is 1

Value produced is 2

Value produced is 3

Value produced is 4

Value produced is 5

Sequence completed normally

after the initial 10 second timeout completes.

That gives you a flavour of the sorts of things that you can do with Rx but, for simplicity, I’ve pretty much just talked about Observable.Timer as the way in which sequences are created and sequences of numbers that go { 0, 1, 2, 3, … } are not perhaps the most interesting or real-world examples of data.

Sequences of data are everywhere. Think of an RSS news-ticker that you poll. Think of a database-query that you run periodically in your app. Rx makes it pretty easy to bring these into the application as a sequence of data.

Here’s a more complex example that uses some of the things already introduced in this article along with some new capabilities in order to poll the BBC’s news feed for news stories once every 30 seconds until someone hits the ‘x’ key on the console. I’ve tried to comment it quite heavily;

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Xml.Linq;
 
namespace RxExample
{
  class Program
  {
    // We take a URL and use Rx to produce an IObservable<WebResponse>
    // from it using the magic of "FromAsyncPattern<T>".
    static IObservable<WebResponse> MakeObservableWebRequest(
      string url)
    {
      // Make a standard web request.
      WebRequest webRequest = WebRequest.Create(url);
 
      // Bridge it to the world of Rx by using FromAsyncPattern to turn it
      // into a function that can return IObservable<WebResponse>
      Func<IObservable<WebResponse>> rxFactory =
        Observable
          .FromAsyncPattern<WebResponse>(
            webRequest.BeginGetResponse, webRequest.EndGetResponse);
 
      // Call that factory function and return the IObservable<WebResponse>
      return (rxFactory());
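    }
 
    // Pull the 10 most recent "item" elements out of the feed XML and
    // project each one through the supplied selector.
    static IEnumerable<T> TransformNewsXml<T>(XElement xElement,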
      Func<XElement, T> selector)
    {
      // a bit fast and loose with the XML, pulling out all elements that
      // are called "item" and expecting that they'll have certain
      // sub-elements such as "pubDate".
      return (
        xElement
          .DescendantsAndSelf("item")
          .OrderByDescending(
            element => (DateTime)element.Element("pubDate"))
          .Take(10)
          .Select(selector).ToList());
    }
 
    // Turn keys typed at the console into an observable sequence of
    // char.
    static IObservable<char> MakeObservableConsoleInput()
    {
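      // Observable.Create() here is a way of making an observable by 
      // writing the code that runs when someone subscribes. Here, we
      // push each key pressed at the console into the sequence and
      // stop reading once 'x' (the exit key used in Main) is seen so
      // that the reading thread can finish.
      return (
        Observable.Create<char>(
          observer =>
          {
            ConsoleKeyInfo keyInfo;
 
            do
            {
              // ReadKey() blocks, which is why SubscribeOn() below
              // moves this work onto its own thread.
              keyInfo = Console.ReadKey();
 
              observer.OnNext(keyInfo.KeyChar);
            }
            while (keyInfo.KeyChar != 'x');
 
            // the return value here is what code to run to dispose 
            // of this sequence - i.e. nothing.
            return (() => { });
          })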
       .SubscribeOn(Scheduler.NewThread));
    }
 
    static void Main(string[] args)
    {
      // This sequence will do one asynchronous web request and then it will
      // complete. It will not begin the asynchronous web request until
      // a subscriber subscribes because of the Defer() call.
      IObservable<Stream> streams = 
        Observable
        .Defer(() =>
          MakeObservableWebRequest("http://feeds.bbci.co.uk/news/rss.xml"))
        .Select(
          webResponse => webResponse.GetResponseStream());
 
      // This sequence will produce a value immediately and then wait for
      // 30 seconds before producing another value.
      IObservable<long> timer = 
        Observable
          .Timer(DateTime.Now, TimeSpan.FromSeconds(30));
 
      // This sequence will produce a value as soon as the Stream from the
      // web request is available. Then it will wait 30 seconds before
      // completing because the response sequence has completed and the
      // zip of the 2 sequences is over.
      IObservable<Stream> streamsOnTimer = timer.Zip(streams, (t,s) => s);
 
      // We now repeat that previous sequence over and over to produce an
      // HTTP request every 30 seconds.
      IObservable<Stream> repeatedStreams = streamsOnTimer.Repeat();
 
      // We want to read the XML from that stream. Here we use LINQ to XML
      // to get the top 10 stories ordered by date.
      var newsStories = 
        repeatedStreams
          .SelectMany(stream =>
            {
              XElement xElement = XElement.Load(stream);
 
              var stories = TransformNewsXml(
                xElement,
                element => new 
                {
                  Title = (string)element.Element("title"),
                  Description = (string)element.Element("description"),
                  Date = (DateTime)element.Element("pubDate")
                });
 
              stream.Close();
 
              return (stories);
            });
 
      // Set up a sequence that monitors keys from the console looking for
      // the 'x' key for eXit.
      IObservable<char> consoleCharacters =
        MakeObservableConsoleInput()
        .Where(key => key == 'x');
 
      // Keep taking the news stories until the console sequence produces a
      // value because someone has hit the 'x' key.
      newsStories
        .TakeUntil(consoleCharacters)
        .ForEach(
          story =>
          {
            Console.WriteLine("Story \"{0}\" [{1}]",
              story.Title,
              story.Date.ToShortDateString());
 
            Console.WriteLine("\t{0}", story.Description);
 
            Console.WriteLine();
          });
    }
  }
}

Clearly, there’s quite a lot going on in that sample but I wanted to add something to the post which wasn’t just “Hello World” to show some possibilities around how far you can take this approach to asynchronous, sequence-based programming.

There’s also a lot to learn about Rx but there are some great resources out there to get you started including the latest video series on Channel 9.

If your interest has been sparked, download Rx and follow along with the video series including the tutorials at the end of each episode.

 

 

OnCompleted()

  • fascinating post! - i've been meaning to read up on this for ages but have only now got a clear picture of what Rx actually is!
