Skip to content

Commit f10654a

Browse files
authored
Faster shard scaling (#5679)
* Increase burst limit * Add shard scaling factor * Remove maximum sacle up factor * Explicit scale up field names and comments * Refactor scaling arbiter to clarify and test the rules
1 parent ec36892 commit f10654a

File tree

8 files changed

+314
-60
lines changed

8 files changed

+314
-60
lines changed

quickwit/quickwit-common/src/shared_consts.rs

+7
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,14 @@ pub const INGESTER_PRIMARY_SHARDS_PREFIX: &str = "ingester.primary_shards:";
6666
/// File name for the encoded list of fields in the split
6767
pub const SPLIT_FIELDS_FILE_NAME: &str = "split_fields";
6868

69+
/// More or less the indexing throughput of a core
70+
/// i.e. PIPELINE_THROUGHPUT / PIPELINE_FULL_CAPACITY
6971
pub const DEFAULT_SHARD_THROUGHPUT_LIMIT: ByteSize = ByteSize::mib(5);
72+
/// Large enough to absorb small bursts but should remain defensive against unbalanced shards.
73+
pub const DEFAULT_SHARD_BURST_LIMIT: ByteSize = ByteSize::mib(50);
74+
75+
/// A compromise between "exponential" scale up and moderate shard count increase.
76+
pub const DEFAULT_SHARD_SCALE_UP_FACTOR: f32 = 1.5;
7077

7178
// (Just a reexport).
7279
pub use bytesize::MIB;

quickwit/quickwit-config/src/cluster_config/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub struct ClusterConfig {
2424
pub default_index_root_uri: Uri,
2525
pub replication_factor: usize,
2626
pub shard_throughput_limit: ByteSize,
27+
pub shard_scale_up_factor: f32,
2728
}
2829

2930
impl ClusterConfig {
@@ -35,6 +36,7 @@ impl ClusterConfig {
3536
default_index_root_uri: Uri::for_test("ram:///indexes"),
3637
replication_factor: 1,
3738
shard_throughput_limit: quickwit_common::shared_consts::DEFAULT_SHARD_THROUGHPUT_LIMIT,
39+
shard_scale_up_factor: 1.01,
3840
}
3941
}
4042
}

quickwit/quickwit-config/src/node_config/mod.rs

+40-11
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ use anyhow::{bail, ensure};
2525
use bytesize::ByteSize;
2626
use http::HeaderMap;
2727
use quickwit_common::net::HostAddr;
28-
use quickwit_common::shared_consts::DEFAULT_SHARD_THROUGHPUT_LIMIT;
28+
use quickwit_common::shared_consts::{
29+
DEFAULT_SHARD_BURST_LIMIT, DEFAULT_SHARD_SCALE_UP_FACTOR, DEFAULT_SHARD_THROUGHPUT_LIMIT,
30+
};
2931
use quickwit_common::uri::Uri;
3032
use quickwit_proto::indexing::CpuCapacity;
3133
use quickwit_proto::types::NodeId;
@@ -39,7 +41,7 @@ use crate::{ConfigFormat, MetastoreConfigs};
3941

4042
pub const DEFAULT_QW_CONFIG_PATH: &str = "config/quickwit.yaml";
4143

42-
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
44+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
4345
#[serde(deny_unknown_fields)]
4446
pub struct RestConfig {
4547
pub listen_addr: SocketAddr,
@@ -50,7 +52,7 @@ pub struct RestConfig {
5052
pub tls: Option<TlsConfig>,
5153
}
5254

53-
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
55+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
5456
#[serde(deny_unknown_fields)]
5557
pub struct GrpcConfig {
5658
#[serde(default = "GrpcConfig::default_max_message_size")]
@@ -83,7 +85,7 @@ impl Default for GrpcConfig {
8385
}
8486
}
8587

86-
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
88+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
8789
#[serde(deny_unknown_fields)]
8890
pub struct TlsConfig {
8991
pub cert_path: String,
@@ -193,7 +195,7 @@ impl Default for IndexerConfig {
193195
}
194196
}
195197

196-
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
198+
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
197199
#[serde(deny_unknown_fields)]
198200
pub struct SplitCacheLimits {
199201
pub max_num_bytes: ByteSize,
@@ -219,7 +221,7 @@ impl SplitCacheLimits {
219221
}
220222
}
221223

222-
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
224+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
223225
#[serde(deny_unknown_fields, default)]
224226
pub struct SearcherConfig {
225227
pub aggregation_memory_limit: ByteSize,
@@ -254,7 +256,7 @@ pub struct SearcherConfig {
254256
/// This policy is inspired by this guidance. It does not track instanteneous throughput, but
255257
/// computes an overall timeout using the following formula:
256258
/// `timeout_offset + num_bytes_get_request / min_throughtput`
257-
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
259+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
258260
pub struct StorageTimeoutPolicy {
259261
pub min_throughtput_bytes_per_secs: u64,
260262
pub timeout_millis: u64,
@@ -338,14 +340,25 @@ impl SearcherConfig {
338340
}
339341
}
340342

341-
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
343+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
342344
#[serde(deny_unknown_fields, default)]
343345
pub struct IngestApiConfig {
346+
/// Maximum memory space taken by the ingest WAL
344347
pub max_queue_memory_usage: ByteSize,
348+
/// Maximum disk space taken by the ingest WAL
345349
pub max_queue_disk_usage: ByteSize,
346350
replication_factor: usize,
347351
pub content_length_limit: ByteSize,
352+
/// (hidden) Targeted throughput for each shard
348353
pub shard_throughput_limit: ByteSize,
354+
/// (hidden) Maximum accumulated throughput capacity for underutilized
355+
/// shards, allowing the throughput limit to be temporarily exceeded
356+
pub shard_burst_limit: ByteSize,
357+
/// (hidden) new_shard_count = ceil(old_shard_count * shard_scale_up_factor)
358+
///
359+
/// Setting this too high will be cancelled out by the arbiter that prevents
360+
/// creating too many shards at once.
361+
pub shard_scale_up_factor: f32,
349362
}
350363

351364
impl Default for IngestApiConfig {
@@ -356,6 +369,8 @@ impl Default for IngestApiConfig {
356369
replication_factor: 1,
357370
content_length_limit: ByteSize::mib(10),
358371
shard_throughput_limit: DEFAULT_SHARD_THROUGHPUT_LIMIT,
372+
shard_burst_limit: DEFAULT_SHARD_BURST_LIMIT,
373+
shard_scale_up_factor: DEFAULT_SHARD_SCALE_UP_FACTOR,
359374
}
360375
}
361376
}
@@ -398,20 +413,34 @@ impl IngestApiConfig {
398413
self.max_queue_memory_usage
399414
);
400415
info!(
401-
"ingestion shard throughput limit: {:?}",
416+
"ingestion shard throughput limit: {}",
402417
self.shard_throughput_limit
403418
);
404419
ensure!(
405420
self.shard_throughput_limit >= ByteSize::mib(1)
406421
&& self.shard_throughput_limit <= ByteSize::mib(20),
407-
"shard_throughput_limit ({:?}) must be within 1mb and 20mb",
422+
"shard_throughput_limit ({}) must be within 1mb and 20mb",
408423
self.shard_throughput_limit
409424
);
425+
// The newline delimited format is persisted as something a bit larger
426+
// (lines prefixed with their length)
427+
let estimated_persist_size = ByteSize::b(3 * self.content_length_limit.as_u64() / 2);
428+
ensure!(
429+
self.shard_burst_limit >= estimated_persist_size,
430+
"shard_burst_limit ({}) must be at least 1.5*content_length_limit ({})",
431+
self.shard_burst_limit,
432+
estimated_persist_size,
433+
);
434+
ensure!(
435+
self.shard_scale_up_factor > 1.0,
436+
"shard_scale_up_factor ({}) must be greater than 1",
437+
self.shard_scale_up_factor,
438+
);
410439
Ok(())
411440
}
412441
}
413442

414-
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
443+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
415444
#[serde(deny_unknown_fields)]
416445
pub struct JaegerConfig {
417446
/// Enables the gRPC endpoint that allows the Jaeger Query Service to connect and retrieve

quickwit/quickwit-control-plane/src/control_plane.rs

+1
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ impl ControlPlane {
167167
ingester_pool.clone(),
168168
replication_factor,
169169
shard_throughput_limit_mib,
170+
cluster_config.shard_scale_up_factor,
170171
);
171172

172173
let readiness_tx = readiness_tx.clone();

0 commit comments

Comments
 (0)