Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix sigterm issue with translator sv2 #1319

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 68 additions & 55 deletions roles/translator/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use std::{
};

use tokio::{
sync::broadcast,
select,
sync::{broadcast, Notify},
task::{self, AbortHandle},
};
use tracing::{debug, error, info, warn};
Expand All @@ -32,6 +33,7 @@ pub mod utils;
/// Top-level state of the translator: its configuration, the upstream reconnect delay,
/// and a shared notifier used to request shutdown.
pub struct TranslatorSv2 {
/// Proxy configuration; a clone of it is passed to `internal_start` on every (re)start.
config: ProxyConfig,
/// Time in milliseconds slept before an upstream reconnect attempt is made.
reconnect_wait_time: u64,
/// Shared shutdown notifier; when it is notified, the status loop logs and breaks out.
shutdown: Arc<Notify>,
}

impl TranslatorSv2 {
Expand All @@ -41,6 +43,7 @@ impl TranslatorSv2 {
Self {
config,
reconnect_wait_time: wait_time,
shutdown: Arc::new(Notify::new()),
}
}

Expand All @@ -58,7 +61,8 @@ impl TranslatorSv2 {
let task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>> =
Arc::new(Mutex::new(Vec::new()));

self.internal_start(
Self::internal_start(
self.config.clone(),
tx_sv1_notify.clone(),
target.clone(),
tx_status.clone(),
Expand All @@ -72,74 +76,79 @@ impl TranslatorSv2 {
debug!("Starting up status listener");
let wait_time = self.reconnect_wait_time;
// Check all tasks if is_finished() is true, if so exit
loop {
let task_status = tokio::select! {
task_status = rx_status.recv().fuse() => task_status,
interrupt_signal = tokio::signal::ctrl_c().fuse() => {
match interrupt_signal {
Ok(()) => {
info!("Interrupt received");
},
Err(err) => {
error!("Unable to listen for interrupt signal: {}", err);
// we also shut down in case of error
},
}
break;
}
};
let task_status: Status = task_status.unwrap();

match task_status.state {
// Should only be sent by the downstream listener
State::DownstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
break;
tokio::spawn({
let shutdown_signal = self.shutdown();
async move {
if tokio::signal::ctrl_c().await.is_ok() {
info!("Interrupt received");
shutdown_signal.notify_one();
}
State::BridgeShutdown(err) => {
error!("SHUTDOWN from: {}", err);
break;
}
});

loop {
select! {
task_status = rx_status.recv().fuse() => {
if let Ok(task_status_) = task_status {
match task_status_.state {
State::DownstreamShutdown(err) | State::BridgeShutdown(err) | State::UpstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
self.shutdown().notify_one();
}
State::UpstreamTryReconnect(err) => {
error!("Trying to reconnect the Upstream because of: {}", err);
let task_collector1 = task_collector_.clone();
let tx_sv1_notify1 = tx_sv1_notify.clone();
let target = target.clone();
let tx_status = tx_status.clone();
let proxy_config = self.config.clone();
tokio::spawn (async move {
// wait a random amount of time between 0 and 3000ms
// if all the downstreams try to reconnect at the same time, the upstream may
// fail
tokio::time::sleep(std::time::Duration::from_millis(wait_time)).await;

// kill all the tasks
let task_collector_aborting = task_collector1.clone();
kill_tasks(task_collector_aborting.clone());

warn!("Trying reconnecting to upstream");
Self::internal_start(
proxy_config,
tx_sv1_notify1,
target.clone(),
tx_status.clone(),
task_collector1,
)
.await;
});
}
State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
self.shutdown().notify_one();
}
}
} else {
info!("Channel closed");
break; // Channel closed
}
}
State::UpstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
_ = self.shutdown.notified() => {
info!("Shutting down gracefully...");
break;
}
State::UpstreamTryReconnect(err) => {
error!("Trying to reconnect the Upstream because of: {}", err);

// wait a random amount of time between 0 and 3000ms
// if all the downstreams try to reconnect at the same time, the upstream may
// fail
tokio::time::sleep(std::time::Duration::from_millis(wait_time)).await;

// kill all the tasks
let task_collector_aborting = task_collector_.clone();
kill_tasks(task_collector_aborting.clone());

warn!("Trying reconnecting to upstream");
self.internal_start(
tx_sv1_notify.clone(),
target.clone(),
tx_status.clone(),
task_collector_.clone(),
)
.await;
}
State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
}
}
}
}

async fn internal_start(
&self,
proxy_config: ProxyConfig,
tx_sv1_notify: broadcast::Sender<server_to_client::Notify<'static>>,
target: Arc<Mutex<Vec<u8>>>,
tx_status: async_channel::Sender<Status<'static>>,
task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>>,
) {
let proxy_config = self.config.clone();
// Sender/Receiver to send a SV2 `SubmitSharesExtended` from the `Bridge` to the `Upstream`
// (Sender<SubmitSharesExtended<'static>>, Receiver<SubmitSharesExtended<'static>>)
let (tx_sv2_submit_shares_ext, rx_sv2_submit_shares_ext) = bounded(10);
Expand Down Expand Up @@ -278,6 +287,10 @@ impl TranslatorSv2 {
let _ =
task_collector.safe_lock(|t| t.push((task.abort_handle(), "init task".to_string())));
}

/// Returns another handle to the shared shutdown notifier.
///
/// Only the reference count is bumped: the returned `Arc` points at the same
/// `Notify` that the status loop waits on.
pub fn shutdown(&self) -> Arc<Notify> {
    Arc::clone(&self.shutdown)
}
}

fn kill_tasks(task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>>) {
Expand Down
Loading