Skip to content

Commit 039d7c2

Browse files
committed
clearing write readiness if TFO connect returns EINPROGRESS
- ref #555 - imperfect until tokio-rs/tokio#3888 was merged
1 parent 3743ffe commit 039d7c2

File tree

6 files changed

+212
-161
lines changed

6 files changed

+212
-161
lines changed

Cargo.lock

+6-8
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -141,3 +141,6 @@ byteorder = "1.3"
141141
env_logger = "0.8"
142142
byte_string = "1.0"
143143
tokio = { version = "1", features = ["net", "time", "macros", "io-util"]}
144+
145+
[patch.crates-io]
146+
tokio = { git = "https://github.com/zonyitoo/tokio.git" }

crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs

+42-35
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@ use crate::net::{
2626
enum TcpStreamState {
2727
Connected,
2828
FastOpenConnect(SocketAddr),
29+
FastOpenConnecting,
2930
}
3031

3132
/// A `TcpStream` that supports TFO (TCP Fast Open)
32-
#[pin_project]
33+
#[pin_project(project = TcpStreamProj)]
3334
pub struct TcpStream {
3435
#[pin]
3536
inner: TokioTcpStream,
@@ -107,59 +108,65 @@ impl AsyncRead for TcpStream {
107108
impl AsyncWrite for TcpStream {
108109
fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
109110
loop {
110-
let this = self.as_mut().project();
111+
let TcpStreamProj { inner, state } = self.as_mut().project();
112+
113+
match *state {
114+
TcpStreamState::Connected => return inner.poll_write(cx, buf),
111115

112-
match this.state {
113116
TcpStreamState::FastOpenConnect(addr) => {
114117
// TCP_FASTOPEN was supported since FreeBSD 12.0
115118
//
116119
// Example program:
117120
// <https://people.freebsd.org/~pkelsey/tfo-tools/tfo-client.c>
118121

119-
// Wait until socket is writable
120-
ready!(this.inner.poll_write_ready(cx))?;
121-
122-
unsafe {
123-
let saddr = SockAddr::from(*addr);
124-
125-
let ret = libc::sendto(
126-
this.inner.as_raw_fd(),
127-
buf.as_ptr() as *const libc::c_void,
128-
buf.len(),
129-
0, // Yes, BSD doesn't need MSG_FASTOPEN
130-
saddr.as_ptr(),
131-
saddr.len(),
132-
);
133-
134-
if ret >= 0 {
135-
// Connect successfully.
136-
*(this.state) = TcpStreamState::Connected;
137-
return Ok(ret as usize).into();
138-
} else {
139-
// Error occurs
140-
let err = io::Error::last_os_error();
141-
142-
// EAGAIN, EWOULDBLOCK
143-
if err.kind() != ErrorKind::WouldBlock {
122+
let saddr = SockAddr::from(addr);
123+
124+
let stream = inner.get_mut();
125+
let n = ready!(stream.poll_write_io(cx, || {
126+
unsafe {
127+
let ret = libc::sendto(
128+
stream.as_raw_fd(),
129+
buf.as_ptr() as *const libc::c_void,
130+
buf.len(),
131+
0, // Yes, BSD doesn't need MSG_FASTOPEN
132+
saddr.as_ptr(),
133+
saddr.len(),
134+
);
135+
136+
if ret >= 0 {
137+
Ok(ret as usize)
138+
} else {
139+
// Error occurs
140+
let err = io::Error::last_os_error();
141+
144142
// EINPROGRESS
145143
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
146144
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
147145
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
148146
//
149147
// So in this state. We have to loop again to call `poll_write` for sending the first packet.
150-
*(this.state) = TcpStreamState::Connected;
148+
*state = TcpStreamState::FastOpenConnecting;
149+
150+
// Let `poll_write_io` clears the write readiness.
151+
Err(ErrorKind::WouldBlock.into())
151152
} else {
152-
// Other errors
153-
return Err(err).into();
153+
// Other errors, including EAGAIN, EWOULDBLOCK
154+
Err(err)
154155
}
155-
} else {
156-
// Pending on poll_write_ready
157156
}
158157
}
159-
}
158+
}))?;
159+
160+
// Connect successfully with fast open
161+
*state = TcpStreamState::Connected;
162+
return Ok(n).into();
160163
}
161164

162-
TcpStreamState::Connected => return this.inner.poll_write(cx, buf),
165+
TcpStreamState::FastOpenConnecting => {
166+
ready!(inner.poll_write_ready(cx))?;
167+
168+
*state = TcpStreamState::Connected;
169+
}
163170
}
164171
}
165172
}

crates/shadowsocks/src/net/sys/unix/bsd/macos.rs

+39-22
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,11 @@ use crate::net::{
2727
enum TcpStreamState {
2828
Connected,
2929
FastOpenWrite,
30+
FastOpenConnecting,
3031
}
3132

3233
/// A `TcpStream` that supports TFO (TCP Fast Open)
33-
#[pin_project]
34+
#[pin_project(project = TcpStreamProj)]
3435
pub struct TcpStream {
3536
#[pin]
3637
inner: TokioTcpStream,
@@ -119,40 +120,56 @@ impl AsyncRead for TcpStream {
119120
impl AsyncWrite for TcpStream {
120121
fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
121122
loop {
122-
let this = self.as_mut().project();
123+
let TcpStreamProj { inner, state } = self.as_mut().project();
124+
125+
match *state {
126+
TcpStreamState::Connected => return inner.poll_write(cx, buf),
123127

124-
match this.state {
125128
TcpStreamState::FastOpenWrite => {
126129
// `CONNECT_RESUME_ON_READ_WRITE` is set when calling `connectx`,
127130
// so the first call of `send` will perform the actual SYN with TFO cookie.
128131
//
129132
// (NOT SURE) If remote server doesn't support TFO or this is the first connection,
130133
// it may return EINPROGRESS just like other platforms (Linux, FreeBSD).
131134

132-
match ready!(this.inner.poll_write(cx, buf)) {
133-
Ok(n) => {
134-
*(this.state) = TcpStreamState::Connected;
135-
return Ok(n).into();
136-
}
137-
Err(err) => {
138-
// EAGAIN and EWOULDBLOCK should have been handled by tokio
139-
//
140-
// EINPROGRESS
141-
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
142-
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
143-
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
144-
//
145-
// So in this state. We have to loop again to call `poll_write` for sending the first packet.
146-
*(this.state) = TcpStreamState::Connected;
135+
let stream = inner.get_mut();
136+
let n = ready!(stream.poll_write_io(cx, || {
137+
unsafe {
138+
let ret = libc::send(stream.as_raw_fd(), buf.as_ptr() as *const libc::c_void, buf.len(), 0);
139+
if ret >= 0 {
140+
Ok(ret as usize)
147141
} else {
148-
// Other errors
149-
return Err(err).into();
142+
let err = io::Error::last_os_error();
143+
// EAGAIN and EWOULDBLOCK should have been handled by tokio
144+
//
145+
// EINPROGRESS
146+
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
147+
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
148+
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
149+
//
150+
// So in this state. We have to loop again to call `poll_write` for sending the first packet.
151+
*state = TcpStreamState::FastOpenConnecting;
152+
153+
// Let `poll_write_io` clears the write readiness.
154+
Err(ErrorKind::WouldBlock.into())
155+
} else {
156+
// Other errors, including EAGAIN
157+
Err(err)
158+
}
150159
}
151160
}
152-
}
161+
}))?;
162+
163+
// Connected successfully with fast open
164+
*state = TcpStreamState::Connected;
165+
return Ok(n).into();
153166
}
154167

155-
TcpStreamState::Connected => return this.inner.poll_write(cx, buf),
168+
TcpStreamState::FastOpenConnecting => {
169+
ready!(inner.poll_write_ready(cx))?;
170+
171+
*state = TcpStreamState::Connected;
172+
}
156173
}
157174
}
158175
}

0 commit comments

Comments
 (0)