Skip to content

Commit b9bcbb8

Browse files
authored
Merge pull request #426 from AdExNetwork/v5-validator-workflow
V5 validator workflow
2 parents aae1400 + f7a37e8 commit b9bcbb8

File tree

17 files changed

+925
-602
lines changed

17 files changed

+925
-602
lines changed

Cargo.lock

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

adapter/src/lib.rs

+16-8
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,22 @@ pub fn get_signable_state_root(
4242
Ok(res)
4343
}
4444

45-
pub fn get_balance_leaf(acc: &Address, amnt: &BigNum) -> Result<[u8; 32], Box<dyn Error>> {
46-
let tokens = [
47-
Token::Address(EthAddress::from_slice(acc.as_bytes())),
48-
Token::Uint(
49-
U256::from_dec_str(&amnt.to_str_radix(10))
50-
.map_err(|_| ChannelError::InvalidArgument("failed to parse amt".into()))?,
51-
),
52-
];
45+
pub fn get_balance_leaf(
46+
is_spender: bool,
47+
acc: &Address,
48+
amnt: &BigNum,
49+
) -> Result<[u8; 32], Box<dyn Error>> {
50+
let address = Token::Address(EthAddress::from_slice(acc.as_bytes()));
51+
let amount = Token::Uint(
52+
U256::from_dec_str(&amnt.to_str_radix(10))
53+
.map_err(|_| ChannelError::InvalidArgument("failed to parse amt".into()))?,
54+
);
55+
56+
let tokens = if is_spender {
57+
vec![Token::String("spender".into()), address, amount]
58+
} else {
59+
vec![address, amount]
60+
};
5361
let encoded = encode(&tokens).to_vec();
5462

5563
let mut result = Keccak::new_keccak256();

primitives/src/balances.rs

+21
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,14 @@ impl Balances<UncheckedState> {
3838
}
3939

4040
impl<S: BalancesState> Balances<S> {
41+
pub fn new() -> Balances<CheckedState> {
42+
Balances {
43+
earners: Default::default(),
44+
spenders: Default::default(),
45+
state: Default::default(),
46+
}
47+
}
48+
4149
pub fn spend(
4250
&mut self,
4351
spender: Address,
@@ -78,6 +86,19 @@ impl<S: BalancesState> Balances<S> {
7886
state: PhantomData::default(),
7987
}
8088
}
89+
90+
/// Returns a tuple of the sum of `(earners, spenders)`
91+
pub fn sum(&self) -> Option<(UnifiedNum, UnifiedNum)> {
92+
self.spenders
93+
.values()
94+
.sum::<Option<UnifiedNum>>()
95+
.map(|spenders| {
96+
let earners = self.earners.values().sum::<Option<UnifiedNum>>()?;
97+
98+
Some((earners, spenders))
99+
})
100+
.flatten()
101+
}
81102
}
82103

83104
#[derive(Debug, Error)]

primitives/src/channel_v5.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::fmt;
44

55
use crate::{Address, ChannelId, Validator, ValidatorId};
66

7-
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
7+
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, Hash)]
88
#[serde(rename_all = "camelCase")]
99
pub struct Channel {
1010
pub leader: ValidatorId,
@@ -45,7 +45,7 @@ impl Channel {
4545
}
4646

4747
/// The nonce is an Unsigned 256 number
48-
#[derive(Clone, Copy, PartialEq, Eq)]
48+
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
4949
pub struct Nonce(pub U256);
5050

5151
impl Nonce {

primitives/src/sentry.rs

+42-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use crate::{
22
balances::BalancesState,
3-
channel_v5::Channel as ChannelV5,
43
spender::Spender,
54
validator::{ApproveState, Heartbeat, MessageTypes, NewState, Type as MessageType},
65
Address, Balances, BigNum, Channel, ChannelId, ValidatorId, IPFS,
@@ -11,12 +10,11 @@ use std::{collections::HashMap, fmt, hash::Hash};
1110

1211
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
1312
#[serde(rename_all = "camelCase")]
14-
pub struct Accounting<S: BalancesState> {
15-
pub channel: ChannelV5,
13+
/// Channel Accounting response
14+
/// A collection of all `Accounting`s for a specific `Channel`
15+
pub struct AccountingResponse<S: BalancesState> {
1616
#[serde(flatten, bound = "S: BalancesState")]
1717
pub balances: Balances<S>,
18-
pub updated: Option<DateTime<Utc>>,
19-
pub created: DateTime<Utc>,
2018
}
2119

2220
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
@@ -187,6 +185,15 @@ pub struct AggregateEvents {
187185
#[serde(rename_all = "camelCase")]
188186
pub struct ChannelListResponse {
189187
pub channels: Vec<Channel>,
188+
// TODO: Replace with `Pagination`
189+
pub total_pages: u64,
190+
pub total: u64,
191+
pub page: u64,
192+
}
193+
194+
#[derive(Debug, Serialize, Deserialize)]
195+
#[serde(rename_all = "camelCase")]
196+
pub struct Pagination {
190197
pub total_pages: u64,
191198
pub total: u64,
192199
pub page: u64,
@@ -317,6 +324,36 @@ pub mod channel_list {
317324
}
318325
}
319326

327+
pub mod campaign {
328+
use crate::{Address, Campaign, ValidatorId};
329+
use chrono::{serde::ts_seconds, DateTime, Utc};
330+
use serde::{Deserialize, Serialize};
331+
332+
use super::Pagination;
333+
334+
#[derive(Debug, Serialize, Deserialize)]
335+
#[serde(rename_all = "camelCase")]
336+
pub struct CampaignListResponse {
337+
pub campaigns: Vec<Campaign>,
338+
#[serde(flatten)]
339+
pub pagination: Pagination,
340+
}
341+
342+
#[derive(Debug, Serialize, Deserialize)]
343+
pub struct CampaignListQuery {
344+
#[serde(default)]
345+
// default is `u64::default()` = `0`
346+
pub page: u64,
347+
/// filters the list on `active.to >= active_to_ge`
348+
/// It should be the same timestamp format as the `Campaign.active.to`: **seconds**
349+
#[serde(with = "ts_seconds", default = "Utc::now", rename = "activeTo")]
350+
pub active_to_ge: DateTime<Utc>,
351+
pub creator: Option<Address>,
352+
/// filters the campaigns containing a specific validator if provided
353+
pub validator: Option<ValidatorId>,
354+
}
355+
}
356+
320357
pub mod campaign_create {
321358
use chrono::{serde::ts_milliseconds, DateTime, Utc};
322359
use serde::{Deserialize, Serialize};

primitives/src/util/api.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::{convert::TryFrom, fmt, str::FromStr};
33
use parse_display::Display;
44
use serde::{Deserialize, Serialize};
55
use thiserror::Error;
6+
pub use url::ParseError;
67
use url::Url;
78

89
// `url::Url::scheme()` returns lower-cased ASCII string without `:`
@@ -46,12 +47,15 @@ impl ApiUrl {
4647

4748
/// The Endpoint of which we want to get an url to (strips prefixed `/` from the endpoint),
4849
/// which can can include:
50+
///
4951
/// - path
5052
/// - query
5153
/// - fragments - usually should not be used for requesting API resources from server
54+
///
5255
/// This method does **not** check if a file is present
56+
///
5357
/// This method strips the starting `/` of the endpoint, if there is one
54-
pub fn join(&self, endpoint: &str) -> Result<Url, url::ParseError> {
58+
pub fn join(&self, endpoint: &str) -> Result<Url, ParseError> {
5559
let stripped = endpoint.strip_prefix('/').unwrap_or(endpoint);
5660
// this join is safe, since we always prefix the Url with `/`
5761
self.0.join(stripped)

sentry/src/routes/channel.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ async fn create_or_update_spendable_document(
246246
};
247247

248248
let spendable = Spendable {
249-
channel: channel.clone(),
249+
channel: *channel,
250250
deposit: Deposit {
251251
total,
252252
still_on_create2,

validator_worker/Cargo.toml

+3-1
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@ futures = "0.3"
2929
tokio = { version = "1", features = ["time", "rt-multi-thread"] }
3030
# API client
3131
reqwest = { version = "0.11", features = ["json"] }
32-
# Configuration
32+
# Other
3333
lazy_static = "^1.4"
34+
thiserror = "^1.0"
3435
# (De)Serialization
3536
serde = { version = "^1.0", features = ["derive"] }
3637
serde_json = "1.0"
38+
serde_urlencoded = "0.7"
3739
toml = "0.5"
3840
# CLI
3941
clap = "^2.33"

validator_worker/src/channel.rs

+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
use crate::{
2+
error::{Error, TickError},
3+
follower,
4+
sentry_interface::{campaigns::all_campaigns, Validator, Validators},
5+
SentryApi,
6+
};
7+
use primitives::{adapter::Adapter, channel_v5::Channel, config::Config, util::ApiUrl, ChannelId};
8+
// use slog::{error, info, Logger};
9+
use slog::Logger;
10+
use std::collections::{hash_map::Entry, HashSet};
11+
12+
pub async fn channel_tick<A: Adapter + 'static>(
13+
adapter: A,
14+
config: &Config,
15+
logger: &Logger,
16+
channel: Channel,
17+
validators: Validators,
18+
) -> Result<ChannelId, Error<A::AdapterError>> {
19+
let tick = channel
20+
.find_validator(adapter.whoami())
21+
.ok_or(Error::ChannelNotIntendedForUs)?;
22+
23+
let sentry = SentryApi::init(
24+
adapter,
25+
logger.clone(),
26+
config.clone(),
27+
(channel, validators),
28+
)?;
29+
// `GET /channel/:id/spender/all`
30+
let all_spenders = sentry.get_all_spenders().await?;
31+
32+
// `GET /channel/:id/accounting`
33+
// Validation #1:
34+
// sum(Accounting.spenders) == sum(Accounting.earners)
35+
let accounting = sentry.get_accounting(channel.id()).await?;
36+
37+
// Validation #2:
38+
// spender.spender_leaf.total_deposit >= accounting.balances.spenders[spender.address]
39+
if !all_spenders.iter().all(|(address, spender)| {
40+
spender.total_deposited
41+
>= accounting
42+
.balances
43+
.spenders
44+
.get(address)
45+
.cloned()
46+
.unwrap_or_default()
47+
}) {
48+
return Err(Error::Validation);
49+
}
50+
51+
// TODO: Add timeout
52+
let _tick_result = match tick {
53+
primitives::Validator::Leader(_v) => todo!(),
54+
primitives::Validator::Follower(_v) => {
55+
follower::tick(&sentry, channel, accounting.balances)
56+
.await
57+
.map_err(|err| Error::FollowerTick(channel.id(), TickError::Tick(err)))?
58+
}
59+
};
60+
61+
// Validation #3
62+
// Accounting.balances != NewState.balances
63+
64+
// Validation #4
65+
// OUTPACE Rules:
66+
67+
Ok(channel.id())
68+
}
69+
70+
/// Fetches all `Campaign`s from Sentry and builds the `Channel`s to be processed
71+
/// along side all the `Validator`s' url & auth token
72+
pub async fn collect_channels<A: Adapter + 'static>(
73+
adapter: A,
74+
sentry_url: &ApiUrl,
75+
_config: &Config,
76+
_logger: &Logger,
77+
) -> Result<(HashSet<Channel>, Validators), reqwest::Error> {
78+
let whoami = adapter.whoami();
79+
80+
let campaigns = all_campaigns(sentry_url, whoami).await?;
81+
let channels = campaigns
82+
.iter()
83+
.map(|campaign| campaign.channel)
84+
.collect::<HashSet<_>>();
85+
86+
let validators = campaigns
87+
.into_iter()
88+
.fold(Validators::new(), |mut acc, campaign| {
89+
for validator_desc in campaign.validators.iter() {
90+
// if Validator is already there, we can just skip it
91+
// remember, the campaigns are ordered by `created DESC`
92+
// so we will always get the latest Validator url first
93+
match acc.entry(validator_desc.id) {
94+
Entry::Occupied(_) => continue,
95+
Entry::Vacant(entry) => {
96+
// try to parse the url of the Validator Desc
97+
let validator_url = validator_desc.url.parse::<ApiUrl>();
98+
// and also try to find the Auth token in the config
99+
100+
// if there was an error with any of the operations, skip this `ValidatorDesc`
101+
let auth_token = adapter.get_auth(&validator_desc.id);
102+
103+
// only if `ApiUrl` parsing is `Ok` & Auth Token is found in the `Adapter`
104+
if let (Ok(url), Ok(auth_token)) = (validator_url, auth_token) {
105+
// add an entry for propagation
106+
entry.insert(Validator {
107+
url,
108+
token: auth_token,
109+
});
110+
}
111+
// otherwise it will try to do the same things on the next encounter of this `ValidatorId`
112+
}
113+
}
114+
}
115+
116+
acc
117+
});
118+
119+
Ok((channels, validators))
120+
}

0 commit comments

Comments
 (0)