Skip to content

Commit a52e0d4

Browse files
committed
feat(service-util): update balance service
1 parent 919ac1c commit a52e0d4

File tree

1 file changed

+104
-92
lines changed

1 file changed

+104
-92
lines changed
Lines changed: 104 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,42 @@
1-
use futures::Future;
2-
use interledger_packet::{ErrorCode, Fulfill, Reject, RejectBuilder};
1+
use async_trait::async_trait;
2+
use futures::TryFutureExt;
3+
use interledger_packet::{ErrorCode, RejectBuilder};
34
use interledger_service::*;
45
use interledger_settlement::{
56
api::SettlementClient,
67
core::types::{SettlementAccount, SettlementStore},
78
};
89
use log::{debug, error};
910
use std::marker::PhantomData;
10-
use tokio_executor::spawn;
1111

12+
// TODO: Remove AccountStore dependency, use `AccountId: ToString` as associated type
13+
/// Trait responsible for managing an account's balance in the store
14+
/// as ILP Packets get routed
15+
#[async_trait]
1216
pub trait BalanceStore: AccountStore {
1317
/// Fetch the current balance for the given account.
14-
fn get_balance(&self, account: Self::Account)
15-
-> Box<dyn Future<Item = i64, Error = ()> + Send>;
18+
async fn get_balance(&self, account: Self::Account) -> Result<i64, ()>;
1619

17-
fn update_balances_for_prepare(
20+
/// Decreases the sending account's balance before forwarding out a prepare packet
21+
async fn update_balances_for_prepare(
1822
&self,
1923
from_account: Self::Account,
2024
incoming_amount: u64,
21-
) -> Box<dyn Future<Item = (), Error = ()> + Send>;
25+
) -> Result<(), ()>;
2226

23-
/// Increases the account's balance, and returns the updated balance
27+
/// Increases the receiving account's balance, and returns the updated balance
2428
/// along with the amount which should be settled
25-
fn update_balances_for_fulfill(
29+
async fn update_balances_for_fulfill(
2630
&self,
2731
to_account: Self::Account,
2832
outgoing_amount: u64,
29-
) -> Box<dyn Future<Item = (i64, u64), Error = ()> + Send>;
33+
) -> Result<(i64, u64), ()>;
3034

31-
fn update_balances_for_reject(
35+
async fn update_balances_for_reject(
3236
&self,
3337
from_account: Self::Account,
3438
incoming_amount: u64,
35-
) -> Box<dyn Future<Item = (), Error = ()> + Send>;
39+
) -> Result<(), ()>;
3640
}
3741

3842
/// # Balance Service
@@ -64,6 +68,7 @@ where
6468
}
6569
}
6670

71+
#[async_trait]
6772
impl<S, O, A> OutgoingService<A> for BalanceService<S, O, A>
6873
where
6974
S: AddressStore
@@ -74,10 +79,8 @@ where
7479
+ Sync
7580
+ 'static,
7681
O: OutgoingService<A> + Send + Clone + 'static,
77-
A: SettlementAccount + 'static,
82+
A: SettlementAccount + Send + Sync + 'static,
7883
{
79-
type Future = BoxedIlpFuture;
80-
8184
/// On send message:
8285
/// 1. Calls `store.update_balances_for_prepare` with the prepare.
8386
/// If it fails, it replies with a reject
@@ -86,21 +89,17 @@ where
8689
/// INDEPENDENTLY of if the call suceeds or fails. This makes a `sendMoney` call if the fulfill puts the account's balance over the `settle_threshold`
8790
/// - if it returns an reject calls `store.update_balances_for_reject` and replies with the fulfill
8891
/// INDEPENDENTLY of if the call suceeds or fails
89-
fn send_request(
90-
&mut self,
91-
request: OutgoingRequest<A>,
92-
) -> Box<dyn Future<Item = Fulfill, Error = Reject> + Send> {
92+
async fn send_request(&mut self, request: OutgoingRequest<A>) -> IlpResult {
9393
// Don't bother touching the store for zero-amount packets.
9494
// Note that it is possible for the original_amount to be >0 while the
9595
// prepare.amount is 0, because the original amount could be rounded down
9696
// to 0 when exchange rate and scale change are applied.
9797
if request.prepare.amount() == 0 && request.original_amount == 0 {
98-
return Box::new(self.next.send_request(request));
98+
return self.next.send_request(request).await;
9999
}
100100

101101
let mut next = self.next.clone();
102102
let store = self.store.clone();
103-
let store_clone = store.clone();
104103
let from = request.from.clone();
105104
let from_clone = from.clone();
106105
let from_id = from.id();
@@ -123,79 +122,92 @@ where
123122
// _eventually_ be completed. Because of this settlement_engine guarantee, the Connector can
124123
// operate as-if the settlement engine has completed. Finally, if the request to the settlement-engine
125124
// fails, this amount will be re-added back to balance.
126-
Box::new(
127-
self.store
128-
.update_balances_for_prepare(
129-
from.clone(),
130-
incoming_amount,
131-
)
132-
.map_err(move |_| {
133-
debug!("Rejecting packet because it would exceed a balance limit");
134-
RejectBuilder {
135-
code: ErrorCode::T04_INSUFFICIENT_LIQUIDITY,
136-
message: &[],
137-
triggered_by: Some(&ilp_address),
138-
data: &[],
139-
}
140-
.build()
141-
})
142-
.and_then(move |_| {
143-
next.send_request(request)
144-
.and_then(move |fulfill| {
145-
// We will spawn a task to update the balances in the database
146-
// so that we DO NOT wait for the database before sending the
147-
// Fulfill packet back to our peer. Due to how the flow of ILP
148-
// packets work, once we get the Fulfill back from the next node
149-
// we need to propagate it backwards ASAP. If we do not give the
150-
// previous node the fulfillment in time, they won't pay us back
151-
// for the packet we forwarded. Note this means that we will
152-
// relay the fulfillment _even if saving to the DB fails._
153-
let fulfill_balance_update = store.update_balances_for_fulfill(
154-
to.clone(),
155-
outgoing_amount,
156-
)
157-
.map_err(move |_| error!("Error applying balance changes for fulfill from account: {} to account: {}. Incoming amount was: {}, outgoing amount was: {}", from_id, to_id, incoming_amount, outgoing_amount))
158-
.and_then(move |(balance, amount_to_settle)| {
159-
debug!("Account balance after fulfill: {}. Amount that needs to be settled: {}", balance, amount_to_settle);
160-
if amount_to_settle > 0 && to_has_engine {
161-
// Note that if this program crashes after changing the balance (in the PROCESS_FULFILL script)
162-
// and the send_settlement fails but the program isn't alive to hear that, the balance will be incorrect.
163-
// No other instance will know that it was trying to send an outgoing settlement. We could
164-
// make this more robust by saving something to the DB about the outgoing settlement when we change the balance
165-
// but then we would also need to prevent a situation where every connector instance is polling the
166-
// settlement engine for the status of each
167-
// outgoing settlement and putting unnecessary
168-
// load on the settlement engine.
169-
spawn(settlement_client
170-
.send_settlement(to, amount_to_settle)
171-
.or_else(move |_| store.refund_settlement(to_id, amount_to_settle)));
172-
}
173-
Ok(())
174-
});
125+
self.store
126+
.update_balances_for_prepare(from.clone(), incoming_amount)
127+
.map_err(move |_| {
128+
debug!("Rejecting packet because it would exceed a balance limit");
129+
RejectBuilder {
130+
code: ErrorCode::T04_INSUFFICIENT_LIQUIDITY,
131+
message: &[],
132+
triggered_by: Some(&ilp_address),
133+
data: &[],
134+
}
135+
.build()
136+
})
137+
.await?;
175138

176-
spawn(fulfill_balance_update);
139+
match next.send_request(request).await {
140+
Ok(fulfill) => {
141+
// We will spawn a task to update the balances in the database
142+
// so that we DO NOT wait for the database before sending the
143+
// Fulfill packet back to our peer. Due to how the flow of ILP
144+
// packets work, once we get the Fulfill back from the next node
145+
// we need to propagate it backwards ASAP. If we do not give the
146+
// previous node the fulfillment in time, they won't pay us back
147+
// for the packet we forwarded. Note this means that we will
148+
// relay the fulfillment _even if saving to the DB fails._
149+
tokio::spawn(async move {
150+
let (balance, amount_to_settle) = match store
151+
.update_balances_for_fulfill(to.clone(), outgoing_amount)
152+
.await
153+
{
154+
Ok(r) => r,
155+
Err(_) => {
156+
error!("Error applying balance changes for fulfill from account: {} to account: {}. Incoming amount was: {}, outgoing amount was: {}", from_id, to_id, incoming_amount, outgoing_amount);
157+
return Err(());
158+
}
159+
};
160+
debug!(
161+
"Account balance after fulfill: {}. Amount that needs to be settled: {}",
162+
balance, amount_to_settle
163+
);
164+
if amount_to_settle > 0 && to_has_engine {
165+
// Note that if this program crashes after changing the balance (in the PROCESS_FULFILL script)
166+
// and the send_settlement fails but the program isn't alive to hear that, the balance will be incorrect.
167+
// No other instance will know that it was trying to send an outgoing settlement. We could
168+
// make this more robust by saving something to the DB about the outgoing settlement when we change the balance
169+
// but then we would also need to prevent a situation where every connector instance is polling the
170+
// settlement engine for the status of each
171+
// outgoing settlement and putting unnecessary
172+
// load on the settlement engine.
173+
tokio::spawn(async move {
174+
if settlement_client
175+
.send_settlement(to, amount_to_settle)
176+
.await
177+
.is_err()
178+
{
179+
store.refund_settlement(to_id, amount_to_settle).await?;
180+
}
181+
Ok::<(), ()>(())
182+
});
183+
}
184+
Ok(())
185+
});
177186

178-
Ok(fulfill)
179-
})
180-
.or_else(move |reject| {
181-
// Similar to the logic for handling the Fulfill packet above, we
182-
// spawn a task to update the balance for the Reject in parallel
183-
// rather than waiting for the database to update before relaying
184-
// the packet back. In this case, the only substantive difference
185-
// would come from if the DB operation fails or takes too long.
186-
// The packet is already rejected so it's more useful for the sender
187-
// to get the error message from the original Reject packet rather
188-
// than a less specific one saying that this node had an "internal
189-
// error" caused by a database issue.
190-
let reject_balance_update = store_clone.update_balances_for_reject(
191-
from_clone.clone(),
192-
incoming_amount,
193-
).map_err(move |_| error!("Error rolling back balance change for accounts: {} and {}. Incoming amount was: {}, outgoing amount was: {}", from_clone.id(), to_clone.id(), incoming_amount, outgoing_amount));
194-
spawn(reject_balance_update);
187+
Ok(fulfill)
188+
}
189+
Err(reject) => {
190+
// Similar to the logic for handling the Fulfill packet above, we
191+
// spawn a task to update the balance for the Reject in parallel
192+
// rather than waiting for the database to update before relaying
193+
// the packet back. In this case, the only substantive difference
194+
// would come from if the DB operation fails or takes too long.
195+
// The packet is already rejected so it's more useful for the sender
196+
// to get the error message from the original Reject packet rather
197+
// than a less specific one saying that this node had an "internal
198+
// error" caused by a database issue.
199+
tokio::spawn({
200+
let store_clone = self.store.clone();
201+
async move {
202+
store_clone.update_balances_for_reject(
203+
from_clone.clone(),
204+
incoming_amount,
205+
).map_err(move |_| error!("Error rolling back balance change for accounts: {} and {}. Incoming amount was: {}, outgoing amount was: {}", from_clone.id(), to_clone.id(), incoming_amount, outgoing_amount)).await
206+
}
207+
});
195208

196-
Err(reject)
197-
})
198-
}),
199-
)
209+
Err(reject)
210+
}
211+
}
200212
}
201213
}

0 commit comments

Comments
 (0)