@@ -10,7 +10,7 @@ use std::{
10
10
} ;
11
11
12
12
use tokio:: {
13
- sync:: broadcast,
13
+ sync:: { broadcast, Notify as notification } ,
14
14
task:: { self , AbortHandle } ,
15
15
} ;
16
16
use tracing:: { debug, error, info, warn} ;
@@ -32,6 +32,7 @@ pub mod utils;
32
32
pub struct TranslatorSv2 {
33
33
config : ProxyConfig ,
34
34
reconnect_wait_time : u64 ,
35
+ shutdown : Arc < notification > ,
35
36
}
36
37
37
38
impl TranslatorSv2 {
@@ -41,6 +42,7 @@ impl TranslatorSv2 {
41
42
Self {
42
43
config,
43
44
reconnect_wait_time : wait_time,
45
+ shutdown : Arc :: new ( notification:: new ( ) ) ,
44
46
}
45
47
}
46
48
@@ -58,7 +60,8 @@ impl TranslatorSv2 {
58
60
let task_collector: Arc < Mutex < Vec < ( AbortHandle , String ) > > > =
59
61
Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ;
60
62
61
- self . internal_start (
63
+ Self :: internal_start (
64
+ self . config . clone ( ) ,
62
65
tx_sv1_notify. clone ( ) ,
63
66
target. clone ( ) ,
64
67
tx_status. clone ( ) ,
@@ -72,74 +75,78 @@ impl TranslatorSv2 {
72
75
debug ! ( "Starting up status listener" ) ;
73
76
let wait_time = self . reconnect_wait_time ;
74
77
// Check all tasks if is_finished() is true, if so exit
75
- loop {
76
- let task_status = tokio:: select! {
77
- task_status = rx_status. recv( ) . fuse( ) => task_status,
78
- interrupt_signal = tokio:: signal:: ctrl_c( ) . fuse( ) => {
79
- match interrupt_signal {
80
- Ok ( ( ) ) => {
81
- info!( "Interrupt received" ) ;
82
- } ,
83
- Err ( err) => {
84
- error!( "Unable to listen for interrupt signal: {}" , err) ;
85
- // we also shut down in case of error
86
- } ,
87
- }
88
- break ;
89
- }
90
- } ;
91
- let task_status: Status = task_status. unwrap ( ) ;
92
78
93
- match task_status. state {
94
- // Should only be sent by the downstream listener
95
- State :: DownstreamShutdown ( err) => {
96
- error ! ( "SHUTDOWN from: {}" , err) ;
97
- break ;
98
- }
99
- State :: BridgeShutdown ( err) => {
100
- error ! ( "SHUTDOWN from: {}" , err) ;
101
- break ;
79
+ tokio:: spawn ( {
80
+ let shutdown_signal = self . shutdown . clone ( ) ;
81
+ async move {
82
+ if tokio:: signal:: ctrl_c ( ) . await . is_ok ( ) {
83
+ info ! ( "Interrupt received" ) ;
84
+ shutdown_signal. notify_one ( ) ;
102
85
}
103
- State :: UpstreamShutdown ( err) => {
104
- error ! ( "SHUTDOWN from: {}" , err) ;
105
- break ;
106
- }
107
- State :: UpstreamTryReconnect ( err) => {
108
- error ! ( "SHUTDOWN from: {}" , err) ;
86
+ }
87
+ } ) ;
109
88
110
- // wait a random amount of time between 0 and 3000ms
111
- // if all the downstreams try to reconnect at the same time, the upstream may
112
- // fail
113
- tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( wait_time) ) . await ;
89
+ loop {
90
+ tokio:: select! {
91
+ task_status = rx_status. recv( ) . fuse( ) => {
92
+ if let Ok ( task_status_) = task_status {
93
+ match task_status_. state {
94
+ State :: DownstreamShutdown ( err) | State :: BridgeShutdown ( err) | State :: UpstreamShutdown ( err) => {
95
+ error!( "SHUTDOWN from: {}" , err) ;
96
+ self . shutdown. notify_one( ) ;
97
+ }
98
+ State :: UpstreamTryReconnect ( err) => {
99
+ error!( "SHUTDOWN from: {}" , err) ;
100
+ let task_collector1 = task_collector_. clone( ) ;
101
+ let tx_sv1_notify1 = tx_sv1_notify. clone( ) ;
102
+ let target = target. clone( ) ;
103
+ let tx_status = tx_status. clone( ) ;
104
+ let proxy_config = self . config. clone( ) ;
105
+ tokio:: spawn ( async move {
106
+ // wait a random amount of time between 0 and 3000ms
107
+ // if all the downstreams try to reconnect at the same time, the upstream may
108
+ // fail
109
+ tokio:: time:: sleep( std:: time:: Duration :: from_millis( wait_time) ) . await ;
114
110
115
- // kill al the tasks
116
- let task_collector_aborting = task_collector_ . clone ( ) ;
117
- kill_tasks ( task_collector_aborting. clone ( ) ) ;
111
+ // kill al the tasks
112
+ let task_collector_aborting = task_collector1 . clone( ) ;
113
+ kill_tasks( task_collector_aborting. clone( ) ) ;
118
114
119
- warn ! ( "Trying reconnecting to upstream" ) ;
120
- self . internal_start (
121
- tx_sv1_notify. clone ( ) ,
122
- target. clone ( ) ,
123
- tx_status. clone ( ) ,
124
- task_collector_. clone ( ) ,
125
- )
126
- . await ;
115
+ warn!( "Trying reconnecting to upstream" ) ;
116
+ Self :: internal_start(
117
+ proxy_config,
118
+ tx_sv1_notify1,
119
+ target. clone( ) ,
120
+ tx_status. clone( ) ,
121
+ task_collector1,
122
+ )
123
+ . await ;
124
+ } ) ;
125
+ }
126
+ State :: Healthy ( msg) => {
127
+ info!( "HEALTHY message: {}" , msg) ;
128
+ self . shutdown. notify_one( ) ;
129
+ }
130
+ }
131
+ } else {
132
+ break ; // Channel closed
133
+ }
127
134
}
128
- State :: Healthy ( msg) => {
129
- info ! ( "HEALTHY message: {}" , msg) ;
135
+ _ = self . shutdown. notified( ) => {
136
+ info!( "Shutting down gracefully..." ) ;
137
+ break ;
130
138
}
131
139
}
132
140
}
133
141
}
134
142
135
143
async fn internal_start (
136
- & self ,
144
+ proxy_config : ProxyConfig ,
137
145
tx_sv1_notify : broadcast:: Sender < server_to_client:: Notify < ' static > > ,
138
146
target : Arc < Mutex < Vec < u8 > > > ,
139
147
tx_status : async_channel:: Sender < Status < ' static > > ,
140
148
task_collector : Arc < Mutex < Vec < ( AbortHandle , String ) > > > ,
141
149
) {
142
- let proxy_config = self . config . clone ( ) ;
143
150
// Sender/Receiver to send a SV2 `SubmitSharesExtended` from the `Bridge` to the `Upstream`
144
151
// (Sender<SubmitSharesExtended<'static>>, Receiver<SubmitSharesExtended<'static>>)
145
152
let ( tx_sv2_submit_shares_ext, rx_sv2_submit_shares_ext) = bounded ( 10 ) ;
0 commit comments