Skip to content

Commit c21790c

Browse files
Zoxccuviper
authored andcommitted
Add callbacks for when threads start and stop doing work
1 parent a730baa commit c21790c

File tree

3 files changed

+91
-12
lines changed

3 files changed

+91
-12
lines changed

rayon-core/src/lib.rs

+52
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,12 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> {
162162
/// Closure invoked on worker thread start.
163163
main_handler: Option<Box<MainHandler>>,
164164

165+
/// Closure invoked when starting computations in a thread.
166+
acquire_thread_handler: Option<Box<AcquireThreadHandler>>,
167+
168+
/// Closure invoked when blocking in a thread.
169+
release_thread_handler: Option<Box<ReleaseThreadHandler>>,
170+
165171
/// If false, worker threads will execute spawned jobs in a
166172
/// "depth-first" fashion. If true, they will do a "breadth-first"
167173
/// fashion. Depth-first is the default.
@@ -205,6 +211,8 @@ impl Default for ThreadPoolBuilder {
205211
exit_handler: None,
206212
main_handler: None,
207213
deadlock_handler: None,
214+
acquire_thread_handler: None,
215+
release_thread_handler: None,
208216
spawn_handler: DefaultSpawn,
209217
breadth_first: false,
210218
}
@@ -217,6 +225,14 @@ impl Default for ThreadPoolBuilder {
217225
/// Note that this same closure may be invoked multiple times in parallel.
218226
type MainHandler = Fn(usize, &mut FnMut()) + Send + Sync;
219227

228+
/// The type for a closure that gets invoked before starting computations in a thread.
229+
/// Note that this same closure may be invoked multiple times in parallel.
230+
type AcquireThreadHandler = Fn() + Send + Sync;
231+
232+
/// The type for a closure that gets invoked before blocking in a thread.
233+
/// Note that this same closure may be invoked multiple times in parallel.
234+
type ReleaseThreadHandler = Fn() + Send + Sync;
235+
220236
impl ThreadPoolBuilder {
221237
/// Creates and returns a valid rayon thread pool builder, but does not initialize it.
222238
pub fn new() -> Self {
@@ -399,6 +415,8 @@ impl<S> ThreadPoolBuilder<S> {
399415
exit_handler: self.exit_handler,
400416
main_handler: self.main_handler,
401417
deadlock_handler: self.deadlock_handler,
418+
acquire_thread_handler: self.acquire_thread_handler,
419+
release_thread_handler: self.release_thread_handler,
402420
breadth_first: self.breadth_first,
403421
}
404422
}
@@ -557,6 +575,34 @@ impl<S> ThreadPoolBuilder<S> {
557575
self.breadth_first
558576
}
559577

578+
/// Takes the current acquire thread callback, leaving `None`.
579+
fn take_acquire_thread_handler(&mut self) -> Option<Box<AcquireThreadHandler>> {
580+
self.acquire_thread_handler.take()
581+
}
582+
583+
/// Set a callback to be invoked when starting computations in a thread.
584+
pub fn acquire_thread_handler<H>(mut self, acquire_thread_handler: H) -> Self
585+
where
586+
H: Fn() + Send + Sync + 'static,
587+
{
588+
self.acquire_thread_handler = Some(Box::new(acquire_thread_handler));
589+
self
590+
}
591+
592+
/// Takes the current release thread callback, leaving `None`.
593+
fn take_release_thread_handler(&mut self) -> Option<Box<ReleaseThreadHandler>> {
594+
self.release_thread_handler.take()
595+
}
596+
597+
/// Set a callback to be invoked when blocking in thread.
598+
pub fn release_thread_handler<H>(mut self, release_thread_handler: H) -> Self
599+
where
600+
H: Fn() + Send + Sync + 'static,
601+
{
602+
self.release_thread_handler = Some(Box::new(release_thread_handler));
603+
self
604+
}
605+
560606
/// Takes the current deadlock callback, leaving `None`.
561607
fn take_deadlock_handler(&mut self) -> Option<Box<DeadlockHandler>> {
562608
self.deadlock_handler.take()
@@ -746,6 +792,8 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
746792
ref start_handler,
747793
ref main_handler,
748794
ref exit_handler,
795+
ref acquire_thread_handler,
796+
ref release_thread_handler,
749797
spawn_handler: _,
750798
ref breadth_first,
751799
} = *self;
@@ -764,6 +812,8 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
764812
let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
765813
let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
766814
let main_handler = main_handler.as_ref().map(|_| ClosurePlaceholder);
815+
let acquire_thread_handler = acquire_thread_handler.as_ref().map(|_| ClosurePlaceholder);
816+
let release_thread_handler = release_thread_handler.as_ref().map(|_| ClosurePlaceholder);
767817

768818
f.debug_struct("ThreadPoolBuilder")
769819
.field("num_threads", num_threads)
@@ -774,6 +824,8 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
774824
.field("start_handler", &start_handler)
775825
.field("exit_handler", &exit_handler)
776826
.field("main_handler", &main_handler)
827+
.field("acquire_thread_handler", &acquire_thread_handler)
828+
.field("release_thread_handler", &release_thread_handler)
777829
.field("breadth_first", &breadth_first)
778830
.finish()
779831
}

rayon-core/src/registry.rs

+31-8
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ use std::usize;
2525
use unwind;
2626
use util::leak;
2727
use {
28-
DeadlockHandler, ErrorKind, ExitHandler, MainHandler, PanicHandler, StartHandler,
29-
ThreadPoolBuildError, ThreadPoolBuilder,
28+
AcquireThreadHandler, DeadlockHandler, ErrorKind, ExitHandler, MainHandler, PanicHandler,
29+
ReleaseThreadHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
3030
};
3131

3232
/// Thread builder used for customization via
@@ -141,10 +141,12 @@ pub struct Registry {
141141
sleep: Sleep,
142142
injected_jobs: SegQueue<JobRef>,
143143
panic_handler: Option<Box<PanicHandler>>,
144-
deadlock_handler: Option<Box<DeadlockHandler>>,
144+
pub(crate) deadlock_handler: Option<Box<DeadlockHandler>>,
145145
start_handler: Option<Box<StartHandler>>,
146146
exit_handler: Option<Box<ExitHandler>>,
147147
main_handler: Option<Box<MainHandler>>,
148+
pub(crate) acquire_thread_handler: Option<Box<AcquireThreadHandler>>,
149+
pub(crate) release_thread_handler: Option<Box<ReleaseThreadHandler>>,
148150

149151
// When this latch reaches 0, it means that all work on this
150152
// registry must be complete. This is ensured in the following ways:
@@ -247,6 +249,8 @@ impl Registry {
247249
start_handler: builder.take_start_handler(),
248250
main_handler: builder.take_main_handler(),
249251
exit_handler: builder.take_exit_handler(),
252+
acquire_thread_handler: builder.take_acquire_thread_handler(),
253+
release_thread_handler: builder.take_release_thread_handler(),
250254
});
251255

252256
// If we return early or panic, make sure to terminate existing threads.
@@ -355,9 +359,23 @@ impl Registry {
355359
/// Waits for the worker threads to stop. This is used for testing
356360
/// -- so we can check that termination actually works.
357361
pub(super) fn wait_until_stopped(&self) {
362+
self.release_thread();
358363
for info in &self.thread_infos {
359364
info.stopped.wait();
360365
}
366+
self.acquire_thread();
367+
}
368+
369+
pub(crate) fn acquire_thread(&self) {
370+
if let Some(ref acquire_thread_handler) = self.acquire_thread_handler {
371+
acquire_thread_handler();
372+
}
373+
}
374+
375+
pub(crate) fn release_thread(&self) {
376+
if let Some(ref release_thread_handler) = self.release_thread_handler {
377+
release_thread_handler();
378+
}
361379
}
362380

363381
/// ////////////////////////////////////////////////////////////////////////
@@ -504,7 +522,9 @@ impl Registry {
504522
LockLatch::new(),
505523
);
506524
self.inject(&[job.as_job_ref()]);
525+
self.release_thread();
507526
job.latch.wait();
527+
self.acquire_thread();
508528
job.into_result()
509529
}
510530

@@ -743,11 +763,10 @@ impl WorkerThread {
743763
yields = self.registry.sleep.work_found(self.index, yields);
744764
self.execute(job);
745765
} else {
746-
yields = self.registry.sleep.no_work_found(
747-
self.index,
748-
yields,
749-
&self.registry.deadlock_handler,
750-
);
766+
yields = self
767+
.registry
768+
.sleep
769+
.no_work_found(self.index, yields, &self.registry);
751770
}
752771
}
753772

@@ -843,6 +862,8 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
843862
worker_thread.wait_until(&registry.terminate_latch);
844863
};
845864

865+
registry.acquire_thread();
866+
846867
if let Some(ref handler) = registry.main_handler {
847868
match unwind::halt_unwinding(|| handler(index, &mut work)) {
848869
Ok(()) => {}
@@ -874,6 +895,8 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
874895
}
875896
// We're already exiting the thread, there's nothing else to do.
876897
}
898+
899+
registry.release_thread();
877900
}
878901

879902
/// If already in a worker-thread, just execute `op`. Otherwise,

rayon-core/src/sleep/mod.rs

+8-4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
//! for an overview.
33
44
use log::Event::*;
5+
use registry::Registry;
56
use std::sync::atomic::{AtomicUsize, Ordering};
67
use std::sync::{Condvar, Mutex};
78
use std::thread;
@@ -117,7 +118,7 @@ impl Sleep {
117118
&self,
118119
worker_index: usize,
119120
yields: usize,
120-
deadlock_handler: &Option<Box<DeadlockHandler>>,
121+
registry: &Registry,
121122
) -> usize {
122123
log!(DidNotFindWork {
123124
worker: worker_index,
@@ -145,7 +146,7 @@ impl Sleep {
145146
}
146147
} else {
147148
debug_assert_eq!(yields, ROUNDS_UNTIL_ASLEEP);
148-
self.sleep(worker_index, deadlock_handler);
149+
self.sleep(worker_index, registry);
149150
0
150151
}
151152
}
@@ -248,7 +249,7 @@ impl Sleep {
248249
self.worker_is_sleepy(state, worker_index)
249250
}
250251

251-
fn sleep(&self, worker_index: usize, deadlock_handler: &Option<Box<DeadlockHandler>>) {
252+
fn sleep(&self, worker_index: usize, registry: &Registry) {
252253
loop {
253254
// Acquire here suffices. If we observe that the current worker is still
254255
// sleepy, then in fact we know that no writes have occurred, and anyhow
@@ -327,12 +328,15 @@ impl Sleep {
327328

328329
// Decrement the number of active threads and check for a deadlock
329330
data.active_threads -= 1;
330-
data.deadlock_check(deadlock_handler);
331+
data.deadlock_check(&registry.deadlock_handler);
332+
333+
registry.release_thread();
331334

332335
let _ = self.tickle.wait(data).unwrap();
333336
log!(GotAwoken {
334337
worker: worker_index
335338
});
339+
registry.acquire_thread();
336340
return;
337341
}
338342
} else {

0 commit comments

Comments
 (0)