@@ -71,7 +71,7 @@ impl Client {
71
71
/// the information from `sender_share`, it is formatted as a `v1::client_to_server::Submit`
72
72
/// and then serialized into a json message that is sent to the Upstream via
73
73
/// `sender_outgoing`.
74
- pub async fn connect ( client_id : u32 , upstream_addr : SocketAddr ) {
74
+ pub async fn connect ( client_id : u32 , upstream_addr : SocketAddr , single_submit : bool ) {
75
75
let stream = TcpStream :: connect ( upstream_addr) . await . unwrap ( ) ;
76
76
let ( reader, mut writer) = stream. into_split ( ) ;
77
77
@@ -86,6 +86,7 @@ impl Client {
86
86
// Upstream via `sender_outgoing`
87
87
let ( sender_share, receiver_share) = unbounded ( ) ;
88
88
89
+ let ( send_stop_submitting, mut recv_stop_submitting) = tokio:: sync:: watch:: channel ( false ) ;
89
90
// Instantiates a new `Miner` (a mock of an actual Mining Device) with a job id of 0.
90
91
let miner = Arc :: new ( Mutex :: new ( Miner :: new ( 0 ) ) ) ;
91
92
@@ -128,6 +129,9 @@ impl Client {
128
129
loop {
129
130
let message: String = receiver_outgoing. recv ( ) . await . unwrap ( ) ;
130
131
( writer) . write_all ( message. as_bytes ( ) ) . await . unwrap ( ) ;
132
+ if message. contains ( "mining.submit" ) && single_submit {
133
+ send_stop_submitting. send ( true ) . unwrap ( ) ;
134
+ }
131
135
}
132
136
} ) ;
133
137
@@ -170,43 +174,53 @@ impl Client {
170
174
// Sends relevant candidate block header values needed to construct a
171
175
// `mining.submit` message to the `receiver_share` in the task that is responsible
172
176
// for sending messages to the Upstream node.
173
- sender_share
177
+ if sender_share
174
178
. try_send ( ( nonce, job_id. unwrap ( ) , version. unwrap ( ) , time) )
175
- . unwrap ( ) ;
179
+ . is_err ( )
180
+ {
181
+ warn ! ( "Share channel is not available" ) ;
182
+ break ;
183
+ }
176
184
}
177
185
miner_cloned
178
186
. safe_lock ( |m| m. header . as_mut ( ) . map ( |h| h. nonce += 1 ) )
179
187
. unwrap ( ) ;
180
188
} ) ;
181
-
182
189
// Task to receive relevant candidate block header values needed to construct a
183
190
// `mining.submit` message. This message is contructed as a `client_to_server::Submit` and
184
191
// then serialized into json to be sent to the Upstream via the `sender_outgoing` sender.
185
192
let cloned = client. clone ( ) ;
186
193
task:: spawn ( async move {
187
- let recv = receiver_share. clone ( ) ;
188
- loop {
189
- let ( nonce, job_id, _version, ntime) = recv. recv ( ) . await . unwrap ( ) ;
190
- if cloned. clone ( ) . safe_lock ( |c| c. status ) . unwrap ( ) != ClientStatus :: Subscribed {
191
- continue ;
192
- }
193
- let extra_nonce2: Extranonce =
194
- vec ! [ 0 ; cloned. safe_lock( |c| c. extranonce2_size. unwrap( ) ) . unwrap( ) ]
195
- . try_into ( )
196
- . unwrap ( ) ;
197
- let submit = client_to_server:: Submit {
198
- id : 0 ,
199
- user_name : "user" . into ( ) , // TODO: user name should NOT be hardcoded
200
- job_id : job_id. to_string ( ) ,
201
- extra_nonce2,
202
- time : HexU32Be ( ntime) ,
203
- nonce : HexU32Be ( nonce) ,
204
- version_bits : None ,
205
- } ;
206
- let message: json_rpc:: Message = submit. into ( ) ;
207
- let message = format ! ( "{}\n " , serde_json:: to_string( & message) . unwrap( ) ) ;
208
- sender_outgoing_clone. send ( message) . await . unwrap ( ) ;
209
- }
194
+ tokio:: select!(
195
+ _ = recv_stop_submitting. changed( ) => {
196
+ warn!( "Stopping miner" )
197
+ } ,
198
+ _ = async {
199
+ let recv = receiver_share. clone( ) ;
200
+ loop {
201
+ let ( nonce, job_id, _version, ntime) = recv. recv( ) . await . unwrap( ) ;
202
+ if cloned. clone( ) . safe_lock( |c| c. status) . unwrap( ) != ClientStatus :: Subscribed {
203
+ continue ;
204
+ }
205
+ let extra_nonce2: Extranonce =
206
+ vec![ 0 ; cloned. safe_lock( |c| c. extranonce2_size. unwrap( ) ) . unwrap( ) ]
207
+ . try_into( )
208
+ . unwrap( ) ;
209
+ let submit = client_to_server:: Submit {
210
+ id: 0 ,
211
+ user_name: "user" . into( ) , // TODO: user name should NOT be hardcoded
212
+ job_id: job_id. to_string( ) ,
213
+ extra_nonce2,
214
+ time: HexU32Be ( ntime) ,
215
+ nonce: HexU32Be ( nonce) ,
216
+ version_bits: None ,
217
+ } ;
218
+ let message: json_rpc:: Message = submit. into( ) ;
219
+ let message = format!( "{}\n " , serde_json:: to_string( & message) . unwrap( ) ) ;
220
+ sender_outgoing_clone. send( message) . await . unwrap( ) ;
221
+ }
222
+ } => { }
223
+ )
210
224
} ) ;
211
225
let recv_incoming = client. safe_lock ( |c| c. receiver_incoming . clone ( ) ) . unwrap ( ) ;
212
226
@@ -226,8 +240,11 @@ impl Client {
226
240
// Waits for the `sender_incoming` to get message line from socket to be parsed by the
227
241
// `Client`
228
242
loop {
229
- let incoming = recv_incoming. recv ( ) . await . unwrap ( ) ;
230
- Self :: parse_message ( client. clone ( ) , Ok ( incoming) ) . await ;
243
+ if let Ok ( incoming) = recv_incoming. clone ( ) . recv ( ) . await {
244
+ Self :: parse_message ( client. clone ( ) , Ok ( incoming) ) . await ;
245
+ } else {
246
+ warn ! ( "Error reading from socket via `recv_incoming` channel" )
247
+ }
231
248
}
232
249
}
233
250
0 commit comments