Skip to content

Commit fef0cf3

Browse files
committed
Add PoolSv2::shutdown
A simple shutdown function that exits the State loop and shutsdown the Pool server.
1 parent 98a2511 commit fef0cf3

File tree

4 files changed

+143
-58
lines changed

4 files changed

+143
-58
lines changed

roles/pool/src/lib/mining_pool/mod.rs

+60-51
Original file line numberDiff line numberDiff line change
@@ -270,47 +270,58 @@ impl Pool {
270270
async fn accept_incoming_connection(
271271
self_: Arc<Mutex<Pool>>,
272272
config: PoolConfig,
273+
mut recv_stop_signal: tokio::sync::watch::Receiver<()>,
273274
) -> PoolResult<()> {
274275
let status_tx = self_.safe_lock(|s| s.status_tx.clone())?;
275276
let listener = TcpListener::bind(&config.listen_address()).await?;
276-
info!(
277-
"Listening for encrypted connection on: {}",
278-
config.listen_address()
279-
);
280-
while let Ok((stream, _)) = listener.accept().await {
281-
let address = stream.peer_addr().unwrap();
282-
debug!(
283-
"New connection from {:?}",
284-
stream.peer_addr().map_err(PoolError::Io)
285-
);
286-
287-
let responder = Responder::from_authority_kp(
288-
&config.authority_public_key().into_bytes(),
289-
&config.authority_secret_key().into_bytes(),
290-
std::time::Duration::from_secs(config.cert_validity_sec()),
291-
);
292-
match responder {
293-
Ok(resp) => {
294-
if let Ok((receiver, sender, _, _)) =
295-
Connection::new(stream, HandshakeRole::Responder(resp)).await
296-
{
297-
handle_result!(
298-
status_tx,
299-
Self::accept_incoming_connection_(
300-
self_.clone(),
301-
receiver,
302-
sender,
303-
address
304-
)
305-
.await
306-
);
277+
info!("Pool is running on: {}", config.listen_address());
278+
// Run the listener in the background
279+
task::spawn(async move {
280+
loop {
281+
tokio::select! {
282+
_ = recv_stop_signal.changed() => {
283+
info!("Pool is stopping the server after stop shutdown signal received");
284+
break;
285+
},
286+
287+
result = listener.accept() => {
288+
match result {
289+
Ok((stream, _)) => {
290+
let address = stream.peer_addr().unwrap();
291+
info!("New connection from {:?}", stream.peer_addr().map_err(PoolError::Io));
292+
let responder = Responder::from_authority_kp(
293+
&config.authority_public_key().into_bytes(),
294+
&config.authority_secret_key().into_bytes(),
295+
std::time::Duration::from_secs(config.cert_validity_sec()),
296+
);
297+
298+
match responder {
299+
Ok(resp) => {
300+
if let Ok((receiver, sender, _, _)) = Connection::new(stream, HandshakeRole::Responder(resp)).await {
301+
handle_result!(
302+
status_tx,
303+
Self::accept_incoming_connection_(
304+
self_.clone(),
305+
receiver,
306+
sender,
307+
address
308+
).await
309+
);
310+
}
311+
}
312+
Err(_) => {
313+
return;
314+
}
315+
}
316+
}
317+
Err(e) => {
318+
error!("Error accepting connection: {:?}", e);
319+
}
320+
}
307321
}
308322
}
309-
Err(_e) => {
310-
todo!()
311-
}
312323
}
313-
}
324+
});
314325
Ok(())
315326
}
316327

@@ -444,15 +455,17 @@ impl Pool {
444455
Ok(())
445456
}
446457

447-
pub fn start(
458+
#[allow(clippy::too_many_arguments)]
459+
pub async fn start(
448460
config: PoolConfig,
449461
new_template_rx: Receiver<NewTemplate<'static>>,
450462
new_prev_hash_rx: Receiver<SetNewPrevHash<'static>>,
451463
solution_sender: Sender<SubmitSolution<'static>>,
452464
sender_message_received_signal: Sender<()>,
453465
status_tx: status::Sender,
454466
shares_per_minute: f32,
455-
) -> Arc<Mutex<Self>> {
467+
recv_stop_signal: tokio::sync::watch::Receiver<()>,
468+
) -> Result<Arc<Mutex<Self>>, PoolError> {
456469
let extranonce_len = 32;
457470
let range_0 = std::ops::Range { start: 0, end: 0 };
458471
let range_1 = std::ops::Range { start: 0, end: 16 };
@@ -488,24 +501,20 @@ impl Pool {
488501
let cloned2 = pool.clone();
489502
let cloned3 = pool.clone();
490503

491-
info!("Starting up pool listener");
504+
info!("Starting up Pool server");
492505
let status_tx_clone = status_tx.clone();
493-
task::spawn(async move {
494-
if let Err(e) = Self::accept_incoming_connection(cloned, config).await {
495-
error!("{}", e);
496-
}
497-
if status_tx_clone
506+
if let Err(e) = Self::accept_incoming_connection(cloned, config, recv_stop_signal).await {
507+
error!("Pool stopped accepting connections due to: {}", &e);
508+
let _ = status_tx_clone
498509
.send(status::Status {
499510
state: status::State::DownstreamShutdown(PoolError::ComponentShutdown(
500-
"Downstream no longer accepting incoming connections".to_string(),
511+
"Pool stopped accepting connections".to_string(),
501512
)),
502513
})
503-
.await
504-
.is_err()
505-
{
506-
error!("Downstream shutdown and Status Channel dropped");
507-
}
508-
});
514+
.await;
515+
516+
return Err(e);
517+
}
509518

510519
let cloned = sender_message_received_signal.clone();
511520
let status_tx_clone = status_tx.clone();
@@ -547,7 +556,7 @@ impl Pool {
547556
error!("Downstream shutdown and Status Channel dropped");
548557
}
549558
});
550-
cloned3
559+
Ok(cloned3)
551560
}
552561

553562
/// This removes the downstream from the list of downstreams

roles/pool/src/lib/mod.rs

+81-3
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,38 @@ use async_channel::{bounded, unbounded};
77
use config::PoolConfig;
88
use error::PoolError;
99
use mining_pool::{get_coinbase_output, Pool};
10+
use std::sync::{Arc, Mutex};
1011
use template_receiver::TemplateRx;
1112
use tokio::select;
1213
use tracing::{error, info, warn};
1314

1415
#[derive(Debug, Clone)]
1516
pub struct PoolSv2 {
1617
config: PoolConfig,
18+
status_tx: Arc<Mutex<Option<async_channel::Sender<status::Status>>>>,
1719
}
1820

1921
impl PoolSv2 {
2022
pub fn new(config: PoolConfig) -> PoolSv2 {
21-
PoolSv2 { config }
23+
PoolSv2 {
24+
config,
25+
status_tx: Arc::new(Mutex::new(None)),
26+
}
2227
}
2328

2429
pub async fn start(&self) -> Result<(), PoolError> {
2530
let config = self.config.clone();
2631
let (status_tx, status_rx) = unbounded();
32+
33+
if let Ok(mut s_tx) = self.status_tx.lock() {
34+
*s_tx = Some(status_tx.clone());
35+
} else {
36+
error!("Failed to access Pool status lock");
37+
return Err(PoolError::Custom(
38+
"Failed to access Pool status lock".to_string(),
39+
));
40+
}
41+
let (send_stop_signal, recv_stop_signal) = tokio::sync::watch::channel(());
2742
let (s_new_t, r_new_t) = bounded(10);
2843
let (s_prev_hash, r_prev_hash) = bounded(10);
2944
let (s_solution, r_solution) = bounded(10);
@@ -60,8 +75,9 @@ impl PoolSv2 {
6075
s_message_recv_signal,
6176
status::Sender::DownstreamListener(status_tx),
6277
config.shares_per_minute(),
63-
);
64-
78+
recv_stop_signal,
79+
)
80+
.await?;
6581
// Start the error handling loop
6682
// See `./status.rs` and `utils/error_handling` for information on how this operates
6783
tokio::spawn(async move {
@@ -84,16 +100,23 @@ impl PoolSv2 {
84100
let task_status: status::Status = task_status.unwrap();
85101

86102
match task_status.state {
103+
status::State::Shutdown => {
104+
info!("Received shutdown signal");
105+
let _ = send_stop_signal.send(());
106+
break;
107+
}
87108
// Should only be sent by the downstream listener
88109
status::State::DownstreamShutdown(err) => {
89110
error!(
90111
"SHUTDOWN from Downstream: {}\nTry to restart the downstream listener",
91112
err
92113
);
114+
let _ = send_stop_signal.send(());
93115
break;
94116
}
95117
status::State::TemplateProviderShutdown(err) => {
96118
error!("SHUTDOWN from Upstream: {}\nTry to reconnecting or connecting to a new upstream", err);
119+
let _ = send_stop_signal.send(());
97120
break;
98121
}
99122
status::State::Healthy(msg) => {
@@ -105,6 +128,7 @@ impl PoolSv2 {
105128
.safe_lock(|p| p.remove_downstream(downstream_id))
106129
.is_err()
107130
{
131+
let _ = send_stop_signal.send(());
108132
break;
109133
}
110134
}
@@ -113,6 +137,31 @@ impl PoolSv2 {
113137
});
114138
Ok(())
115139
}
140+
141+
pub fn shutdown(&self) {
142+
info!("Attempting to shutdown pool");
143+
if let Ok(status_tx) = &self.status_tx.lock() {
144+
if let Some(status_tx) = status_tx.as_ref().cloned() {
145+
info!("Pool is running, sending shutdown signal");
146+
tokio::spawn(async move {
147+
if let Err(e) = status_tx
148+
.send(status::Status {
149+
state: status::State::Shutdown,
150+
})
151+
.await
152+
{
153+
error!("Failed to send shutdown signal to status loop: {:?}", e);
154+
} else {
155+
info!("Sent shutdown signal to Pool");
156+
}
157+
});
158+
} else {
159+
info!("Pool is not running.");
160+
}
161+
} else {
162+
error!("Failed to access Pool status lock");
163+
}
164+
}
116165
}
117166

118167
#[cfg(test)]
@@ -148,4 +197,33 @@ mod tests {
148197
let result = pool.start().await;
149198
assert!(result.is_err());
150199
}
200+
201+
#[tokio::test]
202+
async fn shutdown_pool() {
203+
let config_path = "config-examples/pool-config-hosted-tp-example.toml";
204+
let config: PoolConfig = match Config::builder()
205+
.add_source(File::new(config_path, FileFormat::Toml))
206+
.build()
207+
{
208+
Ok(settings) => match settings.try_deserialize::<PoolConfig>() {
209+
Ok(c) => c,
210+
Err(e) => {
211+
error!("Failed to deserialize config: {}", e);
212+
return;
213+
}
214+
},
215+
Err(e) => {
216+
error!("Failed to build config: {}", e);
217+
return;
218+
}
219+
};
220+
let pool_0 = PoolSv2::new(config.clone());
221+
let pool_1 = PoolSv2::new(config);
222+
assert!(pool_0.start().await.is_ok());
223+
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
224+
assert!(pool_1.start().await.is_err());
225+
pool_0.shutdown();
226+
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
227+
assert!(pool_1.start().await.is_ok());
228+
}
151229
}

roles/pool/src/lib/status.rs

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ pub enum State {
4848
TemplateProviderShutdown(PoolError),
4949
DownstreamInstanceDropped(u32),
5050
Healthy(String),
51+
Shutdown,
5152
}
5253

5354
/// message to be sent to the status loop on the main thread

test/integration-tests/lib/mod.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,7 @@ pub async fn start_pool(template_provider_address: Option<SocketAddr>) -> (PoolS
8989
SHARES_PER_MINUTE,
9090
);
9191
let pool = PoolSv2::new(config);
92-
let pool_clone = pool.clone();
93-
tokio::task::spawn(async move {
94-
assert!(pool_clone.start().await.is_ok());
95-
});
92+
assert!(pool.start().await.is_ok());
9693
// Wait a bit to let the pool exchange initial messages with the TP
9794
sleep(1).await;
9895
(pool, listening_address)

0 commit comments

Comments
 (0)