Skip to content

Commit ac0f902

Browse files
committed
Add PoolSv2::shutdown
A simple shutdown function that exits the State loop and shutsdown the Pool server.
1 parent 196d8c4 commit ac0f902

File tree

3 files changed

+143
-59
lines changed

3 files changed

+143
-59
lines changed

roles/pool/src/lib/mining_pool/mod.rs

+65-52
Original file line numberDiff line numberDiff line change
@@ -270,47 +270,63 @@ impl Pool {
270270
async fn accept_incoming_connection(
271271
self_: Arc<Mutex<Pool>>,
272272
config: PoolConfig,
273+
mut recv_stop_signal: tokio::sync::watch::Receiver<()>,
273274
) -> PoolResult<()> {
274275
let status_tx = self_.safe_lock(|s| s.status_tx.clone())?;
275-
let listener = TcpListener::bind(&config.listen_address()).await?;
276-
info!(
277-
"Listening for encrypted connection on: {}",
278-
config.listen_address()
279-
);
280-
while let Ok((stream, _)) = listener.accept().await {
281-
let address = stream.peer_addr().unwrap();
282-
debug!(
283-
"New connection from {:?}",
284-
stream.peer_addr().map_err(PoolError::Io)
285-
);
286-
287-
let responder = Responder::from_authority_kp(
288-
&config.authority_public_key().into_bytes(),
289-
&config.authority_secret_key().into_bytes(),
290-
std::time::Duration::from_secs(config.cert_validity_sec()),
291-
);
292-
match responder {
293-
Ok(resp) => {
294-
if let Ok((receiver, sender, _, _)) =
295-
Connection::new(stream, HandshakeRole::Responder(resp)).await
296-
{
297-
handle_result!(
298-
status_tx,
299-
Self::accept_incoming_connection_(
300-
self_.clone(),
301-
receiver,
302-
sender,
303-
address
304-
)
305-
.await
306-
);
276+
let listener = match TcpListener::bind(&config.listen_address()).await {
277+
Ok(listener) => listener,
278+
Err(e) => {
279+
return Err(PoolError::Io(e));
280+
}
281+
};
282+
info!("Pool is running on: {}", config.listen_address());
283+
// Run the listener in the background
284+
task::spawn(async move {
285+
loop {
286+
tokio::select! {
287+
_ = recv_stop_signal.changed() => {
288+
dbg!("Pool is stopping listener after stop signal received");
289+
break;
290+
},
291+
292+
result = listener.accept() => {
293+
match result {
294+
Ok((stream, _)) => {
295+
let address = stream.peer_addr().unwrap();
296+
info!("New connection from {:?}", stream.peer_addr().map_err(PoolError::Io));
297+
let responder = Responder::from_authority_kp(
298+
&config.authority_public_key().into_bytes(),
299+
&config.authority_secret_key().into_bytes(),
300+
std::time::Duration::from_secs(config.cert_validity_sec()),
301+
);
302+
303+
match responder {
304+
Ok(resp) => {
305+
if let Ok((receiver, sender, _, _)) = Connection::new(stream, HandshakeRole::Responder(resp)).await {
306+
handle_result!(
307+
status_tx,
308+
Self::accept_incoming_connection_(
309+
self_.clone(),
310+
receiver,
311+
sender,
312+
address
313+
).await
314+
);
315+
}
316+
}
317+
Err(_) => {
318+
return;
319+
}
320+
}
321+
}
322+
Err(e) => {
323+
error!("Error accepting connection: {:?}", e);
324+
}
325+
}
307326
}
308327
}
309-
Err(_e) => {
310-
todo!()
311-
}
312328
}
313-
}
329+
});
314330
Ok(())
315331
}
316332

@@ -444,15 +460,16 @@ impl Pool {
444460
Ok(())
445461
}
446462

447-
pub fn start(
463+
pub async fn start(
448464
config: PoolConfig,
449465
new_template_rx: Receiver<NewTemplate<'static>>,
450466
new_prev_hash_rx: Receiver<SetNewPrevHash<'static>>,
451467
solution_sender: Sender<SubmitSolution<'static>>,
452468
sender_message_received_signal: Sender<()>,
453469
status_tx: status::Sender,
454470
shares_per_minute: f32,
455-
) -> Arc<Mutex<Self>> {
471+
recv_stop_signal: tokio::sync::watch::Receiver<()>,
472+
) -> Result<Arc<Mutex<Self>>, PoolError> {
456473
let extranonce_len = 32;
457474
let range_0 = std::ops::Range { start: 0, end: 0 };
458475
let range_1 = std::ops::Range { start: 0, end: 16 };
@@ -488,24 +505,20 @@ impl Pool {
488505
let cloned2 = pool.clone();
489506
let cloned3 = pool.clone();
490507

491-
info!("Starting up pool listener");
508+
info!("Starting up Pool server");
492509
let status_tx_clone = status_tx.clone();
493-
task::spawn(async move {
494-
if let Err(e) = Self::accept_incoming_connection(cloned, config).await {
495-
error!("{}", e);
496-
}
497-
if status_tx_clone
510+
if let Err(e) = Self::accept_incoming_connection(cloned, config, recv_stop_signal).await {
511+
error!("Pool stopped accepting connections due to: {}", &e);
512+
let _ = status_tx_clone
498513
.send(status::Status {
499514
state: status::State::DownstreamShutdown(PoolError::ComponentShutdown(
500-
"Downstream no longer accepting incoming connections".to_string(),
515+
"Pool stopped accepting connections".to_string(),
501516
)),
502517
})
503-
.await
504-
.is_err()
505-
{
506-
error!("Downstream shutdown and Status Channel dropped");
507-
}
508-
});
518+
.await;
519+
520+
return Err(e);
521+
}
509522

510523
let cloned = sender_message_received_signal.clone();
511524
let status_tx_clone = status_tx.clone();
@@ -547,7 +560,7 @@ impl Pool {
547560
error!("Downstream shutdown and Status Channel dropped");
548561
}
549562
});
550-
cloned3
563+
Ok(cloned3)
551564
}
552565

553566
/// This removes the downstream from the list of downstreams

roles/pool/src/lib/mod.rs

+77-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use async_channel::{bounded, unbounded};
99
use config::PoolConfig;
1010
use error::PoolError;
1111
use mining_pool::{get_coinbase_output, Pool};
12+
use std::sync::{Arc, Mutex};
1213
use template_receiver::TemplateRx;
1314
use tracing::{error, info, warn};
1415

@@ -17,16 +18,30 @@ use tokio::select;
1718
#[derive(Debug, Clone)]
1819
pub struct PoolSv2 {
1920
config: PoolConfig,
21+
status_tx: Arc<Mutex<Option<async_channel::Sender<status::Status>>>>,
2022
}
2123

2224
impl PoolSv2 {
2325
pub fn new(config: PoolConfig) -> PoolSv2 {
24-
PoolSv2 { config }
26+
PoolSv2 {
27+
config,
28+
status_tx: Arc::new(Mutex::new(None)),
29+
}
2530
}
2631

2732
pub async fn start(&self) -> Result<(), PoolError> {
2833
let config = self.config.clone();
2934
let (status_tx, status_rx) = unbounded();
35+
36+
if let Ok(mut s_tx) = self.status_tx.lock() {
37+
*s_tx = Some(status_tx.clone());
38+
} else {
39+
error!("Failed to access Pool status lock");
40+
return Err(PoolError::Custom(
41+
"Failed to access Pool status lock".to_string(),
42+
));
43+
}
44+
let (send_stop_signal, recv_stop_signal) = tokio::sync::watch::channel(());
3045
let (s_new_t, r_new_t) = bounded(10);
3146
let (s_prev_hash, r_prev_hash) = bounded(10);
3247
let (s_solution, r_solution) = bounded(10);
@@ -53,8 +68,9 @@ impl PoolSv2 {
5368
s_message_recv_signal,
5469
status::Sender::DownstreamListener(status_tx),
5570
config.shares_per_minute(),
56-
);
57-
71+
recv_stop_signal,
72+
)
73+
.await?;
5874
// Start the error handling loop
5975
// See `./status.rs` and `utils/error_handling` for information on how this operates
6076
tokio::spawn(async move {
@@ -83,10 +99,12 @@ impl PoolSv2 {
8399
"SHUTDOWN from Downstream: {}\nTry to restart the downstream listener",
84100
err
85101
);
102+
let _ = send_stop_signal.send(());
86103
break;
87104
}
88105
status::State::TemplateProviderShutdown(err) => {
89106
error!("SHUTDOWN from Upstream: {}\nTry to reconnecting or connecting to a new upstream", err);
107+
let _ = send_stop_signal.send(());
90108
break;
91109
}
92110
status::State::Healthy(msg) => {
@@ -98,6 +116,7 @@ impl PoolSv2 {
98116
.safe_lock(|p| p.remove_downstream(downstream_id))
99117
.is_err()
100118
{
119+
let _ = send_stop_signal.send(());
101120
break;
102121
}
103122
}
@@ -106,6 +125,33 @@ impl PoolSv2 {
106125
});
107126
Ok(())
108127
}
128+
129+
pub fn shutdown(&self) {
130+
info!("Attempting to shutdown pool");
131+
if let Ok(status_tx) = &self.status_tx.lock() {
132+
if let Some(status_tx) = status_tx.as_ref().cloned() {
133+
info!("Pool is running, sending shutdown signal");
134+
tokio::spawn(async move {
135+
if let Err(e) = status_tx
136+
.send(status::Status {
137+
state: status::State::DownstreamShutdown(PoolError::Custom(
138+
"Shutdown".to_string(),
139+
)),
140+
})
141+
.await
142+
{
143+
error!("Failed to send shutdown signal to status loop: {:?}", e);
144+
} else {
145+
info!("Sent shutdown signal to Pool");
146+
}
147+
});
148+
} else {
149+
info!("Pool is not running.");
150+
}
151+
} else {
152+
error!("Failed to access Pool status lock");
153+
}
154+
}
109155
}
110156

111157
#[cfg(test)]
@@ -141,4 +187,32 @@ mod tests {
141187
let result = pool.start().await;
142188
assert!(result.is_err());
143189
}
190+
191+
#[tokio::test]
192+
async fn shutdown_pool() {
193+
let config_path = "config-examples/pool-config-hosted-tp-example.toml";
194+
let config: PoolConfig = match Config::builder()
195+
.add_source(File::new(config_path, FileFormat::Toml))
196+
.build()
197+
{
198+
Ok(settings) => match settings.try_deserialize::<PoolConfig>() {
199+
Ok(c) => c,
200+
Err(e) => {
201+
error!("Failed to deserialize config: {}", e);
202+
return;
203+
}
204+
},
205+
Err(e) => {
206+
error!("Failed to build config: {}", e);
207+
return;
208+
}
209+
};
210+
let pool_0 = PoolSv2::new(config.clone());
211+
let pool_1 = PoolSv2::new(config);
212+
assert!(pool_0.start().await.is_ok());
213+
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
214+
assert!(pool_1.start().await.is_err());
215+
pool_0.shutdown();
216+
assert!(pool_1.start().await.is_ok());
217+
}
144218
}

roles/tests-integration/lib/mod.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,7 @@ pub async fn start_pool(template_provider_address: Option<SocketAddr>) -> (PoolS
8989
SHARES_PER_MINUTE,
9090
);
9191
let pool = PoolSv2::new(config);
92-
let pool_clone = pool.clone();
93-
tokio::task::spawn(async move {
94-
assert!(pool_clone.start().await.is_ok());
95-
});
92+
assert!(pool.start().await.is_ok());
9693
// Wait a bit to let the pool exchange initial messages with the TP
9794
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
9895
(pool, listening_address)

0 commit comments

Comments
 (0)