Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add shutdown flag #1569

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 48 additions & 33 deletions roles/test-utils/mining-device-sv1/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ use num_bigint::BigUint;
use num_traits::FromPrimitive;
use primitive_types::U256;
use roles_logic_sv2::utils::Mutex;
use std::{convert::TryInto, net::SocketAddr, ops::Div, sync::Arc, time};
use std::{
convert::TryInto,
net::SocketAddr,
ops::Div,
sync::{atomic::AtomicBool, Arc},
time,
};
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::TcpStream,
Expand Down Expand Up @@ -72,6 +78,7 @@ impl Client {
upstream_addr: SocketAddr,
single_submit: bool,
custom_target: Option<[u8; 32]>,
shutdown: Arc<AtomicBool>,
) {
let stream = TcpStream::connect(upstream_addr).await.unwrap();
let (reader, mut writer) = stream.into_split();
Expand Down Expand Up @@ -105,19 +112,22 @@ impl Client {

// Reads messages sent by the Upstream from the socket to be passed to the
// `receiver_incoming`
let shutdown1 = shutdown.clone();
task::spawn(async move {
let mut messages = BufReader::new(reader).lines();
while let Ok(message) = messages.next_line().await {
match message {
Some(msg) => {
if let Err(e) = sender_incoming.send(msg).await {
error!("Failed to send message to receiver_incoming: {:?}", e);
break; // Exit the loop if sending fails
while !shutdown1.clone().load(std::sync::atomic::Ordering::Relaxed) {
if let Ok(message) = messages.next_line().await {
match message {
Some(msg) => {
if let Err(e) = sender_incoming.send(msg).await {
error!("Failed to send message to receiver_incoming: {:?}", e);
break; // Exit the loop if sending fails
}
}
None => {
error!("Error reading from socket");
break; // Exit the loop on read failure
}
}
None => {
error!("Error reading from socket");
break; // Exit the loop on read failure
}
}
}
Expand All @@ -126,8 +136,9 @@ impl Client {

// Waits to receive a message from `sender_outgoing` and writes it to the socket for the
// Upstream to receive
let shutdown2 = shutdown.clone();
task::spawn(async move {
loop {
while !shutdown2.load(std::sync::atomic::Ordering::Relaxed) {
let message: String = receiver_outgoing.recv().await.unwrap();
(writer).write_all(message.as_bytes()).await.unwrap();
if message.contains("mining.submit") && single_submit {
Expand Down Expand Up @@ -166,39 +177,43 @@ impl Client {
// message to the Upstream node.
// Is a separate thread as it can be CPU intensive and we do not want to block the reading
// and writing of messages to the socket.
std::thread::spawn(move || loop {
if miner_cloned.safe_lock(|m| m.next_share()).unwrap().is_ok() {
let nonce = miner_cloned.safe_lock(|m| m.header.unwrap().nonce).unwrap();
let time = miner_cloned.safe_lock(|m| m.header.unwrap().time).unwrap();
let job_id = miner_cloned.safe_lock(|m| m.job_id).unwrap();
let version = miner_cloned.safe_lock(|m| m.version).unwrap();
// Sends relevant candidate block header values needed to construct a
// `mining.submit` message to the `receiver_share` in the task that is responsible
// for sending messages to the Upstream node.
if sender_share
.try_send((nonce, job_id.unwrap(), version.unwrap(), time))
.is_err()
{
warn!("Share channel is not available");
break;
let shutdown3 = shutdown.clone();
std::thread::spawn(move || {
while !shutdown3.load(std::sync::atomic::Ordering::Relaxed) {
if miner_cloned.safe_lock(|m| m.next_share()).unwrap().is_ok() {
let nonce = miner_cloned.safe_lock(|m| m.header.unwrap().nonce).unwrap();
let time = miner_cloned.safe_lock(|m| m.header.unwrap().time).unwrap();
let job_id = miner_cloned.safe_lock(|m| m.job_id).unwrap();
let version = miner_cloned.safe_lock(|m| m.version).unwrap();
// Sends relevant candidate block header values needed to construct a
// `mining.submit` message to the `receiver_share` in the task that is
// responsible for sending messages to the Upstream node.
if sender_share
.try_send((nonce, job_id.unwrap(), version.unwrap(), time))
.is_err()
{
warn!("Share channel is not available");
break;
}
}
miner_cloned
.safe_lock(|m| m.header.as_mut().map(|h| h.nonce += 1))
.unwrap();
}
miner_cloned
.safe_lock(|m| m.header.as_mut().map(|h| h.nonce += 1))
.unwrap();
});
// Task to receive relevant candidate block header values needed to construct a
// `mining.submit` message. This message is contructed as a `client_to_server::Submit` and
// then serialized into json to be sent to the Upstream via the `sender_outgoing` sender.
let cloned = client.clone();
let shutdown4 = shutdown.clone();
task::spawn(async move {
tokio::select!(
_ = recv_stop_submitting.changed() => {
warn!("Stopping miner")
},
_ = async {
let recv = receiver_share.clone();
loop {
while !shutdown4.load(std::sync::atomic::Ordering::Relaxed) {
let (nonce, job_id, _version, ntime) = recv.recv().await.unwrap();
if cloned.clone().safe_lock(|c| c.status).unwrap() != ClientStatus::Subscribed {
continue;
Expand All @@ -225,7 +240,7 @@ impl Client {
});
let recv_incoming = client.safe_lock(|c| c.receiver_incoming.clone()).unwrap();

loop {
while !shutdown.load(std::sync::atomic::Ordering::Relaxed) {
match client.clone().safe_lock(|c| c.status).unwrap() {
ClientStatus::Init => panic!("impossible state"),
ClientStatus::Configured => {
Expand All @@ -240,7 +255,7 @@ impl Client {
}
// Waits for the `sender_incoming` to get message line from socket to be parsed by the
// `Client`
loop {
while !shutdown.load(std::sync::atomic::Ordering::Relaxed) {
if let Ok(incoming) = recv_incoming.clone().recv().await {
Self::parse_message(client.clone(), Ok(incoming)).await;
} else {
Expand Down
8 changes: 7 additions & 1 deletion roles/test-utils/mining-device-sv1/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
pub(crate) mod client;
pub(crate) mod job;
pub(crate) mod miner;
use std::{net::SocketAddr, str::FromStr};
use std::{
net::SocketAddr,
str::FromStr,
sync::{atomic::AtomicBool, Arc},
};

pub(crate) use client::Client;

#[tokio::main]
async fn main() {
tracing_subscriber::fmt().init();

let shutdown = Arc::new(AtomicBool::new(false));
const ADDR: &str = "127.0.0.1:34255";
Client::connect(
80,
SocketAddr::from_str(ADDR).expect("Invalid upstream address"),
false,
None,
shutdown,
)
.await
}
13 changes: 10 additions & 3 deletions roles/tests-integration/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
convert::{TryFrom, TryInto},
net::SocketAddr,
str::FromStr,
sync::Once,
sync::{atomic::AtomicBool, Arc, Once},
};
use translator_sv2::TranslatorSv2;
use utils::get_available_address;
Expand Down Expand Up @@ -309,10 +309,17 @@ pub async fn start_mining_device_sv1(
upstream_addr: SocketAddr,
single_submit: bool,
custom_target: Option<[u8; 32]>,
shutdown: Arc<AtomicBool>,
) {
tokio::spawn(async move {
mining_device_sv1::client::Client::connect(80, upstream_addr, single_submit, custom_target)
.await;
mining_device_sv1::client::Client::connect(
80,
upstream_addr,
single_submit,
custom_target,
shutdown,
)
.await;
});
sleep(3).await;
}
Expand Down
8 changes: 6 additions & 2 deletions roles/tests-integration/tests/jdc_fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use roles_logic_sv2::{
parsers::{AnyMessage, Mining},
};
use sniffer::{MessageDirection, ReplaceMessage};
use std::convert::TryInto;
use std::{
convert::TryInto,
sync::{atomic::AtomicBool, Arc},
};

// Tests whether JDC will switch to a new pool after receiving a `SubmitSharesError` message from
// the currently connected pool.
Expand Down Expand Up @@ -60,7 +63,8 @@ async fn test_jdc_pool_fallback_after_submit_rejection() {
.wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SETUP_CONNECTION)
.await;
let (_translator, sv2_translator_addr) = start_sv2_translator(jdc_addr).await;
let _ = start_mining_device_sv1(sv2_translator_addr, true, None).await;
let shutdown = Arc::new(AtomicBool::new(false));
let _ = start_mining_device_sv1(sv2_translator_addr, true, None, shutdown).await;
// Assert that JDC switched to the second (Pool,JDS) pair
sniffer_2
.wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SETUP_CONNECTION)
Expand Down
10 changes: 8 additions & 2 deletions roles/tests-integration/tests/translator_integration.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::{atomic::AtomicBool, Arc};

// This file contains integration tests for the `TranslatorSv2` module.
//
// `TranslatorSv2` is a module that implements the Translator role in the Stratum V2 protocol.
Expand All @@ -23,7 +25,9 @@ async fn translate_sv1_to_sv2_successfully() {
let (pool_translator_sniffer, pool_translator_sniffer_addr) =
start_sniffer("0".to_string(), pool_addr, false, None).await;
let (_, tproxy_addr) = start_sv2_translator(pool_translator_sniffer_addr).await;
let _mining_device = start_mining_device_sv1(tproxy_addr, false, None).await;
let mining_device_shutdown = Arc::new(AtomicBool::new(false));
let _mining_device =
start_mining_device_sv1(tproxy_addr, false, None, mining_device_shutdown).await;
pool_translator_sniffer
.wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SETUP_CONNECTION)
.await;
Expand Down Expand Up @@ -67,7 +71,9 @@ async fn translation_proxy_and_jd() {
let (_jds, jds_addr) = start_jds(tp.rpc_info()).await;
let (_jdc, jdc_addr) = start_jdc(&[(jdc_pool_sniffer_addr, jds_addr)], tp_addr).await;
let (_translator, tproxy_addr) = start_sv2_translator(jdc_addr).await;
let _mining_device = start_mining_device_sv1(tproxy_addr, true, None).await;
let mining_device_shutdown = Arc::new(AtomicBool::new(false));
let _mining_device =
start_mining_device_sv1(tproxy_addr, true, None, mining_device_shutdown).await;
jdc_pool_sniffer
.wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SETUP_CONNECTION)
.await;
Expand Down
Loading