Skip to content

Commit 8aa6ae9

Browse files
ldanilekConvex, Inc.
authored and
Convex, Inc.
committed
nest DeveloperQuery inside ResolvedQuery (#25381)
see docstring on ResolvedQuery. this gets us closer to removing TableIdAndTableNumber which is a confusing concept since it mashes together tablets and table numbers. GitOrigin-RevId: 9586355db96f0e276a37e030a4e732f49a3f4455
1 parent e1105af commit 8aa6ae9

File tree

13 files changed

+227
-232
lines changed

13 files changed

+227
-232
lines changed

crates/application/src/export_worker.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -988,7 +988,8 @@ mod tests {
988988
},
989989
};
990990
let doc = UserFacingModel::new(&mut tx).get(id, None).await?.unwrap();
991-
let doc = doc.to_resolved(&tx.table_mapping().inject_table_id())?;
991+
let tablet_id = tx.table_mapping().inject_table_id()(doc.table())?.table_id;
992+
let doc = doc.to_resolved(tablet_id);
992993
let id_v6 = doc.developer_id().encode();
993994
expected_export_entries.insert(
994995
format!("table_{i}/documents.jsonl"),

crates/common/src/document.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -655,16 +655,16 @@ impl DeveloperDocument {
655655
}
656656
}
657657

658-
pub fn to_resolved(
659-
self,
660-
f: &impl Fn(TableNumber) -> anyhow::Result<TableIdAndTableNumber>,
661-
) -> anyhow::Result<ResolvedDocument> {
662-
let id = self.id.map_table(f)?;
663-
Ok(ResolvedDocument {
664-
id,
658+
pub fn to_resolved(self, tablet_id: TableId) -> ResolvedDocument {
659+
ResolvedDocument {
660+
id: TableIdAndTableNumber {
661+
table_id: tablet_id,
662+
table_number: *self.id.table(),
663+
}
664+
.id(self.id.internal_id()),
665665
creation_time: self.creation_time,
666666
value: self.value,
667-
})
667+
}
668668
}
669669

670670
pub fn id(&self) -> DeveloperDocumentId {

crates/database/src/bootstrap_model/user_facing.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,17 @@ use value::{
2929
DeveloperDocumentId,
3030
Size,
3131
TableName,
32-
TableNumber,
3332
};
3433

3534
use crate::{
3635
metrics::{
3736
log_virtual_table_get,
3837
log_virtual_table_query,
3938
},
40-
query::IndexRangeResponse,
39+
query::{
40+
DeveloperIndexRangeResponse,
41+
IndexRangeResponse,
42+
},
4143
transaction::{
4244
IndexRangeRequest,
4345
MAX_PAGE_SIZE,
@@ -329,9 +331,9 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> {
329331
fn start_index_range(
330332
&mut self,
331333
request: IndexRangeRequest,
332-
) -> anyhow::Result<Result<IndexRangeResponse<TableNumber>, RangeRequest>> {
334+
) -> anyhow::Result<Result<DeveloperIndexRangeResponse, RangeRequest>> {
333335
if request.interval.is_empty() {
334-
return Ok(Ok(IndexRangeResponse {
336+
return Ok(Ok(DeveloperIndexRangeResponse {
335337
page: vec![],
336338
cursor: CursorPosition::End,
337339
}));
@@ -362,7 +364,7 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> {
362364
max_size: max_rows,
363365
}))
364366
},
365-
StableIndexName::Missing => Ok(Ok(IndexRangeResponse {
367+
StableIndexName::Missing => Ok(Ok(DeveloperIndexRangeResponse {
366368
page: vec![],
367369
cursor: CursorPosition::End,
368370
})),
@@ -376,7 +378,7 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> {
376378
pub async fn index_range_batch(
377379
&mut self,
378380
requests: BTreeMap<BatchKey, IndexRangeRequest>,
379-
) -> BTreeMap<BatchKey, anyhow::Result<IndexRangeResponse<TableNumber>>> {
381+
) -> BTreeMap<BatchKey, anyhow::Result<DeveloperIndexRangeResponse>> {
380382
let batch_size = requests.len();
381383
let mut results = BTreeMap::new();
382384
let mut fetch_requests = BTreeMap::new();
@@ -421,7 +423,7 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> {
421423
.map(|(key, doc, ts)| (key, doc.to_developer(), ts))
422424
.collect(),
423425
};
424-
anyhow::Ok(IndexRangeResponse {
426+
anyhow::Ok(DeveloperIndexRangeResponse {
425427
page: developer_results,
426428
cursor,
427429
})

crates/database/src/query/filter.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,10 @@ use common::{
99
};
1010

1111
use super::{
12-
IndexRangeResponse,
12+
DeveloperIndexRangeResponse,
1313
QueryNode,
1414
QueryStream,
1515
QueryStreamNext,
16-
QueryType,
1716
};
1817
use crate::Transaction;
1918

@@ -23,19 +22,19 @@ use crate::Transaction;
2322
const FILTER_QUERY_PREFETCH: usize = 100;
2423

2524
/// See Query.filter().
26-
pub(super) struct Filter<T: QueryType> {
27-
inner: QueryNode<T>,
25+
pub(super) struct Filter {
26+
inner: QueryNode,
2827
expr: Expression,
2928
}
3029

31-
impl<T: QueryType> Filter<T> {
32-
pub fn new(inner: QueryNode<T>, expr: Expression) -> Self {
30+
impl Filter {
31+
pub fn new(inner: QueryNode, expr: Expression) -> Self {
3332
Self { inner, expr }
3433
}
3534
}
3635

3736
#[async_trait]
38-
impl<T: QueryType> QueryStream<T> for Filter<T> {
37+
impl QueryStream for Filter {
3938
fn cursor_position(&self) -> &Option<CursorPosition> {
4039
self.inner.cursor_position()
4140
}
@@ -52,7 +51,7 @@ impl<T: QueryType> QueryStream<T> for Filter<T> {
5251
&mut self,
5352
tx: &mut Transaction<RT>,
5453
_prefetch_hint: Option<usize>,
55-
) -> anyhow::Result<QueryStreamNext<T>> {
54+
) -> anyhow::Result<QueryStreamNext> {
5655
loop {
5756
let (document, write_timestamp) =
5857
match self.inner.next(tx, Some(FILTER_QUERY_PREFETCH)).await? {
@@ -69,7 +68,7 @@ impl<T: QueryType> QueryStream<T> for Filter<T> {
6968
}
7069
}
7170

72-
fn feed(&mut self, index_range_response: IndexRangeResponse<T::T>) -> anyhow::Result<()> {
71+
fn feed(&mut self, index_range_response: DeveloperIndexRangeResponse) -> anyhow::Result<()> {
7372
self.inner.feed(index_range_response)
7473
}
7574

crates/database/src/query/index_range.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::{
55

66
use async_trait::async_trait;
77
use common::{
8-
document::GenericDocument,
8+
document::DeveloperDocument,
99
index::IndexKeyBytes,
1010
interval::Interval,
1111
knobs::{
@@ -29,21 +29,21 @@ use common::{
2929
use super::{
3030
query_scanned_too_many_documents_error,
3131
query_scanned_too_much_data,
32-
IndexRangeResponse,
32+
DeveloperIndexRangeResponse,
3333
QueryStream,
3434
QueryStreamNext,
35-
QueryType,
3635
DEFAULT_QUERY_PREFETCH,
3736
MAX_QUERY_FETCH,
3837
};
3938
use crate::{
4039
metrics,
4140
transaction::IndexRangeRequest,
4241
Transaction,
42+
UserFacingModel,
4343
};
4444

4545
/// A `QueryStream` that scans a range of an index.
46-
pub struct IndexRange<T: QueryType> {
46+
pub struct IndexRange {
4747
stable_index_name: StableIndexName,
4848
/// For usage and error messages. If the table mapping has changed, this
4949
/// might get out of sync with `stable_index_name`, which is the index
@@ -60,7 +60,7 @@ pub struct IndexRange<T: QueryType> {
6060
/// `cursor_interval` must always be a subset of `interval`.
6161
cursor_interval: CursorInterval,
6262
intermediate_cursors: Option<Vec<CursorPosition>>,
63-
page: VecDeque<(IndexKeyBytes, GenericDocument<T::T>, WriteTimestamp)>,
63+
page: VecDeque<(IndexKeyBytes, DeveloperDocument, WriteTimestamp)>,
6464
/// The interval which we have yet to fetch.
6565
/// This starts as an intersection of the IndexRange's `interval` and
6666
/// `cursor_interval`, and gets smaller as results are fetched into `page`.
@@ -81,7 +81,7 @@ pub struct IndexRange<T: QueryType> {
8181
version: Option<Version>,
8282
}
8383

84-
impl<T: QueryType> IndexRange<T> {
84+
impl IndexRange {
8585
pub fn new(
8686
stable_index_name: StableIndexName,
8787
printable_index_name: IndexName,
@@ -146,7 +146,7 @@ impl<T: QueryType> IndexRange<T> {
146146
&mut self,
147147
tx: &mut Transaction<RT>,
148148
prefetch_hint: Option<usize>,
149-
) -> anyhow::Result<QueryStreamNext<T>> {
149+
) -> anyhow::Result<QueryStreamNext> {
150150
// If we have an end cursor, for correctness we need to process
151151
// the entire interval, so ignore `maximum_rows_read` and `maximum_bytes_read`.
152152
let enforce_limits = self.cursor_interval.end_inclusive.is_none();
@@ -169,7 +169,7 @@ impl<T: QueryType> IndexRange<T> {
169169
}
170170
self.cursor_interval.curr_exclusive = Some(CursorPosition::After(index_position));
171171
self.returned_results += 1;
172-
T::record_read_document(tx, &v, self.printable_index_name.table())?;
172+
UserFacingModel::new(tx).record_read_document(&v, self.printable_index_name.table())?;
173173
// Database bandwidth for index reads
174174
tx.usage_tracker.track_database_egress_size(
175175
self.printable_index_name.table().to_string(),
@@ -216,7 +216,7 @@ impl<T: QueryType> IndexRange<T> {
216216

217217
fn process_fetch(
218218
&mut self,
219-
page: Vec<(IndexKeyBytes, GenericDocument<T::T>, WriteTimestamp)>,
219+
page: Vec<(IndexKeyBytes, DeveloperDocument, WriteTimestamp)>,
220220
fetch_cursor: CursorPosition,
221221
) -> anyhow::Result<()> {
222222
let (_, new_unfetched_interval) = self.unfetched_interval.split(fetch_cursor, self.order);
@@ -234,7 +234,7 @@ pub const fn soft_data_limit(hard_limit: usize) -> usize {
234234
}
235235

236236
#[async_trait]
237-
impl<T: QueryType> QueryStream<T> for IndexRange<T> {
237+
impl QueryStream for IndexRange {
238238
fn cursor_position(&self) -> &Option<CursorPosition> {
239239
&self.cursor_interval.curr_exclusive
240240
}
@@ -257,11 +257,11 @@ impl<T: QueryType> QueryStream<T> for IndexRange<T> {
257257
&mut self,
258258
tx: &mut Transaction<RT>,
259259
prefetch_hint: Option<usize>,
260-
) -> anyhow::Result<QueryStreamNext<T>> {
260+
) -> anyhow::Result<QueryStreamNext> {
261261
self.start_next(tx, prefetch_hint)
262262
}
263263

264-
fn feed(&mut self, index_range_response: IndexRangeResponse<T::T>) -> anyhow::Result<()> {
264+
fn feed(&mut self, index_range_response: DeveloperIndexRangeResponse) -> anyhow::Result<()> {
265265
self.process_fetch(index_range_response.page, index_range_response.cursor)
266266
}
267267

@@ -270,7 +270,7 @@ impl<T: QueryType> QueryStream<T> for IndexRange<T> {
270270
}
271271
}
272272

273-
impl<T: QueryType> Drop for IndexRange<T> {
273+
impl Drop for IndexRange {
274274
fn drop(&mut self) {
275275
metrics::log_index_range(
276276
self.returned_results,

crates/database/src/query/limit.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,23 @@ use common::{
88
};
99

1010
use super::{
11-
IndexRangeResponse,
11+
DeveloperIndexRangeResponse,
1212
QueryNode,
1313
QueryStream,
1414
QueryStreamNext,
15-
QueryType,
1615
DEFAULT_QUERY_PREFETCH,
1716
};
1817
use crate::Transaction;
1918

2019
/// See Query.limit().
21-
pub(super) struct Limit<T: QueryType> {
22-
inner: QueryNode<T>,
20+
pub(super) struct Limit {
21+
inner: QueryNode,
2322
limit: usize,
2423
rows_emitted: usize,
2524
}
2625

27-
impl<T: QueryType> Limit<T> {
28-
pub fn new(inner: QueryNode<T>, limit: usize) -> Self {
26+
impl Limit {
27+
pub fn new(inner: QueryNode, limit: usize) -> Self {
2928
Self {
3029
inner,
3130
limit,
@@ -35,7 +34,7 @@ impl<T: QueryType> Limit<T> {
3534
}
3635

3736
#[async_trait]
38-
impl<T: QueryType> QueryStream<T> for Limit<T> {
37+
impl QueryStream for Limit {
3938
fn cursor_position(&self) -> &Option<CursorPosition> {
4039
self.inner.cursor_position()
4140
}
@@ -52,7 +51,7 @@ impl<T: QueryType> QueryStream<T> for Limit<T> {
5251
&mut self,
5352
tx: &mut Transaction<RT>,
5453
prefetch_hint: Option<usize>,
55-
) -> anyhow::Result<QueryStreamNext<T>> {
54+
) -> anyhow::Result<QueryStreamNext> {
5655
if self.rows_emitted >= self.limit {
5756
return Ok(QueryStreamNext::Ready(None));
5857
}
@@ -67,7 +66,7 @@ impl<T: QueryType> QueryStream<T> for Limit<T> {
6766
Ok(result)
6867
}
6968

70-
fn feed(&mut self, index_range_response: IndexRangeResponse<T::T>) -> anyhow::Result<()> {
69+
fn feed(&mut self, index_range_response: DeveloperIndexRangeResponse) -> anyhow::Result<()> {
7170
self.inner.feed(index_range_response)
7271
}
7372

0 commit comments

Comments
 (0)