Skip to content

Commit 9be36f8

Browse files
committed
Add deadlock detection
1 parent ebafbae commit 9be36f8

File tree

4 files changed

+158
-12
lines changed

4 files changed

+158
-12
lines changed

rayon-core/src/lib.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,6 @@
6161
//! conflicting requirements will need to be resolved before the build will
6262
//! succeed.
6363
64-
#![deny(missing_debug_implementations)]
65-
#![deny(missing_docs)]
66-
#![deny(unreachable_pub)]
6764
#![warn(rust_2018_idioms)]
6865

6966
use std::any::Any;
@@ -99,6 +96,7 @@ pub mod tlv;
9996
pub use self::broadcast::{broadcast, spawn_broadcast, BroadcastContext};
10097
pub use self::join::{join, join_context};
10198
pub use self::registry::ThreadBuilder;
99+
pub use self::registry::{mark_blocked, mark_unblocked, Registry};
102100
pub use self::scope::{in_place_scope, scope, Scope};
103101
pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo};
104102
pub use self::spawn::{spawn, spawn_fifo};
@@ -189,6 +187,9 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> {
189187
/// The stack size for the created worker threads
190188
stack_size: Option<usize>,
191189

190+
/// Closure invoked on deadlock.
191+
deadlock_handler: Option<Box<DeadlockHandler>>,
192+
192193
/// Closure invoked on worker thread start.
193194
start_handler: Option<Box<StartHandler>>,
194195

@@ -217,6 +218,9 @@ pub struct Configuration {
217218
/// may be invoked multiple times in parallel.
218219
type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync;
219220

221+
/// The type for a closure that gets invoked when the Rayon thread pool deadlocks
222+
type DeadlockHandler = dyn Fn() + Send + Sync;
223+
220224
/// The type for a closure that gets invoked when a thread starts. The
221225
/// closure is passed the index of the thread on which it is invoked.
222226
/// Note that this same closure may be invoked multiple times in parallel.
@@ -237,6 +241,7 @@ impl Default for ThreadPoolBuilder {
237241
stack_size: None,
238242
start_handler: None,
239243
exit_handler: None,
244+
deadlock_handler: None,
240245
spawn_handler: DefaultSpawn,
241246
breadth_first: false,
242247
}
@@ -454,6 +459,7 @@ impl<S> ThreadPoolBuilder<S> {
454459
stack_size: self.stack_size,
455460
start_handler: self.start_handler,
456461
exit_handler: self.exit_handler,
462+
deadlock_handler: self.deadlock_handler,
457463
breadth_first: self.breadth_first,
458464
}
459465
}
@@ -612,6 +618,20 @@ impl<S> ThreadPoolBuilder<S> {
612618
self.breadth_first
613619
}
614620

621+
/// Takes the current deadlock callback, leaving `None`.
622+
fn take_deadlock_handler(&mut self) -> Option<Box<DeadlockHandler>> {
623+
self.deadlock_handler.take()
624+
}
625+
626+
/// Set a callback to be invoked on current deadlock.
627+
pub fn deadlock_handler<H>(mut self, deadlock_handler: H) -> Self
628+
where
629+
H: Fn() + Send + Sync + 'static,
630+
{
631+
self.deadlock_handler = Some(Box::new(deadlock_handler));
632+
self
633+
}
634+
615635
/// Takes the current thread start callback, leaving `None`.
616636
fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
617637
self.start_handler.take()
@@ -778,6 +798,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
778798
ref get_thread_name,
779799
ref panic_handler,
780800
ref stack_size,
801+
ref deadlock_handler,
781802
ref start_handler,
782803
ref exit_handler,
783804
spawn_handler: _,
@@ -794,6 +815,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
794815
}
795816
let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder);
796817
let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
818+
let deadlock_handler = deadlock_handler.as_ref().map(|_| ClosurePlaceholder);
797819
let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
798820
let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
799821

@@ -802,6 +824,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
802824
.field("get_thread_name", &get_thread_name)
803825
.field("panic_handler", &panic_handler)
804826
.field("stack_size", &stack_size)
827+
.field("deadlock_handler", &deadlock_handler)
805828
.field("start_handler", &start_handler)
806829
.field("exit_handler", &exit_handler)
807830
.field("breadth_first", &breadth_first)

rayon-core/src/registry.rs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ use crate::sleep::Sleep;
66
use crate::tlv::Tlv;
77
use crate::unwind;
88
use crate::{
9-
ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
10-
Yield,
9+
DeadlockHandler, ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError,
10+
ThreadPoolBuilder, Yield,
1111
};
1212
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
1313
use std::cell::Cell;
@@ -130,13 +130,14 @@ where
130130
}
131131
}
132132

133-
pub(super) struct Registry {
133+
pub struct Registry {
134134
logger: Logger,
135135
thread_infos: Vec<ThreadInfo>,
136136
sleep: Sleep,
137137
injected_jobs: Injector<JobRef>,
138138
broadcasts: Mutex<Vec<Worker<JobRef>>>,
139139
panic_handler: Option<Box<PanicHandler>>,
140+
deadlock_handler: Option<Box<DeadlockHandler>>,
140141
start_handler: Option<Box<StartHandler>>,
141142
exit_handler: Option<Box<ExitHandler>>,
142143

@@ -290,6 +291,7 @@ impl Registry {
290291
broadcasts: Mutex::new(broadcasts),
291292
terminate_count: AtomicUsize::new(1),
292293
panic_handler: builder.take_panic_handler(),
294+
deadlock_handler: builder.take_deadlock_handler(),
293295
start_handler: builder.take_start_handler(),
294296
exit_handler: builder.take_exit_handler(),
295297
});
@@ -317,7 +319,7 @@ impl Registry {
317319
Ok(registry)
318320
}
319321

320-
pub(super) fn current() -> Arc<Registry> {
322+
pub fn current() -> Arc<Registry> {
321323
unsafe {
322324
let worker_thread = WorkerThread::current();
323325
let registry = if worker_thread.is_null() {
@@ -624,6 +626,24 @@ impl Registry {
624626
}
625627
}
626628

629+
/// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
630+
/// if no other worker thread is active
631+
#[inline]
632+
pub fn mark_blocked() {
633+
let worker_thread = WorkerThread::current();
634+
assert!(!worker_thread.is_null());
635+
unsafe {
636+
let registry = &(*worker_thread).registry;
637+
registry.sleep.mark_blocked(&registry.deadlock_handler)
638+
}
639+
}
640+
641+
/// Mark a previously blocked Rayon worker thread as unblocked
642+
#[inline]
643+
pub fn mark_unblocked(registry: &Registry) {
644+
registry.sleep.mark_unblocked()
645+
}
646+
627647
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
628648
pub(super) struct RegistryId {
629649
addr: usize,
@@ -823,9 +843,12 @@ impl WorkerThread {
823843
self.execute(job);
824844
idle_state = self.registry.sleep.start_looking(self.index, latch);
825845
} else {
826-
self.registry
827-
.sleep
828-
.no_work_found(&mut idle_state, latch, || self.has_injected_job())
846+
self.registry.sleep.no_work_found(
847+
&mut idle_state,
848+
latch,
849+
|| self.has_injected_job(),
850+
&self.registry.deadlock_handler,
851+
)
829852
}
830853
}
831854

rayon-core/src/sleep/README.md

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,4 +216,37 @@ Meanwhile, the sleepy thread does the following:
216216
Either PushFence or SleepFence must come first:
217217

218218
* If PushFence comes first, then PushJob must be visible to ReadJob.
219-
* If SleepFence comes first, then IncSleepers is visible to ReadSleepers.
219+
* If SleepFence comes first, then IncSleepers is visible to ReadSleepers.
220+
221+
# Deadlock detection
222+
223+
This module tracks a number of variables in order to detect deadlocks due to user code blocking.
224+
These variables are stored in the `SleepData` struct which itself is kept behind a mutex.
225+
It contains the following fields:
226+
- `worker_count` - The number of threads in the thread pool.
227+
- `active_threads` - The number of threads in the thread pool which are running
228+
and aren't blocked in user code or sleeping.
229+
- `blocked_threads` - The number of threads which are blocked in user code.
230+
This doesn't include threads blocked by Rayon.
231+
232+
User code can indicate blocking by calling `mark_blocked` before blocking and
233+
calling `mark_unblocked` before unblocking a thread.
234+
This will adjust `active_threads` and `blocked_threads` accordingly.
235+
236+
When we tickle the thread pool in `Sleep::tickle_cold`, we set `active_threads` to
237+
`worker_count` - `blocked_threads` since we wake up all Rayon threads, but not the threads blocked
238+
by user code.
239+
240+
A deadlock is detected by checking if `active_threads` is 0 and `blocked_threads` is above 0.
241+
If we ignored `blocked_threads` we would report a deadlock
242+
immediately when creating the thread pool.
243+
We would also report a deadlock once the thread pool ran out of work.
244+
It is not possible for Rayon itself to deadlock.
245+
Deadlocks can only be caused by user code blocking, so this condition doesn't miss any deadlocks.
246+
247+
We check for the deadlock condition in `mark_blocked` and when
248+
threads fall asleep in `Sleep::sleep`.
249+
If there's a deadlock detected we call the user provided deadlock handler while we hold the
250+
lock to `SleepData`. This means the deadlock handler cannot call `mark_blocked` and
251+
`mark_unblocked`. The user is expected to handle the deadlock in some non-Rayon thread.
252+
Once the deadlock handler returns, the thread which called the deadlock handler will go to sleep.

rayon-core/src/sleep/mod.rs

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use crate::latch::CoreLatch;
55
use crate::log::Event::*;
66
use crate::log::Logger;
7+
use crate::DeadlockHandler;
78
use crossbeam_utils::CachePadded;
89
use std::sync::atomic::Ordering;
910
use std::sync::{Condvar, Mutex};
@@ -14,6 +15,29 @@ mod counters;
1415
pub(crate) use self::counters::THREADS_MAX;
1516
use self::counters::{AtomicCounters, JobsEventCounter};
1617

18+
struct SleepData {
19+
/// The number of threads in the thread pool.
20+
worker_count: usize,
21+
22+
/// The number of threads in the thread pool which are running and
23+
/// aren't blocked in user code or sleeping.
24+
active_threads: usize,
25+
26+
/// The number of threads which are blocked in user code.
27+
/// This doesn't include threads blocked by this module.
28+
blocked_threads: usize,
29+
}
30+
31+
impl SleepData {
32+
/// Checks if the conditions for a deadlock holds and if so calls the deadlock handler
33+
#[inline]
34+
pub fn deadlock_check(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
35+
if self.active_threads == 0 && self.blocked_threads > 0 {
36+
(deadlock_handler.as_ref().unwrap())();
37+
}
38+
}
39+
}
40+
1741
/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
1842
/// of workers. It has callbacks that are invoked periodically at significant events,
1943
/// such as when workers are looping and looking for work, when latches are set, or when
@@ -29,6 +53,8 @@ pub(super) struct Sleep {
2953
worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,
3054

3155
counters: AtomicCounters,
56+
57+
data: Mutex<SleepData>,
3258
}
3359

3460
/// An instance of this struct is created when a thread becomes idle.
@@ -68,9 +94,38 @@ impl Sleep {
6894
logger,
6995
worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
7096
counters: AtomicCounters::new(),
97+
data: Mutex::new(SleepData {
98+
worker_count: n_threads,
99+
active_threads: n_threads,
100+
blocked_threads: 0,
101+
}),
71102
}
72103
}
73104

105+
/// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
106+
/// if no other worker thread is active
107+
#[inline]
108+
pub fn mark_blocked(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
109+
let mut data = self.data.lock().unwrap();
110+
debug_assert!(data.active_threads > 0);
111+
debug_assert!(data.blocked_threads < data.worker_count);
112+
debug_assert!(data.active_threads > 0);
113+
data.active_threads -= 1;
114+
data.blocked_threads += 1;
115+
116+
data.deadlock_check(deadlock_handler);
117+
}
118+
119+
/// Mark a previously blocked Rayon worker thread as unblocked
120+
#[inline]
121+
pub fn mark_unblocked(&self) {
122+
let mut data = self.data.lock().unwrap();
123+
debug_assert!(data.active_threads < data.worker_count);
124+
debug_assert!(data.blocked_threads > 0);
125+
data.active_threads += 1;
126+
data.blocked_threads -= 1;
127+
}
128+
74129
#[inline]
75130
pub(super) fn start_looking(&self, worker_index: usize, latch: &CoreLatch) -> IdleState {
76131
self.logger.log(|| ThreadIdle {
@@ -106,6 +161,7 @@ impl Sleep {
106161
idle_state: &mut IdleState,
107162
latch: &CoreLatch,
108163
has_injected_jobs: impl FnOnce() -> bool,
164+
deadlock_handler: &Option<Box<DeadlockHandler>>,
109165
) {
110166
if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
111167
thread::yield_now();
@@ -119,7 +175,7 @@ impl Sleep {
119175
thread::yield_now();
120176
} else {
121177
debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
122-
self.sleep(idle_state, latch, has_injected_jobs);
178+
self.sleep(idle_state, latch, has_injected_jobs, deadlock_handler);
123179
}
124180
}
125181

@@ -142,6 +198,7 @@ impl Sleep {
142198
idle_state: &mut IdleState,
143199
latch: &CoreLatch,
144200
has_injected_jobs: impl FnOnce() -> bool,
201+
deadlock_handler: &Option<Box<DeadlockHandler>>,
145202
) {
146203
let worker_index = idle_state.worker_index;
147204

@@ -215,6 +272,13 @@ impl Sleep {
215272
// the one that wakes us.)
216273
self.counters.sub_sleeping_thread();
217274
} else {
275+
{
276+
// Decrement the number of active threads and check for a deadlock
277+
let mut data = self.data.lock().unwrap();
278+
data.active_threads -= 1;
279+
data.deadlock_check(deadlock_handler);
280+
}
281+
218282
// If we don't see an injected job (the normal case), then flag
219283
// ourselves as asleep and wait till we are notified.
220284
//
@@ -372,6 +436,9 @@ impl Sleep {
372436
// do.
373437
self.counters.sub_sleeping_thread();
374438

439+
// Increment the number of active threads
440+
self.data.lock().unwrap().active_threads += 1;
441+
375442
self.logger.log(|| ThreadNotify { worker: index });
376443

377444
true

0 commit comments

Comments
 (0)