Skip to content

Commit 055d365

Browse files
committed
feat(engine): Add leftovers value when settling to connector
1 parent e892efa commit 055d365

File tree

5 files changed

+92
-32
lines changed

5 files changed

+92
-32
lines changed

crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs

+44-29
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ where
373373
store
374374
.load_account_id_from_address(addr)
375375
.and_then(move |id| {
376-
self_clone.notify_connector(id.to_string(), amount, tx_hash)
376+
self_clone.notify_connector(id.to_string(), amount.to_string(), tx_hash)
377377
})
378378
.and_then(move |_| {
379379
// only save the transaction hash if the connector
@@ -456,7 +456,7 @@ where
456456
Either::A(
457457
store.load_account_id_from_address(addr)
458458
.and_then(move |id| {
459-
self_clone.notify_connector(id.to_string(), amount, tx_hash)
459+
self_clone.notify_connector(id.to_string(), amount.to_string(), tx_hash)
460460
})
461461
.and_then(move |_| {
462462
// only save the transaction hash if the connector
@@ -477,7 +477,7 @@ where
477477
fn notify_connector(
478478
&self,
479479
account_id: String,
480-
amount: U256,
480+
amount: String,
481481
tx_hash: H256,
482482
) -> impl Future<Item = (), Error = ()> {
483483
let mut url = self.connector_url.clone();
@@ -488,44 +488,65 @@ where
488488
.push("accounts")
489489
.push(&account_id.clone())
490490
.push("settlements");
491-
let client = Client::new();
492491
debug!("Making POST to {:?} {:?} about {:?}", url, amount, tx_hash);
493492
let self_clone = self.clone();
493+
let store = self.store.clone();
494+
let amount_clone = amount.clone();
494495
let action = move || {
495496
// need to make 2 clones, one to own the variables in the function
496497
// and one for the retry closure..
497498
let self_clone = self_clone.clone();
499+
let store = store.clone();
498500
let account_id = account_id.clone();
499-
client
500-
.post(url.clone())
501-
.header("Idempotency-Key", tx_hash.to_string())
502-
.json(&json!({ "amount": amount.to_string(), "scale" : engine_scale }))
503-
.send()
504-
.map_err(move |err| {
505-
error!(
506-
"Error notifying Accounting System's account: {:?}, amount: {:?}: {:?}",
507-
account_id, amount, err
508-
)
509-
})
510-
.and_then(move |response| {
511-
trace!("Accounting system responded with {:?}", response);
512-
self_clone.process_connector_response(account_id_clone2, response, amount)
501+
let account_id_clone2 = account_id.clone();
502+
let amount = amount.clone();
503+
let url = url.clone();
504+
505+
// settle for amount + leftovers
506+
store.load_leftovers(account_id.clone())
507+
.and_then(move |leftovers| {
508+
result(BigUint::from_str(&amount.clone()).map_err(move |err| {
509+
let error_msg = format!("Error converting to BigUint {:?}", err);
510+
error!("{:?}", error_msg);
511+
}))
512+
.and_then(move |amount| {
513+
Ok(amount + leftovers)
513514
})
515+
})
516+
.and_then(move |full_amount| {
517+
let client = Client::new();
518+
let full_amount_clone = full_amount.clone();
519+
client
520+
.post(url.clone())
521+
.header("Idempotency-Key", tx_hash.to_string())
522+
.json(&json!({ "amount": full_amount.clone().to_string(), "scale" : engine_scale }))
523+
.send()
524+
.map_err(move |err| {
525+
error!(
526+
"Error notifying Accounting System's account: {:?}, amount: {:?}: {:?}",
527+
account_id, full_amount_clone, err
528+
)
529+
})
530+
.and_then(move |response| {
531+
trace!("Accounting system responded with {:?}", response);
532+
self_clone.process_connector_response(account_id_clone2, response, full_amount.clone())
533+
})
534+
})
514535
};
515536
Retry::spawn(
516537
ExponentialBackoff::from_millis(10).take(MAX_RETRIES),
517538
action,
518539
)
519540
.map_err(move |_| {
520-
error!("Exceeded max retries when notifying connector about account {:?} for amount {:?} and transaction hash {:?}. Please check your API.", account_id_clone, amount, tx_hash)
541+
error!("Exceeded max retries when notifying connector about account {:?} for amount {:?} and transaction hash {:?}. Please check your API.", account_id_clone, amount_clone, tx_hash)
521542
})
522543
}
523544

524545
fn process_connector_response(
525546
&self,
526547
account_id: String,
527548
response: HttpResponse,
528-
amount: U256,
549+
engine_amount: BigUint,
529550
) -> impl Future<Item = (), Error = ()> {
530551
let engine_scale = self.asset_scale;
531552
let store = self.store.clone();
@@ -549,18 +570,12 @@ where
549570
})
550571
})
551572
.and_then(move |quantity: Quantity| {
552-
// Convert both to BigUInts so we can do arithmetic
553-
join_all(vec![
554-
result(BigUint::from_str(&quantity.amount)),
555-
result(BigUint::from_str(&amount.to_string())),
556-
])
573+
result(BigUint::from_str(&quantity.amount))
557574
.map_err(|err| {
558-
let error_msg = format!("Error converting to BigUints {:?}", err);
575+
let error_msg = format!("Error converting to BigUint {:?}", err);
559576
error!("{:?}", error_msg);
560577
})
561-
.and_then(move |amounts: Vec<BigUint>| {
562-
let connector_amount = amounts[0].clone();
563-
let engine_amount = amounts[1].clone();
578+
.and_then(move |connector_amount: BigUint| {
564579
// Scale the amount settled by the
565580
// connector back up to our scale
566581
result(connector_amount.normalize_scale(ConvertDetails {

crates/interledger-settlement-engines/src/engines/ethereum_ledger/test_helpers.rs

+11-2
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ pub struct TestStore {
6161
pub last_observed_block: Arc<RwLock<U256>>,
6262
pub saved_hashes: Arc<RwLock<HashMap<H256, bool>>>,
6363
pub cache_hits: Arc<RwLock<u64>>,
64-
pub leftovers: Arc<RwLock<HashMap<String, String>>>,
64+
pub leftovers: Arc<RwLock<HashMap<String, BigUint>>>,
6565
}
6666

6767
use crate::stores::LeftoversStore;
@@ -74,9 +74,18 @@ impl LeftoversStore for TestStore {
7474
leftovers: BigUint,
7575
) -> Box<Future<Item = (), Error = ()> + Send> {
7676
let mut guard = self.leftovers.write();
77-
(*guard).insert(account_id, leftovers.to_string());
77+
(*guard).insert(account_id, leftovers);
7878
Box::new(ok(()))
7979
}
80+
81+
fn load_leftovers(&self, account_id: String) -> Box<Future<Item = BigUint, Error = ()> + Send> {
82+
let guard = self.leftovers.read();
83+
if let Some(l) = guard.get(&account_id) {
84+
Box::new(ok(l.clone()))
85+
} else {
86+
Box::new(err(()))
87+
}
88+
}
8089
}
8190

8291
impl EthereumStore for TestStore {

crates/interledger-settlement-engines/src/main.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#![type_length_limit = "1739269"]
1+
#![type_length_limit = "3396805"]
22

33
use clap::{value_t, App, Arg, SubCommand};
44
use hex;

crates/interledger-settlement-engines/src/stores/mod.rs

+15
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,18 @@ pub trait IdempotentEngineStore {
2828
data: Bytes,
2929
) -> Box<dyn Future<Item = (), Error = ()> + Send>;
3030
}
31+
32+
pub trait LeftoversStore {
33+
/// Saves the leftover data
34+
fn save_leftovers(
35+
&self,
36+
account_id: String,
37+
leftovers: BigUint,
38+
) -> Box<dyn Future<Item = (), Error = ()> + Send>;
39+
40+
/// Saves the leftover data
41+
fn load_leftovers(
42+
&self,
43+
account_id: String,
44+
) -> Box<dyn Future<Item = BigUint, Error = ()> + Send>;
45+
}

crates/interledger-settlement-engines/src/stores/redis_ethereum_ledger/store.rs

+21
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use futures::{
66
use ethereum_tx_sign::web3::types::{Address as EthAddress, H256, U256};
77
use interledger_service::Account as AccountTrait;
88
use std::collections::HashMap;
9+
use std::str::FromStr;
910

1011
use crate::engines::ethereum_ledger::{EthereumAccount, EthereumAddresses, EthereumStore};
1112
use redis::{self, cmd, r#async::SharedConnection, ConnectionInfo, PipelineCommands, Value};
@@ -120,6 +121,26 @@ impl LeftoversStore for EthereumLedgerRedisStore {
120121
.and_then(move |(_conn, _ret): (_, Value)| Ok(())),
121122
)
122123
}
124+
125+
fn load_leftovers(
126+
&self,
127+
account_id: String,
128+
) -> Box<dyn Future<Item = BigUint, Error = ()> + Send> {
129+
let mut pipe = redis::pipe();
130+
pipe.get(format!("leftovers:{}", account_id))
131+
.ignore();
132+
Box::new(
133+
pipe.query_async(self.connection.clone())
134+
.map_err(move |err| error!("Error loading leftovers {:?}: ", err))
135+
.and_then(move |(_conn, leftovers): (_, String)| {
136+
if let Ok(leftovers) = BigUint::from_str(&leftovers) {
137+
Box::new(ok(leftovers))
138+
} else {
139+
Box::new(err(()))
140+
}
141+
})
142+
)
143+
}
123144
}
124145

125146
impl EthereumStore for EthereumLedgerRedisStore {

0 commit comments

Comments
 (0)