Mathias Brandewinder on .NET, F#, VSTO and Excel development, and quantitative analysis / machine learning.
by Mathias 31. October 2012 13:43

This post is to be filed in the “useless but fun” category. A friend of mine was doing some Hadoopy stuff a few days ago, experimenting with rather large sparse matrices and their products. Long story short, we ended up wondering how sparse the product of 2 sparse matrices should be.

A sparse matrix is a matrix which is primarily filled with zeros. The product of two matrices involves computing the dot product of each row of the first one, with each column of the second one. There is clearly a relationship between how dense the two matrices are, and how dense the result should be. As an obvious illustration, if we consider 2 matrices populated only with zeroes – as sparse as it gets – their product is obviously 0. Conversely, two matrices populated only with ones – as dense as it gets – will result in a “completely dense” matrix. But… what happens in between?

Should I expect the product of 2 sparse matrices to be more, or less dense than the original matrices? And does it depend on how large the matrices are? What would be your guess?

More...

by Mathias 8. July 2012 11:34

I recently spent some time thinking about code performance optimization problems, which led me to dust my old class notes on Queuing Theory. In general, Queuing studies the behavior of networks of Queues, where jobs arrive at various time intervals, and wait in line until they can be processed / served by a Node / Server, potentially propagating new Jobs going to other connected queues upon completion.

MarsupilamiThe simplest case in Queues is known as the Single-Server Queue – a queue where all jobs wait in a single line, and are processed by a single server. Single queues are interesting in their own right, and can be found everywhere. They are also important, because they are the building blocks of Networks of Queues, and understanding what happens at a single Queue level helps understand how they work when connected together.

The question we’ll be looking at today is the following: given the characteristics of a Queue, what can we say about its performance over time? For instance, can we estimate how often the Queue will be busy? How many jobs will be backed up waiting for service on average? How large can the queue get? How long will it take, on average, to process an incoming job?

<Side note: the picture above is not completely random. It depicts the Marsupilami, a fictional creature who is an expert in managing long tails, known in French as “Queues”>

We’ll use a simple Monte-Carlo simulation model to approach these questions, and see whether the results we observe match the results predicted by theory.

What we are interested in is observing the state of the Queue over time. Two events drive the behavior of the queue:

  • a new Job arrives in the queue to be processed, and either gets processed immediately by the server, or is placed at the end of the line,
  • the server completes a Job, picks the next one in line if available and works on it until it’s done.

From this description, we can identify a few elements that are important in modeling the queue:

  • whether the Server is Idle or Busy,
  • whether Jobs are waiting in the Queue to be processed,
  • how long it takes the Server to process a Job,
  • how new Jobs arrive to the Queue over time

Modeling the Queue

Let’s get coding! We create a small F# script (Note: the complete code sample is available at the bottom of the post, as well as on FsSnip), and begin by defining a Discriminated Union type Status, which represents the state of the Server at time T:

type Status = Idle | Busy of DateTime * int

Our server can be in two states: Idle, or Busy, in which case we also need to know when it will be done with its current Job, and how many Jobs are waiting to be processed.

In order to model what is happening to the Queue, we’ll create a State record type:

type State = 
    { Start: DateTime;
      Status: Status; 
      NextIn: DateTime }

Start and Status should be self-explanatory – they represent the time when the Queue started being in that new State, and the Server Status. The reason for NextIn may not be as obvious - it represents the arrival time of the next Job. First, there is an underlying assumption here that there is ALWAYS a next job: we are modeling the Queue as a perpetual process, so that we can simulate it for as long as we want. Then, the reason for this approach is that it simplifies the determination of the transition of the Queue between states:

let next arrival processing state =
   match state.Status with
   | Idle ->
      { Start = state.NextIn;
        NextIn = state.NextIn + arrival();
        Status = Busy(state.NextIn + processing(), 0) }
   | Busy(until, waiting) ->
      match (state.NextIn <= until) with
      | true -> 
            { Start = state.NextIn; 
              NextIn = state.NextIn + arrival();
              Status = Busy(until, waiting + 1) } 
      | false -> 
            match (waiting > 0) with
            | true -> 
               { Start = until; 
                 Status = Busy(until + processing(), waiting - 1); 
                 NextIn = state.NextIn }
            | false -> 
               { Start = until; 
                 Status = Idle; 
                 NextIn = state.NextIn }

This somewhat gnarly function is worth commenting a bit. Its purpose is to determine the next state the Queue will enter in, given its current state, and 2 functions, arrival and processing, which have the same signature:

val f : (unit -> TimeSpan)

Both these functions take no arguments, and return a TimeSpan, which represents respectively how much time will elapse between the latest arrival in the Queue and the next one (the inter-arrival time), and how much time the Server will take completing its next Job. This information is sufficient to derive the next state of the system:

  • If the Server is Idle, it will switch to Busy when the next Job arrives, and the arrival of next Job is schedule based on job inter-arrival time,
  • If the Server is Busy, two things can happen: either the next Job arrives before the Server completes its work, or not. If a new Job arrives first, it increases the number of Jobs waiting to be processed, and we schedule the next Arrival. If the Server finishes first, if no Job is waiting to be processed, it becomes Idle, and otherwise it begins processing the Job in front of the line.

We are now ready to run a Simulation.

let simulate startTime arr proc =
   let nextIn = startTime + arr()
   let state = 
      { Start = startTime; 
        Status = Idle;
        NextIn = nextIn }
   Seq.unfold (fun st -> 
      Some(st, next arr proc st)) state

We initialize the Queue to begin at the specified start time, with a cold-start (Idle), and unfold an infinite sequence of States, which can go on if we please.

Running the Simulation

Let’s start with a Sanity check, and validate the behavior of a simple case, where Jobs arrive to the Queue every 10 seconds, and the Queue takes 5 seconds to process each Job.

First, let’s write a simple function to pretty-display the state of the Queue over time:

let pretty state =
   let count =
      match state.Status with
      | Idle -> 0
      | Busy(_, waiting) -> 1 + waiting
   let nextOut =
      match state.Status with
      | Idle -> "Idle"
      | Busy(until, _) -> until.ToLongTimeString()
   let start = state.Start.ToLongTimeString()
   let nextIn = state.NextIn.ToLongTimeString()
   printfn "Start: %s, Count: %i, Next in: %s, Next out: %s" start count nextIn nextOut

Now we can define our model:

let constantTime (interval: TimeSpan) = 
   let ticks = interval.Ticks
   fun () -> interval

let arrivalTime = new TimeSpan(0,0,10);
let processTime = new TimeSpan(0,0,5)

let simpleArr = constantTime arrivalTime
let simpleProc = constantTime processTime

let startTime = new DateTime(2010, 1, 1)
let constantCase = simulate startTime simpleArr simpleProc

Let’s simulate 10 transitions in fsi:

> Seq.take 10 constantCase |> Seq.iter pretty;;
Start: 12:00:00 AM, Count: 0, Next in: 12:00:10 AM, Next out: Idle
Start: 12:00:10 AM, Count: 1, Next in: 12:00:20 AM, Next out: 12:00:15 AM
Start: 12:00:15 AM, Count: 0, Next in: 12:00:20 AM, Next out: Idle
Start: 12:00:20 AM, Count: 1, Next in: 12:00:30 AM, Next out: 12:00:25 AM
Start: 12:00:25 AM, Count: 0, Next in: 12:00:30 AM, Next out: Idle
Start: 12:00:30 AM, Count: 1, Next in: 12:00:40 AM, Next out: 12:00:35 AM
Start: 12:00:35 AM, Count: 0, Next in: 12:00:40 AM, Next out: Idle
Start: 12:00:40 AM, Count: 1, Next in: 12:00:50 AM, Next out: 12:00:45 AM
Start: 12:00:45 AM, Count: 0, Next in: 12:00:50 AM, Next out: Idle
Start: 12:00:50 AM, Count: 1, Next in: 12:01:00 AM, Next out: 12:00:55 AM
val it : unit = ()

Looks like we are doing something right – the simulation displays an arrival every 10 seconds, followed by 5 seconds of activity until the job is processed, and 5 seconds of Idleness until the next arrival.

Let’s do something a bit more complicated – arrivals with random, uniformly distributed inter-arrival times:

let uniformTime (seconds: int) = 
   let rng = new Random()
   fun () ->
      let t = rng.Next(seconds + 1) 
      new TimeSpan(0, 0, t)

let uniformArr = uniformTime 10

let uniformCase = simulate startTime uniformArr simpleProc

Here, arrival times will take any value (in seconds) between 0 and 10, included – with an average of 5 seconds between arrivals. A quick run in fsi produces the following sample:

> Seq.take 10 uniformCase |> Seq.iter pretty;;
Start: 12:00:00 AM, Count: 0, Next in: 12:00:02 AM, Next out: Idle
Start: 12:00:02 AM, Count: 1, Next in: 12:00:03 AM, Next out: 12:00:07 AM
Start: 12:00:03 AM, Count: 2, Next in: 12:00:11 AM, Next out: 12:00:07 AM
Start: 12:00:07 AM, Count: 1, Next in: 12:00:11 AM, Next out: 12:00:12 AM
Start: 12:00:11 AM, Count: 2, Next in: 12:00:11 AM, Next out: 12:00:12 AM
Start: 12:00:11 AM, Count: 3, Next in: 12:00:16 AM, Next out: 12:00:12 AM
Start: 12:00:12 AM, Count: 2, Next in: 12:00:16 AM, Next out: 12:00:17 AM
Start: 12:00:16 AM, Count: 3, Next in: 12:00:24 AM, Next out: 12:00:17 AM
Start: 12:00:17 AM, Count: 2, Next in: 12:00:24 AM, Next out: 12:00:22 AM
Start: 12:00:22 AM, Count: 1, Next in: 12:00:24 AM, Next out: 12:00:27 AM
val it : unit = ()

Not surprisingly, given the faster arrivals, we see the Queue getting slightly backed up, with Jobs waiting to be processed.

How would the Queue look like after, say, 1,000,000 transitions? Easy enough to check:

>  Seq.nth 1000000 uniformCase |> pretty;;
Start: 10:36:23 PM, Count: 230, Next in: 10:36:25 PM, Next out: 10:36:28 PM
val it : unit = ()

Interesting – looks like the Queue is getting backed up quite a bit as time goes by. This is a classic result with Queues: the utilization rate, defined as arrival rate / departure rate, is saturated. When the utilization rate is strictly less than 100%, the Queue is stable, and otherwise it will build up over time, accumulating a backlog of Jobs.

Let’s create a third type of model, with Exponential rates:

let exponentialTime (seconds: float) =
   let lambda = 1.0 / seconds
   let rng = new Random()
   fun () ->
      let t = - Math.Log(rng.NextDouble()) / lambda
      let ticks = t * (float)TimeSpan.TicksPerSecond
      new TimeSpan((int64)ticks)

let expArr = exponentialTime 10.0
let expProc = exponentialTime 7.0
let exponentialCase = simulate startTime expArr expProc

The arrivals and processing times are exponentially distributed, with an average time expressed in seconds. In our system, we expect new Jobs to arrive on average every 10 seconds, varying between 0 and + infinity, and Jobs take 7 seconds on average to process. The queue is not saturated, and should therefore not build up, which we can verify:

> Seq.nth 1000000 exponentialCase |> pretty;;
Start: 8:55:36 PM, Count: 4, Next in: 8:55:40 PM, Next out: 8:55:36 PM
val it : unit = ()

A queue where both arrivals and processing times follow that distribution is a classic in Queuing Theory, known as a M/M/1 queue. It is of particular interest because some of its characteristics can be derived analytically – we’ll revisit that later.

Measuring performance

We already saw a simple useful measurement for Queues, the utilization rate, which defines whether our Queue will explode or stabilize over time. This is important but crude – what we would really be interested in is measuring how much of a bottleneck the Queue creates. Two measures come to mind in this frame: how long is the Queue on average, and how much time does a Job take to go through the entire system (queue + processing)?

Let’s begin with the Queue length. On average, how many Jobs should we expect to see in the Queue (including Jobs being currently processed)?

The question is less trivial than it looks. We could naively simulate a Sequence of States, and average out the number of Jobs in each State, but this would be incorrect. To understand the issue, let’s consider a Queue with constant arrivals and processing times, where Jobs arrive every 10 seconds and take 1 second to process. The result will be alternating 0s and 1s – which would give a naïve average of 0.5 Jobs in queue. However, the system will be Busy for 1 seconds, and Idle for 9 seconds, with an average number of Jobs of 0.1 over time.

To correctly compute the average, we need to compute a weighted average, counting the number of jobs present in a state, weighted by the time the System spent in that particular state.

image

Let’s consider for illustration the example above, where we observe a Queue for 10 seconds, with 3 Jobs A, B, C arriving and departing. The average number of Jobs in the System is 3 seconds with 0, 5 seconds with 1 and 2 seconds with 2, which would give us (3x0 + 5x1 + 2x2)/10, i.e. 9/10 or 0.9 Jobs on average. We could achieve the same result by accumulating the computation over time, starting at each transition point: 2s x 0 + 2s x 1 + 1s x 2 + 2s x 1 + 1s x 2 + 1s x 1 + 1s x 0 = 9 “Jobs-seconds”, which over 10 seconds gives us the same result as before.

Let’s implement this. We will compute the average using an Accumulator, using Sequence Scan: for each State of the System, we will measure how much time was spent, in Ticks, as well as how many Jobs were in the System during that period, and accumulate the total number of ticks since the Simulation started, as well as the total number of “Jobs-Ticks”, so that the average until that point will simply be:

Average Queue length = sum of Job-Ticks / sum of Ticks.

let averageCountIn (transitions: State seq) =
   // time spent in current state, in ticks
   let ticks current next =
      next.Start.Ticks - current.Start.Ticks
   // jobs in system in state
   let count state =
      match state.Status with
      | Idle -> (int64)0
      | Busy(until, c) -> (int64)c + (int64)1
   // update state = total time and total jobsxtime
   // between current and next queue state
   let update state pair =
      let current, next = pair
      let c = count current
      let t = ticks current next
      (fst state) + t, (snd state) + (c * t)     
   // accumulate updates from initial state
   let initial = (int64)0, (int64)0
   transitions
   |> Seq.pairwise
   |> Seq.scan (fun state pair -> update state pair) initial
   |> Seq.map (fun state -> (float)(snd state) / (float)(fst state))

Let’s try this on our M/M/1 queue, the exponential case described above:

> averageCountIn exponentialCase |> Seq.nth 1000000 ;;
val it : float = 2.288179686

According to theory, for an M/M/1 Queue, that number should be rho / (1-rho), i.e. (7/10) / (1-(7/10)), which gives 2.333. Close enough, I say.

Let’s look at the Response Time now, that is, the average time it takes for a Job to leave the system once it entered the queue.

image

We’ll use an idea similar to the one we used for the average number of Jobs in the System. On our illustration, we can see that A stays in for 3 seconds, B for 4s and C for 2 seconds. In that case, the average is time A + time B + time C / 3 jobs, i.e. (3 + 4 + 2)/3 = 3 seconds. But we can also decompose the time spent by A, B and C in the system by summing up not by Job, but by period between transitions. In this case, we would get Time spent by A, B, C = 2s x 0 jobs + 2s x 1 job + 1s x 2 jobs + 2s x 1 job + 1s x 2 jobs + 1s x 1 job + 1s x 0 jobs = 9 “job-seconds”, which would give us the correct total time we need.

We can use that idea to implement the average time spent in the system in the same fashion we did the average jobs in the system, by accumulating the measure as the sequence of transition unfolds:

let averageTimeIn  (transitions: State seq) =
   // time spent in current state, in ticks
   let ticks current next =
      next.Start.Ticks - current.Start.Ticks
   // jobs in system in state
   let count state =
      match state.Status with
      | Idle -> (int64)0
      | Busy(until, c) -> (int64)c + (int64)1
   // count arrivals
   let arrival current next =
      if count next > count current then (int64)1 else (int64)0
   // update state = total time and total arrivals
   // between current and next queue state
   let update state pair =
      let current, next = pair
      let c = count current
      let t = ticks current next
      let a = arrival current next
      (fst state) + a, (snd state) + (c * t)     
   // accumulate updates from initial state
   let initial = (int64)0, (int64)0
   transitions
   |> Seq.pairwise
   |> Seq.scan (fun state pair -> update state pair) initial
   |> Seq.map (fun state -> 
      let time = (float)(snd state) / (float)(fst state)
      new TimeSpan((int64)time))

Trying this out on our M/M/1 queue, we theoretically expect an average of 23.333 seconds, and get 22.7 seconds:

> averageTimeIn exponentialCase |> Seq.nth 1000000 ;;
val it : TimeSpan = 00:00:22.7223798 {Days = 0;
                                      Hours = 0;
                                      Milliseconds = 722;
                                      Minutes = 0;
                                      Seconds = 22;
                                      Ticks = 227223798L;
                                      TotalDays = 0.0002629905069;
                                      TotalHours = 0.006311772167;
                                      TotalMilliseconds = 22722.3798;
                                      TotalMinutes = 0.37870633;
                                      TotalSeconds = 22.7223798;}

Given the somewhat sordid conversions between Int64, floats and TimeSpan, this seems plausible enough.

A practical example

Now that we got some tools at our disposition, let’s look at a semi-realistic example. Imagine a subway station, with 2 turnstiles (apparently also known as “Baffle Gates”), one letting people in, one letting people out. On average, it takes 4 seconds to get a person through the Turnstile (some people are more baffled than others) – we’ll model the processing time as an Exponential.

Now imagine that, on average, passengers arrive to the station every 5 seconds. We’ll model that process as an exponential too, even tough it’s fairly unrealistic to assume that the rate of arrival remains constant throughout the day.

// turnstiles admit 1 person / 4 seconds
let turnstileProc = exponentialTime 4.0
// passengers arrive randomly every 5s
let passengerArr = exponentialTime 5.0

Assuming the Law of conservation applies to subway station passengers too, we would expect the average rate of exit from the station to also be one every 5 seconds. However, unlike passengers coming in the station, passengers exiting arrived there by subway, and are therefore likely to arrive in batches. We’ll make the totally realistic assumption here that trains are never late, and arrive like clockwork at constant intervals, bringing in the same number of passengers. If trains arrive every 30 seconds, to maintain our average rate of 1 passenger every 5 seconds, each train will carry 6 passengers:

let batchedTime seconds batches = 
   let counter = ref 0
   fun () ->
      counter := counter.Value + 1
      if counter.Value < batches
      then new TimeSpan(0, 0, 0)
      else 
         counter := 0
         new TimeSpan(0, 0, seconds)
// trains arrive every 30s with 5 passengers
let trainArr = batchedTime 30 6

How would our 2 Turnstiles behave? Let’s check:

// passengers arriving in station
let queueIn = simulate startTime passengerArr turnstileProc
// passengers leaving station
let queueOut = simulate startTime trainArr turnstileProc

let prettyWait (t:TimeSpan) = t.TotalSeconds

printfn "Turnstile to get in the Station"
averageCountIn queueIn |> Seq.nth 1000000 |> printfn "In line: %f"
averageTimeIn queueIn |> Seq.nth 1000000 |> prettyWait |> printfn "Wait in secs: %f"

printfn "Turnstile to get out of the Station"
averageCountIn queueOut |> Seq.nth 1000000 |> printfn "In line: %f"
averageTimeIn queueOut |> Seq.nth 1000000 |> prettyWait |> printfn "Wait in secs: %f";;
Turnstile to get in the Station
In line: 1.917345
Wait in secs: 9.623852
Turnstile to get out of the Station
In line: 3.702664
Wait in secs: 18.390027

The results fits my personal experience: the Queue at the exit gets backed up quite a bit, and passengers have to wait an average 18.4 seconds to exit the Station, while it takes them only 9.6 seconds to get in.

It also may seem paradoxical. People are entering and exiting the Station at the same rate, and turnstiles process passengers at the same speed, so how can we have such different behaviors at the two turnstiles?

The first point here is that Queuing processes can be counter-intuitive, and require thinking carefully about what is being measured, as we saw earlier with the performance metrics computations.

The only thing which differs between the 2 turnstiles is the way arrivals are distributed over time – and that makes a lot of difference. Arrivals are fairly evenly spaced, and there is a good chance that a Passenger who arrives to the Station finds no one in the Queue, and in that case, he will wait only 4 seconds on average. By contrast, when passengers exit, they arrive in bunches, and only the first one will find no-one in the Queue – all the others will have to wait for that first person to pass through before getting their chance, and therefore have by default a much larger “guaranteed wait time”.

That’s it for today! There is much more to Queuing than single-queues (if you are into probability and Markov chains, networks of Queues are another fascinating area), but we will leave that for another time. I hope you’ll have found this excursion in Queuing interesting and maybe even useful. I also thought this was an interesting topic illustrating F# Sequences – and I am always looking forward to feedback!

Complete code (F# script) is also available on FsSnip.net

open System

// Queue / Server is either Idle, 
// or Busy until a certain time, 
// with items queued for processing
type Status = Idle | Busy of DateTime * int

type State = 
    { Start: DateTime;
      Status: Status; 
      NextIn: DateTime }

let next arrival processing state =
   match state.Status with
   | Idle ->
      { Start = state.NextIn;
        NextIn = state.NextIn + arrival();
        Status = Busy(state.NextIn + processing(), 0) }
   | Busy(until, waiting) ->
      match (state.NextIn <= until) with
      | true -> 
            { Start = state.NextIn; 
              NextIn = state.NextIn + arrival();
              Status = Busy(until, waiting + 1) } 
      | false -> 
            match (waiting > 0) with
            | true -> 
               { Start = until; 
                 Status = Busy(until + processing(), waiting - 1); 
                 NextIn = state.NextIn }
            | false -> 
               { Start = until; 
                 Status = Idle; 
                 NextIn = state.NextIn }

let simulate startTime arr proc =
   let nextIn = startTime + arr()
   let state = 
      { Start = startTime; 
        Status = Idle;
        NextIn = nextIn }
   Seq.unfold (fun st -> 
      Some(st, next arr proc st)) state

let pretty state =
   let count =
      match state.Status with
      | Idle -> 0
      | Busy(_, waiting) -> 1 + waiting
   let nextOut =
      match state.Status with
      | Idle -> "Idle"
      | Busy(until, _) -> until.ToLongTimeString()
   let start = state.Start.ToLongTimeString()
   let nextIn = state.NextIn.ToLongTimeString()
   printfn "Start: %s, Count: %i, Next in: %s, Next out: %s" start count nextIn nextOut

let constantTime (interval: TimeSpan) = 
   let ticks = interval.Ticks
   fun () -> interval

let arrivalTime = new TimeSpan(0,0,10);
let processTime = new TimeSpan(0,0,5)

let simpleArr = constantTime arrivalTime
let simpleProc = constantTime processTime

let startTime = new DateTime(2010, 1, 1)
let constantCase = simulate startTime simpleArr simpleProc

printfn "Constant arrivals, Constant processing"
Seq.take 10 constantCase |> Seq.iter pretty;;

let uniformTime (seconds: int) = 
   let rng = new Random()
   fun () ->
      let t = rng.Next(seconds + 1) 
      new TimeSpan(0, 0, t)

let uniformArr = uniformTime 10
let uniformCase = simulate startTime uniformArr simpleProc

printfn "Uniform arrivals, Constant processing"
Seq.take 10 uniformCase |> Seq.iter pretty;;

let exponentialTime (seconds: float) =
   let lambda = 1.0 / seconds
   let rng = new Random()
   fun () ->
      let t = - Math.Log(rng.NextDouble()) / lambda
      let ticks = t * (float)TimeSpan.TicksPerSecond
      new TimeSpan((int64)ticks)

let expArr = exponentialTime 10.0
let expProc = exponentialTime 7.0
let exponentialCase = simulate startTime expArr expProc

printfn "Exponential arrivals, Exponential processing"
Seq.take 10 exponentialCase |> Seq.iter pretty;;

let averageCountIn (transitions: State seq) =
   // time spent in current state, in ticks
   let ticks current next =
      next.Start.Ticks - current.Start.Ticks
   // jobs in system in state
   let count state =
      match state.Status with
      | Idle -> (int64)0
      | Busy(until, c) -> (int64)c + (int64)1
   // update state = total time and total jobsxtime
   // between current and next queue state
   let update state pair =
      let current, next = pair
      let c = count current
      let t = ticks current next
      (fst state) + t, (snd state) + (c * t)     
   // accumulate updates from initial state
   let initial = (int64)0, (int64)0
   transitions
   |> Seq.pairwise
   |> Seq.scan (fun state pair -> update state pair) initial
   |> Seq.map (fun state -> (float)(snd state) / (float)(fst state))

let averageTimeIn  (transitions: State seq) =
   // time spent in current state, in ticks
   let ticks current next =
      next.Start.Ticks - current.Start.Ticks
   // jobs in system in state
   let count state =
      match state.Status with
      | Idle -> (int64)0
      | Busy(until, c) -> (int64)c + (int64)1
   // count arrivals
   let arrival current next =
      if count next > count current then (int64)1 else (int64)0
   // update state = total time and total arrivals
   // between current and next queue state
   let update state pair =
      let current, next = pair
      let c = count current
      let t = ticks current next
      let a = arrival current next
      (fst state) + a, (snd state) + (c * t)     
   // accumulate updates from initial state
   let initial = (int64)0, (int64)0
   transitions
   |> Seq.pairwise
   |> Seq.scan (fun state pair -> update state pair) initial
   |> Seq.map (fun state -> 
      let time = (float)(snd state) / (float)(fst state)
      new TimeSpan((int64)time))

// turnstiles admit 1 person / 4 seconds
let turnstileProc = exponentialTime 4.0
// passengers arrive randomly every 5s
let passengerArr = exponentialTime 5.0

let batchedTime seconds batches = 
   let counter = ref 0
   fun () ->
      counter := counter.Value + 1
      if counter.Value < batches
      then new TimeSpan(0, 0, 0)
      else 
         counter := 0
         new TimeSpan(0, 0, seconds)
// trains arrive every 30s with 5 passengers
let trainArr = batchedTime 30 6

// passengers arriving in station
let queueIn = simulate startTime passengerArr turnstileProc
// passengers leaving station
let queueOut = simulate startTime trainArr turnstileProc

let prettyWait (t:TimeSpan) = t.TotalSeconds

printfn "Turnstile to get in the Station"
averageCountIn queueIn |> Seq.nth 1000000 |> printfn "In line: %f"
averageTimeIn queueIn |> Seq.nth 1000000 |> prettyWait |> printfn "Wait in secs: %f"

printfn "Turnstile to get out of the Station"
averageCountIn queueOut |> Seq.nth 1000000 |> printfn "In line: %f"
averageTimeIn queueOut |> Seq.nth 1000000 |> prettyWait |> printfn "Wait in secs: %f"
by Mathias 27. May 2012 10:20

File:AAMarkov.jpgMarkov chains are a classic in probability model. They represent systems that evolve between states over time, following a random but stable process which is memoryless. The memoryless-ness is the defining characteristic of Markov processes,  and is known as the Markov property. Roughly speaking, the idea is that if you know the state of the process at time T, you know all there is to know about it – knowing where it was at time T-1 would not give you additional information on where it may be at time T+1.

While Markov models come in multiple flavors, Markov chains with finite discrete states in discrete time are particularly interesting. They describe a system which is changes between discrete states at fixed time intervals, following a transition pattern described by a transition matrix.

Let’s illustrate with a simplistic example. Imagine that you are running an Airline, AcmeAir, operating one plane. The plane goes from city to city, refueling and doing some maintenance (or whatever planes need) every time.

Each time the plane lands somewhere, it can be in three states: early, on-time, or delayed. It’s not unreasonable to think that if our plane landed late somewhere, it may be difficult to catch up with the required operations, and as a result, the likelihood of the plane landing late at its next stop is higher. We could represent this in the following transition matrix (numbers totally made up):

Current \ Next Early On-time Delayed
Early 10% 85% 5%
On-Time 10% 75% 15%
Delayed 5% 60% 35%

Each row of the matrix represents the current state, and each column the next state. The first row tells us that if the plane landed Early, there is a 10% chance we’ll land early in our next stop, an 80% chance we’ll be on-time, and a 5% chance we’ll arrive late. Note that each row sums up to 100%: given the current state, we have to end up in one of the next states.

How could we simulate this system? Given the state at time T, we simply need to “roll” a random number generator for a percentage between 0% and 100%, and depending on the result, pick our next state – and repeat.

Using F#, we could model the transition matrix as an array (one element per state) of arrays (the probabilities to land in each state), which is pretty easy to define using Array comprehensions:

let P = 
   [| 
      [| 0.10; 0.85; 0.05 |];
      [| 0.10; 0.75; 0.15 |];
      [| 0.05; 0.60; 0.35 |]
   |]

(Note: the entire code sample is also posted on fsSnip.net/ch)

To simulate the behavior of the system, we need a function that given a state and a transition matrix, produces the next state according to the transition probabilities:

// given a roll between 0 and 1
// and a distribution D of 
// probabilities to end up in each state
// returns the index of the state
let state (D: float[]) roll =
   let rec index cumul current =
      let cumul = cumul + D.[current]
      match (roll <= cumul) with
      | true -> current
      | false -> index cumul (current + 1)
   index 0.0 0

// given the transition matrix P
// the index of the current state
// and a random generator,
// simulates what the next state is
let nextState (P: float[][]) current (rng: Random) =
   let dist = P.[current]
   let roll = rng.NextDouble()
   state dist roll

// given a transition matrix P
// the index i of the initial state
// and a random generator
// produces a sequence of states visited
let simulate (P: float[][]) i (rng: Random) =
   Seq.unfold (fun s -> Some(s, nextState P s rng)) i

The state function is a simple helper; given an array D which is assumed to contain probabilities to transition to each of the states, and a “roll” between 0.0 and 1.0, returns the corresponding state. nextState uses that function, by first retrieving the transition probabilities for the current state i, “rolling” the dice, and using state to compute the simulated next state. simulate uses nextState to create an infinite sequence of states, starting from an initial state i.

We need to open System to use the System.Random class – and we can now use this in the F# interactive window:

> let flights = simulate P 1 (new Random());;

val flights : seq<int>

> Seq.take 50 flights |> Seq.toList;;

val it : int list =
  [1; 0; 1; 1; 1; 1; 1; 1; 1; 1; 1; 1; 1; 0; 1; 1; 2; 2; 2; 1; 1; 1; 1; 1; 1;
   1; 1; 2; 1; 1; 1; 1; 1; 1; 1; 1; 2; 1; 1; 1; 1; 1; 2; 1; 1; 1; 1; 1; 1; 1]
>

Our small sample shows us what we expect: mostly on-time (Fly AcmeAir!), with some episodical delayed or early flights.

How many delays would we observe on a 1000-flights simulation? Let’s try:

> Seq.take 1000 flights |> Seq.filter (fun i -> i = 2) |> Seq.length;; 
val it : int = 174
> 

We observe about 17% of delayed flights. This is relevant information, but a single simulation is just that – an isolated case. Fortunately, Markov chains have an interesting property: if it is possible to go from any state to any state, then the system will have a stationary distribution, which corresponds to its long term equilibrium. Essentially, regardless of the starting point, over long enough periods, each state will be observed with a stable frequency.

One way to understand better what is going on is to expand our frame. Instead of considering the exact state of the system, we can look at it in terms of probability: at any point in time, the system has a certain probability to be in each of its states.

For instance, imagine that given current information, we know that our plane will land at its next stop either early or on time, with a 50% chance of each. In that case, we can determine the probability that its next stop will be delayed by combining the transition probabilities:

p(delayed in T+1) = p(delayed in T) x P(delayed in T+1 | delayed in T) +  p(on-time in T) x P(delayed in T+1 | on-time in T) + p(early in T) x P(delayed in T+1 | early in T)

p(delayed in T+1) = 0.0 x 0.35 + 0.5 x 0.15 + 0.5 x 0.05 = 0.1

This can be expressed much more concisely using Vector notation. We can represent the state as a vector S, where each component of the vector is the probability to be in each state, in our case

S(T) = [ 0.50; 0.50; 0.0 ]

In that case, the state at time T+1 will be:

S(T+1) = S(T) x P

Let’s make that work with some F#. The product of a vector by a matrix is the dot-product of the vector with each column vector of the matrix:

// Vector dot product
let dot (V1: float[]) (V2: float[]) =
   Array.zip V1 V2
   |> Array.map(fun (v1, v2) -> v1 * v2)
   |> Array.sum

// Extracts the jth column vector of matrix M 
let column (M: float[][]) (j: int) =
   M |> Array.map (fun v -> v.[j])

// Given a row-vector S describing the probability
// of each state and a transition matrix P, compute
// the next state distribution
let nextDist S P =
   P 
   |> Array.mapi (fun j v -> column P j)
   |> Array.map(fun v -> dot v S)

We can now handle our previous example, creating a state s with a 50/50 chance of being in state 0 or 1:

> let s = [| 0.5; 0.5; 0.0 |];;

val s : float [] = [|0.5; 0.5; 0.0|]

> let s' = nextDist s P;;

val s' : float [] = [|0.1; 0.8; 0.1|]

> 

We can also easily check what the state of the system should be after, say, 100 flights:

> let s100 = Seq.unfold (fun s -> Some(s, nextDist s P)) s |> Seq.nth 100;;

val s100 : float [] = [|0.09119496855; 0.7327044025; 0.1761006289|]

After 100 flights, starting from either early or on-time, we have about 17% of chance of being delayed. Note that this is consistent with what we observed in our initial simulation. Given that our Markov chain has a stationary distribution, this is to be expected: unless our simulation was pathologically unlikely, we should observe the same frequency of delayed flights in the long run, no matter what the initial starting state is.

Can we compute that stationary distribution? The typical way to achieve this is to bust out some algebra and solve V = P x V, where V is the stationary distribution vector and P the transition matrix.

Here we’ll go for a numeric approximation approach. Rather than solving the system of equations, we will start from a uniform distribution over the states, and apply the transition matrix until the distance between two consecutive states is under a threshold Epsilon:

// Euclidean distance between 2 vectors
let dist (V1: float[]) V2 =
   Array.zip V1 V2
   |> Array.map(fun (v1, v2) -> (v1 - v2) * (v1 - v2))
   |> Array.sum
   
// Evaluate stationary distribution
// by searching for a fixed point
// under tolerance epsilon
let stationary (P: float[][]) epsilon =
   let states = P.[0] |> Array.length
   [| for s in 1 .. states -> 1.0 / (float)states |] // initial
   |> Seq.unfold (fun s -> Some((s, (nextDist s P)), (nextDist s P)))
   |> Seq.map (fun (s, s') -> (s', dist s s'))
   |> Seq.find (fun (s, d) -> d < epsilon)

Running this on our example results in the following stationary distribution estimation:

> stationary P 0.0000001;;
val it : float [] * float =
  ([|0.09118958333; 0.7326858333; 0.1761245833|], 1.1590625e-08)

In short, in the long run, we should expect our plane to be early 9.1% of the time, on-time 73.2%, and delayed 17.6%.

Note: the fixed point approach above should work if a unique stationary distribution exists. If this is not the case, the function may never converge, or may converge to a fixed point that depends on the initial conditions. Use with caution!

Armed with this model, we could now ask interesting questions. Suppose for instance that we could improve the operations of AcmeAir, and reduce the chance that our next arrival is delayed given our current state. What should we focus on – should we reduce the probability to remain delayed after a delay (strategy 1), or should we prevent the risk of being delayed after an on-time landing (strategy 2)?

One way to look at this is to consider the impact of each strategy on the long-term distribution. Let’s compare the impact of a 1-point reduction of delays in each case, which we’ll assume gets transferred to on-time. We can then create the matrices for each strategy, and compare their respective stationary distributions:

> let strat1 = [|[|0.1; 0.85; 0.05|]; [|0.1; 0.75; 0.15|]; [|0.05; 0.61; 0.34|]|]
let strat2 = [|[|0.1; 0.85; 0.05|]; [|0.1; 0.76; 0.14|]; [|0.05; 0.60; 0.35|]|];;

val strat1 : float [] [] =
  [|[|0.1; 0.85; 0.05|]; [|0.1; 0.75; 0.15|]; [|0.05; 0.61; 0.34|]|]
val strat2 : float [] [] =
  [|[|0.1; 0.85; 0.05|]; [|0.1; 0.76; 0.14|]; [|0.05; 0.6; 0.35|]|]

> stationary strat1 0.0001;;
val it : float [] * float =
  ([|0.091; 0.7331333333; 0.1758666667|], 8.834666667e-05)
> stationary strat2 0.0001;;
val it : float [] * float = ([|0.091485; 0.740942; 0.167573|], 1.2698318e-05)
> 

The numbers tell the following story: strategy 2 (improve reduction of delays after on-time arrivals) is better: it results in 16.6% delays, instead of 17.6% for strategy 1. Intuitively, this makes sense, because most of our flights are on-time, so an improvement in this area will have a much larger impact in the overall results that a comparable improvement on delayed flights.

There is (much) more to Markov chains than this, and there are many ways the code presented could be improved upon – but I’ll leave it at that for today, hopefully you will have picked up something of interest along the path of this small exploration!

I also posted the complete code sample on fsSnip.net/ch.

by Mathias 22. May 2011 14:26

In our previous installments, we laid the groundwork of our Bee Colony Algorithm implementation. Today, it’s time to put the bees to work, searching for an acceptable solution to the Traveling Salesman problem.

We will approach the search as a Sequence: starting from an initial hive and solution, we will unfold it, updating the state of the hive and the current best solution at each step. Let’s start with the hive initialization. Starting from an initial route, we need to create a pre-defined number of each Bee type, and provide them with an initial destination:

let Initialize nScouts nActives nInactives cities (rng : Random) =
   [    
      for i in 1 .. nScouts do 
         let solution = Evaluate(Shuffle rng cities)
         yield Scout(solution)
      for i in 1 .. nActives do
         let solution = Evaluate(Shuffle rng cities)
         yield Active(solution, 0)
      for i in 1 .. nActives do
         let solution = Evaluate(Shuffle rng cities)
         yield Inactive(solution)
   ]

There is probably a more elegant way to do this, but this is good enough: we use a simple List comprehension to generate a list on the fly, yielding the appropriate number of each type of bees, and assigning them a shuffled version of the starting route.

More...

by Mathias 8. May 2011 19:43

This is our third episode in attempting to convert a C# bee colony algorithm into an F# equivalent. In our previous posts, we created functions to randomly shuffle lists of cities, and to measure the length of the corresponding path. Today, it’s time to get the bees to work, bringing us new solutions to the hive.

The algorithm distinguishes 3 types of bees: Scout, Active and Inactive. Each bee type has a different role in the algorithms: Scouts keep searching for new solutions, Active bees explore around known solutions for improvements until their potential is exhausted, and Inactive bees wait for new information, and replace Active bees when they turn Inactive.

Let’s start there, and define a Bee discriminated union:

type Bee = Scout | Active | Inactive

In the original C# implementation I am starting from, the algorithm works by iteration: each bee of the hive is processed and its state in the Hive updated (see “The Solve Method” in the article), with 3 steps: the bee

  • finds a new solution and evaluates its quality,
  • shares that information with the inactive bees of the Hive by performing a Waggle Dance,
  • becomes re-allocated as Active or Inactive for the next iteration.

Rather than follow strictly the existing implementation, where all three steps are happening in one single method for a bee, I decided to re-organize it a bit, and separate each of these operations, in part to make the code easier to follow, and in part with an eye to making it run in parallel later.

In that frame, let’s begin with the first step, where bee searches for a new solution. Every bee has a current solution in memory, and after searching, they will come up with a new target solution if it is an improvement. Let’s first define for convenience what a Solution will be:

type Solution = { Route: List<City>; Cost: float }
let Evaluate (route: List<City>) = { Route = route; Cost = CircuitCost route }

A Solution wraps in a Record the Route – the ordered list of Cities travelled – and its Cost, measured by its length. We also define a convenience function Evaluate, which takes in a Route and returns the corresponding solution, with the original Route and its Cost, computed using the function we wrote in our last post.

More...

Comments

Comment RSS