Skip to content

Commit 65f36b1

Browse files
committed
Add a poisonable Barrier to picos_std.sync
1 parent 608deec commit 65f36b1

File tree

5 files changed

+185
-0
lines changed

5 files changed

+185
-0
lines changed

lib/picos_std.sync/barrier.ml

+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
open Picos_std_awaitable
2+
3+
type t = int Awaitable.t
4+
5+
let sense_bit = 1
6+
let parties_shift = 1
7+
let parties_bits = (Sys.int_size - 2) / 2
8+
let max_parties = (1 lsl parties_bits) - 1
9+
let parties_mask = max_parties lsl parties_shift
10+
let awaiting_shift = parties_shift + parties_bits
11+
let awaiting_one = 1 lsl awaiting_shift
12+
let poisoned_bit = Int.min_int
13+
14+
let create ?padded parties =
15+
if parties <= 0 || max_parties < parties then
16+
invalid_arg "invalid number of parties";
17+
Awaitable.make ?padded
18+
((parties lsl parties_shift) lor (parties lsl awaiting_shift))
19+
20+
exception Poisoned
21+
22+
let rec poison t =
23+
let before = Awaitable.get t in
24+
if 0 < before then
25+
let after =
26+
let parties_shifted = before land parties_mask in
27+
let after_sense = lnot before land sense_bit in
28+
parties_shifted lor after_sense
29+
lor (max_parties lsl awaiting_shift)
30+
lor poisoned_bit
31+
in
32+
if Awaitable.compare_and_set t before after then Awaitable.broadcast t
33+
else poison t
34+
35+
let await t =
36+
let[@inline never] poison_and_raise t =
37+
poison t;
38+
raise Poisoned
39+
in
40+
let prior = Awaitable.fetch_and_add t (-awaiting_one) in
41+
if awaiting_one < prior then begin
42+
let before = prior - awaiting_one in
43+
let after_sense = prior land sense_bit lxor sense_bit in
44+
if before < awaiting_one then
45+
let after =
46+
let parties_shifted = before land parties_mask in
47+
parties_shifted lor after_sense
48+
lor (parties_shifted lsl (awaiting_shift - parties_shift))
49+
in
50+
if Awaitable.compare_and_set t before after then Awaitable.broadcast t
51+
else
52+
(* The barrier is being misused? Poison the barrier. *)
53+
poison_and_raise t
54+
else
55+
let state = ref before in
56+
match
57+
while !state land sense_bit <> after_sense do
58+
Awaitable.await t !state;
59+
state := Awaitable.get t
60+
done
61+
with
62+
| () -> if 0 <= !state then () else raise Poisoned
63+
| exception exn ->
64+
let bt = Printexc.get_raw_backtrace () in
65+
poison t;
66+
Printexc.raise_with_backtrace exn bt
67+
end
68+
else begin
69+
(* The barrier was poisoned. Undo the decrement. *)
70+
Awaitable.fetch_and_add t awaiting_one |> ignore;
71+
poison_and_raise t
72+
end
73+
74+
let parties t = (Awaitable.get t land parties_mask) lsr parties_shift

lib/picos_std.sync/picos_std_sync.ml

+1
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@ module Rwlock = Rwlock
66
module Sem = Sem
77
module Lazy = Lazy
88
module Latch = Latch
9+
module Barrier = Barrier
910
module Ivar = Ivar
1011
module Stream = Stream

lib/picos_std.sync/picos_std_sync.mli

+36
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,42 @@ module Sem : sig
481481
{{!poison} poisoned}. *)
482482
end
483483

484+
module Barrier : sig
485+
(** A poisonable barrier. *)
486+
487+
type t
488+
(** Represents a poisonable barrier. *)
489+
490+
val max_parties : int
491+
(** Maximum number of participants that a barrier can be configured with. *)
492+
493+
val create : ?padded:bool -> int -> t
494+
(** [create parties] creates a new barrier for given number of parties.
495+
496+
@raise Invalid_argument
497+
in case the given number of [parties] is less than [1] or greater than
498+
[max_parties]. *)
499+
500+
val parties : t -> int
501+
(** [paries barrier] returns the number of parties the barrier was
502+
{{!create} created} with. *)
503+
504+
exception Poisoned
505+
(** Exception raised by {!await} in case the barrier has become poisoned. *)
506+
507+
val await : t -> unit
508+
(** [await barrier] awaits until the configured number of {!parties} has
509+
called [await] on the barrier and returns, or raises {!Poisoned} in case
510+
the barrier has become poisoned.
511+
512+
ℹ️ If the await is canceled, then [await] will {!poison} the barrier before
513+
reraising the cancelation exception. *)
514+
515+
val poison : t -> unit
516+
(** [poison barrier] marks the barrier as poisoned. Concurrent and subsequent
517+
calls of {!await} will raise the {!Poisoned} exception. *)
518+
end
519+
484520
module Lazy : sig
485521
(** A lazy suspension.
486522

test/dune

+2
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
(run %{test} -- "^Lazy$" 0)
5252
(run %{test} -- "^Lazy$" 1)
5353
(run %{test} -- "^Event$" 0)
54+
(run %{test} -- "^Barrier$" 0)
55+
(run %{test} -- "^Barrier$" 1)
5456
(run %{test} -- "^Non-cancelable ops$" 0)
5557
;;
5658
)))

test/test_sync.ml

+72
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,73 @@ end
607607

608608
module Rwlock_is_a_submodule_of_Lock : module type of Lock = Rwlock
609609

610+
let test_barrier_basics () =
611+
for n = 1 to 4 do
612+
let barrier = Barrier.create n in
613+
Test_scheduler.run ~max_domains:(n + 1) @@ fun () ->
614+
Flock.join_after @@ fun () ->
615+
let n_outside = Atomic.make n in
616+
for _ = 1 to n do
617+
Flock.fork @@ fun () ->
618+
for _ = 1 to 5 do
619+
Atomic.decr n_outside;
620+
Barrier.await barrier;
621+
assert (Atomic.get n_outside = 0);
622+
Barrier.await barrier;
623+
Atomic.incr n_outside
624+
done
625+
done
626+
done
627+
628+
let test_barrier_poisoning () =
629+
for n = 2 to 5 do
630+
match
631+
Test_scheduler.run ~max_domains:n @@ fun () ->
632+
let barrier = Barrier.create n in
633+
Flock.join_after @@ fun () ->
634+
for _ = 2 to n do
635+
Flock.fork @@ fun () ->
636+
Barrier.await barrier;
637+
assert false
638+
done;
639+
Barrier.poison barrier
640+
with
641+
| () -> assert false
642+
| exception Barrier.Poisoned -> ()
643+
| exception Control.Errors exns ->
644+
assert (
645+
List.for_all
646+
(function Barrier.Poisoned, _ -> true | _ -> false)
647+
exns);
648+
assert (List.length exns <= n - 1)
649+
done;
650+
for n = 3 to 6 do
651+
match
652+
Test_scheduler.run ~max_domains:n @@ fun () ->
653+
let barrier = Barrier.create n in
654+
Flock.join_after @@ fun () ->
655+
let to_be_terminated =
656+
Flock.fork_as_promise @@ fun () ->
657+
Barrier.await barrier;
658+
assert false
659+
in
660+
for _ = 3 to n do
661+
Flock.fork @@ fun () ->
662+
Barrier.await barrier;
663+
assert false
664+
done;
665+
Promise.terminate to_be_terminated
666+
with
667+
| () -> assert false
668+
| exception Barrier.Poisoned -> ()
669+
| exception Control.Errors exns ->
670+
assert (
671+
List.for_all
672+
(function Barrier.Poisoned, _ -> true | _ -> false)
673+
exns);
674+
assert (List.length exns <= n - 2)
675+
done
676+
610677
let () =
611678
try
612679
[
@@ -664,6 +731,11 @@ let () =
664731
Alcotest.test_case "cancelation" `Quick test_lazy_cancelation;
665732
] );
666733
("Event", [ Alcotest.test_case "basics" `Quick test_event_basics ]);
734+
( "Barrier",
735+
[
736+
Alcotest.test_case "basics" `Quick test_barrier_basics;
737+
Alcotest.test_case "poisoning" `Quick test_barrier_poisoning;
738+
] );
667739
( "Non-cancelable ops",
668740
[ Alcotest.test_case "are not canceled" `Quick test_non_cancelable_ops ]
669741
);

0 commit comments

Comments
 (0)