
Commit 114317d

feat: introduce delete file manager skeleton. Use in ArrowReader
1 parent 2bc03c2 commit 114317d

7 files changed: +220 −43 lines changed

Cargo.lock

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -86,6 +86,7 @@ port_scanner = "0.1.5"
 rand = "0.8.5"
 regex = "1.10.5"
 reqwest = { version = "0.12.2", default-features = false, features = ["json"] }
+roaring = "0.10"
 rust_decimal = "1.31"
 serde = { version = "1.0.204", features = ["rc"] }
 serde_bytes = "0.11.15"

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -72,6 +72,7 @@ parquet = { workspace = true, features = ["async"] }
 paste = { workspace = true }
 rand = { workspace = true }
 reqwest = { workspace = true }
+roaring = { workspace = true }
 rust_decimal = { workspace = true }
 serde = { workspace = true }
 serde_bytes = { workspace = true }
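
Both manifests pull in the roaring crate: a RoaringTreemap is a compressed bitmap over u64 values, which the new DeleteFileManager skeleton below uses to represent positions of positionally-deleted rows within a data file. A minimal sketch of the API surface the new code leans on (the values here are invented for illustration):

    use roaring::RoaringTreemap;

    fn main() {
        // Positions of deleted rows within one data file, as a compressed bitmap.
        let mut deletes = RoaringTreemap::new();
        deletes.insert(3);
        deletes.insert(4);
        deletes.insert(1_000_000);

        assert!(deletes.contains(4));
        assert_eq!(deletes.len(), 3);

        // Iteration yields positions in ascending order, which is what a
        // RowSelection-building pass over row groups would consume.
        let positions: Vec<u64> = deletes.iter().collect();
        assert_eq!(positions, vec![3, 4, 1_000_000]);
    }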
crates/iceberg/src/arrow/delete_file_manager.rs

Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use roaring::RoaringTreemap;
+
+use crate::expr::BoundPredicate;
+use crate::io::FileIO;
+use crate::scan::FileScanTaskDeleteFile;
+use crate::spec::SchemaRef;
+use crate::{Error, ErrorKind, Result};
+
+pub(crate) struct DeleteFileManager {}
+
+#[allow(unused_variables)]
+impl DeleteFileManager {
+    pub(crate) async fn load_deletes(
+        delete_file_entries: Vec<FileScanTaskDeleteFile>,
+        file_io: FileIO,
+        concurrency_limit_data_files: usize,
+    ) -> Result<DeleteFileManager> {
+        // TODO
+
+        if !delete_file_entries.is_empty() {
+            Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "Reading delete files is not yet supported",
+            ))
+        } else {
+            Ok(DeleteFileManager {})
+        }
+    }
+
+    pub(crate) fn build_delete_predicate(
+        &self,
+        snapshot_schema: SchemaRef,
+    ) -> Result<Option<BoundPredicate>> {
+        // TODO
+
+        Ok(None)
+    }
+
+    pub(crate) fn get_positional_delete_indexes_for_data_file(
+        &self,
+        data_file_path: &str,
+    ) -> Option<RoaringTreemap> {
+        // TODO
+
+        None
+    }
+}
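
The skeleton's intended call sequence mirrors how process_file_scan_task drives it in the reader changes below: load the task's delete files up front, derive an optional equality-delete predicate, then look up positional-delete indexes per data file. A condensed, illustrative sketch of that flow (the function wrapper is invented; the calls are the ones the reader makes):

    // Illustrative only: mirrors the usage in process_file_scan_task below.
    async fn apply_deletes_sketch(
        task: &FileScanTask,
        file_io: FileIO,
        concurrency_limit_data_files: usize,
    ) -> Result<()> {
        // 1. Fetch/parse all delete files for this task. In this skeleton it
        //    errors out if any are present, since reading them is unsupported.
        let delete_file_manager = DeleteFileManager::load_deletes(
            task.deletes.clone(),
            file_io,
            concurrency_limit_data_files,
        )
        .await?;

        // 2. Optional filter predicate derived from equality deletes
        //    (always Ok(None) in this skeleton).
        let _delete_predicate =
            delete_file_manager.build_delete_predicate(task.schema.clone())?;

        // 3. Optional bitmap of deleted row positions for one data file
        //    (always None in this skeleton).
        let _positional_deletes = delete_file_manager
            .get_positional_delete_indexes_for_data_file(&task.data_file_path);

        Ok(())
    }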

crates/iceberg/src/arrow/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@

 mod schema;
 pub use schema::*;
+pub(crate) mod delete_file_manager;
 mod reader;
 pub(crate) mod record_batch_projector;
 pub(crate) mod record_batch_transformer;

crates/iceberg/src/arrow/reader.rs

Lines changed: 130 additions & 43 deletions
@@ -36,9 +36,11 @@ use futures::{try_join, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
-use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
+use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
 use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
+use roaring::RoaringTreemap;

+use crate::arrow::delete_file_manager::DeleteFileManager;
 use crate::arrow::record_batch_transformer::RecordBatchTransformer;
 use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
 use crate::error::Result;
@@ -145,6 +147,7 @@ impl ArrowReader {
                     file_io,
                     row_group_filtering_enabled,
                     row_selection_enabled,
+                    concurrency_limit_data_files,
                 )
             })
             .map_err(|err| {
@@ -162,30 +165,24 @@ impl ArrowReader {
         file_io: FileIO,
         row_group_filtering_enabled: bool,
         row_selection_enabled: bool,
+        concurrency_limit_data_files: usize,
     ) -> Result<ArrowRecordBatchStream> {
-        // TODO: add support for delete files
-        if !task.deletes.is_empty() {
-            return Err(Error::new(
-                ErrorKind::FeatureUnsupported,
-                "Delete files are not yet supported",
-            ));
-        }
-
-        // Get the metadata for the Parquet file we need to read and build
-        // a reader for the data within
-        let parquet_file = file_io.new_input(&task.data_file_path)?;
-        let (parquet_metadata, parquet_reader) =
-            try_join!(parquet_file.metadata(), parquet_file.reader())?;
-        let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
-
-        let should_load_page_index = row_selection_enabled && task.predicate.is_some();
-
-        // Start creating the record batch stream, which wraps the parquet file reader
-        let mut record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
-            parquet_file_reader,
-            ArrowReaderOptions::new().with_page_index(should_load_page_index),
-        )
-        .await?;
+        let should_load_page_index =
+            (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
+
+        // concurrently retrieve delete files and create RecordBatchStreamBuilder
+        let (delete_file_manager, mut record_batch_stream_builder) = try_join!(
+            DeleteFileManager::load_deletes(
+                task.deletes.clone(),
+                file_io.clone(),
+                concurrency_limit_data_files
+            ),
+            Self::create_parquet_record_batch_stream_builder(
+                &task.data_file_path,
+                file_io.clone(),
+                should_load_page_index,
+            )
+        )?;

         // Create a projection mask for the batch stream to select which columns in the
         // Parquet file that we want in the response
@@ -197,7 +194,7 @@ impl ArrowReader {
         )?;
         record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);

-        // RecordBatchTransformer performs any required transformations on the RecordBatches
+        // RecordBatchTransformer performs any transformations required on the RecordBatches
         // that come back from the file, such as type promotion, default column insertion
         // and column re-ordering
         let mut record_batch_transformer =
@@ -207,49 +204,102 @@ impl ArrowReader {
             record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
         }

-        if let Some(predicate) = &task.predicate {
+        let delete_predicate = delete_file_manager.build_delete_predicate(task.schema.clone())?;
+
+        // In addition to the optional predicate supplied in the `FileScanTask`,
+        // we also have an optional predicate resulting from equality delete files.
+        // If both are present, we logical-AND them together to form a single filter
+        // predicate that we can pass to the `RecordBatchStreamBuilder`.
+        let final_predicate = match (&task.predicate, delete_predicate) {
+            (None, None) => None,
+            (Some(predicate), None) => Some(predicate.clone()),
+            (None, Some(ref predicate)) => Some(predicate.clone()),
+            (Some(filter_predicate), Some(delete_predicate)) => {
+                Some(filter_predicate.clone().and(delete_predicate))
+            }
+        };
+
+        // There are two possible sources both for potential lists of selected RowGroup indices,
+        // and for `RowSelection`s.
+        // Selected RowGroup index lists can come from two sources:
+        // * When there are equality delete files that are applicable;
+        // * When there is a scan predicate and row_group_filtering_enabled = true.
+        // `RowSelection`s can be created in either or both of the following cases:
+        // * When there are positional delete files that are applicable;
+        // * When there is a scan predicate and row_selection_enabled = true
+        // Note that, in the former case we only perform row group filtering when
+        // there is a scan predicate AND row_group_filtering_enabled = true,
+        // but we perform row selection filtering if there are applicable
+        // equality delete files OR (there is a scan predicate AND row_selection_enabled),
+        // since the only implemented method of applying positional deletes is
+        // by using a `RowSelection`.
+        let mut selected_row_group_indices = None;
+        let mut row_selection = None;
+
+        if let Some(predicate) = final_predicate {
             let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
                 record_batch_stream_builder.parquet_schema(),
-                predicate,
+                &predicate,
             )?;

             let row_filter = Self::get_row_filter(
-                predicate,
+                &predicate,
                 record_batch_stream_builder.parquet_schema(),
                 &iceberg_field_ids,
                 &field_id_map,
             )?;
             record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);

-            let mut selected_row_groups = None;
             if row_group_filtering_enabled {
                 let result = Self::get_selected_row_group_indices(
-                    predicate,
+                    &predicate,
                     record_batch_stream_builder.metadata(),
                     &field_id_map,
                     &task.schema,
                 )?;

-                selected_row_groups = Some(result);
+                selected_row_group_indices = Some(result);
             }

             if row_selection_enabled {
-                let row_selection = Self::get_row_selection(
-                    predicate,
+                row_selection = Some(Self::get_row_selection_for_filter_predicate(
+                    &predicate,
                     record_batch_stream_builder.metadata(),
-                    &selected_row_groups,
+                    &selected_row_group_indices,
                     &field_id_map,
                     &task.schema,
-                )?;
-
-                record_batch_stream_builder =
-                    record_batch_stream_builder.with_row_selection(row_selection);
+                )?);
             }
+        }

-            if let Some(selected_row_groups) = selected_row_groups {
-                record_batch_stream_builder =
-                    record_batch_stream_builder.with_row_groups(selected_row_groups);
-            }
+        let positional_delete_indexes =
+            delete_file_manager.get_positional_delete_indexes_for_data_file(&task.data_file_path);
+
+        if let Some(positional_delete_indexes) = positional_delete_indexes {
+            let delete_row_selection = Self::build_deletes_row_selection(
+                record_batch_stream_builder.metadata().row_groups(),
+                &selected_row_group_indices,
+                positional_delete_indexes,
+            )?;
+
+            // merge the row selection from the delete files with the row selection
+            // from the filter predicate, if there is one from the filter predicate
+            row_selection = match row_selection {
+                None => Some(delete_row_selection),
+                Some(filter_row_selection) => {
+                    Some(filter_row_selection.intersection(&delete_row_selection))
+                }
+            };
+        }
+
+        if let Some(row_selection) = row_selection {
+            record_batch_stream_builder =
+                record_batch_stream_builder.with_row_selection(row_selection);
+        }
+
+        if let Some(selected_row_group_indices) = selected_row_group_indices {
+            record_batch_stream_builder =
+                record_batch_stream_builder.with_row_groups(selected_row_group_indices);
         }

         // Build the batch stream and send all the RecordBatches that it generates
@@ -265,6 +315,43 @@ impl ArrowReader {
         Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }

+    async fn create_parquet_record_batch_stream_builder(
+        data_file_path: &str,
+        file_io: FileIO,
+        should_load_page_index: bool,
+    ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead + Sized>>> {
+        // Get the metadata for the Parquet file we need to read and build
+        // a reader for the data within
+        let parquet_file = file_io.new_input(data_file_path)?;
+        let (parquet_metadata, parquet_reader) =
+            try_join!(parquet_file.metadata(), parquet_file.reader())?;
+        let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
+
+        // Create the record batch stream builder, which wraps the parquet file reader
+        let record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
+            parquet_file_reader,
+            ArrowReaderOptions::new().with_page_index(should_load_page_index),
+        )
+        .await?;
+        Ok(record_batch_stream_builder)
+    }
+
+    /// computes a `RowSelection` from positional delete indices.
+    ///
+    /// Using the Parquet page index, we build a `RowSelection` that rejects rows that are indicated
+    /// as having been deleted by a positional delete, taking into account any row groups that have
+    /// been skipped entirely by the filter predicate
+    #[allow(unused)]
+    fn build_deletes_row_selection(
+        row_group_metadata: &[RowGroupMetaData],
+        selected_row_groups: &Option<Vec<usize>>,
+        mut positional_deletes: RoaringTreemap,
+    ) -> Result<RowSelection> {
+        // TODO
+
+        Ok(RowSelection::default())
+    }
+
     fn build_field_id_set_and_map(
         parquet_schema: &SchemaDescriptor,
         predicate: &BoundPredicate,
@@ -475,7 +562,7 @@ impl ArrowReader {
         Ok(results)
     }

-    fn get_row_selection(
+    fn get_row_selection_for_filter_predicate(
         predicate: &BoundPredicate,
         parquet_metadata: &Arc<ParquetMetaData>,
         selected_row_groups: &Option<Vec<usize>>,
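
The new build_deletes_row_selection is stubbed with a TODO and returns an empty selection. For orientation, here is a minimal sketch of one way the conversion could work, not the commit's implementation: walk row groups in file order, skip those pruned by selected_row_groups, and translate the ascending positions in the RoaringTreemap into alternating select/skip runs. It assumes delete positions are file-level row indices, and that the resulting RowSelection applies to the rows of the row groups actually read:

    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    use parquet::file::metadata::RowGroupMetaData;
    use roaring::RoaringTreemap;

    // Hypothetical sketch only: the commit leaves this logic as `// TODO`.
    fn deletes_to_row_selection(
        row_group_metadata: &[RowGroupMetaData],
        selected_row_groups: &Option<Vec<usize>>,
        positional_deletes: &RoaringTreemap,
    ) -> RowSelection {
        let mut selectors: Vec<RowSelector> = Vec::new();
        let mut deletes = positional_deletes.iter().peekable(); // ascending positions
        let mut first_row: u64 = 0; // file-level index of this row group's first row

        for (idx, rg) in row_group_metadata.iter().enumerate() {
            let group_end = first_row + rg.num_rows() as u64;

            // Row groups pruned by the filter predicate are never read, so they
            // contribute no selectors; just consume any deletes inside them.
            let selected = selected_row_groups
                .as_ref()
                .map_or(true, |sel| sel.contains(&idx));
            if !selected {
                while deletes.peek().map_or(false, |&pos| pos < group_end) {
                    deletes.next();
                }
                first_row = group_end;
                continue;
            }

            // Emit a select run up to each deleted position, then skip(1) for it.
            let mut cursor = first_row;
            while let Some(&pos) = deletes.peek() {
                if pos >= group_end {
                    break;
                }
                deletes.next();
                if pos > cursor {
                    selectors.push(RowSelector::select((pos - cursor) as usize));
                }
                selectors.push(RowSelector::skip(1));
                cursor = pos + 1;
            }
            if cursor < group_end {
                selectors.push(RowSelector::select((group_end - cursor) as usize));
            }
            first_row = group_end;
        }

        // Convert the selector runs into a RowSelection.
        RowSelection::from(selectors)
    }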

crates/iceberg/src/expr/predicate.rs

Lines changed: 6 additions & 0 deletions
@@ -726,6 +726,12 @@ pub enum BoundPredicate {
     Set(SetExpression<BoundReference>),
 }

+impl BoundPredicate {
+    pub(crate) fn and(self, other: BoundPredicate) -> BoundPredicate {
+        BoundPredicate::And(LogicalExpression::new([Box::new(self), Box::new(other)]))
+    }
+}
+
 impl Display for BoundPredicate {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         match self {
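
The new and helper is what lets process_file_scan_task fold the scan filter and the delete-derived predicate into a single BoundPredicate::And node. Since and is pub(crate), it only composes within the crate; a hypothetical in-crate sketch using the existing expression-binding API (the schema, field name, and values are invented for the example):

    use std::sync::Arc;

    use crate::expr::Reference;
    use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
    use crate::Result;

    fn combine_example() -> Result<()> {
        // Hypothetical one-column schema, purely for illustration.
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![Arc::new(NestedField::required(
                    1,
                    "id",
                    Type::Primitive(PrimitiveType::Long),
                ))])
                .build()?,
        );

        // Two independently bound predicates, standing in for the scan filter
        // and a predicate derived from equality deletes.
        let scan_filter = Reference::new("id")
            .greater_than(Datum::long(0))
            .bind(schema.clone(), true)?;
        let delete_filter = Reference::new("id")
            .less_than(Datum::long(100))
            .bind(schema, true)?;

        // Produces BoundPredicate::And(LogicalExpression::new([Box::new(..), Box::new(..)]))
        let combined = scan_filter.and(delete_filter);
        println!("{combined}");
        Ok(())
    }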
