Skip to content

Commit 3b0c29f

Browse files
committed
remove support for async-channel and make all channels to tokio
1 parent 0ff67c5 commit 3b0c29f

File tree

10 files changed

+128
-90
lines changed

10 files changed

+128
-90
lines changed

roles/Cargo.lock

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

roles/pool/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ path = "src/lib/mod.rs"
1818

1919
[dependencies]
2020
stratum-common = { path = "../../common" }
21-
async-channel = "1.5.1"
2221
binary_sv2 = { path = "../../protocols/v2/binary-sv2/binary-sv2" }
2322
buffer_sv2 = { path = "../../utils/buffer" }
2423
codec_sv2 = { path = "../../protocols/v2/codec-sv2", features = ["noise_sv2"] }

roles/pool/src/lib/error.rs

+23-11
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use roles_logic_sv2::parsers::Mining;
1010
pub enum PoolError {
1111
Io(std::io::Error),
1212
ChannelSend(Box<dyn std::marker::Send + Debug>),
13-
ChannelRecv(async_channel::RecvError),
1413
BinarySv2(binary_sv2::Error),
1514
Codec(codec_sv2::Error),
1615
Noise(noise_sv2::Error),
@@ -20,6 +19,8 @@ pub enum PoolError {
2019
ComponentShutdown(String),
2120
Custom(String),
2221
Sv2ProtocolError((u32, Mining<'static>)),
22+
TokioChannelRecv(Box<dyn std::marker::Send + Debug>),
23+
TokioBroadcastChannelRecv(tokio::sync::broadcast::error::RecvError),
2324
}
2425

2526
impl std::fmt::Display for PoolError {
@@ -28,7 +29,6 @@ impl std::fmt::Display for PoolError {
2829
match self {
2930
Io(ref e) => write!(f, "I/O error: `{:?}", e),
3031
ChannelSend(ref e) => write!(f, "Channel send failed: `{:?}`", e),
31-
ChannelRecv(ref e) => write!(f, "Channel recv failed: `{:?}`", e),
3232
BinarySv2(ref e) => write!(f, "Binary SV2 error: `{:?}`", e),
3333
Codec(ref e) => write!(f, "Codec SV2 error: `{:?}", e),
3434
Framing(ref e) => write!(f, "Framing SV2 error: `{:?}`", e),
@@ -40,21 +40,23 @@ impl std::fmt::Display for PoolError {
4040
Sv2ProtocolError(ref e) => {
4141
write!(f, "Received Sv2 Protocol Error from upstream: `{:?}`", e)
4242
}
43+
TokioChannelRecv(ref e) => write!(f, "Channel recv failed: `{:?}`", e),
44+
TokioBroadcastChannelRecv(ref e) => write!(f, "BroadCastChannel Recv failed: {:?}", e),
4345
}
4446
}
4547
}
4648

4749
pub type PoolResult<T> = Result<T, PoolError>;
4850

49-
impl From<std::io::Error> for PoolError {
50-
fn from(e: std::io::Error) -> PoolError {
51-
PoolError::Io(e)
51+
impl From<tokio::sync::broadcast::error::RecvError> for PoolError {
52+
fn from(value: tokio::sync::broadcast::error::RecvError) -> Self {
53+
PoolError::TokioBroadcastChannelRecv(value)
5254
}
5355
}
5456

55-
impl From<async_channel::RecvError> for PoolError {
56-
fn from(e: async_channel::RecvError) -> PoolError {
57-
PoolError::ChannelRecv(e)
57+
impl From<std::io::Error> for PoolError {
58+
fn from(e: std::io::Error) -> PoolError {
59+
PoolError::Io(e)
5860
}
5961
}
6062

@@ -82,9 +84,19 @@ impl From<roles_logic_sv2::Error> for PoolError {
8284
}
8385
}
8486

85-
impl<T: 'static + std::marker::Send + Debug> From<async_channel::SendError<T>> for PoolError {
86-
fn from(e: async_channel::SendError<T>) -> PoolError {
87-
PoolError::ChannelSend(Box::new(e))
87+
impl<T: 'static + std::marker::Send + Debug> From<tokio::sync::mpsc::error::SendError<T>>
88+
for PoolError
89+
{
90+
fn from(e: tokio::sync::mpsc::error::SendError<T>) -> PoolError {
91+
PoolError::TokioChannelRecv(Box::new(e))
92+
}
93+
}
94+
95+
impl<T: 'static + std::marker::Send + Debug> From<tokio::sync::broadcast::error::SendError<T>>
96+
for PoolError
97+
{
98+
fn from(e: tokio::sync::broadcast::error::SendError<T>) -> PoolError {
99+
PoolError::TokioChannelRecv(Box::new(e))
88100
}
89101
}
90102

roles/pool/src/lib/mining_pool/mod.rs

+30-31
Original file line numberDiff line numberDiff line change
@@ -179,14 +179,14 @@ pub struct Downstream {
179179
receiver: tokio::sync::broadcast::Sender<EitherFrame>,
180180
sender: tokio::sync::broadcast::Sender<EitherFrame>,
181181
downstream_data: CommonDownstreamData,
182-
solution_sender: Sender<SubmitSolution<'static>>,
182+
solution_sender: tokio::sync::mpsc::Sender<SubmitSolution<'static>>,
183183
channel_factory: Arc<Mutex<PoolChannelFactory>>,
184184
}
185185

186186
/// Accept downstream connection
187187
pub struct Pool {
188188
downstreams: HashMap<u32, Arc<Mutex<Downstream>>, BuildNoHashHasher<u32>>,
189-
solution_sender: Sender<SubmitSolution<'static>>,
189+
solution_sender: tokio::sync::mpsc::Sender<SubmitSolution<'static>>,
190190
new_template_processed: bool,
191191
channel_factory: Arc<Mutex<PoolChannelFactory>>,
192192
last_prev_hash_template_id: u64,
@@ -242,7 +242,7 @@ impl Downstream {
242242
})
243243
.await
244244
{
245-
error!("Encountered Error but status channel is down: {}", e);
245+
error!("Encountered Error but status channel is down: {:?}", e);
246246
}
247247

248248
return;
@@ -491,13 +491,13 @@ impl Pool {
491491

492492
async fn on_new_prev_hash(
493493
self_: Arc<Mutex<Self>>,
494-
rx: Receiver<SetNewPrevHash<'static>>,
495-
sender_message_received_signal: Sender<()>,
494+
mut rx: tokio::sync::mpsc::Receiver<SetNewPrevHash<'static>>,
495+
sender_message_received_signal: tokio::sync::broadcast::Sender<()>,
496496
) -> PoolResult<()> {
497497
let status_tx = self_
498498
.safe_lock(|s| s.status_tx.clone())
499499
.map_err(|e| PoolError::PoisonLock(e.to_string()))?;
500-
while let Ok(new_prev_hash) = rx.recv().await {
500+
while let Some(new_prev_hash) = rx.recv().await {
501501
debug!("New prev hash received: {:?}", new_prev_hash);
502502
let res = self_
503503
.safe_lock(|s| {
@@ -537,7 +537,7 @@ impl Pool {
537537
.await;
538538
handle_result!(status_tx, res);
539539
}
540-
handle_result!(status_tx, sender_message_received_signal.send(()).await);
540+
handle_result!(status_tx, sender_message_received_signal.send(()));
541541
}
542542
Err(_) => todo!(),
543543
}
@@ -547,12 +547,12 @@ impl Pool {
547547

548548
async fn on_new_template(
549549
self_: Arc<Mutex<Self>>,
550-
rx: Receiver<NewTemplate<'static>>,
551-
sender_message_received_signal: Sender<()>,
550+
mut rx: tokio::sync::mpsc::Receiver<NewTemplate<'static>>,
551+
sender_message_received_signal: tokio::sync::broadcast::Sender<()>,
552552
) -> PoolResult<()> {
553553
let status_tx = self_.safe_lock(|s| s.status_tx.clone())?;
554554
let channel_factory = self_.safe_lock(|s| s.channel_factory.clone())?;
555-
while let Ok(mut new_template) = rx.recv().await {
555+
while let Some(mut new_template) = rx.recv().await {
556556
debug!(
557557
"New template received, creating a new mining job(s): {:?}",
558558
new_template
@@ -584,17 +584,17 @@ impl Pool {
584584
.map_err(|e| PoolError::PoisonLock(e.to_string()));
585585
handle_result!(status_tx, res);
586586

587-
handle_result!(status_tx, sender_message_received_signal.send(()).await);
587+
handle_result!(status_tx, sender_message_received_signal.send(()));
588588
}
589589
Ok(())
590590
}
591591

592592
pub fn start(
593593
config: Configuration,
594-
new_template_rx: Receiver<NewTemplate<'static>>,
595-
new_prev_hash_rx: Receiver<SetNewPrevHash<'static>>,
596-
solution_sender: Sender<SubmitSolution<'static>>,
597-
sender_message_received_signal: Sender<()>,
594+
new_template_rx: tokio::sync::mpsc::Receiver<NewTemplate<'static>>,
595+
new_prev_hash_rx: tokio::sync::mpsc::Receiver<SetNewPrevHash<'static>>,
596+
solution_sender: tokio::sync::mpsc::Sender<SubmitSolution<'static>>,
597+
sender_message_received_signal: tokio::sync::broadcast::Sender<()>,
598598
status_tx: status::Sender,
599599
) -> Arc<Mutex<Self>> {
600600
let extranonce_len = 32;
@@ -744,8 +744,8 @@ impl Pool {
744744

745745
// use super::Configuration;
746746

747-
// // this test is used to verify the `coinbase_tx_prefix` and `coinbase_tx_suffix` values tested
748-
// // against in message generator
747+
// // this test is used to verify the `coinbase_tx_prefix` and `coinbase_tx_suffix` values
748+
// tested // against in message generator
749749
// // `stratum/test/message-generator/test/pool-sri-test-extended.json`
750750
// #[test]
751751
// fn test_coinbase_outputs_from_config() {
@@ -778,9 +778,9 @@ impl Pool {
778778
// let _coinbase_tx_value_remaining: u64 = 625000000;
779779
// let _coinbase_tx_outputs_count = 0;
780780
// let coinbase_tx_locktime = 0;
781-
// let coinbase_tx_outputs: Vec<bitcoin::TxOut> = super::get_coinbase_output(&config).unwrap();
782-
// // extranonce len set to max_extranonce_size in `ChannelFactory::new_extended_channel()`
783-
// let extranonce_len = 32;
781+
// let coinbase_tx_outputs: Vec<bitcoin::TxOut> =
782+
// super::get_coinbase_output(&config).unwrap(); // extranonce len set to
783+
// max_extranonce_size in `ChannelFactory::new_extended_channel()` let extranonce_len = 32;
784784

785785
// // build coinbase TX from 'job_creator::coinbase()'
786786

@@ -813,9 +813,9 @@ impl Pool {
813813
// coinbase_tx_prefix
814814
// == [
815815
// 2, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
816-
// 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 56, 3, 76, 163, 38,
817-
// 0, 83, 116, 114, 97, 116, 117, 109, 32, 118, 50, 32, 83, 82, 73, 32, 80, 111,
818-
// 111, 108
816+
// 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 56, 3, 76, 163,
817+
// 38, 0, 83, 116, 114, 97, 116, 117, 109, 32, 118, 50, 32, 83, 82, 73, 32, 80,
818+
// 111, 111, 108
819819
// ]
820820
// .to_vec()
821821
// .try_into()
@@ -827,8 +827,8 @@ impl Pool {
827827
// == [
828828
// 255, 255, 255, 255, 1, 0, 0, 0, 0, 0, 0, 0, 0, 22, 0, 20, 235, 225, 183, 220,
829829
// 194, 147, 204, 170, 14, 231, 67, 168, 111, 137, 223, 130, 88, 194, 8, 252, 1,
830-
// 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
831-
// 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
830+
// 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
831+
// 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
832832
// ]
833833
// .to_vec()
834834
// .try_into()
@@ -840,8 +840,8 @@ impl Pool {
840840
// // copied from roles-logic-sv2::job_creator
841841
// fn coinbase_tx_prefix(coinbase: &Transaction, script_prefix_len: usize) -> B064K<'static> {
842842
// let encoded = coinbase.serialize();
843-
// // If script_prefix_len is not 0 we are not in a test enviornment and the coinbase have the
844-
// // 0 witness
843+
// // If script_prefix_len is not 0 we are not in a test enviornment and the coinbase have
844+
// the // 0 witness
845845
// let segwit_bytes = match script_prefix_len {
846846
// 0 => 0,
847847
// _ => 2,
@@ -864,8 +864,8 @@ impl Pool {
864864
// script_prefix_len: usize,
865865
// ) -> B064K<'static> {
866866
// let encoded = coinbase.serialize();
867-
// // If script_prefix_len is not 0 we are not in a test enviornment and the coinbase have the
868-
// // 0 witness
867+
// // If script_prefix_len is not 0 we are not in a test enviornment and the coinbase have
868+
// the // 0 witness
869869
// let segwit_bytes = match script_prefix_len {
870870
// 0 => 0,
871871
// _ => 2,
@@ -877,8 +877,7 @@ impl Pool {
877877
// + 4 // index
878878
// + 1 // bytes in script TODO can be also 3
879879
// + script_prefix_len // bip34_bytes
880-
// + (extranonce_len as usize)..]
881-
// .to_vec();
880+
// + (extranonce_len as usize)..] .to_vec();
882881
// r.try_into().unwrap()
883882
// }
884883

roles/pool/src/lib/mining_pool/setup_connection.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ impl SetupConnectionHandler {
5353
}
5454
Ok(EitherFrame::Shutdown) => {
5555
debug!("Shutdown message received");
56-
return Err(PoolError::RolesLogic(Error::Shutdown))
56+
return Err(PoolError::RolesLogic(Error::Shutdown));
5757
}
5858
Err(e) => {
5959
error!("Error receiving message: {:?}", e);

roles/pool/src/lib/mod.rs

+27-10
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ pub mod mining_pool;
33
pub mod status;
44
pub mod template_receiver;
55

6-
use async_channel::{bounded, unbounded};
7-
86
use error::PoolError;
97
use mining_pool::{get_coinbase_output, Configuration, Pool};
108
use template_receiver::TemplateRx;
@@ -24,11 +22,30 @@ impl PoolSv2 {
2422

2523
pub async fn start(&self) -> Result<(), PoolError> {
2624
let config = self.config.clone();
27-
let (status_tx, status_rx) = unbounded();
28-
let (s_new_t, r_new_t) = bounded(10);
29-
let (s_prev_hash, r_prev_hash) = bounded(10);
30-
let (s_solution, r_solution) = bounded(10);
31-
let (s_message_recv_signal, r_message_recv_signal) = bounded(10);
25+
// Single consumer, multiple producer. (mpsc can be used)
26+
// consumer on some line in start method we are using it.
27+
// producer are used in templateRx::connect and Pool start
28+
// producers are clonable so no issue. but its unbounded.
29+
// tokio also provide unbounded mpsc.
30+
// let (status_tx, status_rx) = unbounded();
31+
let (status_tx, mut status_rx) = tokio::sync::mpsc::unbounded_channel();
32+
// r_new_t consumer is sent in pool::start, s_new_t is sent in templateRx::connect
33+
// sender or producer I dont give a damn about. even the r_new_t is passed in only
34+
// start then to on_new_template, so mpsc makes sense here as well.
35+
// let (s_new_t, r_new_t) = bounded(10);
36+
let (s_new_t, r_new_t) = tokio::sync::mpsc::channel(10);
37+
// s_prev_hash (no one gives a damn about clonable stuff), which is only passed to
38+
// TemplateRx and nothing new. r_prev_hash is sent to pool::start which is also
39+
// sent to on_new_prevhash, so mpsc also works here.
40+
// let (s_prev_hash, r_prev_hash) = bounded(10);
41+
let (s_prev_hash, r_prev_hash) = tokio::sync::mpsc::channel(10);
42+
// s_solution is sent to pool (no one give a damn about clonable), r_solution is sent
43+
// to templateRx and then to on_new_solution, so mpsc works.
44+
let (s_solution, r_solution) = tokio::sync::mpsc::channel(10);
45+
// This is spicy, as the r_message_recv_signal is cloning at few of the places, so, we can
46+
// use broadcast.
47+
// let (s_message_recv_signal, r_message_recv_signal) = bounded(10);
48+
let (s_message_recv_signal, _) = tokio::sync::broadcast::channel(10);
3249
let coinbase_output_result = get_coinbase_output(&config);
3350
let coinbase_output_len = coinbase_output_result?.len() as u32;
3451
let tp_authority_public_key = config.tp_authority_public_key;
@@ -37,8 +54,8 @@ impl PoolSv2 {
3754
s_new_t,
3855
s_prev_hash,
3956
r_solution,
40-
r_message_recv_signal,
41-
status::Sender::Upstream(status_tx.clone()),
57+
s_message_recv_signal.clone(),
58+
status::Sender::UpstreamTokio(status_tx.clone()),
4259
coinbase_output_len,
4360
tp_authority_public_key,
4461
)
@@ -49,7 +66,7 @@ impl PoolSv2 {
4966
r_prev_hash,
5067
s_solution,
5168
s_message_recv_signal,
52-
status::Sender::DownstreamListener(status_tx),
69+
status::Sender::DownstreamListenerTokio(status_tx),
5370
);
5471

5572
// Start the error handling loop

0 commit comments

Comments
 (0)