Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ service SearchService {

// Describe how a search would be processed.
rpc SearchPlan(SearchRequest) returns (SearchPlanResponse);

// Returns the current load of this searcher node.
rpc GetLoad(GetLoadRequest) returns (GetLoadResponse);
}

/// Scroll Request
Expand Down Expand Up @@ -110,6 +113,15 @@ message ReportSplitsRequest {

message ReportSplitsResponse {}

// Request message of the `GetLoad` RPC. It carries no fields.
message GetLoadRequest {}

// Response message of the `GetLoad` RPC.
message GetLoadResponse {
// Current load expressed as the sum of job costs (same arbitrary unit as
// Job::cost() in the search job placer) across all queued and active tasks
// in the SearchPermitProvider.
uint64 load_job_cost = 1;
}

// -- ListFields -------------------

message ListFieldsRequest {
Expand Down
87 changes: 87 additions & 0 deletions quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 44 additions & 0 deletions quickwit/quickwit-search/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ enum SearchServiceClientImpl {
pub struct SearchServiceClient {
/// Underlying implementation: either a local (in-process) service or a gRPC client.
client_impl: SearchServiceClientImpl,
/// Address returned by `grpc_addr()`, i.e. the address the underlying client connects to.
grpc_addr: SocketAddr,
/// In test/testsuite builds, overrides the load returned by `get_load()` for local clients,
/// so that tests using mock services don't need to set up `get_load` expectations.
/// When left to `None`, `get_load()` reports a load of 0 for local clients.
#[cfg(any(test, feature = "testsuite"))]
test_load: Option<usize>,
}

impl fmt::Debug for SearchServiceClient {
Expand All @@ -74,6 +78,8 @@ impl SearchServiceClient {
SearchServiceClient {
client_impl: SearchServiceClientImpl::Grpc(client),
grpc_addr,
#[cfg(any(test, feature = "testsuite"))]
test_load: None,
}
}

Expand All @@ -82,9 +88,21 @@ impl SearchServiceClient {
SearchServiceClient {
client_impl: SearchServiceClientImpl::Local(service),
grpc_addr,
#[cfg(any(test, feature = "testsuite"))]
test_load: None,
}
}

/// Overrides the value reported by `get_load()` for this client (test/testsuite builds only).
///
/// Builder-style: consumes the client and returns it with the override recorded.
/// Once set, `get_load()` on a local client answers with `load` without calling the
/// underlying service, so mock services need no `get_load` expectation in tests that
/// are unrelated to load-aware placement.
#[cfg(any(test, feature = "testsuite"))]
pub fn with_test_load(self, load: usize) -> Self {
    let mut client = self;
    client.test_load = Some(load);
    client
}

/// Return the grpc_addr the underlying client connects to.
pub fn grpc_addr(&self) -> SocketAddr {
self.grpc_addr
Expand Down Expand Up @@ -218,6 +236,32 @@ impl SearchServiceClient {
Ok(())
}

/// Returns the current load of the targeted node, expressed as the sum of job costs
/// across all queued and active tasks in its SearchPermitProvider.
pub async fn get_load(&mut self) -> crate::Result<usize> {
// In test/testsuite builds, short-circuit for local clients so that mock services
// do not need a `get_load` expectation in tests unrelated to load-aware placement.
#[cfg(any(test, feature = "testsuite"))]
if let SearchServiceClientImpl::Local(_) = &self.client_impl {
return Ok(self.test_load.unwrap_or(0));
}
match &mut self.client_impl {
SearchServiceClientImpl::Local(service) => Ok(service.get_load().await),
SearchServiceClientImpl::Grpc(grpc_client) => {
match grpc_client
.get_load(quickwit_proto::search::GetLoadRequest {})
.await
{
Ok(response) => Ok(response.into_inner().load_job_cost as usize),
// Older searcher nodes do not implement `get_load`. To
// preserve a smooth upgrade path, treat them as unloaded
Err(tonic_error) if tonic_error.code() == tonic::Code::Unimplemented => Ok(0),
Err(tonic_error) => Err(parse_grpc_error(&tonic_error)),
}
}
}
}

/// Indexers call `report_splits` to inform searcher nodes about the presence of a split, which
/// is then considered as a candidate for the searcher split cache.
pub async fn report_splits(&mut self, report_splits_request: ReportSplitsRequest) {
Expand Down
13 changes: 9 additions & 4 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1547,15 +1547,20 @@ async fn schedule_search_tasks(
mut splits: Vec<(SplitIdAndFooterOffsets, SearchRequest)>,
searcher_context: &SearcherContext,
) -> ScheduleSearchTaskResult {
let permit_sizes: Vec<ByteSize> = splits
let task_metadata: Vec<crate::search_permit_provider::SplitSearchTaskMetadata> = splits
.iter()
.map(|(split, _)| {
compute_initial_memory_allocation(
let memory_allocation = compute_initial_memory_allocation(
split,
searcher_context
.searcher_config
.warmup_single_split_initial_allocation,
)
);
let job_cost = crate::root::compute_split_cost(split.num_docs);
crate::search_permit_provider::SplitSearchTaskMetadata {
memory_allocation,
job_cost,
}
})
.collect();

Expand All @@ -1569,7 +1574,7 @@ async fn schedule_search_tasks(

let search_permit_futures = searcher_context
.search_permit_provider
.get_permits_with_offload(permit_sizes, offload_threshold)
.get_permits_with_offload(task_metadata, offload_threshold)
.await;

let splits_to_run_on_lambda: Vec<(SplitIdAndFooterOffsets, SearchRequest)> =
Expand Down
14 changes: 9 additions & 5 deletions quickwit/quickwit-search/src/list_terms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::ops::Bound;
use std::sync::Arc;

use anyhow::Context;
use bytesize::ByteSize;
use futures::future::try_join_all;
use itertools::{Either, Itertools};
use quickwit_common::pretty::PrettySample;
Expand Down Expand Up @@ -327,23 +326,28 @@ pub async fn leaf_list_terms(
splits: &[SplitIdAndFooterOffsets],
) -> Result<LeafListTermsResponse, SearchError> {
info!(split_offsets = ?PrettySample::new(splits, 5));
let permit_sizes: Vec<ByteSize> = splits
let task_metadata: Vec<crate::search_permit_provider::SplitSearchTaskMetadata> = splits
.iter()
.map(|split| {
compute_initial_memory_allocation(
let memory_allocation = compute_initial_memory_allocation(
split,
searcher_context
.searcher_config
.warmup_single_split_initial_allocation,
)
);
let job_cost = crate::root::compute_split_cost(split.num_docs);
crate::search_permit_provider::SplitSearchTaskMetadata {
memory_allocation,
job_cost,
}
})
.collect();
// We have added offloading leaf search to lambdas, but not for list_terms yet.
// TODO (Add it)
// https://github.com/quickwit-oss/quickwit/issues/6150
let permits = searcher_context
.search_permit_provider
.get_permits(permit_sizes)
.get_permits(task_metadata)
.await;
let leaf_search_single_split_futures: Vec<_> = splits
.iter()
Expand Down
9 changes: 5 additions & 4 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl<'a> From<&'a SplitMetadata> for SearchJob {
fn from(split_metadata: &'a SplitMetadata) -> Self {
SearchJob {
index_uid: split_metadata.index_uid.clone(),
cost: compute_split_cost(split_metadata),
cost: compute_split_cost(split_metadata.num_docs as u64),
offsets: extract_split_and_footer_offsets(split_metadata),
}
}
Expand Down Expand Up @@ -1674,18 +1674,19 @@ async fn assign_client_fetch_docs_jobs(
fetch_docs_req_jobs.push(fetch_docs_job);
}

// don't do a second call to GetLoad to place fetch_docs jobs
let assigned_jobs = client_pool
.assign_jobs(fetch_docs_req_jobs, &HashSet::new())
.assign_jobs_ignoring_load(fetch_docs_req_jobs, &HashSet::new())
.await?;

Ok(assigned_jobs)
}

// Measure the cost associated to searching in a given split metadata.
fn compute_split_cost(split_metadata: &SplitMetadata) -> usize {
pub(crate) fn compute_split_cost(num_docs: u64) -> usize {
// TODO this formula could be tuned a lot more. The general idea is that there is a fixed
// cost to searching a split, plus a somewhat-linear cost depending on the size of the split
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should include if we have aggregations or not, or ideally a cost from the query

5 + split_metadata.num_docs / 100_000
5 + (num_docs / 100_000) as usize
}

/// Builds a LeafSearchRequest to one node, from a list of [`SearchJob`].
Expand Down
Loading
Loading