Skip to content

Commit 8b682a1

Browse files
committed
add shutdown flag
1 parent e60c6b2 commit 8b682a1

File tree

5 files changed

+79
-41
lines changed

5 files changed

+79
-41
lines changed

roles/test-utils/mining-device-sv1/src/client.rs

+48-33
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@ use num_bigint::BigUint;
44
use num_traits::FromPrimitive;
55
use primitive_types::U256;
66
use roles_logic_sv2::utils::Mutex;
7-
use std::{convert::TryInto, net::SocketAddr, ops::Div, sync::Arc, time};
7+
use std::{
8+
convert::TryInto,
9+
net::SocketAddr,
10+
ops::Div,
11+
sync::{atomic::AtomicBool, Arc},
12+
time,
13+
};
814
use tokio::{
915
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
1016
net::TcpStream,
@@ -72,6 +78,7 @@ impl Client {
7278
upstream_addr: SocketAddr,
7379
single_submit: bool,
7480
custom_target: Option<[u8; 32]>,
81+
shutdown: Arc<AtomicBool>,
7582
) {
7683
let stream = TcpStream::connect(upstream_addr).await.unwrap();
7784
let (reader, mut writer) = stream.into_split();
@@ -105,19 +112,22 @@ impl Client {
105112

106113
// Reads messages sent by the Upstream from the socket to be passed to the
107114
// `receiver_incoming`
115+
let shutdown1 = shutdown.clone();
108116
task::spawn(async move {
109117
let mut messages = BufReader::new(reader).lines();
110-
while let Ok(message) = messages.next_line().await {
111-
match message {
112-
Some(msg) => {
113-
if let Err(e) = sender_incoming.send(msg).await {
114-
error!("Failed to send message to receiver_incoming: {:?}", e);
115-
break; // Exit the loop if sending fails
118+
while !shutdown1.clone().load(std::sync::atomic::Ordering::Relaxed) {
119+
if let Ok(message) = messages.next_line().await {
120+
match message {
121+
Some(msg) => {
122+
if let Err(e) = sender_incoming.send(msg).await {
123+
error!("Failed to send message to receiver_incoming: {:?}", e);
124+
break; // Exit the loop if sending fails
125+
}
126+
}
127+
None => {
128+
error!("Error reading from socket");
129+
break; // Exit the loop on read failure
116130
}
117-
}
118-
None => {
119-
error!("Error reading from socket");
120-
break; // Exit the loop on read failure
121131
}
122132
}
123133
}
@@ -126,8 +136,9 @@ impl Client {
126136

127137
// Waits to receive a message from `sender_outgoing` and writes it to the socket for the
128138
// Upstream to receive
139+
let shutdown2 = shutdown.clone();
129140
task::spawn(async move {
130-
loop {
141+
while !shutdown2.load(std::sync::atomic::Ordering::Relaxed) {
131142
let message: String = receiver_outgoing.recv().await.unwrap();
132143
(writer).write_all(message.as_bytes()).await.unwrap();
133144
if message.contains("mining.submit") && single_submit {
@@ -166,39 +177,43 @@ impl Client {
166177
// message to the Upstream node.
167178
// Is a separate thread as it can be CPU intensive and we do not want to block the reading
168179
// and writing of messages to the socket.
169-
std::thread::spawn(move || loop {
170-
if miner_cloned.safe_lock(|m| m.next_share()).unwrap().is_ok() {
171-
let nonce = miner_cloned.safe_lock(|m| m.header.unwrap().nonce).unwrap();
172-
let time = miner_cloned.safe_lock(|m| m.header.unwrap().time).unwrap();
173-
let job_id = miner_cloned.safe_lock(|m| m.job_id).unwrap();
174-
let version = miner_cloned.safe_lock(|m| m.version).unwrap();
175-
// Sends relevant candidate block header values needed to construct a
176-
// `mining.submit` message to the `receiver_share` in the task that is responsible
177-
// for sending messages to the Upstream node.
178-
if sender_share
179-
.try_send((nonce, job_id.unwrap(), version.unwrap(), time))
180-
.is_err()
181-
{
182-
warn!("Share channel is not available");
183-
break;
180+
let shutdown3 = shutdown.clone();
181+
std::thread::spawn(move || {
182+
while !shutdown3.load(std::sync::atomic::Ordering::Relaxed) {
183+
if miner_cloned.safe_lock(|m| m.next_share()).unwrap().is_ok() {
184+
let nonce = miner_cloned.safe_lock(|m| m.header.unwrap().nonce).unwrap();
185+
let time = miner_cloned.safe_lock(|m| m.header.unwrap().time).unwrap();
186+
let job_id = miner_cloned.safe_lock(|m| m.job_id).unwrap();
187+
let version = miner_cloned.safe_lock(|m| m.version).unwrap();
188+
// Sends relevant candidate block header values needed to construct a
189+
// `mining.submit` message to the `receiver_share` in the task that is
190+
// responsible for sending messages to the Upstream node.
191+
if sender_share
192+
.try_send((nonce, job_id.unwrap(), version.unwrap(), time))
193+
.is_err()
194+
{
195+
warn!("Share channel is not available");
196+
break;
197+
}
184198
}
199+
miner_cloned
200+
.safe_lock(|m| m.header.as_mut().map(|h| h.nonce += 1))
201+
.unwrap();
185202
}
186-
miner_cloned
187-
.safe_lock(|m| m.header.as_mut().map(|h| h.nonce += 1))
188-
.unwrap();
189203
});
190204
// Task to receive relevant candidate block header values needed to construct a
191205
// `mining.submit` message. This message is contructed as a `client_to_server::Submit` and
192206
// then serialized into json to be sent to the Upstream via the `sender_outgoing` sender.
193207
let cloned = client.clone();
208+
let shutdown4 = shutdown.clone();
194209
task::spawn(async move {
195210
tokio::select!(
196211
_ = recv_stop_submitting.changed() => {
197212
warn!("Stopping miner")
198213
},
199214
_ = async {
200215
let recv = receiver_share.clone();
201-
loop {
216+
while !shutdown4.load(std::sync::atomic::Ordering::Relaxed) {
202217
let (nonce, job_id, _version, ntime) = recv.recv().await.unwrap();
203218
if cloned.clone().safe_lock(|c| c.status).unwrap() != ClientStatus::Subscribed {
204219
continue;
@@ -225,7 +240,7 @@ impl Client {
225240
});
226241
let recv_incoming = client.safe_lock(|c| c.receiver_incoming.clone()).unwrap();
227242

228-
loop {
243+
while !shutdown.load(std::sync::atomic::Ordering::Relaxed) {
229244
match client.clone().safe_lock(|c| c.status).unwrap() {
230245
ClientStatus::Init => panic!("impossible state"),
231246
ClientStatus::Configured => {
@@ -240,7 +255,7 @@ impl Client {
240255
}
241256
// Waits for the `sender_incoming` to get message line from socket to be parsed by the
242257
// `Client`
243-
loop {
258+
while !shutdown.load(std::sync::atomic::Ordering::Relaxed) {
244259
if let Ok(incoming) = recv_incoming.clone().recv().await {
245260
Self::parse_message(client.clone(), Ok(incoming)).await;
246261
} else {
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,26 @@
11
pub(crate) mod client;
22
pub(crate) mod job;
33
pub(crate) mod miner;
4-
use std::{net::SocketAddr, str::FromStr};
4+
use std::{
5+
net::SocketAddr,
6+
str::FromStr,
7+
sync::{atomic::AtomicBool, Arc},
8+
};
59

610
pub(crate) use client::Client;
711

812
#[tokio::main]
913
async fn main() {
1014
tracing_subscriber::fmt().init();
1115

16+
let shutdown = Arc::new(AtomicBool::new(false));
1217
const ADDR: &str = "127.0.0.1:34255";
1318
Client::connect(
1419
80,
1520
SocketAddr::from_str(ADDR).expect("Invalid upstream address"),
1621
false,
1722
None,
23+
shutdown,
1824
)
1925
.await
2026
}

roles/tests-integration/lib/mod.rs

+10-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::{
1010
convert::{TryFrom, TryInto},
1111
net::SocketAddr,
1212
str::FromStr,
13-
sync::Once,
13+
sync::{atomic::AtomicBool, Arc, Once},
1414
};
1515
use translator_sv2::TranslatorSv2;
1616
use utils::get_available_address;
@@ -309,10 +309,17 @@ pub async fn start_mining_device_sv1(
309309
upstream_addr: SocketAddr,
310310
single_submit: bool,
311311
custom_target: Option<[u8; 32]>,
312+
shutdown: Arc<AtomicBool>,
312313
) {
313314
tokio::spawn(async move {
314-
mining_device_sv1::client::Client::connect(80, upstream_addr, single_submit, custom_target)
315-
.await;
315+
mining_device_sv1::client::Client::connect(
316+
80,
317+
upstream_addr,
318+
single_submit,
319+
custom_target,
320+
shutdown,
321+
)
322+
.await;
316323
});
317324
sleep(3).await;
318325
}

roles/tests-integration/tests/jdc_fallback.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ use roles_logic_sv2::{
55
parsers::{AnyMessage, Mining},
66
};
77
use sniffer::{MessageDirection, ReplaceMessage};
8-
use std::convert::TryInto;
8+
use std::{
9+
convert::TryInto,
10+
sync::{atomic::AtomicBool, Arc},
11+
};
912

1013
// Tests whether JDC will switch to a new pool after receiving a `SubmitSharesError` message from
1114
// the currently connected pool.
@@ -60,7 +63,8 @@ async fn test_jdc_pool_fallback_after_submit_rejection() {
6063
.wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SETUP_CONNECTION)
6164
.await;
6265
let (_translator, sv2_translator_addr) = start_sv2_translator(jdc_addr).await;
63-
let _ = start_mining_device_sv1(sv2_translator_addr, true, None).await;
66+
let shutdown = Arc::new(AtomicBool::new(false));
67+
let _ = start_mining_device_sv1(sv2_translator_addr, true, None, shutdown).await;
6468
// Assert that JDC switched to the second (Pool,JDS) pair
6569
sniffer_2
6670
.wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SETUP_CONNECTION)

roles/tests-integration/tests/translator_integration.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::{atomic::AtomicBool, Arc};
2+
13
// This file contains integration tests for the `TranslatorSv2` module.
24
//
35
// `TranslatorSv2` is a module that implements the Translator role in the Stratum V2 protocol.
@@ -23,7 +25,9 @@ async fn translate_sv1_to_sv2_successfully() {
2325
let (pool_translator_sniffer, pool_translator_sniffer_addr) =
2426
start_sniffer("0".to_string(), pool_addr, false, None).await;
2527
let (_, tproxy_addr) = start_sv2_translator(pool_translator_sniffer_addr).await;
26-
let _mining_device = start_mining_device_sv1(tproxy_addr, false, None).await;
28+
let mining_device_shutdown = Arc::new(AtomicBool::new(false));
29+
let _mining_device =
30+
start_mining_device_sv1(tproxy_addr, false, None, mining_device_shutdown).await;
2731
pool_translator_sniffer
2832
.wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SETUP_CONNECTION)
2933
.await;
@@ -67,7 +71,9 @@ async fn translation_proxy_and_jd() {
6771
let (_jds, jds_addr) = start_jds(tp.rpc_info()).await;
6872
let (_jdc, jdc_addr) = start_jdc(&[(jdc_pool_sniffer_addr, jds_addr)], tp_addr).await;
6973
let (_translator, tproxy_addr) = start_sv2_translator(jdc_addr).await;
70-
let _mining_device = start_mining_device_sv1(tproxy_addr, true, None).await;
74+
let mining_device_shutdown = Arc::new(AtomicBool::new(false));
75+
let _mining_device =
76+
start_mining_device_sv1(tproxy_addr, true, None, mining_device_shutdown).await;
7177
jdc_pool_sniffer
7278
.wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SETUP_CONNECTION)
7379
.await;

0 commit comments

Comments
 (0)