Spark: Add data file path for range partition when rewriting manifests #15310
lirui-apache wants to merge 1 commit into apache:main
Conversation
Force-pushed from b84dff1 to ee17919
Pull request overview
This PR addresses insufficient parallelism when rewriting manifests for partitioned tables with low partition cardinality. When many files exist in a single partition, the previous implementation would not distribute work effectively across reduce tasks. The solution adds data_file.file_path as a secondary column in the range partitioning operation while maintaining clustering by partition in the final sort.
Changes:
- Modified `repartitionAndSort` to include `data_file.file_path` in range partitioning for better parallelism
- Added a test to verify that reduce tasks are properly utilized even with 10,000 files in a single partition
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java | Added DATA_FILE_PATH_COLUMN_NAME constant and modified repartitionAndSort to include file_path in range partitioning |
| spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java | Added test with SparkListener to verify parallelism with 10,000 files in single partition |
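For reference, a hedged sketch of how a listener-based parallelism check might look (the actual code in `TestRewriteManifestsAction` may differ; `spark` is assumed to be an existing `SparkSession`):

```java
// Hypothetical sketch, not the actual test: count completed tasks per stage with a
// SparkListener to check that the rewrite's shuffle ran with more than one reduce task.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerTaskEnd;

Map<Integer, AtomicInteger> tasksPerStage = new ConcurrentHashMap<>();
SparkListener listener = new SparkListener() {
  @Override
  public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
    // record one completed task against its stage
    tasksPerStage
        .computeIfAbsent(taskEnd.stageId(), id -> new AtomicInteger())
        .incrementAndGet();
  }
};
spark.sparkContext().addSparkListener(listener);
// ... run rewriteManifests() here, then assert that some stage ran with many tasks
spark.sparkContext().removeSparkListener(listener);
```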
```diff
 private Dataset<Row> repartitionAndSort(Dataset<Row> df, Column col, int numPartitions) {
-  return df.repartitionByRange(numPartitions, col).sortWithinPartitions(col);
+  // add file path for range partition to make sure we have enough parallelism
```
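Based on the overview above, the modified method plausibly looks something like the following (a sketch, not the exact diff; the value of `DATA_FILE_PATH_COLUMN_NAME` is assumed):

```java
// Sketch inferred from the PR description; the actual change may differ.
private static final String DATA_FILE_PATH_COLUMN_NAME = "data_file.file_path"; // assumed value

private Dataset<Row> repartitionAndSort(Dataset<Row> df, Column col, int numPartitions) {
  // add file path for range partition to make sure we have enough parallelism
  return df
      .repartitionByRange(numPartitions, col, df.col(DATA_FILE_PATH_COLUMN_NAME))
      .sortWithinPartitions(col); // final sort still clusters by partition only
}
```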
I can see how the current implementation doesn't leverage full parallelism when there is very low cardinality on partition values. My understanding of range partitioning is that it primarily tries to identify ranges with lots of values and balance partition sizes, but when the input has low cardinality to begin with it doesn't add parallelism: the `math.min(partitions, candidates.size)` bound would be limited by the cardinality of the Iceberg partition values, since the number of candidate range boundaries would be small.
I don't think AQE would help here either. My understanding is that it coalesces too many small shuffle partitions and splits skewed partitions during joins, etc.; I don't think it would help in this case.
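To make the low-cardinality collapse concrete, here is a small standalone sketch (hypothetical, not from this PR; the `part` and `path` column names are made up for illustration):

```java
// Hypothetical demo: range-partitioning by a single-valued column collapses all
// rows into one shuffle partition; adding a high-cardinality column spreads them.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.spark_partition_id;

public class RangeParallelismDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[4]").appName("demo").getOrCreate();

    Dataset<Row> df = spark.range(10_000).toDF("id")
        .withColumn("part", lit("p0"))                                       // one partition value for all rows
        .withColumn("path", concat(lit("file_"), col("id").cast("string"))); // unique per row, like file_path

    // Range partitioning by "part" alone: every row lands in the same partition.
    df.repartitionByRange(8, col("part"))
        .groupBy(spark_partition_id().alias("pid")).count().show();

    // Adding the high-cardinality "path" column spreads rows across partitions.
    df.repartitionByRange(8, col("part"), col("path"))
        .groupBy(spark_partition_id().alias("pid")).count().show();

    spark.stop();
  }
}
```

The first `show()` should report a single populated partition id holding all 10,000 rows, while the second spreads them across roughly eight.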
I think the proposed solution of adding a higher-cardinality column like the file path to the range repartitioning is a good way of adding parallelism here, and I can't imagine it regressing any existing cases (since `repartitionByRange` already does balancing within partitions), though someone knowledgeable here should chime in in case there are more implications. cc @aokolnychyi @RussellSpitzer @rdblue @singhpk234 @huaxingao
This makes sense to me. `repartitionByRange(data_file.partition)` can lose parallelism when there are only a few partition values (for example, everything is in one partition). I don't think AQE fixes this for a range repartition: Spark mainly splits big/skewed shuffle partitions for joins, or for `rebalance(...)` / `RebalancePartitions`; I don't think it does this for `repartitionByRange`. Adding `data_file.file_path` as a second range column looks like a good, deterministic way to get more parallel work, while still keeping partition as the first (main) clustering key.
Small perf note: `file_path` can be a long string, so using it as a range key may add extra compare/sort cost. Would using a cheaper salt like `xxhash64(file_path)` work instead?
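Roughly, the variant I have in mind (a sketch only, assuming the same method shape as the diff above):

```java
// Sketch of the hashed-salt variant (not the PR's actual code): use xxhash64 of the
// file path as the secondary range key so comparisons stay fixed-width longs.
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.xxhash64;

private Dataset<Row> repartitionAndSort(Dataset<Row> df, Column col, int numPartitions) {
  return df
      .repartitionByRange(numPartitions, col, xxhash64(df.col("data_file.file_path")))
      .sortWithinPartitions(col); // final sort still clusters by partition only
}
```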
Fixes #15291