1 change: 1 addition & 0 deletions crates/napi/src/next_api/turbopack_ctx.rs
@@ -207,6 +207,7 @@ pub fn create_turbo_tasks(
turbo_tasks_backend::StorageMode::ReadWrite
}),
dependency_tracking,
num_workers: Some(tokio::runtime::Handle::current().metrics().num_workers()),
..Default::default()
},
Either::Left(backing_storage),
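For context on the one-line change above: the worker-thread count is read from the current tokio runtime's metrics and forwarded to the backend. A minimal standalone sketch (not part of this PR; the runtime size is illustrative) of how that API behaves:

```rust
// Standalone sketch (not from the PR): reading the worker-thread count that
// `BackendOptions::num_workers` is filled with. `Handle::current()` must be
// called from within a runtime context.
fn main() {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(4)
        .enable_all()
        .build()
        .unwrap();

    runtime.block_on(async {
        let workers = tokio::runtime::Handle::current().metrics().num_workers();
        println!("tokio runtime has {workers} worker threads");
    });
}
```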
13 changes: 8 additions & 5 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
@@ -15,7 +15,6 @@ use std::{
Arc,
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
},
thread::available_parallelism,
};

use anyhow::{Result, bail};
@@ -67,7 +66,7 @@ use crate::{
},
utils::{
bi_map::BiMap, chunked_vec::ChunkedVec, dash_map_drop_contents::drop_contents,
ptr_eq_arc::PtrEqArc, sharded::Sharded, swap_retain,
ptr_eq_arc::PtrEqArc, shard_amount::compute_shard_amount, sharded::Sharded, swap_retain,
},
};

@@ -134,6 +133,10 @@ pub struct BackendOptions {
/// Enables the backing storage.
pub storage_mode: Option<StorageMode>,

/// Number of tokio worker threads. This is used to compute the shard amount for parallel
/// data structures. If `None`, the available parallelism is used.
pub num_workers: Option<usize>,

/// Avoid big preallocations for faster startup. Should only be used for testing purposes.
pub small_preallocation: bool,
}
@@ -144,6 +147,7 @@ impl Default for BackendOptions {
dependency_tracking: true,
active_tracking: true,
storage_mode: Some(StorageMode::ReadWrite),
num_workers: None,
small_preallocation: false,
}
}
@@ -228,8 +232,7 @@ impl<B: BackingStorage> TurboTasksBackend<B> {

impl<B: BackingStorage> TurboTasksBackendInner<B> {
pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
let shard_amount =
(available_parallelism().map_or(4, |v| v.get()) * 64).next_power_of_two();
let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
let need_log = matches!(options.storage_mode, Some(StorageMode::ReadWrite));
if !options.dependency_tracking {
options.active_tracking = false;
@@ -256,7 +259,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
task_cache: BiMap::new(),
transient_tasks: FxDashMap::default(),
local_is_partial: AtomicBool::new(next_task_id != TaskId::MIN),
storage: Storage::new(small_preallocation),
storage: Storage::new(shard_amount, small_preallocation),
in_progress_operations: AtomicUsize::new(0),
snapshot_request: Mutex::new(SnapshotRequest::new()),
operations_suspended: Condvar::new(),
7 changes: 1 addition & 6 deletions turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
@@ -2,7 +2,6 @@ use std::{
hash::Hash,
ops::{Deref, DerefMut},
sync::{Arc, atomic::AtomicBool},
thread::available_parallelism,
};

use bitfield::bitfield;
@@ -616,17 +615,13 @@ pub struct Storage {
}

impl Storage {
pub fn new(small_preallocation: bool) -> Self {
pub fn new(shard_amount: usize, small_preallocation: bool) -> Self {
let map_capacity: usize = if small_preallocation {
1024
} else {
1024 * 1024
};
let modified_capacity: usize = if small_preallocation { 0 } else { 1024 };
let shard_factor: usize = if small_preallocation { 4 } else { 64 };

let shard_amount =
(available_parallelism().map_or(4, |v| v.get()) * shard_factor).next_power_of_two();

Self {
snapshot_mode: AtomicBool::new(false),
1 change: 1 addition & 0 deletions turbopack/crates/turbo-tasks-backend/src/utils/mod.rs
@@ -3,6 +3,7 @@ pub mod chunked_vec;
pub mod dash_map_drop_contents;
pub mod dash_map_multi;
pub mod ptr_eq_arc;
pub mod shard_amount;
pub mod sharded;
pub mod swap_retain;

44 changes: 44 additions & 0 deletions turbopack/crates/turbo-tasks-backend/src/utils/shard_amount.rs
@@ -0,0 +1,44 @@
use std::thread::available_parallelism;

/// Compute a good number of shards to use for sharded data structures.
/// The number of shards is computed based on the number of worker threads
/// and whether a small preallocation is requested.
/// A small preallocation is useful for tests where performance is not
/// critical and we want to reduce memory usage and startup time.
/// The number of shards is chosen to minimize the probability of shard
/// collisions (which can lead to false sharing) while keeping memory
/// usage reasonable.
/// The returned number is always a power of two as this is often required
/// by sharded data structures. The maximum shard amount is capped at 1 << 16 (65536).
pub fn compute_shard_amount(num_workers: Option<usize>, small_preallocation: bool) -> usize {
let num_workers = num_workers.unwrap_or_else(|| available_parallelism().map_or(4, |v| v.get()));

// One can compute the probability of a shard collision (which leads to false sharing) using
// the birthday paradox formula. It's notable that the probability of collisions increases
// with more worker threads. To mitigate this effect, the number of shards needs to grow
// quadratically with the number of worker threads. This way the probability of at least one
// collision remains constant.
//
// Let's call the worker thread count `N` and the number of shards `S`. When using `S = k * N^2`
// for some constant `k`, the probability of at least one collision for large `N` can be
// approximated as: P = 1 - exp(-N^2 / (2*S)) = 1 - exp(-1/(2*k))
//
// For `k = 16` this results in a collision probability of about 3%.
// For `k = 1` this results in a collision probability of about 39%.
//
// We clamp the number of shards to 1 << 16 to avoid excessive memory usage in case of a very
// high number of worker threads. This case is hit with more than 64 worker threads for `k =
// 16` and more than 256 worker threads for `k = 1`.

if small_preallocation {
// We also clamp the maximum number of workers to 256 so the following multiplication can't
// overflow and the shard count stays within the 1 << 16 cap.
let num_workers = num_workers.min(256);
(num_workers * num_workers).next_power_of_two()
} else {
// We also clamp the maximum number of workers to 64 so the following multiplication can't
// overflow and the shard count stays within the 1 << 16 cap.
let num_workers = num_workers.min(64);
(num_workers * num_workers * 16).next_power_of_two()
}
}
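As a sanity check on the reasoning in the comment above: with `S = k * N^2` shards, the birthday-paradox approximation `P ≈ 1 - exp(-N^2 / (2*S))` reduces to `1 - exp(-1/(2*k))`, independent of `N`, until the 1 << 16 cap kicks in. The sketch below is a standalone illustration (the helper names and sample worker counts are not from the PR) of the resulting shard counts and collision probabilities for the non-test path (`k = 16`):

```rust
/// Birthday-paradox approximation for the probability that at least two of
/// `workers` threads land in the same of `shards` shards.
fn collision_probability(workers: usize, shards: usize) -> f64 {
    let (n, s) = (workers as f64, shards as f64);
    1.0 - (-(n * n) / (2.0 * s)).exp()
}

/// Mirrors the non-test path of `compute_shard_amount`: roughly `16 * workers^2`,
/// rounded up to a power of two and capped at 1 << 16.
fn shards_for(workers: usize) -> usize {
    let w = workers.min(64);
    (w * w * 16).next_power_of_two()
}

fn main() {
    for workers in [2usize, 8, 32, 64, 128] {
        let shards = shards_for(workers);
        println!(
            "{workers:>3} workers -> {shards:>6} shards, P(collision) ~ {:.1}%",
            100.0 * collision_probability(workers, shards)
        );
    }
    // With S = 16 * N^2 the probability stays around 1 - exp(-1/32) ~ 3.1%;
    // with S = N^2 (k = 1) it would be 1 - exp(-1/2) ~ 39.3%.
}
```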
6 changes: 5 additions & 1 deletion turbopack/crates/turbo-tasks-backend/tests/test_config.trs
@@ -7,7 +7,11 @@
std::fs::create_dir_all(&path).unwrap();
turbo_tasks::TurboTasks::new(
turbo_tasks_backend::TurboTasksBackend::new(
turbo_tasks_backend::BackendOptions::default(),
turbo_tasks_backend::BackendOptions {
num_workers: Some(2),
small_preallocation: true,
..Default::default()
},
turbo_tasks_backend::default_backing_storage(
path.as_path(),
&turbo_tasks_backend::GitVersionInfo {
6 changes: 5 additions & 1 deletion turbopack/crates/turbo-tasks-fetch/tests/test_config.trs
@@ -7,7 +7,11 @@
std::fs::create_dir_all(&path).unwrap();
turbo_tasks::TurboTasks::new(
turbo_tasks_backend::TurboTasksBackend::new(
turbo_tasks_backend::BackendOptions::default(),
turbo_tasks_backend::BackendOptions {
num_workers: Some(2),
small_preallocation: true,
..Default::default()
},
turbo_tasks_backend::default_backing_storage(
path.as_path(),
&turbo_tasks_backend::GitVersionInfo {
@@ -1,7 +1,11 @@
|_name, _initial| {
turbo_tasks::TurboTasks::new(
turbo_tasks_backend::TurboTasksBackend::new(
turbo_tasks_backend::BackendOptions::default(),
turbo_tasks_backend::BackendOptions {
num_workers: Some(2),
small_preallocation: true,
..Default::default()
},
turbo_tasks_backend::noop_backing_storage(),
)
)