Commit 46bf095

feat: add positional delete parsing. Add tests for end-to-end positional delete functionality
1 parent acd7ab8 commit 46bf095
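
For context: an Iceberg positional delete file is a Parquet file of (file_path, pos) rows, each marking one row position in a data file as deleted. The sketch below is not code from this commit; it uses a plain BTreeSet where the crate uses a roaring-bitmap-backed DeleteVector, but it shows the shape of the mapping the new parsing step produces.

use std::collections::{BTreeSet, HashMap};

// Hypothetical stand-in for the new parse step: fold (file_path, pos) rows
// into one set of deleted row positions per data file.
fn fold_deletes(rows: &[(&str, u64)]) -> HashMap<String, BTreeSet<u64>> {
    let mut result: HashMap<String, BTreeSet<u64>> = HashMap::new();
    for (file_path, pos) in rows {
        result.entry(file_path.to_string()).or_default().insert(*pos);
    }
    result
}

fn main() {
    let rows = [("1.parquet", 0), ("1.parquet", 5), ("2.parquet", 3)];
    let deletes = fold_deletes(&rows);
    assert_eq!(deletes["1.parquet"].len(), 2);
}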

File tree

5 files changed: +141 -161 lines changed

crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 53 additions & 13 deletions

@@ -17,6 +17,7 @@
 
 use std::collections::HashMap;
 
+use arrow_array::{Int64Array, StringArray};
 use futures::{StreamExt, TryStreamExt};
 use tokio::sync::oneshot::{Receiver, channel};
 
@@ -267,14 +268,44 @@ impl CachingDeleteFileLoader {
     ///
     /// Returns a map of data file path to a delete vector
     async fn parse_positional_deletes_record_batch_stream(
-        stream: ArrowRecordBatchStream,
+        mut stream: ArrowRecordBatchStream,
     ) -> Result<HashMap<String, DeleteVector>> {
-        // TODO
+        let mut result: HashMap<String, DeleteVector> = HashMap::default();
+
+        while let Some(batch) = stream.next().await {
+            let batch = batch?;
+            let schema = batch.schema();
+            let columns = batch.columns();
+
+            let Some(file_paths) = columns[0].as_any().downcast_ref::<StringArray>() else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Could not downcast file paths array to StringArray",
+                ));
+            };
+            let Some(positions) = columns[1].as_any().downcast_ref::<Int64Array>() else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Could not downcast positions array to Int64Array",
+                ));
+            };
+
+            for (file_path, pos) in file_paths.iter().zip(positions.iter()) {
+                let (Some(file_path), Some(pos)) = (file_path, pos) else {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "null values in delete file",
+                    ));
+                };
 
-        Err(Error::new(
-            ErrorKind::FeatureUnsupported,
-            "parsing of positional deletes is not yet supported",
-        ))
+                result
+                    .entry(file_path.to_string())
+                    .or_default()
+                    .insert(pos as u64);
+            }
+        }
+
+        Ok(result)
     }
 
     /// Parses record batch streams from individual equality delete files
@@ -297,28 +328,37 @@ mod tests {
     use tempfile::TempDir;
 
     use super::*;
-    use crate::arrow::delete_file_loader::tests::setup;
+    use crate::arrow::delete_filter::tests::setup;
 
     #[tokio::test]
-    async fn test_delete_file_manager_load_deletes() {
+    async fn test_caching_delete_file_loader_load_deletes() {
         let tmp_dir = TempDir::new().unwrap();
         let table_location = tmp_dir.path();
         let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
             .unwrap()
             .build()
             .unwrap();
 
-        // Note that with the delete file parsing not yet in place, all we can test here is that
-        // the call to the loader fails with the expected FeatureUnsupportedError.
-        let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10);
+        let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
 
         let file_scan_tasks = setup(table_location);
 
-        let result = delete_file_manager
+        let delete_filter = delete_file_loader
             .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
             .await
+            .unwrap()
             .unwrap();
 
-        assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));
+        let result = delete_filter
+            .get_delete_vector(&file_scan_tasks[0])
+            .unwrap();
+
+        // union of pos dels from pos del file 1 and 2, ie
+        // [0, 1, 3, 5, 6, 8, 1022, 1023] | [0, 1, 3, 5, 20, 21, 22, 23]
+        // = [0, 1, 3, 5, 6, 8, 20, 21, 22, 23, 1022, 1023]
+        assert_eq!(result.lock().unwrap().len(), 12);
+
+        let result = delete_filter.get_delete_vector(&file_scan_tasks[1]);
+        assert!(result.is_none()); // no pos dels for file 3
     }
 }
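
The core of the change is the downcast-and-zip pattern in parse_positional_deletes_record_batch_stream. Below is a self-contained sketch of the same pattern using only public arrow-rs APIs; the HashMap<String, Vec<u64>> here is a stand-in for the crate's map of DeleteVectors.

use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int64Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};

fn main() {
    // A batch shaped like a positional delete file: (file_path, pos).
    let schema = Arc::new(Schema::new(vec![
        Field::new("file_path", DataType::Utf8, false),
        Field::new("pos", DataType::Int64, false),
    ]));
    let columns: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["1.parquet", "1.parquet", "2.parquet"])),
        Arc::new(Int64Array::from(vec![0i64, 5, 3])),
    ];
    let batch = RecordBatch::try_new(schema, columns).unwrap();

    // Downcast the dynamically typed columns to their concrete array types,
    // then zip them to walk the rows pairwise.
    let file_paths = batch.column(0).as_any().downcast_ref::<StringArray>().unwrap();
    let positions = batch.column(1).as_any().downcast_ref::<Int64Array>().unwrap();

    let mut deletes: HashMap<String, Vec<u64>> = HashMap::new();
    for (path, pos) in file_paths.iter().zip(positions.iter()) {
        // Both columns are declared non-nullable, so None is unexpected here;
        // the real code returns ErrorKind::DataInvalid instead of panicking.
        deletes
            .entry(path.unwrap().to_string())
            .or_default()
            .push(pos.unwrap() as u64);
    }
    assert_eq!(deletes["1.parquet"], vec![0, 5]);
}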

crates/iceberg/src/arrow/delete_file_loader.rs

Lines changed: 2 additions & 131 deletions

@@ -110,27 +110,11 @@ impl DeleteFileLoader for BasicDeleteFileLoader {
 }
 
 #[cfg(test)]
-pub(crate) mod tests {
-    use std::collections::HashMap;
-    use std::fs::File;
-    use std::path::Path;
-    use std::sync::Arc;
-
-    use arrow_array::{Int64Array, RecordBatch, StringArray};
-    use arrow_schema::Schema as ArrowSchema;
-    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
-    use parquet::basic::Compression;
-    use parquet::file::properties::WriterProperties;
+mod tests {
     use tempfile::TempDir;
 
     use super::*;
-    use crate::scan::FileScanTask;
-    use crate::spec::{DataContentType, DataFileFormat, Schema};
-
-    type ArrowSchemaRef = Arc<ArrowSchema>;
-
-    const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
-    const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
+    use crate::arrow::delete_filter::tests::setup;
 
     #[tokio::test]
     async fn test_basic_delete_file_loader_read_delete_file() {
@@ -141,8 +125,6 @@ pub(crate) mod tests {
             .build()
             .unwrap();
 
-        // Note that with the delete file parsing not yet in place, all we can test here is that
-        // the call to the loader fails with the expected FeatureUnsupportedError.
         let delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
 
         let file_scan_tasks = setup(table_location);
@@ -159,115 +141,4 @@ pub(crate) mod tests {
 
         assert_eq!(result.len(), 1);
     }
-
-    pub(crate) fn setup(table_location: &Path) -> Vec<FileScanTask> {
-        let data_file_schema = Arc::new(Schema::builder().build().unwrap());
-        let positional_delete_schema = create_pos_del_schema();
-
-        let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8];
-        let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023];
-
-        let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values));
-        let pos_col = Arc::new(Int64Array::from_iter_values(pos_values));
-
-        let props = WriterProperties::builder()
-            .set_compression(Compression::SNAPPY)
-            .build();
-
-        for n in 1..=3 {
-            let positional_deletes_to_write =
-                RecordBatch::try_new(positional_delete_schema.clone(), vec![
-                    file_path_col.clone(),
-                    pos_col.clone(),
-                ])
-                .unwrap();
-
-            let file = File::create(format!(
-                "{}/pos-del-{}.parquet",
-                table_location.to_str().unwrap(),
-                n
-            ))
-            .unwrap();
-            let mut writer = ArrowWriter::try_new(
-                file,
-                positional_deletes_to_write.schema(),
-                Some(props.clone()),
-            )
-            .unwrap();
-
-            writer
-                .write(&positional_deletes_to_write)
-                .expect("Writing batch");
-
-            // writer must be closed to write footer
-            writer.close().unwrap();
-        }
-
-        let pos_del_1 = FileScanTaskDeleteFile {
-            file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()),
-            file_type: DataContentType::PositionDeletes,
-            partition_spec_id: 0,
-            equality_ids: vec![],
-        };
-
-        let pos_del_2 = FileScanTaskDeleteFile {
-            file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()),
-            file_type: DataContentType::PositionDeletes,
-            partition_spec_id: 0,
-            equality_ids: vec![],
-        };
-
-        let pos_del_3 = FileScanTaskDeleteFile {
-            file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()),
-            file_type: DataContentType::PositionDeletes,
-            partition_spec_id: 0,
-            equality_ids: vec![],
-        };
-
-        let file_scan_tasks = vec![
-            FileScanTask {
-                start: 0,
-                length: 0,
-                record_count: None,
-                data_file_path: "".to_string(),
-                data_file_content: DataContentType::Data,
-                data_file_format: DataFileFormat::Parquet,
-                schema: data_file_schema.clone(),
-                project_field_ids: vec![],
-                predicate: None,
-                deletes: vec![pos_del_1, pos_del_2.clone()],
-            },
-            FileScanTask {
-                start: 0,
-                length: 0,
-                record_count: None,
-                data_file_path: "".to_string(),
-                data_file_content: DataContentType::Data,
-                data_file_format: DataFileFormat::Parquet,
-                schema: data_file_schema.clone(),
-                project_field_ids: vec![],
-                predicate: None,
-                deletes: vec![pos_del_2, pos_del_3],
-            },
-        ];
-
-        file_scan_tasks
-    }
-
-    pub(crate) fn create_pos_del_schema() -> ArrowSchemaRef {
-        let fields = vec![
-            arrow_schema::Field::new("file_path", arrow_schema::DataType::Utf8, false)
-                .with_metadata(HashMap::from([(
-                    PARQUET_FIELD_ID_META_KEY.to_string(),
-                    FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
-                )])),
-            arrow_schema::Field::new("pos", arrow_schema::DataType::Int64, false).with_metadata(
-                HashMap::from([(
-                    PARQUET_FIELD_ID_META_KEY.to_string(),
-                    FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
-                )]),
-            ),
-        ];
-        Arc::new(arrow_schema::Schema::new(fields))
-    }
 }
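
This file's tests now import their setup fixture from delete_filter.rs, which works because that test module is made pub(crate) (see the next diff). A minimal sketch of the visibility pattern, with simplified module and fixture contents:

mod delete_filter {
    #[cfg(test)]
    pub(crate) mod tests {
        // Stand-in for the real setup(), which writes positional delete
        // Parquet files and returns FileScanTasks.
        pub(crate) fn setup() -> Vec<u64> {
            vec![0, 1, 3, 5]
        }
    }
}

mod delete_file_loader {
    #[cfg(test)]
    mod tests {
        use crate::delete_filter::tests::setup;

        #[test]
        fn reuses_shared_fixture() {
            assert_eq!(setup().len(), 4);
        }
    }
}

fn main() {}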

crates/iceberg/src/arrow/delete_filter.rs

Lines changed: 41 additions & 17 deletions

@@ -195,7 +195,7 @@ pub(crate) fn is_equality_delete(f: &FileScanTaskDeleteFile) -> bool {
 }
 
 #[cfg(test)]
-mod tests {
+pub(crate) mod tests {
     use std::fs::File;
     use std::path::Path;
     use std::sync::Arc;
@@ -218,43 +218,67 @@ mod tests {
     const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
 
     #[tokio::test]
-    async fn test_delete_file_manager_load_deletes() {
+    async fn test_delete_file_filter_load_deletes() {
         let tmp_dir = TempDir::new().unwrap();
         let table_location = tmp_dir.path();
         let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
             .unwrap()
             .build()
             .unwrap();
 
-        // Note that with the delete file parsing not yet in place, all we can test here is that
-        // the call to the loader fails with the expected FeatureUnsupportedError.
-        let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10);
+        let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
 
         let file_scan_tasks = setup(table_location);
 
-        let result = delete_file_manager
+        let delete_filter = delete_file_loader
             .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
             .await
+            .unwrap()
+            .unwrap();
+
+        let result = delete_filter
+            .get_delete_vector(&file_scan_tasks[0])
+            .unwrap();
+        assert_eq!(result.lock().unwrap().len(), 12); // pos dels from pos del file 1 and 2
+
+        let delete_filter = delete_file_loader
+            .load_deletes(&file_scan_tasks[1].deletes, file_scan_tasks[1].schema_ref())
+            .await
+            .unwrap()
             .unwrap();
 
-        assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));
+        let result = delete_filter
+            .get_delete_vector(&file_scan_tasks[1])
+            .unwrap();
+        assert_eq!(result.lock().unwrap().len(), 8); // no pos dels for file 3
     }
 
-    fn setup(table_location: &Path) -> Vec<FileScanTask> {
+    pub(crate) fn setup(table_location: &Path) -> Vec<FileScanTask> {
         let data_file_schema = Arc::new(Schema::builder().build().unwrap());
         let positional_delete_schema = create_pos_del_schema();
 
-        let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8];
-        let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023];
-
-        let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values));
-        let pos_col = Arc::new(Int64Array::from_iter_values(pos_values));
+        let file_path_values = [
+            vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8],
+            vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8],
+            vec![format!("{}/2.parquet", table_location.to_str().unwrap()); 8],
+        ];
+        let pos_values = [
+            vec![0i64, 1, 3, 5, 6, 8, 1022, 1023],
+            vec![0i64, 1, 3, 5, 20, 21, 22, 23],
+            vec![0i64, 1, 3, 5, 6, 8, 1022, 1023],
+        ];
 
         let props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build();
 
        for n in 1..=3 {
+            let file_path_vals = file_path_values.get(n - 1).unwrap();
+            let file_path_col = Arc::new(StringArray::from_iter_values(file_path_vals));
+
+            let pos_vals = pos_values.get(n - 1).unwrap();
+            let pos_col = Arc::new(Int64Array::from_iter_values(pos_vals.clone()));
+
            let positional_deletes_to_write =
                RecordBatch::try_new(positional_delete_schema.clone(), vec![
                    file_path_col.clone(),
@@ -309,7 +333,7 @@ mod tests {
                start: 0,
                length: 0,
                record_count: None,
-                data_file_path: "".to_string(),
+                data_file_path: format!("{}/1.parquet", table_location.to_str().unwrap()),
                data_file_content: DataContentType::Data,
                data_file_format: DataFileFormat::Parquet,
                schema: data_file_schema.clone(),
@@ -321,20 +345,20 @@ mod tests {
                start: 0,
                length: 0,
                record_count: None,
-                data_file_path: "".to_string(),
+                data_file_path: format!("{}/2.parquet", table_location.to_str().unwrap()),
                data_file_content: DataContentType::Data,
                data_file_format: DataFileFormat::Parquet,
                schema: data_file_schema.clone(),
                project_field_ids: vec![],
                predicate: None,
-                deletes: vec![pos_del_2, pos_del_3],
+                deletes: vec![pos_del_3],
            },
        ];
 
        file_scan_tasks
    }
 
-    fn create_pos_del_schema() -> ArrowSchemaRef {
+    pub(crate) fn create_pos_del_schema() -> ArrowSchemaRef {
        let fields = vec![
            arrow_schema::Field::new("file_path", arrow_schema::DataType::Utf8, false)
                .with_metadata(HashMap::from([(
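
A quick sanity check of the counts asserted in these tests (not code from the commit): pos-del-1 and pos-del-2 both target 1.parquet, so the first task's delete vector is their union, while pos-del-3 alone targets 2.parquet:

use std::collections::BTreeSet;

fn main() {
    let del_1: BTreeSet<u64> = [0, 1, 3, 5, 6, 8, 1022, 1023].into();
    let del_2: BTreeSet<u64> = [0, 1, 3, 5, 20, 21, 22, 23].into();

    // Union of the two delete files targeting 1.parquet.
    let union: BTreeSet<u64> = del_1.union(&del_2).copied().collect();
    assert_eq!(union.len(), 12); // matches the len() == 12 assertion

    // 2.parquet only receives the 8 positions from pos-del-3.
    let del_3: BTreeSet<u64> = [0, 1, 3, 5, 6, 8, 1022, 1023].into();
    assert_eq!(del_3.len(), 8); // matches the len() == 8 assertion
}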

crates/iceberg/src/delete_vector.rs

Lines changed: 9 additions & 0 deletions

@@ -38,6 +38,15 @@ impl DeleteVector {
         let outer = self.inner.bitmaps();
         DeleteVectorIterator { outer, inner: None }
     }
+
+    pub fn insert(&mut self, pos: u64) -> bool {
+        self.inner.insert(pos)
+    }
+
+    #[allow(unused)]
+    pub fn len(&self) -> u64 {
+        self.inner.len()
+    }
 }
 
 // Ideally, we'd just wrap `roaring::RoaringTreemap`'s iterator, `roaring::treemap::Iter` here.
