@@ -16,7 +16,10 @@ use common::{
16
16
self ,
17
17
backoff:: Backoff ,
18
18
bootstrap_model:: index:: {
19
- database_index:: DatabaseIndexState ,
19
+ database_index:: {
20
+ DatabaseIndexState ,
21
+ IndexedFields ,
22
+ } ,
20
23
IndexConfig ,
21
24
IndexMetadata ,
22
25
TabletIndexMetadata ,
@@ -64,6 +67,7 @@ use common::{
64
67
RepeatableTimestamp ,
65
68
TabletIndexName ,
66
69
Timestamp ,
70
+ WriteTimestamp ,
67
71
} ,
68
72
value:: {
69
73
ResolvedDocumentId ,
@@ -85,7 +89,10 @@ use futures::{
85
89
use governor:: Quota ;
86
90
use indexing:: index_registry:: IndexRegistry ;
87
91
use keybroker:: Identity ;
88
- use maplit:: btreeset;
92
+ use maplit:: {
93
+ btreemap,
94
+ btreeset,
95
+ } ;
89
96
use tracing:: log;
90
97
use value:: InternalDocumentId ;
91
98
@@ -95,6 +102,7 @@ use crate::{
95
102
log_num_indexes_to_backfill,
96
103
log_worker_starting,
97
104
} ,
105
+ retention:: LeaderRetentionManager ,
98
106
Database ,
99
107
ResolvedQuery ,
100
108
SystemMetadataModel ,
@@ -347,6 +355,11 @@ impl<RT: Runtime> IndexWorker<RT> {
347
355
index_selector,
348
356
)
349
357
. await ?;
358
+ let ( backfill_begin_ts, index_name, indexed_fields) =
359
+ self . begin_retention ( index_id) . await ?;
360
+ self . index_writer
361
+ . run_retention ( index_id, backfill_begin_ts, index_name, indexed_fields)
362
+ . await ?;
350
363
self . finish_backfill ( index_id) . await ?;
351
364
Ok ( ( ) )
352
365
}
@@ -386,6 +399,47 @@ impl<RT: Runtime> IndexWorker<RT> {
386
399
Ok ( index_metadata. name . clone ( ) )
387
400
}
388
401
402
+ async fn begin_retention (
403
+ & mut self ,
404
+ index_id : IndexId ,
405
+ ) -> anyhow:: Result < ( Timestamp , TabletIndexName , IndexedFields ) > {
406
+ let mut tx = self . database . begin ( Identity :: system ( ) ) . await ?;
407
+ let index_table_id = tx. bootstrap_tables ( ) . index_id ;
408
+
409
+ let ( index_doc, index_ts) = tx
410
+ . get_with_ts ( ResolvedDocumentId :: new ( index_table_id, index_id) )
411
+ . await ?
412
+ . ok_or_else ( || anyhow:: anyhow!( "Index {index_id:?} no longer exists" ) ) ?;
413
+ let index_metadata = TabletIndexMetadata :: from_document ( index_doc) ?;
414
+
415
+ // Assuming that the IndexWorker is the only writer of index state, we expect
416
+ // the state to still be `Backfilling` here. If this assertion fails, we
417
+ // somehow raced with another `IndexWorker`(!) or don't actually have the
418
+ // database lease (!).
419
+ let indexed_fields = match & index_metadata. config {
420
+ IndexConfig :: Database {
421
+ on_disk_state,
422
+ developer_config,
423
+ } => {
424
+ anyhow:: ensure!(
425
+ matches!( * on_disk_state, DatabaseIndexState :: Backfilling ( _) ) ,
426
+ "IndexWorker started backfilling index {index_metadata:?} not in Backfilling \
427
+ state",
428
+ ) ;
429
+ developer_config. fields . clone ( )
430
+ } ,
431
+ _ => anyhow:: bail!(
432
+ "IndexWorker attempted to backfill an index {index_metadata:?} which wasn't a \
433
+ database index."
434
+ ) ,
435
+ } ;
436
+ drop ( tx) ;
437
+ let WriteTimestamp :: Committed ( index_ts) = index_ts else {
438
+ anyhow:: bail!( "index {index_id} is pending write" ) ;
439
+ } ;
440
+ Ok ( ( index_ts, index_metadata. name . clone ( ) , indexed_fields) )
441
+ }
442
+
389
443
async fn finish_backfill ( & mut self , index_id : IndexId ) -> anyhow:: Result < ( ) > {
390
444
// Now that we're done, write that we've finished backfilling the index, sanity
391
445
// checking that it wasn't written concurrently with our backfill.
@@ -796,4 +850,29 @@ impl<RT: Runtime> IndexWriter<RT> {
796
850
}
797
851
Ok ( ( ) )
798
852
}
853
+
854
+ async fn run_retention (
855
+ & self ,
856
+ index_id : IndexId ,
857
+ backfill_begin_ts : Timestamp ,
858
+ index_name : TabletIndexName ,
859
+ indexed_fields : IndexedFields ,
860
+ ) -> anyhow:: Result < ( ) > {
861
+ let min_snapshot_ts = self . retention_validator . min_snapshot_ts ( ) . await ?;
862
+ let all_indexes = btreemap ! { index_id => ( index_name, indexed_fields) } ;
863
+ // TODO(lee) add checkpointing.
864
+ let mut cursor_ts = backfill_begin_ts;
865
+ while cursor_ts. succ ( ) ? < min_snapshot_ts {
866
+ ( cursor_ts, _) = LeaderRetentionManager :: delete (
867
+ min_snapshot_ts,
868
+ self . persistence . clone ( ) ,
869
+ & self . runtime ,
870
+ backfill_begin_ts,
871
+ & all_indexes,
872
+ self . retention_validator . clone ( ) ,
873
+ )
874
+ . await ?;
875
+ }
876
+ Ok ( ( ) )
877
+ }
799
878
}
0 commit comments