Skip to content

Commit 9d5ef7f

Browse files
authored
Merge pull request #21 from bartoszmodelski/mpmc-queue
MPMC relaxed queue
2 parents cae8514 + e32f1e1 commit 9d5ef7f

19 files changed

+788
-6
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,4 @@ clean:
1010
dune clean
1111

1212
bench:
13-
@dune exec -- ./bench/main.exe --json
13+
@dune exec -- ./bench/main.exe

README.md

+6
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ A collection of Concurrent Lockfree Data Structures for OCaml 5. It contains:
77

88
* [SPSC Queue](src/spsc_queue.mli) Simple single-producer single-consumer fixed-size queue. Thread-safe as long as at most one thread acts as producer and at most one as consumer at any single point in time.
99

10+
* [MPMC Relaxed Queue](src/mpmc_relaxed_queue.mli) Multi-producer, multi-consumer, fixed-size relaxed queue. Optimised for a high number of threads. Not strictly FIFO. Note that it exposes two interfaces: a lockfree one and a non-lockfree (albeit more practical) one. See the mli for details.
11+
1012
* [MPSC Queue](src/mpsc_queue.mli) A multi-producer, single-consumer, thread-safe queue without support for cancellation. This makes a good data structure for a scheduler's run queue. It is used in [Eio](https://github.com/ocaml-multicore/eio). It is a single consumer version of the queue described in [Implementing lock-free queues](https://people.cs.pitt.edu/~jacklange/teaching/cs2510-f12/papers/implementing_lock_free.pdf).
1113

1214

@@ -25,6 +27,10 @@ let () = Ws_deque.push q 100
2527
let () = assert (Ws_deque.pop q = 100)
2628
```
2729

30+
## Benchmarks
31+
32+
There are a number of benchmarks in the `bench/` directory. You can run them with `make bench`. See [bench/README.md](bench/README.md) for more details.
33+
2834
## Contributing
2935

3036
Contributions of more lockfree data structures appreciated! Please create

bench/README.md

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
Benchmarks for lockfree
2+
3+
# General usage
4+
5+
Execute `make bench` from the root of the repository to run the standard set of benchmarks. The output is in JSON, as it is intended to be consumed by the ocaml-benchmark CI (in progress).
6+
7+
# Specific structures
8+
9+
Some benchmarks expose a command-line interface targeting particular structures:
10+
11+
* [mpmc_queue_cmd.exe](mpmc_queue_cmd.ml)

bench/dune

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
(executable
2-
(name main)
1+
(executables
2+
(names main mpmc_queue_cmd)
33
(libraries lockfree unix yojson))

bench/main.ml

+12-2
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,23 @@
1-
let benchmark_list = [ Bench_spsc_queue.bench ]
1+
(* Benchmarks run by the driver, in order: the SPSC queue benchmark, then the
   MPMC queue benchmark for three taker/pusher splits, first without passing
   [use_cas] and then with [~use_cas:true]. Every entry is a thunk; nothing
   runs until it is applied to [()]. *)
let benchmark_list =
  let mpmc_splits = [ (4, 4); (1, 8); (8, 1) ] in
  let mpmc ?use_cas () =
    List.map
      (fun (takers, pushers) () ->
        Mpmc_queue.bench ?use_cas ~takers ~pushers ())
      mpmc_splits
  in
  (Bench_spsc_queue.bench :: mpmc ()) @ mpmc ~use_cas:true ()
211

312
let () =
413
let results =
14+
(* todo: should assert no stranded domains between tests. *)
515
List.map (fun f -> f ()) benchmark_list
616
|> List.map Benchmark_result.to_json
717
|> String.concat ", "
818
in
919
let output =
10-
Printf.sprintf {| {"name": "lockfree", "results": %s}|} results
20+
Printf.sprintf {| {"name": "lockfree", "results": [%s]}|} results
1121
(* Cannot use Yojson rewriters as of today none works on OCaml 5.1.0.
1222
This at least verifies that the manually crafted JSON is well-formed.
1323

bench/mpmc_queue.ml

+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
open Lockfree.Mpmc_relaxed_queue
2+
3+
(* Queue operations used by the worker loops. They are held in refs so that
   [run_bench] can swap the implementation before spawning domains. *)
let push = ref Not_lockfree.push
let pop = ref Not_lockfree.pop

(* Benchmark parameters. These are module-level mutable cells: [bench]
   overwrites them from its optional arguments, and [run_bench] and
   [create_output] read them. *)
let num_of_elements = ref 500_000
let num_of_iterations = ref 10
let num_of_pushers = ref 4
let num_of_takers = ref 4
let use_cas_intf = ref false
10+
11+
(* [taker queue num_of_elements ()] keeps calling the current [!pop] on
   [queue] until [num_of_elements] calls have returned [Some _]. A [None]
   result is not counted; the call is simply retried (busy-wait). *)
let taker queue num_of_elements () =
  let rec drain taken =
    if taken < num_of_elements then
      match !pop queue with
      | Some _ -> drain (taken + 1)
      | None -> drain taken
  in
  drain 0
16+
17+
(* [pusher queue num_of_elements ()] pushes the integers
   [0 .. num_of_elements - 1] onto [queue] with the current [!push]. When a
   push returns [false] the same value is retried (busy-wait). *)
let pusher queue num_of_elements () =
  let rec fill next =
    if next < num_of_elements then
      fill (if !push queue next then next + 1 else next)
  in
  fill 0
22+
23+
(* [create_output ~time_median ~throughput_median ~throughput_stddev] packs
   the three statistics into a [Benchmark_result.t]. The result name encodes
   the current pusher/taker counts and whether the CAS interface was
   requested. *)
let create_output ~time_median ~throughput_median ~throughput_stddev =
  let numeric ~name ~units ~description measured =
    ({ name; value = `Numeric measured; units; description }
      : Benchmark_result.Metric.t)
  in
  let metrics =
    [
      numeric ~name:"time" ~units:"s" ~description:"median time" time_median;
      numeric ~name:"throughput" ~units:"item/s"
        ~description:"median throughput" throughput_median;
      numeric ~name:"throughput-stddev" ~units:"item/s"
        ~description:"stddev throughput" throughput_stddev;
    ]
  in
  let name =
    Printf.sprintf "mpmc-queue-pushers:%d,takers:%d,use-cas:%b" !num_of_pushers
      !num_of_takers !use_cas_intf
  in
  ({ name; metrics } : Benchmark_result.t)
57+
58+
(* [run_bench ()] runs one benchmark with the parameters currently stored in
   the module-level refs and returns the aggregated result.

   It spawns [!num_of_takers] taker domains and [!num_of_pushers] pusher
   domains sharing one queue, lets the orchestrator drive
   [!num_of_iterations] rounds, then reports the median round time together
   with the median and standard deviation of the per-round throughput.

   Raises [Assert_failure] if [!num_of_elements] is not divisible by the
   number of takers or by the number of pushers. *)
let run_bench () =
  (* Pick the implementation on every run, in both directions. Otherwise a
     run with [use_cas_intf = false] that follows a CAS run would silently
     keep measuring the CAS interface. *)
  if !use_cas_intf then (
    push := Lockfree.Mpmc_relaxed_queue.Not_lockfree.CAS_interface.push;
    pop := Lockfree.Mpmc_relaxed_queue.Not_lockfree.CAS_interface.pop)
  else (
    push := Lockfree.Mpmc_relaxed_queue.Not_lockfree.push;
    pop := Lockfree.Mpmc_relaxed_queue.Not_lockfree.pop);
  let queue = create ~size_exponent:10 () in
  let orchestrator =
    Orchestrator.init
      ~total_domains:(!num_of_takers + !num_of_pushers)
      ~rounds:!num_of_iterations
  in
  (* Spawn [n] domains, each running [f] over an equal share of the items. *)
  let start_n_domains n f =
    assert (!num_of_elements mod n = 0);
    let items_per_domain = !num_of_elements / n in
    List.init n (fun _ ->
        Domain.spawn (fun () ->
            Orchestrator.worker orchestrator (f queue items_per_domain)))
  in
  (* start domains *)
  let domains =
    let takers = start_n_domains !num_of_takers taker in
    let pushers = start_n_domains !num_of_pushers pusher in
    Sys.opaque_identity (pushers @ takers)
  in
  (* run test *)
  let times = Orchestrator.run orchestrator in
  List.iter Domain.join domains;
  let time_median = Stats.median times in
  let throughputs =
    List.map (fun time -> Int.to_float !num_of_elements /. time) times
  in
  let throughput_median = Stats.median throughputs in
  let throughput_stddev = Stats.stddev throughputs in
  create_output ~time_median ~throughput_median ~throughput_stddev
92+
93+
(* [bench ?takers ?pushers ?use_cas ?iterations ?elements ()] stores every
   argument that was supplied into the matching module-level ref, leaves the
   others at their current value, and then runs the benchmark. *)
let bench ?takers ?pushers ?use_cas ?iterations ?elements () =
  let override cell supplied = Option.iter (fun v -> cell := v) supplied in
  override num_of_takers takers;
  override num_of_pushers pushers;
  override use_cas_intf use_cas;
  override num_of_iterations iterations;
  override num_of_elements elements;
  run_bench ()

bench/mpmc_queue.mli

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
(** [bench ?takers ?pushers ?use_cas ?iterations ?elements ()] runs the MPMC
    relaxed queue benchmark and returns its result.

    Arguments that are omitted keep the value left by the previous call;
    the initial values are 4 takers, 4 pushers, [use_cas = false],
    10 iterations and 500_000 elements.

    @raise Assert_failure if the number of elements is not divisible by both
    the number of takers and the number of pushers. *)
val bench :
  ?takers:int ->
  ?pushers:int ->
  ?use_cas:bool ->
  ?iterations:int ->
  ?elements:int ->
  unit ->
  Benchmark_result.t

bench/mpmc_queue_cmd.ml

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
(* Command-line settings, pre-loaded with their defaults. [Arg.parse]
   overwrites them through [speclist]. *)
let elements = ref 100_000
let pushers = ref 4
let takers = ref 4
let iterations = ref 10
let use_cas = ref false

(* Option table for [Arg.parse]: (flag, action, help text). *)
let speclist =
  [
    ("-items", Arg.Set_int elements, "number of items to insert and remove");
    ("-pushers", Arg.Set_int pushers, "number of domains pushing items");
    ("-takers", Arg.Set_int takers, "number of domains taking items");
    ("-iterations", Arg.Set_int iterations, "run the benchmark this many times");
    ("-use-cas", Arg.Set use_cas, "use CAS instead of FAD");
  ]
15+
16+
(* [_f ()] is the program entry point: it parses the command line, runs the
   MPMC queue benchmark with the requested settings and prints the result as
   pretty-printed JSON on standard output. *)
let _f () =
  Arg.parse speclist
    (* Reject stray positional arguments instead of ignoring them: a typo
       such as [pushers 8] (missing dash) would otherwise silently benchmark
       the default configuration. *)
    (fun arg -> raise (Arg.Bad (Printf.sprintf "unexpected argument: %s" arg)))
    "mpmc_queue_cmd.exe [-items INT] [-pushers INT] [-takers INT] \
     [-iterations INT] [-use-cas]";
  let result =
    Mpmc_queue.bench ~takers:!takers ~pushers:!pushers ~use_cas:!use_cas
      ~iterations:!iterations ~elements:!elements ()
  in
  Benchmark_result.to_json result |> Yojson.Basic.prettify |> print_string

(* Actually run the entry point; without this the executable does nothing. *)
let () = _f ()

bench/orchestrator.ml

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
(* Shared state used to keep benchmark domains in lock-step. *)
type t = {
  ready : int Atomic.t;
      (* incremented by each worker once at start-up and once after every
         round it completes *)
  total_domains : int; (* number of worker domains taking part *)
  round : int Atomic.t;
      (* incremented by [run] to release workers into the next round *)
  rounds : int; (* number of rounds to execute *)
}

(* [init ~total_domains ~rounds] creates a fresh orchestrator with both
   counters at zero. *)
let init ~total_domains ~rounds =
  let ready = Atomic.make 0 and round = Atomic.make 0 in
  { ready; total_domains; round; rounds }
10+
11+
(* [wait_until_all_ready ?round t] busy-waits until the [ready] counter has
   reached [total_domains * (round + 1)]: one increment per domain for the
   initial check-in plus one per domain for each of the first [round] rounds.
   [round] defaults to 0, i.e. only the initial check-in. *)
let wait_until_all_ready ?(round = 0) { ready; total_domains; _ } =
  let target = total_domains * (round + 1) in
  let rec spin () = if Atomic.get ready < target then spin () in
  spin ()
15+
16+
(* [worker t f] is the body of one benchmark domain. It checks in, waits for
   every other domain to check in, and then for each round [1 .. rounds]:
   busy-waits until [round] has reached that round number, runs [f ()], and
   bumps [ready] to report completion. *)
let worker ({ ready; round; rounds; _ } as t) f =
  Atomic.incr ready;
  wait_until_all_ready t;
  (* all domains are up at this point *)
  let rec await_start i = if Atomic.get round < i then await_start i in
  let rec play i =
    if i <= rounds then (
      await_start i;
      f ();
      (* signal that we're done *)
      Atomic.incr ready;
      play (i + 1))
  in
  play 1
29+
30+
(* [run ?drop_first t] drives the benchmark from the coordinating domain.
   Once every worker has checked in, it releases the rounds one at a time by
   bumping [round], waits for all workers to report completion, and records
   the elapsed [Unix.gettimeofday] time of each round. The timing of round 1
   is discarded when [drop_first] is true (the default). Durations are
   returned most recent round first. *)
let run ?(drop_first = true) ({ round; rounds; _ } as t) =
  wait_until_all_ready t;
  (* all domains are up, can start benchmarks *)
  let rec go i timings =
    if i > rounds then timings
    else
      let started = Unix.gettimeofday () in
      Atomic.incr round;
      wait_until_all_ready ~round:i t;
      let elapsed = Unix.gettimeofday () -. started in
      go (i + 1) (if drop_first && i = 1 then timings else elapsed :: timings)
  in
  go 1 []

bench/orchestrator.mli

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
(** Helper library that ensures all workers have started before any
    starts making progress on the benchmark. *)

(** Shared coordination state for one benchmark. *)
type t

(** [init ~total_domains ~rounds] creates the state for [total_domains]
    worker domains that will each execute [rounds] rounds. *)
val init : total_domains:int -> rounds:int -> t

(** [worker t f] must be called from each worker domain. It waits until all
    workers have checked in, then runs [f ()] once per round, each time
    waiting for {!run} to release the round. *)
val worker : t -> (unit -> unit) -> unit

(** [run ?drop_first t] releases the rounds one by one and returns the
    duration of each round as measured with [Unix.gettimeofday], most recent
    round first. The first round is left out when [drop_first] is [true]
    (the default). *)
val run : ?drop_first:bool -> t -> float List.t

bench/run_all.sh

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#!/bin/bash
# Runs the MPMC queue command-line benchmark for every combination of
# interface (default / -use-cas) and taker/pusher balance, and collects the
# commands and their output in benchmarks_output.txt.
#
# The relative path below assumes the script is started from the bench/
# directory after the project has been built with dune.
set -x

date=$(date)

echo "$date" > benchmarks_output.txt

with_cas=("" "-use-cas")
# Every option needs its leading dash: a bare "pushers 8" is not an option.
rw_balance=("-takers 4 -pushers 4" "-takers 1 -pushers 8" "-takers 8 -pushers 1")
# dune names the executable after its entry module, mpmc_queue_cmd.
path="./../_build/default/bench/mpmc_queue_cmd.exe "

for i in "${with_cas[@]}"
do
    for j in "${rw_balance[@]}"
    do
        cmd="$path $j $i"
        # $cmd is left unquoted on purpose so it is split into program + args.
        output=$($cmd)

        echo "$cmd" >> benchmarks_output.txt
        echo "$output" >> benchmarks_output.txt
    done
done

bench/stats.ml

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
(* [median data] is the median of [data]: the middle element of the sorted
   values for an odd count, the average of the two middle elements for an
   even count.

   Raises [Invalid_argument] if [data] is empty. *)
let median data =
  match List.sort Float.compare data with
  | [] -> invalid_arg "Stats.median: empty list"
  | sorted ->
      (* An array gives constant-time access to the middle element(s). *)
      let sorted = Array.of_list sorted in
      let len = Array.length sorted in
      let mid = len / 2 in
      if len mod 2 = 1 then sorted.(mid)
      else (sorted.(mid - 1) +. sorted.(mid)) /. 2.
9+
10+
(* [mean data] is the arithmetic mean of [data]. For an empty list this is
   [0. /. 0.], i.e. [nan]. *)
let mean data =
  let total, count =
    List.fold_left (fun (acc, seen) x -> (acc +. x, seen + 1)) (0., 0) data
  in
  total /. float_of_int count
14+
15+
(* [stddev data] is the population standard deviation of [data]: the square
   root of the average squared distance from the mean (the divisor is the
   number of elements, not that number minus one). *)
let stddev data =
  let centre = mean data in
  let squared_deviations = List.map (fun x -> (x -. centre) ** 2.) data in
  let total = List.fold_left ( +. ) 0. squared_deviations in
  sqrt (total /. float_of_int (List.length data))
26+
27+
(* [cmp ?epsilon a b] holds when [a] and [b] differ by strictly less than
   [epsilon] (0.01 by default). *)
let cmp ?(epsilon = 0.01) a b =
  let gap = Float.abs (a -. b) in
  gap < epsilon

(* Spot-checks [mean] and [stddev] against hand-computed values. *)
let sanity_checks () =
  let expect stat sample wanted = assert (cmp (stat sample) wanted) in
  expect mean [ 1.; 5. ] 3.;
  expect mean [ 1.; 3.; 7. ] 3.6666;
  expect stddev [ 1.; 5. ] 2.;
  expect stddev [ 1.; 3.; 7. ] 2.4944

(* Run the checks when the module is initialised. *)
let () = sanity_checks ()

bench/stats.mli

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
(** [mean data] is the arithmetic mean of [data]; [nan] when [data] is
    empty. *)
val mean : float List.t -> float

(** [stddev data] is the population standard deviation of [data] (the sum of
    squared deviations is divided by the number of elements). *)
val stddev : float List.t -> float

(** [median data] is the median of [data]; for an even number of elements it
    is the average of the two middle values.
    @raise Invalid_argument if [data] is empty. *)
val median : float List.t -> float

src/lockfree.ml

+1
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,4 @@ Copyright (c) 2017, Nicolas ASSOUAD <[email protected]>
2929
module Ws_deque = Ws_deque
3030
module Spsc_queue = Spsc_queue
3131
module Mpsc_queue = Mpsc_queue
32+
module Mpmc_relaxed_queue = Mpmc_relaxed_queue

src/lockfree.mli

+1
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,4 @@ Copyright (c) 2017, Nicolas ASSOUAD <[email protected]>
3333
module Ws_deque = Ws_deque
3434
module Spsc_queue = Spsc_queue
3535
module Mpsc_queue = Mpsc_queue
36+
module Mpmc_relaxed_queue = Mpmc_relaxed_queue

0 commit comments

Comments
 (0)