Skip to content

Commit cdcf47b

Browse files
ldanilekConvex, Inc.
authored and
Convex, Inc.
committed
split QueryNode::next so the caller handles IndexRangeRequests (#23784)
allow the caller of QueryNode::next to execute index ranges in batches by returning an IndexRangeRequest that should be executed. The response is passed back in to QueryNode::feed. Then the caller calls QueryNode::next again and loops. considered alternatives: 1. #23743 inverts control completely, pulling the QueryNode logic into a batched function. 2. execute `next()` in parallel, passing in a channel instead of `&mut Transaction`. unfortunately `next()` can call six different methods on Transaction which would all have to be handled by the channel. GitOrigin-RevId: 7479a27cb2ad6f54c931d0f9f2c029823e3ec3e6
1 parent f7acb64 commit cdcf47b

File tree

12 files changed

+293
-152
lines changed

12 files changed

+293
-152
lines changed

crates/database/src/bootstrap_model/user_facing.rs

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use common::{
99
DeveloperDocument,
1010
ResolvedDocument,
1111
},
12-
index::IndexKeyBytes,
1312
query::CursorPosition,
1413
runtime::Runtime,
1514
types::{
@@ -30,13 +29,15 @@ use value::{
3029
DeveloperDocumentId,
3130
Size,
3231
TableName,
32+
TableNumber,
3333
};
3434

3535
use crate::{
3636
metrics::{
3737
log_virtual_table_get,
3838
log_virtual_table_query,
3939
},
40+
query::IndexRangeResponse,
4041
transaction::{
4142
IndexRangeRequest,
4243
MAX_PAGE_SIZE,
@@ -322,17 +323,12 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> {
322323
async fn start_index_range(
323324
&mut self,
324325
request: IndexRangeRequest,
325-
) -> anyhow::Result<
326-
Result<
327-
(
328-
Vec<(IndexKeyBytes, DeveloperDocument, WriteTimestamp)>,
329-
CursorPosition,
330-
),
331-
RangeRequest,
332-
>,
333-
> {
326+
) -> anyhow::Result<Result<IndexRangeResponse<TableNumber>, RangeRequest>> {
334327
if request.interval.is_empty() {
335-
return Ok(Ok((vec![], CursorPosition::End)));
328+
return Ok(Ok(IndexRangeResponse {
329+
page: vec![],
330+
cursor: CursorPosition::End,
331+
}));
336332
}
337333

338334
let max_rows = cmp::min(request.max_rows, MAX_PAGE_SIZE);
@@ -357,7 +353,10 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> {
357353
return Ok(Ok(virtual_result));
358354
},
359355
StableIndexName::Missing => {
360-
return Ok(Ok((vec![], CursorPosition::End)));
356+
return Ok(Ok(IndexRangeResponse {
357+
page: vec![],
358+
cursor: CursorPosition::End,
359+
}));
361360
},
362361
};
363362
let index_name = tablet_index_name
@@ -378,13 +377,7 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> {
378377
pub async fn index_range_batch(
379378
&mut self,
380379
requests: BTreeMap<BatchKey, IndexRangeRequest>,
381-
) -> BTreeMap<
382-
BatchKey,
383-
anyhow::Result<(
384-
Vec<(IndexKeyBytes, DeveloperDocument, WriteTimestamp)>,
385-
CursorPosition,
386-
)>,
387-
> {
380+
) -> BTreeMap<BatchKey, anyhow::Result<IndexRangeResponse<TableNumber>>> {
388381
let batch_size = requests.len();
389382
let mut results = BTreeMap::new();
390383
let mut fetch_requests = BTreeMap::new();
@@ -409,12 +402,15 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> {
409402
.await;
410403

411404
for (batch_key, fetch_result) in fetch_results {
412-
let result = fetch_result.map(|(resolved_results, cursor)| {
413-
let developer_results = resolved_results
405+
let result = fetch_result.map(|IndexRangeResponse { page, cursor }| {
406+
let developer_results = page
414407
.into_iter()
415408
.map(|(key, doc, ts)| (key, doc.to_developer(), ts))
416409
.collect();
417-
(developer_results, cursor)
410+
IndexRangeResponse {
411+
page: developer_results,
412+
cursor,
413+
}
418414
});
419415
results.insert(batch_key, result);
420416
}

crates/database/src/query/filter.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
use async_trait::async_trait;
22
use common::{
3-
document::GenericDocument,
43
query::{
54
CursorPosition,
65
Expression,
76
},
87
runtime::Runtime,
9-
types::WriteTimestamp,
108
};
119

1210
use super::{
11+
IndexRangeResponse,
1312
QueryNode,
1413
QueryStream,
14+
QueryStreamNext,
1515
QueryType,
1616
};
1717
use crate::Transaction;
@@ -51,17 +51,24 @@ impl<T: QueryType> QueryStream<T> for Filter<T> {
5151
&mut self,
5252
tx: &mut Transaction<RT>,
5353
_prefetch_hint: Option<usize>,
54-
) -> anyhow::Result<Option<(GenericDocument<T::T>, WriteTimestamp)>> {
54+
) -> anyhow::Result<QueryStreamNext<T>> {
5555
loop {
5656
let (document, write_timestamp) =
5757
match self.inner.next(tx, Some(FILTER_QUERY_PREFETCH)).await? {
58-
Some(v) => v,
59-
None => return Ok(None),
58+
QueryStreamNext::Ready(Some(v)) => v,
59+
QueryStreamNext::Ready(None) => return Ok(QueryStreamNext::Ready(None)),
60+
QueryStreamNext::WaitingOn(request) => {
61+
return Ok(QueryStreamNext::WaitingOn(request))
62+
},
6063
};
6164
let value = document.value().0.clone();
6265
if self.expr.eval(&value)?.into_boolean()? {
63-
return Ok(Some((document, write_timestamp)));
66+
return Ok(QueryStreamNext::Ready(Some((document, write_timestamp))));
6467
}
6568
}
6669
}
70+
71+
fn feed(&mut self, index_range_response: IndexRangeResponse<T::T>) -> anyhow::Result<()> {
72+
self.inner.feed(index_range_response)
73+
}
6774
}

crates/database/src/query/index_range.rs

Lines changed: 13 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use std::{
33
collections::VecDeque,
44
};
55

6-
use anyhow::Context;
76
use async_trait::async_trait;
87
use common::{
98
document::GenericDocument,
@@ -25,12 +24,13 @@ use common::{
2524
},
2625
version::Version,
2726
};
28-
use maplit::btreemap;
2927

3028
use super::{
3129
query_scanned_too_many_documents_error,
3230
query_scanned_too_much_data,
31+
IndexRangeResponse,
3332
QueryStream,
33+
QueryStreamNext,
3434
QueryType,
3535
DEFAULT_QUERY_PREFETCH,
3636
MAX_QUERY_FETCH,
@@ -145,8 +145,7 @@ impl<T: QueryType> IndexRange<T> {
145145
&mut self,
146146
tx: &mut Transaction<RT>,
147147
prefetch_hint: Option<usize>,
148-
) -> anyhow::Result<Result<Option<(GenericDocument<T::T>, WriteTimestamp)>, IndexRangeRequest>>
149-
{
148+
) -> anyhow::Result<QueryStreamNext<T>> {
150149
// If we have an end cursor, for correctness we need to process
151150
// the entire interval, so ignore `maximum_rows_read` and `maximum_bytes_read`.
152151
let enforce_limits = self.cursor_interval.end_inclusive.is_none();
@@ -177,10 +176,10 @@ impl<T: QueryType> IndexRange<T> {
177176
self.printable_index_name.is_system_owned(),
178177
);
179178
self.returned_bytes += v.size();
180-
return Ok(Ok(Some((v, timestamp))));
179+
return Ok(QueryStreamNext::Ready(Some((v, timestamp))));
181180
}
182181
if let Some(CursorPosition::End) = self.cursor_interval.curr_exclusive {
183-
return Ok(Ok(None));
182+
return Ok(QueryStreamNext::Ready(None));
184183
}
185184
if self.unfetched_interval.is_empty() {
186185
// We're out of results. If we have an end cursor then we must
@@ -192,7 +191,7 @@ impl<T: QueryType> IndexRange<T> {
192191
.clone()
193192
.unwrap_or(CursorPosition::End),
194193
);
195-
return Ok(Ok(None));
194+
return Ok(QueryStreamNext::Ready(None));
196195
}
197196

198197
let mut max_rows = prefetch_hint
@@ -205,7 +204,7 @@ impl<T: QueryType> IndexRange<T> {
205204
}
206205
max_rows = cmp::min(max_rows, maximum_rows_read - self.rows_read);
207206
}
208-
Ok(Err(IndexRangeRequest {
207+
Ok(QueryStreamNext::WaitingOn(IndexRangeRequest {
209208
stable_index_name: self.stable_index_name.clone(),
210209
interval: self.unfetched_interval.clone(),
211210
order: self.order,
@@ -227,25 +226,6 @@ impl<T: QueryType> IndexRange<T> {
227226
self.page.extend(page);
228227
Ok(())
229228
}
230-
231-
#[convex_macro::instrument_future]
232-
async fn _next<RT: Runtime>(
233-
&mut self,
234-
tx: &mut Transaction<RT>,
235-
prefetch_hint: Option<usize>,
236-
) -> anyhow::Result<Option<(GenericDocument<T::T>, WriteTimestamp)>> {
237-
loop {
238-
let request = match self.start_next(tx, prefetch_hint)? {
239-
Ok(result) => return Ok(result),
240-
Err(request) => request,
241-
};
242-
let (page, fetch_cursor) = T::index_range_batch(tx, btreemap! {0 => request})
243-
.await
244-
.remove(&0)
245-
.context("batch_key missing")??;
246-
self.process_fetch(page, fetch_cursor)?;
247-
}
248-
}
249229
}
250230

251231
pub const fn soft_data_limit(hard_limit: usize) -> usize {
@@ -276,8 +256,12 @@ impl<T: QueryType> QueryStream<T> for IndexRange<T> {
276256
&mut self,
277257
tx: &mut Transaction<RT>,
278258
prefetch_hint: Option<usize>,
279-
) -> anyhow::Result<Option<(GenericDocument<T::T>, WriteTimestamp)>> {
280-
self._next(tx, prefetch_hint).await
259+
) -> anyhow::Result<QueryStreamNext<T>> {
260+
self.start_next(tx, prefetch_hint)
261+
}
262+
263+
fn feed(&mut self, index_range_response: IndexRangeResponse<T::T>) -> anyhow::Result<()> {
264+
self.process_fetch(index_range_response.page, index_range_response.cursor)
281265
}
282266
}
283267

crates/database/src/query/limit.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@ use std::cmp;
33
use async_trait::async_trait;
44
use common::{
55
self,
6-
document::GenericDocument,
76
query::CursorPosition,
87
runtime::Runtime,
9-
types::WriteTimestamp,
108
};
119

1210
use super::{
11+
IndexRangeResponse,
1312
QueryNode,
1413
QueryStream,
14+
QueryStreamNext,
1515
QueryType,
1616
DEFAULT_QUERY_PREFETCH,
1717
};
@@ -52,18 +52,22 @@ impl<T: QueryType> QueryStream<T> for Limit<T> {
5252
&mut self,
5353
tx: &mut Transaction<RT>,
5454
prefetch_hint: Option<usize>,
55-
) -> anyhow::Result<Option<(GenericDocument<T::T>, WriteTimestamp)>> {
55+
) -> anyhow::Result<QueryStreamNext<T>> {
5656
if self.rows_emitted >= self.limit {
57-
return Ok(None);
57+
return Ok(QueryStreamNext::Ready(None));
5858
}
5959
let inner_hint = cmp::min(
6060
prefetch_hint.unwrap_or(DEFAULT_QUERY_PREFETCH),
6161
self.limit - self.rows_emitted,
6262
);
63-
let next_value = self.inner.next(tx, Some(inner_hint)).await?;
64-
if next_value.is_some() {
63+
let result = self.inner.next(tx, Some(inner_hint)).await?;
64+
if let QueryStreamNext::Ready(Some(_)) = result {
6565
self.rows_emitted += 1;
6666
}
67-
Ok(next_value)
67+
Ok(result)
68+
}
69+
70+
fn feed(&mut self, index_range_response: IndexRangeResponse<T::T>) -> anyhow::Result<()> {
71+
self.inner.feed(index_range_response)
6872
}
6973
}

0 commit comments

Comments
 (0)