@@ -117,9 +117,11 @@ use crate::{
117
117
latest_min_document_snapshot_timer,
118
118
latest_min_snapshot_timer,
119
119
log_document_retention_cursor_age,
120
+ log_document_retention_cursor_lag,
120
121
log_document_retention_no_cursor,
121
122
log_document_retention_scanned_document,
122
123
log_retention_cursor_age,
124
+ log_retention_cursor_lag,
123
125
log_retention_documents_deleted,
124
126
log_retention_expired_index_entry,
125
127
log_retention_index_entries_deleted,
@@ -441,6 +443,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
441
443
// even if the deletion future is stuck.
442
444
Self :: get_checkpoint (
443
445
persistence. reader ( ) . as_ref ( ) ,
446
+ bounds_writer. reader ( ) ,
444
447
snapshot_reader. clone ( ) ,
445
448
retention_type,
446
449
)
@@ -1050,6 +1053,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
1050
1053
let _timer = retention_delete_timer ( ) ;
1051
1054
let cursor = Self :: get_checkpoint (
1052
1055
reader. as_ref ( ) ,
1056
+ bounds_reader. clone ( ) ,
1053
1057
snapshot_reader. clone ( ) ,
1054
1058
RetentionType :: Index ,
1055
1059
)
@@ -1179,6 +1183,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
1179
1183
let _timer = retention_delete_documents_timer ( ) ;
1180
1184
let cursor = Self :: get_checkpoint (
1181
1185
reader. as_ref ( ) ,
1186
+ bounds_reader. clone ( ) ,
1182
1187
snapshot_reader. clone ( ) ,
1183
1188
RetentionType :: Document ,
1184
1189
)
@@ -1268,6 +1273,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
1268
1273
1269
1274
async fn get_checkpoint (
1270
1275
persistence : & dyn PersistenceReader ,
1276
+ bounds_reader : Reader < SnapshotBounds > ,
1271
1277
snapshot_reader : Reader < SnapshotManager > ,
1272
1278
retention_type : RetentionType ,
1273
1279
) -> anyhow:: Result < Timestamp > {
@@ -1276,12 +1282,28 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
1276
1282
// Only log if the checkpoint has been written once, to avoid logging time since
1277
1283
// epoch when the instance is first starting up.
1278
1284
match retention_type {
1279
- RetentionType :: Document => log_document_retention_cursor_age (
1280
- ( * snapshot_reader. lock ( ) . latest_ts ( ) ) . secs_since_f64 ( checkpoint) ,
1281
- ) ,
1282
- RetentionType :: Index => log_retention_cursor_age (
1283
- ( * snapshot_reader. lock ( ) . latest_ts ( ) ) . secs_since_f64 ( checkpoint) ,
1284
- ) ,
1285
+ RetentionType :: Document => {
1286
+ log_document_retention_cursor_age (
1287
+ ( * snapshot_reader. lock ( ) . latest_ts ( ) ) . secs_since_f64 ( checkpoint) ,
1288
+ ) ;
1289
+ log_document_retention_cursor_lag (
1290
+ bounds_reader
1291
+ . lock ( )
1292
+ . min_document_snapshot_ts
1293
+ . secs_since_f64 ( checkpoint) ,
1294
+ ) ;
1295
+ } ,
1296
+ RetentionType :: Index => {
1297
+ log_retention_cursor_age (
1298
+ ( * snapshot_reader. lock ( ) . latest_ts ( ) ) . secs_since_f64 ( checkpoint) ,
1299
+ ) ;
1300
+ log_retention_cursor_lag (
1301
+ bounds_reader
1302
+ . lock ( )
1303
+ . min_snapshot_ts
1304
+ . secs_since_f64 ( checkpoint) ,
1305
+ ) ;
1306
+ } ,
1285
1307
}
1286
1308
} else {
1287
1309
match retention_type {
0 commit comments