feat(datafusion): implement the partitioning node for DataFusion to define the partitioning #1620
Conversation
…the best partition strategy for Iceberg for writing
- Implement hash partitioning for partitioned/bucketed tables
- Use round-robin partitioning for unpartitioned tables
- Support range distribution mode approximation via sort columns
/// - Automatically detects optimal partition count from DataFusion's SessionConfig
/// - Preserves column order (partitions first, then buckets) for consistent file layout
#[derive(Debug)]
pub struct IcebergRepartitionExec {
There already exists a RepartitionExec; why do we need to create a new one?
I think we just need to extend PhysicalExpr?
Thank you for your comments!
RepartitionExec is a generic operator: it reshuffles rows based on a distribution requirement. Iceberg has stricter requirements for how data must be partitioned, bucketed, sorted, and grouped before writing, so we must select the relevant partitioning strategy.
We have special requirements before writing:
- Use Iceberg table metadata:
- Partition specifications (identity transforms, bucket transforms)
- Sort orders
- Write distribution mode (hash, range, none)
- Select the appropriate partitioning strategy:
- Hash partitioning on partition/bucket columns for partitioned tables
- Round-robin for unpartitioned tables
- Range approximation using sort order columns
Some other requirements to preserve the "partition–bucket–range ordering" semantics required by Iceberg:
- Partition columns must be respected in the physical file layout
- Bucketing/range partitioning needs to be reproducible and consistent
- File grouping must align with Iceberg metadata expectations
Repartitioning is a plan-level operator, not an expression (see the sketch below):
- PhysicalExpr can help compute the partition/bucket key for a row.
- Reshuffling rows into partitions is still an execution node (ExecutionPlan).
- If we only extend PhysicalExpr, we'll have an expression that can calculate partition/bucket values, but we still need an Exec node to do the actual shuffle/repartitioning.
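A minimal sketch of that distinction, assuming a recent DataFusion version (the `into_array` signature has changed across releases); the column name `id` and the toy batch are placeholders, not code from this PR:

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_expr::expressions::col;
use datafusion::physical_expr::PhysicalExpr;

fn main() -> datafusion::error::Result<()> {
    // A toy batch standing in for the rows flowing into the writer.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // A PhysicalExpr evaluates to a per-row value (here, the raw key column),
    // but it never moves rows between partitions: the shuffle itself is the
    // job of an ExecutionPlan node such as RepartitionExec.
    let expr = col("id", &schema)?;
    let keys = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    println!("partition keys: {keys:?}");
    Ok(())
}
```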
So, in a nutshell: we need our Iceberg-aware strategy (IcebergRepartitionExec) to determine the best partitioning, we delegate the actual shuffle to DataFusion (calling RepartitionExec with our selection), and we use PhysicalExpr to compute the partition/bucket keys:
IcebergRepartitionExec (strategy selection, Iceberg-aware)
↳ chooses partitioning (hash/round-robin/range)
↳ uses Iceberg metadata (partition spec, sort order, mode)
↓
DataFusion RepartitionExec (generic shuffle operator)
↳ actually reshuffles rows into partitions
↓
PhysicalExpr (partition/bucket key computation)
↳ hash/range/bucket expressions evaluated per row
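To make that layering concrete, here is a minimal sketch (not this PR's actual code) of how an Iceberg-aware node could pick a Partitioning and delegate the shuffle to DataFusion's stock RepartitionExec. The `hash_columns` parameter is a hypothetical input that would be derived from the table's partition spec and sort order:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_expr::expressions::col;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};

/// Sketch: choose an Iceberg-appropriate Partitioning, then let the generic
/// RepartitionExec perform the actual row shuffle.
fn plan_repartition(
    input: Arc<dyn ExecutionPlan>,
    hash_columns: &[String], // hypothetical: derived from partition spec/sort order
    target_partitions: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
    let partitioning = if hash_columns.is_empty() {
        // Unpartitioned table: spread batches evenly.
        Partitioning::RoundRobinBatch(target_partitions)
    } else {
        // Partitioned/bucketed table: co-locate rows that share a key so each
        // output partition maps cleanly onto Iceberg data files.
        let exprs = hash_columns
            .iter()
            .map(|name| col(name, &input.schema()))
            .collect::<Result<Vec<_>>>()?;
        Partitioning::Hash(exprs, target_partitions)
    };
    Ok(Arc::new(RepartitionExec::try_new(input, partitioning)?))
}
```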
Of course, if we decide to rely 100% on DataFusion, we need to consider:
- RepartitionExec implements generic distributions without understanding the Iceberg specifics (buckets, partitions, range vs. sort)
- Iceberg requires that bucketing and range partitioning be reproducible and consistent across writers
- Iceberg expects hierarchical ordering: partition → bucket → range
- Data inconsistency risk: writes may not be reproducible
- If Iceberg semantics aren't enforced at write time, we will need extra cleanup/repair jobs later (e.g., repartitioning files offline or rewriting manifests for metadata) or a custom implementation
///
/// If no suitable hash columns are found (e.g., unpartitioned, non-bucketed table),
/// falls back to round-robin batch partitioning for even load distribution.
fn determine_partitioning_strategy(
This is interesting to see. At first I thought there were just two cases:
- If it's a partitioned table, we should just hash partition.
- If it's not partitioned, we should just use round-robin partitioning.
However, this reminds me of another case: range-only partitioning, e.g. we only have partitions like date or time. I think in this case we should also use round-robin partitioning, since most data is concentrated in a few partitions.
Also, I don't think we should take write.distribution-mode into account for now. The examples you use are for Spark, but are not applicable to DataFusion.
> However, this reminds me of another case: range-only partitioning, e.g. we only have partitions like date or time. I think in this case we should also use round-robin partitioning, since most data is concentrated in a few partitions.
Hum, you are right. Range partitions concentrate data in the most recent partitions, making hash partitioning counterproductive (consider a date column with a temporal partition). Since DataFusion doesn't provide range partitioning, the fallback is round-robin rather than hashing.
Briefly (see the sketch below):
- Hash partitioning: only on bucket columns (partition spec + sort order)
- Round-robin: everything else (unpartitioned, range, identity, temporal transforms)
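A self-contained sketch of that decision rule, with a simplified stand-in for Iceberg's transform types (the real iceberg-rust enum has more variants and different shapes, and this ignores the sort-order side for brevity):

```rust
/// Simplified stand-in for Iceberg partition transforms (assumption: the
/// real iceberg-rust types differ).
#[allow(dead_code)]
enum Transform {
    Identity,
    Bucket(u32),
    Day,
    Month,
    Year,
}

struct PartitionField {
    source_column: String,
    transform: Transform,
}

/// Hash only on bucket-transformed columns; `None` means fall back to
/// round-robin (unpartitioned, identity, or temporal transforms, where data
/// skews into a few recent partitions).
fn hash_columns(spec: &[PartitionField]) -> Option<Vec<String>> {
    let cols: Vec<String> = spec
        .iter()
        .filter(|f| matches!(f.transform, Transform::Bucket(_)))
        .map(|f| f.source_column.clone())
        .collect();
    (!cols.is_empty()).then_some(cols)
}

fn main() {
    let spec = vec![
        PartitionField { source_column: "event_date".into(), transform: Transform::Day },
        PartitionField { source_column: "user_id".into(), transform: Transform::Bucket(16) },
    ];
    // Only the bucketed column qualifies for hash partitioning; the temporal
    // (day) partition alone would route to round-robin.
    assert_eq!(hash_columns(&spec), Some(vec!["user_id".to_string()]));
}
```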
> Also, I don't think we should take write.distribution-mode into account for now. The examples you use are for Spark, but are not applicable to DataFusion.
Oh, good point, I misunderstood this. I thought it was an iceberg-rust table property.
…robin for range partitions
Signed-off-by: Florian Valeye <[email protected]>
Which issue does this PR close?
What changes are included in this PR?
Implement a physical execution repartition node that determines the relevant DataFusion partitioning strategy based on the Iceberg table schema and metadata.
Minor change: I created a new schema_ref() helper method.
Are these changes tested?
Yes, with unit tests.