Skip to content

Commit 83f6500

Browse files
sjudd authored and Convex, Inc. committed
Add multi segment metadata to text search's backfilling state (#25752)
GitOrigin-RevId: 9afd42bb5610db0f02a294c09f70bfb3ba36c861
1 parent a37cb03 commit 83f6500

File tree

17 files changed: 165 additions and 29 deletions

crates/common/src/bootstrap_model/index/index_config.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -75,7 +75,7 @@ impl IndexConfig {
7575
matches!(on_disk_state, DatabaseIndexState::Backfilling(_))
7676
},
7777
IndexConfig::Search { on_disk_state, .. } => {
78-
matches!(on_disk_state, SearchIndexState::Backfilling)
78+
matches!(on_disk_state, SearchIndexState::Backfilling(_))
7979
},
8080
IndexConfig::Vector { on_disk_state, .. } => {
8181
matches!(on_disk_state, VectorIndexState::Backfilling(_))

crates/common/src/bootstrap_model/index/index_metadata.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,7 @@ use super::{
3636
IndexConfig,
3737
};
3838
use crate::{
39+
bootstrap_model::index::search_index::TextIndexBackfillState,
3940
document::{
4041
ParsedDocument,
4142
ResolvedDocument,
@@ -90,7 +91,7 @@ impl<T: TableIdentifier> IndexMetadata<T> {
9091
search_field,
9192
filter_fields,
9293
},
93-
SearchIndexState::Backfilling,
94+
SearchIndexState::Backfilling(TextIndexBackfillState::new()),
9495
)
9596
}
9697

crates/common/src/bootstrap_model/index/search_index/backfill_state.rs

Lines changed: 113 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,113 @@
1+
use std::str::FromStr;
2+
3+
use serde::{
4+
Deserialize,
5+
Serialize,
6+
};
7+
use sync_types::Timestamp;
8+
use value::{
9+
codegen_convex_serialization,
10+
InternalId,
11+
};
12+
13+
use crate::bootstrap_model::index::search_index::{
14+
index_snapshot::SerializedFragmentedSearchSegment,
15+
FragmentedSearchSegment,
16+
};
17+
18+
#[derive(Debug, Clone, PartialEq, Eq)]
19+
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
20+
pub struct TextIndexBackfillState {
21+
pub segments: Vec<FragmentedSearchSegment>,
22+
// None at the start of backfill, then set after the first backfill iteration.
23+
pub cursor: Option<TextBackfillCursor>,
24+
}
25+
26+
impl TextIndexBackfillState {
27+
pub fn new() -> Self {
28+
Self {
29+
segments: vec![],
30+
cursor: None,
31+
}
32+
}
33+
}
34+
35+
#[derive(Debug, Clone, PartialEq, Eq)]
36+
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
37+
pub struct TextBackfillCursor {
38+
pub cursor: InternalId,
39+
pub backfill_snapshot_ts: Timestamp,
40+
}
41+
42+
#[derive(Serialize, Deserialize)]
43+
pub struct SerializedTextBackfillCursor {
44+
pub document_cursor: String,
45+
pub backfill_snapshot_ts: i64,
46+
}
47+
48+
impl From<TextBackfillCursor> for SerializedTextBackfillCursor {
49+
fn from(value: TextBackfillCursor) -> Self {
50+
Self {
51+
document_cursor: value.cursor.to_string(),
52+
backfill_snapshot_ts: value.backfill_snapshot_ts.into(),
53+
}
54+
}
55+
}
56+
57+
impl TryFrom<SerializedTextBackfillCursor> for TextBackfillCursor {
58+
type Error = anyhow::Error;
59+
60+
fn try_from(value: SerializedTextBackfillCursor) -> Result<Self, Self::Error> {
61+
Ok(Self {
62+
cursor: InternalId::from_str(&value.document_cursor)?,
63+
backfill_snapshot_ts: Timestamp::try_from(value.backfill_snapshot_ts)?,
64+
})
65+
}
66+
}
67+
68+
#[derive(Serialize, Deserialize)]
69+
pub struct SerializedTextIndexBackfillState {
70+
segments: Option<Vec<SerializedFragmentedSearchSegment>>,
71+
cursor: Option<SerializedTextBackfillCursor>,
72+
}
73+
74+
impl TryFrom<TextIndexBackfillState> for SerializedTextIndexBackfillState {
75+
type Error = anyhow::Error;
76+
77+
fn try_from(backfill_state: TextIndexBackfillState) -> Result<Self, Self::Error> {
78+
Ok(SerializedTextIndexBackfillState {
79+
segments: Some(
80+
backfill_state
81+
.segments
82+
.into_iter()
83+
.map(|s| s.try_into())
84+
.collect::<anyhow::Result<Vec<_>>>()?,
85+
),
86+
cursor: backfill_state
87+
.cursor
88+
.map(|cursor| cursor.try_into())
89+
.transpose()?,
90+
})
91+
}
92+
}
93+
94+
impl TryFrom<SerializedTextIndexBackfillState> for TextIndexBackfillState {
95+
type Error = anyhow::Error;
96+
97+
fn try_from(serialized: SerializedTextIndexBackfillState) -> Result<Self, Self::Error> {
98+
Ok(TextIndexBackfillState {
99+
segments: serialized
100+
.segments
101+
.unwrap_or_default()
102+
.into_iter()
103+
.map(|s| s.try_into())
104+
.collect::<anyhow::Result<Vec<_>>>()?,
105+
cursor: serialized
106+
.cursor
107+
.map(TextBackfillCursor::try_from)
108+
.transpose()?,
109+
})
110+
}
111+
}
112+
113+
codegen_convex_serialization!(TextIndexBackfillState, SerializedTextIndexBackfillState);

crates/common/src/bootstrap_model/index/search_index/index_state.rs

Lines changed: 18 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,10 @@ use super::{
88
index_snapshot::SerializedSearchIndexSnapshot,
99
SearchIndexSnapshot,
1010
};
11+
use crate::bootstrap_model::index::search_index::backfill_state::{
12+
SerializedTextIndexBackfillState,
13+
TextIndexBackfillState,
14+
};
1115

1216
/// The state of a search index.
1317
/// Search indexes begin in `Backfilling`.
@@ -16,7 +20,7 @@ use super::{
1620
#[derive(Debug, Clone, PartialEq, Eq)]
1721
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
1822
pub enum SearchIndexState {
19-
Backfilling,
23+
Backfilling(TextIndexBackfillState),
2024
Backfilled(SearchIndexSnapshot),
2125
SnapshottedAt(SearchIndexSnapshot),
2226
}
@@ -25,6 +29,10 @@ pub enum SearchIndexState {
2529
#[serde(tag = "state", rename_all = "camelCase")]
2630
pub enum SerializedSearchIndexState {
2731
Backfilling,
32+
Backfilling2 {
33+
#[serde(flatten)]
34+
backfill_state: SerializedTextIndexBackfillState,
35+
},
2836
Backfilled {
2937
#[serde(flatten)]
3038
snapshot: SerializedSearchIndexSnapshot,
@@ -40,7 +48,9 @@ impl TryFrom<SearchIndexState> for SerializedSearchIndexState {
4048

4149
fn try_from(state: SearchIndexState) -> Result<Self, Self::Error> {
4250
Ok(match state {
43-
SearchIndexState::Backfilling => SerializedSearchIndexState::Backfilling,
51+
SearchIndexState::Backfilling(state) => SerializedSearchIndexState::Backfilling2 {
52+
backfill_state: state.try_into()?,
53+
},
4454
SearchIndexState::Backfilled(snapshot) => SerializedSearchIndexState::Backfilled {
4555
snapshot: snapshot.try_into()?,
4656
},
@@ -56,7 +66,12 @@ impl TryFrom<SerializedSearchIndexState> for SearchIndexState {
5666

5767
fn try_from(serialized: SerializedSearchIndexState) -> Result<Self, Self::Error> {
5868
Ok(match serialized {
59-
SerializedSearchIndexState::Backfilling => SearchIndexState::Backfilling,
69+
SerializedSearchIndexState::Backfilling => {
70+
SearchIndexState::Backfilling(TextIndexBackfillState::new())
71+
},
72+
SerializedSearchIndexState::Backfilling2 { backfill_state } => {
73+
SearchIndexState::Backfilling(backfill_state.try_into()?)
74+
},
6075
SerializedSearchIndexState::Backfilled { snapshot } => {
6176
SearchIndexState::Backfilled(snapshot.try_into()?)
6277
},

crates/common/src/bootstrap_model/index/search_index/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,10 @@
1+
mod backfill_state;
12
mod index_config;
23
mod index_snapshot;
34
mod index_state;
45

56
pub use self::{
7+
backfill_state::TextIndexBackfillState,
68
index_config::{
79
DeveloperSearchIndexConfig,
810
SerializedDeveloperSearchIndexConfig,

crates/database/src/bootstrap_model/index.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -210,7 +210,7 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
210210
SearchIndexState::Backfilled(snapshot) => {
211211
*on_disk_state = SearchIndexState::SnapshottedAt(snapshot.clone());
212212
},
213-
SearchIndexState::Backfilling | SearchIndexState::SnapshottedAt(_) => {
213+
SearchIndexState::Backfilling(_) | SearchIndexState::SnapshottedAt(_) => {
214214
anyhow::bail!(
215215
"Expected backfilled index, but found: {on_disk_state:?} for {:?}",
216216
backfilled_index.name.descriptor()

crates/database/src/index_workers/index_meta.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -80,7 +80,7 @@ impl SearchIndexConfigParser for TextIndexConfigParser {
8080
Some(SearchIndexConfig {
8181
developer_config,
8282
on_disk_state: match on_disk_state {
83-
SearchIndexState::Backfilling => {
83+
SearchIndexState::Backfilling(_) => {
8484
// TODO(sam): Add support for a backfilling partial state to text search
8585
SearchOnDiskState::Backfilling(BackfillState {
8686
segments: vec![],

crates/database/src/index_workers/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@ use common::{
1616
runtime::Runtime,
1717
};
1818
use rand::Rng;
19+
use value::ResolvedDocumentId;
1920

2021
pub const MAX_BACKOFF: Duration = Duration::from_secs(30);
2122

@@ -48,3 +49,9 @@ pub async fn timeout_with_jitter<RT: Runtime>(rt: &RT, duration: Duration) {
4849
let sleep = rt.with_rng(|rng| half_timer + duration.mul_f32(rng.gen::<f32>()));
4950
rt.wait(sleep).await;
5051
}
52+
53+
#[derive(Debug)]
54+
pub struct MultiSegmentBackfillResult {
55+
pub new_cursor: Option<ResolvedDocumentId>,
56+
pub is_backfill_complete: bool,
57+
}

crates/database/src/search_and_vector_bootstrap.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -165,7 +165,7 @@ impl IndexesToBootstrap {
165165
on_disk_state,
166166
} => {
167167
let search_index = match on_disk_state {
168-
SearchIndexState::Backfilling => {
168+
SearchIndexState::Backfilling(_) => {
169169
// We'll start a new memory search index starting at the next commit
170170
// after our persistence upper bound. After
171171
// bootstrapping, all commits after

crates/database/src/search_index_worker/fast_forward.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -43,7 +43,7 @@ impl<RT: Runtime> IndexFastForward<RT, SearchSnapshotVersion> for SearchFastForw
4343
SearchIndexState::SnapshottedAt(snapshot) | SearchIndexState::Backfilled(snapshot) => {
4444
snapshot
4545
},
46-
SearchIndexState::Backfilling => return None,
46+
SearchIndexState::Backfilling(_) => return None,
4747
};
4848
Some((*ts, *version))
4949
}

crates/database/src/search_index_worker/flusher.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -146,7 +146,7 @@ impl<RT: Runtime> SearchIndexFlusher<RT> {
146146
// If the index is in the `Backfilling` state, or is already `SnapshottedAt` but
147147
// has grown too large or has the wrong format, it needs to be backfilled.
148148
let needs_backfill = match &on_disk_state {
149-
SearchIndexState::Backfilling => Some(BuildReason::Backfilling),
149+
SearchIndexState::Backfilling(_) => Some(BuildReason::Backfilling),
150150
SearchIndexState::SnapshottedAt(SearchIndexSnapshot { version, .. })
151151
| SearchIndexState::Backfilled(SearchIndexSnapshot { version, .. })
152152
if *version != expected_version =>
@@ -226,7 +226,7 @@ impl<RT: Runtime> SearchIndexFlusher<RT> {
226226
version: SearchSnapshotVersion::new(tx.persistence_version()),
227227
};
228228
let new_on_disk_state = match job.on_disk_state {
229-
SearchIndexState::Backfilling | SearchIndexState::Backfilled(_) => {
229+
SearchIndexState::Backfilling(_) | SearchIndexState::Backfilled(_) => {
230230
SearchIndexState::Backfilled(snapshot_data)
231231
},
232232
SearchIndexState::SnapshottedAt(_) => SearchIndexState::SnapshottedAt(snapshot_data),

crates/database/src/test_helpers/index_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -69,7 +69,7 @@ pub fn assert_backfilling(
6969
assert_matches!(on_disk_state, DatabaseIndexState::Backfilling(_))
7070
},
7171
IndexConfig::Search { on_disk_state, .. } => {
72-
assert_eq!(on_disk_state, SearchIndexState::Backfilling)
72+
assert_matches!(on_disk_state, SearchIndexState::Backfilling(_))
7373
},
7474
IndexConfig::Vector { on_disk_state, .. } => {
7575
assert_matches!(on_disk_state, VectorIndexState::Backfilling(_))

crates/database/src/vector_index_worker/flusher.rs

Lines changed: 4 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -73,6 +73,7 @@ use crate::{
7373
VectorSearchIndex,
7474
},
7575
BuildReason,
76+
MultiSegmentBackfillResult,
7677
},
7778
metrics::{
7879
self,
@@ -518,7 +519,7 @@ impl<RT: Runtime> VectorIndexFlusher<RT> {
518519

519520
let index_backfill_result =
520521
if let MultipartBuildType::IncrementalComplete { .. } = build_type {
521-
Some(VectorIndexMultiSegmentBackfillResult {
522+
Some(MultiSegmentBackfillResult {
522523
new_cursor,
523524
is_backfill_complete,
524525
})
@@ -622,7 +623,7 @@ struct MultiSegmentBuildResult {
622623
new_segment: Option<DiskSegmentValues>,
623624
updated_previous_segments: Vec<FragmentedVectorSegment>,
624625
// This is set only if the build iteration created a segment for a backfilling index
625-
backfill_result: Option<VectorIndexMultiSegmentBackfillResult>,
626+
backfill_result: Option<MultiSegmentBackfillResult>,
626627
}
627628

628629
#[cfg(any(test, feature = "testing"))]
@@ -646,13 +647,7 @@ struct IndexBuildResult {
646647
vectors_in_new_segment: Option<u32>,
647648
new_segment_id: Option<String>,
648649
// If this is set, this iteration made progress on backfilling an index
649-
backfill_result: Option<VectorIndexMultiSegmentBackfillResult>,
650-
}
651-
652-
#[derive(Debug)]
653-
pub struct VectorIndexMultiSegmentBackfillResult {
654-
pub new_cursor: Option<ResolvedDocumentId>,
655-
pub is_backfill_complete: bool,
650+
backfill_result: Option<MultiSegmentBackfillResult>,
656651
}
657652

658653
#[cfg(test)]

crates/database/src/vector_index_worker/writer.rs

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -49,7 +49,10 @@ use vector::QdrantExternalId;
4949

5050
use super::IndexBuild;
5151
use crate::{
52-
index_workers::index_meta::VectorSearchIndex,
52+
index_workers::{
53+
index_meta::VectorSearchIndex,
54+
MultiSegmentBackfillResult,
55+
},
5356
metrics::vector::{
5457
finish_vector_index_merge_timer,
5558
vector_compaction_merge_commit_timer,
@@ -58,7 +61,6 @@ use crate::{
5861
VectorIndexMergeType,
5962
VectorWriterLockWaiter,
6063
},
61-
vector_index_worker::flusher::VectorIndexMultiSegmentBackfillResult,
6264
Database,
6365
IndexModel,
6466
SystemMetadataModel,
@@ -142,7 +144,7 @@ impl<RT: Runtime> VectorMetadataWriter<RT> {
142144
new_ts: Timestamp,
143145
new_and_modified_segments: Vec<FragmentedVectorSegment>,
144146
new_segment_id: Option<String>,
145-
index_backfill_result: Option<VectorIndexMultiSegmentBackfillResult>,
147+
index_backfill_result: Option<MultiSegmentBackfillResult>,
146148
) -> anyhow::Result<()> {
147149
let inner = self.inner(VectorWriterLockWaiter::Flusher).await;
148150

@@ -392,7 +394,7 @@ impl<RT: Runtime> Inner<RT> {
392394
backfill_complete_ts: Timestamp,
393395
mut new_and_modified_segments: Vec<FragmentedVectorSegment>,
394396
new_segment_id: Option<String>,
395-
backfill_result: VectorIndexMultiSegmentBackfillResult,
397+
backfill_result: MultiSegmentBackfillResult,
396398
) -> anyhow::Result<()> {
397399
let timer = vector_flush_merge_commit_timer();
398400
let mut tx: Transaction<RT> = self.database.begin(Identity::system()).await?;

crates/local_backend/src/schema.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -152,7 +152,7 @@ impl TryFrom<IndexMetadata<TableName>> for IndexMetadataResponse {
152152
},
153153
} => {
154154
let backfill_state = match on_disk_state {
155-
SearchIndexState::Backfilling => "in_progress".to_string(),
155+
SearchIndexState::Backfilling(_) => "in_progress".to_string(),
156156
// TODO(CX-3851): The result of this is used to poll for state in the CLI and
157157
// also for display in the dashboard. We might consider a new value that would
158158
// let us differentiate between Backfilled and SnapshottedAt in the dashboard.

crates/model/src/config/index_tests.rs

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -650,7 +650,8 @@ async fn apply_config_with_backfilling_search_index_throws(rt: TestRuntime) -> a
650650
// commit the schema.
651651
assert_root_cause_contains(
652652
result,
653-
"Expected backfilled index, but found: Backfilling for \"index\"",
653+
"Expected backfilled index, but found: Backfilling(TextIndexBackfillState { segments: [], \
654+
cursor: None }) for \"index\"",
654655
);
655656

656657
Ok(())
@@ -1099,7 +1100,7 @@ fn assert_index_data(actual: Vec<IndexConfig>, expected: Vec<TestIndexConfig>) {
10991100
on_disk_state,
11001101
} => {
11011102
let search_state = match on_disk_state {
1102-
SearchIndexState::Backfilling => TestIndexState::Backfilling,
1103+
SearchIndexState::Backfilling(_) => TestIndexState::Backfilling,
11031104
SearchIndexState::Backfilled(_) => TestIndexState::Backfilled,
11041105
SearchIndexState::SnapshottedAt(_) => TestIndexState::Enabled,
11051106
};

0 commit comments

Comments
 (0)