Commit 2c92c28

feat: introduce DeleteFileManager trait and refactor CachingDeleteFileManager to be constructed prior to use
1 parent 4311f89 commit 2c92c28

File tree

2 files changed: 45 additions & 17 deletions

crates/iceberg/src/arrow/delete_file_manager.rs

Lines changed: 34 additions & 7 deletions
@@ -19,19 +19,46 @@ use roaring::RoaringTreemap;

 use crate::expr::BoundPredicate;
 use crate::io::FileIO;
-use crate::scan::FileScanTaskDeleteFile;
+use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
 use crate::spec::SchemaRef;
 use crate::{Error, ErrorKind, Result};

-pub(crate) struct DeleteFileManager {}
+#[allow(unused)]
+pub trait DeleteFileManager {
+    /// Read the delete file referred to in the task
+    ///
+    /// Returns the raw contents of the delete file as a RecordBatch stream
+    fn read_delete_file(task: &FileScanTaskDeleteFile) -> Result<ArrowRecordBatchStream>;
+}
+
+#[allow(unused)]
+#[derive(Clone, Debug)]
+pub(crate) struct CachingDeleteFileManager {
+    file_io: FileIO,
+    concurrency_limit_data_files: usize,
+}
+
+impl DeleteFileManager for CachingDeleteFileManager {
+    fn read_delete_file(_task: &FileScanTaskDeleteFile) -> Result<ArrowRecordBatchStream> {
+        // TODO, implementation in https://github.com/apache/iceberg-rust/pull/982
+
+        unimplemented!()
+    }
+}

 #[allow(unused_variables)]
-impl DeleteFileManager {
+impl CachingDeleteFileManager {
+    pub fn new(file_io: FileIO, concurrency_limit_data_files: usize) -> CachingDeleteFileManager {
+        Self {
+            file_io,
+            concurrency_limit_data_files,
+        }
+    }
+
     pub(crate) async fn load_deletes(
+        &self,
         delete_file_entries: Vec<FileScanTaskDeleteFile>,
-        file_io: FileIO,
-        concurrency_limit_data_files: usize,
-    ) -> Result<DeleteFileManager> {
+    ) -> Result<()> {
         // TODO

         if !delete_file_entries.is_empty() {
@@ -40,7 +67,7 @@ impl DeleteFileManager {
                 "Reading delete files is not yet supported",
             ))
         } else {
-            Ok(DeleteFileManager {})
+            Ok(())
         }
     }

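For context, a minimal sketch of the post-refactor call shape: the manager is constructed once from a FileIO and a concurrency limit, and load_deletes is then invoked per scan task with only that task's delete entries. The prepare_task_deletes helper below is hypothetical and not part of the commit; the imports assume the items are in crate scope as in delete_file_manager.rs.

// Illustrative sketch only. `prepare_task_deletes` is a hypothetical helper that is
// not part of this commit; it exists to show the post-refactor call sequence.
use crate::arrow::delete_file_manager::CachingDeleteFileManager;
use crate::io::FileIO;
use crate::scan::FileScanTaskDeleteFile;
use crate::Result;

async fn prepare_task_deletes(
    file_io: FileIO,
    concurrency_limit_data_files: usize,
    delete_file_entries: Vec<FileScanTaskDeleteFile>,
) -> Result<()> {
    // The manager is now constructed prior to use, holding the FileIO and the
    // concurrency limit that load_deletes previously received as arguments.
    let delete_file_manager =
        CachingDeleteFileManager::new(file_io, concurrency_limit_data_files);

    // load_deletes takes &self and only the delete entries for the task at hand.
    delete_file_manager.load_deletes(delete_file_entries).await
}
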
crates/iceberg/src/arrow/reader.rs

Lines changed: 11 additions & 10 deletions
@@ -40,7 +40,7 @@ use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMe
 use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
 use roaring::RoaringTreemap;

-use crate::arrow::delete_file_manager::DeleteFileManager;
+use crate::arrow::delete_file_manager::CachingDeleteFileManager;
 use crate::arrow::record_batch_transformer::RecordBatchTransformer;
 use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
 use crate::error::Result;
@@ -106,7 +106,11 @@ impl ArrowReaderBuilder {
     pub fn build(self) -> ArrowReader {
         ArrowReader {
             batch_size: self.batch_size,
-            file_io: self.file_io,
+            file_io: self.file_io.clone(),
+            delete_file_manager: CachingDeleteFileManager::new(
+                self.file_io.clone(),
+                self.concurrency_limit_data_files,
+            ),
             concurrency_limit_data_files: self.concurrency_limit_data_files,
             row_group_filtering_enabled: self.row_group_filtering_enabled,
             row_selection_enabled: self.row_selection_enabled,
@@ -119,6 +123,7 @@ impl ArrowReaderBuilder {
 pub struct ArrowReader {
     batch_size: Option<usize>,
     file_io: FileIO,
+    delete_file_manager: CachingDeleteFileManager,

     /// the maximum number of data files that can be fetched at the same time
     concurrency_limit_data_files: usize,
@@ -145,9 +150,9 @@ impl ArrowReader {
                     task,
                     batch_size,
                     file_io,
+                    self.delete_file_manager.clone(),
                     row_group_filtering_enabled,
                     row_selection_enabled,
-                    concurrency_limit_data_files,
                 )
             })
             .map_err(|err| {
@@ -163,20 +168,16 @@ impl ArrowReader {
         task: FileScanTask,
         batch_size: Option<usize>,
         file_io: FileIO,
+        delete_file_manager: CachingDeleteFileManager,
         row_group_filtering_enabled: bool,
         row_selection_enabled: bool,
-        concurrency_limit_data_files: usize,
     ) -> Result<ArrowRecordBatchStream> {
         let should_load_page_index =
             (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();

         // concurrently retrieve delete files and create RecordBatchStreamBuilder
-        let (delete_file_manager, mut record_batch_stream_builder) = try_join!(
-            DeleteFileManager::load_deletes(
-                task.deletes.clone(),
-                file_io.clone(),
-                concurrency_limit_data_files
-            ),
+        let (_, mut record_batch_stream_builder) = try_join!(
+            delete_file_manager.load_deletes(task.deletes.clone()),
             Self::create_parquet_record_batch_stream_builder(
                 &task.data_file_path,
                 file_io.clone(),

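The per-task path still overlaps delete-file loading with opening the Parquet stream via try_join!; what changes is only the first future, which becomes a method call on the pre-built, cloned manager rather than an associated-function call that had to carry the FileIO and concurrency limit. A standalone sketch of that fail-fast concurrency pattern follows, with stand-in names that are not the iceberg-rust API.

// Standalone sketch of the futures::try_join! pattern used in process_file_scan_task:
// two fallible futures run concurrently, and the expression errors as soon as either does.
// All names below are illustrative stand-ins, not the iceberg-rust API.
use futures::try_join;

async fn load_deletes(entries: Vec<String>) -> Result<(), String> {
    // Stand-in for CachingDeleteFileManager::load_deletes.
    if entries.is_empty() {
        Ok(())
    } else {
        Err("Reading delete files is not yet supported".to_string())
    }
}

async fn open_record_batch_stream_builder(path: &str) -> Result<String, String> {
    // Stand-in for ArrowReader::create_parquet_record_batch_stream_builder.
    Ok(format!("builder for {path}"))
}

async fn process_task(path: &str, deletes: Vec<String>) -> Result<(), String> {
    // The delete result is currently discarded, mirroring the
    // `let (_, mut record_batch_stream_builder) = try_join!(...)` binding above.
    let (_, _record_batch_stream_builder) =
        try_join!(load_deletes(deletes), open_record_batch_stream_builder(path))?;
    Ok(())
}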