
Native Sort-Merge Writer for Iceberg ClusteredWriter Path #3595

@Shekharrajak

Description


What is the problem the feature request solves?

Implement a fused sort-merge writer in native Rust for Comet's Iceberg write path, eliminating JNI round-trips when writing partitioned/sorted data to Iceberg tables.

Current Comet Writer Behavior

// parquet_writer.rs - current implementation (simplified)
while let Some(batch) = stream.try_next().await? {
    writer.write(&batch)?;  // all batches go to ONE file
}
writer.close()?;
  • No sorting capability
  • No partition detection
  • Single file per Spark partition
  • ClusteredWriter logic (from iceberg-rust) is not utilized
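By contrast, a clustered writer rolls to a new file whenever the partition key changes in already-sorted input. The sketch below illustrates that core loop in plain Rust; `Row`, `clustered_write`, and the `Vec`-as-file stand-in are hypothetical simplifications, not the actual iceberg-rust `ClusteredWriter` API, and real code would operate on Arrow batches and Parquet writers instead.

```rust
#[derive(Debug)]
struct Row {
    partition: String, // partition key (e.g. a date)
    value: i64,
}

/// Groups consecutive rows by partition key, emitting one "file"
/// (here just a Vec<Row>) per partition. Assumes the input is
/// already sorted by partition, as ClusteredWriter requires.
fn clustered_write(rows: Vec<Row>) -> Vec<(String, Vec<Row>)> {
    let mut files: Vec<(String, Vec<Row>)> = Vec::new();
    for row in rows {
        // Does this row belong to the currently open "file"?
        let same = files
            .last()
            .map_or(false, |(key, _)| *key == row.partition);
        if same {
            files.last_mut().unwrap().1.push(row);
        } else {
            // Partition key changed: close the current file, open a new one.
            files.push((row.partition.clone(), vec![row]));
        }
    }
    files
}

fn main() {
    let rows = vec![
        Row { partition: "2024-01-01".into(), value: 1 },
        Row { partition: "2024-01-01".into(), value: 2 },
        Row { partition: "2024-01-02".into(), value: 3 },
    ];
    let files = clustered_write(rows);
    assert_eq!(files.len(), 2); // one file per partition run
    println!("wrote {} files", files.len());
}
```

Note the precondition: if the input is not sorted, the same partition key can reappear and produce multiple small files per partition, which is why fusing the sort into the writer matters.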

Related Work

  • iceberg-rust ClusteredWriter: iceberg-rust/crates/iceberg/src/writer/partitioning/clustered_writer.rs
  • DataFusion External Sort: datafusion/physical-plan/src/sorts/sort.rs
  • Comet Native Sort: CometSortExec in operators.scala

Describe the potential solution

Instead of:

  1. Spark: "Execute CometSort" -> JNI -> Native sort -> JNI -> Arrow batches to Spark
  2. Spark: "Execute CometWrite" -> JNI -> Native write

It would be:

  1. Spark: "Execute CometSortWrite" -> JNI -> Native (Sort -> Write) -> JNI -> metadata only
Implementation tasks:

  • Design protobuf schema for ClusteredParquetWriter operator
  • Implement ClusteredParquetWriterExec in Rust
  • Integrate DataFusion's external sort with spill support
  • Add partition extraction and validation logic
  • Create Scala-side CometClusteredWriteExec operator
  • Add integration with Spark's FileCommitProtocol
  • Benchmark against current Spark sort + Comet write path
  • Add tests for partition ordering validation
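Putting the pieces together, the fused operator would sort natively, write partition-clustered files, and return only file metadata across JNI. A minimal sketch of that shape, with hypothetical names (`Record`, `FileMeta`, `sort_and_write`) and an in-memory `sort_by_key` standing in for DataFusion's spill-capable external sort:

```rust
struct Record {
    partition: i32, // partition key
    value: i64,
}

/// Per-file metadata: the only thing that crosses JNI back to Spark.
#[derive(Debug)]
struct FileMeta {
    partition: i32,
    row_count: usize,
}

fn sort_and_write(mut records: Vec<Record>) -> Vec<FileMeta> {
    // 1. Sort by partition key. In the real operator this would be
    //    DataFusion's external sort, which can spill to disk.
    records.sort_by_key(|r| r.partition);

    // 2. Clustered write: one file per consecutive partition run.
    let mut metas: Vec<FileMeta> = Vec::new();
    for r in records {
        let same = metas
            .last()
            .map_or(false, |m| m.partition == r.partition);
        if same {
            metas.last_mut().unwrap().row_count += 1;
        } else {
            metas.push(FileMeta { partition: r.partition, row_count: 1 });
        }
    }

    // 3. Arrow batches never recross the JNI boundary; only metadata does.
    metas
}

fn main() {
    let metas = sort_and_write(vec![
        Record { partition: 2, value: 10 },
        Record { partition: 1, value: 20 },
        Record { partition: 2, value: 30 },
    ]);
    assert_eq!(metas.len(), 2); // unsorted input still yields one file per partition
    println!("{metas:?}");
}
```

The key contrast with the current two-hop flow is step 3: because sorting and writing happen in one native pass, the intermediate sorted batches stay in native memory and Spark receives only commit metadata.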

Additional context

No response

Metadata

Labels: enhancement (New feature or request)
