After the release of Reactive Extensions (Rx) v1.0 SP1 and Reactive Extensions for JavaScript (RxJS) v1.0 late last year, we’ve been quite busy preparing the next big milestone of the product: Reactive Extensions v2.0 Beta. In this blog post, we’ll outline the main features and improvements. We hope you’ll give it a try and let us know what you think.

 

What are the supported platforms?

At this point, Reactive Extensions v2.0 Beta supports the following platforms:

  • .NET Framework 4.5 Beta
  • .NET Framework 4.5 Beta for Windows Metro style applications
  • Silverlight 5
  • Windows Phone 7.1

Going forward, more platforms may be added to the mix. “To ship is to choose”, so we chose to ship an update for .NET Framework 4.5 Beta and the Windows 8 Consumer Preview as close to those releases as possible.

To target the platforms listed above, use Visual Studio 11 Beta for .NET Framework 4.5 Beta, .NET Framework 4.5 Beta for Windows Metro style applications, and Silverlight 5. If you want to develop applications for Windows Phone 7.1, use Visual Studio 2010 for the time being.

 

Where can I find it?

Reactive Extensions v2.0 Beta is available both as an SDK MSI installer and as a set of NuGet packages.

 

How to use the SDK MSI installer?

The MSI will install a set of reference assemblies that are readily available from the Visual Studio “Add Reference…” dialog. The picture below shows the new “Reference Manager” dialog in Visual Studio 11 Beta, showing the Rx assemblies under the Assemblies, Extensions section:

image

We’ll discuss the set of assemblies, which has changed from the v1.0 release, further on in this blog post.

If you’re using Rx to develop Windows Metro style applications, there’s an easier way to include the required assemblies, based on a new feature called “Extension SDKs”. When installing our MSI-based SDK, we also install an Extension SDK for Rx which can be found under the Windows, Extensions section, as illustrated in the picture below:

image

After including the “Reactive Extensions” Extension SDK, you’ll find just one entry for the SDK in the References node in Solution Explorer. This greatly reduces clutter compared to referencing a bunch of individual assemblies:

image

How to use the NuGet packages?

Historically, Rx has been distributed in two flavors on NuGet: the official (supported) “stable” release with the “Rx-“ prefix, and the “experimental” release with the “Rx_Experimental-“ prefix. This was done to allow people to install the v1.0 official “stable” release without getting into automatic upgrading to newer (non-supported) experimental versions in the v1.1 version band.

Starting with NuGet 1.6, the package manager supports a feature called “semantic versioning”, otherwise known as Pre-Release Packages. This allows version numbers of packages to be suffixed with a label such as “beta” or “rc”, causing the package to be treated differently. In particular, pre-release packages won’t be picked up automatically as an update to existing released packages. As a result, we can safely make pre-release packages available without breaking people relying on the current stable release.

Looking at our Rx-Main package, you’ll notice a pre-release version is available:

image

At this point, the “Manage NuGet Packages…” dialog in Visual Studio doesn’t show pre-release packages. This issue will be addressed in NuGet 1.7. For the time being, you’ll have to rely on the Package Manager Console command shown on the page in order to install the package into your project. You can find the Package Manager Console in the Tools, Library Package Manager menu in Visual Studio. The picture below shows how to install the Rx-Main library into a Console Application project:

image
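If the command in the screenshot is hard to make out, it should look along these lines (assumed syntax for the NuGet 1.6 console; the -Pre switch opts in to pre-release packages):

PM> Install-Package Rx-Main -Pre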

Use the following packages for different application project types:

  • Rx-Xaml for Silverlight 5, Windows Presentation Foundation 4.5, Windows Phone 7.1, or Windows Metro style applications.
  • Rx-Metro for Windows Metro style applications (this includes additional WinRT interoperability functionality in Rx on top of Rx-Xaml).
  • Rx-WinForms for Windows Forms 4.5.
  • Rx-Main for everything else. The packages listed above include a dependency on Rx-Main and install UI scheduler functionality on top of it.

 

Towards a Portable Library world

Over the years, Rx has supported a large number of variants of the .NET Framework, including the following:

  • .NET Framework 3.5, 3.5 SP1, 4.0, and 4.5.
  • Silverlight 3, 4, and 5.
  • Windows Phone 7, and 7.1.
  • XNA 3.1 for Zune and XNA 4.0 for Xbox 360.

Having to redistribute different builds for each of those platforms has been a grueling experience, both for us and for our customers. Assemblies with the same name contain different functionality depending on the target platform, installer size grows significantly, lots of different builds have to be maintained (with #if preprocessor directives to the “rescue”), and adding platforms requires non-trivial amounts of work.

Luckily, there’s relief around the corner thanks to the new “Portable Library” project, shipping in the box with Visual Studio 11 Beta. The release of Rx v2.0 includes our first step towards supporting Portable Library, putting us on a track to reduce the number of assemblies going forward. When installing the MSI SDK, you’ll notice a new folder called .NETPortable in the installation folder:

image

In here, you’ll find three assemblies which are portable across .NET Framework 4.5 and .NET Framework 4.5 for Windows Metro style applications. Unfortunately, we can’t introduce portability for Silverlight 5 and Windows Phone 7.1 just yet, because our IObservable<T> and IObserver<T> core interfaces don’t live in the same assembly as they do on the .NET Framework 4.0 and higher platforms. We’re actively working with the Windows Phone team to resolve this issue for future releases of the product. However, the assembly structure for all Rx distributions has been standardized to look and feel the same, portable or not. This will help users get familiar with the new refactoring of the API, which we’ll discuss next.

 

The old assembly structure

In order to gain portability, we had to refactor the API surface such that platform-specific dependencies are removed from the portable subset. First, let’s recall the v1.0 assembly structure:

image

An overview:

  • System.Reactive contains the core interfaces, the schedulers, and the LINQ query operators.
  • System.Reactive.Providers adds expression tree support.
  • System.Reactive.Windows.Forms and System.Reactive.Windows.Threading provide UI synchronization support for Windows Forms and Dispatcher-based frameworks, respectively.
  • Microsoft.Reactive.Testing provides the unit testing facilities.

While the System.Reactive.Windows.Forms and System.Reactive.Windows.Threading assemblies separate out platform-specific dependencies, the core of Rx – System.Reactive – still contains a number of platform-specifics for its schedulers (amongst a few other things). Examples include the TaskPoolScheduler, ThreadPoolScheduler, NewThreadScheduler, etc. Even though those are available on a lot of the platforms we currently support, there’s no guarantee they will be available on future platforms.

For example, the new framework profile for Windows Metro style applications includes the Task Parallel Library, but no longer allows direct access to the thread pool or various System.Threading APIs. As a matter of fact, even the C# compiler has changed its IL generation to use Environment.CurrentManagedThreadId rather than Thread.CurrentThread.ManagedThreadId in the generated (state machine) code for iterators.

Without portability, you should think of the picture above as repeating itself for all supported platforms, with the size of the boxes varying a bit based on the available functionality of the underlying platform, and with some of the top-level boxes present or absent depending on the targeted platform.

 

The new assembly structure

Designing Rx v2.0 with portability in mind led to the refactoring of the assemblies into the following new structure:

image

The red area of the diagram indicates libraries that are available based on the Portable Library portions of the .NET Framework. While Portable Library supports a wide range of technologies including .NET Framework 4, .NET Framework 4.5, .NET for Metro style applications, Windows Phone 7, Windows Phone 7.5, Silverlight 4, Silverlight 5, and Xbox 360, the current support in Rx for Portable Library covers .NET Framework 4.5 and .NET for Metro style applications.

We refactored the System.Reactive part of Rx into three new assemblies – with the existing System.Reactive.Providers layered on top – all of which have a portable version:

  • System.Reactive.Interfaces contains the stable set of interfaces used by Rx. The assembly version of this assembly is kept constant at 2.0.0.0, allowing other libraries to rely on those interfaces without rebinding across different Rx releases, service packs, etc.
  • System.Reactive.Core contains platform-independent schedulers such as the ImmediateScheduler and the CurrentThreadScheduler. It also includes various extension methods for schedulers, low-level synchronization capabilities for observable sequences, and base classes such as ObservableBase. Having those facilities here allows other schedulers to be built in separate libraries, e.g. to provide UI synchronization, without taking a dependency on the query operator library. For instance, if you just want to render an observable sequence in a UI but don’t need any query capabilities, you won’t need the LINQ APIs. We expect this assembly to be extended and serviced less often than higher layers of the system. Rx v2.1 may use the v2.0 core.
  • System.Reactive.Linq provides query capabilities on top of the infrastructure provided by System.Reactive.Core. You can think of this separation much like the separation of languages on a virtual machine or a relational query engine on top of a storage engine. It’s perfectly possible for others to build a custom set of query operators without relying on our LINQ to Events streaming query language, by just referencing the System.Reactive.Core assembly.
  • System.Reactive.Providers adds expression tree support for System.Reactive.Linq and hasn’t changed from v1.0 (modulo new operators, discussed further on).

Alongside and on top of all this you’ll find the usual suspects of System.Reactive.Windows.Threading (now with a build for Windows Metro style applications, using the CoreDispatcher), System.Reactive.Windows.Forms, and Microsoft.Reactive.Testing (now with a build for Unit Test Library projects targeting Windows Metro style applications). All of those come in a platform-dependent installation folder, or get downloaded from NuGet based on the targeted platform.

Two new assemblies have been added:

  • System.Reactive.WindowsRuntime includes support for WinRT conversions, e.g. to/from IAsyncInfo types. This assembly may be merged with the next assembly going forward.
  • System.Reactive.PlatformServices contains platform-specific functionality, such as schedulers that leverage System.Threading (NewThreadScheduler and EventLoopScheduler come to mind) or the new WinRT thread pool scheduler. It also provides another role as an “enlightenment provider”, which we’ll get into further on.

When including the Rx-Main package from NuGet, you’ll bring down the triad of assemblies that corresponds to the old System.Reactive, as well as the System.Reactive.PlatformServices assembly to include the platform-specific functionality for your project’s target platform.

 

Where are we today?

For this first beta of Rx v2.0, we’re providing the Portable Library compatible assemblies in the .NETPortable folder of our SDK installer for you to play around with. However, the platform-specific SDK versions are not using the Portable Library core just yet. This is merely a reflection of our short release cycle; we haven’t had time to do all the required testing yet.

So, what can you do to try it? Assume you want to play around with building a custom library based on Rx’s portable subset. Simply start by creating a Portable Class Library project in Visual Studio 11 Beta. In the project’s properties, click Change under Target frameworks in the Library section:

image

Next, select .NET Framework 4.5 and .NET for Metro style apps:

image

Finally, go to the Add Reference… dialog and click Browse… to navigate to the “Binaries\.NETPortable\v4.5” subfolder of your Rx installation (as shown in the screenshot in the introduction of Portable Library in this post). Include the assemblies of your choice, keeping the layer map in mind:

  • System.Reactive.Providers depends on…
  • System.Reactive.Linq depends on…
  • System.Reactive.Core depends on…
  • System.Reactive.Interfaces

For my example, I chose the last three assemblies and wrote the following piece of code in my class library:

using System;
using System.Diagnostics.Contracts;
using System.Reactive.Linq;

namespace MyPortableRxLibrary
{
    public static class MathObservableExtensions
    {
        public static IObservable<int> Primes(this IObservable<int> source)
        {
            Contract.Requires(source != null);

            return source.Where(x =>
            {
                if (x < 2) // 0 and 1 are not prime
                    return false;

                for (int d = 2; d <= Math.Sqrt(x); d++)
                    if (x % d == 0)
                        return false;

                return true;
            });
        }
    }
}

You can now add your portable library to any other .NET Framework 4.5 or .NET for Metro style application project. As an example, add a Windows Metro style “Blank Application” project to your solution and set it as the startup project:

image

Add a little bit of UI markup in BlankPage.xaml to show a Button and a TextBlock control. I’ll save you the designer interactions and just paste the XAML code (please forgive my lack of UI design skills):

<Page
    x:Class="Application1.BlankPage"
    xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
    xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
    xmlns:local="using:Application1"
    xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
    xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
    mc:Ignorable="d">

    <Grid Background="{StaticResource ApplicationPageBackgroundBrush}">
        <Button Click="Button_Click_1" Content="Button" HorizontalAlignment="Left"
                Margin="64,41,0,0" VerticalAlignment="Top" Width="75"/>
        <TextBlock x:Name="txt" HorizontalAlignment="Left" Margin="189,41,0,0"
                   TextWrapping="Wrap" Text="TextBlock" VerticalAlignment="Top"/>
    </Grid>
</Page>

The markup changes you have to make are the Click handler on the Button and the x:Name on the TextBlock; both are needed to successfully write the Rx code in the Click event handler, which we’ll show next. But first, go to the “Add Reference…” dialog and add a reference to the Portable Class Library project (from the Solution section in the dialog) as well as the three portable assemblies we referenced in the Portable Class Library project. Also add a reference to the System.Reactive.Windows.Threading assembly, which you can find in the Framework, Extensions section of the dialog:

image

Note:  Don’t use the Extension SDK for this example because it contains non-portable assemblies for the time being. In the Rx v2.0 Beta timeframe we haven’t done the final layering yet (as I mentioned before: “However, the platform-specific SDK versions are not using the Portable Library core just yet.”). In future builds of Rx v2.0, you should be able to just reference the Extension SDK here as well, which will include the portable core and the Metro-specific Rx libraries. For now, you can use the Extension SDK for non-portable uses of Rx only.

The Solution Explorer should look like the picture on the left. The picture on the right shows what things should ultimately look like in the final release of Rx v2.0 (see remark above):

image     image

Finally, we can write a bit of code in our event handler to use the library. Let’s make a very useful prime second clock:

Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
    .Select(_ => DateTime.Now.Second)
    .Primes()
    .ObserveOnDispatcher()
    .Subscribe(x => txt.Text = x + " seconds is prime time");

Notice the use of ObserveOnDispatcher, now supporting the CoreDispatcher, allowing you to write the code just like you always did before for WPF, Silverlight, or Windows Phone. Don’t forget to include the System.Reactive.Linq and MyPortableRxLibrary (or whatever you called your Portable Class Library) namespaces. Finally, hit F5 to run the project.

 

Platform enlightenments explained

Making the core of Rx portable comes at a cost: the library can only rely on functionality in the intersection of all platforms targeted by the Portable Library “profile” we’re using. For example, adding .NET Framework 4 to our set of target frameworks would make us lose the ExceptionDispatchInfo functionality added in .NET Framework 4.5, which greatly improves the experience of debugging Rx code.

Similarly, query operators in System.Reactive.Linq that rely on schedulers to introduce concurrency (such as Delay, Throttle, Timeout, and Window, to name a few) have always provided an overload that relies on a default scheduler choice introducing the right amount of concurrency, e.g. using the thread pool. However, the CLR thread pool isn’t accessible from the Metro profile, which favors the Task Parallel Library or the WinRT thread pool instead.

Another example is the use of timers, where the portable intersection targeted by Rx only provides Task.Delay at the moment. If one were to build a classic desktop application, server application, or service on the portable version of Rx (which is our way forward), it would be unfortunate not to be able to leverage the Timer class directly, reducing the number of Task object allocations (which can be high – and long-lived – for operators that schedule a lot of time-based actions).

To make this situation better while keeping a portable core, we added “platform enlightenments”. This innocent-looking System.Reactive.PlatformServices assembly plays a dual role. First of all, it provides platform-specific schedulers. Secondly, the (portable!) System.Reactive.Core assembly knows about its platform-dependent sibling and can check for its presence at runtime to get enlightened about the environment it’s running in. You can compare this to Integration Components in hardware virtualization stacks such as Hyper-V, where the guest OS can perform better in the presence of certain additions (or enlightenments in the kernel) that make the OS aware of it being virtualized.

Let’s illustrate all of this with an example. Assuming you followed the example above carefully, you now have a Windows Metro style application sitting around that relies on the Rx v2.0 portable libraries. Set a breakpoint inside the selector function passed to Select as shown below:

image

Now run the application and start the query by clicking the button. Upon hitting the breakpoint, have a look at the Call Stack window (right-click and check “Show External Code” to show the Rx parts of the call stack):

image

Notice the bottom of the call stack uses System.Threading.Tasks, invoked by the System.Reactive.Concurrency.DefaultScheduler type. This scheduler is new in Rx v2.0 and uses the best platform facility to introduce concurrency in the most efficient fashion. In other words, if there’s a thread pool, it will use that. So why is it using Task<T> now?

The answer is simply that the portable System.Reactive.Core doesn’t know any better; it hasn’t been enlightened. Therefore it has to fall back to the facilities it has handy in the intersection of the platforms it targets, i.e. Task.Delay. Now, let’s pull off a trick to enlighten the Rx binaries. No, we’re not going to sacrifice the portable copy of System.Reactive.Core (we couldn’t even if we wanted to; our class library itself is portable and uses Rx). Instead, we will add a new reference to the Metro style application project and include System.Reactive.PlatformServices:

image

Note:  In future (preview) releases of Rx v2.0, the plan is you won’t have to do this step yourself. Including the Rx-Xaml NuGet package or the Extension SDK will automatically include the three core (portable) assemblies, as well as the Metro-specific platform enlightenments.

Now re-run the application and let it hit the breakpoint again. Notice something different about the call stack?

image

This time, the System.Reactive.Core assembly has picked up the enlightenments of System.Reactive.PlatformServices, which implements a more efficient way of queuing work, this time using the WinRT thread pool APIs. If we were using the same library in a regular .NET project with the System.Reactive.PlatformServices build for .NET Framework 4.5, our call stack would reveal use of the System.Threading.ThreadPool class instead.

image

To make a long story short – at the end of the day, you won’t have to know about all this. We’re committed to making it as easy as possible for you to use Rx from whatever platform you wish, enabling you to create portable libraries without sacrificing the performance that can be gained from platform-specific optimizations.

 

Deprecated members

One unfortunate design choice we made in Rx v1.0 was to provide a one-stop shop for schedulers in the form of static properties on the Scheduler class. In light of a portable design, we can no longer have those in the portable System.Reactive.Core assembly (where would we get NewThread from?). At the same time, the class provides a set of (perfectly portable) extension methods that are quite useful for users of System.Reactive.Core, including those who are building new schedulers or custom operator libraries.

Different options to tackle this problem exist. One solution is to move Scheduler to the System.Reactive.PlatformServices assembly, but then platform-neutral schedulers such as Immediate and CurrentThread are no longer available in the core and libraries depending on it. A similar problem exists for the Scheduler extension methods, though we could move those to a different type (SchedulerExtensions or so). Another solution is to keep the Scheduler class with its familiar methods and platform-neutral properties in System.Reactive.Core, and move platform-specific schedulers to their own assemblies.

We chose the latter option, whereby more advanced platform-specific schedulers are no longer addressed using properties on the Scheduler class, but rather using a constructor on the scheduler type or another factory facility exposed by it. In practice this means that you’ll write NewThreadScheduler.Instance instead of Scheduler.NewThread (or use its constructor to control thread creation logic yourself), TaskPoolScheduler.Default instead of Scheduler.TaskPool (or use its constructor to control the TaskFactory to be used), etc.
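In code, the migration looks along these lines (a sketch based on the mapping above; the scheduler types live in the System.Reactive.Concurrency namespace):

var s1 = Scheduler.NewThread;          // Rx v1.0 style, now deprecated
var s2 = Scheduler.TaskPool;           // Rx v1.0 style, now deprecated

var s3 = NewThreadScheduler.Instance;  // Rx v2.0 style
var s4 = TaskPoolScheduler.Default;    // Rx v2.0 style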

Instead of breaking all user code, we kept the properties around in Rx v2.0 Beta (but only on the non-portable assemblies), using the enlightenment mechanism to try to locate the scheduler implementations from the System.Reactive.PlatformServices assembly. If this assembly is not deployed, an exception will be thrown. However, this is purely a migration aid, and as such we’re marking those properties as deprecated:

image

Note:  In the portable distribution of Rx v2.0 Beta, you won’t even find those deprecated properties anymore. Our goal is to rely on enlightenments for as few features as possible, encouraging a clean separation of the portable core from the platform-dependent assemblies. In future releases, we may have more scheduler types, and it wouldn’t make sense to provide the universe of all possible schedulers on the Scheduler type, discovering those in some fancy way at runtime. Instead, users include libraries with specialized platform-specific schedulers and use the creation patterns exposed there (constructors, factory patterns, etc.). To discover schedulers, users can browse the System.Reactive.Concurrency namespace.

 

What’s next?

In the next preview release of Rx v2.0, we’ll continue our work on Portable Library support, including a re-layering of the platform-specific assemblies on top of the portable ones for the supported platforms. Our hope is to ship Rx v2.0 with such a layering, allowing for code reuse between .NET Framework 4.5 and .NET for Metro style applications.

Going forward, we’re engaging with the Portable Library BCL/CLR folks and the Windows Phone team to ensure an extended reach of our future portability story for Reactive Extensions. We’ll keep you posted on our progress.

For now, please give the Portable Library subset of Rx a spin and let us know if you’re missing something!

 

Rx v2.0 and .NET 4.5 “async” / “await” – a better together story

Rx v2.0 is planned to be released in the same timeframe as .NET Framework 4.5 with its new asynchronous programming features. Historically, we’ve evangelized Rx as a technology to deal with event-stream and asynchronous programming. With the advent of “async” and “await” in C# 5.0 and Visual Basic 11, this hasn’t changed. In fact, both worlds can coexist nicely and be mutually beneficial. An overview…

 

The asynchronous programming landscape

Asynchronous programming has been notoriously hard, with the need to wire up callbacks (or continuations if you prefer fancy lingo) manually. The APM and other design patterns are a good illustration of this. Code that used to be sequential in nature – all of a sudden – ends up being split across different method bodies, and regular control flow primitives are no longer available for use across asynchronous invocations. As an exercise, try to write a loop that sequentially and asynchronously downloads one website at a time using the DownloadStringAsync method on WebClient. Don’t forget about exception handling!

When we embarked on the Reactive Extensions (Rx) mission several years ago, one of the things we realized was that we could provide a unification of asynchronous programming patterns through a common abstraction: IObservable<T>. Remember, back then in late 2007, even the Task Parallel Library hadn’t been released yet. However, our mission was bigger than this, and this realization came as a side effect of tackling the more general problem of event stream processing using LINQ operators (initially called “LINQ to Events” internally). Events really are streams of data, with elements provided asynchronously relative to the call to Subscribe.

The important distinction pointed out above is the number of objects retrieved asynchronously. Are we talking about a single object, or a stream of them? Each of those areas can benefit from a nice way of expressing intent, without the manual plumbing of callback or continuations. Just like one uses imperative control flow with semi-colons, curly braces, and blocks to sequence single-value synchronous computations, one can now write similar code with single-value asynchronous computations thanks to the new async and await keywords in the C# 5.0 and Visual Basic 11 languages. What about the domain of multiple values? The same parallel image for synchronous versus asynchronous holds here. Where you’d write LINQ queries and foreach loops over enumerable sequences, Rx gives you a way to write LINQ queries and event handlers over observable sequences. The diagram below summarizes this:

image
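In type terms, the diagram boils down to the following four method shapes (illustrative names of our own, not an API):

interface IDataSource
{
    int              GetValue();       // single value, synchronous
    Task<int>        GetValueAsync();  // single value, asynchronous ("await")
    IEnumerable<int> GetValues();      // multiple values, synchronous (foreach, LINQ to Objects)
    IObservable<int> GetValuesAsync(); // multiple values, asynchronous (Subscribe, Rx queries)
}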

 

Support for awaiting observable sequences

One of the features we’ve added to Rx v2.0 (to be precise: previously available in v1.1 Experimental) is “await” support for observable sequences. In cases where an observable sequence represents an operation with a meaningful single-value result, “await” support helps bridge functional composition of observable sequences with an imperative code style.

For example, the code shown below uses the Window operator to create consecutive, non-overlapping 5 second windows of file system change events.

var fsw = new FileSystemWatcher(@"C:\")
{ 
    IncludeSubdirectories = true,
    EnableRaisingEvents = true
};

var changes = from c in Observable.FromEventPattern<FileSystemEventArgs>(fsw, "Changed")
              select c.EventArgs.FullPath;

changes.Window(TimeSpan.FromSeconds(5)).Subscribe(async window =>
{
    var count = await window.Count();
    Console.WriteLine("{0} events last 5 seconds", count);
});

Console.ReadLine();

A new inner stream for the window is emitted at the start of the window, allowing one to observe the elements in the window or to write other queries on those sub-streams. One such operator is Count, which returns an IObservable<int> signaling the number of elements upon completion of the stream, i.e. when the window is closed. There are various ways we could write a pure Rx query to project each window into its element count and produce a result stream that tells us how many events have happened in each 5 second window. However, with the async feature in C# 5.0 and Visual Basic 11 we can now write code as shown above, awaiting the result of Count.
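For instance, one such pure-Rx formulation could use SelectMany to flatten the per-window counts into a single result stream (a sketch equivalent to the code above):

changes.Window(TimeSpan.FromSeconds(5))
       .SelectMany(window => window.Count())
       .Subscribe(count => Console.WriteLine("{0} events last 5 seconds", count));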

image

Awaiting an observable sequence returns the last element of the sequence, or throws an exception if the sequence ended exceptionally. If you want another element from the sequence, you can use operators such as ElementAt, FirstAsync, ToList, ToArray, etc. A few examples are shown below:

static void Main(string[] args)
{
    Samples().Wait();
}

static async Task Samples()
{
    var xs = Observable.Range(0, 10, ThreadPoolScheduler.Instance);

    Console.WriteLine("Last  = " + await xs);
    Console.WriteLine("First = " + await xs.FirstAsync());
    Console.WriteLine("Third = " + await xs.ElementAt(3));
    Console.WriteLine("All   = " + string.Join(", ", await xs.ToList()));

    try
    {
        Console.WriteLine("Error = " + await xs.Select(x => 1 / (5 - x)));
    }
    catch (DivideByZeroException)
    {
        Console.WriteLine("Yups, we failed!");
    }
}

Notice we can use regular exception handling thanks to the await language feature. Here we’re catching an exception that’s thrown deep inside the selector function passed to Select.

image

 

Light up Task<T> with Rx

In the samples above, we’ve seen how the use of “await” is natural when writing imperative-style asynchronous code to retrieve a single value. Task<T> is the type built into the .NET Framework that represents such an ongoing (asynchronous) computation of a single value. A bunch of APIs in the .NET Framework have been revamped to provide Task-based asynchronous methods in addition to the classic APM pattern with Begin/End pairs of methods.

Using Rx, one can convert such a Task<T> into an IObservable<T>, use Rx query operators to perform various operations, and finally await the result just as you’d have awaited the original task. For example, the code below uses the Timeout operator to time out the awaiting of a Task<string> after converting it using ToObservable.

static async Task Samples()
{
    var wc = new WebClient();

    var html = await wc.DownloadStringTaskAsync("http://www.bing.com").ToObservable()
                       .Timeout(TimeSpan.FromSeconds(5));

    Console.WriteLine(html.Length);
}

If no response arrives within 5 seconds, the Timeout operator will unsubscribe from its source and propagate a TimeoutException. If the underlying operation supported cancellation (which isn’t the case for DownloadStringTaskAsync), you could wire that up too. An example of cancellation is shown below using our new FromAsync method:

static async Task Samples()
{
    var x = 100000000;

    try
    {
        var res = await Observable.FromAsync(ct => SumSquareRoots(x, ct))
                                  .Timeout(TimeSpan.FromSeconds(5));

        Console.WriteLine(res);
    }
    catch (TimeoutException)
    {
        Console.WriteLine("Timed out :-(");
    }
}

static Task<double> SumSquareRoots(long count, CancellationToken ct)
{
    return Task.Run(() =>
    {
        var res = 0.0;

        for (long i = 0; i < count; i++)
        {
            res += Math.Sqrt(i);

            if (i % 10000 == 0 && ct.IsCancellationRequested)
            {
                Console.WriteLine("Noticed cancellation!");
                ct.ThrowIfCancellationRequested();
            }
        }

        return res;
    });
}

Running the code above with different values for x may or may not time out. In case a timeout occurs, the unsubscription from FromAsync (carried out by our Timeout operator) will cause the CancellationToken to be set, hence cancelling the operation.

image

Note:  A timeout for a Task object can be achieved in various imperative ways as well, involving the use of Task.WhenAny to await either the completion of the operation or of a Task.Delay call. However, this doesn’t take care of automatic cancellation of the operation in case a timeout occurred. Using Rx for such scenarios typically results in simpler, fluent code that takes care of the hard parts.
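For comparison, such an imperative formulation might look along these lines (a sketch; notice the download itself keeps running after the timeout):

var download = wc.DownloadStringTaskAsync("http://www.bing.com");

var winner = await Task.WhenAny(download, Task.Delay(TimeSpan.FromSeconds(5)));
if (winner != download)
    throw new TimeoutException(); // the underlying operation is not cancelled

var html = await download;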

 

Leveraging “async” in Rx query operators

In addition to providing “await” support for observable sequences, we’ve also added Async variants of a number of operators. First of all, we’ve deprecated blocking operations in favor of asynchronous ones. Those include:

  • First[OrDefault]Async
  • Last[OrDefault]Async
  • Single[OrDefault]Async
  • ForEachAsync

Various creation operators now also have asynchronous variants. For example, CreateAsync allows an asynchronous method to be passed in, which will be invoked upon a call to the resulting sequence’s Subscribe method. A CancellationToken can be passed in to observe cancellation caused by unsubscription. One example where this operator is useful is when obtaining a source is a potentially long-running operation. Obviously, you want to keep Subscribe non-blocking, so this creation operator becomes very useful. The code below shows a trivial example of using an overload of CreateAsync:

var xs = Observable.CreateAsync<int>(async (observer, ct) =>
{
    var wc = new WebClient();
    var html = await wc.DownloadStringTaskAsync("http://www.bing.com");

    //
    // Could do more elaborate processing based on the result,
    // e.g. crawl other pages and provide results as they come.
    //
    observer.OnNext(html.Length);
    observer.OnCompleted();
});

Notice the CancellationToken is ignored. Obviously you can just wire that one up to any operation inside the async method that supports cancellation.

Other creation operators that have gained async capabilities include DeferAsync and UsingAsync. We’re illustrating the latter below:

var xs = Observable.UsingAsync<byte[], FakeFile>(
    async ct =>
    {
        Console.Write("Obtaining resource... ");
        var file = await FakeFile.OpenAsync("bar.txt", ct);
        Console.WriteLine("Done!");

        return file;
    },
    async (file, ct1) =>
    {
        Console.Write("Obtaining reader... ");
        var reader = await file.GetReader(ct1);
        Console.WriteLine("Done!");

        return Observable.CreateAsync<byte[]>(async (observer, ct2) =>
        {
            var b = new byte[1024];

            var n = 0;
            while ((n = await reader.ReadAsync(b, 0, b.Length, ct2)) > 0)
            {
                var res = new byte[n];
                Array.Copy(b, res, n);
                observer.OnNext(res);
            }

            observer.OnCompleted();
        });
    }
);

Console.WriteLine("Press ENTER to cancel...");
using (xs.Subscribe(bs => Console.WriteLine(bs.Length)))
{
    Console.ReadLine();
}

Console.ReadLine();

In this piece of code, we obtain the resource – a fake file implementation – asynchronously, allowing for cancellation to happen. When the user unsubscribes before we can obtain the resource, OpenAsync can do deep cancellation. If, however, we get past the step of obtaining the resource, unsubscription will cause proper disposal of the resource under any circumstance: triggered automatically when the stream ends (with OnError or OnCompleted), or when the user explicitly disposes the subscription. Once the resource is obtained, we retrieve the reader – again asynchronously – and produce a stream of byte arrays (guess what, again asynchronously, using a while loop).

The implementation of the FakeFile class and related types is shown below:

class FakeFile : IDisposable
{
    public static async Task<FakeFile> OpenAsync(string file, CancellationToken ct)
    {
        //
        // Mimic it takes a long time to get the resource, e.g. talking to the file system.
        // Notice the CancellationToken is passed in, so we can cancel obtaining the resource.
        //
        await Task.Delay(2000, ct);
        return new FakeFile();
    }

    public async Task<FakeFileReader> GetReader(CancellationToken ct)
    {
        await Task.Delay(1000, ct);

        return new FakeFileReader(this);
    }

    public void Dispose()
    {
        Console.WriteLine("File closed!");
    }
}

class FakeFileReader
{
    private readonly FakeFile _fakeFile;
    private readonly Random _rand = new Random();
    private int _n;

    public FakeFileReader(FakeFile fakeFile)
    {
        _fakeFile = fakeFile;

        //
        // Random number of chunks that will be read. Mimics arbitrary file length.
        //
        _n = _rand.Next(5, 20);
    }

    public async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken ct)
    {
        if (_n == 0)
            return 0;

        Console.WriteLine("Reading chunk...");
        await Task.Delay(_rand.Next(50, 200), ct);
        _rand.NextBytes(buffer);
        _n--;

        //
        // Fake partial result at the end.
        //
        return (_n == 0 ? _rand.Next(0, count) : count);
    }
}

The picture below shows cancellation at various stages of the stream processing:

image

One final word on exceptions due to cancellation. You may wonder why there’s no TaskCanceledException being propagated from the code above when cancelling the deepest asynchronous operations, in this case mimicking file I/O using Task.Delay. The answer is simple: if you dispose the subscription object returned from the call to Subscribe, the first thing that happens is the silencing of the observer, so that no further messages can be received. Once that’s done, the disposal triggers cancellation, which may come back at us in the form of the asynchronous method’s Task completing exceptionally. Even though we can capture such exceptions and feed them to OnError, the silencing of the observer has already taken place, so you don’t observe the cancellation exception. Regular exceptions propagate just fine.

 

Making schedulers easier to use with “await”

Yet another use of the new “await” language feature in our product is in making schedulers easier to use. Recall the operational layering of Rx with schedulers at the bottom, driving generated streams and various operations, with a query language based on LINQ on top. In other words, queries translate into chains of observers which, at the end of the day, are driven by schedulers.

Let’s have a look at the example of writing a simple Range operator akin to Observable.Range. Because such a factory method has to create an observable out of thin air, it needs to know where to generate the observer’s messages. That’s the role of the scheduler. One attempt is shown below:

static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
    return Observable.Create<int>(observer =>
    {
        return scheduler.Schedule(() =>
        {
            for (int i = 0; i < count; i++)
            {
                Console.WriteLine("Iteration {0}", i);
                observer.OnNext(start + i);
            }
            observer.OnCompleted();
        });
    });
}

The technique is pretty straightforward: upon subscription, we schedule a for loop to produce the range, and return the IDisposable that can cancel the scheduled work. In other words, when the user tries to unsubscribe from the source, cancellation of the underlying scheduled work is attempted. It should be clear this doesn’t quite suffice to cancel the work once it’s in flight. Even though you can unsubscribe from the source and the observer will shut up (due to the automatic silencing behavior built in to Observable.Create), the underlying scheduler is still pumping. To illustrate this, try running the following code and press ENTER to cancel:

using (Range(0, int.MaxValue, ThreadPoolScheduler.Instance).Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}

Console.WriteLine("Stopped... or not?");

This produces an undesirable result, as shown in the next picture. After receiving message 1919, we unsubscribe. Message 1920 still made it – the OnNext and the Dispose call are racing in opposite directions after all – but we succeeded in shutting off the observer. However, the underlying scheduler is still running and will do so for a long time, occupying a thread pool thread.

image

Another problem is the lack of fairness in the system if multiple such range generators are active on the same scheduler or a pool of resources. Say you created a nice EventLoopScheduler instance (which runs a single thread to perform work on) and want to reuse it across different source generators, like this:

var e = new EventLoopScheduler();

Console.WriteLine("[ 0,  9]");
Range( 0, 10, e).Subscribe(Console.WriteLine);

Console.WriteLine("[10, 19]");
Range(10, 10, e).Subscribe(Console.WriteLine);

Console.WriteLine("[20, 29]");
Range(20, 10, e).Subscribe(Console.WriteLine);

Console.ReadLine();

Running this code clearly shows the Subscribe calls are asynchronous, yet the streams are generated sequentially. After all, our for loop is running on the one and only thread in the EventLoopScheduler and has to finish before other work can be processed. In other words, our scheduled work items are too coarse-grained:

image

Prior to Rx v2.0 Beta, the way to solve this problem was to use recursive scheduling extension methods provided on IScheduler. Each recursive call causes a new work item to be queued up, resulting in fine-grained work items. On top of that, having small work items allows prompt cancellation of the rest of the computation (if you cancel work item n, which contains a recursive call to schedule work item n + 1, it’s obvious that n + 1 can never happen). However, this made your code significantly more complex:

static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
    return Observable.Create<int>(observer =>
    {
        return scheduler.Schedule(0, (i, self) =>
        {
            if (i < count)
            {
                Console.WriteLine("Iteration {0}", i);
                observer.OnNext(start + i);
                self(i + 1);
            }
            else
            {
                observer.OnCompleted();
            }
        });
    });
}

Not only does this no longer look like a loop, holding on to resources that need to be cleaned up becomes even more worrisome, as you don’t have a way to tie the work item cancellation Dispose call to cleanup logic inside your own code, unless you resort to an even more complex Schedule method, namely the one on the interface itself.

Luckily, async/await gives some relief. What you’re really doing in the code above is a manual rewrite of the iteration by scheduling the body of a loop, while the Schedule extension method provides the driver of the loop simply by maintaining a scheduler queue. With async/await, we can take away the pain of having to restructure the loop yourself, relying on the compiler transformation to obtain the next piece of work in the form of the continuation created for the code following an await expression. A code sample will make this clearer:

static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
    return Observable.Create<int>(observer =>
    {
        return scheduler.ScheduleAsync(async (ctrl, ct) =>
        {
            for (int i = 0; i < count; i++)
            {
                Console.WriteLine("Iteration {0}", i);
                observer.OnNext(start + i);
                await ctrl.Yield();
            }
            observer.OnCompleted();

            return Disposable.Empty;
        });
    });
}

Now you get to write the loop simply like you did initially, but by inserting an “await ctrl.Yield()” call you basically cooperatively yield control, causing the rest of the method (in this case the next iteration through the loop, etc.) to be scheduled as another work item. At that point, other work can take place. So we gain the fine-grained work items, but also prompt cancellation. You can even wire up the CancellationToken to computations you’re carrying out internally, e.g. performing I/O that provides cancellation support.

Note:  The fact you need to return an IDisposable from the method body is a restriction in the Beta. While this is useful in case you schedule more work (for example, performing a fan-out of computation onto multiple machines), it’s often not required because of the CancellationToken passed in as an alternative cancellation mechanism. We’re looking into simplifying this post-Beta.

With the changes above, the output looks as we expect. The scheduler stops pretty much immediately (the automatic silencing of the observer takes place first before the Dispose call propagates to the scheduler, see left), and different sequence generators can interleave their work (see right):

            image        image

It’s even easy now to yield control with a different granularity, for example by doing a modulo check in the loop (every tenth iteration, await the Yield call). In addition to the Yield method, we also have a Sleep method which allows you to suspend the current logical unit of work for the specified duration. All the code after awaiting the sleep is lifted into a low-level call to Schedule with the specified TimeSpan or DateTimeOffset. Those operations also allow for cancellation of the sleep using the CancellationToken provided in ScheduleAsync:

static IObservable<Unit> Timer(TimeSpan period, IScheduler scheduler)
{
    return Observable.Create<Unit>(observer =>
    {
        return scheduler.ScheduleAsync(async (ctrl, ct) =>
        {
            try
            {
                while (true)
                {
                    observer.OnNext(Unit.Default);
                    await ctrl.Sleep(period, ct);
                }
            }
            finally
            {
                Console.WriteLine("Timer stopped!");
            }
        });
    });
}

Below is a piece of code using this timer, composing it with a Take(5) call to stop the timer after 5 seconds:

using (Timer(TimeSpan.FromSeconds(1), ThreadPoolScheduler.Instance).Take(5)
           .Subscribe(_ => Console.WriteLine(DateTime.Now)))
{
    Console.ReadLine();
}

The result looks as expected – even triggering the finally block to run after Take(5) calls Dispose on the Timer, tunneling the Dispose call all the way back to the scheduled work and its CancellationToken:

image

All of this should make it much easier to create asynchronous, responsive, cancellable, and fair observable sequences simply using ScheduleAsync and the new async/await support built into the managed languages starting with the .NET 4.5 release.

 

 

Generalizing some query operators

In this release, we’ve also done some work around generalizing various query operators. The most notable additions are discussed below.

 

More flexible time-based operations

In Rx v1.0, we generalized the Window and Buffer operations to accept observable sequences representing openings and closings of windows or buffers. Instead of giving us an element count or a TimeSpan to denote the length of a window or buffer, you can give us an observable sequence that indicates when to open a new window or buffer (e.g. this could be a timer), paired with a selector function that denotes the length of the window or buffer based on an observable sequence whose first element indicates the duration (e.g. this could be a single-shot timer). The picture below illustrates this:

image
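For instance, a sketch of this overload (where source stands for any observable sequence), opening a new window every five seconds and closing each window two seconds after it opens:

var windows = source.Window(
    Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(5)),  // window openings, every 5 seconds
    _ => Observable.Timer(TimeSpan.FromSeconds(2)));           // each window closes after 2 seconds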

We’ve now generalized time-based operations to accept observable sequences to denote duration as well. Those include:

  • Delay
  • Throttle
  • Timeout

For example, the new overloads to Delay allow one to specify (optionally) the delay for the subscription as well as the delay for each element based on a selector function. The code below is an example of this:

static void Main(string[] args)
{
    var input = GetInput().ToObservable(NewThreadScheduler.Default);

    var res = input.Delay(x => Observable.Timer(TimeSpan.FromSeconds(x.Length)));

    res.ForEach(x => Console.WriteLine("Received \"{0}\" at {1}", x, DateTime.Now));
}

static IEnumerable<string> GetInput()
{
    while (true)
    {
        Console.Write("{0}> ", DateTime.Now);
        yield return Console.ReadLine();
    }
}

Each user input gets delayed for a duration equal to the length of the input in seconds. Stated otherwise, the delay of each element can now depend on the data itself. Another example would be to delay birthday wishes for Person objects arriving on a stream until their birthday, based on a yearly recurring timer inferred from the Person’s Birthday property (see the sketch after the picture below). The picture below shows the output of the sample shown above:

image
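As for the birthday example, a sketch could look as follows (the people sequence, the Person type, and the NextBirthday helper are hypothetical):

var wishes = people.Delay(p => Observable.Timer(NextBirthday(p.Birthday)));

static DateTimeOffset NextBirthday(DateTimeOffset birthday)
{
    var today = DateTimeOffset.Now;
    var next = new DateTimeOffset(today.Year, birthday.Month, birthday.Day,
                                  0, 0, 0, today.Offset);
    return next <= today ? next.AddYears(1) : next; // leap-day handling omitted
}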

Changes to the other operators are completely analogous, using a selector function invoked for each element. A good example is the use of Timeout with a selector. Assume you have a message stream where each message includes a TimeSpan value with the promised maximum duration it will take to deliver the next message (e.g. as a way of detecting a failed connection between producers and consumers). Now it’s trivial to fish out that TimeSpan value and construct a timeout duration selector from it. As an exercise, simply change the code above to replace Delay with Timeout and notice the behavior:
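That is, the one changed line would read:

var res = input.Timeout(x => Observable.Timer(TimeSpan.FromSeconds(x.Length)));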

image

In the sample run above, I didn’t manage to send the next message within 5 seconds after receiving “Short” (5 characters long), hence a TimeoutException results.

 

Custom push-to-pull adapters

Push-to-pull adapters to convert IObservable<T> sequences to IEnumerable<T> sequences have existed in Rx virtually since day one. Obvious conversions include ToEnumerable (using blocking MoveNext calls, removing concurrency) and ToObservable (taking a scheduler to introduce concurrency). There are more strategies than surfacing the entire sequence though: maybe you don’t want to block until a new element is produced. Maybe you don’t want queues to grow too big if the consumer is slower than the producer. Who knows? To accommodate various needs, we offered a series of push-to-pull adapters:

  • MostRecent – returning the most recently observed element upon a call to MoveNext, resulting in sampling behavior. Two consecutive calls to MoveNext may return the same element (same meaning at the same index in the sequence).
  • Latest – returning the last element observed upon a call to MoveNext, but blocking subsequent MoveNext calls until another value is available. Two consecutive calls to MoveNext will not return the same element (same meaning at the same index in the sequence).
  • Next – calling MoveNext blocks until the next value (relative to this point in time) in the sequence becomes available. Of the three methods shown here, this blocks most often.

None of those buffer more than one element, in contrast to the ToEnumerable conversion which has an unbounded queue. It’s clear there are middle grounds that haven’t been explored yet.
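To illustrate, a sketch of Latest in use against a fast producer:

var xs = Observable.Interval(TimeSpan.FromMilliseconds(100));

foreach (var x in xs.Latest())
{
    // Each iteration blocks until a value newer than the previous one is available;
    // values produced in the meantime are skipped rather than queued.
    Console.WriteLine(x);
}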

In this release, we introduce a generalized push-to-pull operator called Collect, reflecting it collects elements that are received in a push-based manner and allows users to retrieve those collections in a pull-based manner. It goes without saying the behaviors described above can be mimicked using a general-purpose operator like this. An example is shown below:

var xs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1));

var ys = xs.Collect(() => new List<long>(), (list, x) => { list.Add(x); return list; }, lst => lst);

using (var e = ys.GetEnumerator())
{
    while (true)
    {
        Console.ReadLine();
        e.MoveNext(); // always true in this example
        Console.WriteLine("{0} - {1}", DateTime.Now, string.Join(", ", e.Current));
    }
}

The three functions passed to Collect control the push-to-pull storage. First, a factory function for the initial storage is invoked. Next, for each element received by the underlying observer, the merge function is invoked. Notice this function can abandon the current storage and return a new one if it wishes to do so (e.g. reclaiming a buffer if it grows too large). Finally, the substitution function at the end is invoked upon every pull request; the current storage is returned and the function can create a new storage object (or retain the current one). In this case, we retain the current list all the time, so we’re basically keeping a list that contains all elements from the start but that can be peeked at periodically using an enumerator. The output looks as follows:

image

Naturally we could also create new lists upon every pull request, simply by changing one line of code:

var ys = xs.Collect(() => new List<long>(), (list, x) => { list.Add(x); return list; }, lst => new List<long>());

Now we have behavior that chunks up the sequence into consecutive lists. The concatenation of all lists received through the push-to-pull adapter corresponds to the original sequence (modulo the timing, of course). In fact, we have an operator called Chunkify that does precisely this (see the one-liner below). Now the behavior looks as follows:
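With Chunkify, the whole Collect call above shortens to:

var ys = xs.Chunkify(); // equivalent to the Collect call with lst => new List<long>()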

image

Notice none of the MoveNext calls are blocking in this case. You may end up receiving an empty list if no elements were received, but the whole push-to-pull conversion is non-blocking.

 

Support for WinRT style event handlers

As we’ve mentioned earlier in this blog post, Rx v2.0 Beta also runs on Windows 8 Consumer Preview, allowing you to build Windows Metro style applications leveraging the power of Rx to program against event streams. We had to change just a few things in the core of Rx to make this possible. We discussed one such change before, where System.Reactive.Windows.Threading now supports the CoreDispatcher, so you can just write ObserveOnDispatcher as you’ve always done to synchronize an observer with the UI message loop.

The other addition we made is recognizing WinRT style event handlers when using FromEventPattern. As you may know by now, WinRT uses the same metadata format as the CLI’s assemblies, specified in ECMA 335, modulo a few attributes. Windows Metadata files, also known as “WinMD files” for short, use this format to describe types and their members, including events. In essence, events are merely metadata citizens that refer to a few methods to add and remove event handlers. However, the shape of .NET events and WinRT events is slightly different. Let’s have a look at a .NET event in ILDASM first:

image

Notice the symmetry between the add and remove methods for the event. Both take in the event handler delegate object, returning void. One of the implications of this should be well-known to Rx users: in order to deal with classic .NET events, resource maintenance becomes a burden. Namely, you need to remember the delegate object you give to the add method (hidden behind += in C# and AddHandler in Visual Basic) in order to be able to unsubscribe from the event at a later point (using -= in C# or RemoveHandler in Visual Basic). That’s precisely what FromEventPattern does at the end of the day, as shown in the following simplified example:

class MyEventArgs : EventArgs
{
    public MyEventArgs(int value)
    {
        Value = value;
    }

    public int Value { get; private set; }
}

class Bar
{
    public event EventHandler<MyEventArgs> Foo;

    public void OnFoo(int x)
    {
        var foo = Foo;
        if (foo != null)
            foo(this, new MyEventArgs(x));
    }
}

static void Main()
{
    var bar = new Bar();

    var foo = Observable.Create<EventPattern<MyEventArgs>>(observer =>
    {
        var handler = new EventHandler<MyEventArgs>((sender, e) =>
        {
            observer.OnNext(new EventPattern<MyEventArgs>(sender, e));
        });

        bar.Foo += handler;

        return () => 
        {
            bar.Foo -= handler;
        };
    });

    using (foo.Select(e => e.EventArgs.Value).Subscribe(Console.WriteLine))
    {
        bar.OnFoo(42);
        bar.OnFoo(43);
    }

    // Won't be printed anymore!
    bar.OnFoo(44);
}

For the reflective overloads of FromEventPattern, the handler delegate has to be cooked up using reflection by inspecting the event metadata, locating the add and remove methods, and recognizing their pattern (hence the use of the word “Pattern” in the method name). However, in WinRT the add and remove methods look ever so slightly different. Let’s have a look by opening C:\Windows\System32\WinMetadata\Windows.UI.Xaml.Controls.winmd in ILDASM and locating the TextChanged event declaration:

image

This screenshot is rather hard to read, so I’ll paste the code here for a closer look:

.event Windows.UI.Xaml.Controls.TextChangedEventHandler TextChanged
{
  .removeon instance void
      Windows.UI.Xaml.Controls.TextBox::remove_TextChanged(
          valuetype [Windows.Foundation]Windows.Foundation.EventRegistrationToken)

  .addon instance valuetype [Windows.Foundation]Windows.Foundation.EventRegistrationToken
      Windows.UI.Xaml.Controls.TextBox::add_TextChanged(
          class Windows.UI.Xaml.Controls.TextChangedEventHandler)
} // end of event TextBox::TextChanged

Instead of both add and remove methods taking in the delegate, only the add method does now. What gets returned is an EventRegistrationToken value that can be passed to the corresponding remove method to get rid of the event handler. This model closely follows Rx’s. Luckily, as a managed code developer, you don’t have to worry about this at all. In fact, you could write the sample code using Observable.Create shown above against any WinRT event. The C# and Visual Basic compilers have been changed to recognize this pattern and emit the right IL code to carry out the job of attaching and removing event handlers, leveraging helper methods in the WindowsRuntimeMarshal class.
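To make the mechanics a bit more tangible, here’s a sketch of a helper in the spirit of what gets emitted. SubscribeWinRT is a hypothetical name, not an Rx or BCL API, but WindowsRuntimeMarshal.AddEventHandler and RemoveEventHandler are the actual helpers being leveraged:

// Hypothetical helper: subscribe to a WinRT-style event given its add and
// remove accessors, returning an IDisposable in typical Rx fashion.
static IDisposable SubscribeWinRT<THandler>(
    Func<THandler, EventRegistrationToken> addMethod,
    Action<EventRegistrationToken> removeMethod,
    THandler handler)
{
    // AddEventHandler invokes the add accessor, which hands back a token;
    // the marshal remembers it so the handler can later be removed by token.
    WindowsRuntimeMarshal.AddEventHandler(addMethod, removeMethod, handler);

    return Disposable.Create(() =>
        WindowsRuntimeMarshal.RemoveEventHandler(removeMethod, handler));
}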

As an Rx user, you don’t have to know anything about those WinRT event handlers either. The code in FromEventPattern has been changed to recognize this new pattern and wires up the EventRegistrationToken handling behind the Subscribe and Dispose calls. So you can simply write the following code if you’d like to use our reflection-based overloads:

Observable.FromEventPattern<TextChangedEventArgs>(txt, "TextChanged")
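For instance, a quick sketch of putting this to work in a Metro style application, assuming a TextBox field named txt:

var changed = Observable.FromEventPattern<TextChangedEventArgs>(txt, "TextChanged");

var subscription = changed
    .Select(e => ((TextBox)e.Sender).Text)
    .Subscribe(text => Debug.WriteLine(text));

// Disposing the subscription hands the EventRegistrationToken back to the
// event's remove accessor behind the scenes; no token bookkeeping on our end.
subscription.Dispose();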

 

Performance improvements

We’ve kept the best for last. Rx v2.0 Beta introduces a whole slew of performance improvements across the board. Over the past year, we’ve seen Rx being used in a wide set of scenarios, ranging from sensor measurements and smart grid management, through UI programming scenarios, to cloud-scale data processing. Various teams at Microsoft are looking into or are already adopting Rx as the way to process event streams, and we learn about exciting new customers on a daily basis.

In our first release, we had a mantra of “correctness first”. The semantics of observable sequences (including the grammar of observer messages, serialization of messages, dispose behavior, error propagation policies, etc.) are quite complex to get right, and we spent a lot of time making sure the behavior of operators is correct in the face of concurrency. This led to the creation of a quite substantial test suite, currently running 2700 unit tests with over “three nines” of code coverage at the point of shipping v1.0:

image

While our performance has been quite well-received for the v1.0 release, we decided to have a closer look and see where we could improve. Rx v2.0 Beta introduces the first set of performance enhancements that should benefit a wide range of use cases.

 

Improving producer speed

One of the obvious bottlenecks to the speed at which a query can process a stream of events is the speed at which events can be pumped into the query in the first place. At the edges of a reactive query network you typically find Subject<T> objects or source producers. Subjects were optimized in the v1.1 timeframe by reducing the number of locks required to coordinate multiple subscribers to the subject, optimizing for the case where there are 0, 1, or a few observers. A very simple test is shown below:

static void Main()
{
    Console.Title = typeof(Observable).Assembly.GetName().Version.ToString();
    Events();
    Subjects();
}

static void Events()
{
    var s = new Action<int>(_ => { });

    for (int n = 1; n < 5; n++)
    {
        var sw = Stopwatch.StartNew();

        for (int i = 0; i < 100000000; i++)
            s(42);

        sw.Stop();
        Console.WriteLine("{0} handler(s) - {1}", n, sw.Elapsed);

        s += _ => { };
    }
}

static void Subjects()
{
    var s = new Subject<int>();

    for (int n = 0; n < 5; n++)
    {
        var sw = Stopwatch.StartNew();

        for (int i = 0; i < 100000000; i++)
            s.OnNext(42);

        sw.Stop();
        Console.WriteLine("{0} observer(s) - {1}", n, sw.Elapsed);

        s.Subscribe(new Nop<int>());
    }
}

class Nop<T> : IObserver<T>
{
    public void OnCompleted()
    {
    }

    public void OnError(Exception error)
    {
    }

    public void OnNext(T value)
    {
    }
}

 

A side-by-side comparison of Rx v1.0 SP1 (running on .NET 4.0) and Rx v2.0 Beta (running on .NET 4.5 Beta) is shown in the next picture, clearly showing the improved speed of using subjects with a few consumers (even surpassing the speed of multicast delegate invocations in some cases).

image

Besides subjects, source generators are often used to create an observable sequence out of thin air. Examples include Repeat and Range. As we’ve seen earlier in this post, such generators are in an interesting position as they need to rely on a scheduler to get work done, but need to do so in a way that doesn’t monopolize the scheduler in order to ensure responsive cancellation and fairness. The way those operators have been written historically is by relying on recursive scheduling.

While this is still desirable for schedulers that are used in a shared manner (ensuring interleaving of operations scheduled by different parties), some other schedulers can be used in an exclusive manner. A good example is the NewThreadScheduler, which represents a scheduler where every top-level scheduling operation gets allocated its own dedicated thread (unlike EventLoopScheduler, for which every scheduler instance has one thread that’s shared across all scheduled work). Such schedulers can afford to run a loop that periodically checks a cancellation flag, without sacrificing fairness in the system, because each unit of work gets an exclusive, non-shared dose of concurrency (such as a thread). Other facilities exist that can ensure such a scheduler doesn’t monopolize the process or the system, including the OS scheduler.

To allow for this optimization on a case-by-case basis for particular types of schedulers, we introduced ISchedulerLongRunning. This interface is implemented by the NewThreadScheduler as well as the TaskPoolScheduler (using a Task whose TaskCreationOptions flags contain the LongRunning flag). Operators can check whether a scheduler implements this optional interface, and if so, rely on it to perform a long-running unit of work. Examples include Repeat and Range, but also operators that have to drain a queue internally (such as ObserveOn).

using System.Reactive.Disposables;

namespace System.Reactive.Concurrency
{
    /// <summary>
    /// Scheduler with support for starting long-running tasks.
    /// This type of scheduler can be used to run loops more efficiently instead of using recursive scheduling.
    /// </summary>
    public interface ISchedulerLongRunning
    {
        /// <summary>
        /// Schedules a long-running piece of work.
        /// </summary>
        /// <param name="state">State passed to the action to be executed.</param>
        /// <param name="action">Action to be executed.</param>
        /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
        IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action);
    }
}

The use of this method is simple. Inside the action, you’re allowed to run a long-running piece of work (such as an infinite loop), periodically checking the cancellation flag passed in through our ICancelable type, which is basically an IDisposable with a Boolean property exposed. Notice you could also take advantage of async methods here. For portability across platforms (including older ones), we don’t rely on a CancellationToken at the moment. Also, the ICancelable object is often simply handed out to the caller of the scheduler, conforming to the pattern in Rx where “cancellation tokens” in the form of IDisposable objects are handed out rather than passed in.
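For instance, here’s a minimal sketch of using the interface directly, assuming the scheduler at hand implements it (as NewThreadScheduler does, per the above):

var longRunning = Scheduler.NewThread as ISchedulerLongRunning;
if (longRunning != null)
{
    var subscription = longRunning.ScheduleLongRunning(0, (state, cancel) =>
    {
        // We own a dedicated thread here, so a plain loop is fine; we just
        // check the cancellation flag on every iteration.
        var i = state;
        while (!cancel.IsDisposed)
        {
            Console.WriteLine(i++);
        }
    });

    Thread.Sleep(100);

    // Best-effort cancellation: sets the flag the loop above is checking.
    subscription.Dispose();
}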

An example illustrating the performance gains is shown below, using Observable.Repeat:

static void Main()
{
    Console.Title = typeof(Observable).Assembly.GetName().Version.ToString();
    Repeat();
}

static void Repeat()
{
    var xs = Observable.Repeat(42, Scheduler.NewThread);

    var c = new Count<int>();

    var sw = Stopwatch.StartNew();
    using (xs.Subscribe(c))
        Thread.Sleep(1000);
    sw.Stop();

    Console.WriteLine(c.Total / sw.Elapsed.TotalSeconds + " msgs/s");
}

class Count<T> : IObserver<T>
{
    public int Total;

    public void OnNext(T value)
    {
        Total++;
    }

    public void OnError(Exception error)
    {
        throw new NotImplementedException();
    }

    public void OnCompleted()
    {
        throw new NotImplementedException();
    }
}

The difference is stunning:

image

 

Reducing pipeline path length

After making producers faster, the next hurdle to overcome is the speed limitation imposed by the pipeline of observers that gets built up by queries over event streams. Stated differently, how fast can a message travel through the chain of observers to reach its final destination? To analyze this behavior, let’s first have a look at the requirements of operators when processing event stream data. Consider the following implementation of a Select operator:

public static IObservable<TResult> Select<TSource, TResult>(this IObservable<TSource> source,
    Func<TSource, TResult> selector)
{
    return Observable.Create<TResult>(observer =>
    {
        return source.Subscribe(
            value =>
            {
                var result = default(TResult);
                try
                {
                    result = selector(value);
                }
                catch (Exception error)
                {
                    observer.OnError(error);
                    return;
                }

                observer.OnNext(result);
            },
            observer.OnError,
            observer.OnCompleted
        );
    });
}

Good old Observable.Create is used to create the new observable sequence, passing it an implementation for the core Subscribe logic. All we do is subscribe to the underlying source, handling the three continuations in different ways. Errors and completion messages simply burst through, while OnNext calls are intercepted in order to invoke the selector function. A few questions come to mind when reading this code though...

First of all, what’s the exception handling business around the selector invocation all about? Well, clearly, if we didn’t handle exceptions thrown by user code, we’d be blowing up the caller of OnNext, i.e. the upstream producer. As we’ve seen in the previous section, such a producer could be running on a scheduler, which we don’t want to blow up. Instead, we catch the exception and propagate it through the OnError channel, just like an exception originating in an iterator would come out as an exception on the MoveNext call. There’s one remaining question about this whole error handling business: how does calling observer.OnError to propagate the error downstream cause the subscription to the source to be disposed? After all, because of OnError’s fatal nature, there’s no point in listening to the source any longer. In fact, if we left the source running, it could be burning cycles on a scheduler ad infinitum.

The answer to this question is found higher up the code: Observable.Create isn’t as innocent as it seems. The observer passed to the subscription implementation isn’t quite the observer that was given to it by a consumer. Instead, it’s a wrapper around this observer that implements auto-detach and dispose behavior. When a final message – i.e. OnError or OnCompleted – is sent to it, it automatically disposes the subscription to the underlying source and makes sure no further messages can be propagated. All of this involves a number of careful steps, also keeping in mind the observer passed to the source may start running before the call to Subscribe returns its IDisposable. I’ll save you all of the details, but it should be clear that Observable.Create is a powerful Swiss army knife that knows about the precise semantics of observable sequences, enforces some of those, and assists with dispose behavior. As an implementer of query operators, you don’t have to worry about resource disposal after calling OnError or OnCompleted, which is quite a convenience.
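To make this a bit more concrete, here’s a simplified sketch of such an auto-detach wrapper. This is illustrative only, not Rx’s actual implementation, which also serializes access and suppresses messages after a terminal one:

class AutoDetachObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _observer;

    // SingleAssignmentDisposable also covers the race where a terminal message
    // arrives before Subscribe has returned its IDisposable: assigning to an
    // already-disposed instance disposes the assigned resource immediately.
    private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable();

    public AutoDetachObserver(IObserver<T> observer)
    {
        _observer = observer;
    }

    // Set once the user-provided subscribe logic has returned its IDisposable.
    public IDisposable Subscription
    {
        set { _subscription.Disposable = value; }
    }

    public void OnNext(T value)
    {
        _observer.OnNext(value);
    }

    public void OnError(Exception error)
    {
        try
        {
            _observer.OnError(error);
        }
        finally
        {
            _subscription.Dispose(); // auto-detach upon a terminal message
        }
    }

    public void OnCompleted()
    {
        try
        {
            _observer.OnCompleted();
        }
        finally
        {
            _subscription.Dispose(); // idem
        }
    }
}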

A second question we should ask ourselves is how the Subscribe extension method that accepts three action delegates really works. Does it simply wrap the three delegates into an anonymous implementation of IObserver<T>? Well, sort of, but there’s more. Again, it has to do with the observer message grammar. Upon receiving an OnError or OnCompleted message, the observer automatically gets silenced such that no further messages can be propagated. This protects the subscriber against rogue sources that don’t obey the message grammar of Rx.

Summing up all those requirements, there are a lot of things going on between receiving a value from the source and sending it out on the downstream observer. First of all, the anonymous observer created by the Subscribe method checks that we haven’t reached a final state yet, just in case the source passed to Select is ill-behaved. Next, it calls our OnNext handler implementation specified as a lambda expression. Assuming everything goes well, we call OnNext on the observer that was passed in, which has auto-detach behavior attached, finally reaching our destination (which could be another query operator, and so on). As a result, call stacks in Rx can be quite deep, as illustrated below for a query that chains a Where and Select operator applied to Range:

image

Based on the explanation above, the four highlighted stack frames surrounding our operator should be more or less self-explanatory now. Adding up all those calls gives us a total path length of 1 (for the operator’s core implementation code) + 4 (for the surrounding wrappers ensuring a proper grammar and auto-detach behavior). It turns out we can do better than this by adhering to two rules:

  • Ensure no operator, given valid inputs, can produce invalid output in terms of the message grammar. This way, we only need to protect against rogue sources at the edges of Rx. This is really a proof by induction: given that all source producers in Rx are “safe”, and all operators preserve safety, any Rx query is “safe” too. Unknown sources implemented using Observable.Create or by inheriting from ObservableBase are fine too, because there we apply the protections mentioned above. Internally, however, we can avoid Observable.Create and hence a lot of wrapping.
  • Don’t rely on auto-detach behavior upon sending a terminal message. This requires a bit more care when implementing query operators, but allows us to get rid of calls to the Subscribe method with the three lambdas (which is error prone in case you forget one of the continuations, e.g. leaving errors unhandled, causing producers to blow up). A simplified sketch of an operator written this way is shown after this list.
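For illustration, here’s a simplified sketch of what a “flattened” Select can look like when both rules are applied. This is not Rx’s actual internal code (which includes more safeguards around disposal), just a sketch of the idea:

class SelectObservable<TSource, TResult> : IObservable<TResult>
{
    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, TResult> _selector;

    public SelectObservable(IObservable<TSource> source, Func<TSource, TResult> selector)
    {
        _source = source;
        _selector = selector;
    }

    public IDisposable Subscribe(IObserver<TResult> observer)
    {
        // No Observable.Create, no anonymous observer; one wrapper object only.
        return _source.Subscribe(new SelectObserver(observer, _selector));
    }

    class SelectObserver : IObserver<TSource>
    {
        private readonly IObserver<TResult> _observer;
        private readonly Func<TSource, TResult> _selector;

        public SelectObserver(IObserver<TResult> observer, Func<TSource, TResult> selector)
        {
            _observer = observer;
            _selector = selector;
        }

        public void OnNext(TSource value)
        {
            var result = default(TResult);
            try
            {
                result = _selector(value);
            }
            catch (Exception error)
            {
                _observer.OnError(error);
                return;
            }

            _observer.OnNext(result);
        }

        public void OnError(Exception error)
        {
            _observer.OnError(error);
        }

        public void OnCompleted()
        {
            _observer.OnCompleted();
        }
    }
}

Note how a message now incurs exactly one OnNext call for this operator, which is what the three-frame call stack shown below reflects.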

As a result, the same code that was shown above (but now using Rx’s Select implementation) results in the following call stack in Rx v2.0 Beta, with exactly three call frames for the three operators involved in the query:

image

Not only does this improve performance – as we shall see further on – it also improves the debugging experience significantly. The top three frames are caused by the consumer’s use of the lambda-form of Subscribe. The bottom-most frames reflect the recursive scheduling carried out by Range on the CLR ThreadPool (notice the enlightenment module popping up).

To showcase the performance gain this brings, consider the following example with an unconstrained sender (to prevent us from seeing the speed limitations on producers in Rx v1.0), chaining together a few query operators that shouldn’t influence the original stream:

static void Main()
{
    Console.Title = typeof(Observable).Assembly.GetName().Version.ToString();
    Pipeline();
}

static void Pipeline()
{
    var xs = new Fast().Where(x => true).Select(x => x).Take(int.MaxValue).Skip(0);

    var c = new Count<int>();

    var sw = Stopwatch.StartNew();
    using (xs.Subscribe(c))
        Thread.Sleep(1000);
    sw.Stop();

    Console.WriteLine(c.Total / sw.Elapsed.TotalSeconds + " msgs/s");
}

//
// *** DON'T IMPLEMENT OBSERVABLE SOURCES THIS WAY IN PRODUCTION CODE! ***
//
// Use ObservableBase or Observable.Create to ensure a well-behaved stream.
//
class Fast : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        var d = new BooleanDisposable();

        new Thread(() =>
        {
            while (!d.IsDisposed)
                observer.OnNext(42);
        }).Start();

        return d;
    }
}

Performance differences between Rx v1.0 SP1 and Rx v2.0 Beta are illustrated in the picture below, roughly a factor of 3 for this micro-benchmark. It should go without saying that one shouldn’t stare blindly at these numbers but should measure real-world performance instead; still, this showcases the potential for significant speed improvement by revisiting assumptions and reducing the number of additional operations (in terms of method calls and termination checks) required per message, also leveraging some lock-free techniques for a more lightweight mechanism to silence observers.

image

 

Fewer allocations

Another area of performance improvement we invested in heavily is hunting down operators that exhibited linear (or worse) allocation behavior in the number of messages flowing through the pipeline. While object allocation in the CLR is fairly cheap (essentially moving a pointer in the Gen 0 area of the managed heap), you pay for excessive redundant heap allocations in the long run when a garbage collection has to be carried out. This can manifest itself as glitches in the rate of messages flowing through the pipeline (a temporary “stall”). Due to the time-based nature of various operators that may involve queuing items (e.g. Delay), allocated helper objects may also have a high promotion rate (ultimately to Gen 2), making them more expensive to reclaim.

One common reason a lot of object allocations get introduced is the use of materialization, also known as reification. This technique turns the notifications on observers into concrete objects deriving from the Notification<T> base class. This makes it easier to write certain types of operators, which can now switch on the Kind property of the notification rather than having to define three separate callbacks. Once processing in the materialized domain has been carried out, the opposite technique, dematerialization, can be used to turn the objects back into calls on observers. Sometimes the roundtrip isn’t that simple, though. Some operators, like Delay, keep internal queues of the original observer’s message stream, in order to deliver those messages at a later point. This includes keeping track of when to complete the result stream by means of a delayed OnCompleted notification. Errors, on the other hand, have to burst through immediately, making the technique of materialization only partly useful.
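As a quick refresher, here’s a small example of the roundtrip (Materialize and Dematerialize have been part of Rx since v1.0):

var xs = Observable.Range(0, 3);

// Materialize reifies observer calls into Notification<T> objects,
// one heap allocation per message.
xs.Materialize().Subscribe(n => Console.WriteLine(n.Kind));
// Prints: OnNext, OnNext, OnNext, OnCompleted

// Dematerialize turns the objects back into plain calls on observers.
xs.Materialize().Dematerialize().Subscribe(Console.WriteLine);
// Prints: 0, 1, 2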

Quite a few operators relied heavily on materialization in Rx v1.0, in order to ensure proper semantics (remember, “correctness first” was our number one concern, also keeping in mind that “to ship is to choose”). While this doesn’t have a huge impact for low-volume message streams, the effects of excessive materialization start to pop up when dealing with a lot of messages. One such operator is TakeUntil, which has to listen to two streams simultaneously; it did so by leveraging an internal operator called “Combine” that makes the task of observing two streams easier through a so-called BinaryObserver<T>. In doing so, it went through the motions of materialization (amongst other things). Here’s an example test, using our Fast observable from previous examples:

static void Allocations()
{
    var xs = new Fast().TakeUntil(Observable.Never<int>());

    var c = new Count<int>();

    var sw = Stopwatch.StartNew();
    using (xs.Subscribe(c))
        Thread.Sleep(1000);
    sw.Stop();

    Console.WriteLine(c.Total / sw.Elapsed.TotalSeconds + " msgs/s");
}

The speedup in Rx v2.0 Beta is quite significant, compared to v1.0, as shown below:

image

To analyze what’s going on a bit further, make the total run time of the sample above much higher (say 30 seconds), and watch the behavior of both versions using the Performance Monitor (perfmon.msc) for the “.NET CLR Memory”, “% Time in GC” counter. The result is shown below, with Rx v1.0 in red and Rx v2.0 Beta in green:

image

Rx v2.0 Beta spends no time in GC whatsoever for this query, while the materialization of objects in the pipeline led to 3.27% of time spent in GC on average for the same query running in Rx v1.0. Monitoring the number of Gen 0 collections can be done as an exercise, but it should be no surprise that this skyrockets in Rx v1.0 due to the short lifetime of the intermediate Notification<T> objects created. Rx v2.0 Beta simply avoids those allocations altogether.

Another example we mentioned before is the Delay operator, which maintains a queue of Notification<T> objects to replay at a later time specified by the delay length. In the early days of the Rx v1.0 pre-releases, exceptions were materialized too and ended up in this queue, causing them to be delayed. By the time we shipped Rx v1.0 and solidified our design, this was no longer the case due to our “immediate exception propagation” policy. Exceptions sent through OnError are treated as fatal events and require immediate care (by the way, if you don’t like this behavior, there are different ways to allow for separate value and error channels, or to multiplex errors and values through the OnNext channel). As a result, the notifications introduced by Delay are only really used for OnNext messages and possibly one OnCompleted message. By storing those separately now, with proper synchronization, we can get rid of Notification<T> allocations altogether and maintain just a [Concurrent]Queue<Timestamped<T>> (recall Timestamped<T> is a value type) and a target completion time value.
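Here’s a conceptual sketch of that bookkeeping; the names are illustrative and this is not Rx’s actual implementation:

// Conceptual only: values as Timestamped<T> structs, completion as a single
// due-time value; errors bypass the queue entirely for immediate propagation.
class DelayBookkeeping<T>
{
    private readonly TimeSpan _delay;
    private readonly Queue<Timestamped<T>> _queue = new Queue<Timestamped<T>>();
    private DateTimeOffset? _completionDue;

    public DelayBookkeeping(TimeSpan delay)
    {
        _delay = delay;
    }

    public void OnNext(T value, DateTimeOffset now)
    {
        // One struct per message; nothing for the GC to track or promote.
        _queue.Enqueue(new Timestamped<T>(value, now.Add(_delay)));
    }

    public void OnCompleted(DateTimeOffset now)
    {
        // A single value rather than a queued Notification<T> object.
        _completionDue = now.Add(_delay);
    }
}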

Consider the following example where we apply the Delay operator to a large-volume stream, with a delay sufficiently long to cause any intermediate object allocations (between the time of receiving a message and its delayed sending out) to go through various GC generations.

static void Survival<TFittest>()
{
    var xs = new Fast().Delay(TimeSpan.FromSeconds(5));

    var c = new Count<int>();

    var sw = Stopwatch.StartNew();
    using (xs.Subscribe(c))
        Thread.Sleep(30000);
    sw.Stop();

    Console.WriteLine(c.Total / sw.Elapsed.TotalSeconds + " msgs/s");
}

This time we’re interested in monitoring the “.NET CLR Memory”, “Promoted Memory from Gen 0” counter. The result of running the code above and the resulting performance measurements is shown below:

image

Notice how Rx v2.0 Beta was able to produce about 10x more messages in the same timeframe due to improvements to the queuing but also due to more careful use of memory. It should be clear from the graph that the allocation of Notification<T> objects that end up in a queue for 5 seconds ultimately leads to high promotion rates, making those objects harder to reclaim in the long run. Rx v2.0 Beta sees almost no promotions, and the ones seen early on in the graph are explained by the growth of the delayed message queue which uses segments for its implementation. Ultimately, the size of the queue will be pretty stable, holding 5 seconds worth of messages.

Those are just two examples of extreme cases where we brought the number of allocations down tremendously. The performance effort has been an iterative process across the entirety of Rx, using various tools to measure progress across builds. We’re not done yet, but this should give the reader a good idea of the work we’re doing.

 

To compose or not to compose?

One of the big advantages of Rx is the power of composition. Given any number of query operators, simply chaining them together often provides the solution to a problem that seems untamable and very complex at first. Just think of our typical “dictionary suggest” sample, where we join a textbox’s changing input with a web service call, taking care of out-of-order arrival, throttling down the input, etc., all with just a few lines of code.

Given just a few basic operators, one can do a lot simply by composing those primitives together. Driving this to the extreme results in projects like MinLINQ. While this compositionality is great for end-users, building core operators with it isn’t always the best choice. For example, we could have defined the Where operator in terms of SelectMany, Return, and Empty (left as an exercise), but the number of hoops a message has to go through to be processed gets out of hand quickly. The same holds for the intricate web of IDisposable “subscription handles” that have to be maintained by very powerful operators such as SelectMany, typically used with larger volumes of data in inner sequences (cf. your solution to this exercise).
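For the record, here’s one possible solution to that exercise; a sketch only, and certainly not how Rx actually implements Where:

public static IObservable<TSource> Where<TSource>(this IObservable<TSource> source,
    Func<TSource, bool> predicate)
{
    // Every message now travels through SelectMany's merging machinery and
    // allocates a Return or Empty sequence; composition isn't free.
    return source.SelectMany(x => predicate(x) ? Observable.Return(x)
                                               : Observable.Empty<TSource>());
}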

It turns out that, in a “correctness first” oriented release like Rx v1.0, it made a ton of sense to use composition internally in quite a few places. Not only did this help reasoning about the overall correctness of the system, it also simplified complex pieces of code (which were still undergoing changes as we figured out the precise semantics) through reuse. Furthermore, the more code paths lead to a set of common, sometimes rather complex, operators, the better for testing purposes.

Now that we have a solid understanding of the desired semantics, edge conditions, etc. – backed by a substantial test suite to catch regressions – there’s an excellent opportunity to revisit some of those implementation choices and analyze whether we can improve performance by doing things differently. As a golden rule of thumb, the decision to carry out such optimizations has been backed by lots of measurements in different configurations and use cases.

 

A tale about Buffer

To illustrate this category of optimizations, consider the Buffer operator and its rather turbulent history in the v1.0 days. When we started the design of Rx, it was clear that common use cases require a way to reduce the message volume of an event stream, allowing the producer to move fast while consumers can process a reduced volume of data. This is especially important when dealing with sensors that need to send information over a slow connection (e.g. using Throttle or Sample to reduce volume), or when you can’t afford to stall a high-volume pipeline and need to offload computation over the data to worker threads (e.g. using ObserveOn), where handing over chunks of multiple elements tends to be more efficient than handing over individual elements.

So, we designed the Buffer operator to partition a stream into chunks represented by IList<T> values. We started off by introducing a simple buffer operator that takes in the number of elements per buffer. From that, it was a simple step to allow overlap by adding a skip parameter. Next, we wondered what we could do to have buffers based on duration rather than element count, and ended up with two more variants (called BufferWithTime versus BufferWithCount back then, before we had converged on an overload-based design) for rolling and sliding buffers. But there was more… What about a buffer that behaves like a ferry: when a timer expires, the ferry leaves the dock (sending out a buffer), but why can’t it also leave immediately when all seats are filled (count behavior)? That’s where BufferWithTimeOrCount entered the picture.

public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source,
    int count);
public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source,
    int count, int skip);

public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source,
    TimeSpan timeSpan);
public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source,
    TimeSpan timeSpan, IScheduler scheduler);
public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source,
    TimeSpan timeSpan, TimeSpan timeShift);
public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source,
    TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler);

public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source,
    TimeSpan timeSpan, int count);
public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source,
    TimeSpan timeSpan, int count, IScheduler scheduler);

Several months later, we realized buffers were a special case of windows. Rather than sending out a buffer upon expiration of a duration (determined by a count or time threshold, or both), why couldn’t we send out an inner stream upon creation of a new buffer… err, window? That’s what we called windowing, leveraging the higher-order nature of event streams in Rx, meaning they can be nested (just like grouping produces a stream of groups, which are streams themselves). You can guess the effect on the API surface: all of the above got duplicated…

public static IObservable<IObservable<TSource>> Window<TSource>(this IObservable<TSource> source,
    int count);
public static IObservable<IObservable<TSource>> Window<TSource>(this IObservable<TSource> source,
    int count, int skip);

public static IObservable<IObservable<TSource>> Window<TSource>(this IObservable<TSource> source,
    TimeSpan timeSpan);
public static IObservable<IObservable<TSource>> Window<TSource>(this IObservable<TSource> source,
    TimeSpan timeSpan, IScheduler scheduler);
public static IObservable<IObservable<TSource>> Window<TSource>(this IObservable<TSource> source,
    TimeSpan timeSpan, TimeSpan timeShift);
public static IObservable<IObservable<TSource>> Window<TSource>(this IObservable<TSource> source,
    TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler);

public static IObservable<IObservable<TSource>> Window<TSource>(this IObservable<TSource> source,
    TimeSpan timeSpan, int count);
public static IObservable<IObservable<TSource>> Window<TSource>(this IObservable<TSource> source,
    TimeSpan timeSpan, int count, IScheduler scheduler);

Not only did the API surface explode, the implementation of those operators was also strikingly similar, having to deal with the same edge cases etc. Because Window is strictly more powerful than Buffer, we decided to reduce risk and implement Buffer in terms of Window, which by itself led to new insights into the nature of the pipeline in the face of blocking. It wasn’t as easy as projecting the produced windows (the inner observables) into IList<T> objects by doing something like ToEnumerable().ToList(), because that blocks and would ultimately stall the entire pipeline. So we needed a non-blocking ToList operator as well. However, such an operator is nothing but a special Aggregate with a List<T> as the accumulator value, so why not implement it as such?

At the end of the day, our Buffer implementation started to look roughly like this, preventing empty buffers from leaking at the end (an artifact of windows being created at the beginning of a window, while buffers were sent out at the end):

private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, int count, int skip)
{
    return Window_<TSource>(source, count, skip).SelectMany(ToList).Where(list => list.Count > 0);
}

But wait… Wasn’t ToList a special Aggregate? And what about SelectMany? It turns out that’s really a Select followed by a Merge. Adding everything up, there are a lot of more primitive operators in play to make this work. As a proof that composition works, all of the above produces the right result for all scenarios, including various edge cases that are tricky to get right manually. If Window (and every other operator used) is well-tested and deemed correct, it follows that Buffer is too.

private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, int count, int skip)
{
    return Window_<TSource>(source, count, skip)
        .Select(w => w.Aggregate(new List<TSource>(), (lst, x) => { lst.Add(x); return lst; }))
        .Merge()
        .Where(list => list.Count > 0);
}

As you may have guessed by now, while this is a victory for functional correctness through composition, it isn’t optimal for performance. Debugging suffers as well, since the Buffer operator is nowhere to be recognized in call stacks leading to the consumer’s observer being invoked. In Rx v2.0 Beta, backed by a bunch of tests, we took on the task of inlining a number of those operators and taking advantage of operator-specific behavior to optimize further. For example, Window itself isn’t ideal for buffering through ToList transformations, because it needs to allocate a Subject<T> object for each window. Those subjects also need to be hidden using AsObservable such that observers of the windows can’t fire elements into them, which could lead to very strange results and would leak an implementation detail that may change (in fact, it did change a bit in v2.0).

Just to illustrate the performance gain, consider the following piece of code:

static void Main()
{
    Console.Title = typeof(Observable).Assembly.GetName().Version.ToString();
    Composition(100);
    Composition(1000);
    Composition(10000);
    Composition(100000);
}

static void Composition(int n)
{
    var xs = new Fast().Buffer(n);

    var c = new Count<IList<int>>();

    var sw = Stopwatch.StartNew();
    using (xs.Subscribe(c))
        Thread.Sleep(1000);
    sw.Stop();

    Console.WriteLine(c.Total / sw.Elapsed.TotalSeconds + " msgs/s");
}

The results for v1.0 SP1 and v2.0 Beta are shown below. For the first invocation with small buffers, the allocation cost of subjects as well as lists in the v1.0 implementation influences the total throughput. For later calls, the reduced wrapping of operators yields significant gains.

image

Setting a breakpoint in the Count<T> observer for the query shown above presents us with the following nice picture in Rx v2.0 Beta. The stack in Rx v1.0 SP1 won’t fit the screen, so I won’t include it here…

image

We measured the impact of such changes to the operator implementation for a bunch of cases, and carried out the optimization where it pays off. At the end of the day, this should result in “do more (event processing) with less (resources)” when using Rx.

 

Optimizations for tail-recursive operators

Users on Stack Overflow noticed that attempts to build an Rx Builder for Reactive Extensions in F# didn’t fare well due to (very appropriately, given the name of the forum) stack overflows. While I won’t go into too much detail about how workflow builders work in F# (you can read more here), the performance work we did in this release put us in a good position to tackle this problem. Let’s start with an example of what we want to make work:

static void Tail()
{
    var f = default(Func<int, IObservable<int>>);
    f = x => Observable.Defer(() => Observable.Return(x).Concat(f(x + 1)));

    f(0).Select(_ => new StackTrace().FrameCount).ForEach(Console.WriteLine);
}

This piece of code creates a function that, given an integer value x, returns an observable sequence containing the infinite range [x, ∞). It does so by returning the given number and concatenating the sequence with another one obtained by calling the same function with x + 1. The use of Defer ensures that calling f doesn’t result in immediate endless recursion trying to compute f(x + 1), f(x + 2), etc. Instead, when Concat wants to subscribe to its second sequence, the recursion is expanded one step further.

Rather than printing the sequence itself, we print the depth of the call stack in OnNext, i.e. the path length of messages traveling to the consumer. Running this code on Rx v1.0 SP1 ultimately caused a stack overflow, due to an ever-growing chain of observers: when Concat receives an OnCompleted message from its first sequence, it subscribes to the second and wires up the observer for that second sequence to the outgoing one. However, Defer adds some wrapping as well, and ultimately things get out of hand (adding 9 frames per step as the recursion unfolds). Not any longer in Rx v2.0 Beta:

image

Keep in mind that optimizations for tail recursion (as carried out for Catch, OnErrorResumeNext, and Concat in this release) are easy to disturb, so this works well in a limited number of scenarios, such as an Rx Builder for F#, which emits a pattern similar to the one mimicked above in C# code (and unless you’re a functional programming geek, it’s not very likely you’ll write recursive observable sequence definitions like this yourself). We may do more work in this area going forward.

 

DateTimeOffset.UtcNow

Investigating test runs for potential memory hogs is always a fun undertaking. Collect profiler data, use WinDbg with SOS to dump heap stats, you name it. The fun moment comes when you sort the heap by bytes per type and find an unexpected type at the top of your results. After various iterations of the memory usage improvements listed earlier in this blog post, well-known types such as Notification<T> dropped off the charts, and new ones came in. One remarkable type that showed up during a test run was System.Globalization.DaylightTime:

image

For this particular run, over 89% of the managed heap consisted of objects of a type we most definitely didn’t use directly. Looking through the CLR Profiler trace’s allocation graph, it was trivial to find the culprit (click to enlarge):

image

This particular test was measuring delays between messages after several operations, to ensure the relative timing was preserved. It did so by using the TimeInterval operator. The way this operator has been implemented since day one is by using the scheduler’s Now property, carrying out subtractions between consecutive values. This gives us a wall clock time difference, which is typically what users expect (even though rare cases exist where time differences may be observed to be non-positive). At some point in the Rx v1.0 days, we changed Scheduler.Now to use DateTimeOffset.UtcNow, which leads to the graph shown above when this property is consulted way too often. We’ve worked with the BCL team to bring this issue to their attention, but calling this property in a tight loop isn’t a good idea in the first place, also taking performance overhead into consideration (which we measured separately).

In Rx v2.0, we introduce a new notion of stopwatches that can optionally be implemented by schedulers. These allow measuring the elapsed time between events more efficiently than the naïve technique used before. In fact, schedulers may be able to do a better job than consulting the system clock, through various well-known techniques. It turns out this is quite tricky to get right though. In cases where some time drift is allowed, the BCL’s Stopwatch provides a good measure (relying on QueryPerformanceCounter underneath). Subtraction of GetSystemTimeAsFileTime values works too, but with lower resolution and different behavior (such as sensitivity to system time changes). Yet another technique is to use multimedia timers, but one should avoid calling those frequently. All of those techniques are subtly different, have different performance characteristics, etc. Elaborating on all the details may be the subject of a future blog post.
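To give an idea of the shape of this facility, here’s a rough sketch; the interface names are illustrative, based on the description above, and the shipped bits may differ (see the note below):

// Illustrative shape only; not necessarily the exact Rx v2.0 surface.
public interface IStopwatch
{
    TimeSpan Elapsed { get; }
}

public interface IStopwatchProvider
{
    IStopwatch StartStopwatch();
}

An operator like TimeInterval can then start a single stopwatch upon subscription and compute differences of Elapsed per message, rather than consulting the scheduler’s Now property for every OnNext.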

Note: In the Rx v2.0 Beta release, the implementation of stopwatches on the default schedulers uses Stopwatch, but this has changed since in order to avoid a set of problems, and to conform to the semantics our users have come to expect: report wall clock time differences. We also expect this stopwatch service to be used in more places besides TimeInterval and Delay. Stay tuned for more news post-Beta.

 

Real-world performance and better scalability

Without doubt, all of the optimizations shown above look very nice. You may wonder, though, how they impact real-world scenarios. Over the last year, we’ve spoken to numerous customers inside and outside the company, with a wide range of different requirements. We’ve seen people use Rx for UI programming (typically low message volumes, but with F5 debugging suffering from complex call stacks), for large-scale event processing with sensors in the field and aggregators in the cloud (typically high message volumes with demands for low-latency processing), for cloud and system monitoring, and much more. From the dialogs we’ve had with people, we learned about a lot of different types of queries and measured our progress against those real-world queries.

We’ll finish this post’s performance discussion by looking at a sample derived from a real-world scenario, performing a Map/Reduce style computation over a large stream of data using LINQ. The idea is to partition a stream and process each of the partitions in parallel on different schedulers. As you know, Rx has plenty of ways to partition a stream, including Window and GroupBy. Using the ObserveOn operator, processing for each partition can be offloaded to a dedicated scheduler. Finally, using SelectMany or Merge, the partial results can be obtained and aggregated using any of our aggregation operators.

An example query – simplified for illustration purposes, but based on real-world sensor data processing – is shown below. Given a sequence of inputs (here random numbers), we want to count the number of primes in the sequence. Notice this query by itself could be part of a much bigger query, e.g. processing a window of 60 minutes worth of observations and computing aggregates over that window.

image

The first thing we do is partition the stream by a key using GroupBy. Because we want to look for prime numbers, modulo arithmetic isn’t very useful for good distribution (e.g. everything % N == 0 is definitely not prime). In reality, a key is often available as part of the data, e.g. a sensor ID, a data center origin ID, etc. For this sample, we come up with a simple round-robin scheme.

var i = 0;
var result = source.GroupBy(_ => i++ % N)

Next, we want to process every partition stream obtained by GroupBy on a different dedicated scheduler. For this purpose we can use ObserveOn in combination with the NewThreadScheduler. We could use Select to do the transformation of groups onto their “scheduled streams”, but will resort to SelectMany for reasons we’ll see in a moment:

var i = 0;
var result = source.GroupBy(_ => i++ % N)
                   .SelectMany(g => g.ObserveOn(NewThreadScheduler.Default)

Now every partition needs to be filtered by an IsPrime predicate. This is the job of a Where filter:

var i = 0;
var result = source.GroupBy(_ => i++ % N)
                   .SelectMany(g => g.ObserveOn(NewThreadScheduler.Default)
                                     .Where(x =>
                                     {
                                         for (var d = 2; d <= Math.Sqrt(x); d++)
                                             if (x % d == 0)
                                                 return false;
                                         return true;
                                     })

Next, we aggregate every prime-filtered partition by counting the number of elements using Count:

var i = 0;
var result = source.GroupBy(_ => i++ % N)
                   .SelectMany(g => g.ObserveOn(NewThreadScheduler.Default)
                                     .Where(x =>
                                     {
                                         for (var d = 2; d <= Math.Sqrt(x); d++)
                                             if (x % d == 0)
                                                 return false;
                                         return true;
                                     })
                                     .Count()

Now it should be apparent how SelectMany takes the elements from the inner stream – simply a single count value – and creates a single stream out of it which can be aggregated using Sum. So, to recap, every group was offloaded to a separate scheduler thread, running the prime filter and counting the number of occurrences. All those count values were then “flattened” into a single sequence whose sum is what we’re looking for:

var i = 0;
var result = source.GroupBy(_ => i++ % N)
                   .SelectMany(g => g.ObserveOn(NewThreadScheduler.Default)
                                     .Where(x =>
                                     {
                                         for (var d = 2; d <= Math.Sqrt(x); d++)
                                             if (x % d == 0)
                                                 return false;
                                         return true;
                                     })
                                     .Count())
                   .Sum();

That’s it. For any given value of N, this will parallelize the compute job over N threads. Let’s plug in a sample source and run some performance tests:

static void Scale()
{
    var N = Environment.ProcessorCount;

    var source = new Subject<int>();

    var i = 0;
    var result = source.GroupBy(_ => i++ % N)
                       .SelectMany(g => g.ObserveOn(NewThreadScheduler.Default)
                                         .Where(x =>
                                         {
                                             for (var d = 2; d <= Math.Sqrt(x); d++)
                                                 if (x % d == 0)
                                                     return false;
                                             return true;
                                         })
                                         .Count())
                       .Sum();

    var e = new ManualResetEvent(false);
    var sw = Stopwatch.StartNew();

    result.Subscribe(Console.WriteLine, () => e.Set());

    NewThreadScheduler.Default.ScheduleLongRunning(_ =>
    {
        var rand = new Random(19830211);
        for (var j = 0; j < 1000000; j++)
            source.OnNext(rand.Next());
        source.OnCompleted();
    });

    e.WaitOne();
    sw.Stop();

    Console.WriteLine(sw.Elapsed);
}

The picture below illustrates that we’re effectively utilizing all the CPU horsepower on the machine, in this case a quad-core machine:

image

Obviously it’s not hard to hog all CPUs, so was it all worth it? Did we gain a noticeable speedup by parallelizing the processing? To illustrate that we can scale reasonably well, have a look at the picture below, where we ran the code above on one of our 64-core lab machines, measuring the total time taken for this query (with an adjusted number of primes to filter) for increasing values of N:

image

While you shouldn’t expect linear scaling, the result is quite amazing for this simple query (in fact, the IsPrime filter isn’t exactly a number crunching job), reaching a 26x speedup when using 63 cores. In practice, you’d likely limit the number of CPUs used. For this example run on the test machine, using 10 CPUs still resulted in a 9x speedup, before things started to scale sub-linearly.

We used several examples like this to measure our performance tuning progress in Rx v2.0 Beta. It’s quite interesting to trace all the functionality used by this query and identify the different bottlenecks that hurt scalability. Just a small overview of improvements that contributed to the result above:

  • GroupBy uses Subject<T> objects for its groups, in this case with a single observer attached to them. We optimized this case such that the single observer in the subject is talked to directly without having to iterate through a single-element list of observers.
  • The NewThreadScheduler now uses ISchedulerLongRunning, allowing the ObserveOn operator to pump a queue-draining loop protected by a SemaphoreSlim (this last change was made post-Beta and is reflected in the sample above).
  • Path length of the pipeline turned out to be a significant source of overhead, especially in the processing of the partitions (here simply using Where, in other tests adding much more operations to the inner streams).
  • Direct implementation of operators such as Count, instead of relying on more general-purpose operators (like Aggregate), contributes to the reduction of delegate calls for each message flowing through the pipeline.

The screenshots below show a comparison of Rx v1.0 SP1 and a recent Rx v2.0 post-Beta build on a 4-core machine, with varying degrees of parallelism from 1 to 4. As you can see, the lack of long-running scheduling in Rx v1.0 prevented us from effectively using all cores (threads would quit if there was no further work to do, which happened after only a very short time for this particular use case), causing the processing over 4 cores to take a really long time.

Rx v1    Rx v2

The runtime timing results are shown below:

map reduce compare

Note: The Rx v2.0 Beta build has a known issue with runaway threads when using ObserveOn with long-running schedulers. This has been fixed since, hence the use of a newer build above to show fair results.

One of the tools we relied on heavily to trace bottlenecks in the Rx code base is the Visual Studio 11 Concurrency Visualizer. In the picture below, you can see the concurrency trace of the test shown above, this time with a degree of parallelism of 4, filtering a few million primes. Synchronization overhead is minimal, except for an initial blocking of one of the worker threads waiting for the producer to give it work (as shown in the unblocking stack in the second picture).

trace v2

unblock

There’s more work left to be done in the performance arena, but I hope this blog post has shown some of the ways we’re approaching performance for Rx v2.0. If you encounter any bottlenecks, please let us know.

 

Various fixes

Rx v2.0 also includes a number of fixes to existing functionality:

  • Remotable now allows control over lease times.
  • A few error propagation issues have been fixed, e.g. in SkipUntil.
  • Edge cases with overflow in Range have been resolved.

 

What’s next?

With this post, we hope you got a good idea of some of the improvements we’re making to Rx in our second major release, Rx v2.0. Our current plan is to release a few more Beta Refresh builds and a Release Candidate, before going RTW close to the .NET 4.5 release. To address any critical issues in Rx v1.0, we’ll release an Rx v1.0 SP2 build later this year.

We’ll have plenty more exciting things to talk about in the next months: even more new features, integration with existing products, etc. Stay tuned!


Bart J.F. De Smet
Senior SDE – Cloud Programmability Team