Skip to content

Commit 3edfca5

Browse files
committed
Add an actor model implementation example
1 parent a5ea38c commit 3edfca5

File tree

3 files changed

+200
-0
lines changed

3 files changed

+200
-0
lines changed

test/lib/hoot/dune

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
(library
2+
(name hoot)
3+
(libraries backoff multicore-magic picos))

test/lib/hoot/hoot.ml

+155
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
open Picos
2+
3+
let[@inline never] impossible () = failwith "impossible"
4+
let[@inline never] not_in_a_hoot () = raise (Sys_error "Not in a hoot")
5+
6+
module Message = struct
7+
type t = ..
8+
end
9+
10+
type _ tdt =
11+
| Nil : [> `Nil ] tdt
12+
| Message : {
13+
message : Message.t;
14+
next : [ `Nil | `Message ] tdt;
15+
}
16+
-> [> `Message ] tdt
17+
| Wait : Trigger.t -> [> `Wait ] tdt
18+
| Pid : {
19+
computation : unit Computation.t;
20+
terminated : unit Computation.t;
21+
incoming : incoming Atomic.t;
22+
mutable received : [ `Nil | `Message ] tdt;
23+
}
24+
-> [> `Pid ] tdt
25+
26+
and incoming = In : [< `Nil | `Message | `Wait ] tdt -> incoming [@@unboxed]
27+
28+
module Pid = struct
29+
type t = [ `Pid ] tdt
30+
31+
let key : [ `Nil | `Pid ] tdt Fiber.FLS.t = Fiber.FLS.create ()
32+
end
33+
34+
let self_of fiber =
35+
match Fiber.FLS.get_exn fiber Pid.key with
36+
| Pid _ as t -> t
37+
| Nil | (exception Fiber.FLS.Not_set) -> not_in_a_hoot ()
38+
39+
let self () : Pid.t = self_of @@ Fiber.current ()
40+
41+
let run main =
42+
let fiber = Fiber.current () in
43+
let before = Fiber.FLS.get fiber Pid.key ~default:Nil in
44+
let computation = Computation.create ~mode:`LIFO () in
45+
let inner = Computation.Packed computation in
46+
let (Pid p as t) : Pid.t =
47+
let terminated = Computation.create ~mode:`LIFO ()
48+
and incoming = Atomic.make (In Nil) |> Multicore_magic.copy_as_padded in
49+
Pid { computation; terminated; incoming; received = Nil }
50+
in
51+
Fiber.FLS.set fiber Pid.key t;
52+
let (Packed parent as outer) = Fiber.get_computation fiber in
53+
let canceler = Computation.attach_canceler ~from:parent ~into:computation in
54+
Fiber.set_computation fiber inner;
55+
Computation.capture p.terminated main ();
56+
Computation.finish p.computation;
57+
Fiber.set_computation fiber outer;
58+
Computation.detach parent canceler;
59+
Fiber.FLS.set fiber Pid.key before
60+
61+
let wait (Pid p : Pid.t) = Computation.await p.terminated
62+
63+
let spawn main =
64+
let (Pid p as t) : Pid.t =
65+
let computation = Computation.create ~mode:`LIFO ()
66+
and terminated = Computation.create ~mode:`LIFO ()
67+
and incoming = Atomic.make (In Nil) in
68+
Pid { computation; terminated; incoming; received = Nil }
69+
in
70+
let fiber = Fiber.create ~forbid:false p.computation in
71+
Fiber.FLS.set fiber Pid.key t;
72+
begin
73+
Fiber.spawn fiber @@ fun fiber ->
74+
let (Pid p) : Pid.t = self_of fiber in
75+
Computation.capture p.terminated main ();
76+
Computation.finish p.computation
77+
end;
78+
t
79+
80+
let rec rev_to (Message _ as ms : [ `Message ] tdt) :
81+
[ `Nil | `Message ] tdt -> _ = function
82+
| Nil -> ms
83+
| Message r -> rev_to (Message { message = r.message; next = ms }) r.next
84+
85+
let rev (Message r : [ `Message ] tdt) =
86+
rev_to (Message { message = r.message; next = Nil }) r.next
87+
88+
let rec receive (Pid p as t : Pid.t) =
89+
match Atomic.get p.incoming with
90+
| In Nil as before ->
91+
let trigger = Trigger.create () in
92+
let after = In (Wait trigger) in
93+
if Atomic.compare_and_set p.incoming before after then begin
94+
match Trigger.await trigger with
95+
| None -> ()
96+
| Some (exn, bt) -> Printexc.raise_with_backtrace exn bt
97+
end;
98+
receive t
99+
| _ -> begin
100+
match Atomic.exchange p.incoming (In Nil) with
101+
| In (Wait _ | Nil) -> impossible ()
102+
| In (Message _ as ms) ->
103+
let (Message r : [ `Message ] tdt) = rev ms in
104+
p.received <- r.next;
105+
r.message
106+
end
107+
108+
let receive () =
109+
let (Pid p as t) = self () in
110+
match p.received with
111+
| Message r ->
112+
p.received <- r.next;
113+
r.message
114+
| Nil -> receive t
115+
116+
let rec send (Pid p as t : Pid.t) message backoff =
117+
match Atomic.get p.incoming with
118+
| In ((Nil | Message _) as before) ->
119+
let after = Message { message; next = before } in
120+
if not (Atomic.compare_and_set p.incoming (In before) (In after)) then
121+
send t message (Backoff.once backoff)
122+
| In (Wait trigger as before) ->
123+
let after = Message { message; next = Nil } in
124+
if Atomic.compare_and_set p.incoming (In before) (In after) then
125+
Trigger.signal trigger
126+
else send t message (Backoff.once backoff)
127+
128+
let send t message = send t message Backoff.default
129+
130+
type Message.t += Terminated of Pid.t
131+
132+
let monitor ~at ~the:(Pid the_p as the : Pid.t) =
133+
let[@alert "-handler"] trigger =
134+
Trigger.from_action at the @@ fun _ at the -> send at (Terminated the)
135+
in
136+
if not (Computation.try_attach the_p.terminated trigger) then
137+
send at (Terminated the)
138+
139+
exception Terminate
140+
141+
let empty_bt = Printexc.get_callstack 0
142+
143+
let link (Pid p1 as t1 : Pid.t) (Pid p2 as t2 : Pid.t) =
144+
let[@alert "-handler"] trigger =
145+
Trigger.from_action t1 t2 @@ fun _ (Pid p1 : Pid.t) (Pid p2 : Pid.t) ->
146+
Computation.cancel p1.computation Terminate empty_bt;
147+
Computation.cancel p2.computation Terminate empty_bt
148+
in
149+
if
150+
(not (Computation.try_attach p1.terminated trigger))
151+
|| not (Computation.try_attach p2.terminated trigger)
152+
then begin
153+
Computation.cancel p1.computation Terminate empty_bt;
154+
Computation.cancel p2.computation Terminate empty_bt
155+
end

test/lib/hoot/hoot.mli

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
val run : (unit -> unit) -> unit
2+
(** *)
3+
4+
module Pid : sig
5+
(** *)
6+
7+
type t
8+
(** *)
9+
end
10+
11+
val spawn : (unit -> unit) -> Pid.t
12+
(** *)
13+
14+
val self : unit -> Pid.t
15+
(** *)
16+
17+
val wait : Pid.t -> unit
18+
(** *)
19+
20+
module Message : sig
21+
(** *)
22+
23+
type t = ..
24+
(** *)
25+
end
26+
27+
val receive : unit -> Message.t
28+
(** *)
29+
30+
val send : Pid.t -> Message.t -> unit
31+
(** *)
32+
33+
type Message.t += Terminated of Pid.t (** *)
34+
35+
val monitor : at:Pid.t -> the:Pid.t -> unit
36+
(** *)
37+
38+
exception Terminate
39+
(** *)
40+
41+
val link : Pid.t -> Pid.t -> unit
42+
(** *)

0 commit comments

Comments
 (0)