Skip to content
19 changes: 12 additions & 7 deletions crates/iceberg/src/arrow/delete_file_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::sync::Arc;

use futures::{StreamExt, TryStreamExt};
use parquet::arrow::ParquetRecordBatchStreamBuilder;

use crate::arrow::ArrowReader;
use crate::arrow::reader::ParquetReadOptions;
Expand Down Expand Up @@ -61,16 +62,20 @@ impl BasicDeleteFileLoader {
Essentially a super-cut-down ArrowReader. We can't use ArrowReader directly
as that introduces a circular dependency.
*/
let record_batch_stream = ArrowReader::create_parquet_record_batch_stream_builder(
let parquet_read_options = ParquetReadOptions::builder().build();

let (parquet_file_reader, arrow_metadata) = ArrowReader::open_parquet_file(
data_file_path,
self.file_io.clone(),
None,
&self.file_io,
file_size_in_bytes,
ParquetReadOptions::builder().build(),
parquet_read_options,
)
.await?
.build()?
.map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")));
.await?;

let record_batch_stream =
ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_file_reader, arrow_metadata)
.build()?
.map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")));

Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
}
Expand Down
68 changes: 36 additions & 32 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use fnv::FnvHashSet;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use parquet::arrow::arrow_reader::{
ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
Expand Down Expand Up @@ -320,12 +320,10 @@ impl ArrowReader {
let delete_filter_rx =
delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));

// Migrated tables lack field IDs, requiring us to inspect the schema to choose
// between field-ID-based or position-based projection
let initial_stream_builder = Self::create_parquet_record_batch_stream_builder(
// Open the Parquet file once, loading its metadata
let (parquet_file_reader, arrow_metadata) = Self::open_parquet_file(
&task.data_file_path,
file_io.clone(),
None,
&file_io,
task.file_size_in_bytes,
parquet_read_options,
)
Expand All @@ -334,7 +332,7 @@ impl ArrowReader {
// Check if Parquet file has embedded field IDs
// Corresponds to Java's ParquetSchemaUtil.hasIds()
// Reference: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java:118
let missing_field_ids = initial_stream_builder
let missing_field_ids = arrow_metadata
.schema()
.fields()
.iter()
Expand All @@ -356,38 +354,42 @@ impl ArrowReader {
// - Branch 1: hasIds(fileSchema) → trust embedded field IDs, use pruneColumns()
// - Branch 2: nameMapping present → applyNameMapping(), then pruneColumns()
// - Branch 3: fallback → addFallbackIds(), then pruneColumnsFallback()
let mut record_batch_stream_builder = if missing_field_ids {
let arrow_metadata = if missing_field_ids {
// Parquet file lacks field IDs - must assign them before reading
let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
// Branch 2: Apply name mapping to assign correct Iceberg field IDs
// Per spec rule #2: "Use schema.name-mapping.default metadata to map field id
// to columns without field id"
// Corresponds to Java's ParquetSchemaUtil.applyNameMapping()
apply_name_mapping_to_arrow_schema(
Arc::clone(initial_stream_builder.schema()),
Arc::clone(arrow_metadata.schema()),
name_mapping,
)?
} else {
// Branch 3: No name mapping - use position-based fallback IDs
// Corresponds to Java's ParquetSchemaUtil.addFallbackIds()
add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema())
add_fallback_field_ids_to_arrow_schema(arrow_metadata.schema())
};

let options = ArrowReaderOptions::new().with_schema(arrow_schema);

Self::create_parquet_record_batch_stream_builder(
&task.data_file_path,
file_io.clone(),
Some(options),
task.file_size_in_bytes,
parquet_read_options,
)
.await?
ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to create ArrowReaderMetadata with field ID schema",
)
.with_source(e)
},
)?
} else {
// Branch 1: File has embedded field IDs - trust them
initial_stream_builder
arrow_metadata
};

// Build the stream reader, reusing the already-opened file reader
let mut record_batch_stream_builder =
ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_file_reader, arrow_metadata);

// Filter out metadata fields for Parquet projection (they don't exist in files)
let project_field_ids_without_metadata: Vec<i32> = task
.project_field_ids
Expand Down Expand Up @@ -579,30 +581,32 @@ impl ArrowReader {
Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
}

pub(crate) async fn create_parquet_record_batch_stream_builder(
/// Opens a Parquet file and loads its metadata, returning both the reader and metadata.
/// The reader can be reused to build a `ParquetRecordBatchStreamBuilder` without
/// reopening the file.
pub(crate) async fn open_parquet_file(
data_file_path: &str,
file_io: FileIO,
arrow_reader_options: Option<ArrowReaderOptions>,
file_io: &FileIO,
file_size_in_bytes: u64,
parquet_read_options: ParquetReadOptions,
) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
// Get the metadata for the Parquet file we need to read and build
// a reader for the data within
) -> Result<(ArrowFileReader, ArrowReaderMetadata)> {
let parquet_file = file_io.new_input(data_file_path)?;
let parquet_reader = parquet_file.reader().await?;
let parquet_file_reader = ArrowFileReader::new(
let mut reader = ArrowFileReader::new(
FileMetadata {
size: file_size_in_bytes,
},
parquet_reader,
)
.with_parquet_read_options(parquet_read_options);

// Create the record batch stream builder, which wraps the parquet file reader
let options = arrow_reader_options.unwrap_or_default();
let record_batch_stream_builder =
ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, options).await?;
Ok(record_batch_stream_builder)
let arrow_metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
.await
.map_err(|e| {
Error::new(ErrorKind::Unexpected, "Failed to load Parquet metadata").with_source(e)
})?;

Ok((reader, arrow_metadata))
}

/// Computes a `RowSelection` from positional delete indices.
Expand Down
Loading