@@ -71,7 +71,7 @@ impl Client {
71
71
/// the information from `sender_share`, it is formatted as a `v1::client_to_server::Submit`
72
72
/// and then serialized into a json message that is sent to the Upstream via
73
73
/// `sender_outgoing`.
74
- pub async fn connect ( client_id : u32 , upstream_addr : SocketAddr ) {
74
+ pub async fn connect ( client_id : u32 , upstream_addr : SocketAddr , single_submit : bool ) {
75
75
let stream = TcpStream :: connect ( upstream_addr) . await . unwrap ( ) ;
76
76
let ( reader, mut writer) = stream. into_split ( ) ;
77
77
@@ -86,6 +86,7 @@ impl Client {
86
86
// Upstream via `sender_outgoing`
87
87
let ( sender_share, receiver_share) = unbounded ( ) ;
88
88
89
+ let ( send_stop_submitting, mut recv_stop_submitting) = tokio:: sync:: watch:: channel ( false ) ;
89
90
// Instantiates a new `Miner` (a mock of an actual Mining Device) with a job id of 0.
90
91
let miner = Arc :: new ( Mutex :: new ( Miner :: new ( 0 ) ) ) ;
91
92
@@ -104,31 +105,42 @@ impl Client {
104
105
// Reads messages sent by the Upstream from the socket to be passed to the
105
106
// `receiver_incoming`
106
107
task:: spawn ( async move {
107
- let mut messages = BufReader :: new ( reader) . lines ( ) ;
108
- while let Ok ( message) = messages. next_line ( ) . await {
109
- match message {
108
+ tokio:: select!(
109
+ _ = async {
110
+ let mut messages = BufReader :: new( reader) . lines( ) ;
111
+ while let Ok ( message) = messages. next_line( ) . await {
112
+ match message {
110
113
Some ( msg) => {
111
- if let Err ( e) = sender_incoming. send ( msg) . await {
112
- error ! ( "Failed to send message to receiver_incoming: {:?}" , e) ;
113
- break ; // Exit the loop if sending fails
114
- }
114
+ if let Err ( e) = sender_incoming. send( msg) . await {
115
+ error!( "Failed to send message to receiver_incoming: {:?}" , e) ;
116
+ break ; // Exit the loop if sending fails
117
+ }
115
118
}
116
119
None => {
117
- error ! ( "Error reading from socket" ) ;
118
- break ; // Exit the loop on read failure
120
+ error!( "Error reading from socket" ) ;
121
+ break ; // Exit the loop on read failure
119
122
}
123
+ }
120
124
}
121
- }
122
- warn ! ( "Reader task terminated." ) ;
125
+ warn!( "Reader task terminated." ) ;
126
+ } => { }
127
+ ) ;
123
128
} ) ;
124
129
125
130
// Waits to receive a message from `sender_outgoing` and writes it to the socket for the
126
131
// Upstream to receive
127
132
task:: spawn ( async move {
128
- loop {
129
- let message: String = receiver_outgoing. recv ( ) . await . unwrap ( ) ;
130
- ( writer) . write_all ( message. as_bytes ( ) ) . await . unwrap ( ) ;
131
- }
133
+ tokio:: select!(
134
+ _ = async {
135
+ loop {
136
+ let message: String = receiver_outgoing. recv( ) . await . unwrap( ) ;
137
+ ( writer) . write_all( message. as_bytes( ) ) . await . unwrap( ) ;
138
+ if message. contains( "mining.submit" ) && single_submit {
139
+ send_stop_submitting. send( true ) . unwrap( ) ;
140
+ }
141
+ }
142
+ } => { }
143
+ )
132
144
} ) ;
133
145
134
146
// Clone the sender to the Upstream node to use it in another task below as
@@ -170,43 +182,53 @@ impl Client {
170
182
// Sends relevant candidate block header values needed to construct a
171
183
// `mining.submit` message to the `receiver_share` in the task that is responsible
172
184
// for sending messages to the Upstream node.
173
- sender_share
185
+ if sender_share
174
186
. try_send ( ( nonce, job_id. unwrap ( ) , version. unwrap ( ) , time) )
175
- . unwrap ( ) ;
187
+ . is_err ( )
188
+ {
189
+ warn ! ( "Share channel is not available" ) ;
190
+ break ;
191
+ }
176
192
}
177
193
miner_cloned
178
194
. safe_lock ( |m| m. header . as_mut ( ) . map ( |h| h. nonce += 1 ) )
179
195
. unwrap ( ) ;
180
196
} ) ;
181
-
182
197
// Task to receive relevant candidate block header values needed to construct a
183
198
// `mining.submit` message. This message is contructed as a `client_to_server::Submit` and
184
199
// then serialized into json to be sent to the Upstream via the `sender_outgoing` sender.
185
200
let cloned = client. clone ( ) ;
186
201
task:: spawn ( async move {
187
- let recv = receiver_share. clone ( ) ;
188
- loop {
189
- let ( nonce, job_id, _version, ntime) = recv. recv ( ) . await . unwrap ( ) ;
190
- if cloned. clone ( ) . safe_lock ( |c| c. status ) . unwrap ( ) != ClientStatus :: Subscribed {
191
- continue ;
192
- }
193
- let extra_nonce2: Extranonce =
194
- vec ! [ 0 ; cloned. safe_lock( |c| c. extranonce2_size. unwrap( ) ) . unwrap( ) ]
195
- . try_into ( )
196
- . unwrap ( ) ;
197
- let submit = client_to_server:: Submit {
198
- id : 0 ,
199
- user_name : "user" . into ( ) , // TODO: user name should NOT be hardcoded
200
- job_id : job_id. to_string ( ) ,
201
- extra_nonce2,
202
- time : HexU32Be ( ntime) ,
203
- nonce : HexU32Be ( nonce) ,
204
- version_bits : None ,
205
- } ;
206
- let message: json_rpc:: Message = submit. into ( ) ;
207
- let message = format ! ( "{}\n " , serde_json:: to_string( & message) . unwrap( ) ) ;
208
- sender_outgoing_clone. send ( message) . await . unwrap ( ) ;
209
- }
202
+ tokio:: select!(
203
+ _ = recv_stop_submitting. changed( ) => {
204
+ warn!( "Stopping miner" )
205
+ } ,
206
+ _ = async {
207
+ let recv = receiver_share. clone( ) ;
208
+ loop {
209
+ let ( nonce, job_id, _version, ntime) = recv. recv( ) . await . unwrap( ) ;
210
+ if cloned. clone( ) . safe_lock( |c| c. status) . unwrap( ) != ClientStatus :: Subscribed {
211
+ continue ;
212
+ }
213
+ let extra_nonce2: Extranonce =
214
+ vec![ 0 ; cloned. safe_lock( |c| c. extranonce2_size. unwrap( ) ) . unwrap( ) ]
215
+ . try_into( )
216
+ . unwrap( ) ;
217
+ let submit = client_to_server:: Submit {
218
+ id: 0 ,
219
+ user_name: "user" . into( ) , // TODO: user name should NOT be hardcoded
220
+ job_id: job_id. to_string( ) ,
221
+ extra_nonce2,
222
+ time: HexU32Be ( ntime) ,
223
+ nonce: HexU32Be ( nonce) ,
224
+ version_bits: None ,
225
+ } ;
226
+ let message: json_rpc:: Message = submit. into( ) ;
227
+ let message = format!( "{}\n " , serde_json:: to_string( & message) . unwrap( ) ) ;
228
+ sender_outgoing_clone. send( message) . await . unwrap( ) ;
229
+ }
230
+ } => { }
231
+ )
210
232
} ) ;
211
233
let recv_incoming = client. safe_lock ( |c| c. receiver_incoming . clone ( ) ) . unwrap ( ) ;
212
234
@@ -226,8 +248,11 @@ impl Client {
226
248
// Waits for the `sender_incoming` to get message line from socket to be parsed by the
227
249
// `Client`
228
250
loop {
229
- let incoming = recv_incoming. recv ( ) . await . unwrap ( ) ;
230
- Self :: parse_message ( client. clone ( ) , Ok ( incoming) ) . await ;
251
+ if let Ok ( incoming) = recv_incoming. clone ( ) . recv ( ) . await {
252
+ Self :: parse_message ( client. clone ( ) , Ok ( incoming) ) . await ;
253
+ } else {
254
+ warn ! ( "Error reading from socket via `recv_incoming` channel" )
255
+ }
231
256
}
232
257
}
233
258
0 commit comments