Mathias Brandewinder on .NET, F#, VSTO and Excel development, and quantitative analysis / machine learning.
by Mathias 23. July 2012 16:12

Nothing fancy this week – just thought I would share some of what I learnt recently playing with the Reactive Extensions and F#.

Here is the context: my current weekend project, Bumblebee, is a Solver, which, given a Problem to solve, will search for solutions, and fire an event every time an improvement is found. I am currently working on using it in Azure, to hopefully scale out and tackle problems of a larger scale than what I can achieve on a single local machine. One problem I ran into, though, is that if multiple worker roles begin firing events every time a solution is found, the system will likely grind to a halt trying to cope with a gazillion messages (not to mention a potentially unpleasantly high bill), whereas I only care about being notified of some improvements, not necessarily every single one. What I want is an ability to “throttle” the flow of events coming from my solver, to receive, say, the best one every 30 seconds.

For illustration purposes, here is a highly simplified version of the Bumblebee solver:

type Generator() =

    let intFound = new Event<int>()
    member this.IntFound = intFound.Publish

    member this.Start() =
        Task.Factory.StartNew(fun () ->
            printfn "Searching for numbers..."
            for i in 0 .. 100 do
                intFound.Trigger(i)
                Thread.Sleep(500)
            ) |> ignore

The Generator class exposes a Start method, which, once called, will “generate” numbers from 0 to 100 – just like the Solver would return solutions of improving quality over time.

Generator declares an event, intFound, which will be triggered when we find a new integer of interest, and which is exposed through IntFound, which consumers can then subscribe to. When we Start the generator, we spin up a new Task, which will be running on its own thread, and will simply produce integers from 0 to 100, with a 500ms delay between solutions.

The syntax for declaring an event is refreshingly simple, and we can use it in a way similar to what we would do in C#, by adding a Handler to the event, for instance in a simple Console application like this:

let Main =

    let handler i = printfn "Simple handler: got %i" i

    let generator = new Generator()
    generator.IntFound.Add handler

    generator.Start()

    let wait = Console.ReadLine()
    ignore ()

Create a handler that prints out an integer, hook it up to the event, and run the application – you should see something like this happening:

image

So far, nothing very thrilling.

However, there is more. Our event this.IntFound is an IEvent, which inherits from IObservable, and allows you to do all sorts of fun stuff with your events, like transform and compose them into something more usable. Out of the box, the F# Observable module provides a few useful functions. Besides adding a handler to the event, let’s start by also subscribing to it:

let Main =

    let handler i = printfn "Simple handler: got %i" i

    let generator = new Generator()
    generator.IntFound.Add handler

    let interval = new TimeSpan(0, 0, 5)
    generator.IntFound
    |> Observable.subscribe (fun e -> printfn "Observed %i" e)
    |> ignore

    generator.Start()

    let wait = Console.ReadLine()
    ignore ()

This is doing essentially the same thing as before – running this will produce something along these lines:

image

As you can see, we now have 2 subscribers to the event. However, this is just where the fun begins. We can start transforming our event in a few ways – for instance, we could decide to filter out integers that are odd, and transform the result by mapping integers to floats, multiplied by 3 (why not?):

let Main =

    let handler i = printfn "Simple handler: got %i" i

    let generator = new Generator()
    generator.IntFound.Add handler

    let interval = new TimeSpan(0, 0, 5)
    generator.IntFound
    |> Observable.filter (fun e -> e % 2 = 0)
    |> Observable.map (fun e -> (float)e * 3.0)
    |> Observable.subscribe (fun e -> printfn "Observed %f" e)
    |> ignore

    generator.Start()

    let wait = Console.ReadLine()
    ignore ()

Still not the most thrilling thing ever, but it proves the point – we took a sequence of events that was returning integers and transformed it into a fairly different sequence, all in a few lines of code:

image
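
Because Trigger calls its subscribers synchronously, the same pipeline can also be checked without any threading or timing involved. The following is a minimal sketch, not part of the original sample: the names numbers and observed are made up for illustration, and results are collected into a list instead of being printed one by one.

```fsharp
// Hypothetical stand-in for Generator.IntFound: a bare event we trigger by hand.
let numbers = new Event<int>()

// Collects whatever reaches the end of the pipeline.
let observed = ResizeArray<float>()

numbers.Publish
|> Observable.filter (fun e -> e % 2 = 0)    // keep even integers only
|> Observable.map (fun e -> float e * 3.0)   // convert to float, multiply by 3
|> Observable.subscribe (fun e -> observed.Add e)
|> ignore

for i in 0 .. 5 do numbers.Trigger i

printfn "%A" (List.ofSeq observed)   // [0.0; 6.0; 12.0]
```

Only 0, 2 and 4 make it through the filter, and each comes out multiplied by 3.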

The reason I was interested in Observables, though, is that a while back I attended a talk, given by my good friend Petar, where he presented the Reactive Extensions (Rx) – and I remembered that Rx had a few nice utilities built in to manage Observables, which would hopefully help me achieve my goal, throttling my sequence of events over time.

At that stage, I wasted a bit of time, trying first to figure out whether or not I needed Rx (the F# module already has a lot built in, so I was wondering if maybe it had all I needed…), then I got tripped up by figuring out what Rx method I needed, and how to make it work seamlessly with F# and the pipe-forward operator.

Needing some “throttling”, I rushed into the Throttle method, which looked plausible enough; unfortunately, Throttle wasn’t doing quite what I thought it would – from what I gather, it filters out any event that is followed by another event within a certain time window. I see how this would come in handy in lots of scenarios (think typing in a Search Box – you don’t want to trigger a Search while the person is typing, so waiting until no typing occurs is a good idea), but what I really needed was Sample, which returns only the latest event that occurred in each regular time window.
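
To make the difference concrete, here is a rough sketch that runs both operators side by side on a steady stream. It assumes the Rx package (System.Reactive) is referenced; the 500 ms source, the 2-second window and all the names are arbitrary choices for illustration, not something taken from Bumblebee.

```fsharp
open System
open System.Threading
open System.Reactive.Linq

// A value every 500 ms, standing in for the solver's stream of solutions.
let source = Observable.Interval(TimeSpan.FromMilliseconds 500.0)
let window = TimeSpan.FromSeconds 2.0

let sampled = ResizeArray<int64>()
let throttled = ResizeArray<int64>()

// Sample: once per window, pass on the latest value seen in that window.
let sampleSubscription =
    Observable.Sample(source, window)
    |> Observable.subscribe (fun v -> sampled.Add v)

// Throttle: pass on a value only if nothing else follows it within the window.
// Values keep arriving every 500 ms, so nothing should get through.
let throttleSubscription =
    Observable.Throttle(source, window)
    |> Observable.subscribe (fun v -> throttled.Add v)

// Let both subscriptions run for 5 seconds, then stop them.
Thread.Sleep 5000
sampleSubscription.Dispose()
throttleSubscription.Dispose()

printfn "Sampled: %d value(s), throttled: %d value(s)" sampled.Count throttled.Count
```

On a stream that never pauses, the sampled list should pick up a value every couple of seconds, while the throttled one should stay empty, which is why Throttle is the wrong tool for a solver that keeps producing solutions.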

Now there is another small problem: Observable.Sample takes in 2 arguments, the Observable to be sampled, and a sampling interval represented as a TimeSpan. The issue here is that because of the C#-style signature, we cannot directly use it with a pipe-forward. It’s simple enough to solve, though: extend the Observable module with a small, composable function that takes the interval first and the Observable last:

module Observable =
    let sample (interval: TimeSpan) (obs: IObservable<'a>) =
        Observable.Sample(obs, interval)
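
The same wrapping trick works for any .NET method with a tupled, C#-style signature. As an illustration that has nothing to do with Rx, here is a small sketch wrapping Math.Round; the roundTo name is made up for this example.

```fsharp
open System

// Math.Round(value, digits) takes its arguments as a single tuple, so it cannot be
// partially applied. A curried wrapper, with the piped value as the last argument,
// fits naturally at the end of a pipe-forward.
let roundTo (digits: int) (value: float) =
    Math.Round(value, digits)

let rounded = 3.14159 |> roundTo 2

printfn "%f" rounded
```

The argument we want to pipe in goes last, and everything that configures the call goes first, exactly like interval and obs in the sample function.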

And we are now set! Armed with our new sample function, we can now do the following:

let Main =

    let handler i = printfn "Simple handler: got %i" i

    let generator = new Generator()
    generator.IntFound.Add handler

    let interval = new TimeSpan(0, 0, 5)
    generator.IntFound
    |> Observable.filter (fun e -> e % 2 = 0)
    |> Observable.map (fun e -> (float)e * 3.0)
    |> Observable.sample interval
    |> Observable.subscribe (fun e -> printfn "Observed %f" e)
    |> ignore

    generator.Start()

    let wait = Console.ReadLine()
    ignore ()

We sample our event stream every 5 seconds, returning only the latest event that occurred in that window. Running this produces the following:

image

As you can see, while the original handler is capturing an event every half second, our Observable is showing up every 10 events, that is, every 5 seconds, which is exactly what we expected – and I have now exactly what I need to “throttle” the solutions stream coming from Bumblebee.

That’s it for today – fairly simple stuff, but hopefully this illustrates how easy it is to work with events in F#, and what Observables bring to the table, and maybe this will come in useful for someone!

Additional resources I found useful or interesting along the way:

Time Flies Like an Arrow in F#

Reactive Programming: First Class Events in F#

FSharp.Reactive

Full code sample (F# console application, using the Reactive Extensions)

open System
open System.Threading
open System.Threading.Tasks
open System.Reactive.Linq

type Generator() =

    let intFound = new Event<int>()
    [<CLIEvent>]
    member this.IntFound = intFound.Publish

    member this.Start() =
        Task.Factory.StartNew(fun () ->
            printfn "Searching for numbers..."
            for i in 0 .. 100 do
                intFound.Trigger(i)
                Thread.Sleep(500)
            ) |> ignore

module Observable =
    let sample (interval: TimeSpan) (obs: IObservable<'a>) =
        Observable.Sample(obs, interval)

let Main =

    let handler i = printfn "Simple handler: got %i" i

    let generator = new Generator()
    generator.IntFound.Add handler

    let interval = new TimeSpan(0, 0, 5)
    generator.IntFound
    |> Observable.filter (fun e -> e % 2 = 0)
    |> Observable.map (fun e -> (float)e * 3.0)
    |> Observable.sample interval
    |> Observable.subscribe (fun e -> printfn "Observed %f" e)
    |> ignore

    generator.Start()

    let wait = Console.ReadLine()
    ignore ()

by Mathias 27. June 2012 15:06

This month’s issue of MSDN Magazine has an interesting piece on evolutionary algorithms. The article applies a genetic algorithm to identify the minimum value of a “pathological” continuous function, the Schwefel function.

The Schwefel function

For X and Y values between -500 and 500, the “correct” answer is X and Y = 420.9687.

This function is designed to give fits to optimization algorithms. The issue here is that the function has numerous peaks and valleys. As a result, if the search strategy is to use some form of gradient approach, that is, from a starting point, follow the steepest descent until a minimum is reached, there is a big risk of ending up in a place which is a local minimum: the algorithm gets stuck in a valley with no path downwards, but there are other, better solutions around, which can be reached only by “climbing out of the hole” and temporarily following a path which heads in the wrong direction.

Out of curiosity, I checked how the Excel Solver would fare on this problem:

ExcelSchwefel

The result was an abject failure – not even close to the true solution.

I thought it would be interesting to see how Bumblebee, my Artificial Bee Colony framework, would perform. There are some general similarities between the underlying ideas behind the article’s algorithm and Bumblebee, the main difference being that Bumblebee simply mutates individual solutions, and doesn’t create “crossover solutions”.

Let’s open Visual Studio, create an F# Console project, grab the NuGet package for Bumblebee – and start coding.

As usual, we need 4 elements to leverage Bumblebee – a Type of Solution, and 3 functions: a Generator, which returns a brand-new, random solution, a Mutator, which transforms a known solution into a new, similar solution, and an Evaluator, which evaluates a solution and returns a float, increasing with the quality of the solution.

In this case, the Solution type is fairly straightforward. We are looking for 2 floats x and y, so we’ll go for a Tuple. Similarly, the Evaluation is a given, it is the negative of the Schwefel function. The reason we go for the negative is that Bumblebee will try to maximize the Evaluation, so if we are looking for a minimum, we need to reverse the sign – because the Minimum of a function is the Maximum of its negative.

let schwefel x = 
   -x * Math.Sin(Math.Sqrt(Math.Abs(x)))

let evaluate (x, y) = 
   - schwefel (x) - schwefel (y)
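
As a quick sanity check, we can evaluate the known optimum quoted above. This sketch repeats the two functions so that it runs on its own, with a type annotation added on x; the atOptimum and atOrigin names are only there for illustration.

```fsharp
open System

let schwefel (x: float) =
    -x * Math.Sin(Math.Sqrt(Math.Abs(x)))

// Negated, because Bumblebee maximizes the evaluation.
let evaluate (x, y) =
    -(schwefel x) - schwefel y

let atOptimum = evaluate (420.9687, 420.9687)
let atOrigin = evaluate (0.0, 0.0)

printfn "Optimum: %.4f, origin: %.4f" atOptimum atOrigin
```

The evaluation at the optimum should come out at roughly 837.97, against 0 at the origin, so a higher evaluation does correspond to a lower value of the Schwefel function.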

The Generation function is also trivial – we’ll simply pick random pairs of floats in the [ -500.0; 500.0 ] interval:

let min = -500.0
let max = 500.0

let generate (rng: Random) = (
   rng.NextDouble() * (max - min) + min,
   rng.NextDouble() * (max - min) + min)

The Mutation function takes a tiny bit more effort. The idea I followed is essentially the same as the one used in the article: given a solution defined by a pair of floats, randomly decide if any of the elements will be mutated, and if yes, add a random perturbation to that element, scaled to the precision we want to achieve:

let precision = 0.00001
let rate = 0.5

let mutate (rng: Random) solution =
   let (x, y) = solution
   let x =
      if rng.NextDouble() < rate 
      then x + precision * ((max - min) * rng.NextDouble() + min)
      else x
   let y =
      if rng.NextDouble() < rate 
      then y + precision * ((max - min) * rng.NextDouble() + min)
      else y
   (x, y)
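
To get a feel for the size of a mutation: (max - min) * rng.NextDouble() + min falls in [-500, 500), so after scaling by precision each coordinate moves by at most 0.005. The sketch below repeats the definitions so that it runs standalone and checks that bound on a batch of mutations; the fixed seed, the starting point and the largestStep name are arbitrary choices for illustration.

```fsharp
open System

let min = -500.0
let max = 500.0
let precision = 0.00001
let rate = 0.5

let mutate (rng: Random) solution =
   let (x, y) = solution
   let x =
      if rng.NextDouble() < rate
      then x + precision * ((max - min) * rng.NextDouble() + min)
      else x
   let y =
      if rng.NextDouble() < rate
      then y + precision * ((max - min) * rng.NextDouble() + min)
      else y
   (x, y)

let rng = Random(42)
let start = (100.0, -200.0)

// Largest move observed on either coordinate over 1000 independent mutations of start.
let largestStep =
   [ for i in 1 .. 1000 do
        let (x, y) = mutate rng start
        yield Math.Max(Math.Abs(x - fst start), Math.Abs(y - snd start)) ]
   |> List.max

printfn "Largest step: %f" largestStep
```

Each mutation is a small local move: new solutions stay very close to the one they were derived from.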

by Mathias 6. June 2012 15:05

I have just pushed version 0.3 of Bumblebee on CodePlex. Bumblebee is an Artificial Bee Colony algorithm implementation in F#. In a nutshell, it’s a randomized search method that mimics the behavior of bees looking for food, and can be suitable for finding approximate solutions to large scale problems where deterministic approaches are impractical. Bumblebee provides a C#-friendly Solver, which will search for solutions to a problem, sending messages to the caller whenever an improved solution is found, until the search is stopped. It uses the Task Parallel Library to parallelize the searches as much as possible.

The source code includes two sample projects that illustrate the algorithm in action on the Travelling Salesman Problem, one in F#, one in C#, with a “fancy” WPF user interface.

What’s new in Version 0.3? Mostly code cleanup – when I revisited my original code a few months after the fact, I had a hard time following it, so I reorganized things internally in a way which I hope is clearer. The main change from an API perspective is the constructor for C# problems: I removed the Tuples from the Func delegates, which were mostly noise and didn’t help much.

Other than that, the changes are mostly cosmetic: the main Search loop has been transformed into a recursive immutable loop, code has been shuffled around and renamed for readability, the test suite has been updated and uses the FsUnit NuGet package.

Speaking of NuGet, I published Bumblebee as a NuGet package here. It’s my first NuGet package, so hopefully I haven’t messed up anything there – if you see anything amiss, please let me know!

That’s it for the moment. I am working on some ideas right now, the main one being to use Azure to allow Bumblebee to scale out and attack larger problems – we’ll see how that turns out. I kept Bumblebee at the Alpha stage, as I may still change the API in the future.

On a related note, I will be travelling to Boston mid-June, where I will have the pleasure of presenting Bumblebee at the New England F# user group. I am extremely excited about this opportunity (thanks to Talbott Crowell for the invitation!) - I plan on discussing the algorithm and its implementation, writing F# code live to solve a problem and hopefully show why F# is so fun to work with, and talking in general about my experience learning F# after years of C# development. I am really looking forward to it, and hope to see some of you there!

by Mathias 1. April 2012 09:47

Last week’s StackOverflow newsletter contained a fun problem I had never seen before: Bipartite Matching. Here is the problem:

There are N starting points (purple) and N target points (green) in 2D. I want an algorithm that connects starting points to target points by a line segment (brown) without any of these segments intersecting (red) and while minimizing the cumulative length of all segments.

Image from the original post on StackOverflow

I figured it would be fun to try out Bumblebee, my artificial bee colony library, on the problem. As the accepted answer points out, the constraint that no segment should intersect is redundant, and we only need to worry about minimizing the cumulative length, because reducing the length implies removing intersections.

As usual with Bumblebee, I’ll go first with the dumbest thing that could work. The solution involves matching points from two lists, so we’ll define a record type for Point and represent a Solution as two (ordered) lists of points, packed in a Tuple:

type Point = { X: float; Y: float }
let points = 100
let firstList = [ for i in 0 .. points -> { X = (float)i ; Y = float(i) } ]
let secondList =  [ for i in 0 .. points -> { X = (float)i ; Y = float(i) } ]

let root = firstList, secondList

We’ll start with a silly problem, where the 2 lists are identical: the trivial solution here is to match each point with itself, resulting in a total length of zero, which will be convenient to see how well the algorithm is doing and how far it is from the optimum.

How can we Evaluate the quality of a solution? We need to pair up the points of each of the lists, compute the distance of each pair, and sum them up – fairly straightforward:

let distance pair =
   // Euclidean distance: square root of the sum of squared differences
   sqrt (((fst pair).X - (snd pair).X) ** 2.0 + ((fst pair).Y - (snd pair).Y) ** 2.0)

let evaluate = fun (solution: Point list * Point list) -> 
   List.zip (fst solution) (snd solution)
   |> List.sumBy (fun p -> -distance p)
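
To see the evaluation at work on something small, here is a self-contained sketch with 3 points instead of 100. It assumes the Euclidean distance between paired points; the diagonal, matched and reversed names are made up for this example.

```fsharp
type Point = { X: float; Y: float }

// Euclidean distance between the two points of a pair.
let distance (a: Point, b: Point) =
   sqrt ((a.X - b.X) ** 2.0 + (a.Y - b.Y) ** 2.0)

// Negated total length, since Bumblebee maximizes the evaluation.
let evaluate (solution: Point list * Point list) =
   List.zip (fst solution) (snd solution)
   |> List.sumBy (fun pair -> -(distance pair))

let diagonal = [ for i in 0 .. 2 -> { X = float i; Y = float i } ]

let matched = evaluate (diagonal, diagonal)            // every point paired with itself
let reversed = evaluate (diagonal, List.rev diagonal)  // the two end points swapped

printfn "Matched: %f, reversed: %f" matched reversed
```

Pairing each point with itself evaluates to 0, the best possible value, while swapping the two end points costs twice the length of the diagonal, about -5.66.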

by Mathias 30. January 2012 01:32

As happy as I am with how Bumblebee came out so far, there was one sore spot bugging me. I tried my best to write it in a functional style, but failed in one place. The inner part of the Search algorithm, which processes bees coming back in the queue with a new solution, was written as a while loop, using two mutable references to maintain the best solution found so far, and the list of solutions stored by the inactive bees.

And then it hit me yesterday, as I was reading some material on F# Azure worker roles.

Novice:

   Master, I fail to find the path to the immutable way.

Master:

   Immutable way?

   Nothing changes, just call the

   Immutable way.

This is an execrable Haiku, and a reminder to myself. When I get stuck with my old mutable ways in F#, usually the answer is recursion.

In this case, all it took was rewriting the inner loop as a recursive function. The original loop looked along these lines:

while cancel.IsCancellationRequested = false do
   match returnQueue.IsEmpty with
   // the returning bees queue is not empty
   | false -> 
      let success, bee = returnQueue.TryDequeue()
      match success with
      // a returning bee has been found in the queue
      | true -> 
         inactives := waggle !inactives bee
         let candidate = Hive.solutionOf bee
         if candidate.Quality > best.Value.Quality then
            best.Value <- candidate
            foundSolution.Trigger(new SolutionMessage<'a>(candidate))
         else ignore ()
      // more code, irrelevant to the point

The recursive version simply includes in its arguments list all the previously mutable variables, and calls itself, passing along the result of the updates, along these lines:

let rec loop (queue: ConcurrentQueue<Bee<'a>>) best inactives (cancel: CancellationTokenSource) =
   if cancel.IsCancellationRequested then ignore ()
   else
      let success, bee = queue.TryDequeue()
      if success then
      // a returning bee has been found in the queue
         let updatedInactives = waggle inactives bee
         let candidate = Hive.solutionOf bee
         let updatedBest = bestOf candidate best
         if candidate.Quality > best.Quality 
         then foundSolution.Trigger(new SolutionMessage<'a>(candidate))
         
         dispatchBee updatedInactives pickRandom waggle bee

         loop queue updatedBest updatedInactives cancel

      else loop queue best inactives cancel

The structure is essentially the same, but all the mutable variables are now gone; instead, the recursive function passes forward new “updated” values.

I think what tripped me up is the fact that I never used recursion to run an “infinite loop” before – I always saw them done using while true statements. Conversely, I always used recursion to produce an actual result, computing intermediary results until a termination condition was met. This one is a bit different, because the recursion simply passes along some state information, but doesn’t return anything (its return is unit) or terminate until a cancellation happens.
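
For readers who want to play with the pattern in isolation, here is a stripped-down sketch. It is not Bumblebee’s code: the queue holds plain integers, and, unlike the real search loop, this one stops when the queue is empty instead of waiting for a cancellation.

```fsharp
open System.Collections.Concurrent

// Hypothetical, simplified take on the pattern: the state (best value so far and
// the list of improvements) travels through the arguments instead of mutable refs.
let rec loop (queue: ConcurrentQueue<int>) best improvements =
   match queue.TryDequeue() with
   | true, candidate when candidate > best ->
      // an improvement: carry the updated state into the next iteration
      loop queue candidate (candidate :: improvements)
   | true, _ ->
      // no improvement: the state is passed along unchanged
      loop queue best improvements
   | false, _ ->
      // nothing left in the queue: return the final state
      best, List.rev improvements

let queue = ConcurrentQueue<int>([ 3; 1; 4; 1; 5; 9; 2; 6 ])
let best, improvements = loop queue System.Int32.MinValue []

printfn "Best: %i, improvements: %A" best improvements   // Best: 9, improvements: [3; 4; 5; 9]
```

Every recursive call sits in tail position, so the loop can run for as long as it needs to without growing the stack.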
