Skip to content

Commit 97befd4

Browse files
committed
Move the WorkerLocal type from the rustc-rayon fork into rustc_data_structures
1 parent 77eb2be commit 97befd4

File tree

5 files changed

+216
-34
lines changed

5 files changed

+216
-34
lines changed

compiler/rustc_data_structures/src/sharded.rs

+1-5
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
use crate::fx::{FxHashMap, FxHasher};
2-
use crate::sync::{Lock, LockGuard};
2+
use crate::sync::{CacheAligned, Lock, LockGuard};
33
use std::borrow::Borrow;
44
use std::collections::hash_map::RawEntryMut;
55
use std::hash::{Hash, Hasher};
66
use std::mem;
77

8-
#[derive(Clone, Default)]
9-
#[cfg_attr(parallel_compiler, repr(align(64)))]
10-
struct CacheAligned<T>(T);
11-
128
#[cfg(parallel_compiler)]
139
// 32 shards is sufficient to reduce contention on an 8-core Ryzen 7 1700,
1410
// but this should be tested on higher core count CPUs. How the `Sharded` type gets used

compiler/rustc_data_structures/src/sync.rs

+7-29
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ use std::hash::{BuildHasher, Hash};
2323
use std::ops::{Deref, DerefMut};
2424
use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
2525

26+
mod worker_local;
27+
pub use worker_local::{Registry, WorkerLocal};
28+
2629
pub use std::sync::atomic::Ordering;
2730
pub use std::sync::atomic::Ordering::SeqCst;
2831

@@ -178,33 +181,6 @@ cfg_if! {
178181

179182
use std::cell::Cell;
180183

181-
#[derive(Debug)]
182-
pub struct WorkerLocal<T>(OneThread<T>);
183-
184-
impl<T> WorkerLocal<T> {
185-
/// Creates a new worker local where the `initial` closure computes the
186-
/// value this worker local should take for each thread in the thread pool.
187-
#[inline]
188-
pub fn new<F: FnMut(usize) -> T>(mut f: F) -> WorkerLocal<T> {
189-
WorkerLocal(OneThread::new(f(0)))
190-
}
191-
192-
/// Returns the worker-local value for each thread
193-
#[inline]
194-
pub fn into_inner(self) -> Vec<T> {
195-
vec![OneThread::into_inner(self.0)]
196-
}
197-
}
198-
199-
impl<T> Deref for WorkerLocal<T> {
200-
type Target = T;
201-
202-
#[inline(always)]
203-
fn deref(&self) -> &T {
204-
&self.0
205-
}
206-
}
207-
208184
pub type MTRef<'a, T> = &'a mut T;
209185

210186
#[derive(Debug, Default)]
@@ -324,8 +300,6 @@ cfg_if! {
324300
};
325301
}
326302

327-
pub use rayon_core::WorkerLocal;
328-
329303
pub use rayon::iter::ParallelIterator;
330304
use rayon::iter::IntoParallelIterator;
331305

@@ -365,6 +339,10 @@ pub fn assert_send<T: ?Sized + Send>() {}
365339
pub fn assert_send_val<T: ?Sized + Send>(_t: &T) {}
366340
pub fn assert_send_sync_val<T: ?Sized + Sync + Send>(_t: &T) {}
367341

342+
#[derive(Clone, Default)]
343+
#[cfg_attr(parallel_compiler, repr(align(64)))]
344+
pub struct CacheAligned<T>(pub T);
345+
368346
pub trait HashMapExt<K, V> {
369347
/// Same as HashMap::insert, but it may panic if there's already an
370348
/// entry for `key` with a value not equal to `value`
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
use crate::sync::Lock;
2+
use std::cell::Cell;
3+
use std::cell::OnceCell;
4+
use std::ops::Deref;
5+
use std::sync::Arc;
6+
7+
#[cfg(parallel_compiler)]
8+
use {crate::cold_path, crate::sync::CacheAligned};
9+
10+
/// A pointer to the `RegistryData` which uniquely identifies a registry.
11+
/// This identifier can be reused if the registry gets freed.
12+
#[derive(Clone, Copy, Eq, PartialEq)]
13+
struct RegistryId(usize);
14+
15+
impl RegistryId {
16+
#[inline(always)]
17+
/// Verifies that the current thread is associated with the registry and returns its unique
18+
/// index within the registry. This panics if the current thread is not associated with this
19+
/// registry.
20+
///
21+
/// Note that there's a race possible where the identifer in `THREAD_DATA` could be reused
22+
/// so this can succeed from a different registry.
23+
#[cfg(parallel_compiler)]
24+
fn verify(self) -> usize {
25+
let (id, index) = THREAD_DATA.with(|data| (data.registry_id.get(), data.index.get()));
26+
27+
if id == self {
28+
index
29+
} else {
30+
cold_path(|| panic!("Unable to verify registry association"))
31+
}
32+
}
33+
}
34+
35+
struct RegistryData {
36+
thread_limit: usize,
37+
threads: Lock<usize>,
38+
}
39+
40+
/// Represents a list of threads which can access worker locals.
41+
#[derive(Clone)]
42+
pub struct Registry(Arc<RegistryData>);
43+
44+
thread_local! {
45+
/// The registry associated with the thread.
46+
/// This allows the `WorkerLocal` type to clone the registry in its constructor.
47+
static REGISTRY: OnceCell<Registry> = OnceCell::new();
48+
}
49+
50+
struct ThreadData {
51+
registry_id: Cell<RegistryId>,
52+
index: Cell<usize>,
53+
}
54+
55+
thread_local! {
56+
/// A thread local which contains the identifer of `REGISTRY` but allows for faster access.
57+
/// It also holds the index of the current thread.
58+
static THREAD_DATA: ThreadData = const { ThreadData {
59+
registry_id: Cell::new(RegistryId(0)),
60+
index: Cell::new(0),
61+
}};
62+
}
63+
64+
impl Registry {
65+
/// Creates a registry which can hold up to `thread_limit` threads.
66+
pub fn new(thread_limit: usize) -> Self {
67+
Registry(Arc::new(RegistryData { thread_limit, threads: Lock::new(0) }))
68+
}
69+
70+
/// Gets the registry associated with the current thread. Panics if there's no such registry.
71+
pub fn current() -> Self {
72+
REGISTRY.with(|registry| registry.get().cloned().expect("No assocated registry"))
73+
}
74+
75+
/// Registers the current thread with the registry so worker locals can be used on it.
76+
/// Panics if the thread limit is hit or if the thread already has an associated registry.
77+
pub fn register(&self) {
78+
let mut threads = self.0.threads.lock();
79+
if *threads < self.0.thread_limit {
80+
REGISTRY.with(|registry| {
81+
if registry.get().is_some() {
82+
drop(threads);
83+
panic!("Thread already has a registry");
84+
}
85+
registry.set(self.clone()).ok();
86+
THREAD_DATA.with(|data| {
87+
data.registry_id.set(self.id());
88+
data.index.set(*threads);
89+
});
90+
*threads += 1;
91+
});
92+
} else {
93+
drop(threads);
94+
panic!("Thread limit reached");
95+
}
96+
}
97+
98+
/// Gets the identifer of this registry.
99+
fn id(&self) -> RegistryId {
100+
RegistryId(&*self.0 as *const RegistryData as usize)
101+
}
102+
}
103+
104+
/// Holds worker local values for each possible thread in a registry. You can only access the
105+
/// worker local value through the `Deref` impl on the registry associated with the thread it was
106+
/// created on. It will panic otherwise.
107+
pub struct WorkerLocal<T> {
108+
#[cfg(not(parallel_compiler))]
109+
local: T,
110+
#[cfg(parallel_compiler)]
111+
locals: Box<[CacheAligned<T>]>,
112+
#[cfg(parallel_compiler)]
113+
registry: Registry,
114+
}
115+
116+
// This is safe because the `deref` call will return a reference to a `T` unique to each thread
117+
// or it will panic for threads without an associated local. So there isn't a need for `T` to do
118+
// it's own synchronization. The `verify` method on `RegistryId` has an issue where the the id
119+
// can be reused, but `WorkerLocal` has a reference to `Registry` which will prevent any reuse.
120+
#[cfg(parallel_compiler)]
121+
unsafe impl<T: Send> Sync for WorkerLocal<T> {}
122+
123+
impl<T> WorkerLocal<T> {
124+
/// Creates a new worker local where the `initial` closure computes the
125+
/// value this worker local should take for each thread in the registry.
126+
#[inline]
127+
pub fn new<F: FnMut(usize) -> T>(mut initial: F) -> WorkerLocal<T> {
128+
#[cfg(parallel_compiler)]
129+
{
130+
let registry = Registry::current();
131+
WorkerLocal {
132+
locals: {
133+
let locals: Vec<_> =
134+
(0..registry.0.thread_limit).map(|i| CacheAligned(initial(i))).collect();
135+
locals.into_boxed_slice()
136+
},
137+
registry,
138+
}
139+
}
140+
#[cfg(not(parallel_compiler))]
141+
{
142+
WorkerLocal { local: initial(0) }
143+
}
144+
}
145+
146+
/// Returns the worker-local value for each thread
147+
#[inline]
148+
pub fn into_inner(self) -> Vec<T> {
149+
#[cfg(parallel_compiler)]
150+
{
151+
self.locals.into_vec().into_iter().map(|local| local.0).collect()
152+
}
153+
#[cfg(not(parallel_compiler))]
154+
{
155+
vec![self.local]
156+
}
157+
}
158+
}
159+
160+
impl<T> WorkerLocal<Vec<T>> {
161+
/// Joins the elements of all the worker locals into one Vec
162+
pub fn join(self) -> Vec<T> {
163+
self.into_inner().into_iter().flat_map(|v| v).collect()
164+
}
165+
}
166+
167+
impl<T> Deref for WorkerLocal<T> {
168+
type Target = T;
169+
170+
#[inline(always)]
171+
#[cfg(not(parallel_compiler))]
172+
fn deref(&self) -> &T {
173+
&self.local
174+
}
175+
176+
#[inline(always)]
177+
#[cfg(parallel_compiler)]
178+
fn deref(&self) -> &T {
179+
// This is safe because `verify` will only return values less than
180+
// `self.registry.thread_limit` which is the size of the `self.locals` array.
181+
unsafe { &self.locals.get_unchecked(self.registry.id().verify()).0 }
182+
}
183+
}

compiler/rustc_interface/src/interface.rs

+19
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use rustc_session::Session;
2020
use rustc_session::{early_error, CompilerIO};
2121
use rustc_span::source_map::{FileLoader, FileName};
2222
use rustc_span::symbol::sym;
23+
use std::cell::OnceCell;
2324
use std::path::PathBuf;
2425
use std::result;
2526

@@ -59,9 +60,25 @@ impl Compiler {
5960
}
6061
}
6162

63+
fn registry_setup() {
64+
thread_local! {
65+
static ONCE: OnceCell<()> = OnceCell::new();
66+
}
67+
68+
// Create a dummy registry to allow `WorkerLocal` construction.
69+
// We use `OnceCell` so we only register one dummy registry per thread.
70+
ONCE.with(|once| {
71+
once.get_or_init(|| {
72+
rustc_data_structures::sync::Registry::new(1).register();
73+
});
74+
});
75+
}
76+
6277
/// Converts strings provided as `--cfg [cfgspec]` into a `crate_cfg`.
6378
pub fn parse_cfgspecs(cfgspecs: Vec<String>) -> FxHashSet<(String, Option<String>)> {
6479
rustc_span::create_default_session_if_not_set_then(move |_| {
80+
registry_setup();
81+
6582
let cfg = cfgspecs
6683
.into_iter()
6784
.map(|s| {
@@ -121,6 +138,8 @@ pub fn parse_cfgspecs(cfgspecs: Vec<String>) -> FxHashSet<(String, Option<String
121138
/// Converts strings provided as `--check-cfg [specs]` into a `CheckCfg`.
122139
pub fn parse_check_cfg(specs: Vec<String>) -> CheckCfg {
123140
rustc_span::create_default_session_if_not_set_then(move |_| {
141+
registry_setup();
142+
124143
let mut cfg = CheckCfg::default();
125144

126145
'specs: for s in specs {

compiler/rustc_interface/src/util.rs

+6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ use libloading::Library;
44
use rustc_ast as ast;
55
use rustc_codegen_ssa::traits::CodegenBackend;
66
use rustc_data_structures::fx::{FxHashMap, FxHashSet};
7+
#[cfg(parallel_compiler)]
8+
use rustc_data_structures::sync;
79
use rustc_errors::registry::Registry;
810
use rustc_parse::validate_attr;
911
use rustc_session as session;
@@ -165,6 +167,7 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce() -> R + Send, R: Send>(
165167
use rustc_middle::ty::tls;
166168
use rustc_query_impl::{deadlock, QueryContext, QueryCtxt};
167169

170+
let registry = sync::Registry::new(threads);
168171
let mut builder = rayon::ThreadPoolBuilder::new()
169172
.thread_name(|_| "rustc".to_string())
170173
.acquire_thread_handler(jobserver::acquire_thread)
@@ -195,6 +198,9 @@ pub(crate) fn run_in_thread_pool_with_globals<F: FnOnce() -> R + Send, R: Send>(
195198
.build_scoped(
196199
// Initialize each new worker thread when created.
197200
move |thread: rayon::ThreadBuilder| {
201+
// Register the thread for use with the `WorkerLocal` type.
202+
registry.register();
203+
198204
rustc_span::set_session_globals_then(session_globals, || thread.run())
199205
},
200206
// Run `f` on the first thread in the thread pool.

0 commit comments

Comments
 (0)