Skip to content

Commit 69ebba0

Browse files
sjuddConvex, Inc.
authored and
Convex, Inc.
committed
Generalize vector flush stats (#25811)
This removes the vector specific statistics from the generalized search flusher. Still a little awkward, but it's something. GitOrigin-RevId: 030a62bff75b7dd3ea99f2cc39d0c78e202e7e4d
1 parent 451aa5b commit 69ebba0

File tree

3 files changed

+75
-31
lines changed

3 files changed

+75
-31
lines changed

crates/database/src/index_workers/index_meta.rs

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@ use vector::{
4444
QdrantSchema,
4545
};
4646

47-
use crate::Snapshot;
47+
use crate::{
48+
metrics::vector::log_documents_per_segment,
49+
Snapshot,
50+
};
4851

4952
pub trait SearchIndexConfigParser {
5053
type IndexType: SearchIndex;
@@ -127,10 +130,11 @@ pub trait SearchIndex {
127130

128131
type PreviousSegments;
129132

133+
type Statistics: SegmentStatistics;
134+
130135
type Schema: Send + Sync;
131-
// TODO(sam): Convert these to something more like segment statistics
132-
fn num_vectors(segment: &Self::Segment) -> u32;
133-
fn non_deleted_vectors(segment: &Self::Segment) -> anyhow::Result<u64>;
136+
137+
fn statistics(segment: &Self::Segment) -> anyhow::Result<Self::Statistics>;
134138

135139
async fn upload_new_segment<RT: Runtime>(
136140
rt: &RT,
@@ -170,6 +174,11 @@ pub trait SearchIndex {
170174
) -> anyhow::Result<Vec<Self::Segment>>;
171175
}
172176

177+
pub trait SegmentStatistics: Default {
178+
fn add(lhs: anyhow::Result<Self>, rhs: anyhow::Result<Self>) -> anyhow::Result<Self>;
179+
fn log(&self);
180+
}
181+
173182
pub struct TextSearchIndex;
174183
#[async_trait]
175184
impl SearchIndex for TextSearchIndex {
@@ -178,6 +187,7 @@ impl SearchIndex for TextSearchIndex {
178187
type PreviousSegments = ();
179188
type Schema = ();
180189
type Segment = FragmentedSearchSegment;
190+
type Statistics = TextStatistics;
181191

182192
fn get_index_sizes(snapshot: Snapshot) -> anyhow::Result<BTreeMap<IndexId, usize>> {
183193
Ok(snapshot
@@ -230,19 +240,26 @@ impl SearchIndex for TextSearchIndex {
230240
anyhow::bail!("Not implemented")
231241
}
232242

233-
fn num_vectors(_segment: &Self::Segment) -> u32 {
234-
0
235-
}
236-
237243
fn segment_id(_segment: &Self::Segment) -> String {
238244
"".to_string()
239245
}
240246

241-
fn non_deleted_vectors(_segment: &Self::Segment) -> anyhow::Result<u64> {
242-
anyhow::bail!("Not implemented")
247+
fn statistics(_segment: &Self::Segment) -> anyhow::Result<Self::Statistics> {
248+
Ok(TextStatistics)
243249
}
244250
}
245251

252+
#[derive(Default)]
253+
pub struct TextStatistics;
254+
255+
impl SegmentStatistics for TextStatistics {
256+
fn add(_: anyhow::Result<Self>, _: anyhow::Result<Self>) -> anyhow::Result<Self> {
257+
Ok(Self)
258+
}
259+
260+
fn log(&self) {}
261+
}
262+
246263
#[derive(Debug)]
247264
pub struct VectorSearchIndex;
248265

@@ -253,6 +270,7 @@ impl SearchIndex for VectorSearchIndex {
253270
type PreviousSegments = Vec<MutableFragmentedSegmentMetadata>;
254271
type Schema = QdrantSchema;
255272
type Segment = FragmentedVectorSegment;
273+
type Statistics = VectorStatistics;
256274

257275
fn get_index_sizes(snapshot: Snapshot) -> anyhow::Result<BTreeMap<IndexId, usize>> {
258276
Ok(snapshot
@@ -322,16 +340,16 @@ impl SearchIndex for VectorSearchIndex {
322340
upload_segment(rt, storage, new_segment).await
323341
}
324342

325-
fn num_vectors(segment: &Self::Segment) -> u32 {
326-
segment.num_vectors
327-
}
328-
329343
fn segment_id(segment: &Self::Segment) -> String {
330344
segment.id.clone()
331345
}
332346

333-
fn non_deleted_vectors(segment: &Self::Segment) -> anyhow::Result<u64> {
334-
segment.non_deleted_vectors()
347+
fn statistics(segment: &Self::Segment) -> anyhow::Result<Self::Statistics> {
348+
let non_deleted_vectors = segment.non_deleted_vectors()?;
349+
Ok(VectorStatistics {
350+
non_deleted_vectors,
351+
num_vectors: segment.num_vectors,
352+
})
335353
}
336354
}
337355
pub struct SearchIndexConfig<T: SearchIndex> {
@@ -350,6 +368,27 @@ pub struct BackfillState<T: SearchIndex> {
350368
pub backfill_snapshot_ts: Option<Timestamp>,
351369
}
352370

371+
#[derive(Debug, Default)]
372+
pub struct VectorStatistics {
373+
pub num_vectors: u32,
374+
pub non_deleted_vectors: u64,
375+
}
376+
377+
impl SegmentStatistics for VectorStatistics {
378+
fn add(lhs: anyhow::Result<Self>, rhs: anyhow::Result<Self>) -> anyhow::Result<Self> {
379+
let rhs = rhs?;
380+
let lhs = lhs?;
381+
Ok(Self {
382+
num_vectors: lhs.num_vectors + rhs.num_vectors,
383+
non_deleted_vectors: lhs.non_deleted_vectors + rhs.non_deleted_vectors,
384+
})
385+
}
386+
387+
fn log(&self) {
388+
log_documents_per_segment(self.non_deleted_vectors);
389+
}
390+
}
391+
353392
impl From<VectorIndexBackfillState> for BackfillState<VectorSearchIndex> {
354393
fn from(value: VectorIndexBackfillState) -> Self {
355394
Self {

crates/database/src/index_workers/search_flusher.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,12 @@ use crate::{
5050
SearchIndexConfigParser,
5151
SearchOnDiskState,
5252
SearchSnapshot,
53+
SegmentStatistics,
5354
SnapshotData,
5455
},
5556
BuildReason,
5657
MultiSegmentBackfillResult,
5758
},
58-
metrics::vector::log_documents_per_segment,
5959
Database,
6060
IndexModel,
6161
Token,
@@ -238,7 +238,10 @@ impl<RT: Runtime, T: SearchIndexConfigParser + 'static> SearchFlusher<RT, T> {
238238
None
239239
};
240240
let new_segment_id = new_segment.as_ref().map(T::IndexType::segment_id);
241-
let vectors_in_new_segment = new_segment.as_ref().map(T::IndexType::num_vectors);
241+
let new_segment_stats = new_segment
242+
.as_ref()
243+
.map(T::IndexType::statistics)
244+
.transpose()?;
242245

243246
let new_and_updated_parts = if let Some(new_segment) = new_segment {
244247
updated_previous_segments
@@ -249,21 +252,23 @@ impl<RT: Runtime, T: SearchIndexConfigParser + 'static> SearchFlusher<RT, T> {
249252
updated_previous_segments
250253
};
251254

252-
let total_vectors = new_and_updated_parts
255+
let total_stats = new_and_updated_parts
253256
.iter()
254257
.map(|segment| {
255-
let total_vectors = T::IndexType::non_deleted_vectors(segment)?;
256-
log_documents_per_segment(total_vectors);
257-
Ok(total_vectors)
258+
let segment_stats = T::IndexType::statistics(segment)?;
259+
segment_stats.log();
260+
Ok(segment_stats)
258261
})
259-
.sum::<anyhow::Result<_>>()?;
262+
.reduce(SegmentStatistics::add)
263+
.transpose()?
264+
.unwrap_or_default();
260265
let data = SnapshotData::MultiSegment(new_and_updated_parts);
261266

262267
Ok(IndexBuildResult {
263268
snapshot_ts: *new_ts,
264269
data,
265-
total_vectors,
266-
vectors_in_new_segment,
270+
total_stats,
271+
new_segment_stats,
267272
new_segment_id,
268273
backfill_result,
269274
})
@@ -442,8 +447,8 @@ pub struct IndexBuild<T: SearchIndex> {
442447
pub struct IndexBuildResult<T: SearchIndex> {
443448
pub snapshot_ts: Timestamp,
444449
pub data: SnapshotData<T::Segment>,
445-
pub total_vectors: u64,
446-
pub vectors_in_new_segment: Option<u32>,
450+
pub total_stats: T::Statistics,
451+
pub new_segment_stats: Option<T::Statistics>,
447452
pub new_segment_id: Option<String>,
448453
// If this is set, this iteration made progress on backfilling an index
449454
pub backfill_result: Option<MultiSegmentBackfillResult>,

crates/database/src/vector_index_worker/flusher.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ impl<RT: Runtime> VectorIndexFlusher<RT> {
112112
let IndexBuildResult {
113113
snapshot_ts,
114114
data,
115-
total_vectors,
116-
vectors_in_new_segment,
115+
total_stats,
116+
new_segment_stats,
117117
new_segment_id,
118118
backfill_result,
119119
} = result;
@@ -129,8 +129,8 @@ impl<RT: Runtime> VectorIndexFlusher<RT> {
129129
},
130130
}
131131

132-
let vectors_in_new_segment = vectors_in_new_segment.unwrap_or(0);
133-
metrics::vector::log_documents_per_index(total_vectors);
132+
let vectors_in_new_segment = new_segment_stats.unwrap_or_default().num_vectors;
133+
metrics::vector::log_documents_per_index(total_stats.non_deleted_vectors);
134134
metrics::vector::log_documents_per_new_segment(vectors_in_new_segment);
135135
timer.finish();
136136
Ok(vectors_in_new_segment)

0 commit comments

Comments
 (0)