Skip to content

Binary serialization over pipes #741

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions src/common/bin_serde.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
//! Binary serialization, and an implementation over Unix pipes.
use sealed::DeSerializeBytes;
use std::{
io::{self, Read, Write},
marker::PhantomData,
os::{fd::AsRawFd, unix::net::UnixStream},
};

mod sealed {
pub trait DeSerializeBytes {
fn zero_init() -> Self;
fn as_mut_ref(&mut self) -> &mut [u8];
}

impl<const N: usize> DeSerializeBytes for [u8; N] {
fn zero_init() -> [u8; N] {
[0; N]
}
fn as_mut_ref(&mut self) -> &mut [u8] {
self.as_mut_slice()
}
}
}

/// Serialization/deserialization trait using a byte array as storage.
pub trait DeSerialize {
/// Usually `[u8; std::mem::size_of::<Self>()]`.
type Bytes: sealed::DeSerializeBytes;
fn serialize(&self) -> Self::Bytes;
fn deserialize(bytes: Self::Bytes) -> Self;
}

/// A binary pipe that can send and recieve typed messages.
///
/// By default, if only one generic is included,
/// the types of the [BinPipe::write()] and [BinPipe::read()] messages
/// are the same.
pub struct BinPipe<R: DeSerialize, W: DeSerialize = R> {
sock: UnixStream,
_read_marker: PhantomData<R>,
_write_marker: PhantomData<W>,
}

impl<R: DeSerialize, W: DeSerialize> BinPipe<R, W> {
/// A pipe abstracting over a [UnixStream] with easier
/// binary serialization, to help with the buffer sizes and ser/de steps.
/// Uses [UnixStream::pair()].
pub fn pair() -> io::Result<(BinPipe<R, W>, BinPipe<W, R>)> {
let (first, second) = UnixStream::pair()?;
Ok((
BinPipe {
sock: first,
_read_marker: PhantomData::<R>,
_write_marker: PhantomData::<W>,
},
// R and W are inverted here since the type of what's written in one
// pipe is read in the other, and vice versa.
BinPipe {
sock: second,
_read_marker: PhantomData::<W>,
_write_marker: PhantomData::<R>,
},
))
}

/// Read a `R` from the pipe.
pub fn read(&mut self) -> io::Result<R> {
let mut bytes = R::Bytes::zero_init();
self.sock.read_exact(bytes.as_mut_ref())?;
Ok(R::deserialize(bytes))
}

/// Write a `W` to the pipe.
pub fn write(&mut self, bytes: &W) -> io::Result<()> {
self.sock.write_all(bytes.serialize().as_mut_ref())?;
Ok(())
}

/// Calls [std::net::TcpStream::set_nonblocking] on the underlying socket.
pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
self.sock.set_nonblocking(nonblocking)
}
}

impl<R: DeSerialize, W: DeSerialize> AsRawFd for BinPipe<R, W> {
fn as_raw_fd(&self) -> std::os::fd::RawFd {
self.sock.as_raw_fd()
}
}

impl DeSerialize for i32 {
type Bytes = [u8; std::mem::size_of::<Self>()];

fn serialize(&self) -> Self::Bytes {
self.to_ne_bytes()
}
fn deserialize(bytes: Self::Bytes) -> Self {
Self::from_ne_bytes(bytes)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
pub fn single_type() {
let (mut tx, mut rx) = BinPipe::pair().unwrap();
tx.write(&42i32).unwrap();
assert_eq!(rx.read().unwrap(), 42);
rx.write(&23i32).unwrap();
assert_eq!(tx.read().unwrap(), 23);
}

#[test]
pub fn different_types() {
impl DeSerialize for u8 {
type Bytes = [u8; std::mem::size_of::<Self>()];
fn serialize(&self) -> [u8; 1] {
self.to_ne_bytes()
}
fn deserialize(bytes: [u8; 1]) -> Self {
Self::from_ne_bytes(bytes)
}
}

let (mut tx, mut rx) = BinPipe::pair().unwrap();
tx.write(&42i32).unwrap();
assert_eq!(rx.read().unwrap(), 42);
rx.write(&23u8).unwrap();
assert_eq!(tx.read().unwrap(), 23);
}
}
1 change: 1 addition & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub use command::CommandAndArguments;
pub use context::Context;
pub use error::Error;

pub mod bin_serde;
pub mod command;
pub mod context;
pub mod error;
Expand Down
30 changes: 13 additions & 17 deletions src/exec/no_pty.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
use std::{
ffi::c_int,
io::{self, Read, Write},
os::unix::{net::UnixStream, process::CommandExt},
process::Command,
};
use std::{ffi::c_int, io, os::unix::process::CommandExt, process::Command};

use super::{
event::PollEvent,
event::{EventRegistry, Process, StopReason},
io_util::was_interrupted,
terminate_process, ExitReason, HandleSigchld, ProcessOutput,
};
use crate::system::signal::{
consts::*, register_handlers, SignalHandler, SignalHandlerBehavior, SignalNumber, SignalSet,
SignalStream,
use crate::{
common::bin_serde::BinPipe,
system::signal::{
consts::*, register_handlers, SignalHandler, SignalHandlerBehavior, SignalNumber,
SignalSet, SignalStream,
},
};
use crate::{
exec::{handle_sigchld, opt_fmt, signal_fmt},
Expand Down Expand Up @@ -46,7 +44,7 @@ pub(super) fn exec_no_pty(sudo_pid: ProcessId, mut command: Command) -> io::Resu
// FIXME (ogsudo): Some extra config happens here if selinux is available.

// Use a pipe to get the IO error if `exec` fails.
let (mut errpipe_tx, errpipe_rx) = UnixStream::pair()?;
let (mut errpipe_tx, errpipe_rx) = BinPipe::pair()?;

// Don't close the error pipe as we need it to retrieve the error code if the command execution
// fails.
Expand All @@ -72,7 +70,7 @@ pub(super) fn exec_no_pty(sudo_pid: ProcessId, mut command: Command) -> io::Resu
// If `exec` returns, it means that executing the command failed. Send the error to the
// monitor using the pipe.
if let Some(error_code) = err.raw_os_error() {
errpipe_tx.write_all(&error_code.to_ne_bytes()).ok();
errpipe_tx.write(&error_code).ok();
}

return Ok(ProcessOutput::ChildExit);
Expand Down Expand Up @@ -108,7 +106,7 @@ struct ExecClosure {
command_pid: Option<ProcessId>,
sudo_pid: ProcessId,
parent_pgrp: ProcessId,
errpipe_rx: UnixStream,
errpipe_rx: BinPipe<i32>,
signal_stream: &'static SignalStream,
signal_handlers: [SignalHandler; ExecClosure::SIGNALS.len()],
}
Expand All @@ -122,7 +120,7 @@ impl ExecClosure {
fn new(
command_pid: ProcessId,
sudo_pid: ProcessId,
errpipe_rx: UnixStream,
errpipe_rx: BinPipe<i32>,
registry: &mut EventRegistry<Self>,
) -> io::Result<Self> {
registry.register_event(&errpipe_rx, PollEvent::Readable, |_| ExecEvent::ErrPipe);
Expand Down Expand Up @@ -287,13 +285,11 @@ impl Process for ExecClosure {
match event {
ExecEvent::Signal => self.on_signal(registry),
ExecEvent::ErrPipe => {
let mut buf = 0i32.to_ne_bytes();
match self.errpipe_rx.read_exact(&mut buf) {
match self.errpipe_rx.read() {
Err(err) if was_interrupted(&err) => { /* Retry later */ }
Err(err) => registry.set_break(err),
Ok(_) => {
Ok(error_code) => {
// Received error code from the command, forward it to the parent.
let error_code = i32::from_ne_bytes(buf);
registry.set_break(io::Error::from_raw_os_error(error_code));
}
}
Expand Down
Loading