Skip to content

Commit f652034

Browse files
committed
remove all occurance of async-channel
1 parent 5b56d08 commit f652034

File tree

10 files changed

+42
-114
lines changed

10 files changed

+42
-114
lines changed

roles/Cargo.lock

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

roles/pool/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ path = "src/lib/mod.rs"
1818

1919
[dependencies]
2020
stratum-common = { path = "../../common" }
21-
async-channel = "1.5.1"
2221
binary_sv2 = { path = "../../protocols/v2/binary-sv2/binary-sv2" }
2322
buffer_sv2 = { path = "../../utils/buffer" }
2423
codec_sv2 = { path = "../../protocols/v2/codec-sv2", features = ["noise_sv2"] }

roles/pool/src/lib/error.rs

+9-19
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use roles_logic_sv2::parsers::Mining;
1010
pub enum PoolError {
1111
Io(std::io::Error),
1212
ChannelSend(Box<dyn std::marker::Send + Debug>),
13-
ChannelRecv(async_channel::RecvError),
1413
BinarySv2(binary_sv2::Error),
1514
Codec(codec_sv2::Error),
1615
Noise(noise_sv2::Error),
@@ -21,7 +20,7 @@ pub enum PoolError {
2120
Custom(String),
2221
Sv2ProtocolError((u32, Mining<'static>)),
2322
TokioChannelRecv(Box<dyn std::marker::Send + Debug>),
24-
TokioBroadcastChannelRecv(tokio::sync::broadcast::error::RecvError)
23+
TokioBroadcastChannelRecv(tokio::sync::broadcast::error::RecvError),
2524
}
2625

2726
impl std::fmt::Display for PoolError {
@@ -30,7 +29,6 @@ impl std::fmt::Display for PoolError {
3029
match self {
3130
Io(ref e) => write!(f, "I/O error: `{:?}", e),
3231
ChannelSend(ref e) => write!(f, "Channel send failed: `{:?}`", e),
33-
ChannelRecv(ref e) => write!(f, "Channel recv failed: `{:?}`", e),
3432
BinarySv2(ref e) => write!(f, "Binary SV2 error: `{:?}`", e),
3533
Codec(ref e) => write!(f, "Codec SV2 error: `{:?}", e),
3634
Framing(ref e) => write!(f, "Framing SV2 error: `{:?}`", e),
@@ -41,9 +39,9 @@ impl std::fmt::Display for PoolError {
4139
Custom(ref e) => write!(f, "Custom SV2 error: `{:?}`", e),
4240
Sv2ProtocolError(ref e) => {
4341
write!(f, "Received Sv2 Protocol Error from upstream: `{:?}`", e)
44-
},
42+
}
4543
TokioChannelRecv(ref e) => write!(f, "Channel recv failed: `{:?}`", e),
46-
TokioBroadcastChannelRecv(ref e) => write!(f, "BroadCastChannel Recv failed: {:?}", e)
44+
TokioBroadcastChannelRecv(ref e) => write!(f, "BroadCastChannel Recv failed: {:?}", e),
4745
}
4846
}
4947
}
@@ -62,12 +60,6 @@ impl From<std::io::Error> for PoolError {
6260
}
6361
}
6462

65-
impl From<async_channel::RecvError> for PoolError {
66-
fn from(e: async_channel::RecvError) -> PoolError {
67-
PoolError::ChannelRecv(e)
68-
}
69-
}
70-
7163
impl From<binary_sv2::Error> for PoolError {
7264
fn from(e: binary_sv2::Error) -> PoolError {
7365
PoolError::BinarySv2(e)
@@ -92,19 +84,17 @@ impl From<roles_logic_sv2::Error> for PoolError {
9284
}
9385
}
9486

95-
impl<'a, T: 'static + std::marker::Send + Debug> From<async_channel::SendError<T>> for PoolError {
96-
fn from(e: async_channel::SendError<T>) -> PoolError {
97-
PoolError::ChannelSend(Box::new(e))
98-
}
99-
}
100-
101-
impl<'a, T: 'static + std::marker::Send + Debug> From<tokio::sync::mpsc::error::SendError<T>> for PoolError {
87+
impl<T: 'static + std::marker::Send + Debug> From<tokio::sync::mpsc::error::SendError<T>>
88+
for PoolError
89+
{
10290
fn from(e: tokio::sync::mpsc::error::SendError<T>) -> PoolError {
10391
PoolError::TokioChannelRecv(Box::new(e))
10492
}
10593
}
10694

107-
impl<'a, T: 'static + std::marker::Send + Debug> From<tokio::sync::broadcast::error::SendError<T>> for PoolError {
95+
impl<T: 'static + std::marker::Send + Debug> From<tokio::sync::broadcast::error::SendError<T>>
96+
for PoolError
97+
{
10898
fn from(e: tokio::sync::broadcast::error::SendError<T>) -> PoolError {
10999
PoolError::TokioChannelRecv(Box::new(e))
110100
}

roles/pool/src/lib/mining_pool/mod.rs

+15-16
Original file line numberDiff line numberDiff line change
@@ -744,8 +744,8 @@ impl Pool {
744744

745745
// use super::Configuration;
746746

747-
// // this test is used to verify the `coinbase_tx_prefix` and `coinbase_tx_suffix` values tested
748-
// // against in message generator
747+
// // this test is used to verify the `coinbase_tx_prefix` and `coinbase_tx_suffix` values
748+
// tested // against in message generator
749749
// // `stratum/test/message-generator/test/pool-sri-test-extended.json`
750750
// #[test]
751751
// fn test_coinbase_outputs_from_config() {
@@ -778,9 +778,9 @@ impl Pool {
778778
// let _coinbase_tx_value_remaining: u64 = 625000000;
779779
// let _coinbase_tx_outputs_count = 0;
780780
// let coinbase_tx_locktime = 0;
781-
// let coinbase_tx_outputs: Vec<bitcoin::TxOut> = super::get_coinbase_output(&config).unwrap();
782-
// // extranonce len set to max_extranonce_size in `ChannelFactory::new_extended_channel()`
783-
// let extranonce_len = 32;
781+
// let coinbase_tx_outputs: Vec<bitcoin::TxOut> =
782+
// super::get_coinbase_output(&config).unwrap(); // extranonce len set to
783+
// max_extranonce_size in `ChannelFactory::new_extended_channel()` let extranonce_len = 32;
784784

785785
// // build coinbase TX from 'job_creator::coinbase()'
786786

@@ -813,9 +813,9 @@ impl Pool {
813813
// coinbase_tx_prefix
814814
// == [
815815
// 2, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
816-
// 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 56, 3, 76, 163, 38,
817-
// 0, 83, 116, 114, 97, 116, 117, 109, 32, 118, 50, 32, 83, 82, 73, 32, 80, 111,
818-
// 111, 108
816+
// 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 56, 3, 76, 163,
817+
// 38, 0, 83, 116, 114, 97, 116, 117, 109, 32, 118, 50, 32, 83, 82, 73, 32, 80,
818+
// 111, 111, 108
819819
// ]
820820
// .to_vec()
821821
// .try_into()
@@ -827,8 +827,8 @@ impl Pool {
827827
// == [
828828
// 255, 255, 255, 255, 1, 0, 0, 0, 0, 0, 0, 0, 0, 22, 0, 20, 235, 225, 183, 220,
829829
// 194, 147, 204, 170, 14, 231, 67, 168, 111, 137, 223, 130, 88, 194, 8, 252, 1,
830-
// 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
831-
// 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
830+
// 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
831+
// 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
832832
// ]
833833
// .to_vec()
834834
// .try_into()
@@ -840,8 +840,8 @@ impl Pool {
840840
// // copied from roles-logic-sv2::job_creator
841841
// fn coinbase_tx_prefix(coinbase: &Transaction, script_prefix_len: usize) -> B064K<'static> {
842842
// let encoded = coinbase.serialize();
843-
// // If script_prefix_len is not 0 we are not in a test enviornment and the coinbase have the
844-
// // 0 witness
843+
// // If script_prefix_len is not 0 we are not in a test enviornment and the coinbase have
844+
// the // 0 witness
845845
// let segwit_bytes = match script_prefix_len {
846846
// 0 => 0,
847847
// _ => 2,
@@ -864,8 +864,8 @@ impl Pool {
864864
// script_prefix_len: usize,
865865
// ) -> B064K<'static> {
866866
// let encoded = coinbase.serialize();
867-
// // If script_prefix_len is not 0 we are not in a test enviornment and the coinbase have the
868-
// // 0 witness
867+
// // If script_prefix_len is not 0 we are not in a test enviornment and the coinbase have
868+
// the // 0 witness
869869
// let segwit_bytes = match script_prefix_len {
870870
// 0 => 0,
871871
// _ => 2,
@@ -877,8 +877,7 @@ impl Pool {
877877
// + 4 // index
878878
// + 1 // bytes in script TODO can be also 3
879879
// + script_prefix_len // bip34_bytes
880-
// + (extranonce_len as usize)..]
881-
// .to_vec();
880+
// + (extranonce_len as usize)..] .to_vec();
882881
// r.try_into().unwrap()
883882
// }
884883

roles/pool/src/lib/mining_pool/setup_connection.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ impl SetupConnectionHandler {
5353
}
5454
Ok(EitherFrame::Shutdown) => {
5555
debug!("Shutdown message received");
56-
return Err(PoolError::RolesLogic(Error::Shutdown))
56+
return Err(PoolError::RolesLogic(Error::Shutdown));
5757
}
5858
Err(e) => {
5959
error!("Error receiving message: {:?}", e);

roles/pool/src/lib/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ impl PoolSv2 {
2828
// producers are clonable so no issue. but its unbounded.
2929
// tokio also provide unbounded mpsc.
3030
// let (status_tx, status_rx) = unbounded();
31-
let (status_tx,mut status_rx) = tokio::sync::mpsc::unbounded_channel();
31+
let (status_tx, mut status_rx) = tokio::sync::mpsc::unbounded_channel();
3232
// r_new_t consumer is sent in pool::start, s_new_t is sent in templateRx::connect
3333
// sender or producer I dont give a damn about. even the r_new_t is passed in only
3434
// start then to on_new_template, so mpsc makes sense here as well.
@@ -39,7 +39,7 @@ impl PoolSv2 {
3939
// sent to on_new_prevhash, so mpsc also works here.
4040
// let (s_prev_hash, r_prev_hash) = bounded(10);
4141
let (s_prev_hash, r_prev_hash) = tokio::sync::mpsc::channel(10);
42-
// s_solution is sent to pool (no one give a damn about clonable), r_solution is sent
42+
// s_solution is sent to pool (no one give a damn about clonable), r_solution is sent
4343
// to templateRx and then to on_new_solution, so mpsc works.
4444
let (s_solution, r_solution) = tokio::sync::mpsc::channel(10);
4545
// This is spicy, as the r_message_recv_signal is cloning at few of the places, so, we can

roles/pool/src/lib/status.rs

+7-68
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,15 @@ use super::error::PoolError;
77
/// the main thread to know which component sent the message
88
#[derive(Debug)]
99
pub enum Sender {
10-
Downstream(async_channel::Sender<Status>),
11-
DownstreamListener(async_channel::Sender<Status>),
12-
Upstream(async_channel::Sender<Status>),
1310
DownstreamTokio(tokio::sync::mpsc::UnboundedSender<Status>),
1411
DownstreamListenerTokio(tokio::sync::mpsc::UnboundedSender<Status>),
1512
UpstreamTokio(tokio::sync::mpsc::UnboundedSender<Status>),
16-
1713
}
1814

1915
#[derive(Debug)]
2016
pub enum Error {
21-
AsyncChannel(async_channel::SendError<Status>),
2217
TokioChannel(tokio::sync::mpsc::error::SendError<Status>),
23-
TokioChannelUnbounded(tokio::sync::mpsc::error::SendError<Status>)
24-
}
25-
26-
impl From<async_channel::SendError<Status>> for Error {
27-
fn from(value: async_channel::SendError<Status>) -> Self {
28-
Self::AsyncChannel(value)
29-
}
18+
TokioChannelUnbounded(tokio::sync::mpsc::error::SendError<Status>),
3019
}
3120

3221
impl From<tokio::sync::mpsc::error::SendError<Status>> for Error {
@@ -41,33 +30,26 @@ impl Sender {
4130
pub fn listener_to_connection(&self) -> Self {
4231
match self {
4332
// should only be used to clone the DownstreamListener(Sender) into Downstream(Sender)s
44-
Self::DownstreamListener(inner) => Self::Downstream(inner.clone()),
4533
Self::DownstreamListenerTokio(inner) => Self::DownstreamTokio(inner.clone()),
4634
_ => unreachable!(),
4735
}
4836
}
4937

5038
pub async fn send(&self, status: Status) -> Result<(), Error> {
5139
match self {
52-
Self::Downstream(inner) => inner.send(status).await.map_err(|e| Error::AsyncChannel(e)),
53-
Self::DownstreamListener(inner) => inner.send(status).await.map_err(|e| Error::AsyncChannel(e)),
54-
Self::Upstream(inner) => inner.send(status).await.map_err(|e| Error::AsyncChannel(e)),
55-
Self::DownstreamListenerTokio(inner) => inner.send(status).map_err(|e| Error::TokioChannel(e)),
56-
Self::DownstreamTokio(inner) => inner.send(status).map_err(|e| Error::TokioChannel(e)),
57-
Self::UpstreamTokio(inner) => inner.send(status).map_err(|e| Error::TokioChannel(e))
40+
Self::DownstreamListenerTokio(inner) => inner.send(status).map_err(Error::TokioChannel),
41+
Self::DownstreamTokio(inner) => inner.send(status).map_err(Error::TokioChannel),
42+
Self::UpstreamTokio(inner) => inner.send(status).map_err(Error::TokioChannel),
5843
}
5944
}
6045
}
6146

6247
impl Clone for Sender {
6348
fn clone(&self) -> Self {
6449
match self {
65-
Self::Downstream(inner) => Self::Downstream(inner.clone()),
66-
Self::DownstreamListener(inner) => Self::DownstreamListener(inner.clone()),
67-
Self::Upstream(inner) => Self::Upstream(inner.clone()),
6850
Self::DownstreamTokio(inner) => Self::DownstreamTokio(inner.clone()),
6951
Self::DownstreamListenerTokio(inner) => Self::DownstreamListenerTokio(inner.clone()),
70-
Self::UpstreamTokio(inner) => Self::UpstreamTokio(inner.clone())
52+
Self::UpstreamTokio(inner) => Self::UpstreamTokio(inner.clone()),
7153
}
7254
}
7355
}
@@ -95,46 +77,6 @@ async fn send_status(
9577
outcome: error_handling::ErrorBranch,
9678
) -> error_handling::ErrorBranch {
9779
match sender {
98-
Sender::Downstream(tx) => match e {
99-
PoolError::Sv2ProtocolError((id, Mining::OpenMiningChannelError(_))) => {
100-
tx.send(Status {
101-
state: State::DownstreamInstanceDropped(id),
102-
})
103-
.await
104-
.unwrap_or(());
105-
}
106-
_ => {
107-
let string_err = e.to_string();
108-
tx.send(Status {
109-
state: State::Healthy(string_err),
110-
})
111-
.await
112-
.unwrap_or(());
113-
}
114-
},
115-
Sender::DownstreamListener(tx) => match e {
116-
PoolError::RolesLogic(roles_logic_sv2::Error::NoDownstreamsConnected) => {
117-
tx.send(Status {
118-
state: State::Healthy("No Downstreams Connected".to_string()),
119-
})
120-
.await
121-
.unwrap_or(());
122-
}
123-
_ => {
124-
tx.send(Status {
125-
state: State::DownstreamShutdown(e),
126-
})
127-
.await
128-
.unwrap_or(());
129-
}
130-
},
131-
Sender::Upstream(tx) => {
132-
tx.send(Status {
133-
state: State::TemplateProviderShutdown(e),
134-
})
135-
.await
136-
.unwrap_or(());
137-
},
13880
Sender::DownstreamTokio(tx) => match e {
13981
PoolError::Sv2ProtocolError((id, Mining::OpenMiningChannelError(_))) => {
14082
tx.send(Status {
@@ -188,9 +130,6 @@ pub async fn handle_error(sender: &Sender, e: PoolError) -> error_handling::Erro
188130
// the map won't get send called on them
189131
send_status(sender, e, error_handling::ErrorBranch::Continue).await
190132
}
191-
PoolError::ChannelRecv(_) => {
192-
send_status(sender, e, error_handling::ErrorBranch::Break).await
193-
}
194133
PoolError::BinarySv2(_) => send_status(sender, e, error_handling::ErrorBranch::Break).await,
195134
PoolError::Codec(_) => send_status(sender, e, error_handling::ErrorBranch::Break).await,
196135
PoolError::Noise(_) => send_status(sender, e, error_handling::ErrorBranch::Continue).await,
@@ -210,10 +149,10 @@ pub async fn handle_error(sender: &Sender, e: PoolError) -> error_handling::Erro
210149
}
211150
PoolError::Sv2ProtocolError(_) => {
212151
send_status(sender, e, error_handling::ErrorBranch::Break).await
213-
},
152+
}
214153
PoolError::TokioChannelRecv(_) => {
215154
send_status(sender, e, error_handling::ErrorBranch::Continue).await
216-
},
155+
}
217156
PoolError::TokioBroadcastChannelRecv(_) => {
218157
send_status(sender, e, error_handling::ErrorBranch::Continue).await
219158
}

roles/pool/src/lib/template_receiver/mod.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ impl TemplateRx {
6262
None => Initiator::without_pk(),
6363
}?;
6464
let (mut receiver, mut sender, _, _) =
65-
Connection::new(stream, HandshakeRole::Initiator(initiator),10)
65+
Connection::new(stream, HandshakeRole::Initiator(initiator), 10)
6666
.await
6767
.unwrap();
6868

@@ -164,7 +164,10 @@ impl TemplateRx {
164164
Ok(())
165165
}
166166

167-
async fn on_new_solution(self_: Arc<Mutex<Self>>,mut rx: tokio::sync::mpsc::Receiver<SubmitSolution<'static>>) {
167+
async fn on_new_solution(
168+
self_: Arc<Mutex<Self>>,
169+
mut rx: tokio::sync::mpsc::Receiver<SubmitSolution<'static>>,
170+
) {
168171
let status_tx = self_.safe_lock(|s| s.status_tx.clone()).unwrap();
169172
while let Some(solution) = rx.recv().await {
170173
info!("Sending Solution to TP: {:?}", &solution);

roles/pool/src/lib/template_receiver/setup_connection.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ impl SetupConnectionHandler {
4747
let sv2_frame = sv2_frame.into();
4848
sender.send(sv2_frame)?;
4949

50-
let mut incoming: StdFrame = receiver.subscribe()
50+
let mut incoming: StdFrame = receiver
51+
.subscribe()
5152
.recv()
5253
.await?
5354
.try_into()

roles/pool/src/main.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,7 @@ fn init_tracing() {
7474
let console_layer = console_subscriber::spawn();
7575
tracing_subscriber::registry()
7676
.with(console_layer)
77-
.with(
78-
tracing_subscriber::fmt::layer()
79-
)
77+
.with(tracing_subscriber::fmt::layer())
8078
.init();
8179
}
8280

0 commit comments

Comments
 (0)