Being Cellfish

Stuff I wished I've found in some blog (and sometimes did)

July, 2011

Change of Address
This blog has moved to blog.cellfish.se.
Posts
    CCR tips and tricks - part 12

    I think it's common engineering practice not to reuse a variable for multiple things in the same method. Reuse is sometimes done for indexes or for keeping track of errors, but I think it's a bad practice. For some reason people tend not only to reuse port variables but also to reuse the port instance itself. This is dangerous since the port may contain messages you don't know about, especially if timeouts are involved, since whatever timed out will still execute to completion and post a result. Let's assume that you have a function that reads a stream and posts each line to a port like this:

      1: private void StreamReaderWithCcr(StreamReader reader, Port<string> linePort)
      2: {
      3:     for (string line = reader.ReadLine(); 
      4:         line != null; 
      5:         line = reader.ReadLine())
      6:     {
      7:         Thread.Sleep(50); //Slowing it down...
      8:         linePort.Post(line);
      9:     }
     10: }

    This method is then used in the following method:

     11: private IEnumerator<ITask> ProcessTwoFilesUnlessFirstStartsWithCowabunga(
     12:     Stream file1, 
     13:     Stream file2, 
     14:     ManualResetEvent mre)
     15: {
     16:     bool processFirstFile = false;
     17:     Port<string> linePort = new Port<string>();
     18:     Arbiter.Activate(
     19:         dispatcherQueue, 
     20:         Arbiter.FromHandler(
     21:             () => StreamReaderWithCcr(new StreamReader(file1), linePort)));
     22:     yield return linePort.Receive(
     23:         line => processFirstFile = line != "Cowabunga");
     24:     if (processFirstFile)
     25:     {
     26:         yield return linePort.Receive(line => Assert.AreEqual("12", line));
     27:     }
     28:     
     29:     Arbiter.Activate(
     30:         dispatcherQueue, 
     31:         Arbiter.FromHandler(
     32:             () => StreamReaderWithCcr(new StreamReader(file2), linePort)));
     33:     yield return linePort.Receive(line => Assert.AreEqual("21", line));
     34:     yield return linePort.Receive(line => Assert.AreEqual("22", line));
     35:     mre.Set();
     36: }

    Given those two methods this test will pass:

     37: [TestMethod]
     38: public void Processing_two_files_completely()
     39: {
     40:     var file1 = new MemoryStream(Encoding.ASCII.GetBytes("11\n12"));
     41:     var file2 = new MemoryStream(Encoding.ASCII.GetBytes("21\n22"));
     42:  
     43:     using (var testCausality = new CausalityForTests(dispatcherQueue))
     44:     {
     45:         var mre = new ManualResetEvent(false);
     46:         Arbiter.Activate(
     47:             dispatcherQueue, 
     48:             Arbiter.FromIteratorHandler(
     49:                 () => this.ProcessTwoFilesUnlessFirstStartsWithCowabunga(
     50:                     file1, 
     51:                     file2, 
     52:                     mre)));
     53:         Assert.IsTrue(
     54:             mre.WaitOne(TimeSpan.FromSeconds(5)), 
     55:             "Processing failed to complete in time");
     56:     }
     57: }

    But this test will fail on line 33 since the second line from the first file is still in the linePort:

     58: [TestMethod]
     59: public void Processing_two_files_partially()
     60: {
     61:     var file1 = new MemoryStream(Encoding.ASCII.GetBytes("Cowabunga\n12"));
     62:     var file2 = new MemoryStream(Encoding.ASCII.GetBytes("21\n22"));
     63:  
     64:     using (var testCausality = new CausalityForTests(dispatcherQueue))
     65:     {
     66:         var mre = new ManualResetEvent(false);
     67:         Arbiter.Activate(
     68:             dispatcherQueue, 
     69:             Arbiter.FromIteratorHandler(
     70:                 () => this.ProcessTwoFilesUnlessFirstStartsWithCowabunga(
     71:                     file1, 
     72:                     file2, 
     73:                     mre)));
     74:         Assert.IsTrue(
     75:             mre.WaitOne(TimeSpan.FromSeconds(5)), 
     76:             "Processing failed to complete in time");
     77:     }
     78: }

    The easy fix is to change line 28 to this and then the second test will pass:

     28: linePort = new Port<string>();

    Another way of detecting when you accidentally reuse a port is to use a method like this:

     79: private void AssertNoMoreMessages<T>(Port<T> port)
     80: {
     81:     Arbiter.Activate(
     82:         dispatcherQueue, 
     83:         Arbiter.Receive(
     84:             true, 
     85:             port, 
     86:             v => Assert.Fail("Unexpected value posted: {0}", v.ToString())));
     87: }

    And use that method on line 28.

     28: AssertNoMoreMessages(linePort);
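
    The hazard can also be reproduced without CCR. The sketch below is a deliberately simplified, single-threaded illustration in which a plain Queue<string> stands in for the port; StaleMessageDemo and its members are names made up for this example and are not part of CCR or of the code above.

```csharp
using System;
using System.Collections.Generic;

public static class StaleMessageDemo
{
    // Stand-in for a reader that posts every line to a port.
    private static void PostLines(IEnumerable<string> lines, Queue<string> port)
    {
        foreach (string line in lines)
        {
            port.Enqueue(line);
        }
    }

    // Returns the first line the consumer sees once it starts on the second file.
    public static string FirstLineSeenForSecondFile(bool freshPort)
    {
        var port = new Queue<string>();
        PostLines(new[] { "Cowabunga", "12" }, port);
        port.Dequeue(); // "Cowabunga": the consumer stops reading the first file here.

        if (freshPort)
        {
            // The fix: a new instance, so the leftover "12" stays behind.
            port = new Queue<string>();
        }

        PostLines(new[] { "21", "22" }, port);
        return port.Dequeue();
    }

    public static void Main()
    {
        Console.WriteLine(FirstLineSeenForSecondFile(false)); // prints "12"
        Console.WriteLine(FirstLineSeenForSecondFile(true));  // prints "21"
    }
}
```

    With the reused instance the leftover "12" is what the second stage receives first, which corresponds to the failure in the Cowabunga test.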

    CCR tips and tricks - part 11

    A very common pattern in CCR is the use of a result port. That means that you have a port you give to (or get from) a method you want to call, and that method will post a result on the result port. However, sometimes a bug leads to a situation where no result is ever posted back. This can be very hard to detect if you just use a standard result/exception portset. If you're in the DSS environment you know that each operation typically has a timeout that is handled by DSS, but in CCR you need to deal with this yourself. I've previously mentioned that a SmartPort could be used. But this is what it would look like with a timeout:

      1: private void DoSomething(SuccessFailurePort resultPort)
      2: {
      3:     // never posts a response.
      4: }
      5:  
      6: private void CallingSomething(TimeSpan timeout)
      7: {
      8:     var mre = new ManualResetEvent(false);
      9:     Exception error = null;
     10:     var resultPort = new SuccessFailurePort();
     11:     var timeoutPort = new Port<DateTime>();
     12:     Arbiter.Activate(
     13:         dispatcherQueue,
     14:         Arbiter.Choice(
     15:             Arbiter.Receive<SuccessResult>(false, resultPort, s => mre.Set()),
     16:             Arbiter.Receive<Exception>(
     17:                 false,
     18:                 resultPort,
     19:                 e =>
     20:                     {
     21:                         error = e;
     22:                         mre.Set();
     23:                     }),
     24:             Arbiter.Receive(
     25:                 false,
     26:                 timeoutPort,
     27:                 e =>
     28:                     {
     29:                         error = new Exception("Timeout when waiting for DoSomething");
     30:                         mre.Set();
     31:                     })));
     32:     Arbiter.Activate(
     33:         dispatcherQueue, 
     34:         Arbiter.FromHandler(() => this.DoSomething(resultPort)));
     35:     dispatcherQueue.EnqueueTimer(timeout, timeoutPort);
     36:     mre.WaitOne();
     37:     if (error != null)
     38:     {
     39:         throw error;
     40:     }
     41: }
     42:  
     43: [TestMethod]
     44: public void UsingTimeoutToDetectWhenSomethingIsNotPostingBack()
     45: {
     46:     try
     47:     {
     48:         this.CallingSomething(TimeSpan.FromSeconds(1));
     49:         Assert.Fail("Expected exception to be thrown");
     50:     }
     51:     catch (Exception e)
     52:     {
     53:         Assert.IsTrue(e.Message.Contains("Timeout"));
     54:     }
     55: }
    

    The lines to pay most attention to are lines 14 through 31.
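
    If the same choice is needed in several places, it could be factored into a small helper. The following is only a sketch under assumptions: TimeoutHelper, ActivateWithTimeout and the choice of TimeoutException are made up for this illustration and are not part of CCR; the CCR calls themselves are the ones already used in the listing above.

```csharp
using System;
using Microsoft.Ccr.Core;

// Hypothetical helper, not part of CCR.
public static class TimeoutHelper
{
    // Activates a one-time choice between success, failure and a timer.
    // Whichever message arrives first wins; the other receivers are dropped.
    public static void ActivateWithTimeout(
        DispatcherQueue queue,
        SuccessFailurePort resultPort,
        TimeSpan timeout,
        Handler<SuccessResult> onSuccess,
        Handler<Exception> onFailure)
    {
        var timeoutPort = new Port<DateTime>();
        Arbiter.Activate(
            queue,
            Arbiter.Choice(
                Arbiter.Receive<SuccessResult>(false, resultPort, onSuccess),
                Arbiter.Receive<Exception>(false, resultPort, onFailure),
                Arbiter.Receive(
                    false,
                    timeoutPort,
                    // A timeout is reported through the failure handler.
                    t => onFailure(
                        new TimeoutException("No result within " + timeout)))));

        // The timer posts the current time on timeoutPort when it expires.
        queue.EnqueueTimer(timeout, timeoutPort);
    }
}
```

    Keep in mind that an operation that times out may still post to resultPort later, so the port should not be reused for another call afterwards.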

    CCR tips and tricks - part 10

    Today we'll continue using the same base helpers as in part 9 and look at a common mistake made in iterative handlers. Take a look at the following code:

      1: private IEnumerator<ITask> ForgettingToYield(ManualResetEvent mre)
      2: {
      3:     var port = new Port<EmptyValue>();
      4:     port.Post(EmptyValue.SharedInstance);
      5:     Arbiter.Receive(false, port, v => mre.Set());
      6:     yield break;
      7: }
      8:         
      9: [TestMethod]
     10: public void Forgetting_to_yield()
     11: {
     12:     var mre = new ManualResetEvent(false);
     13:     Arbiter.Activate(
     14:         dispatcherQueue, 
     15:         Arbiter.FromIteratorHandler(() => this.ForgettingToYield(mre)));
     16:     Assert.IsFalse(
     17:         mre.WaitOne(TimeSpan.FromSeconds(3)), "Should not complete tasks");
     18: }
    

    Note the mistake on line 5. Since the task created is not yielded on, it will not execute. In my experience these types of errors are easily found while writing unit tests. They look weird since you think "why is this not executing" even though the code "looks" right.
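
    For comparison, here is a minimal sketch of the same handler with the receiver yielded. The class YieldingExample and the method names RememberingToYield and Run are made up for this illustration; the only real change compared to line 5 is the added "yield return".

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using Microsoft.Ccr.Core;

public class YieldingExample
{
    private readonly DispatcherQueue dispatcherQueue = new DispatcherQueue();

    // Hypothetical counterpart to ForgettingToYield: the receiver is yielded,
    // so it gets activated and the handler runs when the message is consumed.
    private IEnumerator<ITask> RememberingToYield(ManualResetEvent mre)
    {
        var port = new Port<EmptyValue>();
        port.Post(EmptyValue.SharedInstance);
        yield return Arbiter.Receive(false, port, v => mre.Set());
    }

    // Returns true if the handler ran within three seconds.
    public bool Run()
    {
        var mre = new ManualResetEvent(false);
        Arbiter.Activate(
            dispatcherQueue,
            Arbiter.FromIteratorHandler(() => this.RememberingToYield(mre)));
        return mre.WaitOne(TimeSpan.FromSeconds(3));
    }
}
```

    With the yield in place the "yield break" is no longer needed to make the method compile, which is a useful hint that something was missing in the first version.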

    CCR tips and tricks - part 9

    The pattern in CCR of using iterators to implement tasks is very powerful since it means that the code of the task looks very synchronous even though it performs asynchronous tasks. But sometimes you may yield on the wrong thing and the resulting behavior is unexpected. Even before you use an iterative handler (as they're called in CCR) you should ask yourself if you really need it. The rule of thumb is fairly easy: if you need a "yield break" at the end of your method to make it compile (since there is no "yield return" in it), you should not use an iterative handler. The exception is if you implement some interface or base implementation that others will override and where you anticipate those implementations to need an iterative handler. The reason not to use an iterative handler unless needed is that there is a slight performance penalty from using iterative handlers, so if you don't need them, don't pay the price.

    In the following code examples I will be reusing the following simple methods/variables (and yes, one of them is an iterative handler that should not be one, but it's used to show the difference between iterative handlers and regular handlers).

      private DispatcherQueue dispatcherQueue = new DispatcherQueue();
      private bool didSomething = false;
      private bool didSomethingElse = false;

      private void DoSomething()
      {
          didSomething = true;
      }

      private IEnumerator<ITask> DoSomethingElse()
      {
          didSomethingElse = true;
          yield break;
      }
    

    Now look at the following test. Notice that the iterative task is never executed. This is because yielding on a regular task executes that task but never returns to the original method.

      private IEnumerator<ITask> DoItAllWrongWay(ManualResetEvent mre)
      {
          yield return Arbiter.FromHandler(DoSomething);
          yield return Arbiter.FromIteratorHandler(DoSomethingElse);
          mre.Set();
      }

      [TestMethod]
      public void Doing_the_wrong_thing()
      {
          var mre = new ManualResetEvent(false);
          Arbiter.Activate(
              dispatcherQueue,
              Arbiter.FromIteratorHandler(() => this.DoItAllWrongWay(mre)));
          Assert.IsFalse(
              mre.WaitOne(TimeSpan.FromSeconds(3)),
              "Should not complete tasks");
          Assert.IsTrue(didSomething);
          Assert.IsFalse(didSomethingElse);
      }
    

    Now look at what happens when we yield return on the iterative handler first instead. Now both helper methods execute but we still don't complete the calling method since yielding on the regular handler never returns.

      private IEnumerator<ITask> DoItWrongButItWorksAlmost(ManualResetEvent mre)
      {
          yield return Arbiter.FromIteratorHandler(DoSomethingElse);
          yield return Arbiter.FromHandler(DoSomething);
          mre.Set();
      }

      [TestMethod]
      public void Doing_the_wrong_thing_but_it_almost_works()
      {
          var mre = new ManualResetEvent(false);
          Arbiter.Activate(
              dispatcherQueue,
              Arbiter.FromIteratorHandler(() => this.DoItWrongButItWorksAlmost(mre)));
          Assert.IsFalse(
              mre.WaitOne(TimeSpan.FromSeconds(3)),
              "Should not complete tasks");
          Assert.IsTrue(didSomething);
          Assert.IsTrue(didSomethingElse);
      }
    

    Last, we'll look at the correct way of dealing with this scenario.

      1: private IEnumerator<ITask> DoItRight(ManualResetEvent mre)
      2: {
      3:     yield return Arbiter.ExecuteToCompletion(
      4:         dispatcherQueue, new Task(DoSomething));
      5:     yield return new IterativeTask(DoSomethingElse);
      6:     mre.Set();
      7: }
      8:  
      9: [TestMethod]
     10: public void Doing_the_right_thing()
     11: {
     12:     var mre = new ManualResetEvent(false);
     13:     Arbiter.Activate(
     14:         dispatcherQueue, 
     15:         Arbiter.FromIteratorHandler(() => this.DoItRight(mre)));
     16:     Assert.IsTrue(mre.WaitOne(TimeSpan.FromSeconds(3)), "Should complete tasks");
     17:     Assert.IsTrue(didSomething);
     18:     Assert.IsTrue(didSomethingElse);
     19: }
    

    Note that when yielding on an iterative task you may use an alternative syntax (line 5): yield return the IterativeTask directly instead of wrapping it in Arbiter.ExecuteToCompletion.

    CCR tips and tricks - part 8

    The scatter-gather pattern, where you spawn multiple tasks and then wait for them all to complete, is a common pattern used with CCR. If you have a small, well-known number of tasks to spawn, the use of a joined receiver combined with a choice for errors can be a slick way of achieving "wait for all to succeed or the first failure". That is assuming that one failure means the whole scatter-gather operation failed.

      public IEnumerator<ITask> Fibonacci(
          int n,
          PortSet<int, Exception> resultPort)
      {
          if (n < 0)
          {
              resultPort.Post(new ArgumentOutOfRangeException("n"));
          }
          else if (n <= 1)
          {
              resultPort.Post(n);
          }
          else
          {
              var n1port = Fibonacci(n - 1);
              var n2port = Fibonacci(n - 2);
              var joinedSuccess = Arbiter.JoinedReceive<int, int>(
                  false,
                  n1port,
                  n2port,
                  (n1, n2) => resultPort.Post(n1 + n2));
              yield return Arbiter.Choice(
                  joinedSuccess,
                  Arbiter.Receive<Exception>(false, n1port, resultPort.Post),
                  Arbiter.Receive<Exception>(false, n2port, resultPort.Post));
          }
      }
    

    This implementation computes a specific Fibonacci number by computing the two parts in parallel. It is not the most efficient way to do this, but it shows how a joined receiver is used to collect the results, while any error is returned immediately without waiting for both results. This does not mean that "the other" task is aborted. More on that in a future post in this series.

    CCR tips and tricks - part 7

    There are many ways you can receive messages on a port. In all examples below I'll be using two implementations of Fibonacci that use CCR. The methods I call have the following signatures:

      public Port<int> Fibonacci(int n)
      public IEnumerator<ITask> Fibonacci(int n, Port<int> resultPort)
    

    The assumption is that the first method creates a port, spawns a task using the second method and returns the result port hence calculating the Fibonacci number asynchronously. The second method just calculates the number synchronously and posts the result on the given port.
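
    The implementations are not shown in this post, so the following is only a sketch of what the two methods could look like under the assumption just described. The class name FibonacciService, the dispatcherQueue field and the iterative calculation are choices made for this illustration.

```csharp
using System.Collections.Generic;
using Microsoft.Ccr.Core;

public class FibonacciService
{
    private readonly DispatcherQueue dispatcherQueue = new DispatcherQueue();

    // Creates the result port, spawns the iterator overload as a task and
    // returns the port right away; the number is posted asynchronously.
    public Port<int> Fibonacci(int n)
    {
        var resultPort = new Port<int>();
        Arbiter.Activate(
            dispatcherQueue,
            Arbiter.FromIteratorHandler(() => this.Fibonacci(n, resultPort)));
        return resultPort;
    }

    // Calculates the number synchronously and posts it on the given port.
    // Assumes n >= 0. The trailing "yield break" only exists because the
    // signature used in this post is an iterator.
    public IEnumerator<ITask> Fibonacci(int n, Port<int> resultPort)
    {
        int previous = 0;
        int current = 1;
        for (int i = 0; i < n; i++)
        {
            int next = previous + current;
            previous = current;
            current = next;
        }

        resultPort.Post(previous);
        yield break;
    }
}
```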

    First out is a very common way that I call "call and receive". Basically you create a little method train using the fact that the method we call spawns a task and returns a result port.

      public IEnumerator<ITask> CallAndReceive(
          int n,
          Port<KeyValuePair<int, int>> resultPort)
      {
          yield return Fibonacci(n)
              .Receive(s => resultPort.Post(new KeyValuePair<int, int>(n, s)));
      }
    

    The second way I call "spawn and receive". The interesting thing about this one is that if the spawned operation completes before the receive is activated the message lives in the port until the receive is activated. This means that theoretically you have slightly worse performance than if you have the receive already activated when the result is posted.

      public IEnumerator<ITask> SpawnAndReceive(
          int n,
          Port<KeyValuePair<int, int>> resultPort)
      {
          var port = new Port<int>();
          Arbiter.Activate(
              dispatcherQueue,
              Arbiter.FromIteratorHandler(() => Fibonacci(n, port)));
          yield return port.Receive(
              s => resultPort.Post(new KeyValuePair<int, int>(n, s)));
      }
    

    In the third example called "receive and spawn" I guarantee that the receiver is activated before the task is executed. Hence the theoretical performance penalty given above is eliminated since in this example there will always be a receiver activated when the result is posted. Also note that I here use the Arbiter.Receive method rather than the Port.Receive extension method used above.

      public IEnumerator<ITask> ReceiveAndSpawn(
          int n,
          Port<KeyValuePair<int, int>> resultPort)
      {
          var port = new Port<int>();
          Arbiter.Activate(
              dispatcherQueue,
              Arbiter.Receive(
                  false,
                  port,
                  s => resultPort.Post(new KeyValuePair<int, int>(n, s))));
          yield return new IterativeTask(() => Fibonacci(n, port));
      }
    

    The fourth way, which I call "spawn and test", uses two features of CCR that are not that well known, especially if you're new to CCR. This example (and the one above) uses "yield return new IterativeTask", which actually executes that task to completion before returning. This is equivalent to using Arbiter.ExecuteToCompletion. The second feature used is the fact that casting a Port (or PortSet) to a given type will remove a message from the port and return its value, or the default value if no message of the given type is available. This is truly an interesting feature of CCR since it means that the cast operator now has a side effect on the object you're casting from.

     1: public IEnumerator<ITask> SpawnAndTest(
     2:     int n, 
     3:     Port<KeyValuePair<int, int>> resultPort)
     4: {
     5:     var port = new Port<int>();
     6:     yield return new IterativeTask(() => Fibonacci(n, port));
     7:     int result = port;
     8:     resultPort.Post(new KeyValuePair<int, int>(n, result));
     9: }

    Line 7 could also be written like this:

     7:     int result = (int)port.Test();

    or this:

     7:     int result;
    7.5:     port.Test(out result);
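
    To make the side effect concrete without depending on CCR, here is a toy class with a conversion operator that behaves the way described above. ToyPort<T> is made up for this illustration and is not how CCR implements its ports; it only shows that a conversion can consume a message.

```csharp
using System;
using System.Collections.Generic;

// Toy stand-in for a port; NOT the CCR implementation.
public class ToyPort<T>
{
    private readonly Queue<T> messages = new Queue<T>();

    public void Post(T message)
    {
        messages.Enqueue(message);
    }

    public int ItemCount
    {
        get { return messages.Count; }
    }

    // Converting the port to T removes one message, or yields default(T)
    // when the port is empty. The conversion changes the port's state.
    public static implicit operator T(ToyPort<T> port)
    {
        return port.messages.Count > 0 ? port.messages.Dequeue() : default(T);
    }
}

public static class ToyPortDemo
{
    public static void Main()
    {
        var port = new ToyPort<int>();
        port.Post(42);

        int first = port;  // removes the message
        int second = port; // the port is empty now

        Console.WriteLine(first);          // prints 42
        Console.WriteLine(second);         // prints 0
        Console.WriteLine(port.ItemCount); // prints 0
    }
}
```

    Note that a default value is indistinguishable from a real message with that value, which is one reason the Test(out result) form can be preferable when a returned flag is available.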

    Of these four variants the receive and spawn is the most dangerous one to use since it's easy to introduce a race condition if you're not careful. Take a look at the following code where some local variables are introduced to increase (?) readability. Maybe you'd do something like this to see values in the debugger.

     1: public IEnumerator<ITask> ReceiveAndSpawnWithRaceCondition(
     2:     int n, 
     3:     Port<KeyValuePair<int, int>> resultPort)
     4: {
     5:     int result = 0;
     6:     var port = new Port<int>();
     7:     Arbiter.Activate(
     8:         dispatcherQueue, 
     9:         Arbiter.Receive(false, port, s => result = s));
     10:     yield return new IterativeTask(() => Fibonacci(n, port));
     11:     resultPort.Post(new KeyValuePair<int, int>(n, result));
     12: }
    

    This code probably works fine when you step through it in the debugger, but there is a race condition. The problem is that when the Fibonacci method posts the result, the result handler (line 9) is scheduled for execution. You do not know when it's executed. If you're unlucky the method above gets back after completing the iterative task and executes line 11 before the handler on line 9 and hence the wrong result is posted back to the caller.

    CCR tips and tricks - part 6

    Today I have three recommendations for your CCR methods. For methods that are public interfaces to components I would recommend methods that return ports like this:

      public PortSet<ResultType, Exception> DoSomething()

    This makes it easy for the consumer (yourself or other developers) to just call the method and then use the port returned. As a consumer I do not need to know how to create the port needed and I think that is convenient. It also means the implementation may complete synchronously or asynchronously. As a caller I don't really care.

    For internal helpers I however tend to use the following two variants:

      private IEnumerator<ITask> DoSomething(PortSet<ResultType, Exception> resultPort)
      private void DoSomething(PortSet<ResultType, Exception> resultPort)

    The first one should only be used if the method itself needs to yield return. If it ends up having a yield break at the end just to pass compilation, you should go for the second variant. However, if the method is part of an interface definition I always use the first (iterator) variant.

    So wait, you may think: isn't an interface public, and hence shouldn't it have the variant returning a port? The answer is yes, if the interface is public. But if the interface is internal, it may make sense to treat it as an internal helper.
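
    Put together, the public and the internal variant could be combined roughly as in the sketch below. ResultType is the placeholder type from the signatures above, given a made-up Value property here; the class name Component and the value posted are likewise only for illustration.

```csharp
using System;
using Microsoft.Ccr.Core;

// Placeholder result type; the Value property is an assumption.
public class ResultType
{
    public int Value { get; set; }
}

public class Component
{
    private readonly DispatcherQueue dispatcherQueue = new DispatcherQueue();

    // Public entry point: the caller gets a port back and never has to know
    // how to create one, or whether the work completes synchronously.
    public PortSet<ResultType, Exception> DoSomething()
    {
        var resultPort = new PortSet<ResultType, Exception>();
        Arbiter.Activate(
            dispatcherQueue,
            Arbiter.FromHandler(() => this.DoSomething(resultPort)));
        return resultPort;
    }

    // Internal helper: nothing to yield on, so a regular handler is enough.
    // A failure would be posted as an Exception on the same port instead.
    private void DoSomething(PortSet<ResultType, Exception> resultPort)
    {
        resultPort.Post(new ResultType { Value = 42 });
    }
}
```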
