Skip to content

Commit 4cfe00a

Browse files
committed
Add an actor model implementation example
1 parent 0abaad8 commit 4cfe00a

File tree

7 files changed

+285
-14
lines changed

7 files changed

+285
-14
lines changed

dune-project

+4-8
Original file line numberDiff line numberDiff line change
@@ -164,15 +164,15 @@
164164
(>= 0.21.2))
165165
(qcheck-stm
166166
(>= 0.3))
167+
(backoff
168+
(>= 0.1.0))
169+
(multicore-magic
170+
(>= 2.3.0))
167171
;;
168172
(alcotest
169173
(and
170174
(>= 1.7.0)
171175
:with-test))
172-
(backoff
173-
(and
174-
(>= 0.1.0)
175-
:with-test))
176176
(cohttp
177177
(and
178178
(>= 6.0.0)
@@ -209,10 +209,6 @@
209209
(and
210210
(>= 0.1.7)
211211
:with-test))
212-
(multicore-magic
213-
(and
214-
(>= 2.3.0)
215-
:with-test))
216212
(multicore-magic-dscheck
217213
(and
218214
(>= 2.3.0)

lib/picos_meta.hoot/dune

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
(library
2+
(name picos_meta_hoot)
3+
(public_name picos_meta.hoot)
4+
(libraries backoff multicore-magic picos))

lib/picos_meta.hoot/hoot.ml

+171
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
open Picos
2+
3+
let[@inline never] impossible () = failwith "impossible"
4+
let[@inline never] not_in_a_hoot () = invalid_arg "Not in a hoot"
5+
6+
module Message = struct
7+
type t = ..
8+
end
9+
10+
type _ tdt =
11+
| Nil : [> `Nil ] tdt
12+
| Message : {
13+
message : Message.t;
14+
next : [ `Nil | `Message ] tdt;
15+
}
16+
-> [> `Message ] tdt
17+
| Wait : Trigger.t -> [> `Wait ] tdt
18+
| Pid : {
19+
computation : unit Computation.t;
20+
terminated : unit Computation.t;
21+
incoming : incoming Atomic.t;
22+
mutable received : [ `Nil | `Message ] tdt;
23+
}
24+
-> [> `Pid ] tdt
25+
26+
and incoming = In : [< `Nil | `Message | `Wait ] tdt -> incoming [@@unboxed]
27+
28+
module Pid = struct
29+
type t = [ `Pid ] tdt
30+
31+
let key : [ `Nil | `Pid ] tdt Fiber.FLS.t = Fiber.FLS.create ()
32+
end
33+
34+
let self_of fiber =
35+
match Fiber.FLS.get_exn fiber Pid.key with
36+
| Pid _ as t -> t
37+
| Nil | (exception Fiber.FLS.Not_set) -> not_in_a_hoot ()
38+
39+
let self () : Pid.t = self_of @@ Fiber.current ()
40+
41+
exception Terminate
42+
43+
let run main =
44+
let fiber = Fiber.current () in
45+
let before = Fiber.FLS.get fiber Pid.key ~default:Nil in
46+
let computation = Computation.create ~mode:`LIFO () in
47+
let inner = Computation.Packed computation in
48+
let (Pid p as t) : Pid.t =
49+
let terminated = Computation.create ~mode:`LIFO ()
50+
and incoming = Atomic.make (In Nil) |> Multicore_magic.copy_as_padded in
51+
Pid { computation; terminated; incoming; received = Nil }
52+
in
53+
Fiber.FLS.set fiber Pid.key t;
54+
let (Packed parent as outer) = Fiber.get_computation fiber in
55+
let canceler = Computation.attach_canceler ~from:parent ~into:computation in
56+
Fiber.set_computation fiber inner;
57+
begin
58+
match main () with
59+
| () | (exception Terminate) -> Computation.finish p.terminated
60+
| exception exn ->
61+
let bt = Printexc.get_raw_backtrace () in
62+
Computation.cancel p.terminated exn bt
63+
end;
64+
Computation.finish p.computation;
65+
Fiber.set_computation fiber outer;
66+
Computation.detach parent canceler;
67+
Fiber.FLS.set fiber Pid.key before;
68+
(* An otherwise unhandled exception except [Terminate] will be raised. *)
69+
Computation.check p.terminated
70+
71+
let wait (Pid p : Pid.t) = Computation.wait p.terminated
72+
73+
let spawn main =
74+
let (Pid p as t) : Pid.t =
75+
let computation = Computation.create ~mode:`LIFO ()
76+
and terminated = Computation.create ~mode:`LIFO ()
77+
and incoming = Atomic.make (In Nil) in
78+
Pid { computation; terminated; incoming; received = Nil }
79+
in
80+
let fiber = Fiber.create ~forbid:false p.computation in
81+
Fiber.FLS.set fiber Pid.key t;
82+
begin
83+
Fiber.spawn fiber @@ fun fiber ->
84+
let (Pid p) : Pid.t = self_of fiber in
85+
(* An unhandled exception except [Terminate] will be treated as a fatal
86+
error. *)
87+
begin
88+
match main () with
89+
| () | (exception Terminate) -> Computation.finish p.terminated
90+
end;
91+
Computation.finish p.computation
92+
end;
93+
t
94+
95+
let rec rev_to (Message _ as ms : [ `Message ] tdt) :
96+
[ `Nil | `Message ] tdt -> _ = function
97+
| Nil -> ms
98+
| Message r -> rev_to (Message { message = r.message; next = ms }) r.next
99+
100+
let rev (Message r : [ `Message ] tdt) =
101+
rev_to (Message { message = r.message; next = Nil }) r.next
102+
103+
let rec receive (Pid p as t : Pid.t) =
104+
match Atomic.get p.incoming with
105+
| In Nil as before ->
106+
let trigger = Trigger.create () in
107+
let after = In (Wait trigger) in
108+
if Atomic.compare_and_set p.incoming before after then begin
109+
match Trigger.await trigger with
110+
| None -> ()
111+
| Some (exn, bt) ->
112+
(* At this point the trigger has been signaled and cannot leak
113+
arbitrary amoun of space. There is no need to remove it. *)
114+
Printexc.raise_with_backtrace exn bt
115+
end;
116+
receive t
117+
| _ -> begin
118+
match Atomic.exchange p.incoming (In Nil) with
119+
| In (Wait _ | Nil) -> impossible ()
120+
| In (Message _ as ms) ->
121+
let (Message r : [ `Message ] tdt) = rev ms in
122+
p.received <- r.next;
123+
r.message
124+
end
125+
126+
let receive () =
127+
let (Pid p as t) = self () in
128+
match p.received with
129+
| Message r ->
130+
p.received <- r.next;
131+
r.message
132+
| Nil -> receive t
133+
134+
let rec send (Pid p as t : Pid.t) message backoff =
135+
match Atomic.get p.incoming with
136+
| In ((Nil | Message _) as before) ->
137+
let after = Message { message; next = before } in
138+
if not (Atomic.compare_and_set p.incoming (In before) (In after)) then
139+
send t message (Backoff.once backoff)
140+
| In (Wait trigger as before) ->
141+
let after = Message { message; next = Nil } in
142+
if Atomic.compare_and_set p.incoming (In before) (In after) then
143+
Trigger.signal trigger
144+
else send t message (Backoff.once backoff)
145+
146+
let[@inline] send t message = send t message Backoff.default
147+
148+
type Message.t += Terminated of Pid.t
149+
150+
let monitor ~at ~the:(Pid the_p as the : Pid.t) =
151+
let[@alert "-handler"] trigger =
152+
Trigger.from_action at the @@ fun _ at the -> send at (Terminated the)
153+
in
154+
if not (Computation.try_attach the_p.terminated trigger) then
155+
send at (Terminated the)
156+
157+
let empty_bt = Printexc.get_callstack 0
158+
159+
let link (Pid p1 as t1 : Pid.t) (Pid p2 as t2 : Pid.t) =
160+
let[@alert "-handler"] trigger =
161+
Trigger.from_action t1 t2 @@ fun _ (Pid p1 : Pid.t) (Pid p2 : Pid.t) ->
162+
Computation.cancel p1.computation Terminate empty_bt;
163+
Computation.cancel p2.computation Terminate empty_bt
164+
in
165+
if
166+
(not (Computation.try_attach p1.terminated trigger))
167+
|| not (Computation.try_attach p2.terminated trigger)
168+
then begin
169+
Computation.cancel p1.computation Terminate empty_bt;
170+
Computation.cancel p2.computation Terminate empty_bt
171+
end
+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
module Hoot = Hoot
+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
(** The beginnings of an actor model implementation loosely inspired by
2+
{{:https://github.com/riot-ml/riot} Riot}.
3+
4+
⚠️ This is meant as an example only and would require further development to
5+
become a production ready actor model implementation. *)
6+
7+
(** {1 Modules} *)
8+
9+
module Hoot : sig
10+
(** {{:https://www.merriam-webster.com/thesaurus/riot} Hoot} is an actor model
11+
core loosely inspired by {{:https://github.com/riot-ml/riot} Riot}. *)
12+
13+
exception Terminate
14+
(** Exception used by {!link} to terminate actor processes.
15+
16+
An unhandled [Terminate] exception is not treated as an error. *)
17+
18+
val run : (unit -> unit) -> unit
19+
(** [run main] establishes a new actor process on the current fiber and runs
20+
[main] as the process.
21+
22+
⚠️ An unhandled exception, aside from {!Terminate}, raised from [main ()]
23+
will be raised from [run main]. Unlike with {!spawn} an unhandled
24+
exception is not treated as a fatal error.
25+
26+
This can be called from any fiber, even from another actor process, which
27+
would then effectively get suspended while running [main], but typically
28+
this is used to start a "scope" for running actors. *)
29+
30+
module Pid : sig
31+
(** Actor process or process identifier. *)
32+
33+
type t
34+
end
35+
36+
val spawn : (unit -> unit) -> Pid.t
37+
(** [spawn main] creates a new actor process to run [main].
38+
39+
⚠️ An unhandled exception, aside from {!Terminate}, raised from [main ()]
40+
will be treated as a fatal error and will either exit the entire program
41+
or stop the scheduler without completing other fibers.
42+
43+
This can be called from any fiber, even from fibers that are not actor
44+
processes, but typically this would be used within a "scope" for running
45+
actors. *)
46+
47+
val self : unit -> Pid.t
48+
(** [self ()], when called within an actor process, returns the {{!Pid.t}
49+
process identifier} of the actor process.
50+
51+
@raise Invalid_argument when called outside of an actor process. *)
52+
53+
val wait : Pid.t -> unit
54+
(** [wait pid] blocks until the specified actor process has terminated. *)
55+
56+
module Message : sig
57+
(** Extensible message type. *)
58+
59+
type t = ..
60+
end
61+
62+
val receive : unit -> Message.t
63+
(** [receive ()] waits until at least one message has been added to the
64+
mailbox of the current process and then removes and returns the least
65+
recently added message from the mailbox. *)
66+
67+
val send : Pid.t -> Message.t -> unit
68+
(** [send pid message] adds the given [message] to the mailbox of the process
69+
[pid].
70+
71+
ℹ️ Sending a message to a process that has already terminated is not
72+
considered an error. *)
73+
74+
type Message.t +=
75+
| Terminated of Pid.t
76+
(** Sent by the {!monitor} mechanism to notify of process
77+
termination. *)
78+
79+
val monitor : at:Pid.t -> the:Pid.t -> unit
80+
(** [monitor ~at:observer ~the:subject] makes it so that when the [subject]
81+
process terminates the message {{!Terminated} [Terminated subject]} is
82+
{{!send} sent} to the [observer] process. *)
83+
84+
val link : Pid.t -> Pid.t -> unit
85+
(** [link pid1 pid2] makes it so that when either one of the given processes
86+
terminates the other process will also be terminated with the [Terminate]
87+
exception.
88+
89+
In case either one of the given processes is already terminated when
90+
[link] is called, the other process will then be terminated. *)
91+
end
92+
93+
(** {1 Examples} *)

lib/picos_meta/index.mld

+10-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
1-
{0 Integration tests for Picos packages}
1+
{0 Integration tests and additional examples for Picos packages}
22

3-
This package contains integration (and other kinds of) tests for the core
4-
{!Picos} interface and other Picos libraries. A separate package is used to
5-
allow the dependencies of and between different Picos packages to be simplified.
3+
This package contains additional examples and integration (and other kinds of)
4+
tests for the core {!Picos} interface and other Picos libraries. A separate
5+
package is used to allow the dependencies of and between different Picos
6+
packages to be simplified.
7+
8+
{1 Libraries}
9+
10+
{!modules:
11+
Picos_meta_hoot}

picos_meta.opam

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ depends: [
2020
"lwt" {>= "5.7.0"}
2121
"qcheck-core" {>= "0.21.2"}
2222
"qcheck-stm" {>= "0.3"}
23+
"backoff" {>= "0.1.0"}
24+
"multicore-magic" {>= "2.3.0"}
2325
"alcotest" {>= "1.7.0" & with-test}
24-
"backoff" {>= "0.1.0" & with-test}
2526
"cohttp" {>= "6.0.0" & with-test}
2627
"cohttp-lwt-unix" {>= "6.0.0" & os != "win32" & with-test}
2728
"conduit-lwt-unix" {>= "6.2.2" & os != "win32" & with-test}
@@ -31,7 +32,6 @@ depends: [
3132
"js_of_ocaml" {>= "5.4.0" & with-test}
3233
"mdx" {>= "2.4.0" & with-test}
3334
"multicore-bench" {>= "0.1.7" & with-test}
34-
"multicore-magic" {>= "2.3.0" & with-test}
3535
"multicore-magic-dscheck" {>= "2.3.0" & with-test}
3636
"ocaml-version" {>= "3.6.4" & with-test}
3737
"cohttp-lwt" {>= "6.0.0" & with-test}

0 commit comments

Comments
 (0)