Skip to content

Commit b0591ed

Browse files
ldanilekConvex, Inc.
authored and
Convex, Inc.
committed
text search walk stable index name (#25376)
like index range queries, text search queries should walk a consistent tablet. otherwise there might be weird issues where it's walking an index and the tablet changes out from under it, possibly changing the indexed fields. GitOrigin-RevId: d576844a9a5b29a5a736057a19a819fe27a01f54
1 parent 2693de9 commit b0591ed

File tree

14 files changed

+143
-67
lines changed

14 files changed

+143
-67
lines changed

crates/application/src/tests/cron_jobs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ async fn test_cron_jobs_helper(rt: TestRuntime, backend_state: BackendState) ->
194194
let mut table_model = TableModel::new(&mut tx);
195195
assert!(table_model.table_is_empty(&OBJECTS_TABLE).await?);
196196
let mut logs_query = cron_log_query(&mut tx)?;
197-
logs_query.expect_none(&mut tx).await?;
197+
assert!(logs_query.next(&mut tx, Some(1)).await?.is_none());
198198

199199
// Resuming the backend should make the jobs execute.
200200
let mut model = BackendStateModel::new(&mut tx);

crates/common/src/query.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ use value::{
3333
ConvexObject,
3434
ConvexValue,
3535
TableId,
36-
TableIdAndTableNumber,
3736
};
3837

3938
use crate::{
@@ -52,6 +51,7 @@ use crate::{
5251
IndexName,
5352
MaybeValue,
5453
TableName,
54+
TabletIndexName,
5555
},
5656
value::{
5757
sha256::Sha256 as CommonSha256,
@@ -581,12 +581,9 @@ pub struct Search {
581581
}
582582

583583
impl Search {
584-
pub fn to_internal(
585-
self,
586-
f: &impl Fn(TableName) -> anyhow::Result<TableIdAndTableNumber>,
587-
) -> anyhow::Result<InternalSearch> {
584+
pub fn to_internal(self, tablet_index_name: TabletIndexName) -> anyhow::Result<InternalSearch> {
588585
Ok(InternalSearch {
589-
index_name: self.index_name.to_resolved(f)?.into(),
586+
index_name: tablet_index_name,
590587
table_name: self.table,
591588
filters: self
592589
.filters

crates/common/src/types/index.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,14 +115,22 @@ pub type TabletIndexName = GenericIndexName<TableId>;
115115

116116
/// Like TabletIndexName in that it refers to a stable underlying index,
117117
/// but it works for virtual tables too.
118-
#[derive(Debug, Clone)]
118+
#[derive(Debug, Clone, Eq, PartialEq)]
119119
pub enum StableIndexName {
120120
Physical(TabletIndexName),
121121
Virtual(IndexName, TabletIndexName),
122122
Missing,
123123
}
124124

125125
impl StableIndexName {
126+
pub fn tablet_index_name(&self) -> Option<&TabletIndexName> {
127+
match self {
128+
StableIndexName::Physical(tablet_index_name) => Some(tablet_index_name),
129+
StableIndexName::Virtual(_, tablet_index_name) => Some(tablet_index_name),
130+
StableIndexName::Missing => None,
131+
}
132+
}
133+
126134
pub fn virtual_table_number_map(
127135
&self,
128136
table_mapping: &TableMapping,

crates/database/src/bootstrap_model/index.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -487,11 +487,9 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
487487
stable_index_name: &StableIndexName,
488488
printable_index_name: &IndexName,
489489
) -> anyhow::Result<IndexedFields> {
490-
let resolved_index_name = match stable_index_name {
491-
StableIndexName::Physical(index_name) => index_name,
492-
StableIndexName::Virtual(_, index_name) => index_name,
493-
StableIndexName::Missing => anyhow::bail!(index_not_found_error(printable_index_name)),
494-
};
490+
let resolved_index_name = stable_index_name
491+
.tablet_index_name()
492+
.with_context(|| index_not_found_error(printable_index_name))?;
495493
let metadata =
496494
self.require_enabled_index_metadata(printable_index_name, resolved_index_name)?;
497495
match metadata.config.clone() {

crates/database/src/query/filter.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use common::{
55
Expression,
66
},
77
runtime::Runtime,
8+
types::TabletIndexName,
89
};
910

1011
use super::{
@@ -71,4 +72,8 @@ impl<T: QueryType> QueryStream<T> for Filter<T> {
7172
fn feed(&mut self, index_range_response: IndexRangeResponse<T::T>) -> anyhow::Result<()> {
7273
self.inner.feed(index_range_response)
7374
}
75+
76+
fn tablet_index_name(&self) -> Option<&TabletIndexName> {
77+
self.inner.tablet_index_name()
78+
}
7479
}

crates/database/src/query/index_range.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use common::{
2020
types::{
2121
IndexName,
2222
StableIndexName,
23+
TabletIndexName,
2324
WriteTimestamp,
2425
},
2526
version::Version,
@@ -263,6 +264,10 @@ impl<T: QueryType> QueryStream<T> for IndexRange<T> {
263264
fn feed(&mut self, index_range_response: IndexRangeResponse<T::T>) -> anyhow::Result<()> {
264265
self.process_fetch(index_range_response.page, index_range_response.cursor)
265266
}
267+
268+
fn tablet_index_name(&self) -> Option<&TabletIndexName> {
269+
self.stable_index_name.tablet_index_name()
270+
}
266271
}
267272

268273
impl<T: QueryType> Drop for IndexRange<T> {

crates/database/src/query/limit.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use async_trait::async_trait;
44
use common::{
55
query::CursorPosition,
66
runtime::Runtime,
7+
types::TabletIndexName,
78
};
89

910
use super::{
@@ -69,4 +70,8 @@ impl<T: QueryType> QueryStream<T> for Limit<T> {
6970
fn feed(&mut self, index_range_response: IndexRangeResponse<T::T>) -> anyhow::Result<()> {
7071
self.inner.feed(index_range_response)
7172
}
73+
74+
fn tablet_index_name(&self) -> Option<&TabletIndexName> {
75+
self.inner.tablet_index_name()
76+
}
7277
}

crates/database/src/query/mod.rs

Lines changed: 71 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@ use common::{
1010
database_index::IndexedFields,
1111
INDEX_TABLE,
1212
},
13-
document::GenericDocument,
13+
document::{
14+
DeveloperDocument,
15+
GenericDocument,
16+
ResolvedDocument,
17+
},
1418
index::IndexKeyBytes,
1519
interval::Interval,
1620
query::{
@@ -24,6 +28,7 @@ use common::{
2428
runtime::Runtime,
2529
types::{
2630
IndexName,
31+
TabletIndexName,
2732
WriteTimestamp,
2833
},
2934
version::Version,
@@ -110,6 +115,10 @@ trait QueryStream<T: QueryType>: Send {
110115
prefetch_hint: Option<usize>,
111116
) -> anyhow::Result<QueryStreamNext<T>>;
112117
fn feed(&mut self, index_range_response: IndexRangeResponse<T::T>) -> anyhow::Result<()>;
118+
119+
/// All queries walk an index of some kind, as long as the table exists.
120+
/// This is that index name, tied to a tablet.
121+
fn tablet_index_name(&self) -> Option<&TabletIndexName>;
113122
}
114123

115124
pub struct IndexRangeResponse<T: TableIdentifier> {
@@ -391,9 +400,12 @@ impl<RT: Runtime, T: QueryType> CompiledQuery<RT, T> {
391400
version,
392401
))
393402
},
394-
QuerySource::Search(search) => {
395-
QueryNode::Search(SearchQuery::new(search, cursor_interval, version))
396-
},
403+
QuerySource::Search(search) => QueryNode::Search(SearchQuery::new(
404+
stable_index_name,
405+
search,
406+
cursor_interval,
407+
version,
408+
)),
397409
};
398410
for operator in query.operators {
399411
let next_node = match operator {
@@ -445,11 +457,21 @@ impl<RT: Runtime, T: QueryType> CompiledQuery<RT, T> {
445457
}
446458
}
447459

460+
pub fn fingerprint(&self) -> &QueryFingerprint {
461+
&self.query_fingerprint
462+
}
463+
464+
pub fn is_approaching_data_limit(&self) -> bool {
465+
self.root.is_approaching_data_limit()
466+
}
467+
}
468+
469+
impl<RT: Runtime> DeveloperQuery<RT> {
448470
pub async fn next(
449471
&mut self,
450472
tx: &mut Transaction<RT>,
451473
prefetch_hint: Option<usize>,
452-
) -> anyhow::Result<Option<GenericDocument<T::T>>> {
474+
) -> anyhow::Result<Option<DeveloperDocument>> {
453475
match self.next_with_ts(tx, prefetch_hint).await? {
454476
None => Ok(None),
455477
Some((document, _)) => Ok(Some(document)),
@@ -461,32 +483,56 @@ impl<RT: Runtime, T: QueryType> CompiledQuery<RT, T> {
461483
&mut self,
462484
tx: &mut Transaction<RT>,
463485
prefetch_hint: Option<usize>,
464-
) -> anyhow::Result<Option<(GenericDocument<T::T>, WriteTimestamp)>> {
486+
) -> anyhow::Result<Option<(DeveloperDocument, WriteTimestamp)>> {
465487
query_batch_next(btreemap! {0 => (self, prefetch_hint)}, tx)
466488
.await
467489
.remove(&0)
468490
.context("batch_key missing")?
469491
}
492+
}
470493

471-
pub async fn expect_one(
494+
impl<RT: Runtime> ResolvedQuery<RT> {
495+
pub async fn next(
472496
&mut self,
473497
tx: &mut Transaction<RT>,
474-
) -> anyhow::Result<GenericDocument<T::T>> {
475-
let v = self
476-
.next(tx, Some(2))
477-
.await?
478-
.ok_or_else(|| anyhow::anyhow!("Expected one value for query, received zero"))?;
498+
prefetch_hint: Option<usize>,
499+
) -> anyhow::Result<Option<ResolvedDocument>> {
500+
match self.next_with_ts(tx, prefetch_hint).await? {
501+
None => Ok(None),
502+
Some((document, _)) => Ok(Some(document)),
503+
}
504+
}
479505

480-
if self.next(tx, Some(1)).await?.is_some() {
481-
anyhow::bail!("Received more than one value for query");
506+
#[convex_macro::instrument_future]
507+
pub async fn next_with_ts(
508+
&mut self,
509+
tx: &mut Transaction<RT>,
510+
prefetch_hint: Option<usize>,
511+
) -> anyhow::Result<Option<(ResolvedDocument, WriteTimestamp)>> {
512+
let tablet_id = self
513+
.root
514+
.tablet_index_name()
515+
.map(|index_name| *index_name.table());
516+
let result = query_batch_next(btreemap! {0 => (self, prefetch_hint)}, tx)
517+
.await
518+
.remove(&0)
519+
.context("batch_key missing")??;
520+
if let Some((document, _)) = &result {
521+
// TODO(lee) inject tablet id here which will allow the rest of the query
522+
// pipeline to use DeveloperDocuments only. To ensure this will be
523+
// correct, we do an assertion temporarily.
524+
anyhow::ensure!(
525+
document.table().table_id
526+
== tablet_id.context("document must come from some tablet")?
527+
);
482528
}
483-
Ok(v)
529+
Ok(result)
484530
}
485531

486532
pub async fn expect_at_most_one(
487533
&mut self,
488534
tx: &mut Transaction<RT>,
489-
) -> anyhow::Result<Option<GenericDocument<T::T>>> {
535+
) -> anyhow::Result<Option<ResolvedDocument>> {
490536
let v = match self.next(tx, Some(2)).await? {
491537
Some(v) => v,
492538
None => return Ok(None),
@@ -496,22 +542,6 @@ impl<RT: Runtime, T: QueryType> CompiledQuery<RT, T> {
496542
}
497543
Ok(Some(v))
498544
}
499-
500-
pub async fn expect_none(&mut self, tx: &mut Transaction<RT>) -> anyhow::Result<()> {
501-
anyhow::ensure!(
502-
self.next(tx, Some(1)).await?.is_none(),
503-
"Expected no value for this query, but received one."
504-
);
505-
Ok(())
506-
}
507-
508-
pub fn fingerprint(&self) -> &QueryFingerprint {
509-
&self.query_fingerprint
510-
}
511-
512-
pub fn is_approaching_data_limit(&self) -> bool {
513-
self.root.is_approaching_data_limit()
514-
}
515545
}
516546

517547
pub async fn query_batch_next<RT: Runtime, T: QueryType>(
@@ -623,6 +653,15 @@ impl<T: QueryType> QueryStream<T> for QueryNode<T> {
623653
QueryNode::Limit(r) => r.feed(index_range_response),
624654
}
625655
}
656+
657+
fn tablet_index_name(&self) -> Option<&TabletIndexName> {
658+
match self {
659+
QueryNode::IndexRange(r) => r.tablet_index_name(),
660+
QueryNode::Search(r) => r.tablet_index_name(),
661+
QueryNode::Filter(r) => r.tablet_index_name(),
662+
QueryNode::Limit(r) => r.tablet_index_name(),
663+
}
664+
}
626665
}
627666

628667
/// Return a system limit for reading too many documents in a query

crates/database/src/query/search_query.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@ use common::{
99
SearchVersion,
1010
},
1111
runtime::Runtime,
12-
types::WriteTimestamp,
12+
types::{
13+
StableIndexName,
14+
TabletIndexName,
15+
WriteTimestamp,
16+
},
1317
version::{
1418
Version,
1519
MIN_NPM_VERSION_FOR_FUZZY_SEARCH,
@@ -39,6 +43,11 @@ use crate::{
3943

4044
/// A `QueryStream` that begins by querying a search index.
4145
pub struct SearchQuery<T: QueryType> {
46+
// The tablet index being searched.
47+
// Table names in `query` are just for error messages and usage, and may
48+
// get out of sync with this.
49+
stable_index_name: StableIndexName,
50+
4251
query: Search,
4352
// Results are generated on the first call to SearchQuery::next.
4453
results: Option<SearchResultIterator<T>>,
@@ -50,8 +59,14 @@ pub struct SearchQuery<T: QueryType> {
5059
}
5160

5261
impl<T: QueryType> SearchQuery<T> {
53-
pub fn new(query: Search, cursor_interval: CursorInterval, version: Option<Version>) -> Self {
62+
pub fn new(
63+
stable_index_name: StableIndexName,
64+
query: Search,
65+
cursor_interval: CursorInterval,
66+
version: Option<Version>,
67+
) -> Self {
5468
Self {
69+
stable_index_name,
5570
query,
5671
results: None,
5772
cursor_interval,
@@ -71,7 +86,9 @@ impl<T: QueryType> SearchQuery<T> {
7186
tx: &mut Transaction<RT>,
7287
) -> anyhow::Result<SearchResultIterator<T>> {
7388
let search_version = self.get_cli_gated_search_version();
74-
let revisions = tx.search(&self.query, search_version).await?;
89+
let revisions = tx
90+
.search(&self.stable_index_name, &self.query, search_version)
91+
.await?;
7592
let revisions_in_range = revisions
7693
.into_iter()
7794
.filter(|(_, index_key)| self.cursor_interval.contains(index_key))
@@ -145,6 +162,10 @@ impl<T: QueryType> QueryStream<T> for SearchQuery<T> {
145162
fn feed(&mut self, _index_range_response: IndexRangeResponse<T::T>) -> anyhow::Result<()> {
146163
anyhow::bail!("cannot feed an index range response into a search query");
147164
}
165+
166+
fn tablet_index_name(&self) -> Option<&TabletIndexName> {
167+
self.stable_index_name.tablet_index_name()
168+
}
148169
}
149170

150171
#[derive(Clone)]

crates/database/src/tests/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1833,7 +1833,7 @@ async fn test_retries(rt: TestRuntime) -> anyhow::Result<()> {
18331833
let mut tx = db.begin_system().await?;
18341834
let query = Query::full_table_scan("table".parse()?, Order::Asc);
18351835
let mut compiled_query = CompiledResolvedQuery::new(&mut tx, query)?;
1836-
compiled_query.expect_none(&mut tx).await?;
1836+
assert!(compiled_query.next(&mut tx, None).await?.is_none());
18371837
Ok(())
18381838
}
18391839

0 commit comments

Comments
 (0)