Skip to content

Commit a62a216

Browse files
committed
Binary serialization over pipes
We use `UnixStream::pair()` a few times and reimplement binary ser/de each time. Add an abstraction that does the serde (`BinSerDe`) and buffer ceremony (`BinPipe`) for us. Fixes #471.
1 parent 6d40763 commit a62a216

File tree

5 files changed

+202
-94
lines changed

5 files changed

+202
-94
lines changed

src/common/bin_serde.rs

+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
//! Binary serialization, and an implementation over Unix pipes.
2+
use std::{
3+
io::{self, Read, Write},
4+
marker::PhantomData,
5+
os::unix::net::UnixStream,
6+
};
7+
8+
/// Serialization/deserialization trait using a byte array as storage.
9+
pub trait BinSerDe<const N: usize> {
10+
fn serialize(&self) -> [u8; N];
11+
fn deserialize(val: [u8; N]) -> Self;
12+
}
13+
14+
/// A pipe abstracting over a [UnixStream] with easier
15+
/// binary serialization, to help with the buffer sizes and ser/de steps.
16+
/// Uses [UnixStream::pair()].
17+
pub fn pair<const N1: usize, T1: BinSerDe<N1>, const N2: usize, T2: BinSerDe<N2>>(
18+
) -> io::Result<(BinPipe<N1, T1, N2, T2>, BinPipe<N2, T2, N1, T1>)> {
19+
let (a, b) = UnixStream::pair()?;
20+
Ok((
21+
BinPipe {
22+
sock: a,
23+
_t1: PhantomData::<T1>,
24+
_t2: PhantomData::<T2>,
25+
},
26+
BinPipe {
27+
sock: b,
28+
_t1: PhantomData::<T2>,
29+
_t2: PhantomData::<T1>,
30+
},
31+
))
32+
}
33+
34+
/// A binary pipe that can send and recieve typed messages.
35+
///
36+
/// By default, the types of the [BinPipe::write()] and [BinPipe::read()]
37+
/// messages are the same, but you can specify extra generic arguments to make
38+
/// them differ. In this case, T1 is the type in [BinPipe::write()]
39+
/// and T2 is returned from [BinPipe::read()].
40+
pub struct BinPipe<const N1: usize, T1: BinSerDe<N1>, const N2: usize = N1, T2: BinSerDe<N2> = T1> {
41+
pub sock: UnixStream,
42+
_t1: PhantomData<T1>,
43+
_t2: PhantomData<T2>,
44+
}
45+
46+
impl<const N1: usize, const N2: usize, T1: BinSerDe<N1>, T2: BinSerDe<N2>> BinPipe<N1, T1, N2, T2> {
47+
/// Write `T` to the pipe.
48+
pub fn write(&mut self, val: &T1) -> io::Result<()> {
49+
self.sock.write_all(&val.serialize())?;
50+
Ok(())
51+
}
52+
/// Read `T` from the pipe.
53+
pub fn read(&mut self) -> io::Result<T2> {
54+
let mut buf = [0u8; N2];
55+
self.sock.read_exact(buf.as_mut_slice())?;
56+
Ok(T2::deserialize(buf))
57+
}
58+
}
59+
60+
impl BinSerDe<4> for i32 {
61+
fn serialize(&self) -> [u8; 4] {
62+
self.to_ne_bytes()
63+
}
64+
fn deserialize(val: [u8; 4]) -> Self {
65+
i32::from_ne_bytes(val)
66+
}
67+
}
68+
69+
#[cfg(test)]
70+
mod tests {
71+
use super::*;
72+
73+
#[test]
74+
pub fn single_type_send_recv() {
75+
let (mut tx, mut rx) = pair().unwrap();
76+
tx.write(&42i32).unwrap();
77+
assert_eq!(rx.read().unwrap(), 42);
78+
rx.write(&23i32).unwrap();
79+
assert_eq!(tx.read().unwrap(), 23);
80+
}
81+
82+
#[test]
83+
pub fn different_send_recv_types() {
84+
impl BinSerDe<1> for u8 {
85+
fn serialize(&self) -> [u8; 1] {
86+
self.to_ne_bytes()
87+
}
88+
fn deserialize(val: [u8; 1]) -> Self {
89+
u8::from_ne_bytes(val)
90+
}
91+
}
92+
93+
let (mut tx, mut rx) = pair().unwrap();
94+
tx.write(&42i32).unwrap();
95+
assert_eq!(rx.read().unwrap(), 42);
96+
rx.write(&23u8).unwrap();
97+
assert_eq!(tx.read().unwrap(), 23);
98+
}
99+
}

src/common/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ pub use command::CommandAndArguments;
55
pub use context::Context;
66
pub use error::Error;
77

8+
pub mod bin_serde;
89
pub mod command;
910
pub mod context;
1011
pub mod error;

src/exec/no_pty.rs

+17-19
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
1-
use std::{
2-
ffi::c_int,
3-
io::{self, Read, Write},
4-
os::unix::{net::UnixStream, process::CommandExt},
5-
process::Command,
6-
};
1+
use std::{ffi::c_int, io, os::unix::process::CommandExt, process::Command};
72

83
use super::{
94
event::PollEvent,
105
event::{EventRegistry, Process, StopReason},
116
io_util::was_interrupted,
127
terminate_process, ExitReason, HandleSigchld, ProcessOutput,
138
};
14-
use crate::system::signal::{
15-
consts::*, register_handlers, SignalHandler, SignalHandlerBehavior, SignalNumber, SignalSet,
16-
SignalStream,
9+
use crate::{
10+
common::bin_serde::{pair, BinPipe},
11+
system::signal::{
12+
consts::*, register_handlers, SignalHandler, SignalHandlerBehavior, SignalNumber,
13+
SignalSet, SignalStream,
14+
},
1715
};
1816
use crate::{
1917
exec::{handle_sigchld, opt_fmt, signal_fmt},
@@ -46,11 +44,11 @@ pub(super) fn exec_no_pty(sudo_pid: ProcessId, mut command: Command) -> io::Resu
4644
// FIXME (ogsudo): Some extra config happens here if selinux is available.
4745

4846
// Use a pipe to get the IO error if `exec` fails.
49-
let (mut errpipe_tx, errpipe_rx) = UnixStream::pair()?;
47+
let (mut errpipe_tx, errpipe_rx) = pair()?;
5048

5149
// Don't close the error pipe as we need it to retrieve the error code if the command execution
5250
// fails.
53-
file_closer.except(&errpipe_tx);
51+
file_closer.except(&errpipe_tx.sock);
5452

5553
let ForkResult::Parent(command_pid) = fork().map_err(|err| {
5654
dev_warn!("unable to fork command process: {err}");
@@ -72,7 +70,7 @@ pub(super) fn exec_no_pty(sudo_pid: ProcessId, mut command: Command) -> io::Resu
7270
// If `exec` returns, it means that executing the command failed. Send the error to the
7371
// monitor using the pipe.
7472
if let Some(error_code) = err.raw_os_error() {
75-
errpipe_tx.write_all(&error_code.to_ne_bytes()).ok();
73+
errpipe_tx.write(&error_code).ok();
7674
}
7775

7876
return Ok(ProcessOutput::ChildExit);
@@ -108,7 +106,7 @@ struct ExecClosure {
108106
command_pid: Option<ProcessId>,
109107
sudo_pid: ProcessId,
110108
parent_pgrp: ProcessId,
111-
errpipe_rx: UnixStream,
109+
errpipe_rx: BinPipe<4, i32>,
112110
signal_stream: &'static SignalStream,
113111
signal_handlers: [SignalHandler; ExecClosure::SIGNALS.len()],
114112
}
@@ -122,10 +120,12 @@ impl ExecClosure {
122120
fn new(
123121
command_pid: ProcessId,
124122
sudo_pid: ProcessId,
125-
errpipe_rx: UnixStream,
123+
errpipe_rx: BinPipe<4, i32>,
126124
registry: &mut EventRegistry<Self>,
127125
) -> io::Result<Self> {
128-
registry.register_event(&errpipe_rx, PollEvent::Readable, |_| ExecEvent::ErrPipe);
126+
registry.register_event(&errpipe_rx.sock, PollEvent::Readable, |_| {
127+
ExecEvent::ErrPipe
128+
});
129129

130130
let signal_stream = SignalStream::init()?;
131131

@@ -287,13 +287,11 @@ impl Process for ExecClosure {
287287
match event {
288288
ExecEvent::Signal => self.on_signal(registry),
289289
ExecEvent::ErrPipe => {
290-
let mut buf = 0i32.to_ne_bytes();
291-
match self.errpipe_rx.read_exact(&mut buf) {
290+
match self.errpipe_rx.read() {
292291
Err(err) if was_interrupted(&err) => { /* Retry later */ }
293292
Err(err) => registry.set_break(err),
294-
Ok(_) => {
293+
Ok(error_code) => {
295294
// Received error code from the command, forward it to the parent.
296-
let error_code = i32::from_ne_bytes(buf);
297295
registry.set_break(io::Error::from_raw_os_error(error_code));
298296
}
299297
}

0 commit comments

Comments
 (0)