Skip to content

Commit 998093a

Browse files
committed
Add shard scaling factor
1 parent 19a8c91 commit 998093a

File tree

8 files changed

+186
-50
lines changed

8 files changed

+186
-50
lines changed

quickwit/quickwit-common/src/shared_consts.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,14 @@ pub const SPLIT_FIELDS_FILE_NAME: &str = "split_fields";
6969
/// More or less the indexing throughput of a core
7070
/// i.e., PIPELINE_THROUGHPUT / PIPELINE_FULL_CAPACITY
7171
pub const DEFAULT_SHARD_THROUGHPUT_LIMIT: ByteSize = ByteSize::mib(5);
72-
///
72+
/// Large enough to absorb small bursts but should remain defensive against unbalanced shards
7373
pub const DEFAULT_SHARD_BURST_LIMIT: ByteSize = ByteSize::mib(50);
7474

75+
/// Maximum factor that avoids oscillating between scale up and scale down
76+
pub const MAX_SHARD_SCALE_UP_FACTOR: f32 = 2.0;
77+
/// A high value to allow quick scale up by default
78+
pub const DEFAULT_SHARD_SCALE_UP_FACTOR: f32 = 1.5;
79+
const _: () = assert!(DEFAULT_SHARD_SCALE_UP_FACTOR < MAX_SHARD_SCALE_UP_FACTOR);
80+
7581
// (Just a reexport).
7682
pub use bytesize::MIB;

quickwit/quickwit-config/src/cluster_config/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub struct ClusterConfig {
2424
pub default_index_root_uri: Uri,
2525
pub replication_factor: usize,
2626
pub shard_throughput_limit: ByteSize,
27+
pub shard_scaling_factor: f32,
2728
}
2829

2930
impl ClusterConfig {
@@ -35,6 +36,7 @@ impl ClusterConfig {
3536
default_index_root_uri: Uri::for_test("ram:///indexes"),
3637
replication_factor: 1,
3738
shard_throughput_limit: quickwit_common::shared_consts::DEFAULT_SHARD_THROUGHPUT_LIMIT,
39+
shard_scaling_factor: 1.01,
3840
}
3941
}
4042
}

quickwit/quickwit-config/src/node_config/mod.rs

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ use anyhow::{bail, ensure};
2525
use bytesize::ByteSize;
2626
use http::HeaderMap;
2727
use quickwit_common::net::HostAddr;
28-
use quickwit_common::shared_consts::{DEFAULT_SHARD_BURST_LIMIT, DEFAULT_SHARD_THROUGHPUT_LIMIT};
28+
use quickwit_common::shared_consts::{
29+
DEFAULT_SHARD_BURST_LIMIT, DEFAULT_SHARD_SCALE_UP_FACTOR, DEFAULT_SHARD_THROUGHPUT_LIMIT,
30+
MAX_SHARD_SCALE_UP_FACTOR,
31+
};
2932
use quickwit_common::uri::Uri;
3033
use quickwit_proto::indexing::CpuCapacity;
3134
use quickwit_proto::types::NodeId;
@@ -39,7 +42,7 @@ use crate::{ConfigFormat, MetastoreConfigs};
3942

4043
pub const DEFAULT_QW_CONFIG_PATH: &str = "config/quickwit.yaml";
4144

42-
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
45+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
4346
#[serde(deny_unknown_fields)]
4447
pub struct RestConfig {
4548
pub listen_addr: SocketAddr,
@@ -50,7 +53,7 @@ pub struct RestConfig {
5053
pub tls: Option<TlsConfig>,
5154
}
5255

53-
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
56+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
5457
#[serde(deny_unknown_fields)]
5558
pub struct GrpcConfig {
5659
#[serde(default = "GrpcConfig::default_max_message_size")]
@@ -83,7 +86,7 @@ impl Default for GrpcConfig {
8386
}
8487
}
8588

86-
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
89+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
8790
#[serde(deny_unknown_fields)]
8891
pub struct TlsConfig {
8992
pub cert_path: String,
@@ -193,7 +196,7 @@ impl Default for IndexerConfig {
193196
}
194197
}
195198

196-
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
199+
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
197200
#[serde(deny_unknown_fields)]
198201
pub struct SplitCacheLimits {
199202
pub max_num_bytes: ByteSize,
@@ -219,7 +222,7 @@ impl SplitCacheLimits {
219222
}
220223
}
221224

222-
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
225+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
223226
#[serde(deny_unknown_fields, default)]
224227
pub struct SearcherConfig {
225228
pub aggregation_memory_limit: ByteSize,
@@ -254,7 +257,7 @@ pub struct SearcherConfig {
254257
/// This policy is inspired by this guidance. It does not track instantaneous throughput, but
255258
/// computes an overall timeout using the following formula:
256259
/// `timeout_offset + num_bytes_get_request / min_throughtput`
257-
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
260+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
258261
pub struct StorageTimeoutPolicy {
259262
pub min_throughtput_bytes_per_secs: u64,
260263
pub timeout_millis: u64,
@@ -338,7 +341,7 @@ impl SearcherConfig {
338341
}
339342
}
340343

341-
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
344+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
342345
#[serde(deny_unknown_fields, default)]
343346
pub struct IngestApiConfig {
344347
/// Maximum memory space taken by the ingest WAL
@@ -347,10 +350,12 @@ pub struct IngestApiConfig {
347350
pub max_queue_disk_usage: ByteSize,
348351
replication_factor: usize,
349352
pub content_length_limit: ByteSize,
350-
/// [hidden] Targeted throughput for each shard
353+
/// (hidden) Targeted throughput for each shard
351354
pub shard_throughput_limit: ByteSize,
352-
/// [hidden] Targeted throughput for each shard
355+
/// (hidden) Burst limit for each shard
353356
pub shard_burst_limit: ByteSize,
357+
/// (hidden) new_shard_count = ceil(old_shard_count * shard_scale_up_factor)
358+
pub shard_scale_up_factor: f32,
354359
}
355360

356361
impl Default for IngestApiConfig {
@@ -362,6 +367,7 @@ impl Default for IngestApiConfig {
362367
content_length_limit: ByteSize::mib(10),
363368
shard_throughput_limit: DEFAULT_SHARD_THROUGHPUT_LIMIT,
364369
shard_burst_limit: DEFAULT_SHARD_BURST_LIMIT,
370+
shard_scale_up_factor: DEFAULT_SHARD_SCALE_UP_FACTOR,
365371
}
366372
}
367373
}
@@ -422,11 +428,18 @@ impl IngestApiConfig {
422428
self.shard_burst_limit,
423429
estimated_persist_size,
424430
);
431+
ensure!(
432+
self.shard_scale_up_factor > 1.0
433+
&& self.shard_scale_up_factor <= MAX_SHARD_SCALE_UP_FACTOR,
434+
"shard_scale_up_factor ({:?}) must be in the (1,{}) interval",
435+
self.shard_scale_up_factor,
436+
MAX_SHARD_SCALE_UP_FACTOR,
437+
);
425438
Ok(())
426439
}
427440
}
428441

429-
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
442+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
430443
#[serde(deny_unknown_fields)]
431444
pub struct JaegerConfig {
432445
/// Enables the gRPC endpoint that allows the Jaeger Query Service to connect and retrieve

quickwit/quickwit-control-plane/src/control_plane.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ impl ControlPlane {
167167
ingester_pool.clone(),
168168
replication_factor,
169169
shard_throughput_limit_mib,
170+
cluster_config.shard_scaling_factor,
170171
);
171172

172173
let readiness_tx = readiness_tx.clone();

0 commit comments

Comments
 (0)