Skip to content

Commit 2308de6

Browse files
sjudd (Convex, Inc.)
authored and
Convex, Inc.
committed
Avoid building empty text segments, more backfilling tests. (#26180)
GitOrigin-RevId: e9363f1e98116df8f7a10f2a1614d110ce75e9b6
1 parent 69d2f8f commit 2308de6

File tree

5 files changed

+218
-26
lines changed

5 files changed

+218
-26
lines changed

crates/database/src/bootstrap_model/index.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,8 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
172172
let metadata = self
173173
.pending_index_metadata(namespace, index)?
174174
.ok_or_else(|| anyhow::anyhow!("Failed to find pending index: {}", index))?;
175-
self.enable_index(&metadata.into_value()).await
175+
self.enable_index(&metadata.into_value()).await?;
176+
Ok(())
176177
}
177178

178179
async fn enable_index(&mut self, backfilled_index: &TabletIndexMetadata) -> anyhow::Result<()> {

crates/database/src/text_index_worker/flusher2.rs

Lines changed: 167 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ impl<RT: Runtime> TextIndexFlusher2<RT> {
249249
let snapshot = TextIndexSnapshot {
250250
data: TextIndexSnapshotData::MultiSegment(segments),
251251
ts: backfill_ts,
252-
version: TextSnapshotVersion::V0,
252+
version: TextSnapshotVersion::V2UseStringIds,
253253
};
254254
let is_snapshotted = matches!(on_disk_state, SearchOnDiskState::SnapshottedAt(_));
255255
if is_snapshotted {
@@ -297,11 +297,16 @@ impl<RT: Runtime> TextIndexFlusher2<RT> {
297297
#[cfg(test)]
298298
mod tests {
299299
use common::{
300-
bootstrap_model::index::IndexMetadata,
300+
bootstrap_model::index::{
301+
text_index::TextIndexState,
302+
IndexConfig,
303+
IndexMetadata,
304+
},
301305
runtime::testing::TestRuntime,
302306
types::TabletIndexName,
303307
};
304308
use maplit::btreemap;
309+
use must_let::must_let;
305310
use value::TableNamespace;
306311

307312
use crate::tests::text_test_utils::{
@@ -408,6 +413,30 @@ mod tests {
408413
Ok(())
409414
}
410415

416+
#[convex_macro::test_runtime]
417+
async fn backfill_with_two_documents_leaves_document_backfilling_after_first_flush(
418+
rt: TestRuntime,
419+
) -> anyhow::Result<()> {
420+
let fixtures = TextFixtures::new(rt).await?;
421+
let IndexMetadata { name, .. } = fixtures.insert_backfilling_text_index().await?;
422+
423+
fixtures.add_document("cat").await?;
424+
fixtures.add_document("dog").await?;
425+
426+
let mut flusher = fixtures
427+
.new_search_flusher_builder()
428+
.set_incremental_multipart_threshold_bytes(0)
429+
.build();
430+
// Build the first segment, which stops because the document size is > 0
431+
flusher.step().await?;
432+
let metadata = fixtures.get_index_metadata(name).await?;
433+
must_let!(let IndexConfig::Search { on_disk_state, .. }= &metadata.config);
434+
must_let!(let TextIndexState::Backfilling(backfilling_meta) = on_disk_state);
435+
assert_eq!(backfilling_meta.segments.len(), 1);
436+
437+
Ok(())
438+
}
439+
411440
#[convex_macro::test_runtime]
412441
async fn backfill_with_two_documents_0_max_segment_size_includes_both_documents(
413442
rt: TestRuntime,
@@ -437,4 +466,140 @@ mod tests {
437466

438467
Ok(())
439468
}
469+
470+
#[convex_macro::test_runtime]
471+
async fn backfill_with_empty_index_adds_no_segments(rt: TestRuntime) -> anyhow::Result<()> {
472+
let fixtures = TextFixtures::new(rt).await?;
473+
let IndexMetadata { name, .. } = fixtures.insert_backfilling_text_index().await?;
474+
let mut flusher = fixtures.new_search_flusher2();
475+
flusher.step().await?;
476+
477+
let segments = fixtures.get_segments_metadata(name).await?;
478+
assert_eq!(0, segments.len());
479+
480+
Ok(())
481+
}
482+
483+
#[convex_macro::test_runtime]
484+
async fn backfill_with_empty_backfilled_index_new_document_adds_document(
485+
rt: TestRuntime,
486+
) -> anyhow::Result<()> {
487+
let fixtures = TextFixtures::new(rt).await?;
488+
let IndexMetadata { name, .. } = fixtures.insert_backfilling_text_index().await?;
489+
let mut flusher = fixtures.new_search_flusher2();
490+
flusher.step().await?;
491+
492+
let doc_id = fixtures.add_document("cat").await?;
493+
494+
flusher.step().await?;
495+
496+
fixtures.enable_index(&name).await?;
497+
let results = fixtures.search(name, "cat").await?;
498+
assert_eq!(doc_id, results.first().unwrap().id());
499+
500+
Ok(())
501+
}
502+
503+
#[convex_macro::test_runtime]
504+
async fn backfill_with_non_empty_backfilled_index_new_document_adds_document(
505+
rt: TestRuntime,
506+
) -> anyhow::Result<()> {
507+
let fixtures = TextFixtures::new(rt).await?;
508+
let IndexMetadata { name, .. } = fixtures.insert_backfilling_text_index().await?;
509+
fixtures.add_document("dog").await?;
510+
let mut flusher = fixtures.new_search_flusher2();
511+
flusher.step().await?;
512+
513+
let doc_id = fixtures.add_document("cat").await?;
514+
515+
flusher.step().await?;
516+
517+
fixtures.enable_index(&name).await?;
518+
let results = fixtures.search(name, "cat").await?;
519+
assert_eq!(doc_id, results.first().unwrap().id());
520+
521+
Ok(())
522+
}
523+
524+
#[convex_macro::test_runtime]
525+
async fn backfill_with_empty_enabled_index_new_document_adds_document(
526+
rt: TestRuntime,
527+
) -> anyhow::Result<()> {
528+
let fixtures = TextFixtures::new(rt).await?;
529+
let IndexMetadata { name, .. } = fixtures.insert_backfilling_text_index().await?;
530+
let mut flusher = fixtures.new_search_flusher2();
531+
flusher.step().await?;
532+
fixtures.enable_index(&name).await?;
533+
534+
let doc_id = fixtures.add_document("cat").await?;
535+
536+
flusher.step().await?;
537+
538+
let results = fixtures.search(name, "cat").await?;
539+
assert_eq!(doc_id, results.first().unwrap().id());
540+
541+
Ok(())
542+
}
543+
544+
#[convex_macro::test_runtime]
545+
async fn backfill_with_non_empty_enabled_index_new_document_adds_document(
546+
rt: TestRuntime,
547+
) -> anyhow::Result<()> {
548+
let fixtures = TextFixtures::new(rt).await?;
549+
let IndexMetadata { name, .. } = fixtures.insert_backfilling_text_index().await?;
550+
fixtures.add_document("dog").await?;
551+
let mut flusher = fixtures.new_search_flusher2();
552+
flusher.step().await?;
553+
fixtures.enable_index(&name).await?;
554+
555+
let doc_id = fixtures.add_document("cat").await?;
556+
557+
flusher.step().await?;
558+
559+
let results = fixtures.search(name, "cat").await?;
560+
assert_eq!(doc_id, results.first().unwrap().id());
561+
562+
Ok(())
563+
}
564+
565+
#[convex_macro::test_runtime]
566+
async fn backfill_with_non_empty_enabled_index_new_document_adds_new_segment(
567+
rt: TestRuntime,
568+
) -> anyhow::Result<()> {
569+
let fixtures = TextFixtures::new(rt).await?;
570+
let IndexMetadata { name, .. } = fixtures.insert_backfilling_text_index().await?;
571+
fixtures.add_document("dog").await?;
572+
let mut flusher = fixtures.new_search_flusher2();
573+
flusher.step().await?;
574+
fixtures.enable_index(&name).await?;
575+
576+
fixtures.add_document("cat").await?;
577+
578+
flusher.step().await?;
579+
580+
let segments = fixtures.get_segments_metadata(name).await?;
581+
assert_eq!(segments.len(), 2);
582+
583+
Ok(())
584+
}
585+
#[convex_macro::test_runtime]
586+
async fn backfill_with_non_empty_backfilled_index_new_document_adds_new_segment(
587+
rt: TestRuntime,
588+
) -> anyhow::Result<()> {
589+
let fixtures = TextFixtures::new(rt).await?;
590+
let IndexMetadata { name, .. } = fixtures.insert_backfilling_text_index().await?;
591+
fixtures.add_document("dog").await?;
592+
let mut flusher = fixtures.new_search_flusher2();
593+
flusher.step().await?;
594+
595+
fixtures.add_document("cat").await?;
596+
597+
flusher.step().await?;
598+
599+
fixtures.enable_index(&name).await?;
600+
let segments = fixtures.get_segments_metadata(name).await?;
601+
assert_eq!(segments.len(), 2);
602+
603+
Ok(())
604+
}
440605
}

crates/database/src/text_index_worker/text_meta.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -165,17 +165,13 @@ impl SearchIndex for TextSearchIndex {
165165
previous_segments: &mut Self::PreviousSegments,
166166
) -> anyhow::Result<Option<Self::NewSegment>> {
167167
let revision_stream = Box::pin(stream_revision_pairs(documents, &reader));
168-
// TODO(CX-6496): Make build_segment return None if there are no new documents
169-
// to index.
170-
Ok(Some(
171-
build_new_segment(
172-
revision_stream,
173-
schema.clone(),
174-
index_path,
175-
previous_segments,
176-
)
177-
.await?,
178-
))
168+
build_new_segment(
169+
revision_stream,
170+
schema.clone(),
171+
index_path,
172+
previous_segments,
173+
)
174+
.await
179175
}
180176

181177
async fn upload_new_segment<RT: Runtime>(

crates/search/src/incremental_index.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ pub async fn build_new_segment(
275275
tantivy_schema: TantivySearchIndexSchema,
276276
dir: &Path,
277277
previous_segments: &mut PreviousTextSegments,
278-
) -> anyhow::Result<NewTextSegment> {
278+
) -> anyhow::Result<Option<NewTextSegment>> {
279279
let index_path = dir.join("index_path");
280280
std::fs::create_dir(&index_path)?;
281281
let index = IndexBuilder::new()
@@ -297,6 +297,8 @@ pub async fn build_new_segment(
297297

298298
let mut num_indexed_documents = 0;
299299

300+
let mut is_at_least_one_document_indexed = false;
301+
300302
while let Some(revision_pair) = revision_stream.try_next().await? {
301303
let convex_id = revision_pair.id.internal_id();
302304
// Skip documents we have already added to the segment, but update dangling
@@ -335,6 +337,7 @@ pub async fn build_new_segment(
335337
}
336338
// Addition
337339
if let Some(new_document) = revision_pair.document() {
340+
is_at_least_one_document_indexed = true;
338341
num_indexed_documents += 1;
339342
dangling_deletes.remove(&convex_id);
340343
let tantivy_document =
@@ -348,6 +351,9 @@ pub async fn build_new_segment(
348351
"Dangling deletes is not empty. A document was deleted that is not present in other \
349352
segments nor in this stream"
350353
);
354+
if !is_at_least_one_document_indexed {
355+
return Ok(None);
356+
}
351357
segment_writer.finalize()?;
352358

353359
let new_deletion_tracker = MemoryDeletionTracker::new(new_id_tracker.num_ids() as u32);
@@ -363,10 +369,10 @@ pub async fn build_new_segment(
363369
alive_bit_set_path,
364370
deleted_terms_path,
365371
};
366-
Ok(NewTextSegment {
372+
Ok(Some(NewTextSegment {
367373
paths,
368374
num_indexed_documents,
369-
})
375+
}))
370376
}
371377

372378
pub struct SearchSegmentForMerge {

crates/search/src/searcher/searcher.rs

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1506,7 +1506,8 @@ mod tests {
15061506
test_dir.path(),
15071507
&mut previous_segments,
15081508
)
1509-
.await?;
1509+
.await?
1510+
.unwrap();
15101511
let updated_segments = previous_segments.finalize();
15111512
assert!(updated_segments.is_empty());
15121513
println!("Indexed {dataset_path} in {:?}", start.elapsed());
@@ -1637,8 +1638,11 @@ mod tests {
16371638

16381639
#[derive(Clone)]
16391640
struct TestIndex {
1641+
/// Only used for printing debug info.
16401642
strings_by_id: BTreeMap<ResolvedDocumentId, Option<String>>,
1641-
segment_paths: TextSegmentPaths,
1643+
/// Note - this is only the latest segment. This struct and tests don't
1644+
/// support querying multiple segments within an index.
1645+
segment_paths: Option<TextSegmentPaths>,
16421646
#[allow(dead_code)]
16431647
previous_segment_dirs: Vec<PathBuf>,
16441648
}
@@ -1703,7 +1707,7 @@ mod tests {
17031707
}
17041708
Ok(TestIndex {
17051709
strings_by_id,
1706-
segment_paths: new_segment.paths,
1710+
segment_paths: new_segment.map(|segment| segment.paths),
17071711
previous_segment_dirs,
17081712
})
17091713
}
@@ -1738,6 +1742,10 @@ mod tests {
17381742
test_index: TestIndex,
17391743
) -> anyhow::Result<Vec<(PostingListMatch, String)>> {
17401744
let segment_paths = test_index.segment_paths;
1745+
let Some(segment_paths) = segment_paths else {
1746+
println!("Empty segment!");
1747+
return Ok(vec![]);
1748+
};
17411749

17421750
let index_reader = index_reader_for_directory(&segment_paths.index_path)?;
17431751
let searcher = index_reader.searcher();
@@ -1925,13 +1933,29 @@ mod tests {
19251933
let (posting_list_match, s) = posting_list_matches.first().unwrap();
19261934
assert_eq!(posting_list_match.internal_id, id);
19271935
assert_eq!(s, "emma is awesome!");
1928-
let previous_segments =
1929-
PreviousTextSegments(vec![UpdatableTextSegment::load(&test_index.segment_paths)?]);
1936+
let previous_segments = PreviousTextSegments(vec![UpdatableTextSegment::load(
1937+
&test_index.segment_paths.clone().unwrap(),
1938+
)?]);
19301939
let test_dir = TempDir::new()?;
19311940
let delete_document: Vec<_> = vec![(id, Some("emma is awesome!"), None)];
19321941

1933-
let test_index =
1942+
let new_test_index =
19341943
build_test_index(delete_document.into(), test_dir.path(), previous_segments).await?;
1944+
1945+
let previous_segment_paths = new_test_index.previous_segment_dirs.first().unwrap();
1946+
let alive_bitset_path = previous_segment_paths.join(ALIVE_BITSET_PATH);
1947+
let deleted_terms_path = previous_segment_paths.join(DELETED_TERMS_PATH);
1948+
1949+
let test_index = TestIndex {
1950+
strings_by_id: new_test_index.strings_by_id,
1951+
segment_paths: Some(TextSegmentPaths {
1952+
alive_bit_set_path: alive_bitset_path,
1953+
deleted_terms_path,
1954+
..test_index.segment_paths.unwrap()
1955+
}),
1956+
previous_segment_dirs: new_test_index.previous_segment_dirs,
1957+
};
1958+
19351959
let posting_list_matches =
19361960
incremental_search_with_deletions_helper(query, test_index.clone()).await?;
19371961
assert_eq!(posting_list_matches.len(), 0);
@@ -1978,8 +2002,8 @@ mod tests {
19782002
assert_eq!(s, "emma is awesome!");
19792003

19802004
let segments = vec![
1981-
search_segment_from_path(&test_index_1.segment_paths)?,
1982-
search_segment_from_path(&test_index_2.segment_paths)?,
2005+
search_segment_from_path(&test_index_1.segment_paths.unwrap())?,
2006+
search_segment_from_path(&test_index_2.segment_paths.unwrap())?,
19832007
];
19842008

19852009
let merged_dir = TempDir::new()?;
@@ -1989,7 +2013,7 @@ mod tests {
19892013
merged_strings_by_id.append(&mut test_index_2.strings_by_id.clone());
19902014
let merged_index = TestIndex {
19912015
strings_by_id: merged_strings_by_id,
1992-
segment_paths: merged_paths,
2016+
segment_paths: Some(merged_paths),
19932017
previous_segment_dirs: vec![],
19942018
};
19952019
let mut posting_list_matches =

0 commit comments

Comments
 (0)