F# agents with timeouts

F# agents with timeouts

Rate This
  • Comments 1

In my previous post, I showed a code snippet with a very simple F# agent console application.  You run the app from the console, and every time you enter a line of text, it generates a new message and posts it to the message queue of an F# agent.  An agent is an instance of the MailboxProcessor class.  The MailboxProcessor is a class in the F# core library in the Control namespace.  As the name MailboxProcessor implies, an agent is something that you can send messages to, and it runs some code in response to messages that you send.  The code that it runs is conveniently expressed as a lambda expression that you pass in.

Typically the lambda expressions that you pass in will all follow a common pattern, which is a loop that repeatedly takes a message from the queue, one at a time.  You call the Receive method on each iteration of the loop.

Here is the sample code that I posted the other day.  In this code, we create one agent and its message-processing loop as I've just described.  Additionally, we run a loop that reads one line at a time from the console and sends each line to the agent.  If you run this program, then every time you enter some text, the text is received as a message.  In addition to the basic message processing, this example shows how to send information back to the caller by using the reply channel, an object of type AsyncReplyChannel.  The only processing that happens in this simple example is that if the message is "Stop" then send a special reply, otherwise send a reply that we've received a message, along with the ID and text of the message.

module MailboxProcessorTest2 =

 

    open System

 

    type Message = string * AsyncReplyChannel<string>

 

    let formatString = "Message number {0} was received. Message contents: {1}"

 

    let agent = MailboxProcessor<Message>.Start(fun inbox ->

        let rec loop n =

            async {

                    let! (message, replyChannel) = inbox.Receive();

                    if (message = "Stop") then

                        replyChannel.Reply("Stopping.")

                    else  

                        replyChannel.Reply(String.Format(formatString, n, message))

                    do! loop (n + 1)

            }

        loop 0)

 

    printfn "Mailbox Processor Test"

    printfn "Enter some text and hit ENTER to submit a message."

    printfn "Type 'Stop' to terminate."

 

    let rec loop() =

        printf "> "

        let input = Console.ReadLine()

        let reply = agent.PostAndReply(fun replyChannel -> input, replyChannel)

        if (reply <> "Stopping.") then

            printfn "Reply: %s" reply

            loop()

        else

            ()

    loop()

 

 

    printfn "Press enter to continue."

    Console.ReadLine() |> ignore

 

The other interesting thing is the async { ... } in the message processing loop.  Asynchronous programming is one of the strengths of F#.  MailboxProcessor is designed to work asynchronously. So, for example, the return type of the lambda expression is Async<unit>, an asynchronous work item.  This allows us to start to listen to messages but not block on waiting for the next message.  The use of let! instead of let, and do! instead of do indicates an asynchronous operation.  The basic idea is that execution goes up until a let! or do! and then, if no return or completion is ready, execution can go on elsewhere. Then, when a return value is available, or the execution is complete, then the program continues from that point.  Behind the scenes, a callback is used and the remaining code after the let! or do! is packaged up into the callback.  Obviously all of that is rather complicated, so what people like about the design is that you can write asynchronous code much as you would write synchronous code, with a few extra bangs (!) here and there, and F# asynchronous workflows take care of making it asynchronous.

The asynchronous feature of F# is one of its most attractive features.  In fact, it was so nicely done that it is being implemented in C# and Visual Basic as well, and you can try that out in a CTP release that came out in the fall of 2010 at the PDC conference.  It can seem a bit like a black box though.  One thing I hope to do in this series of blog posts is to explore the asynchronous programming model and hopefully elucidate this black box.  In particular, we want to have a mental model of how to think about how it operates.  It would be one thing if all it did was fire up a background thread every time a new logical "program flow" came into play.  In this example, we have two loops going, one that reads input, and one that processes messages.  You would think that there are two threads going on in parallel since there are these two loops happening "at the same time".  Let's test this theory by printing out the Thread ID.

    let printThreadId note =

        // Append the Thread ID

        printfn "%d : %s" System.Threading.Thread.CurrentThread.ManagedThreadId note

 

 Then add some code that calls this printThreadId method, with either an argument of "MailboxProcessor" or "Console".  If you run this, you'll find that indeed it is true, the MailboxProcessor loop is running on separate threads from the UI thread that is reading from the console.  I found when running this that there were in fact two background threads reading messages from the queue.  Here is the code after adding these changes:

module MailboxProcessorTest2 =

 

    open System

 

    type Message = string * AsyncReplyChannel<string>

 

    let formatString = "Message number {0} was received. Message contents: {1}"

 

    let printThreadId note =

        // Append the Thread ID

        printfn "%d : %s" System.Threading.Thread.CurrentThread.ManagedThreadId note

 

 

    let agent = MailboxProcessor<Message>.Start(fun inbox ->

        let rec loop n =

            async {

                    let! (message, replyChannel) = inbox.Receive();

                    printThreadId "mailboxProcessor"

                    if (message = "Stop") then

                        replyChannel.Reply("Stopping.")

                    else

                        replyChannel.Reply(String.Format(formatString, n, message))

                    do! loop (n + 1)

            }

        loop 0)

 

    printfn "Mailbox Processor Test"

    printfn "Enter some text and hit ENTER to submit a message."

    printfn "Type 'Stop' to terminate."

 

    let rec loop() =

        printf "> "

        let input = Console.ReadLine()

        printThreadId("Console loop")

        let reply = agent.PostAndReply(fun replyChannel -> input, replyChannel)

        if (reply <> "Stopping.") then

            printfn "Reply: %s" reply

            loop()

        else

            ()

    loop()

 

    printfn "Press enter to continue."

    Console.ReadLine() |> ignore

 

And here is some of my output:

Mailbox Processor Test
Enter some text and hit ENTER to submit a message.
Type 'Stop' to terminate.
> test
1 : Console loop
4 : mailboxProcessor
Reply: Message number 0 was received. Message contents: test
> another loop
1 : Console loop
3 : mailboxProcessor
Reply: Message number 1 was received. Message contents: another loop
> another loop
1 : Console loop
4 : mailboxProcessor
Reply: Message number 2 was received. Message contents: another loop
> testing
1 : Console loop
3 : mailboxProcessor
Reply: Message number 3 was received. Message contents: testing
> testing
1 : Console loop
4 : mailboxProcessor
Reply: Message number 4 was received. Message contents: testing

Let's suppose we want to now add some timeout condition, so that after a while of no new input from the console, the app posts a note saying that a timeout occurred and skips to the next message ID.  Receive takes a timeout as an optional argument, which is an integer in milleseconds.  I set a timeout of 10 seconds.  If the timeout is exceeded, TimeoutException is thrown.   I want to catch this exception, but at first, I tried to put the try/with outside the call to PostAndReply.  But, the exception was never caught!    I can only catch the exception on the same thread where it is generated, so the exception handler has to be in the mailbox processor message loop.  I also had to be careful to start the next loop iteration in the case of a timeout.  Ouch, I got a bit caught by the fact that F# asynchronous workflows are so smooth, I don't even know I'm running on a separate thread.   Here is an implementation of the timeout that works.

module MailboxProcessorTest3 =

 

    open System

 

    type Message = string * AsyncReplyChannel<string>

 

    let formatString = "Message number {0} was received. Message contents: {1}"

 

    let agent = MailboxProcessor<Message>.Start(fun inbox ->

        let rec loop n =

            async {           

                try

                    let! (message, replyChannel) = inbox.Receive(10000);

 

                    if (message = "Stop") then

                        replyChannel.Reply("Stop")

                    else

                        replyChannel.Reply(String.Format(formatString, n, message))

                    do! loop (n + 1)

 

                with

                | :? TimeoutException ->

                    printfn "The mailbox processor timed out."

                    do! loop (n + 1)

            }

        loop (0))

 

    printfn "Mailbox Processor Test"

    printfn "Enter some text and hit ENTER to submit a message."

    printfn "Type 'Stop' to terminate."

 

    let rec loop() =

        printf "> "

        let input = Console.ReadLine()

        let reply = agent.PostAndReply(fun replyChannel -> input, replyChannel)

        if (reply <> "Stop") then

            printfn "Reply: %s" reply

            loop()

        else

            ()

    loop()

 

    printfn "Press enter to continue."

    Console.ReadLine() |> ignore

 

 

Now, when you run this, if you wait long enough, the timeout message will be printed, but the processing of new messages continues, and the message id number will skip one.  Here is an example session:

Mailbox Processor Test
Enter some text and hit ENTER to submit a message.
Type 'Stop' to terminate.
> hello
Reply: Message number 0 was received. Message contents: hello
> hello?
Reply: Message number 1 was received. Message contents: hello?
> The mailbox processor timed out.
testing
Reply: Message number 3 was received. Message contents: testing
> Stop
Press enter to continue.

However there are definitely times when you want the exception to be propagated to the thread that posted the message.  It is possible to do this, with the Error event.  The Error event takes a lambda expression that processes an exception.  Here is an example where the agent throws a timeout exception and it is handled by the Error event.

module MailboxProcessorWithError =

 

    open System

 

    type Message = string

 

    let agent = MailboxProcessor<Message>.Start(fun inbox ->

        let rec loop n =

            async {

                    let! message = inbox.Receive(10000);

                    printfn "Message number %d. Message contents: %s" n message

                    do! loop (n+1)

            }

        loop 0)

 

    agent.Error.Add(fun exn ->

        match exn with

        | :? System.TimeoutException as exn -> printfn "The agent timed out."

                                               printfn "Press Enter to close the program."

                                               Console.ReadLine() |> ignore

                                               exit(1)

        | _ -> printfn "Unknown exception.")

 

    printfn "Mailbox Processor Test"

    printfn "Type some text and press Enter to submit a message."

 

 

    while true do

        Console.ReadLine() |> agent.Post

 

 

Where I'm headed with this, is I want to create a job agent that will allow me to start some background jobs from a command prompt.  The timeout isn't really necessary, but it did help clarify the behavior of exceptions that take place in the message loop.  If I want to start jobs in the background, I'm going to want to start up multiple jobs at a time, so it won't do to submit one message and then have to wait until PostAndReply returns!  I will need to use PostAndAsyncReply, so in the next version, both the message processing loop and the input loop will be asynchronous.

Gordon

  • This line cause memory leak:

    let rec loop n = async { do! loop (n+1) }

    Tail recursion inside computation expressions should be done with "return!":

    let rec loop n = async { return! loop (n+1) }

Page 1 of 1 (1 items)
Leave a Comment
  • Please add 2 and 8 and type the answer here:
  • Post