Skip to content

Commit f4f4349

Browse files
committed
basic wrappers for sendmmsg/recvmmsg (#823)
1 parent 238b8f4 commit f4f4349

File tree

5 files changed

+644
-76
lines changed

5 files changed

+644
-76
lines changed

crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs

+163-2
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@ use std::{
44
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
55
os::unix::io::{AsRawFd, RawFd},
66
pin::Pin,
7+
sync::atomic::{AtomicBool, Ordering},
78
task::{self, Poll},
89
};
910

10-
use log::{error, warn};
11+
use log::{debug, error, warn};
1112
use pin_project::pin_project;
12-
use socket2::{Domain, Protocol, Socket, Type};
13+
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
1314
use tokio::{
1415
io::{AsyncRead, AsyncWrite, ReadBuf},
1516
net::{TcpSocket, TcpStream as TokioTcpStream, UdpSocket},
@@ -18,6 +19,7 @@ use tokio_tfo::TfoStream;
1819

1920
use crate::net::{
2021
sys::{set_common_sockopt_after_connect, set_common_sockopt_for_connect, socket_bind_dual_stack},
22+
udp::{BatchRecvMessage, BatchSendMessage},
2123
AddrFamily,
2224
ConnectOpts,
2325
};
@@ -241,3 +243,162 @@ pub async fn create_outbound_udp_socket(af: AddrFamily, config: &ConnectOpts) ->
241243

242244
Ok(socket)
243245
}
246+
247+
static SUPPORT_BATCH_SEND_RECV_MSG: AtomicBool = AtomicBool::new(true);
248+
249+
fn recvmsg_fallback<S: AsRawFd>(sock: &S, msg: &mut BatchRecvMessage<'_>) -> io::Result<()> {
250+
let mut hdr: libc::msghdr = unsafe { mem::zeroed() };
251+
252+
let addr_storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
253+
let addr_len = mem::size_of_val(&addr_storage) as libc::socklen_t;
254+
let sock_addr = unsafe { SockAddr::new(addr_storage, addr_len) };
255+
hdr.msg_name = sock_addr.as_ptr() as *mut _;
256+
hdr.msg_namelen = sock_addr.len() as _;
257+
258+
hdr.msg_iov = msg.data.as_ptr() as *mut _;
259+
hdr.msg_iovlen = msg.data.len() as _;
260+
261+
let ret = unsafe { libc::recvmsg(sock.as_raw_fd(), &mut hdr as *mut _, 0) };
262+
if ret < 0 {
263+
return Err(io::Error::last_os_error());
264+
}
265+
266+
msg.addr = sock_addr.as_socket().expect("SockAddr.as_socket");
267+
msg.data_len = ret as usize;
268+
269+
Ok(())
270+
}
271+
272+
pub fn batch_recvmsg<S: AsRawFd>(sock: &S, msgs: &mut [BatchRecvMessage<'_>]) -> io::Result<usize> {
273+
if msgs.is_empty() {
274+
return Ok(0);
275+
}
276+
277+
if !SUPPORT_BATCH_SEND_RECV_MSG.load(Ordering::Acquire) {
278+
recvmsg_fallback(sock, &mut msgs[0])?;
279+
return Ok(1);
280+
}
281+
282+
let mut vec_msg_name = Vec::with_capacity(msgs.len());
283+
let mut vec_msg_hdr = Vec::with_capacity(msgs.len());
284+
285+
for msg in msgs.iter_mut() {
286+
let mut hdr: libc::mmsghdr = unsafe { mem::zeroed() };
287+
288+
let addr_storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
289+
let addr_len = mem::size_of_val(&addr_storage) as libc::socklen_t;
290+
291+
vec_msg_name.push(unsafe { SockAddr::new(addr_storage, addr_len) });
292+
let sock_addr = vec_msg_name.last_mut().unwrap();
293+
hdr.msg_hdr.msg_name = sock_addr.as_ptr() as *mut _;
294+
hdr.msg_hdr.msg_namelen = sock_addr.len() as _;
295+
296+
hdr.msg_hdr.msg_iov = msg.data.as_ptr() as *mut _;
297+
hdr.msg_hdr.msg_iovlen = msg.data.len() as _;
298+
299+
vec_msg_hdr.push(hdr);
300+
}
301+
302+
let ret = unsafe {
303+
libc::recvmmsg(
304+
sock.as_raw_fd(),
305+
vec_msg_hdr.as_mut_ptr(),
306+
vec_msg_hdr.len() as _,
307+
0,
308+
ptr::null(),
309+
)
310+
};
311+
if ret < 0 {
312+
let err = io::Error::last_os_error();
313+
if let Some(libc::ENOSYS) = err.raw_os_error() {
314+
debug!("recvmmsg is not supported, fallback to recvmsg, error: {:?}", err);
315+
SUPPORT_BATCH_SEND_RECV_MSG.store(false, Ordering::Release);
316+
317+
recvmsg_fallback(sock, &mut msgs[0])?;
318+
return Ok(1);
319+
}
320+
return Err(err);
321+
}
322+
323+
for idx in 0..ret as usize {
324+
let msg = &mut msgs[idx];
325+
let hdr = &vec_msg_hdr[idx];
326+
let name = &vec_msg_name[idx];
327+
msg.addr = name.as_socket().expect("SockAddr.as_socket");
328+
msg.data_len = hdr.msg_len as usize;
329+
}
330+
331+
Ok(ret as usize)
332+
}
333+
334+
fn sendmsg_fallback<S: AsRawFd>(sock: &S, msg: &mut BatchSendMessage<'_>) -> io::Result<()> {
335+
let mut hdr: libc::msghdr = unsafe { mem::zeroed() };
336+
337+
let sock_addr = msg.addr.map(SockAddr::from);
338+
if let Some(ref sa) = sock_addr {
339+
hdr.msg_name = sa.as_ptr() as *mut _;
340+
hdr.msg_namelen = sa.len() as _;
341+
}
342+
343+
hdr.msg_iov = msg.data.as_ptr() as *mut _;
344+
hdr.msg_iovlen = msg.data.len() as _;
345+
346+
let ret = unsafe { libc::sendmsg(sock.as_raw_fd(), &hdr as *const _, 0) };
347+
if ret < 0 {
348+
return Err(io::Error::last_os_error());
349+
}
350+
msg.data_len = ret as usize;
351+
352+
Ok(())
353+
}
354+
355+
pub fn batch_sendmsg<S: AsRawFd>(sock: &S, msgs: &mut [BatchSendMessage<'_>]) -> io::Result<usize> {
356+
if msgs.is_empty() {
357+
return Ok(0);
358+
}
359+
360+
if !SUPPORT_BATCH_SEND_RECV_MSG.load(Ordering::Acquire) {
361+
sendmsg_fallback(sock, &mut msgs[0])?;
362+
return Ok(1);
363+
}
364+
365+
let mut vec_msg_name = Vec::with_capacity(msgs.len());
366+
let mut vec_msg_hdr = Vec::with_capacity(msgs.len());
367+
368+
for msg in msgs.iter_mut() {
369+
let mut hdr: libc::mmsghdr = unsafe { mem::zeroed() };
370+
371+
if let Some(addr) = msg.addr {
372+
vec_msg_name.push(SockAddr::from(addr));
373+
let sock_addr = vec_msg_name.last_mut().unwrap();
374+
hdr.msg_hdr.msg_name = sock_addr.as_ptr() as *mut _;
375+
hdr.msg_hdr.msg_namelen = sock_addr.len() as _;
376+
}
377+
378+
hdr.msg_hdr.msg_iov = msg.data.as_ptr() as *mut _;
379+
hdr.msg_hdr.msg_iovlen = msg.data.len() as _;
380+
381+
vec_msg_hdr.push(hdr);
382+
}
383+
384+
let ret = unsafe { libc::sendmmsg(sock.as_raw_fd(), vec_msg_hdr.as_mut_ptr(), vec_msg_hdr.len() as _, 0) };
385+
if ret < 0 {
386+
let err = io::Error::last_os_error();
387+
if let Some(libc::ENOSYS) = err.raw_os_error() {
388+
debug!("sendmmsg is not supported, fallback to sendmsg, error: {:?}", err);
389+
SUPPORT_BATCH_SEND_RECV_MSG.store(false, Ordering::Release);
390+
391+
sendmsg_fallback(sock, &mut msgs[0])?;
392+
return Ok(1);
393+
}
394+
return Err(err);
395+
}
396+
397+
for idx in 0..ret as usize {
398+
let msg = &mut msgs[idx];
399+
let hdr = &vec_msg_hdr[idx];
400+
msg.data_len = hdr.msg_len as usize;
401+
}
402+
403+
Ok(ret as usize)
404+
}

crates/shadowsocks/src/net/sys/unix/bsd/macos.rs

+173-2
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@ use std::{
55
os::unix::io::{AsRawFd, RawFd},
66
pin::Pin,
77
ptr,
8+
sync::atomic::{AtomicBool, Ordering},
89
task::{self, Poll},
910
};
1011

11-
use log::{error, warn};
12+
use log::{debug, error, warn};
1213
use pin_project::pin_project;
13-
use socket2::{Domain, Protocol, Socket, Type};
14+
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
1415
use tokio::{
1516
io::{AsyncRead, AsyncWrite, ReadBuf},
1617
net::{TcpSocket, TcpStream as TokioTcpStream, UdpSocket},
@@ -19,6 +20,7 @@ use tokio_tfo::TfoStream;
1920

2021
use crate::net::{
2122
sys::{set_common_sockopt_after_connect, set_common_sockopt_for_connect, socket_bind_dual_stack},
23+
udp::{BatchRecvMessage, BatchSendMessage},
2224
AddrFamily,
2325
ConnectOpts,
2426
};
@@ -273,3 +275,172 @@ pub async fn create_outbound_udp_socket(af: AddrFamily, config: &ConnectOpts) ->
273275

274276
Ok(socket)
275277
}
278+
279+
/// https://github.com/apple/darwin-xnu/blob/main/bsd/sys/socket.h
280+
#[repr(C)]
281+
struct msghdr_x {
282+
msg_name: *mut libc::c_void, //< optional address
283+
msg_namelen: libc::socklen_t, //< size of address
284+
msg_iov: *mut libc::iovec, //< scatter/gather array
285+
msg_iovlen: libc::c_int, //< # elements in msg_iov
286+
msg_control: *mut libc::c_void, //< ancillary data, see below
287+
msg_controllen: libc::socklen_t, //< ancillary data buffer len
288+
msg_flags: libc::c_int, //< flags on received message
289+
msg_datalen: libc::size_t, //< byte length of buffer in msg_iov
290+
}
291+
292+
extern "C" {
293+
fn recvmsg_x(s: libc::c_int, msgp: *const msghdr_x, cnt: libc::c_uint, flags: libc::c_int) -> libc::ssize_t;
294+
fn sendmsg_x(s: libc::c_int, msgp: *const msghdr_x, cnt: libc::c_uint, flags: libc::c_int) -> libc::ssize_t;
295+
}
296+
297+
static SUPPORT_BATCH_SEND_RECV_MSG: AtomicBool = AtomicBool::new(true);
298+
299+
fn recvmsg_fallback<S: AsRawFd>(sock: &S, msg: &mut BatchRecvMessage<'_>) -> io::Result<()> {
300+
let mut hdr: libc::msghdr = unsafe { mem::zeroed() };
301+
302+
let addr_storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
303+
let addr_len = mem::size_of_val(&addr_storage) as libc::socklen_t;
304+
let sock_addr = unsafe { SockAddr::new(addr_storage, addr_len) };
305+
hdr.msg_name = sock_addr.as_ptr() as *mut _;
306+
hdr.msg_namelen = sock_addr.len() as _;
307+
308+
hdr.msg_iov = msg.data.as_ptr() as *mut _;
309+
hdr.msg_iovlen = msg.data.len() as _;
310+
311+
let ret = unsafe { libc::recvmsg(sock.as_raw_fd(), &mut hdr as *mut _, 0) };
312+
if ret < 0 {
313+
return Err(io::Error::last_os_error());
314+
}
315+
316+
msg.addr = sock_addr.as_socket().expect("SockAddr.as_socket");
317+
msg.data_len = ret as usize;
318+
319+
Ok(())
320+
}
321+
322+
pub fn batch_recvmsg<S: AsRawFd>(sock: &S, msgs: &mut [BatchRecvMessage<'_>]) -> io::Result<usize> {
323+
if msgs.is_empty() {
324+
return Ok(0);
325+
}
326+
327+
if !SUPPORT_BATCH_SEND_RECV_MSG.load(Ordering::Acquire) {
328+
recvmsg_fallback(sock, &mut msgs[0])?;
329+
return Ok(1);
330+
}
331+
332+
let mut vec_msg_name = Vec::with_capacity(msgs.len());
333+
let mut vec_msg_hdr = Vec::with_capacity(msgs.len());
334+
335+
for msg in msgs.iter_mut() {
336+
let mut hdr: msghdr_x = unsafe { mem::zeroed() };
337+
338+
let addr_storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
339+
let addr_len = mem::size_of_val(&addr_storage) as libc::socklen_t;
340+
341+
vec_msg_name.push(unsafe { SockAddr::new(addr_storage, addr_len) });
342+
let sock_addr = vec_msg_name.last_mut().unwrap();
343+
hdr.msg_name = sock_addr.as_ptr() as *mut _;
344+
hdr.msg_namelen = sock_addr.len() as _;
345+
346+
hdr.msg_iov = msg.data.as_ptr() as *mut _;
347+
hdr.msg_iovlen = msg.data.len() as _;
348+
349+
vec_msg_hdr.push(hdr);
350+
}
351+
352+
let ret = unsafe { recvmsg_x(sock.as_raw_fd(), vec_msg_hdr.as_ptr(), vec_msg_hdr.len() as _, 0) };
353+
if ret < 0 {
354+
let err = io::Error::last_os_error();
355+
if let Some(libc::ENOSYS) = err.raw_os_error() {
356+
debug!("recvmsg_x is not supported, fallback to recvmsg, error: {:?}", err);
357+
SUPPORT_BATCH_SEND_RECV_MSG.store(false, Ordering::Release);
358+
359+
recvmsg_fallback(sock, &mut msgs[0])?;
360+
return Ok(1);
361+
}
362+
return Err(err);
363+
}
364+
365+
for idx in 0..ret as usize {
366+
let msg = &mut msgs[idx];
367+
let hdr = &vec_msg_hdr[idx];
368+
let name = &vec_msg_name[idx];
369+
msg.addr = name.as_socket().expect("SockAddr.as_socket");
370+
msg.data_len = hdr.msg_datalen as usize;
371+
}
372+
373+
Ok(ret as usize)
374+
}
375+
376+
fn sendmsg_fallback<S: AsRawFd>(sock: &S, msg: &mut BatchSendMessage<'_>) -> io::Result<()> {
377+
let mut hdr: libc::msghdr = unsafe { mem::zeroed() };
378+
379+
let sock_addr = msg.addr.map(SockAddr::from);
380+
if let Some(ref sa) = sock_addr {
381+
hdr.msg_name = sa.as_ptr() as *mut _;
382+
hdr.msg_namelen = sa.len() as _;
383+
}
384+
385+
hdr.msg_iov = msg.data.as_ptr() as *mut _;
386+
hdr.msg_iovlen = msg.data.len() as _;
387+
388+
let ret = unsafe { libc::sendmsg(sock.as_raw_fd(), &hdr as *const _, 0) };
389+
if ret < 0 {
390+
return Err(io::Error::last_os_error());
391+
}
392+
msg.data_len = ret as usize;
393+
394+
Ok(())
395+
}
396+
397+
pub fn batch_sendmsg<S: AsRawFd>(sock: &S, msgs: &mut [BatchSendMessage<'_>]) -> io::Result<usize> {
398+
if msgs.is_empty() {
399+
return Ok(0);
400+
}
401+
402+
if !SUPPORT_BATCH_SEND_RECV_MSG.load(Ordering::Acquire) {
403+
sendmsg_fallback(sock, &mut msgs[0])?;
404+
return Ok(1);
405+
}
406+
407+
let mut vec_msg_name = Vec::with_capacity(msgs.len());
408+
let mut vec_msg_hdr = Vec::with_capacity(msgs.len());
409+
410+
for msg in msgs.iter_mut() {
411+
let mut hdr: msghdr_x = unsafe { mem::zeroed() };
412+
413+
if let Some(addr) = msg.addr {
414+
vec_msg_name.push(SockAddr::from(addr));
415+
let sock_addr = vec_msg_name.last_mut().unwrap();
416+
hdr.msg_name = sock_addr.as_ptr() as *mut _;
417+
hdr.msg_namelen = sock_addr.len() as _;
418+
}
419+
420+
hdr.msg_iov = msg.data.as_ptr() as *mut _;
421+
hdr.msg_iovlen = msg.data.len() as _;
422+
423+
vec_msg_hdr.push(hdr);
424+
}
425+
426+
let ret = unsafe { sendmsg_x(sock.as_raw_fd(), vec_msg_hdr.as_ptr(), vec_msg_hdr.len() as _, 0) };
427+
if ret < 0 {
428+
let err = io::Error::last_os_error();
429+
if let Some(libc::ENOSYS) = err.raw_os_error() {
430+
debug!("sendmsg_x is not supported, fallback to sendmsg, error: {:?}", err);
431+
SUPPORT_BATCH_SEND_RECV_MSG.store(false, Ordering::Release);
432+
433+
sendmsg_fallback(sock, &mut msgs[0])?;
434+
return Ok(1);
435+
}
436+
return Err(err);
437+
}
438+
439+
for idx in 0..ret as usize {
440+
let msg = &mut msgs[idx];
441+
let hdr = &vec_msg_hdr[idx];
442+
msg.data_len = hdr.msg_datalen as usize;
443+
}
444+
445+
Ok(ret as usize)
446+
}

0 commit comments

Comments
 (0)