Skip to content

Commit ec13df4

Browse files
committed
Add fast futex-based thread parker for Linux.
1 parent 1464fc3 commit ec13df4

File tree

3 files changed

+231
-123
lines changed

3 files changed

+231
-123
lines changed
+119
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
//! Parker implementaiton based on a Mutex and Condvar.
2+
3+
use crate::sync::atomic::AtomicUsize;
4+
use crate::sync::atomic::Ordering::SeqCst;
5+
use crate::sync::{Condvar, Mutex};
6+
use crate::time::Duration;
7+
8+
const EMPTY: usize = 0;
9+
const PARKED: usize = 1;
10+
const NOTIFIED: usize = 2;
11+
12+
pub struct Parker {
13+
state: AtomicUsize,
14+
lock: Mutex<()>,
15+
cvar: Condvar,
16+
}
17+
18+
impl Parker {
19+
pub fn new() -> Self {
20+
Parker { state: AtomicUsize::new(EMPTY), lock: Mutex::new(()), cvar: Condvar::new() }
21+
}
22+
23+
// This implementaiton doesn't require `unsafe`, but other implementations
24+
// may assume this is only called by the thread that owns the Parker.
25+
pub unsafe fn park(&self) {
26+
// If we were previously notified then we consume this notification and
27+
// return quickly.
28+
if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
29+
return;
30+
}
31+
32+
// Otherwise we need to coordinate going to sleep
33+
let mut m = self.lock.lock().unwrap();
34+
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
35+
Ok(_) => {}
36+
Err(NOTIFIED) => {
37+
// We must read here, even though we know it will be `NOTIFIED`.
38+
// This is because `unpark` may have been called again since we read
39+
// `NOTIFIED` in the `compare_exchange` above. We must perform an
40+
// acquire operation that synchronizes with that `unpark` to observe
41+
// any writes it made before the call to unpark. To do that we must
42+
// read from the write it made to `state`.
43+
let old = self.state.swap(EMPTY, SeqCst);
44+
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
45+
return;
46+
} // should consume this notification, so prohibit spurious wakeups in next park.
47+
Err(_) => panic!("inconsistent park state"),
48+
}
49+
loop {
50+
m = self.cvar.wait(m).unwrap();
51+
match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
52+
Ok(_) => return, // got a notification
53+
Err(_) => {} // spurious wakeup, go back to sleep
54+
}
55+
}
56+
}
57+
58+
// This implementaiton doesn't require `unsafe`, but other implementations
59+
// may assume this is only called by the thread that owns the Parker.
60+
pub unsafe fn park_timeout(&self, dur: Duration) {
61+
// Like `park` above we have a fast path for an already-notified thread, and
62+
// afterwards we start coordinating for a sleep.
63+
// return quickly.
64+
if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
65+
return;
66+
}
67+
let m = self.lock.lock().unwrap();
68+
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
69+
Ok(_) => {}
70+
Err(NOTIFIED) => {
71+
// We must read again here, see `park`.
72+
let old = self.state.swap(EMPTY, SeqCst);
73+
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
74+
return;
75+
} // should consume this notification, so prohibit spurious wakeups in next park.
76+
Err(_) => panic!("inconsistent park_timeout state"),
77+
}
78+
79+
// Wait with a timeout, and if we spuriously wake up or otherwise wake up
80+
// from a notification we just want to unconditionally set the state back to
81+
// empty, either consuming a notification or un-flagging ourselves as
82+
// parked.
83+
let (_m, _result) = self.cvar.wait_timeout(m, dur).unwrap();
84+
match self.state.swap(EMPTY, SeqCst) {
85+
NOTIFIED => {} // got a notification, hurray!
86+
PARKED => {} // no notification, alas
87+
n => panic!("inconsistent park_timeout state: {}", n),
88+
}
89+
}
90+
91+
pub fn unpark(&self) {
92+
// To ensure the unparked thread will observe any writes we made
93+
// before this call, we must perform a release operation that `park`
94+
// can synchronize with. To do that we must write `NOTIFIED` even if
95+
// `state` is already `NOTIFIED`. That is why this must be a swap
96+
// rather than a compare-and-swap that returns if it reads `NOTIFIED`
97+
// on failure.
98+
match self.state.swap(NOTIFIED, SeqCst) {
99+
EMPTY => return, // no one was waiting
100+
NOTIFIED => return, // already unparked
101+
PARKED => {} // gotta go wake someone up
102+
_ => panic!("inconsistent state in unpark"),
103+
}
104+
105+
// There is a period between when the parked thread sets `state` to
106+
// `PARKED` (or last checked `state` in the case of a spurious wake
107+
// up) and when it actually waits on `cvar`. If we were to notify
108+
// during this period it would be ignored and then when the parked
109+
// thread went to sleep it would never wake up. Fortunately, it has
110+
// `lock` locked at this stage so we can acquire `lock` to wait until
111+
// it is ready to receive the notification.
112+
//
113+
// Releasing `lock` before the call to `notify_one` means that when the
114+
// parked thread wakes it doesn't get woken only to have to wait for us
115+
// to release `lock`.
116+
drop(self.lock.lock().unwrap());
117+
self.cvar.notify_one()
118+
}
119+
}
+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
use crate::sync::atomic::AtomicI32;
2+
use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
3+
use crate::time::Duration;
4+
5+
const PARKED: i32 = -1;
6+
const EMPTY: i32 = 0;
7+
const NOTIFIED: i32 = 1;
8+
9+
pub struct Parker {
10+
state: AtomicI32,
11+
}
12+
13+
impl Parker {
14+
#[inline]
15+
pub const fn new() -> Self {
16+
Parker { state: AtomicI32::new(EMPTY) }
17+
}
18+
19+
// Assumes this is only called by the thread that owns the Parker,
20+
// which means that `self.state != PARKED`.
21+
pub unsafe fn park(&self) {
22+
// Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
23+
// first case.
24+
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
25+
return;
26+
}
27+
loop {
28+
// Wait for something to happen, assuming it's still set to PARKED.
29+
futex_wait(&self.state, PARKED, None);
30+
// Change NOTIFIED=>EMPTY and return in that case.
31+
if self.state.compare_and_swap(NOTIFIED, EMPTY, Acquire) == NOTIFIED {
32+
return;
33+
} else {
34+
// Spurious wake up. We loop to try again.
35+
}
36+
}
37+
}
38+
39+
// Assumes this is only called by the thread that owns the Parker,
40+
// which means that `self.state != PARKED`.
41+
pub unsafe fn park_timeout(&self, timeout: Duration) {
42+
// Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
43+
// first case.
44+
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
45+
return;
46+
}
47+
// Wait for something to happen, assuming it's still set to PARKED.
48+
futex_wait(&self.state, PARKED, Some(timeout));
49+
// This is not just a store, because we need to establish a
50+
// release-acquire ordering with unpark().
51+
if self.state.swap(EMPTY, Acquire) == NOTIFIED {
52+
// Woke up because of unpark().
53+
} else {
54+
// Timeout or spurious wake up.
55+
// We return either way, because we can't easily tell if it was the
56+
// timeout or not.
57+
}
58+
}
59+
60+
pub fn unpark(&self) {
61+
// Change PARKED=>NOTIFIED, EMPTY=>NOTIFIED, or NOTIFIED=>NOTIFIED, and
62+
// wake the thread in the first case.
63+
//
64+
// Note that even NOTIFIED=>NOTIFIED results in a write. This is on
65+
// purpose, to make sure every unpark() has a release-acquire ordering
66+
// with park().
67+
if self.state.swap(NOTIFIED, Release) == PARKED {
68+
futex_wake(&self.state);
69+
}
70+
}
71+
}
72+
73+
fn futex_wait(futex: &AtomicI32, expected: i32, timeout: Option<Duration>) {
74+
let timespec;
75+
let timespec_ptr = match timeout {
76+
Some(timeout) => {
77+
timespec = libc::timespec {
78+
tv_sec: timeout.as_secs() as _,
79+
tv_nsec: timeout.subsec_nanos() as _,
80+
};
81+
&timespec as *const libc::timespec
82+
}
83+
None => crate::ptr::null(),
84+
};
85+
unsafe {
86+
libc::syscall(
87+
libc::SYS_futex,
88+
futex as *const AtomicI32,
89+
libc::FUTEX_WAIT | libc::FUTEX_PRIVATE_FLAG,
90+
expected,
91+
timespec_ptr,
92+
);
93+
}
94+
}
95+
96+
fn futex_wake(futex: &AtomicI32) {
97+
unsafe {
98+
libc::syscall(
99+
libc::SYS_futex,
100+
futex as *const AtomicI32,
101+
libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG,
102+
1,
103+
);
104+
}
105+
}

library/std/src/thread/parker/mod.rs

+7-123
Original file line numberDiff line numberDiff line change
@@ -1,125 +1,9 @@
1-
//! Parker implementaiton based on a Mutex and Condvar.
2-
//!
3-
//! The implementation currently uses the trivial strategy of a Mutex+Condvar
4-
//! with wakeup flag, which does not actually allow spurious wakeups. In the
5-
//! future, this will be implemented in a more efficient way, perhaps along the lines of
6-
//! http://cr.openjdk.java.net/~stefank/6989984.1/raw_files/new/src/os/linux/vm/os_linux.cpp
7-
//! or futuxes, and in either case may allow spurious wakeups.
8-
9-
use crate::sync::atomic::AtomicUsize;
10-
use crate::sync::atomic::Ordering::SeqCst;
11-
use crate::sync::{Condvar, Mutex};
12-
use crate::time::Duration;
13-
14-
const EMPTY: usize = 0;
15-
const PARKED: usize = 1;
16-
const NOTIFIED: usize = 2;
17-
18-
pub struct Parker {
19-
state: AtomicUsize,
20-
lock: Mutex<()>,
21-
cvar: Condvar,
22-
}
23-
24-
impl Parker {
25-
pub fn new() -> Self {
26-
Parker { state: AtomicUsize::new(EMPTY), lock: Mutex::new(()), cvar: Condvar::new() }
27-
}
28-
29-
// This implementaiton doesn't require `unsafe`, but other implementations
30-
// may assume this is only called by the thread that owns the Parker.
31-
pub unsafe fn park(&self) {
32-
// If we were previously notified then we consume this notification and
33-
// return quickly.
34-
if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
35-
return;
36-
}
37-
38-
// Otherwise we need to coordinate going to sleep
39-
let mut m = self.lock.lock().unwrap();
40-
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
41-
Ok(_) => {}
42-
Err(NOTIFIED) => {
43-
// We must read here, even though we know it will be `NOTIFIED`.
44-
// This is because `unpark` may have been called again since we read
45-
// `NOTIFIED` in the `compare_exchange` above. We must perform an
46-
// acquire operation that synchronizes with that `unpark` to observe
47-
// any writes it made before the call to unpark. To do that we must
48-
// read from the write it made to `state`.
49-
let old = self.state.swap(EMPTY, SeqCst);
50-
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
51-
return;
52-
} // should consume this notification, so prohibit spurious wakeups in next park.
53-
Err(_) => panic!("inconsistent park state"),
54-
}
55-
loop {
56-
m = self.cvar.wait(m).unwrap();
57-
match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
58-
Ok(_) => return, // got a notification
59-
Err(_) => {} // spurious wakeup, go back to sleep
60-
}
61-
}
62-
}
63-
64-
// This implementaiton doesn't require `unsafe`, but other implementations
65-
// may assume this is only called by the thread that owns the Parker.
66-
pub unsafe fn park_timeout(&self, dur: Duration) {
67-
// Like `park` above we have a fast path for an already-notified thread, and
68-
// afterwards we start coordinating for a sleep.
69-
// return quickly.
70-
if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
71-
return;
72-
}
73-
let m = self.lock.lock().unwrap();
74-
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
75-
Ok(_) => {}
76-
Err(NOTIFIED) => {
77-
// We must read again here, see `park`.
78-
let old = self.state.swap(EMPTY, SeqCst);
79-
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
80-
return;
81-
} // should consume this notification, so prohibit spurious wakeups in next park.
82-
Err(_) => panic!("inconsistent park_timeout state"),
83-
}
84-
85-
// Wait with a timeout, and if we spuriously wake up or otherwise wake up
86-
// from a notification we just want to unconditionally set the state back to
87-
// empty, either consuming a notification or un-flagging ourselves as
88-
// parked.
89-
let (_m, _result) = self.cvar.wait_timeout(m, dur).unwrap();
90-
match self.state.swap(EMPTY, SeqCst) {
91-
NOTIFIED => {} // got a notification, hurray!
92-
PARKED => {} // no notification, alas
93-
n => panic!("inconsistent park_timeout state: {}", n),
94-
}
95-
}
96-
97-
pub fn unpark(&self) {
98-
// To ensure the unparked thread will observe any writes we made
99-
// before this call, we must perform a release operation that `park`
100-
// can synchronize with. To do that we must write `NOTIFIED` even if
101-
// `state` is already `NOTIFIED`. That is why this must be a swap
102-
// rather than a compare-and-swap that returns if it reads `NOTIFIED`
103-
// on failure.
104-
match self.state.swap(NOTIFIED, SeqCst) {
105-
EMPTY => return, // no one was waiting
106-
NOTIFIED => return, // already unparked
107-
PARKED => {} // gotta go wake someone up
108-
_ => panic!("inconsistent state in unpark"),
109-
}
110-
111-
// There is a period between when the parked thread sets `state` to
112-
// `PARKED` (or last checked `state` in the case of a spurious wake
113-
// up) and when it actually waits on `cvar`. If we were to notify
114-
// during this period it would be ignored and then when the parked
115-
// thread went to sleep it would never wake up. Fortunately, it has
116-
// `lock` locked at this stage so we can acquire `lock` to wait until
117-
// it is ready to receive the notification.
118-
//
119-
// Releasing `lock` before the call to `notify_one` means that when the
120-
// parked thread wakes it doesn't get woken only to have to wait for us
121-
// to release `lock`.
122-
drop(self.lock.lock().unwrap());
123-
self.cvar.notify_one()
1+
cfg_if::cfg_if! {
2+
if #[cfg(any(target_os = "linux", target_os = "android"))] {
3+
mod linux;
4+
pub use linux::Parker;
5+
} else {
6+
mod generic;
7+
pub use generic::Parker;
1248
}
1259
}

0 commit comments

Comments
 (0)