Skip to content

Fuel flat_map #32410

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ def get_default_system_parameters(
"enable_columnar_lgalloc",
"compute_server_maintenance_interval",
"compute_dataflow_max_inflight_bytes_cc",
"compute_flat_map_fuel",
"consolidating_vec_growth_dampener",
"copy_to_s3_parquet_row_group_file_ratio",
"copy_to_s3_arrow_builder_buffer_ratio",
Expand Down
1 change: 1 addition & 0 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,7 @@ def __init__(
"compute_server_maintenance_interval",
"compute_dataflow_max_inflight_bytes",
"compute_dataflow_max_inflight_bytes_cc",
"compute_flat_map_fuel",
"consolidating_vec_growth_dampener",
"compute_hydration_concurrency",
"copy_to_s3_parquet_row_group_file_ratio",
Expand Down
9 changes: 9 additions & 0 deletions src/compute-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,14 @@ pub const COMPUTE_APPLY_COLUMN_DEMANDS: Config<bool> = Config::new(
"When enabled, passes applys column demands to the RelationDesc used to read out of Persist.",
);

/// The amount of output the flat-map operator produces before yielding. Set to a high value to
/// avoid yielding, or to a low value to yield frequently.
///
/// The flat-map rendering spends one unit of fuel per result it emits, regardless of whether
/// that result is a row or an error. The remaining fuel is only checked after an input batch
/// has been fully processed, so a single batch that expands into many rows can overshoot this
/// limit before the operator reschedules itself and yields.
pub const COMPUTE_FLAT_MAP_FUEL: Config<usize> = Config::new(
"compute_flat_map_fuel",
1_000_000,
"The amount of output the flat-map operator produces before yielding.",
);

/// Whether to render `as_specific_collection` using a fueled flat-map operator.
pub const ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION: Config<bool> = Config::new(
"enable_compute_render_fueled_as_specific_collection",
Expand Down Expand Up @@ -355,6 +363,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&ENABLE_COMPUTE_REPLICA_EXPIRATION)
.add(&COMPUTE_REPLICA_EXPIRATION_OFFSET)
.add(&COMPUTE_APPLY_COLUMN_DEMANDS)
.add(&COMPUTE_FLAT_MAP_FUEL)
.add(&CONSOLIDATING_VEC_GROWTH_DAMPENER)
.add(&ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION)
.add(&ENABLE_COMPUTE_LOGICAL_BACKPRESSURE)
Expand Down
25 changes: 22 additions & 3 deletions src/compute/src/render/flat_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// by the Apache License, Version 2.0.

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use mz_compute_types::dyncfgs::COMPUTE_FLAT_MAP_FUEL;
use mz_expr::MfpPlan;
use mz_expr::{MapFilterProject, MirScalarExpr, TableFunc};
use mz_repr::{DatumVec, RowArena, SharedRow};
Expand Down Expand Up @@ -41,15 +42,26 @@ where
let (ok_collection, err_collection) =
input.as_specific_collection(input_key.as_deref(), &self.config_set);
let stream = ok_collection.inner;
let (oks, errs) = stream.unary_fallible(Pipeline, "FlatMapStage", move |_, _| {
let scope = input.scope();

// Budget to limit the number of output rows produced in a single invocation.
//
// The current implementation can only yield between input batches, not from within a
// batch, so a single `generate_series` call can still cause unavailability if it
// generates many rows.
let budget = COMPUTE_FLAT_MAP_FUEL.get(&self.config_set);

let (oks, errs) = stream.unary_fallible(Pipeline, "FlatMapStage", move |_, info| {
let activator = scope.activator_for(info.address);
Box::new(move |input, ok_output, err_output| {
let mut datums = DatumVec::new();
let mut datums_mfp = DatumVec::new();

// Buffer for extensions to `input_row`.
let mut table_func_output = Vec::new();

input.for_each(|cap, data| {
let mut budget = budget;

while let Some((cap, data)) = input.next() {
let mut ok_session = ok_output.session_with_builder(&cap);
let mut err_session = err_output.session_with_builder(&cap);

Expand Down Expand Up @@ -92,11 +104,16 @@ where
&until,
&mut ok_session,
&mut err_session,
&mut budget,
);
table_func_output.clear();
}
}
})
if budget == 0 {
activator.activate();
break;
}
}
})
});

Expand Down Expand Up @@ -129,6 +146,7 @@ fn drain_through_mfp<T>(
ConsolidatingContainerBuilder<Vec<(DataflowError, T, Diff)>>,
Counter<T, Vec<(DataflowError, T, Diff)>, Tee<T, Vec<(DataflowError, T, Diff)>>>,
>,
budget: &mut usize,
) where
T: crate::render::RenderTimestamp,
{
Expand Down Expand Up @@ -156,6 +174,7 @@ fn drain_through_mfp<T>(
);

for result in results {
*budget = budget.saturating_sub(1);
match result {
Ok((row, event_time, diff)) => {
// Copy the whole time, and re-populate event time.
Expand Down