Skip to content

Commit f7cc085

Browse files
committed
clearing write readiness if TFO connect returns EINPROGRESS
- ref #555 - imperfect until tokio-rs/tokio#3888 was merged
1 parent 3743ffe commit f7cc085

File tree

7 files changed

+246
-160
lines changed

7 files changed

+246
-160
lines changed

Cargo.lock

+6-8
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -141,3 +141,6 @@ byteorder = "1.3"
141141
env_logger = "0.8"
142142
byte_string = "1.0"
143143
tokio = { version = "1", features = ["net", "time", "macros", "io-util"]}
144+
145+
[patch.crates-io]
146+
tokio = { git = "https://github.com/zonyitoo/tokio.git" }

crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs

+49-35
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ enum TcpStreamState {
2929
}
3030

3131
/// A `TcpStream` that supports TFO (TCP Fast Open)
32-
#[pin_project]
32+
#[pin_project(project = TcpStreamProj)]
3333
pub struct TcpStream {
3434
#[pin]
3535
inner: TokioTcpStream,
@@ -107,59 +107,73 @@ impl AsyncRead for TcpStream {
107107
impl AsyncWrite for TcpStream {
108108
fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
109109
loop {
110-
let this = self.as_mut().project();
110+
let TcpStreamProj { inner, state } = self.project();
111+
112+
match *state {
113+
TcpStreamState::Connected => return inner.poll_write(cx, buf),
111114

112-
match this.state {
113115
TcpStreamState::FastOpenConnect(addr) => {
114116
// TCP_FASTOPEN was supported since FreeBSD 12.0
115117
//
116118
// Example program:
117119
// <https://people.freebsd.org/~pkelsey/tfo-tools/tfo-client.c>
118120

119-
// Wait until socket is writable
120-
ready!(this.inner.poll_write_ready(cx))?;
121-
122-
unsafe {
123-
let saddr = SockAddr::from(*addr);
124-
125-
let ret = libc::sendto(
126-
this.inner.as_raw_fd(),
127-
buf.as_ptr() as *const libc::c_void,
128-
buf.len(),
129-
0, // Yes, BSD doesn't need MSG_FASTOPEN
130-
saddr.as_ptr(),
131-
saddr.len(),
132-
);
133-
134-
if ret >= 0 {
135-
// Connect successfully.
136-
*(this.state) = TcpStreamState::Connected;
137-
return Ok(ret as usize).into();
138-
} else {
139-
// Error occurs
140-
let err = io::Error::last_os_error();
141-
142-
// EAGAIN, EWOULDBLOCK
143-
if err.kind() != ErrorKind::WouldBlock {
121+
let saddr = SockAddr::from(addr);
122+
123+
let stream = inner.get_mut();
124+
125+
// Ensure socket is writable
126+
ready!(stream.poll_write_ready(cx))?;
127+
128+
let mut connecting = false;
129+
let send_result = stream.try_write_io(|| {
130+
unsafe {
131+
let ret = libc::sendto(
132+
stream.as_raw_fd(),
133+
buf.as_ptr() as *const libc::c_void,
134+
buf.len(),
135+
0, // Yes, BSD doesn't need MSG_FASTOPEN
136+
saddr.as_ptr(),
137+
saddr.len(),
138+
);
139+
140+
if ret >= 0 {
141+
Ok(ret as usize)
142+
} else {
143+
// Error occurs
144+
let err = io::Error::last_os_error();
145+
144146
// EINPROGRESS
145147
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
146148
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
147149
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
148150
//
149151
// So in this state. We have to loop again to call `poll_write` for sending the first packet.
150-
*(this.state) = TcpStreamState::Connected;
152+
connecting = true;
153+
154+
// Let `poll_write_io` clears the write readiness.
155+
Err(ErrorKind::WouldBlock.into())
151156
} else {
152-
// Other errors
153-
return Err(err).into();
157+
// Other errors, including EAGAIN, EWOULDBLOCK
158+
Err(err)
154159
}
155-
} else {
156-
// Pending on poll_write_ready
157160
}
158161
}
162+
});
163+
164+
match send_result {
165+
Ok(n) => {
166+
// Connected successfully with fast open
167+
*state = TcpStreamState::Connected;
168+
return Ok(n).into();
169+
}
170+
Err(ref err) if err.kind() == ErrorKind::WouldBlock && connecting => {
171+
// Connecting with normal TCP handshakes, write the first packet after connected
172+
*state = TcpStreamState::Connected;
173+
}
174+
Err(err) => return Err(err).into(),
159175
}
160176
}
161-
162-
TcpStreamState::Connected => return this.inner.poll_write(cx, buf),
163177
}
164178
}
165179
}

crates/shadowsocks/src/net/sys/unix/bsd/macos.rs

+45-21
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ enum TcpStreamState {
3030
}
3131

3232
/// A `TcpStream` that supports TFO (TCP Fast Open)
33-
#[pin_project]
33+
#[pin_project(project = TcpStreamProj)]
3434
pub struct TcpStream {
3535
#[pin]
3636
inner: TokioTcpStream,
@@ -119,40 +119,64 @@ impl AsyncRead for TcpStream {
119119
impl AsyncWrite for TcpStream {
120120
fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
121121
loop {
122-
let this = self.as_mut().project();
122+
let TcpStreamProj { inner, state } = self.as_mut().project();
123+
124+
match *state {
125+
TcpStreamState::Connected => return inner.poll_write(cx, buf),
123126

124-
match this.state {
125127
TcpStreamState::FastOpenWrite => {
126128
// `CONNECT_RESUME_ON_READ_WRITE` is set when calling `connectx`,
127129
// so the first call of `send` will perform the actual SYN with TFO cookie.
128130
//
129131
// (NOT SURE) If remote server doesn't support TFO or this is the first connection,
130132
// it may return EINPROGRESS just like other platforms (Linux, FreeBSD).
131133

132-
match ready!(this.inner.poll_write(cx, buf)) {
134+
let stream = inner.get_mut();
135+
136+
// Ensure socket is writable
137+
ready!(stream.poll_write_ready(cx))?;
138+
139+
let mut connecting = false;
140+
let send_result = stream.try_write_io(|| {
141+
unsafe {
142+
let ret = libc::send(stream.as_raw_fd(), buf.as_ptr() as *const libc::c_void, buf.len(), 0);
143+
if ret >= 0 {
144+
Ok(ret as usize)
145+
} else {
146+
let err = io::Error::last_os_error();
147+
// EAGAIN and EWOULDBLOCK should have been handled by tokio
148+
//
149+
// EINPROGRESS
150+
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
151+
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
152+
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
153+
//
154+
// So in this state. We have to loop again to call `poll_write` for sending the first packet.
155+
connecting = true;
156+
157+
// Let `poll_write_io` clears the write readiness.
158+
Err(ErrorKind::WouldBlock.into())
159+
} else {
160+
// Other errors, including EAGAIN
161+
Err(err)
162+
}
163+
}
164+
}
165+
});
166+
167+
match send_result {
133168
Ok(n) => {
134-
*(this.state) = TcpStreamState::Connected;
169+
// Connected successfully with fast open
170+
*state = TcpStreamState::Connected;
135171
return Ok(n).into();
136172
}
137-
Err(err) => {
138-
// EAGAIN and EWOULDBLOCK should have been handled by tokio
139-
//
140-
// EINPROGRESS
141-
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
142-
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
143-
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
144-
//
145-
// So in this state. We have to loop again to call `poll_write` for sending the first packet.
146-
*(this.state) = TcpStreamState::Connected;
147-
} else {
148-
// Other errors
149-
return Err(err).into();
150-
}
173+
Err(ref err) if err.kind() == ErrorKind::WouldBlock && connecting => {
174+
// Connecting with normal TCP handshakes, write the first packet after connected
175+
*state = TcpStreamState::Connected;
151176
}
177+
Err(err) => return Err(err).into(),
152178
}
153179
}
154-
155-
TcpStreamState::Connected => return this.inner.poll_write(cx, buf),
156180
}
157181
}
158182
}

0 commit comments

Comments
 (0)