Skip to content

Commit 126b3e9

Browse files
committed
worker - SentryApi - move collect_channels
1 parent a481760 commit 126b3e9

File tree

5 files changed

+66
-90
lines changed

5 files changed

+66
-90
lines changed

primitives/src/address.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,11 @@ impl TryFrom<&str> for Address {
111111
/// # Example
112112
/// ```
113113
/// use once_cell::sync::Lazy;
114-
/// use crate::Address;
114+
/// use primitives::Address;
115115
///
116116
/// static ADDRESS_0: Lazy<Address> = Lazy::new(|| b"0x80690751969B234697e9059e04ed72195c3507fa".try_into().unwrap());
117117
///
118-
/// println!("Address: {}", ADDRESS_0);
118+
/// println!("Address: {}", *ADDRESS_0);
119119
/// ```
120120
impl TryFrom<&'static [u8; 42]> for Address {
121121
type Error = Error;

validator_worker/src/channel.rs

Lines changed: 4 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,10 @@
11
use crate::{
22
error::{Error, TickError},
3-
follower, leader,
4-
sentry_interface::{campaigns::all_campaigns, Validator, Validators},
5-
SentryApi,
6-
};
7-
use primitives::{adapter::Adapter, config::Config, util::ApiUrl, Channel, ChannelId};
8-
use slog::{info, Logger};
9-
use std::{
10-
collections::{hash_map::Entry, HashSet},
11-
time::Duration,
3+
follower, leader, SentryApi,
124
};
5+
use primitives::{adapter::Adapter, config::Config, Channel, ChannelId};
6+
use slog::info;
7+
use std::time::Duration;
138
use tokio::time::timeout;
149

1510
pub async fn channel_tick<A: Adapter + 'static>(
@@ -93,67 +88,3 @@ pub async fn channel_tick<A: Adapter + 'static>(
9388
}
9489
}
9590
}
96-
97-
/// Fetches all `Campaign`s from Sentry and builds the `Channel`s to be processed
98-
/// along side all the `Validator`s' url & auth token
99-
// TODO: Move to [`SentryApi`]
100-
pub async fn collect_channels<A: Adapter + 'static>(
101-
adapter: &A,
102-
sentry_url: &ApiUrl,
103-
config: &Config,
104-
_logger: &Logger,
105-
) -> Result<(HashSet<Channel>, Validators), reqwest::Error> {
106-
let for_whoami = adapter.whoami();
107-
108-
let all_campaigns_timeout = Duration::from_millis(config.all_campaigns_timeout as u64);
109-
let client = reqwest::Client::builder()
110-
.timeout(all_campaigns_timeout)
111-
.build()?;
112-
113-
let whoami_validator = Validator {
114-
url: sentry_url.clone(),
115-
token: adapter
116-
.get_auth(&for_whoami)
117-
.expect("Should get WhoAmI auth"),
118-
};
119-
let campaigns = all_campaigns(client, &whoami_validator, Some(for_whoami)).await?;
120-
let channels = campaigns
121-
.iter()
122-
.map(|campaign| campaign.channel)
123-
.collect::<HashSet<_>>();
124-
125-
let validators = campaigns
126-
.into_iter()
127-
.fold(Validators::new(), |mut acc, campaign| {
128-
for validator_desc in campaign.validators.iter() {
129-
// if Validator is already there, we can just skip it
130-
// remember, the campaigns are ordered by `created DESC`
131-
// so we will always get the latest Validator url first
132-
match acc.entry(validator_desc.id) {
133-
Entry::Occupied(_) => continue,
134-
Entry::Vacant(entry) => {
135-
// try to parse the url of the Validator Desc
136-
let validator_url = validator_desc.url.parse::<ApiUrl>();
137-
// and also try to find the Auth token in the config
138-
139-
// if there was an error with any of the operations, skip this `ValidatorDesc`
140-
let auth_token = adapter.get_auth(&validator_desc.id);
141-
142-
// only if `ApiUrl` parsing is `Ok` & Auth Token is found in the `Adapter`
143-
if let (Ok(url), Ok(auth_token)) = (validator_url, auth_token) {
144-
// add an entry for propagation
145-
entry.insert(Validator {
146-
url,
147-
token: auth_token,
148-
});
149-
}
150-
// otherwise it will try to do the same things on the next encounter of this `ValidatorId`
151-
}
152-
}
153-
}
154-
155-
acc
156-
});
157-
158-
Ok((channels, validators))
159-
}

validator_worker/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ fn main() -> Result<(), Box<dyn Error>> {
107107
};
108108
let mut adapter = DummyAdapter::init(options, &config);
109109
// unlock adapter
110-
adapter.unlock().expect("failed to Unlock Ethereum adapter");
110+
adapter.unlock().expect("failed to Unlock Dummy adapter");
111111

112112
AdapterTypes::DummyAdapter(Box::new(adapter))
113113
}

validator_worker/src/sentry_interface.rs

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use std::{collections::HashMap, time::Duration};
1+
use std::{
2+
collections::{hash_map::Entry, HashMap, HashSet},
3+
time::Duration,
4+
};
25

36
use futures::future::{join_all, try_join_all, TryFutureExt};
47
use reqwest::{Client, Method};
@@ -14,7 +17,7 @@ use primitives::{
1417
spender::Spender,
1518
util::ApiUrl,
1619
validator::MessageTypes,
17-
Address, ChannelId, Config, ValidatorId,
20+
Address, Channel, ChannelId, Config, ValidatorId,
1821
};
1922
use thiserror::Error;
2023

@@ -248,6 +251,57 @@ impl<A: Adapter + 'static, P> SentryApi<A, P> {
248251
.map_err(Error::Request)
249252
.await?)
250253
}
254+
255+
/// Fetches all `Campaign`s from the _Who am I_ Sentry.
256+
/// It builds the `Channel`s to be processed alongside all the `Validator`s' url & auth token.
257+
pub async fn collect_channels(&self) -> Result<(HashSet<Channel>, Validators), Error> {
258+
let all_campaigns_timeout = Duration::from_millis(self.config.all_campaigns_timeout as u64);
259+
let client = reqwest::Client::builder()
260+
.timeout(all_campaigns_timeout)
261+
.build()?;
262+
263+
let campaigns =
264+
campaigns::all_campaigns(client, &self.whoami, Some(self.adapter.whoami())).await?;
265+
let channels = campaigns
266+
.iter()
267+
.map(|campaign| campaign.channel)
268+
.collect::<HashSet<_>>();
269+
270+
let validators = campaigns
271+
.into_iter()
272+
.fold(Validators::new(), |mut acc, campaign| {
273+
for validator_desc in campaign.validators.iter() {
274+
// if Validator is already there, we can just skip it
275+
// remember, the campaigns are ordered by `created DESC`
276+
// so we will always get the latest Validator url first
277+
match acc.entry(validator_desc.id) {
278+
Entry::Occupied(_) => continue,
279+
Entry::Vacant(entry) => {
280+
// try to parse the url of the Validator Desc
281+
let validator_url = validator_desc.url.parse::<ApiUrl>();
282+
// and also try to find the Auth token in the config
283+
284+
// if there was an error with any of the operations, skip this `ValidatorDesc`
285+
let auth_token = self.adapter.get_auth(&validator_desc.id);
286+
287+
// only if `ApiUrl` parsing is `Ok` & Auth Token is found in the `Adapter`
288+
if let (Ok(url), Ok(auth_token)) = (validator_url, auth_token) {
289+
// add an entry for propagation
290+
entry.insert(Validator {
291+
url,
292+
token: auth_token,
293+
});
294+
}
295+
// otherwise it will try to do the same things on the next encounter of this `ValidatorId`
296+
}
297+
}
298+
}
299+
300+
acc
301+
});
302+
303+
Ok((channels, validators))
304+
}
251305
}
252306

253307
impl<A: Adapter + 'static> SentryApi<A> {

validator_worker/src/worker.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
use crate::{
2-
channel::{channel_tick, collect_channels},
3-
SentryApi,
4-
};
1+
use crate::{channel::channel_tick, SentryApi};
52
use primitives::{adapter::Adapter, Config};
63
use slog::{error, info, Logger};
74
use std::{error::Error, time::Duration};
@@ -60,14 +57,8 @@ impl<A: Adapter + 'static> Worker<A> {
6057

6158
pub async fn all_channels_tick(&self) {
6259
let logger = &self.logger;
63-
let (channels, validators) = match collect_channels(
64-
&self.adapter,
65-
&self.sentry.whoami.url,
66-
&self.config,
67-
logger,
68-
)
69-
.await
70-
{
60+
61+
let (channels, validators) = match self.sentry.collect_channels().await {
7162
Ok(res) => res,
7263
Err(err) => {
7364
error!(logger, "Error collecting all channels for tick"; "collect_channels" => ?err, "main" => "all_channels_tick");

0 commit comments

Comments
 (0)