@@ -270,47 +270,63 @@ impl Pool {
270
270
async fn accept_incoming_connection (
271
271
self_ : Arc < Mutex < Pool > > ,
272
272
config : PoolConfig ,
273
+ mut recv_stop_signal : tokio:: sync:: watch:: Receiver < ( ) > ,
273
274
) -> PoolResult < ( ) > {
274
275
let status_tx = self_. safe_lock ( |s| s. status_tx . clone ( ) ) ?;
275
- let listener = TcpListener :: bind ( & config. listen_address ( ) ) . await ?;
276
- info ! (
277
- "Listening for encrypted connection on: {}" ,
278
- config. listen_address( )
279
- ) ;
280
- while let Ok ( ( stream, _) ) = listener. accept ( ) . await {
281
- let address = stream. peer_addr ( ) . unwrap ( ) ;
282
- debug ! (
283
- "New connection from {:?}" ,
284
- stream. peer_addr( ) . map_err( PoolError :: Io )
285
- ) ;
286
-
287
- let responder = Responder :: from_authority_kp (
288
- & config. authority_public_key ( ) . into_bytes ( ) ,
289
- & config. authority_secret_key ( ) . into_bytes ( ) ,
290
- std:: time:: Duration :: from_secs ( config. cert_validity_sec ( ) ) ,
291
- ) ;
292
- match responder {
293
- Ok ( resp) => {
294
- if let Ok ( ( receiver, sender, _, _) ) =
295
- Connection :: new ( stream, HandshakeRole :: Responder ( resp) ) . await
296
- {
297
- handle_result ! (
298
- status_tx,
299
- Self :: accept_incoming_connection_(
300
- self_. clone( ) ,
301
- receiver,
302
- sender,
303
- address
304
- )
305
- . await
306
- ) ;
276
+ let listener = match TcpListener :: bind ( & config. listen_address ( ) ) . await {
277
+ Ok ( listener) => listener,
278
+ Err ( e) => {
279
+ return Err ( PoolError :: Io ( e) ) ;
280
+ }
281
+ } ;
282
+ info ! ( "Pool is running on: {}" , config. listen_address( ) ) ;
283
+ // Run the listener in the background
284
+ task:: spawn ( async move {
285
+ loop {
286
+ tokio:: select! {
287
+ _ = recv_stop_signal. changed( ) => {
288
+ dbg!( "Pool is stopping listener after stop signal received" ) ;
289
+ break ;
290
+ } ,
291
+
292
+ result = listener. accept( ) => {
293
+ match result {
294
+ Ok ( ( stream, _) ) => {
295
+ let address = stream. peer_addr( ) . unwrap( ) ;
296
+ info!( "New connection from {:?}" , stream. peer_addr( ) . map_err( PoolError :: Io ) ) ;
297
+ let responder = Responder :: from_authority_kp(
298
+ & config. authority_public_key( ) . into_bytes( ) ,
299
+ & config. authority_secret_key( ) . into_bytes( ) ,
300
+ std:: time:: Duration :: from_secs( config. cert_validity_sec( ) ) ,
301
+ ) ;
302
+
303
+ match responder {
304
+ Ok ( resp) => {
305
+ if let Ok ( ( receiver, sender, _, _) ) = Connection :: new( stream, HandshakeRole :: Responder ( resp) ) . await {
306
+ handle_result!(
307
+ status_tx,
308
+ Self :: accept_incoming_connection_(
309
+ self_. clone( ) ,
310
+ receiver,
311
+ sender,
312
+ address
313
+ ) . await
314
+ ) ;
315
+ }
316
+ }
317
+ Err ( _) => {
318
+ return ;
319
+ }
320
+ }
321
+ }
322
+ Err ( e) => {
323
+ error!( "Error accepting connection: {:?}" , e) ;
324
+ }
325
+ }
307
326
}
308
327
}
309
- Err ( _e) => {
310
- todo ! ( )
311
- }
312
328
}
313
- }
329
+ } ) ;
314
330
Ok ( ( ) )
315
331
}
316
332
@@ -444,15 +460,16 @@ impl Pool {
444
460
Ok ( ( ) )
445
461
}
446
462
447
- pub fn start (
463
+ pub async fn start (
448
464
config : PoolConfig ,
449
465
new_template_rx : Receiver < NewTemplate < ' static > > ,
450
466
new_prev_hash_rx : Receiver < SetNewPrevHash < ' static > > ,
451
467
solution_sender : Sender < SubmitSolution < ' static > > ,
452
468
sender_message_received_signal : Sender < ( ) > ,
453
469
status_tx : status:: Sender ,
454
470
shares_per_minute : f32 ,
455
- ) -> Arc < Mutex < Self > > {
471
+ recv_stop_signal : tokio:: sync:: watch:: Receiver < ( ) > ,
472
+ ) -> Result < Arc < Mutex < Self > > , PoolError > {
456
473
let extranonce_len = 32 ;
457
474
let range_0 = std:: ops:: Range { start : 0 , end : 0 } ;
458
475
let range_1 = std:: ops:: Range { start : 0 , end : 16 } ;
@@ -488,24 +505,20 @@ impl Pool {
488
505
let cloned2 = pool. clone ( ) ;
489
506
let cloned3 = pool. clone ( ) ;
490
507
491
- info ! ( "Starting up pool listener " ) ;
508
+ info ! ( "Starting up Pool server " ) ;
492
509
let status_tx_clone = status_tx. clone ( ) ;
493
- task:: spawn ( async move {
494
- if let Err ( e) = Self :: accept_incoming_connection ( cloned, config) . await {
495
- error ! ( "{}" , e) ;
496
- }
497
- if status_tx_clone
510
+ if let Err ( e) = Self :: accept_incoming_connection ( cloned, config, recv_stop_signal) . await {
511
+ error ! ( "Pool stopped accepting connections due to: {}" , & e) ;
512
+ let _ = status_tx_clone
498
513
. send ( status:: Status {
499
514
state : status:: State :: DownstreamShutdown ( PoolError :: ComponentShutdown (
500
- "Downstream no longer accepting incoming connections" . to_string ( ) ,
515
+ "Pool stopped accepting connections" . to_string ( ) ,
501
516
) ) ,
502
517
} )
503
- . await
504
- . is_err ( )
505
- {
506
- error ! ( "Downstream shutdown and Status Channel dropped" ) ;
507
- }
508
- } ) ;
518
+ . await ;
519
+
520
+ return Err ( e) ;
521
+ }
509
522
510
523
let cloned = sender_message_received_signal. clone ( ) ;
511
524
let status_tx_clone = status_tx. clone ( ) ;
@@ -547,7 +560,7 @@ impl Pool {
547
560
error ! ( "Downstream shutdown and Status Channel dropped" ) ;
548
561
}
549
562
} ) ;
550
- cloned3
563
+ Ok ( cloned3)
551
564
}
552
565
553
566
/// This removes the downstream from the list of downstreams
0 commit comments