Skip to content

Commit 963edff

Browse files
committed
feat: use epoch block oracle subgraph for supported networks
1 parent 25c9ae7 commit 963edff

File tree

2 files changed

+144
-12
lines changed

2 files changed

+144
-12
lines changed
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
use common::prelude::*;
2+
use futures::stream;
3+
use futures::Stream;
4+
use reqwest::Client;
5+
use serde_derive::{Deserialize, Serialize};
6+
use std::collections::BTreeMap;
7+
use std::pin::Pin;
8+
use std::sync::Arc;
9+
10+
pub trait EpochBlockOracleSubgraph {
11+
fn supported_networks(self: Arc<Self>) -> Pin<Box<dyn Stream<Item = Result<String, Error>>>>;
12+
}
13+
14+
pub struct EpochBlockOracleSubgraphImpl {
15+
logger: Logger,
16+
endpoint: String,
17+
client: Client,
18+
}
19+
20+
impl EpochBlockOracleSubgraphImpl {
21+
pub fn new(logger: Logger, endpoint: String) -> Arc<Self> {
22+
Arc::new(EpochBlockOracleSubgraphImpl {
23+
logger,
24+
endpoint,
25+
client: Client::new(),
26+
})
27+
}
28+
}
29+
30+
#[derive(Serialize)]
31+
struct GraphqlRequest {
32+
query: String,
33+
variables: BTreeMap<String, serde_json::Value>,
34+
}
35+
36+
#[derive(Deserialize)]
37+
struct GraphqlResponse {
38+
data: Option<BTreeMap<String, serde_json::Value>>,
39+
errors: Option<Vec<serde_json::Value>>,
40+
}
41+
42+
const SUPPORTED_NETWORKS_QUERY: &str = r#"
43+
query Networks($skip: Int!) {
44+
networks(first: 1000, skip: $skip) {
45+
id
46+
alias
47+
}
48+
}
49+
"#;
50+
51+
impl EpochBlockOracleSubgraph for EpochBlockOracleSubgraphImpl {
52+
fn supported_networks(self: Arc<Self>) -> Pin<Box<dyn Stream<Item = Result<String, Error>>>> {
53+
stream::iter((0..).step_by(1000))
54+
.then(move |skip| {
55+
let this = self.clone();
56+
async move {
57+
let req = GraphqlRequest {
58+
query: SUPPORTED_NETWORKS_QUERY.to_string(),
59+
variables: vec![("skip".to_string(), skip.into())]
60+
.into_iter()
61+
.collect(),
62+
};
63+
64+
let res: GraphqlResponse = this
65+
.client
66+
.post(&this.endpoint)
67+
.json(&req)
68+
.send()
69+
.await?
70+
.error_for_status()?
71+
.json()
72+
.await?;
73+
74+
if let Some(errs) = res.errors.filter(|errs| !errs.is_empty()) {
75+
return Err(anyhow!(
76+
"error querying supported networks from subgraph {}",
77+
serde_json::to_string(&errs)?
78+
));
79+
}
80+
81+
let data = res
82+
.data
83+
.ok_or_else(|| anyhow!("Data field is missing in the response"))?
84+
.remove("networks")
85+
.ok_or_else(|| anyhow!("'networks' field is missing in the data"))?;
86+
87+
#[derive(Deserialize)]
88+
#[allow(non_snake_case)]
89+
struct RawNetwork {
90+
id: String,
91+
alias: String,
92+
}
93+
94+
let page: Vec<RawNetwork> = serde_json::from_value(data)?;
95+
let page: Vec<String> = page
96+
.into_iter()
97+
.flat_map(|raw_network| vec![raw_network.id, raw_network.alias])
98+
.collect();
99+
100+
trace!(this.logger, "networks page"; "page_size" => page.len());
101+
102+
Ok(page)
103+
}
104+
})
105+
.take_while(|networks| {
106+
let keep_paginating = match networks {
107+
Ok(networks) => !networks.is_empty(),
108+
Err(_) => true,
109+
};
110+
async move { keep_paginating }
111+
})
112+
.map_ok(|networks| stream::iter(networks.into_iter().map(Ok)))
113+
.try_flatten()
114+
.boxed()
115+
}
116+
}

availability-oracle/src/main.rs

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod contract;
2+
mod epoch_block_oracle_subgraph;
23
mod ipfs;
34
mod manifest;
45
mod network_subgraph;
@@ -8,6 +9,7 @@ mod util;
89
use common::prelude::*;
910
use common::prometheus;
1011
use contract::*;
12+
use epoch_block_oracle_subgraph::{EpochBlockOracleSubgraph, EpochBlockOracleSubgraphImpl};
1113
use ethers::abi::Address;
1214
use ethers::signers::LocalWallet;
1315
use ethers::signers::Signer;
@@ -105,14 +107,11 @@ struct Config {
105107
metrics_port: u16,
106108

107109
#[structopt(
108-
short,
109110
long,
110-
default_value = "mainnet",
111-
value_delimiter = ",",
112-
env = "SUPPORTED_NETWORKS",
113-
help = "a comma separated list of the supported network ids"
111+
env = "EPOCH_BLOCK_ORACLE_SUBGRAPH",
112+
help = "Graphql endpoint to the epoch block oracle subgraph"
114113
)]
115-
supported_networks: Vec<String>,
114+
epoch_block_oracle_subgraph: String,
116115

117116
// Note: `ethereum/contract` is a valid alias for `ethereum`
118117
#[structopt(
@@ -157,6 +156,8 @@ async fn main() -> Result<()> {
157156
async fn run(logger: Logger, config: Config) -> Result<()> {
158157
let ipfs = IpfsImpl::new(config.ipfs, config.ipfs_concurrency, config.ipfs_timeout);
159158
let subgraph = NetworkSubgraphImpl::new(logger.clone(), config.subgraph);
159+
let epoch_subgraph =
160+
EpochBlockOracleSubgraphImpl::new(logger.clone(), config.epoch_block_oracle_subgraph);
160161
let contract: Box<dyn StateManager> = if config.dry_run {
161162
Box::new(StateManagerDryRun::new(logger.clone()))
162163
} else {
@@ -194,7 +195,7 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
194195
subgraph.clone(),
195196
config.min_signal,
196197
grace_period,
197-
&config.supported_networks,
198+
epoch_subgraph.clone(),
198199
&config.supported_data_source_kinds,
199200
)
200201
.await
@@ -227,7 +228,7 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
227228
subgraph,
228229
config.min_signal,
229230
grace_period,
230-
&config.supported_networks,
231+
epoch_subgraph.clone(),
231232
&config.supported_data_source_kinds,
232233
)
233234
.await
@@ -278,18 +279,33 @@ pub async fn reconcile_deny_list(
278279
subgraph: Arc<impl NetworkSubgraph>,
279280
min_signal: u64,
280281
grace_period: Duration,
281-
supported_network_ids: &[String],
282+
epoch_subgraph: Arc<impl EpochBlockOracleSubgraph>,
282283
supported_ds_kinds: &[String],
283284
) -> Result<(), Error> {
284285
let logger = logger.clone();
285286

287+
// Fetch supported networks
288+
let mut supported_networks = Vec::new();
289+
let networks_stream = epoch_subgraph.supported_networks();
290+
futures::pin_mut!(networks_stream);
291+
while let Some(network) = networks_stream.next().await {
292+
match network {
293+
Ok(network_id) => supported_networks.push(network_id),
294+
Err(e) => Err(e)?,
295+
}
296+
}
297+
298+
info!(logger, "Supported networks";
299+
"alias" => supported_networks.join(", ")
300+
);
301+
286302
// Check the availability status of all subgraphs, and gather which should flip the deny flag.
287303
let status_changes: Vec<([u8; 32], bool)> = subgraph
288304
.deployments_over_threshold(min_signal, grace_period)
289305
.map(|deployment| async {
290306
let deployment = deployment?;
291307
let id = bytes32_to_cid_v0(deployment.id);
292-
let validity = match check(ipfs, id, supported_network_ids, supported_ds_kinds).await {
308+
let validity = match check(ipfs, id, &supported_networks, supported_ds_kinds).await {
293309
Ok(()) => Valid::Yes,
294310
Err(CheckError::Invalid(e)) => Valid::No(e),
295311
Err(CheckError::Other(e)) => return Err(e),
@@ -419,7 +435,7 @@ impl From<Invalid> for CheckError {
419435
async fn check(
420436
ipfs: &impl Ipfs,
421437
deployment_id: Cid,
422-
supported_network_ids: &[String],
438+
supported_networks: &[String],
423439
supported_ds_kinds: &[String],
424440
) -> Result<(), CheckError> {
425441
fn check_link(file: &manifest::Link) -> Result<Cid, Invalid> {
@@ -483,7 +499,7 @@ async fn check(
483499
// - That network is listed in the `supported_networks` list
484500
match (network, ds_network) {
485501
(None, Some(ds_network)) => {
486-
if !supported_network_ids.contains(ds_network) {
502+
if !supported_networks.contains(ds_network) {
487503
return Err(Invalid::UnsupportedNetwork(ds_network.clone()).into());
488504
}
489505
network = Some(ds_network)

0 commit comments

Comments
 (0)