Being Cellfish

Stuff I wished I've found in some blog (and sometimes did)

July, 2011

Change of Address
This blog has moved to blog.cellfish.se.
Posts
  • Being Cellfish

    CCR tips and tricks - part 21

    When working with resource ports as in parts 19 and 20, there is another thing you may need to do if you rely on causalities. Before reposting the resource (or enqueuing a new timer) you should do this:

    Dispatcher.ClearCausalities();
    

    That call removes all causalities from the current thread. This is important since otherwise you may leak causalities through your resource. The reason is that when the resource is reposted, any causalities on the current thread are stored with the resource in the port. When the joined receiver handler executes, it merges the causalities from the resource with the causalities from the argument port. If all items on the argument port use the same causality there is no problem, since the merge sees that both ports carry the same causality and merges them into one.

    However, if each argument has its own causality (a common case when using CCR in web services, for example) there is a problem. The first time the joined receiver executes, one causality from the argument port is merged with nothing from the resource port. When the resource is reposted, that causality travels with it. The second time the resource is join received, a new causality is joined with the old one, so now two causalities are posted with the resource, and so on. Without clearing the causalities you have created a causality leak that will eventually generate an unhandled exception, and your code stops working.
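    As a minimal sketch of where the call belongs, here is the joined receiver from part 19 with the causalities cleared just before the resource is reposted. The class and method names are hypothetical, and the snippet assumes a reference to the CCR assembly (the Microsoft.Ccr.Core namespace).

```csharp
using Microsoft.Ccr.Core;

// Hypothetical wrapper class, used only for illustration.
public static class ResourcePortExample
{
    // Activates the joined receiver from part 19 on the given ports.
    public static void ActivateThrottledHandler(
        DispatcherQueue dispatcherQueue,
        Port<int> resourcePort,
        Port<int> argumentPort,
        Port<int> resultPort)
    {
        Arbiter.Activate(
            dispatcherQueue,
            Arbiter.JoinedReceive(
                true,
                resourcePort,
                argumentPort,
                (resource, argument) =>
                {
                    try
                    {
                        // The result is posted while the argument's
                        // causality is still active on this thread.
                        resultPort.Post(resource + argument);
                    }
                    finally
                    {
                        // Clear first, then repost, so that the resource
                        // goes back into its port without any causality.
                        Dispatcher.ClearCausalities();
                        resourcePort.Post(resource);
                    }
                }));
    }
}
```

    The call sits in the finally block, after the result has been posted, so the result still carries the argument's causality and only the reposted resource is stripped of it.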

    CCR tips and tricks - part 20

    In robotics you sometimes work with a sensor that sends data very often but you really just want to handle one message every X seconds. You also just want to handle the last message received since you last handled a message. This can be achieved in a similar way to the resource port in part 19 together with a simple loop to drain a port of events.

      1: [TestMethod]
      2: public void BasedOnTimeWithMessagesDroped()
      3: {
      4:     var timePort = new Port<DateTime>();
      5:     var eventPort = new Port<int>();
      6:     var resultPort = new Port<int>();
      7:  
      8:     Arbiter.Activate(
      9:         dispatcherQueue,
     10:         Arbiter.JoinedReceive(
     11:         true,
     12:         timePort,
     13:         eventPort,
     14:         (timer, eventValue) =>
     15:             {
     16:                 try
     17:                 {
     18:                     int valueToUse = eventValue;
     19:                     int v;
     20:                     while (eventPort.Test(out v))
     21:                     {
     22:                         valueToUse = v;
     23:                     }
     24:                     resultPort.Post(valueToUse);
     25:                 }
     26:                 finally
     27:                 {
     28:                     dispatcherQueue.EnqueueTimer(
     29:                         TimeSpan.FromMilliseconds(100), 
     30:                         timePort);
     31:                 }
     32:             }));
     33:     dispatcherQueue.EnqueueTimer(TimeSpan.FromMilliseconds(100), timePort);
     34:     for (int i = 0; i < 100; i++)
     35:     {
     36:         Thread.Sleep(TimeSpan.FromMilliseconds(10));
     37:         eventPort.Post(i);
     38:     }
     39:     var mre = new ManualResetEvent(false);
     40:     int result = 0;
     41:     Arbiter.Activate(
     42:         dispatcherQueue,
     43:         resultPort.Receive(
     44:             r =>
     45:                 {
     46:                     result = r;
     47:                     mre.Set();
     48:                 }));
     49:     Assert.IsTrue(mre.WaitOne(TimeSpan.FromSeconds(3)));
     50:     Assert.AreNotEqual(0, result);
     51:     Assert.AreNotEqual(99, result);
     52: }
    

    As with the resource port, it is important to always enqueue a new timer (lines 28 to 30). The code that drains the port and keeps only the last value is on lines 19 to 23.
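    The drain loop can also be pulled out into a small helper. This is only a sketch: the class and method names are hypothetical, and it relies on the same Test(out ...) call that the listing above uses on the event port.

```csharp
using Microsoft.Ccr.Core;

// Hypothetical helper class, used only for illustration.
public static class PortDrain
{
    // Removes every item currently queued on the port and returns the
    // newest one. If the port is empty, the supplied first value is
    // returned unchanged.
    public static T DrainToLatest<T>(Port<T> port, T first)
    {
        T latest = first;
        T next;
        while (port.Test(out next))
        {
            // Ports are FIFO, so each item taken is newer than the last.
            latest = next;
        }

        return latest;
    }
}
```

    With that helper the body of the try block in the joined handler would shrink to a single call: resultPort.Post(PortDrain.DrainToLatest(eventPort, eventValue)).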

    CCR tips and tricks - part 19

    When you want to throttle execution based on some resource you can do this easily with CCR and also get the benefit of being able to add and remove resources as needed. For example you may want to throttle some computation to two instances at any given time, but under heavy load you want to double the number of computations and then as load reduces also reduce the number of concurrent computations. To do this you can use what I call the resource port pattern. It uses a joined receiver and a resource port to control how many concurrent handlers there will be.

      1: [TestMethod]
      2: public void BasedOnResources()
      3: {
      4:     var resourcePort = new Port<int>();
      5:     var argumentPort = new Port<int>();
      6:     var resultPort = new Port<int>();
      7:     Arbiter.Activate(
      8:         dispatcherQueue,
      9:         Arbiter.JoinedReceive(
     10:             true,
     11:             resourcePort,
     12:             argumentPort,
     13:             (resource, argument) =>
     14:                 {
     15:                     try
     16:                     {
     17:                         resultPort.Post(resource + argument);
     18:                     }
     19:                     finally
     20:                     {
     21:                         resourcePort.Post(resource);
     22:                     }
     23:                 }));
     24:     resourcePort.Post(42);
     25:     for (int i = 0; i < 1000; i++)
     26:     {
     27:         argumentPort.Post(i);
     28:     }
     29:     resourcePort.Post(4711);
     30:     var mre = new ManualResetEvent(false);
     31:     Arbiter.Activate(dispatcherQueue, resultPort.Receive(_ => mre.Set()));
     32:     Assert.IsTrue(mre.WaitOne(TimeSpan.FromSeconds(3)));
     33:     mre.Reset();
     34:     Arbiter.Activate(dispatcherQueue, resourcePort.Receive(CcrServiceBase.EmptyHandler));
     35:     Arbiter.Activate(dispatcherQueue, resourcePort.Receive(_ => mre.Set()));
     36:     Assert.IsTrue(mre.WaitOne(TimeSpan.FromSeconds(3)));
     37:     Assert.AreNotEqual(1000, resultPort.ItemCount);
     38:     Assert.AreNotEqual(0, resultPort.ItemCount);
     39:     Assert.AreNotEqual(0, argumentPort.ItemCount);
     40: }
    

    First notice that the joined receive handler always reposts the resource when finished (line 21). Then notice how we add a second resource (line 29). Finally, the resource port is drained to completely stop execution of the joined handler (lines 34 and 35). Execution of the joined receiver can be resumed by posting to the resource port again.

    CCR tips and tricks - part 18

    The CCR interleave is a very powerful construct for creating read/write-style locking, but on steroids. It lets you create a concurrent group and an exclusive group of handlers. Concurrent handlers execute concurrently as long as no exclusive handler is pending. Exclusive handlers execute exclusively. It is however important to remember that an exclusive handler is not re-entrant: if an exclusive handler posts to a port with a handler in the same exclusive group, the second handler will not execute until the first one completes, so the first handler cannot wait for a result from the second one.

    Another related problem is when you want a persistent handler but only want it to handle one message at a time. Remember that if there is a handler waiting on a port, a handler task is added to the task queue as soon as a message is posted. So if you have a persistent handler and quickly post ten times to that port, ten tasks will be scheduled and they can run concurrently. Instead of using an interleave you can fake an exclusive persistent handler with something similar to tail recursion.

    // dispatcherQueue and exclusivePort (a Port<int>) are assumed to be
    // fields declared elsewhere in the class.
    public void WithTailRecursion()
    {
        // Non-persistent receiver: it handles exactly one message.
        Arbiter.Activate(
            dispatcherQueue,
            Arbiter.Receive(false, exclusivePort, RecursiveHandler));
        exclusivePort.Post(1);
        exclusivePort.Post(2);
        exclusivePort.Post(3);
        exclusivePort.Post(4);
    }

    private void RecursiveHandler(int n)
    {
        try
        {
            DoSomeWork(n);
        }
        finally
        {
            // Register the receiver again only after the work is done.
            Arbiter.Activate(
                dispatcherQueue,
                Arbiter.Receive(false, exclusivePort, RecursiveHandler));
        }
    }
    

    The important thing is to make sure the receiver is always added again, even after unexpected exceptions. Since messages in ports are stored in a FIFO queue and only one handler executes at any time, messages are handled in the order they arrive on the port.
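    If you need this in more than one place, the same idea can be written as a generic helper. This is a sketch under the same assumptions as the example in this post; the class and method names are hypothetical.

```csharp
using Microsoft.Ccr.Core;

// Hypothetical helper class, used only for illustration.
public static class SequentialReceiver
{
    // Handles messages from the port one at a time by registering a
    // one-shot receiver that re-registers itself when the handler is done.
    public static void ActivateSequential<T>(
        DispatcherQueue dispatcherQueue,
        Port<T> port,
        Handler<T> handler)
    {
        Arbiter.Activate(
            dispatcherQueue,
            Arbiter.Receive(
                false,
                port,
                item =>
                {
                    try
                    {
                        handler(item);
                    }
                    finally
                    {
                        // Always add the receiver again, even if the
                        // handler threw an exception.
                        ActivateSequential(dispatcherQueue, port, handler);
                    }
                }));
    }
}
```

    A call such as SequentialReceiver.ActivateSequential(dispatcherQueue, port, DoSomeWork) would then replace both the initial activation and the hand-written finally block.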

    CCR tips and tricks - part 17

    Sometimes when you have a number of things that have to be executed in sequence a common pattern is to have some kind of status variable to track errors and abort the sequence when there is an error. There is however another pattern that works well not only with sequential work but also works for scatter-gather operations; the abort port (say that fast and it sounds like fake Swedish). Here is an example of a method to calculate a Fibonacci number that can be aborted:

    public void FibonacciAbortable(
        int n,
        Port<int> resultPort,
        Port<Exception> abortPort)
    {
        if (n < 0)
        {
            abortPort.Post(new ArgumentOutOfRangeException("n"));
        }
        else if (n <= 1)
        {
            resultPort.Post(n);
        }
        else
        {
            int n1 = -1;
            Port<int> n1port = new Port<int>();
            Port<int> n2port = new Port<int>();
            Arbiter.Activate(
                dispatcherQueue,
                Arbiter.FromHandler(
                    () => this.FibonacciAbortable(n - 1, n1port, abortPort)));
            Arbiter.Activate(
                dispatcherQueue,
                Arbiter.Choice(
                    Arbiter.Receive(
                        false,
                        n1port,
                        s =>
                            {
                                n1 = s;
                                Arbiter.Activate(
                                    dispatcherQueue,
                                    Arbiter.FromHandler(
                                        () => this.FibonacciAbortable(
                                            n - 2, n2port, abortPort)));
                            }),
                    Arbiter.Receive(false, abortPort, abortPort.Post)));
            Arbiter.Activate(dispatcherQueue, Arbiter.Choice(
                Arbiter.Receive(false, n2port, s => resultPort.Post(s + n1)),
                Arbiter.Receive(false, abortPort, abortPort.Post)));
        }
    }
    

    There are four important tricks to consider when you use an abort port:

    • If you have an error port, you want to post the error to the abort port too, since it's a good pattern to let the message on the abort port be the reason for the abort.
    • All handlers for the abort port must repost the message to the same port so that any number of tasks can be aborted with just one abort item. This means that once all tasks have been aborted there should be one item in the abort port queue.
    • If you wrap code that you cannot be sure will repost to the abort port, do not share your own abort port with child tasks; let child tasks have their own abort port.
    • The use of an abort port does not guarantee that tasks are canceled; only the things you choose to ignore because of the abort port are ignored. It also means that already scheduled tasks will still post responses, so the use of a choice is important to prevent other handlers from executing.
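    The second and fourth points can be captured in one small helper so that the repost is never forgotten. This is a sketch only; the class and method names are hypothetical, and it uses the same Arbiter.Choice and Arbiter.Receive calls as the Fibonacci example.

```csharp
using System;
using Microsoft.Ccr.Core;

// Hypothetical helper class, used only for illustration.
public static class AbortPortExample
{
    // Builds a choice that either handles one item from the port or,
    // if an abort is pending, skips the handler and puts the abort
    // message back so that other tasks can observe it too.
    public static Choice ReceiveOrAbort<T>(
        Port<T> port,
        Port<Exception> abortPort,
        Handler<T> handler)
    {
        return Arbiter.Choice(
            Arbiter.Receive(false, port, handler),
            // Repost to the same port: one abort item aborts any number of tasks.
            Arbiter.Receive(false, abortPort, abortPort.Post));
    }
}
```

    The last activation in the Fibonacci example could then be written as Arbiter.Activate(dispatcherQueue, AbortPortExample.ReceiveOrAbort(n2port, abortPort, s => resultPort.Post(s + n1))).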

    CCR tips and tricks - part 16

    The pattern used in part 15 can be implemented as a helper method that looks like this:

    public static void ExecuteUsingClrThreadPool(Action handler)
    {
        if (handler == null)
        {
            throw new ArgumentNullException("handler");
        }

        if (Thread.CurrentThread.IsThreadPoolThread)
        {
            handler();
        }
        else
        {
            using (var dispatcherQueueUsingClrThreadPool = new DispatcherQueue())
            {
                Arbiter.ExecuteToCompletion(
                    dispatcherQueueUsingClrThreadPool,
                    Arbiter.FromHandler(() => handler()));
            }
        }
    }
    

    Using that helper the method from part 15 can be implemented like this:

    private IEnumerator<ITask> MethodThatNeedToCallBlockingMethodFromCcr(
        DispatcherQueue taskQueue,
        Port<EmptyValue> donePort)
    {
        yield return
            Arbiter.ExecuteToCompletion(
                taskQueue,
                Arbiter.FromHandler(
                    () => ExecuteUsingClrThreadPool(
                        () => this.BlockingMethod(42))));
        donePort.Post(EmptyValue.SharedInstance);
    }
    

     

    CCR tips and tricks - part 15

    In part 14 I showed you how to work with an asynchronous API from CCR. Today we'll handle synchronous code from CCR. Since one of the most common ways to work with CCR is to use a dispatcher with one thread per core, you do not want to block one thread by waiting on some synchronous, long-running operation. Let's create a blocking operation first:

    private void BlockingMethod(int seconds)
    {
        Thread.Sleep(TimeSpan.FromSeconds(seconds));
    }

    Calling this from a CCR thread would in most cases be devastating for performance of your code. If this method is called multiple times from different CCR tasks you could easily have all your CCR threads blocked in this method and that is probably not what you want. But we can fix this by using a feature of the DispatcherQueue. If you create a DispatcherQueue with no arguments it will not use a default dispatcher (with one thread per core), it will use the CLR thread pool! A good pattern to use if you do not know which dispatcher your code will be used with (this might be true if you have a library of functions for others to use) is demonstrated in this code:

    private IEnumerator<ITask> MethodThatNeedToCallBlockingMethodFromCcr(
        Port<EmptyValue> donePort)
    {
        if (!Thread.CurrentThread.IsThreadPoolThread)
        {
            using (var clrTaskQueue = new DispatcherQueue())
            {
                yield return
                    Arbiter.ExecuteToCompletion(
                        clrTaskQueue,
                        new IterativeTask<Port<EmptyValue>>(
                            donePort,
                            MethodThatNeedToCallBlockingMethodFromCcr));
                yield break;
            }
        }

        BlockingMethod(42);
        donePort.Post(EmptyValue.SharedInstance);
    }
    

    There are two reasons for the guard clause at the beginning, which reschedules the task on a CLR thread pool thread only when needed:

    • You do not want to reschedule if already on a CLR thread since every time you schedule a task there is a small overhead.
    • You do not want to use the CLR thread pool for all of your CCR code since using the CLR thread pool dispatcher queue has a greater overhead for each task executed than using a standard CCR dispatcher queue with a more limited dispatcher. CCR has very small overhead when running with one thread per core.

    CCR tips and tricks - part 14

    Sometimes you need to call an asynchronous API that is not created for CCR. While there are several types of asynchronous interfaces the pattern to do it from CCR is basically the same. Here is an example:

    private IEnumerator<ITask> ReadStreamWithCcr(
        Stream stream,
        PortSet<string, Exception> resultPort)
    {
        var asyncPort = new Port<IAsyncResult>();
        byte[] buffer = new byte[42];
        stream.BeginRead(buffer, 0, buffer.Length, asyncPort.Post, null);

        yield return asyncPort.Receive(
            asyncResult =>
                {
                    int count = stream.EndRead(asyncResult);
                    resultPort.Post(Encoding.ASCII.GetString(buffer, 0, count));
                });
    }
    

    Note how the Port<IAsyncResult>.Post method is used as a callback since it has the same signature as the AsyncCallback. There is currently no helper in CCR that uses this pattern but you can write your own if you (like me) don't like to create ports everywhere. You need two helpers since some EndXxx methods return a value and others don't.

    public static PortSet<EmptyValue, Exception> DoAsyncOperation(
        Action<AsyncCallback> beginAction,
        Action<IAsyncResult> endAction)
    {
        return DoAsyncOperation<EmptyValue>(
            beginAction,
            result =>
            {
                endAction(result);
                return EmptyValue.SharedInstance;
            });
    }

    public static PortSet<T, Exception> DoAsyncOperation<T>(
        Action<AsyncCallback> beginAction,
        Func<IAsyncResult, T> endAction)
    {
        var resultPort = new PortSet<T, Exception>();
        try
        {
            beginAction(result =>
            {
                try
                {
                    var r = endAction(result);
                    resultPort.Post(r);
                }
                catch (Exception e)
                {
                    resultPort.Post(e);
                }
            });
        }
        catch (Exception e)
        {
            resultPort.Post(e);
        }

        return resultPort;
    }
    

    With that helper the original method looks like this:

    public IEnumerator<ITask> ReadStreamWithCcr2(
        Stream stream,
        PortSet<string, Exception> resultPort)
    {
        byte[] buffer = new byte[42];
        var readPort = DoAsyncOperation<int>(
                cb => stream.BeginRead(buffer, 0, buffer.Length, cb, null),
                stream.EndRead);
        yield return readPort.Choice(
            c => resultPort.Post(Encoding.ASCII.GetString(buffer, 0, c)),
            resultPort.Post);
    }
    

    Looks very similar, doesn't it? But there is a big difference! BeginXxx and EndXxx methods may throw exceptions, and the first implementation I showed you does not handle that. So you probably want to add some try-catch blocks (unless you rely purely on causalities to handle errors), which makes the first implementation more cumbersome to work with. And you'll probably forget to add those try blocks once in a while...

    CCR tips and tricks - part 13

    Another very common misunderstanding is that the Receive with no arguments is the same as using an empty handler. That is not true; when you yield return with the argumentless receive your method will wait for something to arrive on the port but it will not remove the item from the port. Here are two tests to illustrate the difference:

    private IEnumerator<ITask> SimpleDoubleEmptyReceiver(
        Port<EmptyValue> port,
        ManualResetEvent mre)
    {
        yield return port.Receive(CcrServiceBase.EmptyHandler);
        yield return port.Receive(CcrServiceBase.EmptyHandler);
        mre.Set();
    }

    [TestMethod]
    public void UsingSimpleEmptyReceive()
    {
        var mre = new ManualResetEvent(false);
        var port = new Port<EmptyValue>();
        Arbiter.Activate(
            dispatcherQueue,
            Arbiter.FromIteratorHandler(
                () => this.SimpleDoubleEmptyReceiver(port, mre)));
        port.Post(EmptyValue.SharedInstance);
        Assert.IsFalse(
            mre.WaitOne(TimeSpan.FromSeconds(5)),
            "Processing completed in time");
    }

    private IEnumerator<ITask> SimpleDoubleReceiver(
        Port<EmptyValue> port,
        ManualResetEvent mre)
    {
        yield return port.Receive();
        yield return port.Receive();
        mre.Set();
    }

    [TestMethod]
    public void UsingSimpleReceive()
    {
        var mre = new ManualResetEvent(false);
        var port = new Port<EmptyValue>();
        Arbiter.Activate(
            dispatcherQueue,
            Arbiter.FromIteratorHandler(
                () => this.SimpleDoubleReceiver(port, mre)));
        port.Post(EmptyValue.SharedInstance);
        Assert.IsTrue(
            mre.WaitOne(TimeSpan.FromSeconds(5)),
            "Processing failed to complete in time");
    }
    

    This difference can be deceiving if you reuse a port and the purpose is to use it in a spawn and test type scenario.

    Kinect services for RDS 2008 R3

    I hope you noticed that we released an update for RDS yesterday...
