Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement um_op and related code #1

Merged
merged 26 commits into from
Nov 11, 2024
Merged
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
1d0c25f
Rewrite UM implementation
noteflakes Oct 30, 2024
be2b7c1
Keep references to scheduled fibers
noteflakes Oct 30, 2024
4a785d3
Add list of transient ops for GC purposes.
noteflakes Oct 31, 2024
69a4ae4
Reimplement all single shot ops
noteflakes Oct 31, 2024
98c1a5a
Comment out um_poll looping code
noteflakes Nov 1, 2024
9dd5320
Reimplement sync code
noteflakes Nov 1, 2024
bb19156
Ongoing work on reimplementing multishot ops
noteflakes Nov 1, 2024
2284dce
Handle CQE's one by one
noteflakes Nov 1, 2024
fd0848f
Reimplement recv_each
noteflakes Nov 1, 2024
6fe4260
Reimplement read_each (WIP)
noteflakes Nov 2, 2024
7e3546f
Debug
noteflakes Nov 2, 2024
ae2ac24
Simplify and improve CQE processing, fix read_each
noteflakes Nov 2, 2024
60504b1
Fix failing tests, cleanup
noteflakes Nov 2, 2024
8c2b25e
Refactor checking completion result
noteflakes Nov 2, 2024
6d6af4a
Remove #interrupt method
noteflakes Nov 2, 2024
723e143
Improve stability, fix issue with unwanted fiber resume
noteflakes Nov 2, 2024
86b0fce
Add more design details to README
noteflakes Nov 2, 2024
63f5dcb
Benchmark schedule
noteflakes Nov 2, 2024
8333bb5
Reimplement runqueue scheduling
noteflakes Nov 3, 2024
0465d62
Reuse transient um_op and multishot um_op_result structs
noteflakes Nov 8, 2024
ab4f753
Add basic DNS resolver
noteflakes Nov 9, 2024
a55d22f
Update liburing
noteflakes Nov 9, 2024
5598397
Small refactor to um_sync code
noteflakes Nov 11, 2024
9118539
Raise on error result read_each
noteflakes Nov 11, 2024
5e69510
Fix test warnings
noteflakes Nov 11, 2024
a94dbda
Add UringMachine.kernel_version, skip read_each tests on early kernel
noteflakes Nov 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Handle CQE's one by one
noteflakes committed Nov 1, 2024
commit 2284dce84acba852a23f5a398a3a2a4261ae93f1
113 changes: 75 additions & 38 deletions ext/um/um.c
Original file line number Diff line number Diff line change
@@ -82,11 +82,13 @@ static inline VALUE um_process_cqe(struct um *machine, struct io_uring_cqe *cqe)
struct um_op *op = (struct um_op *)cqe->user_data;
if (unlikely(!op)) return Qnil;

machine->pending_count--;
if (!(cqe->flags & IORING_CQE_F_MORE))
machine->pending_count--;

int more = (cqe->flags & IORING_CQE_F_MORE);
printf(
": process_cqe op %p kind %d flags %d cqe_res %d cqe_flags %d\n",
op, op->kind, op->flags, cqe->res, cqe->flags
": process_cqe op %p kind %d flags %d cqe_res %d cqe_flags %d pending %d more %d\n",
op, op->kind, op->flags, cqe->res, cqe->flags, machine->pending_count, more
);
INSPECT(" fiber", op->fiber);
INSPECT(" value", op->value);
@@ -115,6 +117,7 @@ static inline VALUE um_wait_for_and_process_cqe(struct um *machine) {
};

rb_thread_call_without_gvl(um_wait_for_cqe_without_gvl, (void *)&ctx, RUBY_UBF_IO, 0);
printf("um_wait_for_cqe_without_gvl result: %d\n", ctx.result);
if (unlikely(ctx.result < 0)) {
rb_syserr_fail(-ctx.result, strerror(-ctx.result));
}
@@ -127,8 +130,41 @@ static inline bool cq_ring_needs_flush(struct io_uring *ring) {
return IO_URING_READ_ONCE(*ring->sq.kflags) & IORING_SQ_CQ_OVERFLOW;
}

static inline unsigned um_process_ready_cqes(struct um *machine, VALUE *ret) {
unsigned total_count = 0;
// static inline unsigned um_process_ready_cqes(struct um *machine, VALUE *ret) {
// unsigned total_count = 0;
// iterate:
// bool overflow_checked = false;
// struct io_uring_cqe *cqe;
// unsigned head;
// unsigned count = 0;
// io_uring_for_each_cqe(&machine->ring, head, cqe) {
// ++count;
// *ret = um_process_cqe(machine, cqe);
// }
// io_uring_cq_advance(&machine->ring, count);
// total_count += count;

// if (overflow_checked) goto done;

// if (cq_ring_needs_flush(&machine->ring)) {
// io_uring_enter(machine->ring.ring_fd, 0, 0, IORING_ENTER_GETEVENTS, NULL);
// overflow_checked = true;
// goto iterate;
// }

// done:
// return total_count;
// }

// static inline VALUE um_wait_for_and_process_ready_cqes(struct um *machine) {
// VALUE ret = um_wait_for_and_process_cqe(machine);
// um_process_ready_cqes(machine, &ret);
// RB_GC_GUARD(ret);
// return ret;
// }

// returns true if a cqe was processed
static inline int um_process_next_ready_cqe(struct um *machine, VALUE *ret) {
iterate:
bool overflow_checked = false;
struct io_uring_cqe *cqe;
@@ -137,9 +173,10 @@ static inline unsigned um_process_ready_cqes(struct um *machine, VALUE *ret) {
io_uring_for_each_cqe(&machine->ring, head, cqe) {
++count;
*ret = um_process_cqe(machine, cqe);
break;
}
io_uring_cq_advance(&machine->ring, count);
total_count += count;
if (count > 0) return true;

if (overflow_checked) goto done;

@@ -150,37 +187,36 @@ static inline unsigned um_process_ready_cqes(struct um *machine, VALUE *ret) {
}

done:
return total_count;
}

static inline VALUE um_wait_for_and_process_ready_cqes(struct um *machine) {
VALUE ret = um_wait_for_and_process_cqe(machine);
um_process_ready_cqes(machine, &ret);
RB_GC_GUARD(ret);
return ret;
}

VALUE um_poll(struct um *machine) {
RB_OBJ_WRITE(machine->self, &machine->poll_fiber, rb_fiber_current());
INSPECT("um_poll >>", machine->poll_fiber);
// await:
VALUE ret = um_wait_for_and_process_ready_cqes(machine);
// INSPECT("um_poll ret", ret);
// printf("unsubmitted_count: %d\n", machine->unsubmitted_count);
// if (machine->unsubmitted_count && (ret == Qnil))
// goto await;
RB_OBJ_WRITE(machine->self, &machine->poll_fiber, Qnil);
return false;
}

// VALUE um_poll(struct um *machine) {
// RB_OBJ_WRITE(machine->self, &machine->poll_fiber, rb_fiber_current());
// INSPECT("um_poll >>", machine->poll_fiber);
// // await:
// VALUE ret = um_wait_for_and_process_ready_cqes(machine);
// // INSPECT("um_poll ret", ret);
// // printf("unsubmitted_count: %d\n", machine->unsubmitted_count);
// // if (machine->unsubmitted_count && (ret == Qnil))
// // goto await;
// RB_OBJ_WRITE(machine->self, &machine->poll_fiber, Qnil);

// INSPECT("um_poll <<", ret);
// return ret;
// }

INSPECT("um_poll <<", ret);
VALUE um_fiber_switch(struct um *machine) {
VALUE ret = Qnil;
if (!um_process_next_ready_cqe(machine, &ret))
ret = um_wait_for_and_process_cqe(machine);

return ret;
}

VALUE um_fiber_switch(struct um *machine) {
static VALUE nil = Qnil;
if (machine->poll_fiber != Qnil)
return rb_fiber_transfer(machine->poll_fiber, 1, &nil);
else
return um_poll(machine);
// static VALUE nil = Qnil;
// if (machine->poll_fiber != Qnil)
// return rb_fiber_transfer(machine->poll_fiber, 1, &nil);
// else
// return um_poll(machine);
}

static inline void um_submit_cancel_op(struct um *machine, struct um_op *op) {
@@ -590,6 +626,7 @@ VALUE um_setsockopt(struct um *machine, int fd, int level, int opt, int value) {


VALUE um_accept_each_begin(VALUE arg) {
printf("== um_accept_each_begin\n");
struct op_ensure_ctx *ctx = (struct op_ensure_ctx *)arg;
struct io_uring_sqe *sqe = um_get_sqe(ctx->machine, ctx->op);
io_uring_prep_multishot_accept(sqe, ctx->fd, NULL, NULL, 0);
@@ -601,19 +638,19 @@ VALUE um_accept_each_begin(VALUE arg) {
if (!um_op_completed_p(ctx->op))
return raise_if_exception(ret);
else {
int more = (ctx->op->cqe_flags & IORING_CQE_F_MORE);
if (more) ctx->op->flags &= ~ OP_F_COMPLETED;
um_raise_on_error_result(ctx->op->cqe_res);
rb_yield(INT2NUM(ctx->op->cqe_res));
if (!(ctx->op->cqe_flags & IORING_CQE_F_MORE))
break;
else
ctx->op->flags &= ~ OP_F_COMPLETED;
if (!more) break;
}
}

return Qnil;
}

VALUE um_accept_each_ensure(VALUE arg) {
printf("== um_accept_each_ensure\n");
struct op_ensure_ctx *ctx = (struct op_ensure_ctx *)arg;
if (!um_op_completed_p(ctx->op))
um_cancel_and_wait(ctx->machine, ctx->op);
5 changes: 0 additions & 5 deletions test/test_um.rb
Original file line number Diff line number Diff line change
@@ -469,17 +469,12 @@ def test_accept_each
end

count = 0
p 1
machine.accept_each(@server.fileno) do |fd|
p [2, fd]
count += 1
break if count == 3
end
p 3

assert_equal 3, count
assert_equal 1, machine.pending_count
machine.snooze
assert_equal 0, machine.pending_count
ensure
t&.kill