Skip to content

Commit d6581b8

Browse files
sjuddConvex, Inc.
authored and
Convex, Inc.
committed
Handle changing term values due to replaced documents in the log (#27102)
When we iterate over the document log, we may encounter a document whose most recent previous revision does not match the version of that document that's in a previous text search segment. We need these to match because we update statistics based on the contents of the document. If the document terms we use to update statistics don't match the document in the previous segment, lots of things break. To make this work, rather than taking the first previous revision of a document we find, we now look for the first previous revision of a document that's outside the timestamp bounds of the new segment. If the previous revision is within our new segment's bound, we skip processing it. As we continue backwards down the document log, we will then either find a pure add within our segment's time bounds (which we'll skip), or we eventually find a previous revision outside of our time bounds that we can use for the update. GitOrigin-RevId: 7eb7a742fb77b5a85adcc6bea70f4c2b56adbc29
1 parent bc8fbf0 commit d6581b8

File tree

9 files changed

+274
-81
lines changed

9 files changed

+274
-81
lines changed

crates/database/src/bootstrap_model/test_facing.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,17 @@ impl<'a, RT: Runtime> TestFacingModel<'a, RT> {
3434
.await
3535
}
3636

37+
#[convex_macro::instrument_future]
38+
pub async fn replace(
39+
&mut self,
40+
id: ResolvedDocumentId,
41+
value: ConvexObject,
42+
) -> anyhow::Result<ResolvedDocument> {
43+
SystemMetadataModel::new(self.tx, TableNamespace::test_user())
44+
.replace(id, value)
45+
.await
46+
}
47+
3748
/// Insert a new document and immediately read it. Prefer using `insert`
3849
/// unless you need to read the creation time.
3950
#[convex_macro::instrument_future]

crates/database/src/index_workers/index_meta.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ pub trait SearchIndex: Clone + Debug {
107107
documents: DocumentStream<'_>,
108108
reader: RepeatablePersistence,
109109
previous_segments: &mut Self::PreviousSegments,
110+
document_log_lower_bound: Option<Timestamp>,
110111
build_index_args: Self::BuildIndexArgs,
111112
multipart_build_type: MultipartBuildType,
112113
) -> anyhow::Result<Option<Self::NewSegment>>;

crates/database/src/index_workers/search_flusher.rs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -491,19 +491,23 @@ impl<RT: Runtime, T: SearchIndex + 'static> SearchFlusher<RT, T> {
491491
let mut is_size_exceeded = false;
492492
let qdrant_schema = T::new_schema(&developer_config);
493493

494+
let mut lower_bound_ts: Option<Timestamp> = None;
494495
let (documents, previous_segments) = match build_type {
495-
MultipartBuildType::Partial(last_ts) => (
496-
params.database.load_documents_in_table(
497-
*index_name.table(),
498-
TimestampRange::new((
499-
Bound::Excluded(*last_ts),
500-
Bound::Included(*snapshot_ts),
501-
))?,
502-
T::partial_document_order(),
503-
&row_rate_limiter,
504-
),
505-
previous_segments,
506-
),
496+
MultipartBuildType::Partial(last_ts) => {
497+
lower_bound_ts = Some(*last_ts);
498+
(
499+
params.database.load_documents_in_table(
500+
*index_name.table(),
501+
TimestampRange::new((
502+
Bound::Excluded(*last_ts),
503+
Bound::Included(*snapshot_ts),
504+
))?,
505+
T::partial_document_order(),
506+
&row_rate_limiter,
507+
),
508+
previous_segments,
509+
)
510+
},
507511
MultipartBuildType::IncrementalComplete {
508512
cursor,
509513
backfill_snapshot_ts,
@@ -584,6 +588,7 @@ impl<RT: Runtime, T: SearchIndex + 'static> SearchFlusher<RT, T> {
584588
documents,
585589
persistence,
586590
&mut mutable_previous_segments,
591+
lower_bound_ts,
587592
build_index_args,
588593
build_type,
589594
)

crates/database/src/tests/text_test_utils.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,21 @@ impl TextFixtures {
269269
Ok(segments.clone())
270270
}
271271

272+
pub async fn replace_document(
273+
&self,
274+
doc_id: ResolvedDocumentId,
275+
text: &str,
276+
) -> anyhow::Result<()> {
277+
let mut tx = self.db.begin_system().await?;
278+
let document = assert_obj!(
279+
"text" => text,
280+
"channel" => "#general",
281+
);
282+
tx.replace_inner(doc_id, document).await?;
283+
self.db.commit(tx).await?;
284+
Ok(())
285+
}
286+
272287
pub async fn add_document(&self, text: &str) -> anyhow::Result<ResolvedDocumentId> {
273288
let table_name = TABLE_NAME.parse::<TableName>()?;
274289
let mut tx = self.db.begin_system().await?;

crates/database/src/text_index_worker/flusher2.rs

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,115 @@ mod tests {
519519
Ok(())
520520
}
521521

522+
#[convex_macro::test_runtime]
523+
async fn backfill_insert_then_replace_delete_one_segment_doesnt_panic(
524+
rt: TestRuntime,
525+
) -> anyhow::Result<()> {
526+
let fixtures = TextFixtures::new(rt).await?;
527+
let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?;
528+
let mut flusher = fixtures.new_search_flusher2();
529+
530+
let doc_id = fixtures.add_document("cat").await?;
531+
fixtures.replace_document(doc_id, "new_text").await?;
532+
let mut tx = fixtures.db.begin_system().await?;
533+
tx.delete_inner(doc_id).await?;
534+
fixtures.db.commit(tx).await?;
535+
536+
flusher.step().await?;
537+
fixtures.enable_index(&index_name).await?;
538+
539+
let results = fixtures.search(index_name.clone(), "cat").await?;
540+
assert!(results.is_empty());
541+
let results = fixtures.search(index_name, "new_text").await?;
542+
assert!(results.is_empty());
543+
544+
Ok(())
545+
}
546+
547+
#[convex_macro::test_runtime]
548+
async fn backfill_insert_then_replace_delete_second_segment_doesnt_panic(
549+
rt: TestRuntime,
550+
) -> anyhow::Result<()> {
551+
let fixtures = TextFixtures::new(rt).await?;
552+
let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?;
553+
let mut flusher = fixtures.new_search_flusher2();
554+
555+
let doc_id = fixtures.add_document("cat").await?;
556+
flusher.step().await?;
557+
558+
fixtures.replace_document(doc_id, "new_text").await?;
559+
560+
let mut tx = fixtures.db.begin_system().await?;
561+
tx.delete_inner(doc_id).await?;
562+
fixtures.db.commit(tx).await?;
563+
564+
flusher.step().await?;
565+
fixtures.enable_index(&index_name).await?;
566+
567+
let results = fixtures.search(index_name.clone(), "cat").await?;
568+
assert!(results.is_empty());
569+
let results = fixtures.search(index_name, "new_text").await?;
570+
assert!(results.is_empty());
571+
572+
Ok(())
573+
}
574+
575+
#[convex_macro::test_runtime]
576+
async fn backfill_insert_replace_replace_delete_doesnt_panic(
577+
rt: TestRuntime,
578+
) -> anyhow::Result<()> {
579+
let fixtures = TextFixtures::new(rt).await?;
580+
let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?;
581+
let mut flusher = fixtures.new_search_flusher2();
582+
583+
let doc_id = fixtures.add_document("cat").await?;
584+
flusher.step().await?;
585+
586+
fixtures.replace_document(doc_id, "new_text").await?;
587+
fixtures.replace_document(doc_id, "really_new_text").await?;
588+
589+
let mut tx = fixtures.db.begin_system().await?;
590+
tx.delete_inner(doc_id).await?;
591+
fixtures.db.commit(tx).await?;
592+
593+
flusher.step().await?;
594+
fixtures.enable_index(&index_name).await?;
595+
596+
let results = fixtures.search(index_name, "really_new_text").await?;
597+
assert!(results.is_empty());
598+
599+
Ok(())
600+
}
601+
602+
#[convex_macro::test_runtime]
603+
async fn backfill_insert_replace_replace_delete_different_segments_doesnt_panic(
604+
rt: TestRuntime,
605+
) -> anyhow::Result<()> {
606+
let fixtures = TextFixtures::new(rt).await?;
607+
let IndexData { index_name, .. } = fixtures.insert_backfilling_text_index().await?;
608+
let mut flusher = fixtures.new_search_flusher2();
609+
610+
let doc_id = fixtures.add_document("cat").await?;
611+
flusher.step().await?;
612+
613+
fixtures.replace_document(doc_id, "new_text").await?;
614+
flusher.step().await?;
615+
fixtures.replace_document(doc_id, "really_new_text").await?;
616+
flusher.step().await?;
617+
618+
let mut tx = fixtures.db.begin_system().await?;
619+
tx.delete_inner(doc_id).await?;
620+
fixtures.db.commit(tx).await?;
621+
622+
flusher.step().await?;
623+
fixtures.enable_index(&index_name).await?;
624+
625+
let results = fixtures.search(index_name, "really_new_text").await?;
626+
assert!(results.is_empty());
627+
628+
Ok(())
629+
}
630+
522631
#[convex_macro::test_runtime]
523632
async fn backfill_with_backfilled_single_segment_format_backfills_with_multi_segment_format(
524633
rt: TestRuntime,

crates/database/src/text_index_worker/text_meta.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ use search::{
5656
UpdatableTextSegment,
5757
};
5858
use storage::Storage;
59+
use sync_types::Timestamp;
5960
use value::InternalId;
6061

6162
use crate::{
@@ -218,6 +219,7 @@ impl SearchIndex for TextSearchIndex {
218219
documents: DocumentStream<'_>,
219220
reader: RepeatablePersistence,
220221
previous_segments: &mut Self::PreviousSegments,
222+
lower_bound_ts: Option<Timestamp>,
221223
BuildTextIndexArgs {
222224
search_storage,
223225
segment_term_metadata_fetcher,
@@ -254,6 +256,7 @@ impl SearchIndex for TextSearchIndex {
254256
previous_segments,
255257
segment_term_metadata_fetcher,
256258
search_storage,
259+
lower_bound_ts,
257260
)
258261
.await
259262
}

crates/database/src/vector_index_worker/vector_meta.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use search::{
4242
Searcher,
4343
};
4444
use storage::Storage;
45+
use sync_types::Timestamp;
4546
use value::InternalId;
4647
use vector::{
4748
qdrant_segments::VectorDiskSegmentValues,
@@ -221,6 +222,7 @@ impl SearchIndex for VectorSearchIndex {
221222
documents: DocumentStream<'_>,
222223
_reader: RepeatablePersistence,
223224
previous_segments: &mut Self::PreviousSegments,
225+
_document_log_lower_bound: Option<Timestamp>,
224226
BuildVectorIndexArgs {
225227
full_scan_threshold_bytes,
226228
}: Self::BuildIndexArgs,

0 commit comments

Comments
 (0)