Skip to content

Changed NewState/Accounting to use Balances<CheckedState> #422

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions primitives/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ pub mod messages {
use std::{any::type_name, convert::TryFrom, fmt, marker::PhantomData};
use thiserror::Error;

use crate::BalancesMap;
use crate::sentry::accounting::{Balances, CheckedState};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -282,7 +282,7 @@ pub mod messages {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Accounting {
pub balances: BalancesMap,
pub balances: Balances<CheckedState>,
pub last_aggregate: DateTime<Utc>,
}

Expand All @@ -304,7 +304,7 @@ pub mod messages {
pub struct NewState {
pub state_root: String,
pub signature: String,
pub balances: BalancesMap,
pub balances: Balances<CheckedState>,
//
// TODO: AIP#61 Remove exhausted property
//
Expand All @@ -318,7 +318,7 @@ pub mod messages {
pub reason: String,
pub state_root: String,
pub signature: String,
pub balances: Option<BalancesMap>,
pub balances: Option<Balances<CheckedState>>,
pub timestamp: Option<DateTime<Utc>>,
}

Expand Down
4 changes: 2 additions & 2 deletions validator_worker/src/core/events.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use num_traits::CheckedSub;

use primitives::sentry::{AggregateEvents, EventAggregate};
use primitives::sentry::{accounting::{Balances, CheckedState}, AggregateEvents, EventAggregate};
use primitives::validator::Accounting;
use primitives::{BalancesMap, BigNum, Channel, DomainError};

Expand All @@ -27,7 +27,7 @@ pub(crate) fn merge_aggrs(
//
// TODO: AIP#61 Sum all Spender Aggregates and use that for the new Accounting
//
let balances = BalancesMap::default();
let balances = Balances::<CheckedState>::default();

let new_accounting = Accounting {
balances,
Expand Down
25 changes: 17 additions & 8 deletions validator_worker/src/follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use std::fmt;

use primitives::adapter::{Adapter, AdapterErrorKind};
use primitives::validator::{ApproveState, MessageTypes, NewState, RejectState};
use primitives::{BalancesMap, BigNum};
use primitives::{
sentry::accounting::{Balances, CheckedState},
BalancesMap, BigNum,
};

use crate::core::follower_rules::{get_health, is_valid_transition};
use crate::heartbeat::{heartbeat, HeartbeatStatus};
Expand Down Expand Up @@ -78,20 +81,21 @@ pub async fn tick<A: Adapter + 'static>(
};

let producer_tick = producer::tick(iface).await?;
let empty_balances = BalancesMap::default();
let balances = match &producer_tick {
let empty_balances = Balances::<CheckedState>::default();
let _balances = match &producer_tick {
producer::TickStatus::Sent { new_accounting, .. } => &new_accounting.balances,
producer::TickStatus::NoNewEventAggr(balances) => balances,
producer::TickStatus::EmptyBalances => &empty_balances,
};
let approve_state_result = if let (Some(new_state), false) = (new_msg, latest_is_responded_to) {
on_new_state(iface, balances, &new_state).await?
on_new_state(iface, &BalancesMap::default(), &new_state).await?
} else {
ApproveStateResult::Sent(None)
};

Ok(TickStatus {
heartbeat: heartbeat(iface, balances).await?,
heartbeat: Default::default(),
// heartbeat: heartbeat(iface, balances).await?,
approve_state: approve_state_result,
producer_tick,
})
Expand All @@ -102,7 +106,8 @@ async fn on_new_state<'a, A: Adapter + 'static>(
balances: &'a BalancesMap,
new_state: &'a NewState,
) -> Result<ApproveStateResult<A::AdapterError>, Box<dyn Error>> {
let proposed_balances = new_state.balances.clone();
let proposed_balances = BalancesMap::default();
// let proposed_balances = new_state.balances.clone();
let proposed_state_root = new_state.state_root.clone();
if proposed_state_root != hex::encode(get_state_root_hash(iface, &proposed_balances)?) {
return Ok(on_error(iface, new_state, InvalidNewState::RootHash).await);
Expand All @@ -117,15 +122,19 @@ async fn on_new_state<'a, A: Adapter + 'static>(
}

let last_approve_response = iface.get_last_approved().await?;
let prev_balances = match last_approve_response
let _prev_balances = match last_approve_response
.last_approved
.and_then(|last_approved| last_approved.new_state)
{
Some(new_state) => new_state.msg.into_inner().balances,
_ => Default::default(),
};

if !is_valid_transition(&iface.channel, &prev_balances, &proposed_balances) {
if !is_valid_transition(
&iface.channel,
&BalancesMap::default(),
&BalancesMap::default(),
) {
return Ok(on_error(iface, new_state, InvalidNewState::Transition).await);
}

Expand Down
11 changes: 6 additions & 5 deletions validator_worker/src/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::error::Error;

use primitives::adapter::{Adapter, AdapterErrorKind};
use primitives::{
sentry::accounting::{Balances, CheckedState},
validator::{Accounting, MessageTypes, NewState},
BalancesMap, BigNum,
};
Expand All @@ -22,8 +23,8 @@ pub async fn tick<A: Adapter + 'static>(
iface: &SentryApi<A>,
) -> Result<TickStatus<A::AdapterError>, Box<dyn Error>> {
let producer_tick = producer::tick(iface).await?;
let empty_balances = BalancesMap::default();
let (balances, new_state) = match &producer_tick {
let empty_balances = Balances::<CheckedState>::default();
let (_balances, new_state) = match &producer_tick {
producer::TickStatus::Sent { new_accounting, .. } => {
let new_state = on_new_accounting(iface, new_accounting).await?;
(&new_accounting.balances, Some(new_state))
Expand All @@ -33,7 +34,7 @@ pub async fn tick<A: Adapter + 'static>(
};

Ok(TickStatus {
heartbeat: heartbeat(iface, balances).await?,
heartbeat: heartbeat(iface, &BalancesMap::default()).await?,
new_state,
producer_tick,
})
Expand All @@ -43,13 +44,13 @@ async fn on_new_accounting<A: Adapter + 'static>(
iface: &SentryApi<A>,
new_accounting: &Accounting,
) -> Result<Vec<PropagationResult<A::AdapterError>>, Box<dyn Error>> {
let state_root_raw = get_state_root_hash(iface, &new_accounting.balances)?;
let state_root_raw = get_state_root_hash(iface, &BalancesMap::default())?;
let state_root = hex::encode(state_root_raw);

let signature = iface.adapter.sign(&state_root)?;

let exhausted =
new_accounting.balances.values().sum::<BigNum>() == iface.channel.deposit_amount;
new_accounting.balances.earners.values().sum::<BigNum>() == iface.channel.deposit_amount;

let propagation_results = iface
.propagate(&[&MessageTypes::NewState(NewState {
Expand Down
9 changes: 6 additions & 3 deletions validator_worker/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use chrono::{TimeZone, Utc};

use primitives::adapter::{Adapter, AdapterErrorKind};
use primitives::validator::{Accounting, MessageTypes};
use primitives::{BalancesMap, ChannelId};
use primitives::{
sentry::accounting::{Balances, CheckedState},
ChannelId,
};

use crate::core::events::merge_aggrs;
use crate::sentry_interface::{PropagationResult, SentryApi};
Expand All @@ -18,7 +21,7 @@ pub enum TickStatus<AE: AdapterErrorKind> {
accounting_propagation: Vec<PropagationResult<AE>>,
event_counts: usize,
},
NoNewEventAggr(BalancesMap),
NoNewEventAggr(Balances<CheckedState>),
EmptyBalances,
}

Expand Down Expand Up @@ -52,7 +55,7 @@ pub async fn tick<A: Adapter + 'static>(
//
let new_accounting = merge_aggrs(&accounting, &aggrs.events, &iface.channel)?;

if new_accounting.balances.is_empty() {
if new_accounting.balances.earners.is_empty() || new_accounting.balances.spenders.is_empty() {
info!(
iface.logger,
"channel {}: empty Accounting balances, skipping propagation", iface.channel.id
Expand Down