Skip to content

Commit 8f36d7d

Browse files
authored
Reimplement um_op and related code (#1)
* Rewrite UM implementation * Keep references to scheduled fibers * Add list of transient ops for GC purposes. Care should be taken when working with fibers with regards to keeping refs to objects used in ongoing ops. Apparently, calls to RB_GC_GUARD for guarding refs on a fiber's stack are useless *unless* we keep a ref to the fiber somewhere. Otherwise, the fiber and all refs on its stack might be GC'd prematurely, leading to a segfault. We therefore do both of the following: 1. Keep running fibers in a fibermap. 2. Keep refs to objects for transient ops in a linked list. * Reimplement all single shot ops * Comment out um_poll looping code * Reimplement sync code * Ongoing work on reimplementing multishot ops * Handle CQE's one by one * Reimplement recv_each * Reimplement read_each (WIP) * Debug * Simplify and improve CQE processing, fix read_each * Fix failing tests, cleanup * Refactor checking completion result * Remove #interrupt method * Improve stability, fix issue with unwanted fiber resume * Add more design details to README * Benchmark schedule * Reimplement runqueue scheduling * Reuse transient um_op and multishot um_op_result structs * Add basic DNS resolver * Update liburing * Small refactor to um_sync code * Raise on error result read_each * Fix test warnings * Add UringMachine.kernel_version, skip read_each tests on early kernel
1 parent faebd7e commit 8f36d7d

21 files changed

+1223
-822
lines changed

README.md

+26
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ UringMachine is based on my experience marrying Ruby and io_uring:
3232
- [IOU](https://github.com/digital-fabric/iou) - a low-level asynchronous API
3333
for using io_uring from Ruby.
3434

35+
### Learnings
36+
3537
Some important learnings from those two projects, in no particular order:
3638

3739
- Monkey-patching is not a good solution, long term. You need to deal with
@@ -73,6 +75,30 @@ based on the following principles:
7375
- Do not insist on structured concurrency, just provide the APIs necessary to
7476
create actors and to supervise the execution of fibers.
7577

78+
### Cancellation
79+
80+
When working with io_uring, managing the life cycle of asynchronous operations
81+
is quite tricky, especially with regards to cancellation. This is due to the
82+
fact each operation lives on both sides of the userspace-kernel divide. This
83+
means that when cancelling an operation, we cannot free, or dispose of any
84+
resources associated with the operation, until we know for sure that the kernel
85+
side is also done with the operation.
86+
87+
As stated above, working with fibers allows us to keep operation metadata and
88+
associated data (such as buffers etc) on the stack, which can greatly simplify
89+
the managing of the operation's lifetime, as well as significantly reduce heap
90+
allocations.
91+
92+
When a cancellation does occur, UringMachine issues a cancellation (using
93+
`io_uring_prep_cancel64`), and then waits for the corresponding CQE (with a
94+
`-ECANCELED` result).
95+
96+
### No scheduler, no runqueue
97+
98+
UringMachine manages the scheduling of fibers by relying solely on io_uring
99+
CQE's. There's no list of ready fibers, no runqueue. Manually scheduling fibers
100+
is done by issuing `NOP` operations and then processing CQE's one by one.
101+
76102
## Example
77103

78104
```ruby

TODO.md

+1-40
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,7 @@
1-
- futex wait wake
2-
3-
```ruby
4-
# interface:
5-
f = UM::Mutex.new
6-
f.synchronize { ... }
7-
8-
q = UM:Queue.new
9-
# API is similar to stock Ruby Queue class
10-
q << :foo # same as q.push(:foo)
11-
q.shift
12-
q.pop
13-
q.unshift(:foo)
14-
```
15-
16-
But how do we use this in conjunction with a UringMachine instance?
17-
18-
```ruby
19-
f = UM::Mutex.new
20-
machine.synchronize(futex) { ... } # looks good
21-
22-
# or maybe:
23-
f.synchronize(machine) { ... } # looks a bit weird
24-
25-
# how about queues?
26-
machine.push(q)
27-
machine.pop(q)
28-
machine.shift(q)
29-
machine.unshift(q)
30-
31-
# what about events?
32-
UM::Event
33-
```
34-
35-
36-
- queues
37-
38-
391
- splice / - tee
402
- sendto
413
- recvfrom
42-
- poll
4+
- poll_add / poll_remove / poll_multishot / poll_update
435
- open / openat
446
- fsync
457
- mkdir / mkdirat
@@ -51,5 +13,4 @@ UM::Event
5113
- madvise
5214
- getxattr / setxattr
5315
- shutdown
54-
5516
- send_bundle / recv_bundle (kernel >= 6.10)

examples/bm_snooze.rb

+89
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# frozen_string_literal: true
2+
3+
require 'bundler/inline'
4+
5+
gemfile do
6+
source 'https://rubygems.org'
7+
gem 'uringmachine', path: '..'
8+
gem 'benchmark-ips'
9+
end
10+
11+
require 'benchmark/ips'
12+
require 'uringmachine'
13+
14+
ITERATIONS = 1000
15+
16+
$machine = UringMachine.new
17+
18+
def run_snooze
19+
count = 0
20+
main = Fiber.current
21+
22+
f1 = Fiber.new do
23+
loop do
24+
count += 1
25+
if count == ITERATIONS
26+
$machine.schedule(main, nil)
27+
break
28+
else
29+
$machine.snooze
30+
end
31+
end
32+
end
33+
34+
f2 = Fiber.new do
35+
loop do
36+
count += 1
37+
if count == ITERATIONS
38+
$machine.schedule(main, nil)
39+
break
40+
else
41+
$machine.snooze
42+
end
43+
end
44+
end
45+
46+
$machine.schedule(f1, nil)
47+
$machine.schedule(f2, nil)
48+
$machine.yield
49+
end
50+
51+
def run_raw_transfer
52+
count = 0
53+
main = Fiber.current
54+
f2 = nil
55+
f1 = Fiber.new do
56+
loop do
57+
count += 1
58+
if count == ITERATIONS
59+
main.transfer(nil)
60+
break
61+
else
62+
f2.transfer(nil)
63+
end
64+
end
65+
end
66+
67+
f2 = Fiber.new do
68+
loop do
69+
count += 1
70+
if count == ITERATIONS
71+
main.transfer(nil)
72+
break
73+
else
74+
f1.transfer(nil)
75+
end
76+
end
77+
end
78+
79+
f1.transfer(nil)
80+
end
81+
82+
bm = Benchmark.ips do |x|
83+
x.config(:time => 5, :warmup => 2)
84+
85+
x.report("snooze") { run_snooze }
86+
x.report("raw transfer") { run_raw_transfer }
87+
88+
x.compare!
89+
end

examples/bm_write.rb

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# frozen_string_literal: true
2+
3+
require 'bundler/inline'
4+
5+
gemfile do
6+
source 'https://rubygems.org'
7+
gem 'uringmachine', path: '..'
8+
gem 'benchmark'
9+
end
10+
11+
require 'benchmark'
12+
require 'uringmachine'
13+
14+
ITERATIONS = 100000
15+
BUF = ('*' * 8192).freeze
16+
FN = '/tmp/bm_write'
17+
18+
def run_io_write(num_threads)
19+
FileUtils.rm(FN) rescue nil
20+
fio = File.open(FN, 'w')
21+
22+
threads = num_threads.times.map do |i|
23+
Thread.new do
24+
ITERATIONS.times { fio.write(BUF) }
25+
end
26+
end
27+
threads.each(&:join)
28+
ensure
29+
fio.close
30+
end
31+
32+
def run_um_write(num_fibers)
33+
FileUtils.rm(FN) rescue nil
34+
fio = File.open(FN, 'w')
35+
fd = fio.fileno
36+
37+
machine = UringMachine.new
38+
done = UringMachine::Queue.new
39+
num_fibers.times do
40+
machine.spin do
41+
ITERATIONS.times { machine.write(fd, BUF) }
42+
machine.push(done, true)
43+
end
44+
end
45+
num_fibers.times { machine.pop(done) }
46+
ensure
47+
fio.close
48+
end
49+
50+
Benchmark.bm do |x|
51+
[1, 2, 4, 8].each do |c|
52+
x.report("IO (#{c} threads)") { run_io_write(c) }
53+
x.report("UM (#{c} fibers) ") { run_um_write(c) }
54+
puts
55+
end
56+
end

examples/dns_client.rb

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# frozen_string_literal: true
2+
3+
require_relative '../lib/uringmachine'
4+
require 'resolv'
5+
6+
machine = UM.new
7+
8+
addrs = machine.resolve('status.realiteq.net')
9+
10+
puts '*' * 40
11+
puts addrs.join("\n")
12+
puts

examples/http_server.rb

+10-3
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@ def http_handle_connection(fd)
2323
# parser << _1
2424
# break if done
2525
# end
26+
2627
# puts "Connection closed on fd #{fd}"
2728
rescue => e
28-
puts "Error while handling connection on fd #{fd}: #{e.inspect}"
29+
# puts "Error while handling connection on fd #{fd}: #{e.inspect}"
2930
ensure
3031
@machine.close(fd) rescue nil
3132
end
@@ -36,15 +37,21 @@ def http_send_response(fd, body)
3637
@machine.write(fd, msg)
3738
end
3839

39-
trap('SIGINT') { exit! }
40+
trap('SIGINT') { exit }
4041

4142
server_fd = @machine.socket(UM::AF_INET, UM::SOCK_STREAM, 0, 0)
43+
@machine.setsockopt(server_fd, UM::SOL_SOCKET, UM::SO_REUSEADDR, true)
4244
@machine.bind(server_fd, '127.0.0.1', 1234)
4345
@machine.listen(server_fd, UM::SOMAXCONN)
4446
puts 'Listening on port 1234'
4547

46-
at_exit { @machine.close(server_fd) rescue nil }
48+
at_exit do
49+
puts "Closing server FD"
50+
@machine.close(server_fd) rescue nil
51+
puts "done!"
52+
end
4753

4854
@machine.accept_each(server_fd) do |fd|
4955
@machine.spin(fd) { http_handle_connection _1 }
5056
end
57+
p :post_accept_each

examples/server_client.rb

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# frozen_string_literal: true
2+
3+
require_relative '../lib/uringmachine'
4+
5+
PORT = 1234
6+
7+
@machine = UM.new
8+
@bgid = @machine.setup_buffer_ring(4096, 1024)
9+
10+
@counter = 0
11+
12+
def handle_connection(fd)
13+
buf = +''
14+
loop do
15+
res = @machine.recv(fd, buf, 8192, 0)
16+
break if res == 0
17+
18+
@machine.write(fd, buf)
19+
@counter += 2
20+
end
21+
ensure
22+
@machine.close(fd) rescue nil
23+
end
24+
25+
def run_client
26+
fd = @machine.socket(UM::AF_INET, UM::SOCK_STREAM, 0, 0)
27+
@machine.connect(fd, '127.0.0.1', PORT)
28+
msg = 'foo' * 30
29+
buf = +''
30+
loop do
31+
@machine.send(fd, msg, msg.bytesize, 0)
32+
res = @machine.recv(fd, buf, 8192, 0)
33+
@counter += 2
34+
35+
break if res == 0
36+
raise "Got #{res} bytes instead of #{msg.bytesize}" if res != msg.bytesize
37+
end
38+
end
39+
40+
trap('SIGINT') { exit }
41+
42+
server_fd = @machine.socket(UM::AF_INET, UM::SOCK_STREAM, 0, 0)
43+
@machine.setsockopt(server_fd, UM::SOL_SOCKET, UM::SO_REUSEADDR, true)
44+
@machine.bind(server_fd, '127.0.0.1', PORT)
45+
@machine.listen(server_fd, UM::SOMAXCONN)
46+
puts "Listening on port #{PORT}"
47+
48+
at_exit { @machine.close(server_fd) rescue nil }
49+
50+
20.times do
51+
@machine.spin { run_client }
52+
end
53+
54+
@machine.spin do
55+
@machine.accept_each(server_fd) do |fd|
56+
@machine.spin(fd) { handle_connection _1 }
57+
end
58+
end
59+
60+
t0 = Time.now
61+
@machine.sleep 3
62+
t1 = Time.now
63+
elapsed = t1 - t0
64+
puts "Did #{@counter} ops in #{elapsed} seconds (#{(@counter / elapsed)} ops/s)"

examples/snooze.rb

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
# frozen_string_literal: true
2+
3+
require_relative '../lib/uringmachine'
4+
5+
@machine = UM.new
6+
7+
@counter = 0
8+
@fiber_count = 0
9+
10+
def start_fiber
11+
@fiber_count += 1
12+
@machine.spin do
13+
max_count = @counter + rand(1000)
14+
puts "Start #{Fiber.current} #{max_count - @counter}"
15+
loop do
16+
@machine.sleep 0.001
17+
@counter += 1
18+
break if @counter >= max_count
19+
end
20+
puts "Stop #{Fiber.current}"
21+
ensure
22+
@fiber_count -= 1
23+
end
24+
end
25+
26+
t0 = Time.now
27+
MAX_FIBERS = 20
28+
MAX_TIME = 10
29+
loop do
30+
@machine.sleep 0.1
31+
puts "pending: #{@machine.pending_count}"
32+
break if (Time.now - t0) > MAX_TIME
33+
start_fiber while @fiber_count < MAX_FIBERS
34+
end
35+
t1 = Time.now
36+
elapsed = t1 - t0
37+
rate = @counter / elapsed
38+
puts "Did #{@counter} ops in #{elapsed} seconds (#{rate} ops/s)"
39+
40+
puts "Waiting for fibers... (#{@fiber_count})"
41+
while @fiber_count > 0
42+
@machine.sleep 0.1
43+
end
44+
puts "Done"

0 commit comments

Comments
 (0)