Skip to content

Commit aa20c28

Browse files
committed
Refactor: Add Leader Lease Duration into RaftState.vote
This commit enhances the management of leader lease durations by integrating them directly into the `RaftState.vote` structure. Previously, the leader lease was managed separately as a configuration parameter, and `RaftState.vote` only recorded the timestamp of the last received `AppendEntries` RPC. - Store the lease duration together with the last update time in `RaftState.vote`, we simplify the process of expiration checking and improve the clarity of state logging. - Dynamic Lease Management: This integration allows the lease duration to be reset dynamically, which is particularly useful when a leadership transfer is scheduled. This capability ensures that the leader's expiration can be immediately considered without delay. - Runtime Adjustments: Leaders can now adjust their lease durations at runtime as needed, without requiring a configuration update. This flexibility supports more responsive and adaptable leadership management within the Raft cluster. - Rename `UTime` to `Leased` and add field `lease` and related methods.
1 parent 5c82dfd commit aa20c28

32 files changed

+341
-163
lines changed

openraft/src/engine/engine_impl.rs

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use crate::core::raft_msg::AppendEntriesTx;
77
use crate::core::raft_msg::ResultSender;
88
use crate::core::sm;
99
use crate::core::ServerState;
10-
use crate::display_ext::DisplayInstantExt;
1110
use crate::display_ext::DisplayOptionExt;
1211
use crate::display_ext::DisplaySliceExt;
1312
use crate::engine::engine_config::EngineConfig;
@@ -210,7 +209,7 @@ where C: RaftTypeConfig
210209
..Default::default()
211210
};
212211
self.last_seen_vote = vote;
213-
self.state.vote.update(C::now(), vote);
212+
self.state.vote.update(C::now(), Duration::default(), vote);
214213
self.following_handler().do_append_entries(vec![entry]);
215214

216215
// With the new config, start to elect to become leader
@@ -293,37 +292,22 @@ where C: RaftTypeConfig
293292
#[tracing::instrument(level = "debug", skip_all)]
294293
pub(crate) fn handle_vote_req(&mut self, req: VoteRequest<C>) -> VoteResponse<C> {
295294
let now = C::now();
296-
let lease = self.config.timer_config.leader_lease;
297-
let local_vote = self.state.vote_ref();
298-
299-
// Make default vote-last-modified a low enough value, that expires leader lease.
300-
let local_vote_utime =
301-
self.state.vote_last_modified().unwrap_or_else(|| now - lease - Duration::from_millis(1));
295+
let local_leased_vote = &self.state.vote;
302296

303297
tracing::info!(req = display(&req), "Engine::handle_vote_req");
304298
tracing::info!(
305-
my_vote = display(self.state.vote_ref()),
299+
my_vote = display(&**local_leased_vote),
306300
my_last_log_id = display(self.state.last_log_id().display()),
301+
lease = display(local_leased_vote.time_info(now)),
307302
"Engine::handle_vote_req"
308303
);
309-
tracing::info!(
310-
"now; {}, vote is updated at: {}, vote is updated before {:?}, leader lease({:?}) will expire after {:?}",
311-
now.display(),
312-
local_vote_utime.display(),
313-
now - local_vote_utime,
314-
lease,
315-
local_vote_utime + lease - now
316-
);
317304

318-
if local_vote.is_committed() {
305+
if local_leased_vote.is_committed() {
319306
// Current leader lease has not yet expired, reject voting request
320-
if now <= local_vote_utime + lease {
307+
if !local_leased_vote.is_expired(now) {
321308
tracing::info!(
322-
"reject vote-request: leader lease has not yet expire; now; {:?}, vote is update at: {:?}, leader lease({:?}) will expire after {:?}",
323-
now,
324-
local_vote_utime,
325-
lease,
326-
local_vote_utime + lease - now
309+
"reject vote-request: leader lease has not yet expire: {}",
310+
local_leased_vote.time_info(now)
327311
);
328312

329313
return VoteResponse::new(self.state.vote_ref(), self.state.last_log_id().copied());

openraft/src/engine/handler/following_handler/append_entries_test.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::sync::Arc;
2+
use std::time::Duration;
23

34
use maplit::btreeset;
45
use pretty_assertions::assert_eq;
@@ -11,11 +12,11 @@ use crate::raft_state::IOId;
1112
use crate::raft_state::LogStateReader;
1213
use crate::testing::blank_ent;
1314
use crate::testing::log_id;
14-
use crate::utime::UTime;
15+
use crate::type_config::TypeConfigExt;
16+
use crate::utime::Leased;
1517
use crate::EffectiveMembership;
1618
use crate::Membership;
1719
use crate::MembershipState;
18-
use crate::TokioInstant;
1920
use crate::Vote;
2021

2122
fn m01() -> Membership<UTConfig> {
@@ -31,7 +32,11 @@ fn eng() -> Engine<UTConfig> {
3132
eng.state.enable_validation(false); // Disable validation for incomplete state
3233

3334
eng.config.id = 2;
34-
eng.state.vote.update(TokioInstant::now(), Vote::new_committed(2, 1));
35+
eng.state.vote.update(
36+
UTConfig::<()>::now(),
37+
Duration::from_millis(500),
38+
Vote::new_committed(2, 1),
39+
);
3540
eng.state.log_ids.append(log_id(1, 1, 1));
3641
eng.state.log_ids.append(log_id(2, 1, 3));
3742
eng.state.membership_state = MembershipState::new(
@@ -81,7 +86,11 @@ fn test_follower_append_entries_update_accepted() -> anyhow::Result<()> {
8186
// Update to a new Leader and smaller log id
8287
{
8388
// Assume this node's Leader becomes T3-N1
84-
eng.state.vote = UTime::new(TokioInstant::now(), Vote::new_committed(3, 1));
89+
eng.state.vote = Leased::new(
90+
UTConfig::<()>::now(),
91+
Duration::from_millis(500),
92+
Vote::new_committed(3, 1),
93+
);
8594
eng.following_handler().append_entries(Some(log_id(2, 1, 3)), vec![
8695
//
8796
blank_ent(3, 1, 4),

openraft/src/engine/handler/following_handler/commit_entries_test.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::sync::Arc;
2+
use std::time::Duration;
23

34
use maplit::btreeset;
45

@@ -9,7 +10,7 @@ use crate::raft_state::IOId;
910
use crate::raft_state::LogStateReader;
1011
use crate::testing::log_id;
1112
use crate::type_config::TypeConfigExt;
12-
use crate::utime::UTime;
13+
use crate::utime::Leased;
1314
use crate::EffectiveMembership;
1415
use crate::Membership;
1516
use crate::MembershipState;
@@ -27,7 +28,11 @@ fn eng() -> Engine<UTConfig> {
2728
let mut eng = Engine::testing_default(0);
2829
eng.state.enable_validation(false); // Disable validation for incomplete state
2930

30-
eng.state.vote = UTime::new(UTConfig::<()>::now(), Vote::new_committed(2, 1));
31+
eng.state.vote = Leased::new(
32+
UTConfig::<()>::now(),
33+
Duration::from_millis(500),
34+
Vote::new_committed(2, 1),
35+
);
3136
eng.state.committed = Some(log_id(1, 1, 1));
3237
eng.state.membership_state = MembershipState::new(
3338
Arc::new(EffectiveMembership::new(Some(log_id(1, 1, 1)), m01())),

openraft/src/engine/handler/following_handler/do_append_entries_test.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::sync::Arc;
2+
use std::time::Duration;
23

34
use maplit::btreeset;
45

@@ -11,7 +12,7 @@ use crate::raft_state::LogStateReader;
1112
use crate::testing::blank_ent;
1213
use crate::testing::log_id;
1314
use crate::type_config::TypeConfigExt;
14-
use crate::utime::UTime;
15+
use crate::utime::Leased;
1516
use crate::EffectiveMembership;
1617
use crate::Entry;
1718
use crate::EntryPayload;
@@ -40,7 +41,11 @@ fn eng() -> Engine<UTConfig> {
4041
eng.state.enable_validation(false); // Disable validation for incomplete state
4142

4243
eng.config.id = 2;
43-
eng.state.vote = UTime::new(UTConfig::<()>::now(), Vote::new_committed(2, 1));
44+
eng.state.vote = Leased::new(
45+
UTConfig::<()>::now(),
46+
Duration::from_millis(500),
47+
Vote::new_committed(2, 1),
48+
);
4449
eng.state.log_ids.append(log_id(1, 1, 1));
4550
eng.state.log_ids.append(log_id(2, 1, 3));
4651
eng.state.membership_state = MembershipState::new(
@@ -54,7 +59,7 @@ fn eng() -> Engine<UTConfig> {
5459
#[test]
5560
fn test_follower_do_append_entries_no_membership_entries() -> anyhow::Result<()> {
5661
let mut eng = eng();
57-
eng.state.vote = UTime::without_utime(Vote::new_committed(1, 1));
62+
eng.state.vote = Leased::without_last_update(Vote::new_committed(1, 1));
5863

5964
eng.following_handler().do_append_entries(vec![blank_ent(3, 1, 4)]);
6065

@@ -96,7 +101,7 @@ fn test_follower_do_append_entries_one_membership_entry() -> anyhow::Result<()>
96101
// - Follower become Learner, since it is not in the new effective membership.
97102
let mut eng = eng();
98103
eng.config.id = 2; // make it a member, the become learner
99-
eng.state.vote = UTime::without_utime(Vote::new_committed(1, 1));
104+
eng.state.vote = Leased::without_last_update(Vote::new_committed(1, 1));
100105

101106
eng.following_handler().do_append_entries(vec![blank_ent(3, 1, 4), Entry::<UTConfig> {
102107
log_id: log_id(3, 1, 5),
@@ -152,7 +157,7 @@ fn test_follower_do_append_entries_three_membership_entries() -> anyhow::Result<
152157
let mut eng = eng();
153158
eng.config.id = 5; // make it a learner, then become follower
154159
eng.state.server_state = eng.calc_server_state();
155-
eng.state.vote = UTime::without_utime(Vote::new_committed(1, 1));
160+
eng.state.vote = Leased::without_last_update(Vote::new_committed(1, 1));
156161

157162
eng.following_handler().do_append_entries(vec![
158163
blank_ent(3, 1, 4),

openraft/src/engine/handler/following_handler/install_snapshot_test.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::io::Cursor;
22
use std::sync::Arc;
3+
use std::time::Duration;
34

45
use maplit::btreeset;
56
use pretty_assertions::assert_eq;
@@ -13,12 +14,12 @@ use crate::engine::LogIdList;
1314
use crate::raft_state::IOId;
1415
use crate::raft_state::LogStateReader;
1516
use crate::testing::log_id;
17+
use crate::type_config::TypeConfigExt;
1618
use crate::EffectiveMembership;
1719
use crate::Membership;
1820
use crate::Snapshot;
1921
use crate::SnapshotMeta;
2022
use crate::StoredMembership;
21-
use crate::TokioInstant;
2223
use crate::Vote;
2324

2425
fn m12() -> Membership<UTConfig> {
@@ -33,7 +34,11 @@ fn eng() -> Engine<UTConfig> {
3334
let mut eng = Engine::testing_default(0);
3435
eng.state.enable_validation(false); // Disable validation for incomplete state
3536

36-
eng.state.vote.update(TokioInstant::now(), Vote::new_committed(2, 1));
37+
eng.state.vote.update(
38+
UTConfig::<()>::now(),
39+
Duration::from_millis(500),
40+
Vote::new_committed(2, 1),
41+
);
3742
eng.state.committed = Some(log_id(4, 1, 5));
3843
eng.state.log_ids = LogIdList::new(vec![
3944
//
@@ -179,7 +184,11 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> {
179184
let mut eng = Engine::<UTConfig>::testing_default(0);
180185
eng.state.enable_validation(false); // Disable validation for incomplete state
181186

182-
eng.state.vote.update(TokioInstant::now(), Vote::new_committed(2, 1));
187+
eng.state.vote.update(
188+
UTConfig::<()>::now(),
189+
Duration::from_millis(500),
190+
Vote::new_committed(2, 1),
191+
);
183192
eng.state.committed = Some(log_id(2, 1, 3));
184193
eng.state.log_ids = LogIdList::new(vec![
185194
//

openraft/src/engine/handler/following_handler/truncate_logs_test.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::sync::Arc;
2+
use std::time::Duration;
23

34
use maplit::btreeset;
45

@@ -9,7 +10,7 @@ use crate::engine::LogIdList;
910
use crate::raft_state::LogStateReader;
1011
use crate::testing::log_id;
1112
use crate::type_config::TypeConfigExt;
12-
use crate::utime::UTime;
13+
use crate::utime::Leased;
1314
use crate::EffectiveMembership;
1415
use crate::Membership;
1516
use crate::MembershipState;
@@ -33,7 +34,11 @@ fn eng() -> Engine<UTConfig> {
3334
eng.state.enable_validation(false); // Disable validation for incomplete state
3435

3536
eng.config.id = 2;
36-
eng.state.vote = UTime::new(UTConfig::<()>::now(), Vote::new_committed(2, 1));
37+
eng.state.vote = Leased::new(
38+
UTConfig::<()>::now(),
39+
Duration::from_millis(500),
40+
Vote::new_committed(2, 1),
41+
);
3742
eng.state.log_ids = LogIdList::new(vec![
3843
log_id(2, 1, 2), //
3944
log_id(4, 1, 4),

openraft/src/engine/handler/following_handler/update_committed_membership_test.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::sync::Arc;
2+
use std::time::Duration;
23

34
use maplit::btreeset;
45

@@ -7,7 +8,7 @@ use crate::engine::testing::UTConfig;
78
use crate::engine::Engine;
89
use crate::testing::log_id;
910
use crate::type_config::TypeConfigExt;
10-
use crate::utime::UTime;
11+
use crate::utime::Leased;
1112
use crate::EffectiveMembership;
1213
use crate::Membership;
1314
use crate::MembershipState;
@@ -28,7 +29,11 @@ fn m34() -> Membership<UTConfig> {
2829
fn eng() -> Engine<UTConfig> {
2930
let mut eng = Engine::testing_default(0);
3031
eng.config.id = 2;
31-
eng.state.vote = UTime::new(UTConfig::<()>::now(), Vote::new_committed(2, 1));
32+
eng.state.vote = Leased::new(
33+
UTConfig::<()>::now(),
34+
Duration::from_millis(500),
35+
Vote::new_committed(2, 1),
36+
);
3237
eng.state.membership_state = MembershipState::new(
3338
Arc::new(EffectiveMembership::new(Some(log_id(1, 1, 1)), m01())),
3439
Arc::new(EffectiveMembership::new(Some(log_id(2, 1, 3)), m23())),

openraft/src/engine/handler/leader_handler/append_entries_test.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::sync::Arc;
2+
use std::time::Duration;
23

34
use maplit::btreeset;
45
#[allow(unused_imports)]
@@ -19,14 +20,14 @@ use crate::raft_state::IOId;
1920
use crate::raft_state::LogStateReader;
2021
use crate::testing::blank_ent;
2122
use crate::testing::log_id;
22-
use crate::utime::UTime;
23+
use crate::type_config::TypeConfigExt;
24+
use crate::utime::Leased;
2325
use crate::vote::CommittedLeaderId;
2426
use crate::EffectiveMembership;
2527
use crate::Entry;
2628
use crate::LogId;
2729
use crate::Membership;
2830
use crate::MembershipState;
29-
use crate::TokioInstant;
3031
use crate::Vote;
3132

3233
fn m01() -> Membership<UTConfig> {
@@ -52,7 +53,11 @@ fn eng() -> Engine<UTConfig> {
5253

5354
eng.config.id = 1;
5455
eng.state.committed = Some(log_id(0, 1, 0));
55-
eng.state.vote = UTime::new(TokioInstant::now(), Vote::new_committed(3, 1));
56+
eng.state.vote = Leased::new(
57+
UTConfig::<()>::now(),
58+
Duration::from_millis(500),
59+
Vote::new_committed(3, 1),
60+
);
5661
eng.state.log_ids.append(log_id(1, 1, 1));
5762
eng.state.log_ids.append(log_id(2, 1, 3));
5863
eng.state.membership_state = MembershipState::new(

openraft/src/engine/handler/leader_handler/get_read_log_id_test.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::sync::Arc;
2+
use std::time::Duration;
23

34
use maplit::btreeset;
45
#[allow(unused_imports)]
@@ -11,11 +12,11 @@ use pretty_assertions::assert_str_eq;
1112
use crate::engine::testing::UTConfig;
1213
use crate::engine::Engine;
1314
use crate::testing::log_id;
14-
use crate::utime::UTime;
15+
use crate::type_config::TypeConfigExt;
16+
use crate::utime::Leased;
1517
use crate::EffectiveMembership;
1618
use crate::Membership;
1719
use crate::MembershipState;
18-
use crate::TokioInstant;
1920
use crate::Vote;
2021

2122
fn m01() -> Membership<UTConfig> {
@@ -32,7 +33,11 @@ fn eng() -> Engine<UTConfig> {
3233

3334
eng.config.id = 1;
3435
eng.state.committed = Some(log_id(0, 1, 0));
35-
eng.state.vote = UTime::new(TokioInstant::now(), Vote::new_committed(3, 1));
36+
eng.state.vote = Leased::new(
37+
UTConfig::<()>::now(),
38+
Duration::from_millis(500),
39+
Vote::new_committed(3, 1),
40+
);
3641
eng.state.log_ids.append(log_id(1, 1, 1));
3742
eng.state.log_ids.append(log_id(2, 1, 3));
3843
eng.state.membership_state = MembershipState::new(

openraft/src/engine/handler/leader_handler/send_heartbeat_test.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::sync::Arc;
2+
use std::time::Duration;
23

34
use maplit::btreeset;
45
#[allow(unused_imports)]
@@ -14,11 +15,11 @@ use crate::engine::Engine;
1415
use crate::progress::Inflight;
1516
use crate::progress::Progress;
1617
use crate::testing::log_id;
17-
use crate::utime::UTime;
18+
use crate::type_config::TypeConfigExt;
19+
use crate::utime::Leased;
1820
use crate::EffectiveMembership;
1921
use crate::Membership;
2022
use crate::MembershipState;
21-
use crate::TokioInstant;
2223
use crate::Vote;
2324

2425
fn m01() -> Membership<UTConfig> {
@@ -35,7 +36,11 @@ fn eng() -> Engine<UTConfig> {
3536

3637
eng.config.id = 1;
3738
eng.state.committed = Some(log_id(0, 1, 0));
38-
eng.state.vote = UTime::new(TokioInstant::now(), Vote::new_committed(3, 1));
39+
eng.state.vote = Leased::new(
40+
UTConfig::<()>::now(),
41+
Duration::from_millis(500),
42+
Vote::new_committed(3, 1),
43+
);
3944
eng.state.log_ids.append(log_id(1, 1, 1));
4045
eng.state.log_ids.append(log_id(2, 1, 3));
4146
eng.state.membership_state = MembershipState::new(

0 commit comments

Comments
 (0)