
[FEA] explore overriding how broadcast join planning works. #12132

Open
revans2 opened this issue Feb 13, 2025 · 1 comment
Labels: feature request

Comments

@revans2
Collaborator

revans2 commented Feb 13, 2025

Is your feature request related to a problem? Please describe.
Currently in Spark, a broadcast join is planned if the size of the data being read from disk is <= spark.sql.autoBroadcastJoinThreshold, or, after an AQE shuffle, if the size of the compressed shuffle data is <= spark.sql.autoBroadcastJoinThreshold.
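For reference, a minimal sketch of the thresholds involved (the values here are illustrative only, not recommendations; the AQE-specific config is the one called out in the comment below):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("broadcast-threshold-example")
  // Compared against the on-disk size of the scanned relation at planning time.
  .config("spark.sql.autoBroadcastJoinThreshold", "10m")
  // Compared against the compressed shuffle size when AQE re-plans the join.
  .config("spark.sql.adaptive.autoBroadcastJoinThreshold", "10m")
  .getOrCreate()
```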

This behavior is not great. We often set spark.sql.autoBroadcastJoinThreshold higher, and we see customers do the same, because it can speed up a query by a lot. But the auto-tuner currently asks the user (as a suggestion, not a hard recommendation) to lower the threshold if it is > 100 MB.

But if we look at this from first principles, what we want to know is, first, how much memory is actually going to be used (not how well the data compresses), and second, how much memory budget each task has to devote to holding broadcast data, along with how expensive that data transfer would be compared to a regular shuffle.

I am not sure if the plugin can do this or not, but the first step is to see if we can modify how broadcast works using some logical plan rules. The idea would be to have a config that gives a maximum broadcast memory budget for a task. Then we start turning join build-side tables into broadcasts, beginning with the smallest and working up to the largest, until either the task budget is reached or we run out of joins. Each of these decisions should also take the data transfer costs into account.
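A minimal sketch of that greedy selection, assuming hypothetical names like `JoinCandidate` and `maxBroadcastBudgetPerTask` (this is not an existing plugin API, and the transfer-cost check is left out for brevity):

```scala
// Hypothetical candidate: one join with an estimated build-side size in bytes.
case class JoinCandidate(id: String, estimatedBuildSideBytes: Long)

def selectBroadcasts(
    candidates: Seq[JoinCandidate],
    maxBroadcastBudgetPerTask: Long): Seq[JoinCandidate] = {
  // Walk joins from the smallest build side to the largest, converting each one
  // to a broadcast until the per-task memory budget is exhausted.
  val sorted = candidates.sortBy(_.estimatedBuildSideBytes)
  val (picked, _) =
    sorted.foldLeft((Vector.empty[JoinCandidate], maxBroadcastBudgetPerTask)) {
      case ((chosen, budget), c) if c.estimatedBuildSideBytes <= budget =>
        (chosen :+ c, budget - c.estimatedBuildSideBytes)
      case (acc, _) => acc // does not fit; larger candidates will not fit either
    }
  picked
}
```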

The cost (time) to transfer data in a shuffled join is effectively (size of LHS + size of RHS) / number of nodes in the cluster. The cost (time) to transfer a broadcast is effectively the size of the build side * 2 * Y. This is because the broadcast needs to go back to a single node and then be broadcast back out to all of the nodes. The broadcast uses a BitTorrent-like protocol, so a factor Y is added because that protocol is not truly linear, though we can assume it is close.
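A worked sketch of that comparison; the byte sizes, node count, and the value of Y are illustrative assumptions, not measured numbers:

```scala
def shuffleCost(lhsBytes: Long, rhsBytes: Long, numNodes: Int): Double =
  (lhsBytes + rhsBytes).toDouble / numNodes

def broadcastCost(buildSideBytes: Long, y: Double): Double =
  buildSideBytes * 2.0 * y // collect to one node, then broadcast back out

val gb = 1024L * 1024 * 1024
// Example: 8-node cluster, 100 GB LHS, 1 GB build side (RHS), Y = 1.2.
val shuffleBytes   = shuffleCost(100 * gb, 1 * gb, numNodes = 8) // ~12.6 GB moved per node
val broadcastBytes = broadcastCost(1 * gb, y = 1.2)              // ~2.4 GB moved
// The broadcast moves far less data in this example, so it wins on transfer cost.
```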

To make this work we need to be able to estimate the size of the data (both LHS and RHS). This is not simple. I think we can start out by using file sizes and some heuristics for cardinality estimation, but #12122 would be something that could help a lot in getting more accurate numbers. We might even be able to extend it to output things like input size on disk -> size decoded in GPU memory -> size output after a few operations (not just row counts), so we can guess at what the data sizes might be at a specific point in the plan to help us decide whether a broadcast is worth it or not.
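A rough sketch of that kind of size estimation; the expansion and selectivity factors are made-up heuristics for illustration (#12122 is what would provide more accurate numbers):

```scala
def decodedSizeBytes(onDiskBytes: Long, decodeExpansion: Double): Long =
  (onDiskBytes * decodeExpansion).toLong // e.g. compressed Parquet -> decoded GPU columns

def afterFilterBytes(inputBytes: Long, selectivity: Double): Long =
  (inputBytes * selectivity).toLong // coarse cardinality/selectivity guess

// Example: 2 GB of Parquet that decodes ~3x larger, then a filter keeping ~10% of rows.
val scanned = decodedSizeBytes(2L * 1024 * 1024 * 1024, decodeExpansion = 3.0) // ~6 GB
val atJoin  = afterFilterBytes(scanned, selectivity = 0.1)                     // ~600 MB
```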

revans2 added the "? - Needs Triage" and "feature request" labels on Feb 13, 2025
@binmahone
Collaborator

> after an AQE shuffle the size of the compressed shuffle data <= spark.sql.autoBroadcastJoinThreshold

I think you mean spark.sql.adaptive.autoBroadcastJoinThreshold here?

mattahrens removed the "? - Needs Triage" label on Feb 18, 2025