Skip to content

Commit ddb7fbe

Browse files
committed
Auto merge of rust-lang#96441 - ChrisDenton:sync-pipes, r=m-ou-se
Windows: Make stdin pipes synchronous Stdin pipes do not need to be used asynchronously within the standard library. This is a first step in making pipes mostly synchronous. r? `@m-ou-se`
2 parents baaa3b6 + 1e7c156 commit ddb7fbe

File tree

5 files changed

+119
-31
lines changed

5 files changed

+119
-31
lines changed

library/std/src/os/windows/io/handle.rs

+12
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,18 @@ impl OwnedHandle {
198198
})?;
199199
unsafe { Ok(Self::from_raw_handle(ret)) }
200200
}
201+
202+
/// Allow child processes to inherit the handle.
203+
pub(crate) fn set_inheritable(&self) -> io::Result<()> {
204+
cvt(unsafe {
205+
c::SetHandleInformation(
206+
self.as_raw_handle(),
207+
c::HANDLE_FLAG_INHERIT,
208+
c::HANDLE_FLAG_INHERIT,
209+
)
210+
})?;
211+
Ok(())
212+
}
201213
}
202214

203215
impl TryFrom<HandleOrInvalid> for OwnedHandle {

library/std/src/sys/windows/c.rs

+6
Original file line numberDiff line numberDiff line change
@@ -1022,6 +1022,12 @@ extern "system" {
10221022
bWaitAll: BOOL,
10231023
dwMilliseconds: DWORD,
10241024
) -> DWORD;
1025+
pub fn CreatePipe(
1026+
hReadPipe: *mut HANDLE,
1027+
hWritePipe: *mut HANDLE,
1028+
lpPipeAttributes: *const SECURITY_ATTRIBUTES,
1029+
nSize: DWORD,
1030+
) -> BOOL;
10251031
pub fn CreateNamedPipeW(
10261032
lpName: LPCWSTR,
10271033
dwOpenMode: DWORD,

library/std/src/sys/windows/handle.rs

+4
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,10 @@ impl Handle {
221221
Ok(Self(self.0.duplicate(access, inherit, options)?))
222222
}
223223

224+
pub(crate) fn set_inheritable(&self) -> io::Result<()> {
225+
self.0.set_inheritable()
226+
}
227+
224228
/// Performs a synchronous read.
225229
///
226230
/// If the handle is opened for asynchronous I/O then this abort the process.

library/std/src/sys/windows/pipe.rs

+65-25
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,56 @@ use crate::sys_common::IntoInner;
1818
// Anonymous pipes
1919
////////////////////////////////////////////////////////////////////////////////
2020

21-
pub struct AnonPipe {
22-
inner: Handle,
21+
// A 64kb pipe capacity is the same as a typical Linux default.
22+
const PIPE_BUFFER_CAPACITY: u32 = 64 * 1024;
23+
24+
pub enum AnonPipe {
25+
Sync(Handle),
26+
Async(Handle),
2327
}
2428

2529
impl IntoInner<Handle> for AnonPipe {
2630
fn into_inner(self) -> Handle {
27-
self.inner
31+
match self {
32+
Self::Sync(handle) => handle,
33+
Self::Async(handle) => handle,
34+
}
2835
}
2936
}
3037

3138
pub struct Pipes {
3239
pub ours: AnonPipe,
3340
pub theirs: AnonPipe,
3441
}
42+
impl Pipes {
43+
/// Create a new pair of pipes where both pipes are synchronous.
44+
///
45+
/// These must not be used asynchronously.
46+
pub fn new_synchronous(
47+
ours_readable: bool,
48+
their_handle_inheritable: bool,
49+
) -> io::Result<Self> {
50+
unsafe {
51+
// If `CreatePipe` succeeds, these will be our pipes.
52+
let mut read = ptr::null_mut();
53+
let mut write = ptr::null_mut();
54+
55+
if c::CreatePipe(&mut read, &mut write, ptr::null(), PIPE_BUFFER_CAPACITY) == 0 {
56+
Err(io::Error::last_os_error())
57+
} else {
58+
let (ours, theirs) = if ours_readable { (read, write) } else { (write, read) };
59+
let ours = Handle::from_raw_handle(ours);
60+
let theirs = Handle::from_raw_handle(theirs);
61+
62+
if their_handle_inheritable {
63+
theirs.set_inheritable()?;
64+
}
65+
66+
Ok(Pipes { ours: AnonPipe::Sync(ours), theirs: AnonPipe::Sync(theirs) })
67+
}
68+
}
69+
}
70+
}
3571

3672
/// Although this looks similar to `anon_pipe` in the Unix module it's actually
3773
/// subtly different. Here we'll return two pipes in the `Pipes` return value,
@@ -53,9 +89,6 @@ pub struct Pipes {
5389
/// with `OVERLAPPED` instances, but also works out ok if it's only ever used
5490
/// once at a time (which we do indeed guarantee).
5591
pub fn anon_pipe(ours_readable: bool, their_handle_inheritable: bool) -> io::Result<Pipes> {
56-
// A 64kb pipe capacity is the same as a typical Linux default.
57-
const PIPE_BUFFER_CAPACITY: u32 = 64 * 1024;
58-
5992
// Note that we specifically do *not* use `CreatePipe` here because
6093
// unfortunately the anonymous pipes returned do not support overlapped
6194
// operations. Instead, we create a "hopefully unique" name and create a
@@ -156,12 +189,9 @@ pub fn anon_pipe(ours_readable: bool, their_handle_inheritable: bool) -> io::Res
156189
};
157190
opts.security_attributes(&mut sa);
158191
let theirs = File::open(Path::new(&name), &opts)?;
159-
let theirs = AnonPipe { inner: theirs.into_inner() };
192+
let theirs = AnonPipe::Sync(theirs.into_inner());
160193

161-
Ok(Pipes {
162-
ours: AnonPipe { inner: ours },
163-
theirs: AnonPipe { inner: theirs.into_inner() },
164-
})
194+
Ok(Pipes { ours: AnonPipe::Async(ours), theirs })
165195
}
166196
}
167197

@@ -171,12 +201,12 @@ pub fn anon_pipe(ours_readable: bool, their_handle_inheritable: bool) -> io::Res
171201
/// This is achieved by creating a new set of pipes and spawning a thread that
172202
/// relays messages between the source and the synchronous pipe.
173203
pub fn spawn_pipe_relay(
174-
source: &AnonPipe,
204+
source: &Handle,
175205
ours_readable: bool,
176206
their_handle_inheritable: bool,
177207
) -> io::Result<AnonPipe> {
178208
// We need this handle to live for the lifetime of the thread spawned below.
179-
let source = source.duplicate()?;
209+
let source = AnonPipe::Async(source.duplicate(0, true, c::DUPLICATE_SAME_ACCESS)?);
180210

181211
// create a new pair of anon pipes.
182212
let Pipes { theirs, ours } = anon_pipe(ours_readable, their_handle_inheritable)?;
@@ -227,19 +257,24 @@ type AlertableIoFn = unsafe extern "system" fn(
227257

228258
impl AnonPipe {
229259
pub fn handle(&self) -> &Handle {
230-
&self.inner
260+
match self {
261+
Self::Async(ref handle) => handle,
262+
Self::Sync(ref handle) => handle,
263+
}
231264
}
232265
pub fn into_handle(self) -> Handle {
233-
self.inner
234-
}
235-
fn duplicate(&self) -> io::Result<Self> {
236-
self.inner.duplicate(0, false, c::DUPLICATE_SAME_ACCESS).map(|inner| AnonPipe { inner })
266+
self.into_inner()
237267
}
238268

239269
pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
240270
let result = unsafe {
241271
let len = crate::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD;
242-
self.alertable_io_internal(c::ReadFileEx, buf.as_mut_ptr() as _, len)
272+
match self {
273+
Self::Sync(ref handle) => handle.read(buf),
274+
Self::Async(_) => {
275+
self.alertable_io_internal(c::ReadFileEx, buf.as_mut_ptr() as _, len)
276+
}
277+
}
243278
};
244279

245280
match result {
@@ -253,28 +288,33 @@ impl AnonPipe {
253288
}
254289

255290
pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
256-
self.inner.read_vectored(bufs)
291+
io::default_read_vectored(|buf| self.read(buf), bufs)
257292
}
258293

259294
#[inline]
260295
pub fn is_read_vectored(&self) -> bool {
261-
self.inner.is_read_vectored()
296+
false
262297
}
263298

264299
pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
265300
unsafe {
266301
let len = crate::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD;
267-
self.alertable_io_internal(c::WriteFileEx, buf.as_ptr() as _, len)
302+
match self {
303+
Self::Sync(ref handle) => handle.write(buf),
304+
Self::Async(_) => {
305+
self.alertable_io_internal(c::WriteFileEx, buf.as_ptr() as _, len)
306+
}
307+
}
268308
}
269309
}
270310

271311
pub fn write_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
272-
self.inner.write_vectored(bufs)
312+
io::default_write_vectored(|buf| self.write(buf), bufs)
273313
}
274314

275315
#[inline]
276316
pub fn is_write_vectored(&self) -> bool {
277-
self.inner.is_write_vectored()
317+
false
278318
}
279319

280320
/// Synchronizes asynchronous reads or writes using our anonymous pipe.
@@ -346,7 +386,7 @@ impl AnonPipe {
346386

347387
// Asynchronous read of the pipe.
348388
// If successful, `callback` will be called once it completes.
349-
let result = io(self.inner.as_handle(), buf, len, &mut overlapped, callback);
389+
let result = io(self.handle().as_handle(), buf, len, &mut overlapped, callback);
350390
if result == c::FALSE {
351391
// We can return here because the call failed.
352392
// After this we must not return until the I/O completes.

library/std/src/sys/windows/process.rs

+32-6
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::sys::cvt;
2424
use crate::sys::fs::{File, OpenOptions};
2525
use crate::sys::handle::Handle;
2626
use crate::sys::path;
27-
use crate::sys::pipe::{self, AnonPipe};
27+
use crate::sys::pipe::{self, AnonPipe, Pipes};
2828
use crate::sys::stdio;
2929
use crate::sys_common::mutex::StaticMutex;
3030
use crate::sys_common::process::{CommandEnv, CommandEnvs};
@@ -173,7 +173,7 @@ pub enum Stdio {
173173
Inherit,
174174
Null,
175175
MakePipe,
176-
Pipe(AnonPipe),
176+
AsyncPipe(Handle),
177177
Handle(Handle),
178178
}
179179

@@ -527,13 +527,33 @@ impl Stdio {
527527
},
528528

529529
Stdio::MakePipe => {
530-
let ours_readable = stdio_id != c::STD_INPUT_HANDLE;
531-
let pipes = pipe::anon_pipe(ours_readable, true)?;
530+
// Handles that are passed to a child process must be synchronous
531+
// because they will be read synchronously (see #95759).
532+
// Therefore we prefer to make both ends of a pipe synchronous
533+
// just in case our end of the pipe is passed to another process.
534+
//
535+
// However, we may need to read from both the child's stdout and
536+
// stderr simultaneously when waiting for output. This requires
537+
// async reads so as to avoid blocking either pipe.
538+
//
539+
// The solution used here is to make handles synchronous
540+
// except for our side of the stdout and sterr pipes.
541+
// If our side of those pipes do end up being given to another
542+
// process then we use a "pipe relay" to synchronize access
543+
// (see `Stdio::AsyncPipe` below).
544+
let pipes = if stdio_id == c::STD_INPUT_HANDLE {
545+
// For stdin both sides of the pipe are synchronous.
546+
Pipes::new_synchronous(false, true)?
547+
} else {
548+
// For stdout/stderr our side of the pipe is async and their side is synchronous.
549+
pipe::anon_pipe(true, true)?
550+
};
532551
*pipe = Some(pipes.ours);
533552
Ok(pipes.theirs.into_handle())
534553
}
535554

536-
Stdio::Pipe(ref source) => {
555+
Stdio::AsyncPipe(ref source) => {
556+
// We need to synchronize asynchronous pipes by using a pipe relay.
537557
let ours_readable = stdio_id != c::STD_INPUT_HANDLE;
538558
pipe::spawn_pipe_relay(source, ours_readable, true).map(AnonPipe::into_handle)
539559
}
@@ -562,7 +582,13 @@ impl Stdio {
562582

563583
impl From<AnonPipe> for Stdio {
564584
fn from(pipe: AnonPipe) -> Stdio {
565-
Stdio::Pipe(pipe)
585+
// Note that it's very important we don't give async handles to child processes.
586+
// Therefore if the pipe is asynchronous we must have a way to turn it synchronous.
587+
// See #95759.
588+
match pipe {
589+
AnonPipe::Sync(handle) => Stdio::Handle(handle),
590+
AnonPipe::Async(handle) => Stdio::AsyncPipe(handle),
591+
}
566592
}
567593
}
568594

0 commit comments

Comments
 (0)