Skip to content

Commit 39638ef

Browse files
authored
Merge pull request #433 from AdExNetwork/v5-validator-workflow
V5 validator Leader/Follower ticks with timeouts & logging
2 parents 65b7524 + 948ab49 commit 39638ef

File tree

11 files changed

+173
-189
lines changed

11 files changed

+173
-189
lines changed

docs/config/dev.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ health_unsignable_promilles = 750
2121
propagation_timeout = 1000
2222

2323
fetch_timeout = 5000
24-
validator_tick_timeout = 5000
24+
all_campaigns_timeout = 5000
25+
channel_tick_timeout = 5000
2526

2627
ip_rate_limit = { type = 'ip', timeframe = 20000 }
2728
sid_rate_limit = { type = 'sid', timeframe = 20000 }

docs/config/prod.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ health_unsignable_promilles = 770
2222
propagation_timeout = 3000
2323

2424
fetch_timeout = 10000
25-
validator_tick_timeout = 10000
25+
all_campaigns_timeout = 10000
26+
channel_tick_timeout = 10000
2627

2728
ip_rate_limit = { type = 'ip', timeframe = 1200000 }
2829
sid_rate_limit = { type = 'sid', timeframe = 0 }

primitives/src/config.rs

+12-6
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,24 @@ pub struct Config {
3434
pub events_find_limit: u32,
3535
pub msgs_find_limit: u32,
3636
pub analytics_find_limit_v5: u32,
37-
// in milliseconds
37+
/// In milliseconds
3838
pub analytics_maxtime_v5: u32,
39-
// in milliseconds
39+
/// In milliseconds
4040
pub heartbeat_time: u32,
4141
pub health_threshold_promilles: u32,
4242
pub health_unsignable_promilles: u32,
43+
/// Sets the timeout for propagating a Validator message to a validator
44+
/// In Milliseconds
45+
pub propagation_timeout: u32,
4346
/// in milliseconds
44-
/// set's the Client timeout for [`SentryApi`]
45-
/// This includes requests made for propagating new messages
47+
/// Set's the Client timeout for [`SentryApi`]
48+
/// This includes all requests made to sentry except propagating messages.
49+
/// When propagating messages we make requests to foreign Sentry instances as well.
4650
pub fetch_timeout: u32,
47-
/// in milliseconds
48-
pub validator_tick_timeout: u32,
51+
/// In Milliseconds
52+
pub all_campaigns_timeout: u32,
53+
/// In Milliseconds
54+
pub channel_tick_timeout: u32,
4955
pub ip_rate_limit: RateLimit, // HashMap??
5056
pub sid_rate_limit: RateLimit, // HashMap ??
5157
#[serde(with = "SerHex::<StrictPfx>")]

sentry/src/lib.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,12 @@ async fn campaigns_router<A: Adapter + 'static>(
176176
) -> Result<Response<Body>, ResponseError> {
177177
let (path, method) = (req.uri().path(), req.method());
178178

179-
if let (Some(_caps), &Method::POST) = (CAMPAIGN_UPDATE_BY_ID.captures(path), method) {
179+
// For creating campaigns
180+
if (path, method) == ("/v5/campaign", &Method::POST) {
181+
let req = AuthRequired.call(req, app).await?;
182+
183+
create_campaign(req, app).await
184+
} else if let (Some(_caps), &Method::POST) = (CAMPAIGN_UPDATE_BY_ID.captures(path), method) {
180185
let req = CampaignLoad.call(req, app).await?;
181186

182187
update_campaign::handle_route(req, app).await

validator_worker/src/channel.rs

+52-22
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,20 @@ use crate::{
55
SentryApi,
66
};
77
use primitives::{adapter::Adapter, config::Config, util::ApiUrl, Channel, ChannelId};
8-
use slog::Logger;
9-
use std::collections::{hash_map::Entry, HashSet};
8+
use slog::{info, Logger};
9+
use std::{
10+
collections::{hash_map::Entry, HashSet},
11+
time::Duration,
12+
};
13+
use tokio::time::timeout;
1014

1115
pub async fn channel_tick<A: Adapter + 'static>(
1216
sentry: &SentryApi<A>,
1317
config: &Config,
1418
channel: Channel,
15-
// validators: &Validators,
16-
) -> Result<ChannelId, Error> {
19+
) -> Result<(ChannelId, Box<dyn std::fmt::Debug>), Error> {
20+
let logger = sentry.logger.clone();
21+
1722
let adapter = &sentry.adapter;
1823
let tick = channel
1924
.find_validator(adapter.whoami())
@@ -46,38 +51,63 @@ pub async fn channel_tick<A: Adapter + 'static>(
4651
.get(&channel.token)
4752
.ok_or(Error::ChannelTokenNotWhitelisted)?;
4853

49-
// TODO: Add timeout
54+
let duration = Duration::from_millis(config.channel_tick_timeout as u64);
55+
5056
match tick {
51-
primitives::Validator::Leader(_v) => {
52-
let _leader_tick_status = leader::tick(sentry, channel, accounting.balances, token)
53-
.await
54-
.map_err(|err| Error::LeaderTick(channel.id(), TickError::Tick(Box::new(err))))?;
55-
}
57+
primitives::Validator::Leader(_v) => match timeout(
58+
duration,
59+
leader::tick(sentry, channel, accounting.balances, token),
60+
)
61+
.await
62+
{
63+
Err(timeout_e) => Err(Error::LeaderTick(
64+
channel.id(),
65+
TickError::TimedOut(timeout_e),
66+
)),
67+
Ok(Err(tick_e)) => Err(Error::LeaderTick(
68+
channel.id(),
69+
TickError::Tick(Box::new(tick_e)),
70+
)),
71+
Ok(Ok(tick_status)) => {
72+
info!(&logger, "Leader tick"; "status" => ?tick_status);
73+
Ok((channel.id(), Box::new(tick_status)))
74+
}
75+
},
5676
primitives::Validator::Follower(_v) => {
57-
let _follower_tick_status =
58-
follower::tick(sentry, channel, all_spenders, accounting.balances, token)
59-
.await
60-
.map_err(|err| {
61-
Error::FollowerTick(channel.id(), TickError::Tick(Box::new(err)))
62-
})?;
77+
let follower_fut =
78+
follower::tick(sentry, channel, all_spenders, accounting.balances, token);
79+
match timeout(duration, follower_fut).await {
80+
Err(timeout_e) => Err(Error::FollowerTick(
81+
channel.id(),
82+
TickError::TimedOut(timeout_e),
83+
)),
84+
Ok(Err(tick_e)) => Err(Error::FollowerTick(
85+
channel.id(),
86+
TickError::Tick(Box::new(tick_e)),
87+
)),
88+
Ok(Ok(tick_status)) => {
89+
info!(&logger, "Follower tick"; "status" => ?tick_status);
90+
Ok((channel.id(), Box::new(tick_status)))
91+
}
92+
}
6393
}
64-
};
65-
66-
Ok(channel.id())
94+
}
6795
}
6896

6997
/// Fetches all `Campaign`s from Sentry and builds the `Channel`s to be processed
7098
/// along side all the `Validator`s' url & auth token
7199
pub async fn collect_channels<A: Adapter + 'static>(
72100
adapter: &A,
73101
sentry_url: &ApiUrl,
74-
_config: &Config,
102+
config: &Config,
75103
_logger: &Logger,
76104
) -> Result<(HashSet<Channel>, Validators), reqwest::Error> {
77105
let whoami = adapter.whoami();
78106

79-
// TODO: Move client creation
80-
let client = reqwest::Client::new();
107+
let all_campaigns_timeout = Duration::from_millis(config.all_campaigns_timeout as u64);
108+
let client = reqwest::Client::builder()
109+
.timeout(all_campaigns_timeout)
110+
.build()?;
81111
let campaigns = all_campaigns(client, sentry_url, whoami).await?;
82112
let channels = campaigns
83113
.iter()

validator_worker/src/core/follower_rules.rs

+23-23
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ mod test {
166166
#[test]
167167
fn get_health_the_approved_balance_tree_gte_our_accounting_is_healthy() {
168168
let all_spenders_sum = UnifiedNum::from(50);
169-
let our = vec![(ADDRESSES["publisher"].clone(), 50.into())]
169+
let our = vec![(ADDRESSES["publisher"], 50.into())]
170170
.into_iter()
171171
.collect();
172172
assert!(
@@ -178,7 +178,7 @@ mod test {
178178
get_health(
179179
all_spenders_sum,
180180
&our,
181-
&vec![(ADDRESSES["publisher"].clone(), 60.into())]
181+
&vec![(ADDRESSES["publisher"], 60.into())]
182182
.into_iter()
183183
.collect()
184184
)
@@ -189,7 +189,7 @@ mod test {
189189

190190
#[test]
191191
fn get_health_the_approved_balance_tree_is_positive_our_accounting_is_0_and_it_is_healthy() {
192-
let approved = vec![(ADDRESSES["publisher"].clone(), 50.into())]
192+
let approved = vec![(ADDRESSES["publisher"], 50.into())]
193193
.into_iter()
194194
.collect();
195195

@@ -207,10 +207,10 @@ mod test {
207207
assert!(
208208
get_health(
209209
all_spenders_sum,
210-
&vec![(ADDRESSES["publisher"].clone(), 80.into())]
210+
&vec![(ADDRESSES["publisher"], 80.into())]
211211
.into_iter()
212212
.collect(),
213-
&vec![(ADDRESSES["publisher"].clone(), 79.into())]
213+
&vec![(ADDRESSES["publisher"], 79.into())]
214214
.into_iter()
215215
.collect()
216216
)
@@ -221,10 +221,10 @@ mod test {
221221
assert!(
222222
get_health(
223223
all_spenders_sum,
224-
&vec![(ADDRESSES["publisher"].clone(), 2.into())]
224+
&vec![(ADDRESSES["publisher"], 2.into())]
225225
.into_iter()
226226
.collect(),
227-
&vec![(ADDRESSES["publisher"].clone(), 1.into())]
227+
&vec![(ADDRESSES["publisher"], 1.into())]
228228
.into_iter()
229229
.collect()
230230
)
@@ -238,10 +238,10 @@ mod test {
238238
assert!(
239239
get_health(
240240
UnifiedNum::from(80),
241-
&vec![(ADDRESSES["publisher"].clone(), 80.into())]
241+
&vec![(ADDRESSES["publisher"], 80.into())]
242242
.into_iter()
243243
.collect(),
244-
&vec![(ADDRESSES["publisher"].clone(), 70.into())]
244+
&vec![(ADDRESSES["publisher"], 70.into())]
245245
.into_iter()
246246
.collect()
247247
)
@@ -257,10 +257,10 @@ mod test {
257257
assert!(
258258
get_health(
259259
all_spenders_sum,
260-
&vec![(ADDRESSES["publisher"].clone(), 80.into())]
260+
&vec![(ADDRESSES["publisher"], 80.into())]
261261
.into_iter()
262262
.collect(),
263-
&vec![(ADDRESSES["publisher2"].clone(), 80.into())]
263+
&vec![(ADDRESSES["publisher2"], 80.into())]
264264
.into_iter()
265265
.collect()
266266
)
@@ -271,12 +271,12 @@ mod test {
271271
assert!(
272272
get_health(
273273
all_spenders_sum,
274-
&vec![(ADDRESSES["publisher"].clone(), 80.into())]
274+
&vec![(ADDRESSES["publisher"], 80.into())]
275275
.into_iter()
276276
.collect(),
277277
&vec![
278-
(ADDRESSES["publisher2"].clone(), 40.into()),
279-
(ADDRESSES["publisher"].clone(), 40.into())
278+
(ADDRESSES["publisher2"], 40.into()),
279+
(ADDRESSES["publisher"], 40.into())
280280
]
281281
.into_iter()
282282
.collect()
@@ -288,12 +288,12 @@ mod test {
288288
assert!(
289289
get_health(
290290
all_spenders_sum,
291-
&vec![(ADDRESSES["publisher"].clone(), 80.into())]
291+
&vec![(ADDRESSES["publisher"], 80.into())]
292292
.into_iter()
293293
.collect(),
294294
&vec![
295-
(ADDRESSES["publisher2"].clone(), 20.into()),
296-
(ADDRESSES["publisher"].clone(), 60.into())
295+
(ADDRESSES["publisher2"], 20.into()),
296+
(ADDRESSES["publisher"], 60.into())
297297
]
298298
.into_iter()
299299
.collect()
@@ -305,12 +305,12 @@ mod test {
305305
assert!(
306306
get_health(
307307
all_spenders_sum,
308-
&vec![(ADDRESSES["publisher"].clone(), 80.into())]
308+
&vec![(ADDRESSES["publisher"], 80.into())]
309309
.into_iter()
310310
.collect(),
311311
&vec![
312-
(ADDRESSES["publisher2"].clone(), 2.into()),
313-
(ADDRESSES["publisher"].clone(), 78.into())
312+
(ADDRESSES["publisher2"], 2.into()),
313+
(ADDRESSES["publisher"], 78.into())
314314
]
315315
.into_iter()
316316
.collect()
@@ -323,12 +323,12 @@ mod test {
323323
get_health(
324324
all_spenders_sum,
325325
&vec![
326-
(ADDRESSES["publisher"].clone(), 100.into()),
327-
(ADDRESSES["publisher2"].clone(), 1.into())
326+
(ADDRESSES["publisher"], 100.into()),
327+
(ADDRESSES["publisher2"], 1.into())
328328
]
329329
.into_iter()
330330
.collect(),
331-
&vec![(ADDRESSES["publisher"].clone(), 100.into())]
331+
&vec![(ADDRESSES["publisher"], 100.into())]
332332
.into_iter()
333333
.collect()
334334
)

0 commit comments

Comments
 (0)