@@ -285,6 +285,7 @@ impl IngestController {
285
285
ingester_pool : IngesterPool ,
286
286
replication_factor : usize ,
287
287
max_shard_ingestion_throughput_mib_per_sec : f32 ,
288
+ shard_scaling_factor : f32 ,
288
289
) -> Self {
289
290
IngestController {
290
291
metastore,
@@ -294,6 +295,7 @@ impl IngestController {
294
295
stats : IngestControllerStats :: default ( ) ,
295
296
scaling_arbiter : ScalingArbiter :: with_max_shard_ingestion_throughput_mib_per_sec (
296
297
max_shard_ingestion_throughput_mib_per_sec,
298
+ shard_scaling_factor,
297
299
) ,
298
300
}
299
301
}
@@ -396,12 +398,13 @@ impl IngestController {
396
398
} ;
397
399
398
400
match scaling_mode {
399
- ScalingMode :: Up => {
401
+ ScalingMode :: Up ( shards ) => {
400
402
self . try_scale_up_shards (
401
403
local_shards_update. source_uid ,
402
404
shard_stats,
403
405
model,
404
406
progress,
407
+ shards,
405
408
)
406
409
. await ?;
407
410
}
@@ -670,18 +673,19 @@ impl IngestController {
670
673
shard_stats : ShardStats ,
671
674
model : & mut ControlPlaneModel ,
672
675
progress : & Progress ,
676
+ shards_to_create : usize ,
673
677
) -> MetastoreResult < ( ) > {
674
678
if !model
675
- . acquire_scaling_permits ( & source_uid, ScalingMode :: Up )
679
+ . acquire_scaling_permits ( & source_uid, ScalingMode :: Up ( shards_to_create ) )
676
680
. unwrap_or ( false )
677
681
{
678
682
return Ok ( ( ) ) ;
679
683
}
680
- let new_num_open_shards = shard_stats. num_open_shards + 1 ;
681
- let new_shard_source_uids : HashMap < SourceUid , usize > =
682
- HashMap :: from_iter ( [ ( source_uid. clone ( ) , 1 ) ] ) ;
684
+ let new_num_open_shards = shard_stats. num_open_shards + shards_to_create ;
685
+ let new_shards_per_source : HashMap < SourceUid , usize > =
686
+ HashMap :: from_iter ( [ ( source_uid. clone ( ) , shards_to_create ) ] ) ;
683
687
let successful_source_uids_res = self
684
- . try_open_shards ( new_shard_source_uids , model, & Default :: default ( ) , progress)
688
+ . try_open_shards ( new_shards_per_source , model, & Default :: default ( ) , progress)
685
689
. await ;
686
690
687
691
match successful_source_uids_res {
@@ -691,7 +695,7 @@ impl IngestController {
691
695
if successful_source_uids. is_empty ( ) {
692
696
// We did not manage to create the shard.
693
697
// We can release our permit.
694
- model. release_scaling_permits ( & source_uid, ScalingMode :: Up ) ;
698
+ model. release_scaling_permits ( & source_uid, ScalingMode :: Up ( shards_to_create ) ) ;
695
699
warn ! (
696
700
index_uid=%source_uid. index_uid,
697
701
source_id=%source_uid. source_id,
@@ -715,7 +719,7 @@ impl IngestController {
715
719
source_id=%source_uid. source_id,
716
720
"scaling up number of shards to {new_num_open_shards} failed: {metastore_error:?}"
717
721
) ;
718
- model. release_scaling_permits ( & source_uid, ScalingMode :: Up ) ;
722
+ model. release_scaling_permits ( & source_uid, ScalingMode :: Up ( shards_to_create ) ) ;
719
723
Err ( metastore_error)
720
724
}
721
725
}
@@ -739,12 +743,12 @@ impl IngestController {
739
743
/// The number of successfully open shards is returned.
740
744
async fn try_open_shards (
741
745
& mut self ,
742
- source_uids : HashMap < SourceUid , usize > ,
746
+ shards_per_source : HashMap < SourceUid , usize > ,
743
747
model : & mut ControlPlaneModel ,
744
748
unavailable_leaders : & FnvHashSet < NodeId > ,
745
749
progress : & Progress ,
746
750
) -> MetastoreResult < HashMap < SourceUid , usize > > {
747
- let num_shards: usize = source_uids . values ( ) . sum ( ) ;
751
+ let num_shards: usize = shards_per_source . values ( ) . sum ( ) ;
748
752
749
753
if num_shards == 0 {
750
754
return Ok ( HashMap :: new ( ) ) ;
@@ -756,7 +760,7 @@ impl IngestController {
756
760
return Ok ( HashMap :: new ( ) ) ;
757
761
} ;
758
762
759
- let source_uids_with_multiplicity = source_uids
763
+ let source_uids_with_multiplicity = shards_per_source
760
764
. iter ( )
761
765
. flat_map ( |( source_uid, count) | std:: iter:: repeat ( source_uid) . take ( * count) ) ;
762
766
@@ -1347,6 +1351,7 @@ mod tests {
1347
1351
ingester_pool. clone ( ) ,
1348
1352
replication_factor,
1349
1353
TEST_SHARD_THROUGHPUT_LIMIT_MIB ,
1354
+ 1.001 ,
1350
1355
) ;
1351
1356
1352
1357
let mut model = ControlPlaneModel :: default ( ) ;
@@ -1532,6 +1537,7 @@ mod tests {
1532
1537
ingester_pool,
1533
1538
replication_factor,
1534
1539
TEST_SHARD_THROUGHPUT_LIMIT_MIB ,
1540
+ 1.001 ,
1535
1541
) ;
1536
1542
1537
1543
let mut model = ControlPlaneModel :: default ( ) ;
@@ -1574,6 +1580,7 @@ mod tests {
1574
1580
ingester_pool,
1575
1581
replication_factor,
1576
1582
TEST_SHARD_THROUGHPUT_LIMIT_MIB ,
1583
+ 1.001 ,
1577
1584
) ;
1578
1585
let mut model = ControlPlaneModel :: default ( ) ;
1579
1586
@@ -1624,6 +1631,7 @@ mod tests {
1624
1631
ingester_pool. clone ( ) ,
1625
1632
replication_factor,
1626
1633
TEST_SHARD_THROUGHPUT_LIMIT_MIB ,
1634
+ 1.001 ,
1627
1635
) ;
1628
1636
1629
1637
let mut model = ControlPlaneModel :: default ( ) ;
@@ -1806,6 +1814,7 @@ mod tests {
1806
1814
ingester_pool. clone ( ) ,
1807
1815
replication_factor,
1808
1816
TEST_SHARD_THROUGHPUT_LIMIT_MIB ,
1817
+ 1.001 ,
1809
1818
) ;
1810
1819
1811
1820
let ingester_id_0 = NodeId :: from ( "test-ingester-0" ) ;
@@ -2029,6 +2038,7 @@ mod tests {
2029
2038
ingester_pool. clone ( ) ,
2030
2039
replication_factor,
2031
2040
TEST_SHARD_THROUGHPUT_LIMIT_MIB ,
2041
+ 1.001 ,
2032
2042
) ;
2033
2043
2034
2044
let index_uid = IndexUid :: for_test ( "test-index" , 0 ) ;
@@ -2166,6 +2176,7 @@ mod tests {
2166
2176
ingester_pool. clone ( ) ,
2167
2177
replication_factor,
2168
2178
TEST_SHARD_THROUGHPUT_LIMIT_MIB ,
2179
+ 1.001 ,
2169
2180
) ;
2170
2181
2171
2182
let index_uid = IndexUid :: for_test ( "test-index" , 0 ) ;
@@ -2363,6 +2374,7 @@ mod tests {
2363
2374
ingester_pool. clone ( ) ,
2364
2375
replication_factor,
2365
2376
TEST_SHARD_THROUGHPUT_LIMIT_MIB ,
2377
+ 1.001 ,
2366
2378
) ;
2367
2379
2368
2380
let index_uid = IndexUid :: for_test ( "test-index" , 0 ) ;
@@ -2487,6 +2499,7 @@ mod tests {
2487
2499
ingester_pool. clone ( ) ,
2488
2500
replication_factor,
2489
2501
TEST_SHARD_THROUGHPUT_LIMIT_MIB ,
2502
+ 1.001 ,
2490
2503
) ;
2491
2504
2492
2505
let index_uid = IndexUid :: for_test ( "test-index" , 0 ) ;
@@ -2510,9 +2523,9 @@ mod tests {
2510
2523
2511
2524
let progress = Progress :: default ( ) ;
2512
2525
2513
- // Test could not find leader.
2526
+ // Test could not find leader because no ingester in pool
2514
2527
controller
2515
- . try_scale_up_shards ( source_uid. clone ( ) , shard_stats, & mut model, & progress)
2528
+ . try_scale_up_shards ( source_uid. clone ( ) , shard_stats, & mut model, & progress, 1 )
2516
2529
. await
2517
2530
. unwrap ( ) ;
2518
2531
@@ -2564,21 +2577,21 @@ mod tests {
2564
2577
2565
2578
// Test failed to open shards.
2566
2579
controller
2567
- . try_scale_up_shards ( source_uid. clone ( ) , shard_stats, & mut model, & progress)
2580
+ . try_scale_up_shards ( source_uid. clone ( ) , shard_stats, & mut model, & progress, 1 )
2568
2581
. await
2569
2582
. unwrap ( ) ;
2570
2583
assert_eq ! ( model. all_shards( ) . count( ) , 0 ) ;
2571
2584
2572
2585
// Test failed to init shards.
2573
2586
controller
2574
- . try_scale_up_shards ( source_uid. clone ( ) , shard_stats, & mut model, & progress)
2587
+ . try_scale_up_shards ( source_uid. clone ( ) , shard_stats, & mut model, & progress, 1 )
2575
2588
. await
2576
2589
. unwrap_err ( ) ;
2577
2590
assert_eq ! ( model. all_shards( ) . count( ) , 0 ) ;
2578
2591
2579
2592
// Test successfully opened shard.
2580
2593
controller
2581
- . try_scale_up_shards ( source_uid. clone ( ) , shard_stats, & mut model, & progress)
2594
+ . try_scale_up_shards ( source_uid. clone ( ) , shard_stats, & mut model, & progress, 1 )
2582
2595
. await
2583
2596
. unwrap ( ) ;
2584
2597
assert_eq ! (
@@ -2598,6 +2611,7 @@ mod tests {
2598
2611
ingester_pool. clone ( ) ,
2599
2612
replication_factor,
2600
2613
TEST_SHARD_THROUGHPUT_LIMIT_MIB ,
2614
+ 1.001 ,
2601
2615
) ;
2602
2616
2603
2617
let index_uid = IndexUid :: for_test ( "test-index" , 0 ) ;
@@ -2824,6 +2838,7 @@ mod tests {
2824
2838
ingester_pool. clone ( ) ,
2825
2839
replication_factor,
2826
2840
TEST_SHARD_THROUGHPUT_LIMIT_MIB ,
2841
+ 1.001 ,
2827
2842
) ;
2828
2843
2829
2844
let index_uid = IndexUid :: for_test ( "test-index" , 0 ) ;
@@ -2907,6 +2922,7 @@ mod tests {
2907
2922
ingester_pool,
2908
2923
replication_factor,
2909
2924
TEST_SHARD_THROUGHPUT_LIMIT_MIB ,
2925
+ 1.001 ,
2910
2926
) ;
2911
2927
2912
2928
let mut model = ControlPlaneModel :: default ( ) ;
@@ -2982,6 +2998,7 @@ mod tests {
2982
2998
ingester_pool. clone ( ) ,
2983
2999
replication_factor,
2984
3000
TEST_SHARD_THROUGHPUT_LIMIT_MIB ,
3001
+ 1.001 ,
2985
3002
) ;
2986
3003
2987
3004
let closed_shards = controller. close_shards ( Vec :: new ( ) ) . await ;
@@ -3138,6 +3155,7 @@ mod tests {
3138
3155
ingester_pool. clone ( ) ,
3139
3156
replication_factor,
3140
3157
TEST_SHARD_THROUGHPUT_LIMIT_MIB ,
3158
+ 1.001 ,
3141
3159
) ;
3142
3160
3143
3161
let mut model = ControlPlaneModel :: default ( ) ;
0 commit comments