Skip to content

Unify id-based thread parking implementations #105903

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 31, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion library/std/src/sys/sgx/mod.rs
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ pub mod process;
pub mod stdio;
pub mod thread;
pub mod thread_local_key;
pub mod thread_parker;
pub mod thread_parking;
pub mod time;

mod condvar;
8 changes: 4 additions & 4 deletions library/std/src/sys/sgx/thread.rs
Original file line number Diff line number Diff line change
@@ -65,9 +65,9 @@ mod task_queue {
/// execution. The signal is sent once all TLS destructors have finished at
/// which point no new thread locals should be created.
pub mod wait_notify {
use super::super::thread_parker::Parker;
use crate::pin::Pin;
use crate::sync::Arc;
use crate::sys_common::thread_parking::Parker;

pub struct Notifier(Arc<Parker>);

@@ -87,14 +87,14 @@ pub mod wait_notify {
/// called, this will return immediately, otherwise the current thread
/// is blocked until notified.
pub fn wait(self) {
// This is not actually `unsafe`, but it uses the `Parker` API,
// which needs `unsafe` on some platforms.
// SAFETY:
// This is only ever called on one thread.
unsafe { Pin::new(&*self.0).park() }
}
}

pub fn new() -> (Notifier, Waiter) {
let inner = Arc::new(Parker::new_internal());
let inner = Arc::new(Parker::new());
(Notifier(inner.clone()), Waiter(inner))
}
}
107 changes: 0 additions & 107 deletions library/std/src/sys/sgx/thread_parker.rs

This file was deleted.

23 changes: 23 additions & 0 deletions library/std/src/sys/sgx/thread_parking.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use super::abi::usercalls;
use crate::io::ErrorKind;
use crate::time::Duration;
use fortanix_sgx_abi::{EV_UNPARK, WAIT_INDEFINITE};

pub type ThreadId = fortanix_sgx_abi::Tcs;

pub use super::abi::thread::current;

pub fn park(_hint: usize) {
usercalls::wait(EV_UNPARK, WAIT_INDEFINITE).unwrap();
}

pub fn park_timeout(dur: Duration, _hint: usize) {
let timeout = u128::min(dur.as_nanos(), WAIT_INDEFINITE as u128 - 1) as u64;
if let Err(e) = usercalls::wait(EV_UNPARK, timeout) {
assert!(matches!(e.kind(), ErrorKind::TimedOut | ErrorKind::WouldBlock))
}
}

pub fn unpark(tid: ThreadId, _hint: usize) {
let _ = usercalls::send(EV_UNPARK, Some(tid));
}
2 changes: 1 addition & 1 deletion library/std/src/sys/unix/mod.rs
Original file line number Diff line number Diff line change
@@ -40,7 +40,7 @@ pub mod stdio;
pub mod thread;
pub mod thread_local_dtor;
pub mod thread_local_key;
pub mod thread_parker;
pub mod thread_parking;
pub mod time;

#[cfg(target_os = "espidf")]
113 changes: 0 additions & 113 deletions library/std/src/sys/unix/thread_parker/netbsd.rs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ unsafe impl Sync for Parker {}
unsafe impl Send for Parker {}

impl Parker {
pub unsafe fn new(parker: *mut Parker) {
pub unsafe fn new_in_place(parker: *mut Parker) {
let semaphore = dispatch_semaphore_create(0);
assert!(
!semaphore.is_null(),
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ cfg_if::cfg_if! {
pub use darwin::Parker;
} else if #[cfg(target_os = "netbsd")] {
mod netbsd;
pub use netbsd::Parker;
pub use netbsd::{current, park, park_timeout, unpark, ThreadId};
} else {
mod pthread;
pub use pthread::Parker;
52 changes: 52 additions & 0 deletions library/std/src/sys/unix/thread_parking/netbsd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use crate::ffi::{c_int, c_void};
use crate::ptr;
use crate::time::Duration;
use libc::{_lwp_self, clockid_t, lwpid_t, time_t, timespec, CLOCK_MONOTONIC};

extern "C" {
fn ___lwp_park60(
clock_id: clockid_t,
flags: c_int,
ts: *mut timespec,
unpark: lwpid_t,
hint: *const c_void,
unparkhint: *const c_void,
) -> c_int;
fn _lwp_unpark(lwp: lwpid_t, hint: *const c_void) -> c_int;
}

pub type ThreadId = lwpid_t;

#[inline]
pub fn current() -> ThreadId {
unsafe { _lwp_self() }
}

#[inline]
pub fn park(hint: usize) {
unsafe {
___lwp_park60(0, 0, ptr::null_mut(), 0, ptr::invalid(hint), ptr::null());
}
}

pub fn park_timeout(dur: Duration, hint: usize) {
let mut timeout = timespec {
// Saturate so that the operation will definitely time out
// (even if it is after the heat death of the universe).
tv_sec: dur.as_secs().try_into().ok().unwrap_or(time_t::MAX),
tv_nsec: dur.subsec_nanos().into(),
};

// Timeout needs to be mutable since it is modified on NetBSD 9.0 and
// above.
unsafe {
___lwp_park60(CLOCK_MONOTONIC, 0, &mut timeout, 0, ptr::invalid(hint), ptr::null());
}
}

#[inline]
pub fn unpark(tid: ThreadId, hint: usize) {
unsafe {
_lwp_unpark(tid, ptr::invalid(hint));
}
}
Original file line number Diff line number Diff line change
@@ -99,7 +99,7 @@ impl Parker {
///
/// # Safety
/// The constructed parker must never be moved.
pub unsafe fn new(parker: *mut Parker) {
pub unsafe fn new_in_place(parker: *mut Parker) {
// Use the default mutex implementation to allow for simpler initialization.
// This could lead to undefined behaviour when deadlocking. This is avoided
// by not deadlocking. Note in particular the unlocking operation before any
2 changes: 1 addition & 1 deletion library/std/src/sys/windows/mod.rs
Original file line number Diff line number Diff line change
@@ -33,7 +33,7 @@ pub mod stdio;
pub mod thread;
pub mod thread_local_dtor;
pub mod thread_local_key;
pub mod thread_parker;
pub mod thread_parking;
pub mod time;
cfg_if::cfg_if! {
if #[cfg(not(target_vendor = "uwp"))] {
Original file line number Diff line number Diff line change
@@ -97,7 +97,7 @@ const NOTIFIED: i8 = 1;
impl Parker {
/// Construct the Windows parker. The UNIX parker implementation
/// requires this to happen in-place.
pub unsafe fn new(parker: *mut Parker) {
pub unsafe fn new_in_place(parker: *mut Parker) {
parker.write(Self { state: AtomicI8::new(EMPTY) });
}

2 changes: 1 addition & 1 deletion library/std/src/sys_common/mod.rs
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ pub mod process;
pub mod thread;
pub mod thread_info;
pub mod thread_local_dtor;
pub mod thread_parker;
pub mod thread_parking;
pub mod wstr;
pub mod wtf8;

Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@ pub struct Parker {
impl Parker {
/// Construct the futex parker. The UNIX parker implementation
/// requires this to happen in-place.
pub unsafe fn new(parker: *mut Parker) {
pub unsafe fn new_in_place(parker: *mut Parker) {
parker.write(Self { state: AtomicU32::new(EMPTY) });
}

Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ pub struct Parker {
impl Parker {
/// Construct the generic parker. The UNIX parker implementation
/// requires this to happen in-place.
pub unsafe fn new(parker: *mut Parker) {
pub unsafe fn new_in_place(parker: *mut Parker) {
parker.write(Parker {
state: AtomicUsize::new(EMPTY),
lock: Mutex::new(()),
108 changes: 108 additions & 0 deletions library/std/src/sys_common/thread_parking/id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
//! Thread parking using thread ids.
//!
//! Some platforms (notably NetBSD) have thread parking primitives whose semantics
//! match those offered by `thread::park`, with the difference that the thread to
//! be unparked is referenced by a platform-specific thread id. Since the thread
//! parker is constructed before that id is known, an atomic state variable is used
//! to manage the park state and propagate the thread id. This also avoids platform
//! calls in the case where `unpark` is called before `park`.
use crate::cell::UnsafeCell;
use crate::pin::Pin;
use crate::sync::atomic::{
fence, AtomicI8,
Ordering::{Acquire, Relaxed, Release},
};
use crate::sys::thread_parking::{current, park, park_timeout, unpark, ThreadId};
use crate::time::Duration;

pub struct Parker {
state: AtomicI8,
tid: UnsafeCell<Option<ThreadId>>,
}

const PARKED: i8 = -1;
const EMPTY: i8 = 0;
const NOTIFIED: i8 = 1;

impl Parker {
pub fn new() -> Parker {
Parker { state: AtomicI8::new(EMPTY), tid: UnsafeCell::new(None) }
}

/// Create a new thread parker. UNIX requires this to happen in-place.
pub unsafe fn new_in_place(parker: *mut Parker) {
parker.write(Parker::new())
}

/// # Safety
/// * must always be called from the same thread
/// * must be called before the state is set to PARKED
unsafe fn init_tid(&self) {
// The field is only ever written to from this thread, so we don't need
// synchronization to read it here.
if self.tid.get().read().is_none() {
// Because this point is only reached once, before the state is set
// to PARKED for the first time, the non-atomic write here can not
// conflict with reads by other threads.
self.tid.get().write(Some(current()));
// Ensure that the write can be observed by all threads reading the
// state. Synchronizes with the acquire barrier in `unpark`.
fence(Release);
}
}

pub unsafe fn park(self: Pin<&Self>) {
self.init_tid();

// Changes NOTIFIED to EMPTY and EMPTY to PARKED.
let mut state = self.state.fetch_sub(1, Acquire).wrapping_sub(1);
if state == PARKED {
// Loop to guard against spurious wakeups.
while state == PARKED {
park(self.state.as_mut_ptr().addr());
state = self.state.load(Acquire);
}

// Since the state change has already been observed with acquire
// ordering, the state can be reset with a relaxed store instead
// of a swap.
self.state.store(EMPTY, Relaxed);
}
}

pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
self.init_tid();

let state = self.state.fetch_sub(1, Acquire).wrapping_sub(1);
if state == PARKED {
park_timeout(dur, self.state.as_mut_ptr().addr());
// Swap to ensure that we observe all state changes with acquire
// ordering, even if the state has been changed after the timeout
// occured.
self.state.swap(EMPTY, Acquire);
}
}

pub fn unpark(self: Pin<&Self>) {
let state = self.state.swap(NOTIFIED, Release);
if state == PARKED {
// Synchronize with the release fence in `init_tid` to observe the
// write to `tid`.
fence(Acquire);
// # Safety
// The thread id is initialized before the state is set to `PARKED`
// for the first time and is not written to from that point on
// (negating the need for an atomic read).
let tid = unsafe { self.tid.get().read().unwrap_unchecked() };
// It is possible that the waiting thread woke up because of a timeout
// and terminated before this call is made. This call then returns an
// error or wakes up an unrelated thread. The platform API and
// environment does allow this, however.
unpark(tid, self.state.as_mut_ptr().addr());
}
}
}

unsafe impl Send for Parker {}
unsafe impl Sync for Parker {}
Original file line number Diff line number Diff line change
@@ -11,13 +11,17 @@ cfg_if::cfg_if! {
))] {
mod futex;
pub use futex::Parker;
} else if #[cfg(any(
target_os = "netbsd",
all(target_vendor = "fortanix", target_env = "sgx"),
))] {
mod id;
pub use id::Parker;
} else if #[cfg(target_os = "solid_asp3")] {
mod wait_flag;
pub use wait_flag::Parker;
} else if #[cfg(any(windows, target_family = "unix"))] {
pub use crate::sys::thread_parker::Parker;
} else if #[cfg(all(target_vendor = "fortanix", target_env = "sgx"))] {
pub use crate::sys::thread_parker::Parker;
pub use crate::sys::thread_parking::Parker;
} else {
mod generic;
pub use generic::Parker;
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@ pub struct Parker {
impl Parker {
/// Construct a parker for the current thread. The UNIX parker
/// implementation requires this to happen in-place.
pub unsafe fn new(parker: *mut Parker) {
pub unsafe fn new_in_place(parker: *mut Parker) {
parker.write(Parker { state: AtomicI8::new(EMPTY), wait_flag: WaitFlag::new() })
}

4 changes: 2 additions & 2 deletions library/std/src/thread/mod.rs
Original file line number Diff line number Diff line change
@@ -173,7 +173,7 @@ use crate::sync::Arc;
use crate::sys::thread as imp;
use crate::sys_common::thread;
use crate::sys_common::thread_info;
use crate::sys_common::thread_parker::Parker;
use crate::sys_common::thread_parking::Parker;
use crate::sys_common::{AsInner, IntoInner};
use crate::time::Duration;

@@ -1216,7 +1216,7 @@ impl Thread {
let ptr = Arc::get_mut_unchecked(&mut arc).as_mut_ptr();
addr_of_mut!((*ptr).name).write(name);
addr_of_mut!((*ptr).id).write(ThreadId::new());
Parker::new(addr_of_mut!((*ptr).parker));
Parker::new_in_place(addr_of_mut!((*ptr).parker));
Pin::new_unchecked(arc.assume_init())
};