Skip to content

Commit e147dad

Browse files
sjuddConvex, Inc.
authored and
Convex, Inc.
committed
Remove vector index flusher in favor of factory methods for SearchFlusher (#26359)
GitOrigin-RevId: eb3f6568ea5280ad6579bf8e61235d4eb22c75be
1 parent 172be4c commit e147dad

File tree

9 files changed

+154
-200
lines changed

9 files changed

+154
-200
lines changed

crates/database/src/index_workers/search_flusher.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ pub struct SearchIndexLimits {
135135
}
136136

137137
impl<RT: Runtime, T: SearchIndexConfigParser + 'static> SearchFlusher<RT, T> {
138-
pub fn new(
138+
pub(crate) fn new(
139139
runtime: RT,
140140
database: Database<RT>,
141141
reader: Arc<dyn PersistenceReader>,
@@ -170,6 +170,10 @@ impl<RT: Runtime, T: SearchIndexConfigParser + 'static> SearchFlusher<RT, T> {
170170
}
171171
}
172172

173+
/// Run one step of the flusher's main loop.
174+
///
175+
/// Returns a map of IndexName to number of documents indexed for each
176+
/// index that was built.
173177
pub async fn step(&mut self) -> anyhow::Result<(BTreeMap<TabletIndexName, u64>, Token)> {
174178
let mut metrics = BTreeMap::new();
175179

@@ -239,7 +243,7 @@ impl<RT: Runtime, T: SearchIndexConfigParser + 'static> SearchFlusher<RT, T> {
239243
}
240244

241245
/// Compute the set of indexes that need to be backfilled.
242-
pub async fn needs_backfill(&self) -> anyhow::Result<(Vec<IndexBuild<T::IndexType>>, Token)> {
246+
async fn needs_backfill(&self) -> anyhow::Result<(Vec<IndexBuild<T::IndexType>>, Token)> {
243247
let mut to_build = vec![];
244248

245249
let mut tx = self.database.begin(Identity::system()).await?;
@@ -313,7 +317,7 @@ impl<RT: Runtime, T: SearchIndexConfigParser + 'static> SearchFlusher<RT, T> {
313317
Ok((to_build, tx.into_token()?))
314318
}
315319

316-
pub async fn build_multipart_segment(
320+
async fn build_multipart_segment(
317321
&self,
318322
job: &IndexBuild<T::IndexType>,
319323
build_index_args: <T::IndexType as SearchIndex>::BuildIndexArgs,
@@ -590,13 +594,13 @@ impl<RT: Runtime, T: SearchIndexConfigParser + 'static> SearchFlusher<RT, T> {
590594
}
591595
}
592596

593-
pub struct IndexBuild<T: SearchIndex> {
594-
pub index_name: TabletIndexName,
595-
pub index_id: IndexId,
596-
pub by_id: IndexId,
597-
pub metadata_id: ResolvedDocumentId,
598-
pub index_config: SearchIndexConfig<T>,
599-
pub build_reason: BuildReason,
597+
pub(crate) struct IndexBuild<T: SearchIndex> {
598+
pub(crate) index_name: TabletIndexName,
599+
pub(crate) index_id: IndexId,
600+
pub(crate) by_id: IndexId,
601+
pub(crate) metadata_id: ResolvedDocumentId,
602+
pub(crate) index_config: SearchIndexConfig<T>,
603+
pub(crate) build_reason: BuildReason,
600604
}
601605

602606
#[derive(Debug)]

crates/database/src/index_workers/search_worker.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,14 @@ use crate::{
3636
writer::SearchIndexMetadataWriter,
3737
},
3838
metrics::log_worker_starting,
39-
text_index_worker::flusher2::TextIndexFlusher2,
40-
vector_index_worker::compactor::CompactionConfig,
39+
text_index_worker::flusher2::{
40+
new_text_flusher,
41+
TextIndexFlusher2,
42+
},
43+
vector_index_worker::{
44+
compactor::CompactionConfig,
45+
flusher::new_vector_flusher,
46+
},
4147
Database,
4248
TextIndexFlusher,
4349
VectorIndexCompactor,
@@ -86,7 +92,7 @@ impl<RT: Runtime> SearchIndexWorker<RT> {
8692
database.clone(),
8793
// Wait a bit since vector needs time to bootstrap. Makes startup logs a bit cleaner.
8894
Duration::from_secs(5),
89-
SearchIndexWorker::VectorFlusher(VectorIndexFlusher::new(
95+
SearchIndexWorker::VectorFlusher(new_vector_flusher(
9096
runtime.clone(),
9197
database.clone(),
9298
reader.clone(),
@@ -108,7 +114,7 @@ impl<RT: Runtime> SearchIndexWorker<RT> {
108114
)),
109115
);
110116
let text_flusher = if *BUILD_MULTI_SEGMENT_TEXT_INDEXES {
111-
SearchIndexWorker::TextFlusher2(TextIndexFlusher2::new(
117+
SearchIndexWorker::TextFlusher2(new_text_flusher(
112118
runtime.clone(),
113119
database.clone(),
114120
reader,

crates/database/src/search_and_vector_bootstrap.rs

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,7 @@ mod tests {
608608
DbFixtures,
609609
DbFixturesArgs,
610610
},
611-
vector_index_worker::flusher::VectorIndexFlusher,
611+
vector_index_worker::flusher::backfill_vector_indexes,
612612
Database,
613613
IndexModel,
614614
SystemMetadataModel,
@@ -632,7 +632,13 @@ mod tests {
632632

633633
let db = reopen_db(&rt, &fixtures).await?;
634634
add_vector(&db, &index_metadata, [1f32, 2f32]).await?;
635-
backfill_vector_indexes(&rt, &db, fixtures.tp.reader(), 0, fixtures.search_storage).await?;
635+
backfill_vector_indexes(
636+
rt.clone(),
637+
db.clone(),
638+
fixtures.tp.reader(),
639+
fixtures.search_storage,
640+
)
641+
.await?;
636642

637643
// This is a bit of a hack, backfilling with zero size forces all indexes to be
638644
// written to disk, which causes our boostrapping process to skip our
@@ -832,7 +838,7 @@ mod tests {
832838
.add_application_index(index_metadata.clone())
833839
.await?;
834840
db.commit(tx).await?;
835-
backfill_vector_indexes(rt, db, reader, 1000, storage.clone()).await?;
841+
backfill_vector_indexes(rt.clone(), db.clone(), reader, storage.clone()).await?;
836842
let mut tx = db.begin_system().await?;
837843
let resolved_index = IndexModel::new(&mut tx)
838844
.pending_index_metadata(TableNamespace::Global, &index_metadata.name)?
@@ -945,24 +951,6 @@ mod tests {
945951
Ok(metadata)
946952
}
947953

948-
async fn backfill_vector_indexes(
949-
rt: &TestRuntime,
950-
db: &Database<TestRuntime>,
951-
reader: Arc<dyn PersistenceReader>,
952-
index_size_soft_limit: usize,
953-
storage: Arc<dyn Storage>,
954-
) -> anyhow::Result<()> {
955-
VectorIndexFlusher::backfill_all_in_test(
956-
rt.clone(),
957-
db.clone(),
958-
reader,
959-
storage,
960-
index_size_soft_limit,
961-
)
962-
.await?;
963-
Ok(())
964-
}
965-
966954
#[convex_macro::test_runtime]
967955
async fn test_load_snapshot_without_fast_forward(rt: TestRuntime) -> anyhow::Result<()> {
968956
let db_fixtures = DbFixtures::new(&rt).await?;

crates/database/src/tests/vector_test_utils.rs

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,11 @@ use crate::{
9191
test_helpers::DbFixturesArgs,
9292
vector_index_worker::{
9393
compactor::CompactionConfig,
94-
flusher::VectorIndexFlusher,
94+
flusher::{
95+
backfill_vector_indexes,
96+
new_vector_flusher_for_tests,
97+
VectorIndexFlusher,
98+
},
9599
},
96100
Database,
97101
IndexModel,
@@ -151,7 +155,7 @@ impl VectorFixtures {
151155
}
152156

153157
pub async fn backfill(&self) -> anyhow::Result<()> {
154-
backfill(
158+
backfill_vector_indexes(
155159
self.rt.clone(),
156160
self.db.clone(),
157161
self.reader.clone(),
@@ -256,7 +260,7 @@ impl VectorFixtures {
256260

257261
pub async fn run_compaction_during_flush(&self) -> anyhow::Result<()> {
258262
let (mut pause, pause_client) = PauseController::new([FLUSH_RUNNING_LABEL]);
259-
let mut flusher = VectorIndexFlusher::new_for_tests(
263+
let mut flusher = new_vector_flusher_for_tests(
260264
self.rt.clone(),
261265
self.db.clone(),
262266
self.reader.clone(),
@@ -284,7 +288,7 @@ impl VectorFixtures {
284288
&self,
285289
full_scan_threshold_kb: usize,
286290
) -> anyhow::Result<VectorIndexFlusher<TestRuntime>> {
287-
Ok(VectorIndexFlusher::new_for_tests(
291+
Ok(new_vector_flusher_for_tests(
288292
self.rt.clone(),
289293
self.db.clone(),
290294
self.reader.clone(),
@@ -301,7 +305,7 @@ impl VectorFixtures {
301305
&self,
302306
incremental_part_threshold: usize,
303307
) -> anyhow::Result<VectorIndexFlusher<TestRuntime>> {
304-
Ok(VectorIndexFlusher::new_for_tests(
308+
Ok(new_vector_flusher_for_tests(
305309
self.rt.clone(),
306310
self.db.clone(),
307311
self.reader.clone(),
@@ -449,7 +453,7 @@ pub async fn backfilled_vector_index(
449453
storage: Arc<dyn Storage>,
450454
) -> anyhow::Result<IndexData> {
451455
let index_data = backfilling_vector_index(&db).await?;
452-
backfill(rt, db.clone(), reader, storage).await?;
456+
backfill_vector_indexes(rt, db.clone(), reader, storage).await?;
453457

454458
Ok(index_data)
455459
}
@@ -502,18 +506,6 @@ pub async fn backfilled_vector_index_with_doc(
502506
Ok(result)
503507
}
504508

505-
// Avoid using index_test_utils.backfill because vector indexes use storage so
506-
// we need to pass around a consistent instance of storage but index_test_utils
507-
// creates a new one each time.
508-
pub async fn backfill<RT: Runtime>(
509-
rt: RT,
510-
db: Database<RT>,
511-
reader: Arc<dyn PersistenceReader>,
512-
storage: Arc<dyn Storage>,
513-
) -> anyhow::Result<()> {
514-
VectorIndexFlusher::backfill_all_in_test(rt.clone(), db.clone(), reader, storage, 10).await
515-
}
516-
517509
pub(crate) async fn assert_backfilled(
518510
database: &Database<TestRuntime>,
519511
namespace: TableNamespace,
@@ -651,7 +643,7 @@ impl<RT: Runtime> VectorSearcher for DeleteOnCompactSearchlight<RT> {
651643
.delete(self.to_delete.into())
652644
.await?;
653645
self.db.commit(tx).await?;
654-
backfill(
646+
backfill_vector_indexes(
655647
self.rt.clone(),
656648
self.db.clone(),
657649
self.reader.clone(),

crates/database/src/tests/vector_tests.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,10 @@ use crate::{
8686
IndexData,
8787
VectorFixtures,
8888
},
89-
vector_index_worker::flusher::VectorIndexFlusher,
89+
vector_index_worker::flusher::{
90+
backfill_vector_indexes,
91+
new_vector_flusher_for_tests,
92+
},
9093
Database,
9194
IndexModel,
9295
TableModel,
@@ -213,12 +216,11 @@ impl<RT: Runtime> Scenario<RT> {
213216
}
214217

215218
async fn backfill(&self) -> anyhow::Result<()> {
216-
VectorIndexFlusher::backfill_all_in_test(
219+
backfill_vector_indexes(
217220
self.rt.clone(),
218221
self.database.clone(),
219222
self.reader.clone(),
220223
self.search_storage.clone(),
221-
0,
222224
)
223225
.await?;
224226
Ok(())
@@ -759,7 +761,7 @@ async fn test_index_backfill_is_incremental(rt: TestRuntime) -> anyhow::Result<(
759761
scenario.add_vector_index(false).await?;
760762

761763
// Create flusher
762-
let mut flusher = VectorIndexFlusher::new_for_tests(
764+
let mut flusher = new_vector_flusher_for_tests(
763765
rt.clone(),
764766
scenario.database.clone(),
765767
scenario.reader.clone(),
@@ -844,7 +846,7 @@ async fn test_incremental_backfill_with_compaction(rt: TestRuntime) -> anyhow::R
844846
scenario.add_vector_index(false).await?;
845847

846848
// Create flusher
847-
let mut flusher = VectorIndexFlusher::new_for_tests(
849+
let mut flusher = new_vector_flusher_for_tests(
848850
rt.clone(),
849851
scenario.database.clone(),
850852
scenario.reader.clone(),

crates/database/src/text_index_worker/flusher2.rs

Lines changed: 12 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
1-
use std::{
2-
collections::BTreeMap,
3-
sync::Arc,
4-
};
1+
use std::sync::Arc;
52

63
#[cfg(any(test, feature = "testing"))]
74
use common::pause::PauseClient;
85
use common::{
96
knobs::SEARCH_INDEX_SIZE_SOFT_LIMIT,
107
persistence::PersistenceReader,
118
runtime::Runtime,
12-
types::TabletIndexName,
139
};
1410
use search::{
1511
metrics::SearchType,
@@ -31,13 +27,8 @@ use crate::{
3127
TextSearchIndex,
3228
},
3329
Database,
34-
Token,
3530
};
3631

37-
pub struct TextIndexFlusher2<RT: Runtime> {
38-
flusher: SearchFlusher<RT, TextIndexConfigParser>,
39-
}
40-
4132
pub(crate) struct FlusherBuilder<RT: Runtime> {
4233
runtime: RT,
4334
database: Database<RT>,
@@ -101,7 +92,7 @@ impl<RT: Runtime> FlusherBuilder<RT> {
10192
self.storage.clone(),
10293
SearchType::Text,
10394
);
104-
let flusher = SearchFlusher::new(
95+
SearchFlusher::new(
10596
self.runtime,
10697
self.database,
10798
self.reader,
@@ -115,30 +106,20 @@ impl<RT: Runtime> FlusherBuilder<RT> {
115106
},
116107
#[cfg(any(test, feature = "testing"))]
117108
self.pause_client,
118-
);
119-
120-
TextIndexFlusher2 { flusher }
109+
)
121110
}
122111
}
123112

124-
impl<RT: Runtime> TextIndexFlusher2<RT> {
125-
pub(crate) fn new(
126-
runtime: RT,
127-
database: Database<RT>,
128-
reader: Arc<dyn PersistenceReader>,
129-
storage: Arc<dyn Storage>,
130-
segment_metadata_fetcher: Arc<dyn SegmentTermMetadataFetcher>,
131-
) -> Self {
132-
FlusherBuilder::new(runtime, database, reader, storage, segment_metadata_fetcher).build()
133-
}
113+
pub type TextIndexFlusher2<RT> = SearchFlusher<RT, TextIndexConfigParser>;
134114

135-
/// Run one step of the IndexFlusher's main loop.
136-
///
137-
/// Returns a map of IndexName to number of documents indexed for each
138-
/// index that was built.
139-
pub(crate) async fn step(&mut self) -> anyhow::Result<(BTreeMap<TabletIndexName, u64>, Token)> {
140-
self.flusher.step().await
141-
}
115+
pub(crate) fn new_text_flusher<RT: Runtime>(
116+
runtime: RT,
117+
database: Database<RT>,
118+
reader: Arc<dyn PersistenceReader>,
119+
storage: Arc<dyn Storage>,
120+
segment_metadata_fetcher: Arc<dyn SegmentTermMetadataFetcher>,
121+
) -> TextIndexFlusher2<RT> {
122+
FlusherBuilder::new(runtime, database, reader, storage, segment_metadata_fetcher).build()
142123
}
143124

144125
#[cfg(test)]

0 commit comments

Comments
 (0)