Skip to content

Commit f966dea

Browse files
committed
Implement UM::Mutex, #synchronize
1 parent 81bc6bc commit f966dea

File tree

7 files changed

+145
-20
lines changed

7 files changed

+145
-20
lines changed

TODO.md

+40-5
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,45 @@
1-
- send_bundle / recv_bundle (kernel >= 6.10)
2-
- splice
1+
- futex wait wake
2+
3+
```ruby
4+
# interface:
5+
f = UM::Mutex.new
6+
f.synchronize { ... }
7+
8+
q = UM:Queue.new
9+
# API is similar to stock Ruby Queue class
10+
q << :foo # same as q.push(:foo)
11+
q.shift
12+
q.pop
13+
q.unshift(:foo)
14+
```
15+
16+
But how do we use this in conjunction with a UringMachine instance?
17+
18+
```ruby
19+
f = UM::Mutex.new
20+
machine.synchronize(futex) { ... } # looks good
21+
22+
# or maybe:
23+
f.synchronize(machine) { ... } # looks a bit weird
24+
25+
# how about queues?
26+
machine.push(q)
27+
machine.pop(q)
28+
machine.shift(q)
29+
machine.unshift(q)
30+
31+
# what about events?
32+
UM::Event
33+
```
34+
35+
36+
- queues
37+
38+
39+
- splice / - tee
340
- sendto
441
- recvfrom
542
- poll
6-
- tee
743
- open / openat
844
- fsync
945
- mkdir / mkdirat
@@ -15,6 +51,5 @@
1551
- madvise
1652
- getxattr / setxattr
1753
- shutdown
18-
- futex wait wake
1954

20-
- queues
55+
- send_bundle / recv_bundle (kernel >= 6.10)

ext/um/extconf.rb

+5-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ def get_config
2828
single_issuer_flag: combined_version >= 600,
2929
prep_bind: combined_version >= 611,
3030
prep_listen: combined_version >= 611,
31-
prep_cmd_sock: combined_version >= 608
31+
prep_cmd_sock: combined_version >= 607,
32+
prep_futex: combined_version >= 607,
33+
prep_waitid: combined_version >= 607
3234
}
3335
end
3436

@@ -58,6 +60,8 @@ def get_config
5860
$defs << '-DHAVE_IO_URING_PREP_BIND' if config[:prep_bind]
5961
$defs << '-DHAVE_IO_URING_PREP_LISTEN' if config[:prep_listen]
6062
$defs << '-DHAVE_IO_URING_PREP_CMD_SOCK' if config[:prep_cmd_sock]
63+
$defs << '-DHAVE_IO_URING_PREP_FUTEX' if config[:prep_futex]
64+
$defs << '-DHAVE_IO_URING_PREP_WAITID' if config[:prep_waitid]
6165
$CFLAGS << ' -Wno-pointer-arith'
6266

6367
CONFIG['optflags'] << ' -fno-strict-aliasing'

ext/um/um.c

+12-12
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ inline void um_teardown(struct um *machine) {
4848
um_free_buffer_linked_list(machine);
4949
}
5050

51-
static inline struct io_uring_sqe *um_get_sqe(struct um *machine, struct um_op *op) {
51+
struct io_uring_sqe *um_get_sqe(struct um *machine, struct um_op *op) {
5252
struct io_uring_sqe *sqe;
5353
sqe = io_uring_get_sqe(&machine->ring);
5454
if (likely(sqe)) goto done;
@@ -209,7 +209,7 @@ static inline void um_submit_cancel_op(struct um *machine, struct um_op *op) {
209209
io_uring_prep_cancel64(sqe, (long long)op, 0);
210210
}
211211

212-
static inline VALUE um_await_op(struct um *machine, struct um_op *op, __s32 *result, __u32 *flags) {
212+
VALUE um_await_op(struct um *machine, struct um_op *op, __s32 *result, __u32 *flags) {
213213
RB_OBJ_WRITE(machine->self, &op->fiber, rb_fiber_current());
214214

215215
VALUE value = um_fiber_switch(machine);
@@ -537,7 +537,7 @@ VALUE um_accept_each(struct um *machine, int fd) {
537537
VALUE um_socket(struct um *machine, int domain, int type, int protocol, uint flags) {
538538
struct um_op *op = um_op_idle_checkout(machine, OP_SOCKET);
539539
struct io_uring_sqe *sqe = um_get_sqe(machine, op);
540-
int result = 0;
540+
__s32 result = 0;
541541

542542
io_uring_prep_socket(sqe, domain, type, protocol, flags);
543543
um_await_op(machine, op, &result, NULL);
@@ -549,7 +549,7 @@ VALUE um_socket(struct um *machine, int domain, int type, int protocol, uint fla
549549
VALUE um_connect(struct um *machine, int fd, const struct sockaddr *addr, socklen_t addrlen) {
550550
struct um_op *op = um_op_idle_checkout(machine, OP_CONNECT);
551551
struct io_uring_sqe *sqe = um_get_sqe(machine, op);
552-
int result = 0;
552+
__s32 result = 0;
553553

554554
io_uring_prep_connect(sqe, fd, addr, addrlen);
555555
um_await_op(machine, op, &result, NULL);
@@ -561,7 +561,7 @@ VALUE um_connect(struct um *machine, int fd, const struct sockaddr *addr, sockle
561561
VALUE um_send(struct um *machine, int fd, VALUE buffer, int len, int flags) {
562562
struct um_op *op = um_op_idle_checkout(machine, OP_SEND);
563563
struct io_uring_sqe *sqe = um_get_sqe(machine, op);
564-
int result = 0;
564+
__s32 result = 0;
565565

566566
io_uring_prep_send(sqe, fd, RSTRING_PTR(buffer), len, flags);
567567
um_await_op(machine, op, &result, NULL);
@@ -573,7 +573,7 @@ VALUE um_send(struct um *machine, int fd, VALUE buffer, int len, int flags) {
573573
VALUE um_recv(struct um *machine, int fd, VALUE buffer, int maxlen, int flags) {
574574
struct um_op *op = um_op_idle_checkout(machine, OP_RECV);
575575
struct io_uring_sqe *sqe = um_get_sqe(machine, op);
576-
int result = 0;
576+
__s32 result = 0;
577577

578578
void *ptr = um_prepare_read_buffer(buffer, maxlen, 0);
579579
io_uring_prep_recv(sqe, fd, ptr, maxlen, flags);
@@ -620,7 +620,7 @@ VALUE um_recv_each(struct um *machine, int fd, int bgid, int flags) {
620620
VALUE um_bind(struct um *machine, int fd, struct sockaddr *addr, socklen_t addrlen) {
621621
struct um_op *op = um_op_idle_checkout(machine, OP_BIND);
622622
struct io_uring_sqe *sqe = um_get_sqe(machine, op);
623-
int result = 0;
623+
__s32 result = 0;
624624

625625
io_uring_prep_bind(sqe, fd, addr, addrlen);
626626
um_await_op(machine, op, &result, NULL);
@@ -632,7 +632,7 @@ VALUE um_bind(struct um *machine, int fd, struct sockaddr *addr, socklen_t addrl
632632
VALUE um_listen(struct um *machine, int fd, int backlog) {
633633
struct um_op *op = um_op_idle_checkout(machine, OP_LISTEN);
634634
struct io_uring_sqe *sqe = um_get_sqe(machine, op);
635-
int result = 0;
635+
__s32 result = 0;
636636

637637
io_uring_prep_listen(sqe, fd, backlog);
638638
um_await_op(machine, op, &result, NULL);
@@ -645,9 +645,9 @@ VALUE um_getsockopt(struct um *machine, int fd, int level, int opt) {
645645
int value;
646646

647647
#ifdef HAVE_IO_URING_PREP_CMD_SOCK
648-
struct um_op *op = um_op_idle_checkout(machine, OP_LISTEN);
648+
struct um_op *op = um_op_idle_checkout(machine, OP_GETSOCKOPT);
649649
struct io_uring_sqe *sqe = um_get_sqe(machine, op);
650-
int result = 0;
650+
__s32 result = 0;
651651

652652
io_uring_prep_cmd_sock(sqe, SOCKET_URING_OP_GETSOCKOPT, fd, level, opt, &value, sizeof(value));
653653
um_await_op(machine, op, &result, NULL);
@@ -664,9 +664,9 @@ VALUE um_getsockopt(struct um *machine, int fd, int level, int opt) {
664664

665665
VALUE um_setsockopt(struct um *machine, int fd, int level, int opt, int value) {
666666
#ifdef HAVE_IO_URING_PREP_CMD_SOCK
667-
struct um_op *op = um_op_idle_checkout(machine, OP_LISTEN);
667+
struct um_op *op = um_op_idle_checkout(machine, OP_SETSOCKOPT);
668668
struct io_uring_sqe *sqe = um_get_sqe(machine, op);
669-
int result = 0;
669+
__s32 result = 0;
670670

671671
io_uring_prep_cmd_sock(sqe, SOCKET_URING_OP_SETSOCKOPT, fd, level, opt, &value, sizeof(value));
672672
um_await_op(machine, op, &result, NULL);

ext/um/um.h

+18-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,10 @@ enum op_kind {
4545
OP_SOCKET,
4646
OP_CONNECT,
4747
OP_BIND,
48-
OP_LISTEN
48+
OP_LISTEN,
49+
OP_GETSOCKOPT,
50+
OP_SETSOCKOPT,
51+
OP_SYNCHRONIZE
4952
};
5053

5154
struct um_result_entry {
@@ -128,7 +131,13 @@ struct um {
128131
unsigned int buffer_ring_count;
129132
};
130133

134+
struct um_futex {
135+
uint32_t value;
136+
VALUE self;
137+
};
138+
131139
extern VALUE cUM;
140+
extern VALUE cMutex;
132141

133142
void um_setup(VALUE self, struct um *machine);
134143
void um_teardown(struct um *machine);
@@ -161,6 +170,11 @@ void um_update_read_buffer(struct um *machine, VALUE buffer, int buffer_offset,
161170
int um_setup_buffer_ring(struct um *machine, unsigned size, unsigned count);
162171
VALUE um_get_string_from_buffer_ring(struct um *machine, int bgid, __s32 result, __u32 flags);
163172

173+
struct um_futex *Mutex_data(VALUE self);
174+
175+
struct io_uring_sqe *um_get_sqe(struct um *machine, struct um_op *op);
176+
VALUE um_await_op(struct um *machine, struct um_op *op, __s32 *result, __u32 *flags);
177+
164178
VALUE um_await(struct um *machine);
165179
void um_schedule(struct um *machine, VALUE fiber, VALUE value);
166180
void um_interrupt(struct um *machine, VALUE fiber, VALUE value);
@@ -183,6 +197,9 @@ VALUE um_bind(struct um *machine, int fd, struct sockaddr *addr, socklen_t addrl
183197
VALUE um_listen(struct um *machine, int fd, int backlog);
184198
VALUE um_getsockopt(struct um *machine, int fd, int level, int opt);
185199
VALUE um_setsockopt(struct um *machine, int fd, int level, int opt, int value);
200+
201+
VALUE um_mutex_synchronize(struct um *machine, uint32_t *mutex);
202+
186203
VALUE um_debug(struct um *machine);
187204

188205
void um_define_net_constants(VALUE mod);

ext/um/um_class.c

+14
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ VALUE cUM;
55

66
static void UM_mark(void *ptr) {
77
struct um *machine = ptr;
8+
rb_gc_mark_movable(machine->self);
89
um_mark_op_linked_list(&machine->list_pending);
910
um_mark_op_linked_list(&machine->list_scheduled);
1011
}
1112

1213
static void UM_compact(void *ptr) {
1314
struct um *machine = ptr;
15+
machine->self = rb_gc_location(machine->self);
1416
um_compact_op_linked_list(&machine->list_pending);
1517
um_compact_op_linked_list(&machine->list_scheduled);
1618
}
@@ -222,6 +224,14 @@ VALUE UM_setsockopt(VALUE self, VALUE fd, VALUE level, VALUE opt, VALUE value) {
222224
return um_setsockopt(machine, NUM2INT(fd), NUM2INT(level), NUM2INT(opt), numeric_value(value));
223225
}
224226

227+
#ifdef HAVE_IO_URING_PREP_FUTEX
228+
VALUE UM_synchronize(VALUE self, VALUE futex) {
229+
struct um *machine = get_machine(self);
230+
struct um_futex *futex_data = Mutex_data(futex);
231+
return um_mutex_synchronize(machine, &futex_data->value);
232+
}
233+
#endif
234+
225235
VALUE UM_debug(VALUE self) {
226236
struct um *machine = get_machine(self);
227237
return um_debug(machine);
@@ -261,6 +271,10 @@ void Init_UM(void) {
261271
rb_define_method(cUM, "getsockopt", UM_getsockopt, 3);
262272
rb_define_method(cUM, "setsockopt", UM_setsockopt, 4);
263273

274+
#ifdef HAVE_IO_URING_PREP_FUTEX
275+
rb_define_method(cUM, "synchronize", UM_synchronize, 1);
276+
#endif
277+
264278
rb_define_method(cUM, "debug", UM_debug, 0);
265279

266280
um_define_net_constants(cUM);

ext/um/um_ext.c

+2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
void Init_UM();
2+
void Init_Mutex();
23

34
void Init_um_ext(void) {
45
Init_UM();
6+
Init_Mutex();
57
}

test/test_um.rb

+54-1
Original file line numberDiff line numberDiff line change
@@ -689,7 +689,7 @@ def test_constants
689689
end
690690
end
691691

692-
class GetSetSockOpt < UMBaseTest
692+
class GetSetSockOptTest < UMBaseTest
693693
def test_getsockopt_setsockopt
694694
fd = machine.socket(UM::AF_INET, UM::SOCK_STREAM, 0, 0)
695695
reuseaddr = machine.getsockopt(fd, UM::SOL_SOCKET, UM::SO_REUSEADDR)
@@ -702,3 +702,56 @@ def test_getsockopt_setsockopt
702702
assert_equal 1, reuseaddr
703703
end
704704
end
705+
706+
class SynchronizeTest < UMBaseTest
707+
def test_synchronize_single
708+
skip if !machine.respond_to?(:synchronize)
709+
710+
m = UM::Mutex.new
711+
712+
buf = []
713+
machine.synchronize(m) do
714+
buf << 1
715+
end
716+
machine.synchronize(m) do
717+
buf << 2
718+
end
719+
720+
assert_equal [1, 2], buf
721+
assert_equal 0, machine.pending_count
722+
end
723+
724+
def test_synchronize_pair
725+
skip if !machine.respond_to?(:synchronize)
726+
m = UM::Mutex.new
727+
728+
buf = []
729+
730+
f1 = Fiber.new do
731+
machine.synchronize(m) do
732+
buf << 11
733+
machine.sleep(0.01)
734+
buf << 12
735+
end
736+
buf << 13
737+
machine.yield
738+
end
739+
740+
f2 = Fiber.new do
741+
machine.synchronize(m) do
742+
buf << 21
743+
machine.sleep(0.01)
744+
buf << 22
745+
end
746+
buf << 23
747+
machine.yield
748+
end
749+
750+
machine.schedule(f1, nil)
751+
machine.schedule(f2, nil)
752+
753+
machine.sleep(0.03)
754+
assert_equal [11, 12, 13, 21, 22, 23], buf
755+
assert_equal 0, machine.pending_count
756+
end
757+
end

0 commit comments

Comments
 (0)