Skip to content

Commit e0cd6dd

Browse files
committed
refactor: extract BasicDeleteFileLoader from CachingDeleteFileLoader
1 parent 2c856e9 commit e0cd6dd

File tree

5 files changed

+386
-296
lines changed

5 files changed

+386
-296
lines changed
Lines changed: 344 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,344 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashMap;
19+
20+
use futures::channel::oneshot;
21+
use futures::future::join_all;
22+
use futures::{StreamExt, TryStreamExt};
23+
use tokio::sync::oneshot::{channel, Receiver};
24+
25+
use super::delete_filter::{DeleteFilter, EqDelFuture};
26+
use crate::arrow::delete_file_loader::BasicDeleteFileLoader;
27+
use crate::delete_vector::DeleteVector;
28+
use crate::expr::Predicate;
29+
use crate::io::FileIO;
30+
use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
31+
use crate::spec::{DataContentType, SchemaRef};
32+
use crate::{Error, ErrorKind, Result};
33+
34+
#[derive(Clone, Debug)]
35+
pub(crate) struct CachingDeleteFileLoader {
36+
basic_delete_file_loader: BasicDeleteFileLoader,
37+
concurrency_limit_data_files: usize,
38+
}
39+
40+
// Intermediate context during processing of a delete file task.
41+
enum DeleteFileContext {
42+
// TODO: Delete Vector loader from Puffin files
43+
InProgEqDel(EqDelFuture),
44+
PosDels(ArrowRecordBatchStream),
45+
FreshEqDel {
46+
batch_stream: ArrowRecordBatchStream,
47+
sender: oneshot::Sender<Predicate>,
48+
},
49+
}
50+
51+
// Final result of the processing of a delete file task before
52+
// results are fully merged into the DeleteFileManager's state
53+
enum ParsedDeleteFileContext {
54+
InProgEqDel(EqDelFuture),
55+
DelVecs(HashMap<String, DeleteVector>),
56+
EqDel,
57+
}
58+
59+
#[allow(unused_variables)]
60+
impl CachingDeleteFileLoader {
61+
pub(crate) fn new(file_io: FileIO, concurrency_limit_data_files: usize) -> Self {
62+
CachingDeleteFileLoader {
63+
basic_delete_file_loader: BasicDeleteFileLoader::new(file_io),
64+
concurrency_limit_data_files,
65+
}
66+
}
67+
68+
/// Load the deletes for all the specified tasks
69+
///
70+
/// Returned future completes once all loading has finished.
71+
///
72+
/// * Create a single stream of all delete file tasks irrespective of type,
73+
/// so that we can respect the combined concurrency limit
74+
/// * We then process each in two phases: load and parse.
75+
/// * for positional deletes the load phase instantiates an ArrowRecordBatchStream to
76+
/// stream the file contents out
77+
/// * for eq deletes, we first check if the EQ delete is already loaded or being loaded by
78+
/// another concurrently processing data file scan task. If it is, we return a future
79+
/// for the pre-existing task from the load phase. If not, we create such a future
80+
/// and store it in the state to prevent other data file tasks from starting to load
81+
/// the same equality delete file, and return a record batch stream from the load phase
82+
/// as per the other delete file types - only this time it is accompanied by a one-shot
83+
/// channel sender that we will eventually use to resolve the shared future that we stored
84+
/// in the state.
85+
/// * When this gets updated to add support for delete vectors, the load phase will return
86+
/// a PuffinReader for them.
87+
/// * The parse phase parses each record batch stream according to its associated data type.
88+
/// The result of this is a map of data file paths to delete vectors for the positional
89+
/// delete tasks (and in future for the delete vector tasks). For equality delete
90+
/// file tasks, this results in an unbound Predicate.
91+
/// * The unbound Predicates resulting from equality deletes are sent to their associated oneshot
92+
/// channel to store them in the right place in the delete file managers state.
93+
/// * The results of all of these futures are awaited on in parallel with the specified
94+
/// level of concurrency and collected into a vec. We then combine all the delete
95+
/// vector maps that resulted from any positional delete or delete vector files into a
96+
/// single map and persist it in the state.
97+
///
98+
///
99+
/// Conceptually, the data flow is like this:
100+
/// ```none
101+
/// FileScanTaskDeleteFile
102+
/// |
103+
/// Already-loading EQ Delete | Everything Else
104+
/// +---------------------------------------------------+
105+
/// | |
106+
/// [get existing future] [load recordbatch stream / puffin]
107+
/// DeleteFileContext::InProgEqDel DeleteFileContext
108+
/// | |
109+
/// | |
110+
/// | +-----------------------------+--------------------------+
111+
/// | Pos Del Del Vec (Not yet Implemented) EQ Del
112+
/// | | | |
113+
/// | [parse pos del stream] [parse del vec puffin] [parse eq del]
114+
/// | HashMap<String, RoaringTreeMap> HashMap<String, RoaringTreeMap> (Predicate, Sender)
115+
/// | | | |
116+
/// | | | [persist to state]
117+
/// | | | ()
118+
/// | | | |
119+
/// | +-----------------------------+--------------------------+
120+
/// | |
121+
/// | [buffer unordered]
122+
/// | |
123+
/// | [combine del vectors]
124+
/// | HashMap<String, RoaringTreeMap>
125+
/// | |
126+
/// | [persist del vectors to state]
127+
/// | ()
128+
/// | |
129+
/// +-------------------------+-------------------------+
130+
/// |
131+
/// [join!]
132+
/// ```
133+
pub(crate) fn load_deletes(
134+
&self,
135+
delete_file_entries: &[FileScanTaskDeleteFile],
136+
schema: SchemaRef,
137+
) -> Receiver<Result<DeleteFilter>> {
138+
let (tx, rx) = channel();
139+
let del_filter = DeleteFilter::default();
140+
141+
let stream_items = delete_file_entries
142+
.iter()
143+
.map(|t| {
144+
(
145+
t.clone(),
146+
self.basic_delete_file_loader.clone(),
147+
del_filter.clone(),
148+
schema.clone(),
149+
)
150+
})
151+
.collect::<Vec<_>>();
152+
let task_stream = futures::stream::iter(stream_items);
153+
let del_filter = del_filter.clone();
154+
let concurrency_limit_data_files = self.concurrency_limit_data_files;
155+
let basic_delete_file_loader = self.basic_delete_file_loader.clone();
156+
crate::runtime::spawn(async move {
157+
let result = async move {
158+
let mut del_filter = del_filter;
159+
let basic_delete_file_loader = basic_delete_file_loader.clone();
160+
161+
let results: Vec<ParsedDeleteFileContext> = task_stream
162+
.map(move |(task, file_io, del_filter, schema)| {
163+
let basic_delete_file_loader = basic_delete_file_loader.clone();
164+
async move {
165+
Self::load_file_for_task(
166+
&task,
167+
basic_delete_file_loader.clone(),
168+
del_filter,
169+
schema,
170+
)
171+
.await
172+
}
173+
})
174+
.map(move |ctx| {
175+
Ok(async { Self::parse_file_content_for_task(ctx.await?).await })
176+
})
177+
.try_buffer_unordered(concurrency_limit_data_files)
178+
.try_collect::<Vec<_>>()
179+
.await?;
180+
181+
// wait for all in-progress EQ deletes from other tasks
182+
let _ = join_all(results.iter().filter_map(|i| {
183+
if let ParsedDeleteFileContext::InProgEqDel(fut) = i {
184+
Some(fut.clone())
185+
} else {
186+
None
187+
}
188+
}))
189+
.await;
190+
191+
for item in results {
192+
if let ParsedDeleteFileContext::DelVecs(hash_map) = item {
193+
for (data_file_path, delete_vector) in hash_map.into_iter() {
194+
del_filter.upsert_delete_vector(data_file_path, delete_vector);
195+
}
196+
}
197+
}
198+
199+
Ok(del_filter)
200+
}
201+
.await;
202+
203+
let _ = tx.send(result);
204+
});
205+
206+
rx
207+
}
208+
209+
async fn load_file_for_task(
210+
task: &FileScanTaskDeleteFile,
211+
basic_delete_file_loader: BasicDeleteFileLoader,
212+
del_filter: DeleteFilter,
213+
schema: SchemaRef,
214+
) -> Result<DeleteFileContext> {
215+
match task.file_type {
216+
DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels(
217+
basic_delete_file_loader
218+
.parquet_to_batch_stream(&task.file_path)
219+
.await?,
220+
)),
221+
222+
DataContentType::EqualityDeletes => {
223+
let sender = {
224+
if let Some(existing) = del_filter
225+
.get_equality_delete_predicate_for_delete_file_path(&task.file_path)
226+
{
227+
return Ok(DeleteFileContext::InProgEqDel(existing.clone()));
228+
}
229+
230+
let (sender, fut) = EqDelFuture::new();
231+
232+
del_filter.insert_equality_delete(task.file_path.to_string(), fut);
233+
234+
sender
235+
};
236+
237+
Ok(DeleteFileContext::FreshEqDel {
238+
batch_stream: BasicDeleteFileLoader::evolve_schema(
239+
basic_delete_file_loader
240+
.parquet_to_batch_stream(&task.file_path)
241+
.await?,
242+
schema,
243+
)
244+
.await?,
245+
sender,
246+
})
247+
}
248+
249+
DataContentType::Data => Err(Error::new(
250+
ErrorKind::Unexpected,
251+
"tasks with files of type Data not expected here",
252+
)),
253+
}
254+
}
255+
256+
async fn parse_file_content_for_task(
257+
ctx: DeleteFileContext,
258+
) -> Result<ParsedDeleteFileContext> {
259+
match ctx {
260+
DeleteFileContext::InProgEqDel(fut) => Ok(ParsedDeleteFileContext::InProgEqDel(fut)),
261+
DeleteFileContext::PosDels(batch_stream) => {
262+
let del_vecs =
263+
Self::parse_positional_deletes_record_batch_stream(batch_stream).await?;
264+
Ok(ParsedDeleteFileContext::DelVecs(del_vecs))
265+
}
266+
DeleteFileContext::FreshEqDel {
267+
sender,
268+
batch_stream,
269+
} => {
270+
let predicate =
271+
Self::parse_equality_deletes_record_batch_stream(batch_stream).await?;
272+
273+
sender
274+
.send(predicate)
275+
.map_err(|err| {
276+
Error::new(
277+
ErrorKind::Unexpected,
278+
"Could not send eq delete predicate to state",
279+
)
280+
})
281+
.map(|_| ParsedDeleteFileContext::EqDel)
282+
}
283+
}
284+
}
285+
286+
/// Parses a record batch stream coming from positional delete files
287+
///
288+
/// Returns a map of data file path to a delete vector
289+
async fn parse_positional_deletes_record_batch_stream(
290+
stream: ArrowRecordBatchStream,
291+
) -> Result<HashMap<String, DeleteVector>> {
292+
// TODO
293+
294+
Err(Error::new(
295+
ErrorKind::FeatureUnsupported,
296+
"parsing of positional deletes is not yet supported",
297+
))
298+
}
299+
300+
/// Parses record batch streams from individual equality delete files
301+
///
302+
/// Returns an unbound Predicate for each batch stream
303+
async fn parse_equality_deletes_record_batch_stream(
304+
streams: ArrowRecordBatchStream,
305+
) -> Result<Predicate> {
306+
// TODO
307+
308+
Err(Error::new(
309+
ErrorKind::FeatureUnsupported,
310+
"parsing of equality deletes is not yet supported",
311+
))
312+
}
313+
}
314+
315+
#[cfg(test)]
316+
mod tests {
317+
use tempfile::TempDir;
318+
319+
use super::*;
320+
use crate::arrow::delete_file_loader::tests::setup;
321+
322+
#[tokio::test]
323+
async fn test_delete_file_manager_load_deletes() {
324+
let tmp_dir = TempDir::new().unwrap();
325+
let table_location = tmp_dir.path();
326+
let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
327+
.unwrap()
328+
.build()
329+
.unwrap();
330+
331+
// Note that with the delete file parsing not yet in place, all we can test here is that
332+
// the call to the loader fails with the expected FeatureUnsupportedError.
333+
let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10);
334+
335+
let file_scan_tasks = setup(table_location);
336+
337+
let result = delete_file_manager
338+
.load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
339+
.await
340+
.unwrap();
341+
342+
assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));
343+
}
344+
}

0 commit comments

Comments
 (0)