Skip to content

Add shutdown API to JDS along with task handler #1376

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 100 additions & 20 deletions roles/jd-server/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
use mempool::error::JdsMempoolError;
use roles_logic_sv2::utils::Mutex;
use std::{ops::Sub, sync::Arc};
use tokio::{select, task};
use tokio::{
select,
sync::Notify,
task::{self, JoinHandle},
};
use tracing::{error, info, warn};

use codec_sv2::{StandardEitherFrame, StandardSv2Frame};
Expand All @@ -31,11 +35,44 @@
#[derive(Debug, Clone)]
pub struct JobDeclaratorServer {
config: Configuration,
shutdown: Arc<Notify>,
}

struct TaskHandler {
handles: Vec<JoinHandle<()>>,
}

impl TaskHandler {
fn new() -> Self {

Check warning on line 46 in roles/jd-server/src/lib/mod.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/mod.rs#L46

Added line #L46 was not covered by tests
TaskHandler {
handles: Vec::new(),
}
}

fn add(&mut self, handle: JoinHandle<()>) {

Check warning on line 52 in roles/jd-server/src/lib/mod.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/mod.rs#L52

Added line #L52 was not covered by tests
self.handles.push(handle);
}

fn abort_all(&mut self) {
for handle in self.handles.drain(..) {

Check warning on line 57 in roles/jd-server/src/lib/mod.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/mod.rs#L56-L57

Added lines #L56 - L57 were not covered by tests
handle.abort();
}
}
}

impl Drop for TaskHandler {
fn drop(&mut self) {
info!("Aborting all child task.");
self.abort_all();

Check warning on line 66 in roles/jd-server/src/lib/mod.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/mod.rs#L64-L66

Added lines #L64 - L66 were not covered by tests
}
}

impl JobDeclaratorServer {
pub fn new(config: Configuration) -> Self {
Self { config }
Self {
config,
shutdown: Arc::new(Notify::new()),
}
}
pub async fn start(&self) {
let config = self.config.clone();
Expand All @@ -58,13 +95,25 @@
let mut last_empty_mempool_warning =
std::time::Instant::now().sub(std::time::Duration::from_secs(60));

let mut task_handler = TaskHandler::new();

tokio::spawn({
let shutdown_signal = self.shutdown.clone();
async move {
if tokio::signal::ctrl_c().await.is_ok() {
info!("Interrupt received");
shutdown_signal.notify_one();

Check warning on line 105 in roles/jd-server/src/lib/mod.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/mod.rs#L100-L105

Added lines #L100 - L105 were not covered by tests
}
}
});

// TODO if the jd-server is launched with core_rpc_url empty, the following flow is never
// taken. Consequentally new_block_receiver in JDsMempool::on_submit is never read, possibly
// reaching the channel bound. The new_block_sender is given as input to
// JobDeclarator::start()
if url.contains("http") {
let sender_update_mempool = sender.clone();
task::spawn(async move {
task_handler.add(task::spawn(async move {

Check warning on line 116 in roles/jd-server/src/lib/mod.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/mod.rs#L116

Added line #L116 was not covered by tests
loop {
let update_mempool_result: Result<(), mempool::error::JdsMempoolError> =
mempool::JDsMempool::update_mempool(mempool_cloned_.clone()).await;
Expand Down Expand Up @@ -96,11 +145,11 @@
//let _transactions =
// mempool::JDsMempool::_get_transaction_list(mempool_cloned_.clone());
}
});
}));

let mempool_cloned = mempool.clone();
let sender_submit_solution = sender.clone();
task::spawn(async move {
task_handler.add(task::spawn(async move {

Check warning on line 152 in roles/jd-server/src/lib/mod.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/mod.rs#L152

Added line #L152 was not covered by tests
loop {
let result = mempool::JDsMempool::on_submit(mempool_cloned.clone()).await;
if let Err(err) = result {
Expand All @@ -120,13 +169,13 @@
}
}
}
});
}));
};

let cloned = config.clone();
let mempool_cloned = mempool.clone();
let (sender_add_txs_to_mempool, receiver_add_txs_to_mempool) = unbounded();
task::spawn(async move {
task_handler.add(task::spawn(async move {

Check warning on line 178 in roles/jd-server/src/lib/mod.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/mod.rs#L178

Added line #L178 was not covered by tests
JobDeclarator::start(
cloned,
sender,
Expand All @@ -135,8 +184,8 @@
sender_add_txs_to_mempool,
)
.await
});
task::spawn(async move {
}));
task_handler.add(task::spawn(async move {

Check warning on line 188 in roles/jd-server/src/lib/mod.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/mod.rs#L188

Added line #L188 was not covered by tests
loop {
if let Ok(add_transactions_to_mempool) = receiver_add_txs_to_mempool.recv().await {
let mempool_cloned = mempool.clone();
Expand All @@ -157,23 +206,15 @@
});
}
}
});
}));

// Start the error handling loop
// See `./status.rs` and `utils/error_handling` for information on how this operates
loop {
let task_status = select! {
task_status = status_rx.recv() => task_status,
interrupt_signal = tokio::signal::ctrl_c() => {
match interrupt_signal {
Ok(()) => {
info!("Interrupt received");
},
Err(err) => {
error!("Unable to listen for interrupt signal: {}", err);
// we also shut down in case of error
},
}
_ = self.shutdown.notified() => {
info!("Shutting down gracefully...");

Check warning on line 217 in roles/jd-server/src/lib/mod.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/mod.rs#L216-L217

Added lines #L216 - L217 were not covered by tests
break;
}
};
Expand All @@ -200,6 +241,16 @@
}
}
}

/// Notifies the JD-Server to shut down gracefully.
///
/// This method triggers the shutdown process by sending a notification.
/// It ensures that any ongoing operations are properly handled before
/// the jd-server stops functioning.
#[allow(dead_code)]
pub fn shutdown(&self) {
self.shutdown.notify_one();
}
}

pub fn get_coinbase_output(config: &Configuration) -> Result<Vec<TxOut>, Error> {
Expand Down Expand Up @@ -418,4 +469,33 @@
let result: Result<CoinbaseOutput_, _> = (&input).try_into();
assert!(matches!(result, Err(Error::UnknownOutputScriptType)));
}

#[tokio::test]
async fn test_shutdown() {
let config_path = "config-examples/jds-config-local-example.toml";
let config: Configuration = match Config::builder()
.add_source(File::new(config_path, FileFormat::Toml))
.build()
{
Ok(settings) => match settings.try_deserialize::<Configuration>() {
Ok(c) => c,
Err(e) => {
error!("Failed to deserialize config: {}", e);
return;
}
},
Err(e) => {
error!("Failed to build config: {}", e);
return;
}
};
let jds = JobDeclaratorServer::new(config.clone());
let cloned = jds.clone();
tokio::spawn(async move {
cloned.start().await;
});
jds.shutdown();
let jds_addr = config.listen_jd_address.clone();
assert!(std::net::TcpListener::bind(jds_addr).is_ok());
}
}
Loading