Skip to content

Commit 22279b9

Browse files
authored
Close BTP connection if a BTP token is provided (#611)
* feat(btp): add methods to close a btp connection * feat(btp): add timeout to outgoing requests * feat(btp): close connection with peer if they time out * feat(api): close BTP connections when an account is removed or modified
1 parent 023754a commit 22279b9

File tree

3 files changed

+126
-10
lines changed

3 files changed

+126
-10
lines changed

crates/interledger-api/src/routes/accounts.rs

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,8 @@ where
192192
.boxed();
193193

194194
// PUT /accounts/:username
195+
let btp_clone = btp.clone();
196+
let outgoing_handler_clone = outgoing_handler.clone();
195197
let put_account = warp::put()
196198
.and(warp::path("accounts"))
197199
.and(account_username_to_id.clone())
@@ -200,8 +202,15 @@ where
200202
.and(deserialize_json()) // warp::body::json() is not able to decode this!
201203
.and(with_store.clone())
202204
.and_then(move |id: Uuid, account_details: AccountDetails, store: S| {
203-
let outgoing_handler = outgoing_handler.clone();
204-
let btp = btp.clone();
205+
let outgoing_handler = outgoing_handler_clone.clone();
206+
let btp = btp_clone.clone();
207+
if account_details.ilp_over_btp_incoming_token.is_some() {
208+
// if the BTP token was provided, assume that it's different
209+
// from the existing one and drop the connection
210+
// the saved websocket connection
211+
// a new one will be initialized in the `connect_to_external_services` call
212+
btp.close_connection(&id);
213+
}
205214
async move {
206215
let account = store.update_account(id, account_details).await?;
207216
connect_to_external_services(outgoing_handler, account.clone(), store, btp).await?;
@@ -253,29 +262,56 @@ where
253262
.boxed();
254263

255264
// DELETE /accounts/:username
265+
let btp_clone = btp.clone();
256266
let delete_account = warp::delete()
257267
.and(warp::path("accounts"))
258268
.and(account_username_to_id.clone())
259269
.and(warp::path::end())
260270
.and(admin_only)
261271
.and(with_store.clone())
262-
.and_then(|id: Uuid, store: S| async move {
263-
let account = store.delete_account(id).await?;
264-
Ok::<Json, Rejection>(warp::reply::json(&account))
272+
.and_then(move |id: Uuid, store: S| {
273+
let btp = btp_clone.clone();
274+
async move {
275+
let account = store.delete_account(id).await?;
276+
// close the btp connection (if any)
277+
btp.close_connection(&id);
278+
Ok::<Json, Rejection>(warp::reply::json(&account))
279+
}
265280
})
266281
.boxed();
267282

268283
// PUT /accounts/:username/settings
284+
let outgoing_handler_clone = outgoing_handler;
269285
let put_account_settings = warp::put()
270286
.and(warp::path("accounts"))
271287
.and(admin_or_authorized_user_only.clone())
272288
.and(warp::path("settings"))
273289
.and(warp::path::end())
274290
.and(deserialize_json())
275291
.and(with_store.clone())
276-
.and_then(|id: Uuid, settings: AccountSettings, store: S| async move {
277-
let modified_account = store.modify_account_settings(id, settings).await?;
278-
Ok::<Json, Rejection>(warp::reply::json(&modified_account))
292+
.and_then(move |id: Uuid, settings: AccountSettings, store: S| {
293+
let btp = btp.clone();
294+
let outgoing_handler = outgoing_handler_clone.clone();
295+
async move {
296+
if settings.ilp_over_btp_incoming_token.is_some() {
297+
// if the BTP token was provided, assume that it's different
298+
// from the existing one and drop the connection
299+
// the saved websocket connection
300+
btp.close_connection(&id);
301+
}
302+
let modified_account = store.modify_account_settings(id, settings).await?;
303+
304+
// Since the account was modified, we should also try to
305+
// connect to the new account:
306+
connect_to_external_services(
307+
outgoing_handler,
308+
modified_account.clone(),
309+
store,
310+
btp,
311+
)
312+
.await?;
313+
Ok::<Json, Rejection>(warp::reply::json(&modified_account))
314+
}
279315
})
280316
.boxed();
281317

crates/interledger-btp/src/lib.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,9 +166,10 @@ mod client_server {
166166
async fn client_server_test() {
167167
let bind_addr = get_open_port();
168168

169+
let server_acc_id = Uuid::new_v4();
169170
let server_store = TestStore {
170171
accounts: Arc::new(vec![TestAccount {
171-
id: Uuid::new_v4(),
172+
id: server_acc_id,
172173
ilp_over_btp_incoming_token: Some("test_auth_token".to_string()),
173174
ilp_over_btp_outgoing_token: None,
174175
ilp_over_btp_url: None,
@@ -260,6 +261,48 @@ mod client_server {
260261
})
261262
.await;
262263
assert!(res.is_ok());
264+
265+
btp_service.close_connection(&server_acc_id);
266+
// after removing the connection this will fail
267+
let mut btp_client_clone = btp_client.clone();
268+
let res = btp_client_clone
269+
.send_request(OutgoingRequest {
270+
from: account.clone(),
271+
to: account.clone(),
272+
original_amount: 100,
273+
prepare: PrepareBuilder {
274+
destination: Address::from_str("example.destination").unwrap(),
275+
amount: 100,
276+
execution_condition: &[0; 32],
277+
expires_at: SystemTime::now() + Duration::from_secs(30),
278+
data: b"test data",
279+
}
280+
.build(),
281+
})
282+
.await
283+
.unwrap_err();
284+
assert_eq!(res.code(), ErrorCode::R00_TRANSFER_TIMED_OUT);
285+
286+
// now that we have timed out, if we try sending again we'll see that we
287+
// have no more connections with this user
288+
let res = btp_client_clone
289+
.send_request(OutgoingRequest {
290+
from: account.clone(),
291+
to: account.clone(),
292+
original_amount: 100,
293+
prepare: PrepareBuilder {
294+
destination: Address::from_str("example.destination").unwrap(),
295+
amount: 100,
296+
execution_condition: &[0; 32],
297+
expires_at: SystemTime::now() + Duration::from_secs(30),
298+
data: b"test data",
299+
}
300+
.build(),
301+
})
302+
.await
303+
.unwrap_err();
304+
assert_eq!(res.code(), ErrorCode::F02_UNREACHABLE);
305+
263306
btp_service.close();
264307
}
265308
}

crates/interledger-btp/src/service.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ const PING_INTERVAL: u64 = 30; // seconds
2626
static PING: Lazy<Message> = Lazy::new(|| Message::Ping(Vec::with_capacity(0)));
2727
static PONG: Lazy<Message> = Lazy::new(|| Message::Pong(Vec::with_capacity(0)));
2828

29+
// Return a Reject timeout if the outgoing message future does not complete
30+
// within this timeout. This will probably happen if the peer closed the websocket
31+
// with us
32+
const SEND_MSG_TIMEOUT: Duration = Duration::from_secs(30);
33+
2934
type IlpResultChannel = oneshot::Sender<Result<Fulfill, Reject>>;
3035
type IncomingRequestBuffer<A> = UnboundedReceiver<(A, u32, Prepare)>;
3136

@@ -136,6 +141,11 @@ where
136141
}
137142
}
138143

144+
/// Deletes the websocket associated with the provided `account_id`
145+
pub fn close_connection(&self, account_id: &Uuid) {
146+
self.connections.write().remove(account_id);
147+
}
148+
139149
/// Close all of the open WebSocket connections
140150
// TODO is there some more automatic way of knowing when we should close the connections?
141151
// The problem is that the WS client can be a server too, so it's not clear when we are done with it
@@ -323,7 +333,30 @@ where
323333
Ok(_) => {
324334
let (sender, receiver) = oneshot::channel();
325335
(*self.pending_outgoing.lock()).insert(request_id, sender);
326-
let result = receiver.await;
336+
337+
// Wrap the receiver with a timeout to ensure we do not
338+
// wait too long if the other party has disconnected
339+
let result = tokio::time::timeout(SEND_MSG_TIMEOUT, receiver).await;
340+
341+
let result = match result {
342+
Ok(packet) => packet,
343+
Err(err) => {
344+
error!("Request timed out. Did the peer disconnect? Err: {}", err);
345+
// Assume that such a long timeout means that the peer closed their
346+
// connection with us, so we'll remove the pending request and the websocket
347+
(*self.pending_outgoing.lock()).remove(&request_id);
348+
self.close_connection(&request.to.id());
349+
350+
return Err(RejectBuilder {
351+
code: ErrorCode::R00_TRANSFER_TIMED_OUT,
352+
message: &[],
353+
triggered_by: Some(&ilp_address),
354+
data: &[],
355+
}
356+
.build());
357+
}
358+
};
359+
327360
// Drop the trigger here since we've gotten the response
328361
// and don't need to keep the connections open if this was the
329362
// last thing we were waiting for
@@ -390,6 +423,10 @@ where
390423
pub fn close(&self) {
391424
self.outgoing.close();
392425
}
426+
427+
pub fn close_connection(&self, account_id: &Uuid) {
428+
self.outgoing.close_connection(account_id);
429+
}
393430
}
394431

395432
#[async_trait]

0 commit comments

Comments
 (0)