Optionally increase the size of the pipe buffer #34


Merged (1 commit, Aug 4, 2021)
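
This PR makes `Client::new` on Linux check the jobserver pipe's capacity and, when the pipe is a single page or smaller than the requested amount of concurrency, grow it via `fcntl(F_SETPIPE_SZ)` to avoid a deadlock observed in rust-lang/cargo#9739.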
106 changes: 106 additions & 0 deletions src/unix.rs
@@ -23,6 +23,7 @@ pub struct Acquired {
impl Client {
    pub fn new(limit: usize) -> io::Result<Client> {
        let client = unsafe { Client::mk()? };
        client.configure_capacity(limit)?;
        // I don't think the character written here matters, but I could be
        // wrong!
        for _ in 0..limit {
@@ -63,6 +64,70 @@ impl Client {
        Ok(Client::from_fds(pipes[0], pipes[1]))
    }

    fn configure_capacity(&self, required_capacity: usize) -> io::Result<()> {
        // On Linux we may need to increase the capacity of the pipe for the
        // jobserver to work correctly. Linux appears to implement the pipe
        // internally as a ring-buffer, but the ring-ness of that buffer is
        // tied to *pages* rather than to actual bytes. This means that if
        // the pipe has only one page of capacity we can hit a deadlock
        // where a bunch of threads are blocked writing to the pipe even
        // though the pipe's used capacity is less than a page.
        //
        // This was first discovered in rust-lang/cargo#9739, where a system
        // with a large amount of concurrency would hang in `cargo build`
        // when the jobserver pipe only had one page of capacity. This was
        // reduced to a reproduction program [1] which indeed showed that
        // the system would deadlock if the capacity of the pipe was just
        // one page.
        //
        // To fix this issue, on Linux only, we may increase the capacity of
        // the pipe. The main thing here is that if the capacity of the pipe
        // is a single page we try to increase it to two pages; if that
        // fails we return an error, since a deadlock might otherwise
        // happen. While we're at it, the `required_capacity` requested by
        // the client is factored into this calculation as well. If for some
        // reason you want 10_000 units of concurrency in the pipe, that
        // needs more than 2 pages (typically 8192 bytes), so the
        // requirement is rounded up to 3 pages.
        //
        // Someone with more understanding of Linux pipes and how they
        // buffer internally should probably review this at some point. The
        // exact cause of the deadlock seems a little uncertain and it's not
        // clear why the example program [1] deadlocks and why simply adding
        // another page fixes things. Is this a kernel bug? Do we need to
        // always guarantee at least one free page? I'm not sure! Hopefully
        // for now this is enough to fix the problem until machines start
        // having more than 4k cores, which seems like it might be a while.
        //
        // [1]: https://github.com/rust-lang/cargo/issues/9739#issuecomment-889183009
        #[cfg(target_os = "linux")]
        unsafe {
            let page_size = libc::sysconf(libc::_SC_PAGESIZE);
            let actual_capacity =
                cvt(libc::fcntl(self.write.as_raw_fd(), libc::F_GETPIPE_SZ))?;

            if let Some(c) = calculate_capacity(
                required_capacity,
                actual_capacity as usize,
                page_size as usize,
            ) {
                cvt(libc::fcntl(self.write.as_raw_fd(), libc::F_SETPIPE_SZ, c)).map_err(|e| {
                    io::Error::new(
                        e.kind(),
                        format!(
                            "failed to increase jobserver pipe capacity from {} to {}; \
                             jobserver otherwise might deadlock",
                            actual_capacity, c,
                        ),
                    )
                })?;
            }
        }

        Ok(())
    }

    pub unsafe fn open(s: &str) -> Option<Client> {
        let mut parts = s.splitn(2, ',');
        let read = parts.next().unwrap();
@@ -337,3 +402,44 @@ extern "C" fn sigusr1_handler(
) {
    // nothing to do
}

/// Compute the pipe capacity to request, or `None` if the current capacity
/// already suffices. Allowed to be dead code because it's only called on
/// Linux.
#[allow(dead_code)]
fn calculate_capacity(
    required_capacity: usize,
    actual_capacity: usize,
    page_size: usize,
) -> Option<usize> {
    if actual_capacity < required_capacity {
        // Round the requirement up to whole pages, and make sure we end up
        // with at least two pages to avoid the single-page deadlock.
        let mut rounded_capacity = round_up_to(required_capacity, page_size);
        if rounded_capacity < page_size * 2 {
            rounded_capacity += page_size;
        }
        return Some(rounded_capacity);
    }

    if actual_capacity <= page_size {
        return Some(page_size * 2);
    }

    return None;

    // Round `a` up to the next multiple of `b`, which must be a power of
    // two: add `b - 1`, then mask off the low bits.
    fn round_up_to(a: usize, b: usize) -> usize {
        assert!(b.is_power_of_two());
        (a + (b - 1)) & !(b - 1)
    }
}

#[cfg(test)]
mod tests {
    use super::calculate_capacity;

    #[test]
    fn test_calculate_capacity() {
        assert_eq!(calculate_capacity(1, 65536, 4096), None);
        assert_eq!(calculate_capacity(500, 65536, 4096), None);
        assert_eq!(calculate_capacity(5000, 4096, 4096), Some(8192));
        assert_eq!(calculate_capacity(1, 4096, 4096), Some(8192));
        assert_eq!(calculate_capacity(4096, 4096, 4096), Some(8192));
        assert_eq!(calculate_capacity(8192, 4096, 4096), Some(8192));
    }
}
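
For reference, here is a minimal standalone sketch (Linux only, and assuming the `libc` crate) of the fcntl dance `configure_capacity` performs: query a pipe's capacity with F_GETPIPE_SZ, then ask the kernel to grow it with F_SETPIPE_SZ. The kernel may round a request up to a page multiple, and on success fcntl returns the actual new capacity, so that return value is what's printed rather than the requested size.

// Hypothetical demo, not part of this PR: inspect and grow a fresh pipe's
// capacity. Requires Linux and the `libc` crate.
fn main() -> std::io::Result<()> {
    unsafe {
        let mut fds = [0 as libc::c_int; 2];
        if libc::pipe(fds.as_mut_ptr()) != 0 {
            return Err(std::io::Error::last_os_error());
        }

        // Query the current capacity (kernel-dependent, typically 64 KiB).
        let capacity = libc::fcntl(fds[1], libc::F_GETPIPE_SZ);
        if capacity < 0 {
            return Err(std::io::Error::last_os_error());
        }
        println!("initial pipe capacity: {} bytes", capacity);

        // Request double the capacity; fcntl returns the (possibly
        // rounded-up) capacity actually set.
        let new_capacity = libc::fcntl(fds[1], libc::F_SETPIPE_SZ, capacity * 2);
        if new_capacity < 0 {
            return Err(std::io::Error::last_os_error());
        }
        println!("new pipe capacity: {} bytes", new_capacity);

        libc::close(fds[0]);
        libc::close(fds[1]);
    }
    Ok(())
}
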
22 changes: 22 additions & 0 deletions tests/server.rs
@@ -156,3 +156,25 @@ fn zero_client() {
        assert!(rx.try_recv().is_err());
    }
}

#[test]
fn highly_concurrent() {
    const N: usize = 10000;

    let client = t!(Client::new(80));

    let threads = (0..80)
        .map(|_| {
            let client = client.clone();
            std::thread::spawn(move || {
                for _ in 0..N {
                    drop(client.acquire().unwrap());
                }
            })
        })
        .collect::<Vec<_>>();

    for t in threads {
        t.join().unwrap();
    }
}
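
This stress test drives the public `Client` API; the pattern it exercises underneath is the raw jobserver protocol on the pipe. Below is a rough sketch of that shape (Linux, `libc` crate assumed; this is not the actual reproduction program linked from cargo#9739): the pipe is pre-filled with tokens, and every thread repeatedly does a blocking one-byte read (acquire) followed by a one-byte write (release). On affected kernels, a program of this shape reportedly wedged when the pipe had only a single page of capacity.

use std::thread;

// Hypothetical sketch of the raw token protocol; not part of this PR.
fn main() {
    // Create the pipe and pre-fill it with one token per unit of parallelism.
    let mut fds = [0 as libc::c_int; 2];
    assert_eq!(unsafe { libc::pipe(fds.as_mut_ptr()) }, 0);
    let (read_fd, write_fd) = (fds[0], fds[1]);

    const TOKENS: usize = 80;
    for _ in 0..TOKENS {
        assert_eq!(unsafe { libc::write(write_fd, b"|".as_ptr().cast(), 1) }, 1);
    }

    // Every thread acquires (reads) a token and releases (writes) it back,
    // mirroring what `Client::acquire` and dropping an `Acquired` do.
    let threads: Vec<_> = (0..TOKENS)
        .map(|_| {
            thread::spawn(move || {
                let mut buf = [0u8; 1];
                for _ in 0..10_000 {
                    assert_eq!(unsafe { libc::read(read_fd, buf.as_mut_ptr().cast(), 1) }, 1);
                    assert_eq!(unsafe { libc::write(write_fd, buf.as_ptr().cast(), 1) }, 1);
                }
            })
        })
        .collect();

    for t in threads {
        t.join().unwrap();
    }
}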