Skip to content

Commit aff49ea

Browse files
committed
fixes buffer overflow problem
1 parent 771df23 commit aff49ea

File tree

3 files changed

+44
-42
lines changed

3 files changed

+44
-42
lines changed

roles/pool/src/lib/mining_pool/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,9 @@ impl Downstream {
248248
return;
249249
}
250250
};
251+
let mut receiver = receiver.subscribe();
251252
loop {
252-
match receiver.subscribe().recv().await {
253+
match receiver.recv().await {
253254
Ok(received) => {
254255
let received: Result<StdFrame, _> = received
255256
.try_into()

roles/pool/src/lib/template_receiver/mod.rs

+31-31
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#![allow(dead_code)]
2+
#![allow(clippy::result_large_err)]
13
use super::{
24
error::{PoolError, PoolResult},
35
mining_pool::{EitherFrame, StdFrame},
@@ -23,6 +25,7 @@ mod message_handler;
2325
mod setup_connection;
2426
use setup_connection::SetupConnectionHandler;
2527

28+
#[derive(Clone)]
2629
pub struct TemplateRx {
2730
receiver: tokio::sync::broadcast::Sender<EitherFrame>,
2831
sender: tokio::sync::broadcast::Sender<EitherFrame>,
@@ -68,15 +71,14 @@ impl TemplateRx {
6871

6972
SetupConnectionHandler::setup(&mut receiver, &mut sender, address).await?;
7073

71-
let self_ = Arc::new(Mutex::new(Self {
74+
let self_ = Self {
7275
receiver,
73-
sender,
76+
sender: sender.clone(),
7477
new_template_sender: templ_sender,
7578
new_prev_hash_sender: prev_h_sender,
7679
message_received_signal,
77-
status_tx,
78-
}));
79-
let cloned = self_.clone();
80+
status_tx: status_tx.clone(),
81+
};
8082

8183
let c_additional_size = CoinbaseOutputDataSize {
8284
coinbase_output_max_additional_size: coinbase_out_len,
@@ -86,29 +88,26 @@ impl TemplateRx {
8688
)
8789
.try_into()?;
8890

89-
Self::send(self_.clone(), frame).await?;
91+
Self::send(sender.clone(), frame)?;
9092

91-
task::spawn(async { Self::start(cloned).await });
92-
task::spawn(async { Self::on_new_solution(self_, solution_receiver).await });
93+
task::spawn(async { Self::start(self_).await });
94+
task::spawn(async { Self::on_new_solution(sender, status_tx, solution_receiver).await });
9395

9496
Ok(())
9597
}
9698

97-
pub async fn start(self_: Arc<Mutex<Self>>) {
98-
let (mut recv_msg_signal, receiver, new_template_sender, new_prev_hash_sender, status_tx) =
99-
self_
100-
.safe_lock(|s| {
101-
(
102-
s.message_received_signal.subscribe(),
103-
s.receiver.clone(),
104-
s.new_template_sender.clone(),
105-
s.new_prev_hash_sender.clone(),
106-
s.status_tx.clone(),
107-
)
108-
})
109-
.unwrap();
99+
pub async fn start(self_: Self) {
100+
let (mut recv_msg_signal, receiver, new_template_sender, new_prev_hash_sender, status_tx) = (
101+
self_.message_received_signal.subscribe(),
102+
self_.receiver.clone(),
103+
self_.new_template_sender.clone(),
104+
self_.new_prev_hash_sender.clone(),
105+
self_.status_tx.clone(),
106+
);
107+
let mut receiver = receiver.subscribe();
110108
loop {
111-
let message_from_tp = handle_result!(status_tx, receiver.subscribe().recv().await);
109+
let message_from_tp = handle_result!(status_tx, receiver.recv().await);
110+
info!("Message: {:?}", message_from_tp);
112111
let mut message_from_tp: StdFrame = handle_result!(
113112
status_tx,
114113
message_from_tp
@@ -123,7 +122,7 @@ impl TemplateRx {
123122
let msg = handle_result!(
124123
status_tx,
125124
ParseServerTemplateDistributionMessages::handle_message_template_distribution(
126-
self_.clone(),
125+
Arc::new(Mutex::new(self_.clone())),
127126
message_type,
128127
payload,
129128
)
@@ -155,34 +154,35 @@ impl TemplateRx {
155154
}
156155
}
157156

158-
pub async fn send(self_: Arc<Mutex<Self>>, sv2_frame: StdFrame) -> PoolResult<()> {
157+
pub fn send(
158+
sender: tokio::sync::broadcast::Sender<EitherFrame>,
159+
sv2_frame: StdFrame,
160+
) -> PoolResult<()> {
159161
let either_frame = sv2_frame.into();
160-
let sender = self_
161-
.safe_lock(|self_| self_.sender.clone())
162-
.map_err(|e| PoolError::PoisonLock(e.to_string()))?;
163162
sender.send(either_frame)?;
164163
Ok(())
165164
}
166165

167166
async fn on_new_solution(
168-
self_: Arc<Mutex<Self>>,
167+
sender: tokio::sync::broadcast::Sender<EitherFrame>,
168+
status_tx: status::Sender,
169169
mut rx: tokio::sync::mpsc::Receiver<SubmitSolution<'static>>,
170-
) {
171-
let status_tx = self_.safe_lock(|s| s.status_tx.clone()).unwrap();
170+
) -> PoolResult<()> {
172171
while let Some(solution) = rx.recv().await {
173172
info!("Sending Solution to TP: {:?}", &solution);
174173
let sv2_frame_res: Result<StdFrame, _> =
175174
PoolMessages::TemplateDistribution(TemplateDistribution::SubmitSolution(solution))
176175
.try_into();
177176
match sv2_frame_res {
178177
Ok(frame) => {
179-
handle_result!(status_tx, Self::send(self_.clone(), frame).await);
178+
handle_result!(status_tx, Self::send(sender.clone(), frame));
180179
}
181180
Err(_e) => {
182181
// return submit error
183182
todo!()
184183
}
185184
};
186185
}
186+
Ok(())
187187
}
188188
}

roles/pool/src/main.rs

+11-10
Original file line numberDiff line numberDiff line change
@@ -68,19 +68,20 @@ mod args {
6868
}
6969
}
7070

71-
use tracing_subscriber::prelude::*;
72-
73-
fn init_tracing() {
74-
let console_layer = console_subscriber::spawn();
75-
tracing_subscriber::registry()
76-
.with(console_layer)
77-
.with(tracing_subscriber::fmt::layer())
78-
.init();
79-
}
71+
// use tracing_subscriber::prelude::*;
72+
73+
// fn init_tracing() {
74+
// let console_layer = console_subscriber::spawn();
75+
// tracing_subscriber::registry()
76+
// .with(console_layer)
77+
// .with(tracing_subscriber::fmt::layer())
78+
// .init();
79+
// }
8080

8181
#[tokio::main]
8282
async fn main() {
83-
init_tracing();
83+
// init_tracing();
84+
tracing_subscriber::fmt::init();
8485

8586
let args = match args::Args::from_args() {
8687
Ok(cfg) => cfg,

0 commit comments

Comments
 (0)