Skip to content

Commit 282aa52

Browse files
committed
fix sigterm issue with translator sv2
1 parent 7483090 commit 282aa52

File tree

1 file changed

+62
-53
lines changed
  • roles/translator/src/lib

1 file changed

+62
-53
lines changed

roles/translator/src/lib/mod.rs

+62-53
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ use std::{
1010
};
1111

1212
use tokio::{
13-
sync::broadcast,
13+
select,
14+
sync::{broadcast, Notify},
1415
task::{self, AbortHandle},
1516
};
1617
use tracing::{debug, error, info, warn};
@@ -32,6 +33,7 @@ pub mod utils;
3233
pub struct TranslatorSv2 {
3334
config: ProxyConfig,
3435
reconnect_wait_time: u64,
36+
shutdown: Arc<Notify>,
3537
}
3638

3739
impl TranslatorSv2 {
@@ -41,6 +43,7 @@ impl TranslatorSv2 {
4143
Self {
4244
config,
4345
reconnect_wait_time: wait_time,
46+
shutdown: Arc::new(Notify::new()),
4447
}
4548
}
4649

@@ -58,7 +61,8 @@ impl TranslatorSv2 {
5861
let task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>> =
5962
Arc::new(Mutex::new(Vec::new()));
6063

61-
self.internal_start(
64+
Self::internal_start(
65+
self.config.clone(),
6266
tx_sv1_notify.clone(),
6367
target.clone(),
6468
tx_status.clone(),
@@ -72,74 +76,79 @@ impl TranslatorSv2 {
7276
debug!("Starting up status listener");
7377
let wait_time = self.reconnect_wait_time;
7478
// Check all tasks if is_finished() is true, if so exit
75-
loop {
76-
let task_status = tokio::select! {
77-
task_status = rx_status.recv().fuse() => task_status,
78-
interrupt_signal = tokio::signal::ctrl_c().fuse() => {
79-
match interrupt_signal {
80-
Ok(()) => {
81-
info!("Interrupt received");
82-
},
83-
Err(err) => {
84-
error!("Unable to listen for interrupt signal: {}", err);
85-
// we also shut down in case of error
86-
},
87-
}
88-
break;
89-
}
90-
};
91-
let task_status: Status = task_status.unwrap();
9279

93-
match task_status.state {
94-
// Should only be sent by the downstream listener
95-
State::DownstreamShutdown(err) => {
96-
error!("SHUTDOWN from: {}", err);
97-
break;
98-
}
99-
State::BridgeShutdown(err) => {
100-
error!("SHUTDOWN from: {}", err);
101-
break;
80+
tokio::spawn({
81+
let shutdown_signal = self.shutdown.clone();
82+
async move {
83+
if tokio::signal::ctrl_c().await.is_ok() {
84+
info!("Interrupt received");
85+
shutdown_signal.notify_one();
10286
}
103-
State::UpstreamShutdown(err) => {
104-
error!("SHUTDOWN from: {}", err);
105-
break;
106-
}
107-
State::UpstreamTryReconnect(err) => {
108-
error!("Trying to reconnect the Upstream because of: {}", err);
87+
}
88+
});
10989

110-
// wait a random amount of time between 0 and 3000ms
111-
// if all the downstreams try to reconnect at the same time, the upstream may
112-
// fail
113-
tokio::time::sleep(std::time::Duration::from_millis(wait_time)).await;
90+
loop {
91+
select! {
92+
task_status = rx_status.recv().fuse() => {
93+
if let Ok(task_status_) = task_status {
94+
match task_status_.state {
95+
State::DownstreamShutdown(err) | State::BridgeShutdown(err) | State::UpstreamShutdown(err) => {
96+
error!("SHUTDOWN from: {}", err);
97+
self.shutdown.notify_one();
98+
}
99+
State::UpstreamTryReconnect(err) => {
100+
error!("Trying to reconnect the Upstream because of: {}", err);
101+
let task_collector1 = task_collector_.clone();
102+
let tx_sv1_notify1 = tx_sv1_notify.clone();
103+
let target = target.clone();
104+
let tx_status = tx_status.clone();
105+
let proxy_config = self.config.clone();
106+
tokio::spawn (async move {
107+
// wait a random amount of time between 0 and 3000ms
108+
// if all the downstreams try to reconnect at the same time, the upstream may
109+
// fail
110+
tokio::time::sleep(std::time::Duration::from_millis(wait_time)).await;
114111

115-
// kill all the tasks
116-
let task_collector_aborting = task_collector_.clone();
117-
kill_tasks(task_collector_aborting.clone());
112+
// kill all the tasks
113+
let task_collector_aborting = task_collector1.clone();
114+
kill_tasks(task_collector_aborting.clone());
118115

119-
warn!("Trying to reconnect to upstream");
120-
self.internal_start(
121-
tx_sv1_notify.clone(),
122-
target.clone(),
123-
tx_status.clone(),
124-
task_collector_.clone(),
125-
)
126-
.await;
116+
warn!("Trying to reconnect to upstream");
117+
Self::internal_start(
118+
proxy_config,
119+
tx_sv1_notify1,
120+
target.clone(),
121+
tx_status.clone(),
122+
task_collector1,
123+
)
124+
.await;
125+
});
126+
}
127+
State::Healthy(msg) => {
128+
info!("HEALTHY message: {}", msg);
129+
self.shutdown.notify_one();
130+
}
131+
}
132+
} else {
133+
info!("Channel closed");
134+
break; // Channel closed
135+
}
127136
}
128-
State::Healthy(msg) => {
129-
info!("HEALTHY message: {}", msg);
137+
_ = self.shutdown.notified() => {
138+
info!("Shutting down gracefully...");
139+
break;
130140
}
131141
}
132142
}
133143
}
134144

135145
async fn internal_start(
136-
&self,
146+
proxy_config: ProxyConfig,
137147
tx_sv1_notify: broadcast::Sender<server_to_client::Notify<'static>>,
138148
target: Arc<Mutex<Vec<u8>>>,
139149
tx_status: async_channel::Sender<Status<'static>>,
140150
task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>>,
141151
) {
142-
let proxy_config = self.config.clone();
143152
// Sender/Receiver to send a SV2 `SubmitSharesExtended` from the `Bridge` to the `Upstream`
144153
// (Sender<SubmitSharesExtended<'static>>, Receiver<SubmitSharesExtended<'static>>)
145154
let (tx_sv2_submit_shares_ext, rx_sv2_submit_shares_ext) = bounded(10);

0 commit comments

Comments
 (0)