Luca at PDC 2009 next week

I’ll be in Los Angeles next week for PDC 2009. My session is called “Future Directions for C# and Visual Basic” and it comes on Tuesday immediately after the first keynote.

I’m planning on spending the first half of the session talking about the biggest trends influencing C# and VB. The second half will be about the more forward-looking features we are playing with these days.

My deck has just two slides, as always. It’s all about watching me type code on stage and hoping that I make crazy errors …

There are several other interesting language-related sessions at PDC. Here is a list of them (please note that the times might change). Also feel free to drop by the languages booth and chat. My team and I will be there most of the time.

| Day | Time | Title | Presenter | Room |
|---|---|---|---|---|
| Tues 11/17 | 11:00 - 12:00 | Future Directions for C# and Visual Basic | Luca Bolognese | Hall F |
| Tues 11/17 | 11:00 - 12:00 | Accelerated Windows Application Development with Microsoft Visual C++ 2010 | Boris Jabes | 408B |
| Tues 11/17 | 12:30 - 13:15 | Dynamic Binding in C# 4 | Mads Torgersen | Hall F |
| Tues 11/17 | 12:30 - 13:15 | Using Dynamic Languages to Build Scriptable Applications | Dino Viehland | 403AB |
| Tues 11/17 | 13:30 - 14:30 | C++ Forever: Interactive Applications in the Age of Manycore | Rick Molloy | 515B |
| Tues 11/17 | 16:30 - 17:30 | Manycore and the Microsoft .NET Framework 4: A Match Made in Microsoft Visual Studio 2010 | Stephen Toub | 502A |
| Tues 11/17 | 16:30 - 17:30 | Code Contracts and Pex: Power Charge Your Assertions and Unit Tests | Mike Barnett, Nikolai Tillmann | 408A |
| Wed 11/18 | 11:30 - 12:30 | Microsoft Perspectives on the Future of Programming | Panel | Petree Hall C |
| Wed 11/18 | 12:00 - 13:00 | F# for Architects – Hitting the Sweet Spot | Chris Smith | VS and .NET Pavilion |
| Wed 11/18 | 13:00 - 13:45 | Code Like the Wind with Microsoft Visual Basic 2010 | Lucian Wischik | Petree Hall D |
| Wed 11/18 | 13:00 - 13:45 | Future of Garbage Collection | Patrick Dussud | Petree Hall C |
| Wed 11/18 | 13:00 - 13:45 | Microsoft Project Code Name “M”: The Data and Modeling Language | Don Box, Jeff Pinkston | 408A |
| Thu 11/19 | 08:30 - 09:30 | PLINQ: LINQ, but Faster! | Ed Essey, Igor Ostrovsky | 515A |
| Thu 11/19 | 10:00 - 11:00 | A Lap around Microsoft Visual Studio 2010 for the Visual Basic Developer | Lisa Feigenbaum | VS and .NET Pavilion |
| Thu 11/19 | 10:00 - 11:00 | Axum: A .NET Language for Safe and Scalable Concurrency | Niklas Gustafsson | 515A |
| Thu 11/19 | 11:30 - 12:30 | F# for Parallel and Asynchronous Programming | Luke Hoban | 515A |
| Thu 11/19 | 12:45 - 13:30 | Microsoft Visual C# IDE Tips and Tricks | DJ Park | Petree Hall D |
| Thu 11/19 | 12:45 - 13:30 | Microsoft Visual Basic IDE Tips and Tricks | Dustin Campbell | Petree Hall C |

Posted by lucabol | 4 Comments

Becoming really rich with C#

Or maybe not; please do not hold me responsible if you lose money following this system. Having said that, it is my opinion that very few concepts are important in investing. Three big ones are value, diversification and momentum. This post is about the latter two and how to use C# to create a simple trading system that uses both.

Diversification means ‘don’t put all your eggs in one basket’ (as opposed to ‘put all of them in one basket and watch that basket’). I don’t believe you can ‘watch’ very much in financial markets, so I tend to prefer diversification.

Momentum is the mysterious tendency of financial prices that have risen the most in the recent past to continue outperforming in the near future. In essence, buying the top stocks/sectors/asset classes tends to outperform buying the bottom ones over horizons from three months to one year.

The idea then is to rank some assets (e.g. ETFs) by how fast they have risen in the past, go long the top ones and short the bottom ones. There are hundreds of variations of this basic strategy. We’ll add the rule that we won’t buy assets that are below their 200-day moving average or sell short assets that are above it.

I’m writing this code with VS 2010 Beta 2 (which hasn’t shipped yet). It should be trivial to modify it to run on B1 (or maybe it does run on it already). I attach the code and data files to this post.

struct Event {
    internal Event(DateTime date, double price) { Date = date; Price = price; }
    internal readonly DateTime Date;
    internal readonly double Price;
}

We’ll use this simple structure to hold the closing price for a particular date. My use of internal is kind of bizarre. Actually the whole code might look strange. It is an interesting (maybe inelegant) mix of object orientation and functional programming.

class Summary {
    internal Summary(string ticker, string name, string assetClass,
                    string assetSubClass, double? weekly, double? fourWeeks,
                    double? threeMonths, double? sixMonths, double? oneYear,
                    double? stdDev, double price, double? mav200) {
        Ticker = ticker;
        Name = name;
        AssetClass = assetClass;
        AssetSubClass = assetSubClass;
        // Abracadabra ...
        LRS = (fourWeeks + threeMonths + sixMonths + oneYear) / 4;
        Weekly = weekly;
        FourWeeks = fourWeeks;
        ThreeMonths = threeMonths;
        SixMonths = sixMonths;
        OneYear = oneYear;
        StdDev = stdDev;
        Mav200 = mav200;
        Price = price;
    }
    internal readonly string Ticker;
    internal readonly string Name;
    internal readonly string AssetClass;
    internal readonly string AssetSubClass;
    internal readonly double? LRS;
    internal readonly double? Weekly;
    internal readonly double? FourWeeks;
    internal readonly double? ThreeMonths;
    internal readonly double? SixMonths;
    internal readonly double? OneYear;
    internal readonly double? StdDev;
    internal readonly double? Mav200;
    internal double Price;

    internal static void Banner() {
        Console.Write("{0,-6}", "Ticker");
        Console.Write("{0,-50}", "Name");
        Console.Write("{0,-12}", "Asset Class");
        //Console.Write("{0,-30}\t", "Asset SubClass";
        Console.Write("{0,4}", "RS");
        Console.Write("{0,4}", "1Wk");
        Console.Write("{0,4}", "4Wk");
        Console.Write("{0,4}", "3Ms");
        Console.Write("{0,4}", "6Ms");
        Console.Write("{0,4}", "1Yr");
        Console.Write("{0,6}", "Vol");
        Console.WriteLine("{0,2}", "Mv");
        //Console.Write("{0,6}", "Pr");
        //Console.WriteLine("{0,6}", "M200");
    }

    internal void Print() {

        Console.Write("{0,-6}", Ticker);
        Console.Write("{0,-50}", new String(Name.Take(48).ToArray()));
        Console.Write("{0,-12}", new String(AssetClass.Take(10).ToArray()));
        //Console.Write("{0,-30}\t", new String(AssetSubClass.Take(28).ToArray()));
        Console.Write("{0,4:N0}", LRS * 100);
        Console.Write("{0,4:N0}", Weekly * 100);
        Console.Write("{0,4:N0}", FourWeeks * 100);
        Console.Write("{0,4:N0}", ThreeMonths * 100);
        Console.Write("{0,4:N0}", SixMonths * 100);
        Console.Write("{0,4:N0}", OneYear * 100);
        Console.Write("{0,6:N0}", StdDev * 100);
        if (Price <= Mav200)
            Console.WriteLine("{0,2}", "X");
        else
            Console.WriteLine();
        //Console.Write("{0,6:N2}", Price);
        //Console.WriteLine("{0,6:N2}", Mav200);
    }
}

The class Summary above is how I want to present my results. A few comments on the code. I use Nullable<T> because some of these values can be null (e.g. when there is not enough history), but I still don’t want to worry about it. It ends up working rather neatly.

I also print the results out to Console, which is crazy. I really should be using WPF/Silverlight as the presentation layer. Also the {0,4:N0} notation might be unfamiliar to some of you, but this is how mad Console guys like myself avoid using real UI frameworks. Sometimes we print things in color too.
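For readers who have not met the {index,width:format} notation, here is a minimal sketch of what it does. The FormatDemo class and its method names are made up for illustration, and the invariant culture is an assumption added so the result does not depend on machine settings. A positive width right-aligns, a negative width left-aligns, N0 rounds to a whole number, and a null Nullable value simply formats as padding.

```csharp
using System;
using System.Globalization;

// Hypothetical helper, for illustration only.
public static class FormatDemo {
    // Right-aligned in 4 columns, shown as a whole percentage.
    public static string PercentCell(double? value) {
        return string.Format(CultureInfo.InvariantCulture, "{0,4:N0}", value * 100);
    }

    // Left-aligned in 6 columns, like the Ticker column.
    public static string TickerCell(string ticker) {
        return string.Format(CultureInfo.InvariantCulture, "{0,-6}", ticker);
    }
}
```

With this sketch, FormatDemo.PercentCell(0.126) gives "  13", FormatDemo.PercentCell(null) gives four spaces, and FormatDemo.TickerCell("SPY") gives "SPY   ".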

The real meat is in the following line:

LRS = (fourWeeks + threeMonths + sixMonths + oneYear) / 4;

That is our highway to richness. It’s a very elaborate quant formula, never before shown, that calculates a magick relative strength (aka momentum) factor as the average of the performance over four weeks, three months, six months and one year.
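The reason the formula needs no null checks is that arithmetic on Nullable<double> is lifted: if any operand is null, the whole result is null. A minimal sketch, with a hypothetical RelativeStrength.Lrs helper standing in for the constructor line:

```csharp
using System;

// Hypothetical helper mirroring the LRS line, for illustration only.
public static class RelativeStrength {
    public static double? Lrs(double? fourWeeks, double? threeMonths,
                              double? sixMonths, double? oneYear) {
        // Lifted operators: a single null input makes the result null.
        return (fourWeeks + threeMonths + sixMonths + oneYear) / 4;
    }
}
```

So an asset with less than a year of history ends up with a null LRS rather than a misleading number, which is what later code can filter on with HasValue.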

class TimeSeries {
    internal readonly string Ticker;
    readonly DateTime _start;
    readonly Dictionary<DateTime, double> _adjDictionary;
    readonly string _name;
    readonly string _assetClass;
    readonly string _assetSubClass;

    internal TimeSeries(string ticker, string name, string assetClass, string assetSubClass,
                        IEnumerable<Event> events) {
        Ticker = ticker;
        _name = name;
        _assetClass = assetClass;
        _assetSubClass = assetSubClass;
        _start = events.Last().Date;
        _adjDictionary = events.ToDictionary(e => e.Date, e => e.Price);
    }

I then built myself a little TimeSeries class that represents a series of (date, price) pairs. I chose a dictionary to store it because of my assumption that I would be accessing it by date a lot. In retrospect, I was kind of right and kind of wrong. It doesn’t really matter much.

bool GetPrice(DateTime when, out double price, out double shift) {
    // To nullify the effect of hours/min/sec/millisec being different from 0
    when = new DateTime(when.Year, when.Month, when.Day);
    var found = false;
    shift = 1;
    double aPrice = 0;
    while (when >= _start && !found) {
        if (_adjDictionary.TryGetValue(when, out aPrice)) {
            found = true;
        }
        when = when.AddDays(-1);
        shift -= 1;
    }
    price = aPrice;
    return found;
}

A TimeSeries can give you back the price at a particular date. This looks bizarre and complex, but there is a reason for it. I might ask for a date that doesn’t have a price associated with it (i.e. holidays, week-ends). In such cases I want to return the previous price which could be N days in the past.

I also want to return how many days in the past I had to go, so that other calculations (i.e. Return) can modify their end date by the same amount. Also I might not find such a price at all, in which case I don’t want to throw an exception, but instead notify the caller. In retrospect, I should have used double? to signify ‘price not found’.
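As a sketch of that "in retrospect" idea, the lookup could return a nullable result instead of a bool plus two out parameters. This is not the code attached to the post: the PriceLookup class, the Find name and its parameters are hypothetical, and it assumes C# 7 or later for tuples. The shift is 0 when the requested day has a price and negative when the search had to step back.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical alternative to GetPrice, for illustration only.
public static class PriceLookup {
    // Returns the price on 'when', or on the closest earlier day that has one,
    // plus how many days back the search went. Returns null if nothing is
    // found on or after 'earliest'.
    public static (double Price, int Shift)? Find(
            IReadOnlyDictionary<DateTime, double> prices,
            DateTime earliest, DateTime when) {
        var day = when.Date; // drop hours/minutes/seconds
        for (var shift = 0; day >= earliest; shift--, day = day.AddDays(-1)) {
            if (prices.TryGetValue(day, out var price))
                return (price, shift);
        }
        return null;
    }
}
```

A caller can then write `var hit = PriceLookup.Find(...)` and test `hit.HasValue`, which keeps the "not found" case visible in the type.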

double? GetReturn(DateTime start, DateTime end) {
    var startPrice = 0.0;
    var endPrice = 0.0;
    var shift = 0.0;
    var foundEnd = GetPrice(end, out endPrice, out shift);
    var foundStart = GetPrice(start.AddDays(shift), out startPrice, out shift);
    if (!foundStart || !foundEnd)
        return null;
    else
        return endPrice / startPrice - 1;
}

We can now go and calculate the return between two dates. The TimeSeries object also needs to perform a few more calculations.

    internal double? LastWeekReturn() {
        return GetReturn(DateTime.Now.AddDays(-7), DateTime.Now);
    }
    internal double? Last4WeeksReturn() {
        return GetReturn(DateTime.Now.AddDays(-28), DateTime.Now);
    }
    internal double? Last3MonthsReturn() {
        return GetReturn(DateTime.Now.AddMonths(-3), DateTime.Now);
    }
    internal double? Last6MonthsReturn() {
        return GetReturn(DateTime.Now.AddMonths(-6), DateTime.Now);
    }
    internal double? LastYearReturn() {
        return GetReturn(DateTime.Now.AddYears(-1), DateTime.Now);
    }
    internal double? StdDev() {
        var now = DateTime.Now;
        now = new DateTime(now.Year, now.Month, now.Day);
        var limit = now.AddYears(-3);
        var rets = new List<double>();
        while (now >= _start.AddDays(12) && now >= limit) {
            var ret = GetReturn(now.AddDays(-7), now);
            rets.Add(ret.Value);
            now = now.AddDays(-7);
        }
        var mean = rets.Average();
        var variance = rets.Select(r => Math.Pow(r - mean, 2)).Sum();
        var weeklyStdDev = Math.Sqrt(variance / rets.Count);
        return weeklyStdDev * Math.Sqrt(40);
    }
    internal double? MAV200() {
        return _adjDictionary
            .ToList()
            .OrderByDescending(k => k.Key)
            .Take(200)
            .Average(k => k.Value);
    }
    internal double TodayPrice() {
        var price = 0.0;
        var shift = 0.0;
        GetPrice(DateTime.Now, out price, out shift);
        return price;
    }
    internal Summary GetSummary() {
        return new Summary(Ticker, _name, _assetClass, _assetSubClass,
            LastWeekReturn(), Last4WeeksReturn(), Last3MonthsReturn(),
            Last6MonthsReturn(), LastYearReturn(), StdDev(), TodayPrice(),
            MAV200());
    }
}

Nothing particularly interesting in this code. Just a bunch of calculations. MAV200 is the 200-day moving average of closing prices. It shows a more functional way of doing things. The StdDev function is instead very imperative.

We now can work on downloading the prices. This is how you construct the right URL:

static string CreateUrl(string ticker, DateTime start, DateTime end) {
    return @"http://ichart.finance.yahoo.com/table.csv?s=" + ticker + "&a="
        + (start.Month - 1).ToString() + "&b=" + start.Day.ToString() + "&c="
        + start.Year.ToString() + "&d=" + (end.Month - 1).ToString() + "&e="
        + end.Day.ToString() + "&f=" + end.Year.ToString() + "&g=d&ignore=.csv";
}

 

And let’s set how many concurrent connections we are going to use …

ServicePointManager.DefaultConnectionLimit = 10;

On my machine, setting this number too high causes errors to be returned. I’m not sure on which side of the connection the problem lies.

We can then load all the tickers we want to load from a file. One of the files has Leveraged ETFs, which I want to filter out because they tend to pop up always at the top.

var tickers =
    //File.ReadAllLines("ETFs.csv")
    //File.ReadAllLines("ETFTest.csv")
    File.ReadAllLines("AssetClasses.csv")
    .Skip(1)
    .Select(l => l.Split(new[] { ',' }))
    .Where(v => v[2] != "Leveraged")
    .Select(values => Tuple.Create(values[0], values[1], values[2], values[3]))
    .ToArray();
var len = tickers.Length;

var start = DateTime.Now.AddYears(-2);
var end = DateTime.Now;
var cevent = new CountdownEvent(len);
var summaries = new Summary[len];

And then load all of them, making sure to make an asynchronous call so as not to keep the thread busy.

for(var i = 0; i < len; i++)  {
    var t = tickers[i];
    var url = CreateUrl(t.Item1, start, end);
    using (var webClient = new WebClient()) {
        webClient.DownloadStringCompleted +=
            new DownloadStringCompletedEventHandler(downloadStringCompleted);
        webClient.DownloadStringAsync(new Uri(url), Tuple.Create(t, cevent, summaries, i));
    }
}
cevent.Wait();

 

Notice the use of a CountdownEvent to wait for all the downloads to complete before printing out the results. Also notice the new Tuple<T> class used to package things to send around.
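Stripped of the downloading, the CountdownEvent pattern looks roughly like this. It is a minimal sketch: the CountdownDemo class and the squaring work are made up, and thread pool work items stand in for the web requests. Each work item writes to its own slot of a shared array and signals in a finally block, so the waiting thread is released even if an item fails.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the download loop, for illustration only.
public static class CountdownDemo {
    // Queues 'count' work items and blocks until every one has signalled.
    public static int[] SquareAll(int count) {
        var results = new int[count];
        var done = new CountdownEvent(count);
        for (var i = 0; i < count; i++) {
            var index = i; // per-iteration copy captured by the lambda
            ThreadPool.QueueUserWorkItem(_ => {
                try { results[index] = index * index; }
                finally { done.Signal(); } // always count down, even on failure
            });
        }
        done.Wait(); // returns once the count reaches zero
        return results;
    }
}
```

Calling CountdownDemo.SquareAll(5) returns 0, 1, 4, 9, 16 regardless of the order in which the work items ran.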

We can then print out the top and bottom 15%:

var top15perc =
        summaries
        .Where(s => s.LRS.HasValue)
        .OrderByDescending(s => s.LRS)
        .Take((int)(len * 0.15));
var bottom15perc =
        summaries
        .Where(s => s.LRS.HasValue)
        .OrderBy(s => s.LRS)
        .Take((int)(len * 0.15));

Console.WriteLine();
Summary.Banner();
Console.WriteLine("TOP 15%");
foreach(var s in top15perc)
    s.Print();

Console.WriteLine();
Console.WriteLine("Bottom 15%");
foreach (var s in bottom15perc)
    s.Print();
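The listing above only flags assets that are at or below their 200-day moving average with an X. Applying the rule from the start of the post (no longs below the average, no shorts above it) could be sketched as follows. The Asset class, its fields and the Longs/Shorts helpers are hypothetical and are not part of the attached code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, cut-down version of Summary, for illustration only.
public sealed class Asset {
    public string Ticker;
    public double Momentum; // e.g. the averaged past returns
    public double Price;
    public double Mav200;   // 200-day moving average
}

public static class MomentumRanking {
    // Top 'fraction' by momentum, keeping only assets above their average.
    public static List<string> Longs(IEnumerable<Asset> assets, double fraction) {
        var list = assets.ToList();
        return list.OrderByDescending(a => a.Momentum)
                   .Take((int)(list.Count * fraction))
                   .Where(a => a.Price > a.Mav200)
                   .Select(a => a.Ticker)
                   .ToList();
    }

    // Bottom 'fraction' by momentum, keeping only assets below their average.
    public static List<string> Shorts(IEnumerable<Asset> assets, double fraction) {
        var list = assets.ToList();
        return list.OrderBy(a => a.Momentum)
                   .Take((int)(list.Count * fraction))
                   .Where(a => a.Price < a.Mav200)
                   .Select(a => a.Ticker)
                   .ToList();
    }
}
```

The filter runs after the cut, so a top-ranked asset trading below its average is dropped rather than replaced by the next one in line. That is one possible reading of the rule, not the only one.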

 

Here is what we do when a request comes back with data:

static void downloadStringCompleted(object sender, DownloadStringCompletedEventArgs e) {
    var bigTuple =
        (Tuple<Tuple<string, string, string, string>, CountdownEvent, Summary[], int>)
        e.UserState;
    var tuple = bigTuple.Item1;
    var cevent = bigTuple.Item2;
    var summaries = bigTuple.Item3;
    var i = bigTuple.Item4;
    var ticker = tuple.Item1;
    var name = tuple.Item2;
    var asset = tuple.Item3;
    var subAsset = tuple.Item4;

    if (e.Error == null) {
        var adjustedPrices =
            e.Result
            .Split(new[] { '\n' })
            .Skip(1)
            .Select(l => l.Split(new[] { ',' }))
            .Where(l => l.Length == 7)
            .Select(v => new Event(DateTime.Parse(v[0]), Double.Parse(v[6])));

        var timeSeries = new TimeSeries(ticker, name, asset, subAsset, adjustedPrices);
        summaries[i] = timeSeries.GetSummary();
        cevent.Signal();
        Console.Write("{0} ", ticker);
    } else {
        Console.WriteLine("[{0} ERROR] ", ticker);
        //Console.WriteLine(e.Error);
        summaries[i] = new Summary(ticker, name, "ERROR", "ERROR", 0, 0, 0, 0, 0, 0,0,0);
        cevent.Signal();
    }
}

We first unpack the Tuple we sent out originally. We then extract the Date and Price, create a Summary object and store it in the summaries array. It’s important to remember to Signal the cevent in the error case as well, because we want to print out the results even if some downloads failed.

And here is what you get for your effort:

[image: screenshot of the program’s output]

Posted by lucabol | 13 Comments

Attachment(s): SystemCodeAndData.zip

LAgent: an agent framework in F# – Part IX – Counting words …

Download framework here.


Let’s now use our mapReduce to do something more interesting, for example finding the frequency of words in several books. Now the agent that processes the output needs to be a bit more complex.

let gathererF = fun msg (data:List<string * int>, counter, step) ->
                    match msg with
                    | Reduced(key, value)   ->
                        if counter % step = 0 then
                            printfn "Processed %i words. Now processing %s" counter key 
                        data.Add((key, value |> Seq.hd))
                        data, counter + 1, step
                    | MapReduceDone         ->
                        data
                        |> Seq.distinctBy (fun (key, _) -> key.ToLower())
                        |> Seq.filter (fun (key, _) -> not(key = "" || key = "\"" ||
                                                            (fst (Double.TryParse(key)))))
                        |> Seq.to_array
                        |> Array.sortBy snd
                        |> Array.rev
                        |> Seq.take 20
                        |> Seq.iter (fun (key, value) -> printfn "%A\t\t%A" key value)
                        printfn "All done!!"
                        data, counter, step

let gatherer = spawnAgent gathererF (new List<string * int>(), 0, 1000)

Every time a new word is reduced, a message is printed out and the result is added to a running list. When everything is done such a list is printed out by first manipulating it to reduce weirdness and limit the number of items. BTW: there are at least two bugs in this code, maybe more (late night quick-and-dirty-see-if-the-algo-works kind of coding).

We want to maximize the number of processors in use, so let’s split the books into chunks so that they can be processed in parallel. The code below roughly does it (I say roughly because it doesn’t chunk the lines in the right order, but for this particular case it doesn’t matter).

let gatherer = spawnAgent gathererF (new List<string * int>(), 0, 1000)

let splitBook howManyBlocks fileName =
    let buffers = Array.init howManyBlocks (fun _ -> new StringBuilder())
    fileName
    |> File.ReadAllLines
    |> Array.iteri (fun i line -> buffers.[i % (howManyBlocks)].Append(line) |> ignore)
    buffers

let blocks1 = "C:\Users\lucabol\Desktop\Agents\Agents\kjv10.txt" |> splitBook 100
let blocks2 = "C:\Users\lucabol\Desktop\Agents\Agents\warandpeace.txt" |> splitBook 100
let input =
    blocks1
    |> Array.append blocks2
    |> Array.mapi (fun i b -> i.ToString(), b.ToString())

And let’s execute!!

mapReduce input map reduce gatherer 20 20 partitionF

On my machine I get the following, which could be the right result.

"a"        16147
"And"        13071
"I"        11349
"unto"        8125
"as"        6400
"her"        5865
"which"        5544
"from"        5378
"at"        5175
"on"        5155
"have"        5135
"me"        5068
"my"        4629
"this"        3782
"out"        3653
"ye"        3399
"when"        3312
"an"        2841
"upon"        2558
"so"        2489
All done!!
Posted by lucabol | 0 Comments

LAgent: an agent framework in F# – Part VIII - Implementing MapReduce (user model)

Download framework here.


For this post I use a newer version of the framework that I just uploaded on CodeGallery. In the process of using LAgent I grew more and more unhappy with the weakly typed way of sending messages. The code that implements that feature is nasty: full of upcasts and downcasts. I was losing faith in it. Bugs were cropping up in all sorts of scenarios (e.g. using generic union types as messages).

In the end I decided to re-architect the framework to make it strongly typed. In essence, each agent can now receive messages of a single type only. The limitations that this design choice introduces (e.g. more limited hot swapping) are compensated by the catching of errors at compile time and the streamlining of the code. I left the old framework on the site in case you disagree with me.

In any case, today’s post is about MapReduce. It assumes that you know what it is (link to the original Google paper that served as inspiration is here: Google Research Publication- MapReduce). What would it take to implement an in-memory MapReduce using my agent framework?

Let’s start with the user model.

let mapReduce   (inputs:seq<'in_key * 'in_value>)
                (map:'in_key -> 'in_value -> seq<'out_key * 'out_value>)
                (reduce:'out_key -> seq<'out_value> -> seq<'reducedValues>)
                outputAgent
                M R partitionF =                

mapReduce takes seven parameters:

  1. inputs: a sequence of input key/value pairs.
  2. map: this function operates on each input key/value pair. It returns a sequence of output key/value pairs. The type of the output sequence can be different from the type of the inputs.
  3. reduce: this function operates on an output key and all the values associated with it. It returns a sequence of reduced values (e.g. the average of all the values for this key).
  4. outputAgent: this is the agent that gets notified every time a new output key has been reduced, and once more when the whole operation ends.
  5. M: how many mapper agents to instantiate.
  6. R: how many reducer agents to instantiate.
  7. partitionF: the partition function used to choose which of the reducers is associated with a key.

Let’s look at how to use this function to find how often each word is used in a set of files. First a simple partition function can be defined as:

let partitionF = fun key M -> abs(key.GetHashCode()) % M 

Given a key and some buckets, it picks one of the buckets. Its type is: 'a -> int -> int, so it’s fairly reusable.
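One edge case is worth knowing when picking a bucket from a hash code. In .NET, Math.Abs(int.MinValue) throws an OverflowException, so a key whose hash happens to be int.MinValue can be a problem for an abs-then-modulo scheme. Here is a minimal C# sketch of a variant that avoids taking the absolute value. The Partitioner class is hypothetical and is not part of LAgent.

```csharp
using System;

// Hypothetical partition function, for illustration only.
public static class Partitioner {
    // Maps any key to a bucket index in the range [0, buckets).
    public static int Partition(object key, int buckets) {
        var r = key.GetHashCode() % buckets; // may be negative in .NET
        return r < 0 ? r + buckets : r;      // shift into the valid range
    }
}
```

Since an int is its own hash code, Partitioner.Partition(10, 7) is 3 and Partitioner.Partition(int.MinValue, 7) is 5.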

Let’s also create a basic agent that just prints out the reduced values:

let printer = spawnWorker (fun msg ->
                            match msg with
                            | Reduced(key, value)   -> printfn "%A %A" key value
                            | MapReduceDone         -> printfn "All done!!")

The agent gets notified whenever a new key is reduced or the algorithm ends. It is useful to be notified immediately instead of waiting for everything to be done. If I hadn’t written this code using agents I would have not realized that possibility. I would simply have framed the problem as a function that takes an input and returns an output. Agents force you to think explicitly about the parallelism in your app. That’s a good thing.

The mapping function simply splits the content of a file into words and adds a word/1 pair to the list. I know that there are much better ways to do this (e.g. regular expressions for the parsing, and summing word counts inside the function), but I wanted to test the basic framework capabilities and doing it this way tests them better.

let map = fun (fileName:string) (fileContent:string) ->
            let l = new List<string * int>()
            let wordDelims = [|' ';',';';';'.';':';'?';'!';'(';')';'\n';'\t';'\f';'\r';'\b'|]
            fileContent.Split(wordDelims) |> Seq.iter (fun word -> l.Add((word, 1)))
            l :> seq<string * int>

The reducer function simply sums the various word statistics sent by the mappers:

let reduce = fun key (values:seq<int>) -> [values |> Seq.sum] |> seq<int>
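Taken together, the map and reduce functions amount to the following sequential computation, sketched here in C# with LINQ as a way to cross-check results. The WordCount class is hypothetical, it involves no agents, and unlike the map above it drops the empty strings that Split produces between adjacent delimiters.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sequential equivalent of map + reduce, for illustration only.
public static class WordCount {
    static readonly char[] Delims =
        { ' ', ',', ';', '.', ':', '?', '!', '(', ')', '\n', '\t', '\f', '\r', '\b' };

    public static Dictionary<string, int> Count(IEnumerable<string> texts) {
        return texts
            // map: emit a (word, 1) pair for every word
            .SelectMany(t => t.Split(Delims, StringSplitOptions.RemoveEmptyEntries))
            .Select(w => new KeyValuePair<string, int>(w, 1))
            // group by key, then reduce: sum the ones
            .GroupBy(p => p.Key, p => p.Value)
            .ToDictionary(g => g.Key, g => g.Sum());
    }
}
```

For the two sentences "I was going to the airport when I saw someone crossing" and "I was going home when I saw you coming toward me", this yields 15 distinct words, with "I" counted 4 times and "was", "going", "when" and "saw" counted twice each.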

Now we can create some fake input to check that it works:

let testInput = ["File1", "I was going to the airport when I saw someone crossing";
"File2", "I was going home when I saw you coming toward me"]

And execute the mapReduce:

mapReduce testInput map reduce printer 2 2 partitionF

On my machine I get the following. You might get a different order because of the async/parallel processing involved. If I wanted a stable order I would need to change the printer agent to cache results on Reduced and process them on MapReduceDone (see next post).

"I" [4]
"crossing" [1]
"going" [2]
"home" [1]
"me" [1]
"the" [1]
"toward" [1]
"airport" [1]
"coming" [1]
"saw" [2]
"someone" [1]
"to" [1]
"was" [2]
"when" [2]
"you" [1]

In the next post we’ll process some real books …

Posted by lucabol | 4 Comments

LAgent: an agent framework in F# – part VII – An auction application

Download framework here.


Here is an application that uses the framework we have been creating. It is an auction application and it is described in more detail here.

Let’s go through it.

type AuctionMessage =
  | Offer of int * AsyncAgent // Make a bid
  | Inquire of AsyncAgent     // Check the status
and AuctionReply =
  | StartBidding
  | Status of int * DateTime // Asked sum and expiration
  | BestOffer                // Ours is the best offer
  | BeatenOffer of int       // Yours is beaten by another offer
  | AuctionConcluded of      // Auction concluded
      AsyncAgent * AsyncAgent
  | AuctionFailed            // Failed without any bids
  | AuctionOver              // Bidding is closed
  
let timeToShutdown = 3000
let bidIncrement = 10 

This is the format of the messages that the clients can send and that the auction agent can reply with. F# is really good at this sort of thing. First, we need an auction agent:

let auctionAgent seller minBid closing =
    let agent = spawnAgent (fun msg (isConcluded, maxBid, maxBidder) ->
                            match msg with
                            | Offer (_, client) when isConcluded ->
                                client <-- AuctionOver
                                (isConcluded, maxBid, maxBidder)
                            | Offer(bid, client) when not(isConcluded) ->
                                if bid >= maxBid + bidIncrement then
                                    if maxBid >= minBid then maxBidder <-- BeatenOffer bid                  
                                    client <-- BestOffer
                                    (isConcluded, bid, client)
                                else
                                    client <-- BeatenOffer maxBid
                                    (isConcluded, maxBid, maxBidder)
                            | Inquire client    ->
                                client <-- Status(maxBid, closing)
                                (isConcluded, maxBid, maxBidder))
                            (false, (minBid - bidIncrement), spawnWorker (fun _ -> ()))                             

Notice that, if the auction is concluded, the agent replies to offers by sending an AuctionOver message. If the auction is still open and the bid is higher than the current max, it sets a new max and notifies the two parties involved; otherwise it notifies the bidder that the offer wasn’t successful. You can also ask for the status of the auction.

This is what the code above says. Maybe the code is simpler than words. Anyhow, we need to treat the case where no message is received for some amount of time.

agent <-- SetTimeoutHandler
            ((closing - DateTime.Now).TotalMilliseconds |> int) // TotalMilliseconds: Milliseconds is only the 0-999 component
            (fun (isConcluded: bool, maxBid, maxBidder) ->
                if maxBid >= minBid then
                  let reply = AuctionConcluded(seller, maxBidder)
                  maxBidder <-- reply
                  seller <-- reply
                else seller <-- AuctionFailed
                agent <-- SetTimeoutHandler
                    timeToShutdown
                    (fun (_:bool, _:int,_:AsyncAgent) -> StopProcessing)
                ContinueProcessing (true, maxBid, maxBidder))   
agent            

We start by waiting for the amount of time until the closing of the auction. If we get no messages, then two things might happen: we have an offer that is at least the minimum or we don’t. If we do, we tell everyone that it’s finished. Otherwise, we tell the seller that its item didn’t sell. In any case, we prepare the agent to shut down by setting its next timeout to timeToShutdown.

It is interesting that we set the timeout handler inside the timeout handler. This is not a problem because of the nature of message processing (i.e. the agent processes one message at a time).

We then need a bunch of symbols …

module Auction =
  let random = new Random()
  
  let minBid = 100
  let closing = DateTime.Now.AddMilliseconds 10000.
  
  let seller = spawnWorker (fun (msg:AuctionReply) -> ())
  let auction = auctionAgent seller minBid closing

Not a very smart seller we have here … Next up is our definition of a client.

let rec c = spawnAgent (
                fun msg (max, current) ->
                    let processBid (aMax, aCurrent) =
                        if aMax >= top then
                            log "too high for me"
                            (aMax, aCurrent)
                        elif aCurrent < aMax then
                              let aCurrent = aMax + increment
                              Thread.Sleep (1 + random.Next 1000)
                              auction <-- Offer(aCurrent, c)
                              (aMax, aCurrent)
                        else (aMax, aCurrent)                       
                    match msg with
                    | StartBidding      ->
                        auction <-- Inquire c
                        (max, current)
                    | Status(maxBid,_)  ->
                        log <| sprintf "status(%d)" maxBid
                        let s = processBid (maxBid, current)
                        c <-- SetTimeoutHandler timeToShutdown (fun _ -> StopProcessing) 
                        s
                    | BestOffer ->
                        log <| sprintf "bestOffer(%d)" current
                        processBid(max, current)
                    | BeatenOffer maxBid ->
                        log <| sprintf "beatenOffer(%d)" maxBid
                        processBid(maxBid, current)
                    | AuctionConcluded(seller, maxBidder) ->
                        log "auctionConcluded"
                        c <-- Stop
                        (max, current)
                    | AuctionOver ->
                        log "auctionOver"
                        c <-- Stop
                        (max, current))
                 (0,0)
c

Something that I like about agents is the fact that you need to understand just small snippets of code at a time. For example, you can read the processing for BestOffer and figure out if it makes sense. I have an easy time personalizing them, as in: “Ok, the guy just got a notification that there has been a better offer, what is he going to do next?”.

The code should be self-explanatory for the most part. In essence, if you can offer more, do it; otherwise wait for the auction to end. I’m not even sure the processing is completely right. I confess I’m just trying to do the same as Matthew’s code from the link above.

We can then start up the whole thing and enjoy the cool output.

open Auction

(client 1 20 200) <-- StartBidding
(client 2 10 300) <-- StartBidding
(client 3 30 150) <-- StartBidding
Console.ReadLine() |> ignore  

Now for the nasty part. Implementing the framework.

Posted by lucabol | 3 Comments

LAgent: an agent framework in F# – Part VI – Hot swapping of code (and something silly)

Download framework here.


Hot swapping of code

Let’s get back a couple of steps and consider what happens when you get an error. Sure, your agent will continue processing messages, but it might be doing the wrong thing. Your message handling code might be buggy.

Ideally you’d want to patch things on the fly. You’d want to replace the message processing code for an agent without stopping it.

Here is how you do it:

let counter2 = spawnAgent (fun msg state -> printfn "From %i to %i" state (state + msg); state + msg) 0
counter2 <-- 2
counter2 <-- SetAgentHandler(fun msg state -> printfn "From %i to %i via multiplication" state (state * msg); msg * state)
counter2 <-- 3

Which generates:

From 0 to 2
From 2 to 6 via multiplication

After the agent receives a SetAgentHandler message, it switches from a ‘+’ agent to a ‘*’ agent on the fly! All the messages that come after that one get multiplied with the state. Also, the state is preserved between changes in behavior.

It might not be immediately apparent how to load a function at runtime, but it is really simple. Imagine that I get the data on the function to load from somewhere (e.g. a management console UI).
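
If you are wondering how such a swap could work underneath, here is a minimal sketch built directly on MailboxProcessor. It is not the LAgent implementation: it uses a typed message union instead of untyped messages, and the names AgentMsg, SetHandler, Query and swappable are all made up for the illustration. The idea is simply that the handler is a parameter of the message loop, just like the state.

```fsharp
// Sketch only: hot swapping a handler while keeping the state.
// AgentMsg, SetHandler, Query and swappable are hypothetical names.
type AgentMsg =
    | Value of int
    | SetHandler of (int -> int -> int)   // replaces the handler, keeps the state
    | Query of AsyncReplyChannel<int>     // reads the current state back

let swappable (handler: int -> int -> int) (initial: int) =
    MailboxProcessor.Start(fun inbox ->
        // The handler travels through the loop next to the state,
        // so swapping it is just a recursive call with a new function.
        let rec loop handler state = async {
            let! msg = inbox.Receive()
            match msg with
            | Value v -> return! loop handler (handler v state)
            | SetHandler h -> return! loop h state
            | Query ch ->
                ch.Reply state
                return! loop handler state }
        loop handler initial)

let agent = swappable (fun msg state -> state + msg) 0
agent.Post(Value 2)                                    // 0 + 2 = 2
agent.Post(SetHandler(fun msg state -> state * msg))   // now a '*' agent
agent.Post(Value 3)                                    // 2 * 3 = 6
let finalState = agent.PostAndReply(fun ch -> Query ch)
printfn "Final state: %i" finalState
```

Because a mailbox processes messages in the order they were posted, the Query reply is only sent after the swap and the multiplication have happened.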

let assemblyNameFromSomewhere, typeNameFromSomewhere, methodNameFromSomewhere = 
"mscorlib.dll", "System.Console", "WriteLine"

I can then use it to dynamically load a message handler (in this case Console.WriteLine).

let a = Assembly.Load(assemblyNameFromSomewhere)
let c = a.GetType(typeNameFromSomewhere)
let m = c.GetMethod(methodNameFromSomewhere, [|"".GetType()|])
let newF = fun (msg:string) (state:obj) -> m.Invoke(null, [| (msg:>obj) |])

And then it is as simple as posting a SetAgentHandler.

counter2 <-- SetAgentHandler(newF)
counter2 <-- "blah"

Now our counter2 agent has become an echo agent on the fly, having loaded Console.WriteLine dynamically. Note how the agent moved from being a ‘+’ agent taking integers to being a ‘*’ agent taking integers to being an ‘echo’ agent taking strings. And it didn’t stop processing messages for the whole time.

Obviously, you can do the same thing with workers:

echo <-- SetWorkerHandler(fun msg -> printfn "I'm an echo and I say: %s" msg)
echo <-- "Hello"

And parallelWorkers:

parallelEcho <-- SetWorkerHandler(fun msg -> tprint ("I'm new and " + msg))
messages |> Seq.iter (fun msg -> parallelEcho <-- msg)

A silly interlude

As a way to show some agents talking to each other, here is a simple program that simulates marital interactions (of the worst kind):

let rec husband = spawnWorker (fun (To, msg) -> printfn "Husband says: %s" msg; To <-- msg)
let rec wife = spawnWorker (fun msg -> printfn "Wife says: screw you and your '%s'" msg)
husband <-- (wife, "Hello")
husband <-- (wife, "But darling ...")
husband <-- (wife, "ok")

Which produces:

Husband says: Hello
Husband says: But darling ...
Wife says: screw you and your 'Hello'
Wife says: screw you and your 'But darling ...'
Husband says: ok
Wife says: screw you and your 'ok'

And yes, you cannot expect messages to be in the right sequence … Next up is an auction application.

Posted by lucabol | 1 Comments

LAgent: an agent framework in F# – Part V – Timeout management

Download framework here.

All posts are here:

Timeout management

Timeouts are very important in message based systems. In essence, if you are not getting messages for a certain period of time, that usually means something. It might be that something crashed, that other agents think that you are not online, or any other number of things. Hence the need to set timeouts and react when they are triggered.

You do that by writing the following:

counter1 <-- SetTimeoutHandler 1000 (fun state -> printfn "I'm still waiting for a message in state %A, come on ..." state; ContinueProcessing(state))

Which generates the following message every second:

I'm still waiting for a message in state 2, come on ...
I'm still waiting for a message in state 2, come on ...

The first parameter to SetTimeoutHandler is how long to wait before triggering the handler. The second parameter is the handler that gets called whenever no message is received for that amount of time. Notice that the handler takes the current state of the agent and returns ContinueProcessing(state).  This tells the agent to continue processing messages and sets the current state to state.

The following code:

counter1 <-- 2

Then generates:

I'm still waiting for a message in state 4, come on ...
I'm still waiting for a message in state 4, come on ...
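
For the curious, a timeout like this maps quite naturally onto MailboxProcessor.TryReceive, which returns None when nothing arrives in time. The sketch below is my guess at the general shape, not the framework code: TimeoutMsg, Report and spawnWithTimeout are hypothetical names, and the timeout handler is hard-wired to "print and continue with the same state".

```fsharp
open System.Threading

// Sketch only: a counter that notices when no message arrives for a while.
// TimeoutMsg, Report and spawnWithTimeout are hypothetical names.
type TimeoutMsg =
    | Add of int
    | Report of AsyncReplyChannel<int * int>   // (state, timeouts seen so far)

let spawnWithTimeout (timeoutMs: int) (initial: int) =
    MailboxProcessor.Start(fun inbox ->
        let rec loop state timeouts = async {
            // TryReceive yields None if nothing arrives within timeoutMs.
            let! received = inbox.TryReceive(timeoutMs)
            match received with
            | None ->
                printfn "Still waiting for a message in state %i ..." state
                return! loop state (timeouts + 1)   // continue with the same state
            | Some (Add n) -> return! loop (state + n) timeouts
            | Some (Report ch) ->
                ch.Reply((state, timeouts))
                return! loop state timeouts }
        loop initial 0)

let timedCounter = spawnWithTimeout 100 0
timedCounter.Post(Add 2)
Thread.Sleep 350          // long enough for the timeout to fire a few times
let (state, timeouts) = timedCounter.PostAndReply(fun ch -> Report ch)
printfn "State %i, timed out at least once: %b" state (timeouts >= 1)
```

Returning a different value from the None branch (the initial state, or no recursive call at all) would give the restart and stop behaviours.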

ContinueProcessing is just one of the three possible values of the (terribly named) AfterError:

type AfterError =
| ContinueProcessing of obj
| StopProcessing
| RestartProcessing

Let’s see what RestartProcessing does.

counter1 <-- SetTimeoutHandler 1000 (fun state -> printfn "Restart from state %A" state; RestartProcessing)

Which, as expected, generates a nice stream of:

Restart from state 0
Restart from state 0

To bring things back to normal (aka no timeout) you can just pass -1 as in:

counter1 <-- SetTimeoutHandler -1  (fun state -> ContinueProcessing(state))

Also, you can stop the agent when a timeout occurs by returning the aptly named StopProcessing:

counter1 <-- SetTimeoutHandler 1000 (fun state -> printfn "Restart from state %A" state; StopProcessing)

Another interesting thing you might want to do is hot swapping of code. More on that in the next part …

Posted by lucabol | 0 Comments

LAgent: an agent framework in F# – Part IV – Custom error management

Download framework here.


Custom error management

In the last part we saw what happens by default in the framework when an error occurs. But that might not be what you want. You might want to have your own sophisticated distributed error detection and recovery algorithm.

To make such a thing possible each agent has a manager. The manager is an agent that gets called whenever an error occurs in the agent it is monitoring.

In code:

let manager = spawnWorker (fun (agent, name:string, ex:Exception, msg:obj, state, initialState) -> printfn "%s restarting ..." name; agent <-- Restart)
counter1 <-- SetManager(manager)

Whenever an error is generated the manager receives a tuple of:

(agent, name, exception, message, currentState, initialState)

This manager prints out something and then restarts the agent. Let’s trigger an error by posting the wrong message:

counter1 <-- "afdaf"
counter1 <-- 2

The expectation is that the counter will restart from 0 whenever an error is triggered. This is what happens:

Bob restarting ...
From 0 to 2

Which is what we expected. Obviously this is not a very sophisticated error recovery algorithm. You might want to do something more meaningful. Hopefully you have enough information to build whatever you need.
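
To make the manager idea a bit more concrete, here is a small self-contained sketch using plain MailboxProcessors. It is an illustration under my own assumptions, not the framework code: the message types, the Fail message that simulates a buggy message, and the Sync message (there only to make the demo deterministic) are all hypothetical, and the manager receives fewer fields than the tuple described above.

```fsharp
// Sketch only: a worker reports failures to a manager, which restarts it.
// All names below (WorkerMsg, ManagerMsg, spawnSupervised, ...) are hypothetical.
type WorkerMsg =
    | Add of int
    | Fail                                   // simulates a message that triggers an error
    | Restart
    | Read of AsyncReplyChannel<int>

type ManagerMsg =
    | Failed of string * exn * MailboxProcessor<WorkerMsg>
    | Sync of AsyncReplyChannel<unit>        // demo helper: "have you caught up?"

let manager =
    MailboxProcessor<ManagerMsg>.Start(fun inbox ->
        let rec loop () = async {
            let! msg = inbox.Receive()
            match msg with
            | Failed (name, _, agent) ->
                printfn "%s restarting ..." name
                agent.Post(Restart)
            | Sync ch -> ch.Reply(())
            return! loop () }
        loop ())

let spawnSupervised (name: string) (initial: int) =
    MailboxProcessor<WorkerMsg>.Start(fun inbox ->
        let rec loop state = async {
            let! msg = inbox.Receive()
            match msg with
            | Add n -> return! loop (state + n)
            | Fail ->
                // Tell the manager what went wrong and keep processing.
                manager.Post(Failed(name, System.Exception("simulated failure"), inbox))
                return! loop state
            | Restart -> return! loop initial
            | Read ch ->
                ch.Reply state
                return! loop state }
        loop initial)

let bob = spawnSupervised "Bob" 0
bob.Post(Add 5)
bob.Post(Fail)
bob.PostAndReply(fun ch -> Read ch) |> ignore   // Bob has reported the failure
manager.PostAndReply(fun ch -> Sync ch)         // the manager has posted Restart
bob.Post(Add 2)
let stateAfterRestart = bob.PostAndReply(fun ch -> Read ch)
printfn "State after restart: %i" stateAfterRestart
```

Without the two synchronizing calls the restart races with the following message, which is the usual price of asynchronous supervision.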

A particularly important class of unexpected event is timeouts. We’ll talk about them next.

Posted by lucabol | 0 Comments

LAgent: an agent framework in F# – Part III – Default error management

Download framework here.


Default error management

What happens when an error occurs? Well, ideally you want to notify someone and continue processing messages. By default you want to print the error and as much information as you can about it.
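
As a rough picture of that default policy, the sketch below wraps the handler call in a try/with, prints what it knows and carries on with the old state. It is only an illustration on top of MailboxProcessor with made-up names (SafeMsg, Divide, Peek, safeAgent), not the code the framework uses.

```fsharp
// Sketch only: "print the error and keep processing messages".
// SafeMsg, Divide, Peek and safeAgent are hypothetical names.
type SafeMsg =
    | Divide of int
    | Peek of AsyncReplyChannel<int>

let safeAgent (initial: int) =
    MailboxProcessor.Start(fun inbox ->
        let rec loop state = async {
            let! msg = inbox.Receive()
            match msg with
            | Divide d ->
                let next =
                    try state / d
                    with ex ->
                        printfn "Error at state %i with message %i (started with %i): %s"
                            state d initial ex.Message
                        state        // keep the current state and carry on
                return! loop next
            | Peek ch ->
                ch.Reply state
                return! loop state }
        loop initial)

let divider = safeAgent 100
divider.Post(Divide 0)    // raises DivideByZeroException inside the handler
divider.Post(Divide 5)    // still processed afterwards: 100 / 5 = 20
let survived = divider.PostAndReply(fun ch -> Peek ch)
printfn "State after the error: %i" survived
```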

Let’s first see what happens if you pass the wrong message type:

counter1 <-- "fst"

Generates:

> The exception below occurred on agent Undefined at state 3 with message "fst". The agent was started with state 0.
System.InvalidCastException: Specified cast is not valid.
   at Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicFunctions.UnboxGeneric[T](Object source)
   at FSI_0003.AgentSystem.f@158.Invoke(Object a, Object b)
   at Microsoft.FSharp.Core.FastFunc`2.InvokeFast[V](FastFunc`2 func, T arg1, TResult arg2)
   at Microsoft.FSharp.Core.FastFunc`2.InvokeFast[V](FastFunc`2 func, T arg1, TResult arg2)
   at FSI_0003.AgentSystem.loop@20-3.Invoke(Unit unitVar)
   at Microsoft.FSharp.Control.AsyncBuilderImpl.callA@245.Invoke(AsyncParams`1 args)

You get information about the current state of the agent, the message that generated the error, the initial state of the agent and the exception that was generated. But, in a system with several agents, you’d like to know which agent failed. For that you need to name your agent:

counter1 <-- SetName("Bob")
counter1 <-- "fadfad"

Now you get (note the agent name, Bob, in the first line):

> The exception below occurred on agent Bob at state 3 with message "fadfad". The agent was started with state 0.
System.InvalidCastException: Specified cast is not valid.
   at Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicFunctions.UnboxGeneric[T](Object source)
   at FSI_0003.AgentSystem.f@158.Invoke(Object a, Object b)
   at Microsoft.FSharp.Core.FastFunc`2.InvokeFast[V](FastFunc`2 func, T arg1, TResult arg2)
   at Microsoft.FSharp.Core.FastFunc`2.InvokeFast[V](FastFunc`2 func, T arg1, TResult arg2)
   at FSI_0003.AgentSystem.loop@20-3.Invoke(Unit unitVar)
   at Microsoft.FSharp.Control.AsyncBuilderImpl.callA@245.Invoke(AsyncParams`1 args)

The important thing is that the agent continues running. It lives to fight another day. Hence:

counter1 <-- 3

Produces:

From 3 to 6

Which shows that the agent is running and that it has kept its current state. Also errors can occur inside the message handler with a similar result:

(spawnAgent (fun msg state -> state / msg) 100) <-- 0

Produces:

> The exception below occurred on agent Undefined at state 100 with message 0. The agent was started with state 100.
System.DivideByZeroException: Attempted to divide by zero.
   at FSI_0013.it@48-3.Invoke(Int32 msg, Int32 state)
   at Microsoft.FSharp.Core.FastFunc`2.InvokeFast[V](FastFunc`2 func, T arg1, TResult arg2)
   at FSI_0003.AgentSystem.f@158.Invoke(Object a, Object b)
   at Microsoft.FSharp.Core.FastFunc`2.InvokeFast[V](FastFunc`2 func, T arg1, TResult arg2)
   at Microsoft.FSharp.Core.FastFunc`2.InvokeFast[V](FastFunc`2 func, T arg1, TResult arg2)
   at FSI_0003.AgentSystem.loop@20-3.Invoke(Unit unitVar)
   at Microsoft.FSharp.Control.AsyncBuilderImpl.callA@245.Invoke(AsyncParams`1 args)

But this might not be what you want. You might want to customize what happens when an error occurs. We’ll talk about that next.

Posted by lucabol | 1 Comments

LAgent : an agent framework in F# – Part II – Agents and control messages

Download framework here.


Agents

Agents are entities that process messages and keep state between one message and the next. As such they need to be initialized with a lambda that takes a message and a state and returns a new state. In F# pseudo code: msg -> state -> newState. For example, the following:

let counter = spawnAgent (fun msg state -> state + msg) 0

This is a counter that starts from 0 and gets incremented by the value of the received message. Let’s make it print something when it receives a message:

let counter1 = spawnAgent (fun msg state -> printfn "From %i to %i" state (state + msg); state + msg) 0
counter1 <-- 3
counter1 <-- 4

Which produces:

From 0 to 3
From 3 to 7

There is no spawnParallelAgent, because I couldn’t figure out its usage patterns. Maybe I don’t have enough creativity. Obviously msg and state could be of whatever type (in real applications they end up being tuples more often than not).
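
To see the msg -> state -> newState shape without the framework, here is a minimal sketch of a stateful agent written directly against MailboxProcessor. It is not how LAgent is implemented (LAgent messages are untyped, for one); spawnTypedAgent, Msg, Data and GetState are hypothetical names I am using just for this example.

```fsharp
// Sketch only: an agent that folds messages into a state.
// spawnTypedAgent, Msg, Data and GetState are hypothetical names.
type Msg<'m, 's> =
    | Data of 'm
    | GetState of AsyncReplyChannel<'s>   // lets the caller read the state back

let spawnTypedAgent (handler: 'm -> 's -> 's) (initial: 's) =
    MailboxProcessor.Start(fun inbox ->
        // The state lives only in the argument of the recursive loop.
        let rec loop state = async {
            let! msg = inbox.Receive()
            match msg with
            | Data m -> return! loop (handler m state)
            | GetState ch ->
                ch.Reply state
                return! loop state }
        loop initial)

let typedCounter = spawnTypedAgent (fun msg state -> state + msg) 0
typedCounter.Post(Data 3)
typedCounter.Post(Data 4)
// Messages are handled in posting order, so the reply reflects both posts.
let total = typedCounter.PostAndReply(fun ch -> GetState ch)
printfn "Counter state: %i" total
```

This is exactly the "async and recursion in a specific pattern" that a helper like spawnAgent hides from you.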

Control messages

You can do things to agents. I’m always adding to them but at this stage they are:

type Command =
| Restart
| Stop
| SetManager of AsyncAgent
| SetName of string

Plus some others. I’ll describe most of them later on, right now I want to talk about Restart and Stop. You use the former like this:

counter1 <-- Restart
counter1 <-- 3

Which produces:

From 0 to 3

This should be somewhat surprising to you. You would have thought that you could just post integers to a counter. This is not the case. You can post any object. This is useful for three reasons: it gives a common model for passing all sorts of messages; it means the agent is not parameterized by the type of the message (or of the state), so that you can store agents in data structures; and it allows advanced scenarios (e.g. hot swapping of code).

This is a debatable decision. I tried to get the best of strong typing and dynamic typing, while keeping simplicity of usage. The implementation of this is kind of a mess though. We’ll get there.
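
One way to picture the trade-off: messages travel as obj, control messages are recognized by their type, and the strongly typed lambda is wrapped so that it unboxes its arguments on arrival. The sketch below is my own illustration of that idea, with hypothetical names (Control, wrap, describe); it is not the actual implementation, and the real Command type has more cases.

```fsharp
// Sketch only: typed handlers behind an untyped (obj) message channel.
// Control, wrap and describe are hypothetical names.
type Control =
    | Restart
    | Stop

// Turns a typed handler into one that works on boxed values.
// unbox raises InvalidCastException when the runtime type does not match.
let wrap (handler: 'm -> 's -> 's) : obj -> obj -> obj =
    fun msg state -> box (handler (unbox<'m> msg) (unbox<'s> state))

let untypedCounter = wrap (fun (msg: int) (state: int) -> state + msg)

let describe (msg: obj) (state: obj) =
    match msg with
    | :? Control as c -> sprintf "control message %A" c
    | _ ->
        try sprintf "new state %A" (untypedCounter msg state)
        with :? System.InvalidCastException -> "wrong message type"

printfn "%s" (describe (box 3) (box 4))
printfn "%s" (describe (box "fst") (box 4))
printfn "%s" (describe (box Restart) (box 4))
```

The second call is the same situation as posting a string to a counter: the cast fails at runtime instead of at compile time.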

BTW: you use Stop just by posting Stop, which stops the agent (forever).

Posted by lucabol | 3 Comments

LAgent : an agent framework in F# – Part I – Workers and ParallelWorkers

Download framework here.


    Introduction

    I like to try out different programming paradigms. I started out as an object oriented programmer. In university, I used Prolog. I then learned functional programming. I also experimented with various shared memory parallel paradigms (e.g. async, tasks and such). I now want to learn more about message based parallel programming (Erlang style). I’m convinced that doing so makes me a better programmer. Plus, I enjoy it …

    My usual learning style is to build a framework that replicates a particular programming model and then write code using it. In essence, I build a very constrained environment. For example, when learning functional programming, I didn’t use any OO construct for a while even if my programming language supports them.

    In this case, I built myself a little agent framework based on F# MailboxProcessors. I could have used MailboxProcessors directly, but they are too flexible for my goal. Even to write a simple one of these guys, you need to use async and recursion in a specific pattern, which I always forget. Also, there are multiple ways to do Post. I wanted things to be as simple as possible. I was willing to sacrifice flexibility for that.

    Notice that there are serious efforts in this space (such as Axum). This is not one of them. It’s just a simple thing I enjoy working on between one meeting and the next.

    Workers and ParallelWorkers

    The two major primitives are spawning an agent and posting a message.

    let echo = spawnWorker (fun msg -> printfn "%s" msg)
    echo <-- "Hello guys!"

    There are two kinds of agents in my system. A worker is an agent that doesn’t keep any state between consecutive messages. It is a stateless guy. Notice that the lambda that you pass to create the agent is strongly typed (aka msg is of type string). Also notice that I overloaded the <-- operator to mean Post.

    Given that a worker is stateless, you can create a whole bunch of them and, when a message is posted, route it to one of them transparently.

    let parallelEcho = spawnParallelWorker(fun s -> printfn "%s" s) 10
    parallelEcho <-- "Hello guys!"

    For example, in the above code, 10 workers are created and, when a message is posted, it gets routed to one of them (using a super duper innovative dispatching algorithm I’ll describe in the implementation part). This parallelWorker guy is not really needed, you could easily build it out of the other primitives, but it is kind of cute.
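
To give a feeling for how a parallel worker can be put together from simpler pieces, here is a sketch that puts several stateless MailboxProcessors behind one posting function and hands out messages round-robin. The round-robin choice is purely my assumption for the example (this post does not say which dispatching algorithm the framework uses), and spawnRoundRobin is a hypothetical name.

```fsharp
open System.Threading
open System.Collections.Concurrent

// Sketch only: N stateless workers behind a single post function.
// spawnRoundRobin is a hypothetical name; round-robin is an assumed policy.
let spawnRoundRobin (handler: 'm -> unit) (count: int) =
    let workers =
        Array.init count (fun _ ->
            MailboxProcessor<'m>.Start(fun inbox ->
                let rec loop () = async {
                    let! msg = inbox.Receive()
                    handler msg
                    return! loop () }
                loop ()))
    let next = ref 0
    fun (msg: 'm) ->
        // Thread-safe counter; overflow is ignored in this sketch.
        let i = Interlocked.Increment(next) % count
        workers.[i].Post(msg)

// Demo: collect what was handled and wait until all five messages are done.
let handled = ConcurrentBag<string>()
let allDone = new CountdownEvent(5)
let post =
    spawnRoundRobin (fun (s: string) ->
        handled.Add(s)
        allDone.Signal() |> ignore) 3
[ "a"; "b"; "c"; "d"; "e" ] |> List.iter post
allDone.Wait()
printfn "Handled %i messages" handled.Count
```

Since workers keep no state, it does not matter which one receives a given message; only the order of side effects becomes unpredictable.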

    To show the difference between a worker and a parallelWorker, consider this:

    let tprint s = printfn "%s running on thread %i" s Thread.CurrentThread.ManagedThreadId
    let echo1 = spawnWorker (fun s -> tprint s)
    let parallelEcho1 = spawnParallelWorker(fun s -> tprint s) 10
    
    let messages = ["a";"b";"c";"d";"e";"f";"g";"h";"i";"l";"m";"n";"o";"p";"q";"r";"s";"t"]
    messages |> Seq.iter (fun msg -> echo1 <-- msg)
    messages |> Seq.iter (fun msg -> parallelEcho1 <-- msg)

     

    The result of the echo1 iteration is:

    a running on thread 11
    b running on thread 11
    c running on thread 11
    d running on thread 11

    While the result of the parallelEcho1 iteration is:

    a running on thread 13
    c running on thread 14
    b running on thread 12
    o running on thread 14
    m running on thread 13

    Notice how the latter executes on multiple threads (but not in order). Next time I’ll talk about agents, control messages and error management.

    Posted by lucabol | 1 Comments

    A version of the AsyncCache found its way into the Parallel Programming samples …

    Go here to download them. It is in \ParallelExtensionsExtras\CoordinationDataStructures. It has a slightly different design in that it returns Tasks. I’m trying to get Stephen to blog about it so that you can compare them.

    Posted by lucabol | 3 Comments

    I talk about C# and VB Co-Evolution on Channel9 (and some F# …)

    The title says it all. If you are interested, go here.

    Posted by lucabol | 3 Comments

    An Async Html cache – part II – Testing the cache


    Let’s try out our little cache. First I want to write a synchronous version of it as a baseline.

        Private Shared Sub TestSync(ByVal sites() As String, ByVal sitesToDownload As Integer, ByVal howLong As Integer)
            Dim syncCache As New Dictionary(Of String, String)
            Dim count = sites.Count()
            Dim url1 = "http://moneycentral.msn.com/investor/invsub/results/statemnt.aspx?Symbol="
    
            For i = 0 To sitesToDownload - 1
                Dim html As String = ""
                Dim url = url1 & sites(i Mod count)
                If Not syncCache.TryGetValue(url, html) Then
                    html = LoadWebPage(url)
                    syncCache(url) = html
                End If
                DoWork(html, howLong)
            Next
        End Sub

    This is a loop that loads webpages in the cache if they are not already there. sites is a list of tickers used to compose the urls; sitesToDownload is the total number of sites to download, so that a single url can be loaded multiple times; howLong represents the work to be done on each loaded page.

    In this version the cache is simply a Dictionary and there is no parallelism. The cache is managed in two lines: the TryGetValue check and the syncCache(url) = html assignment.

    DoWork is this.

        Public Shared Sub DoWork(ByVal html As String, ByVal howLong As Integer)
            Thread.Sleep(howLong)
        End Sub

    Let’s take a look at the asynchronous version.

        Private Shared Sub TestAsync(ByVal sites() As String, ByVal sitesToDownload As Integer, ByVal howLong As Integer)
            Dim htmlCache As New HtmlCache
            Dim count = sites.Count()
            Dim url = "http://moneycentral.msn.com/investor/invsub/results/statemnt.aspx?Symbol="
            Using ce = New CountdownEvent(sitesToDownload)
                For i = 1 To sitesToDownload
                    htmlCache.GetHtmlAsync(
                        url & sites(i Mod count),
                        Sub(s)
                            DoWork(s, howLong)
                            ce.Signal()
                        End Sub)
                Next
                ce.Wait()
            End Using
        End Sub

    There are several points worth making on this:

    • The lambda used as second parameter for GetHtmlAsync is invoked on a different thread whenever the html has been retrieved (which could be immediately if the cache has downloaded the url before)
    • CountdownEvent allows a thread to wait for a certain number of signals to be sent. The waiting happens on the main thread in the ce.Wait() instruction. The triggering of the event happens in the lambda described in the point above (the ce.Signal() instruction)

    This is the driver for the overall testing.

        Private Shared Sub TestPerf(ByVal s As String, ByVal a As Action, ByVal iterations As Integer)
            Dim clock As New Stopwatch
    
            clock.Start()
            For i = 1 To iterations
                a()
            Next
            clock.Stop()
            Dim ts = clock.Elapsed
            Dim elapsedTime = String.Format(s & ": {0:00}:{1:00}:{2:00}.{3:00}", ts.Hours, ts.Minutes, ts.Seconds, ts.Milliseconds / 10)
            Console.WriteLine(elapsedTime, "RunTime")
        End Sub

    There is not much to say about it. Start the clock, perform a bunch of iterations of the passed lambda, stop the clock, print out performance.

    And finally the main method. Note that all the adjustable parameters are factored out before the calls to TestPerf.

        Public Shared Sub Main()
            Dim tickers = New String() {"mmm", "aos", "shlm", "cas", "abt", "anf", "abm", "akr", "acet", "afl", "agl", "adc", "apd",
                                        "ayr", "alsk", "ain", "axb", "are", "ale", "ab", "all"}
            Dim sitesToDownload = 50
            Dim workToDoOnEachUrlInMilliSec = 20
            Dim perfIterations = 5
            TestPerf("Async", Sub() TestAsync(tickers, sitesToDownload, workToDoOnEachUrlInMilliSec), perfIterations)
            TestPerf("Sync", Sub() TestSync(tickers, sitesToDownload, workToDoOnEachUrlInMilliSec), perfIterations)
        End Sub

    Feel free to change (tickers, sitesToDownload, workToDoOnEachUrlInMilliSec, perfIterations). Depending on the ratios between these parameters and the number of cores on your machine, you’re going to see different results. Which highlights the fact that parallelizing your algorithms can yield performance gains or not depending on both software and hardware considerations. I get ~3X improvement on my box. I attached the full source file for your amusement.

    Posted by lucabol | 2 Comments

    Attachment(s): AsyncCache.vb