
Commit 98a2511

Make PoolSv2::start non-blocking
Currently the state loop at the end of `PoolSv2::start` is blocking; this change wraps it in a `tokio::task` so we can return from the `start` function. This should give us more flexibility in how we work with `PoolSv2`. For example, we can access the `PoolSv2` state directly through the struct.
Parent: 36a4491
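In outline, the change is the usual "spawn the loop, return immediately" pattern. A minimal self-contained sketch of that pattern (illustrative names only, not the pool's real API):

```rust
use tokio::sync::mpsc;

// Minimal sketch of the pattern used in this commit: the status loop runs
// inside tokio::spawn, so `start` returns as soon as the task is set up
// instead of blocking until shutdown. Names are illustrative, not the
// pool's real API.
async fn start(mut status_rx: mpsc::Receiver<String>) {
    tokio::spawn(async move {
        // Previously this loop ran inline and kept `start` from returning.
        while let Some(status) = status_rx.recv().await {
            println!("status: {status}");
        }
    });
    // `start` returns here; the caller keeps control of the runtime.
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    start(rx).await;
    // The caller can keep interacting while the loop runs in the background.
    tx.send("healthy".to_string()).await.unwrap();
    drop(tx); // closing the channel lets the spawned loop finish
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
```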

2 files changed (+76, -54 lines)
roles/pool/src/lib/mod.rs (+60, -53)
```diff
@@ -30,24 +30,28 @@ impl PoolSv2 {
         let (s_message_recv_signal, r_message_recv_signal) = bounded(10);
         let coinbase_output_result = get_coinbase_output(&config)?;
         let coinbase_output_len = coinbase_output_result.len() as u32;
-        let tp_authority_public_key = config.tp_authority_public_key();
+        let tp_authority_public_key = config.tp_authority_public_key().cloned();
         let coinbase_output_sigops = coinbase_output_result
             .iter()
             .map(|output| output.script_pubkey.count_sigops() as u16)
             .sum::<u16>();
 
-        TemplateRx::connect(
-            config.tp_address().parse().unwrap(),
-            s_new_t,
-            s_prev_hash,
-            r_solution,
-            r_message_recv_signal,
-            status::Sender::Upstream(status_tx.clone()),
-            coinbase_output_len,
-            coinbase_output_sigops,
-            tp_authority_public_key.cloned(),
-        )
-        .await?;
+        let tp_address = config.tp_address().clone();
+        let cloned_status_tx = status_tx.clone();
+        tokio::spawn(async move {
+            let _ = TemplateRx::connect(
+                tp_address.parse().unwrap(),
+                s_new_t,
+                s_prev_hash,
+                r_solution,
+                r_message_recv_signal,
+                status::Sender::Upstream(cloned_status_tx),
+                coinbase_output_len,
+                coinbase_output_sigops,
+                tp_authority_public_key,
+            )
+            .await;
+        });
         let pool = Pool::start(
             config.clone(),
             r_new_t,
@@ -60,51 +64,54 @@ impl PoolSv2 {
 
         // Start the error handling loop
         // See `./status.rs` and `utils/error_handling` for information on how this operates
-        loop {
-            let task_status = select! {
-                task_status = status_rx.recv() => task_status,
-                interrupt_signal = tokio::signal::ctrl_c() => {
-                    match interrupt_signal {
-                        Ok(()) => {
-                            info!("Interrupt received");
-                        },
-                        Err(err) => {
-                            error!("Unable to listen for interrupt signal: {}", err);
-                            // we also shut down in case of error
-                        },
+        tokio::spawn(async move {
+            loop {
+                let task_status = select! {
+                    task_status = status_rx.recv() => task_status,
+                    interrupt_signal = tokio::signal::ctrl_c() => {
+                        match interrupt_signal {
+                            Ok(()) => {
+                                info!("Interrupt received");
+                            },
+                            Err(err) => {
+                                error!("Unable to listen for interrupt signal: {}", err);
+                                // we also shut down in case of error
+                            },
+                        }
+                        break;
                     }
-                    break Ok(());
-                }
-            };
-            let task_status: status::Status = task_status.unwrap();
+                };
+                let task_status: status::Status = task_status.unwrap();
 
-            match task_status.state {
-                // Should only be sent by the downstream listener
-                status::State::DownstreamShutdown(err) => {
-                    error!(
-                        "SHUTDOWN from Downstream: {}\nTry to restart the downstream listener",
-                        err
-                    );
-                    break Ok(());
-                }
-                status::State::TemplateProviderShutdown(err) => {
-                    error!("SHUTDOWN from Upstream: {}\nTry to reconnecting or connecting to a new upstream", err);
-                    break Ok(());
-                }
-                status::State::Healthy(msg) => {
-                    info!("HEALTHY message: {}", msg);
-                }
-                status::State::DownstreamInstanceDropped(downstream_id) => {
-                    warn!("Dropping downstream instance {} from pool", downstream_id);
-                    if pool
-                        .safe_lock(|p| p.remove_downstream(downstream_id))
-                        .is_err()
-                    {
-                        break Ok(());
+                match task_status.state {
+                    // Should only be sent by the downstream listener
+                    status::State::DownstreamShutdown(err) => {
+                        error!(
+                            "SHUTDOWN from Downstream: {}\nTry to restart the downstream listener",
+                            err
+                        );
+                        break;
+                    }
+                    status::State::TemplateProviderShutdown(err) => {
+                        error!("SHUTDOWN from Upstream: {}\nTry to reconnecting or connecting to a new upstream", err);
+                        break;
+                    }
+                    status::State::Healthy(msg) => {
+                        info!("HEALTHY message: {}", msg);
+                    }
+                    status::State::DownstreamInstanceDropped(downstream_id) => {
+                        warn!("Dropping downstream instance {} from pool", downstream_id);
+                        if pool
+                            .safe_lock(|p| p.remove_downstream(downstream_id))
+                            .is_err()
+                        {
+                            break;
+                        }
                     }
                 }
             }
-        }
+        });
+        Ok(())
     }
 }
```

roles/pool/src/main.rs (+16, -1)
```diff
@@ -3,7 +3,8 @@
 mod lib;
 use ext_config::{Config, File, FileFormat};
 pub use lib::{config, status, PoolSv2};
-use tracing::error;
+use tokio::select;
+use tracing::{error, info};
 
 mod args {
     use std::path::PathBuf;
@@ -100,4 +101,18 @@ async fn main() {
         }
     };
     let _ = PoolSv2::new(config).start().await;
+    select! {
+        interrupt_signal = tokio::signal::ctrl_c() => {
+            match interrupt_signal {
+                Ok(()) => {
+                    info!("Pool(bin): Caught interrupt signal. Shutting down...");
+                    return;
+                },
+                Err(err) => {
+                    error!("Pool(bin): Unable to listen for interrupt signal: {}", err);
+                    return;
+                },
+            }
+        }
+    };
 }
```
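This hunk also shows the consequence of the non-blocking `start`: the binary now has to wait for shutdown itself, otherwise `main` would return right after `start` and the process would exit. A minimal self-contained sketch of that caller-side pattern (illustrative names, not the pool's real API; a plain await on `ctrl_c` stands in for the single-arm `select!` used in the diff):

```rust
// Sketch of the binary-side pattern adopted in main.rs: start returns
// immediately, so the binary blocks on ctrl_c for shutdown. Names are
// illustrative, not the pool's real API.
async fn start_non_blocking() {
    tokio::spawn(async {
        // Stand-in for the pool's spawned listeners and status loop.
        loop {
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        }
    });
}

#[tokio::main]
async fn main() {
    start_non_blocking().await;
    // Without this wait the process would exit immediately.
    match tokio::signal::ctrl_c().await {
        Ok(()) => println!("caught interrupt, shutting down"),
        Err(e) => eprintln!("unable to listen for interrupt: {e}"),
    }
}
```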
