Skip to content

Commit bd91e82

Browse files
committed
Add SnifferSV1
`SnifferSV1` acts as a middleman between two SV1 roles. It forwards messages from one role to the other and vice versa. It also provides methods to wait for specific messages to be received from the downstream or upstream role.
1 parent ba54851 commit bd91e82

File tree

6 files changed

+248
-2
lines changed

6 files changed

+248
-2
lines changed

.github/workflows/integration-tests.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,8 @@ jobs:
2727
- name: Roles Integration Tests
2828
run: |
2929
RUST_BACKTRACE=1 RUST_LOG=debug cargo test --manifest-path=test/integration-tests/Cargo.toml --verbose --test '*' -- --nocapture --skip test_jdc_pool_fallback_after_submit_rejection --skip translation_proxy_and_jd
30+
31+
- name: SV1 Integration Tests
32+
run: |
33+
RUST_BACKTRACE=1 RUST_LOG=debug cargo test --manifest-path=test/integration-tests/Cargo.toml --verbose --test 'sv1' --features sv1 -- --nocapture
34+

test/integration-tests/Cargo.lock

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

test/integration-tests/Cargo.toml

+6-1
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,17 @@ key-utils = { path = "../../utils/key-utils" }
3131
mining_device = { path = "../../roles/test-utils/mining-device" }
3232
mining_device_sv1 = { path = "../../roles/test-utils/mining-device-sv1" }
3333
mining_proxy_sv2 = { path = "../../roles/mining-proxy" }
34-
network_helpers_sv2 = { path = "../../roles/roles-utils/network-helpers", features =["with_buffer_pool"] }
34+
network_helpers_sv2 = { path = "../../roles/roles-utils/network-helpers", features = ["with_buffer_pool"] }
3535
pool_sv2 = { path = "../../roles/pool" }
3636
roles_logic_sv2 = { path = "../../protocols/v2/roles-logic-sv2" }
3737
stratum-common = { path = "../../common" }
3838
config-helpers = { path = "../../roles/roles-utils/config-helpers" }
3939
translator_sv2 = { path = "../../roles/translator" }
40+
sv1_api = { path = "../../protocols/v1", optional = true }
4041

4142
[lib]
4243
path = "lib/mod.rs"
44+
45+
[features]
46+
default = []
47+
sv1 = ["sv1_api", "network_helpers_sv2/sv1"]

test/integration-tests/lib/mod.rs

+10
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ use translator_sv2::TranslatorSv2;
1616
use utils::get_available_address;
1717

1818
pub mod sniffer;
19+
#[cfg(feature = "sv1")]
20+
pub mod sv1_sniffer;
1921
pub mod template_provider;
2022
pub(crate) mod utils;
2123

@@ -376,3 +378,11 @@ pub async fn start_mining_sv2_proxy(upstreams: &[SocketAddr]) -> SocketAddr {
376378
pub async fn sleep(seconds: u64) {
377379
tokio::time::sleep(std::time::Duration::from_secs(seconds)).await;
378380
}
381+
382+
#[cfg(feature = "sv1")]
383+
pub fn start_sv1_sniffer(upstream_address: SocketAddr) -> (sv1_sniffer::SnifferSV1, SocketAddr) {
384+
let listening_address = get_available_address();
385+
let sniffer_sv1 = sv1_sniffer::SnifferSV1::new(listening_address, upstream_address);
386+
sniffer_sv1.start();
387+
(sniffer_sv1, listening_address)
388+
}
+203
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
#![cfg(feature = "sv1")]
2+
use async_channel::{Receiver, Sender};
3+
use network_helpers_sv2::sv1_connection::ConnectionSV1;
4+
use std::{collections::VecDeque, net::SocketAddr, sync::Arc, time::Duration};
5+
use tokio::{
6+
net::{TcpListener, TcpStream},
7+
select,
8+
sync::Mutex,
9+
time::sleep,
10+
};
11+
12+
#[derive(Debug, PartialEq)]
13+
enum SnifferError {
14+
DownstreamClosed,
15+
UpstreamClosed,
16+
}
17+
18+
/// Represents an SV1 sniffer.
19+
///
20+
/// This struct acts as a middleman between two SV1 roles. It forwards messages from one role to
21+
/// the other and vice versa. It also provides methods to wait for specific messages to be received
22+
/// from the downstream or upstream role.
23+
#[derive(Debug, Clone)]
24+
pub struct SnifferSV1 {
25+
listening_address: SocketAddr,
26+
upstream_address: SocketAddr,
27+
messages_from_downstream_sv1: MessagesAggregatorSV1,
28+
messages_from_upstream_sv1: MessagesAggregatorSV1,
29+
}
30+
31+
impl SnifferSV1 {
32+
/// Create a new [`SnifferSV1`] instance.
33+
///
34+
/// The listening address is the address the sniffer will listen on for incoming connections
35+
/// from the downstream role. The upstream address is the address the sniffer will connect to
36+
/// in order to forward messages to the upstream role.
37+
pub fn new(listening_address: SocketAddr, upstream_address: SocketAddr) -> Self {
38+
Self {
39+
listening_address,
40+
upstream_address,
41+
messages_from_downstream_sv1: MessagesAggregatorSV1::new(),
42+
messages_from_upstream_sv1: MessagesAggregatorSV1::new(),
43+
}
44+
}
45+
46+
/// Start the sniffer.
47+
pub fn start(&self) {
48+
let upstream_address = self.upstream_address.clone();
49+
let listening_address = self.listening_address.clone();
50+
let messages_from_downstream_sv1 = self.messages_from_downstream_sv1.clone();
51+
let messages_from_upstream_sv1 = self.messages_from_upstream_sv1.clone();
52+
tokio::spawn(async move {
53+
let sniffer_to_upstream_stream = TcpStream::connect(upstream_address)
54+
.await
55+
.expect("Failed to connect to upstream");
56+
let listener = TcpListener::bind(listening_address)
57+
.await
58+
.expect("Failed to listen on given address");
59+
let (downstream_stream, _) = listener
60+
.accept()
61+
.await
62+
.expect("Failed to accept downstream connection");
63+
let sniffer_to_upstream_connection =
64+
ConnectionSV1::new(sniffer_to_upstream_stream).await;
65+
let downstream_to_sniffer_connection = ConnectionSV1::new(downstream_stream).await;
66+
select! {
67+
_ = tokio::signal::ctrl_c() => { },
68+
_ = Self::recv_from_down_send_to_up_sv1(
69+
downstream_to_sniffer_connection.receiver(),
70+
sniffer_to_upstream_connection.sender(),
71+
messages_from_downstream_sv1
72+
) => { },
73+
_ = Self::recv_from_up_send_to_down_sv1(
74+
sniffer_to_upstream_connection.receiver(),
75+
downstream_to_sniffer_connection.sender(),
76+
messages_from_upstream_sv1
77+
) => { },
78+
};
79+
});
80+
}
81+
82+
/// Wait for a specific message to be received from the downstream role.
83+
pub async fn wait_for_message_from_downstream(&self, message: &str) {
84+
let now = std::time::Instant::now();
85+
tokio::select!(
86+
_ = tokio::signal::ctrl_c() => { },
87+
_ = async {
88+
loop {
89+
if self.messages_from_downstream_sv1.has_message(&[message]).await {
90+
break;
91+
}
92+
if now.elapsed().as_secs() > 60 {
93+
panic!(
94+
"Timeout: SV1 message {} from downstream not received",
95+
message
96+
);
97+
} else {
98+
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
99+
continue;
100+
}
101+
}
102+
} => {}
103+
);
104+
}
105+
106+
/// Wait for a specific message to be received from the upstream role.
107+
pub async fn wait_for_message_from_upstream(&self, message: &[&str]) {
108+
let now = std::time::Instant::now();
109+
tokio::select!(
110+
_ = tokio::signal::ctrl_c() => { },
111+
_ = async {
112+
loop {
113+
if self.messages_from_upstream_sv1.has_message(message).await {
114+
break;
115+
}
116+
if now.elapsed().as_secs() > 60 {
117+
panic!(
118+
"Timeout: SV1 message {} from upstream not received",
119+
message.join(" ")
120+
);
121+
} else {
122+
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
123+
continue;
124+
}
125+
}
126+
} => {}
127+
);
128+
}
129+
130+
async fn recv_from_up_send_to_down_sv1(
131+
recv: Receiver<sv1_api::Message>,
132+
send: Sender<sv1_api::Message>,
133+
upstream_messages: MessagesAggregatorSV1,
134+
) -> Result<(), SnifferError> {
135+
while let Ok(msg) = recv.recv().await {
136+
send.send(msg.clone())
137+
.await
138+
.map_err(|_| SnifferError::DownstreamClosed)?;
139+
upstream_messages.add_message(msg.clone()).await;
140+
sleep(Duration::from_millis(100)).await;
141+
}
142+
Err(SnifferError::UpstreamClosed)
143+
}
144+
145+
async fn recv_from_down_send_to_up_sv1(
146+
recv: Receiver<sv1_api::Message>,
147+
send: Sender<sv1_api::Message>,
148+
downstream_messages: MessagesAggregatorSV1,
149+
) -> Result<(), SnifferError> {
150+
while let Ok(msg) = recv.recv().await {
151+
send.send(msg.clone())
152+
.await
153+
.map_err(|_| SnifferError::UpstreamClosed)?;
154+
downstream_messages.add_message(msg).await;
155+
}
156+
Err(SnifferError::DownstreamClosed)
157+
}
158+
}
159+
160+
/// Represents a SV1 message manager.
161+
///
162+
/// This struct can be used in order to aggregate and manage SV1 messages.
163+
#[derive(Debug, Clone)]
164+
pub(crate) struct MessagesAggregatorSV1 {
165+
messages: Arc<Mutex<VecDeque<sv1_api::Message>>>,
166+
}
167+
168+
impl MessagesAggregatorSV1 {
169+
fn new() -> Self {
170+
Self {
171+
messages: Arc::new(Mutex::new(VecDeque::new())),
172+
}
173+
}
174+
175+
async fn add_message(&self, message: sv1_api::Message) {
176+
let mut messages = self.messages.lock().await;
177+
messages.push_back(message);
178+
}
179+
180+
async fn has_message(&self, expected_msg: &[&str]) -> bool {
181+
let messages = self.messages.lock().await;
182+
let ret = messages.iter().any(|msg| match msg {
183+
sv1_api::Message::StandardRequest(req) => req.method == *expected_msg.get(0).unwrap(),
184+
sv1_api::Message::Notification(notif) => notif.method == *expected_msg.get(0).unwrap(),
185+
sv1_api::Message::OkResponse(res) => {
186+
if let Ok(res) = network_helpers_sv2::serde_json::to_string(&res) {
187+
for m in expected_msg {
188+
if !res.contains(m) {
189+
return false;
190+
}
191+
}
192+
return true;
193+
} else {
194+
false
195+
}
196+
}
197+
sv1_api::Message::ErrorResponse(res) => {
198+
res.error.clone().unwrap().message == *expected_msg.get(0).unwrap()
199+
}
200+
});
201+
ret
202+
}
203+
}

test/integration-tests/tests/sv1.rs

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#![cfg(feature = "sv1")]
2+
use integration_tests_sv2::*;
3+
4+
#[tokio::test]
5+
async fn translate_sv1_to_sv2_successfully() {
6+
start_tracing();
7+
let (_tp, tp_addr) = start_template_provider(None);
8+
let (_pool, pool_addr) = start_pool(Some(tp_addr)).await;
9+
let (_, tproxy_addr) = start_sv2_translator(pool_addr).await;
10+
let (sniffer_sv1, sniffer_sv1_addr) = start_sv1_sniffer(tproxy_addr);
11+
let _mining_device = start_mining_device_sv1(sniffer_sv1_addr, false, None).await;
12+
sniffer_sv1
13+
.wait_for_message_from_downstream("mining.configure")
14+
.await;
15+
sniffer_sv1
16+
.wait_for_message_from_upstream(&[
17+
"minimum-difficulty",
18+
"version-rolling",
19+
"version-rolling.mask",
20+
"version-rolling.min-bit-count",
21+
])
22+
.await;
23+
}

0 commit comments

Comments
 (0)