Three months after the release of Reactive Extensions v2.0 Beta, we’re happy to announce the availability of our next milestone build in the Rx v2.0 product cycle! Today, we released the Release Candidate (RC) build to the Download Center and NuGet, ready for you to start playing around. This post highlights important things you should know to get started, and provides some in-depth information on what’s new and changed. We’ve done a ton of work to improve various aspects of the technology, and we hope you’ll like it!

 

What are the supported platforms?

When we released the Reactive Extensions v2.0 Beta earlier this year, we limited the set of supported platforms to .NET 4.5, Silverlight 5, Windows Phone 7, and .NET for Metro style apps. All of those platforms continue to be supported as first-class citizens for Rx. But there’s more…

Quite a few customers asked us about support for .NET 4.0, so we brought it back in the RC, allowing users to move to .NET 4.5 at their own pace. Keep in mind, though, that some parts of Rx rely on advanced .NET 4.5 capabilities (such as ExceptionDispatchInfo) and provide a better-together story with other .NET 4.5 features (such as async/await), so we still recommend trying out .NET 4.5 whenever possible.
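As a small illustration of that better-together story: on .NET 4.5, observable sequences can be awaited directly, yielding their last element (a minimal sketch):

static async Task PrintSumAsync()
{
    // Awaiting a sequence produces its final value – here, the sum 55.
    var sum = await Observable.Range(1, 10).Sum();
    Console.WriteLine(sum);
}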

Note: Users of NuGet will automatically receive the .NET 4.0 bits when using a .NET 4.0-based project. For those of you who prefer the MSI-based installation of the SDK, make sure to select the “Custom” option and include the .NET 4.0 binaries from the feature selection window. We don’t install those assemblies by default, in order to reduce clutter in the “Add Reference” dialogs in Visual Studio. If we did, duplicate entries would show up in .NET 4.5 projects (because .NET 4.0 assemblies are compatible with .NET 4.5 configurations), making it a bit of an art to pick the Rx assemblies targeting the right platform.

It almost goes without saying that this release supports .NET Framework 4.5 Release Candidate, as well as Windows 8 Release Preview, and integrates fully with the Visual Studio 2012 RC.

 

How to install and use it?

Reactive Extensions v2.0 Release Candidate is available both as an SDK MSI installer and as a set of NuGet packages. The same information on how to use the product applies as it did in the beta release. A few quick notes:

  • When using the MSI:
    • The assemblies will appear in the Add Reference dialog in Visual Studio, with version number 2.0.20612.0.
    • Make sure to install the .NET 4.0 binaries if you want to use those (see the remark in the “What are the supported platforms?” section).
    • When targeting .NET for Metro style apps on Windows 8, you can include Rx as an Extension SDK.
  • When using NuGet:
    • Starting from Rx v2.0 Beta, we use the semantic versioning feature of NuGet. See this separate post for more information.
    • To make the pre-release Rx v2.0 RC assemblies show up through the “Manage NuGet Packages” dialog, choose the “Include Prerelease” option.
    • The NuGet package names follow the classic versioning scheme, suffixed with rc, e.g. Rx-Main 2.0.20612-rc.

One thing we changed in the MSI installation is the installation location. In an effort to consolidate all SDKs under a common location, we moved our SDK into the well-known “Microsoft SDKs” folder in “Program Files (x86)”:

[Image: the Rx SDK installation folder under “Program Files (x86)\Microsoft SDKs”]

 

Factoring of the assemblies and Portable Library

One of the highlights of the Rx v2.0 Beta release was the newly introduced support for Portable Library. Although our initial set of supported platforms is limited to the .NET Framework 4.5 and .NET for Metro style apps (Windows 8) flavors of the .NET Framework, this puts us on a path to portability across more profiles going forward. We received a lot of good feedback from various customers who’re writing applications and libraries targeting many platforms, and who are very happy about the reduced build burden to accommodate all sorts of different platforms. Check out this recent Channel 9 video on the subject of Portable Class Libraries as well.

 

Where we are today…

Making this change involved a refactoring exercise of the Rx assemblies, to separate the portable aspects from the platform-specifics. One of the assemblies born out of this is System.Reactive.PlatformServices, which you’ve heard all about in our last post. This is the place where various platform-dependent facilities live, both visible to the end user and under the covers:

  • Platform-specific schedulers, e.g. NewThreadScheduler.
  • Platform Enlightenment Provider facilities, e.g. providing ways to run timers, dispatch exceptions, etc.

In order to refactor platform-specific functionality, we had to change some of the API surface. The most notable difference is in the static properties on Scheduler, where you get an a la carte menu of available schedulers:

[Image: IntelliSense showing the static scheduler properties on the Scheduler type]

Notice, however, how three properties are now marked as Obsolete. The reason for this is where the Scheduler type is defined: in the bottom-most assembly in our layer map (ignoring the interfaces assembly), i.e. System.Reactive.Core.dll. This assembly has a portable version and shouldn’t (indeed, can’t) know about platform-specific facilities used to introduce concurrency, such as the CLR’s System.Threading.ThreadPool or WinRT’s Windows.System.Threading.ThreadPool, the TPL, or the creation of new threads.

When including the platform-specific builds of System.Reactive.Core (as we did before for Rx v2.0 for .NET 4.5), those properties use the platform enlightenment mechanism to dynamically discover the required scheduler instances from the System.Reactive.PlatformServices assembly, which should be deployed alongside the other Rx assemblies. (If the assembly can’t be found, an exception is thrown.)

[Image: platform enlightenment resolving scheduler instances from System.Reactive.PlatformServices]

We did this in order to retain code compatibility when migrating your apps from Rx v1.0 to Rx v2.0, in case you don’t (yet) go down the portable road. When compiling, you’ll be advised about the new, portable-friendly way to obtain the schedulers. We improved the warning message to give you actionable guidance (in fact, post-RC we’ve improved the message a bit further to state “Please include a reference to the System.Reactive.PlatformServices assembly for your target platform and …”):

[Image: the Obsolete warning shown at compile time, pointing at the portable-friendly replacement]
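To make the migration concrete, addressing the warning is a mechanical change (a small sketch; the platform-specific scheduler types ship in System.Reactive.PlatformServices):

// Rx v1.0 style, now marked Obsolete:
var scheduler = Scheduler.ThreadPool;

// Rx v2.0, portable-friendly replacement:
var scheduler2 = ThreadPoolScheduler.Instance;

// Similarly, Scheduler.NewThread becomes NewThreadScheduler.Default,
// and Scheduler.TaskPool becomes TaskPoolScheduler.Default.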

 

When including the portable build of System.Reactive.Core – from the .NETPortable folder in the SDK installation – those obsolete properties no longer show up. We didn’t want to carry obsolete stuff into the new portable world right from the start, so we took those properties out in order to start from a clean slate:

[Image: the portable build of System.Reactive.Core, without the obsolete Scheduler properties]

 

Our direction beyond Beta…

Initially, in the Beta timeframe, we planned to re-layer the Rx v2.0 builds for .NET 4.5 and .NET for Metro style apps, where the core set of assemblies (Interfaces, Core, Linq, Providers) would simply be the portable builds, complemented with the non-portable ones (PlatformServices, etc.). However, that would mean taking away the non-portable static properties on the Scheduler type from our users, leading to possible confusion. There’d simply be no place to put the obsolete warning either, so guidance for code migration couldn’t be given at compile time. Given that we want a smooth migration – also because .NET 4.5 is an in-place upgrade to .NET 4.0 – this is undesirable.

As a result, we decided not to layer the .NET 4.5 and .NET for Metro style apps versions of Rx v2.0 on top of the portable set of core assemblies for this release. This decision was further motivated by the observation that our portable reach is rather limited right now, so the cost/benefit towards our customers (some of whom may only build apps for a specific platform, and want an easy job migrating to Rx v2.0) isn’t yet justified.

In order to build a portable class library using Rx, you’ll need to go through the Browse… option in the Add Reference dialog and locate the portable assemblies in the SDK installation folder. Users of the resulting portable class library can put the platform-specific versions of the complementary assemblies (including PlatformServices, as well as various UI-supporting Rx assemblies) alongside your library, combining the power of Rx with the portability of your library.

In a future release, we’ll likely have a broader set of portable library targets, and users will have gone through the exercise of addressing the obsolete warnings, allowing us to switch to the final layering where the Scheduler class no longer exposes non-portable scheduler types. In the meantime, we encourage our users to start transitioning to the portable world, which will become more and more beneficial going forward.

 

Remaining bits of refactoring

In the Rx v2.0 RC build, we’ve tackled the remaining refactoring required on our road to portability. One Rx feature that fell off the ship in our Beta release (meaning it wasn’t in the union of the portable core and the platform-specific additions) was our .NET Remoting support. The API surface of this feature is simply the Remotable extension method, which used to be defined on the Observable class.

Starting from Rx v2.0 RC, there’s a separate assembly called System.Reactive.Runtime.Remoting (named after the corresponding BCL namespace where .NET Remoting types live), available for the .NET 4.5 target platform assembly set of Rx. Users of NuGet can include this functionality by using Rx-Remoting. This new assembly doesn’t depend on the LINQ layer of Rx, and sits right on top of the System.Reactive.Core layer. As a result, the transport of observable sequences through remoting only requires a small dependency on the rest of Rx. As we explained in the announcement post for Rx v2.0 Beta, the same holds for our UI technology binding assemblies, which only rely on functionality in System.Reactive.Core. So simple tasks like rendering a sequence on a UI control don’t involve a lot of dependencies (unless you want to write additional queries over the sequence of course).

Because Remotable is defined as an extension method, there shouldn’t be a compile-time breaking change if you were using this method in a “fluent”, instance-method-like invocation style, rather than calling it as a static method. For those of you wondering about the type the Remotable method is defined on now: have a look at System.Reactive.Linq.RemotingObservable.
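For example, a fluent call site like the one below keeps compiling unchanged after adding a reference to the new assembly (an illustrative snippet):

// Fluent usage – extension method lookup now resolves to RemotingObservable.Remotable:
var remote = Observable.Range(0, 10).Remotable();

// A static invocation has to name the new declaring type:
var remote2 = RemotingObservable.Remotable(Observable.Range(0, 10));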

 

It’s all about time!

This new release of Rx includes a number of improvements to the way we deal with time. As you likely know, dealing with time is a complex undertaking in general, especially when computers are involved. Rx has a lot of temporal query operators to perform event processing, and therefore it needs to be able to schedule work to happen at particular times. As a result, notions of time exist in virtually every layer of the system: from the schedulers at the bottom (in System.Reactive.Core) to the query operators at the top (in System.Reactive.Linq).

The way various uses of time – and constructs dealing with time – are layered wasn’t always crystal clear, so we did a bit of an overhaul to straighten things out. An overview…

 

The curse of absolute time

Rx has always had two notions of time, surfacing both at the LINQ layer, and at the scheduling layer. Either you can make things happen at an absolute time – based on the DateTimeOffset type – or at a relative time – using the TimeSpan type. For example, overloads of Observable.Timer allow you to raise an event, producing the long value 0, based on either notion of time:

// Relative time based on TimeSpan
Observable.Timer(TimeSpan.FromSeconds(30)).Subscribe(_ =>
{
    Console.WriteLine("It's 30 seconds later now!");
});

// Absolute time based on DateTimeOffset (through implicit conversion)
Observable.Timer(new DateTime(2013, 2, 11, 0, 0, 0)).Subscribe(_ =>
{
    Console.WriteLine("Happy birthday!");
});

At the scheduler layer, things look pretty similar. In the sample below, I’m using convenience extension methods on the IScheduler type to schedule work specified using an Action delegate:

// Relative time based on TimeSpan
Scheduler.Default.Schedule(TimeSpan.FromSeconds(30), () =>
{
    Console.WriteLine("It's 30 seconds later now!");
});

// Absolute time based on DateTimeOffset (through implicit conversion)
Scheduler.Default.Schedule(new DateTime(2013, 2, 11, 0, 0, 0), () =>
{
    Console.WriteLine("Happy birthday!");
});

In fact, the Observable.Timer observable sequence factory (roughly) calls into the schedulers in a way similar to the code shown above, where the work doesn’t perform the console output but rather fires an OnNext and OnCompleted callback on the observer. There are some complications I’ll talk about further on, but you could write your own timer functions a bit like this:

public static IObservable<Unit> OneShotTimer(TimeSpan dueTime,
                                             IScheduler scheduler = null)
{
    return Observable.Create<Unit>(observer =>
    {
        return (scheduler ?? Scheduler.Default).Schedule(dueTime, () =>
        {
            observer.OnNext(new Unit());
            observer.OnCompleted();
        });
    });
}

public static IObservable<Unit> OneShotTimer(DateTimeOffset dueTime,
                                             IScheduler scheduler = null)
{
    return Observable.Create<Unit>(observer =>
    {
        return (scheduler ?? Scheduler.Default).Schedule(dueTime, () =>
        {
            observer.OnNext(new Unit());
            observer.OnCompleted();
        });
    });
}

What’s of interest for the topic of this section, though, is how the DateTimeOffset-based Schedule method works. It takes in a DateTimeOffset with the promise of invoking the callback at the specified time. However, if you look at a lot of APIs in the BCL, you won’t quite find mechanisms that take in an absolute time to run timers. For example, the System.Threading.Timer class has constructors that accept integer values (denoting millisecond durations) and TimeSpan values, but none that accept some kind of absolute time. So, what’s going on here?

The answer for Rx v1.0 is that schedulers basically implemented the overload of Schedule based on absolute time by doing a little subtraction to compute a relative due time. It’s as simple as “var dt = dueTime - Now;” with a bit of normalization code to make sure we don’t schedule at negative times (rounding up to a zero timespan to run work that’s past its due time as soon as possible).
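Approximated in code, that emulation looks a bit like this (a simplified sketch, not the actual product code):

static IDisposable ScheduleAbsoluteV1Style(IScheduler scheduler,
                                           DateTimeOffset dueTime,
                                           Action action)
{
    // Convert the absolute due time to a relative one; Normalize rounds
    // negative values up to TimeSpan.Zero, so overdue work runs immediately.
    var relative = Scheduler.Normalize(dueTime - scheduler.Now);
    return scheduler.Schedule(relative, action);
}

However, this approach isn’t ideal for various reasons. Let’s drill in.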

 

Timer drift

One issue with emulating absolute time based on a computed relative timespan is that we’re dealing with two vastly different notions of time. You’d (rightfully) expect the following to print a happy face when wishing a happy birthday:

var birthday = new DateTime(2013, 2, 11, 0, 0, 0);
Observable.Timer(birthday).Subscribe(_ =>
{
    var now = DateTime.Now;
    var onTime = Math.Abs((now - birthday).TotalMilliseconds);
    Console.WriteLine("Happy birthday! {0}", onTime < 1000 ? ":-)" : ":-(");
});

Chances are (very much so) that you’ll arrive at the birthday party with a sad face, being too late or too early. Why’s that? At the time of writing this post, we’re talking about a relative due time of “239.09:07:05.9531517”, which amounts to “20682425953.1517” milliseconds. Some timers don’t work particularly well (in terms of accuracy) over such long durations, for various reasons. One thing you could ask yourself is what notion of time various timers are based on, compared to the wall clock time we’re using here through DateTime.Now (which is merely a P/Invoke wrapper around GetSystemTimeAsFileTime).

As an experiment, run the following piece of code:

// Used to root the timers, preventing GC.
var ts = new HashSet<Timer>();

for (int i = 10; i <= 5 * 60; i += 10)
{
    var dt = DateTime.Now;

    var t = new Timer(_ =>
    {
        Console.WriteLine(DateTime.Now - dt);
    }, null, i * 1000, Timeout.Infinite);

    ts.Add(t);
}

You may be surprised about the output, showing how the system clock and the timers seem to drift apart. After some three minutes (on this particular machine), you see the timer is already 100ms off:

[Image: console output showing the Timer callbacks drifting away from the system clock]

The behavior of various timer constructs can vary significantly, also because of different design points and anticipated usage. For example, in a world where you expect a small number of timers to fire sporadically (e.g. to detect timeouts) you’re dealing with a whole different set of design constraints compared to a timer that’s meant to fire periodically at a high frequency, but maybe needs to always post back to the same thread (e.g. in a UI context with a message loop). Below is another example, this time using the WPF dispatcher’s timer facility:

Dispatcher.CurrentDispatcher.BeginInvoke(new Action(() =>
{
    for (int i = 10; i <= 5 * 60; i += 10)
    {
        var dt = DateTime.Now;

        var t = new DispatcherTimer { Interval = TimeSpan.FromSeconds(i) };
        t.Tick += (o, e) =>
        {
            Console.WriteLine(DateTime.Now - dt);
            t.Stop();
        };
        t.Start();
    }
}));

Dispatcher.Run();

The results are vastly different this time around. First of all, notice how the DispatcherTimer is designed for periodic operations, so we make the operation single-shot by stopping the timer from recurring inside the callback. Another difference is the timer’s relationship to the dispatcher where work is run. Either way, you shouldn’t be surprised to see different results again:

[Image: console output for the DispatcherTimer experiment, with callbacks arriving early]

This time we seem to arrive early at the timer callback parties, drifting away in the other direction. Not as dramatically as we did for the Timer class, but still in a significant way. The reader is invited to run other experiments with other timer facilities (or maybe even using a single thread with Thread.Sleep calls) to observe similar effects.

When applying this technique of simulating absolute time scheduling by subtractive math to compute a relative due time, it’s clear things will go wrong. Apply any of the drift factors to our sample of sending birthday wishes in “20682425953.1517” milliseconds from now, and things will be off quite a bit. Even a one part per million divergence (below both of the samples we’ve seen here, so this is very optimistic) would cause a staggering 20 seconds of difference over such a long period of time! In lab runs on a variety of hardware and with a variety of OS configurations, we’ve seen drift factors up to 10E-4.
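To put numbers on this, here’s the back-of-the-envelope computation for the birthday example:

var dueMs = 20682425953.1517; // relative due time, in milliseconds
var drift = 1e-6;             // one part per million (optimistic)
Console.WriteLine(TimeSpan.FromMilliseconds(dueMs * drift));
// Prints roughly 00:00:20.68. At the 10E-4 drift factors observed in the
// lab, the error grows beyond half an hour.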

 

System clock changes

Another worry with using absolute time is how to deal with system clock changes. Let’s even ignore daylight saving time (DST) worries here, and assume we’re dealing with UTC time all over the place. When the system clock changes while we’re running timers based on relative durations, it’s plain to see we won’t be accounting for the shift in time that has occurred underneath us. One situation, a system clock change forward in time, is depicted below. The opposite direction of a time change is completely analogous:

[Image: timeline showing a system clock jump forward during a relative-duration timer]

Absolute time mechanisms in Rx v1.0 were susceptible to such problems, as noticed many times on the MSDN forum for Rx. In fact, the problem is bigger than just single-shot timer scenarios, as we’ll see later in this post. For now, let’s focus on the single-shot timer case, based on absolute time.

But there’s more than just jumps forward or backward in time. When your machine is set up to use NTP, the system periodically checks whether the system time needs correction. When the difference between the time reported by the service and the actual system clock time on the machine is small enough, the Windows Time service (W32Time) won’t perform an abrupt SetSystemTime operation, as shown in the picture above. Doing so would cause certain points in time to exist twice (if the clock goes backwards) or not at all (if the clock goes forward). Instead, an adjustment is made to the way the system time is incremented for each clock tick. The function in Windows that allows configuring this behavior is called SetSystemTimeAdjustment, which is what the W32Time service calls in this case.

To see the time adjustment behavior in action (based on NTP synchronization), we’ll write a simple program that polls the GetSystemTimeAdjustment function to check whether an adjustment is active or not. This function reports two values, the time adjustment and the time increment, both in 100-nanosecond units. It also tells you whether adjustments are applied. From the MSDN documentation (keep in mind the parameter is called lpTimeAdjustmentDisabled):

A value of TRUE indicates that periodic time adjustment is disabled, and the system time-of-day clock advances at the normal rate. In this mode, the system may adjust the time of day using its own internal time synchronization mechanisms. These internal time synchronization mechanisms may cause the time-of-day clock to change during the normal course of the system operation, which can include noticeable jumps in time as deemed necessary by the system.

A value of FALSE indicates that periodic time adjustment is being used to adjust the time-of-day clock. For each lpTimeIncrement period of time that actually passes, lpTimeAdjustment will be added to the time of day. If the lpTimeAdjustment value is smaller than lpTimeIncrement, the system time-of-day clock will advance at a rate slower than normal. If the lpTimeAdjustment value is larger than lpTimeIncrement, the time-of-day clock will advance at a rate faster than normal. If lpTimeAdjustment equals lpTimeIncrement, the time-of-day clock will advance at its normal speed. The lpTimeAdjustment value can be set by calling SetSystemTimeAdjustment. The lpTimeIncrement value is fixed by the system upon start, and does not change during system operation. In this mode, the system will not interfere with the time adjustment scheme, and will not attempt to synchronize time of day on its own via other techniques.

The program we’ll use is shown below, using a little bit of P/Invoke magic to interop with the Win32 API (omitting some error checking to keep things simple for the scope of the blog post):

[DllImport("kernel32.dll")]
static extern bool GetSystemTimeAdjustment(
    out uint lpTimeAdjustment,
    out uint lpTimeIncrement,
    out bool lpTimeAdjustmentDisabled);

static void Main()
{
    const int MSTO100NS = 10000;

    while (true)
    {
        uint adj, inc;
        bool disabled;
        GetSystemTimeAdjustment(out adj, out inc, out disabled);

        if (!disabled)
            Console.WriteLine("Adjustment {0}ms per {1}ms",
                (double)adj / MSTO100NS,
                (double)inc / MSTO100NS);
        else
            Console.WriteLine("Adjustment disabled");

        var now = DateTime.Now;
        Thread.Sleep(1000);
        Console.WriteLine("> " + (DateTime.Now - now));
    }
}

Most likely, when you run this program, it will start off by showing there’s no adjustment active. We also measure the time it takes to wake up after a sleep of 1 second, based on the system time. From this, we’ll be able to draw some conclusions about how the relative time values passed to various threading and timer APIs relate (or don’t) to the system time. Below is the output when running on a Windows 8 machine that’s not domain joined, using the public NTP service:

[Image: console output of the GetSystemTimeAdjustment polling program]

While running the experiment, we trigger a re-synchronization of the clock using the w32tm command-line tool. Alternatively, you could use the “Date and Time” applet in the Control Panel, where you’ll find the following dialog (assuming you’re not in a network with a domain):

[Image: the Internet Time Settings dialog in the Date and Time applet]

In the output, notice what happened. After we triggered the synchronization operation, the W32Time service noticed our clock was out of sync with the time reported by the server (in this case running ahead of it, given the adjustment is smaller than the increment). As a result, the service applies an adjustment for each time increment in order to bring the clocks back in line. The reported increment is a 15.6ms period between clock ticks – corresponding to a 64 Hz tick rate – and is typical for multi-core systems. The same value is reported by ClockRes.exe, which uses the same API. Instead of incrementing the system time by that same value, the system now adds only 15.4679ms per tick until the difference has been absorbed. Notice how the Thread.Sleep-based time computations are also off: sleeping one second doesn’t quite line up with one second of system clock time!

From now on, the adjustment will be changed periodically, each time – hopefully – getting closer to our natural rhythm, until we’re back in sync and the adjustment can be disabled. Some machines, where the clock ticks cause systematic drift from the reported system time, will have an adjustment applied (almost) all the time. As you can see from this little experiment, the perception of time will be off when observed through various APIs.

Note: We didn’t even mention various other notions of time, or mechanisms to deal with time. For example, there’s the Stopwatch class which uses high-resolution performance counters, and there are multimedia timers.

In summary, dealing with absolute time by converting it to relative time for scheduling operations is troublesome. In Rx v2.0, we thoroughly rethought our treatment of absolute time, as we’ll explain next.

 

Improving absolute time scheduling in Rx v2.0

During the Rx v2.0 Beta timeframe, we started a refactoring exercise for all scheduler implementations, introducing a common LocalScheduler base type. This abstract base class reduces the burden of implementing a new scheduler type that’s meant to run work on the local system. It does so by fixing the implementation of the Now property to use DateTimeOffset.UtcNow, and by providing an implementation of all but one of the Schedule methods:

  • For the “schedule immediate” overload of Schedule that doesn’t take in any time value, it redirects to the relative time based overload, passing in TimeSpan.Zero.
  • For the “schedule with absolute time” overload of Schedule, the Beta implementation also redirected to the relative time based overload, after performing the normalized dueTime - Now computation.

Scheduler implementations can still override those virtual methods and provide a custom implementation, but we recommend implementing only the remaining abstract method, which is the Schedule method that accepts a TimeSpan value. If the TimeSpan is zero (we’ll make sure to normalize negative values to zero), you can always go down a different route to schedule work that’s due immediately (still asynchronously though), rather than by running a timer.
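To make this concrete, here’s a bare-bones custom scheduler (a minimal sketch; MyTimerScheduler is a hypothetical name, and the built-in schedulers do quite a bit more work around cancellation and error handling):

// Requires System.Reactive.Concurrency, System.Reactive.Disposables, System.Threading.
public sealed class MyTimerScheduler : LocalScheduler
{
    public override IDisposable Schedule<TState>(TState state,
                                                 TimeSpan dueTime,
                                                 Func<IScheduler, TState, IDisposable> action)
    {
        var d = new SingleAssignmentDisposable();

        // Negative due times are normalized to zero, i.e. "run as soon as possible".
        var due = Scheduler.Normalize(dueTime);

        // One-shot timer; assigning to a disposed SingleAssignmentDisposable
        // disposes the new resource immediately, handling cancellation races.
        var timer = new Timer(_ => d.Disposable = action(this, state),
                              null, due, TimeSpan.FromMilliseconds(-1));

        return new CompositeDisposable(timer, d);
    }
}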

What’s changed since the Beta release is how we implement the Schedule method that accepts a DateTimeOffset value. Rather than doing the subtraction to convert to relative time – which causes problems, as explained in much depth before – we now have a common timer queue shared across all types derived from the LocalScheduler base class. The scheduling algorithm used to run work based on an absolute due time is subject to change, so take the description in this blog post with a grain of salt. The basic principles are likely to stay the same across releases, though. What’s more important to our (advanced) users is how absolute time scheduling ultimately calls into your scheduler implementation code.

In the base class implementation for absolute time scheduling, we keep a number of data structures that are shared across derived scheduler types. One such structure is a priority queue that keeps all scheduled work ordered by its absolute due time. Items in the queue contain the action, the state to be passed to the action, as well as the derived LocalScheduler instance the work was scheduled on. This information is used later on to transition the work from our queue to your scheduler implementation.

Before queueing work in the so-called long term queue, we evaluate whether the work is due soon or not. This decision is based on a comparison to the current system clock (using the Now property) and currently uses a threshold constant of 10 seconds that’s subject to change (either to another constant value or a dynamic scheme). Two things can happen now: either the work is due soon, or it’s to be treated as a long term job.

  • If the work is due soon, it's scheduled in the target scheduler using a call to the Schedule overload that's based on a relative TimeSpan value. You may think this is the same as the old Rx v1.0 approach, but there's one difference. Instead of forgetting about the work, we keep track of the IDisposable returned by the call to Schedule in a so-called short term list. We also wrap the scheduled action with a bit of synchronization code to prevent us from running the work twice. How could that happen? Well, if we notice a system clock change (the mechanism of which we'll discuss in a bit), we'll scan the short term list and try to cancel all the work and transition it back to the long term queue for re-evaluation based on the new system clock time. If the underlying scheduler couldn't grant the cancellation, we need to make sure the re-queuing operation won’t cause double execution, which would be disastrous.
  • If the work is due in the long term, things look quite different. Now, we enqueue the work into the long term queue and evaluate when the first item in the queue is due. If this hasn't changed by the enqueue operation - because the new work is due at a time later than the first item - we bail out. The IDisposable returned will take care of removing the work from the queue, among some other housekeeping tasks. However, if the new work is due the soonest of all the long term work, we reset one single timer we keep: the dispatch timer.

The role of the dispatch timer is to transition work from the long term queue to the target schedulers, by dequeuing all the work that's due in the short term at the point of the timer firing. This is based on a threshold value similar to the one used during the Schedule call, but if we can, we prefer to transfer more than just a single item of work, in order to reduce the number of times the dispatch timer has to run. Transitioning work to the short term list follows the same mechanisms as discussed before, computing the relative due time, and keeping track of the IDisposable objects in order to allow for cancellation in case the system clock changes during the remaining time.

When the dispatch timer is set to fire is based on the long term work item that’s due soonest. However, we don’t just use the difference between the due time and DateTimeOffset.UtcNow, because we may be experiencing timer drift, as we illustrated before. Instead, we make sure to fire early, again based on a computation that’s subject to change. Think of it as the “number of nines” cut-off point, which we determined based on empirical results from the lab. At this point, we schedule the dispatch timer at the 0.999 mark of the due time (one extra order of magnitude compared to the worst drift we’ve seen, to err on the safe side), or at 5 seconds before the due time, whichever is earlier. This allows us to arrive early and re-evaluate how we’re doing, possibly triggering a false positive dispatch timer tick, which won’t do anything but adjust itself to fire well in advance of the first item that’s due. We’re basically playing an Achilles and the tortoise game, but at some point work will transition to the short term list.
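To sketch the idea in code (the actual constants and algorithm are internal and subject to change):

static TimeSpan NextDispatchDelay(TimeSpan remaining)
{
    // Long term work is always beyond the short term threshold, so both
    // candidate delays below are positive.
    var atMark = TimeSpan.FromTicks((long)(remaining.Ticks * 0.999));
    var fiveSecondsEarly = remaining - TimeSpan.FromSeconds(5);

    // Fire at whichever point comes first, so we re-evaluate well before
    // the first long term item is due.
    return atMark < fiveSecondsEarly ? atMark : fiveSecondsEarly;
}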

In the illustration below, you can see an example of an initial state of the LocalScheduler, containing work in the long term queue with the dispatch timer set to fire before the first item is due. All of the target schedulers are idle at this point:

[Image: initial state – work in the long term queue, the dispatch timer set to fire before the first item is due, target schedulers idle]

When the dispatch timer fires, work is transitioned to the underlying schedulers based on a cutoff point (subject to change, as mentioned earlier). Let’s assume the first three items in the queue are considered short term now, causing them to transition and fill the short term list:

[Image: the first three items transitioned from the long term queue to the short term list]

The one remaining piece is how we deal with system clock changes. In Rx v1.0, the library was simply unaware of system clock changes, hence the problems with timers we discussed before (and we will add another twist to this further on). Now that we have various core facilities to discover platform-dependent functionality - known as the Platform Enlightenment Provider (PEP) - we can tune in to various system events such as system clock change notifications. And that's precisely what we do here, using an infrastructure-only interface called INotifySystemClockChanged, whose implementation is discovered through the PEP. The default implementation runs a periodic timer from the Concurrency Abstraction Layer (CAL), which corresponds to whatever periodic timer is available on the platform (System.Threading.Timer from the CLR, the WinRT threadpool timer from the Metro profile, etc.). This timer's job is to periodically check the system clock's value against an expected value computed from the timer's period. If we notice a substantial difference, we conclude the clock has changed, and raise a change notification. The period is currently fixed, but may be tweaked based on the target platform, to conserve battery power. Also, change notifications are disabled when there's no outstanding work based on absolute time.

Upon receiving a system clock change notification, we cancel the dispatch timer and all of the short term work. Next, all the short term and long term work is re-evaluated by comparing the due time against the current system clock time. Based on this, work is bucketized in the short term or long term camp, and a new dispatch timer is run for the head of the long term queue. As mentioned before, work is protected from running multiple times by doing some wrapping that checks a flag using an Interlocked.Exchange operation.
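The protection against double execution boils down to a pattern like this (an illustrative sketch, not the internal implementation):

sealed class RunOnce
{
    private Action _action;

    public RunOnce(Action action) { _action = action; }

    public void Invoke()
    {
        // Atomically swap out the action; only the first caller gets to run it,
        // so re-queued work can never execute twice.
        var action = Interlocked.Exchange(ref _action, null);
        if (action != null)
            action();
    }
}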

Other implementations of the system clock change notification mechanism can be provided by the host, by providing a custom PEP. This is an advanced scenario that’s put in place for hosts that have better ways of retrieving this information. For example, one could use the Microsoft.Win32.SystemEvents class, though this mechanism doesn’t work in certain scenarios: it relies on a window message, and Windows Services that aren’t configured to interact with the desktop don’t receive such messages. Starting with Windows 7 and Windows Server 2008 R2, the Service Control Manager (SCM) provides a SERVICE_CONTROL_TIMECHANGE notification to the HandlerEx callback if the service implementation elects to receive such notifications. Advanced hosts could wire up change notifications based on this OS facility.

Oh, and for completeness: we currently don't rely on information about which direction the clock changed, because some change notification implementations cannot get this information.

In a nutshell, if you implement a custom scheduler, we recommend deriving from the LocalScheduler base class and doing the minimum amount of work: simply provide the implementation of the Schedule overload that accepts a TimeSpan due time value, and we’ll take care of the rest. This method will be called for all three different ways of scheduling work. If work is due immediately, you’ll see a TimeSpan.Zero value coming in. If work was due at an absolute time, you won’t hear from us until a later point in time, when the work has become short term and we hand it over to you. The handover threshold we discussed before is also tuned to make sure you have enough time to schedule the work on your end, i.e. in the multiple seconds range. At the scale of seconds, drift due to relative time scheduling is below the precision you can expect from a non-real-time operating system like Windows anyway. If you want to knock your socks off and try to achieve millisecond precision, you can still go ahead and give it a shot inside your scheduler’s relative time overload for Schedule.

In case you really want to differentiate between the code path for work that’s due immediately and work that has a non-zero due time, you can override the Schedule method that takes no time value. Overriding the overload that deals with absolute time is a whole different story, and should likely be avoided in the majority of cases, unless you’re hooking into a platform facility that natively provides absolute time scheduling (such as the Windows Task Scheduler API; see ITimeTrigger). Or, you could override the method for logging or interception purposes and still call the base implementation, of course. Notice though, you shouldn’t rely on the object identity of the action delegate to attempt to do smart things, like recognizing an action coming back at you that was initially passed to the absolute time scheduling overload, because of the wrapping we do inside.

 

Scheduling periodic work

Seasoned Rx developers will know about our use of recursive scheduling as the core abstraction for introducing concurrency in the system, as required by the various layers in Rx. To remind you of how this works, have a look at one of the Schedule methods on the IScheduler interface again:

IDisposable Schedule<TState>(TState state,
                             TimeSpan dueTime,
                             Func<IScheduler, TState, IDisposable> action);

The recursive structure is visible from the type of the delegates passed to the Schedule methods. Besides passing in the user-supplied state, we also hand the delegate an instance of a scheduler to perform recursive work on. This scheduler instance may be different from the parent scheduler, in order to reduce scheduling costs. For example, the NewThreadScheduler will only start a new thread for top-level Schedule calls. Recursive calls enqueue work to run on the parent thread, basically by using a dedicated EventLoopScheduler in disguise.

IDisposable objects returned from the recursive calls to Schedule can be returned from the Func<IScheduler, TState, IDisposable> delegate and are wired up – by the scheduler's implementation – to the top-level returned IDisposable object. In other words, the top-most IDisposable object is now your handle to cancel the scheduled work and all of the spin-offs it created.

Convenience overloads to Schedule are provided as extension methods, making recursive scheduling easier to use in case the work that has to recur has a loop body characteristic. One such extension method is illustrated below:

public static IDisposable Schedule<TState>(this IScheduler scheduler,
                                           TState state,
                                           TimeSpan dueTime,
                                           Action<TState, Action<TState, TimeSpan>> action);

This mechanism is used quite extensively throughout the Rx v1.0 codebase, e.g. to generate sequences like Observable.Range (using an overload that doesn’t take in a due time). Each iteration through the generation loop corresponds to its own scheduled work item, which allows for proper interleaving as required by various schedulers (e.g. you shouldn’t run a potentially long-running loop on the UI thread). We’ve discussed this recently in our blog post about testing in virtual time, which includes a discussion of our scheduler abstraction. A Range-like generator written in this style is sketched below.
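Here’s what such a generator could look like (a simplified sketch using the recursive Schedule extension method; the real Observable.Range has more plumbing around error handling and disposal):

static IObservable<int> MyRange(int start, int count, IScheduler scheduler = null)
{
    return Observable.Create<int>(observer =>
    {
        return (scheduler ?? Scheduler.Default).Schedule(0, (i, rec) =>
        {
            if (i < count)
            {
                observer.OnNext(start + i);
                rec(i + 1); // each iteration is its own scheduled work item
            }
            else
            {
                observer.OnCompleted();
            }
        });
    });
}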

One of the drawbacks of recursive scheduling is the cost it brings in terms of the number of scheduler calls and closures created to translate the recursive action into the Func<IScheduler, TState, IDisposable> delegate expected by the interface implementation. This cost is unnecessary when schedulers support long-running operations, so we introduced ISchedulerLongRunning in the Rx v2.0 Beta release. However, this doesn’t take care of time-based recurring operations, which we’ll discuss right now.

Quite a number of operations in Rx rely on periodic tasks. Examples include the Observable.Timer and Observable.Interval sequence factories, which allow creating an observable sequence that produces periodic ticks (represented as increasing long values). Other examples are query operators such as Observable.Sample (which periodically samples a stream), Observable.Buffer, and Observable.Window. Let’s have a closer look.

 

Absolute time strikes again!

Most of you will likely know about the Observable.Interval sequence generator, which is merely a convenience overload of the Observable.Timer factory, passing in a period as a TimeSpan:

public static IObservable<long> Interval(TimeSpan period,
                                         IScheduler scheduler);

However, did you ever think about what the period measures? Two common interpretations of the period parameter are possible: either it measures the time between the starts of consecutive callbacks, or it denotes the time that elapses between returning from one callback and starting the next. The picture below illustrates the difference:

[Image: the two interpretations of the period – start-to-start versus end-to-start]

Why does this matter? Recall the role of schedulers as the drivers of observable sequences, a task they fulfill by making callbacks to observers. In this case, the OnNext calls on the sequence carry the incrementing (long-valued) ticks. The important thing to keep in mind here is that those callbacks involve running a continuation, potentially taking a long time and reaching out to unknown user code. In other words, user code can cause slippage of time if we go for the second interpretation: the marbles in the diagram may be oval-shaped and have a significant width.

One way to think about time slippage is by comparing the situation to wheels of a car slipping on the road surface. In this analogy, there are two notions of progress (~ time). From the car engine’s perspective (~ scheduler) the number of rotations it produces on the wheels is what counts. From the road’s point of view (~ observable sequence), points on the wheel are touching the ground (~ OnNext). However, the contact of a point on the wheel to the ground may exceed a “tick” when the tire is slipping over the ground. To the scheduler, time seems to stand still. But to the road, there’s forward progress. Think of this as the OnNext – generated by a point of the tire touching the ground – taking a non-zero time:

[Image: the car wheel analogy – a point on the tire touching the road, with slippage during the contact]

In the picture above, the distance “delta T” traveled on the ground may be different from the distance “R” the tire moves. The red triangle represents the point we’re watching to make a callback to the road (~ an OnNext tick). When the triangle touches the ground and the road surface is running user code, slippage can be caused, as indicated by the red imprint it leaves on the road. The scheduler (car engine) is prevented from making forward progress: there’s no rotation of the wheel generated during the callback (~ an oil spill on the ground causing us to slip).

Obviously, the question is what Interval means in Rx v1.0. Let me give you a hint: what would you like it to mean if you used it as a sequence to drive a sampler (fed into Sample), or a buffer that’s to compute aggregate information over a sequence every hour? Likely you want the first interpretation, where you can set your clock by the produced ticks (sampled values, aggregate data over buffers, etc.). That’s precisely what we did in Rx v1.0 (and still do in Rx v2.0, albeit using a different mechanism).

So, how did something like Interval work in the presence of slippage, given we only have a recursive mechanism for scheduling? Doing a tail recursive call with the period won’t work:

static IObservable<long> PeriodicTimer(TimeSpan period,
                                       IScheduler scheduler = null)
{
    return Observable.Create<long>(observer =>
    {
        return (scheduler ?? Scheduler.Default).Schedule(0L, period,
            (t, rec) =>
            {
                observer.OnNext(t);
                rec(t + 1, period);
            }
        );
    });
}

Can you spot the defect? The recursive call to produce the next tick is scheduled after invoking the callback, which may take a non-zero amount of time:

PeriodicTimer(TimeSpan.FromSeconds(1)).Subscribe(_ =>
{
    Console.WriteLine("{0:hh:mm:ss.fff}", DateTime.Now);
    Thread.Sleep(100);
});

The result is shown below, clearly illustrating the time slippage due to the user callback:

[Image: console output showing time slippage due to the user callback]

If we were to invoke the recursive action “rec” at the beginning of the scheduled action, we’d end up having a different problem of callback reentrancy: the current callback may overlap with recursively scheduled callbacks, producing an invalid observable sequence because OnNext messages should not happen concurrently. To fix this, we’d have to add an AsyncLock (a utility that lives in the System.Reactive.Concurrency namespace) to the code:

static IObservable<long> PeriodicTimer(TimeSpan period,
                                       IScheduler scheduler = null)
{
    return Observable.Create<long>(observer =>
    {
        var gate = new AsyncLock();

        return (scheduler ?? Scheduler.Default).Schedule(0L, period,
            (t, rec) =>
            {
                rec(t + 1, period);
                gate.Wait(() =>
                {
                    observer.OnNext(t);
                });
            }
        );
    });
}

This is undesirable for various reasons. If a callback systematically overstays its welcome (i.e. exceeds the period, thus preventing the next callback from taking place), the queue inside the AsyncLock keeps growing, and the owner of the lock will have to continue pumping callbacks that have queued up. In other words, it’d never cause the scheduler to yield anymore, because we’re really running an unbounded loop. We could dismiss this by saying callbacks should be short-lived, and long-running work should be offloaded by using ObserveOn; fair enough. But there’s another problem with this code: it still doesn’t work properly with regard to the desired timing characteristics:

[Image: console output showing roughly 10ms of drift per iteration]

Can you guess what’s wrong this time? The problem is smaller, but we’re still drifting about 10ms each iteration. What we’re seeing this time around is the cumulative cost of the recursive scheduling itself, involving the allocation of several objects, the scheduling call for the next iteration, etc. The way we solved this in the Rx v1.0 timeframe was by using absolute time to compute the next due time, and performing recursive calls for the remaining duration after returning from the OnNext callback:

static IObservable<long> PeriodicTimer(TimeSpan period,
                                        IScheduler scheduler = null)
{
    scheduler = scheduler ?? Scheduler.Default;

    return Observable.Create<long>(observer =>
    {
        var next = scheduler.Now + period;

        return scheduler.Schedule(0L, period,
            (t, rec) =>
            {
                observer.OnNext(t);

                next += period;
                rec(t + 1, Scheduler.Normalize(next - scheduler.Now));
            }
        );
    });
}

This worked more or less. While it seemed like you could set your clock by it (as shown below), and there was some correction for drift (because every tick recalibrated against the system clock), there were a number of problems.

[Image: console output showing ticks aligned with the system clock]

All of the problems have to do with the use of absolute time, which seems very counterintuitive when looking at the signature of our PeriodicTimer factory method: it only takes in a TimeSpan, yet it somehow uses absolute time under the covers. In fact, we could have made the recursive calls using the “next” value and done the initial schedule call using “next” too, thereby only using DateTimeOffset-based overloads of the Schedule method. While you asked for something that’s purely based on relative time, it exclusively uses absolute time under the covers?!

So, what about the “positive effects” this has, namely some correction of drift? This works pretty well for small periods, e.g. in the range of seconds. However, as soon as you go beyond the small scale, systematic drift cannot be corrected reliably this way. Say you have a 10E-5 drift factor and you’re scheduling something with a period of 1 hour, which is 3,600,000ms. The drift is now somewhere between -36ms and +36ms, so next time we’ll schedule at an hour from now, give or take 36ms, which is negligible relative to such a large number: 10E-5 of 3,600,036 will be 36ms again, so we’re accumulating drift despite seemingly correcting for it. Notice though, with the Rx v2.0 fixes to absolute time scheduling, this would work better.

However, the use of absolute time here has many other drawbacks. For example, what happens when the system clock changes? We now have a stale copy of the scheduler’s old clock (retrieved through Now) sitting hidden somewhere in the value of “next”. When we wake up after the expiration of the recursive timer (which, under the hood, was always based on relative time), we just carry on with the previous value of “next”, not knowing that the value no longer has any meaning. Due to the use of Normalize (to protect against negative values), we may end up making recursive calls with TimeSpan.Zero due times in an attempt to catch up with the system clock. The loop is now going crazy:

[Image: console output showing the timer loop firing rapidly to catch up after a clock change]

In fact, this problem doesn’t only occur when the system clock changes, but also when the application doesn’t observe a certain time interval. That happens when some lifecycle mechanism suspends the application; when it comes back alive after resumption, it notices the current system clock. You may already have guessed the kind of mechanism I’m referring to here: the dormant state of Windows Phone applications. A similar mechanism exists today in Windows 8’s new Metro style applications. This led to an interesting bug early in the Rx v1.0 timeframe, which we subsequently fixed (including in the version in WP7’s ROM), where an application would come out of the dormant state, and the Observable.Timer/Interval implementation would start its quest to catch up with the new time, draining the battery due to the flood of callbacks it triggered. The fix was more of a band-aid, whereby the timer would ignore ticks that occurred in the distant past, resetting the “next” state to be based on the current system time.

As you can see, the curse of absolute time strikes again! While we could use the same system clock notification mechanism (and extend it with awareness of lifecycle events – something we’ll encounter again later in this blog post) to make query operators like Timer, Sample, Buffer, etc. aware of system clock changes, this would start to pervade the whole Rx library at all layers, significantly increasing the testing burden, adding complexity all over the place, requiring extra synchronization (because system clock change notifications may occur concurrently with other timer operations), etc.

So, we decided to tackle this problem in a different way…

 

Introducing ISchedulerPeriodic

Periodic scheduling using recursive scheduling is merely a kind of emulation that – as we’ve seen above – brings with it a slew of headaches. When we first designed the scheduler abstraction in Rx, we went for a minimalistic approach that works for the various scheduling mechanisms provided by the platform, ensuring proper interleaving of operations (e.g. for UI schedulers), while keeping the LINQ layer relatively simple because of the one and only way of performing scheduling requests (modulo overloads that are used to simplify the code).

Just like we did with ISchedulerLongRunning in Rx v2.0 Beta, we’re now introducing another capability – called ISchedulerPeriodic – in the Rx v2.0 RC build. In the layer map of Rx, higher layers of the system can detect scheduler capabilities and perform a more optimal scheduling approach when such capabilities are available. The new periodic scheduling capability is used by all of the operators that have a time-based recurring characteristic, such as Timer, Interval, Sample, Buffer, and Window (and potentially more going forward). So, what does the interface look like?

namespace System.Reactive.Concurrency
{
    public interface ISchedulerPeriodic
    {
        IDisposable SchedulePeriodic<TState>(TState state,
                                             TimeSpan period,
                                             Func<TState, TState> action);
    }
}

The interface should be pretty straightforward to understand: given an initial state and a TimeSpan-based period, the action will be called periodically, returning an IDisposable in the grand Rx tradition of providing a cancellation mechanism.

What’s up with the threading of the TState through the action – or should I say function – though? To make it clear to implementers of the interface that the action is supposed to be called sequentially, we decided to go for a delegate type that expresses this requirement. The idea is that the next periodic callback can only occur when the current callback has returned the state to be passed to the next invocation. In other words, no reentrancy is allowed. Some implementations have to guard against this, because the underlying infrastructure allows reentrant timer callbacks (e.g. System.Threading.Timer has this behavior), while others don’t have to do anything special because they piggyback on a single thread pumping a message loop or so. As a result, reentrancy protection is only put in place for those schedulers that need it, as opposed to requiring the LINQ layer to put such measures in place all the time.

Extension methods are provided that offer simpler ways to run periodic tasks:

public static IDisposable SchedulePeriodic(this IScheduler scheduler,
                                           TimeSpan period,
                                           Action action);

public static IDisposable SchedulePeriodic<TState>(this IScheduler scheduler,
                                                   TState state,
                                                   TimeSpan period,
                                                   Action<TState> action);

public static IDisposable SchedulePeriodic<TState>(this IScheduler scheduler,
                                                   TState state,
                                                   TimeSpan period,
                                                   Func<TState, TState> action);

Wait a second… Why do those extension methods appear on the IScheduler interface, rather than the ISchedulerPeriodic interface? In order to clean up the LINQ layer, we centralized the decision of whether to go down the ISchedulerPeriodic route or to perform some kind of emulation in System.Reactive.Core. Query operators can simply request a periodic task to be run, and the underlying layer will figure out an optimal way of doing it, rather than requiring type-check based logic in various places.
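For example, a stateful tick counter using the Func&lt;TState, TState&gt;-based overload (shown here on the thread pool scheduler):

var subscription = ThreadPoolScheduler.Instance.SchedulePeriodic(0,
    TimeSpan.FromSeconds(1), count =>
    {
        Console.WriteLine("Tick {0}", count);
        return count + 1; // state handed to the next invocation
    });

// Dispose to stop the periodic ticks.
subscription.Dispose();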

Depending on what scheduler capabilities are available, this method will either forward the request to the periodic scheduling interface implementation, or run some kind of emulation based on stopwatches or recursive scheduling. The use of stopwatches – based on the Rx v2.0 Beta concept of IStopwatchProvider – for emulation replaces absolute time obtained from the system clock as a frame of reference to detect time slippage. Only when neither periodic scheduling nor stopwatches are available is recursive scheduling used, much like we did in Rx v1.0. This situation should only arise when using third party schedulers that were written for Rx v1.0 and don’t derive from LocalScheduler. As soon as you derive from LocalScheduler, stopwatch capabilities are exposed.

Note: Detection of scheduler capabilities isn’t based on a type check for an interface implementation (using C# “is” or “as”). Instead, we look for an IServiceProvider implementation and ask that one for an implementation of the required interface (e.g. passing typeof(ISchedulerPeriodic)).

When deriving from LocalScheduler, the default IServiceProvider implementation is based on an interface type check, so if you’re implementing your own scheduler it suffices to derive from LocalScheduler and implement the other specialized scheduling interfaces you support (ISchedulerPeriodic and/or ISchedulerLongRunning for Rx v2.0 – others may be added in the future). An IStopwatchProvider implementation is provided through the LocalScheduler base type for free.

The reason we use the IServiceProvider pattern is to allow for extension methods such as DisableOptimizations, which can be used to hide certain scheduler optimization aspects (mainly used for testing purposes, or as a (rare) workaround for compatibility issues). All the DisableOptimizations method does is return a wrapper around the original scheduler object, providing an IServiceProvider implementation that lies about the available interfaces (by excluding the ones that are disabled, returning null from the GetService call instead).
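Conceptually, probing for a capability boils down to the following pattern (an illustrative sketch; the actual probing logic lives inside System.Reactive.Core):

static ISchedulerPeriodic TryGetPeriodic(IScheduler scheduler)
{
    // Ask through IServiceProvider rather than a direct "as" cast, so that
    // wrappers like DisableOptimizations can hide the capability.
    var provider = scheduler as IServiceProvider;
    return provider != null
        ? (ISchedulerPeriodic)provider.GetService(typeof(ISchedulerPeriodic))
        : null;
}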

While we’re touching on the subject of stopwatches, one additional PEP interface was added in Rx v2.0 RC. When applications are suspended, constructs like a stopwatch (including the one in the BCL, i.e. System.Diagnostics.Stopwatch, but also things like Environment.TickCount) suffer from gaps in their perception of time. What’s going on here is that those facilities are unaware of host lifecycle events – such as dormant states – and surface the progress of time based on the system underneath. While the application is put to sleep, the ticks continue increasing at the OS and hardware level. When the application resumes, properties like Elapsed on a stopwatch would reveal a time gap that includes the time the application was suspended. An infrastructure-only interface called IHostLifecycleNotifications is used to hook platform-specific system events (on Windows Phone 7, using PhoneApplicationService, and on Windows 8, using events on CoreApplication) through System.Reactive.PlatformServices, allowing stopwatches to be reset upon resumption of the application. Pretty much the only place this is used today is in the emulation of periodic scheduling based on stopwatches, which is rather rare by itself because most schedulers in Rx support ISchedulerPeriodic.

An example of the internal use of ISchedulerPeriodic can be observed when using the built-in Observable.Interval factory method:

image

At the bottom of the call stack, you can observe the use of System.Threading.Timer-related machinery, which is used from the SchedulePeriodic implementation on the ThreadPoolScheduler in Rx. Notice the AsyncLock on the call stack as well, used to protect against reentrancy. When disabling periodic scheduling using the DisableOptimizations extension method, the call stack looks quite different:

image

This time, a number of frames on the call stack reveal the use of recursive scheduling and a stopwatch used to detect time slippage. Feel free to experiment a bit with other scheduler types that ship with Rx, to see the underlying mechanisms used. It should be fairly easy to guess what facilities we're using to do periodic scheduling on UI frameworks such as WPF, Silverlight, Metro style apps, and Windows Forms.
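
For instance, a quick way to set up such an experiment (names and timings are merely illustrative) looks like this:

// With the optimization intact, SchedulePeriodic on the thread pool is used;
// with ISchedulerPeriodic hidden, emulation based on a stopwatch kicks in.
var periodic = Observable.Interval(TimeSpan.FromSeconds(1),
    ThreadPoolScheduler.Instance);

var emulated = Observable.Interval(TimeSpan.FromSeconds(1),
    ThreadPoolScheduler.Instance.DisableOptimizations(typeof(ISchedulerPeriodic)));

periodic.Subscribe(x => Console.WriteLine("periodic -> {0}", x));
emulated.Subscribe(x => Console.WriteLine("emulated -> {0}", x));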

 

Pros and cons of periodic scheduling

Periodic scheduling has many benefits, and we introduced it in Rx v2.0 RC mainly for two reasons. First, it fits very well in our performance-driven release plans. Second, it frees us from pervasive worries about absolute time, centralizing all periodic scheduling in one spot in System.Reactive.Core. Let's drill in a bit more.

Performance benefits of periodic scheduling stretch from Rx's use of scheduling all the way down to the underlying platform. In the Rx layers, we no longer have to bear the burden of allocating delegate objects deep inside the recursive scheduling extension methods, which leads to allocation costs linear in the number of iterations simulated through the recursive mechanism. For example, recursive scheduling requires wiring up the IDisposable objects that are returned from each recursive call, causing churn of objects kept by a CompositeDisposable.

Below the Rx surface, the performance benefits are mainly due to the transparent communication of our intent to the underlying scheduling facilities. Imagine you’re a timer inside framework, BCL, WinRT, or OS code. When recursive scheduling is taking place above our heads, the only thing we’ll see coming through is a flood of single-shot timer requests: “hey, call me back in 995ms” followed by “hey, call me back in 1002ms” followed by “hey, call me back in 987ms”, etc. You get the idea. However, when periodic scheduling is used, a single call to the underlying infrastructure is made: “hey, call me every 1000ms”. In fact, this call typically comes all the way from the LINQ layer, and tunnels through to the platform.
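
To make the contrast concrete, here's a sketch of both shapes of the conversation, using the scheduler extension methods:

// Recursive scheduling: every iteration results in a fresh single-shot request.
var oneShots = Scheduler.Default.Schedule(TimeSpan.FromSeconds(1), self =>
{
    Console.WriteLine("tick");
    self(TimeSpan.FromSeconds(1)); // "call me back in ~1000ms", again and again
});

// Periodic scheduling: a single request communicates the intent once.
var period = Scheduler.Default.SchedulePeriodic(TimeSpan.FromSeconds(1),
    () => Console.WriteLine("tick"));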

From an allocation point of view, this is beneficial: only one call is made, likely newing up a single timer object, with a tiny bit of wrapping at the Rx level (to provide an IDisposable implementation and maybe to protect the callback against reentrancy). But there's more: communicating our intent of doing periodic work allows various optimization schemes to kick into action. As platforms are getting more and more constrained for power utilization, techniques like timer coalescing are becoming more popular. For example, Windows 7 introduced this in both user mode and kernel mode, and Windows 8 is adding more such capabilities. While we're not directly talking to such functions at this point (which take in an extra parameter to specify a "tolerable delay"), it's not unimaginable for platform-exposed timers to start doing more coalescing in order to reduce power consumption (e.g. two UI timers that have the same period but are only a few milliseconds apart could be optimized into running a single timer). By communicating periodic intent, the platform stands a better chance of performing such optimizations.

One drawback of periodic scheduling is its sensitivity to systematic timer drift, when comparing the results against e.g. the system clock. The main question here is whether the original timer implementation really provided an advantage: it can't protect against systematic drift (as we saw before), while its dependency on absolute time opens a whole can of worms. System clock changes can make the timer skip a beat, or go into a catch-up regime, which is likely far more unexpected than a periodic timer that's independent from the system clock and has a steady, more or less predictable, rhythm but may exhibit a higher degree of variability.

If behavior closer to the Rx v1.0 approach (including its weaknesses) is desired, the DisableOptimizations extension method can be used to force the system into a recursive scheduling approach, with or without the presence of a stopwatch. Alternatively, single-shot behavior can be requested by going through non-periodic but recurring operators, such as the Generate factory method that accepts a time selector function, as sketched below.
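
As a small sketch of the latter, Generate with a time selector yields recurring single-shot behavior:

// Each value asks for a fresh relative delay, so the platform sees a series
// of single-shot timer requests rather than one periodic request.
var ticks = Observable.Generate(
    0,                            // initial state
    _ => true,                    // never stop
    x => x + 1,                   // iterate
    x => x,                       // result selector
    _ => TimeSpan.FromSeconds(1), // time selector for the next due time
    Scheduler.Default);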

Whether you can set your clock by a timer such as Observable.Interval depends on the characteristics of the IScheduler implementation. In Rx v2.0, we’re taking the approach where the scheduling mechanism is predictable to users of the underlying framework, e.g. using the DispatcherTimer for periodic tasks when using our WPF DispatcherScheduler, using CreatePeriodicTimer when using our WinRT ThreadPoolScheduler. If additional guarantees are required for specialized uses of Rx, custom IScheduler implementations can be built that optimize for desired characteristics such as accuracy and/or precision.

 

Now belongs to the past

As part of the work of introducing the ISchedulerPeriodic capability and the careful treatment of absolute time, we went through the entire LINQ layer codebase of Rx, hunting for uses of the Now property on schedulers. It’s no longer allowed for functionality at the LINQ layer to use this property in a way that stores a copy of the value for later use, simply because the system clock may change and we want to avoid clock change notification code paths in all layers of the system.

An example of an operator that used absolute time is Delay, which used DateTimeOffset timestamps to keep its queue of messages to be delayed. Instead, it now relies on stopwatches to record the time between consecutive messages, which are replayed using the same frame of reference established by an offset applied to the stopwatch's elapsed time.

Only a handful of places in the LINQ layer still use – rightfully so – absolute time. For example, the Timestamp operator retrieves the Now property of the specified scheduler for each element flowing through the OnNext channel, in order to timestamp the message. This is okay because we're not doing any computation based on this value. Users can still interpret the Timestamped<T> value, taking into account that consecutive values may seem to be unordered in case the system clock changed abruptly between two OnNext messages.

The result of this cleanup is also better predictability of the behavior of various query operators. When a query operator accepts a DateTimeOffset, it will ultimately make a call to the DateTimeOffset-based overload of Schedule. When a query operator accepts a TimeSpan, it will communicate through relative time based scheduling mechanisms (possibly including the periodic capabilities). Converting between both notions of time is simply not done at the LINQ layer (or at the core layer for that matter, ignoring the world of virtual time scheduling, where computations based on time values are common).

 

New temporal query operators

Rx v2.0 RC also introduces a few new query operators to deal with time. The list is short and self-explanatory, so I'll simply paste the method signatures, with a small usage sketch afterwards:

public static IObservable<TSource> Skip<TSource>(this IObservable<TSource> source,
    TimeSpan duration);

public static IObservable<TSource> Skip<TSource>(this IObservable<TSource> source,
    TimeSpan duration,
    IScheduler scheduler);

public static IObservable<TSource> SkipLast<TSource>(this IObservable<TSource> source,
    TimeSpan duration);

public static IObservable<TSource> SkipLast<TSource>(this IObservable<TSource> source,
    TimeSpan duration,
    IScheduler scheduler);

public static IObservable<TSource> SkipUntil<TSource>(this IObservable<TSource> source,
    DateTimeOffset startTime);

public static IObservable<TSource> SkipUntil<TSource>(this IObservable<TSource> source,
    DateTimeOffset startTime,
    IScheduler scheduler);

public static IObservable<TSource> Take<TSource>(this IObservable<TSource> source,
    TimeSpan duration);

public static IObservable<TSource> Take<TSource>(this IObservable<TSource> source,
    TimeSpan duration,
    IScheduler scheduler);

public static IObservable<TSource> TakeLast<TSource>(this IObservable<TSource> source,
    TimeSpan duration);

public static IObservable<TSource> TakeLast<TSource>(this IObservable<TSource> source,
    TimeSpan duration,
    IScheduler scheduler);

public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource> source,
    DateTimeOffset startTime);

public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource> source,
    DateTimeOffset startTime,
    IScheduler scheduler);

 

Improvements to virtual time scheduling

We also made an improvement to virtual time scheduling, allowing you to simulate delays when running work in virtual time. This is useful to test for conditions such as time slippage introduced in callbacks, as we've seen before when discussing the problems of the emulation of periodic timers. The new method introduced on the VirtualTimeSchedulerBase class is called Sleep; it increments the virtual time based on the specified delay (specified as relative time or absolute time, both of which need to denote a point in the future). An example is shown below:

var start = DateTimeOffset.Now;

var scheduler = new HistoricalScheduler(start);

scheduler.ScheduleAbsolute(start + TimeSpan.FromTicks(100), () =>
{
    Console.WriteLine("1st work item @ {0}", scheduler.Now.Ticks - start.Ticks);
    scheduler.Sleep(TimeSpan.FromTicks(50));
});

scheduler.ScheduleAbsolute(start + TimeSpan.FromTicks(200), () =>
{
    Console.WriteLine("2nd work item @ {0}", scheduler.Now.Ticks - start.Ticks);
    scheduler.Sleep(TimeSpan.FromTicks(150));
});

scheduler.ScheduleAbsolute(start + TimeSpan.FromTicks(300), () =>
{
    Console.WriteLine("3rd work item @ {0}", scheduler.Now.Ticks - start.Ticks);
    scheduler.Sleep(TimeSpan.FromTicks(50));
});

scheduler.Start();

This will print three work items being executed. The first occurs at time 100. Despite the nap of 50 ticks in the callback, the second work item can still run at time 200. However, the second work item overstays its welcome and sleeps until virtual time 350, which is 50 ticks after the third item was supposed to start running. Virtual time schedulers don't introduce any concurrency and simulate a single-threaded execution strategy, so the third item will start running as soon as it can, reporting a tick count of 350 when it starts.

Notice this method is similar to but different from the AdvanceTo and AdvanceBy methods. The goal of the Advance* methods is to drive the scheduler forward until the specified time. Doing this within a callback – while the single-threaded virtual time scheduler is already running – is not allowed. After all, what would it mean? Enter a nested pump to execute work? Mimic multiple logical threads of execution? In Rx v1.0, calls to Advance* methods were silently ignored while the scheduler was running. In Rx v2.0, an InvalidOperationException will be thrown in this case. The Sleep method can be called at any point in time and advances the clock by the specified amount of relative time (or until the specified absolute time), without executing any work. Calls to Start or the Advance* set of methods are the only ways to trigger the execution of work in virtual time.
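
For example, extending the sample above with a fourth work item – scheduled before the call to Start, and purely illustrative – shows the new behavior:

scheduler.ScheduleAbsolute(start + TimeSpan.FromTicks(400), () =>
{
    try
    {
        scheduler.AdvanceBy(TimeSpan.FromTicks(10)); // not allowed while running
    }
    catch (InvalidOperationException)
    {
        scheduler.Sleep(TimeSpan.FromTicks(10)); // fine at any point in time
    }
});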

 

Revamped error handling strategy

Dealing with errors in an asynchronous world isn’t straightforward. Because parts of the user’s code (such as the continuation, i.e. an observer’s On* method calls) are running at a later point in time, there’s often another party involved whose task it is to run the code, sometimes far away from where the operation was kicked off (i.e. the call to Subscribe). This party who bites the bullet of running the continuation can be a .NET event (when using FromEvent*), a thread pool thread (when using FromAsync or when converting Task objects), or in general an Rx scheduler (when using sequence generators such as Range).

In Rx v2.0, we’re streamlining the way we’re dealing with errors across the board. Part of this work is done to support scenarios where Rx is hosted in an application that has high availability requirements. One of the biggest advantages of Rx is the first-class representation of events as objects – through the IObservable<T> interface – so one can pass event streams around. However, with great power comes great responsibility. When handing out an observable sequence, you’re subscribing (pun intended) to foreign code running in the observers, either directly (by a user calling Subscribe) or indirectly (by a user composing more query operations on top of the sequence).

Let’s have a look at the different aspects of error handling, how we dealt with some of those in Rx v1.0, and how we’re making improvements in Rx v2.0.

 

Who’s to blame?

As mentioned in the introduction, the push-based, asynchronous model of data retrieval provided by observable sequences poses an interesting challenge when errors occur. Namely, who's to blame for causing a crash? The chain of continuations, formed by observers, is a bit like a row of dominos: pushing a domino can be dangerous, and you'll ultimately make the source of the chain fall over as well… In the picture below, the bad guy is hiding outside the picture, causing an exceptional (?) chain reaction of dominos falling over.

image

On the other hand, if you were to pull the first domino to you in an attempt to MoveNext it, the worst thing that can happen is you get crushed by the domino throwing itself at you. The rest of the chain keeps standing up, and you only caused yourself harm.

In the push-based world of observables, when an exception happens in any of the observers, it’d swing back to the place where the original observer call was made from. Any stage of the pipeline can cause the domino effect of things being taken down, ultimately ending up as a stack of dominos representing the exception stack trace. To continue the analogy of dominos a bit further: when preparing a world record attempt of dominos falling over, you have better chances of keeping the pipeline standing (until the big day of planned cascading failure) if you build in some exception handling at various places of the chain, in this case by keeping gaps to prevent errors from propagating. Back to the world of data access in computer systems now…

When accessing data through a pull-based model, for example using a DataReader for a database command, an exception during the processing of the data isn’t harmful to the database. The exception may originate from the database, telling the consumer something went wrong, but it may equally well be the consumer doing something wrong when manipulating the data that came back. In the latter case, the database doesn’t even know about the error: the consumer caused only harm to itself.

On the other hand, when data is acquired using a push-based model, for example receiving tweets from Twitter, the sender of the data can be in danger when reaching out to the consumer. Again, the event source may notify us about an exceptional situation (through OnError), but the consumer may do something wrong inside a callback as well. In the latter case, the consumer can cause harm to its caller. Wouldn't it be sad if throwing an exception from a TweetReceived event handler caused the Twitter service to go down?

 

Safeguarding the pipeline

In Rx v1.0, we safeguarded the pipeline against user code exceptions that run as part of various operators. For example, when an exception is thrown from the selector function passed to Select or the predicate function passed to Where, it doesn’t propagate upstream through the chain of observers back to the source. Instead, the error is caught and sent downstream to the consumer’s observer through the OnError channel. As part of this operation, the subscriptions to any of the sources involved are taken down as well:

var src = Observable.Range(0, 10, Scheduler.Default)
                    .Do(x => Console.WriteLine("> {0}", x))
                    .Finally(() => Console.WriteLine("> Dispose()"));

var res = from x in src
          where x % 2 != 0
          let y = 5 - x
          select 100 / y;

res.Subscribe(
    x  => Console.WriteLine("  OnNext({0})", x),
    ex => Console.WriteLine("  OnError(\"{0}\")", ex.Message),
    () => Console.WriteLine("  OnCompleted()")
);

This simple code fragment cooks up a source sequence that performs a bit of logging of all produced messages as well as the disposal of the subscription, and uses it in a query where the final Select operation will throw due to a division by zero error. This happens while the Range source is producing value 5, which passes the Where filter and is projected using an intermediate "let" clause (a Select operation in disguise) to a divisor "y" with value 0.

If the exception were to propagate up the call stack, it'd take down the Range generator, which is running on the default scheduler (introduced in Rx v2.0, using the platform's thread pool). However, the exception propagates through the OnError channel as illustrated below. Also notice how the propagation of the error to the downstream observer triggered the disposal of the upstream producer: the Range generator is turned off, causing the work it scheduled to be cancelled.

image

Looking a bit deeper, we’ll set a breakpoint right inside the final “select” clause’s selector body. In the screenshot below, we’re broken in the debugger at the point where the selector is about to attempt a division by zero. Have a look at the call stack, revealing the DefaultScheduler being the volunteer to drive the continuation through the chain of operators. If we were to throw here, a thread pool thread would crash, causing the process to be torn down:

image

Obviously, we don’t want to swallow the exception, so instead it propagates down the OnError channel, which we can see by setting a breakpoint in the error handler now:

image

Notice where the OnError propagation originates from now: the Select operator caught the exception when reaching out to the selector and is now redirecting the exception to its observer’s OnError channel.

Is the source – Range and its scheduler – out of the danger zone now? Not quite… If we were to throw from the OnError channel, the exception would still travel upstream all the way back to the thread pool. At least, that’s what happened in Rx v1.0. Notice the fourth frame in the call stack, containing SafeObserver in its name? This was introduced in Rx v2.0 RC as a pipeline safeguard when an unknown observer is subscribed to an Rx-generated observable sequence. Inside Rx, we make sure observers never throw (except for truly exceptional circumstances, such as failure to allocate).

Note:  Post-RC we may tweak this mechanism a bit further to reduce the call stack depth that has grown due to the introduction of the SafeObserver stack frame. Part of the work we did in this release (cf. Rx v2.0 Beta) is making sure the implementation of Rx never uses AnonymousObservers, which are born when using any of the Subscribe extension methods that take in On* delegates. As a result, we could fuse the safeguarding mechanism with the AnonymousObserver implementation going forward.

This observer whose role it is to safeguard the pipeline doesn't really catch the exception; instead, it ensures resources are properly cleaned up when any of the callback methods throws. But if it doesn't catch the exception, what is it good for? The answer is we're dealing with two orthogonal concerns:

  • How can we provide a last-chance exception handling mechanism that prevents schedulers from dying a quick death?
  • How can we ensure proper cleanup when a rogue observer takes down the pipeline?

We’ll come back to the first part in the next section, and will focus on the SafeObserver’s role here, which covers the second bullet point. Consider the example of a binary operator, CombineLatest, taking two source sequences. While the combiner function has always been protected against exceptions since Rx v1.0 (just like predicates and selectors in Where and Select are), there’s something interesting going on when the downstream observer throws. Have a look at the following illustration where we inspect the call stack during two OnNext calls on the resulting sequence:

image

Can you see what's going on? The free-threaded nature of Rx – one of the keys to achieving good performance, because we don't need to handshake with a common synchronization context or the like – causes the CombineLatest operator to piggyback on either of the observers it provided to its sources. One time, we're borrowing the call stack of the right-hand source (i.e. a FromEvent sequence "ys" that forwards events that were seemingly generated on some Dispatcher). The other time, we're surfing the call stack waves of the left-hand source (i.e. an Observable.Timer sequence "xs" generated by the thread pool scheduler).

What would happen if we were to throw from the first OnNext callback in the picture? Right, we’d crash the Dispatcher where the call originated from, but how about the second sequence that was involved in the computation through the use of CombineLatest? Without the new safeguarding feature, this sequence would get orphaned, leaving a scheduled (periodic) timer ticking. Because we are failing somewhere in the middle of the outgoing call to the observer, we may end up messing up internal state (e.g. making the outgoing call may happen under a lock, which would get released during the exception propagation due to its use of a try…finally… block) and producing erroneous results (e.g. if the timer continues ticking we may continue to combine values with the last value seen on the dispatcher, even though we crashed part of the pipeline).

Note:  You may wonder why we don’t redirect the exception thrown from OnNext to the OnError channel instead (which causes automatic disposal of the pipeline, as we’ve seen before). Doing so would be plain weird: an observer throwing from its OnNext callback would get called again on its OnError channel. Now it’s indistinguishable whether the sequence propagated an exception (and the previous OnNext callback succeeded), or the observer fed back into itself. In addition to this, the OnNext call never terminated gracefully, so strictly speaking the OnError call is now overlapping the failing OnNext call, which is a violation of the observer grammar. Also, this mechanism wouldn’t quite work for terminal messages – OnError and OnCompleted – because no further callbacks to the observer are expected anymore at that point.

Starting with Rx v2.0 RC, the pipeline is being taken down gracefully at the point the SafeObserver sees the exception propagating upstream (using some kind of fault handler). In the sample illustrated above, this would correctly dispose the subscriptions to “xs” and “ys”, causing the event handler to be removed and the thread pool timer to be disposed as well. The exception continues to propagate though (see further)!

Intermezzo

Notice how this isn’t complex to get right in the world of pull-based data retrieval, with pull-based query operators. For example, the implementation of the Zip operator in LINQ to Objects looks a bit like this:

static IEnumerable<R> Zip<T1, T2, R>(this IEnumerable<T1> xs,
    IEnumerable<T2> ys,
    Func<T1, T2, R> f)
{
    using (var ex = xs.GetEnumerator())
    using (var ey = ys.GetEnumerator())
        while (ex.MoveNext() && ey.MoveNext())
            yield return f(ex.Current, ey.Current);
}

The reason error handling seems easy in this case is that the compiler-generated state machine for the iterator method takes care of a lot of the complexity. Let's walk through what's going on here.

During the first MoveNext call to the resulting sequence, the iterator acquires two enumerators which it protects with a using block to guarantee proper disposal if anything goes wrong. In fact, things can already go wrong here, if any of the GetEnumerator methods throws (we’ll come back to this point further on, because there’s an analogy in the world of Rx). Assuming we make it through the phase where we perform the acquisition of both enumerators, we move on to the loop itself. From this point on, the remainder of the execution of the first MoveNext call looks the same as all of the subsequent calls to MoveNext.

Each MoveNext call to the resulting sequence triggers an iteration through the loop – on its way to reach the yield statement – and a lot of things can go wrong. Either of the source sequences' MoveNext calls could throw, the Current property getters could throw, and finally the combiner function "f" could throw. In any of those erroneous cases, the iterator will transfer control to the generated finally blocks to dispose the enumerators.

So, what’s different compared to the push-based model of Rx here? The key difference lies in the driver of the computation being the consumer making a MoveNext call. During this MoveNext call, all of the code that makes further calls to the underlying sources is protected – in a manner similar to lexical scoping with a try…finally… block – against exceptions in order to ensure proper clean-up. On its way out to the caller, the exception propagation triggers disposal of both enumerators.

In the push-based world, the driver of the computation is a callback on either of the two observers that are hooked up to the sources. Instead of one control flow channel (the MoveNext triggered by the consumer) there are no less than six stimuli we need to react to here: OnNext, OnError, and OnCompleted channels for each of the observers, routing results to the three output channels on the consumer’s observer. All of the code that runs as part of the incoming callbacks needs to be properly protected against user exceptions, and so do the calls to the outgoing observer as part of the safeguarding feature work.

 

A safety net for subscriptions

More can go wrong than exceptions originating from outgoing calls to observers though. What about the side-effects that happen during the subscription to sequences? As many of you know, the mother of all observable sequences is Observable.Create, which consists of two parts: the code that sets up the subscription, and the code that disposes the subscription. The skeleton is shown below:

image

What happens though if the code that runs as part of the Subscribe call throws an exception? As suggested by the comment, this code – pointed at by the green arrow – may be doing a lot of things. For example, it may schedule timers, as well as make subscriptions to other sequences. If any of those operations fails, we need to “roll back” the previous operations or we may be orphaning resources again. After all, we didn’t yet return from the Subscribe method, so the user wasn’t able to put her hands on an IDisposable that can be used to cancel the work.

Note:  Some readers may raise the question why we’re not taking in a cancellation mechanism (like a CancellationToken) rather than handing out a disposable object. Part of this is historical, because Rx started before the TPL took its final shape, ultimately making its way to the BCL. But there’s more than just this…

Passing in cancellation mechanisms makes certain aspects of composition harder: often, we want to orchestrate the disposal by taking down the pipeline in a well-ordered manner. When cancellation stimuli can propagate all the way to the leaf nodes of the computation, we can’t know which of our dependent resources is already on its way of granting the cancellation request, unless we intercept the cancellation propagation somehow by registering callbacks and/or performing a mapping between incoming cancellation tokens and outgoing ones. Examples include ref counting schemes required by operators such as GroupBy and Window, where we need to know when all subscriptions to inner streams have been disposed, before we can take down other parts of the pipeline (including the subscription to the source stream).

Using the disposable mechanism, each dependent unit of work can cook up a handle to cancel its computation (much like a cancellation token with a registered callback). The beauty comes in when many such pieces of work meet in a certain place. The composition of multiple streams is now directly mirrored in the composition of multiple disposables returned from the subscriptions. Using System.Reactive.Disposables, we provide a whole slew of utilities to compose disposable objects. In fact, we do have a CancellationDisposable as well, which uses the TPL’s CancellationToken mechanism, used for interop with async methods etc.
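
For example, here's a small sketch of this composition style, including the TPL interop mentioned above:

// CancellationDisposable bridges both worlds: disposing it cancels its Token,
// which can be handed to task-based code.
var cd = new CancellationDisposable();
var token = cd.Token;

// Composition of the cleanup mirrors the composition of the work itself.
var group = new CompositeDisposable(
    cd,
    Disposable.Create(() => Console.WriteLine("cleaned up")));

group.Dispose(); // cancels the token and runs the cleanup action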

We could do a number of things to deal with this situation. One way is to add a lot of error handling to the body of each Subscribe implementation, in order to dispose all the resources we already acquired at the point of failure. This is very similar to the code generated by iterators in C#, which actually emits fault handlers – something that unfortunately isn't directly available through the C# exception handling primitives. An alternative approach is to put a safety net around subscriptions, which allows us to piggyback on a resource cleanup mechanism we already have in place in Rx, namely auto-dispose behavior upon terminal messages sent to an observer. Let's drill in a bit more.

 

A brief look at async methods…

In order to understand the subscription safety net feature, it's worthwhile to have a brief look at C# 5.0 and Visual Basic 11 async methods first. Those are similar to the computation model that Rx provides, based on a continuation mechanism. Rather than using IObserver<T> to represent the continuation, async methods use Task<T> (or Task or void, but let's ignore those) as the way to hook up a (single-shot) continuation using ContinueWith. One difference is in the arity of the results: observable sequences can have zero to an infinite number of calls to the continuation – reporting the sequence's elements – while tasks can report at most one result. The point of interest though is how exceptions are propagated:

static async Task<int> Oops(int d)
{
    await Task.Delay(TimeSpan.FromSeconds(1));
    return 100 / d;
}

When calling this async method, we get back a Task<int> object that represents the ongoing computation. Notice tasks are typically hot, meaning they’re representing an operation that’s already running at the point you retrieve the task object. This difference from Rx – where sequences are typically cold and don’t do anything until you subscribe to them – isn’t very important for this discussion.

Assume we’re calling the method with a 0 value for the parameter “d”, causing the division to fail. Where does the exception come out? It’s clear we can’t throw the exception synchronously during the call to Oops because that’d involve blocking for the Delay operation to complete, and the whole point of an async method is to return to our caller as soon as we’re awaiting an operation that’s not ready yet. This is definitely the case here: the 1 second sleep operation is represented by a Task which won’t be in the completed state yet at the point of acquiring it (as the name Delay implies, the Task will only complete after the specified delay). So, the exception will be generated at a later point in time, really inside the continuation that’s generated by the compiler and passed to the Delay operation.

The code shown above corresponds to the following fragment, at least at a conceptual level. (The real compiler-generated implementation is much more advanced, and involves a state machine that keeps track of the location in the asynchronous method where we paused execution.)

static Task<int> Oops(int d)
{
    return Task.Delay(TimeSpan.FromSeconds(1))
        .ContinueWith(t => 100 / d);
}

This situation corresponds pretty much to the Rx case where a projection – specified through a selector function passed to Select – throws an exception, causing the error to propagate through the OnError channel. We saw an example of this earlier in this blog post. For the world of Task<T>, the error thrown from the 100 / d division by zero attempt will get propagated through the resulting task’s continuation, so any caller awaiting the Task<int> returned from Oops would see the exception getting propagated there. In a way completely analogous to Rx, the exception is propagated to the consumer’s continuation in order for it to be observed. Sweet!
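
In other words – a sketch, assuming a caller like the one below – the error is observed at the await site:

static async Task CallerAsync()
{
    try
    {
        var x = await Oops(0); // the DivideByZeroException surfaces here,
                               // through the task's continuation
    }
    catch (DivideByZeroException ex)
    {
        Console.WriteLine(ex.Message);
    }
}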

However, what’s more interesting to have a look at is the following slightly modified fragment:

static async Task<int> Oops(int d)
{
    var res = 100 / d;
    await Task.Delay(TimeSpan.FromSeconds(1));
    return res;
}

When calling the Oops method, we're running until the first await operation that lets us know it's not ready yet, prompting us to return to our caller (with the promise of continuing execution at a later point in time when the operation is ready). But wait a moment… For this particular sample, wouldn't this synchronously throw the division by zero exception? After all, the division isn't happening after the first await operation. Let's investigate with a simple example:

image

From the experiment, we can conclude the exception didn’t propagate as part of the synchronous call to Oops (on our way to the first await). Instead, the exception was caught in the compiler-generated code in order to be redirected to the returned Task<int>’s continuation. The “await” operation in SomeAsync is really the equivalent of hooking up a continuation and kicking the rest of the code to continue upon receiving a result, in this case the exception.

How does this relate to Rx behavior? An “await” operation really consists of two parts. First, there’s the execution that reaches an “await” point in the code, causing a continuation to be hooked up (ignoring the fall-through case of synchronous completion here). For Task<T>, this corresponds to a ContinueWith operation, which is analogous to the Subscribe method on IObservable<T>. The second part is run when the operation completes, and execution resumes where it left off. In the code fragment shown above, the continuation includes fetching the result and assigning it to “x”. Think of this phase as the code that runs as part of the continuation that was passed to the ContinueWith call. This is where the operation fails, which we can prove from the call stack:

image

The part where the await pattern reaches out to make a GetResult call on the awaiter object takes place after the continuation runs. Again, think of the continuation as the observer, and the error thrown from GetResult as the parallel to an OnError call.

In conclusion, async methods propagate exceptions that occur during the "initialization phase" (before the first await) through the continuation. This has the nice property of streamlining the way exceptions flow through an async computation by providing a consistent story: no matter where an exception happens during the execution of an async method, it always propagates through the continuation. In the world of Rx, we could achieve the same effect by propagating exceptions through the observer, even if they occur during the "initialization phase" (i.e. inside the Subscribe). After all, we already have an observer passed in that we can communicate with…

 

…and iterators

Iterator methods used to write query operators over enumerable sequences in LINQ to Objects are very similar to the use of Observable.Create to write query operators over observable sequences in LINQ to Events, aka Rx. As such, any analogies are worthwhile to investigate closely. Consider the following implementation of a Where query operator over enumerable sequences:

static IEnumerable<T> Where<T>(this IEnumerable<T> xs, Func<T, bool> f)
{
    if (xs == null)
        throw new ArgumentNullException("xs");
    if (f == null)
        throw new ArgumentNullException("f");

    foreach (var x in xs)
        if (f(x))
            yield return x;
}

Unfortunately, this code is flawed. Why?

As most of you will know, the compiler transforms an iterator block into a state-machine driven implementation of the IEnumerable<T> and IEnumerator<T> interfaces. Evaluation of the iterator takes place in a lazy fashion, where each call to MoveNext on the resulting enumerator drives the state machine forward such that one yield statement is processed. During this process, an exception may propagate (resulting in MoveNext throwing), a "yield return" statement may be encountered (resulting in MoveNext returning true after having set Current to the computed result), or the iterator may stop by means of a "yield break" statement (resulting in MoveNext returning false). In fact, the mechanism of generating the state machine is based on logic that's very similar to – and to some extent shared with – the logic used for the new async/await features, which require a similar decomposition of a method into several stages.

The main question to ask here is when the code runs that reaches the first “yield return” statement. If it were to execute this code synchronously as part of the call to Where, we wouldn’t achieve laziness at all! From this, we can already conclude the code is flawed because the argument null checks don’t run immediately when the Where method is called. We need a refactoring operation to achieve the desired effect of eager argument checking:

static IEnumerable<T> Where<T>(this IEnumerable<T> xs, Func<T, bool> f)
{
    if (xs == null)
        throw new ArgumentNullException("xs");
    if (f == null)
        throw new ArgumentNullException("f");

    return WhereImpl(xs, f);
}

static IEnumerable<T> WhereImpl<T>(IEnumerable<T> xs, Func<T, bool> f)
{
    foreach (var x in xs)
        if (f(x))
            yield return x;
}

Now the code is correct, but the question remains when the code runs up to the first “yield return” statement, which now lives in WhereImpl. You may think this code ought to be run during the execution of the GetEnumerator method on the resulting sequence. In fact, this would be quite strange… The goal of the MoveNext method is to “move to the next element”. If the first part of the code were to run during the GetEnumerator call, we’d already have computed the result of the first MoveNext call by either encountering an exception, or hitting a “yield” statement. In fact, we may already have drained the whole sequence on our quest to find the first element that passes the predicate, so we’re defeating laziness again. It’s a bit like an off-by-one error.

What we really want is the first call to MoveNext to start the scan of the source sequence for the first element passing the predicate, reporting the result of this search through the return channel – exceptional or Boolean-valued – of the MoveNext call. As a result, the call to GetEnumerator shouldn’t be running any code in the original (pre-compiler transform) iterator code. Instead, all side-effects are taking place as part of the MoveNext calls. Here’s the key observation: this also includes the call to xs.GetEnumerator, as part of the foreach loop construct!
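
A tiny sketch makes the laziness tangible (expected output shown in comments):

var q = WhereImpl(new[] { 1, 2, 3 }, x => { Console.Write("f({0}) ", x); return x > 1; });

using (var e = q.GetEnumerator()) // runs no user code yet
{
    Console.WriteLine("got enumerator");
    e.MoveNext();                 // prints "f(1) f(2) "; e.Current is now 2
}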

At this point, we can draw an analogy with Rx. Does this mean that the dual to GetEnumerator – i.e. Subscribe – shouldn't run any side-effects of subscriptions to input sequences? No, we can't drive the analogy this far, because we'd end up in a Catch-22 situation. Because the control flow is reversed, we need to stimulate producers of the incoming streams to start sending stuff, so we have to wire things up during the call to Subscribe (also returning an IDisposable to our caller). To set the scene, have a look at an implementation of a Where method for observable sequences:

static IObservable<T> Where<T>(this IObservable<T> xs, Func<T, bool> f)
{
    if (xs == null)
        throw new ArgumentNullException("xs");
    if (f == null)
        throw new ArgumentNullException("f");

    return Observable.Create<T>(observer =>
        xs.Subscribe(
            x =>
            {
                var b = false;
                try
                {
                    b = f(x);
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                    return; // per the observer grammar, nothing may follow a terminal message
                }

                if (b)
                    observer.OnNext(x);
            },
            observer.OnError,
            observer.OnCompleted
        )
    );
}

The code inside the OnNext handler passed to the input sequence “xs” should have no surprises by now. In particular, exceptions that occur in user code – here the call to the predicate function “f” – should be caught and sent downstream into the OnError channel, rather than propagating upstream to the source’s producer (which may be a scheduler, among other things).

What’s of more interest here is the body of the Subscribe method, which roughly corresponds to the body of the lambda expression that gets passed to the Create method. In here, we’re making a call to Subscribe on “xs”, which is the equivalent of the GetEnumerator call to “xs” in the LINQ to Objects case. As we’ve seen before when analyzing iterators, an exception caused by the GetEnumerator call to “xs” doesn’t propagate during the GetEnumerator call on the resulting sequence, because of laziness. Instead, the error comes out upon the first MoveNext call, which is the equivalent to an OnError callback.

Considering all of the above, we’re in an interesting spot now. We have to make a call to Subscribe on “xs” in order to kick the gears of the pipeline into motion: someone has to start pushing results at us. At the same time, all of the bad things that can happen during the call to Subscribe on “xs” would now occur as part of the Subscribe call on the resulting filtered sequence. Why is this a big deal though? Consider the following poor man’s specialization of CombineLatest to compute sums of the latest values of two sequences:

static IObservable<int> SumLatest(this IObservable<int> xs, IObservable<int> ys)
{
    return Observable.Create<int>(observer =>
    {
        var gate = new object();

        var cx = 0, cy = 0;

        var n = 2;

        var dx = xs.Synchronize(gate).Subscribe(
            x => { cx = x; observer.OnNext(cx + cy); },
            observer.OnError,
            () => { if (--n == 0) observer.OnCompleted(); }
        );

        var dy = ys.Synchronize(gate).Subscribe(
            y => { cy = y; observer.OnNext(cx + cy); },
            observer.OnError,
            () => { if (--n == 0) observer.OnCompleted(); }
        );

        return new CompositeDisposable(dx, dy);
    });
}

Notice how we need two subscriptions to source sequences now. What happens if the second Subscribe call throws an exception? Right, it will propagate out of the lambda body of the subscribe function passed to Create, all the way to the caller of Subscribe on the result sequence. However, it also leaves the subscription to “xs” orphaned if we don’t take any precaution.

If we were to write a similar function in LINQ to Objects – the closest is a pull-based Zip, as we’ve shown the code for before – the subscriptions would correspond to enumerators, which are guaranteed to be cleaned up because of a “using” statement protecting them. This takes place no matter how the iterator terminates. In here, we don’t have such luxury handy. Putting a using block on dx and dy doesn’t work: we’d dispose the subscriptions even before returning to the caller, so nothing would happen. It simply doesn’t work in the world of asynchrony.

 

Introducing SubscribeSafe

We're presented with two options now, as mentioned briefly before our detour through async methods and iterators:

  • Put a lot of exception handling code in each and every Subscribe method implementation for query operators, being prepared for exceptions to occur at any point in time, doing proper clean up when needed.
  • Tackle the main culprit of exceptions during Subscribe calls, namely dependent Subscribe calls failing. If we could somehow ensure Subscribe methods do not fail, this property would transitively apply when composing query operators.

Evaluating our options, the second road seemed the more promising of the two, at least in the absence of compiler-generated assistance that could automate all of the plumbing required for the first.

The idea is this: by making Rx-generated Subscribe methods no longer throw exceptions, query operators can safely handshake when building up the pipeline, knowing nothing can go wrong as part of setting up subscriptions. (Other errors such as the failure to schedule work are currently considered fatal.) When subscribing to unknown sources, a bit more caution is required, because the Subscribe method could throw. However, we have an observer sitting around at this stage already, because we’re running inside a Subscribe method ourselves.

Just like async methods propagate exceptions (including the ones that occur before reaching the first await expression) through the task's continuation, and just like iterators propagate exceptions (including the ones that occur before the first yield statement) through the MoveNext exceptional return path, we can redirect exceptions from the subscription phase to the OnError channel. The picture below summarizes the behavior of async methods and iterators:

image

The new SubscribeSafe method we’re introducing in Rx v2.0 RC protects subscribers against synchronous exceptions and is meant mostly to be used inside query operator implementations. All sequences that are built using Rx inherit this behavior, no matter how complex the query composition gets. By curing unknown sources with safe subscriptions, the rest of the pipeline gets safeguarded by transitivity. In fact, when using the Observable.Create method or when deriving from ObservableBase<T> to construct observable sequences, we apply this safety net around your subscription delegate as well. In the picture below, we show the situation with observable sequences – using the Rx v2.0 safety net – analogous to the async method and iterator sample:

image

In this sample, the use of Observable.Create in Bar suffices to get safe behavior. However, if Foo were to call Subscribe on an unknown sequence, we’d use SubscribeSafe instead (it’s actually fine to use SubscribeSafe routinely though). For example, our CombineLatest lookalike could be made safe as follows:

static IObservable<int> SumLatest(this IObservable<int> xs, IObservable<int> ys)
{
    return Observable.Create<int>(observer =>
    {
        var gate = new object();

        var cx = 0;
        var cy = 0;

        var n = 2;

        var dx = xs.Synchronize(gate).SubscribeSafe(
            x => { cx = x; observer.OnNext(cx + cy); },
            observer.OnError,
            () => { if (--n == 0) observer.OnCompleted(); }
        );

        var dy = ys.Synchronize(gate).SubscribeSafe(
            y => { cy = y; observer.OnNext(cx + cy); },
            observer.OnError,
            () => { if (--n == 0) observer.OnCompleted(); }
        );

        return new CompositeDisposable(dx, dy);
    });
}

Say the (second) subscription to “ys” propagates an error, which will now go through the observer.OnError delegate passed to SubscribeSafe. How does this take down the (first) subscription to “xs” though?

The answer lies in the magic behind Observable.Create. It should be clear that by going through the observer's channel, the code above doesn't throw an exception inside the subscription delegate passed to the Create method. In other words, we reach the last line, where we return the CompositeDisposable object that holds on to dx and dy. When this disposable object is processed in order to be returned to the caller of Subscribe (which happens inside ObservableBase<T>), it's assigned to a SingleAssignmentDisposable (which also holds on to a few related disposable objects that silence the messages sent to the observer upon calling Dispose). However, during the propagation of an error through the OnError channel – caused by the safety net provided by SubscribeSafe – this disposable object gets automatically disposed, because the sequence has reached a terminal message according to the observer grammar (so-called auto-detach behavior). As a result, the assignment of the CompositeDisposable to the already-disposed SingleAssignmentDisposable causes immediate on-the-spot disposal, disposing dx and dy as well.

 

Higher-order streams

In fact, one of the places where the introduction of SubscribeSafe matters in a subtle way is when dealing with higher-order streams and higher-order query operators, i.e. when you’re dealing with “streams of streams”. For example, consider the Merge query operator that has the following signature:

public static IObservable<TSource> Merge<TSource>(
    this IObservable<IObservable<TSource>> sources);

What this operator has to do is subscribe to the “sources” sequence in order to receive the inner streams. Each stream it receives is subsequently subscribed to, by passing in an observer that will propagate the received elements on the result stream (in a non-overlapped way, thus involving a bit of synchronization).

Did you notice the important word in the previous sentence? Yes, "subscribed to" is what I meant. The subscription to the inner streams happens in the context of the OnNext call on the outer stream (of streams). If this subscription were to fail exceptionally, we'd be faced with an OnNext that throws. We can't know where the inner sequence comes from (in fact, each inner sequence in the higher-order stream could have been produced elsewhere), so we need to safeguard ourselves against rogue streams that throw upon calling Subscribe. We do so by using SubscribeSafe instead.

In fact, we could also have used the SafeObserver mechanism introduced earlier. However, this wouldn’t be as good as using SubscribeSafe for various reasons. First, while it’d correctly clean up the subscription to the outer stream, more plumbing would be required to clean up the subscriptions to all of the inner streams as well. Also, an exception thrown from an inner stream subscription would now have the potential of causing the outer stream producer (e.g. a scheduler) to blow up, which seems a little rude. Finally, the use of SafeObserver introduces another stack frame in the observer chain, causing minor performance degradation. Using the SubscribeSafe mechanism, none of those concerns apply.

 

A new method with a catchy name...

One final feature that fits in the Rx v2.0 RC theme of enhanced error handling is the introduction of an extension method on the IScheduler type, called Catch. Its signature looks as follows:

public static IScheduler Catch<TException>(this IScheduler scheduler,
    Func<TException, bool> handler)
    where TException : Exception;

This method allows you to wrap an existing scheduler with an exception handler callback that's invoked in case scheduled work throws an exception. The callback can then decide whether it is willing to handle the exception or not, communicating the outcome through a Boolean return value. If true is returned, the exception is treated as handled, and the call stack can simply unwind gracefully. If false is returned, the exception is rethrown. At that point, not all is lost for the poor scheduler though. Often, schedulers simply wrap an existing mechanism in the underlying framework, which has an exception handling mechanism of its own. For example, the Dispatcher type used in various UI frameworks exposes a static event called "UnhandledException" that can be used to deal with such exceptions. Similarly, the task pool has a facility to deal with so-called unobserved task exceptions.
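
For example, here's a minimal sketch of wrapping the task pool scheduler with a handler for one specific exception type:

var safe = TaskPoolScheduler.Default.Catch<InvalidOperationException>(ex =>
{
    Console.WriteLine("Handled: {0}", ex.Message);
    return true; // handled; the scheduler lives on (the failing pipeline does not)
});

safe.Schedule(() => { throw new InvalidOperationException("oops"); });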

Note: Wrapping of schedulers isn't a trivial task. Quite often the wrapper needs to apply recursively to the IScheduler that gets passed as the first parameter on the action being scheduled. Rest assured though, the Catch method has an in-depth effect, making it effective for recursive scheduling as well. Also, the use of Catch applies to the specialized interfaces for long-running and periodic scheduling, in case those were available through the IServiceProvider on the original scheduler instance. For those of you who're implementing your own schedulers, here's one bit of advice.

The way to apply policies like Catch and DisableOptimizations with a recursive effect is by keeping a map between scheduler instances we've seen during recursive calls and their wrappers. (In fact, this map is based on weak references to avoid leaks.) We use the map to avoid creating wrappers for every single scheduler instance we see passing. Typically, those scheduler instances are some kind of proxy to the original scheduler, which is the same across different recursive calls, making the technique of using a map very effective.

However, when implementing custom schedulers, it's easy to defeat this mechanism by always passing a new scheduler instance (for recursive usage) to the scheduled actions, causing the map to grow rapidly and causing a lot of GC churn. In fact, this isn't really a problem with the implementation of our wrappers: the custom scheduler is flawed to begin with because recursion on it is more costly than it should be! The wrapping merely magnifies the flaw.

It almost goes without saying that throwing from any of those last-chance exception handling opportunities – including the callback passed to Catch – is simply not done. If we were to circle back to the same exception handling mechanism every time again, things would grow out of hand quickly: stack overflows, endless loops, etc. You name it. Therefore, it's key to ensure the handler callback is fail-safe, does the minimal amount of work to handle the exception if it can, and simply returns true or false. Notice this mechanism doesn't provide a way to keep the crashing pipeline alive: damage has been done, and the pipeline will tear down. All the Catch facility provides is a way to limit the damage, maybe do some clean-up (though there's no context other than the exception object to work with), and allow the scheduler to keep running nonetheless. Even though it's bound to happen – recall the saying about powerful weapons and responsibility – it's almost never a good idea to implement a catch-all here by always returning true. Especially since the Rx v2.0 pipeline has been hardened at various levels – with the SafeObserver feature and the SubscribeSafe approach – such extremist measures shouldn't be necessary anymore.

Intermezzo

Why did we choose an approach of an extension method rather than exposing some static event? In Rx, the way of life is composition. As such, we decided to go for the extension method approach to create a derived scheduler with exception handling applied, rather than having an approach based on events. Various schedulers that are built in to Rx are singletons. Given that Rx is a library, the introduction of global facilities - such as static events to deal with exceptions - would be worrisome, in case other libraries start to hook into those. When several such libraries have their own opinion on how to handle things and come together in an application, the result is something with unpredictable behavior.

As a general rule of thumb, scheduler implementations in Rx need to obey a number of rules. First, scheduled work items should have the smallest amount of wrapping required to run the work and provide a reasonably good way to perform best-effort cancellation. Fat closures are to be avoided at all costs, especially since the dialog of the LINQ layer with the scheduler layer can be chatty at times (though this has been reduced significantly in the v2.0 release with the introduction of long-running and periodic scheduling specializations). The second rule is to avoid stack overflows when recursive scheduling is involved: for example, it's not uncommon for top-level scheduling requests to spawn a logical thread of execution and drain a queue of dependent, recursive work that will be processed on the same logical thread. Third, exceptions that occur during the execution of scheduled actions should have a chance of reaching the built-in error handling facilities the underlying infrastructure provides. There are a few more, but we'll leave those for future posts dedicated to the concept of schedulers.

What's interesting from the discussion above in the context of error handling is the third rule. In Rx v1.0, this rule generally applied to our built-in schedulers, except for one: the TaskPoolScheduler actually did prevent the TPL's built-in TaskScheduler.UnobservedTaskException event from being raised. This event is peculiar enough to be worth a closer look. When a Task completes exceptionally and no-one observes the exception (e.g. through the Result property, by calling Wait, or by hooking up a continuation that runs when the task has faulted) by the time the task becomes eligible for garbage collection, the exception is propagated when the finalizer runs. This could happen a long time after the task's last reference was lost, due to the non-deterministic nature of garbage collection. Before the exception is thrown on the finalizer thread, the UnobservedTaskException event is raised, allowing applications to observe the exception and decide whether the escalation policy of bringing down the app through the crashing finalizer should kick in.

Starting from .NET 4.5, the use of tasks has become omnipresent thanks to the introduction of asynchronous methods. In this world, fire-and-forget async methods aren't uncommon, and losing a reference to a Task object can happen easily. For example, you may be using a WhenAny construct to time out a task by pairing it up with a Delay operation, throwing a TimeoutException in case the delay is satisfied first. In that case, the original task's potential failure is irrelevant, because it got superseded by the timeout exception. Even if you go through the trouble of trying to cancel the original task, there's still a possible race between propagating the timeout and triggering the cancellation. In the end, there'd be a lot of cases where the exception escalation policy could kick in rather unexpectedly. As a result, the behavior in .NET 4.5 has changed to prevent unobserved task exceptions from crashing the application, though the event is still raised.
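
For illustration, a minimal sketch of that timeout pattern - with DoWorkAsync standing in for any hypothetical Task-returning operation - could look as follows:

static async Task<int> GetResultWithTimeoutAsync()
{
    var work = DoWorkAsync(); // hypothetical Task<int>-returning operation

    var winner = await Task.WhenAny(work, Task.Delay(TimeSpan.FromSeconds(5)));
    if (winner != work)
        throw new TimeoutException(); // "work" may still fault later, unobserved

    return await work; // observes the result (or exception) of the original task
}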

Back in the Rx v1.0 days, our TaskPoolScheduler did wrap the actions passed to the TaskFactory with an exception handler that expedited the exception by starting a new thread to throw it as soon as possible, rather than having the finalizer crash the process at a later point in time. This approach fit well with our "fail fast" philosophy, which we apply throughout Rx (e.g. OnError messages propagate as soon as possible, rather than being delayed in operators like Delay). An unfortunate side-effect was that the UnobservedTaskException event was never raised, much to the surprise of some seasoned TPL users. Also, Rx had a mind of its own on how to deal with such exceptions, rather than deferring the decision to the underlying platform. In Rx v2.0, we're going for a more transparent approach that also happens to align well with the .NET 4.5 changes to the exception escalation mechanism in the TPL. In case you want the old fail-fast behavior of Rx back for the TaskPoolScheduler, it's always possible to use the Catch method and escalate the exception from the handler callback (e.g. by spawning a thread whose sole task it is to throw the exception), as sketched below. One could do something similar on a TPL-wide level, because we now make sure the UnobservedTaskException event can get raised (though it will happen at a later time, when the Task we create has no more outstanding references to it, and the dance with the GC has taken place).
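
Here's a sketch of that escalation trick, assuming one really wants the fail-fast behavior back:

var failFast = TaskPoolScheduler.Default.Catch<Exception>(ex =>
{
    // Escalate on a dedicated thread; remember that throwing from the
    // handler callback itself is off-limits.
    new Thread(() => { throw ex; }).Start();

    return true; // report the exception as handled to the scheduler
});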

Finally, on the subject of schedulers, a word of advice. As you know by now, various operators at the LINQ layer of Rx are parameterized by a scheduler, specifying where concurrency is introduced. In fact, one can recognize the nature of a given query operator simply by checking whether or not it depends on a scheduler for its execution, as revealed by the method signature. In order to keep things simple, we've provided an overload for each such query operator, omitting the scheduler parameter and supplying a meaningful default. This follows the principle of least concurrency, something we can discuss at great length in another post. Entry-level users of Rx can simply walk up to the plethora of query operators without having to worry about the concept of schedulers at first, thanks to the overloads that provide defaults. Advanced users, on the other hand, are more likely to tweak and control the choice of each scheduler, and will want to ignore those default-picking overloads. Unfortunately, there's no "right answer" to those conflicting requirements.

Intermezzo

From time to time, users come up to us and ask how to change the default schedulers. The answer is: "you can't". The rationale is the same as what we discussed before in the context of (static) events to handle exceptions: it doesn't play nice in a library context. Ideally, we'd have some kind of "static extension method" feature by which we could separate the methods on System.Reactive.Linq.Observable that pick the defaults from the IScheduler-parameterized mother overloads that do the hard work. This way, one could substitute the assembly that picks defaults for a custom one. (Oh, and we're not big fans of IoC and dependency injection at this level either.)

Instead, in case you need to change the defaults, we recommend keeping some one-stop shop in your codebase where you can go and grab the scheduler instance you want to pass to various query operators, bypassing the overloads that pick defaults altogether; a sketch follows below. This kind of explicitness has many benefits. People reading the code can easily track what's going on in the concurrency domain, you can apply exception handling in a central location using the new Catch extension method, and last but not least, it allows for better testability by substituting schedulers that operate in virtual time for "real" ones. Given the functional-style nature of Rx, it's advised to factor your reactive logic into several methods, each of which consists of the composition of various Rx primitives and carries out a well-defined task. Such methods are typically parameterized by one or more observable sequences, and possibly schedulers. Testing each reactive method in isolation, using the TestScheduler, is a great way to ensure you're building the right thing.
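
Such a one-stop shop can be as simple as the following sketch (all names here are our own invention):

static class AppSchedulers
{
    // Production default; unit tests can substitute a TestScheduler here.
    public static IScheduler Time = DefaultScheduler.Instance;
}

// Elsewhere in the codebase, always pass the scheduler explicitly.
var ticks = Observable.Interval(TimeSpan.FromSeconds(1), AppSchedulers.Time);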

An example of using Catch is shown below:

static void Main()
{
    var tp = TaskPoolScheduler.Default;

    var scheduler = tp.Catch<InvalidOperationException>(ex =>
    {
        var canHandle = ex.Message.EndsWith("!");

        Console.WriteLine("In Catch - {0} (Handled = {1})", ex.Message, canHandle);

        return canHandle;
    });

    TaskScheduler.UnobservedTaskException += (o, e) =>
    {
        var ex = e.Exception.InnerExceptions.Single();

        Console.WriteLine("In UnobservedTaskException - {0}", ex.Message);

        e.SetObserved();
    };

    scheduler.Schedule(() =>
    {
        Console.WriteLine("Task 1");
        throw new InvalidOperationException("Oops !!!");
    });

    scheduler.Schedule(() =>
    {
        Console.WriteLine("Task 2");
        throw new InvalidOperationException("Oops :-(");
    });

    Thread.Sleep(1000);

    GC.Collect();
    GC.WaitForPendingFinalizers();

    Console.ReadLine();
}

Running this sample prints the following:

[image: console output of the sample above]

As you can see, the Catch handler did kick in for both faulty actions, but only signaled it could handle the first exception. The second task’s exception leaked through to the built-in exception handling mechanism exposed by the TPL.

 

Bridging with existing events – Revisiting the FromEvent family

With the ever-growing set of supported target platforms, we continuously need to ensure the Rx library stays compatible when used with cutting-edge technologies. One such technology is the new Metro style application platform in Windows 8, and more specifically its eventing system and UI stack.

Back in the Rx v2.0 Beta timeframe, we made several changes to support WinRT style events. This work focused mainly on getting FromEventPattern to work against such events. Even though pretty much the same metadata format is used across .NET and the WinRT platform, the structure of events and their add/remove method pairs is subtly different in WinRT. We refer back to the blog post about Rx v2.0 Beta for more information, also including information on the new TypedEventHandler type, amongst some other event-related changes.

However, between Beta and RC, we continued our work on getting first-class support for WinRT events in Rx, which led to a number of additional changes required to the way FromEvent and FromEventPattern work. Let's have a look.

 

Oops... on the wrong thread?

As all of you who've dealt with UI frameworks are well aware, interacting with UI elements such as controls can be a non-trivial task due to the threading requirements imposed by platform underpinnings such as message loops and STAs. Thanks to the scheduler abstraction in Rx, we've made synchronization with the UI much easier, and have taught our users the pattern of inserting an ObserveOn call into a reactive query pipeline, right before the final observer is about to touch the UI. A typical example is shown below:

Observable.Interval(TimeSpan.FromSeconds(1))
          .ObserveOnDispatcher()
          .Subscribe(/* callbacks run on the dispatcher */);

In this sample, the Interval call doesn't specify a target scheduler, hence the sequence generator falls back to the default scheduler – in this case the platform's thread pool scheduler – to run (periodic) timers on. This is where the callbacks to the observer will originate from, so touching the UI isn't a particularly good idea. Two solutions to this problem exist. The one shown here is to toss messages over the dispatcher wall using ObserveOnDispatcher, or more generally to toss them over to another scheduler using the most general-purpose ObserveOn overload. This one works particularly well if you've received an observable sequence from some other party and can't control where that sequence runs its callbacks. Another approach that would work here is to parameterize the Interval generator with a scheduler that represents the UI dispatcher or message loop, in this case by using the DispatcherScheduler type.
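
As a sketch of that second approach, assuming a WPF context where Dispatcher.CurrentDispatcher is available:

var ui = new DispatcherScheduler(Dispatcher.CurrentDispatcher);

Observable.Interval(TimeSpan.FromSeconds(1), ui)
          .Subscribe(/* callbacks run on the dispatcher */);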

As a minor feature in Rx v2.0 RC, we've increased the number of overloads on ObserveOn to support various UI-affine objects, such as WPF's DispatcherObject base class, as well as the DependencyObject base class used elsewhere. This makes it easier to do the right thing without having to go and hunt for the required Dispatcher, etc.

But there's more... (Isn't there always?) The free-threaded nature of Rx poses another challenge that's gone largely unnoticed before. Consider the following abstract example first:

var res = from x in xs
          from y in f(x)
          select x + y;

LINQ experts will immediately recognize this as a SelectMany method call in disguise. For observable sequences, this operator merges all of the selected inner sequences that are retrieved based on the elements in the source sequence "xs" by computing "f(x)". The picture below illustrates the hidden selector function:

[image: illustration of the hidden selector function in SelectMany]
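
In method syntax, the compiler translates the query above into (roughly) the following, making both the collection selector and the result selector explicit:

var res = xs.SelectMany(x => f(x), (x, y) => x + y);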

So far so good, but there's a little caveat. The subscription to the inner sequence – obtained by running f(x) – happens in the context of the callback that produced "x", hence on the scheduler producing "xs". All side-effects that run during the call to Subscribe take place on this same thread. Generally, this doesn't pose a problem, especially if the source is produced by Rx itself, living a free-threaded life. However, when the subscription code reaches out to various platform-specific APIs, thread affinity problems may be lurking around the corner. Guess what, this is exactly what's going on when we're trying to attach an event handler to an event as part of subscribing to a FromEvent[Pattern] sequence.

As a quick pop quiz, tell me whether or not the following piece of code (where Foo is declared as an event) can throw an exception:

bar.Foo += (o, e) => { /* intentionally left blank */ };

You'd be forgiven for answering "no" to this question. When using standard events in e.g. C#, all the += operation does is cause a Delegate.Combine call behind the scenes, with an Interlocked.CompareExchange to handle multi-threaded accesses. However, if an event is declared with custom add and remove accessors, the code that runs as part of the += and -= operations can be anything.
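
For illustration, here's a sketch of our own (not taken from any real framework) of an event whose accessors enforce thread affinity, making += throw when called from the wrong thread:

class Bar
{
    private EventHandler<MyEventArgs> _foo;

    public event EventHandler<MyEventArgs> Foo
    {
        add    { VerifyAccess(); _foo += value; } // may throw off the owning thread
        remove { VerifyAccess(); _foo -= value; }
    }

    private void VerifyAccess()
    {
        // Hypothetical check: throw when accessed from a thread other than
        // the one that owns this object.
    }
}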

In fact, Observable.FromEvent[Pattern] in Rx v1.0 is merely a wrapper around the pair of += and -= operations:

var foos = Observable.Create<EventPattern<object, MyEventArgs>>(obs =>
{
    var h = new EventHandler<MyEventArgs>((o, e) =>
    {
        obs.OnNext(new EventPattern<object, MyEventArgs>(o, e));
    });

    bar.Foo += h;

    return () => { bar.Foo -= h; };
});

Because of this thin wrapper, exceptions originating from the add/remove operations on the event can bubble to the surface during the call to Subscribe and during the disposal of the subscription, respectively. This may sound familiar if you read through our discussion of error handling earlier in this blog post: exceptions can be thrown from Subscribe, which is undesirable. Unfortunately, this situation has become more common in the new Metro platform's UI stack, where attaching an event handler to UI-affine objects has to happen on the UI thread. An example of a violation of this rule is shown below:

[image: exception dialog for a cross-thread access violation while attaching an event handler in a Metro style app]

It turns out previous XAML-based platforms were more forgiving about this in some cases. The Metro UI stack has chosen to make any access to the UI from a different thread invalid, resulting in the exception shown above. The little repro used here mimics what would happen if you were to trigger a subscription to the FromEventPattern-based wrapper around the TextChanged event from the TaskPoolScheduler. This is something that can happen easily, due to the free-threaded nature of Rx, as shown below:

[image: debugger showing the FromEventPattern subscription failing from a timer callback on the DefaultScheduler]

Recall our discussion of SelectMany earlier, pointing out that the LINQ syntax hides a collection selector function. In this case, the “textChanged” sequence is subscribed to on the callback of the Timer, which happens on the thread pool (also accessed through the DefaultScheduler in Rx v2.0, as visible from the call stack). As a result, we’re getting the exception we saw before, deep inside the SelectMany code that subscribes to the inner sequence.

Because we’re dealing with user code here, you can see the exception triggering the exception dialog in Visual Studio. If we were to let the program continue to run now, the use of SubscribeSafe internally in SelectMany – because we’re dealing with a higher-order stream, see earlier – would catch the exception, and send it down the OnError channel:

[image: the exception being delivered to the OnError channel]

If we were to omit an OnError handler, the default behavior would be to rethrow the exception, which would still happen in the context of the timer call, hence on the DefaultScheduler. This would cause the exception to propagate up the call stack, back to the scheduler, in this case running on the WinRT thread pool, which doesn’t make the application crash in Windows 8 Release Preview. However, depending on the scheduler you’re running on, you may trigger some kind of escalation policy, as we saw before. Obviously, you could use the new Catch extension method on IScheduler to deal with this as well.

Knowing that FromEventPattern basically does what we’ve shown in this example, and knowing that some schedulers don’t escalate exceptions, can you see what the problem is? Violations of the cross-thread access rules for thread-affine objects may go unnoticed, rendering the resulting application non-functional. You wouldn’t even see the exception when running the application in Visual Studio (as we did in the picture above), depending on the overload of FromEventPattern you use, because the add/remove handler operation may be hidden in framework code rather than user code. This is true when using the reflection-based overloads. The result is a hard-to-debug problem.

 

SubscribeOn to the rescue?

Advanced Rx users will likely know how to tackle this kind of problem at its core: use SubscribeOn, which posts the subscription and dispose operations to the specified scheduler. Unfortunately this approach suffers from a few problems...

The first problem is a non-technical one. Because of their similar sounding names, it’s easy for users to get confused between ObserveOn and SubscribeOn. In fact, combined with the general guidance of tagging on ObserveOn calls to ensure proper synchronization with the UI, it becomes even more confusing if one now needs to add a SubscribeOn in the right spot as well:

var tc = Observable.FromEventPattern<TextChangedEventArgs>(txt, "TextChanged")
                   .SubscribeOnDispatcher();

var res = from t in Observable.Timer(TimeSpan.FromSeconds(1))
          from c in tc
          select c;

res.ObserveOnDispatcher().Subscribe(_ =>
{
    // Handle the event
});

In this proposal for a fix, we make the subscriptions to the FromEventPattern bridge happen on the dispatcher (which, by the way, is obtained during the call to SubscribeOnDispatcher, so where to put this call can be important, since the current dispatcher is kept in thread-local state), so when the SelectMany operator receives the tick event from the timer, it will post to the Dispatcher to make the add event handler operation happen. Also, when a disposal happens (e.g. because the subscription to “res” is disposed), the remove event handler operation takes place on the scheduler.

Notice how we also added ObserveOnDispatcher here, for good measure. In this case, it turns out it isn’t required, because the events produced by “res” are always observed on the dispatcher to begin with (as they originate from the TextChanged event directly, which is produced on the dispatcher). However, if we were to add seemingly simple operations like Delay, Buffer, Throttle, etc. we may end up receiving events from a different thread, so we need to make sure to marshal observations to the right context by using ObserveOn.

The placement of both operators is essential though. Moving ObserveOn higher up the chain wouldn’t be a good idea if you’re moving it in front of operators that change the thread context (e.g. Throttle). So, as a general recommendation, one should put ObserveOn – when used for UI synchronization purposes, that is – as close to the final observation (that touches the UI) as possible. For SubscribeOn, the story is different: move it as close to the source as possible. For example, if we’d move the SubscribeOnDispatcher down and apply it to “res” instead, the only Subscribe call that would be “protected” by the operator would be the one to the Timer. The subsequent subscription to the inner “textChanged” stream happens in the context of the timer’s OnNext message after 1 second, and isn’t subject to the SubscribeOn policy.

As you can tell from the description above, this is quite subtle and quite tricky to get right. While we could spend countless hours educating people about all of this, it isn’t the only problem though. There’s a technical issue with the blind/naïve use of SubscribeOn as well. Consider what happens if “tc” in the code fragment above is subscribed to from the UI thread itself. Due to the use of SubscribeOnDispatcher, the code that runs as part of the subscribe operation (in this case the logic in FromEventPattern to perform a += operation on the specified event) is posted to the dispatcher. No longer does the subscription happen synchronously, even though we could perfectly well do so when we’re already on the right thread. The net result is that we could experience a time gap between returning from the Subscribe operation on “tc” and events starting to flow. Other UI operations – including ones that change the text in “txt” – could be dispatched before the posted work to hook up the event handler (causing OnNext calls to start flowing) takes place:

[image: timeline showing the gap between Subscribe returning and the event handler being attached]

In a lot of cases, this is acceptable. But in others, it’s totally undesirable and leads to Heisenbugs. While we prefer “composition to the rescue” as the way of life in Rx, for bridges to the external world (i.e. factories prefixed with From), we sometimes make an exception to that rule and add a bit of logic to ensure the common case does what you expect it to (e.g. FromAsync* inserts an AsyncSubject to deal with race conditions). In Rx v2.0 RC, we did the same for FromEventPattern. Let’s have a closer look.

 

A smarter SynchronizationContextScheduler

Our IScheduler interface provides a means to abstract over different ways to schedule work. Typically, schedulers correspond to platform facilities such as the thread pool, the task pool, UI message loops, thread creation APIs, etc. However, the .NET Framework already has an abstraction to synchronize with a given “location” where work can be run, known as the SynchronizationContext.

It shouldn’t come as a surprise that Rx includes an IScheduler implementation that wraps a given SynchronizationContext object, with the predictable name of SynchronizationContextScheduler. All it does is perform a Post operation for scheduled work (using a thread pool timer for time-based scheduling, in order to transition the work to the underlying context in a delayed manner).

In Rx v1.x, the implementation of the SynchronizationContextScheduler always performed a Post operation, regardless of where the call to Schedule took place from. Starting with Rx v2.0, we expose a knob to tweak this behavior, known as the “alwaysPost” parameter on the constructor (enabled by default if omitted, to be compatible with the v1.x behavior). As we’ll see in a moment, we new up instances of the SynchronizationContextScheduler with the “alwaysPost” parameter set to false in one place inside Rx. When the “alwaysPost” behavior is disabled, an object reference equality check is performed between SynchronizationContext.Current and the specified context object. In case the two are equal, the Post operation is bypassed. In fact, work has been done in various places in .NET 4.5 to make this kind of check more reliable. Those changes were motivated by the heavy use of SynchronizationContext objects in the await infrastructure.
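
A small sketch of the knob in action, assuming we're running on a thread that has a current SynchronizationContext:

var ctx = SynchronizationContext.Current;
var scheduler = new SynchronizationContextScheduler(ctx, alwaysPost: false);

scheduler.Schedule(() =>
{
    // Runs synchronously when SynchronizationContext.Current == ctx at the
    // time of the Schedule call; otherwise, the work is posted to ctx.
});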

 

Putting the pieces together

Given the improvements to the SynchronizationContextScheduler, we can now pull off an enhancement to the FromEvent* factories that also accommodates the issue of time gaps we looked at before. As you explore the API surface in Rx v2.0 RC, you may notice the FromEvent* methods now have an overload that accepts an IScheduler parameter. This is indicative of the method relying on the scheduler infrastructure to get work done. As you may have guessed by now, the scheduler is what the add/remove handler operations get posted to.

You can think of the v1.x behavior of FromEvent* as fixing the IScheduler parameter to Scheduler.Immediate, which isn’t really a scheduler but a way to execute scheduled work immediately, hence synchronously. In other words, the add/remove handler operations just happened to run wherever the corresponding calls to Subscribe and Dispose were made from. As we saw before, this is where things get troublesome if thread-affine objects are involved.

Starting with Rx v2.0 though, the default is no longer the immediate scheduler. Instead, the FromEvent* methods capture the SynchronizationContext.Current property at the time of the call (i.e. not deferred or lazy). If this property doesn’t return null, we new up a SynchronizationContextScheduler with the alwaysPost parameter set to false. At the point someone subscribes to the resulting observable sequence (or disposes the resulting disposable object), we’ll Schedule the add handler operation (or remove handler operation) on that scheduler. Because the alwaysPost option is disabled, the operation may still run synchronously if we already happen to be on the target SynchronizationContext. In case FromEvent* doesn’t find a current SynchronizationContext to capture, we fall back to the old, free-threaded, Rx v1.x behavior, using Scheduler.Immediate as the default. If you always want the old behavior, it suffices to parameterize the FromEvent* methods with Scheduler.Immediate, as shown below.
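
For instance, assuming the reflection-based overload that takes a trailing IScheduler parameter, opting back into the old behavior looks like this:

var tc = Observable.FromEventPattern<TextChangedEventArgs>(
    txt, "TextChanged", Scheduler.Immediate);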

Intermezzo

We considered alternative ways of determining where to run the add/remove handler logic. Unfortunately, the frameworks we’re dealing with today (.NET and WinRT) don’t have a widely used abstraction to bundle an object with its thread-affinity information (i.e. the thread the object should be accessed from). In the olden days of Windows Forms, the ISynchronizeInvoke interface provided such a facility, but alas this design hasn’t lived on in other UI frameworks. Well, it kind of has - various objects expose a Dispatcher property - though this doesn’t fit the layer map, where platform-specific abstractions aren’t welcome in the core of Rx.

This said, even if we had such a single abstraction for thread-bound objects, it would be of no use in the context of the FromEvent* methods, for a different reason. Why? Well, the overloads of those methods that take in a pair of add/remove handler delegates would prevent us from finding the target object, from which to derive the target scheduler. How so? Have a look at the following code:

    Observable.FromEventPattern(h => bar.Foo += h, h => bar.Foo -= h)

In here, anonymous methods are generated for the add/remove handler delegates. The body of those methods is opaque to us, so we can’t quite figure out that the target object is “bar”. What we’d really need for this to work is the ability to pass in the add_Foo and remove_Foo compiler-generated methods through a “method group conversion”. In that case, the delegate’s Target property would reveal the object we’re looking for.

Both problems mentioned here could be worked around in clever ways. I’ll keep the description of such workarounds short and leave it to the reader’s imagination: platform enlightenments to discover target schedulers for given objects, expression trees to inspect the user’s intent. We don’t like clever to the extent required here.

This enhanced implementation of FromEvent* comes with a bit of advice though. In order for this mechanism to work, the call to FromEvent* needs to take place in a context where we can capture the current SynchronizationContext. The guidance is to put bridging code such as FromEvent* outside your reactive queries, which serves two goals:

  1. It keeps the query clean of code that’s solely there for plumbing, gluing, and bridging purposes. No-one likes to see a FromEvent* or FromAsync* construct in the middle of an otherwise clean and concise query.
  2. It frees you from negative effects that can come from the free-threaded nature of Rx. When refactoring queries, it’s easy to reduce or introduce concurrency in various places, which would cause FromEvent* to capture another context.

An example of correct usage is shown below:

var tc = Observable.FromEventPattern<TextChangedEventArgs>(txt, "TextChanged");

var res = from t in Observable.Timer(TimeSpan.FromSeconds(1))
          from c in tc
          select c;

res.Subscribe(_ =>
{
    // Handle the event
});

It goes without saying that other overloads of FromEventPattern could be used; I’m solely using the reflection-based overload to keep the code a bit more concise for the purposes of this blog post. Of more critical importance here is the place where FromEventPattern is called from. In particular, inlining the whole “tc” definition right inside the query’s second “from” clause will likely not work. Why? As explained above, the capturing of the current SynchronizationContext happens during the call to the FromEvent* methods. If we were to perform this operation in the second “from” clause, we’d be capturing it in the context of the OnNext call of the timer, i.e. on the thread pool. This would result in a null context, causing FromEventPattern to perform synchronous add/remove handler operations, therefore running those on the thread pool as well. As we’ve seen, the Metro UI framework doesn’t quite like that.

The reason I’m stating it “will likely not work” is that some UI frameworks are forgiving about where the event operations take place. Mileage will vary depending on whether you’re using WPF, Silverlight, or Windows 8’s Metro UI stack.

 

Cold, hot, or tepid?

There’s one more (final, I promise) thing to say about the revamped FromEvent* implementations. As stated earlier on, the FromEvent* bridges are pretty similar to an Observable.Create operation whose Subscribe part performs the += operation on the target event, and whose Dispose part performs the -= operation. From the discussion above, we’ve learned how the Rx v2.0 implementation adds the dance with the SynchronizationContext to the mix, providing a best-effort way to ensure event handler manipulations take place on the right thread. Even with this change, the result is a cold observable sequence: whenever a subscription is made, a handler is hooked up to the target event. In other words, each observer that subscribes gets its own private handler to the underlying .NET event.

This behavior has caught people by surprise from time to time, when they noticed a lot of event handlers being registered with an object, most likely because of an incorrect or suboptimal query that missed out on opportunities to share a single event handler by using a Publish operation. We’ve seen cases where users tried to “fix” this by using a Subject to bridge with an event, manually hooking up an event handler to the source event, making it pump OnNext messages into the subject. This solved the problem of multiple subscriptions leading to the same number of event handlers, if it weren’t for the fact that the event handler that fed into the subject was never removed when no longer needed. There’s a world of difference between events with no handlers and events with even just one handler: object allocations. Any .NET developer should know how to raise events:

class Foo
{
    public event EventHandler<MyEventArgs> Bar;

    public void OnBar(int x)
    {
        var bar = Bar; // read once, so a concurrent -= can't null it between check and call
        if (bar != null)
            bar(this, new MyEventArgs(x));
    }
}

The null check is all-important here. Not only does it prevent a NullReferenceException when trying to raise an event that has no handlers attached; it also guards the allocation of the MyEventArgs objects. Sending those into a single handler that happens to broadcast the object into a subject with no observers attached is plain wasteful:

var s = new Subject<MyEventArgs>();

foo.Bar += (o, e) => s.OnNext(e);

// Use the subject in a query further on

In other words, using the hot observable that is a subject doesn’t work either. Even worse, if we were to make FromEvent* return a hot sequence (just like most other From* bridges do, because they bridge with ongoing operations like APM method pairs or tasks), we’d always new up EventPattern objects, on top of the event argument objects that are handed to us. This is clearly a no-go.

Again, readers with a significant Rx background will know a way around this: compose FromEvent* with Publish and RefCount, and you’ve got an observable sequence that will cause at most one event handler to be attached to the target event. If there are no observers subscribed to this sequence, the one and only event handler is removed as well. This approach provides a tepid observable: one that’s cold if no-one uses it, and becomes hot (but only for one dose of heat) when any number of observers are using it.
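
That composition - which Rx v2.0 now effectively bakes in, as discussed next - looks like this:

var textChanged = Observable.FromEventPattern<TextChangedEventArgs>(txt, "TextChanged")
                            .Publish()
                            .RefCount();

// At most one handler is attached to TextChanged, regardless of the number of
// observers; it's removed again when the last observer unsubscribes.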

We decided the tepid approach is what people most often want, so we built this behavior into the FromEvent* implementations starting from Rx v2.0. This decision was further motivated by the fact that most .NET events have a hot behavior in and of themselves, in that their producer is already going regardless of the number of event handlers (e.g. mouse moves exist even if no-one is listening for them). Therefore, typical .NET events don’t have significant observable side-effects in their add/remove implementations (other than combining/breaking apart delegates, or registering an entry in a table of sorts). It’s very unlikely for an event to have each of its handlers trigger a “session” of its own. In case you’re faced with such an event, the plain use of Observable.Create is what we recommend instead.

This change fits well in our performance theme for the Rx v2.0 release.

 

Summary

The Rx v2.0 release is a major milestone for Rx. Not only are we adding support for two new platforms, .NET Framework 4.5 and Windows 8 Metro style apps, we’re also providing a bunch of improvements to various aspects of the library. This includes performance (which was the main theme of the beta) as well as error handling (a big theme in RC).

We hope you like what you see. As usual, we welcome all feedback. Stay tuned for future announcements about the Rx v2.0 RTW release later this year, and thanks for your continued interest!

 

Bart J.F. De Smet
Senior SDE – Cloud Programmability Team