Commit f378333

sjudd (Convex, Inc.) authored and committed
Get basic multi segment text searches to work (#26152)
Fixes a couple of small issues in the search path:

* We errored if any term was missing from a segment
* Individual files were being loaded from directories

GitOrigin-RevId: 41593eedb68a8f0afe77de7fdb3d9661bbd84414
1 parent e5fa422 commit f378333
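
The first fix concerns how per-segment results are combined: a term absent from one segment should contribute zero matches for that segment instead of failing the whole query. The hunk implementing it is not among the excerpts below, so here is a minimal, purely illustrative Rust sketch of the intended behavior (all names are hypothetical, not taken from the Convex codebase):

use std::collections::BTreeMap;

/// Hypothetical per-segment term statistics: term -> number of matches.
type SegmentTerms = BTreeMap<String, u64>;

/// Sum a term's matches across all segments. A segment that has never seen
/// the term counts as 0 instead of producing an error, which is the behavior
/// this commit moves the search path toward.
fn total_matches(segments: &[SegmentTerms], term: &str) -> u64 {
    segments
        .iter()
        .map(|segment| segment.get(term).copied().unwrap_or(0))
        .sum()
}

fn main() {
    let seg_a = SegmentTerms::from([("cat".to_string(), 2)]);
    let seg_b = SegmentTerms::new(); // "cat" is missing here; that's fine.
    assert_eq!(total_matches(&[seg_a, seg_b], "cat"), 2);
}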

File tree

5 files changed: +171 -69 lines changed

crates/database/src/tests/text_test_utils.rs

Lines changed: 66 additions & 5 deletions
@@ -13,19 +13,33 @@ use common::{
         IndexMetadata,
         TabletIndexMetadata,
     },
-    document::ParsedDocument,
+    document::{
+        ParsedDocument,
+        ResolvedDocument,
+    },
     persistence::PersistenceReader,
+    query::{
+        Query,
+        QueryOperator,
+        QuerySource,
+        Search,
+        SearchFilterExpression,
+    },
     runtime::testing::TestRuntime,
     types::{
         GenericIndexName,
         IndexId,
         IndexName,
         TabletIndexName,
     },
+    version::MIN_NPM_VERSION_FOR_FUZZY_SEARCH,
 };
 use maplit::btreeset;
 use must_let::must_let;
-use search::searcher::InProcessSearcher;
+use search::{
+    searcher::InProcessSearcher,
+    MAX_CANDIDATE_REVISIONS,
+};
 use storage::Storage;
 use sync_types::Timestamp;
 use value::{

@@ -47,6 +61,7 @@ use crate::{
     },
     Database,
     IndexModel,
+    ResolvedQuery,
     TestFacingModel,
     TextIndexFlusher,
     Transaction,

@@ -187,16 +202,62 @@ impl TextFixtures {
         Ok(segments.clone())
     }

-    pub async fn add_document(&self, text: &str) -> anyhow::Result<()> {
+    pub async fn add_document(&self, text: &str) -> anyhow::Result<ResolvedDocumentId> {
         let table_name = TABLE_NAME.parse::<TableName>()?;
         let mut tx = self.db.begin_system().await?;
-        add_document(&mut tx, &table_name, text).await?;
+        let doc_id = add_document(&mut tx, &table_name, text).await?;
+        self.db.commit(tx).await?;
+        Ok(doc_id)
+    }
+
+    pub async fn enable_index(
+        &self,
+        index_name: &GenericIndexName<TableName>,
+    ) -> anyhow::Result<()> {
+        let mut tx = self.db.begin_system().await?;
+        IndexModel::new(&mut tx)
+            .enable_index_for_testing(self.namespace, index_name)
+            .await?;
         self.db.commit(tx).await?;
         Ok(())
     }
+
+    pub async fn search(
+        &self,
+        index_name: GenericIndexName<TableName>,
+        query_string: &str,
+    ) -> anyhow::Result<Vec<ResolvedDocument>> {
+        let mut tx = self.db.begin_system().await?;
+        let filters = vec![SearchFilterExpression::Search(
+            SEARCH_FIELD.parse()?,
+            query_string.into(),
+        )];
+        let search = Search {
+            table: index_name.table().clone(),
+            index_name,
+            filters,
+        };
+
+        let query = Query {
+            source: QuerySource::Search(search),
+            operators: vec![QueryOperator::Limit(MAX_CANDIDATE_REVISIONS)],
+        };
+        let mut query_stream = ResolvedQuery::new_with_version(
+            &mut tx,
+            TableNamespace::Global,
+            query,
+            Some(MIN_NPM_VERSION_FOR_FUZZY_SEARCH.clone()),
+        )?;
+        let mut values = vec![];
+        while let Some(value) = query_stream.next(&mut tx, None).await? {
+            values.push(value);
+        }
+        Ok(values)
+    }
 }

 const TABLE_NAME: &str = "table";
+const SEARCH_FIELD: &str = "text";

 pub struct IndexData {
     pub index_id: IndexId,

@@ -208,7 +269,7 @@ pub struct IndexData {
 pub fn backfilling_text_index() -> anyhow::Result<IndexMetadata<TableName>> {
     let table_name: TableName = TABLE_NAME.parse()?;
     let index_name = IndexName::new(table_name, "search_index".parse()?)?;
-    let search_field: FieldPath = "text".parse()?;
+    let search_field: FieldPath = SEARCH_FIELD.parse()?;
     let filter_field: FieldPath = "channel".parse()?;
     let metadata = IndexMetadata::new_backfilling_search_index(
         index_name,
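
Together these helpers drive the full lifecycle a text-search test needs: write a document, flush it into a segment, enable the index, and run a query. A condensed sketch of that flow (the helper calls come from this diff and from the flusher2.rs tests below; the test itself is illustrative):

#[convex_macro::test_runtime]
async fn indexed_document_is_searchable(rt: TestRuntime) -> anyhow::Result<()> {
    let fixtures = TextFixtures::new(rt).await?;
    let index = fixtures.insert_backfilling_text_index().await?;
    let doc_id = fixtures.add_document("cat").await?;
    // Flush the pending write into a text index segment.
    let mut flusher = fixtures.new_search_flusher2();
    flusher.step().await?;
    // Enable the backfilled index, then query it with the new search helper.
    fixtures.enable_index(&index.name).await?;
    let results = fixtures.search(index.name, "cat").await?;
    assert_eq!(results.first().unwrap().id(), doc_id);
    Ok(())
}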

crates/database/src/text_index_worker/flusher2.rs

Lines changed: 50 additions & 3 deletions
@@ -232,7 +232,7 @@ impl<RT: Runtime> TextIndexFlusher2<RT> {
             TextIndexState::Backfilled(TextIndexSnapshot {
                 data: TextIndexSnapshotData::MultiSegment(segments),
                 ts: backfill_ts,
-                version: TextSnapshotVersion::V0,
+                version: TextSnapshotVersion::V2UseStringIds,
             })
         } else {
             let cursor = if let Some(cursor) = backfill_result.new_cursor {

@@ -345,10 +345,12 @@ mod tests {
         rt: TestRuntime,
     ) -> anyhow::Result<()> {
         let fixtures = TextFixtures::new(rt).await?;
-        let index = fixtures.insert_backfilling_text_index().await?;
+        let index = fixtures
+            .insert_backfilling_text_index_with_document()
+            .await?;
         let mut flusher = fixtures.new_search_flusher2();
         flusher.step().await?;
-        fixtures.assert_backfilled(&index.name).await?;
+        fixtures.assert_backfilled(&index.index_name).await?;
         Ok(())
     }

@@ -367,6 +369,21 @@ mod tests {
         Ok(())
     }

+    #[convex_macro::test_runtime]
+    async fn backfill_with_one_document_writes_document(rt: TestRuntime) -> anyhow::Result<()> {
+        let fixtures = TextFixtures::new(rt).await?;
+        let index = fixtures.insert_backfilling_text_index().await?;
+        let doc_id = fixtures.add_document("cat").await?;
+        let mut flusher = fixtures.new_search_flusher2();
+        flusher.step().await?;
+        fixtures.enable_index(&index.name).await?;
+
+        let results = fixtures.search(index.name, "cat").await?;
+        assert_eq!(results.first().unwrap().id(), doc_id);
+
+        Ok(())
+    }
+
     #[convex_macro::test_runtime]
     async fn backfill_with_two_documents_0_max_segment_size_creates_two_segments(
         rt: TestRuntime,

@@ -390,4 +407,34 @@ mod tests {
         assert_eq!(segments.len(), 2);
         Ok(())
     }
+
+    #[convex_macro::test_runtime]
+    async fn backfill_with_two_documents_0_max_segment_size_includes_both_documents(
+        rt: TestRuntime,
+    ) -> anyhow::Result<()> {
+        let fixtures = TextFixtures::new(rt).await?;
+        let IndexMetadata { name, .. } = fixtures.insert_backfilling_text_index().await?;
+
+        let cat_doc_id = fixtures.add_document("cat").await?;
+        let dog_doc_id = fixtures.add_document("dog").await?;
+
+        let mut flusher = fixtures
+            .new_search_flusher_builder()
+            .set_incremental_multipart_threshold_bytes(0)
+            .build();
+        // Build the first segment, which stops because the document size is > 0
+        flusher.step().await?;
+        // Build the second segment and finalize the index metadata.
+        flusher.step().await?;
+
+        fixtures.enable_index(&name).await?;
+
+        let cat_results = fixtures.search(name.clone(), "cat").await?;
+        assert_eq!(cat_results.first().unwrap().id(), cat_doc_id);
+
+        let dog_results = fixtures.search(name, "dog").await?;
+        assert_eq!(dog_results.first().unwrap().id(), dog_doc_id);
+
+        Ok(())
+    }
 }
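
Note how the new two-document test pins down the first bug from the commit message: with the incremental multipart threshold forced to zero, each document lands in its own segment, so a search for "cat" necessarily consults a segment that has never seen that term, which previously errored instead of contributing zero matches.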

crates/search/src/archive/cache.rs

Lines changed: 31 additions & 2 deletions
@@ -35,6 +35,7 @@ use futures::{
     StreamExt,
     TryStreamExt,
 };
+use itertools::Itertools;
 use storage::{
     Storage,
     StorageCacheKey,

@@ -206,9 +207,9 @@ impl<RT: Runtime> ArchiveFetcher<RT> {
         let bytes_copied = tokio::io::copy_buf(&mut reader, &mut file).await?;
         file.shutdown().await?;

-        // We're expecting that the uncompressed tar and its contents are rougnly the
+        // We're expecting that the uncompressed tar and its contents are roughly the
         // same size. There is some file moving / copying going on in
-        // this method, but hopefully it's small enough to be arounding
+        // this method, but hopefully it's small enough to be a rounding
         // error relative to the overall segment size.
         let path = Self::unpack_fragmented_segment_tar(output_file)?;

@@ -327,6 +328,34 @@ impl<RT: Runtime> ArchiveCacheManager<RT> {
         result
     }

+    pub async fn get_single_file(
+        &self,
+        search_storage: Arc<dyn Storage>,
+        storage_path: &ObjectKey,
+        file_type: SearchFileType,
+    ) -> anyhow::Result<PathBuf> {
+        // The archive cache always dumps things into directories, but we want a
+        // specific file path.
+        let parent_dir: PathBuf = self.get(search_storage, storage_path, file_type).await?;
+        // tokio's async read_dir method punts to a thread pool too, but by using our
+        // own, we can be runtime agnostic.
+        let path = self
+            .blocking_thread_pool
+            .execute(move || try {
+                let paths: Vec<_> = std::fs::read_dir(parent_dir)?
+                    .map_ok(|value| value.path())
+                    .try_collect()?;
+                anyhow::ensure!(
+                    paths.len() == 1,
+                    "Expected one file but found multiple paths: {:?}",
+                    paths,
+                );
+                paths.first().unwrap().to_owned()
+            })
+            .await??;
+        Ok(path)
+    }
+
     async fn get_logged(
         &self,
         search_storage: Arc<dyn Storage>,
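
fragmented_segment.rs below switches its fetches over to this new method. As a usage sketch, a hypothetical caller (the wrapper function is illustrative and assumes this crate's ArchiveCacheManager, Storage, ObjectKey, SearchFileType, and Runtime types are in scope):

use std::{path::PathBuf, sync::Arc};

// Illustrative only: resolve a vector id-tracker file to a concrete local
// path via the archive cache.
async fn fetch_id_tracker_path<RT: Runtime>(
    cache: &ArchiveCacheManager<RT>,
    storage: Arc<dyn Storage>,
    key: &ObjectKey,
) -> anyhow::Result<PathBuf> {
    // get_single_file unpacks the archive into a cache directory and fails
    // unless that directory contains exactly one file, whose path it returns.
    cache
        .get_single_file(storage, key, SearchFileType::VectorIdTracker)
        .await
}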

crates/search/src/fragmented_segment.rs

Lines changed: 7 additions & 48 deletions
@@ -1,7 +1,4 @@
-use std::{
-    path::PathBuf,
-    sync::Arc,
-};
+use std::sync::Arc;

 use common::{
     bootstrap_model::index::vector_index::FragmentedVectorSegment,

@@ -65,7 +62,6 @@ use crate::{
 #[derive(Clone)]
 pub(crate) struct FragmentedSegmentFetcher<RT: Runtime> {
     archive_cache: ArchiveCacheManager<RT>,
-    blocking_thread_pool: BoundedThreadPool<RT>,
 }

 pub struct FragmentedSegmentStorageKeys {

@@ -77,14 +73,8 @@ pub struct FragmentedSegmentStorageKeys {
 impl<RT: Runtime> FragmentedSegmentFetcher<RT> {
     /// blocking_thread_pool is used for small / fast IO operations and should
     /// be large.
-    pub(crate) fn new(
-        archive_cache: ArchiveCacheManager<RT>,
-        blocking_thread_pool: BoundedThreadPool<RT>,
-    ) -> FragmentedSegmentFetcher<RT> {
-        Self {
-            archive_cache,
-            blocking_thread_pool,
-        }
+    pub(crate) fn new(archive_cache: ArchiveCacheManager<RT>) -> FragmentedSegmentFetcher<RT> {
+        Self { archive_cache }
     }

     /// Fetch all parts of all fragmented segments with limited concurrency.

@@ -125,14 +115,14 @@ impl<RT: Runtime> FragmentedSegmentFetcher<RT> {
             SearchFileType::FragmentedVectorSegment,
         );

-        let fetch_id_tracker = self.fetch_single_file(
+        let fetch_id_tracker = archive_cache.get_single_file(
             search_storage.clone(),
-            paths.id_tracker,
+            &paths.id_tracker,
             SearchFileType::VectorIdTracker,
         );
-        let fetch_bitset = self.fetch_single_file(
+        let fetch_bitset = archive_cache.get_single_file(
             search_storage.clone(),
-            paths.deleted_bitset,
+            &paths.deleted_bitset,
             SearchFileType::VectorDeletedBitset,
         );
         let (segment, id_tracker, bitset) =

@@ -141,37 +131,6 @@ impl<RT: Runtime> FragmentedSegmentFetcher<RT> {
             segment, id_tracker, bitset,
         ))
     }
-
-    async fn fetch_single_file(
-        &self,
-        search_storage: Arc<dyn Storage>,
-        storage_path: ObjectKey,
-        file_type: SearchFileType,
-    ) -> anyhow::Result<PathBuf> {
-        // The archive cache always dumps things into directories, but we want a
-        // specific file path.
-        let parent_dir: PathBuf = self
-            .archive_cache
-            .get(search_storage, &storage_path, file_type)
-            .await?;
-        // tokio's async read_dir method punts to a thread pool too, but by using our
-        // own, we can be runtime agnostic.
-        let path = self
-            .blocking_thread_pool
-            .execute(move || try {
-                let paths: Vec<_> = std::fs::read_dir(parent_dir)?
-                    .map_ok(|value| value.path())
-                    .try_collect()?;
-                anyhow::ensure!(
-                    paths.len() == 1,
-                    "Expected one file but found multiple paths: {:?}",
-                    paths,
-                );
-                paths.first().unwrap().to_owned()
-            })
-            .await??;
-        Ok(path)
-    }
 }

 pub(crate) struct FragmentedSegmentCompactor<RT: Runtime> {
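
Design note: fetch_single_file moved wholesale into ArchiveCacheManager::get_single_file (see cache.rs above). FragmentedSegmentFetcher therefore drops both its BoundedThreadPool field and the corresponding constructor argument; the archive cache's own blocking pool now performs the directory scan for every caller.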
