@@ -3,6 +3,7 @@ use std::{
3
3
collections:: BTreeSet ,
4
4
ops:: Bound ,
5
5
sync:: Arc ,
6
+ time:: Duration ,
6
7
} ;
7
8
8
9
use :: metrics:: {
@@ -11,6 +12,7 @@ use ::metrics::{
11
12
} ;
12
13
use anyhow:: Context as _;
13
14
use common:: {
15
+ backoff:: Backoff ,
14
16
bootstrap_model:: tables:: {
15
17
TableMetadata ,
16
18
TableState ,
@@ -26,7 +28,10 @@ use common::{
26
28
ParsedDocument ,
27
29
ResolvedDocument ,
28
30
} ,
29
- errors:: recapture_stacktrace,
31
+ errors:: {
32
+ recapture_stacktrace,
33
+ report_error,
34
+ } ,
30
35
fastrace_helpers:: {
31
36
initialize_root_from_parent,
32
37
EncodedSpan ,
@@ -591,27 +596,49 @@ impl<RT: Runtime> Committer<RT> {
591
596
. next_max_repeatable_ts ( )
592
597
. expect ( "new_max_repeatable should exist" ) ;
593
598
let persistence = self . persistence . clone ( ) ;
594
- let outer_span = Span :: enter_with_parent ( "outer_bump_max_repeatable_ts" , root_span) ;
599
+ let span = Span :: enter_with_parent ( "bump_max_repeatable_ts" , root_span) ;
600
+ let runtime = self . runtime . clone ( ) ;
595
601
self . persistence_writes . push_back (
596
602
async move {
597
- let span = Span :: enter_with_parent ( "inner_bump_max_repeatable_ts" , & outer_span) ;
598
603
// The MaxRepeatableTimestamp persistence global ensures all future
599
604
// commits on future leaders will be after new_max_repeatable, and followers
600
605
// can know this timestamp is repeatable.
601
- persistence
602
- . write_persistence_global (
603
- PersistenceGlobalKey :: MaxRepeatableTimestamp ,
604
- new_max_repeatable. into ( ) ,
605
- )
606
- . in_span ( span)
607
- . await ?;
606
+
607
+ // If we fail to bump the timestamp, we'll back off and retry,
608
+ // which will block the committer from making forward progress until we
609
+ // succeed. We don't want to kill the committer and reload the
610
+ // instance if we can avoid it, as that would exacerbate any
611
+ // load-related issues.
612
+ let mut backoff = Backoff :: new ( Duration :: from_secs ( 1 ) , Duration :: from_secs ( 60 ) ) ;
613
+ loop {
614
+ match persistence
615
+ . write_persistence_global (
616
+ PersistenceGlobalKey :: MaxRepeatableTimestamp ,
617
+ new_max_repeatable. into ( ) ,
618
+ )
619
+ . await
620
+ {
621
+ Ok ( ( ) ) => break ,
622
+ Err ( mut e) => {
623
+ let delay = backoff. fail ( & mut runtime. rng ( ) ) ;
624
+ report_error ( & mut e) . await ;
625
+ tracing:: error!(
626
+ "Failed to bump max repeatable timestamp, retrying after {:.2}s" ,
627
+ delay. as_secs_f32( )
628
+ ) ;
629
+ runtime. wait ( delay) . await ;
630
+ continue ;
631
+ } ,
632
+ }
633
+ }
608
634
Ok ( PersistenceWrite :: MaxRepeatableTimestamp {
609
635
new_max_repeatable,
610
636
timer,
611
637
result,
612
638
commit_id,
613
639
} )
614
640
}
641
+ . in_span ( span)
615
642
. boxed ( ) ,
616
643
) ;
617
644
}
0 commit comments