Skip to content

Commit 685c173

Browse files
committed
Merge branch 'main' of github.com:digital-fabric/uringmachine
2 parents 0bc4c4b + 8f36d7d commit 685c173

21 files changed

+1225
-824
lines changed

README.md

+26
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ UringMachine is based on my experience marrying Ruby and io_uring:
3232
- [IOU](https://github.com/digital-fabric/iou) - a low-level asynchronous API
3333
for using io_uring from Ruby.
3434

35+
### Learnings
36+
3537
Some important learnings from those two projects, in no particular order:
3638

3739
- Monkey-patching is not a good solution, long term. You need to deal with
@@ -73,6 +75,30 @@ based on the following principles:
7375
- Do not insist on structured concurrency, just provide the APIs necessary to
7476
create actors and to supervise the execution of fibers.
7577

78+
### Cancellation
79+
80+
When working with io_uring, managing the life cycle of asynchronous operations
81+
is quite tricky, especially with regards to cancellation. This is due to the
82+
fact each operation lives on both sides of the userspace-kernel divide. This
83+
means that when cancelling an operation, we cannot free, or dispose of any
84+
resources associated with the operation, until we know for sure that the kernel
85+
side is also done with the operation.
86+
87+
As stated above, working with fibers allows us to keep operation metadata and
88+
associated data (such as buffers etc) on the stack, which can greatly simplify
89+
the managing of the operation's lifetime, as well as significantly reduce heap
90+
allocations.
91+
92+
When a cancellation does occur, UringMachine issues a cancellation (using
93+
`io_uring_prep_cancel64`), and then waits for the corresponding CQE (with a
94+
`-ECANCELED` result).
95+
96+
### No scheduler, no runqueue
97+
98+
UringMachine manages the scheduling of fibers by relying solely on io_uring
99+
CQE's. There's no list of ready fibers, no runqueue. Manually scheduling fibers
100+
is done by issuing `NOP` operations and then processing CQE's one by one.
101+
76102
## Example
77103

78104
```ruby

TODO.md

+1-40
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,7 @@
1-
- futex wait wake
2-
3-
```ruby
4-
# interface:
5-
f = UM::Mutex.new
6-
f.synchronize { ... }
7-
8-
q = UM:Queue.new
9-
# API is similar to stock Ruby Queue class
10-
q << :foo # same as q.push(:foo)
11-
q.shift
12-
q.pop
13-
q.unshift(:foo)
14-
```
15-
16-
But how do we use this in conjunction with a UringMachine instance?
17-
18-
```ruby
19-
f = UM::Mutex.new
20-
machine.synchronize(futex) { ... } # looks good
21-
22-
# or maybe:
23-
f.synchronize(machine) { ... } # looks a bit weird
24-
25-
# how about queues?
26-
machine.push(q)
27-
machine.pop(q)
28-
machine.shift(q)
29-
machine.unshift(q)
30-
31-
# what about events?
32-
UM::Event
33-
```
34-
35-
36-
- queues
37-
38-
391
- splice / - tee
402
- sendto
413
- recvfrom
42-
- poll
4+
- poll_add / poll_remove / poll_multishot / poll_update
435
- open / openat
446
- fsync
457
- mkdir / mkdirat
@@ -51,5 +13,4 @@ UM::Event
5113
- madvise
5214
- getxattr / setxattr
5315
- shutdown
54-
5516
- send_bundle / recv_bundle (kernel >= 6.10)

examples/bm_snooze.rb

+89
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# frozen_string_literal: true
2+
3+
require 'bundler/inline'
4+
5+
gemfile do
6+
source 'https://rubygems.org'
7+
gem 'uringmachine', path: '..'
8+
gem 'benchmark-ips'
9+
end
10+
11+
require 'benchmark/ips'
12+
require 'uringmachine'
13+
14+
ITERATIONS = 1000
15+
16+
$machine = UringMachine.new
17+
18+
def run_snooze
19+
count = 0
20+
main = Fiber.current
21+
22+
f1 = Fiber.new do
23+
loop do
24+
count += 1
25+
if count == ITERATIONS
26+
$machine.schedule(main, nil)
27+
break
28+
else
29+
$machine.snooze
30+
end
31+
end
32+
end
33+
34+
f2 = Fiber.new do
35+
loop do
36+
count += 1
37+
if count == ITERATIONS
38+
$machine.schedule(main, nil)
39+
break
40+
else
41+
$machine.snooze
42+
end
43+
end
44+
end
45+
46+
$machine.schedule(f1, nil)
47+
$machine.schedule(f2, nil)
48+
$machine.yield
49+
end
50+
51+
def run_raw_transfer
52+
count = 0
53+
main = Fiber.current
54+
f2 = nil
55+
f1 = Fiber.new do
56+
loop do
57+
count += 1
58+
if count == ITERATIONS
59+
main.transfer(nil)
60+
break
61+
else
62+
f2.transfer(nil)
63+
end
64+
end
65+
end
66+
67+
f2 = Fiber.new do
68+
loop do
69+
count += 1
70+
if count == ITERATIONS
71+
main.transfer(nil)
72+
break
73+
else
74+
f1.transfer(nil)
75+
end
76+
end
77+
end
78+
79+
f1.transfer(nil)
80+
end
81+
82+
bm = Benchmark.ips do |x|
83+
x.config(:time => 5, :warmup => 2)
84+
85+
x.report("snooze") { run_snooze }
86+
x.report("raw transfer") { run_raw_transfer }
87+
88+
x.compare!
89+
end

examples/bm_write.rb

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# frozen_string_literal: true
2+
3+
require 'bundler/inline'
4+
5+
gemfile do
6+
source 'https://rubygems.org'
7+
gem 'uringmachine', path: '..'
8+
gem 'benchmark'
9+
end
10+
11+
require 'benchmark'
12+
require 'uringmachine'
13+
14+
ITERATIONS = 100000
15+
BUF = ('*' * 8192).freeze
16+
FN = '/tmp/bm_write'
17+
18+
def run_io_write(num_threads)
19+
FileUtils.rm(FN) rescue nil
20+
fio = File.open(FN, 'w')
21+
22+
threads = num_threads.times.map do |i|
23+
Thread.new do
24+
ITERATIONS.times { fio.write(BUF) }
25+
end
26+
end
27+
threads.each(&:join)
28+
ensure
29+
fio.close
30+
end
31+
32+
def run_um_write(num_fibers)
33+
FileUtils.rm(FN) rescue nil
34+
fio = File.open(FN, 'w')
35+
fd = fio.fileno
36+
37+
machine = UringMachine.new
38+
done = UringMachine::Queue.new
39+
num_fibers.times do
40+
machine.spin do
41+
ITERATIONS.times { machine.write(fd, BUF) }
42+
machine.push(done, true)
43+
end
44+
end
45+
num_fibers.times { machine.pop(done) }
46+
ensure
47+
fio.close
48+
end
49+
50+
Benchmark.bm do |x|
51+
[1, 2, 4, 8].each do |c|
52+
x.report("IO (#{c} threads)") { run_io_write(c) }
53+
x.report("UM (#{c} fibers) ") { run_um_write(c) }
54+
puts
55+
end
56+
end

examples/dns_client.rb

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# frozen_string_literal: true
2+
3+
require_relative '../lib/uringmachine'
4+
require 'resolv'
5+
6+
machine = UM.new
7+
8+
addrs = machine.resolve('status.realiteq.net')
9+
10+
puts '*' * 40
11+
puts addrs.join("\n")
12+
puts

examples/http_server.rb

+10-3
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@ def http_handle_connection(fd)
2323
# parser << _1
2424
# break if done
2525
# end
26+
2627
# puts "Connection closed on fd #{fd}"
2728
rescue => e
28-
puts "Error while handling connection on fd #{fd}: #{e.inspect}"
29+
# puts "Error while handling connection on fd #{fd}: #{e.inspect}"
2930
ensure
3031
@machine.close(fd) rescue nil
3132
end
@@ -36,15 +37,21 @@ def http_send_response(fd, body)
3637
@machine.write(fd, msg)
3738
end
3839

39-
trap('SIGINT') { exit! }
40+
trap('SIGINT') { exit }
4041

4142
server_fd = @machine.socket(UM::AF_INET, UM::SOCK_STREAM, 0, 0)
43+
@machine.setsockopt(server_fd, UM::SOL_SOCKET, UM::SO_REUSEADDR, true)
4244
@machine.bind(server_fd, '127.0.0.1', 1234)
4345
@machine.listen(server_fd, UM::SOMAXCONN)
4446
puts 'Listening on port 1234'
4547

46-
at_exit { @machine.close(server_fd) rescue nil }
48+
at_exit do
49+
puts "Closing server FD"
50+
@machine.close(server_fd) rescue nil
51+
puts "done!"
52+
end
4753

4854
@machine.accept_each(server_fd) do |fd|
4955
@machine.spin(fd) { http_handle_connection _1 }
5056
end
57+
p :post_accept_each

examples/server_client.rb

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# frozen_string_literal: true
2+
3+
require_relative '../lib/uringmachine'
4+
5+
PORT = 1234
6+
7+
@machine = UM.new
8+
@bgid = @machine.setup_buffer_ring(4096, 1024)
9+
10+
@counter = 0
11+
12+
def handle_connection(fd)
13+
buf = +''
14+
loop do
15+
res = @machine.recv(fd, buf, 8192, 0)
16+
break if res == 0
17+
18+
@machine.write(fd, buf)
19+
@counter += 2
20+
end
21+
ensure
22+
@machine.close(fd) rescue nil
23+
end
24+
25+
def run_client
26+
fd = @machine.socket(UM::AF_INET, UM::SOCK_STREAM, 0, 0)
27+
@machine.connect(fd, '127.0.0.1', PORT)
28+
msg = 'foo' * 30
29+
buf = +''
30+
loop do
31+
@machine.send(fd, msg, msg.bytesize, 0)
32+
res = @machine.recv(fd, buf, 8192, 0)
33+
@counter += 2
34+
35+
break if res == 0
36+
raise "Got #{res} bytes instead of #{msg.bytesize}" if res != msg.bytesize
37+
end
38+
end
39+
40+
trap('SIGINT') { exit }
41+
42+
server_fd = @machine.socket(UM::AF_INET, UM::SOCK_STREAM, 0, 0)
43+
@machine.setsockopt(server_fd, UM::SOL_SOCKET, UM::SO_REUSEADDR, true)
44+
@machine.bind(server_fd, '127.0.0.1', PORT)
45+
@machine.listen(server_fd, UM::SOMAXCONN)
46+
puts "Listening on port #{PORT}"
47+
48+
at_exit { @machine.close(server_fd) rescue nil }
49+
50+
20.times do
51+
@machine.spin { run_client }
52+
end
53+
54+
@machine.spin do
55+
@machine.accept_each(server_fd) do |fd|
56+
@machine.spin(fd) { handle_connection _1 }
57+
end
58+
end
59+
60+
t0 = Time.now
61+
@machine.sleep 3
62+
t1 = Time.now
63+
elapsed = t1 - t0
64+
puts "Did #{@counter} ops in #{elapsed} seconds (#{(@counter / elapsed)} ops/s)"

examples/snooze.rb

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
# frozen_string_literal: true
2+
3+
require_relative '../lib/uringmachine'
4+
5+
@machine = UM.new
6+
7+
@counter = 0
8+
@fiber_count = 0
9+
10+
def start_fiber
11+
@fiber_count += 1
12+
@machine.spin do
13+
max_count = @counter + rand(1000)
14+
puts "Start #{Fiber.current} #{max_count - @counter}"
15+
loop do
16+
@machine.sleep 0.001
17+
@counter += 1
18+
break if @counter >= max_count
19+
end
20+
puts "Stop #{Fiber.current}"
21+
ensure
22+
@fiber_count -= 1
23+
end
24+
end
25+
26+
t0 = Time.now
27+
MAX_FIBERS = 20
28+
MAX_TIME = 10
29+
loop do
30+
@machine.sleep 0.1
31+
puts "pending: #{@machine.pending_count}"
32+
break if (Time.now - t0) > MAX_TIME
33+
start_fiber while @fiber_count < MAX_FIBERS
34+
end
35+
t1 = Time.now
36+
elapsed = t1 - t0
37+
rate = @counter / elapsed
38+
puts "Did #{@counter} ops in #{elapsed} seconds (#{rate} ops/s)"
39+
40+
puts "Waiting for fibers... (#{@fiber_count})"
41+
while @fiber_count > 0
42+
@machine.sleep 0.1
43+
end
44+
puts "Done"

0 commit comments

Comments
 (0)