-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathepoch_block_oracle_subgraph.rs
121 lines (107 loc) · 3.85 KB
/
epoch_block_oracle_subgraph.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use common::prelude::*;
use futures::stream;
use futures::Stream;
use reqwest::Client;
use serde_derive::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::pin::Pin;
use std::sync::Arc;
pub trait EpochBlockOracleSubgraph {
fn supported_networks(self: Arc<Self>) -> Pin<Box<dyn Stream<Item = Result<String, Error>>>>;
}
pub struct EpochBlockOracleSubgraphImpl {
logger: Logger,
endpoint: String,
client: Client,
}
impl EpochBlockOracleSubgraphImpl {
pub fn new(logger: Logger, endpoint: String) -> Arc<Self> {
Arc::new(EpochBlockOracleSubgraphImpl {
logger,
endpoint,
client: Client::new(),
})
}
}
#[derive(Serialize)]
struct GraphqlRequest {
query: String,
variables: BTreeMap<String, serde_json::Value>,
}
#[derive(Deserialize)]
struct GraphqlResponse {
data: Option<BTreeMap<String, serde_json::Value>>,
errors: Option<Vec<serde_json::Value>>,
}
const SUPPORTED_NETWORKS_QUERY: &str = r#"
query Networks($skip: Int!) {
globalState(id: "0") {
networks(first: 1000, skip: $skip) {
id
alias
}
}
}
"#;
impl EpochBlockOracleSubgraph for EpochBlockOracleSubgraphImpl {
fn supported_networks(self: Arc<Self>) -> Pin<Box<dyn Stream<Item = Result<String, Error>>>> {
stream::iter((0..).step_by(1000))
.then(move |skip| {
let this = self.clone();
async move {
let req = GraphqlRequest {
query: SUPPORTED_NETWORKS_QUERY.to_string(),
variables: vec![("skip".to_string(), skip.into())]
.into_iter()
.collect(),
};
let res: GraphqlResponse = this
.client
.post(&this.endpoint)
.json(&req)
.send()
.await?
.error_for_status()?
.json()
.await?;
if let Some(errs) = res.errors.filter(|errs| !errs.is_empty()) {
return Err(anyhow!(
"error querying supported networks from subgraph {}",
serde_json::to_string(&errs)?
));
}
let data = res
.data
.ok_or_else(|| anyhow!("Data field is missing in the response"))?
.remove("globalState")
.and_then(|global_state| global_state.get("networks").cloned())
.ok_or_else(|| {
anyhow!("'networks' field is missing in the globalState data")
})?;
#[derive(Deserialize)]
#[allow(non_snake_case)]
struct RawNetwork {
id: String,
alias: String,
}
let page: Vec<RawNetwork> = serde_json::from_value(data)?;
let page: Vec<String> = page
.into_iter()
.flat_map(|raw_network| vec![raw_network.id, raw_network.alias])
.collect();
trace!(this.logger, "networks page"; "page_size" => page.len());
Ok(page)
}
})
.take_while(|networks| {
let keep_paginating = match networks {
Ok(networks) => !networks.is_empty(),
Err(_) => true,
};
async move { keep_paginating }
})
.map_ok(|networks| stream::iter(networks.into_iter().map(Ok)))
.try_flatten()
.boxed()
}
}