Skip to content

Commit 87a4004

Browse files
samuel-williams-shopifyioquatix
authored andcommitted
Handle closed IO while invoking #select. (#138)
1 parent 986b11d commit 87a4004

File tree

4 files changed

+67
-15
lines changed

4 files changed

+67
-15
lines changed

ext/io/event/selector/uring.c

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
enum {
1919
DEBUG = 0,
2020
DEBUG_COMPLETION = 0,
21-
DEBUG_IO_READ = 1,
21+
DEBUG_CQE = 0,
2222
};
2323

2424
enum {URING_ENTRIES = 64};
@@ -552,7 +552,10 @@ VALUE io_wait_transfer(VALUE _arguments) {
552552

553553
if (DEBUG) fprintf(stderr, "io_wait_transfer:waiting=%p, result=%d\n", (void*)arguments->waiting, arguments->waiting->result);
554554

555-
if (arguments->waiting->result) {
555+
int32_t result = arguments->waiting->result;
556+
if (result < 0) {
557+
rb_syserr_fail(-result, "io_wait_transfer:io_uring_poll_add");
558+
} else if (result > 0) {
556559
// We explicitly filter the resulting events based on the requested events.
557560
// In some cases, poll will report events we didn't ask for.
558561
return RB_INT2NUM(events_from_poll_flags(arguments->waiting->result & arguments->flags));
@@ -1059,7 +1062,7 @@ unsigned select_process_completions(struct IO_Event_Selector_URing *selector) {
10591062
}
10601063

10611064
io_uring_for_each_cqe(ring, head, cqe) {
1062-
if (DEBUG) fprintf(stderr, "select_process_completions: cqe res=%d user_data=%p\n", cqe->res, (void*)cqe->user_data);
1065+
if (DEBUG_CQE) fprintf(stderr, "select_process_completions: cqe res=%d user_data=%p\n", cqe->res, (void*)cqe->user_data);
10631066

10641067
++completed;
10651068

lib/io/event/selector/select.rb

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -421,19 +421,25 @@ def select(duration = nil)
421421
writable = Array.new
422422
priority = Array.new
423423

424-
@waiting.each do |io, waiter|
425-
waiter.each do |fiber, events|
426-
if (events & IO::READABLE) > 0
427-
readable << io
428-
end
429-
430-
if (events & IO::WRITABLE) > 0
431-
writable << io
432-
end
433-
434-
if (events & IO::PRIORITY) > 0
435-
priority << io
424+
@waiting.delete_if do |io, waiter|
425+
if io.closed?
426+
true
427+
else
428+
waiter.each do |fiber, events|
429+
if (events & IO::READABLE) > 0
430+
readable << io
431+
end
432+
433+
if (events & IO::WRITABLE) > 0
434+
writable << io
435+
end
436+
437+
if (events & IO::PRIORITY) > 0
438+
priority << io
439+
end
436440
end
441+
442+
false
437443
end
438444
end
439445

releases.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Releases
22

3+
## Unreleased
4+
5+
- Improved consistency of handling closed IO when invoking `#select`.
6+
37
## v1.10.0
48

59
- `IO::Event::Profiler` is moved to dedicated gem: [fiber-profiler](https://github.com/socketry/fiber-profiler).

test/io/event/selector.rb

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,45 @@ def transfer
424424
expect(writable2).to be == true
425425
end
426426
end
427+
428+
it "can handle io being closed by another fiber while waiting" do
429+
error = nil
430+
431+
wait_fiber = Fiber.new do
432+
wait_fiber_started = true
433+
events << :wait_readable
434+
begin
435+
result = selector.io_wait(Fiber.current, local, IO::READABLE)
436+
$stderr.puts "result: #{result}"
437+
events << :readable
438+
rescue => error
439+
# This isn't a reliable state transition.
440+
# events << :error
441+
end
442+
end
443+
444+
close_fiber = Fiber.new do
445+
events << :close_io
446+
local.close
447+
end
448+
449+
events << :transfer
450+
wait_fiber.transfer
451+
close_fiber.transfer
452+
453+
expect do
454+
events << :select
455+
selector.select(0)
456+
end.not.to raise_exception
457+
458+
expect(events).to be == [
459+
:transfer, :wait_readable, :close_io, :select
460+
]
461+
462+
# io_uring will raise an EBADF error if the IO is closed while waiting.
463+
# But other selectors are not capable of detecting this.
464+
# expect(error).to be_nil
465+
end
427466
end
428467

429468
with "#io_read" do

0 commit comments

Comments
 (0)