Lately, we’ve been talking to a lot of customers (both internal and external) about their use of Rx. Quite often, meetings like these end up explaining various concepts in Rx, ranging from observable sequences and subjects to … schedulers. Talking about schedulers almost always leads to a discussion of how to test Rx queries using virtual time scheduling, for many a hidden gem in Rx. In this post, we’ll talk about the role schedulers play in Rx, and how introducing this concept went a long way toward providing a testing story for queries.

 

Hmm… Layered Cake!

The architecture of Rx consists of three layers, which are nicely separated using interfaces (and starting with Rx v2.0, this is reflected in the assembly structure as well). From a 10,000-foot view, the layered cake of Rx looks as follows:

image

  • At the very bottom, you can find our scheduler infrastructure, whose core interface is IScheduler. Using schedulers, one can introduce concurrency, which is required by the layers on top. In this post, we’ll focus on this layer quite a bit. Nonetheless, it’s essential to understand the layers on top as well.
  • The second layer contains what most people are familiar with, namely the core Rx interfaces used to represent observable sequences (IObservable<T>) and their observers (IObserver<T>). Both interfaces live in the BCL nowadays and have been explained thoroughly in many of our Channel 9 videos.
  • Finally, at the very top of the diagram is our LINQ support to perform event processing by writing queries over observable sequences. This is where you’ll find the Observable class defining all kinds of extension methods, mainly on the IObservable<T> type.

The implementation of Rx keeps itself honest with regards to this layer map: functionality at a lower layer doesn’t access functionality higher up the map. Of particular interest for this blog post is how the two upper layers rely on schedulers to get their jobs done. Whether you realize it or not, the Rx schedulers are somehow doing work for you when you’re writing a query and when you subscribe to an observable sequence. Let’s drill in.

 

The Role of Schedulers

Have you ever wondered why the very simple code fragment below does not block upon subscribing to the observable sequence?

var clock = from _ in Observable.Interval(TimeSpan.FromSeconds(1))
            select DateTime.Now;

clock.Subscribe(now =>
{
    Console.WriteLine("It's now {0} o'clock", now);
});

Some of you will likely say: “Obviously, the sequence returned by Interval runs a timer!”. But let me ask a second question then: which timer do you think is being used here? That’s where schedulers come in…

Whenever Rx needs to introduce concurrency to get a job done, it relies on a scheduler to do so. In fact, if you’re writing the code above in Visual Studio, you’ll notice there’s a second overload of Interval available for you to use:

image
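
For illustration, here’s a hedged sketch of what using that second overload could look like, explicitly pinning the timer’s concurrency to the thread pool (the exact way to obtain a scheduler instance, e.g. ThreadPoolScheduler.Instance versus Scheduler.ThreadPool, varies across Rx versions):

var clock = from _ in Observable.Interval(TimeSpan.FromSeconds(1),
                                          ThreadPoolScheduler.Instance)
            select DateTime.Now;

clock.Subscribe(now =>
{
    Console.WriteLine("It's now {0} o'clock", now);
});

Functionally this is the same clock as before; the only difference is that we’ve made the source of concurrency explicit.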

 

Under the hood

What’s really going on here? Every time you subscribe to the Interval sequence, the LINQ layer of Rx starts a dialog with the underlying scheduler infrastructure to do some work after 1 second, namely calling the specified observer’s OnNext method. This process is repeated every second, causing the sequence of ticks to surface to the observer you passed in to Subscribe. In fact, if you set a breakpoint in your OnNext handler, you’ll see where the call is initiated from:

image

(For those who count stack traces to fall asleep at night, you may not have seen this one before. The call stack shown here is from a post-Beta build where we changed a few things around how we deal with time. Stay tuned for more info as we ship our v2.0 Release Candidate.)

As you can see in the call stack, the LINQ layer of Rx (in System.Reactive.Linq) is talking to the scheduling layer of Rx (in System.Reactive.Concurrency), ultimately going down into the System.Threading infrastructure. However, if we parameterized the Interval operator with a different scheduler, the bottom half of the call stack could look vastly different. For example, we could have used the Windows Forms scheduler:

image

The way to think about schedulers is as a single abstraction over all the different ways that exist in the underlying platform to get work done. Think about the many patterns you’ve used in the past to run a piece of work in the background:

  • new Thread(() => { /* do work */ }).Start()
  • ThreadPool.QueueUserWorkItem(_ => { /* do work */ }, null)
  • Task.Factory.StartNew(() => { /* do work */ })
  • syncCtx.Post(_ => { /* do work */ }, null)
  • Dispatcher.BeginInvoke(() => { /* do work */ })

And the list goes on… Notice I’m ignoring the different ways that exist to run work at a given time, leading to a parallel list of constructs one has to know about. In Rx, we’ve abstracted over all those mechanisms using a single interface called IScheduler:

namespace System.Reactive.Concurrency
{
    public interface IScheduler
    {
        DateTimeOffset Now { get; }

        IDisposable Schedule<TState>(TState state,
                                     Func<IScheduler, TState, IDisposable> action);

        IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime,
                                     Func<IScheduler, TState, IDisposable> action);

        IDisposable Schedule<TState>(TState state, TimeSpan dueTime,
                                     Func<IScheduler, TState, IDisposable> action);
    }
}

Ignore some of the subtleties around recursive scheduling here (let’s keep that for another post), and think of the action parameter as a plain Action delegate. The return type is also of interest: the disposable object can be used for cancellation of the scheduler work (and its recursive spin-offs, if any). Convenience extension methods exist for IScheduler that simplify the signatures to plain Action delegates as well.
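
For example, here’s a hedged sketch of those convenience overloads in action, scheduling work immediately, after a relative due time, and at an absolute due time (scheduler stands for any IScheduler instance you happen to have):

IDisposable d1 = scheduler.Schedule(
    () => Console.WriteLine("As soon as the scheduler gets to it"));

IDisposable d2 = scheduler.Schedule(TimeSpan.FromSeconds(5),
    () => Console.WriteLine("Five seconds from now"));

IDisposable d3 = scheduler.Schedule(new DateTimeOffset(2012, 7, 1, 0, 0, 0, TimeSpan.Zero),
    () => Console.WriteLine("At an absolute point in time"));

// Changed our mind about the relative-time work item; cancel it before it's due.
d2.Dispose();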

 

A sequence generator using schedulers

In order to understand the relationship between the Observable class’s methods and the IScheduler infrastructure a bit better, let’s take a look at how a very simple sequence generator called “Return” works. Using Observable.Return, you’re simply creating a sequence that returns a single element to its observers. For example, the marble diagram of Observable.Return(42) looks as follows:

image

Again, you’d be right to ask yourself the question: “But where does the yellow line run?”. In other words, if I subscribe to the sequence generated by using the Return method, where will I see the OnNext and OnCompleted calls originate from? Whenever such a question can be raised without a clear answer, an overload exists that allows you to specify the IScheduler. The overload without such a parameter will use a meaningful default, following a design principle of using “the least amount of concurrency” required. Because Return returns a constant length sequence, we use a default scheduler called Immediate here, exhibiting synchronous behavior. (A discussion of the Immediate scheduler would be a good subject for a follow-up post, too.)

Let’s now try to implement the overload of Observable.Return that takes in an IScheduler. Here’s the signature:

/// <summary>
/// Returns an observable sequence that contains a single element,
/// using the specified scheduler to send out observer messages.
/// </summary>
/// <typeparam name="TResult">
/// The type of the element that will be returned in the produced sequence.
/// </typeparam>
/// <param name="value">Single element in the resulting observable sequence.</param>
/// <param name="scheduler">Scheduler to send the single element on.</param>
/// <returns>Observable sequence containing the single specified element.</returns>
public static IObservable<TResult> Return<TResult>(TResult value,
                                                   IScheduler scheduler)
{
}

Luckily, functional programming is easy. It’s just a matter of making sure we’re using all parameters, gluing them together in a way that the type system agrees upon. Let’s start with the thing we have to return, an observable sequence. Resist your urges to start implementing the interface, please! There are many reasons why this is a bad idea (again deserving a blog post of its own). Instead, you should immediately think of your next best friend, Observable.Create:

public static IObservable<TResult> Return<TResult>(TResult value,
                                                   IScheduler scheduler)
{
    return Observable.Create<TResult>(subscribe: observer =>
    {
    });
}

Observable.Create takes in a single delegate that will become the core implementation of the Subscribe method on the resulting IObservable<T> implementation. We do some clever wrapping around this delegate to enforce the observer contract, amongst other things (which is why you shouldn’t implement the interface yourself). I’m using C# 4.0’s named parameter syntax in the code fragment above to make this clear. As we all know, the Subscribe method takes in an observer and returns an IDisposable, which is used to cancel the subscription. Where do we get this disposable from?

Well, we have two more pieces on the kitchen table: the scheduler kitchen appliance and the value ingredient of the resulting observable dish. Clearly, the value isn’t going to give us an IDisposable, so let’s grab the scheduler and see what we can do. Recall the signature of Schedule there? Right, it returns an IDisposable upon giving it a piece of work to run. You can take my word for it: there’s a convenient extension method overload for Schedule that takes in an Action delegate, so we can write the following:

public static IObservable<TResult> Return<TResult>(TResult value,
                                                   IScheduler scheduler)
{
    return Observable.Create<TResult>(subscribe: observer =>
    {
        return scheduler.Schedule(() =>
        {
        });
    });
}

In fact, what we’ve created here is something much like Observable.Never: upon subscribing to the resulting sequence, it doesn’t talk to the observer that was passed in. Clearly, we still need to use the result value somehow if we want to implement Return behavior. The only thing we haven’t used yet is the observer, so there can only be one meaningful implementation at this point:

public static IObservable<TResult> Return<TResult>(TResult value,
                                                   IScheduler scheduler)
{
    return Observable.Create<TResult>(subscribe: observer =>
    {
        return scheduler.Schedule(() =>
        {
            observer.OnNext(value);
            observer.OnCompleted();
        });
    });
}

Congratulations! You’ve just implemented Observable.Return, without implementing the IObservable<T> interface, using all ingredients, only omitting a null-check for the scheduler parameter.

Let’s recap what’s going on here when the user subscribes to the resulting sequence. Obviously, we’re passing in the observer to be called back on. In return, the user expects an IDisposable which will unsubscribe from the sequence, causing no further work to be done (in a best-effort way). We get this simply by returning the disposable resource we got from the scheduler’s call to Schedule, upon passing it the piece of work that will talk to the observer. Under the debugger, the interaction sequence looks like this. First, we step into the Return method:

image

This will return the sequence that’s constructed by Observable.Create. At this point, no code has run inside the sequence (we call this a cold sequence, in that its side-effects don’t occur until a subscription is made). So, next we’ll end up back in the Load event handler, on our way to run the Subscribe method. Let’s step into that one and see what happens:

image

Look, we ended up inside the subscribe lambda we passed to Create. Notice our call stack contains interesting things like ObservableBase, which is the implementation of IObservable<T> that got generated by the call to Create. Ignore the CurrentThreadScheduler stuff at the bottom of the stack (once more, this would make a good subject of a future post).

If we’d continue stepping through the code using F10, we’d end up back in the Load event handler, now with a disposable object in our hands that we got back from the call to Schedule. Well, almost. The returned disposable has a little wrapper around it, used to ensure the observer won’t be called anymore after returning from the Dispose call, even though the scheduler may not be at a good stopping point yet. (Yet another reason you should never implement the IObservable<T> interface by hand. Oh, and by the way, there’s even more!)

Instead, let’s set a breakpoint inside our OnNext handler, and see it getting hit:

image

Notice how the ControlScheduler is the one that executed the work we passed to Schedule, ultimately serving the call to the observer’s OnNext method, which is where we’re broken in the debugger right now. If we had used another IScheduler implementation passed to the Return method, the bottom of the call stack would look totally different.

 

The bottom line?

If there’s one crucial thing to remember from this post, it’s the following simple statement:

Schedulers parameterize where work is run.

When you browse the methods on the Observable class, you’ll notice a lot of them have an overload that takes in a scheduler. This means the operator has to schedule work, maybe based on time, to get its job done. Luckily, in most cases you don’t even need to worry about this happening under the covers, thanks to the overloads that pass in a meaningful default value. If you want to see all the methods that are parameterized by a scheduler, run the following:

var withScheduler = (from m in typeof(Observable).GetMethods()
                     from p in m.GetParameters()
                     where p.ParameterType == typeof(IScheduler)
                     orderby m.Name
                     select m.Name)
                    .Distinct();

foreach (var method in withScheduler)
    Console.WriteLine(method);

Here’s the list as of Rx v2.0 Beta (a few more will be added in our upcoming RC):

  • Buffer
  • Delay
  • DelaySubscription
  • Empty
  • Generate
  • Interval
  • Merge
  • ObserveOn
  • Range
  • Repeat
  • Replay
  • Return
  • Sample
  • Start
  • StartWith
  • SubscribeOn
  • Take
  • Throttle
  • Throw
  • TimeInterval
  • Timeout
  • Timer
  • Timestamp
  • ToAsync
  • ToObservable
  • Window

A few of those are worth pondering about: where does the concurrency come in that requires a scheduler? (Some of those are a bit trickier than others; don’t worry if you don’t see the light for Take immediately.)

Other operators such as Where or Select don’t need a scheduler at all. Why’s that? All those operators do is pass through the elements from the source sequence, either after applying a filter or a projection. So, when the source generates a new element by calling OnNext, the filter or the projection simply piggybacks on that call to forward the message to its downstream observer. No additional concurrency is required beyond what’s already introduced by the source sequence. To illustrate this, the code below is a (slightly simplified) version of Where:

public static IObservable<TSource> Where<TSource>(
    IObservable<TSource> source,
    Func<TSource, bool> predicate)
{
    return Observable.Create<TSource>(subscribe: observer =>
    {
        return source.Subscribe(
            x =>
            {
                if (predicate(x))
                    observer.OnNext(x);
            },
            observer.OnError,
            observer.OnCompleted
        );
    });
}

 

Virtual Time Scheduling

By now, you may be wondering what all of this scheduling stuff has to do with the title of this post, namely “Testing Rx Queries”. Recall my earlier remark about the implementation of Rx keeping itself honest:

No concurrent work is introduced in Rx without going through the IScheduler interface.

Stated differently, all of the schedulers passed in to various operators collectively maintain a global view of all the work done in the system. Compare this to an operating system that coordinates calls that end up doing work on the underlying hardware. Or a virtual machine whose instruction set is the gateway to running work. Or the relational engine in a database that can’t live without a storage engine. There are plenty of analogies, but you get the point.

The beauty of this design is the ability to intercept all work and apply different kinds of policies or execution strategies. Typical implementations of IScheduler forward work directly to the underlying infrastructure (such as the .NET task pool or the WinRT thread pool, to name just a few), or maintain a queue of work ordered by the time the work is due. In fact, in the former case, there’s typically a queue maintained elsewhere in the system to dispatch work. As a result, the best way to think of schedulers – as reflected in the interface definition – is three things:

  • Interface to schedule work (cf. the Schedule methods).
  • Keeper of a clock (cf. the Now property).
  • Dispatcher of work (e.g. using a queue, timers, etc.).

Of particular interest for the remainder of the discussion is the second bullet point above: keeper of a clock. Schedulers do have a clock, used to order work. When scheduling a piece of work without specifying a due time, it’s due at the current time. If a due time is specified, you can think of the progression of the clock as triggering the dispatch of work that has become due just now, or that’s overdue (which can happen if the scheduler got too busy, so work got delayed).

 

It’s all absolute or relative

As you can see from the IScheduler interface – repeated below – two notions of time are present. One is absolute time, represented by DateTimeOffset, and another is relative time, represented by TimeSpan. Conceptually, scheduling requests based on relative time are converted into jobs with an absolute due time (obtained by adding the relative time to the scheduler’s Now value at the point of the call to Schedule) and are inserted in a queue that’s maintained based on absolute time, from which work is dispatched.

image

In reality, things are a bit more complicated, and we’ve been doing a lot of work lately in Rx v2.0 RC on improving the scheduler infrastructure across the board (more on that in a separate blog post as we release Rx v2.0 RC).

Nonetheless, the essence is in the interface, acting as the entry point to get work done. This provides a great opportunity to play tricks on how we execute work though. For example, we can virtualize time by mapping the DateTimeOffset and TimeSpan values onto another representation. From the discussion above, all that’s needed to execute work in an order according to the clock is an ordering based on the absolute time representation. As soon as we have that, we can dispatch work in a well-defined order. To put it in BCL terms: the representation of absolute time, say TAbsolute, needs to implement IComparable<TAbsolute>. In addition to this, we need a way to add a relative time value to an absolute time value, in order to get the absolute due time we use to sort things in the work queue.

We captured all of the above in an abstract class called VirtualTimeSchedulerBase<TAbsolute, TRelative> which lives in the System.Reactive.Concurrency namespace and looks as follows (hiding a few members that are irrelevant for this discussion):

image

First of all, notice the class implements IScheduler, so it can be used as a parameter to any LINQ operator that requires a scheduler to introduce its concurrency. The implementation of the interface simply forwards work to the underlying ScheduleAbsolute and ScheduleRelative members, using the abstract ToDateTimeOffset method for conversion of absolute times. The ScheduleRelative method simply adds the current TAbsolute value of the Clock property to the given TRelative value, using the abstract Add method. For the Now property of the IScheduler interface, we have a conversion that goes the other way. The internal representation of Clock based on TAbsolute can be converted back to DateTimeOffset, such that the LINQ layer can query us about our current time. It should be clear all requests through the IScheduler interface can now be mapped onto a global order based on TAbsolute values, while keeping up appearances to the rest of the system, making us look like any other ordinary IScheduler.

Intermezzo 1

You may wonder why we didn’t parameterize IScheduler using an absolute and relative time generic parameter to begin with. Unfortunately, this would lead to generic parameter explosion on members of the Observable class.

While this isn’t a problem at first sight, generic parameter inference in languages like C# and Visual Basic is too weak to make this a viable option all around. When omitting generic parameters from a method call, it’s all or nothing. Consider you now have a hypothetical operator called Qux that transforms an IObservable<TBar> into an IObservable<TFoo> using an IScheduler<TAbs, TRel>. Clearly, this operator has to be parameterized by TBar, TFoo, TAbs, and TRel. But what if any of those four parameters could not be inferred from the context? Now you need to specify everything (which sometimes isn’t even possible, think of anonymous types), and you end up specifying DateTimeOffset and TimeSpan in a lot of places for no obvious reason.

Instead, we went for a mapping scheme where we chose DateTimeOffset and TimeSpan as natural representations of absolute and relative time, and allow virtualization of time by means of a user-supplied mapping. Notice both types have plenty of range, based on ticks represented as 64-bit long values.

Now that we know how the conversion to a virtual time representation is supposed to work, what about the internal representation of the scheduler queue? This too is customizable through the VirtualTimeSchedulerBase<TAbsolute, TRelative> type. Notice the abstract ScheduleAbsolute method entry-point where all the scheduling requests are routed to? This is where work would get inserted into some internal data structure of choice (typically a priority queue). In order to run the work in the order it was received, the abstract GetNext method is used. This method is called to retrieve the stored scheduled items (basically wrappers around the scheduled actions and their state) in the order of their absolute due time, in a one-by-one fashion. When using the public Start or Advance* methods, calls to this method take place to obtain and dispatch work.

Intermezzo 2

Notice the execution of work may cause the scheduling of more work. This is where the Func<IScheduler, TState, IDisposable> part comes in. When Rx needs to run recursive work (e.g. to run a loop), it will talk to the IScheduler passed in to the action delegate in order to schedule the next iteration. This by itself deserves a whole blog post of its own, so we’ll ignore the details of this mechanism for now.

 

Replaying the past

So, what can we do using the VirtualTimeSchedulerBase<TAbsolute, TRelative> base class? Well, we can pick arbitrary representations of time (e.g. integral numbers), plug in a priority queue, and start throwing work at the scheduler to have it executed in a well-defined order. That seems to be quite a bit of bootstrapping work in order to get something done, so we included a number of derived types that fix certain design choices on your behalf.

For example, using the VirtualTimeScheduler<TAbsolute, TRelative> type, you get an implementation that fills in the enqueue and dequeue logic using a priority queue. The only things left to be implemented are the Add, ToDateTimeOffset, and ToRelative methods. In other words, all you have to tell us is how you want to view time in your virtual world, and how it relates to our notions of time. An example of this is the use of sequence IDs in log files that define a causal order of events, but are not necessarily directly related to UTC timestamps.
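
To make that concrete, here’s a hedged sketch of such a scheduler, where virtual time is a plain sequence number. The mapping chosen here (one sequence number corresponds to one tick) is just an assumption for illustration purposes; any order-preserving mapping would do:

class SequenceIdScheduler : VirtualTimeScheduler<long, long>
{
    // Compute an absolute due time from the current clock and a relative offset.
    protected override long Add(long absolute, long relative)
    {
        return absolute + relative;
    }

    // Present sequence number N to the rest of Rx as N ticks past time zero.
    protected override DateTimeOffset ToDateTimeOffset(long absolute)
    {
        return new DateTimeOffset(absolute, TimeSpan.Zero);
    }

    // Map a TimeSpan handed to us by the LINQ layer back onto sequence numbers.
    protected override long ToRelative(TimeSpan timeSpan)
    {
        return timeSpan.Ticks;
    }
}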

Another branch in the class hierarchy is HistoricalSchedulerBase and HistoricalScheduler. Those go the other way to fill in one of the degrees of freedom of their parent virtual time scheduler, namely by fixing the TAbsolute and TRelative parameters to DateTimeOffset and TimeSpan. The detail left to be filled out for classes deriving from HistoricalSchedulerBase is the underlying data structure, which again is fixed using a priority queue in the HistoricalScheduler class.

Those schedulers are useful to replay historical data, and write queries over that data, using the virtualized notion of time. In other words, you’re decoupling the notion of time from the local system clock, using the passage of time as defined by the replayed source instead. For example, reading data from a log file – sorted by time – can drive the historical scheduler’s clock forward and generate sequences surfacing the log’s data. Using the same scheduler instance, one can now parameterize Rx operators over those sequences, e.g. to buffer the data based on duration. By doing so, the duration is measured against the data in the file rather than based on the system clock. Virtual time schedulers are also pushed forward simply by calling Start or the Advance* methods, rather than relying on system time. This allows you to replay historical data as fast as possible, e.g. analyzing historical data that spans years in just a matter of seconds.
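
To give a feel for this, here’s a hedged sketch of such a replay scenario. LogEntry, its Timestamp property, and ReadLog are hypothetical stand-ins for whatever your log infrastructure provides; the Rx pieces are a HistoricalScheduler, a Subject<T> to surface the replayed data, and a time-parameterized Buffer:

var scheduler = new HistoricalScheduler();
var replayed = new Subject<LogEntry>();

// Schedule an OnNext call for every log entry at its recorded (virtual) time.
foreach (var entry in ReadLog("readings.log"))
{
    var e = entry;
    scheduler.Schedule(e.Timestamp, () => replayed.OnNext(e));
}

// Buffer the replayed data in one-hour chunks, measured in replayed time
// rather than wall-clock time, because the scheduler is passed in explicitly.
replayed.Buffer(TimeSpan.FromHours(1), scheduler)
        .Subscribe(batch => Console.WriteLine("{0} readings", batch.Count));

// Drain the whole virtual timeline as fast as possible
// (or step through it using AdvanceTo/AdvanceBy).
scheduler.Start();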

The picture below illustrates the use of a historical scheduler to replay data from a file and do temporal analysis over it, simply by parameterizing the Buffer operator using the scheduler. Calling AdvanceTo runs the work in the scheduler from the start time (12AM) till the specified time (4AM), merely by draining the queue and invoking the scheduled actions (OnNext of 1 and 2, the code scheduled by the buffer at 2:30AM, OnNext of 3). Notice periodic/recursive scheduling in the Buffer operator puts the next “send out current buffer, and create new buffer” action at 5:00AM upon sending out the buffer at 2:30AM. It should be clear from this picture that the replayed sequences are merely a series of OnNext invoking actions (the green dots) that end up in the same queue as the actions scheduled by the Rx operators (the lightning bolts) that use the same historical scheduler.

image

 

Testing Rx queries

Having a virtualized timeline provides for yet another great capability: testing. By drawing a sequence as a marble diagram in virtual time, we’ve really created a specification of a sequence including its data points (in the form of notifications, i.e. OnNext, OnError, and OnCompleted) as well as its timing. In order to assert the behavior of an Rx query, the trick is to specify zero or more input sequences and zero or more output sequences, and somehow assert the expected outcome against the actual results.

Virtual time schedulers provide the answer on how to draw observables as timelines, and assert the behavior of a query. More specifically, the TestScheduler derived type (in Microsoft.Reactive.Testing.dll), shown in the class diagram below, is what you need:

image

One of the key things to notice here is the use of long as the representation of virtual time (both absolute and relative) for testing purposes. The abstract methods for conversion and addition are implemented in a trivial way: long values are treated as ticks for both DateTimeOffset and TimeSpan, e.g. a relative virtual time of 100 corresponds to TimeSpan.FromTicks(100).

 

Cold and hot

Important concepts to understand before using the TestScheduler are the notions of cold and hot observables.

  • Cold observable sequences start producing elements upon each subscription to the sequence. For example, Observable.Return is a cold sequence. For each and every observer that’s provided to Subscribe, the gears are put in motion to produce an OnNext and OnCompleted message.
  • Hot observable sequences are doing work even if nobody is listening to them; they feel hot when you put your hands on the engine cover. An example of such a sequence is mouse moves. Even if nobody is observing the sequence, the mouse moves are going on (to this day, there is no feedback mechanism in the OS – or the HID hardware for that matter – to tell the mouse to glue itself to the desk if no program is observing mouse moves).

Another way to look at this from a testing point of view is by bringing the notion of time into account. When drawing the marble diagram for a cold sequence, you’re really stating times relative to the subscription time. On the other hand, if you’re sketching a hot sequence using a marble diagram, events are really relative to the birth of the sequence, and subscriptions are merely tuning in to the sequence from that point on.

The TestScheduler API provides for the creation of both kinds of sequences by specifying a virtual timeline of messages, represented as Notification<T> objects. For example, the specification of a cold sequence with behavior like Return would look as follows:

var scheduler = new TestScheduler();

var xs = scheduler.CreateColdObservable(
    OnNext(10, 42),
    OnCompleted<int>(20)
);

We’ll discuss where the OnNext and OnCompleted methods come from in a moment, but let’s focus on the creation and timing aspect first. Here, we’re asking the scheduler to create a cold observable sequence, which produces an OnNext(42) message at time 10, and an OnCompleted() message at time 20, relative to the subscription time (because it’s cold). Each and every subscription at time N will be followed by those messages at time N + 10 and N + 20 (though it’s possible for the subscription to be disposed at any time). The figure below shows two subscriptions to this sequence happening at absolute times 200 and 220, causing each observer to observe both notifications:

image

A hot observable sequence is created in a similar way, but using CreateHotObservable. Most of Rx’s observable factories return a cold sequence, so assume the sequence below represents some kind of .NET event stream such as mouse moves:

var scheduler = new TestScheduler();

var mouse = scheduler.CreateHotObservable(
    OnNext(190, new Point(125, 63)),
    OnNext(198, new Point(128, 63)),
    OnNext(203, new Point(128, 42)),
    OnNext(209, new Point(130, 40)),
    OnNext(217, new Point(127, 35)),
    OnNext(229, new Point(120, 30)),
    OnNext(232, new Point(105, 28)),
    OnNext(236, new Point(109, 29))
);

Observers are now tuning in to the sequence, receiving values from the point of the subscription on. However, the specification of this stream can be thought of as using times relative to the creation of the sequence. In case of mouse moves, this coincides with time relative to the beginning of all observations (well, since the machine booted), so we can think of it as a specification based on absolute time.

image

You can already guess what the TestScheduler really does when it creates those sequences. It basically schedules On* calls to the subscribed observers based on the timeline specified as a set of timestamped Notification<T> objects (more on that in a moment). In other words, if you were to subscribe to such a sequence and look at the Clock property on the scheduler at the point of receiving the callback, you’d be able to observe the timing of the sequence. For example, consider the marble diagram for the cold “Return” sequence first:

scheduler.Schedule(TimeSpan.FromTicks(190), () =>
{
    xs.Subscribe(
        x  => Console.WriteLine("Received OnNext({0})   at {1}",
                                x, scheduler.Clock),
        () => Console.WriteLine("Received OnCompleted() at {0}",
                                scheduler.Clock)
    );
});

scheduler.Schedule(TimeSpan.FromTicks(220), () =>
{
    xs.Subscribe(
        x  => Console.WriteLine("Received OnNext({0})   at {1}",
                                x, scheduler.Clock),
        () => Console.WriteLine("Received OnCompleted() at {0}",
                                scheduler.Clock)
    );
});

Here, we’re scheduling an action at times 190 and 220 (recall virtual time values on the TestScheduler map to TimeSpan tick values), subscribing to xs, and printing the received value and the clock to the console. Assuming we start the scheduler (as with any virtual time scheduler, it needs to be kicked off), we’d see console output that matches the first marble diagram: OnNext at 200 followed by OnCompleted at 210, and OnNext at 230 followed by OnCompleted at 240.

Obviously, you can do more. For example, you could store the IDisposable returned from Subscribe and schedule a Dispose call at a later time, effectively testing for subscription disposal behavior; a sketch of that follows below. However, having to wire up all of this yourself (scheduling when to subscribe, when to dispose, recording clock values, and so on) is way too tedious. So, there’s more.
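
For completeness, here’s a hedged sketch of that manual disposal test, continuing the fragment above: disposing at virtual time 205 means the subscription sees the OnNext due at 200, but not the OnCompleted due at 210.

var subscription = default(IDisposable);

scheduler.Schedule(TimeSpan.FromTicks(190), () =>
{
    subscription = xs.Subscribe(
        x  => Console.WriteLine("Received OnNext({0})   at {1}", x, scheduler.Clock),
        () => Console.WriteLine("Received OnCompleted() at {0}", scheduler.Clock)
    );
});

// Unsubscribe after the OnNext (due at 200) but before the OnCompleted (due at 210).
scheduler.Schedule(TimeSpan.FromTicks(205), () => subscription.Dispose());

scheduler.Start();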

 

Observing messages and timings

Testing for the behavior of Rx queries really boils down to asserting the notifications on event streams, as well as the timings. Instead of manual observation of virtual time to assert behavior (as shown in the previous section), what if we provided an observer that does this for you? That’s exactly what TestScheduler’s CreateObserver method does:

var scheduler = new TestScheduler();

var xs = scheduler.CreateColdObservable(
    OnNext(10, 42),
    OnCompleted<int>(20)
);

var observer1 = scheduler.CreateObserver<int>();
scheduler.Schedule(TimeSpan.FromTicks(190), () => xs.Subscribe(observer1));

var observer2 = scheduler.CreateObserver<int>();
scheduler.Schedule(TimeSpan.FromTicks(220), () => xs.Subscribe(observer2));

scheduler.Start();

// Assert messages recorded by the two observers.

What those testable observers do is nothing more than what we did manually before: they record the notifications that are received – not at the console though – as well as the current Clock value of the scheduler. This is what the interface looks like:

namespace Microsoft.Reactive.Testing
{
    // Summary:
    //     Observer that records received notification messages and timestamps those.
    //
    // Type parameters:
    //   T:
    //     The type of the elements in the sequence.
    public interface ITestableObserver<T> : IObserver<T>
    {
        // Summary:
        //     Gets recorded timestamped notification messages received by the observer.
        IList<Recorded<Notification<T>>> Messages { get; }
    }
}

Recognize something here? The Recorded<Notification<T>> is exactly what we encountered before when creating our test sequences:

public ITestableObservable<T> CreateColdObservable<T>(
    params Recorded<Notification<T>>[] messages);
public ITestableObservable<T> CreateHotObservable<T>(
    params Recorded<Notification<T>>[] messages);

The OnNext and OnCompleted helpers we used in passing before do precisely this: create a Recorded<Notification<T>>, where the Recorded part acts as a container for a timestamp value in the TestScheduler’s virtual time world (based on long):

image
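
In other words, the helpers are just shorthand. Roughly speaking – and assuming the Rx v2.0 Notification factory methods – the following pairs are equivalent:

var msg1 = OnNext(10, 42);
var msg2 = new Recorded<Notification<int>>(10, Notification.CreateOnNext(42));

var end1 = OnCompleted<int>(20);
var end2 = new Recorded<Notification<int>>(20, Notification.CreateOnCompleted<int>());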

 

Using Unit Test projects

So, where are helpers like OnNext and OnCompleted defined? In order to make things clean and concise when writing test code, we introduced a base class called ReactiveTest for you to derive from. Instead of doing this in your regular playground Console Application project, create a new Unit Test project first though, adding references to the Rx assemblies (including Microsoft.Reactive.Testing). For this sample I’m using Rx v2.0 Beta, though you can do exactly the same in Rx v1.0 (using System.Reactive.dll instead of the four assemblies shown below). Or you could use NuGet to include Rx-Testing.

image

Now tweak the code in the generated unit test class by deriving from ReactiveTest, and add the sample code to it:

using System;
using System.Reactive.Concurrency;
using Microsoft.Reactive.Testing;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace UnitTests
{
    [TestClass]
    public class Tests : ReactiveTest
    {
        [TestMethod]
        public void MyReturnClone()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateColdObservable(
                OnNext(10, 42),
                OnCompleted<int>(20)
            );

            var observer1 = scheduler.CreateObserver<int>();
            scheduler.Schedule(TimeSpan.FromTicks(190),
                               () => xs.Subscribe(observer1));

            var observer2 = scheduler.CreateObserver<int>();
            scheduler.Schedule(TimeSpan.FromTicks(220),
                               () => xs.Subscribe(observer2));

            scheduler.Start();

            observer1.Messages.AssertEqual(
                OnNext(200, 42),          // 190 + 10
                OnCompleted<int>(210)     // 190 + 20
            );

            observer2.Messages.AssertEqual(
                OnNext(230, 42),          // 220 + 10
                OnCompleted<int>(240)     // 220 + 20
            );
        }
    }
}

Starting with Rx v2.0 RC – coming up soon – the AssertEqual extension method I’m using in the last two statements will be available as part of the public API surface in Microsoft.Reactive.Testing. If you’re on previous releases of Rx, it’s trivial to define an AssertEqual extension method on IEnumerable<T> that does the right thing:

public static void AssertEqual<T>(this IEnumerable<T> actual, params T[] expected)
{
    ReactiveAssert.AreElementsEqual(expected, actual);
}

All ReactiveAssert.AreElementsEqual does is trigger an Assert.Fail if the two sequences aren’t equal according to Enumerable.SequenceEqual. Notice both the Recorded<T> and Notification<T> types define equality, so this will assert both the timestamp values as well as the notification kind and payload. It should be plainly obvious that the test code written above asserts the timing and message behavior of the two observers when subscribed to xs at different times. Indeed, running the unit test project will confirm the results are correct:

image

However, we can still simplify this code a lot thanks to additional overloads to the Start method defined on TestScheduler. Quite often, all you’re testing in a unit test is the behavior of a single sequence, e.g. the result of an Rx query. Instead of having to specify the subscription time manually, cook up an observer, and reach out to the observer to assert messages, we can bundle up all of this in one single operation:

var scheduler = new TestScheduler();

var xs = scheduler.CreateColdObservable(
    OnNext(10, 42),
    OnCompleted<int>(20)
);

var res = scheduler.Start(() => xs);

res.Messages.AssertEqual(
    OnNext(210, 42),            // Subscribed + 10
    OnCompleted<int>(220)       // Subscribed + 20
);

xs.Subscriptions.AssertEqual(
    Subscribe(200, 1000)        // [Subscribed, Disposed]
);

The lambda expression passed to the Start method in the sample above acts as a sequence constructor. When precise timings are not specified in the call to Start, this sequence constructor is scheduled to be run at virtual time 100 (defined as a constant in ReactiveTest.Created). Subscription to the sequence happens implicitly at virtual time 200 (defined in ReactiveTest.Subscribed). Finally, disposal of the subscription happens at time 1000 (ReactiveTest.Disposed). The call to Start and the assert of the observed messages shown above are pretty much the same as writing the following by hand:

// Create at time 100
var res = default(ITestableObservable<int>);
scheduler.Schedule(TimeSpan.FromTicks(100),  () => res = xs);

// Subscribe at time 200
var sub = default(IDisposable);
var obs = scheduler.CreateObserver<int>();
scheduler.Schedule(TimeSpan.FromTicks(200),  () => sub = xs.Subscribe(obs));

// Dispose at time 1000
scheduler.Schedule(TimeSpan.FromTicks(1000), () => sub.Dispose());

obs.Messages.AssertEqual(
    OnNext(210, 42),
    OnCompleted<int>(220)
);

Creation time and subscription time are separated out, per our discussion of the different behavior of cold versus hot observable sequences. For a hot sequence, the time of the creation is where things may start happening under the sequence’s hood. Also, the side-effects of running the lambda passed to Start would occur at that very point in time (i.e. virtual time 100).

Did you notice the additional assert added at the bottom of the original code fragment? Using the TestScheduler, we can also check the timing of subscriptions to sequences. In this case, we’re asserting a subscription at virtual time 200 (due to our use of default timings, i.e. ReactiveTest.Subscribed which equals 200), and disposal of the subscription at virtual time 1000 (as defined in ReactiveTest.Disposed).
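
If the default virtual times don’t suit a particular test, there’s an overload of Start that lets you specify them explicitly. Here’s a hedged sketch using a fresh scheduler, creating the sequence at 50, subscribing at 150, and disposing of the subscription at 600:

var scheduler = new TestScheduler();

var xs = scheduler.CreateColdObservable(
    OnNext(10, 42),
    OnCompleted<int>(20)
);

var res = scheduler.Start(() => xs, 50, 150, 600);  // created, subscribed, disposed

res.Messages.AssertEqual(
    OnNext(160, 42),            // Subscribed + 10
    OnCompleted<int>(170)       // Subscribed + 20
);

xs.Subscriptions.AssertEqual(
    Subscribe(150, 600)
);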

 

Testing sequence generators

With all the tools in our Microsoft.Reactive.Testing toolbox, we should now be up for the task of testing a sequence generator in Rx. Say we want to test the Observable.Interval operator (which creates a recurring timer with a specified periodic interval), how would we do it? Instead of creating our own observable sequence using Create[Hot|Cold]Observable, we’re now simply going to run Observable.Interval in the factory function passed to Start. Let’s give it a try:

[TestMethod]
public void Interval()
{
    var scheduler = new TestScheduler();

    var res = scheduler.Start(() =>
        Observable.Interval(TimeSpan.FromTicks(234))
    );

    res.Messages.AssertEqual(
        OnNext(434, 0L),
        OnNext(668, 1L),
        OnNext(902, 2L)
    );
}

Given the default subscription at virtual time 200, we expect the first tick to take place at 200 + 234 = 434. Recall the mapping of the TestScheduler’s virtual time is based on treatment of the long values as ticks in TimeSpan and DateTimeOffset values, as observed by the rest of Rx. Due to the default disposal time at virtual time 1000, we expect the last message to be at virtual time 902, i.e. 200 + 3 * 234.

However, when we run the test code shown above, the test fails as follows:

image

Hmm, can you guess what’s up here? The actual output shows no results whatsoever… Tip: think where the Interval operator is getting its concurrency from, required to run the periodic timer. Here’s the rule we stated earlier:

No concurrent work is introduced in Rx without going through the IScheduler interface.

So, where’s the scheduler used by Observable.Interval in the code fragment above? We omitted parameterization by a scheduler, so a default value is used, which happens to be the thread pool scheduler in the case of time-based operators. In other words, the first OnNext message would be produced by a thread pool timer some 234 ticks of real time after the subscription (give or take, as we’re dealing with scheduling in non-real-time operating systems), entirely outside our virtual timeline. But our test completed in 50ms! Our test scheduler has no clue whatsoever about the work that’s scheduled to run the timer – the LINQ layer is talking to another scheduler. The fix is to parameterize the Interval operator with the test scheduler object, like this:

[TestMethod]
public void Interval()
{
    var scheduler = new TestScheduler();

    var res = scheduler.Start(() =>
        Observable.Interval(TimeSpan.FromTicks(234), scheduler)
    );

    res.Messages.AssertEqual(
        OnNext(434, 0L),
        OnNext(668, 1L),
        OnNext(902, 2L)
    );
}

After applying this code change, the test will pass:

image

You’re now well-equipped to test other sequence generators, including your own. For example, going back to our original Return clone, what do you think the scheduler behavior should be?

public static IObservable<TResult> Return<TResult>(TResult value,
                                                   IScheduler scheduler)
{
    return Observable.Create<TResult>(subscribe: observer =>
    {
        return scheduler.Schedule(() =>
        {
            observer.OnNext(value);
            observer.OnCompleted();
        });
    });
}

Got your answer ready? Here’s the test that will pass:
[TestMethod]
public void MyReturnClone()
{
    var scheduler = new TestScheduler();

    var res = scheduler.Start(() => Return(42, scheduler));

    res.Messages.AssertEqual(
        OnNext(201, 42),
        OnCompleted<int>(201)
    );
}

Why does the OnNext message occur at time 201? Recall we’re scheduling the subscription at time 200, causing the “subscribe” anonymous method passed to Create to run. This by itself schedules more work, which will eventually trigger the OnNext and OnCompleted calls on the given observer. In order to account for the forward progress of time and simulate sequential execution, each scheduled job on the TestScheduler increases the clock by 1 tick. So, the call to subscribe happened at time 200, the clock was incremented by 1 tick upon returning from the subscription logic, and now we find work in our queue that’s due immediately, effectively running it at time 201. Notice our implementation of Return runs the OnCompleted as part of the same scheduled action, hence the OnCompleted message is observed at time 201 as well. If we were to run some kind of recursive scheduling scheme to make the OnCompleted call separately, it’d occur at time 202.
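
To see that, here’s a hedged sketch of such a two-step variant and the timings you’d expect for it (ignoring the fact that the returned disposable doesn’t cover the inner scheduled action, which a real implementation would have to take care of):

public static IObservable<TResult> ReturnInTwoSteps<TResult>(TResult value,
                                                             IScheduler scheduler)
{
    return Observable.Create<TResult>(subscribe: observer =>
    {
        return scheduler.Schedule(() =>
        {
            observer.OnNext(value);

            // Send the OnCompleted as a separately scheduled unit of work.
            scheduler.Schedule(() => observer.OnCompleted());
        });
    });
}

[TestMethod]
public void MyReturnCloneInTwoSteps()
{
    var scheduler = new TestScheduler();

    var res = scheduler.Start(() => ReturnInTwoSteps(42, scheduler));

    res.Messages.AssertEqual(
        OnNext(201, 42),
        OnCompleted<int>(202)     // one extra tick for the second scheduled action
    );
}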

Now that you know the “increment by one tick” rule mentioned above, it should be trivial to predict the output of Range:

[TestMethod]
public void Range()
{
    var scheduler = new TestScheduler();

    var res = scheduler.Start(() => Observable.Range(42, 5, scheduler));

    res.Messages.AssertEqual(
        OnNext(201, 42),
        OnNext(202, 43),
        OnNext(203, 44),
        OnNext(204, 45),
        OnNext(205, 46),
        OnCompleted<int>(206)
    );
}

As you can see, each message was sent out by a separate scheduled action, resulting in very granular scheduling units. If we were to run multiple Range subscriptions simultaneously, we’d see interleaving going on:

[TestMethod]
public void RangeInterleavings()
{
    var scheduler = new TestScheduler();

    var xs = Observable.Range(42, 5, scheduler);
    var ys = Observable.Range(24, 5, scheduler);

    var res = scheduler.Start(() => Observable.Merge(xs, ys));

    res.Messages.AssertEqual(
        OnNext(201, 42),
        OnNext(201, 24),
        OnNext(202, 43),
        OnNext(202, 25),
        OnNext(203, 44),
        OnNext(203, 26),
        OnNext(204, 45),
        OnNext(204, 27),
        OnNext(205, 46),
        OnNext(205, 28),
        OnCompleted<int>(206)
    );
}

In this case, the subscription to the sequence returned by the Merge operator happens at virtual time 200, resulting in subscriptions to both xs and ys at that same time (in a left-to-right order, so xs is subscribed to before ys). Inside the Range operator’s Subscribe method, work is scheduled to send out the first element of the range as soon as possible, hence both xs and ys schedule their first OnNext to happen immediately. This results in two elements in the queue, both with a virtual due time of 200. When we return from the Subscribe call to Merge at time 200, the clock moves up one tick, bringing us to time 201, where we notice the two actions scheduled by xs and ys are due now, so we see OnNext(42) and OnNext(24) surface at time 201. Besides sending out the OnNext messages, those scheduled actions also make a recursive scheduling call to send out the next message (and repeat the recursion), so we end up with two entries in the queue again, due at virtual time 201. After the scheduler has run the work due at time 201, it increments the clock to 202, and yet again we find ourselves with two pieces of work queued up, which are due now, this time to send out values 43 and 25. And the recursion goes on…

The diagram below shows the recursive unfolding of the computation going on, ignoring a few details on terminating conditions etc.:

image

On the left of the picture above, you can see the virtual clock as maintained by the TestScheduler. The yellow arrows denote the instruction pointer while executing work on the scheduler. Running through the work will take care of sending out messages – as observed on the resulting sequence on the right, correlated with the timing on the left – and causes the recursion to continue going on.

Intermezzo 3

The recursive scheduling mechanism in Rx allows schedulers to be shared by many event stream processing queries that are running concurrently. As we’ve seen in the sample above, the work to send out messages on the observers of two Range sequences is interleaved properly. Such behavior is also desirable for a lot of other schedulers, e.g. the IScheduler implementations for various UI frameworks, encapsulating a message loop in the end. If we were to produce messages using a loop, we’d block the thread for a long (potentially infinite) time, causing the application to hang.

Starting with Rx v2.0, the recursive scheduling mechanism is bypassed for certain types of schedulers that support long-running tasks without causing problems in the system as a whole. Basically, the LINQ layer in Rx now detects the capabilities of the scheduler (using an IServiceProvider pattern), and if it finds support for ISchedulerLongRunning, it goes down the route of running a loop that periodically checks for cancellation requests. This helps performance a lot, while still keeping the scheduler yielding behavior for schedulers that don’t support this mechanism.
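
To make the detection pattern a bit more tangible, here’s a hedged sketch (not Rx’s actual implementation) of how an operator could probe a scheduler for ISchedulerLongRunning support through the IServiceProvider pattern, falling back to recursive scheduling when it’s not available:

static IDisposable EmitForever(IScheduler scheduler, IObserver<long> observer)
{
    var provider = scheduler as IServiceProvider;
    var longRunning = provider != null
        ? (ISchedulerLongRunning)provider.GetService(typeof(ISchedulerLongRunning))
        : null;

    if (longRunning != null)
    {
        // Dedicated long-running context: run a plain loop, checking for cancellation.
        return longRunning.ScheduleLongRunning(0L, (i, cancel) =>
        {
            while (!cancel.IsDisposed)
                observer.OnNext(i++);
        });
    }

    // Yielding scheduler: schedule each iteration recursively, so work from other
    // queries sharing the same scheduler can interleave.
    return scheduler.Schedule(0L, (long i, Action<long> self) =>
    {
        observer.OnNext(i);
        self(i + 1);
    });
}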

Finally, a word of guidance. Don’t rely on precise tick increment behavior for built-in Rx primitives. For example, does Observable.Return consume one tick for the OnNext message and one tick for the OnCompleted message, or does it bundle up both calls in one unit of scheduled work? For a constant number of messages (e.g. in Return, Throw, Empty), we typically choose the latter. In fact, when using the default scheduler for those – Immediate, that is – the behavior is indistinguishable. For the concurrent case, we may adjust the granularity of scheduling operations in the future (though that’s rather unlikely).

 

It never happened, I swear!

One of the nice results of the honest layered cake architecture of Rx is how the schedulers are the only way to get things done. Sequence factories like Return, Empty, Never, Range, etc. all take in an IScheduler to run their work (to communicate with observers) on.

So, what about the Observable.Never<T> method? If you look at its signature, you’ll see it doesn’t take a source sequence, so it can’t piggyback on incoming messages. It also doesn’t take a scheduler, so it won’t perform any work. If it were to communicate to observers that are passed in upon subscription, it would require an IScheduler to make the On* calls on. In other words, simply by looking at the signature, you can conclude that the sequence generated by Never<T> won’t ever produce any messages.

Intermezzo 4

What’s Never even used for? It’s a pretty handy sequence constructor in case you need some sequence that denotes a duration. In Rx, we often use observable sequences to represent a duration by taking the time between the subscription and the first produced element to stand for the duration. Because Never doesn’t produce any element – which would indicate the end of the duration – it amounts to an infinite duration. For example, when using our reactive coincidence operators Join and GroupJoin, you can join elements of two streams based on the overlap of their elements’ durations. Such durations are represented by means of an observable sequence. In order to indicate an element has an infinite duration, you could use Never as the object returned from the duration selector function for that element.

Using the TestScheduler infrastructure, we can even prove that Never won’t do anything indeed. The code shown in the screenshot below does just that:

image
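
Since the screenshot isn’t reproduced here, a hedged sketch of what such a test might look like (using the default Start timings, so subscription happens at 200 and disposal at 1000):

[TestMethod]
public void NeverProducesNothing()
{
    var scheduler = new TestScheduler();

    var res = scheduler.Start(() => Observable.Never<int>());

    // Nothing was recorded between subscription (200) and disposal (1000).
    res.Messages.AssertEqual();
}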

Recall how the Start method basically runs all the work in the scheduler queue, until it’s empty. It should be clear from looking at the signature of Never that no work can be scheduled because no scheduler is passed in, so the call to Subscribe at virtual time 200 won’t add anything to the queue. The loop draining the scheduler queue will immediately terminate (well, to be correct, the Dispose call at virtual time 1000 would run), proving no work was ever queued up, hence there’s no communication possible to the observer. Unless we violated something in the implementation of Rx of course. Which we didn’t, as you can see below:

public static IObservable<T> Never<T>()
{
    return Observable.Create<T>(observer => Disposable.Empty);
}

 

Testing standard query operators

In the previous section we saw how to test sequence generators, by parameterizing those using the TestScheduler, allowing to assert timing of messages. Next up is how to test queries that operate on one or more input sequences. In this case, we start from one or more sequences constructed using Create[Hot|Cold]Observable, write a query over those, and run it through Start. Once more, we can now assert the behavior in terms of messages and timing.

The example shown below applies a very simple filter using the Where query operator (here conveniently written using query syntax). Because the Where extension method on IObservable<T> doesn’t take in an IScheduler (see the list of operators parameterized by schedulers earlier in this post), we know for sure it doesn’t schedule work as part of its operation. As such, it piggybacks on the incoming messages, applies the filter, and passes the elements that satisfy the filter through to the downstream observer. Stated otherwise, Where shouldn’t influence timing, as we can see from the assertions in the code sample below:

[TestMethod]
public void WhereFilter()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, "Erik"),
        OnNext(220, "Jeffrey"),
        OnNext(230, "Wes"),
        OnNext(240, "Danny"),
        OnNext(250, "Bart"),
        OnNext(260, "Matthew"),
        OnNext(270, "Aaron"),
        OnNext(280, "Georgi"),
        OnNext(290, "Brian"),
        OnCompleted<string>(300)
    );

    var res = scheduler.Start(() => from name in xs
                                    where name.Length <= 4
                                    select name);

    res.Messages.AssertEqual(
        OnNext(210, "Erik"),
        OnNext(230, "Wes"),
        OnNext(250, "Bart"),
        OnCompleted<string>(300)
    );
}

Another interesting example is the use of Take to complete a sequence early, when a specified number of elements has been received and propagated. The point of interest here is the auto-dispose behavior that’s applied to the source subscription upon sending the OnCompleted downstream. (This is yet another reason why manual implementation of IObservable<T> is strongly discouraged. When using Observable.Create<T>, we take care of this for you.) The sample code fragment below clearly illustrates this behavior by asserting the lifetime of the subscription to xs. Take basically piggybacks on the OnNext stream of its source, and when it sees the last element required to satisfy the specified count, it forwards that element, immediately followed by an OnCompleted message. A side-effect of sending out OnCompleted is immediate disposal of the source sequence’s subscription, so we see the subscription end at time 250 – the time of the fifth element.

[TestMethod]
public void Take5()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, "Erik"),
        OnNext(220, "Jeffrey"),
        OnNext(230, "Wes"),
        OnNext(240, "Danny"),
        OnNext(250, "Bart"),
        OnNext(260, "Matthew"),
        OnNext(270, "Aaron"),
        OnNext(280, "Georgi"),
        OnNext(290, "Brian"),
        OnCompleted<string>(300)
    );

    var res = scheduler.Start(() => xs.Take(5));

    res.Messages.AssertEqual(
        OnNext(210, "Erik"),
        OnNext(220, "Jeffrey"),
        OnNext(230, "Wes"),
        OnNext(240, "Danny"),
        OnNext(250, "Bart"),
        OnCompleted<string>(250)
    );

    xs.Subscriptions.AssertEqual(
        Subscribe(200, 250)
    );
}

 

Testing N-ary sequence combinators

We can also test queries that involve multiple sequences of course. For example, consider the CombineLatest method which performs a pairwise combine between the latest observed elements in two or more streams, whenever any of the streams produces an element:

[TestMethod]
public void CombineLatest()
{
    var scheduler = new TestScheduler();

    var a1 = scheduler.CreateHotObservable(
        OnNext(240, 3),
        OnNext(270, 2),
        OnNext(330, 1)
    );

    var a2 = scheduler.CreateHotObservable(
        OnNext(220, 6),
        OnNext(280, 2),
        OnNext(290, 3),
        OnNext(350, 7)
    );

    var a3 = a1.CombineLatest(a2, (A1, A2) => A1 + A2);

    var res = scheduler.Start(() => a3);

    res.Messages.AssertEqual(
        OnNext(240, 9),
        OnNext(270, 8),
        OnNext(280, 4),
        OnNext(290, 5),
        OnNext(330, 4),
        OnNext(350, 8)
    );
}

Think of the code above as two Excel cells changing, requiring the resulting sequence (produced by applying the formula) to produce the sum that updates whenever any of the input cells has changed. In reality, you’d likely use StartWith to seed the cell streams with 0, and use DistinctUntilChanged to avoid triggering an update when a cell has changed to its current value (for example, due to the use of a TextBox change notification event, which may cause false positives). The picture below illustrates this sample in an Excel mockup:

image
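
In code, a hedged sketch of that more realistic variant might look as follows (the seeding and deduplication change the timings, so the assertions from the test above would need to be adjusted accordingly):

var cell1 = a1.StartWith(0).DistinctUntilChanged();
var cell2 = a2.StartWith(0).DistinctUntilChanged();

var sum = cell1.CombineLatest(cell2, (x, y) => x + y);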

 

Testing temporal query operators

Next up is testing of temporal constructs in Rx. Nothing’s really different here, other than having to keep in mind the mapping of e.g. TimeSpan values onto the virtualized notion of time based on ticks. For example, the query below samples a sequence every 30 ticks:

[TestMethod]
public void Sample()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, 1),
        OnNext(220, 2),
        OnNext(240, 3),
        OnNext(270, 4),
        OnNext(280, 5),
        OnNext(310, 6),
        OnNext(320, 7),
        OnNext(325, 8),
        OnNext(330, 9),
        OnNext(335, 10),
        OnNext(340, 11),
        OnCompleted<int>(350)
    );

    var res = scheduler.Start(() => xs.Sample(TimeSpan.FromTicks(30), scheduler));

    res.Messages.AssertEqual(
        OnNext(230, 2),
        OnNext(260, 3),
        OnNext(290, 5),
        OnNext(320, 7),
        OnNext(350, 11),
        OnCompleted<int>(350)
    );

    xs.Subscriptions.AssertEqual(
        Subscribe(200, 350)
    );
}

Other operations like Delay, Throttle, etc. are equally easy to test. When using absolute time – based on DateTimeOffset – the same mapping based on virtual time ticks applies. An example is shown below, using the Timer operator to start listening to a sequence at a given point in time (with a SelectMany under the covers):

[TestMethod]
public void AbsoluteTime()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, 1),
        OnNext(230, 2),
        OnNext(260, 3),
        OnNext(300, 4),
        OnNext(350, 5),
        OnNext(410, 6),
        OnNext(480, 7),
        OnNext(560, 8),
        OnNext(650, 9),
        OnCompleted<int>(750)
    );

    var start = new DateTimeOffset(new DateTime(400), TimeSpan.Zero);

    var res = scheduler.Start(() =>
        from _ in Observable.Timer(start, scheduler)
        from x in xs
        select x
    );

    res.Messages.AssertEqual(
        OnNext(410, 6),
        OnNext(480, 7),
        OnNext(560, 8),
        OnNext(650, 9),
        OnCompleted<int>(750)
    );

    xs.Subscriptions.AssertEqual(
        Subscribe(400, 750)
    );
}

The subscription to the inner sequence only starts at time 400, as a result of using SelectMany (hidden behind the two uses of “from”). Notice how we’re using the constructor of DateTimeOffset that takes in a DateTime with 400 ticks and a zero offset value, adding up to a value whose Ticks property equals 400.
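To give a feel for how the relative-time operators mentioned earlier test equally well, here's a quick sketch of a Delay test. This is not from the original post; the expected timings assume Delay time-shifts both OnNext and OnCompleted by the specified duration:

[TestMethod]
public void Delay_Sketch()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, 1),
        OnNext(250, 2),
        OnCompleted<int>(300)
    );

    var res = scheduler.Start(() => xs.Delay(TimeSpan.FromTicks(100), scheduler));

    // Every message is shifted forward by 100 ticks of virtual time.
    res.Messages.AssertEqual(
        OnNext(310, 1),
        OnNext(350, 2),
        OnCompleted<int>(400)
    );

    // The source subscription ends when the source completes, at time 300.
    xs.Subscriptions.AssertEqual(
        Subscribe(200, 300)
    );
}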

 

Testing higher order operators

One of the biggest advantages of Rx is its ability to deal with streams of streams, also known as higher order streams. This technique is often used to partition a stream into multiple substreams, e.g. as done by the GroupBy and Window operators. In other cases, inner streams are used to denote durations, and so on.

Testing the use of query operators that deal with higher order streams is a bit more sophisticated, but not much. Essentially, it’s key to subscribe to the inner streams using a testable observer that can record timings. An alternative approach is to flatten the stream of streams before writing asserts, e.g. using operators like Merge or SelectMany.

Intermezzo 5

Here’s a bit of guidance on how to deal with higher order sequences, say of type IObservable<IObservable<T>>. In order to be sure you’ll observe all of the elements in the inner sequences, we require you to subscribe to an inner sequence immediately upon receiving it. For example, assume you’re grouping a sequence by a certain key selector. Each time an element appears in the source sequence whose computed key hasn’t been seen before, we send out a new inner stream (an IGroupedObservable<K, T>) on the result sequence. During the OnNext call in which you’re handed the new inner sequence, the source pipeline is stalled, allowing you to subscribe to it. As soon as you return from OnNext, elements may get pumped into the inner sequence. Because of this, it’s key to subscribe to the given inner sequence immediately, or you may miss elements.

Various operators will take care of this for you. For example, if you use Merge (or its sibling, SelectMany) on the resulting higher order sequence, each received inner sequence will be subscribed to immediately, during the OnNext call. However, if you introduce asynchrony in the pipeline – e.g. by adding an ObserveOn operator to the mix – you’re effectively introducing a time gap: we’ve handed the inner sequence to you and released control on the OnNext channel, but the subscription only happens at a later point in time, causing you to miss elements. We can’t do any caching of elements, because we don’t know when – if ever – someone will subscribe to the inner sequence, so the cache could grow in an unbounded fashion.
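A contrived sketch of that pitfall might look as follows (not from the original post; xs stands for a sequence of strings and someOtherScheduler for any scheduler that introduces asynchrony – both hypothetical here):

xs.GroupBy(x => x.Length)
  .ObserveOn(someOtherScheduler) // introduces a time gap before the handler below runs
  .Subscribe(g =>
  {
      // By the time this handler runs, elements may already have been pumped
      // into "g" while nobody was subscribed, so they're lost.
      g.Subscribe(x => Console.WriteLine(x));
  });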

In summary, higher order sequences are a very powerful mechanism, but some care is required when handling those.

Let’s explore both options, starting from testing partitioning of a stream using GroupBy. First, we define our input using the typical factory pattern on the TestScheduler instance:

[TestMethod]
public void Grouping()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, "Erik"),
        OnNext(220, "Jeffrey"),
        OnNext(230, "Wes"),
        OnNext(240, "Danny"),
        OnNext(250, "Bart"),
        OnNext(260, "Matthew"),
        OnNext(270, "Aaron"),
        OnNext(280, "Georgi"),
        OnNext(290, "Brian"),
        OnCompleted<string>(300)
    );

Next, we define a data structure that records all the information we’re interested in. Typically, the Start method on the scheduler takes care of this, but in the case of GroupBy it would merely record the IGroupedObservable<K, T> objects, which is of no use if we want to assert what’s inside the groups. There are different ways to go about this, but let’s use a tuple to record the clock at which the group is received, the key of the group, and the observer we attach to the group (which will record the messages received on the group).

var groups = new List<Tuple<long, int, ITestableObserver<string>>>();

Armed with this data structure, we can schedule an action to subscribe to a GroupBy operation over the source sequence, receiving the generated groups, and stuffing results in the list shown above. Here we use the CreateObserver factory pattern to create observers for the groups:

scheduler.Schedule(TimeSpan.FromTicks(200), () =>
{
    xs.GroupBy(x => x.Length).Subscribe(g =>
    {
        var observer = scheduler.CreateObserver<string>();
        g.Subscribe(observer);
        groups.Add(Tuple.Create(scheduler.Clock, g.Key, observer));
    });
});

This code should be self-explanatory, keeping in mind that “g” is an IGroupedObservable whose key is the length of the input string, and which will produce all of the names with that length. We record the clock, key, and observer into our data structure. Next, we shouldn’t forget to start the scheduler:

scheduler.Start();

After this returns, all work will have been executed (i.e. the scheduler queue is empty), and we’re ready to assert the results. To do so, we’ll write a little helper that asserts for each group that was recorded (kept in order, because we used a list) the time, key, and messages:

var assertGroup = new Action<int, long, int, Recorded<Notification<string>>[]>(
    (index, clock, key, messages) =>
    {
        var g = groups[index];
        Assert.AreEqual(clock, g.Item1);
        Assert.AreEqual(key, g.Item2);
        g.Item3.Messages.AssertEqual(messages);
    });

This hides the fact that we’re using a tuple and makes the assert code pretty concise. We could also have gone for a custom class with proper equality comparison operations, enabling us to use Rx’s helper assert method to check sequences for equality, but let’s keep things simple for the purpose of this blog post. Now we’re ready to write the actual asserts, first checking we got 5 groups:

Assert.AreEqual(5, groups.Count);

assertGroup(0, 210, 4, new[] {
    OnNext(210, "Erik"),
    OnNext(250, "Bart"),
    OnCompleted<string>(300)
});

assertGroup(1, 220, 7, new[] {
    OnNext(220, "Jeffrey"),
    OnNext(260, "Matthew"),
    OnCompleted<string>(300)
});

assertGroup(2, 230, 3, new[] {
    OnNext(230, "Wes"),
    OnCompleted<string>(300)
});

assertGroup(3, 240, 5, new[] {
    OnNext(240, "Danny"),
    OnNext(270, "Aaron"),
    OnNext(290, "Brian"),
    OnCompleted<string>(300)
});

assertGroup(4, 280, 6, new[] {
    OnNext(280, "Georgi"),
    OnCompleted<string>(300)
});

Notice how each group receives a completion message when the source stream completes.

An alternative way of asserting behavior of higher order streams is to flatten the results inside a query. For example, given a stream partitioned using windowing operations, we could aggregate the elements in each window to a single element, and flatten all of those aggregated one-element streams using SelectMany:

[TestMethod]
public void Flattening()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, 1),
        OnNext(230, 2),
        OnNext(260, 3),
        OnNext(300, 4),
        OnNext(350, 5),
        OnNext(410, 6),
        OnNext(480, 7),
        OnNext(560, 8),
        OnNext(650, 9),
        OnCompleted<int>(750)
    );

    var res = scheduler.Start(() =>
        from w in xs.Window(TimeSpan.FromTicks(100), scheduler)
        from o in w.ToList()
        select string.Join(", ", o)
    );

    res.Messages.AssertEqual(
        OnNext(300, "1, 2, 3, 4"),
        OnNext(400, "5"),
        OnNext(500, "6, 7"),
        OnNext(600, "8"),
        OnNext(700, "9"),
        OnNext(750, ""),
        OnCompleted<string>(750)
    );

    xs.Subscriptions.AssertEqual(
        Subscribe(200, 750)
    );
}

In this sample, the ToList aggregation operator is used, which transforms an IObservable<T> into an IObservable<IList<T>> producing a single element: a list containing all the elements received from the input sequence. The use of SelectMany (through multiple “from” clauses) grabs the lists from those inner streams and presents them in a single “flat” stream, from which we project each list onto a string using the string.Join method. The same technique could have been used to test the GroupBy behavior:

[TestMethod]
public void Grouping_Flattening()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, "Erik"),
        OnNext(220, "Jeffrey"),
        OnNext(230, "Wes"),
        OnNext(240, "Danny"),
        OnNext(250, "Bart"),
        OnNext(260, "Matthew"),
        OnNext(270, "Aaron"),
        OnNext(280, "Georgi"),
        OnNext(290, "Brian"),
        OnCompleted<string>(300)
    );

    var res = scheduler.Start(() =>
        from g in xs.GroupBy(x => x.Length)
        let t = scheduler.Clock
        from n in g.ToList()
        select string.Format("{0} @ {1}: {2}", g.Key, t, string.Join(", ", n))
    );

    res.Messages.AssertEqual(
        OnNext(300, "4 @ 210: Erik, Bart"),
        OnNext(300, "7 @ 220: Jeffrey, Matthew"),
        OnNext(300, "3 @ 230: Wes"),
        OnNext(300, "5 @ 240: Danny, Aaron, Brian"),
        OnNext(300, "6 @ 280: Georgi"),
        OnCompleted<string>(300)
    );
}

More timing information could be recorded simply by obtaining scheduler.Clock at any point in the query. Notice that the “t” assignment in the query happens upon each group’s creation, before the aggregated list-containing observables are flattened. Order matters a lot here: moving the “let” clause after the ToList call would result in the clock value being obtained at a later point in the execution of the query.
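For instance, a variant with the “let” clause moved after the ToList call – a sketch, not from the original post – would presumably stamp every group with the completion time (300) instead of its creation time:

var res = scheduler.Start(() =>
    from g in xs.GroupBy(x => x.Length)
    from n in g.ToList()
    let t = scheduler.Clock   // now read when the aggregated list arrives
    select string.Format("{0} @ {1}: {2}", g.Key, t, string.Join(", ", n))
);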

 

Testing for error conditions

User code executing as part of queries can fail. Errors that occur during the execution of user code should bubble up to the observer through the OnError channel. (Stay tuned for improvements in error handling in Rx v2.0 RC.) Testing for this behavior is pretty straightforward, as illustrated in the sample below:

[TestMethod]
public void UserErrors()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, 8),
        OnNext(220, 3),
        OnNext(230, 2),
        OnNext(240, 5),
        OnNext(250, 4),
        OnNext(260, 0),
        OnNext(270, 1),
        OnNext(280, 9),
        OnNext(290, 7),
        OnCompleted<int>(300)
    );

    var res = scheduler.Start(() => xs.Select(x => 100 / x));

    res.Messages.Take(5).AssertEqual(
        OnNext(210, 12),
        OnNext(220, 33),
        OnNext(230, 50),
        OnNext(240, 20),
        OnNext(250, 25)
    );

    var err = res.Messages.Skip(5).Single();
    Assert.AreEqual(260, err.Time);
    Assert.IsTrue(err.Value.Exception is DivideByZeroException);

    xs.Subscriptions.AssertEqual(
        Subscribe(200, 260)
    );
}

The AssertEqual extension method could be used to check for an OnError message as well, in case the exception type supports proper equality checks. Often, you’ll introduce an error by throwing an exception inside the query, to observe the desired error propagation behavior:

[TestMethod]
public void UserErrors_More()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, 7),
        OnNext(220, 3),
        OnNext(230, 2),
        OnNext(240, 5),
        OnCompleted<int>(250)
    );

    var ex = new Exception();

    var res = scheduler.Start(() => xs.Select(x =>
    {
        if (x % 2 == 0)
            throw ex;
        else
            return x;
    }));

    res.Messages.AssertEqual(
        OnNext(210, 7),
        OnNext(220, 3),
        OnError<int>(230, ex)
    );

    xs.Subscriptions.AssertEqual(
        Subscribe(200, 230)
    );
}

For both of these samples, observe how the source sequence subscription is automatically disposed when an OnError message is propagated, as checked by the assert on the subscriptions.

 

Conclusion

Schedulers in Rx provide a great point for interception between the LINQ layer – where you formulate queries – and the concurrency layer – where scheduled jobs perform time-based operations and drive sequences. Using this abstraction, one can virtualize time in Rx, e.g. to replay historical events and perform analysis using temporal operators, independent of the actual system clock. This same abstraction gives us a great way to perform testing of event stream queries, by generating sequences in virtual time and parameterizing query operators by supplying the TestScheduler.

 

Bart J.F. De Smet
Senior SDE – Cloud Programmability Team