Skip to content

Commit 20b44ab

Browse files
committed
refactor: use Notify and oneshot channel rather than custom Future
1 parent 86efb05 commit 20b44ab

File tree

6 files changed

+149
-190
lines changed

6 files changed

+149
-190
lines changed

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,5 +118,4 @@ url = "2.5.4"
118118
uuid = { version = "1.16", features = ["v7"] }
119119
volo = "0.10.6"
120120
volo-thrift = "0.10.6"
121-
waker-set = "0.2.0"
122121
zstd = "0.13.2"

crates/iceberg/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ tokio = { workspace = true, optional = false, features = ["sync"] }
8585
typed-builder = { workspace = true }
8686
url = { workspace = true }
8787
uuid = { workspace = true }
88-
waker-set = { workspace = true }
8988
zstd = { workspace = true }
9089

9190
[dev-dependencies]

crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 47 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,10 @@
1717

1818
use std::collections::HashMap;
1919

20-
use futures::channel::oneshot;
21-
use futures::future::join_all;
2220
use futures::{StreamExt, TryStreamExt};
2321
use tokio::sync::oneshot::{Receiver, channel};
2422

25-
use super::delete_filter::{DeleteFilter, EqDelFuture};
23+
use super::delete_filter::DeleteFilter;
2624
use crate::arrow::delete_file_loader::BasicDeleteFileLoader;
2725
use crate::delete_vector::DeleteVector;
2826
use crate::expr::Predicate;
@@ -40,18 +38,17 @@ pub(crate) struct CachingDeleteFileLoader {
4038
// Intermediate context during processing of a delete file task.
4139
enum DeleteFileContext {
4240
// TODO: Delete Vector loader from Puffin files
43-
InProgEqDel(EqDelFuture),
41+
ExistingEqDel,
4442
PosDels(ArrowRecordBatchStream),
4543
FreshEqDel {
4644
batch_stream: ArrowRecordBatchStream,
47-
sender: oneshot::Sender<Predicate>,
45+
sender: tokio::sync::oneshot::Sender<Predicate>,
4846
},
4947
}
5048

5149
// Final result of the processing of a delete file task before
5250
// results are fully merged into the DeleteFileManager's state
5351
enum ParsedDeleteFileContext {
54-
InProgEqDel(EqDelFuture),
5552
DelVecs(HashMap<String, DeleteVector>),
5653
EqDel,
5754
}
@@ -65,23 +62,23 @@ impl CachingDeleteFileLoader {
6562
}
6663
}
6764

68-
/// Load the deletes for all the specified tasks
65+
/// Initiates loading of all deletes for all the specified tasks
6966
///
70-
/// Returned future completes once all loading has finished.
67+
/// Returned future completes once all positional deletes and delete vectors
68+
/// have loaded. EQ deletes are not waited for in this method but the returned
69+
/// DeleteFilter will await their loading when queried for them.
7170
///
7271
/// * Create a single stream of all delete file tasks irrespective of type,
7372
/// so that we can respect the combined concurrency limit
7473
/// * We then process each in two phases: load and parse.
7574
/// * for positional deletes the load phase instantiates an ArrowRecordBatchStream to
7675
/// stream the file contents out
7776
/// * for eq deletes, we first check if the EQ delete is already loaded or being loaded by
78-
/// another concurrently processing data file scan task. If it is, we return a future
79-
/// for the pre-existing task from the load phase. If not, we create such a future
80-
/// and store it in the state to prevent other data file tasks from starting to load
81-
/// the same equality delete file, and return a record batch stream from the load phase
82-
/// as per the other delete file types - only this time it is accompanied by a one-shot
83-
/// channel sender that we will eventually use to resolve the shared future that we stored
84-
/// in the state.
77+
/// another concurrently processing data file scan task. If it is, we skip it.
78+
/// If not, the DeleteFilter is updated to contain a notifier to prevent other data file
79+
/// tasks from starting to load the same equality delete file. We spawn a task to load
80+
/// the EQ delete's record batch stream, convert it to a predicate, update the delete filter,
81+
/// and notify any task that was waiting for it.
8582
/// * When this gets updated to add support for delete vectors, the load phase will return
8683
/// a PuffinReader for them.
8784
/// * The parse phase parses each record batch stream according to its associated data type.
@@ -100,35 +97,34 @@ impl CachingDeleteFileLoader {
10097
/// ```none
10198
/// FileScanTaskDeleteFile
10299
/// |
103-
/// Already-loading EQ Delete | Everything Else
104-
/// +---------------------------------------------------+
105-
/// | |
106-
/// [get existing future] [load recordbatch stream / puffin]
107-
/// DeleteFileContext::InProgEqDel DeleteFileContext
108-
/// | |
109-
/// | |
110-
/// | +-----------------------------+--------------------------+
111-
/// | Pos Del Del Vec (Not yet Implemented) EQ Del
112-
/// | | | |
113-
/// | [parse pos del stream] [parse del vec puffin] [parse eq del]
114-
/// | HashMap<String, RoaringTreeMap> HashMap<String, RoaringTreeMap> (Predicate, Sender)
115-
/// | | | |
116-
/// | | | [persist to state]
117-
/// | | | ()
118-
/// | | | |
119-
/// | +-----------------------------+--------------------------+
120-
/// | |
121-
/// | [buffer unordered]
122-
/// | |
123-
/// | [combine del vectors]
124-
/// | HashMap<String, RoaringTreeMap>
125-
/// | |
126-
/// | [persist del vectors to state]
127-
/// | ()
128-
/// | |
129-
/// +-------------------------+-------------------------+
130-
/// |
131-
/// [join!]
100+
/// Skip Started EQ Deletes
101+
/// |
102+
/// |
103+
/// [load recordbatch stream / puffin]
104+
/// DeleteFileContext
105+
/// |
106+
/// |
107+
/// +-----------------------------+--------------------------+
108+
/// Pos Del Del Vec (Not yet Implemented) EQ Del
109+
/// | | |
110+
/// [parse pos del stream] [parse del vec puffin] [parse eq del]
111+
/// HashMap<String, RoaringTreeMap> HashMap<String, RoaringTreeMap> (Predicate, Sender)
112+
/// | | |
113+
/// | | [persist to state]
114+
/// | | ()
115+
/// | | |
116+
/// +-----------------------------+--------------------------+
117+
/// |
118+
/// [buffer unordered]
119+
/// |
120+
/// [combine del vectors]
121+
/// HashMap<String, RoaringTreeMap>
122+
/// |
123+
/// [persist del vectors to state]
124+
/// ()
125+
/// |
126+
/// |
127+
/// [join!]
132128
/// ```
133129
pub(crate) fn load_deletes(
134130
&self,
@@ -150,6 +146,7 @@ impl CachingDeleteFileLoader {
150146
})
151147
.collect::<Vec<_>>();
152148
let task_stream = futures::stream::iter(stream_items);
149+
153150
let del_filter = del_filter.clone();
154151
let concurrency_limit_data_files = self.concurrency_limit_data_files;
155152
let basic_delete_file_loader = self.basic_delete_file_loader.clone();
@@ -178,16 +175,6 @@ impl CachingDeleteFileLoader {
178175
.try_collect::<Vec<_>>()
179176
.await?;
180177

181-
// wait for all in-progress EQ deletes from other tasks
182-
let _ = join_all(results.iter().filter_map(|i| {
183-
if let ParsedDeleteFileContext::InProgEqDel(fut) = i {
184-
Some(fut.clone())
185-
} else {
186-
None
187-
}
188-
}))
189-
.await;
190-
191178
for item in results {
192179
if let ParsedDeleteFileContext::DelVecs(hash_map) = item {
193180
for (data_file_path, delete_vector) in hash_map.into_iter() {
@@ -220,20 +207,13 @@ impl CachingDeleteFileLoader {
220207
)),
221208

222209
DataContentType::EqualityDeletes => {
223-
let sender = {
224-
if let Some(existing) = del_filter
225-
.get_equality_delete_predicate_for_delete_file_path(&task.file_path)
226-
{
227-
return Ok(DeleteFileContext::InProgEqDel(existing.clone()));
228-
}
229-
230-
let (sender, fut) = EqDelFuture::new();
231-
232-
del_filter.insert_equality_delete(task.file_path.to_string(), fut);
233-
234-
sender
210+
let Some(notify) = del_filter.try_start_eq_del_load(&task.file_path) else {
211+
return Ok(DeleteFileContext::ExistingEqDel);
235212
};
236213

214+
let (sender, receiver) = channel();
215+
del_filter.insert_equality_delete(&task.file_path, receiver);
216+
237217
Ok(DeleteFileContext::FreshEqDel {
238218
batch_stream: BasicDeleteFileLoader::evolve_schema(
239219
basic_delete_file_loader
@@ -257,7 +237,7 @@ impl CachingDeleteFileLoader {
257237
ctx: DeleteFileContext,
258238
) -> Result<ParsedDeleteFileContext> {
259239
match ctx {
260-
DeleteFileContext::InProgEqDel(fut) => Ok(ParsedDeleteFileContext::InProgEqDel(fut)),
240+
DeleteFileContext::ExistingEqDel => Ok(ParsedDeleteFileContext::EqDel),
261241
DeleteFileContext::PosDels(batch_stream) => {
262242
let del_vecs =
263243
Self::parse_positional_deletes_record_batch_stream(batch_stream).await?;

crates/iceberg/src/arrow/delete_filter.rs

Lines changed: 70 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,10 @@
1616
// under the License.
1717

1818
use std::collections::HashMap;
19-
use std::future::Future;
20-
use std::pin::Pin;
21-
use std::sync::{Arc, Mutex, OnceLock, RwLock};
22-
use std::task::{Context, Poll};
19+
use std::sync::{Arc, Mutex, RwLock};
2320

24-
use futures::channel::oneshot;
21+
use tokio::sync::Notify;
22+
use tokio::sync::oneshot::Receiver;
2523

2624
use crate::delete_vector::DeleteVector;
2725
use crate::expr::Predicate::AlwaysTrue;
@@ -30,46 +28,16 @@ use crate::scan::{FileScanTask, FileScanTaskDeleteFile};
3028
use crate::spec::DataContentType;
3129
use crate::{Error, ErrorKind, Result};
3230

33-
// Equality deletes may apply to more than one DataFile in a scan, and so
34-
// the same equality delete file may be present in more than one invocation of
35-
// DeleteFileManager::load_deletes in the same scan. We want to deduplicate these
36-
// to avoid having to load them twice, so we immediately store cloneable futures in the
37-
// state that can be awaited upon to get te EQ deletes. That way we can check to see if
38-
// a load of each Eq delete file is already in progress and avoid starting another one.
39-
#[derive(Debug, Clone)]
40-
pub(crate) struct EqDelFuture {
41-
result: OnceLock<Predicate>,
42-
}
43-
44-
impl EqDelFuture {
45-
pub(crate) fn new() -> (oneshot::Sender<Predicate>, Self) {
46-
let (tx, rx) = oneshot::channel();
47-
let result = OnceLock::new();
48-
49-
crate::runtime::spawn({
50-
let result = result.clone();
51-
async move { result.set(rx.await.unwrap()) }
52-
});
53-
54-
(tx, Self { result })
55-
}
56-
}
57-
58-
impl Future for EqDelFuture {
59-
type Output = Predicate;
60-
61-
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
62-
match self.result.get() {
63-
None => Poll::Pending,
64-
Some(predicate) => Poll::Ready(predicate.clone()),
65-
}
66-
}
31+
/// Load state of a single equality delete file, as stored per file path in
/// `DeleteFileFilterState::equality_deletes`.
#[derive(Debug)]
32+
enum EqDelState {
33+
/// A load has been registered but no predicate is available yet. Readers clone the
/// `Notify` and wait on it; the loader calls `notify_waiters` after storing the result.
Loading(Arc<Notify>),
34+
/// The predicate produced from the delete file; readers receive a clone of it.
Loaded(Predicate),
6735
}
6836

6937
#[derive(Debug, Default)]
7038
struct DeleteFileFilterState {
7139
delete_vectors: HashMap<String, Arc<Mutex<DeleteVector>>>,
72-
equality_deletes: HashMap<String, EqDelFuture>,
40+
equality_deletes: HashMap<String, EqDelState>,
7341
}
7442

7543
#[derive(Clone, Debug, Default)]
@@ -97,17 +65,42 @@ impl DeleteFilter {
9765
.and_then(|st| st.delete_vectors.get(delete_file_path).cloned())
9866
}
9967

68+
pub(crate) fn try_start_eq_del_load(&self, file_path: &str) -> Option<Arc<Notify>> {
69+
let mut state = self.state.write().unwrap();
70+
71+
if !state.equality_deletes.contains_key(file_path) {
72+
return None;
73+
}
74+
75+
let notifier = Arc::new(Notify::new());
76+
state
77+
.equality_deletes
78+
.insert(file_path.to_string(), EqDelState::Loading(notifier.clone()));
79+
80+
Some(notifier)
81+
}
82+
10083
/// Retrieve the equality delete predicate for a given eq delete file path
101-
pub(crate) fn get_equality_delete_predicate_for_delete_file_path(
84+
pub(crate) async fn get_equality_delete_predicate_for_delete_file_path(
10285
&self,
10386
file_path: &str,
104-
) -> Option<EqDelFuture> {
105-
self.state
106-
.read()
107-
.unwrap()
108-
.equality_deletes
109-
.get(file_path)
110-
.cloned()
87+
) -> Option<Predicate> {
88+
let notifier = {
89+
match self.state.read().unwrap().equality_deletes.get(file_path) {
90+
None => return None,
91+
Some(EqDelState::Loading(notifier)) => notifier.clone(),
92+
Some(EqDelState::Loaded(predicate)) => {
93+
return Some(predicate.clone());
94+
}
95+
}
96+
};
97+
98+
notifier.notified().await;
99+
100+
match self.state.read().unwrap().equality_deletes.get(file_path) {
101+
Some(EqDelState::Loaded(predicate)) => Some(predicate.clone()),
102+
_ => unreachable!("Cannot be any other state than loaded"),
103+
}
111104
}
112105

113106
/// Builds eq delete predicate for the provided task.
@@ -126,8 +119,9 @@ impl DeleteFilter {
126119
continue;
127120
}
128121

129-
let Some(predicate) =
130-
self.get_equality_delete_predicate_for_delete_file_path(&delete.file_path)
122+
let Some(predicate) = self
123+
.get_equality_delete_predicate_for_delete_file_path(&delete.file_path)
124+
.await
131125
else {
132126
return Err(Error::new(
133127
ErrorKind::Unexpected,
@@ -138,7 +132,7 @@ impl DeleteFilter {
138132
));
139133
};
140134

141-
combined_predicate = combined_predicate.and(predicate.await);
135+
combined_predicate = combined_predicate.and(predicate);
142136
}
143137

144138
if combined_predicate == AlwaysTrue {
@@ -167,10 +161,32 @@ impl DeleteFilter {
167161
*entry.lock().unwrap() |= delete_vector;
168162
}
169163

170-
pub(crate) fn insert_equality_delete(&self, delete_file_path: String, eq_del: EqDelFuture) {
171-
let mut state = self.state.write().unwrap();
164+
pub(crate) fn insert_equality_delete(
165+
&self,
166+
delete_file_path: &str,
167+
eq_del: Receiver<Predicate>,
168+
) {
169+
let notify = Arc::new(Notify::new());
170+
{
171+
let mut state = self.state.write().unwrap();
172+
state.equality_deletes.insert(
173+
delete_file_path.to_string(),
174+
EqDelState::Loading(notify.clone()),
175+
);
176+
}
172177

173-
state.equality_deletes.insert(delete_file_path, eq_del);
178+
let state = self.state.clone();
179+
let delete_file_path = delete_file_path.to_string();
180+
crate::runtime::spawn(async move {
181+
let eq_del = eq_del.await.unwrap();
182+
{
183+
let mut state = state.write().unwrap();
184+
state
185+
.equality_deletes
186+
.insert(delete_file_path, EqDelState::Loaded(eq_del));
187+
}
188+
notify.notify_waiters();
189+
});
174190
}
175191
}
176192

0 commit comments

Comments
 (0)