@@ -3,48 +3,54 @@ open Eio.Stdenv
33open Eio
44module Sync = Eio__Sync
55
6- let sender_fibers = 10
6+ let sender_fibers = 2
77
88let message = 1234
99
10- let sender ~id ~n_msgs stream =
11- for i = 1 to n_msgs do
12- traceln " Sent message #%d from %d" i id;
13- Sync. put stream message
14- done
10+ (* Send [n_msgs] items to streams in a round-robin way. *)
11+ let sender ~n_msgs streams =
12+ let msgs = Seq. take n_msgs (Seq. ints 0 ) in
13+ let streams = Seq. cycle (List. to_seq streams) in
14+ let zipped = Seq. zip msgs streams in
15+ ignore (Seq. iter (fun (_i , stream ) ->
16+ Sync. put stream message) zipped)
1517
16- (* Start one sender fiber for each stream, and let it send n_msgs messages. *)
18+ (* Start one sender fiber for each stream, and let it send n_msgs messages.
19+ Each fiber sends to all streams in a round-robin way. *)
1720let run_senders ~dom_mgr ?(n_msgs = 100 ) streams =
18- let count = ref 0 in
1921 Switch. run @@ fun sw ->
20- ignore @@ List. map (fun stream ->
22+ ignore @@ List. iter (fun _stream ->
2123 Fiber. fork ~sw (fun () ->
22- let id = ! count in
23- count := ! count + 1 ;
24- Domain_manager. run dom_mgr (fun () ->
25- sender ~id ~n_msgs stream))) streams
24+ Domain_manager. run dom_mgr (fun () ->
25+ sender ~n_msgs streams))) streams
2626
27+ (* Receive messages from all streams. *)
2728let receiver ~n_msgs streams =
28- for i = 1 to n_msgs do
29+ for _i = 1 to n_msgs do
2930 assert (Int. equal message (Sync. select_of_many streams));
30- traceln " Received message #%d" i
3131 done
3232
33+ (* Create [n] streams. *)
3334let make_streams n =
3435 let unfolder i = if i == 0 then None else Some (Sync. create () , i-1 ) in
3536 let seq = Seq. unfold unfolder n in
3637 List. of_seq seq
3738
38- (* Currently fails with exception from ocaml-uring/lib/uring/uring.ml:326
39- https://github.com/ocaml-multicore/ocaml-uring/blob/07482dae72c8e977e4e4e2b2c8bd137e770ee1dd/lib/uring/uring.ml#L327
40- *)
4139let run env =
4240 let dom_mgr = domain_mgr env in
41+ let clock = clock env in
4342 let streams = make_streams sender_fibers in
44- let n_msgs = 50 in
43+ let n_msgs = 10000 in
4544 Switch. run @@ fun sw ->
46- Fiber. fork ~sw (fun () -> run_senders ~dom_mgr ~n_msgs streams);
47- receiver ~n_msgs: (sender_fibers * n_msgs) streams;
48- []
45+ Fiber. fork ~sw (fun () -> run_senders ~dom_mgr ~n_msgs streams);
46+ let before = Time. now clock in
47+ receiver ~n_msgs: (sender_fibers * n_msgs) streams;
48+ let after = Time. now clock in
49+ let elapsed = after -. before in
50+ let time_per_iter = elapsed /. (Float. of_int @@ sender_fibers * n_msgs) in
51+ [Metric. create
52+ (Printf. sprintf " sync:true senders:%d msgs_per_sender:%d" sender_fibers n_msgs)
53+ (`Float (1e9 *. time_per_iter)) " ns"
54+ " Time per transmitted int" ]
4955
5056
0 commit comments