Skip to content

Commit 3370103

Browse files
committed
Implement #bind, #listen
1 parent a7afd38 commit 3370103

File tree

7 files changed

+153
-22
lines changed

7 files changed

+153
-22
lines changed

TODO.md

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1-
- bind
2-
- listen
31
- splice
2+
- sendto
3+
- recvfrom
4+
5+
- queues

ext/um/extconf.rb

+6-15
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,11 @@ def get_config
2121
raise "UringMachine requires kernel version 6.4 or newer!" if combined_version < 604
2222

2323
config[:kernel_version] = combined_version
24-
config[:pidfd_open] = combined_version > 503
25-
config[:multishot_accept] = combined_version >= 519
26-
config[:multishot_recv] = combined_version >= 600
27-
config[:multishot_recvmsg] = combined_version >= 600
28-
config[:multishot_timeout] = combined_version >= 604
2924
config[:submit_all_flag] = combined_version >= 518
3025
config[:coop_taskrun_flag] = combined_version >= 519
3126
config[:single_issuer_flag] = combined_version >= 600
27+
config[:prep_bind] = combined_version >= 611
28+
config[:prep_listen] = combined_version >= 611
3229

3330
config
3431
end
@@ -54,16 +51,10 @@ def get_config
5451
raise "Couldn't find liburing.a"
5552
end
5653

57-
def define_bool(name, value)
58-
$defs << "-D#{name}=#{value ? 1 : 0 }"
59-
end
60-
61-
$defs << '-DHAVE_IO_URING_PREP_MULTISHOT_ACCEPT' if config[:multishot_accept]
62-
$defs << '-DHAVE_IO_URING_PREP_RECV_MULTISHOT' if config[:multishot_recv]
63-
$defs << '-DHAVE_IO_URING_PREP_RECVMSG_MULTISHOT' if config[:multishot_recvmsg]
64-
$defs << '-DHAVE_IO_URING_TIMEOUT_MULTISHOT' if config[:multishot_timeout]
65-
$defs << '-DHAVE_IORING_SETUP_SUBMIT_ALL' if config[:submit_all_flag]
66-
$defs << '-DHAVE_IORING_SETUP_COOP_TASKRUN' if config[:coop_taskrun_flag]
54+
$defs << '-DHAVE_IORING_SETUP_SUBMIT_ALL' if config[:submit_all_flag]
55+
$defs << '-DHAVE_IORING_SETUP_COOP_TASKRUN' if config[:coop_taskrun_flag]
56+
$defs << '-DHAVE_IO_URING_PREP_BIND' if config[:prep_bind]
57+
$defs << '-DHAVE_IO_URING_PREP_LISTEN' if config[:prep_listen]
6758
$CFLAGS << ' -Wno-pointer-arith'
6859

6960
CONFIG['optflags'] << ' -fno-strict-aliasing'

ext/um/um.c

+30
Original file line numberDiff line numberDiff line change
@@ -617,3 +617,33 @@ VALUE um_recv(struct um *machine, int fd, VALUE buffer, int maxlen, int flags) {
617617
um_update_read_buffer(machine, buffer, 0, result, flags);
618618
return INT2FIX(result);
619619
}
620+
621+
VALUE um_bind(struct um *machine, int fd, struct sockaddr *addr, socklen_t addrlen) {
622+
struct um_op *op = um_op_checkout(machine);
623+
struct io_uring_sqe *sqe = um_get_sqe(machine, op);
624+
int result = 0;
625+
626+
io_uring_prep_bind(sqe, fd, addr, addrlen);
627+
op->state = OP_submitted;
628+
629+
um_await_op(machine, op, &result, NULL);
630+
631+
discard_op_if_completed(machine, op);
632+
um_raise_on_system_error(result);
633+
return INT2FIX(result);
634+
}
635+
636+
VALUE um_listen(struct um *machine, int fd, int backlog) {
637+
struct um_op *op = um_op_checkout(machine);
638+
struct io_uring_sqe *sqe = um_get_sqe(machine, op);
639+
int result = 0;
640+
641+
io_uring_prep_listen(sqe, fd, backlog);
642+
op->state = OP_submitted;
643+
644+
um_await_op(machine, op, &result, NULL);
645+
646+
discard_op_if_completed(machine, op);
647+
um_raise_on_system_error(result);
648+
return INT2FIX(result);
649+
}

ext/um/um.h

+2
Original file line numberDiff line numberDiff line change
@@ -129,5 +129,7 @@ VALUE um_socket(struct um *machine, int domain, int type, int protocol, uint fla
129129
VALUE um_connect(struct um *machine, int fd, const struct sockaddr *addr, socklen_t addrlen);
130130
VALUE um_send(struct um *machine, int fd, VALUE buffer, int len, int flags);
131131
VALUE um_recv(struct um *machine, int fd, VALUE buffer, int maxlen, int flags);
132+
VALUE um_bind(struct um *machine, int fd, struct sockaddr *addr, socklen_t addrlen);
133+
VALUE um_listen(struct um *machine, int fd, int backlog);
132134

133135
#endif // UM_H

ext/um/um_class.c

+32
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,36 @@ VALUE UM_recv(VALUE self, VALUE fd, VALUE buffer, VALUE maxlen, VALUE flags) {
164164
return um_recv(machine, NUM2INT(fd), buffer, NUM2INT(maxlen), NUM2INT(flags));
165165
}
166166

167+
VALUE UM_bind(VALUE self, VALUE fd, VALUE host, VALUE port) {
168+
struct sockaddr_in addr;
169+
memset(&addr, 0, sizeof(addr));
170+
addr.sin_family = AF_INET;
171+
addr.sin_addr.s_addr = inet_addr(StringValueCStr(host));
172+
addr.sin_port = htons(NUM2INT(port));
173+
174+
#ifdef HAVE_IO_URING_PREP_BIND
175+
struct um *machine = get_machine(self);
176+
return um_bind(machine, NUM2INT(fd), (struct sockaddr *)&addr, sizeof(addr));
177+
#else
178+
int res = bind(NUM2INT(fd), (struct sockaddr *)&addr, sizeof(addr));
179+
if (res)
180+
rb_syserr_fail(errno, strerror(errno));
181+
return INT2NUM(0);
182+
#endif
183+
}
184+
185+
VALUE UM_listen(VALUE self, VALUE fd, VALUE backlog) {
186+
#ifdef HAVE_IO_URING_PREP_LISTEN
187+
struct um *machine = get_machine(self);
188+
return um_listen(machine, NUM2INT(fd), NUM2INT(backlog));
189+
#else
190+
int res = listen(NUM2INT(fd), NUM2INT(backlog));
191+
if (res)
192+
rb_syserr_fail(errno, strerror(errno));
193+
return INT2NUM(0);
194+
#endif
195+
}
196+
167197
void Init_UM(void) {
168198
rb_ext_ractor_safe(true);
169199

@@ -192,4 +222,6 @@ void Init_UM(void) {
192222
rb_define_method(cUM, "connect", UM_connect, 3);
193223
rb_define_method(cUM, "send", UM_send, 4);
194224
rb_define_method(cUM, "recv", UM_recv, 4);
225+
rb_define_method(cUM, "bind", UM_bind, 3);
226+
rb_define_method(cUM, "listen", UM_listen, 2);
195227
}

test/helper.rb

+8
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,12 @@ def setup
6060
def teardown
6161
# @machine&.cleanup
6262
end
63+
64+
def assign_port
65+
@@port_assign_mutex ||= Mutex.new
66+
@@port_assign_mutex.synchronize do
67+
@@port ||= 1024 + rand(60000)
68+
@@port += 1
69+
end
70+
end
6371
end

test/test_um.rb

+71-5
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ def test_close
381381
class AcceptTest < UMBaseTest
382382
def setup
383383
super
384-
@port = 9000 + rand(1000)
384+
@port = assign_port
385385
@server = TCPServer.open('127.0.0.1', @port)
386386
end
387387

@@ -409,7 +409,7 @@ def test_accept
409409
class AcceptEachTest < UMBaseTest
410410
def setup
411411
super
412-
@port = 9000 + rand(1000)
412+
@port = assign_port
413413
@server = TCPServer.open('127.0.0.1', @port)
414414
end
415415

@@ -455,7 +455,7 @@ def test_socket
455455
class ConnectTest < UMBaseTest
456456
def setup
457457
super
458-
@port = 9000 + rand(1000)
458+
@port = assign_port
459459
@server = TCPServer.open('127.0.0.1', @port)
460460
end
461461

@@ -496,7 +496,7 @@ def test_connect_with_bad_addr
496496
class SendTest < UMBaseTest
497497
def setup
498498
super
499-
@port = 9000 + rand(1000)
499+
@port = assign_port
500500
@server = TCPServer.open('127.0.0.1', @port)
501501
end
502502

@@ -532,7 +532,7 @@ def test_send
532532
class RecvTest < UMBaseTest
533533
def setup
534534
super
535-
@port = 9000 + rand(1000)
535+
@port = assign_port
536536
@server = TCPServer.open('127.0.0.1', @port)
537537
end
538538

@@ -560,3 +560,69 @@ def test_recv
560560
t&.kill
561561
end
562562
end
563+
564+
class BindTest < UMBaseTest
565+
def setup
566+
super
567+
@port = assign_port
568+
end
569+
570+
def test_bind
571+
assert_equal 0, machine.pending_count
572+
fd = machine.socket(Socket::AF_INET, Socket::SOCK_DGRAM, 0, 0);
573+
res = machine.bind(fd, '127.0.0.1', @port)
574+
assert_equal 0, res
575+
assert_equal 0, machine.pending_count
576+
577+
peer = UDPSocket.new
578+
peer.connect('127.0.0.1', @port)
579+
peer.send 'foo', 0
580+
581+
buf = +''
582+
res = machine.recv(fd, buf, 8192, 0)
583+
assert_equal 3, res
584+
assert_equal 'foo', buf
585+
end
586+
587+
def test_bind_invalid_args
588+
assert_equal 0, machine.pending_count
589+
590+
fd = machine.socket(Socket::AF_INET, Socket::SOCK_DGRAM, 0, 0);
591+
assert_raises(Errno::EACCES) { machine.bind(fd, 'foo.bar.baz', 3) }
592+
assert_raises(Errno::EBADF) { machine.bind(-3, '127.0.01', 1234) }
593+
594+
assert_equal 0, machine.pending_count
595+
end
596+
end
597+
598+
class ListenTest < UMBaseTest
599+
def setup
600+
super
601+
@port = assign_port
602+
end
603+
604+
def test_listen
605+
fd = machine.socket(Socket::AF_INET, Socket::SOCK_STREAM, 0, 0);
606+
machine.bind(fd, '127.0.0.1', @port)
607+
res = machine.listen(fd, 5)
608+
assert_equal 0, res
609+
assert_equal 0, machine.pending_count
610+
611+
conn = nil
612+
t = Thread.new do
613+
sleep 0.01
614+
conn = TCPSocket.new('127.0.0.1', @port)
615+
end
616+
617+
conn_fd = machine.accept(fd)
618+
t.join
619+
assert_kind_of TCPSocket, conn
620+
621+
machine.send(conn_fd, 'foo', 3, 0)
622+
623+
buf = conn.readpartial(42)
624+
assert_equal 'foo', buf
625+
ensure
626+
t&.kill
627+
end
628+
end

0 commit comments

Comments
 (0)