@@ -86,6 +86,7 @@ use common::{
86
86
GenericIndexName ,
87
87
IndexId ,
88
88
PersistenceVersion ,
89
+ RepeatableTimestamp ,
89
90
TabletIndexName ,
90
91
Timestamp ,
91
92
} ,
@@ -256,7 +257,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
256
257
// We need to delete from all indexes that might be queried.
257
258
// Therefore we scan _index.by_id at min_snapshot_ts before min_snapshot_ts
258
259
// starts moving, and update the map before confirming any deletes.
259
- let indexes_at_min_snapshot = {
260
+ let mut all_indexes = {
260
261
let reader = persistence. reader ( ) ;
261
262
let snapshot_ts =
262
263
new_static_repeatable_ts ( min_snapshot_ts, reader. as_ref ( ) , & rt) . await ?;
@@ -280,6 +281,19 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
280
281
} ;
281
282
let index_table_id =
282
283
index_table_id. ok_or_else ( || anyhow:: anyhow!( "there must be at least one index" ) ) ?;
284
+ let mut index_cursor = min_snapshot_ts;
285
+ let latest_ts = snapshot_reader. lock ( ) . latest_ts ( ) ;
286
+ // Also update the set of indexes up to the current timestamp before document
287
+ // retention starts moving.
288
+ Self :: accumulate_indexes (
289
+ persistence. as_ref ( ) ,
290
+ & mut all_indexes,
291
+ & mut index_cursor,
292
+ latest_ts,
293
+ index_table_id,
294
+ follower_retention_manager. clone ( ) ,
295
+ )
296
+ . await ?;
283
297
284
298
let ( send_min_snapshot, receive_min_snapshot) = async_channel:: bounded ( 1 ) ;
285
299
let ( send_min_document_snapshot, receive_min_document_snapshot) = async_channel:: bounded ( 1 ) ;
@@ -303,9 +317,9 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
303
317
bounds_reader. clone ( ) ,
304
318
rt. clone ( ) ,
305
319
persistence. clone ( ) ,
306
- indexes_at_min_snapshot ,
320
+ all_indexes ,
307
321
index_table_id,
308
- min_snapshot_ts ,
322
+ index_cursor ,
309
323
follower_retention_manager. clone ( ) ,
310
324
receive_min_snapshot,
311
325
checkpoint_writer,
@@ -979,7 +993,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
979
993
bounds_reader : Reader < SnapshotBounds > ,
980
994
rt : RT ,
981
995
persistence : Arc < dyn Persistence > ,
982
- indexes_at_min_snapshot : BTreeMap < IndexId , ( GenericIndexName < TableId > , IndexedFields ) > ,
996
+ mut all_indexes : BTreeMap < IndexId , ( GenericIndexName < TableId > , IndexedFields ) > ,
983
997
index_table_id : TableIdAndTableNumber ,
984
998
mut index_cursor : Timestamp ,
985
999
retention_validator : Arc < dyn RetentionValidator > ,
@@ -988,7 +1002,6 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
988
1002
snapshot_reader : Reader < SnapshotManager > ,
989
1003
) {
990
1004
let reader = persistence. reader ( ) ;
991
- let mut all_indexes = indexes_at_min_snapshot;
992
1005
993
1006
let mut error_backoff = Backoff :: new ( INITIAL_BACKOFF , * MAX_RETENTION_DELAY_SECONDS ) ;
994
1007
let mut min_snapshot_ts = Timestamp :: default ( ) ;
@@ -1020,10 +1033,12 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
1020
1033
)
1021
1034
. await ?;
1022
1035
tracing:: trace!( "go_delete: loaded checkpoint: {cursor:?}" ) ;
1036
+ let latest_ts = snapshot_reader. lock ( ) . latest_ts ( ) ;
1023
1037
Self :: accumulate_indexes (
1024
1038
persistence. as_ref ( ) ,
1025
1039
& mut all_indexes,
1026
1040
& mut index_cursor,
1041
+ latest_ts,
1027
1042
index_table_id,
1028
1043
retention_validator. clone ( ) ,
1029
1044
)
@@ -1040,10 +1055,12 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
1040
1055
)
1041
1056
. await ?;
1042
1057
tracing:: trace!( "go_delete: finished running delete" ) ;
1058
+ let latest_ts = snapshot_reader. lock ( ) . latest_ts ( ) ;
1043
1059
Self :: accumulate_indexes (
1044
1060
persistence. as_ref ( ) ,
1045
1061
& mut all_indexes,
1046
1062
& mut index_cursor,
1063
+ latest_ts,
1047
1064
index_table_id,
1048
1065
retention_validator. clone ( ) ,
1049
1066
)
@@ -1279,20 +1296,21 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
1279
1296
persistence : & dyn Persistence ,
1280
1297
all_indexes : & mut BTreeMap < IndexId , ( GenericIndexName < TableId > , IndexedFields ) > ,
1281
1298
cursor : & mut Timestamp ,
1299
+ latest_ts : RepeatableTimestamp ,
1282
1300
index_table_id : TableIdAndTableNumber ,
1283
1301
retention_validator : Arc < dyn RetentionValidator > ,
1284
1302
) -> anyhow:: Result < ( ) > {
1285
1303
let reader = persistence. reader ( ) ;
1286
1304
let mut document_stream = reader. load_documents (
1287
- TimestampRange :: greater_than ( * cursor) ,
1305
+ TimestampRange :: new ( * cursor.. * latest_ts ) ? ,
1288
1306
Order :: Asc ,
1289
1307
* DEFAULT_DOCUMENTS_PAGE_SIZE ,
1290
1308
retention_validator,
1291
1309
) ;
1292
- while let Some ( ( ts , _, maybe_doc) ) = document_stream. try_next ( ) . await ? {
1310
+ while let Some ( ( _ , _, maybe_doc) ) = document_stream. try_next ( ) . await ? {
1293
1311
Self :: accumulate_index_document ( maybe_doc, all_indexes, index_table_id) ?;
1294
- * cursor = ts;
1295
1312
}
1313
+ * cursor = * latest_ts;
1296
1314
Ok ( ( ) )
1297
1315
}
1298
1316
}
0 commit comments