Skip to content

Commit 079e823

Browse files
authored
Merge pull request #1556 from Shourya742/2025-03-11-refactor-jdc-connection-ordering
Let JDC continue listening to the downstream port but serve only a single client.
2 parents b806682 + a34a75a commit 079e823

File tree

2 files changed

+138
-138
lines changed

2 files changed

+138
-138
lines changed

roles/jd-client/src/lib/downstream.rs

+112-72
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
use super::{config::JobDeclaratorClientConfig, template_receiver::TemplateRx, PoolChangerTrigger};
2+
13
use super::{
24
job_declarator::JobDeclarator,
35
status::{self, State},
46
upstream_sv2::Upstream as UpstreamMiningNode,
57
};
6-
use async_channel::{Receiver, SendError, Sender};
8+
use async_channel::{bounded, Receiver, SendError, Sender};
79
use roles_logic_sv2::{
810
channel_logic::channel_factory::{OnNewShare, PoolChannelFactory, Share},
911
common_messages_sv2::{SetupConnection, SetupConnectionSuccess},
@@ -19,6 +21,7 @@ use roles_logic_sv2::{
1921
template_distribution_sv2::{NewTemplate, SubmitSolution},
2022
utils::Mutex,
2123
};
24+
use tokio::sync::Notify;
2225
use tracing::{debug, error, info, warn};
2326

2427
use codec_sv2::{HandshakeRole, Responder, StandardEitherFrame, StandardSv2Frame};
@@ -149,7 +152,7 @@ impl DownstreamMiningNodeStatus {
149152
}
150153

151154
use core::convert::TryInto;
152-
use std::sync::Arc;
155+
use std::{net::IpAddr, str::FromStr, sync::Arc};
153156

154157
impl DownstreamMiningNode {
155158
#[allow(clippy::too_many_arguments)]
@@ -666,94 +669,131 @@ use tokio::{
666669
time::{timeout, Duration},
667670
};
668671

669-
/// Strat listen for downstream mining node. Return as soon as one downstream connect.
672+
/// Start listen for downstream mining node. Return as soon as one downstream connect.
670673
#[allow(clippy::too_many_arguments)]
671674
pub async fn listen_for_downstream_mining(
672675
address: SocketAddr,
673676
upstream: Option<Arc<Mutex<UpstreamMiningNode>>>,
674-
solution_sender: Sender<SubmitSolution<'static>>,
675677
withhold: bool,
676678
authority_public_key: Secp256k1PublicKey,
677679
authority_secret_key: Secp256k1SecretKey,
678680
cert_validity_sec: u64,
679681
task_collector: Arc<Mutex<Vec<AbortHandle>>>,
680-
tx_status: status::Sender,
682+
tx_status: async_channel::Sender<status::Status<'static>>,
681683
miner_coinbase_output: Vec<TxOut>,
682684
jd: Option<Arc<Mutex<JobDeclarator>>>,
683-
) -> Result<Arc<Mutex<DownstreamMiningNode>>, Error> {
685+
config: JobDeclaratorClientConfig,
686+
shutdown: Arc<Notify>,
687+
) {
684688
info!("Listening for downstream mining connections on {}", address);
685-
let listner = TcpListener::bind(address).await?;
689+
let listener = TcpListener::bind(address).await.unwrap();
690+
let mut has_downstream = false;
691+
loop {
692+
tokio::select! {
693+
_ = shutdown.notified() => {
694+
info!("Shutdown signal received. Stopping downstream mining listener.");
695+
break;
696+
}
697+
Ok((stream, _)) = listener.accept() => {
698+
if has_downstream {
699+
error!("A downstream connection is already active. Ignoring additional connections.");
700+
continue;
701+
}
702+
has_downstream = true;
703+
let task_collector = task_collector.clone();
704+
let miner_coinbase_output = miner_coinbase_output.clone();
705+
let jd = jd.clone();
706+
let upstream = upstream.clone();
707+
let timeout = config.timeout();
708+
let mut parts = config.tp_address().split(':');
709+
let ip_tp = parts.next().unwrap().to_string();
710+
let port_tp = parts.next().unwrap().parse::<u16>().unwrap();
711+
712+
let (send_solution, recv_solution) = bounded(10);
713+
714+
let responder = Responder::from_authority_kp(
715+
&authority_public_key.into_bytes(),
716+
&authority_secret_key.into_bytes(),
717+
std::time::Duration::from_secs(cert_validity_sec),
718+
)
719+
.unwrap();
720+
let (receiver, sender, recv_task_abort_handler, send_task_abort_handler) =
721+
Connection::new(stream, HandshakeRole::Responder(responder))
722+
.await
723+
.expect("impossible to connect");
724+
725+
let tx_status_downstream = status::Sender::Downstream(tx_status.clone());
726+
let node = DownstreamMiningNode::new(
727+
receiver,
728+
sender,
729+
upstream.clone(),
730+
send_solution,
731+
withhold,
732+
task_collector.clone(),
733+
tx_status_downstream,
734+
miner_coinbase_output,
735+
jd.clone(),
736+
);
686737

687-
if let Ok((stream, _)) = listner.accept().await {
688-
let responder = Responder::from_authority_kp(
689-
&authority_public_key.into_bytes(),
690-
&authority_secret_key.into_bytes(),
691-
std::time::Duration::from_secs(cert_validity_sec),
692-
)
693-
.unwrap();
694-
let (receiver, sender, recv_task_abort_handler, send_task_abort_handler) =
695-
Connection::new(stream, HandshakeRole::Responder(responder))
696-
.await
697-
.expect("impossible to connect");
698-
let node = DownstreamMiningNode::new(
699-
receiver,
700-
sender,
701-
upstream.clone(),
702-
solution_sender,
703-
withhold,
704-
task_collector,
705-
tx_status,
706-
miner_coinbase_output,
707-
jd,
708-
);
738+
let mut incoming: StdFrame = node.receiver.recv().await.unwrap().try_into().unwrap();
739+
let message_type = incoming.get_header().unwrap().msg_type();
740+
let payload = incoming.payload();
741+
let routing_logic = roles_logic_sv2::routing_logic::CommonRoutingLogic::None;
742+
let node = Arc::new(Mutex::new(node));
709743

710-
let mut incoming: StdFrame = node.receiver.recv().await.unwrap().try_into().unwrap();
711-
let message_type = incoming.get_header().unwrap().msg_type();
712-
let payload = incoming.payload();
713-
let routing_logic = roles_logic_sv2::routing_logic::CommonRoutingLogic::None;
714-
let node = Arc::new(Mutex::new(node));
715-
if let Some(upstream) = upstream {
716-
upstream
717-
.safe_lock(|s| s.downstream = Some(node.clone()))
718-
.unwrap();
719-
}
744+
if let Some(upstream) = upstream {
745+
upstream
746+
.safe_lock(|s| s.downstream = Some(node.clone()))
747+
.unwrap();
748+
}
720749

721-
// Call handle_setup_connection or fail
722-
match DownstreamMiningNode::handle_message_common(
723-
node.clone(),
724-
message_type,
725-
payload,
726-
routing_logic,
727-
) {
728-
Ok(SendToCommon::Respond(message)) => {
729-
let message = match message {
730-
roles_logic_sv2::parsers::CommonMessages::SetupConnectionSuccess(m) => m,
731-
_ => panic!(),
732-
};
733-
let main_task = tokio::task::spawn({
734-
let node = node.clone();
735-
async move {
736-
DownstreamMiningNode::start(&node, message).await;
737-
}
738-
});
739-
node.safe_lock(|n| {
740-
n.task_collector
741-
.safe_lock(|c| {
742-
c.push(main_task.abort_handle());
743-
c.push(recv_task_abort_handler);
744-
c.push(send_task_abort_handler);
745-
})
746-
.unwrap()
747-
})
748-
.unwrap();
749-
Ok(node)
750+
if let Ok(SendToCommon::Respond(message)) = DownstreamMiningNode::handle_message_common(
751+
node.clone(),
752+
message_type,
753+
payload,
754+
routing_logic,
755+
) {
756+
let message = match message {
757+
roles_logic_sv2::parsers::CommonMessages::SetupConnectionSuccess(m) => m,
758+
_ => panic!(),
759+
};
760+
761+
let main_task = tokio::task::spawn({
762+
let node = node.clone();
763+
async move {
764+
DownstreamMiningNode::start(&node, message).await;
765+
}
766+
});
767+
768+
node.safe_lock(|n| {
769+
n.task_collector
770+
.safe_lock(|c| {
771+
c.push(main_task.abort_handle());
772+
c.push(recv_task_abort_handler);
773+
c.push(send_task_abort_handler);
774+
})
775+
.unwrap()
776+
})
777+
.unwrap();
778+
779+
TemplateRx::connect(
780+
SocketAddr::new(IpAddr::from_str(ip_tp.as_str()).unwrap(), port_tp),
781+
recv_solution,
782+
status::Sender::TemplateReceiver(tx_status.clone()),
783+
jd,
784+
node,
785+
task_collector,
786+
Arc::new(Mutex::new(PoolChangerTrigger::new(timeout))),
787+
vec![],
788+
config.tp_authority_public_key().cloned(),
789+
)
790+
.await;
791+
}
750792
}
751-
Ok(_) => todo!(),
752-
Err(e) => Err(e),
753793
}
754-
} else {
755-
todo!()
756794
}
795+
796+
info!("Downstream mining listener has shut down.");
757797
}
758798

759799
impl IsDownstream for DownstreamMiningNode {

0 commit comments

Comments
 (0)