Skip to content

Commit 27156fb

Browse files
sjuddConvex, Inc.
authored and
Convex, Inc.
committed
Move Text/Vector index impls into their respective crates (#25866)
GitOrigin-RevId: 329962d459aefff892815b6d59534e35d2e854fa
1 parent 3ee2caa commit 27156fb

File tree

8 files changed

+415
-353
lines changed

8 files changed

+415
-353
lines changed

crates/database/src/index_workers/index_meta.rs

Lines changed: 3 additions & 345 deletions
Original file line numberDiff line numberDiff line change
@@ -6,49 +6,17 @@ use std::{
66

77
use async_trait::async_trait;
88
use common::{
9-
bootstrap_model::index::{
10-
text_index::{
11-
DeveloperSearchIndexConfig,
12-
FragmentedTextSegment,
13-
TextIndexBackfillState,
14-
TextIndexSnapshot,
15-
TextIndexState,
16-
},
17-
vector_index::{
18-
DeveloperVectorIndexConfig,
19-
FragmentedVectorSegment,
20-
VectorIndexBackfillState,
21-
VectorIndexSnapshot,
22-
VectorIndexSnapshotData,
23-
VectorIndexState,
24-
},
25-
IndexConfig,
26-
},
9+
bootstrap_model::index::IndexConfig,
2710
document::ResolvedDocument,
2811
persistence::DocumentStream,
2912
runtime::Runtime,
3013
types::IndexId,
3114
};
32-
use futures::{
33-
stream::FuturesUnordered,
34-
TryStreamExt,
35-
};
36-
use search::{
37-
disk_index::upload_segment,
38-
fragmented_segment::MutableFragmentedSegmentMetadata,
39-
};
4015
use storage::Storage;
4116
use sync_types::Timestamp;
4217
use value::InternalId;
43-
use vector::{
44-
qdrant_segments::DiskSegmentValues,
45-
QdrantSchema,
46-
};
4718

48-
use crate::{
49-
metrics::vector::log_documents_per_segment,
50-
Snapshot,
51-
};
19+
use crate::Snapshot;
5220

5321
pub trait SearchIndexConfigParser {
5422
type IndexType: SearchIndex;
@@ -58,66 +26,6 @@ pub trait SearchIndexConfigParser {
5826
fn get_config(config: IndexConfig) -> Option<SearchIndexConfig<Self::IndexType>>;
5927
}
6028

61-
pub struct VectorIndexConfigParser;
62-
63-
impl SearchIndexConfigParser for VectorIndexConfigParser {
64-
type IndexType = VectorSearchIndex;
65-
66-
fn get_config(config: IndexConfig) -> Option<SearchIndexConfig<Self::IndexType>> {
67-
let IndexConfig::Vector {
68-
on_disk_state,
69-
developer_config,
70-
} = config
71-
else {
72-
return None;
73-
};
74-
Some(SearchIndexConfig {
75-
developer_config,
76-
on_disk_state: match on_disk_state {
77-
VectorIndexState::Backfilling(backfill_state) => {
78-
SearchOnDiskState::Backfilling(backfill_state.into())
79-
},
80-
VectorIndexState::Backfilled(snapshot) => {
81-
SearchOnDiskState::Backfilled(snapshot.into())
82-
},
83-
VectorIndexState::SnapshottedAt(snapshot) => {
84-
SearchOnDiskState::SnapshottedAt(snapshot.into())
85-
},
86-
},
87-
})
88-
}
89-
}
90-
91-
pub struct TextIndexConfigParser;
92-
93-
impl SearchIndexConfigParser for TextIndexConfigParser {
94-
type IndexType = TextSearchIndex;
95-
96-
fn get_config(config: IndexConfig) -> Option<SearchIndexConfig<Self::IndexType>> {
97-
let IndexConfig::Search {
98-
on_disk_state,
99-
developer_config,
100-
} = config
101-
else {
102-
return None;
103-
};
104-
Some(SearchIndexConfig {
105-
developer_config,
106-
on_disk_state: match on_disk_state {
107-
TextIndexState::Backfilling(snapshot) => {
108-
SearchOnDiskState::Backfilling(snapshot.into())
109-
},
110-
TextIndexState::Backfilled(snapshot) => {
111-
SearchOnDiskState::Backfilled(snapshot.into())
112-
},
113-
TextIndexState::SnapshottedAt(snapshot) => {
114-
SearchOnDiskState::SnapshottedAt(snapshot.into())
115-
},
116-
},
117-
})
118-
}
119-
}
120-
12129
#[async_trait]
12230
pub trait SearchIndex {
12331
type DeveloperConfig: Clone + Send;
@@ -173,187 +81,6 @@ pub trait SegmentStatistics: Default {
17381
fn add(lhs: anyhow::Result<Self>, rhs: anyhow::Result<Self>) -> anyhow::Result<Self>;
17482
fn log(&self);
17583
}
176-
177-
#[derive(Debug)]
178-
pub struct TextSearchIndex;
179-
#[async_trait]
180-
impl SearchIndex for TextSearchIndex {
181-
type DeveloperConfig = DeveloperSearchIndexConfig;
182-
type NewSegment = ();
183-
type PreviousSegments = ();
184-
type Schema = ();
185-
type Segment = FragmentedTextSegment;
186-
type Statistics = TextStatistics;
187-
188-
fn get_index_sizes(snapshot: Snapshot) -> anyhow::Result<BTreeMap<IndexId, usize>> {
189-
Ok(snapshot
190-
.search_indexes
191-
.backfilled_and_enabled_index_sizes()?
192-
.collect())
193-
}
194-
195-
fn is_version_current(snapshot: &SearchSnapshot<Self>) -> bool {
196-
// TODO(sam): This doesn't match the current persistence version based check,
197-
// but it's closer to what vector search does.
198-
snapshot.data.is_version_current()
199-
}
200-
201-
fn new_schema(_config: &Self::DeveloperConfig) -> Self::Schema {}
202-
203-
async fn download_previous_segments(
204-
_storage: Arc<dyn Storage>,
205-
_segment: Vec<Self::Segment>,
206-
) -> anyhow::Result<Self::PreviousSegments> {
207-
anyhow::bail!("Not implemented");
208-
}
209-
210-
async fn upload_previous_segments(
211-
_storage: Arc<dyn Storage>,
212-
_segments: Self::PreviousSegments,
213-
) -> anyhow::Result<Vec<Self::Segment>> {
214-
anyhow::bail!("Not implemented");
215-
}
216-
217-
fn estimate_document_size(_schema: &Self::Schema, _doc: &ResolvedDocument) -> u64 {
218-
0
219-
}
220-
221-
async fn build_disk_index(
222-
_schema: &Self::Schema,
223-
_index_path: &PathBuf,
224-
_documents: DocumentStream<'_>,
225-
_full_scan_threshold_bytes: usize,
226-
_previous_segments: &mut Self::PreviousSegments,
227-
) -> anyhow::Result<Option<Self::NewSegment>> {
228-
anyhow::bail!("Not implemented");
229-
}
230-
231-
async fn upload_new_segment<RT: Runtime>(
232-
_rt: &RT,
233-
_storage: Arc<dyn Storage>,
234-
_new_segment: Self::NewSegment,
235-
) -> anyhow::Result<Self::Segment> {
236-
anyhow::bail!("Not implemented")
237-
}
238-
239-
fn segment_id(_segment: &Self::Segment) -> String {
240-
"".to_string()
241-
}
242-
243-
fn statistics(segment: &Self::Segment) -> anyhow::Result<Self::Statistics> {
244-
Ok(TextStatistics {
245-
num_indexed_documents: segment.num_indexed_documents,
246-
})
247-
}
248-
}
249-
250-
#[derive(Debug, Default)]
251-
pub struct TextStatistics {
252-
pub num_indexed_documents: u32,
253-
}
254-
255-
impl SegmentStatistics for TextStatistics {
256-
fn add(lhs: anyhow::Result<Self>, rhs: anyhow::Result<Self>) -> anyhow::Result<Self> {
257-
Ok(Self {
258-
num_indexed_documents: lhs?.num_indexed_documents + rhs?.num_indexed_documents,
259-
})
260-
}
261-
262-
fn log(&self) {}
263-
}
264-
265-
#[derive(Debug)]
266-
pub struct VectorSearchIndex;
267-
268-
#[async_trait]
269-
impl SearchIndex for VectorSearchIndex {
270-
type DeveloperConfig = DeveloperVectorIndexConfig;
271-
type NewSegment = DiskSegmentValues;
272-
type PreviousSegments = Vec<MutableFragmentedSegmentMetadata>;
273-
type Schema = QdrantSchema;
274-
type Segment = FragmentedVectorSegment;
275-
type Statistics = VectorStatistics;
276-
277-
fn get_index_sizes(snapshot: Snapshot) -> anyhow::Result<BTreeMap<IndexId, usize>> {
278-
Ok(snapshot
279-
.vector_indexes
280-
.backfilled_and_enabled_index_sizes()?
281-
.collect())
282-
}
283-
284-
fn is_version_current(snapshot: &SearchSnapshot<Self>) -> bool {
285-
snapshot.data.is_version_current()
286-
}
287-
288-
fn new_schema(config: &Self::DeveloperConfig) -> Self::Schema {
289-
QdrantSchema::new(config)
290-
}
291-
292-
async fn download_previous_segments(
293-
storage: Arc<dyn Storage>,
294-
segments: Vec<Self::Segment>,
295-
) -> anyhow::Result<Self::PreviousSegments> {
296-
segments
297-
.into_iter()
298-
.map(|segment| MutableFragmentedSegmentMetadata::download(segment, storage.clone()))
299-
.collect::<FuturesUnordered<_>>()
300-
.try_collect::<Vec<_>>()
301-
.await
302-
}
303-
304-
async fn upload_previous_segments(
305-
storage: Arc<dyn Storage>,
306-
segments: Self::PreviousSegments,
307-
) -> anyhow::Result<Vec<Self::Segment>> {
308-
segments
309-
.into_iter()
310-
.map(|segment| segment.upload_deleted_bitset(storage.clone()))
311-
.collect::<FuturesUnordered<_>>()
312-
.try_collect::<Vec<_>>()
313-
.await
314-
}
315-
316-
fn estimate_document_size(schema: &Self::Schema, _doc: &ResolvedDocument) -> u64 {
317-
schema.estimate_vector_size() as u64
318-
}
319-
320-
async fn build_disk_index(
321-
schema: &Self::Schema,
322-
index_path: &PathBuf,
323-
documents: DocumentStream<'_>,
324-
full_scan_threshold_bytes: usize,
325-
previous_segments: &mut Self::PreviousSegments,
326-
) -> anyhow::Result<Option<Self::NewSegment>> {
327-
schema
328-
.build_disk_index(
329-
index_path,
330-
documents,
331-
full_scan_threshold_bytes,
332-
&mut previous_segments.iter_mut().collect::<Vec<_>>(),
333-
)
334-
.await
335-
}
336-
337-
async fn upload_new_segment<RT: Runtime>(
338-
rt: &RT,
339-
storage: Arc<dyn Storage>,
340-
new_segment: Self::NewSegment,
341-
) -> anyhow::Result<Self::Segment> {
342-
upload_segment(rt, storage, new_segment).await
343-
}
344-
345-
fn segment_id(segment: &Self::Segment) -> String {
346-
segment.id.clone()
347-
}
348-
349-
fn statistics(segment: &Self::Segment) -> anyhow::Result<Self::Statistics> {
350-
let non_deleted_vectors = segment.non_deleted_vectors()?;
351-
Ok(VectorStatistics {
352-
non_deleted_vectors,
353-
num_vectors: segment.num_vectors,
354-
})
355-
}
356-
}
35784
pub struct SearchIndexConfig<T: SearchIndex> {
35885
pub developer_config: T::DeveloperConfig,
35986
pub on_disk_state: SearchOnDiskState<T>,
@@ -370,72 +97,12 @@ pub struct BackfillState<T: SearchIndex> {
37097
pub backfill_snapshot_ts: Option<Timestamp>,
37198
}
37299

373-
#[derive(Debug, Default)]
374-
pub struct VectorStatistics {
375-
pub num_vectors: u32,
376-
pub non_deleted_vectors: u64,
377-
}
378-
379-
impl SegmentStatistics for VectorStatistics {
380-
fn add(lhs: anyhow::Result<Self>, rhs: anyhow::Result<Self>) -> anyhow::Result<Self> {
381-
let rhs = rhs?;
382-
let lhs = lhs?;
383-
Ok(Self {
384-
num_vectors: lhs.num_vectors + rhs.num_vectors,
385-
non_deleted_vectors: lhs.non_deleted_vectors + rhs.non_deleted_vectors,
386-
})
387-
}
388-
389-
fn log(&self) {
390-
log_documents_per_segment(self.non_deleted_vectors);
391-
}
392-
}
393-
394-
impl From<TextIndexBackfillState> for BackfillState<TextSearchIndex> {
395-
fn from(value: TextIndexBackfillState) -> Self {
396-
Self {
397-
segments: value.segments,
398-
cursor: value.cursor.clone().map(|value| value.cursor),
399-
backfill_snapshot_ts: value.cursor.map(|value| value.backfill_snapshot_ts),
400-
}
401-
}
402-
}
403-
404-
impl From<VectorIndexBackfillState> for BackfillState<VectorSearchIndex> {
405-
fn from(value: VectorIndexBackfillState) -> Self {
406-
Self {
407-
segments: value.segments,
408-
cursor: value.cursor,
409-
backfill_snapshot_ts: value.backfill_snapshot_ts,
410-
}
411-
}
412-
}
413-
414100
pub enum SearchOnDiskState<T: SearchIndex> {
415101
Backfilling(BackfillState<T>),
416102
Backfilled(SearchSnapshot<T>),
417103
SnapshottedAt(SearchSnapshot<T>),
418104
}
419105

420-
impl From<VectorIndexSnapshot> for SearchSnapshot<VectorSearchIndex> {
421-
fn from(snapshot: VectorIndexSnapshot) -> Self {
422-
Self {
423-
ts: snapshot.ts,
424-
data: SnapshotData::from(snapshot.data),
425-
}
426-
}
427-
}
428-
429-
impl From<TextIndexSnapshot> for SearchSnapshot<TextSearchIndex> {
430-
fn from(snapshot: TextIndexSnapshot) -> Self {
431-
Self {
432-
ts: snapshot.ts,
433-
// TODO(sam): Implement this.
434-
data: SnapshotData::Unknown,
435-
}
436-
}
437-
}
438-
439106
#[derive(Debug)]
440107
pub enum SnapshotData<T> {
441108
/// An unrecognized snapshot, probably from a newer version of backend than
@@ -445,16 +112,7 @@ pub enum SnapshotData<T> {
445112
}
446113

447114
impl<T> SnapshotData<T> {
448-
fn is_version_current(&self) -> bool {
115+
pub fn is_version_current(&self) -> bool {
449116
matches!(self, Self::MultiSegment(_))
450117
}
451118
}
452-
453-
impl From<VectorIndexSnapshotData> for SnapshotData<FragmentedVectorSegment> {
454-
fn from(value: VectorIndexSnapshotData) -> Self {
455-
match value {
456-
VectorIndexSnapshotData::MultiSegment(values) => SnapshotData::MultiSegment(values),
457-
VectorIndexSnapshotData::Unknown(_) => SnapshotData::Unknown,
458-
}
459-
}
460-
}

0 commit comments

Comments
 (0)