Reimplement um_op and related code #1

Merged
merged 26 commits into from
Nov 11, 2024
1d0c25f
Rewrite UM implementation
noteflakes Oct 30, 2024
be2b7c1
Keep references to scheduled fibers
noteflakes Oct 30, 2024
4a785d3
Add list of transient ops for GC purposes.
noteflakes Oct 31, 2024
69a4ae4
Reimplement all single shot ops
noteflakes Oct 31, 2024
98c1a5a
Comment out um_poll looping code
noteflakes Nov 1, 2024
9dd5320
Reimplement sync code
noteflakes Nov 1, 2024
bb19156
Ongoing work on reimplementing multishot ops
noteflakes Nov 1, 2024
2284dce
Handle CQE's one by one
noteflakes Nov 1, 2024
fd0848f
Reimplement recv_each
noteflakes Nov 1, 2024
6fe4260
Reimplement read_each (WIP)
noteflakes Nov 2, 2024
7e3546f
Debug
noteflakes Nov 2, 2024
ae2ac24
Simplify and improve CQE processing, fix read_each
noteflakes Nov 2, 2024
60504b1
Fix failing tests, cleanup
noteflakes Nov 2, 2024
8c2b25e
Refactor checking completion result
noteflakes Nov 2, 2024
6d6af4a
Remove #interrupt method
noteflakes Nov 2, 2024
723e143
Improve stability, fix issue with unwanted fiber resume
noteflakes Nov 2, 2024
86b0fce
Add more design details to README
noteflakes Nov 2, 2024
63f5dcb
Benchmark schedule
noteflakes Nov 2, 2024
8333bb5
Reimplement runqueue scheduling
noteflakes Nov 3, 2024
0465d62
Reuse transient um_op and multishot um_op_result structs
noteflakes Nov 8, 2024
ab4f753
Add basic DNS resolver
noteflakes Nov 9, 2024
a55d22f
Update liburing
noteflakes Nov 9, 2024
5598397
Small refactor to um_sync code
noteflakes Nov 11, 2024
9118539
Raise on error result read_each
noteflakes Nov 11, 2024
5e69510
Fix test warnings
noteflakes Nov 11, 2024
a94dbda
Add UringMachine.kernel_version, skip read_each tests on early kernel
noteflakes Nov 11, 2024
26 changes: 26 additions & 0 deletions README.md
Expand Up @@ -32,6 +32,8 @@ UringMachine is based on my experience marrying Ruby and io_uring:
- [IOU](https://github.com/digital-fabric/iou) - a low-level asynchronous API
for using io_uring from Ruby.

### Learnings

Some important learnings from those two projects, in no particular order:

- Monkey-patching is not a good solution, long term. You need to deal with
Expand Down Expand Up @@ -73,6 +75,30 @@ based on the following principles:
- Do not insist on structured concurrency, just provide the APIs necessary to
create actors and to supervise the execution of fibers.

### Cancellation

When working with io_uring, managing the life cycle of asynchronous operations
is quite tricky, especially with regard to cancellation. This is because each
operation lives on both sides of the userspace-kernel divide. When cancelling
an operation, we therefore cannot free or dispose of any resources associated
with it until we know for sure that the kernel side is also done with it.

As stated above, working with fibers allows us to keep operation metadata and
associated data (such as buffers) on the stack, which can greatly simplify
managing the operation's lifetime, as well as significantly reduce heap
allocations.

When a cancellation does occur, UringMachine issues a cancellation (using
`io_uring_prep_cancel64`), and then waits for the corresponding CQE (with a
`-ECANCELED` result).
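
The lifetime rule above can be sketched with a toy model in plain Ruby. This is
only an illustration and does not touch io_uring: `ToyKernel`, `Op`, and
`run_cancel_demo` are hypothetical names, and the model collapses cancellation
into a single CQE for the cancelled operation. The point it shows is that the
buffer is released only after the `-ECANCELED` completion has been reaped.

```ruby
# Toy model only: ToyKernel and Op are hypothetical, not UringMachine's API.
ECANCELED = Errno::ECANCELED::Errno # platform errno value for ECANCELED

Op = Struct.new(:id, :buffer, :state)

class ToyKernel
  def initialize
    @in_flight = {} # ops the "kernel side" still references
    @cq = []        # stands in for the completion queue: [op_id, result]
  end

  def submit(op)
    op.state = :submitted
    @in_flight[op.id] = op
  end

  # Request cancellation. The op stays in flight until its CQE is reaped.
  def cancel(op)
    op.state = :cancelling
    @cq << [op.id, -ECANCELED]
  end

  # Reap one CQE. Only after this is the op's data safe to release.
  def reap
    id, result = @cq.shift
    return nil if id.nil?

    op = @in_flight.delete(id)
    op.state = :done
    [op, result]
  end

  def in_flight?(op)
    @in_flight.key?(op.id)
  end
end

def run_cancel_demo
  kernel = ToyKernel.new
  op = Op.new(1, String.new('payload'), :new)
  kernel.submit(op)
  kernel.cancel(op)
  held_before = kernel.in_flight?(op) # the kernel side still owns the op

  _, result = kernel.reap
  op.buffer = nil if result == -ECANCELED # release only after the CQE

  { held_before: held_before, result: result, state: op.state,
    buffer: op.buffer, held_after: kernel.in_flight?(op) }
end
```

In the real io_uring API a cancel request also produces its own completion, so
an actual implementation has more states to track than this sketch.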

### No scheduler, no runqueue

UringMachine manages the scheduling of fibers by relying solely on io_uring
CQEs. There is no list of ready fibers and no runqueue. Fibers are scheduled
manually by issuing `NOP` operations and then processing CQEs one by one.
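
As a rough illustration of this idea, here is a toy model in plain Ruby. It
does not use io_uring: `ToyMachine` and `run_toy_demo` are hypothetical names,
an array stands in for the completion queue, and fibers are switched with
`resume`/`Fiber.yield` purely to keep the sketch short. Scheduling a fiber
means queueing a NOP-like completion that carries the fiber, and the loop
resumes whichever fiber is attached to the next completion.

```ruby
require 'fiber' # needed for Fiber.current on older Rubies

# Toy model only: ToyMachine is hypothetical and does not use io_uring.
class ToyMachine
  def initialize
    @cq = [] # stands in for the io_uring completion queue
  end

  # "Submit a NOP" whose completion carries the fiber and a resume value.
  def schedule(fiber, value)
    @cq << [fiber, value]
  end

  # Reschedule the current fiber, then hand control back to the loop.
  def snooze
    schedule(Fiber.current, nil)
    Fiber.yield
  end

  # Process completions one by one, resuming the fiber attached to each.
  def run
    until @cq.empty?
      fiber, value = @cq.shift
      fiber.resume(value) if fiber.alive?
    end
  end
end

def run_toy_demo
  machine = ToyMachine.new
  log = []
  %w[a b].each do |name|
    fiber = Fiber.new do
      3.times do |i|
        log << "#{name}#{i}"
        machine.snooze
      end
    end
    machine.schedule(fiber, nil)
  end
  machine.run
  log
end
```

Calling `run_toy_demo` returns the two fibers' steps interleaved, since each
`snooze` places the fiber behind the other one in the queue.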

## Example

```ruby
Expand Down
41 changes: 1 addition & 40 deletions TODO.md
@@ -1,45 +1,7 @@
- futex wait wake

```ruby
# interface:
f = UM::Mutex.new
f.synchronize { ... }

q = UM::Queue.new
# API is similar to stock Ruby Queue class
q << :foo # same as q.push(:foo)
q.shift
q.pop
q.unshift(:foo)
```

But how do we use this in conjunction with a UringMachine instance?

```ruby
f = UM::Mutex.new
machine.synchronize(f) { ... } # looks good

# or maybe:
f.synchronize(machine) { ... } # looks a bit weird

# how about queues?
machine.push(q)
machine.pop(q)
machine.shift(q)
machine.unshift(q)

# what about events?
UM::Event
```


- queues


- splice / - tee
- sendto
- recvfrom
- poll
- poll_add / poll_remove / poll_multishot / poll_update
- open / openat
- fsync
- mkdir / mkdirat
Expand All @@ -51,5 +13,4 @@ UM::Event
- madvise
- getxattr / setxattr
- shutdown

- send_bundle / recv_bundle (kernel >= 6.10)
89 changes: 89 additions & 0 deletions examples/bm_snooze.rb
@@ -0,0 +1,89 @@
# frozen_string_literal: true

require 'bundler/inline'

gemfile do
  source 'https://rubygems.org'
  gem 'uringmachine', path: '..'
  gem 'benchmark-ips'
end

require 'benchmark/ips'
require 'uringmachine'

ITERATIONS = 1000

$machine = UringMachine.new

def run_snooze
  count = 0
  main = Fiber.current

  f1 = Fiber.new do
    loop do
      count += 1
      if count == ITERATIONS
        $machine.schedule(main, nil)
        break
      else
        $machine.snooze
      end
    end
  end

  f2 = Fiber.new do
    loop do
      count += 1
      if count == ITERATIONS
        $machine.schedule(main, nil)
        break
      else
        $machine.snooze
      end
    end
  end

  $machine.schedule(f1, nil)
  $machine.schedule(f2, nil)
  $machine.yield
end

def run_raw_transfer
  count = 0
  main = Fiber.current
  f2 = nil
  f1 = Fiber.new do
    loop do
      count += 1
      if count == ITERATIONS
        main.transfer(nil)
        break
      else
        f2.transfer(nil)
      end
    end
  end

  f2 = Fiber.new do
    loop do
      count += 1
      if count == ITERATIONS
        main.transfer(nil)
        break
      else
        f1.transfer(nil)
      end
    end
  end

  f1.transfer(nil)
end

bm = Benchmark.ips do |x|
  x.config(:time => 5, :warmup => 2)

  x.report("snooze") { run_snooze }
  x.report("raw transfer") { run_raw_transfer }

  x.compare!
end
56 changes: 56 additions & 0 deletions examples/bm_write.rb
@@ -0,0 +1,56 @@
# frozen_string_literal: true

require 'bundler/inline'

gemfile do
  source 'https://rubygems.org'
  gem 'uringmachine', path: '..'
  gem 'benchmark'
end

require 'benchmark'
require 'fileutils' # FileUtils.rm is used below
require 'uringmachine'

ITERATIONS = 100000
BUF = ('*' * 8192).freeze
FN = '/tmp/bm_write'

def run_io_write(num_threads)
  FileUtils.rm(FN) rescue nil
  fio = File.open(FN, 'w')

  threads = num_threads.times.map do |i|
    Thread.new do
      ITERATIONS.times { fio.write(BUF) }
    end
  end
  threads.each(&:join)
ensure
  fio.close
end

def run_um_write(num_fibers)
  FileUtils.rm(FN) rescue nil
  fio = File.open(FN, 'w')
  fd = fio.fileno

  machine = UringMachine.new
  done = UringMachine::Queue.new
  num_fibers.times do
    machine.spin do
      ITERATIONS.times { machine.write(fd, BUF) }
      machine.push(done, true)
    end
  end
  num_fibers.times { machine.pop(done) }
ensure
  fio.close
end

Benchmark.bm do |x|
  [1, 2, 4, 8].each do |c|
    x.report("IO (#{c} threads)") { run_io_write(c) }
    x.report("UM (#{c} fibers) ") { run_um_write(c) }
    puts
  end
end
12 changes: 12 additions & 0 deletions examples/dns_client.rb
@@ -0,0 +1,12 @@
# frozen_string_literal: true

require_relative '../lib/uringmachine'
require 'resolv'

machine = UM.new

addrs = machine.resolve('status.realiteq.net')

puts '*' * 40
puts addrs.join("\n")
puts
13 changes: 10 additions & 3 deletions examples/http_server.rb
Expand Up @@ -23,9 +23,10 @@ def http_handle_connection(fd)
  # parser << _1
  # break if done
  # end

  # puts "Connection closed on fd #{fd}"
rescue => e
  puts "Error while handling connection on fd #{fd}: #{e.inspect}"
  # puts "Error while handling connection on fd #{fd}: #{e.inspect}"
ensure
  @machine.close(fd) rescue nil
end
Expand All @@ -36,15 +37,21 @@ def http_send_response(fd, body)
  @machine.write(fd, msg)
end

trap('SIGINT') { exit! }
trap('SIGINT') { exit }

server_fd = @machine.socket(UM::AF_INET, UM::SOCK_STREAM, 0, 0)
@machine.setsockopt(server_fd, UM::SOL_SOCKET, UM::SO_REUSEADDR, true)
@machine.bind(server_fd, '127.0.0.1', 1234)
@machine.listen(server_fd, UM::SOMAXCONN)
puts 'Listening on port 1234'

at_exit { @machine.close(server_fd) rescue nil }
at_exit do
  puts "Closing server FD"
  @machine.close(server_fd) rescue nil
  puts "done!"
end

@machine.accept_each(server_fd) do |fd|
  @machine.spin(fd) { http_handle_connection _1 }
end
p :post_accept_each
64 changes: 64 additions & 0 deletions examples/server_client.rb
@@ -0,0 +1,64 @@
# frozen_string_literal: true

require_relative '../lib/uringmachine'

PORT = 1234

@machine = UM.new
@bgid = @machine.setup_buffer_ring(4096, 1024)

@counter = 0

def handle_connection(fd)
  buf = +''
  loop do
    res = @machine.recv(fd, buf, 8192, 0)
    break if res == 0

    @machine.write(fd, buf)
    @counter += 2
  end
ensure
  @machine.close(fd) rescue nil
end

def run_client
  fd = @machine.socket(UM::AF_INET, UM::SOCK_STREAM, 0, 0)
  @machine.connect(fd, '127.0.0.1', PORT)
  msg = 'foo' * 30
  buf = +''
  loop do
    @machine.send(fd, msg, msg.bytesize, 0)
    res = @machine.recv(fd, buf, 8192, 0)
    @counter += 2

    break if res == 0
    raise "Got #{res} bytes instead of #{msg.bytesize}" if res != msg.bytesize
  end
end

trap('SIGINT') { exit }

server_fd = @machine.socket(UM::AF_INET, UM::SOCK_STREAM, 0, 0)
@machine.setsockopt(server_fd, UM::SOL_SOCKET, UM::SO_REUSEADDR, true)
@machine.bind(server_fd, '127.0.0.1', PORT)
@machine.listen(server_fd, UM::SOMAXCONN)
puts "Listening on port #{PORT}"

at_exit { @machine.close(server_fd) rescue nil }

20.times do
  @machine.spin { run_client }
end

@machine.spin do
  @machine.accept_each(server_fd) do |fd|
    @machine.spin(fd) { handle_connection _1 }
  end
end

t0 = Time.now
@machine.sleep 3
t1 = Time.now
elapsed = t1 - t0
puts "Did #{@counter} ops in #{elapsed} seconds (#{(@counter / elapsed)} ops/s)"
44 changes: 44 additions & 0 deletions examples/snooze.rb
@@ -0,0 +1,44 @@
# frozen_string_literal: true

require_relative '../lib/uringmachine'

@machine = UM.new

@counter = 0
@fiber_count = 0

def start_fiber
  @fiber_count += 1
  @machine.spin do
    max_count = @counter + rand(1000)
    puts "Start #{Fiber.current} #{max_count - @counter}"
    loop do
      @machine.sleep 0.001
      @counter += 1
      break if @counter >= max_count
    end
    puts "Stop #{Fiber.current}"
  ensure
    @fiber_count -= 1
  end
end

t0 = Time.now
MAX_FIBERS = 20
MAX_TIME = 10
loop do
  @machine.sleep 0.1
  puts "pending: #{@machine.pending_count}"
  break if (Time.now - t0) > MAX_TIME
  start_fiber while @fiber_count < MAX_FIBERS
end
t1 = Time.now
elapsed = t1 - t0
rate = @counter / elapsed
puts "Did #{@counter} ops in #{elapsed} seconds (#{rate} ops/s)"

puts "Waiting for fibers... (#{@fiber_count})"
while @fiber_count > 0
  @machine.sleep 0.1
end
puts "Done"