@@ -163,13 +163,13 @@ where
163
163
164
164
// tx -> rx -> write -> our peer
165
165
// Responsible mainly for responding to Pings
166
- // TODO: We must somehow figure out how to merge this stream with the incoming one
167
166
let write_to_ws = client_rx. map ( Ok ) . forward ( write) . then ( move |_| {
168
167
async move {
169
168
debug ! (
170
169
"Finished forwarding to WebSocket stream for account: {}" ,
171
170
account_id
172
171
) ;
172
+ // When this is dropped, the read valve will close
173
173
drop ( close_connection) ;
174
174
Ok :: < ( ) , ( ) > ( ( ) )
175
175
}
@@ -190,8 +190,8 @@ where
190
190
)
191
191
} ;
192
192
193
- // Close connections triggers
194
- let read = valve. wrap ( read) ;
193
+ // Close connections trigger
194
+ let read = valve. wrap ( read) ; // close when `write_to_ws` calls `drop(connection)`
195
195
let read = self . stream_valve . wrap ( read) ;
196
196
let read_from_ws = read. for_each ( handle_message_fn) . then ( move |_| {
197
197
async move {
@@ -202,22 +202,6 @@ where
202
202
Ok :: < ( ) , ( ) > ( ( ) )
203
203
}
204
204
} ) ;
205
-
206
- // TODO: How can we drop the trigger when both the read and write spawn'ed futures
207
- // have completed?
208
- // let connections = self.connections.clone();
209
- // let keep_connections_open = self.close_all_connections.clone();
210
- // .then(move |_| {
211
- // let _ = keep_connections_open;
212
- // let mut connections = connections.write();
213
- // connections.remove(&account_id);
214
- // debug!(
215
- // "WebSocket connection closed for account {} ({} connections still open)",
216
- // account_id,
217
- // connections.len()
218
- // );
219
- // future::ready(())
220
- // });
221
205
tokio:: spawn ( read_from_ws) ;
222
206
223
207
// Send pings every PING_INTERVAL until the connection closes (when `drop(close_connection)` is called)
@@ -277,7 +261,6 @@ where
277
261
Err ( reject) => Packet :: Reject ( reject) ,
278
262
} ;
279
263
280
- // TODO: Is it OK to remove the results from here?
281
264
if let Some ( connection) = connections_clone. clone ( ) . read ( ) . get ( & account_id) {
282
265
let message = ilp_packet_to_ws_message ( request_id, packet) ;
283
266
let _ = connection. unbounded_send ( message) . map_err ( move |err| {
0 commit comments