Skip to content

Commit ee7b885

Browse files
committed
Feature: new RaftNetwork API with argument RPCOption
- `RaftNetwork` introduced 3 new APIs, `append_entries`, `install_snapshot` and `vote`, which accept an additional argument `RPCOption`, and deprecated the old APIs `send_append_entries`, `send_install_snapshot` and `send_vote`. - The old APIs will be **removed** in `0.9`. An application can still implement the old APIs without any changes. Openraft calls only the new APIs, and their default implementations delegate to the old APIs. - Implementing the new APIs will disable the old APIs. - The new APIs accept an additional argument `RPCOption`, which enables an application to control its networking behavior based on the parameters in `RPCOption`. The `hard_ttl()` and `soft_ttl()` in `RPCOption` set the hard limit and the moderate limit of the duration for which an RPC should run. Once the `soft_ttl()` ends, the RPC implementation should start to gracefully cancel the RPC, and once the `hard_ttl()` ends, Openraft will terminate the ongoing RPC at once. - Fix: databendlabs#819
1 parent 7ce38e4 commit ee7b885

15 files changed

+184
-33
lines changed

cluster_benchmark/tests/benchmark/bench_cluster.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ async fn do_bench(bench_config: &BenchConfig) -> anyhow::Result<()> {
112112
l.client_write(ClientRequest {})
113113
.await
114114
.map_err(|e| {
115-
tracing::error!("client_write error: {:?}", e);
115+
eprintln!("client_write error: {:?}", e);
116+
e
116117
})
117118
.unwrap();
118119
}

cluster_benchmark/tests/benchmark/network.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use openraft::error::InstallSnapshotError;
1212
use openraft::error::RPCError;
1313
use openraft::error::RaftError;
1414
use openraft::error::RemoteError;
15+
use openraft::network::RPCOption;
1516
use openraft::network::RaftNetwork;
1617
use openraft::network::RaftNetworkFactory;
1718
use openraft::raft::AppendEntriesRequest;
@@ -98,25 +99,28 @@ pub struct Network {
9899

99100
#[async_trait]
100101
impl RaftNetwork<MemConfig> for Network {
101-
async fn send_append_entries(
102+
async fn append_entries(
102103
&mut self,
103104
rpc: AppendEntriesRequest<MemConfig>,
105+
_option: RPCOption,
104106
) -> Result<AppendEntriesResponse<NodeId>, RPCError<NodeId, (), RaftError<NodeId>>> {
105107
let resp = self.target_raft.append_entries(rpc).await.map_err(|e| RemoteError::new(self.target, e))?;
106108
Ok(resp)
107109
}
108110

109-
async fn send_install_snapshot(
111+
async fn install_snapshot(
110112
&mut self,
111113
rpc: InstallSnapshotRequest<MemConfig>,
114+
_option: RPCOption,
112115
) -> Result<InstallSnapshotResponse<NodeId>, RPCError<NodeId, (), RaftError<NodeId, InstallSnapshotError>>> {
113116
let resp = self.target_raft.install_snapshot(rpc).await.map_err(|e| RemoteError::new(self.target, e))?;
114117
Ok(resp)
115118
}
116119

117-
async fn send_vote(
120+
async fn vote(
118121
&mut self,
119122
rpc: VoteRequest<NodeId>,
123+
_option: RPCOption,
120124
) -> Result<VoteResponse<NodeId>, RPCError<NodeId, (), RaftError<NodeId>>> {
121125
let resp = self.target_raft.vote(rpc).await.map_err(|e| RemoteError::new(self.target, e))?;
122126
Ok(resp)

openraft/src/core/raft_core.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use crate::log_id::LogIdOptionExt;
5151
use crate::log_id::RaftLogId;
5252
use crate::metrics::RaftMetrics;
5353
use crate::metrics::ReplicationMetrics;
54+
use crate::network::RPCOption;
5455
use crate::network::RPCTypes;
5556
use crate::network::RaftNetwork;
5657
use crate::network::RaftNetworkFactory;
@@ -307,8 +308,10 @@ where
307308
let target_node = eff_mem.get_node(&target).unwrap().clone();
308309
let mut client = self.network.new_client(target, &target_node).await;
309310

311+
let option = RPCOption::new(ttl);
312+
310313
let fu = async move {
311-
let outer_res = timeout(ttl, client.send_append_entries(rpc)).await;
314+
let outer_res = timeout(ttl, client.append_entries(rpc, option)).await;
312315
match outer_res {
313316
Ok(append_res) => match append_res {
314317
Ok(x) => Ok((target, x)),
@@ -991,10 +994,11 @@ where
991994

992995
let ttl = Duration::from_millis(self.config.election_timeout_min);
993996
let id = self.id;
997+
let option = RPCOption::new(ttl);
994998

995999
let _ = tokio::spawn(
9961000
async move {
997-
let tm_res = timeout(ttl, client.send_vote(req)).await;
1001+
let tm_res = timeout(ttl, client.vote(req, option)).await;
9981002
let res = match tm_res {
9991003
Ok(res) => res,
10001004

openraft/src/network/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
mod backoff;
44
mod factory;
55
#[allow(clippy::module_inception)] mod network;
6+
mod rpc_option;
67
mod rpc_type;
78

89
pub use backoff::Backoff;
910
pub use factory::RaftNetworkFactory;
1011
pub use network::RaftNetwork;
12+
pub use rpc_option::RPCOption;
1113
pub use rpc_type::RPCTypes;

openraft/src/network/network.rs

+64-6
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use async_trait::async_trait;
55
use crate::error::InstallSnapshotError;
66
use crate::error::RPCError;
77
use crate::error::RaftError;
8+
use crate::network::rpc_option::RPCOption;
89
use crate::network::Backoff;
910
use crate::raft::AppendEntriesRequest;
1011
use crate::raft::AppendEntriesResponse;
@@ -19,35 +20,92 @@ use crate::RaftTypeConfig;
1920
/// See the [network chapter of the guide](https://datafuselabs.github.io/openraft/getting-started.html#3-impl-raftnetwork)
2021
/// for details and discussion on this trait and how to implement it.
2122
///
22-
/// Typically, the network implementation as such will be hidden behind a `Box<T>` or `Arc<T>` and
23-
/// this interface implemented on the `Box<T>` or `Arc<T>`.
24-
///
2523
/// A single network instance is used to connect to a single target node. The network instance is
2624
/// constructed by the [`RaftNetworkFactory`](`crate::network::RaftNetworkFactory`).
25+
///
26+
/// ### 2023-05-03: New API with options
27+
///
28+
/// - This trait introduced 3 new API `append_entries`, `install_snapshot` and `vote` which accept
29+
/// an additional argument [`RPCOption`], and deprecated the old API `send_append_entries`,
30+
/// `send_install_snapshot` and `send_vote`.
31+
///
32+
/// - The old API will be **removed** in `0.9`. An application can still implement the old API
33+
/// without any changes. Openraft calls only the new API and the default implementation will
34+
/// delegate to the old API.
35+
///
36+
/// - Implementing the new APIs will disable the old APIs.
2737
#[async_trait]
2838
pub trait RaftNetwork<C>: Send + Sync + 'static
2939
where C: RaftTypeConfig
3040
{
41+
/// Send an AppendEntries RPC to the target.
42+
async fn append_entries(
43+
&mut self,
44+
rpc: AppendEntriesRequest<C>,
45+
option: RPCOption,
46+
) -> Result<AppendEntriesResponse<C::NodeId>, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>> {
47+
let _ = option;
48+
#[allow(deprecated)]
49+
self.send_append_entries(rpc).await
50+
}
51+
52+
/// Send an InstallSnapshot RPC to the target.
53+
async fn install_snapshot(
54+
&mut self,
55+
rpc: InstallSnapshotRequest<C>,
56+
option: RPCOption,
57+
) -> Result<
58+
InstallSnapshotResponse<C::NodeId>,
59+
RPCError<C::NodeId, C::Node, RaftError<C::NodeId, InstallSnapshotError>>,
60+
> {
61+
let _ = option;
62+
#[allow(deprecated)]
63+
self.send_install_snapshot(rpc).await
64+
}
65+
66+
/// Send a RequestVote RPC to the target.
67+
async fn vote(
68+
&mut self,
69+
rpc: VoteRequest<C::NodeId>,
70+
option: RPCOption,
71+
) -> Result<VoteResponse<C::NodeId>, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>> {
72+
let _ = option;
73+
#[allow(deprecated)]
74+
self.send_vote(rpc).await
75+
}
76+
3177
/// Send an AppendEntries RPC to the target Raft node (§5).
78+
#[deprecated(note = "use `append_entries` instead. This method will be removed in 0.9")]
3279
async fn send_append_entries(
3380
&mut self,
3481
rpc: AppendEntriesRequest<C>,
35-
) -> Result<AppendEntriesResponse<C::NodeId>, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>>;
82+
) -> Result<AppendEntriesResponse<C::NodeId>, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>> {
83+
let _ = rpc;
84+
unimplemented!("send_append_entries is deprecated")
85+
}
3686

3787
/// Send an InstallSnapshot RPC to the target Raft node (§7).
88+
#[deprecated(note = "use `install_snapshot` instead. This method will be removed in 0.9")]
3889
async fn send_install_snapshot(
3990
&mut self,
4091
rpc: InstallSnapshotRequest<C>,
4192
) -> Result<
4293
InstallSnapshotResponse<C::NodeId>,
4394
RPCError<C::NodeId, C::Node, RaftError<C::NodeId, InstallSnapshotError>>,
44-
>;
95+
> {
96+
let _ = rpc;
97+
unimplemented!("send_install_snapshot is deprecated")
98+
}
4599

46100
/// Send a RequestVote RPC to the target Raft node (§5).
101+
#[deprecated(note = "use `vote` instead. This method will be removed in 0.9")]
47102
async fn send_vote(
48103
&mut self,
49104
rpc: VoteRequest<C::NodeId>,
50-
) -> Result<VoteResponse<C::NodeId>, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>>;
105+
) -> Result<VoteResponse<C::NodeId>, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>> {
106+
let _ = rpc;
107+
unimplemented!("send_vote is deprecated")
108+
}
51109

52110
/// Build a backoff instance if the target node is temporarily(or permanently) unreachable.
53111
///

openraft/src/network/rpc_option.rs

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use std::time::Duration;
2+
3+
/// An additional argument to the [`RaftNetwork`] methods to allow applications to customize
4+
/// networking behaviors.
5+
///
6+
/// [`RaftNetwork`]: `crate::network::RaftNetwork`
7+
pub struct RPCOption {
8+
/// The expected time-to-last for an RPC.
9+
///
10+
/// The caller will cancel an RPC if it takes longer than this duration.
11+
hard_ttl: Duration,
12+
}
13+
14+
impl RPCOption {
15+
pub fn new(hard_ttl: Duration) -> Self {
16+
Self { hard_ttl }
17+
}
18+
19+
/// The moderate max interval an RPC should last for.
20+
///
21+
/// The [`hard_ttl()`] and `soft_ttl()` of `RPCOption` sets the hard limit and the moderate
22+
/// limit of the duration for which an RPC should run. Once the `soft_ttl()` ends, the RPC
23+
/// implementation should start to gracefully cancel the RPC, and once the `hard_ttl()` ends,
24+
/// Openraft will terminate the ongoing RPC at once.
25+
///
26+
/// `soft_ttl` is smaller than [`hard_ttl()`] so that the RPC implementation can cancel the RPC
27+
/// gracefully after `soft_ttl` and before `hard_ttl`.
28+
///
29+
/// `soft_ttl` is 3/4 of `hard_ttl` but it may change in future, do not rely on this ratio.
30+
///
31+
/// [`hard_ttl()`]: `Self::hard_ttl`
32+
pub fn soft_ttl(&self) -> Duration {
33+
self.hard_ttl * 3 / 4
34+
}
35+
36+
/// The hard limit of the interval an RPC should last for.
37+
///
38+
/// When exceeding this limit, the RPC will be dropped by Openraft at once.
39+
pub fn hard_ttl(&self) -> Duration {
40+
self.hard_ttl
41+
}
42+
}

openraft/src/replication/mod.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use crate::error::Timeout;
3333
use crate::log_id::LogIdOptionExt;
3434
use crate::log_id_range::LogIdRange;
3535
use crate::network::Backoff;
36+
use crate::network::RPCOption;
3637
use crate::network::RPCTypes;
3738
use crate::network::RaftNetwork;
3839
use crate::network::RaftNetworkFactory;
@@ -292,7 +293,8 @@ where
292293
);
293294

294295
let the_timeout = Duration::from_millis(self.config.heartbeat_interval);
295-
let res = timeout(the_timeout, self.network.send_append_entries(payload)).await;
296+
let option = RPCOption::new(the_timeout);
297+
let res = timeout(the_timeout, self.network.append_entries(payload, option)).await;
296298

297299
tracing::debug!("append_entries res: {:?}", res);
298300

@@ -576,7 +578,9 @@ where
576578
self.config.send_snapshot_timeout()
577579
};
578580

579-
let res = timeout(snap_timeout, self.network.send_install_snapshot(req)).await;
581+
let option = RPCOption::new(snap_timeout);
582+
583+
let res = timeout(snap_timeout, self.network.install_snapshot(req, option)).await;
580584

581585
let res = match res {
582586
Ok(outer_res) => match outer_res {

tests/tests/append_entries/t10_conflict_with_empty_entries.rs

+10-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use std::sync::Arc;
2+
use std::time::Duration;
23

34
use anyhow::Result;
5+
use openraft::network::RPCOption;
46
use openraft::network::RaftNetwork;
57
use openraft::network::RaftNetworkFactory;
68
use openraft::raft::AppendEntriesRequest;
@@ -57,7 +59,8 @@ async fn conflict_with_empty_entries() -> Result<()> {
5759
leader_commit: Some(LogId::new(CommittedLeaderId::new(1, 0), 5)),
5860
};
5961

60-
let resp = router.new_client(0, &()).await.send_append_entries(rpc).await?;
62+
let option = RPCOption::new(Duration::from_millis(1_000));
63+
let resp = router.new_client(0, &()).await.append_entries(rpc, option).await?;
6164
assert!(!resp.is_success());
6265
assert!(resp.is_conflict());
6366

@@ -77,7 +80,9 @@ async fn conflict_with_empty_entries() -> Result<()> {
7780
leader_commit: Some(LogId::new(CommittedLeaderId::new(1, 0), 5)),
7881
};
7982

80-
let resp = router.new_client(0, &()).await.send_append_entries(rpc).await?;
83+
let option = RPCOption::new(Duration::from_millis(1_000));
84+
85+
let resp = router.new_client(0, &()).await.append_entries(rpc, option).await?;
8186
assert!(resp.is_success());
8287
assert!(!resp.is_conflict());
8388

@@ -90,7 +95,9 @@ async fn conflict_with_empty_entries() -> Result<()> {
9095
leader_commit: Some(LogId::new(CommittedLeaderId::new(1, 0), 5)),
9196
};
9297

93-
let resp = router.new_client(0, &()).await.send_append_entries(rpc).await?;
98+
let option = RPCOption::new(Duration::from_millis(1_000));
99+
100+
let resp = router.new_client(0, &()).await.append_entries(rpc, option).await?;
94101
assert!(!resp.is_success());
95102
assert!(resp.is_conflict());
96103

tests/tests/append_entries/t10_see_higher_vote.rs

+10-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::time::Duration;
33

44
use anyhow::Result;
55
use maplit::btreeset;
6+
use openraft::network::RPCOption;
67
use openraft::network::RaftNetwork;
78
use openraft::network::RaftNetworkFactory;
89
use openraft::raft::VoteRequest;
@@ -40,13 +41,18 @@ async fn append_sees_higher_vote() -> Result<()> {
4041
// Let leader lease expire
4142
sleep(Duration::from_millis(800)).await;
4243

44+
let option = RPCOption::new(Duration::from_millis(1_000));
45+
4346
let resp = router
4447
.new_client(1, &())
4548
.await
46-
.send_vote(VoteRequest {
47-
vote: Vote::new(10, 1),
48-
last_log_id: Some(LogId::new(CommittedLeaderId::new(10, 1), 5)),
49-
})
49+
.vote(
50+
VoteRequest {
51+
vote: Vote::new(10, 1),
52+
last_log_id: Some(LogId::new(CommittedLeaderId::new(10, 1), 5)),
53+
},
54+
option,
55+
)
5056
.await?;
5157

5258
assert!(resp.vote_granted);

tests/tests/append_entries/t11_append_entries_with_bigger_term.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use std::sync::Arc;
2+
use std::time::Duration;
23

34
use anyhow::Result;
45
use maplit::btreeset;
6+
use openraft::network::RPCOption;
57
use openraft::network::RaftNetwork;
68
use openraft::network::RaftNetworkFactory;
79
use openraft::raft::AppendEntriesRequest;
@@ -51,7 +53,9 @@ async fn append_entries_with_bigger_term() -> Result<()> {
5153
leader_commit: Some(LogId::new(CommittedLeaderId::new(1, 0), log_index)),
5254
};
5355

54-
let resp = router.new_client(0, &()).await.send_append_entries(req).await?;
56+
let option = RPCOption::new(Duration::from_millis(1_000));
57+
58+
let resp = router.new_client(0, &()).await.append_entries(req, option).await?;
5559
assert!(resp.is_success());
5660

5761
// after append entries, check hard state in term 2 and vote for node 1

0 commit comments

Comments
 (0)