Skip to content

Commit 0e01a7c

Browse files
sjudd authored and Convex, Inc. committed
Use the latest fast forward ts when flushing vector segments (#24149)
GitOrigin-RevId: ce21be9bd6b4820d518fa4bf19716086445b2cda
1 parent f3b959a commit 0e01a7c

File tree

2 files changed

+127
-30
lines changed

2 files changed

+127
-30
lines changed

crates/database/src/vector_index_worker/flusher.rs

Lines changed: 126 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ impl<RT: Runtime> VectorIndexFlusher<RT> {
222222
.await?;
223223
let job = IndexBuild {
224224
index_name: name.clone(),
225+
index_id: index_id.internal_id(),
225226
by_id: by_id_metadata.id().internal_id(),
226227
developer_config: developer_config.clone(),
227228
metadata_id: index_id,
@@ -272,17 +273,17 @@ impl<RT: Runtime> VectorIndexFlusher<RT> {
272273
let index_path = TempDir::new()?;
273274
let mut tx = self.database.begin(Identity::system()).await?;
274275
let table_id = tx.table_mapping().inject_table_number()(*job.index_name.table())?;
275-
let mut snapshot_ts = tx.begin_timestamp();
276+
let mut new_ts = tx.begin_timestamp();
276277
let (previous_segments, build_type) = match job.on_disk_state {
277278
VectorIndexState::Backfilling(ref backfill_state) => {
278279
let backfill_snapshot_ts = backfill_state
279280
.backfill_snapshot_ts
280-
.map(|ts| snapshot_ts.prior_ts(ts))
281+
.map(|ts| new_ts.prior_ts(ts))
281282
.transpose()?
282-
.unwrap_or(snapshot_ts);
283+
.unwrap_or(new_ts);
283284
// For backfilling indexes, the snapshot timestamp we return is the backfill
284285
// snapshot timestamp
285-
snapshot_ts = backfill_snapshot_ts;
286+
new_ts = backfill_snapshot_ts;
286287

287288
let cursor = backfill_state.cursor;
288289

@@ -303,13 +304,18 @@ impl<RT: Runtime> VectorIndexFlusher<RT> {
303304
vec![],
304305
MultipartBuildType::IncrementalComplete {
305306
cursor: None,
306-
backfill_snapshot_ts: snapshot_ts,
307+
backfill_snapshot_ts: new_ts,
307308
},
308309
),
309-
VectorIndexSnapshotData::MultiSegment(ref parts) => (
310-
parts.clone(),
311-
MultipartBuildType::Partial(snapshot_ts.prior_ts(snapshot.ts)?),
312-
),
310+
VectorIndexSnapshotData::MultiSegment(ref parts) => {
311+
let ts = IndexWorkerMetadataModel::new(&mut tx)
312+
.get_fast_forward_ts(snapshot.ts, job.index_id)
313+
.await?;
314+
(
315+
parts.clone(),
316+
MultipartBuildType::Partial(new_ts.prior_ts(ts)?),
317+
)
318+
},
313319
}
314320
},
315321
};
@@ -319,13 +325,7 @@ impl<RT: Runtime> VectorIndexFlusher<RT> {
319325
updated_previous_segments,
320326
backfill_result,
321327
} = self
322-
.build_multipart_segment_in_dir(
323-
job,
324-
&index_path,
325-
snapshot_ts,
326-
build_type,
327-
previous_segments,
328-
)
328+
.build_multipart_segment_in_dir(job, &index_path, new_ts, build_type, previous_segments)
329329
.await?;
330330

331331
let new_segment = if let Some(new_segment) = new_segment {
@@ -358,7 +358,7 @@ impl<RT: Runtime> VectorIndexFlusher<RT> {
358358
let data = VectorIndexSnapshotData::MultiSegment(new_and_updated_parts);
359359

360360
Ok(IndexBuildResult {
361-
snapshot_ts: *snapshot_ts,
361+
snapshot_ts: *new_ts,
362362
data,
363363
total_vectors,
364364
vectors_in_new_segment,
@@ -565,6 +565,7 @@ impl<RT: Runtime> VectorIndexFlusher<RT> {
565565
let worker = Self::new(runtime, database, storage, writer);
566566
let job = IndexBuild {
567567
index_name,
568+
index_id: metadata.clone().into_id_and_value().0.internal_id(),
568569
by_id: by_id_metadata.id().internal_id(),
569570
developer_config: developer_config.clone(),
570571
metadata_id: metadata.clone().id(),
@@ -679,6 +680,11 @@ mod tests {
679680
MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB,
680681
VECTOR_INDEX_SIZE_SOFT_LIMIT,
681682
},
683+
runtime::Runtime,
684+
types::{
685+
IndexId,
686+
IndexName,
687+
},
682688
};
683689
use keybroker::Identity;
684690
use maplit::{
@@ -697,12 +703,14 @@ mod tests {
697703
TableIdAndTableNumber,
698704
};
699705
use vector::{
706+
PublicVectorSearchQueryResult,
700707
QdrantExternalId,
701708
VectorSearch,
702709
};
703710

704711
use super::VectorIndexFlusher;
705712
use crate::{
713+
bootstrap_model::index_workers::IndexWorkerMetadataModel,
706714
test_helpers::new_test_database,
707715
tests::vector_test_utils::{
708716
add_document_vec,
@@ -714,6 +722,7 @@ mod tests {
714722
vector_index_worker::compactor::CompactionConfig,
715723
Database,
716724
IndexModel,
725+
SystemMetadataModel,
717726
UserFacingModel,
718727
};
719728

@@ -1489,22 +1498,109 @@ mod tests {
14891498
let (metrics, _) = worker.step().await?;
14901499
assert_eq!(metrics, btreemap! {resolved_index_name.clone() => 1});
14911500

1492-
let (results, _usage_stats) = fixtures
1493-
.db
1494-
.vector_search(
1495-
Identity::system(),
1496-
VectorSearch {
1497-
index_name: index_name.clone(),
1498-
vector: vector.into_iter().map(|value| value as f32).collect(),
1499-
limit: Some(1),
1500-
expressions: btreeset![],
1501-
},
1502-
)
1503-
.await?;
1504-
1501+
let results = vector_search(&fixtures.db, index_name.clone(), vector).await?;
15051502
assert_eq!(results.first().unwrap().id.internal_id(), id.internal_id());
15061503
}
15071504

15081505
Ok(())
15091506
}
1507+
1508+
/// Test helper: overwrites the fast-forward timestamp stored in the vector
/// search worker metadata for `index_id` with the begin timestamp of a fresh
/// system transaction, then commits the change.
///
/// The worker metadata document is fetched via `get_or_create_vector_search`,
/// so it is created first if it does not exist yet.
///
/// Returns an error if reading, replacing, or committing the metadata fails.
async fn set_fast_forward_time_to_now<RT: Runtime>(
1509+
db: &Database<RT>,
1510+
index_id: IndexId,
1511+
) -> anyhow::Result<()> {
1512+
let mut tx = db.begin_system().await?;
1513+
let metadata = IndexWorkerMetadataModel::new(&mut tx)
1514+
.get_or_create_vector_search(index_id)
1515+
.await?;
1516+
let (worker_meta_doc_id, mut metadata) = metadata.into_id_and_value();
1517+
// "Now" here is this transaction's begin timestamp, not a wall-clock read.
*metadata.index_metadata.mut_fast_forward_ts() = *tx.begin_timestamp();
1518+
SystemMetadataModel::new(&mut tx)
1519+
.replace(worker_meta_doc_id, metadata.try_into()?)
1520+
.await?;
1521+
db.commit(tx).await?;
1522+
Ok(())
1523+
}
1524+
1525+
/// Checks that a flush of an enabled multi-segment index starts from the
/// fast-forward timestamp when that timestamp was recorded after a write.
///
/// Ordering matters: the document is inserted first and the fast-forward
/// timestamp is bumped afterwards. The test then expects one flusher step to
/// report `0` for the index and a vector search for the inserted vector to
/// return nothing.
#[convex_macro::test_runtime]
1526+
async fn multi_segment_with_newer_fast_forward_time_builds_from_fast_forward_time(
1527+
rt: TestRuntime,
1528+
) -> anyhow::Result<()> {
1529+
let fixtures = VectorFixtures::new(rt.clone()).await?;
1530+
1531+
let IndexData {
1532+
index_name,
1533+
index_id,
1534+
resolved_index_name,
1535+
} = fixtures.enabled_vector_index().await?;
1536+
1537+
let vector = [8f64, 9f64];
1538+
fixtures
1539+
.add_document_vec_array(index_name.table(), vector)
1540+
.await?;
1541+
1542+
// Fast-forward past the write above so it falls before the flusher's start point.
set_fast_forward_time_to_now(&fixtures.db, index_id.internal_id()).await?;
1543+
1544+
let mut worker = fixtures.new_index_flusher_with_full_scan_threshold(0)?;
1545+
let (metrics, _) = worker.step().await?;
1546+
// Presumably the per-index count of vectors written by this step — confirm against `step`.
assert_eq!(metrics, btreemap! {resolved_index_name.clone() => 0});
1547+
1548+
let results = vector_search(&fixtures.db, index_name, vector).await?;
1549+
1550+
assert!(results.is_empty());
1551+
1552+
Ok(())
1553+
}
1554+
1555+
/// Checks that a document written after the fast-forward timestamp is still
/// picked up by the next flush of an enabled multi-segment index.
///
/// Ordering matters: the fast-forward timestamp is bumped first and the
/// document is inserted afterwards. The test then expects one flusher step to
/// report `1` for the index and a vector search to return the inserted
/// document.
///
/// NOTE(review): the fast-forward timestamp is set after
/// `enabled_vector_index()` returns, so it may be newer than the index's own
/// snapshot timestamp rather than older as the test name suggests — confirm
/// which start point `get_fast_forward_ts` actually selects here.
#[convex_macro::test_runtime]
1556+
async fn multi_segment_with_older_fast_forward_time_builds_from_index_time(
1557+
rt: TestRuntime,
1558+
) -> anyhow::Result<()> {
1559+
let fixtures = VectorFixtures::new(rt.clone()).await?;
1560+
1561+
let IndexData {
1562+
index_name,
1563+
index_id,
1564+
resolved_index_name,
1565+
} = fixtures.enabled_vector_index().await?;
1566+
1567+
// Bump the fast-forward timestamp before the write so the write lands after it.
set_fast_forward_time_to_now(&fixtures.db, index_id.internal_id()).await?;
1568+
1569+
let vector = [8f64, 9f64];
1570+
let vector_doc_id = fixtures
1571+
.add_document_vec_array(index_name.table(), vector)
1572+
.await?;
1573+
1574+
let mut worker = fixtures.new_index_flusher_with_full_scan_threshold(0)?;
1575+
let (metrics, _) = worker.step().await?;
1576+
// Presumably the per-index count of vectors written by this step — confirm against `step`.
assert_eq!(metrics, btreemap! {resolved_index_name.clone() => 1});
1577+
1578+
let results = vector_search(&fixtures.db, index_name, vector).await?;
1579+
1580+
assert_eq!(
1581+
results.first().unwrap().id.internal_id(),
1582+
vector_doc_id.internal_id()
1583+
);
1584+
1585+
Ok(())
1586+
}
1587+
1588+
/// Test helper: runs a vector search against `index_name` as the system
/// identity and returns only the result list.
///
/// The query vector is narrowed from `f64` to `f32`, the search is limited to
/// a single result, and no filter expressions are applied. The second element
/// of the tuple returned by `Database::vector_search` (usage stats) is
/// discarded.
async fn vector_search<RT: Runtime>(
1589+
db: &Database<RT>,
1590+
index_name: IndexName,
1591+
vector: [f64; 2],
1592+
) -> anyhow::Result<Vec<PublicVectorSearchQueryResult>> {
1593+
Ok(db
1594+
.vector_search(
1595+
Identity::system(),
1596+
VectorSearch {
1597+
index_name,
1598+
vector: vector.into_iter().map(|value| value as f32).collect(),
1599+
limit: Some(1),
1600+
expressions: btreeset![],
1601+
},
1602+
)
1603+
.await?
1604+
// Keep only the results; drop the accompanying usage stats.
.0)
1605+
}
15101606
}

crates/database/src/vector_index_worker/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use crate::index_workers::BuildReason;
1919

2020
pub(crate) struct IndexBuild {
2121
index_name: TabletIndexName,
22+
index_id: IndexId,
2223
by_id: IndexId,
2324
developer_config: DeveloperVectorIndexConfig,
2425
metadata_id: ResolvedDocumentId,

0 commit comments

Comments
 (0)