Skip to content

Commit 6583640

Browse files
committed
first write on TFO sockets may return EINPROGRESS
- Remote server doesn't support TFO - First connect without TFO cookie falls back to normal TCP connect() ref #555
1 parent 05a3d6c commit 6583640

File tree

3 files changed

+190
-82
lines changed

3 files changed

+190
-82
lines changed

crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs

Lines changed: 52 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -105,46 +105,62 @@ impl AsyncRead for TcpStream {
105105
}
106106

107107
impl AsyncWrite for TcpStream {
108-
fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
109-
let this = self.project();
110-
111-
if let TcpStreamState::FastOpenConnect(addr) = this.state {
112-
loop {
113-
// TCP_FASTOPEN was supported since FreeBSD 12.0
114-
//
115-
// Example program:
116-
// <https://people.freebsd.org/~pkelsey/tfo-tools/tfo-client.c>
117-
118-
// Wait until socket is writable
119-
ready!(this.inner.poll_write_ready(cx))?;
120-
121-
unsafe {
122-
let saddr = SockAddr::from(*addr);
123-
124-
let ret = libc::sendto(
125-
this.inner.as_raw_fd(),
126-
buf.as_ptr() as *const libc::c_void,
127-
buf.len(),
128-
0, // Yes, BSD doesn't need MSG_FASTOPEN
129-
saddr.as_ptr(),
130-
saddr.len(),
131-
);
132-
133-
if ret >= 0 {
134-
// Connect successfully.
135-
*(this.state) = TcpStreamState::Connected;
136-
return Ok(ret as usize).into();
137-
} else {
138-
// Error occurs
139-
let err = io::Error::last_os_error();
140-
if err.kind() != ErrorKind::WouldBlock {
141-
return Err(err).into();
108+
fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
109+
loop {
110+
let this = self.as_mut().project();
111+
112+
match this.state {
113+
TcpStreamState::FastOpenConnect(addr) => {
114+
// TCP_FASTOPEN was supported since FreeBSD 12.0
115+
//
116+
// Example program:
117+
// <https://people.freebsd.org/~pkelsey/tfo-tools/tfo-client.c>
118+
119+
// Wait until socket is writable
120+
ready!(this.inner.poll_write_ready(cx))?;
121+
122+
unsafe {
123+
let saddr = SockAddr::from(*addr);
124+
125+
let ret = libc::sendto(
126+
this.inner.as_raw_fd(),
127+
buf.as_ptr() as *const libc::c_void,
128+
buf.len(),
129+
0, // Yes, BSD doesn't need MSG_FASTOPEN
130+
saddr.as_ptr(),
131+
saddr.len(),
132+
);
133+
134+
if ret >= 0 {
135+
// Connect successfully.
136+
*(this.state) = TcpStreamState::Connected;
137+
return Ok(ret as usize).into();
138+
} else {
139+
// Error occurs
140+
let err = io::Error::last_os_error();
141+
142+
// EAGAIN, EWOULDBLOCK
143+
if err.kind() != ErrorKind::WouldBlock {
144+
// EINPROGRESS
145+
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
146+
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
147+
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
148+
//
149+
// So in this state, we have to loop again and call `poll_write` to send the first packet.
150+
*(this.state) = TcpStreamState::Connected;
151+
} else {
152+
// Other errors
153+
return Err(err).into();
154+
}
155+
} else {
156+
// Pending on poll_write_ready
157+
}
142158
}
143159
}
144160
}
161+
162+
TcpStreamState::Connected => return this.inner.poll_write(cx, buf),
145163
}
146-
} else {
147-
this.inner.poll_write(cx, buf)
148164
}
149165
}
150166

crates/shadowsocks/src/net/sys/unix/bsd/macos.rs

Lines changed: 62 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::{
99
task::{self, Poll},
1010
};
1111

12+
use futures::ready;
1213
use log::error;
1314
use pin_project::pin_project;
1415
use socket2::SockAddr;
@@ -23,9 +24,18 @@ use crate::net::{
2324
ConnectOpts,
2425
};
2526

27+
enum TcpStreamState {
28+
Connected,
29+
FastOpenWrite,
30+
}
31+
2632
/// A `TcpStream` that supports TFO (TCP Fast Open)
2733
#[pin_project]
28-
pub struct TcpStream(#[pin] TokioTcpStream);
34+
pub struct TcpStream {
35+
#[pin]
36+
inner: TokioTcpStream,
37+
state: TcpStreamState,
38+
}
2939

3040
impl TcpStream {
3141
pub async fn connect(addr: SocketAddr, opts: &ConnectOpts) -> io::Result<TcpStream> {
@@ -45,7 +55,10 @@ impl TcpStream {
4555
// If TFO is not enabled, it just works like a normal TcpStream
4656
let stream = socket.connect(addr).await?;
4757
set_common_sockopt_after_connect(&stream, opts)?;
48-
return Ok(TcpStream(stream));
58+
return Ok(TcpStream {
59+
inner: stream,
60+
state: TcpStreamState::Connected,
61+
});
4962
}
5063

5164
// TFO in macos uses connectx
@@ -76,41 +89,80 @@ impl TcpStream {
7689
let stream = TokioTcpStream::from_std(unsafe { StdTcpStream::from_raw_fd(socket.into_raw_fd()) })?;
7790
set_common_sockopt_after_connect(&stream, opts)?;
7891

79-
Ok(TcpStream(stream))
92+
Ok(TcpStream {
93+
inner: stream,
94+
state: TcpStreamState::FastOpenWrite,
95+
})
8096
}
8197
}
8298

8399
impl Deref for TcpStream {
84100
type Target = TokioTcpStream;
85101

86102
fn deref(&self) -> &Self::Target {
87-
&self.0
103+
&self.inner
88104
}
89105
}
90106

91107
impl DerefMut for TcpStream {
92108
fn deref_mut(&mut self) -> &mut Self::Target {
93-
&mut self.0
109+
&mut self.inner
94110
}
95111
}
96112

97113
impl AsyncRead for TcpStream {
98114
fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
99-
self.project().0.poll_read(cx, buf)
115+
self.project().inner.poll_read(cx, buf)
100116
}
101117
}
102118

103119
impl AsyncWrite for TcpStream {
104-
fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
105-
self.project().0.poll_write(cx, buf)
120+
fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
121+
loop {
122+
let this = self.as_mut().project();
123+
124+
match this.state {
125+
TcpStreamState::FastOpenWrite => {
126+
// `CONNECT_RESUME_ON_READ_WRITE` is set when calling `connectx`,
127+
// so the first call of `send` will perform the actual SYN with TFO cookie.
128+
//
129+
// (NOT SURE) If remote server doesn't support TFO or this is the first connection,
130+
// it may return EINPROGRESS just like other platforms (Linux, FreeBSD).
131+
132+
match ready!(this.inner.poll_write(cx, buf)) {
133+
Ok(n) => {
134+
*(this.state) = TcpStreamState::Connected;
135+
return Ok(n).into();
136+
}
137+
Err(err) => {
138+
// EAGAIN and EWOULDBLOCK should have been handled by tokio
139+
//
140+
// EINPROGRESS
141+
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
142+
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
143+
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
144+
//
145+
// So in this state, we have to loop again and call `poll_write` to send the first packet.
146+
*(this.state) = TcpStreamState::Connected;
147+
} else {
148+
// Other errors
149+
return Err(err).into();
150+
}
151+
}
152+
}
153+
}
154+
155+
TcpStreamState::Connected => return this.inner.poll_write(cx, buf),
156+
}
157+
}
106158
}
107159

108160
fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
109-
self.project().0.poll_flush(cx)
161+
self.project().inner.poll_flush(cx)
110162
}
111163

112164
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
113-
self.project().0.poll_shutdown(cx)
165+
self.project().inner.poll_shutdown(cx)
114166
}
115167
}
116168

crates/shadowsocks/src/net/sys/unix/linux/mod.rs

Lines changed: 76 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use crate::net::{
2828
enum TcpStreamState {
2929
Connected,
3030
FastOpenConnect(SocketAddr),
31+
FastOpenWrite,
3132
}
3233

3334
/// A `TcpStream` that supports TFO (TCP Fast Open)
@@ -147,7 +148,7 @@ impl TcpStream {
147148
Ok(TcpStream {
148149
inner: stream,
149150
state: if connected {
150-
TcpStreamState::Connected
151+
TcpStreamState::FastOpenWrite
151152
} else {
152153
TcpStreamState::FastOpenConnect(addr)
153154
},
@@ -176,45 +177,84 @@ impl AsyncRead for TcpStream {
176177
}
177178

178179
impl AsyncWrite for TcpStream {
179-
fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
180-
let this = self.project();
181-
182-
if let TcpStreamState::FastOpenConnect(addr) = this.state {
183-
loop {
184-
// Fallback mode. Must be kernel < 4.11
185-
//
186-
// Uses sendto as BSD-like systems
187-
188-
// Wait until socket is writable
189-
ready!(this.inner.poll_write_ready(cx))?;
190-
191-
unsafe {
192-
let saddr = SockAddr::from(*addr);
193-
194-
let ret = libc::sendto(
195-
this.inner.as_raw_fd(),
196-
buf.as_ptr() as *const libc::c_void,
197-
buf.len(),
198-
libc::MSG_FASTOPEN,
199-
saddr.as_ptr(),
200-
saddr.len(),
201-
);
202-
203-
if ret >= 0 {
204-
// Connect successfully.
205-
*(this.state) = TcpStreamState::Connected;
206-
return Ok(ret as usize).into();
207-
} else {
208-
// Error occurs
209-
let err = io::Error::last_os_error();
210-
if err.kind() != ErrorKind::WouldBlock {
211-
return Err(err).into();
180+
fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
181+
loop {
182+
let this = self.as_mut().project();
183+
184+
match this.state {
185+
TcpStreamState::FastOpenConnect(addr) => {
186+
// Fallback mode. Must be kernel < 4.11
187+
//
188+
// Uses sendto as BSD-like systems
189+
190+
// Wait until socket is writable
191+
ready!(this.inner.poll_write_ready(cx))?;
192+
193+
unsafe {
194+
let saddr = SockAddr::from(*addr);
195+
196+
let ret = libc::sendto(
197+
this.inner.as_raw_fd(),
198+
buf.as_ptr() as *const libc::c_void,
199+
buf.len(),
200+
libc::MSG_FASTOPEN,
201+
saddr.as_ptr(),
202+
saddr.len(),
203+
);
204+
205+
if ret >= 0 {
206+
// Connect successfully.
207+
*(this.state) = TcpStreamState::Connected;
208+
return Ok(ret as usize).into();
209+
} else {
210+
// Error occurs
211+
let err = io::Error::last_os_error();
212+
213+
// EAGAIN, EWOULDBLOCK
214+
if err.kind() != ErrorKind::WouldBlock {
215+
// EINPROGRESS
216+
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
217+
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
218+
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
219+
//
220+
// So in this state, we have to loop again and call `poll_write` to send the first packet.
221+
*(this.state) = TcpStreamState::Connected;
222+
} else {
223+
// Other errors
224+
return Err(err).into();
225+
}
226+
} else {
227+
// Pending on poll_write_ready
228+
}
212229
}
213230
}
214231
}
232+
233+
TcpStreamState::FastOpenWrite => {
234+
// First `write` after `TCP_FASTOPEN_CONNECT`
235+
// Kernel >= 4.11
236+
237+
match ready!(this.inner.poll_write(cx, buf)) {
238+
Ok(n) => {
239+
*(this.state) = TcpStreamState::Connected;
240+
return Ok(n).into();
241+
}
242+
Err(err) => {
243+
// EAGAIN and EWOULDBLOCK should have been handled by tokio
244+
//
245+
// EINPROGRESS
246+
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
247+
// loop again to call `poll_write` for sending the first packet
248+
*(this.state) = TcpStreamState::Connected;
249+
} else {
250+
return Err(err).into();
251+
}
252+
}
253+
}
254+
}
255+
256+
TcpStreamState::Connected => return this.inner.poll_write(cx, buf),
215257
}
216-
} else {
217-
this.inner.poll_write(cx, buf)
218258
}
219259
}
220260

0 commit comments

Comments
 (0)