Skip to content

Commit a730baa

Browse files
Zoxccuviper
authored and committed
Add deadlock detection
1 parent 1e1dfc5 commit a730baa

File tree

4 files changed

+161
-16
lines changed

4 files changed

+161
-16
lines changed

rayon-core/src/lib.rs

+26-3
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,6 @@
2020
//! succeed.
2121
2222
#![doc(html_root_url = "https://docs.rs/rayon-core/1.5")]
23-
#![deny(missing_debug_implementations)]
24-
#![deny(missing_docs)]
25-
#![deny(unreachable_pub)]
2623

2724
use std::any::Any;
2825
use std::env;
@@ -71,6 +68,7 @@ pub mod tlv;
7168
pub mod internal;
7269
pub use join::{join, join_context};
7370
pub use registry::ThreadBuilder;
71+
pub use registry::{mark_blocked, mark_unblocked, Registry};
7472
pub use scope::{scope, Scope};
7573
pub use scope::{scope_fifo, ScopeFifo};
7674
pub use spawn::{spawn, spawn_fifo};
@@ -149,6 +147,9 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> {
149147
/// The stack size for the created worker threads
150148
stack_size: Option<usize>,
151149

150+
/// Closure invoked on deadlock.
151+
deadlock_handler: Option<Box<DeadlockHandler>>,
152+
152153
/// Closure invoked on worker thread start.
153154
start_handler: Option<Box<StartHandler>>,
154155

@@ -179,6 +180,9 @@ pub struct Configuration {
179180
/// may be invoked multiple times in parallel.
180181
type PanicHandler = Fn(Box<Any + Send>) + Send + Sync;
181182

183+
/// The type for a closure that gets invoked when the Rayon thread pool deadlocks
184+
type DeadlockHandler = Fn() + Send + Sync;
185+
182186
/// The type for a closure that gets invoked when a thread starts. The
183187
/// closure is passed the index of the thread on which it is invoked.
184188
/// Note that this same closure may be invoked multiple times in parallel.
@@ -200,6 +204,7 @@ impl Default for ThreadPoolBuilder {
200204
start_handler: None,
201205
exit_handler: None,
202206
main_handler: None,
207+
deadlock_handler: None,
203208
spawn_handler: DefaultSpawn,
204209
breadth_first: false,
205210
}
@@ -393,6 +398,7 @@ impl<S> ThreadPoolBuilder<S> {
393398
start_handler: self.start_handler,
394399
exit_handler: self.exit_handler,
395400
main_handler: self.main_handler,
401+
deadlock_handler: self.deadlock_handler,
396402
breadth_first: self.breadth_first,
397403
}
398404
}
@@ -551,6 +557,20 @@ impl<S> ThreadPoolBuilder<S> {
551557
self.breadth_first
552558
}
553559

560+
/// Takes the current deadlock callback, leaving `None`.
561+
fn take_deadlock_handler(&mut self) -> Option<Box<DeadlockHandler>> {
562+
self.deadlock_handler.take()
563+
}
564+
565+
/// Set a callback to be invoked on current deadlock.
566+
pub fn deadlock_handler<H>(mut self, deadlock_handler: H) -> Self
567+
where
568+
H: Fn() + Send + Sync + 'static,
569+
{
570+
self.deadlock_handler = Some(Box::new(deadlock_handler));
571+
self
572+
}
573+
554574
/// Takes the current thread start callback, leaving `None`.
555575
fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
556576
self.start_handler.take()
@@ -722,6 +742,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
722742
ref get_thread_name,
723743
ref panic_handler,
724744
ref stack_size,
745+
ref deadlock_handler,
725746
ref start_handler,
726747
ref main_handler,
727748
ref exit_handler,
@@ -739,6 +760,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
739760
}
740761
let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder);
741762
let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
763+
let deadlock_handler = deadlock_handler.as_ref().map(|_| ClosurePlaceholder);
742764
let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
743765
let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
744766
let main_handler = main_handler.as_ref().map(|_| ClosurePlaceholder);
@@ -748,6 +770,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
748770
.field("get_thread_name", &get_thread_name)
749771
.field("panic_handler", &panic_handler)
750772
.field("stack_size", &stack_size)
773+
.field("deadlock_handler", &deadlock_handler)
751774
.field("start_handler", &start_handler)
752775
.field("exit_handler", &exit_handler)
753776
.field("main_handler", &main_handler)

rayon-core/src/registry.rs

+29-5
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ use std::usize;
2525
use unwind;
2626
use util::leak;
2727
use {
28-
ErrorKind, ExitHandler, MainHandler, PanicHandler, StartHandler, ThreadPoolBuildError,
29-
ThreadPoolBuilder,
28+
DeadlockHandler, ErrorKind, ExitHandler, MainHandler, PanicHandler, StartHandler,
29+
ThreadPoolBuildError, ThreadPoolBuilder,
3030
};
3131

3232
/// Thread builder used for customization via
@@ -136,11 +136,12 @@ where
136136
}
137137
}
138138

139-
pub(super) struct Registry {
139+
pub struct Registry {
140140
thread_infos: Vec<ThreadInfo>,
141141
sleep: Sleep,
142142
injected_jobs: SegQueue<JobRef>,
143143
panic_handler: Option<Box<PanicHandler>>,
144+
deadlock_handler: Option<Box<DeadlockHandler>>,
144145
start_handler: Option<Box<StartHandler>>,
145146
exit_handler: Option<Box<ExitHandler>>,
146147
main_handler: Option<Box<MainHandler>>,
@@ -238,10 +239,11 @@ impl Registry {
238239

239240
let registry = Arc::new(Registry {
240241
thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
241-
sleep: Sleep::new(),
242+
sleep: Sleep::new(n_threads),
242243
injected_jobs: SegQueue::new(),
243244
terminate_latch: CountLatch::new(),
244245
panic_handler: builder.take_panic_handler(),
246+
deadlock_handler: builder.take_deadlock_handler(),
245247
start_handler: builder.take_start_handler(),
246248
main_handler: builder.take_main_handler(),
247249
exit_handler: builder.take_exit_handler(),
@@ -563,6 +565,24 @@ impl Registry {
563565
}
564566
}
565567

568+
/// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
569+
/// if no other worker thread is active
570+
#[inline]
571+
pub fn mark_blocked() {
572+
let worker_thread = WorkerThread::current();
573+
assert!(!worker_thread.is_null());
574+
unsafe {
575+
let registry = &(*worker_thread).registry;
576+
registry.sleep.mark_blocked(&registry.deadlock_handler)
577+
}
578+
}
579+
580+
/// Mark a previously blocked Rayon worker thread as unblocked
581+
#[inline]
582+
pub fn mark_unblocked(registry: &Registry) {
583+
registry.sleep.mark_unblocked()
584+
}
585+
566586
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
567587
pub(super) struct RegistryId {
568588
addr: usize,
@@ -723,7 +743,11 @@ impl WorkerThread {
723743
yields = self.registry.sleep.work_found(self.index, yields);
724744
self.execute(job);
725745
} else {
726-
yields = self.registry.sleep.no_work_found(self.index, yields);
746+
yields = self.registry.sleep.no_work_found(
747+
self.index,
748+
yields,
749+
&self.registry.deadlock_handler,
750+
);
727751
}
728752
}
729753

rayon-core/src/sleep/README.md

+33
Original file line numberDiff line numberDiff line change
@@ -386,3 +386,36 @@ some of them were hit hard:
386386
- 8-10% overhead on nbody-parreduce
387387
- 35% overhead on increment-all
388388
- 245% overhead on join-recursively
389+
390+
# Deadlock detection
391+
392+
This module tracks a number of variables in order to detect deadlocks due to user code blocking.
393+
These variables are stored in the `SleepData` struct which itself is kept behind a mutex.
394+
It contains the following fields:
395+
- `worker_count` - The number of threads in the thread pool.
396+
- `active_threads` - The number of threads in the thread pool which are running
397+
and aren't blocked in user code or sleeping.
398+
- `blocked_threads` - The number of threads which are blocked in user code.
399+
This doesn't include threads blocked by Rayon.
400+
401+
User code can indicate blocking by calling `mark_blocked` before blocking and
402+
calling `mark_unblocked` before unblocking a thread.
403+
This will adjust `active_threads` and `blocked_threads` accordingly.
404+
405+
When we tickle the thread pool in `Sleep::tickle_cold`, we set `active_threads` to
406+
`worker_count` - `blocked_threads` since we wake up all Rayon threads, but not threads blocked
407+
by user code.
408+
409+
A deadlock is detected by checking if `active_threads` is 0 and `blocked_threads` is above 0.
410+
If we ignored `blocked_threads` we would report a spurious deadlock
411+
immediately when creating the thread pool.
412+
We would also report a spurious deadlock once the thread pool ran out of work.
413+
It is not possible for Rayon itself to deadlock.
414+
Deadlocks can only be caused by user code blocking, so this condition doesn't miss any deadlocks.
415+
416+
We check for the deadlock condition when
417+
threads fall asleep in `mark_blocked` and in `Sleep::sleep`.
418+
If there's a deadlock detected we call the user provided deadlock handler while we hold the
419+
lock to `SleepData`. This means the deadlock handler cannot call `mark_blocked` and
420+
`mark_unblocked`. The user is expected to handle the deadlock in some non-Rayon thread.
421+
Once the deadlock handler returns, the thread which called the deadlock handler will go to sleep.

rayon-core/src/sleep/mod.rs

+73-8
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,34 @@ use std::sync::atomic::{AtomicUsize, Ordering};
66
use std::sync::{Condvar, Mutex};
77
use std::thread;
88
use std::usize;
9+
use DeadlockHandler;
10+
11+
struct SleepData {
12+
/// The number of threads in the thread pool.
13+
worker_count: usize,
14+
15+
/// The number of threads in the thread pool which are running and
16+
/// aren't blocked in user code or sleeping.
17+
active_threads: usize,
18+
19+
/// The number of threads which are blocked in user code.
20+
/// This doesn't include threads blocked by this module.
21+
blocked_threads: usize,
22+
}
23+
24+
impl SleepData {
25+
/// Checks if the conditions for a deadlock holds and if so calls the deadlock handler
26+
#[inline]
27+
pub fn deadlock_check(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
28+
if self.active_threads == 0 && self.blocked_threads > 0 {
29+
(deadlock_handler.as_ref().unwrap())();
30+
}
31+
}
32+
}
933

1034
pub(super) struct Sleep {
1135
state: AtomicUsize,
12-
data: Mutex<()>,
36+
data: Mutex<SleepData>,
1337
tickle: Condvar,
1438
}
1539

@@ -20,14 +44,42 @@ const ROUNDS_UNTIL_SLEEPY: usize = 32;
2044
const ROUNDS_UNTIL_ASLEEP: usize = 64;
2145

2246
impl Sleep {
23-
pub(super) fn new() -> Sleep {
47+
pub(super) fn new(worker_count: usize) -> Sleep {
2448
Sleep {
2549
state: AtomicUsize::new(AWAKE),
26-
data: Mutex::new(()),
50+
data: Mutex::new(SleepData {
51+
worker_count,
52+
active_threads: worker_count,
53+
blocked_threads: 0,
54+
}),
2755
tickle: Condvar::new(),
2856
}
2957
}
3058

59+
/// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
60+
/// if no other worker thread is active
61+
#[inline]
62+
pub fn mark_blocked(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
63+
let mut data = self.data.lock().unwrap();
64+
debug_assert!(data.active_threads > 0);
65+
debug_assert!(data.blocked_threads < data.worker_count);
66+
debug_assert!(data.active_threads > 0);
67+
data.active_threads -= 1;
68+
data.blocked_threads += 1;
69+
70+
data.deadlock_check(deadlock_handler);
71+
}
72+
73+
/// Mark a previously blocked Rayon worker thread as unblocked
74+
#[inline]
75+
pub fn mark_unblocked(&self) {
76+
let mut data = self.data.lock().unwrap();
77+
debug_assert!(data.active_threads < data.worker_count);
78+
debug_assert!(data.blocked_threads > 0);
79+
data.active_threads += 1;
80+
data.blocked_threads -= 1;
81+
}
82+
3183
fn anyone_sleeping(&self, state: usize) -> bool {
3284
state & SLEEPING != 0
3385
}
@@ -61,7 +113,12 @@ impl Sleep {
61113
}
62114

63115
#[inline]
64-
pub(super) fn no_work_found(&self, worker_index: usize, yields: usize) -> usize {
116+
pub(super) fn no_work_found(
117+
&self,
118+
worker_index: usize,
119+
yields: usize,
120+
deadlock_handler: &Option<Box<DeadlockHandler>>,
121+
) -> usize {
65122
log!(DidNotFindWork {
66123
worker: worker_index,
67124
yields: yields,
@@ -88,7 +145,7 @@ impl Sleep {
88145
}
89146
} else {
90147
debug_assert_eq!(yields, ROUNDS_UNTIL_ASLEEP);
91-
self.sleep(worker_index);
148+
self.sleep(worker_index, deadlock_handler);
92149
0
93150
}
94151
}
@@ -122,7 +179,10 @@ impl Sleep {
122179
old_state: old_state,
123180
});
124181
if self.anyone_sleeping(old_state) {
125-
let _data = self.data.lock().unwrap();
182+
let mut data = self.data.lock().unwrap();
183+
// Set the active threads to the number of workers,
184+
// excluding threads blocked by the user since we won't wake those up
185+
data.active_threads = data.worker_count - data.blocked_threads;
126186
self.tickle.notify_all();
127187
}
128188
}
@@ -188,7 +248,7 @@ impl Sleep {
188248
self.worker_is_sleepy(state, worker_index)
189249
}
190250

191-
fn sleep(&self, worker_index: usize) {
251+
fn sleep(&self, worker_index: usize, deadlock_handler: &Option<Box<DeadlockHandler>>) {
192252
loop {
193253
// Acquire here suffices. If we observe that the current worker is still
194254
// sleepy, then in fact we know that no writes have occurred, and anyhow
@@ -235,7 +295,7 @@ impl Sleep {
235295
// reason for the `compare_exchange` to fail is if an
236296
// awaken comes, in which case the next cycle around
237297
// the loop will just return.
238-
let data = self.data.lock().unwrap();
298+
let mut data = self.data.lock().unwrap();
239299

240300
// This must be SeqCst on success because we want to
241301
// ensure:
@@ -264,6 +324,11 @@ impl Sleep {
264324
log!(FellAsleep {
265325
worker: worker_index
266326
});
327+
328+
// Decrement the number of active threads and check for a deadlock
329+
data.active_threads -= 1;
330+
data.deadlock_check(deadlock_handler);
331+
267332
let _ = self.tickle.wait(data).unwrap();
268333
log!(GotAwoken {
269334
worker: worker_index

0 commit comments

Comments
 (0)