Skip to content

Commit 7cfa6bf

Browse files
authored
Connector & Engine adjustments for asset scale precision loss leftovers (#199)
* feature(engines): Add trait for saving leftover values Implement the trait for the eth engine * feat(engine): Add leftovers value when settling to connector * fix(engines): Make leftovers be an associated type This allows the trait implementer to choose the data type of the leftovers instead of forcing them to use BigUint * fix(eth-se): Box most futures to work around type_length_error * use an array of strings to store multiple BigUint uncredited values * test(eth-engine): ensure that the test works with numbers larger than u64 * chore: prefix uncredited amount key
1 parent 6c79a13 commit 7cfa6bf

File tree

6 files changed

+348
-50
lines changed

6 files changed

+348
-50
lines changed

crates/interledger-settlement-engines/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ clap = "2.32.0"
3535
clarity = { git = "https://github.com/gakonst/clarity" }
3636
sha3 = "0.8.2"
3737
num-bigint = "0.2.2"
38+
num-traits = "0.2.8"
3839

3940
[dev-dependencies]
4041
lazy_static = "1.3"

crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs

Lines changed: 171 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use tokio_retry::{strategy::ExponentialBackoff, Retry};
3434
use url::Url;
3535
use uuid::Uuid;
3636

37-
use crate::stores::redis_ethereum_ledger::*;
37+
use crate::stores::{redis_ethereum_ledger::*, LeftoversStore};
3838
use crate::{ApiResponse, CreateAccount, SettlementEngine, SettlementEngineApi};
3939
use interledger_settlement::{Convert, ConvertDetails, Quantity};
4040

@@ -100,7 +100,12 @@ pub struct EthereumLedgerSettlementEngineBuilder<'a, S, Si, A> {
100100

101101
impl<'a, S, Si, A> EthereumLedgerSettlementEngineBuilder<'a, S, Si, A>
102102
where
103-
S: EthereumStore<Account = A> + Clone + Send + Sync + 'static,
103+
S: EthereumStore<Account = A>
104+
+ LeftoversStore<AssetType = BigUint>
105+
+ Clone
106+
+ Send
107+
+ Sync
108+
+ 'static,
104109
Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static,
105110
A: EthereumAccount + Send + Sync + 'static,
106111
{
@@ -222,7 +227,12 @@ where
222227

223228
impl<S, Si, A> EthereumLedgerSettlementEngine<S, Si, A>
224229
where
225-
S: EthereumStore<Account = A> + Clone + Send + Sync + 'static,
230+
S: EthereumStore<Account = A>
231+
+ LeftoversStore<AssetType = BigUint>
232+
+ Clone
233+
+ Send
234+
+ Sync
235+
+ 'static,
226236
Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static,
227237
A: EthereumAccount + Send + Sync + 'static,
228238
{
@@ -283,6 +293,9 @@ where
283293
let our_address = self.address.own_address;
284294
let token_address = self.address.token_address;
285295

296+
// We `Box` futures in these functions due to
297+
// https://github.com/rust-lang/rust/issues/54540#issuecomment-494749912.
298+
// Otherwise, we get `type_length_limit` errors.
286299
// get the current block number
287300
web3.eth()
288301
.block_number()
@@ -358,7 +371,7 @@ where
358371
&self,
359372
transfer: ERC20Transfer,
360373
token_address: Address,
361-
) -> impl Future<Item = (), Error = ()> {
374+
) -> Box<dyn Future<Item = (), Error = ()> + Send> {
362375
let store = self.store.clone();
363376
let tx_hash = transfer.tx_hash;
364377
let self_clone = self.clone();
@@ -367,7 +380,7 @@ where
367380
token_address: Some(token_address),
368381
};
369382
let amount = transfer.amount;
370-
store
383+
Box::new(store
371384
.check_if_tx_processed(tx_hash)
372385
.map_err(move |_| error!("Error when querying store about transaction: {:?}", tx_hash))
373386
.and_then(move |processed| {
@@ -377,7 +390,7 @@ where
377390
.load_account_id_from_address(addr)
378391
.and_then(move |id| {
379392
debug!("Notifying connector about incoming ERC20 transaction for account {} for amount: {} (tx hash: {})", id, amount, tx_hash);
380-
self_clone.notify_connector(id.to_string(), amount, tx_hash)
393+
self_clone.notify_connector(id.to_string(), amount.to_string(), tx_hash)
381394
})
382395
.and_then(move |_| {
383396
// only save the transaction hash if the connector
@@ -388,7 +401,7 @@ where
388401
} else {
389402
Either::B(ok(())) // return an empty future otherwise since we want to skip this transaction
390403
}
391-
})
404+
}))
392405
}
393406

394407
fn notify_eth_txs_in_block(&self, block_number: u64) -> impl Future<Item = (), Error = ()> {
@@ -422,18 +435,17 @@ where
422435
.and_then(|_| Ok(()))
423436
}
424437

425-
fn notify_eth_transfer(&self, tx_hash: H256) -> impl Future<Item = (), Error = ()> {
438+
fn notify_eth_transfer(&self, tx_hash: H256) -> Box<dyn Future<Item = (), Error = ()> + Send> {
426439
let our_address = self.address.own_address;
427440
let web3 = self.web3.clone();
428441
let store = self.store.clone();
429442
let self_clone = self.clone();
430443
// Skip transactions which have already been processed by the connector
431-
store.check_if_tx_processed(tx_hash)
444+
Box::new(store.check_if_tx_processed(tx_hash)
432445
.map_err(move |_| error!("Error when querying store about transaction: {:?}", tx_hash))
433446
.and_then(move |processed| {
434447
if !processed {
435-
Either::A(
436-
web3.eth().transaction(TransactionId::Hash(tx_hash))
448+
Either::A(web3.eth().transaction(TransactionId::Hash(tx_hash))
437449
.map_err(move |err| error!("Could not fetch transaction data from transaction hash: {:?}. Got error: {:?}", tx_hash, err))
438450
.and_then(move |maybe_tx| {
439451
// Unlikely to error out since we only call this on
@@ -451,10 +463,10 @@ where
451463
own_address: from,
452464
token_address: None,
453465
};
454-
return Either::A(store.load_account_id_from_address(addr)
466+
467+
return Either::A(store.load_account_id_from_address(addr)
455468
.and_then(move |id| {
456-
debug!("Notifying connector about incoming ETH transaction for account {} for amount: {} (tx hash: {})", id, amount, tx_hash);
457-
self_clone.notify_connector(id.to_string(), amount, tx_hash)
469+
self_clone.notify_connector(id.to_string(), amount.to_string(), tx_hash)
458470
})
459471
.and_then(move |_| {
460472
// only save the transaction hash if the connector
@@ -470,54 +482,161 @@ where
470482
} else {
471483
Either::B(ok(())) // return an empty future otherwise since we want to skip this transaction
472484
}
473-
})
485+
}))
474486
}
475487

476488
fn notify_connector(
477489
&self,
478490
account_id: String,
479-
amount: U256,
491+
amount: String,
480492
tx_hash: H256,
481493
) -> impl Future<Item = (), Error = ()> {
482-
let mut url = self.connector_url.clone();
483-
let account_id_clone = account_id.clone();
484494
let engine_scale = self.asset_scale;
495+
let mut url = self.connector_url.clone();
485496
url.path_segments_mut()
486497
.expect("Invalid connector URL")
487498
.push("accounts")
488499
.push(&account_id.clone())
489500
.push("settlements");
490-
let client = Client::new();
491501
debug!("Making POST to {:?} {:?} about {:?}", url, amount, tx_hash);
492-
let action = move || {
493-
let account_id = account_id.clone();
494-
client
495-
.post(url.clone())
496-
.header("Idempotency-Key", tx_hash.to_string())
497-
.json(&json!({ "amount": amount.to_string(), "scale" : engine_scale }))
498-
.send()
499-
.map_err(move |err| {
500-
error!(
501-
"Error notifying Accounting System's account: {:?}, amount: {:?}: {:?}",
502-
account_id, amount, err
503-
)
502+
503+
// settle for amount + uncredited_settlement_amount
504+
let account_id_clone = account_id.clone();
505+
let full_amount_fut = self
506+
.store
507+
.load_uncredited_settlement_amount(account_id.clone())
508+
.and_then(move |uncredited_settlement_amount| {
509+
let full_amount_fut2 = result(BigUint::from_str(&amount).map_err(move |err| {
510+
let error_msg = format!("Error converting to BigUint {:?}", err);
511+
error!("{:?}", error_msg);
512+
}))
513+
.and_then(move |amount| {
514+
debug!("Got uncredited amount {}", amount);
515+
let full_amount = amount + uncredited_settlement_amount;
516+
debug!(
517+
"Notifying accounting system about full amount: {}",
518+
full_amount
519+
);
520+
ok(full_amount)
521+
});
522+
ok(full_amount_fut2)
523+
})
524+
.flatten();
525+
526+
let self_clone = self.clone();
527+
let ping_connector_fut = full_amount_fut.and_then(move |full_amount| {
528+
let url = url.clone();
529+
let account_id = account_id_clone.clone();
530+
let account_id2 = account_id_clone.clone();
531+
let full_amount2 = full_amount.clone();
532+
533+
let action = move || {
534+
let client = Client::new();
535+
let account_id = account_id.clone();
536+
let full_amount = full_amount.clone();
537+
let full_amount_clone = full_amount.clone();
538+
client
539+
.post(url.clone())
540+
.header("Idempotency-Key", tx_hash.to_string())
541+
.json(&json!(Quantity::new(full_amount.clone(), engine_scale)))
542+
.send()
543+
.map_err(move |err| {
544+
error!(
545+
"Error notifying Accounting System's account: {:?}, amount: {:?}: {:?}",
546+
account_id, full_amount_clone, err
547+
);
548+
})
549+
.and_then(move |ret| {
550+
ok((ret, full_amount))
551+
})
552+
};
553+
Retry::spawn(
554+
ExponentialBackoff::from_millis(10).take(MAX_RETRIES),
555+
action,
556+
)
557+
.map_err(move |_| {
558+
error!("Exceeded max retries when notifying connector about account {:?} for amount {:?} and transaction hash {:?}. Please check your API.", account_id2, full_amount2, tx_hash)
559+
})
560+
});
561+
562+
ping_connector_fut.and_then(move |ret| {
563+
trace!("Accounting system responded with {:?}", ret.0);
564+
self_clone.process_connector_response(account_id, ret.0, ret.1)
565+
})
566+
}
567+
568+
/// Parses a response from a connector into a Quantity type and calls a
569+
/// function to further process the parsed data to check if the store's
570+
/// uncredited settlement amount should be updated.
571+
fn process_connector_response(
572+
&self,
573+
account_id: String,
574+
response: HttpResponse,
575+
engine_amount: BigUint,
576+
) -> Box<dyn Future<Item = (), Error = ()> + Send> {
577+
let self_clone = self.clone();
578+
if !response.status().is_success() {
579+
return Box::new(err(()));
580+
}
581+
Box::new(
582+
response
583+
.into_body()
584+
.concat2()
585+
.map_err(|err| {
586+
let err = format!("Couldn't retrieve body {:?}", err);
587+
error!("{}", err);
504588
})
505-
.and_then(move |response| {
506-
trace!("Accounting system responded with {:?}", response);
507-
if response.status().is_success() {
508-
Ok(())
509-
} else {
510-
Err(())
511-
}
589+
.and_then(move |body| {
590+
// Get the amount stored by the connector and
591+
// check for any precision loss / overflow
592+
serde_json::from_slice::<Quantity>(&body).map_err(|err| {
593+
let err = format!("Couldn't parse body {:?} into Quantity {:?}", body, err);
594+
error!("{}", err);
595+
})
512596
})
513-
};
514-
Retry::spawn(
515-
ExponentialBackoff::from_millis(10).take(MAX_RETRIES),
516-
action,
597+
.and_then(move |quantity: Quantity| {
598+
self_clone.process_received_quantity(account_id, quantity, engine_amount)
599+
}),
600+
)
601+
}
602+
603+
// Normalizes a received Quantity object against the local engine scale, and
604+
// if the normalized value is less than what the engine originally sent, it
605+
// stores it as uncredited settlement amount in the store.
606+
fn process_received_quantity(
607+
&self,
608+
account_id: String,
609+
quantity: Quantity,
610+
engine_amount: BigUint,
611+
) -> Box<dyn Future<Item = (), Error = ()> + Send> {
612+
let store = self.store.clone();
613+
let engine_scale = self.asset_scale;
614+
Box::new(
615+
result(BigUint::from_str(&quantity.amount))
616+
.map_err(|err| {
617+
let error_msg = format!("Error converting to BigUint {:?}", err);
618+
error!("{:?}", error_msg);
619+
})
620+
.and_then(move |connector_amount: BigUint| {
621+
// Scale the amount settled by the
622+
// connector back up to our scale
623+
result(connector_amount.normalize_scale(ConvertDetails {
624+
from: quantity.scale,
625+
to: engine_scale,
626+
}))
627+
.and_then(move |scaled_connector_amount| {
628+
if engine_amount > scaled_connector_amount {
629+
let diff = engine_amount - scaled_connector_amount;
630+
// connector settled less than we
631+
// instructed it to, so we must save
632+
// the difference
633+
store.save_uncredited_settlement_amount(account_id, diff)
634+
} else {
635+
Box::new(ok(()))
636+
}
637+
})
638+
}),
517639
)
518-
.map_err(move |_| {
519-
error!("Exceeded max retries when notifying connector about account {:?} for amount {:?} and transaction hash {:?}. Please check your API.", account_id_clone, amount, tx_hash)
520-
})
521640
}
522641

523642
/// Helper function which submits an Ethereum ledger transaction to `to` for `amount`.
@@ -624,7 +743,12 @@ where
624743

625744
impl<S, Si, A> SettlementEngine for EthereumLedgerSettlementEngine<S, Si, A>
626745
where
627-
S: EthereumStore<Account = A> + Clone + Send + Sync + 'static,
746+
S: EthereumStore<Account = A>
747+
+ LeftoversStore<AssetType = BigUint>
748+
+ Clone
749+
+ Send
750+
+ Sync
751+
+ 'static,
628752
Si: EthereumLedgerTxSigner + Clone + Send + Sync + 'static,
629753
A: EthereumAccount + Send + Sync + 'static,
630754
{
@@ -980,7 +1104,7 @@ mod tests {
9801104
"{\"amount\": \"100000000000\", \"scale\": 18 }".to_string(),
9811105
))
9821106
.with_status(200)
983-
.with_body("OK".to_string())
1107+
.with_body("{\"amount\": \"100000000000\", \"scale\": 9 }".to_string())
9841108
.create();
9851109

9861110
let bob_connector_url = mockito::server_url();

0 commit comments

Comments
 (0)