Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1400,6 +1400,7 @@ mod tests {
client: indexer,
indexing_tasks: Vec::new(),
indexing_capacity: CpuCapacity::from_cpu_millis(1_000),
ingester_status: IngesterStatus::Ready,
};
indexer_pool.insert(self_node_id.clone(), indexer_info);

Expand Down Expand Up @@ -1829,6 +1830,7 @@ mod tests {
client,
indexing_tasks: Vec::new(),
indexing_capacity: CpuCapacity::from_cpu_millis(4_000),
ingester_status: IngesterStatus::Ready,
};
indexer_pool.insert(indexer_node_info.node_id.clone(), indexer_node_info);
let ingester_pool = IngesterPool::default();
Expand Down Expand Up @@ -1978,6 +1980,7 @@ mod tests {
client,
indexing_tasks: Vec::new(),
indexing_capacity: CpuCapacity::from_cpu_millis(4_000),
ingester_status: IngesterStatus::Ready,
};
indexer_pool.insert(indexer_node_info.node_id.clone(), indexer_node_info);
let ingester_pool = IngesterPool::default();
Expand Down Expand Up @@ -2056,6 +2059,7 @@ mod tests {
client,
indexing_tasks: Vec::new(),
indexing_capacity: CpuCapacity::from_cpu_millis(4_000),
ingester_status: IngesterStatus::Ready,
};
indexer_pool.insert(indexer_node_info.node_id.clone(), indexer_node_info);
let ingester_pool = IngesterPool::default();
Expand Down Expand Up @@ -2782,6 +2786,7 @@ mod tests {
client: indexer,
indexing_tasks: Vec::new(),
indexing_capacity: CpuCapacity::from_cpu_millis(1_000),
ingester_status: IngesterStatus::Ready,
};
indexer_pool.insert(ingester_id.clone(), indexer_info);

Expand Down
99 changes: 94 additions & 5 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use quickwit_proto::indexing::{
ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY,
PIPELINE_THROUGHPUT,
};
use quickwit_proto::ingest::ingester::IngesterStatus;
use quickwit_proto::types::NodeId;
use scheduling::{SourceToSchedule, SourceToScheduleType};
use serde::Serialize;
Expand Down Expand Up @@ -301,7 +302,7 @@ impl IndexingScheduler {

let sources = get_sources_to_schedule(model);

let indexers: Vec<IndexerNodeInfo> = self.get_indexers_from_indexer_pool();
let indexers: Vec<IndexerNodeInfo> = self.select_available_indexers_for_scheduling();

let indexer_id_to_cpu_capacities: FnvHashMap<String, CpuCapacity> = indexers
.iter()
Expand Down Expand Up @@ -366,7 +367,7 @@ impl IndexingScheduler {
{
return;
}
let indexers: Vec<IndexerNodeInfo> = self.get_indexers_from_indexer_pool();
let indexers: Vec<IndexerNodeInfo> = self.select_available_indexers_for_scheduling();
let running_indexing_tasks_by_node_id: FnvHashMap<String, Vec<IndexingTask>> = indexers
.iter()
.map(|indexer| (indexer.node_id.to_string(), indexer.indexing_tasks.clone()))
Expand All @@ -386,8 +387,23 @@ impl IndexingScheduler {
}
}

fn get_indexers_from_indexer_pool(&self) -> Vec<IndexerNodeInfo> {
self.indexer_pool.values()
/// Returns the indexers eligible for indexing-plan scheduling.
///
/// Indexers whose ingester is `Ready` are preferred. When none are ready,
/// the remaining (e.g. retiring) indexers are returned instead so that
/// their shards can still be drained — otherwise a single-node cluster
/// would stall until the decommission timeout.
fn select_available_indexers_for_scheduling(&self) -> Vec<IndexerNodeInfo> {
    let mut ready_indexers: Vec<IndexerNodeInfo> = Vec::new();
    let mut other_indexers: Vec<IndexerNodeInfo> = Vec::new();
    for indexer in self.indexer_pool.values() {
        if indexer.ingester_status == IngesterStatus::Ready {
            ready_indexers.push(indexer);
        } else {
            other_indexers.push(indexer);
        }
    }
    if !ready_indexers.is_empty() {
        return ready_indexers;
    }
    // Allow scheduling on retiring indexers to drain shards
    // and avoid decommission timeouts (e.g. single-node cluster).
    warn!(
        "no ready indexer available, falling back to retiring indexers for shard draining"
    );
    other_indexers
}

fn apply_physical_indexing_plan(
Expand Down Expand Up @@ -1090,9 +1106,82 @@ mod tests {
}

use quickwit_config::SourceInputFormat;
use quickwit_proto::indexing::mcpu;
use quickwit_proto::indexing::{CpuCapacity, IndexingServiceClient, MockIndexingService, mcpu};
use quickwit_proto::ingest::{Shard, ShardState};

/// Builds an `IndexerNodeInfo` test fixture with the given node id and
/// ingester status, backed by a mock indexing service client.
fn mock_indexer_node_info(node_id: &str, status: IngesterStatus) -> IndexerNodeInfo {
    IndexerNodeInfo {
        node_id: NodeId::from(node_id.to_string()),
        generation_id: 0,
        client: IndexingServiceClient::from_mock(MockIndexingService::new()),
        indexing_tasks: Vec::new(),
        indexing_capacity: CpuCapacity::from_cpu_millis(4_000),
        ingester_status: status,
    }
}

#[test]
fn test_select_available_indexers_returns_only_ready_when_available() {
    // Mix two ready indexers with one retiring indexer: only the ready
    // ones should be selected for scheduling.
    let indexer_pool = IndexerPool::default();
    let fixtures = [
        mock_indexer_node_info("indexer-ready-1", IngesterStatus::Ready),
        mock_indexer_node_info("indexer-ready-2", IngesterStatus::Ready),
        mock_indexer_node_info("indexer-retiring", IngesterStatus::Retiring),
    ];
    for indexer_node_info in fixtures {
        indexer_pool.insert(indexer_node_info.node_id.clone(), indexer_node_info);
    }

    let scheduler = IndexingScheduler::new(
        "test-cluster".to_string(),
        NodeId::from("control-plane".to_string()),
        indexer_pool,
    );
    let selected = scheduler.select_available_indexers_for_scheduling();

    assert_eq!(selected.len(), 2);
    let all_ready = selected
        .iter()
        .all(|indexer| indexer.ingester_status == IngesterStatus::Ready);
    assert!(all_ready);
}

#[test]
fn test_select_available_indexers_falls_back_to_retiring_when_no_ready() {
    // With no ready indexer in the pool, the scheduler should fall back
    // to the retiring indexers so shards can still be drained.
    let indexer_pool = IndexerPool::default();
    for node_id in ["indexer-retiring-1", "indexer-retiring-2"] {
        let indexer_node_info = mock_indexer_node_info(node_id, IngesterStatus::Retiring);
        indexer_pool.insert(indexer_node_info.node_id.clone(), indexer_node_info);
    }

    let scheduler = IndexingScheduler::new(
        "test-cluster".to_string(),
        NodeId::from("control-plane".to_string()),
        indexer_pool,
    );
    let selected = scheduler.select_available_indexers_for_scheduling();

    assert_eq!(selected.len(), 2);
    let all_retiring = selected
        .iter()
        .all(|indexer| indexer.ingester_status == IngesterStatus::Retiring);
    assert!(all_retiring);
}

#[test]
fn test_select_available_indexers_returns_empty_when_pool_is_empty() {
    // An empty indexer pool yields an empty selection (no fallback to warn about
    // beyond the log line — the caller simply gets nothing to schedule on).
    let scheduler = IndexingScheduler::new(
        "test-cluster".to_string(),
        NodeId::from("control-plane".to_string()),
        IndexerPool::default(),
    );
    assert!(
        scheduler
            .select_available_indexers_for_scheduling()
            .is_empty()
    );
}

fn kafka_source_params_for_test() -> SourceParams {
SourceParams::Kafka(KafkaSourceParams {
topic: "topic".to_string(),
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-control-plane/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub(crate) mod model;

use quickwit_common::tower::Pool;
use quickwit_proto::indexing::{CpuCapacity, IndexingServiceClient, IndexingTask};
use quickwit_proto::ingest::ingester::IngesterStatus;
use quickwit_proto::types::NodeId;

/// Indexer-node specific information stored in the pool of available indexer nodes
Expand All @@ -31,6 +32,7 @@ pub struct IndexerNodeInfo {
pub client: IndexingServiceClient,
pub indexing_tasks: Vec<IndexingTask>,
pub indexing_capacity: CpuCapacity,
pub ingester_status: IngesterStatus,
}

pub type IndexerPool = Pool<NodeId, IndexerNodeInfo>;
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub fn test_indexer_change_stream(
client,
indexing_tasks,
indexing_capacity: CpuCapacity::from_cpu_millis(4_000),
ingester_status: node.ingester_status(),
},
);
Some(change)
Expand Down
39 changes: 39 additions & 0 deletions quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,17 @@ fn setup_indexer_pool(
);
Some(change)
}
ClusterChange::Update { previous, updated }
if updated.is_indexer()
&& previous.ingester_status() != updated.ingester_status() =>
{
let change = build_indexer_insert_change(
&updated,
indexing_service_clone_opt,
grpc_max_message_size,
);
Some(change)
}
ClusterChange::Remove(node) if node.is_indexer() => {
let change = build_indexer_remove_change(&node);
Some(change)
Expand Down Expand Up @@ -1305,6 +1316,7 @@ fn build_indexer_insert_change(
client,
indexing_tasks: node.indexing_tasks().to_vec(),
indexing_capacity: node.indexing_capacity(),
ingester_status: node.ingester_status(),
},
)
}
Expand Down Expand Up @@ -1659,6 +1671,33 @@ mod tests {

assert_eq!(indexer_pool.len(), 1);

// changing the ingester status of an indexer node refreshes the indexer pool
let updated_indexer_node = ClusterNode::for_test(
"test-indexer-node",
1,
true,
&["indexer"],
&[],
IngesterStatus::Retiring,
)
.await;
cluster_change_stream_tx
.send(ClusterChange::Update {
previous: new_indexer_node.clone(),
updated: updated_indexer_node.clone(),
})
.unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;

assert_eq!(indexer_pool.len(), 1);
assert_eq!(
indexer_pool
.get(&NodeId::from("test-indexer-node"))
.expect("indexer node should be in the pool")
.ingester_status,
IngesterStatus::Retiring
);

// removing an indexer node refreshes the indexer pool
cluster_change_stream_tx
.send(ClusterChange::Remove(new_indexer_node))
Expand Down
Loading