Skip to content

V5 validator Leader/Follower ticks with timeouts & logging #433

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Oct 4, 2021
3 changes: 2 additions & 1 deletion docs/config/dev.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ health_unsignable_promilles = 750
propagation_timeout = 1000

fetch_timeout = 5000
validator_tick_timeout = 5000
all_campaigns_timeout = 5000
channel_tick_timeout = 5000

ip_rate_limit = { type = 'ip', timeframe = 20000 }
sid_rate_limit = { type = 'sid', timeframe = 20000 }
Expand Down
3 changes: 2 additions & 1 deletion docs/config/prod.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ health_unsignable_promilles = 770
propagation_timeout = 3000

fetch_timeout = 10000
validator_tick_timeout = 10000
all_campaigns_timeout = 10000
channel_tick_timeout = 10000

ip_rate_limit = { type = 'ip', timeframe = 1200000 }
sid_rate_limit = { type = 'sid', timeframe = 0 }
Expand Down
18 changes: 12 additions & 6 deletions primitives/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,24 @@ pub struct Config {
pub events_find_limit: u32,
pub msgs_find_limit: u32,
pub analytics_find_limit_v5: u32,
// in milliseconds
/// In milliseconds
pub analytics_maxtime_v5: u32,
// in milliseconds
/// In milliseconds
pub heartbeat_time: u32,
pub health_threshold_promilles: u32,
pub health_unsignable_promilles: u32,
/// Sets the timeout for propagating a Validator message to a validator
/// In Milliseconds
pub propagation_timeout: u32,
/// in milliseconds
/// set's the Client timeout for [`SentryApi`]
/// This includes requests made for propagating new messages
/// Sets the Client timeout for [`SentryApi`].
/// This includes all requests made to Sentry except propagating messages.
/// When propagating messages we make requests to foreign Sentry instances as well.
pub fetch_timeout: u32,
/// in milliseconds
pub validator_tick_timeout: u32,
/// In Milliseconds
pub all_campaigns_timeout: u32,
/// In Milliseconds
pub channel_tick_timeout: u32,
pub ip_rate_limit: RateLimit, // HashMap??
pub sid_rate_limit: RateLimit, // HashMap ??
#[serde(with = "SerHex::<StrictPfx>")]
Expand Down
7 changes: 6 additions & 1 deletion sentry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,12 @@ async fn campaigns_router<A: Adapter + 'static>(
) -> Result<Response<Body>, ResponseError> {
let (path, method) = (req.uri().path(), req.method());

if let (Some(_caps), &Method::POST) = (CAMPAIGN_UPDATE_BY_ID.captures(path), method) {
// For creating campaigns
if (path, method) == ("/v5/campaign", &Method::POST) {
let req = AuthRequired.call(req, app).await?;

create_campaign(req, app).await
} else if let (Some(_caps), &Method::POST) = (CAMPAIGN_UPDATE_BY_ID.captures(path), method) {
let req = CampaignLoad.call(req, app).await?;

update_campaign::handle_route(req, app).await
Expand Down
74 changes: 52 additions & 22 deletions validator_worker/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@ use crate::{
SentryApi,
};
use primitives::{adapter::Adapter, config::Config, util::ApiUrl, Channel, ChannelId};
use slog::Logger;
use std::collections::{hash_map::Entry, HashSet};
use slog::{info, Logger};
use std::{
collections::{hash_map::Entry, HashSet},
time::Duration,
};
use tokio::time::timeout;

pub async fn channel_tick<A: Adapter + 'static>(
sentry: &SentryApi<A>,
config: &Config,
channel: Channel,
// validators: &Validators,
) -> Result<ChannelId, Error> {
) -> Result<(ChannelId, Box<dyn std::fmt::Debug>), Error> {
let logger = sentry.logger.clone();

let adapter = &sentry.adapter;
let tick = channel
.find_validator(adapter.whoami())
Expand Down Expand Up @@ -46,38 +51,63 @@ pub async fn channel_tick<A: Adapter + 'static>(
.get(&channel.token)
.ok_or(Error::ChannelTokenNotWhitelisted)?;

// TODO: Add timeout
let duration = Duration::from_millis(config.channel_tick_timeout as u64);

match tick {
primitives::Validator::Leader(_v) => {
let _leader_tick_status = leader::tick(sentry, channel, accounting.balances, token)
.await
.map_err(|err| Error::LeaderTick(channel.id(), TickError::Tick(Box::new(err))))?;
}
primitives::Validator::Leader(_v) => match timeout(
duration,
leader::tick(sentry, channel, accounting.balances, token),
)
.await
{
Err(timeout_e) => Err(Error::LeaderTick(
channel.id(),
TickError::TimedOut(timeout_e),
)),
Ok(Err(tick_e)) => Err(Error::LeaderTick(
channel.id(),
TickError::Tick(Box::new(tick_e)),
)),
Ok(Ok(tick_status)) => {
info!(&logger, "Leader tick"; "status" => ?tick_status);
Ok((channel.id(), Box::new(tick_status)))
}
},
primitives::Validator::Follower(_v) => {
let _follower_tick_status =
follower::tick(sentry, channel, all_spenders, accounting.balances, token)
.await
.map_err(|err| {
Error::FollowerTick(channel.id(), TickError::Tick(Box::new(err)))
})?;
let follower_fut =
follower::tick(sentry, channel, all_spenders, accounting.balances, token);
match timeout(duration, follower_fut).await {
Err(timeout_e) => Err(Error::FollowerTick(
channel.id(),
TickError::TimedOut(timeout_e),
)),
Ok(Err(tick_e)) => Err(Error::FollowerTick(
channel.id(),
TickError::Tick(Box::new(tick_e)),
)),
Ok(Ok(tick_status)) => {
info!(&logger, "Follower tick"; "status" => ?tick_status);
Ok((channel.id(), Box::new(tick_status)))
}
}
}
};

Ok(channel.id())
}
}

/// Fetches all `Campaign`s from Sentry and builds the `Channel`s to be processed
/// alongside all the `Validator`s' url & auth token
pub async fn collect_channels<A: Adapter + 'static>(
adapter: &A,
sentry_url: &ApiUrl,
_config: &Config,
config: &Config,
_logger: &Logger,
) -> Result<(HashSet<Channel>, Validators), reqwest::Error> {
let whoami = adapter.whoami();

// TODO: Move client creation
let client = reqwest::Client::new();
let all_campaigns_timeout = Duration::from_millis(config.all_campaigns_timeout as u64);
let client = reqwest::Client::builder()
.timeout(all_campaigns_timeout)
.build()?;
let campaigns = all_campaigns(client, sentry_url, whoami).await?;
let channels = campaigns
.iter()
Expand Down
46 changes: 23 additions & 23 deletions validator_worker/src/core/follower_rules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ mod test {
#[test]
fn get_health_the_approved_balance_tree_gte_our_accounting_is_healthy() {
let all_spenders_sum = UnifiedNum::from(50);
let our = vec![(ADDRESSES["publisher"].clone(), 50.into())]
let our = vec![(ADDRESSES["publisher"], 50.into())]
.into_iter()
.collect();
assert!(
Expand All @@ -178,7 +178,7 @@ mod test {
get_health(
all_spenders_sum,
&our,
&vec![(ADDRESSES["publisher"].clone(), 60.into())]
&vec![(ADDRESSES["publisher"], 60.into())]
.into_iter()
.collect()
)
Expand All @@ -189,7 +189,7 @@ mod test {

#[test]
fn get_health_the_approved_balance_tree_is_positive_our_accounting_is_0_and_it_is_healthy() {
let approved = vec![(ADDRESSES["publisher"].clone(), 50.into())]
let approved = vec![(ADDRESSES["publisher"], 50.into())]
.into_iter()
.collect();

Expand All @@ -207,10 +207,10 @@ mod test {
assert!(
get_health(
all_spenders_sum,
&vec![(ADDRESSES["publisher"].clone(), 80.into())]
&vec![(ADDRESSES["publisher"], 80.into())]
.into_iter()
.collect(),
&vec![(ADDRESSES["publisher"].clone(), 79.into())]
&vec![(ADDRESSES["publisher"], 79.into())]
.into_iter()
.collect()
)
Expand All @@ -221,10 +221,10 @@ mod test {
assert!(
get_health(
all_spenders_sum,
&vec![(ADDRESSES["publisher"].clone(), 2.into())]
&vec![(ADDRESSES["publisher"], 2.into())]
.into_iter()
.collect(),
&vec![(ADDRESSES["publisher"].clone(), 1.into())]
&vec![(ADDRESSES["publisher"], 1.into())]
.into_iter()
.collect()
)
Expand All @@ -238,10 +238,10 @@ mod test {
assert!(
get_health(
UnifiedNum::from(80),
&vec![(ADDRESSES["publisher"].clone(), 80.into())]
&vec![(ADDRESSES["publisher"], 80.into())]
.into_iter()
.collect(),
&vec![(ADDRESSES["publisher"].clone(), 70.into())]
&vec![(ADDRESSES["publisher"], 70.into())]
.into_iter()
.collect()
)
Expand All @@ -257,10 +257,10 @@ mod test {
assert!(
get_health(
all_spenders_sum,
&vec![(ADDRESSES["publisher"].clone(), 80.into())]
&vec![(ADDRESSES["publisher"], 80.into())]
.into_iter()
.collect(),
&vec![(ADDRESSES["publisher2"].clone(), 80.into())]
&vec![(ADDRESSES["publisher2"], 80.into())]
.into_iter()
.collect()
)
Expand All @@ -271,12 +271,12 @@ mod test {
assert!(
get_health(
all_spenders_sum,
&vec![(ADDRESSES["publisher"].clone(), 80.into())]
&vec![(ADDRESSES["publisher"], 80.into())]
.into_iter()
.collect(),
&vec![
(ADDRESSES["publisher2"].clone(), 40.into()),
(ADDRESSES["publisher"].clone(), 40.into())
(ADDRESSES["publisher2"], 40.into()),
(ADDRESSES["publisher"], 40.into())
]
.into_iter()
.collect()
Expand All @@ -288,12 +288,12 @@ mod test {
assert!(
get_health(
all_spenders_sum,
&vec![(ADDRESSES["publisher"].clone(), 80.into())]
&vec![(ADDRESSES["publisher"], 80.into())]
.into_iter()
.collect(),
&vec![
(ADDRESSES["publisher2"].clone(), 20.into()),
(ADDRESSES["publisher"].clone(), 60.into())
(ADDRESSES["publisher2"], 20.into()),
(ADDRESSES["publisher"], 60.into())
]
.into_iter()
.collect()
Expand All @@ -305,12 +305,12 @@ mod test {
assert!(
get_health(
all_spenders_sum,
&vec![(ADDRESSES["publisher"].clone(), 80.into())]
&vec![(ADDRESSES["publisher"], 80.into())]
.into_iter()
.collect(),
&vec![
(ADDRESSES["publisher2"].clone(), 2.into()),
(ADDRESSES["publisher"].clone(), 78.into())
(ADDRESSES["publisher2"], 2.into()),
(ADDRESSES["publisher"], 78.into())
]
.into_iter()
.collect()
Expand All @@ -323,12 +323,12 @@ mod test {
get_health(
all_spenders_sum,
&vec![
(ADDRESSES["publisher"].clone(), 100.into()),
(ADDRESSES["publisher2"].clone(), 1.into())
(ADDRESSES["publisher"], 100.into()),
(ADDRESSES["publisher2"], 1.into())
]
.into_iter()
.collect(),
&vec![(ADDRESSES["publisher"].clone(), 100.into())]
&vec![(ADDRESSES["publisher"], 100.into())]
.into_iter()
.collect()
)
Expand Down
Loading