Skip to content

Commit 5b91d9a

Browse files
committed
Tweak select wakeup logic
This resets the wakeup state machine only after draining the pipe which can avoid a state transition in case a wakeup is called before clearing the state and allow select to be entered sooner. This also reads more than single byte from the pipe to try to heal after a `fork` followed by child process(es) writing to the pipe, which can only happen in a single domain program.
1 parent 7460176 commit 5b91d9a

File tree

1 file changed

+35
-19
lines changed

1 file changed

+35
-19
lines changed

lib/picos_io.select/picos_io_select.ml

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,19 @@ type return_on =
7171
}
7272
-> return_on
7373

74-
type phase = Continue | Select | Waking_up | Process
74+
type phase =
75+
| Processing
76+
| Select
77+
| Wakeup_during_processing
78+
| Wakeup_during_select
7579

7680
type state = {
7781
phase : phase Atomic.t;
7882
mutable state : [ `Initial | `Starting | `Alive | `Stopping | `Stopped ];
7983
mutable exn_bt : exn * Printexc.raw_backtrace;
8084
mutable pipe_inn : Unix.file_descr;
8185
mutable pipe_out : Unix.file_descr;
82-
byte : Bytes.t;
86+
bytes : Bytes.t;
8387
(* *)
8488
timeouts : Q.t Atomic.t;
8589
mutable next_id : int;
@@ -117,12 +121,12 @@ let intr_key : [ `Req ] tdt Picos_thread.TLS.t = Picos_thread.TLS.create ()
117121
let key =
118122
Picos_domain.DLS.new_key @@ fun () ->
119123
{
120-
phase = Atomic.make Continue;
124+
phase = Atomic.make Processing;
121125
state = `Initial;
122126
exn_bt = exit_bt;
123127
pipe_inn = Unix.stdin;
124128
pipe_out = Unix.stdin;
125-
byte = Bytes.create 1;
129+
bytes = Bytes.create (if 32 < Sys.int_size then 64 - 8 else 64 - 4);
126130
timeouts = Atomic.make Q.empty;
127131
next_id = 0;
128132
new_rd = ref [];
@@ -142,13 +146,26 @@ let[@poll error] [@inline never] transition s into =
142146
s.state <- into;
143147
from
144148

149+
let prepare_select s = Atomic.compare_and_set s.phase Processing Select
150+
151+
let rec start_processing s =
152+
match Atomic.get s.phase with
153+
| Processing -> ()
154+
| Select ->
155+
if not (Atomic.compare_and_set s.phase Select Processing) then
156+
start_processing s
157+
| Wakeup_during_processing -> Atomic.set s.phase Processing
158+
| Wakeup_during_select ->
159+
(* We may read more than a single byte in case [fork] has been called and
160+
the child process ended up calling [wakeup]. *)
161+
let n = Unix.read s.pipe_inn s.bytes 0 (Bytes.length s.bytes) in
162+
assert (1 <= n);
163+
Atomic.set s.phase Processing
164+
145165
let rec wakeup s from =
146166
match Atomic.get s.phase with
147-
| Process | Waking_up ->
148-
(* The thread will process the fds and timeouts before next select. *)
149-
()
150-
| Continue ->
151-
if Atomic.compare_and_set s.phase Continue Process then
167+
| Processing ->
168+
if Atomic.compare_and_set s.phase Processing Wakeup_during_processing then
152169
(* We managed to signal the wakeup before the thread was ready to call
153170
select and the thread will notice this without us needing to write to
154171
the pipe. *)
@@ -158,12 +175,17 @@ let rec wakeup s from =
158175
need to retry. *)
159176
wakeup s from
160177
| Select ->
161-
if Atomic.compare_and_set s.phase Select Waking_up then
178+
(* A single domain application may end up here after [fork] in the child
179+
process. *)
180+
if Atomic.compare_and_set s.phase Select Wakeup_during_select then
162181
if s.state == from then
163182
(* We are now responsible for writing to the pipe to force the thread
164183
to exit the select. *)
165-
let n = Unix.write s.pipe_out s.byte 0 1 in
184+
let n = Unix.write_substring s.pipe_out " " 0 1 in
166185
assert (n = 1)
186+
| Wakeup_during_processing | Wakeup_during_select ->
187+
(* The thread will process the fds and timeouts before next select. *)
188+
()
167189

168190
type fos = { n : int; unique_fds : Unix.file_descr list; ops : return_on list }
169191

@@ -222,7 +244,7 @@ module Thread_atomic = Picos_io_thread_atomic
222244
let rec select_thread s timeout rd wr ex =
223245
if s.state == `Alive then begin
224246
let rd_fds, wr_fds, ex_fds =
225-
if Atomic.compare_and_set s.phase Continue Select then begin
247+
if prepare_select s then begin
226248
try
227249
Unix.select
228250
(s.pipe_inn :: rd.unique_fds)
@@ -231,13 +253,7 @@ let rec select_thread s timeout rd wr ex =
231253
end
232254
else ([], [], [])
233255
in
234-
begin
235-
match Atomic.exchange s.phase Continue with
236-
| Select | Process | Continue -> ()
237-
| Waking_up ->
238-
let n = Unix.read s.pipe_inn s.byte 0 1 in
239-
assert (n = 1)
240-
end;
256+
start_processing s;
241257
let rd = process_fds rd_fds rd (Thread_atomic.exchange s.new_rd []) in
242258
let wr = process_fds wr_fds wr (Thread_atomic.exchange s.new_wr []) in
243259
let ex = process_fds ex_fds ex (Thread_atomic.exchange s.new_ex []) in

0 commit comments

Comments
 (0)