Support zero copy hash repartitioning for Hash Join #2

Draft. Wants to merge 27 commits into base: main

zebsme (Owner) commented Mar 31, 2025

Which issue does this PR close?

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

github-actions bot added the documentation, physical-expr, and common labels Mar 31, 2025

zebsme (Owner, Author) commented Mar 31, 2025

The planning logic is already implemented. EXPLAIN output for a query with two joins:

explain
SELECT * FROM
(SELECT x+1 AS col0, y+1 AS col1 FROM PAIRS WHERE x == y)
JOIN f
ON col0 = f.a
JOIN s
ON col1 = s.b
----
logical_plan
01)Inner Join: col1 = CAST(s.b AS Int64)
02)--Inner Join: col0 = CAST(f.a AS Int64)
03)----Projection: CAST(pairs.x AS Int64) + Int64(1) AS col0, CAST(pairs.y AS Int64) + Int64(1) AS col1
04)------Filter: pairs.y = pairs.x
05)--------TableScan: pairs projection=[x, y]
06)----TableScan: f projection=[a]
07)--TableScan: s projection=[b]
physical_plan
01)CoalesceBatchesExec: target_batch_size=8192
02)--HashJoinExec: mode=Partitioned(SelectionVector), join_type=Inner, on=[(col1@1, CAST(s.b AS Int64)@1)], projection=[col0@0, col1@1, a@2, b@3]
03)----ProjectionExec: expr=[col0@1 as col0, col1@2 as col1, a@0 as a]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------HashJoinExec: mode=Partitioned(SelectionVector), join_type=Inner, on=[(CAST(f.a AS Int64)@1, col0@0)], projection=[a@0, col0@2, col1@3]
06)----------RepartitionExec: partitioning=HashSelectionVector([CAST(f.a AS Int64)@1], 16), input_partitions=1
07)------------ProjectionExec: expr=[a@0 as a, CAST(a@0 AS Int64) as CAST(f.a AS Int64)]
08)--------------DataSourceExec: partitions=1, partition_sizes=[1]
09)----------RepartitionExec: partitioning=HashSelectionVector([col0@0], 16), input_partitions=16
10)------------ProjectionExec: expr=[CAST(x@0 AS Int64) + 1 as col0, CAST(y@1 AS Int64) + 1 as col1]
11)--------------RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
12)----------------CoalesceBatchesExec: target_batch_size=8192
13)------------------FilterExec: y@1 = x@0
14)--------------------DataSourceExec: partitions=1, partition_sizes=[1]
15)----RepartitionExec: partitioning=HashSelectionVector([CAST(s.b AS Int64)@1], 16), input_partitions=1
16)------ProjectionExec: expr=[b@0 as b, CAST(b@0 AS Int64) as CAST(s.b AS Int64)]
17)--------DataSourceExec: partitions=1, partition_sizes=[1]
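
For readers new to the idea, here is a minimal sketch of what hash repartitioning through selection vectors could look like. It is not the code in this PR: the function names `hash_selection_vectors` and `selected` are hypothetical, the key column is a plain `&[i64]` rather than an Arrow array, and the standard library's `DefaultHasher` stands in for whatever hash function the engine actually uses. The point it illustrates is that each output partition receives a list of row indices into the shared input batch instead of a copied sub-batch.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical helper: build one selection vector (a list of row indices)
/// per output partition. The key column itself is never copied.
/// Assumes `num_partitions > 0`.
fn hash_selection_vectors(keys: &[i64], num_partitions: usize) -> Vec<Vec<u32>> {
    let mut selections: Vec<Vec<u32>> = vec![Vec::new(); num_partitions];
    for (row, key) in keys.iter().enumerate() {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        // Rows with equal keys always hash to the same partition.
        let partition = (hasher.finish() % num_partitions as u64) as usize;
        selections[partition].push(row as u32);
    }
    selections
}

/// Hypothetical helper: read the rows of one partition lazily through its
/// selection vector, borrowing the shared column instead of materializing it.
fn selected<'a>(column: &'a [i64], selection: &'a [u32]) -> impl Iterator<Item = i64> + 'a {
    selection.iter().map(move |&row| column[row as usize])
}

fn main() {
    // Stand-in for the `col0` join key column of one input batch.
    let col0: Vec<i64> = vec![2, 3, 4, 5, 6, 7, 8, 9];
    let selections = hash_selection_vectors(&col0, 4);

    for (p, sel) in selections.iter().enumerate() {
        let keys: Vec<i64> = selected(&col0, sel).collect();
        println!("partition {p}: rows {sel:?} -> keys {keys:?}");
    }
}
```

The trade-off in a design like this is that the repartition step avoids copying rows, but every downstream consumer has to go through an extra level of indirection when reading the batch.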

zebsme (Owner, Author) commented Apr 6, 2025

The implementation fully supports all join types (INNER, LEFT, RIGHT, FULL, etc.), but profiling reveals a performance degradation.
