Skip to content

Commit e3e7e49

Browse files
authored
Merge pull request #17 from graphprotocol/mde/use-ebo-subgraph-for-supported-networks
feat: use epoch block oracle subgraph for supported networks
2 parents c9108c1 + 8389340 commit e3e7e49

File tree

4 files changed

+158
-17
lines changed

4 files changed

+158
-17
lines changed

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ FLAGS:
1111
-V, --version Prints version information
1212
1313
OPTIONS:
14+
--epoch-block-oracle-subgraph <subgraph>
15+
Graphql endpoint to the epoch block oracle subgraph used for fetching supported networks [env: EPOCH_BLOCK_ORACLE_SUBGRAPH=]
16+
1417
--grace-period <grace-period>
1518
Grace period, in seconds from subgraph creation, for which subgraphs will not be checked [env: ORACLE_GRACE_PERIOD=] [default: 0]
1619
@@ -22,7 +25,7 @@ OPTIONS:
2225
2326
--ipfs-timeout <ipfs-timeout>
2427
IPFS timeout after which a file will be considered unavailable [env: ORACLE_IPFS_TIMEOUT_SECS=] [default: 30]
25-
28+
2629
--metrics-port <metrics-port>
2730
[env: ORACLE_METRICS_PORT=] [default: 8090]
2831
@@ -49,9 +52,6 @@ OPTIONS:
4952
5053
--supported-data-source-kinds <supported-data-source-kinds>...
5154
a comma separated list of the supported data source kinds [env: SUPPORTED_DATA_SOURCE_KINDS=] [default: ethereum,ethereum/contract,file/ipfs,substreams,file/arweave]
52-
53-
-s, --supported-networks <supported-networks>...
54-
a comma separated list of the supported network ids [env: SUPPORTED_NETWORKS=] [default: mainnet]
5555
5656
--url <url>
5757
RPC url for the network [env: RPC_URL=]
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
use common::prelude::*;
2+
use futures::stream;
3+
use futures::Stream;
4+
use reqwest::Client;
5+
use serde_derive::{Deserialize, Serialize};
6+
use std::collections::BTreeMap;
7+
use std::pin::Pin;
8+
use std::sync::Arc;
9+
10+
pub trait EpochBlockOracleSubgraph {
11+
fn supported_networks(self: Arc<Self>) -> Pin<Box<dyn Stream<Item = Result<String, Error>>>>;
12+
}
13+
14+
pub struct EpochBlockOracleSubgraphImpl {
15+
logger: Logger,
16+
endpoint: String,
17+
client: Client,
18+
}
19+
20+
impl EpochBlockOracleSubgraphImpl {
21+
pub fn new(logger: Logger, endpoint: String) -> Arc<Self> {
22+
Arc::new(EpochBlockOracleSubgraphImpl {
23+
logger,
24+
endpoint,
25+
client: Client::new(),
26+
})
27+
}
28+
}
29+
30+
#[derive(Serialize)]
31+
struct GraphqlRequest {
32+
query: String,
33+
variables: BTreeMap<String, serde_json::Value>,
34+
}
35+
36+
#[derive(Deserialize)]
37+
struct GraphqlResponse {
38+
data: Option<BTreeMap<String, serde_json::Value>>,
39+
errors: Option<Vec<serde_json::Value>>,
40+
}
41+
42+
const SUPPORTED_NETWORKS_QUERY: &str = r#"
43+
query Networks($skip: Int!) {
44+
networks(first: 1000, skip: $skip) {
45+
id
46+
alias
47+
}
48+
}
49+
"#;
50+
51+
impl EpochBlockOracleSubgraph for EpochBlockOracleSubgraphImpl {
52+
fn supported_networks(self: Arc<Self>) -> Pin<Box<dyn Stream<Item = Result<String, Error>>>> {
53+
stream::iter((0..).step_by(1000))
54+
.then(move |skip| {
55+
let this = self.clone();
56+
async move {
57+
let req = GraphqlRequest {
58+
query: SUPPORTED_NETWORKS_QUERY.to_string(),
59+
variables: vec![("skip".to_string(), skip.into())]
60+
.into_iter()
61+
.collect(),
62+
};
63+
64+
let res: GraphqlResponse = this
65+
.client
66+
.post(&this.endpoint)
67+
.json(&req)
68+
.send()
69+
.await?
70+
.error_for_status()?
71+
.json()
72+
.await?;
73+
74+
if let Some(errs) = res.errors.filter(|errs| !errs.is_empty()) {
75+
return Err(anyhow!(
76+
"error querying supported networks from subgraph {}",
77+
serde_json::to_string(&errs)?
78+
));
79+
}
80+
81+
let data = res
82+
.data
83+
.ok_or_else(|| anyhow!("Data field is missing in the response"))?
84+
.remove("networks")
85+
.ok_or_else(|| anyhow!("'networks' field is missing in the data"))?;
86+
87+
#[derive(Deserialize)]
88+
#[allow(non_snake_case)]
89+
struct RawNetwork {
90+
id: String,
91+
alias: String,
92+
}
93+
94+
let page: Vec<RawNetwork> = serde_json::from_value(data)?;
95+
let page: Vec<String> = page
96+
.into_iter()
97+
.flat_map(|raw_network| vec![raw_network.id, raw_network.alias])
98+
.collect();
99+
100+
trace!(this.logger, "networks page"; "page_size" => page.len());
101+
102+
Ok(page)
103+
}
104+
})
105+
.take_while(|networks| {
106+
let keep_paginating = match networks {
107+
Ok(networks) => !networks.is_empty(),
108+
Err(_) => true,
109+
};
110+
async move { keep_paginating }
111+
})
112+
.map_ok(|networks| stream::iter(networks.into_iter().map(Ok)))
113+
.try_flatten()
114+
.boxed()
115+
}
116+
}

availability-oracle/src/main.rs

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod contract;
2+
mod epoch_block_oracle_subgraph;
23
mod ipfs;
34
mod manifest;
45
mod network_subgraph;
@@ -8,6 +9,7 @@ mod util;
89
use common::prelude::*;
910
use common::prometheus;
1011
use contract::*;
12+
use epoch_block_oracle_subgraph::{EpochBlockOracleSubgraph, EpochBlockOracleSubgraphImpl};
1113
use ethers::abi::Address;
1214
use ethers::signers::LocalWallet;
1315
use ethers::signers::Signer;
@@ -105,14 +107,11 @@ struct Config {
105107
metrics_port: u16,
106108

107109
#[structopt(
108-
short,
109110
long,
110-
default_value = "mainnet",
111-
value_delimiter = ",",
112-
env = "SUPPORTED_NETWORKS",
113-
help = "a comma separated list of the supported network ids"
111+
env = "EPOCH_BLOCK_ORACLE_SUBGRAPH",
112+
help = "Graphql endpoint to the epoch block oracle subgraph"
114113
)]
115-
supported_networks: Vec<String>,
114+
epoch_block_oracle_subgraph: String,
116115

117116
// Note: `ethereum/contract` is a valid alias for `ethereum`
118117
#[structopt(
@@ -157,6 +156,8 @@ async fn main() -> Result<()> {
157156
async fn run(logger: Logger, config: Config) -> Result<()> {
158157
let ipfs = IpfsImpl::new(config.ipfs, config.ipfs_concurrency, config.ipfs_timeout);
159158
let subgraph = NetworkSubgraphImpl::new(logger.clone(), config.subgraph);
159+
let epoch_subgraph =
160+
EpochBlockOracleSubgraphImpl::new(logger.clone(), config.epoch_block_oracle_subgraph);
160161
let contract: Box<dyn StateManager> = if config.dry_run {
161162
Box::new(StateManagerDryRun::new(logger.clone()))
162163
} else {
@@ -194,7 +195,7 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
194195
subgraph.clone(),
195196
config.min_signal,
196197
grace_period,
197-
&config.supported_networks,
198+
epoch_subgraph.clone(),
198199
&config.supported_data_source_kinds,
199200
)
200201
.await
@@ -227,7 +228,7 @@ async fn run(logger: Logger, config: Config) -> Result<()> {
227228
subgraph,
228229
config.min_signal,
229230
grace_period,
230-
&config.supported_networks,
231+
epoch_subgraph.clone(),
231232
&config.supported_data_source_kinds,
232233
)
233234
.await
@@ -278,18 +279,33 @@ pub async fn reconcile_deny_list(
278279
subgraph: Arc<impl NetworkSubgraph>,
279280
min_signal: u64,
280281
grace_period: Duration,
281-
supported_network_ids: &[String],
282+
epoch_subgraph: Arc<impl EpochBlockOracleSubgraph>,
282283
supported_ds_kinds: &[String],
283284
) -> Result<(), Error> {
284285
let logger = logger.clone();
285286

287+
// Fetch supported networks
288+
let mut supported_networks = Vec::new();
289+
let networks_stream = epoch_subgraph.supported_networks();
290+
futures::pin_mut!(networks_stream);
291+
while let Some(network) = networks_stream.next().await {
292+
match network {
293+
Ok(network_id) => supported_networks.push(network_id),
294+
Err(e) => Err(e)?,
295+
}
296+
}
297+
298+
info!(logger, "Supported networks";
299+
"alias" => supported_networks.join(", ")
300+
);
301+
286302
// Check the availability status of all subgraphs, and gather which should flip the deny flag.
287303
let status_changes: Vec<([u8; 32], bool)> = subgraph
288304
.deployments_over_threshold(min_signal, grace_period)
289305
.map(|deployment| async {
290306
let deployment = deployment?;
291307
let id = bytes32_to_cid_v0(deployment.id);
292-
let validity = match check(ipfs, id, supported_network_ids, supported_ds_kinds).await {
308+
let validity = match check(ipfs, id, &supported_networks, supported_ds_kinds).await {
293309
Ok(()) => Valid::Yes,
294310
Err(CheckError::Invalid(e)) => Valid::No(e),
295311
Err(CheckError::Other(e)) => return Err(e),
@@ -419,7 +435,7 @@ impl From<Invalid> for CheckError {
419435
async fn check(
420436
ipfs: &impl Ipfs,
421437
deployment_id: Cid,
422-
supported_network_ids: &[String],
438+
supported_networks: &[String],
423439
supported_ds_kinds: &[String],
424440
) -> Result<(), CheckError> {
425441
fn check_link(file: &manifest::Link) -> Result<Cid, Invalid> {
@@ -483,7 +499,7 @@ async fn check(
483499
// - That network is listed in the `supported_networks` list
484500
match (network, ds_network) {
485501
(None, Some(ds_network)) => {
486-
if !supported_network_ids.contains(ds_network) {
502+
if !supported_networks.contains(ds_network) {
487503
return Err(Invalid::UnsupportedNetwork(ds_network.clone()).into());
488504
}
489505
network = Some(ds_network)

availability-oracle/src/test.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::contract;
2+
use crate::epoch_block_oracle_subgraph::*;
23
use crate::ipfs::*;
34
use crate::network_subgraph::*;
45
use crate::util::bytes32_to_cid_v0;
@@ -51,7 +52,7 @@ async fn test_reconcile() {
5152
Arc::new(MockSubgraph),
5253
0,
5354
Duration::default(),
54-
&vec!["mainnet".into()],
55+
Arc::new(MockEBOSubgraph),
5556
&vec![
5657
"ethereum".into(),
5758
"ethereum/contract".into(),
@@ -95,6 +96,14 @@ impl NetworkSubgraph for MockSubgraph {
9596
}
9697
}
9798

99+
struct MockEBOSubgraph;
100+
101+
impl EpochBlockOracleSubgraph for MockEBOSubgraph {
102+
fn supported_networks(self: Arc<Self>) -> Pin<Box<dyn Stream<Item = Result<String, Error>>>> {
103+
futures::stream::iter(vec![Ok("mainnet".to_string())]).boxed()
104+
}
105+
}
106+
98107
struct MockIpfs;
99108

100109
#[async_trait]

0 commit comments

Comments
 (0)