@@ -10,7 +10,8 @@ use std::{
10
10
} ;
11
11
12
12
use tokio:: {
13
- sync:: broadcast,
13
+ select,
14
+ sync:: { broadcast, Notify } ,
14
15
task:: { self , AbortHandle } ,
15
16
} ;
16
17
use tracing:: { debug, error, info, warn} ;
@@ -32,6 +33,7 @@ pub mod utils;
32
33
pub struct TranslatorSv2 {
33
34
config : ProxyConfig ,
34
35
reconnect_wait_time : u64 ,
36
+ shutdown : Arc < Notify > ,
35
37
}
36
38
37
39
impl TranslatorSv2 {
@@ -41,6 +43,7 @@ impl TranslatorSv2 {
41
43
Self {
42
44
config,
43
45
reconnect_wait_time : wait_time,
46
+ shutdown : Arc :: new ( Notify :: new ( ) ) ,
44
47
}
45
48
}
46
49
@@ -58,7 +61,8 @@ impl TranslatorSv2 {
58
61
let task_collector: Arc < Mutex < Vec < ( AbortHandle , String ) > > > =
59
62
Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ;
60
63
61
- self . internal_start (
64
+ Self :: internal_start (
65
+ self . config . clone ( ) ,
62
66
tx_sv1_notify. clone ( ) ,
63
67
target. clone ( ) ,
64
68
tx_status. clone ( ) ,
@@ -72,74 +76,79 @@ impl TranslatorSv2 {
72
76
debug ! ( "Starting up status listener" ) ;
73
77
let wait_time = self . reconnect_wait_time ;
74
78
// Check all tasks if is_finished() is true, if so exit
75
- loop {
76
- let task_status = tokio:: select! {
77
- task_status = rx_status. recv( ) . fuse( ) => task_status,
78
- interrupt_signal = tokio:: signal:: ctrl_c( ) . fuse( ) => {
79
- match interrupt_signal {
80
- Ok ( ( ) ) => {
81
- info!( "Interrupt received" ) ;
82
- } ,
83
- Err ( err) => {
84
- error!( "Unable to listen for interrupt signal: {}" , err) ;
85
- // we also shut down in case of error
86
- } ,
87
- }
88
- break ;
89
- }
90
- } ;
91
- let task_status: Status = task_status. unwrap ( ) ;
92
79
93
- match task_status. state {
94
- // Should only be sent by the downstream listener
95
- State :: DownstreamShutdown ( err) => {
96
- error ! ( "SHUTDOWN from: {}" , err) ;
97
- break ;
80
+ tokio:: spawn ( {
81
+ let shutdown_signal = self . shutdown ( ) ;
82
+ async move {
83
+ if tokio:: signal:: ctrl_c ( ) . await . is_ok ( ) {
84
+ info ! ( "Interrupt received" ) ;
85
+ shutdown_signal. notify_one ( ) ;
98
86
}
99
- State :: BridgeShutdown ( err) => {
100
- error ! ( "SHUTDOWN from: {}" , err) ;
101
- break ;
87
+ }
88
+ } ) ;
89
+
90
+ loop {
91
+ select ! {
92
+ task_status = rx_status. recv( ) . fuse( ) => {
93
+ if let Ok ( task_status_) = task_status {
94
+ match task_status_. state {
95
+ State :: DownstreamShutdown ( err) | State :: BridgeShutdown ( err) | State :: UpstreamShutdown ( err) => {
96
+ error!( "SHUTDOWN from: {}" , err) ;
97
+ self . shutdown( ) . notify_one( ) ;
98
+ }
99
+ State :: UpstreamTryReconnect ( err) => {
100
+ error!( "Trying to reconnect the Upstream because of: {}" , err) ;
101
+ let task_collector1 = task_collector_. clone( ) ;
102
+ let tx_sv1_notify1 = tx_sv1_notify. clone( ) ;
103
+ let target = target. clone( ) ;
104
+ let tx_status = tx_status. clone( ) ;
105
+ let proxy_config = self . config. clone( ) ;
106
+ tokio:: spawn ( async move {
107
+ // wait a random amount of time between 0 and 3000ms
108
+ // if all the downstreams try to reconnect at the same time, the upstream may
109
+ // fail
110
+ tokio:: time:: sleep( std:: time:: Duration :: from_millis( wait_time) ) . await ;
111
+
112
+ // kill al the tasks
113
+ let task_collector_aborting = task_collector1. clone( ) ;
114
+ kill_tasks( task_collector_aborting. clone( ) ) ;
115
+
116
+ warn!( "Trying reconnecting to upstream" ) ;
117
+ Self :: internal_start(
118
+ proxy_config,
119
+ tx_sv1_notify1,
120
+ target. clone( ) ,
121
+ tx_status. clone( ) ,
122
+ task_collector1,
123
+ )
124
+ . await ;
125
+ } ) ;
126
+ }
127
+ State :: Healthy ( msg) => {
128
+ info!( "HEALTHY message: {}" , msg) ;
129
+ self . shutdown( ) . notify_one( ) ;
130
+ }
131
+ }
132
+ } else {
133
+ info!( "Channel closed" ) ;
134
+ break ; // Channel closed
135
+ }
102
136
}
103
- State :: UpstreamShutdown ( err ) => {
104
- error ! ( "SHUTDOWN from: {}" , err ) ;
137
+ _ = self . shutdown . notified ( ) => {
138
+ info !( "Shutting down gracefully..." ) ;
105
139
break ;
106
140
}
107
- State :: UpstreamTryReconnect ( err) => {
108
- error ! ( "Trying to reconnect the Upstream because of: {}" , err) ;
109
-
110
- // wait a random amount of time between 0 and 3000ms
111
- // if all the downstreams try to reconnect at the same time, the upstream may
112
- // fail
113
- tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( wait_time) ) . await ;
114
-
115
- // kill al the tasks
116
- let task_collector_aborting = task_collector_. clone ( ) ;
117
- kill_tasks ( task_collector_aborting. clone ( ) ) ;
118
-
119
- warn ! ( "Trying reconnecting to upstream" ) ;
120
- self . internal_start (
121
- tx_sv1_notify. clone ( ) ,
122
- target. clone ( ) ,
123
- tx_status. clone ( ) ,
124
- task_collector_. clone ( ) ,
125
- )
126
- . await ;
127
- }
128
- State :: Healthy ( msg) => {
129
- info ! ( "HEALTHY message: {}" , msg) ;
130
- }
131
141
}
132
142
}
133
143
}
134
144
135
145
async fn internal_start (
136
- & self ,
146
+ proxy_config : ProxyConfig ,
137
147
tx_sv1_notify : broadcast:: Sender < server_to_client:: Notify < ' static > > ,
138
148
target : Arc < Mutex < Vec < u8 > > > ,
139
149
tx_status : async_channel:: Sender < Status < ' static > > ,
140
150
task_collector : Arc < Mutex < Vec < ( AbortHandle , String ) > > > ,
141
151
) {
142
- let proxy_config = self . config . clone ( ) ;
143
152
// Sender/Receiver to send a SV2 `SubmitSharesExtended` from the `Bridge` to the `Upstream`
144
153
// (Sender<SubmitSharesExtended<'static>>, Receiver<SubmitSharesExtended<'static>>)
145
154
let ( tx_sv2_submit_shares_ext, rx_sv2_submit_shares_ext) = bounded ( 10 ) ;
@@ -278,6 +287,10 @@ impl TranslatorSv2 {
278
287
let _ =
279
288
task_collector. safe_lock ( |t| t. push ( ( task. abort_handle ( ) , "init task" . to_string ( ) ) ) ) ;
280
289
}
290
+
291
+ pub fn shutdown ( & self ) -> Arc < Notify > {
292
+ self . shutdown . clone ( )
293
+ }
281
294
}
282
295
283
296
fn kill_tasks ( task_collector : Arc < Mutex < Vec < ( AbortHandle , String ) > > > ) {
0 commit comments