Skip to content

Commit 84539cb

Browse files
committed
Change: move snapshot type definition from storage traits to RaftTypeConfig
Similar to `NodeId` or `Entry`, `SnapshotData` is also a data type that is specified by the application and needs to be defined in `RaftTypeConfig`, which is a collection of all application types. Public types changes: - Add `SnapshotData` to `RaftTypeConfig`: ```rust pub trait RaftTypeConfig { /// ... type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static; } ``` - Remove associated type `SnapshotData` from `storage::RaftStorage`. - Remove associated type `SnapshotData` from `storage::v2::RaftStateMachine`. Corresponding API changes: - Change `storage::RaftSnapshotBuilder<C: RaftTypeConfig, SNAPSHOT_DATA>` to `RaftSnapshotBuilder<C>` - Change `storage::Snapshot<NID: NodeId, N: Node, SNAPSHOT_DATA>` to `storage::Snapshot<C>` Upgrade tip: Update generic type parameter in application types to pass compilation.
1 parent a28d552 commit 84539cb

File tree

34 files changed

+286
-355
lines changed

34 files changed

+286
-355
lines changed

cluster_benchmark/tests/benchmark/network.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ use openraft::Raft;
2323
use openraft::RaftNetwork;
2424
use openraft::RaftNetworkFactory;
2525

26-
use crate::store::Config as MemConfig;
2726
use crate::store::LogStore;
2827
use crate::store::NodeId;
2928
use crate::store::StateMachineStore;
29+
use crate::store::TypeConfig as MemConfig;
3030

3131
pub type BenchRaft = Raft<MemConfig, Router, Arc<LogStore>, Arc<StateMachineStore>>;
3232

cluster_benchmark/tests/benchmark/store.rs

+18-18
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use openraft::Entry;
2020
use openraft::EntryPayload;
2121
use openraft::LogId;
2222
use openraft::RaftLogId;
23+
use openraft::RaftTypeConfig;
2324
use openraft::SnapshotMeta;
2425
use openraft::StorageError;
2526
use openraft::StorageIOError;
@@ -38,7 +39,8 @@ pub struct ClientResponse {}
3839
pub type NodeId = u64;
3940

4041
openraft::declare_raft_types!(
41-
pub Config: D = ClientRequest, R = ClientResponse, NodeId = NodeId, Node = (), Entry = Entry<Config>
42+
pub TypeConfig: D = ClientRequest, R = ClientResponse, NodeId = NodeId, Node = (),
43+
Entry = Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>
4244
);
4345

4446
#[derive(Debug)]
@@ -55,7 +57,7 @@ pub struct StateMachine {
5557

5658
pub struct LogStore {
5759
vote: RwLock<Option<Vote<NodeId>>>,
58-
log: RwLock<BTreeMap<u64, Entry<Config>>>,
60+
log: RwLock<BTreeMap<u64, Entry<TypeConfig>>>,
5961
last_purged_log_id: RwLock<Option<LogId<NodeId>>>,
6062
}
6163

@@ -98,11 +100,11 @@ impl StateMachineStore {
98100
}
99101

100102
#[async_trait]
101-
impl RaftLogReader<Config> for Arc<LogStore> {
103+
impl RaftLogReader<TypeConfig> for Arc<LogStore> {
102104
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
103105
&mut self,
104106
range: RB,
105-
) -> Result<Vec<Entry<Config>>, StorageError<NodeId>> {
107+
) -> Result<Vec<Entry<TypeConfig>>, StorageError<NodeId>> {
106108
let mut entries = vec![];
107109
{
108110
let log = self.log.read().await;
@@ -114,7 +116,7 @@ impl RaftLogReader<Config> for Arc<LogStore> {
114116
Ok(entries)
115117
}
116118

117-
async fn get_log_state(&mut self) -> Result<LogState<Config>, StorageError<NodeId>> {
119+
async fn get_log_state(&mut self) -> Result<LogState<TypeConfig>, StorageError<NodeId>> {
118120
let log = self.log.read().await;
119121
let last_serialized = log.iter().rev().next().map(|(_, ent)| ent);
120122

@@ -138,9 +140,9 @@ impl RaftLogReader<Config> for Arc<LogStore> {
138140
}
139141

140142
#[async_trait]
141-
impl RaftSnapshotBuilder<Config, Cursor<Vec<u8>>> for Arc<StateMachineStore> {
143+
impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
142144
#[tracing::instrument(level = "trace", skip(self))]
143-
async fn build_snapshot(&mut self) -> Result<Snapshot<NodeId, (), Cursor<Vec<u8>>>, StorageError<NodeId>> {
145+
async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, StorageError<NodeId>> {
144146
let data;
145147
let last_applied_log;
146148
let last_membership;
@@ -190,7 +192,7 @@ impl RaftSnapshotBuilder<Config, Cursor<Vec<u8>>> for Arc<StateMachineStore> {
190192
}
191193

192194
#[async_trait]
193-
impl RaftLogStorage<Config> for Arc<LogStore> {
195+
impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
194196
#[tracing::instrument(level = "trace", skip(self))]
195197
async fn save_vote(&mut self, vote: &Vote<NodeId>) -> Result<(), StorageError<NodeId>> {
196198
let mut v = self.vote.write().await;
@@ -225,7 +227,7 @@ impl RaftLogStorage<Config> for Arc<LogStore> {
225227

226228
#[tracing::instrument(level = "trace", skip_all)]
227229
async fn append<I>(&mut self, entries: I, callback: LogFlushed<NodeId>) -> Result<(), StorageError<NodeId>>
228-
where I: IntoIterator<Item = Entry<Config>> + Send {
230+
where I: IntoIterator<Item = Entry<TypeConfig>> + Send {
229231
{
230232
let mut log = self.log.write().await;
231233
log.extend(entries.into_iter().map(|entry| (entry.get_log_id().index, entry)));
@@ -242,9 +244,7 @@ impl RaftLogStorage<Config> for Arc<LogStore> {
242244
}
243245

244246
#[async_trait]
245-
impl RaftStateMachine<Config> for Arc<StateMachineStore> {
246-
type SnapshotData = Cursor<Vec<u8>>;
247-
247+
impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
248248
async fn applied_state(
249249
&mut self,
250250
) -> Result<(Option<LogId<NodeId>>, StoredMembership<NodeId, ()>), StorageError<NodeId>> {
@@ -253,7 +253,7 @@ impl RaftStateMachine<Config> for Arc<StateMachineStore> {
253253
}
254254

255255
async fn apply<I>(&mut self, entries: I) -> Result<Vec<ClientResponse>, StorageError<NodeId>>
256-
where I: IntoIterator<Item = Entry<Config>> + Send {
256+
where I: IntoIterator<Item = Entry<TypeConfig>> + Send {
257257
let mut sm = self.sm.write().await;
258258

259259
let it = entries.into_iter();
@@ -275,15 +275,17 @@ impl RaftStateMachine<Config> for Arc<StateMachineStore> {
275275
}
276276

277277
#[tracing::instrument(level = "trace", skip(self))]
278-
async fn begin_receiving_snapshot(&mut self) -> Result<Box<Self::SnapshotData>, StorageError<NodeId>> {
278+
async fn begin_receiving_snapshot(
279+
&mut self,
280+
) -> Result<Box<<TypeConfig as RaftTypeConfig>::SnapshotData>, StorageError<NodeId>> {
279281
Ok(Box::new(Cursor::new(Vec::new())))
280282
}
281283

282284
#[tracing::instrument(level = "trace", skip(self, snapshot))]
283285
async fn install_snapshot(
284286
&mut self,
285287
meta: &SnapshotMeta<NodeId, ()>,
286-
snapshot: Box<Self::SnapshotData>,
288+
snapshot: Box<<TypeConfig as RaftTypeConfig>::SnapshotData>,
287289
) -> Result<(), StorageError<NodeId>> {
288290
let new_snapshot = StoredSnapshot {
289291
meta: meta.clone(),
@@ -305,9 +307,7 @@ impl RaftStateMachine<Config> for Arc<StateMachineStore> {
305307
}
306308

307309
#[tracing::instrument(level = "trace", skip(self))]
308-
async fn get_current_snapshot(
309-
&mut self,
310-
) -> Result<Option<Snapshot<NodeId, (), Self::SnapshotData>>, StorageError<NodeId>> {
310+
async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<TypeConfig>>, StorageError<NodeId>> {
311311
match &*self.current_snapshot.read().await {
312312
Some(snapshot) => {
313313
let data = snapshot.data.clone();

cluster_benchmark/tests/benchmark/store/test.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ use openraft::testing::StoreBuilder;
55
use openraft::testing::Suite;
66
use openraft::StorageError;
77

8-
use crate::store::Config;
98
use crate::store::LogStore;
109
use crate::store::NodeId;
1110
use crate::store::StateMachineStore;
11+
use crate::store::TypeConfig;
1212

1313
struct Builder {}
1414
#[async_trait]
15-
impl StoreBuilder<Config, Arc<LogStore>, Arc<StateMachineStore>> for Builder {
15+
impl StoreBuilder<TypeConfig, Arc<LogStore>, Arc<StateMachineStore>> for Builder {
1616
async fn build(&self) -> Result<((), Arc<LogStore>, Arc<StateMachineStore>), StorageError<NodeId>> {
1717
let log_store = LogStore::new_async().await;
1818
let sm = Arc::new(StateMachineStore::new());

examples/raft-kv-memstore/src/lib.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#![allow(clippy::uninlined_format_args)]
22
#![deny(unused_qualifications)]
33

4+
use std::io::Cursor;
45
use std::sync::Arc;
56

67
use actix_web::middleware;
@@ -29,7 +30,8 @@ pub type NodeId = u64;
2930

3031
openraft::declare_raft_types!(
3132
/// Declare the type configuration for example K/V store.
32-
pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = BasicNode, Entry = openraft::Entry<TypeConfig>
33+
pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = BasicNode,
34+
Entry = openraft::Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>
3335
);
3436

3537
pub type LogStore = Adaptor<TypeConfig, Arc<Store>>;

examples/raft-kv-memstore/src/store/mod.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use openraft::LogId;
1515
use openraft::RaftLogReader;
1616
use openraft::RaftSnapshotBuilder;
1717
use openraft::RaftStorage;
18+
use openraft::RaftTypeConfig;
1819
use openraft::SnapshotMeta;
1920
use openraft::StorageError;
2021
use openraft::StorageIOError;
@@ -123,9 +124,9 @@ impl RaftLogReader<TypeConfig> for Arc<Store> {
123124
}
124125

125126
#[async_trait]
126-
impl RaftSnapshotBuilder<TypeConfig, Cursor<Vec<u8>>> for Arc<Store> {
127+
impl RaftSnapshotBuilder<TypeConfig> for Arc<Store> {
127128
#[tracing::instrument(level = "trace", skip(self))]
128-
async fn build_snapshot(&mut self) -> Result<Snapshot<NodeId, BasicNode, Cursor<Vec<u8>>>, StorageError<NodeId>> {
129+
async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, StorageError<NodeId>> {
129130
let data;
130131
let last_applied_log;
131132
let last_membership;
@@ -176,7 +177,6 @@ impl RaftSnapshotBuilder<TypeConfig, Cursor<Vec<u8>>> for Arc<Store> {
176177

177178
#[async_trait]
178179
impl RaftStorage<TypeConfig> for Arc<Store> {
179-
type SnapshotData = Cursor<Vec<u8>>;
180180
type LogReader = Self;
181181
type SnapshotBuilder = Self;
182182

@@ -277,15 +277,17 @@ impl RaftStorage<TypeConfig> for Arc<Store> {
277277
}
278278

279279
#[tracing::instrument(level = "trace", skip(self))]
280-
async fn begin_receiving_snapshot(&mut self) -> Result<Box<Self::SnapshotData>, StorageError<NodeId>> {
280+
async fn begin_receiving_snapshot(
281+
&mut self,
282+
) -> Result<Box<<TypeConfig as RaftTypeConfig>::SnapshotData>, StorageError<NodeId>> {
281283
Ok(Box::new(Cursor::new(Vec::new())))
282284
}
283285

284286
#[tracing::instrument(level = "trace", skip(self, snapshot))]
285287
async fn install_snapshot(
286288
&mut self,
287289
meta: &SnapshotMeta<NodeId, BasicNode>,
288-
snapshot: Box<Self::SnapshotData>,
290+
snapshot: Box<<TypeConfig as RaftTypeConfig>::SnapshotData>,
289291
) -> Result<(), StorageError<NodeId>> {
290292
tracing::info!(
291293
{ snapshot_size = snapshot.get_ref().len() },
@@ -312,9 +314,7 @@ impl RaftStorage<TypeConfig> for Arc<Store> {
312314
}
313315

314316
#[tracing::instrument(level = "trace", skip(self))]
315-
async fn get_current_snapshot(
316-
&mut self,
317-
) -> Result<Option<Snapshot<NodeId, BasicNode, Self::SnapshotData>>, StorageError<NodeId>> {
317+
async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<TypeConfig>>, StorageError<NodeId>> {
318318
match &*self.current_snapshot.read().await {
319319
Some(snapshot) => {
320320
let data = snapshot.data.clone();

examples/raft-kv-rocksdb/src/lib.rs

+4-6
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#![deny(unused_qualifications)]
33

44
use std::fmt::Display;
5+
use std::io::Cursor;
56
use std::path::Path;
67
use std::sync::Arc;
78

@@ -33,17 +34,14 @@ pub struct Node {
3334

3435
impl Display for Node {
3536
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36-
write!(
37-
f,
38-
"ExampleNode {{ rpc_addr: {}, api_addr: {} }}",
39-
self.rpc_addr, self.api_addr
40-
)
37+
write!(f, "Node {{ rpc_addr: {}, api_addr: {} }}", self.rpc_addr, self.api_addr)
4138
}
4239
}
4340

4441
openraft::declare_raft_types!(
4542
/// Declare the type configuration for example K/V store.
46-
pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = Node, Entry = openraft::Entry<TypeConfig>
43+
pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = Node,
44+
Entry = openraft::Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>
4745
);
4846

4947
pub type LogStore = Adaptor<TypeConfig, Arc<Store>>;

examples/raft-kv-rocksdb/src/store.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use openraft::LogId;
2222
use openraft::RaftLogReader;
2323
use openraft::RaftSnapshotBuilder;
2424
use openraft::RaftStorage;
25+
use openraft::RaftTypeConfig;
2526
use openraft::SnapshotMeta;
2627
use openraft::StorageError;
2728
use openraft::StorageIOError;
@@ -363,9 +364,9 @@ impl RaftLogReader<TypeConfig> for Arc<Store> {
363364
}
364365

365366
#[async_trait]
366-
impl RaftSnapshotBuilder<TypeConfig, Cursor<Vec<u8>>> for Arc<Store> {
367+
impl RaftSnapshotBuilder<TypeConfig> for Arc<Store> {
367368
#[tracing::instrument(level = "trace", skip(self))]
368-
async fn build_snapshot(&mut self) -> Result<Snapshot<NodeId, Node, Cursor<Vec<u8>>>, StorageError<NodeId>> {
369+
async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, StorageError<NodeId>> {
369370
let data;
370371
let last_applied_log;
371372
let last_membership;
@@ -411,7 +412,6 @@ impl RaftSnapshotBuilder<TypeConfig, Cursor<Vec<u8>>> for Arc<Store> {
411412

412413
#[async_trait]
413414
impl RaftStorage<TypeConfig> for Arc<Store> {
414-
type SnapshotData = Cursor<Vec<u8>>;
415415
type LogReader = Self;
416416
type SnapshotBuilder = Self;
417417

@@ -508,15 +508,17 @@ impl RaftStorage<TypeConfig> for Arc<Store> {
508508
}
509509

510510
#[tracing::instrument(level = "trace", skip(self))]
511-
async fn begin_receiving_snapshot(&mut self) -> Result<Box<Self::SnapshotData>, StorageError<NodeId>> {
511+
async fn begin_receiving_snapshot(
512+
&mut self,
513+
) -> Result<Box<<TypeConfig as RaftTypeConfig>::SnapshotData>, StorageError<NodeId>> {
512514
Ok(Box::new(Cursor::new(Vec::new())))
513515
}
514516

515517
#[tracing::instrument(level = "trace", skip(self, snapshot))]
516518
async fn install_snapshot(
517519
&mut self,
518520
meta: &SnapshotMeta<NodeId, Node>,
519-
snapshot: Box<Self::SnapshotData>,
521+
snapshot: Box<<TypeConfig as RaftTypeConfig>::SnapshotData>,
520522
) -> Result<(), StorageError<NodeId>> {
521523
tracing::info!(
522524
{ snapshot_size = snapshot.get_ref().len() },
@@ -541,9 +543,7 @@ impl RaftStorage<TypeConfig> for Arc<Store> {
541543
}
542544

543545
#[tracing::instrument(level = "trace", skip(self))]
544-
async fn get_current_snapshot(
545-
&mut self,
546-
) -> Result<Option<Snapshot<NodeId, Node, Self::SnapshotData>>, StorageError<NodeId>> {
546+
async fn get_current_snapshot(&mut self) -> Result<Option<Snapshot<TypeConfig>>, StorageError<NodeId>> {
547547
match Store::get_current_snapshot_(self)? {
548548
Some(snapshot) => {
549549
let data = snapshot.data.clone();

0 commit comments

Comments
 (0)