Skip to content

Commit 7213a27

Browse files
authored
Merge pull request rust-lang#3939 from tiif/blockeventfd
Implement blocking eventfd
2 parents 61d496e + 96d9a95 commit 7213a27

11 files changed

+491
-109
lines changed

src/tools/miri/src/concurrency/thread.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,8 @@ pub enum BlockReason {
152152
InitOnce(InitOnceId),
153153
/// Blocked on epoll.
154154
Epoll,
155+
/// Blocked on eventfd.
156+
Eventfd,
155157
}
156158

157159
/// The state of a thread.

src/tools/miri/src/shims/unix/linux/eventfd.rs

Lines changed: 153 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::io;
44
use std::io::ErrorKind;
55

66
use crate::concurrency::VClock;
7-
use crate::shims::unix::fd::FileDescriptionRef;
7+
use crate::shims::unix::fd::{FileDescriptionRef, WeakFileDescriptionRef};
88
use crate::shims::unix::linux::epoll::{EpollReadyEvents, EvalContextExt as _};
99
use crate::shims::unix::*;
1010
use crate::*;
@@ -26,6 +26,10 @@ struct Event {
2626
counter: Cell<u64>,
2727
is_nonblock: bool,
2828
clock: RefCell<VClock>,
29+
/// A list of thread ids blocked on eventfd::read.
30+
blocked_read_tid: RefCell<Vec<ThreadId>>,
31+
/// A list of thread ids blocked on eventfd::write.
32+
blocked_write_tid: RefCell<Vec<ThreadId>>,
2933
}
3034

3135
impl FileDescription for Event {
@@ -72,31 +76,8 @@ impl FileDescription for Event {
7276
// eventfd read at the size of u64.
7377
let buf_place = ecx.ptr_to_mplace_unaligned(ptr, ty);
7478

75-
// Block when counter == 0.
76-
let counter = self.counter.get();
77-
if counter == 0 {
78-
if self.is_nonblock {
79-
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
80-
}
81-
82-
throw_unsup_format!("eventfd: blocking is unsupported");
83-
} else {
84-
// Synchronize with all prior `write` calls to this FD.
85-
ecx.acquire_clock(&self.clock.borrow());
86-
87-
// Give old counter value to userspace, and set counter value to 0.
88-
ecx.write_int(counter, &buf_place)?;
89-
self.counter.set(0);
90-
91-
// When any of the event happened, we check and update the status of all supported event
92-
// types for current file description.
93-
ecx.check_and_update_readiness(self_ref)?;
94-
95-
// Tell userspace how many bytes we wrote.
96-
ecx.write_int(buf_place.layout.size.bytes(), dest)?;
97-
}
98-
99-
interp_ok(())
79+
let weak_eventfd = self_ref.downgrade();
80+
eventfd_read(buf_place, dest, weak_eventfd, ecx)
10081
}
10182

10283
/// A write call adds the 8-byte integer value supplied in
@@ -127,7 +108,7 @@ impl FileDescription for Event {
127108
return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest);
128109
}
129110

130-
// Read the user supplied value from the pointer.
111+
// Read the user-supplied value from the pointer.
131112
let buf_place = ecx.ptr_to_mplace_unaligned(ptr, ty);
132113
let num = ecx.read_scalar(&buf_place)?.to_u64()?;
133114

@@ -137,27 +118,8 @@ impl FileDescription for Event {
137118
}
138119
// If the addition does not let the counter to exceed the maximum value, update the counter.
139120
// Else, block.
140-
match self.counter.get().checked_add(num) {
141-
Some(new_count @ 0..=MAX_COUNTER) => {
142-
// Future `read` calls will synchronize with this write, so update the FD clock.
143-
ecx.release_clock(|clock| {
144-
self.clock.borrow_mut().join(clock);
145-
});
146-
self.counter.set(new_count);
147-
}
148-
None | Some(u64::MAX) =>
149-
if self.is_nonblock {
150-
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
151-
} else {
152-
throw_unsup_format!("eventfd: blocking is unsupported");
153-
},
154-
};
155-
// When any of the event happened, we check and update the status of all supported event
156-
// types for current file description.
157-
ecx.check_and_update_readiness(self_ref)?;
158-
159-
// Return how many bytes we read.
160-
ecx.write_int(buf_place.layout.size.bytes(), dest)
121+
let weak_eventfd = self_ref.downgrade();
122+
eventfd_write(num, buf_place, dest, weak_eventfd, ecx)
161123
}
162124
}
163125

@@ -217,8 +179,151 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
217179
counter: Cell::new(val.into()),
218180
is_nonblock,
219181
clock: RefCell::new(VClock::default()),
182+
blocked_read_tid: RefCell::new(Vec::new()),
183+
blocked_write_tid: RefCell::new(Vec::new()),
220184
});
221185

222186
interp_ok(Scalar::from_i32(fd_value))
223187
}
224188
}
189+
190+
/// Block thread if the value addition will exceed u64::MAX -1,
191+
/// else just add the user-supplied value to current counter.
192+
fn eventfd_write<'tcx>(
193+
num: u64,
194+
buf_place: MPlaceTy<'tcx>,
195+
dest: &MPlaceTy<'tcx>,
196+
weak_eventfd: WeakFileDescriptionRef,
197+
ecx: &mut MiriInterpCx<'tcx>,
198+
) -> InterpResult<'tcx> {
199+
let Some(eventfd_ref) = weak_eventfd.upgrade() else {
200+
throw_unsup_format!("eventfd FD got closed while blocking.")
201+
};
202+
203+
// Since we pass the weak file description ref, it is guaranteed to be
204+
// an eventfd file description.
205+
let eventfd = eventfd_ref.downcast::<Event>().unwrap();
206+
207+
match eventfd.counter.get().checked_add(num) {
208+
Some(new_count @ 0..=MAX_COUNTER) => {
209+
// Future `read` calls will synchronize with this write, so update the FD clock.
210+
ecx.release_clock(|clock| {
211+
eventfd.clock.borrow_mut().join(clock);
212+
});
213+
214+
// When this function is called, the addition is guaranteed to not exceed u64::MAX - 1.
215+
eventfd.counter.set(new_count);
216+
217+
// When any of the event happened, we check and update the status of all supported event
218+
// types for current file description.
219+
ecx.check_and_update_readiness(&eventfd_ref)?;
220+
221+
// Unblock *all* threads previously blocked on `read`.
222+
// We need to take out the blocked thread ids and unblock them together,
223+
// because `unblock_threads` may block them again and end up re-adding the
224+
// thread to the blocked list.
225+
let waiting_threads = std::mem::take(&mut *eventfd.blocked_read_tid.borrow_mut());
226+
// FIXME: We can randomize the order of unblocking.
227+
for thread_id in waiting_threads {
228+
ecx.unblock_thread(thread_id, BlockReason::Eventfd)?;
229+
}
230+
231+
// Return how many bytes we wrote.
232+
return ecx.write_int(buf_place.layout.size.bytes(), dest);
233+
}
234+
None | Some(u64::MAX) => {
235+
if eventfd.is_nonblock {
236+
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
237+
}
238+
239+
let dest = dest.clone();
240+
241+
eventfd.blocked_write_tid.borrow_mut().push(ecx.active_thread());
242+
243+
ecx.block_thread(
244+
BlockReason::Eventfd,
245+
None,
246+
callback!(
247+
@capture<'tcx> {
248+
num: u64,
249+
buf_place: MPlaceTy<'tcx>,
250+
dest: MPlaceTy<'tcx>,
251+
weak_eventfd: WeakFileDescriptionRef,
252+
}
253+
@unblock = |this| {
254+
eventfd_write(num, buf_place, &dest, weak_eventfd, this)
255+
}
256+
),
257+
);
258+
}
259+
};
260+
interp_ok(())
261+
}
262+
263+
/// Block thread if the current counter is 0,
264+
/// else just return the current counter value to the caller and set the counter to 0.
265+
fn eventfd_read<'tcx>(
266+
buf_place: MPlaceTy<'tcx>,
267+
dest: &MPlaceTy<'tcx>,
268+
weak_eventfd: WeakFileDescriptionRef,
269+
ecx: &mut MiriInterpCx<'tcx>,
270+
) -> InterpResult<'tcx> {
271+
let Some(eventfd_ref) = weak_eventfd.upgrade() else {
272+
throw_unsup_format!("eventfd FD got closed while blocking.")
273+
};
274+
275+
// Since we pass the weak file description ref to the callback function, it is guaranteed to be
276+
// an eventfd file description.
277+
let eventfd = eventfd_ref.downcast::<Event>().unwrap();
278+
279+
// Block when counter == 0.
280+
let counter = eventfd.counter.replace(0);
281+
282+
if counter == 0 {
283+
if eventfd.is_nonblock {
284+
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
285+
}
286+
let dest = dest.clone();
287+
288+
eventfd.blocked_read_tid.borrow_mut().push(ecx.active_thread());
289+
290+
ecx.block_thread(
291+
BlockReason::Eventfd,
292+
None,
293+
callback!(
294+
@capture<'tcx> {
295+
buf_place: MPlaceTy<'tcx>,
296+
dest: MPlaceTy<'tcx>,
297+
weak_eventfd: WeakFileDescriptionRef,
298+
}
299+
@unblock = |this| {
300+
eventfd_read(buf_place, &dest, weak_eventfd, this)
301+
}
302+
),
303+
);
304+
} else {
305+
// Synchronize with all prior `write` calls to this FD.
306+
ecx.acquire_clock(&eventfd.clock.borrow());
307+
308+
// Give old counter value to userspace, and set counter value to 0.
309+
ecx.write_int(counter, &buf_place)?;
310+
311+
// When any of the events happened, we check and update the status of all supported event
312+
// types for current file description.
313+
ecx.check_and_update_readiness(&eventfd_ref)?;
314+
315+
// Unblock *all* threads previously blocked on `write`.
316+
// We need to take out the blocked thread ids and unblock them together,
317+
// because `unblock_threads` may block them again and end up re-adding the
318+
// thread to the blocked list.
319+
let waiting_threads = std::mem::take(&mut *eventfd.blocked_write_tid.borrow_mut());
320+
// FIXME: We can randomize the order of unblocking.
321+
for thread_id in waiting_threads {
322+
ecx.unblock_thread(thread_id, BlockReason::Eventfd)?;
323+
}
324+
325+
// Tell userspace how many bytes we read.
326+
return ecx.write_int(buf_place.layout.size.bytes(), dest);
327+
}
328+
interp_ok(())
329+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
//@only-target: linux
2+
//~^ERROR: deadlocked
3+
//~^^ERROR: deadlocked
4+
//@compile-flags: -Zmiri-preemption-rate=0
5+
//@error-in-other-file: deadlock
6+
7+
use std::thread;
8+
9+
// Test the behaviour of a thread being blocked on an eventfd read, get unblocked, and then
10+
// get blocked again.
11+
12+
// The expected execution is
13+
// 1. Thread 1 blocks.
14+
// 2. Thread 2 blocks.
15+
// 3. Thread 3 unblocks both thread 1 and thread 2.
16+
// 4. Thread 1 reads.
17+
// 5. Thread 2's `read` deadlocked.
18+
19+
fn main() {
20+
// eventfd write will block when EFD_NONBLOCK flag is clear
21+
// and the addition caused counter to exceed u64::MAX - 1.
22+
let flags = libc::EFD_CLOEXEC;
23+
let fd = unsafe { libc::eventfd(0, flags) };
24+
25+
let thread1 = thread::spawn(move || {
26+
thread::park();
27+
let mut buf: [u8; 8] = [0; 8];
28+
// This read will block initially.
29+
let res: i64 = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), 8).try_into().unwrap() };
30+
assert_eq!(res, 8);
31+
let counter = u64::from_ne_bytes(buf);
32+
assert_eq!(counter, 1_u64);
33+
});
34+
35+
let thread2 = thread::spawn(move || {
36+
thread::park();
37+
let mut buf: [u8; 8] = [0; 8];
38+
// This read will block initially, then get unblocked by thread3, then get blocked again
39+
// because the `read` in thread1 executes first and set the counter to 0 again.
40+
let res: i64 = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), 8).try_into().unwrap() };
41+
//~^ERROR: deadlocked
42+
assert_eq!(res, 8);
43+
let counter = u64::from_ne_bytes(buf);
44+
assert_eq!(counter, 1_u64);
45+
});
46+
47+
let thread3 = thread::spawn(move || {
48+
thread::park();
49+
let sized_8_data = 1_u64.to_ne_bytes();
50+
// Write 1 to the counter, so both thread1 and thread2 will unblock.
51+
let res: i64 = unsafe {
52+
libc::write(fd, sized_8_data.as_ptr() as *const libc::c_void, 8).try_into().unwrap()
53+
};
54+
// Make sure that write is successful.
55+
assert_eq!(res, 8);
56+
});
57+
58+
thread1.thread().unpark();
59+
thread2.thread().unpark();
60+
thread3.thread().unpark();
61+
62+
thread1.join().unwrap();
63+
thread2.join().unwrap();
64+
thread3.join().unwrap();
65+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
error: deadlock: the evaluated program deadlocked
2+
--> RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC
3+
|
4+
LL | let ret = unsafe { libc::pthread_join(id, ptr::null_mut()) };
5+
| ^ the evaluated program deadlocked
6+
|
7+
= note: BACKTRACE:
8+
= note: inside `std::sys::pal::PLATFORM::thread::Thread::join` at RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC
9+
= note: inside `std::thread::JoinInner::<'_, ()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC
10+
= note: inside `std::thread::JoinHandle::<()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC
11+
note: inside `main`
12+
--> tests/fail-dep/libc/eventfd_block_read_twice.rs:LL:CC
13+
|
14+
LL | thread2.join().unwrap();
15+
| ^^^^^^^^^^^^^^
16+
17+
error: deadlock: the evaluated program deadlocked
18+
|
19+
= note: the evaluated program deadlocked
20+
= note: (no span available)
21+
= note: BACKTRACE on thread `unnamed-ID`:
22+
23+
error: deadlock: the evaluated program deadlocked
24+
--> tests/fail-dep/libc/eventfd_block_read_twice.rs:LL:CC
25+
|
26+
LL | let res: i64 = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), 8).try_into().unwrap() };
27+
| ^ the evaluated program deadlocked
28+
|
29+
= note: BACKTRACE on thread `unnamed-ID`:
30+
= note: inside closure at tests/fail-dep/libc/eventfd_block_read_twice.rs:LL:CC
31+
32+
error: deadlock: the evaluated program deadlocked
33+
|
34+
= note: the evaluated program deadlocked
35+
= note: (no span available)
36+
= note: BACKTRACE on thread `unnamed-ID`:
37+
38+
note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace
39+
40+
error: aborting due to 4 previous errors
41+

0 commit comments

Comments
 (0)