@@ -2,25 +2,35 @@ use std::sync::Arc;
2
2
3
3
use anyhow:: anyhow;
4
4
use env_logger:: Env ;
5
- use log:: { error, info } ;
5
+ use log:: error;
6
6
use spaced:: {
7
7
config:: { safe_exit, Args } ,
8
- rpc:: { AsyncChainState , RpcServerImpl , WalletManager } ,
8
+ rpc:: { AsyncChainState , LoadedWallet , RpcServerImpl , WalletManager } ,
9
9
source:: BitcoinBlockSource ,
10
10
store,
11
+ sync:: Spaced ,
11
12
wallets:: RpcWallet ,
12
13
} ;
13
14
use store:: LiveSnapshot ;
14
15
use tokio:: {
15
- select,
16
16
sync:: { broadcast, mpsc} ,
17
- task:: JoinHandle ,
18
- try_join,
17
+ task:: { JoinHandle , JoinSet } ,
19
18
} ;
20
19
21
20
#[ tokio:: main]
22
21
async fn main ( ) {
23
- match start ( ) . await {
22
+ env_logger:: Builder :: from_env ( Env :: default ( ) . default_filter_or ( "info" ) ) . init ( ) ;
23
+ let sigterm = tokio:: signal:: ctrl_c ( ) ;
24
+
25
+ let mut app = Composer :: new ( ) ;
26
+ let shutdown = app. shutdown . clone ( ) ;
27
+
28
+ tokio:: spawn ( async move {
29
+ sigterm. await . expect ( "could not listen for shutdown" ) ;
30
+ let _ = shutdown. send ( ( ) ) ;
31
+ } ) ;
32
+
33
+ match app. run ( ) . await {
24
34
Ok ( _) => { }
25
35
Err ( e) => {
26
36
error ! ( "{}" , e. to_string( ) ) ;
@@ -29,134 +39,115 @@ async fn main() {
29
39
}
30
40
}
31
41
32
- async fn start ( ) -> anyhow:: Result < ( ) > {
33
- env_logger:: Builder :: from_env ( Env :: default ( ) . default_filter_or ( "info" ) )
34
- . format_timestamp ( None )
35
- . init ( ) ;
36
-
37
- let sigint = tokio:: signal:: ctrl_c ( ) ;
38
-
39
- let mut spaced = Args :: configure ( ) ?;
40
- let network = spaced. network ;
41
- let ( shutdown_sender, _) = broadcast:: channel ( 1 ) ;
42
-
43
- let mempool = spaced. mempool . clone ( ) ;
44
- let params = spaced. params ;
45
-
46
- let wallet_chain_state = spaced. chain . state . clone ( ) ;
47
-
48
- let ( async_chain_state, async_chain_state_handle) = create_async_store (
49
- spaced. chain . state . clone ( ) ,
50
- spaced. block_index . as_ref ( ) . map ( |index| index. state . clone ( ) ) ,
51
- shutdown_sender. subscribe ( ) ,
52
- )
53
- . await ;
54
-
55
- let ( wallet_loader_tx, wallet_loader_rx) = mpsc:: channel ( 4 ) ;
56
- let wallet_manager = WalletManager {
57
- data_dir : spaced. data_dir . join ( "wallets" ) ,
58
- network : spaced. network ,
59
- rpc : spaced. rpc . clone ( ) ,
60
- params,
61
- wallet_loader : wallet_loader_tx,
62
- wallets : Arc :: new ( Default :: default ( ) ) ,
63
- } ;
64
-
65
- let rpc_server = RpcServerImpl :: new ( async_chain_state. clone ( ) , wallet_manager) ;
66
-
67
- let rpc_task_server = rpc_server. clone ( ) ;
68
- let rpc_task_shutdown = shutdown_sender. clone ( ) ;
69
- let rpc_server_bind = spaced. bind . clone ( ) ;
70
- let rpc_handle = rpc_task_server. listen ( rpc_server_bind, rpc_task_shutdown) ;
71
-
72
- let spaced_shutdown_sender = shutdown_sender. clone ( ) ;
73
-
74
- let ( spaced_sender, spaced_receiver) = tokio:: sync:: oneshot:: channel ( ) ;
75
- let rpc = spaced. rpc . clone ( ) ;
76
- let rpc2 = spaced. rpc . clone ( ) ;
77
- std:: thread:: spawn ( move || {
78
- let source = BitcoinBlockSource :: new ( rpc) ;
79
- _ = spaced_sender. send ( spaced. protocol_sync ( source, spaced_shutdown_sender) ) ;
80
- } ) ;
42
+ struct Composer {
43
+ shutdown : broadcast:: Sender < ( ) > ,
44
+ services : JoinSet < anyhow:: Result < ( ) > > ,
45
+ }
81
46
82
- let wallet_service_shutdown = shutdown_sender. clone ( ) ;
83
- let wallet_service = RpcWallet :: service (
84
- network,
85
- mempool,
86
- rpc2,
87
- wallet_chain_state,
88
- wallet_loader_rx,
89
- wallet_service_shutdown,
90
- ) ;
91
-
92
- let signal = shutdown_sender. clone ( ) ;
93
- tokio:: spawn ( async move {
94
- _ = sigint. await ;
95
- _ = signal. send ( ( ) ) ;
96
- } ) ;
47
+ impl Composer {
48
+ fn new ( ) -> Self {
49
+ let ( shutdown, _) = broadcast:: channel ( 1 ) ;
50
+ Self {
51
+ shutdown,
52
+ services : JoinSet :: new ( ) ,
53
+ }
54
+ }
55
+
56
+ async fn setup_rpc_wallet ( & mut self , spaced : & Spaced , rx : mpsc:: Receiver < LoadedWallet > ) {
57
+ let wallet_service = RpcWallet :: service (
58
+ spaced. network ,
59
+ spaced. rpc . clone ( ) ,
60
+ spaced. chain . state . clone ( ) ,
61
+ rx,
62
+ self . shutdown . clone ( ) ,
63
+ ) ;
64
+
65
+ self . services . spawn ( async move {
66
+ wallet_service
67
+ . await
68
+ . map_err ( |e| anyhow ! ( "Wallet service error: {}" , e) )
69
+ } ) ;
70
+ }
97
71
98
- let shutdown_result = try_join ! (
99
- async {
100
- let res = spaced_receiver. await ;
101
- _ = shutdown_sender. send( ( ) ) ;
102
- if let Ok ( res) = res {
103
- if let Err ( e) = res {
104
- error!( "Protocol sync: {}" , e) ;
105
- return Err ( anyhow!( "Protocol sync error: {}" , e) ) ;
106
- }
107
- }
108
- Ok ( ( ) )
109
- } ,
110
- async {
111
- let res = rpc_handle. await ;
112
- _ = shutdown_sender. send( ( ) ) ;
113
- if let Err ( e) = res {
114
- error!( "RPC Server: {}" , e) ;
115
- return Err ( anyhow!( "RPC Server error: {}" , e) ) ;
116
- }
117
- Ok ( ( ) )
118
- } ,
119
- async {
120
- let res = wallet_service. await ;
121
- _ = shutdown_sender. send( ( ) ) ;
122
- if let Err ( e) = res {
123
- error!( "Wallet service: {}" , e) ;
124
- return Err ( anyhow!( "Wallet service error: {}" , e) ) ;
125
- }
126
- Ok ( ( ) )
127
- } ,
128
- async {
129
- let res = async_chain_state_handle
72
+ async fn setup_rpc_services ( & mut self , spaced : & Spaced ) {
73
+ let ( wallet_loader_tx, wallet_loader_rx) = mpsc:: channel ( 1 ) ;
74
+
75
+ let wallet_manager = WalletManager {
76
+ data_dir : spaced. data_dir . join ( "wallets" ) ,
77
+ network : spaced. network ,
78
+ rpc : spaced. rpc . clone ( ) ,
79
+ wallet_loader : wallet_loader_tx,
80
+ wallets : Arc :: new ( Default :: default ( ) ) ,
81
+ } ;
82
+
83
+ let ( async_chain_state, async_chain_state_handle) = create_async_store (
84
+ spaced. chain . state . clone ( ) ,
85
+ spaced. block_index . as_ref ( ) . map ( |index| index. state . clone ( ) ) ,
86
+ self . shutdown . subscribe ( ) ,
87
+ )
88
+ . await ;
89
+
90
+ self . services . spawn ( async {
91
+ async_chain_state_handle
130
92
. await
131
- . map_err( |e| anyhow!( "Async chain state error: {}" , e) ) ;
132
- _ = shutdown_sender. send( ( ) ) ;
133
- res
93
+ . map_err ( |e| anyhow ! ( "Chain state error: {}" , e) )
94
+ } ) ;
95
+ let rpc_server = RpcServerImpl :: new ( async_chain_state. clone ( ) , wallet_manager) ;
96
+
97
+ let bind = spaced. bind . clone ( ) ;
98
+ let shutdown = self . shutdown . clone ( ) ;
99
+
100
+ self . services . spawn ( async move {
101
+ rpc_server
102
+ . listen ( bind, shutdown)
103
+ . await
104
+ . map_err ( |e| anyhow ! ( "RPC Server error: {}" , e) )
105
+ } ) ;
106
+
107
+ self . setup_rpc_wallet ( spaced, wallet_loader_rx) . await ;
108
+ }
109
+
110
+ async fn setup_sync_service ( & mut self , mut spaced : Spaced ) {
111
+ let ( spaced_sender, spaced_receiver) = tokio:: sync:: oneshot:: channel ( ) ;
112
+
113
+ let shutdown = self . shutdown . clone ( ) ;
114
+ let rpc = spaced. rpc . clone ( ) ;
115
+
116
+ std:: thread:: spawn ( move || {
117
+ let source = BitcoinBlockSource :: new ( rpc) ;
118
+ _ = spaced_sender. send ( spaced. protocol_sync ( source, shutdown) ) ;
119
+ } ) ;
120
+
121
+ self . services . spawn ( async move {
122
+ spaced_receiver
123
+ . await ?
124
+ . map_err ( |e| anyhow ! ( "Protocol sync error: {}" , e) )
125
+ } ) ;
126
+ }
127
+
128
+ async fn run ( & mut self ) -> anyhow:: Result < ( ) > {
129
+ let spaced = Args :: configure ( ) ?;
130
+ self . setup_rpc_services ( & spaced) . await ;
131
+ self . setup_sync_service ( spaced) . await ;
132
+
133
+ while let Some ( res) = self . services . join_next ( ) . await {
134
+ res??
134
135
}
135
- ) ;
136
136
137
- if !shutdown_result. is_ok ( ) {
138
- safe_exit ( 1 ) ;
137
+ Ok ( ( ) )
139
138
}
140
- Ok ( ( ) )
141
139
}
142
140
143
141
async fn create_async_store (
144
142
chain_state : LiveSnapshot ,
145
143
block_index : Option < LiveSnapshot > ,
146
- mut shutdown : broadcast:: Receiver < ( ) > ,
144
+ shutdown : broadcast:: Receiver < ( ) > ,
147
145
) -> ( AsyncChainState , JoinHandle < ( ) > ) {
148
146
let ( tx, rx) = mpsc:: channel ( 32 ) ;
149
147
let async_store = AsyncChainState :: new ( tx) ;
150
148
151
149
let handle = tokio:: spawn ( async move {
152
- select ! {
153
- _ = AsyncChainState :: handler( chain_state, block_index, rx) => {
154
- // Handler completed normally
155
- }
156
- Ok ( _) = shutdown. recv( ) => {
157
- info!( "Shutting down database..." ) ;
158
- }
159
- }
150
+ AsyncChainState :: handler ( chain_state, block_index, rx, shutdown) . await
160
151
} ) ;
161
152
( async_store, handle)
162
153
}
0 commit comments