Being Cellfish

Stuff I wished I've found in some blog (and sometimes did)

July, 2011


    CCR tips and tricks - part 14


    Sometimes you need to call an asynchronous API that was not created for CCR. While there are several types of asynchronous interfaces, the pattern for calling them from CCR is basically the same. Here is an example:

      private IEnumerator<ITask> ReadStreamWithCcr(
          Stream stream,
          PortSet<string, Exception> resultPort)
      {
          var asyncPort = new Port<IAsyncResult>();
          byte[] buffer = new byte[42];
          stream.BeginRead(buffer, 0, buffer.Length, asyncPort.Post, null);

          yield return asyncPort.Receive(
              asyncResult =>
                  {
                      int count = stream.EndRead(asyncResult);
                      resultPort.Post(Encoding.ASCII.GetString(buffer, 0, count));
                  });
      }

    Note how the Port<IAsyncResult>.Post method is used as a callback since it has the same signature as the AsyncCallback. There is currently no helper in CCR that uses this pattern but you can write your own if you (like me) don't like to create ports everywhere. You need two helpers since some EndXxx methods return a value and others don't.

      public static PortSet<EmptyValue, Exception> DoAsyncOperation(
          Action<AsyncCallback> beginAction,
          Action<IAsyncResult> endAction)
      {
          return DoAsyncOperation<EmptyValue>(
              beginAction,
              result =>
              {
                  endAction(result);
                  return EmptyValue.SharedInstance;
              });
      }

      public static PortSet<T, Exception> DoAsyncOperation<T>(
          Action<AsyncCallback> beginAction,
          Func<IAsyncResult, T> endAction)
      {
          var resultPort = new PortSet<T, Exception>();
          try
          {
              beginAction(result =>
              {
                  try
                  {
                      var r = endAction(result);
                      resultPort.Post(r);
                  }
                  catch (Exception e)
                  {
                      resultPort.Post(e);
                  }
              });
          }
          catch (Exception e)
          {
              resultPort.Post(e);
          }

          return resultPort;
      }

    With that helper the original method looks like this:

      public IEnumerator<ITask> ReadStreamWithCcr2(
          Stream stream,
          PortSet<string, Exception> resultPort)
      {
          byte[] buffer = new byte[42];
          var readPort = DoAsyncOperation<int>(
                  cb => stream.BeginRead(buffer, 0, buffer.Length, cb, null),
                  stream.EndRead);
          yield return readPort.Choice(
              c => resultPort.Post(Encoding.ASCII.GetString(buffer, 0, c)),
              resultPort.Post);
      }

    Looks very similar, doesn't it? But there is a big difference! BeginXxx and EndXxx methods may throw exceptions, and the first implementation I showed you does not handle that. So you probably want to add some try-catch blocks (unless you rely purely on causalities to handle errors), which makes the first implementation more cumbersome to work with. And you'll probably forget to add those try blocks once in a while...
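
    To make the difference concrete, here is a rough sketch of what the first implementation might look like once both calls are guarded. The class name GuardedStreamReader and the method name ReadStreamWithCcrAndTryCatch are made up for illustration. Since C# does not allow a yield return inside a try block that has a catch clause, the sketch captures a BeginRead failure in a local variable and reports it after the try block.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using Microsoft.Ccr.Core;

public static class GuardedStreamReader
{
    // Hypothetical variant of ReadStreamWithCcr with explicit error handling.
    public static IEnumerator<ITask> ReadStreamWithCcrAndTryCatch(
        Stream stream,
        PortSet<string, Exception> resultPort)
    {
        var asyncPort = new Port<IAsyncResult>();
        byte[] buffer = new byte[42];

        // BeginRead may throw, so remember the exception and report it below.
        Exception beginError = null;
        try
        {
            stream.BeginRead(buffer, 0, buffer.Length, asyncPort.Post, null);
        }
        catch (Exception e)
        {
            beginError = e;
        }

        if (beginError != null)
        {
            resultPort.Post(beginError);
            yield break;
        }

        yield return asyncPort.Receive(
            asyncResult =>
            {
                // EndRead may throw too; post the failure instead of losing it.
                try
                {
                    int count = stream.EndRead(asyncResult);
                    resultPort.Post(Encoding.ASCII.GetString(buffer, 0, count));
                }
                catch (Exception e)
                {
                    resultPort.Post(e);
                }
            });
    }
}
```

    That is roughly twice the code of the original for a single read, and the same boilerplate would be repeated for every BeginXxx/EndXxx pair, which is what the helper avoids.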


    CCR tips and tricks - part 7


    There are many ways you can receive messages on a port. In all examples below I'll be using two implementations of Fibonacci that use CCR. The methods I call have the following signatures:

      public Port<int> Fibonacci(int n)
      public IEnumerator<ITask> Fibonacci(int n, Port<int> resultPort)

    The assumption is that the first method creates a port, spawns a task using the second method and returns the result port hence calculating the Fibonacci number asynchronously. The second method just calculates the number synchronously and posts the result on the given port.
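
    The bodies of these two methods are not part of this post. A minimal sketch that matches the description, assuming a simple loop for the calculation and a parameterless DispatcherQueue (the class name FibonacciCalculator is made up), could look like this:

```csharp
using System.Collections.Generic;
using Microsoft.Ccr.Core;

public class FibonacciCalculator
{
    // Assumption: any dispatcher queue will do for this illustration.
    private readonly DispatcherQueue dispatcherQueue = new DispatcherQueue();

    // Creates the result port, spawns the iterator variant and returns the port.
    public Port<int> Fibonacci(int n)
    {
        var resultPort = new Port<int>();
        Arbiter.Activate(
            dispatcherQueue,
            Arbiter.FromIteratorHandler(() => Fibonacci(n, resultPort)));
        return resultPort;
    }

    // Calculates the number synchronously and posts it on the given port.
    public IEnumerator<ITask> Fibonacci(int n, Port<int> resultPort)
    {
        int current = 0;
        int next = 1;
        for (int i = 0; i < n; i++)
        {
            int sum = current + next;
            current = next;
            next = sum;
        }

        resultPort.Post(current);
        yield break;
    }
}
```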

    First out is a very common way that I call "call and receive". Basically you create a little method train using the fact that the method we call spawns a task and returns a result port.

      public IEnumerator<ITask> CallAndReceive(
          int n,
          Port<KeyValuePair<int, int>> resultPort)
      {
          yield return Fibonacci(n)
              .Receive(s => resultPort.Post(new KeyValuePair<int, int>(n, s)));
      }

    The second way I call "spawn and receive". The interesting thing about this one is that if the spawned operation completes before the receive is activated the message lives in the port until the receive is activated. This means that theoretically you have slightly worse performance than if you have the receive already activated when the result is posted.

      public IEnumerator<ITask> SpawnAndReceive(
          int n,
          Port<KeyValuePair<int, int>> resultPort)
      {
          var port = new Port<int>();
          Arbiter.Activate(
              dispatcherQueue,
              Arbiter.FromIteratorHandler(() => Fibonacci(n, port)));
          yield return port.Receive(
              s => resultPort.Post(new KeyValuePair<int, int>(n, s)));
      }

    In the third example called "receive and spawn" I guarantee that the receiver is activated before the task is executed. Hence the theoretical performance penalty given above is eliminated since in this example there will always be a receiver activated when the result is posted. Also note that I here use the Arbiter.Receive method rather than the Port.Receive extension method used above.

      public IEnumerator<ITask> ReceiveAndSpawn(
          int n,
          Port<KeyValuePair<int, int>> resultPort)
      {
          var port = new Port<int>();
          Arbiter.Activate(
              dispatcherQueue,
              Arbiter.Receive(
                  false,
                  port,
                  s => resultPort.Post(new KeyValuePair<int, int>(n, s))));
          yield return new IterativeTask(() => Fibonacci(n, port));
      }

    The fourth way, which I call "spawn and test", uses two features of CCR that are not that well known, especially if you're new to CCR. This example (and the one above) uses "yield return new IterativeTask", which actually executes that task to completion before returning. This is equivalent to using Arbiter.ExecuteToCompletion. The second feature used is the fact that casting a Port (or PortSet) to a given type will remove a message from the port and return its value, or the default value if no message of the given type is available. This is truly an interesting feature of CCR since it means that the cast operator now has a side effect on the object you're casting from.

     1: public IEnumerator<ITask> SpawnAndTest(
     2:     int n, 
     3:     Port<KeyValuePair<int, int>> resultPort)
     4: {
     5:     var port = new Port<int>();
     6:     yield return new IterativeTask(() => Fibonacci(n, port));
     7:     int result = port;
     8:     resultPort.Post(new KeyValuePair<int, int>(n, result));
     9: }

    Line 7 could also be written like this:

     7:     int result = (int)port.Test();

    or this:

     7:     int result;
    7.5:     port.Test(out result);

    Of these four variants the receive and spawn is the most dangerous one to use since it's easy to introduce a race condition if you're not careful. Take a look at the following code where some local variables are introduced to increase (?) readability. Maybe you'd do something like this to see values in the debugger.

     1: public IEnumerator<ITask> ReceiveAndSpawnWithRaceCondition(
     2:     int n, 
     3:     Port<KeyValuePair<int, int>> resultPort)
     4: {
     5:     int result = 0;
     6:     var port = new Port<int>();
     7:     Arbiter.Activate(
     8:         dispatcherQueue, 
     9:         Arbiter.Receive(false, port, s => result = s));
     10:     yield return new IterativeTask(() => Fibonacci(n, port));
     11:     resultPort.Post(new KeyValuePair<int, int>(n, result));
     12: }

    This code probably works fine when you step through it in the debugger, but there is a race condition. The problem is that when the Fibonacci method posts the result, the result handler (line 9) is scheduled for execution. You do not know when it's executed. If you're unlucky the method above gets back after completing the iterative task and executes line 11 before the handler on line 9 and hence the wrong result is posted back to the caller.


    CCR tips and tricks - part 17


    Sometimes when you have a number of things that have to be executed in sequence a common pattern is to have some kind of status variable to track errors and abort the sequence when there is an error. There is however another pattern that works well not only with sequential work but also works for scatter-gather operations; the abort port (say that fast and it sounds like fake Swedish). Here is an example of a method to calculate a Fibonacci number that can be aborted:

      public void FibonacciAbortable(
          int n,
          Port<int> resultPort,
          Port<Exception> abortPort)
      {
          if (n < 0)
          {
              abortPort.Post(new ArgumentOutOfRangeException("n"));
          }
          else if (n <= 1)
          {
              resultPort.Post(n);
          }
          else
          {
              int n1 = -1;
              Port<int> n1port = new Port<int>();
              Port<int> n2port = new Port<int>();
              Arbiter.Activate(
                  dispatcherQueue,
                  Arbiter.FromHandler(
                      () => this.FibonacciAbortable(n - 1, n1port, abortPort)));
              Arbiter.Activate(
                  dispatcherQueue,
                  Arbiter.Choice(
                      Arbiter.Receive(
                          false,
                          n1port,
                          s =>
                              {
                                  n1 = s;
                                  Arbiter.Activate(
                                      dispatcherQueue,
                                      Arbiter.FromHandler(
                                          () => this.FibonacciAbortable(
                                              n - 2, n2port, abortPort)));
                              }),
                      Arbiter.Receive(false, abortPort, abortPort.Post)));
              Arbiter.Activate(dispatcherQueue, Arbiter.Choice(
                  Arbiter.Receive(false, n2port, s => resultPort.Post(s + n1)),
                  Arbiter.Receive(false, abortPort, abortPort.Post)));
          }
      }

    There are four important tricks to consider when you use an abort port:

    • If you have an error port, you want to post the error to the abort port too since it's a good pattern to let the message on the abort port be the reason for the abort.
    • All handlers for the abort port must repost the message to the same port so that any number of tasks can be aborted with just one abort item. This means that once all tasks have been aborted there should be one item in the abort port queue.
    • If you wrap code that you do not know will repost to the abort port, you should not use your own abort port for child tasks; let child tasks have their own abort port.
    • The use of an abort port will not guarantee that tasks are canceled; only things you ignore because of an abort port will be ignored. It also means that scheduled tasks will still post responses, so the use of a choice is important to prevent other handlers from executing.

    CCR tips and tricks - part 6


    Today I have three recommendations for your CCR methods. For methods that are public interfaces to components I would recommend methods that return ports, like this:

      public PortSet<ResultType, Exception> DoSomething()

    This makes it easy for the consumer (yourself or other developers) to just call the method and then use the port returned. As a consumer I do not need to know how to create the port needed and I think that is convenient. It also means the implementation may complete synchronously or asynchronously. As a caller I don't really care.

    For internal helpers I however tend to use the following two variants:

      private IEnumerator<ITask> DoSomething(PortSet<ResultType, Exception> resultPort)
      private void DoSomething(PortSet<ResultType, Exception> resultPort)

    The first one should only be used if the method itself needs to yield return. If it ends up having a yield break in the end just to pass compilation you should go for the second variant. However if the method is part of an interface definition I always use the first (iterator) variant.

    So wait, you may think: isn't an interface public, and hence shouldn't it have the variant returning a port? And the answer is yes, if the interface is public. But if the interface is internal, it may make sense to treat it as an internal helper.
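
    As an illustration of how the public and internal variants can fit together, here is a small sketch. The Divider class and its Divide methods are invented for this example; the point is only that the public method creates and returns the port while the private helper takes the port as a parameter and, since it never needs to yield, is a plain void method.

```csharp
using System;
using Microsoft.Ccr.Core;

public class Divider
{
    // Public interface: creates and returns the port so the caller never has to.
    public PortSet<int, Exception> Divide(int dividend, int divisor)
    {
        var resultPort = new PortSet<int, Exception>();
        Divide(dividend, divisor, resultPort);
        return resultPort;
    }

    // Internal helper: no yield return needed, so it is not an iterator.
    private void Divide(
        int dividend, int divisor, PortSet<int, Exception> resultPort)
    {
        if (divisor == 0)
        {
            resultPort.Post(new DivideByZeroException());
        }
        else
        {
            resultPort.Post(dividend / divisor);
        }
    }
}
```

    This particular helper happens to complete synchronously, but the caller cannot tell and does not need to; it just uses the returned port.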


    Kinect services for RDS 2008 R3


    I hope you noticed that we released an update for RDS yesterday...


    CCR tips and tricks - part 9


    The CCR pattern of using iterators to implement tasks is very powerful since the code of the task looks very synchronous even though it performs asynchronous work. But sometimes you may yield on the wrong thing, and the resulting behavior is unexpected. Even before you use an iterative handler (as they're called in CCR) you should ask yourself if you really need it. The rule of thumb is fairly easy: if you need a "yield break" at the end of your method to make it compile (since there is no "yield return" in it), you should not use an iterative handler. The exception is if you implement some interface or base implementation that others will override and where you anticipate that those implementations will need an iterative handler. The reason not to use an iterative handler unless needed is that there is a slight performance penalty from using iterative handlers, so if you don't need them, don't pay the price.

    In the following code examples I will be reusing the following simple methods/variables (and yes, I have an iterative handler that should not be one, but it's used to show a difference between iterative handlers and regular handlers).

      private DispatcherQueue dispatcherQueue = new DispatcherQueue();
      private bool didSomething = false;
      private bool didSomethingElse = false;

      private void DoSomething()
      {
          didSomething = true;
      }

      private IEnumerator<ITask> DoSomethingElse()
      {
          didSomethingElse = true;
          yield break;
      }

    Now look at the following test. Notice that the iterative task is never executed. This is because yielding on a regular task executes that task but never returns to the original method.

      private IEnumerator<ITask> DoItAllWrongWay(ManualResetEvent mre)
      {
          yield return Arbiter.FromHandler(DoSomething);
          yield return Arbiter.FromIteratorHandler(DoSomethingElse);
          mre.Set();
      }

      [TestMethod]
      public void Doing_the_wrong_thing()
      {
          var mre = new ManualResetEvent(false);
          Arbiter.Activate(
              dispatcherQueue,
              Arbiter.FromIteratorHandler(() => this.DoItAllWrongWay(mre)));
          Assert.IsFalse(
              mre.WaitOne(TimeSpan.FromSeconds(3)),
              "Should not complete tasks");
          Assert.IsTrue(didSomething);
          Assert.IsFalse(didSomethingElse);
      }

    Now look at what happens when we yield return on the iterative handler first instead. Now both helper methods execute but we still don't complete the calling method since yielding on the regular handler never returns.

      private IEnumerator<ITask> DoItWrongButItWorksAlmost(ManualResetEvent mre)
      {
          yield return Arbiter.FromIteratorHandler(DoSomethingElse);
          yield return Arbiter.FromHandler(DoSomething);
          mre.Set();
      }

      [TestMethod]
      public void Doing_the_wrong_thing_but_it_almost_works()
      {
          var mre = new ManualResetEvent(false);
          Arbiter.Activate(
              dispatcherQueue,
              Arbiter.FromIteratorHandler(() => this.DoItWrongButItWorksAlmost(mre)));
          Assert.IsFalse(
              mre.WaitOne(TimeSpan.FromSeconds(3)),
              "Should not complete tasks");
          Assert.IsTrue(didSomething);
          Assert.IsTrue(didSomethingElse);
      }

    Last we'll look at the correct way of dealing with this scenario.

      1: private IEnumerator<ITask> DoItRight(ManualResetEvent mre)
      2: {
      3:     yield return Arbiter.ExecuteToCompletion(
      4:         dispatcherQueue, new Task(DoSomething));
      5:     yield return new IterativeTask(DoSomethingElse);
      6:     mre.Set();
      7: }
      9: [TestMethod]
     10: public void Doing_the_right_thing()
     11: {
     12:     var mre = new ManualResetEvent(false);
     13:     Arbiter.Activate(
     14:         dispatcherQueue, 
     15:         Arbiter.FromIteratorHandler(() => this.DoItRight(mre)));
     16:     Assert.IsTrue(mre.WaitOne(TimeSpan.FromSeconds(3)), "Should complete tasks");
     17:     Assert.IsTrue(didSomething);
     18:     Assert.IsTrue(didSomethingElse);
     19: }

    Note that for iterative tasks you may actually use an alternative syntax (line 5) when yielding to an iterative task.


    CCR tips and tricks - part 15


    In part 14 I showed you how to work with an asynchronous API from CCR. Today we'll handle synchronous code from CCR. Since one of the most common ways to work with CCR is to use a dispatcher with one thread per core, you do not want to block a thread by waiting on some synchronous, long-running operation. Let's create a blocking operation first:

    private void BlockingMethod(int seconds)
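
    Only the signature is shown here. A minimal stand-in, assuming all the method needs to do is block the calling thread for the given number of seconds (the BlockingSketch class is made up so the snippet compiles on its own), could be:

```csharp
using System;
using System.Threading;

public static class BlockingSketch
{
    // Assumed body: blocks the calling thread for the given number of seconds.
    public static void BlockingMethod(int seconds)
    {
        Thread.Sleep(TimeSpan.FromSeconds(seconds));
    }
}
```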

    Calling this from a CCR thread would in most cases be devastating for performance of your code. If this method is called multiple times from different CCR tasks you could easily have all your CCR threads blocked in this method and that is probably not what you want. But we can fix this by using a feature of the DispatcherQueue. If you create a DispatcherQueue with no arguments it will not use a default dispatcher (with one thread per core), it will use the CLR thread pool! A good pattern to use if you do not know which dispatcher your code will be used with (this might be true if you have a library of functions for others to use) is demonstrated in this code:

      private IEnumerator<ITask> MethodThatNeedToCallBlockingMethodFromCcr(
          Port<EmptyValue> donePort)
      {
          if (!Thread.CurrentThread.IsThreadPoolThread)
          {
              using (var clrTaskQueue = new DispatcherQueue())
              {
                  yield return
                      Arbiter.ExecuteToCompletion(
                          clrTaskQueue,
                          new IterativeTask<Port<EmptyValue>>(
                              donePort,
                              MethodThatNeedToCallBlockingMethodFromCcr));
                  yield break;
              }
          }

          BlockingMethod(42);
          donePort.Post(EmptyValue.SharedInstance);
      }

    There are two reasons for the guard clause at the beginning that reschedules the task on a CLR thread:

    • You do not want to reschedule if already on a CLR thread since every time you schedule a task there is a small overhead.
    • You do not want to use the CLR thread pool for all of your CCR code since using the CLR thread pool dispatcher queue has a greater overhead for each task executed than using a standard CCR dispatcher queue with a more limited dispatcher. CCR has very small overhead when running with one thread per core.

    CCR tips and tricks - part 8


    The scatter-gather pattern, where you spawn multiple tasks and then wait for them all to complete, is a common pattern used with CCR. If you have a small, well-known number of tasks to spawn, the use of a joined receiver combined with a choice for errors can be a slick way of achieving "wait for all to succeed or the first failure". That is assuming that one failure means the whole scatter-gather operation failed.

      public IEnumerator<ITask> Fibonacci(
          int n,
          PortSet<int, Exception> resultPort)
      {
          if (n < 0)
          {
              resultPort.Post(new ArgumentOutOfRangeException("n"));
          }
          else if (n <= 1)
          {
              resultPort.Post(n);
          }
          else
          {
              var n1port = Fibonacci(n - 1);
              var n2port = Fibonacci(n - 2);
              var joinedSuccess = Arbiter.JoinedReceive<int, int>(
                  false,
                  n1port,
                  n2port,
                  (n1, n2) => resultPort.Post(n1 + n2));
              yield return Arbiter.Choice(
                  joinedSuccess,
                  Arbiter.Receive<Exception>(false, n1port, resultPort.Post),
                  Arbiter.Receive<Exception>(false, n2port, resultPort.Post));
          }
      }

    This implementation computes a specific Fibonacci number by computing the two parts in parallel. It is not the most efficient way to do this, but it shows how a joined receiver is used to collect the results; if there is any error, it is returned immediately without waiting for the two results. This does not mean that "the other" task is aborted. More on that in a future post in this series.


    CCR tips and tricks - part 19


    When you want to throttle execution based on some resource you can do this easily with CCR and also get the benefit of being able to add and remove resources as needed. For example you may want to throttle some computation to two instances at any given time, but under heavy load you want to double the number of computations and then as load reduces also reduce the number of concurrent computations. To do this you can use what I call the resource port pattern. It uses a joined receiver and a resource port to control how many concurrent handlers there will be.

      1: [TestMethod]
      2: public void BasedOnResources()
      3: {
      4:     var resourcePort = new Port<int>();
      5:     var argumentPort = new Port<int>();
      6:     var resultPort = new Port<int>();
      7:     Arbiter.Activate(
      8:         dispatcherQueue,
      9:         Arbiter.JoinedReceive(
     10:             true,
     11:             resourcePort,
     12:             argumentPort,
     13:             (resource, argument) =>
     14:                 {
     15:                     try
     16:                     {
     17:                         resultPort.Post(resource + argument);
     18:                     }
     19:                     finally
     20:                     {
     21:                         resourcePort.Post(resource);
     22:                     }
     23:                 }));
     24:     resourcePort.Post(42);
     25:     for (int i = 0; i < 1000; i++)
     26:     {
     27:         argumentPort.Post(i);
     28:     }
     29:     resourcePort.Post(4711);
     30:     var mre = new ManualResetEvent(false);
     31:     Arbiter.Activate(dispatcherQueue, resultPort.Receive(_ => mre.Set()));
     32:     Assert.IsTrue(mre.WaitOne(TimeSpan.FromSeconds(3)));
     33:     mre.Reset();
     34:     Arbiter.Activate(dispatcherQueue, resourcePort.Receive(CcrServiceBase.EmptyHandler));
     35:     Arbiter.Activate(dispatcherQueue, resourcePort.Receive(_ => mre.Set()));
     36:     Assert.IsTrue(mre.WaitOne(TimeSpan.FromSeconds(3)));
     37:     Assert.AreNotEqual(1000, resultPort.ItemCount);
     38:     Assert.AreNotEqual(0, resultPort.ItemCount);
     39:     Assert.AreNotEqual(0, argumentPort.ItemCount);
     40: }

    First notice that the joined receive handler always reposts the resource when finished (line 21). Then notice how we add a second resource (line 29). Last, the resource port is drained to completely stop execution of the joined handler (lines 34 and 35). Execution of the joined receiver could be resumed by posting to the resource port again.


    CCR tips and tricks - part 10


    Today we'll continue using the same base helpers as in part 9 and look at a common mistake made in iterative handlers. Take a look at the following code:

      1: private IEnumerator<ITask> ForgettingToYield(ManualResetEvent mre)
      2: {
      3:     var port = new Port<EmptyValue>();
      4:     port.Post(EmptyValue.SharedInstance);
      5:     Arbiter.Receive(false, port, v => mre.Set());
      6:     yield break;
      7: }
      9: [TestMethod]
     10: public void Forgetting_to_yield()
     11: {
     12:     var mre = new ManualResetEvent(false);
     13:     Arbiter.Activate(
     14:         dispatcherQueue, 
     15:         Arbiter.FromIteratorHandler(() => this.ForgettingToYield(mre)));
     16:     Assert.IsFalse(
     17:         mre.WaitOne(TimeSpan.FromSeconds(3)), "Should not complete tasks");
     18: }

    Note the mistake on line 5. Since the task created is not yielded on, it will not execute. In my experience these types of errors are easily found while writing unit tests. They look weird since you think "why is this not executing?" even though the code "looks" right.
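
    For comparison, a corrected version might look like the sketch below. The names YieldSketch and RememberingToYield are made up; the only real change is that the receiver is yielded on, which also removes the need for the trailing "yield break".

```csharp
using System.Collections.Generic;
using System.Threading;
using Microsoft.Ccr.Core;

public static class YieldSketch
{
    // Hypothetical fix: same body as ForgettingToYield, but the receiver is yielded.
    public static IEnumerator<ITask> RememberingToYield(ManualResetEvent mre)
    {
        var port = new Port<EmptyValue>();
        port.Post(EmptyValue.SharedInstance);

        // Yielding hands the receiver to CCR, which activates it and runs the handler.
        yield return Arbiter.Receive(false, port, v => mre.Set());
    }
}
```

    A test for this version would assert that the wait on the ManualResetEvent succeeds instead of timing out.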
