Skip to content

Commit e1e3ba4

Browse files
Skip producing empty parquet files (#1230)
## Which issue does this PR close? - Closes #1224. ## What changes are included in this PR? Skip generating empty parquet files. ## Are these changes tested? UT.
1 parent 609b792 commit e1e3ba4

File tree

2 files changed

+72
-14
lines changed

2 files changed

+72
-14
lines changed

crates/iceberg/src/io/file_io.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,10 +331,17 @@ impl OutputFile {
331331
}
332332

333333
/// Checks if file exists.
334-
pub async fn exists(&self) -> crate::Result<bool> {
334+
pub async fn exists(&self) -> Result<bool> {
335335
Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
336336
}
337337

338+
/// Deletes file.
339+
///
340+
/// If the file does not exist, it will not return error.
341+
pub async fn delete(&self) -> Result<()> {
342+
Ok(self.op.delete(&self.path[self.relative_path_pos..]).await?)
343+
}
344+
338345
/// Converts into [`InputFile`].
339346
pub fn to_input_file(self) -> InputFile {
340347
InputFile {

crates/iceberg/src/writer/file_writer/parquet_writer.rs

Lines changed: 64 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ impl FileWriter for ParquetWriter {
554554
Ok(())
555555
}
556556

557-
async fn close(mut self) -> crate::Result<Vec<crate::spec::DataFileBuilder>> {
557+
async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
558558
let writer = match self.inner_writer.take() {
559559
Some(writer) => writer,
560560
None => return Ok(vec![]),
@@ -566,22 +566,33 @@ impl FileWriter for ParquetWriter {
566566

567567
let written_size = self.written_size.load(std::sync::atomic::Ordering::Relaxed);
568568

569-
let parquet_metadata =
570-
Arc::new(self.thrift_to_parquet_metadata(metadata).map_err(|err| {
569+
if self.current_row_num == 0 {
570+
self.out_file.delete().await.map_err(|err| {
571571
Error::new(
572572
ErrorKind::Unexpected,
573-
"Failed to convert metadata from thrift to parquet.",
573+
"Failed to delete empty parquet file.",
574574
)
575575
.with_source(err)
576-
})?);
577-
578-
Ok(vec![Self::parquet_to_data_file_builder(
579-
self.schema,
580-
parquet_metadata,
581-
written_size as usize,
582-
self.out_file.location().to_string(),
583-
self.nan_value_count_visitor.nan_value_counts,
584-
)?])
576+
})?;
577+
Ok(vec![])
578+
} else {
579+
let parquet_metadata =
580+
Arc::new(self.thrift_to_parquet_metadata(metadata).map_err(|err| {
581+
Error::new(
582+
ErrorKind::Unexpected,
583+
"Failed to convert metadata from thrift to parquet.",
584+
)
585+
.with_source(err)
586+
})?);
587+
588+
Ok(vec![Self::parquet_to_data_file_builder(
589+
self.schema,
590+
parquet_metadata,
591+
written_size as usize,
592+
self.out_file.location().to_string(),
593+
self.nan_value_count_visitor.nan_value_counts,
594+
)?])
595+
}
585596
}
586597
}
587598

@@ -2218,4 +2229,44 @@ mod tests {
22182229

22192230
Ok(())
22202231
}
2232+
2233+
#[tokio::test]
2234+
async fn test_write_empty_parquet_file() {
2235+
let temp_dir = TempDir::new().unwrap();
2236+
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
2237+
let location_gen =
2238+
MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
2239+
let file_name_gen =
2240+
DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
2241+
2242+
// write data
2243+
let pw = ParquetWriterBuilder::new(
2244+
WriterProperties::builder().build(),
2245+
Arc::new(
2246+
Schema::builder()
2247+
.with_schema_id(1)
2248+
.with_fields(vec![NestedField::required(
2249+
0,
2250+
"col",
2251+
Type::Primitive(PrimitiveType::Long),
2252+
)
2253+
.with_id(0)
2254+
.into()])
2255+
.build()
2256+
.expect("Failed to create schema"),
2257+
),
2258+
file_io.clone(),
2259+
location_gen,
2260+
file_name_gen,
2261+
)
2262+
.build()
2263+
.await
2264+
.unwrap();
2265+
2266+
let res = pw.close().await.unwrap();
2267+
assert_eq!(res.len(), 0);
2268+
2269+
// Check that file should have been deleted.
2270+
assert_eq!(std::fs::read_dir(temp_dir.path()).unwrap().count(), 0);
2271+
}
22212272
}

0 commit comments

Comments
 (0)