Skip to content

Commit edb1d27

Browse files
committed
feat: delete file manager loading
1 parent 7c844d5 commit edb1d27

File tree

2 files changed

+390
-33
lines changed

2 files changed

+390
-33
lines changed

crates/iceberg/src/arrow/delete_file_manager.rs

Lines changed: 348 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,50 +15,375 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::collections::HashMap;
19+
use std::future::Future;
20+
use std::ops::BitAndAssign;
21+
use std::pin::Pin;
22+
use std::sync::{Arc, RwLock};
23+
use std::task::{Context, Poll};
24+
25+
use futures::{StreamExt, TryStreamExt};
1826
use roaring::RoaringTreemap;
27+
use tokio::sync::oneshot;
1928

20-
use crate::expr::BoundPredicate;
29+
use crate::arrow::ArrowReader;
30+
use crate::expr::Predicate::AlwaysTrue;
31+
use crate::expr::{Bind, BoundPredicate, Predicate};
2132
use crate::io::FileIO;
22-
use crate::scan::FileScanTaskDeleteFile;
23-
use crate::spec::SchemaRef;
33+
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskDeleteFile};
34+
use crate::spec::DataContentType;
2435
use crate::{Error, ErrorKind, Result};
2536

26-
pub(crate) struct DeleteFileManager {}
37+
// Equality deletes may apply to more than one DataFile in a scan, and so
38+
// the same equality delete file may be present in more than one invocation of
39+
// DeleteFileManager::load_deletes in the same scan. We want to deduplicate these
40+
// to avoid having to load them twice, so we immediately store cloneable futures in the
41+
// state that can be awaited upon to get te EQ deletes. That way we can check to see if
42+
// a load of each Eq delete file is already in progress and avoid starting another one.
43+
#[derive(Debug, Clone)]
44+
struct EqDelFuture {}
45+
46+
impl EqDelFuture {
47+
pub fn new() -> (oneshot::Sender<Predicate>, Self) {
48+
todo!()
49+
}
50+
}
51+
52+
impl Future for EqDelFuture {
53+
type Output = Predicate;
54+
55+
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
56+
todo!()
57+
}
58+
}
59+
60+
#[derive(Debug, Default)]
61+
struct DeleteFileManagerState {
62+
// delete vectors and positional deletes get merged when loaded into a single delete vector
63+
// per data file
64+
delete_vectors: HashMap<String, RoaringTreemap>,
65+
66+
// equality delete files are parsed into unbound `Predicate`s. We store them here as
67+
// cloneable futures (see note below)
68+
equality_deletes: HashMap<String, EqDelFuture>,
69+
}
70+
71+
type StateRef = Arc<RwLock<DeleteFileManagerState>>;
72+
73+
#[derive(Clone, Debug)]
74+
pub(crate) struct DeleteFileManager {
75+
state: Arc<RwLock<DeleteFileManagerState>>,
76+
}
77+
78+
// Intermediate context during processing of a delete file task.
79+
enum DeleteFileContext {
80+
// TODO: Delete Vector loader from Puffin files
81+
InProgEqDel(EqDelFuture),
82+
PosDels(ArrowRecordBatchStream),
83+
FreshEqDel {
84+
batch_stream: ArrowRecordBatchStream,
85+
sender: oneshot::Sender<Predicate>,
86+
},
87+
}
88+
89+
// Final result of the processing of a delete file task before
90+
// results are fully merged into the DeleteFileManager's state
91+
enum ParsedDeleteFileContext {
92+
InProgEqDel(EqDelFuture),
93+
DelVecs(HashMap<String, RoaringTreemap>),
94+
EqDel,
95+
}
2796

2897
#[allow(unused_variables)]
2998
impl DeleteFileManager {
99+
pub(crate) fn new() -> DeleteFileManager {
100+
Self {
101+
state: Default::default(),
102+
}
103+
}
104+
30105
pub(crate) async fn load_deletes(
31-
delete_file_entries: Vec<FileScanTaskDeleteFile>,
106+
&self,
107+
delete_file_entries: &[FileScanTaskDeleteFile],
32108
file_io: FileIO,
33109
concurrency_limit_data_files: usize,
34-
) -> Result<DeleteFileManager> {
35-
// TODO
110+
) -> Result<()> {
111+
/*
112+
* Create a single stream of all delete file tasks irrespective of type,
113+
so that we can respect the combined concurrency limit
114+
* We then process each in two phases: load and parse.
115+
* for positional deletes the load phase instantiates an ArrowRecordBatchStream to
116+
stream the file contents out
117+
* for eq deletes, we first check if the EQ delete is already loaded or being loaded by
118+
another concurrently processing data file scan task. If it is, we return a future
119+
for the pre-existing task from the load phase. If not, we create such a future
120+
and store it in the state to prevent other data file tasks from starting to load
121+
the same equality delete file, and return a record batch stream from the load phase
122+
as per the other delete file types - only this time it is accompanied by a one-shot
123+
channel sender that we will eventually use to resolve the shared future that we stored
124+
in the state.
125+
* When this gets updated to add support for delete vectors, the load phase will return
126+
a PuffinReader for them.
127+
* The parse phase parses each record batch stream according to its associated data type.
128+
The result of this is a map of data file paths to delete vectors for the positional
129+
delete tasks (and in future for the delete vector tasks). For equality delete
130+
file tasks, this results in an unbound Predicate.
131+
* The unbound Predicates resulting from equality deletes are sent to their associated oneshot
132+
channel to store them in the right place in the delete file manager's state.
133+
* The results of all of these futures are awaited on in parallel with the specified
134+
level of concurrency and collected into a vec. We then combine all of the delete
135+
vector maps that resulted from any positional delete or delete vector files into a
136+
single map and persist it in the state.
137+
138+
139+
Conceptually, the data flow is like this:
140+
141+
FileScanTaskDeleteFile
142+
|
143+
Already-loading EQ Delete | Everything Else
144+
+---------------------------------------------------+
145+
| |
146+
[get existing future] [load recordbatch stream / puffin]
147+
DeleteFileContext::InProgEqDel DeleteFileContext
148+
| |
149+
| |
150+
| +-----------------------------+--------------------------+
151+
| Pos Del Del Vec (Not yet Implemented) EQ Del
152+
| | | |
153+
| [parse pos del stream] [parse del vec puffin] [parse eq del]
154+
| HashMap<String, RoaringTreeMap> HashMap<String, RoaringTreeMap> (Predicate, Sender)
155+
| | | |
156+
| | | [persist to state]
157+
| | | ()
158+
| | | |
159+
| +-----------------------------+--------------------------+
160+
| |
161+
| [buffer unordered]
162+
| |
163+
| [combine del vectors]
164+
| HashMap<String, RoaringTreeMap>
165+
| |
166+
| [persist del vectors to state]
167+
| ()
168+
| |
169+
+-------------------------+-------------------------+
170+
|
171+
[join!]
172+
*/
173+
174+
let stream_items = delete_file_entries
175+
.iter()
176+
.map(|t| (t.clone(), file_io.clone(), self.state.clone()))
177+
.collect::<Vec<_>>();
178+
// NOTE: removing the collect and just passing the iterator to futures::stream:iter
179+
// results in an error 'implementation of `std::ops::FnOnce` is not general enough'
180+
181+
let task_stream = futures::stream::iter(stream_items.into_iter());
182+
183+
let results: Vec<ParsedDeleteFileContext> = task_stream
184+
.map(move |(task, file_io, state_ref)| async {
185+
Self::load_file_for_task(task, file_io, state_ref).await
186+
})
187+
.map(move |ctx| Ok(async { Self::parse_file_content_for_task(ctx.await?).await }))
188+
.try_buffer_unordered(concurrency_limit_data_files)
189+
.try_collect::<Vec<_>>()
190+
.await?;
191+
192+
let merged_delete_vectors = results
193+
.into_iter()
194+
.fold(HashMap::default(), Self::merge_delete_vectors);
195+
196+
self.state.write().unwrap().delete_vectors = merged_delete_vectors;
197+
198+
Ok(())
199+
}
200+
201+
async fn load_file_for_task(
202+
task: FileScanTaskDeleteFile,
203+
file_io: FileIO,
204+
state: StateRef,
205+
) -> Result<DeleteFileContext> {
206+
match task.file_type {
207+
DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels(
208+
Self::parquet_to_batch_stream(&task.file_path, file_io).await?,
209+
)),
210+
211+
DataContentType::EqualityDeletes => {
212+
let (sender, fut) = EqDelFuture::new();
213+
{
214+
let mut state = state.write().unwrap();
215+
216+
if let Some(existing) = state.equality_deletes.get(&task.file_path) {
217+
return Ok(DeleteFileContext::InProgEqDel(existing.clone()));
218+
}
36219

37-
if !delete_file_entries.is_empty() {
38-
Err(Error::new(
39-
ErrorKind::FeatureUnsupported,
40-
"Reading delete files is not yet supported",
41-
))
42-
} else {
43-
Ok(DeleteFileManager {})
220+
state
221+
.equality_deletes
222+
.insert(task.file_path.to_string(), fut);
223+
}
224+
225+
Ok(DeleteFileContext::FreshEqDel {
226+
batch_stream: Self::parquet_to_batch_stream(&task.file_path, file_io).await?,
227+
sender,
228+
})
229+
}
230+
231+
DataContentType::Data => Err(Error::new(
232+
ErrorKind::Unexpected,
233+
"tasks with files of type Data not expected here",
234+
)),
235+
}
236+
}
237+
238+
async fn parse_file_content_for_task(
239+
ctx: DeleteFileContext,
240+
) -> Result<ParsedDeleteFileContext> {
241+
match ctx {
242+
DeleteFileContext::InProgEqDel(fut) => Ok(ParsedDeleteFileContext::InProgEqDel(fut)),
243+
DeleteFileContext::PosDels(batch_stream) => {
244+
let del_vecs =
245+
Self::parse_positional_deletes_record_batch_stream(batch_stream).await?;
246+
Ok(ParsedDeleteFileContext::DelVecs(del_vecs))
247+
}
248+
DeleteFileContext::FreshEqDel {
249+
sender,
250+
batch_stream,
251+
} => {
252+
let predicate =
253+
Self::parse_equality_deletes_record_batch_stream(batch_stream).await?;
254+
255+
sender
256+
.send(predicate)
257+
.map_err(|err| {
258+
Error::new(
259+
ErrorKind::Unexpected,
260+
"Could not send eq delete predicate to state",
261+
)
262+
})
263+
.map(|_| ParsedDeleteFileContext::EqDel)
264+
}
44265
}
45266
}
46267

47-
pub(crate) fn build_delete_predicate(
268+
fn merge_delete_vectors(
269+
mut merged_delete_vectors: HashMap<String, RoaringTreemap>,
270+
item: ParsedDeleteFileContext,
271+
) -> HashMap<String, RoaringTreemap> {
272+
if let ParsedDeleteFileContext::DelVecs(del_vecs) = item {
273+
del_vecs.into_iter().for_each(|(key, val)| {
274+
let entry = merged_delete_vectors.entry(key).or_default();
275+
entry.bitand_assign(val);
276+
});
277+
}
278+
279+
merged_delete_vectors
280+
}
281+
282+
/// Loads a RecordBatchStream for a given datafile.
283+
async fn parquet_to_batch_stream(
284+
data_file_path: &str,
285+
file_io: FileIO,
286+
) -> Result<ArrowRecordBatchStream> {
287+
/*
288+
Essentially a super-cut-down ArrowReader. We can't use ArrowReader directly
289+
as that introduces a circular dependency.
290+
*/
291+
let record_batch_stream = ArrowReader::create_parquet_record_batch_stream_builder(
292+
data_file_path,
293+
file_io.clone(),
294+
false,
295+
)
296+
.await?
297+
.build()?
298+
.map_err(|e| Error::new(ErrorKind::Unexpected, format!("{}", e)));
299+
300+
Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
301+
}
302+
303+
/// Parses a record batch stream coming from positional delete files
304+
///
305+
/// Returns a map of data file path to a delete vector
306+
async fn parse_positional_deletes_record_batch_stream(
307+
stream: ArrowRecordBatchStream,
308+
) -> Result<HashMap<String, RoaringTreemap>> {
309+
// TODO
310+
311+
Ok(HashMap::default())
312+
}
313+
314+
/// Parses record batch streams from individual equality delete files
315+
///
316+
/// Returns an unbound Predicate for each batch stream
317+
async fn parse_equality_deletes_record_batch_stream(
318+
streams: ArrowRecordBatchStream,
319+
) -> Result<Predicate> {
320+
// TODO
321+
322+
Ok(AlwaysTrue)
323+
}
324+
325+
/// Builds eq delete predicate for the provided task.
326+
///
327+
/// Must await on load_deletes before calling this.
328+
pub(crate) async fn build_delete_predicate_for_task(
48329
&self,
49-
snapshot_schema: SchemaRef,
330+
file_scan_task: &FileScanTask,
50331
) -> Result<Option<BoundPredicate>> {
51-
// TODO
332+
// * Filter the task's deletes into just the Equality deletes
333+
// * Retrieve the unbound predicate for each from self.state.equality_deletes
334+
// * Logical-AND them all together to get a single combined `Predicate`
335+
// * Bind the predicate to the task's schema to get a `BoundPredicate`
336+
337+
let mut combined_predicate = AlwaysTrue;
338+
for delete in &file_scan_task.deletes {
339+
if !is_equality_delete(delete) {
340+
continue;
341+
}
342+
343+
let predicate = {
344+
let state = self.state.read().unwrap();
345+
346+
let Some(predicate) = state.equality_deletes.get(&delete.file_path) else {
347+
return Err(Error::new(
348+
ErrorKind::Unexpected,
349+
format!(
350+
"Missing predicate for equality delete file '{}'",
351+
delete.file_path
352+
),
353+
));
354+
};
355+
356+
predicate.clone()
357+
};
52358

53-
Ok(None)
359+
combined_predicate = combined_predicate.and(predicate.await);
360+
}
361+
362+
if combined_predicate == AlwaysTrue {
363+
return Ok(None);
364+
}
365+
366+
// TODO: handle case-insensitive case
367+
let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), false)?;
368+
Ok(Some(bound_predicate))
54369
}
55370

56-
pub(crate) fn get_positional_delete_indexes_for_data_file(
371+
/// Retrieve a delete vector for the data file associated with a given file scan task
372+
///
373+
/// Should only be called after awaiting on load_deletes. Takes the vector to avoid a
374+
/// clone since each item is specific to a single data file and won't need to be used again
375+
pub(crate) fn get_delete_vector_for_task(
57376
&self,
58-
data_file_path: &str,
377+
file_scan_task: &FileScanTask,
59378
) -> Option<RoaringTreemap> {
60-
// TODO
61-
62-
None
379+
self.state
380+
.write()
381+
.unwrap()
382+
.delete_vectors
383+
.remove(file_scan_task.data_file_path())
63384
}
64385
}
386+
387+
pub(crate) fn is_equality_delete(f: &FileScanTaskDeleteFile) -> bool {
388+
matches!(f.file_type, DataContentType::EqualityDeletes)
389+
}

0 commit comments

Comments
 (0)