Skip to content

Commit b914409

Browse files
authored
[Bug Fix]: Deem hash repartition unnecessary when input and output have 1 partition (#10095)
* Add input partition number check
* Minor changes
1 parent 417b928 commit b914409

File tree

2 files changed

+103
-1
lines changed

2 files changed

+103
-1
lines changed

datafusion/core/src/physical_optimizer/enforce_distribution.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -875,7 +875,8 @@ fn add_hash_on_top(
875875
n_target: usize,
876876
) -> Result<DistributionContext> {
877877
// Early return if hash repartition is unnecessary
878-
if n_target == 1 {
878+
// `RepartitionExec: partitioning=Hash([...], 1), input_partitions=1` is unnecessary.
879+
if n_target == 1 && input.plan.output_partitioning().partition_count() == 1 {
879880
return Ok(input);
880881
}
881882

datafusion/sqllogictest/test_files/joins.slt

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3587,3 +3587,104 @@ SELECT 1 FROM join_partitioned_table JOIN (SELECT c1 AS id1 FROM join_partitione
35873587
1
35883588
1
35893589
1
3590+
3591+
3592+
statement ok
3593+
set datafusion.explain.logical_plan_only = false;
3594+
3595+
query TT
3596+
EXPLAIN SELECT * FROM (
3597+
SELECT 1 as c, 2 as d
3598+
UNION ALL
3599+
SELECT 1 as c, 3 AS d
3600+
) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e;
3601+
----
3602+
logical_plan
3603+
01)Projection: a.c, a.d, rhs.e, rhs.f
3604+
02)--Full Join: a.c = rhs.e
3605+
03)----SubqueryAlias: a
3606+
04)------Union
3607+
05)--------Projection: Int64(1) AS c, Int64(2) AS d
3608+
06)----------EmptyRelation
3609+
07)--------Projection: Int64(1) AS c, Int64(3) AS d
3610+
08)----------EmptyRelation
3611+
09)----SubqueryAlias: rhs
3612+
10)------Projection: Int64(1) AS e, Int64(3) AS f
3613+
11)--------EmptyRelation
3614+
physical_plan
3615+
01)ProjectionExec: expr=[c@2 as c, d@3 as d, e@0 as e, f@1 as f]
3616+
02)--CoalesceBatchesExec: target_batch_size=2
3617+
03)----HashJoinExec: mode=Partitioned, join_type=Full, on=[(e@0, c@0)]
3618+
04)------CoalesceBatchesExec: target_batch_size=2
3619+
05)--------RepartitionExec: partitioning=Hash([e@0], 2), input_partitions=1
3620+
06)----------ProjectionExec: expr=[1 as e, 3 as f]
3621+
07)------------PlaceholderRowExec
3622+
08)------CoalesceBatchesExec: target_batch_size=2
3623+
09)--------RepartitionExec: partitioning=Hash([c@0], 2), input_partitions=2
3624+
10)----------UnionExec
3625+
11)------------ProjectionExec: expr=[1 as c, 2 as d]
3626+
12)--------------PlaceholderRowExec
3627+
13)------------ProjectionExec: expr=[1 as c, 3 as d]
3628+
14)--------------PlaceholderRowExec
3629+
3630+
query IIII
3631+
SELECT * FROM (
3632+
SELECT 1 as c, 2 as d
3633+
UNION ALL
3634+
SELECT 1 as c, 3 AS d
3635+
) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e;
3636+
----
3637+
1 2 1 3
3638+
1 3 1 3
3639+
3640+
statement ok
3641+
set datafusion.execution.target_partitions = 1;
3642+
3643+
query TT
3644+
EXPLAIN SELECT * FROM (
3645+
SELECT 1 as c, 2 as d
3646+
UNION ALL
3647+
SELECT 1 as c, 3 AS d
3648+
) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e;
3649+
----
3650+
logical_plan
3651+
01)Projection: a.c, a.d, rhs.e, rhs.f
3652+
02)--Full Join: a.c = rhs.e
3653+
03)----SubqueryAlias: a
3654+
04)------Union
3655+
05)--------Projection: Int64(1) AS c, Int64(2) AS d
3656+
06)----------EmptyRelation
3657+
07)--------Projection: Int64(1) AS c, Int64(3) AS d
3658+
08)----------EmptyRelation
3659+
09)----SubqueryAlias: rhs
3660+
10)------Projection: Int64(1) AS e, Int64(3) AS f
3661+
11)--------EmptyRelation
3662+
physical_plan
3663+
01)ProjectionExec: expr=[c@2 as c, d@3 as d, e@0 as e, f@1 as f]
3664+
02)--CoalesceBatchesExec: target_batch_size=2
3665+
03)----HashJoinExec: mode=Partitioned, join_type=Full, on=[(e@0, c@0)]
3666+
04)------ProjectionExec: expr=[1 as e, 3 as f]
3667+
05)--------PlaceholderRowExec
3668+
06)------CoalesceBatchesExec: target_batch_size=2
3669+
07)--------RepartitionExec: partitioning=Hash([c@0], 1), input_partitions=2
3670+
08)----------UnionExec
3671+
09)------------ProjectionExec: expr=[1 as c, 2 as d]
3672+
10)--------------PlaceholderRowExec
3673+
11)------------ProjectionExec: expr=[1 as c, 3 as d]
3674+
12)--------------PlaceholderRowExec
3675+
3676+
query IIII
3677+
SELECT * FROM (
3678+
SELECT 1 as c, 2 as d
3679+
UNION ALL
3680+
SELECT 1 as c, 3 AS d
3681+
) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e;
3682+
----
3683+
1 2 1 3
3684+
1 3 1 3
3685+
3686+
statement ok
3687+
set datafusion.explain.logical_plan_only = true;
3688+
3689+
statement ok
3690+
set datafusion.execution.target_partitions = 2;

0 commit comments

Comments
 (0)