Skip to content

Connector & Engine adjustments for asset scale precision loss leftovers #199

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Aug 15, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/interledger-settlement-engines/Cargo.toml
Original file line number Diff line number Diff line change
@@ -35,6 +35,7 @@ clap = "2.32.0"
clarity = { git = "https://github.com/gakonst/clarity" }
sha3 = "0.8.2"
num-bigint = "0.2.2"
num-traits = "0.2.8"

[dev-dependencies]
lazy_static = "1.3"
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ use tokio_retry::{strategy::ExponentialBackoff, Retry};
use url::Url;
use uuid::Uuid;

use crate::stores::redis_ethereum_ledger::*;
use crate::stores::{redis_ethereum_ledger::*, LeftoversStore};
use crate::{ApiResponse, CreateAccount, SettlementEngine, SettlementEngineApi};
use interledger_settlement::{Convert, ConvertDetails, Quantity};

@@ -100,7 +100,12 @@ pub struct EthereumLedgerSettlementEngineBuilder<'a, S, Si, A> {

impl<'a, S, Si, A> EthereumLedgerSettlementEngineBuilder<'a, S, Si, A>
where
S: EthereumStore<Account = A> + Clone + Send + Sync + 'static,
S: EthereumStore<Account = A>
+ LeftoversStore<AssetType = BigUint>
+ Clone
+ Send
+ Sync
+ 'static,
Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static,
A: EthereumAccount + Send + Sync + 'static,
{
@@ -222,7 +227,12 @@ where

impl<S, Si, A> EthereumLedgerSettlementEngine<S, Si, A>
where
S: EthereumStore<Account = A> + Clone + Send + Sync + 'static,
S: EthereumStore<Account = A>
+ LeftoversStore<AssetType = BigUint>
+ Clone
+ Send
+ Sync
+ 'static,
Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static,
A: EthereumAccount + Send + Sync + 'static,
{
@@ -283,6 +293,9 @@ where
let our_address = self.address.own_address;
let token_address = self.address.token_address;

// We `Box` futures in these functions due to
// https://github.com/rust-lang/rust/issues/54540#issuecomment-494749912.
// Otherwise, we get `type_length_limit` errors.
// get the current block number
web3.eth()
.block_number()
@@ -358,7 +371,7 @@ where
&self,
transfer: ERC20Transfer,
token_address: Address,
) -> impl Future<Item = (), Error = ()> {
) -> Box<dyn Future<Item = (), Error = ()> + Send> {
let store = self.store.clone();
let tx_hash = transfer.tx_hash;
let self_clone = self.clone();
@@ -367,7 +380,7 @@ where
token_address: Some(token_address),
};
let amount = transfer.amount;
store
Box::new(store
.check_if_tx_processed(tx_hash)
.map_err(move |_| error!("Error when querying store about transaction: {:?}", tx_hash))
.and_then(move |processed| {
@@ -377,7 +390,7 @@ where
.load_account_id_from_address(addr)
.and_then(move |id| {
debug!("Notifying connector about incoming ERC20 transaction for account {} for amount: {} (tx hash: {})", id, amount, tx_hash);
self_clone.notify_connector(id.to_string(), amount, tx_hash)
self_clone.notify_connector(id.to_string(), amount.to_string(), tx_hash)
})
.and_then(move |_| {
// only save the transaction hash if the connector
@@ -388,7 +401,7 @@ where
} else {
Either::B(ok(())) // return an empty future otherwise since we want to skip this transaction
}
})
}))
}

fn notify_eth_txs_in_block(&self, block_number: u64) -> impl Future<Item = (), Error = ()> {
@@ -422,18 +435,17 @@ where
.and_then(|_| Ok(()))
}

fn notify_eth_transfer(&self, tx_hash: H256) -> impl Future<Item = (), Error = ()> {
fn notify_eth_transfer(&self, tx_hash: H256) -> Box<dyn Future<Item = (), Error = ()> + Send> {
let our_address = self.address.own_address;
let web3 = self.web3.clone();
let store = self.store.clone();
let self_clone = self.clone();
// Skip transactions which have already been processed by the connector
store.check_if_tx_processed(tx_hash)
Box::new(store.check_if_tx_processed(tx_hash)
.map_err(move |_| error!("Error when querying store about transaction: {:?}", tx_hash))
.and_then(move |processed| {
if !processed {
Either::A(
web3.eth().transaction(TransactionId::Hash(tx_hash))
Either::A(web3.eth().transaction(TransactionId::Hash(tx_hash))
.map_err(move |err| error!("Could not fetch transaction data from transaction hash: {:?}. Got error: {:?}", tx_hash, err))
.and_then(move |maybe_tx| {
// Unlikely to error out since we only call this on
@@ -451,10 +463,10 @@ where
own_address: from,
token_address: None,
};
return Either::A(store.load_account_id_from_address(addr)

return Either::A(store.load_account_id_from_address(addr)
.and_then(move |id| {
debug!("Notifying connector about incoming ETH transaction for account {} for amount: {} (tx hash: {})", id, amount, tx_hash);
self_clone.notify_connector(id.to_string(), amount, tx_hash)
self_clone.notify_connector(id.to_string(), amount.to_string(), tx_hash)
})
.and_then(move |_| {
// only save the transaction hash if the connector
@@ -470,54 +482,161 @@ where
} else {
Either::B(ok(())) // return an empty future otherwise since we want to skip this transaction
}
})
}))
}

fn notify_connector(
&self,
account_id: String,
amount: U256,
amount: String,
tx_hash: H256,
) -> impl Future<Item = (), Error = ()> {
let mut url = self.connector_url.clone();
let account_id_clone = account_id.clone();
let engine_scale = self.asset_scale;
let mut url = self.connector_url.clone();
url.path_segments_mut()
.expect("Invalid connector URL")
.push("accounts")
.push(&account_id.clone())
.push("settlements");
let client = Client::new();
debug!("Making POST to {:?} {:?} about {:?}", url, amount, tx_hash);
let action = move || {
let account_id = account_id.clone();
client
.post(url.clone())
.header("Idempotency-Key", tx_hash.to_string())
.json(&json!({ "amount": amount.to_string(), "scale" : engine_scale }))
.send()
.map_err(move |err| {
error!(
"Error notifying Accounting System's account: {:?}, amount: {:?}: {:?}",
account_id, amount, err
)

// settle for amount + uncredited_settlement_amount
let account_id_clone = account_id.clone();
let full_amount_fut = self
.store
.load_uncredited_settlement_amount(account_id.clone())
.and_then(move |uncredited_settlement_amount| {
let full_amount_fut2 = result(BigUint::from_str(&amount).map_err(move |err| {
let error_msg = format!("Error converting to BigUint {:?}", err);
error!("{:?}", error_msg);
}))
.and_then(move |amount| {
debug!("Got uncredited amount {}", amount);
let full_amount = amount + uncredited_settlement_amount;
debug!(
"Notifying accounting system about full amount: {}",
full_amount
);
ok(full_amount)
});
ok(full_amount_fut2)
})
.flatten();

let self_clone = self.clone();
let ping_connector_fut = full_amount_fut.and_then(move |full_amount| {
let url = url.clone();
let account_id = account_id_clone.clone();
let account_id2 = account_id_clone.clone();
let full_amount2 = full_amount.clone();

let action = move || {
let client = Client::new();
let account_id = account_id.clone();
let full_amount = full_amount.clone();
let full_amount_clone = full_amount.clone();
client
.post(url.clone())
.header("Idempotency-Key", tx_hash.to_string())
.json(&json!(Quantity::new(full_amount.clone(), engine_scale)))
.send()
.map_err(move |err| {
error!(
"Error notifying Accounting System's account: {:?}, amount: {:?}: {:?}",
account_id, full_amount_clone, err
);
})
.and_then(move |ret| {
ok((ret, full_amount))
})
};
Retry::spawn(
ExponentialBackoff::from_millis(10).take(MAX_RETRIES),
action,
)
.map_err(move |_| {
error!("Exceeded max retries when notifying connector about account {:?} for amount {:?} and transaction hash {:?}. Please check your API.", account_id2, full_amount2, tx_hash)
})
});

ping_connector_fut.and_then(move |ret| {
trace!("Accounting system responded with {:?}", ret.0);
self_clone.process_connector_response(account_id, ret.0, ret.1)
})
}

/// Parses a response from a connector into a Quantity type and calls a
/// function to further process the parsed data to check if the store's
/// uncredited settlement amount should be updated.
fn process_connector_response(
&self,
account_id: String,
response: HttpResponse,
engine_amount: BigUint,
) -> Box<dyn Future<Item = (), Error = ()> + Send> {
let self_clone = self.clone();
if !response.status().is_success() {
return Box::new(err(()));
}
Box::new(
response
.into_body()
.concat2()
.map_err(|err| {
let err = format!("Couldn't retrieve body {:?}", err);
error!("{}", err);
})
.and_then(move |response| {
trace!("Accounting system responded with {:?}", response);
if response.status().is_success() {
Ok(())
} else {
Err(())
}
.and_then(move |body| {
// Get the amount stored by the connector and
// check for any precision loss / overflow
serde_json::from_slice::<Quantity>(&body).map_err(|err| {
let err = format!("Couldn't parse body {:?} into Quantity {:?}", body, err);
error!("{}", err);
})
})
};
Retry::spawn(
ExponentialBackoff::from_millis(10).take(MAX_RETRIES),
action,
.and_then(move |quantity: Quantity| {
self_clone.process_received_quantity(account_id, quantity, engine_amount)
}),
)
}

// Normalizes a received Quantity object against the local engine scale, and
// if the normalized value is less than what the engine originally sent, it
// stores it as uncredited settlement amount in the store.
fn process_received_quantity(
&self,
account_id: String,
quantity: Quantity,
engine_amount: BigUint,
) -> Box<dyn Future<Item = (), Error = ()> + Send> {
let store = self.store.clone();
let engine_scale = self.asset_scale;
Box::new(
result(BigUint::from_str(&quantity.amount))
.map_err(|err| {
let error_msg = format!("Error converting to BigUint {:?}", err);
error!("{:?}", error_msg);
})
.and_then(move |connector_amount: BigUint| {
// Scale the amount settled by the
// connector back up to our scale
result(connector_amount.normalize_scale(ConvertDetails {
from: quantity.scale,
to: engine_scale,
}))
.and_then(move |scaled_connector_amount| {
if engine_amount > scaled_connector_amount {
let diff = engine_amount - scaled_connector_amount;
// connector settled less than we
// instructed it to, so we must save
// the difference
store.save_uncredited_settlement_amount(account_id, diff)
} else {
Box::new(ok(()))
}
})
}),
)
.map_err(move |_| {
error!("Exceeded max retries when notifying connector about account {:?} for amount {:?} and transaction hash {:?}. Please check your API.", account_id_clone, amount, tx_hash)
})
}

/// Helper function which submits an Ethereum ledger transaction to `to` for `amount`.
@@ -624,7 +743,12 @@ where

impl<S, Si, A> SettlementEngine for EthereumLedgerSettlementEngine<S, Si, A>
where
S: EthereumStore<Account = A> + Clone + Send + Sync + 'static,
S: EthereumStore<Account = A>
+ LeftoversStore<AssetType = BigUint>
+ Clone
+ Send
+ Sync
+ 'static,
Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static,
A: EthereumAccount + Send + Sync + 'static,
{
@@ -980,7 +1104,7 @@ mod tests {
"{\"amount\": \"100000000000\", \"scale\": 18 }".to_string(),
))
.with_status(200)
.with_body("OK".to_string())
.with_body("{\"amount\": \"100000000000\", \"scale\": 9 }".to_string())
.create();

let bob_connector_url = mockito::server_url();
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use hyper::StatusCode;
use num_traits::Zero;
use std::process::Command;
use std::str::FromStr;
use std::thread::sleep;
@@ -61,6 +62,38 @@ pub struct TestStore {
pub last_observed_block: Arc<RwLock<U256>>,
pub saved_hashes: Arc<RwLock<HashMap<H256, bool>>>,
pub cache_hits: Arc<RwLock<u64>>,
pub uncredited_settlement_amount: Arc<RwLock<HashMap<String, BigUint>>>,
}

use crate::stores::LeftoversStore;
use num_bigint::BigUint;

impl LeftoversStore for TestStore {
type AssetType = BigUint;

fn save_uncredited_settlement_amount(
&self,
account_id: String,
uncredited_settlement_amount: Self::AssetType,
) -> Box<Future<Item = (), Error = ()> + Send> {
let mut guard = self.uncredited_settlement_amount.write();
(*guard).insert(account_id, uncredited_settlement_amount);
Box::new(ok(()))
}

fn load_uncredited_settlement_amount(
&self,
account_id: String,
) -> Box<Future<Item = Self::AssetType, Error = ()> + Send> {
let mut guard = self.uncredited_settlement_amount.write();
if let Some(l) = guard.get(&account_id) {
let l = l.clone();
(*guard).insert(account_id, Zero::zero());
Box::new(ok(l.clone()))
} else {
Box::new(ok(Zero::zero()))
}
}
}

impl EthereumStore for TestStore {
@@ -238,6 +271,7 @@ impl TestStore {
cache_hits: Arc::new(RwLock::new(0)),
last_observed_block: Arc::new(RwLock::new(U256::from(0))),
saved_hashes: Arc::new(RwLock::new(HashMap::new())),
uncredited_settlement_amount: Arc::new(RwLock::new(HashMap::new())),
}
}
}
@@ -265,7 +299,13 @@ pub fn test_engine<Si, S, A>(
) -> EthereumLedgerSettlementEngine<S, Si, A>
where
Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static,
S: EthereumStore<Account = A> + IdempotentStore + Clone + Send + Sync + 'static,
S: EthereumStore<Account = A>
+ LeftoversStore<AssetType = BigUint>
+ IdempotentStore
+ Clone
+ Send
+ Sync
+ 'static,
A: EthereumAccount + Send + Sync + 'static,
{
EthereumLedgerSettlementEngineBuilder::new(store, key)
17 changes: 17 additions & 0 deletions crates/interledger-settlement-engines/src/stores/mod.rs
Original file line number Diff line number Diff line change
@@ -28,3 +28,20 @@ pub trait IdempotentEngineStore {
data: Bytes,
) -> Box<dyn Future<Item = (), Error = ()> + Send>;
}

pub trait LeftoversStore {
type AssetType;

/// Saves the leftover data
fn save_uncredited_settlement_amount(
&self,
account_id: String,
uncredited_settlement_amount: Self::AssetType,
) -> Box<dyn Future<Item = (), Error = ()> + Send>;

/// Clears the leftover data in the database and returns the cleared value
fn load_uncredited_settlement_amount(
&self,
account_id: String,
) -> Box<dyn Future<Item = Self::AssetType, Error = ()> + Send>;
}
Original file line number Diff line number Diff line change
@@ -6,13 +6,17 @@ use futures::{
use ethereum_tx_sign::web3::types::{Address as EthAddress, H256, U256};
use interledger_service::Account as AccountTrait;
use std::collections::HashMap;
use std::str::FromStr;

use crate::engines::ethereum_ledger::{EthereumAccount, EthereumAddresses, EthereumStore};
use num_traits::Zero;
use redis::{self, cmd, r#async::SharedConnection, ConnectionInfo, PipelineCommands, Value};

use log::{error, trace};

use crate::stores::redis_store_common::{EngineRedisStore, EngineRedisStoreBuilder};
use crate::stores::LeftoversStore;
use num_bigint::BigUint;

// Key for the latest observed block and balance. The data is stored in order to
// avoid double crediting transactions which have already been processed, and in
@@ -22,6 +26,7 @@ static SAVED_TRANSACTIONS_KEY: &str = "transactions";
static SETTLEMENT_ENGINES_KEY: &str = "settlement";
static LEDGER_KEY: &str = "ledger";
static ETHEREUM_KEY: &str = "eth";
static UNCREDITED_AMOUNT_KEY: &str = "uncredited_settlement_amount";

#[derive(Clone, Debug, Serialize)]
pub struct Account {
@@ -52,6 +57,13 @@ fn ethereum_ledger_key(account_id: u64) -> String {
)
}

fn ethereum_uncredited_amount_key(account_id: String) -> String {
format!(
"{}:{}:{}:{}",
ETHEREUM_KEY, LEDGER_KEY, UNCREDITED_AMOUNT_KEY, account_id,
)
}

impl EthereumAccount for Account {
fn token_address(&self) -> Option<EthAddress> {
self.token_address
@@ -105,6 +117,83 @@ impl EthereumLedgerRedisStore {
}
}

impl LeftoversStore for EthereumLedgerRedisStore {
type AssetType = BigUint;

fn save_uncredited_settlement_amount(
&self,
account_id: String,
uncredited_settlement_amount: Self::AssetType,
) -> Box<dyn Future<Item = (), Error = ()> + Send> {
trace!(
"Saving uncredited_settlement_amount {:?} {:?}",
account_id,
uncredited_settlement_amount
);
let mut pipe = redis::pipe();
// We store these amounts as lists of strings
// because we cannot do BigNumber arithmetic in the store
// When loading the amounts, we convert them to the appropriate data
// type and sum them up.
pipe.lpush(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth including a comment recording why these amounts are stored as a list

ethereum_uncredited_amount_key(account_id.clone()),
uncredited_settlement_amount.to_string(),
)
.ignore();
Box::new(
pipe.query_async(self.connection.clone())
.map_err(move |err| {
error!(
"Error saving uncredited_settlement_amount {:?}: {:?}",
uncredited_settlement_amount, err
)
})
.and_then(move |(_conn, _ret): (_, Value)| Ok(())),
)
}

fn load_uncredited_settlement_amount(
&self,
account_id: String,
) -> Box<dyn Future<Item = Self::AssetType, Error = ()> + Send> {
trace!("Loading uncredited_settlement_amount {:?}", account_id);
let mut pipe = redis::pipe();
// Loads the value and resets it to 0
pipe.lrange(ethereum_uncredited_amount_key(account_id.clone()), 0, -1);
pipe.del(format!("uncredited_settlement_amount:{}", account_id))
.ignore();
Box::new(
pipe.query_async(self.connection.clone())
.map_err(move |err| {
error!("Error loading uncredited_settlement_amount {:?}: ", err)
})
.and_then(
move |(_conn, uncredited_settlement_amounts): (_, Vec<Vec<String>>)| {
if uncredited_settlement_amounts.len() == 1 {
let uncredited_settlement_amounts =
uncredited_settlement_amounts[0].clone();
let mut total_amount = BigUint::zero();
for uncredited_settlement_amount in uncredited_settlement_amounts {
let amount = if let Ok(amount) =
BigUint::from_str(&uncredited_settlement_amount)
{
amount
} else {
// could not convert to bigint
return Box::new(err(()));
};
total_amount += amount;
}
Box::new(ok(total_amount))
} else {
Box::new(ok(Zero::zero()))
}
},
),
)
}
}

impl EthereumStore for EthereumLedgerRedisStore {
type Account = Account;

@@ -284,9 +373,36 @@ mod tests {
block_on, test_eth_store as test_store,
};
use super::*;
use futures::future::join_all;
use std::iter::FromIterator;
use std::str::FromStr;

#[test]
fn saves_and_pops_uncredited_settlement_amount_properly() {
block_on(test_store().and_then(|(store, context)| {
let amount = BigUint::from_str("10000000000000000000").unwrap();
let ret_amount = BigUint::from_str("30000000000000000000").unwrap();
let acc = "0".to_string();
join_all(vec![
store.save_uncredited_settlement_amount(acc.clone(), amount.clone()),
store.save_uncredited_settlement_amount(acc.clone(), amount.clone()),
store.save_uncredited_settlement_amount(acc.clone(), amount.clone()),
])
.map_err(|err| eprintln!("Redis error: {:?}", err))
.and_then(move |_| {
store
.load_uncredited_settlement_amount(acc)
.map_err(|err| eprintln!("Redis error: {:?}", err))
.and_then(move |ret| {
assert_eq!(ret, ret_amount);
let _ = context;
Ok(())
})
})
}))
.unwrap()
}

#[test]
fn saves_and_loads_ethereum_addreses_properly() {
block_on(test_store().and_then(|(store, context)| {
4 changes: 2 additions & 2 deletions crates/interledger-settlement/src/lib.rs
Original file line number Diff line number Diff line change
@@ -186,7 +186,7 @@ mod tests {
.unwrap(),
1
);
// there's leftovers for all number slots which do not increase in
// there's uncredited_settlement_amount for all number slots which do not increase in
// increments of 10^abs(to_scale-from_scale)
assert_eq!(
1u64.normalize_scale(ConvertDetails { from: 2, to: 1 })
@@ -219,7 +219,7 @@ mod tests {
.unwrap(),
100
);
// 299 units with base 3 is 29 units with base 2 (0.9 leftovers)
// 299 units with base 3 is 29 units with base 2 (0.9 uncredited_settlement_amount)
assert_eq!(
299u64
.normalize_scale(ConvertDetails { from: 3, to: 2 })