Skip to content

Commit 4ac9b55

Browse files
authored
Fix CoalescePartitionsExec proto serialization (#15824)
* add fetch to CoalescePartitionsExecNode * gen proto code * Add test * fix * fix build * Fix test build * remove comments
1 parent b8b6214 commit 4ac9b55

File tree

10 files changed

+62
-16
lines changed

10 files changed

+62
-16
lines changed

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3447,9 +3447,8 @@ fn test_parallelize_sort_preserves_fetch() -> Result<()> {
34473447
let schema = create_test_schema3()?;
34483448
let parquet_exec = parquet_exec(&schema);
34493449
let coalesced = Arc::new(CoalescePartitionsExec::new(parquet_exec.clone()));
3450-
let top_coalesced = CoalescePartitionsExec::new(coalesced.clone())
3451-
.with_fetch(Some(10))
3452-
.unwrap();
3450+
let top_coalesced =
3451+
Arc::new(CoalescePartitionsExec::new(coalesced.clone()).with_fetch(Some(10)));
34533452

34543453
let requirements = PlanWithCorrespondingCoalescePartitions::new(
34553454
top_coalesced.clone(),

datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1272,9 +1272,8 @@ fn test_plan_with_order_preserving_variants_preserves_fetch() -> Result<()> {
12721272
"a", &schema,
12731273
)];
12741274
let parquet_exec = parquet_exec_sorted(&schema, parquet_sort_exprs);
1275-
let coalesced = CoalescePartitionsExec::new(parquet_exec.clone())
1276-
.with_fetch(Some(10))
1277-
.unwrap();
1275+
let coalesced =
1276+
Arc::new(CoalescePartitionsExec::new(parquet_exec.clone()).with_fetch(Some(10)));
12781277

12791278
// Test sort's fetch is greater than coalesce fetch, return error because it's not reasonable
12801279
let requirements = OrderPreservationContext::new(

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,10 +1035,9 @@ pub fn replace_order_preserving_variants(
10351035

10361036
if is_sort_preserving_merge(&context.plan) {
10371037
let child_plan = Arc::clone(&context.children[0].plan);
1038-
// It's safe to unwrap because `CoalescePartitionsExec` supports `fetch`.
1039-
context.plan = CoalescePartitionsExec::new(child_plan)
1040-
.with_fetch(context.plan.fetch())
1041-
.unwrap();
1038+
context.plan = Arc::new(
1039+
CoalescePartitionsExec::new(child_plan).with_fetch(context.plan.fetch()),
1040+
);
10421041
return Ok(context);
10431042
} else if let Some(repartition) =
10441043
context.plan.as_any().downcast_ref::<RepartitionExec>()

datafusion/physical-optimizer/src/enforce_sorting/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -409,10 +409,10 @@ pub fn parallelize_sorts(
409409

410410
Ok(Transformed::yes(
411411
PlanWithCorrespondingCoalescePartitions::new(
412-
// Safe to unwrap, because `CoalescePartitionsExec` has a fetch
413-
CoalescePartitionsExec::new(Arc::clone(&requirements.plan))
414-
.with_fetch(fetch)
415-
.unwrap(),
412+
Arc::new(
413+
CoalescePartitionsExec::new(Arc::clone(&requirements.plan))
414+
.with_fetch(fetch),
415+
),
416416
false,
417417
vec![requirements],
418418
),

datafusion/physical-plan/src/coalesce_partitions.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,12 @@ impl CoalescePartitionsExec {
5959
}
6060
}
6161

62+
/// Update fetch with the argument
63+
pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
64+
self.fetch = fetch;
65+
self
66+
}
67+
6268
/// Input execution plan
6369
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
6470
&self.input

datafusion/proto/proto/datafusion.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1217,6 +1217,7 @@ message CoalesceBatchesExecNode {
12171217

12181218
message CoalescePartitionsExecNode {
12191219
PhysicalPlanNode input = 1;
1220+
optional uint32 fetch = 2;
12201221
}
12211222

12221223
message PhysicalHashRepartition {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 19 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -792,7 +792,10 @@ impl protobuf::PhysicalPlanNode {
792792
) -> Result<Arc<dyn ExecutionPlan>> {
793793
let input: Arc<dyn ExecutionPlan> =
794794
into_physical_plan(&merge.input, registry, runtime, extension_codec)?;
795-
Ok(Arc::new(CoalescePartitionsExec::new(input)))
795+
Ok(Arc::new(
796+
CoalescePartitionsExec::new(input)
797+
.with_fetch(merge.fetch.map(|f| f as usize)),
798+
))
796799
}
797800

798801
fn try_into_repartition_physical_plan(
@@ -2354,6 +2357,7 @@ impl protobuf::PhysicalPlanNode {
23542357
physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
23552358
protobuf::CoalescePartitionsExecNode {
23562359
input: Some(Box::new(input)),
2360+
fetch: exec.fetch().map(|f| f as u32),
23572361
},
23582362
))),
23592363
})

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ use datafusion::physical_plan::aggregates::{
6666
AggregateExec, AggregateMode, PhysicalGroupBy,
6767
};
6868
use datafusion::physical_plan::analyze::AnalyzeExec;
69+
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
6970
use datafusion::physical_plan::empty::EmptyExec;
7071
use datafusion::physical_plan::expressions::{
7172
binary, cast, col, in_list, like, lit, BinaryExpr, Column, NotExpr, PhysicalSortExpr,
@@ -709,7 +710,7 @@ fn roundtrip_sort_preserve_partitioning() -> Result<()> {
709710
}
710711

711712
#[test]
712-
fn roundtrip_coalesce_with_fetch() -> Result<()> {
713+
fn roundtrip_coalesce_batches_with_fetch() -> Result<()> {
713714
let field_a = Field::new("a", DataType::Boolean, false);
714715
let field_b = Field::new("b", DataType::Int64, false);
715716
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
@@ -725,6 +726,22 @@ fn roundtrip_coalesce_with_fetch() -> Result<()> {
725726
))
726727
}
727728

729+
#[test]
730+
fn roundtrip_coalesce_partitions_with_fetch() -> Result<()> {
731+
let field_a = Field::new("a", DataType::Boolean, false);
732+
let field_b = Field::new("b", DataType::Int64, false);
733+
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
734+
735+
roundtrip_test(Arc::new(CoalescePartitionsExec::new(Arc::new(
736+
EmptyExec::new(schema.clone()),
737+
))))?;
738+
739+
roundtrip_test(Arc::new(
740+
CoalescePartitionsExec::new(Arc::new(EmptyExec::new(schema)))
741+
.with_fetch(Some(10)),
742+
))
743+
}
744+
728745
#[test]
729746
fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
730747
let file_schema =

0 commit comments

Comments
 (0)