Skip to content

Commit 46c9905

Browse files
committed
feat: add support for splitting TcpStream
1 parent 8ff1bd6 commit 46c9905

File tree

3 files changed

+158
-1
lines changed

3 files changed

+158
-1
lines changed

madsim/src/sim/net/unix/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
#![doc(hidden)]
66

77
mod datagram;
8+
mod split;
89
mod stream;
910

1011
pub use self::datagram::UnixDatagram;
12+
pub use self::split::*;
1113
pub use self::stream::{UnixListener, UnixStream};

madsim/src/sim/net/unix/split.rs

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
use crate::net::UnixStream;
2+
use bytes::BufMut;
3+
use std::{
4+
io,
5+
pin::Pin,
6+
sync::Arc,
7+
task::{Context, Poll},
8+
};
9+
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
10+
11+
/// Borrowed read half of a [`UnixStream`].
12+
#[derive(Debug)]
13+
pub struct ReadHalf<'a>(&'a UnixStream);
14+
15+
/// Borrowed write half of a [`UnixStream`].
16+
#[derive(Debug)]
17+
pub struct WriteHalf<'a>(&'a UnixStream);
18+
19+
pub(crate) fn split(stream: &mut UnixStream) -> (ReadHalf<'_>, WriteHalf<'_>) {
20+
(ReadHalf(&*stream), WriteHalf(&*stream))
21+
}
22+
23+
impl ReadHalf<'_> {
24+
/// Tries to read data from the stream into the provided buffer, advancing
25+
/// the buffer's internal cursor, returning how many bytes were read.
26+
///
27+
/// Receives any pending data from the socket but does not wait for new data
28+
/// to arrive. On success, returns the number of bytes read. Because
29+
/// `try_read_buf()` is non-blocking, the buffer does not have to be stored
30+
/// by the async task and can exist entirely on the stack.
31+
pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
32+
self.0.try_read_buf(buf)
33+
}
34+
}
35+
36+
impl AsyncRead for ReadHalf<'_> {
37+
fn poll_read(
38+
self: Pin<&mut Self>,
39+
cx: &mut Context<'_>,
40+
buf: &mut ReadBuf<'_>,
41+
) -> Poll<io::Result<()>> {
42+
todo!()
43+
}
44+
}
45+
46+
impl AsyncWrite for WriteHalf<'_> {
47+
fn poll_write(
48+
self: Pin<&mut Self>,
49+
cx: &mut Context<'_>,
50+
buf: &[u8],
51+
) -> Poll<Result<usize, io::Error>> {
52+
todo!()
53+
}
54+
55+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
56+
todo!()
57+
}
58+
59+
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
60+
todo!()
61+
}
62+
}
63+
64+
impl AsRef<UnixStream> for ReadHalf<'_> {
65+
fn as_ref(&self) -> &UnixStream {
66+
self.0
67+
}
68+
}
69+
70+
impl AsRef<UnixStream> for WriteHalf<'_> {
71+
fn as_ref(&self) -> &UnixStream {
72+
self.0
73+
}
74+
}
75+
76+
/// Owned read half of a [`UnixStream`].
77+
#[derive(Debug)]
78+
pub struct OwnedReadHalf(Arc<UnixStream>);
79+
80+
/// Owned write half of a [`UnixStream`].
81+
#[derive(Debug)]
82+
pub struct OwnedWriteHalf(Arc<UnixStream>);
83+
84+
pub(crate) fn split_owned(stream: UnixStream) -> (OwnedReadHalf, OwnedWriteHalf) {
85+
let arc = Arc::new(stream);
86+
let read = OwnedReadHalf(Arc::clone(&arc));
87+
let write = OwnedWriteHalf(arc);
88+
(read, write)
89+
}
90+
91+
impl OwnedReadHalf {
92+
/// Tries to read data from the stream into the provided buffer, advancing
93+
/// the buffer's internal cursor, returning how many bytes were read.
94+
///
95+
/// Receives any pending data from the socket but does not wait for new data
96+
/// to arrive. On success, returns the number of bytes read. Because
97+
/// `try_read_buf()` is non-blocking, the buffer does not have to be stored
98+
/// by the async task and can exist entirely on the stack.
99+
pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
100+
self.0.try_read_buf(buf)
101+
}
102+
}
103+
104+
impl AsyncRead for OwnedReadHalf {
105+
fn poll_read(
106+
self: Pin<&mut Self>,
107+
cx: &mut Context<'_>,
108+
buf: &mut ReadBuf<'_>,
109+
) -> Poll<io::Result<()>> {
110+
todo!()
111+
}
112+
}
113+
114+
impl AsyncWrite for OwnedWriteHalf {
115+
fn poll_write(
116+
self: Pin<&mut Self>,
117+
cx: &mut Context<'_>,
118+
buf: &[u8],
119+
) -> Poll<Result<usize, io::Error>> {
120+
todo!()
121+
}
122+
123+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
124+
todo!()
125+
}
126+
127+
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
128+
todo!()
129+
}
130+
}
131+
132+
impl AsRef<UnixStream> for OwnedReadHalf {
133+
fn as_ref(&self) -> &UnixStream {
134+
&self.0
135+
}
136+
}
137+
138+
impl AsRef<UnixStream> for OwnedWriteHalf {
139+
fn as_ref(&self) -> &UnixStream {
140+
&self.0
141+
}
142+
}

madsim/src/sim/net/unix/stream.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::net::unix::split;
12
use bytes::BufMut;
23
use std::{
34
io::{self, Result},
@@ -45,9 +46,21 @@ impl UnixStream {
4546
todo!();
4647
}
4748

48-
pub fn try_read_buf<B: BufMut>(&mut self, buf: &mut B) -> io::Result<usize> {
49+
pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
4950
unimplemented!();
5051
}
52+
53+
/// Splits a `UnixStream` into a read half and a write half, which can be used
54+
/// to read and write the stream concurrently.
55+
pub fn split(&mut self) -> (split::ReadHalf<'_>, split::WriteHalf<'_>) {
56+
split::split(self)
57+
}
58+
59+
/// Splits a `UnixStream` into a read half and a write half, which can be
60+
/// used to read and write the stream concurrently.
61+
pub fn into_split(self) -> (split::OwnedReadHalf, split::OwnedWriteHalf) {
62+
split::split_owned(self)
63+
}
5164
}
5265

5366
impl AsRawFd for UnixStream {

0 commit comments

Comments
 (0)