Mathias Brandewinder on .NET, F#, VSTO and Excel development, and quantitative analysis / machine learning.
by Mathias 23. July 2012 16:12

Nothing fancy this week – just thought I would share some of what I learnt recently playing with the Reactive Extensions and F#.

Here is the context: my current week-ends project, Bumblebee, is a Solver, which, given a Problem to solve, will search for solutions, and fire an event every time an improvement is found. I am currently working on using in in Azure, to hopefully scale out and tackle problems of a larger scale than what I can achieve on a single local machine. One problem I ran into, though, is that if multiple worker roles begin firing events every time a solution is found, the system will likely grind to a halt trying to cope with a gazillion messages (not to mention a potentially unpleasantly high bill), whereas I really don’t care about every single solution – I care about being notified about some improvements, not necessarily every single one. What I want is an ability to “throttle” the flow of events coming from my solver, to receive, say, the best one every 30 seconds.

For illustration purposes, here is a highly simplified version of the Bumblebee solver:

type Generator() =

    let intFound = new Event<int>()
    member this.IntFound = intFound.Publish

    member this.Start() =
        Task.Factory.StartNew(fun () ->
            printfn "Searching for numbers..."
            for i in 0 .. 100 do
                intFound.Trigger(i)
                Thread.Sleep(500)
            ) |> ignore

The Generator class exposes a Start method, which, once called, will “generate” numbers from 0 to 100 – just like the Solver would return solutions of improving quality over time.

Generator declares an event, intFound, which will be triggered when we found a new integer of interest, and which is exposed through IntFound, which consumers can then subscribe to. When we Start the generator, we spin a new Task, which will be running on its own thread, and will simply produce integers from 0 to 100, with a 500ms delay between solutions.

The syntax for declaring an event is refreshingly simple, and we can use it in a way similar to what we would do in C#, by adding a Handler to the event, for instance in a simple Console application like this:

let Main =

    let handler i = printfn "Simple handler: got %i" i

    let generator = new Generator()
    generator.IntFound.Add handler

    generator.Start()

    let wait = Console.ReadLine()
    ignore ()

Create a handler that prints out an integer, hook it up to the event, and run the application – you should see something like this happening:

image

So far, nothing very thrilling.

However, there is more. Our event this.IntFound is an IEvent, which inherits from IObservable, and allows you to do all sort of fun stuff with your events, like transform and compose them into something more usable. Out-of-the-box, the F# Observable module provides a few useful functions. Instead of adding a handler to the event, let’s start by subscribing to the event:

let Main =

    let handler i = printfn "Simple handler: got %i" i

    let generator = new Generator()
    generator.IntFound.Add handler

    let interval = new TimeSpan(0, 0, 5)
    generator.IntFound
    |> Observable.subscribe (fun e -> printfn "Observed %i" e)
    |> ignore

    generator.Start()

    let wait = Console.ReadLine()
    ignore ()

This is doing essentially the same thing as before – running this will produce something along these lines:

image

As you can see, we have now 2 subscribers to the event. However, this is just where the fun begins. We can start transforming our event in a few ways – for instance, we could decide to filter out integers that are odd, and transform the result by mapping integers to floats, multiplied by 3 (why not?):

let Main =

    let handler i = printfn "Simple handler: got %i" i

    let generator = new Generator()
    generator.IntFound.Add handler

    let interval = new TimeSpan(0, 0, 5)
    generator.IntFound
    |> Observable.filter (fun e -> e % 2 = 0)
    |> Observable.map (fun e -> (float)e * 3.0)
    |> Observable.subscribe (fun e -> printfn "Observed %f" e)
    |> ignore

    generator.Start()

    let wait = Console.ReadLine()
    ignore ()

Still not the most thrilling thing ever, but it proves the point – from a sequence of Events that was returning integers, we managed to transform it into a fairly different sequence, all in a few lines of code:

image

The reason I was interested in Observables, though, is because a while back, I attended a talk , given by my good friend Petar, where he presented the Reactive Extensions (Rx) – and I remembered that Rx had a few nice utilities built-in to manage Observables, which would hopefully help me achieve my goal, throttling my sequence of events over time.

At that stage, I wasted a bit of time, trying first to figure out whether or not I needed Rx (the F# module already has a lot built in, so I was wondering if maybe it had all I needed…), then I got tripped up by figuring out what Rx method I needed, and how to make it work seamlessly with F# and the pipe-forward operator.

Needing some “throttling”, I rushed into the Throttle method, which looked plausible enough; unfortunately, throttle wasn’t doing quite what I thought it would – from what I gather, it filters out any event that is followed by another event within a certain time window. I see how this would come handy in lots of scenarios (think typing in a Search Box – you don’t want to trigger a Search while the person it typing, so waiting until no typing occurs is a good idea), but what I really needed was Sample, which returns only the latest event that occurred by regular time window.

Now there is another small problem: Observable.Sample takes in 2 arguments, the Observable to be sampled, and a sampling interval represented as a TimeSpan. The issue here is that because of the C#-style signature, we cannot directly use it with a pipe-forward. It’s simple enough to solve, though: create a small extension method, extending the Observable module with a composable function:

module Observable =
    let sample (interval: TimeSpan) (obs: IObservable<'a>) =
        Observable.Sample(obs, interval)

And we are now set! Armed with our new sample function, we can now do the following:

let Main =

    let handler i = printfn "Simple handler: got %i" i

    let generator = new Generator()
    generator.IntFound.Add handler

    let interval = new TimeSpan(0, 0, 5)
    generator.IntFound
    |> Observable.filter (fun e -> e % 2 = 0)
    |> Observable.map (fun e -> (float)e * 3.0)
    |> Observable.sample interval
    |> Observable.subscribe (fun e -> printfn "Observed %f" e)
    |> ignore

    generator.Start()

    let wait = Console.ReadLine()
    ignore ()

We sample our event stream every 5 seconds, returning only the latest that occurred in that window. Running this produces the following:

image

As you can see, while the original handler is capturing an event every half second, our Observable is showing up every 10 events, that is, every 5 seconds, which is exactly what we expected – and I have now exactly what I need to “throttle” the solutions stream coming from Bumblebee.

That’s it for today – fairly simple stuff, but hopefully this illustrates how easy it is to work with events in F#, and what Observables add to the table, and maybe this will come in useful for someone!

Additional resources I found useful or interesting underway:

Time Flies Like an Arrow in F#

Reactive Programming: First Class Events in F#

FSharp.Reactive

Full code sample (F# console application, using Rx Extensions)

open System
open System.Threading
open System.Threading.Tasks
open System.Reactive.Linq

type Generator() =

    let intFound = new Event<int>()
    [<CLIEvent>]
    member this.IntFound = intFound.Publish

    member this.Start() =
        Task.Factory.StartNew(fun () ->
            printfn "Searching for numbers..."
            for i in 0 .. 100 do
                intFound.Trigger(i)
                Thread.Sleep(500)
            ) |> ignore

module Observable =
    let sample (interval: TimeSpan) (obs: IObservable<'a>) =
        Observable.Sample(obs, interval)

let Main =

    let handler i = printfn "Simple handler: got %i" i

    let generator = new Generator()
    generator.IntFound.Add handler

    let interval = new TimeSpan(0, 0, 5)
    generator.IntFound
    |> Observable.filter (fun e -> e % 2 = 0)
    |> Observable.map (fun e -> (float)e * 3.0)
    |> Observable.sample interval
    |> Observable.subscribe (fun e -> printfn "Observed %f" e)
    |> ignore

    generator.Start()

    let wait = Console.ReadLine()
    ignore ()

Comments

7/24/2012 1:31:57 PM #

trackback

Throttling F# Events using the Reactive Extensions

You've been kicked (a good thing) - Trackback from DotNetKicks.com

DotNetKicks.com | Reply

8/12/2013 4:19:17 AM #

Art Scott

"Time flies  like an arrow ..."
Wondering ... is that some pun fun on arrow notation for reactive programming?

Thanks M.

Art Scott United States | Reply

Add comment




  Country flag

biuquote
  • Comment
  • Preview
Loading



Comments

Comment RSS