Skip to content

Commit d74e517

Browse files
committed
Refactor scaling arbiter to clarify and test the rules
1 parent 4cfbb9d commit d74e517

File tree

2 files changed

+127
-20
lines changed

2 files changed

+127
-20
lines changed

quickwit/quickwit-control-plane/src/ingest/scaling_arbiter.rs

+125-20
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub(crate) struct ScalingArbiter {
1818
// Threshold in MiB/s below which we decrease the number of shards.
1919
scale_down_shards_threshold_mib_per_sec: f32,
2020

21-
// Threshold in MiB/s above which we increase the number of shards.
21+
// Per shard threshold in MiB/s above which we increase the number of shards.
2222
//
2323
// We want scaling up to be reactive, so we first inspect the short
2424
// term threshold.
@@ -49,29 +49,42 @@ impl ScalingArbiter {
4949
}
5050
}
5151

52-
// Scale based on the "per shard average" metric
52+
/// Computes the maximum number of shards we can have without going below
53+
/// the long term scale up threshold
54+
fn long_term_scale_up_threshold_max_shards(&self, shard_stats: ShardStats) -> usize {
55+
(shard_stats.avg_long_term_ingestion_rate * shard_stats.num_open_shards as f32
56+
/ self.scale_up_shards_long_term_threshold_mib_per_sec)
57+
.floor() as usize
58+
}
59+
60+
/// Computes the next number of shards we should have according the scaling factor
61+
fn scale_up_factor_target_shards(&self, shard_stats: ShardStats) -> usize {
62+
(shard_stats.num_open_shards as f32 * self.shard_scale_up_factor).ceil() as usize
63+
}
64+
65+
/// Scale based on the "per shard average" metric
66+
///
67+
/// Returns `None` when there are no open shards because in that case routers are expected to
68+
/// make the [`quickwit_proto::control_plane::GetOrCreateOpenShardsRequest`]
5369
pub(crate) fn should_scale(&self, shard_stats: ShardStats) -> Option<ScalingMode> {
70+
if shard_stats.num_open_shards == 0 {
71+
return None;
72+
}
5473
// Scale up based on the short term metric value while making sure that
5574
// the long term value doesn't get near the scale down threshold.
5675
if shard_stats.avg_short_term_ingestion_rate
5776
>= self.scale_up_shards_short_term_threshold_mib_per_sec
5877
{
59-
// compute the maximum number of shards we can add without going below
60-
// the long term scale up threshold
61-
let max_number_shards = (shard_stats.avg_long_term_ingestion_rate
62-
* shard_stats.num_open_shards as f32
63-
/ self.scale_up_shards_long_term_threshold_mib_per_sec)
64-
.floor() as usize;
65-
66-
// compute the next number of shards we should have according the scaling factor
67-
let target_number_shards =
68-
(shard_stats.num_open_shards as f32 * self.shard_scale_up_factor).ceil() as usize;
78+
let new_calculated_num_shards = usize::min(
79+
self.long_term_scale_up_threshold_max_shards(shard_stats),
80+
self.scale_up_factor_target_shards(shard_stats),
81+
);
6982

70-
let new_number_shards = max_number_shards.min(target_number_shards);
83+
let target_num_shards = usize::max(1, new_calculated_num_shards);
7184

72-
if new_number_shards > shard_stats.num_open_shards {
85+
if target_num_shards > shard_stats.num_open_shards {
7386
return Some(ScalingMode::Up(
74-
new_number_shards - shard_stats.num_open_shards,
87+
target_num_shards - shard_stats.num_open_shards,
7588
));
7689
}
7790
}
@@ -96,9 +109,16 @@ mod tests {
96109
#[test]
97110
fn test_scaling_arbiter_one_by_one() {
98111
// use shard throughput 10MiB to simplify calculations
99-
let scaling_arbiter = ScalingArbiter::with_max_shard_ingestion_throughput_mib_per_sec(
100-
10.0, // with a factor close to 1 shards are effectively added 1 by 1
101-
1.01,
112+
// with a factor close to 1 shards are effectively added 1 by 1
113+
let scaling_arbiter =
114+
ScalingArbiter::with_max_shard_ingestion_throughput_mib_per_sec(10.0, 1.01);
115+
assert_eq!(
116+
scaling_arbiter.should_scale(ShardStats {
117+
num_open_shards: 0,
118+
avg_short_term_ingestion_rate: 0.0,
119+
avg_long_term_ingestion_rate: 0.0,
120+
}),
121+
None,
102122
);
103123
assert_eq!(
104124
scaling_arbiter.should_scale(ShardStats {
@@ -143,8 +163,8 @@ mod tests {
143163
assert_eq!(
144164
scaling_arbiter.should_scale(ShardStats {
145165
num_open_shards: 1,
146-
avg_short_term_ingestion_rate: 8.0f32,
147-
avg_long_term_ingestion_rate: 3.0f32,
166+
avg_short_term_ingestion_rate: 8.0,
167+
avg_long_term_ingestion_rate: 3.0,
148168
}),
149169
None,
150170
);
@@ -155,6 +175,14 @@ mod tests {
155175
// use shard throughput 10MiB to simplify calculations
156176
let scaling_arbiter =
157177
ScalingArbiter::with_max_shard_ingestion_throughput_mib_per_sec(10.0, 2.);
178+
assert_eq!(
179+
scaling_arbiter.should_scale(ShardStats {
180+
num_open_shards: 0,
181+
avg_short_term_ingestion_rate: 0.0,
182+
avg_long_term_ingestion_rate: 0.0,
183+
}),
184+
None,
185+
);
158186
assert_eq!(
159187
scaling_arbiter.should_scale(ShardStats {
160188
num_open_shards: 2,
@@ -213,4 +241,81 @@ mod tests {
213241
Some(ScalingMode::Up(1)),
214242
);
215243
}
244+
245+
#[test]
246+
fn test_scale_up_computations() {
247+
// use shard throughput 10MiB to simplify calculations
248+
let scaling_arbiter =
249+
ScalingArbiter::with_max_shard_ingestion_throughput_mib_per_sec(10.0, 1.5);
250+
251+
let shard_stats = ShardStats {
252+
num_open_shards: 0,
253+
avg_short_term_ingestion_rate: 0.,
254+
avg_long_term_ingestion_rate: 0.,
255+
};
256+
assert_eq!(
257+
scaling_arbiter.long_term_scale_up_threshold_max_shards(shard_stats),
258+
0
259+
);
260+
assert_eq!(
261+
scaling_arbiter.scale_up_factor_target_shards(shard_stats),
262+
0
263+
);
264+
265+
let shard_stats = ShardStats {
266+
num_open_shards: 1,
267+
avg_short_term_ingestion_rate: 5.0,
268+
avg_long_term_ingestion_rate: 6.1,
269+
};
270+
assert_eq!(
271+
scaling_arbiter.long_term_scale_up_threshold_max_shards(shard_stats),
272+
2
273+
);
274+
assert_eq!(
275+
scaling_arbiter.scale_up_factor_target_shards(shard_stats),
276+
2
277+
);
278+
279+
let shard_stats = ShardStats {
280+
num_open_shards: 2,
281+
avg_short_term_ingestion_rate: 5.0,
282+
avg_long_term_ingestion_rate: 1.1,
283+
};
284+
assert_eq!(
285+
scaling_arbiter.long_term_scale_up_threshold_max_shards(shard_stats),
286+
0
287+
);
288+
assert_eq!(
289+
scaling_arbiter.scale_up_factor_target_shards(shard_stats),
290+
3
291+
);
292+
293+
let shard_stats = ShardStats {
294+
num_open_shards: 2,
295+
avg_short_term_ingestion_rate: 5.0,
296+
avg_long_term_ingestion_rate: 6.1,
297+
};
298+
assert_eq!(
299+
scaling_arbiter.long_term_scale_up_threshold_max_shards(shard_stats),
300+
4
301+
);
302+
assert_eq!(
303+
scaling_arbiter.scale_up_factor_target_shards(shard_stats),
304+
3
305+
);
306+
307+
let shard_stats = ShardStats {
308+
num_open_shards: 5,
309+
avg_short_term_ingestion_rate: 5.0,
310+
avg_long_term_ingestion_rate: 1.1,
311+
};
312+
assert_eq!(
313+
scaling_arbiter.long_term_scale_up_threshold_max_shards(shard_stats),
314+
1
315+
);
316+
assert_eq!(
317+
scaling_arbiter.scale_up_factor_target_shards(shard_stats),
318+
8
319+
);
320+
}
216321
}

quickwit/quickwit-control-plane/src/model/shard_table.rs

+2
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,9 @@ impl ShardTable {
574574
#[derive(Clone, Copy, Default)]
575575
pub(crate) struct ShardStats {
576576
pub num_open_shards: usize,
577+
/// Average short-term ingestion rate (MiB/s) per open shard
577578
pub avg_short_term_ingestion_rate: f32,
579+
/// Average long-term ingestion rate (MiB/s) per open shard
578580
pub avg_long_term_ingestion_rate: f32,
579581
}
580582

0 commit comments

Comments
 (0)