Skip to content

Commit b8b6214

Browse files
authored
Fix: fetch is missing in plan_with_order_breaking_variants method (#15842)
1 parent 1fe856b commit b8b6214

File tree

2 files changed

+37
-6
lines changed

2 files changed

+37
-6
lines changed

datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,12 @@ use datafusion_common::{assert_contains, Result};
4545
use datafusion_expr::{JoinType, Operator};
4646
use datafusion_physical_expr::expressions::{self, col, Column};
4747
use datafusion_physical_expr::PhysicalSortExpr;
48-
use datafusion_physical_optimizer::enforce_sorting::replace_with_order_preserving_variants::{plan_with_order_preserving_variants, replace_with_order_preserving_variants, OrderPreservationContext};
48+
use datafusion_physical_optimizer::enforce_sorting::replace_with_order_preserving_variants::{plan_with_order_breaking_variants, plan_with_order_preserving_variants, replace_with_order_preserving_variants, OrderPreservationContext};
4949
use datafusion_common::config::ConfigOptions;
5050

5151
use crate::physical_optimizer::enforce_sorting::parquet_exec_sorted;
52+
use datafusion_physical_expr_common::sort_expr::LexOrdering;
53+
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
5254
use object_store::memory::InMemory;
5355
use object_store::ObjectStore;
5456
use rstest::rstest;
@@ -1310,3 +1312,29 @@ fn test_plan_with_order_preserving_variants_preserves_fetch() -> Result<()> {
13101312
assert_eq!(res.plan.fetch(), Some(5),);
13111313
Ok(())
13121314
}
1315+
1316+
#[test]
1317+
fn test_plan_with_order_breaking_variants_preserves_fetch() -> Result<()> {
1318+
let schema = create_test_schema3()?;
1319+
let parquet_sort_exprs = vec![crate::physical_optimizer::test_utils::sort_expr(
1320+
"a", &schema,
1321+
)];
1322+
let parquet_exec = parquet_exec_sorted(&schema, parquet_sort_exprs.clone());
1323+
let spm = SortPreservingMergeExec::new(
1324+
LexOrdering::new(parquet_sort_exprs),
1325+
parquet_exec.clone(),
1326+
)
1327+
.with_fetch(Some(10));
1328+
let requirements = OrderPreservationContext::new(
1329+
Arc::new(spm),
1330+
true,
1331+
vec![OrderPreservationContext::new(
1332+
parquet_exec.clone(),
1333+
true,
1334+
vec![],
1335+
)],
1336+
);
1337+
let res = plan_with_order_breaking_variants(requirements)?;
1338+
assert_eq!(res.plan.fetch(), Some(10));
1339+
Ok(())
1340+
}

datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use datafusion_physical_plan::execution_plan::EmissionType;
3434
use datafusion_physical_plan::repartition::RepartitionExec;
3535
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
3636
use datafusion_physical_plan::tree_node::PlanContext;
37-
use datafusion_physical_plan::ExecutionPlanProperties;
37+
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
3838

3939
use itertools::izip;
4040

@@ -167,7 +167,7 @@ pub fn plan_with_order_preserving_variants(
167167
/// Calculates the updated plan by replacing operators that preserve ordering
168168
/// inside `sort_input` with their order-breaking variants. This will restore
169169
/// the original plan modified by [`plan_with_order_preserving_variants`].
170-
fn plan_with_order_breaking_variants(
170+
pub fn plan_with_order_breaking_variants(
171171
mut sort_input: OrderPreservationContext,
172172
) -> Result<OrderPreservationContext> {
173173
let plan = &sort_input.plan;
@@ -202,10 +202,13 @@ fn plan_with_order_breaking_variants(
202202
let partitioning = plan.output_partitioning().clone();
203203
sort_input.plan = Arc::new(RepartitionExec::try_new(child, partitioning)?) as _;
204204
} else if is_sort_preserving_merge(plan) {
205-
// Replace `SortPreservingMergeExec` with a `CoalescePartitionsExec`:
205+
// Replace `SortPreservingMergeExec` with a `CoalescePartitionsExec`
206+
// SPM may have `fetch`, so pass it to the `CoalescePartitionsExec`
206207
let child = Arc::clone(&sort_input.children[0].plan);
207-
let coalesce = CoalescePartitionsExec::new(child);
208-
sort_input.plan = Arc::new(coalesce) as _;
208+
let coalesce = CoalescePartitionsExec::new(child)
209+
.with_fetch(plan.fetch())
210+
.unwrap();
211+
sort_input.plan = coalesce;
209212
} else {
210213
return sort_input.update_plan_from_children();
211214
}

0 commit comments

Comments
 (0)