@@ -67,7 +67,7 @@ impl RaftNode {
67
67
& mut self ,
68
68
sender : Sender < RaftEvent > ,
69
69
cancel : CancellationToken ,
70
- ) -> Result < ( ) , Status > {
70
+ ) -> Result < ( ) , RaftConnectionError > {
71
71
// Start the gRPC server
72
72
let id = self . config . candidate_id . clone ( ) ;
73
73
tokio:: spawn ( async move {
@@ -81,10 +81,15 @@ impl RaftNode {
81
81
82
82
// Connect to all other nodes
83
83
let config = self . config . clone ( ) ;
84
- let raft_clients = self . raft_clients . clone ( ) ;
85
- connect_to_raft_servers ( config, raft_clients) ;
84
+ let clients = connect_to_raft_servers ( config) . await ?;
85
+ {
86
+ let mut lk = self . raft_clients . clone ( ) ;
87
+ let mut raft_clients = lk. lock ( ) . unwrap ( ) ;
88
+ raft_clients. extend ( clients) ;
89
+ }
86
90
87
91
// Main loop
92
+ println ! ( "Starting raft main loop" ) ;
88
93
loop {
89
94
if cancel. is_cancelled ( ) {
90
95
break ;
@@ -178,41 +183,58 @@ impl State {
178
183
}
179
184
}
180
185
181
- fn connect_to_raft_servers ( config : RaftConfig , raft_clients : RaftClients ) {
182
- tokio:: spawn ( async move {
183
- let Some ( roster) = config. roster else {
184
- return ;
185
- } ;
186
/// Errors that can occur while establishing connections to the other raft nodes.
#[derive(Debug)]
pub enum RaftConnectionError {
    /// No roster was available in the configuration, so there is nothing to connect to.
    EmptyRoster,
    /// Every connection attempt to the named node failed; the payload is the node address.
    MaxAttemptsReached(String),
    /// A connection task could not be joined.
    // NOTE(review): presumably meant for a spawned task that panicked or was cancelled — confirm.
    JoinError,
}

impl std::fmt::Display for RaftConnectionError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::EmptyRoster => write!(f, "raft roster is not configured"),
            Self::MaxAttemptsReached(node) => write!(
                f,
                "could not connect to raft node {node}: maximum connection attempts reached"
            ),
            Self::JoinError => write!(f, "a raft connection task failed to complete"),
        }
    }
}

// Implementing `Error` lets callers box this type or attach it as a source.
impl std::error::Error for RaftConnectionError {}
186
192
187
- for node in roster {
188
- // skip the current node
189
- if node == config. candidate_id {
190
- continue ;
191
- }
193
+ async fn connect_to_raft_servers (
194
+ config : RaftConfig ,
195
+ ) -> Result <
196
+ HashMap < String , raft:: raft_client:: RaftClient < tonic:: transport:: channel:: Channel > > ,
197
+ RaftConnectionError ,
198
+ > {
199
+ let Some ( roster) = config. roster else {
200
+ return Err ( RaftConnectionError :: EmptyRoster ) ;
201
+ } ;
202
+
203
+ let mut tasks = tokio:: task:: JoinSet :: new ( ) ;
204
+ for node in roster {
205
+ if node == config. candidate_id {
206
+ continue ;
207
+ }
208
+ tasks. spawn ( async move { ( node. clone ( ) , connect_to_raft_server ( node) . await ) } ) ;
209
+ }
192
210
193
- let nameport = node. split ( ':' ) . collect :: < Vec < & str > > ( ) ;
194
- let ip = format ! ( "http://{}:{}" , nameport[ 0 ] , nameport[ 1 ] ) ;
195
- info ! ( "Connecting to {ip}" ) ;
196
-
197
- // try to connect to the node
198
- // if it fails, the node is not up yet
199
- // so we will try again in the next iteration
200
- let raft_clients_clone = raft_clients. clone ( ) ;
201
- tokio:: spawn ( async move {
202
- loop {
203
- let raft_client = raft:: raft_client:: RaftClient :: connect ( ip. clone ( ) ) . await ;
204
- if let Ok ( raft_client) = raft_client {
205
- {
206
- let mut raft_clients = raft_clients_clone. lock ( ) . unwrap ( ) ;
207
- raft_clients. insert ( node, raft_client) ;
208
- }
209
- break ;
210
- }
211
- tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
212
- }
213
- } ) ;
211
+ let mut clients = HashMap :: new ( ) ;
212
+ for ( node, client) in tasks. join_all ( ) . await {
213
+ clients. insert ( node, client?) ;
214
+ }
215
+ Ok ( clients)
216
+ }
217
+
218
+ pub async fn connect_to_raft_server (
219
+ node : String ,
220
+ ) -> Result < raft:: raft_client:: RaftClient < tonic:: transport:: Channel > , RaftConnectionError > {
221
+ // try to connect to the node
222
+ // if it fails, the node is not up yet
223
+ // so we will try again in the next iteration after a backoff period
224
+ let mut attempt = 0 ;
225
+ let max_attempts = 6 ;
226
+ while attempt < max_attempts {
227
+ println ! ( "Connecting to {node}" ) ;
228
+ match raft:: raft_client:: RaftClient :: connect ( format ! ( "http://{node}" ) ) . await {
229
+ Ok ( raft_client) => {
230
+ return Ok ( raft_client) ;
231
+ }
232
+ Err ( e) => println ! ( "Error connecting to {node}: {e}" ) ,
214
233
}
215
- } ) ;
234
+ tokio:: time:: sleep ( Duration :: from_secs ( 2 ^ attempt) ) . await ;
235
+ attempt += 1 ;
236
+ }
237
+ Err ( RaftConnectionError :: MaxAttemptsReached ( node) )
216
238
}
217
239
218
240
/// Start raft gRPC server
0 commit comments