@@ -703,7 +703,10 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
703
703
Ok ( ( ) )
704
704
}
705
705
706
- #[ try_stream( ok = ( Timestamp , InternalDocumentId ) , error = anyhow:: Error ) ]
706
+ /// Finds expired documents in the documents log and returns a tuple of the
707
+ /// form (scanned_document_ts, (expired_document_ts,
708
+ /// internal_document_id))
709
+ #[ try_stream( ok = ( Timestamp , ( Timestamp , InternalDocumentId ) ) , error = anyhow:: Error ) ]
707
710
async fn expired_documents (
708
711
rt : & RT ,
709
712
reader : RepeatablePersistence ,
@@ -724,7 +727,8 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
724
727
. try_chunks2 ( * RETENTION_READ_CHUNK )
725
728
. map ( move |chunk| async move {
726
729
let chunk = chunk?. to_vec ( ) ;
727
- let mut entries_to_delete: Vec < ( Timestamp , InternalDocumentId ) > = vec ! [ ] ;
730
+ let mut entries_to_delete: Vec < ( Timestamp , ( Timestamp , InternalDocumentId ) ) > =
731
+ vec ! [ ] ;
728
732
// Prev revs are the documents we are deleting.
729
733
// Each prev rev has 1 or 2 entries to delete per document -- one entry at
730
734
// the prev rev's ts, and a tombstone at the current rev's ts if
@@ -752,7 +756,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
752
756
the retention window"
753
757
) ;
754
758
755
- entries_to_delete. push ( ( ts, id ) ) ;
759
+ entries_to_delete. push ( ( ts, ( ts , id ) ) ) ;
756
760
}
757
761
log_document_retention_scanned_document ( maybe_doc. is_none ( ) , false ) ;
758
762
continue ;
@@ -775,11 +779,11 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
775
779
the retention window"
776
780
) ;
777
781
778
- entries_to_delete. push ( ( * prev_rev_ts, id) ) ;
782
+ entries_to_delete. push ( ( ts , ( * prev_rev_ts, id) ) ) ;
779
783
780
784
// Deletes tombstones
781
785
if maybe_doc. is_none ( ) {
782
- entries_to_delete. push ( ( ts, id ) ) ;
786
+ entries_to_delete. push ( ( ts, ( ts , id ) ) ) ;
783
787
}
784
788
785
789
log_document_retention_scanned_document ( maybe_doc. is_none ( ) , true ) ;
@@ -889,8 +893,8 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
889
893
/// Partitions documents into RETENTION_DELETE_PARALLEL parts where each
890
894
/// document id only exists in one part
891
895
fn partition_document_chunk (
892
- to_partition : Vec < ( Timestamp , InternalDocumentId ) > ,
893
- ) -> Vec < Vec < ( Timestamp , InternalDocumentId ) > > {
896
+ to_partition : Vec < ( Timestamp , ( Timestamp , InternalDocumentId ) ) > ,
897
+ ) -> Vec < Vec < ( Timestamp , ( Timestamp , InternalDocumentId ) ) > > {
894
898
let mut parts = Vec :: new ( ) ;
895
899
for _ in 0 ..* RETENTION_DELETE_PARALLEL {
896
900
parts. push ( vec ! [ ] ) ;
@@ -948,7 +952,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
948
952
}
949
953
950
954
async fn delete_document_chunk (
951
- delete_chunk : Vec < ( Timestamp , InternalDocumentId ) > ,
955
+ delete_chunk : Vec < ( Timestamp , ( Timestamp , InternalDocumentId ) ) > ,
952
956
persistence : Arc < dyn Persistence > ,
953
957
mut new_cursor : Timestamp ,
954
958
) -> anyhow:: Result < ( Timestamp , usize ) > {
@@ -957,34 +961,21 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
957
961
return Ok ( ( new_cursor, delete_chunk. len ( ) ) ) ;
958
962
}
959
963
let _timer = retention_delete_document_chunk_timer ( ) ;
960
- let delete_chunk = delete_chunk. to_vec ( ) ;
961
- let documents_to_delete = persistence. documents_to_delete ( & delete_chunk) . await ?;
962
- let total_documents_to_delete = documents_to_delete. len ( ) ;
963
- tracing:: trace!( "delete_documents: got documents to delete {total_documents_to_delete:?}" ) ;
964
- // If there are more entries to delete than we see in the delete chunk,
965
- // it means retention skipped deleting entries before, and we
966
- // incorrectly bumped DocumentRetentionConfirmedDeletedTimestamp anyway.
967
- if documents_to_delete. len ( ) > delete_chunk. len ( ) {
968
- report_error ( & mut anyhow:: anyhow!(
969
- "retention wanted to delete {} documents but found {total_documents_to_delete} to \
970
- delete",
971
- delete_chunk. len( ) ,
972
- ) ) ;
973
- anyhow:: bail!(
974
- "Retention wanted to delete {} documents but found {total_documents_to_delete} to
975
- delete. Likely DocumentRetentionConfirmedDeletedTimestamp was bumped incorrectly" ,
976
- delete_chunk. len( )
977
- )
978
- }
979
- for document_to_delete in documents_to_delete. iter ( ) {
980
- // If we're deleting a document, we've definitely deleted
981
- // entries for documents at all prior timestamps.
964
+ let total_documents_to_delete = delete_chunk. len ( ) ;
965
+ tracing:: trace!(
966
+ "delete_documents: there are {total_documents_to_delete:?} documents to delete"
967
+ ) ;
968
+ for document_to_delete in delete_chunk. iter ( ) {
969
+ // If we're deleting the previous revision of a document, we've definitely
970
+ // deleted entries for documents at all prior timestamps.
982
971
if document_to_delete. 0 > Timestamp :: MIN {
983
972
new_cursor = cmp:: max ( new_cursor, document_to_delete. 0 . pred ( ) ?) ;
984
973
}
985
974
}
986
975
let deleted_rows = if total_documents_to_delete > 0 {
987
- persistence. delete ( documents_to_delete) . await ?
976
+ persistence
977
+ . delete ( delete_chunk. into_iter ( ) . map ( |doc| doc. 1 ) . collect ( ) )
978
+ . await ?
988
979
} else {
989
980
0
990
981
} ;
@@ -1811,7 +1802,11 @@ mod tests {
1811
1802
let expired: Vec < _ > = expired_stream. try_collect ( ) . await ?;
1812
1803
1813
1804
assert_eq ! ( expired. len( ) , 5 ) ;
1814
- assert_eq ! ( p. delete( expired) . await ?, 5 ) ;
1805
+ assert_eq ! (
1806
+ p. delete( expired. into_iter( ) . map( |doc| doc. 1 ) . collect( ) )
1807
+ . await ?,
1808
+ 5
1809
+ ) ;
1815
1810
1816
1811
let reader = p. reader ( ) ;
1817
1812
0 commit comments