Luke Hoban's Blog

October, 2008

  • LukeH's WebLog

    F# on Windows Azure

    • 5 Comments

    Windows Azure was announced yesterday, and along with it, the first CTP of the SDK and Visual Studio tools.  If you haven’t yet tried it, go take a look.  On top of serving as a hosting service for web applications, Azure also provides a really simple way to do distributed compute and storage in the cloud.

    Azure supports running .NET applications, which means you can build Azure worker roles using F#! The tools released with Azure don’t have F# support out of the box though, so I’ve posted a few simple templates and samples up on Code Gallery.

    Download

    F# Templates and Samples for Windows Azure

    Templates

    image

    Cloud WebCrawl Sample

    namespace SearchEngine_WorkerRole
    
    open System
    open System.Threading
    open Microsoft.ServiceHosting.ServiceRuntime
    open System.Net
    open System.IO
    open System.Text.RegularExpressions
    open Microsoft.Samples.ServiceHosting.StorageClient;
    open System.Web
    open System.Runtime.Serialization.Formatters.Binary
    
    type WorkerRole() =
        inherit RoleEntryPoint()
    
        // The page to start crawling from
        let startpage = @"http://blogs.msdn.com/lukeh"
        // The filter to apply to links while crawling
        let pageFilter = fun (url:string) -> url.StartsWith("http://blogs.msdn.com/")
    
        /// Get the contents of a given url
        let http(url: string) = 
            let req    = WebRequest.Create(url) 
            use resp   = req.GetResponse()
            use stream = resp.GetResponseStream() 
            use reader = new StreamReader(stream) 
            let html   = reader.ReadToEnd()
            html
    
        /// Get the links from a page of HTML
        let linkPat = "href=\s*\"[^\"h]*(http://[^&\"]*)\""
        let getLinks text =  [ for m in Regex.Matches(text,linkPat)  -> m.Groups.Item(1).Value ]
        
        /// Handle the message msg using the given queue and blob container
        let HandleMessage (msg : Message) (queue : MessageQueue, container: BlobContainer) =
            // There was a new item, get the contents
            let url = msg.ContentAsString();
            let urlBlobName = HttpUtility.UrlEncode(url)
            // Don't get the page if we've already seen it
            if not(container.DoesBlobExist(urlBlobName)) 
            then
                do RoleManager.WriteToLog("Information", String.Format("Handling new url: '{0}'", url));
                try
                    // Get the contents of the page
                    let content = http url
                    // Store the page into the blob store
                    let props = new BlobProperties(urlBlobName)
                    let _ = container.CreateBlob(props, new BlobContents(System.Text.UTF8Encoding.Default.GetBytes(content)), true);
                    
                    // Get the links from the page
                    let links = getLinks content
                    
                    // Filter down the links and then create a new work item for each
                    links
                    |> Seq.filter pageFilter
                    |> Seq.distinct
                    |> Seq.filter (fun link -> not(container.DoesBlobExist(HttpUtility.UrlEncode(link))))
                    |> Seq.iter (fun link -> queue.PutMessage(new Message(link)) |> ignore)
                    queue.DeleteMessage(msg) |> ignore
                with
                | _ ->()
        
        /// Main loop of worker process
        let rec Loop (queue : MessageQueue, container: BlobContainer) = 
            // Get the next page to crawl from the queue
            let msg = queue.GetMessage(240);
            if msg = null
            then Thread.Sleep(1000)
            else HandleMessage msg (queue, container)
            Loop(queue,container)
        
        override wp.Start() =
            // Initialize the Blob storage
            let blobStorage = BlobStorage.Create(StorageAccountInfo.GetDefaultBlobStorageAccountFromConfiguration());
            let container = blobStorage.GetBlobContainer("searchengine");
            let a = container.CreateContainer(null, ContainerAccessControl.Public);
    
            // Initialize the Queue storage
            let queueStorage = QueueStorage.Create(StorageAccountInfo.GetDefaultQueueStorageAccountFromConfiguration());
            let queue = queueStorage.GetQueue("searchworker");
            let b = queue.CreateQueue()
            
            // Put an initial message in the queue, using the start page
            let c = queue.PutMessage(new Message(startpage));
            
            // Begin the main loop, processing messages in the queue
            Loop(queue, container)
            
        override wp.GetHealthStatus() = RoleStatus.Healthy 

    Worker Roles

    The code above defines the implementation of a Worker Role – a process which runs in the background, waiting for work to do, and then processing these work requests.  The worker role is set to run 4 instance simultaneously, which means that there will be 4 instances of this worker processing work items as they come in.  This gives an implicit parallelism – in fact, the initial release of Azure will run one process per core, so you really are getting effective parallelism this way.  Notice also that this requires that the worker processes are inherently stateless.  Both aspects make typical functional design approaches that are common in F# natural for developing these worker roles.

    Queues and Blobs

    This sample uses two of the three data formats supported by Windows Azure.  The queue storage holds the work items. The blob storage holds the pages visited during the web crawl.  When an instance of the worker role Starts, it connects to the blob store and the queue store, then puts an initial work item in the queue and goes into a loop processing work items out of the queue.

    Conclusion

    Ideas for any other interesting F# applications on Windows Azure?  Download the templates and samples.
  • LukeH's WebLog

    Standard Deviation and Event-based Programming

    • 3 Comments

    A couple weeks ago, I had the opportunity to do a really fun presentation on F# at the .NET Meetup in New York.  At the end, I got a great question, which I talked about a little at the presentation, but thought I’d talk about further in a blog post.

    The following isn’t exactly how the conversation went - but roughly captures what we talked about, and covers a lot of fun F# topics.  There’s some allusions here to ideas from statistics, time-series analysis, CEP and FRP.

    “How would you compute standard deviation over a stream of data in F#?”

    Standard deviation is a pretty straightforward computation with F#.  There’s a few ways to write it, but with “average” and “averageBy” built-in, they provide a particularly simple option.

    /// Square a number
    let sqr x = x * x
    
    /// Compute the standard deviation of a list of numbers
    let stddev nums = 
        let mean = nums |> List.average
        let variance = nums |> List.averageBy (fun x -> sqr(x - mean))
        sqrt(variance)

    “Okay – but what if I want the inputs to be long, potentially infinite sources of data?”

    The previous function worked over lists.  We can change it to work over sequences (IEnumerables) instead.  Pleasantly, it’s basically the same code.  But the result of working over sequences is that this function will work with any sort of data.  It could be a sequence of numbers that result from a LINQ to SQL query, it could be a sequence of numbers read lazily from a very long file, etc. 

    /// Compute the standard deviation of a sequence of numbers
    let stddevSeq numSeq = 
        let mean = 
            numSeq |> Seq.average
        let variance = 
            numSeq |> Seq.averageBy (fun x -> sqr(x - mean))
        sqrt(variance)

    “That’s computing what I want, but can I get all the partial results as my stream of data is processed? ”

    This makes things more interesting.  Instead of just computing the standard deviation, we really want to do an “online” calculation, where we compute enough information so that as each new datapoint comes in, we can easily produce the new standard deviation.  This is a different algorithmic approach to the same mathematical formula.  Luckily, wikipedia has what we need- an on-line algorithm for standard deviation from Knuth.

    /// Compute the running mean and standard deviation
    /// over a sequence of numbers
    let meanAndStdDev nums = 
        // A function which updates the on-line computation of 
        // n, mean and M2 given a new datapoint x
        let newValues (n, mean, M2) x = 
            let n' = n+1.
            let delta = x - mean
            let mean' = mean + delta/n'
            let M2' = M2 + delta*(x - mean')
            (n', mean', M2')
        // The initial vaues of n, mean and M2
        let startValues = (0., 0., 0.)
        // The resulting event stream is a scan over the input events 
        nums 
        |> Seq.scan newValues startValues
        |> Seq.map (fun (n, mean, M2) -> (mean, sqrt(M2/(n))))

    This time the code has a few interesting features:

    1. A local function “newValues” updates n, mean and M2 based on old values and a new datapoint x.  Notice that (a) this is strikingly similar to the pseudo-code in the wikipedia article but (b) this is a functional implementation, so instead of changing the values, we just compute new ones.
    2. The function “Seq.scan” takes care of the heavy lifting.  This function, and the related “fold” and “reduce” can be used to capture many common design patterns seen in “programming in the small”.  Scan walks across an input collection applying a function to it’s current accumulator argument, and returning a collection of the results that it builds up as it goes.  If you’ve haven’t written code using these before, they can feel uncomfortable at first, but after you’ve used each of these once or twice, you’ll notice them as common patterns that you are currently encoding as for loops, if statements and local state in your programs.  “scan” and it’s related functions give us the ability to name that design pattern and then re-use it – resulting in more declarative code.

    “Perfect!  But my actual stream of data (stock ticker prices) is long running, and will provide new data points periodically over minutes, days and weeks.”

    Another fun twist on the problem!  Sequences (IEnumerables) have the property that the producer of the data offers up a way to ask for more, and the consumer is in charge of pulling it out.  While it’s doing so, it’s busy the whole time, so if the producer  doesn’t have the data yet, it will have to block waiting for it to be available.  This is not what you want in the long running case, where instead we want to think about the data stream as a sequence of events.  With events, the consumer offers up a way to deal with new data whenever it’s available, and the producer is in charge of pushing the new data points out.  For the same reasons that there is so much interest in smart phones with push email, we should all want our long-running data streams to be event based instead of sequence based.

    let boundedMouseMax = 
        form.MouseMove 
        |> Event.filter (fun mea -> mea.X > 10 && mea.Y > 10 && mea.X < 90 && mea.Y < 90)
        |> Event.map (fun mea -> max mea.X mea.Y)
     
    do boundedMouseMax |> Event.add (fun maxCoord -> printfn "max is: %A" maxCoord)

    Events in F# are just standard .NET events.  But F# allows you to program with these events in a “first-class” way, meaning you can pass them around as data, and there are functions in the “Event” module for taking events and generating new events from them.  For instance, we could take a mouse move event and call “Event.filter” to filter out any time it is outside a bounding box, then “Event.map” to pick the larger of the X or Y coordinate of the mouse.  The result is then a new event which will fire only when the mouse is in the bounding box, and will produce the larger of it’s X and Y coordinate as it’s value. We can then hook up a listener to print out the values whenever this new event fires.

    But how do we use this to solve the original problem?

    /// Compute a running mean and standard deviation over the 
    /// input event stream of floating point numbers
    let meanAndStdDev nums = 
        // A function which updates the on-line computation of n, mean and M2
        let newValues (n, mean, M2) x = 
            let n' = n+1.
            let delta = x - mean
            let mean' = mean + delta/n'
            let M2' = M2 + delta*(x - mean')
            (n', mean', M2')
        // The initial vaues of n, mean and M2
        let startValues = (0., 0., 0.)
        // The resulting event stream is a scan over the input events 
        nums 
        |> Event.scan newValues startValues
        |> Event.map (fun (n, mean, M2) -> (mean, sqrt(M2/(n))))

    It’s almost identical to the “on-line” algorithm over sequences! This is one of the important benefits of writing more declarative code using constructs like “scan”.  Although the implementations of “scan” over sequences and over events are quite different under the hood – the design pattern of scanning makes sense over both, and is the concept we can program with to tackle the problem at a higher-level.  It really makes you feel that we’ve succeeded in expressing the “what” in place of the “how”.

    Here’s what it looks like to use this new function we’ve created:

    // Create a new event – we would hook up to the stock ticker events if they were available
    let numEvent = new Event<float>()
    let numEventStream, fireNewNum = numEvent.Publish, numEvent.Trigger
    
    // Derive a new event that computes the running mean and stddev 
    // over the original event stream
    let stddevEvents = meanAndStdDev numEventStream
    
    // Hook up an event handler to print out the new mean and standard deviation 
    do stddevEvents |> Event.add (fun (mean, stddev) -> printfn "Mean = %A, StdDev = %A" mean stddev)
    
    do fireNewNum 3.
    do fireNewNum 7.
    do fireNewNum 7.
    do fireNewNum 19.

    Conclusion

    This is a fun example that really shows off why it’s so valuable to use higher-level design patterns for programming in the small – like “map”, “average”, “averageBy” and “scan”.  First class events in F# are a unifying features which allows you to do the same kind of programming you use with sequences and apply it also to events.  This is really compelling, because it’s a case where two data sources which are conceptually very similar are often programmed in very different ways – but with higher-level programming models that functional programming and F# provide, you can program against both using the same techniques.

Page 1 of 1 (2 items)