A couple weeks ago, I had the opportunity to do a really fun presentation on F# at the .NET Meetup in New York. At the end, I got a great question, which I talked about a little at the presentation, but thought I’d talk about further in a blog post.
The following isn’t exactly how the conversation went - but roughly captures what we talked about, and covers a lot of fun F# topics. There’s some allusions here to ideas from statistics, time-series analysis, CEP and FRP.
Standard deviation is a pretty straightforward computation with F#. There’s a few ways to write it, but with “average” and “averageBy” built-in, they provide a particularly simple option.
/// Square a number let sqr x = x * x /// Compute the standard deviation of a list of numbers let stddev nums = let mean = nums |> List.average let variance = nums |> List.averageBy (fun x -> sqr(x - mean)) sqrt(variance)
The previous function worked over lists. We can change it to work over sequences (IEnumerables) instead. Pleasantly, it’s basically the same code. But the result of working over sequences is that this function will work with any sort of data. It could be a sequence of numbers that result from a LINQ to SQL query, it could be a sequence of numbers read lazily from a very long file, etc.
/// Compute the standard deviation of a sequence of numbers let stddevSeq numSeq = let mean = numSeq |> Seq.average let variance = numSeq |> Seq.averageBy (fun x -> sqr(x - mean)) sqrt(variance)
This makes things more interesting. Instead of just computing the standard deviation, we really want to do an “online” calculation, where we compute enough information so that as each new datapoint comes in, we can easily produce the new standard deviation. This is a different algorithmic approach to the same mathematical formula. Luckily, wikipedia has what we need- an on-line algorithm for standard deviation from Knuth.
/// Compute the running mean and standard deviation /// over a sequence of numbers let meanAndStdDev nums = // A function which updates the on-line computation of // n, mean and M2 given a new datapoint x let newValues (n, mean, M2) x = let n' = n+1. let delta = x - mean let mean' = mean + delta/n' let M2' = M2 + delta*(x - mean') (n', mean', M2') // The initial vaues of n, mean and M2 let startValues = (0., 0., 0.) // The resulting event stream is a scan over the input events nums |> Seq.scan newValues startValues |> Seq.map (fun (n, mean, M2) -> (mean, sqrt(M2/(n))))
This time the code has a few interesting features:
Another fun twist on the problem! Sequences (IEnumerables) have the property that the producer of the data offers up a way to ask for more, and the consumer is in charge of pulling it out. While it’s doing so, it’s busy the whole time, so if the producer doesn’t have the data yet, it will have to block waiting for it to be available. This is not what you want in the long running case, where instead we want to think about the data stream as a sequence of events. With events, the consumer offers up a way to deal with new data whenever it’s available, and the producer is in charge of pushing the new data points out. For the same reasons that there is so much interest in smart phones with push email, we should all want our long-running data streams to be event based instead of sequence based.
let boundedMouseMax = form.MouseMove |> Event.filter (fun mea -> mea.X > 10 && mea.Y > 10 && mea.X < 90 && mea.Y < 90) |> Event.map (fun mea -> max mea.X mea.Y) do boundedMouseMax |> Event.add (fun maxCoord -> printfn "max is: %A" maxCoord)
let boundedMouseMax = form.MouseMove |> Event.filter (fun mea -> mea.X > 10 && mea.Y > 10 && mea.X < 90 && mea.Y < 90) |> Event.map (fun mea -> max mea.X mea.Y)
do boundedMouseMax |> Event.add (fun maxCoord -> printfn "max is: %A" maxCoord)
Events in F# are just standard .NET events. But F# allows you to program with these events in a “first-class” way, meaning you can pass them around as data, and there are functions in the “Event” module for taking events and generating new events from them. For instance, we could take a mouse move event and call “Event.filter” to filter out any time it is outside a bounding box, then “Event.map” to pick the larger of the X or Y coordinate of the mouse. The result is then a new event which will fire only when the mouse is in the bounding box, and will produce the larger of it’s X and Y coordinate as it’s value. We can then hook up a listener to print out the values whenever this new event fires.
But how do we use this to solve the original problem?
/// Compute a running mean and standard deviation over the /// input event stream of floating point numbers let meanAndStdDev nums = // A function which updates the on-line computation of n, mean and M2 let newValues (n, mean, M2) x = let n' = n+1. let delta = x - mean let mean' = mean + delta/n' let M2' = M2 + delta*(x - mean') (n', mean', M2') // The initial vaues of n, mean and M2 let startValues = (0., 0., 0.) // The resulting event stream is a scan over the input events nums |> Event.scan newValues startValues |> Event.map (fun (n, mean, M2) -> (mean, sqrt(M2/(n))))
It’s almost identical to the “on-line” algorithm over sequences! This is one of the important benefits of writing more declarative code using constructs like “scan”. Although the implementations of “scan” over sequences and over events are quite different under the hood – the design pattern of scanning makes sense over both, and is the concept we can program with to tackle the problem at a higher-level. It really makes you feel that we’ve succeeded in expressing the “what” in place of the “how”.
Here’s what it looks like to use this new function we’ve created:
// Create a new event – we would hook up to the stock ticker events if they were available let numEvent = new Event<float>() let numEventStream, fireNewNum = numEvent.Publish, numEvent.Trigger // Derive a new event that computes the running mean and stddev // over the original event stream let stddevEvents = meanAndStdDev numEventStream // Hook up an event handler to print out the new mean and standard deviation do stddevEvents |> Event.add (fun (mean, stddev) -> printfn "Mean = %A, StdDev = %A" mean stddev) do fireNewNum 3. do fireNewNum 7. do fireNewNum 7. do fireNewNum 19.
This is a fun example that really shows off why it’s so valuable to use higher-level design patterns for programming in the small – like “map”, “average”, “averageBy” and “scan”. First class events in F# are a unifying features which allows you to do the same kind of programming you use with sequences and apply it also to events. This is really compelling, because it’s a case where two data sources which are conceptually very similar are often programmed in very different ways – but with higher-level programming models that functional programming and F# provide, you can program against both using the same techniques.
Windows Azure was announced yesterday, and along with it, the first CTP of the SDK and Visual Studio tools. If you haven’t yet tried it, go take a look. On top of serving as a hosting service for web applications, Azure also provides a really simple way to do distributed compute and storage in the cloud.
Azure supports running .NET applications, which means you can build Azure worker roles using F#! The tools released with Azure don’t have F# support out of the box though, so I’ve posted a few simple templates and samples up on Code Gallery.
F# Templates and Samples for Windows Azure
namespace SearchEngine_WorkerRole open System open System.Threading open Microsoft.ServiceHosting.ServiceRuntime open System.Net open System.IO open System.Text.RegularExpressions open Microsoft.Samples.ServiceHosting.StorageClient; open System.Web open System.Runtime.Serialization.Formatters.Binary type WorkerRole() = inherit RoleEntryPoint() // The page to start crawling from let startpage = @"http://blogs.msdn.com/lukeh" // The filter to apply to links while crawling let pageFilter = fun (url:string) -> url.StartsWith("http://blogs.msdn.com/") /// Get the contents of a given url let http(url: string) = let req = WebRequest.Create(url) use resp = req.GetResponse() use stream = resp.GetResponseStream() use reader = new StreamReader(stream) let html = reader.ReadToEnd() html /// Get the links from a page of HTML let linkPat = "href=\s*\"[^\"h]*(http://[^&\"]*)\"" let getLinks text = [ for m in Regex.Matches(text,linkPat) -> m.Groups.Item(1).Value ] /// Handle the message msg using the given queue and blob container let HandleMessage (msg : Message) (queue : MessageQueue, container: BlobContainer) = // There was a new item, get the contents let url = msg.ContentAsString(); let urlBlobName = HttpUtility.UrlEncode(url) // Don't get the page if we've already seen it if not(container.DoesBlobExist(urlBlobName)) then do RoleManager.WriteToLog("Information", String.Format("Handling new url: '{0}'", url)); try // Get the contents of the page let content = http url // Store the page into the blob store let props = new BlobProperties(urlBlobName) let _ = container.CreateBlob(props, new BlobContents(System.Text.UTF8Encoding.Default.GetBytes(content)), true); // Get the links from the page let links = getLinks content // Filter down the links and then create a new work item for each links |> Seq.filter pageFilter |> Seq.distinct |> Seq.filter (fun link -> not(container.DoesBlobExist(HttpUtility.UrlEncode(link)))) |> Seq.iter (fun link -> queue.PutMessage(new Message(link)) |> ignore) queue.DeleteMessage(msg) |> ignore with | _ ->() /// Main loop of worker process let rec Loop (queue : MessageQueue, container: BlobContainer) = // Get the next page to crawl from the queue let msg = queue.GetMessage(240); if msg = null then Thread.Sleep(1000) else HandleMessage msg (queue, container) Loop(queue,container) override wp.Start() = // Initialize the Blob storage let blobStorage = BlobStorage.Create(StorageAccountInfo.GetDefaultBlobStorageAccountFromConfiguration()); let container = blobStorage.GetBlobContainer("searchengine"); let a = container.CreateContainer(null, ContainerAccessControl.Public); // Initialize the Queue storage let queueStorage = QueueStorage.Create(StorageAccountInfo.GetDefaultQueueStorageAccountFromConfiguration()); let queue = queueStorage.GetQueue("searchworker"); let b = queue.CreateQueue() // Put an initial message in the queue, using the start page let c = queue.PutMessage(new Message(startpage)); // Begin the main loop, processing messages in the queue Loop(queue, container) override wp.GetHealthStatus() = RoleStatus.Healthy
The code above defines the implementation of a Worker Role – a process which runs in the background, waiting for work to do, and then processing these work requests. The worker role is set to run 4 instance simultaneously, which means that there will be 4 instances of this worker processing work items as they come in. This gives an implicit parallelism – in fact, the initial release of Azure will run one process per core, so you really are getting effective parallelism this way. Notice also that this requires that the worker processes are inherently stateless. Both aspects make typical functional design approaches that are common in F# natural for developing these worker roles.
This sample uses two of the three data formats supported by Windows Azure. The queue storage holds the work items. The blob storage holds the pages visited during the web crawl. When an instance of the worker role Starts, it connects to the blob store and the queue store, then puts an initial work item in the queue and goes into a loop processing work items out of the queue.