Skip to content

feat(fortuna): support multiple RPC endpoints with failover capability #2550

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/fortuna/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fortuna"
version = "7.4.10"
version = "7.4.11"
edition = "2021"

[lib]
Expand Down
39 changes: 30 additions & 9 deletions apps/fortuna/src/chain/ethereum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ use {
config::EthereumConfig,
eth_utils::{
eth_gas_oracle::EthProviderOracle,
failover_middleware::FailoverMiddleware,
legacy_tx_middleware::LegacyTxMiddleware,
nonce_manager::NonceManagerMiddleware,
traced_client::{RpcMetrics, TracedClient},
utils::create_failover_provider,
},
},
anyhow::{anyhow, Error, Result},
Expand Down Expand Up @@ -155,27 +157,41 @@ impl<T: JsonRpcClient + 'static + Clone> SignablePythContractInner<T> {
}

impl SignablePythContract {
pub async fn from_config(chain_config: &EthereumConfig, private_key: &str) -> Result<Self> {
let provider = Provider::<Http>::try_from(&chain_config.geth_rpc_addr)?;
Self::from_config_and_provider(chain_config, private_key, provider).await
pub async fn from_config_with_key(chain_config: &EthereumConfig, private_key: &str) -> Result<Self> {
if !chain_config.geth_rpc_addrs.is_empty() {
let provider = create_failover_provider(&chain_config.geth_rpc_addrs)?;
Self::from_config_and_provider(chain_config, private_key, provider).await
} else {
let provider = Provider::<Http>::try_from(chain_config.geth_rpc_addr.as_str())?;
Self::from_config_and_provider(chain_config, private_key, provider).await
}
}
}

impl InstrumentedSignablePythContract {
pub async fn from_config(
pub async fn from_config_with_metrics(
chain_config: &EthereumConfig,
private_key: &str,
chain_id: ChainId,
metrics: Arc<RpcMetrics>,
) -> Result<Self> {
let provider = TracedClient::new(chain_id, &chain_config.geth_rpc_addr, metrics)?;
let rpc_addr = if !chain_config.geth_rpc_addrs.is_empty() {
&chain_config.geth_rpc_addrs[0]
} else {
&chain_config.geth_rpc_addr
};
let provider = TracedClient::new(chain_id, rpc_addr, metrics)?;
Self::from_config_and_provider(chain_config, private_key, provider).await
}
}

impl PythContract {
pub fn from_config(chain_config: &EthereumConfig) -> Result<Self> {
let provider = Provider::<Http>::try_from(&chain_config.geth_rpc_addr)?;
pub fn from_config_basic(chain_config: &EthereumConfig) -> Result<Self> {
let provider = if !chain_config.geth_rpc_addrs.is_empty() {
create_failover_provider(&chain_config.geth_rpc_addrs)?
} else {
Provider::<Http>::try_from(chain_config.geth_rpc_addr.as_str())?
};

Ok(PythRandom::new(
chain_config.contract_addr,
Expand All @@ -185,12 +201,17 @@ impl PythContract {
}

impl InstrumentedPythContract {
pub fn from_config(
pub fn from_config_with_chain_metrics(
chain_config: &EthereumConfig,
chain_id: ChainId,
metrics: Arc<RpcMetrics>,
) -> Result<Self> {
let provider = TracedClient::new(chain_id, &chain_config.geth_rpc_addr, metrics)?;
let rpc_addr = if !chain_config.geth_rpc_addrs.is_empty() {
&chain_config.geth_rpc_addrs[0]
} else {
&chain_config.geth_rpc_addr
};
let provider = TracedClient::new(chain_id, rpc_addr, metrics)?;

Ok(PythRandom::new(
chain_config.contract_addr,
Expand Down
4 changes: 2 additions & 2 deletions apps/fortuna/src/command/generate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use {

/// Run the entire random number generation protocol to produce a random number.
pub async fn generate(opts: &GenerateOptions) -> Result<()> {
let contract = Arc::new(
SignablePythContract::from_config(
let contract: Arc<SignablePythContract> = Arc::new(
SignablePythContract::from_config_with_key(
&Config::load(&opts.config.config)?.get_chain_config(&opts.chain_id)?,
&opts.private_key,
)
Expand Down
2 changes: 1 addition & 1 deletion apps/fortuna/src/command/get_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use {
/// Get the on-chain request metadata for a provider and sequence number.
pub async fn get_request(opts: &GetRequestOptions) -> Result<()> {
// Initialize a Provider to interface with the EVM contract.
let contract = Arc::new(PythContract::from_config(
let contract: Arc<PythContract> = Arc::new(PythContract::from_config_basic(
&Config::load(&opts.config.config)?.get_chain_config(&opts.chain_id)?,
)?);

Expand Down
8 changes: 6 additions & 2 deletions apps/fortuna/src/command/inspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,19 @@ async fn inspect_chain(
num_requests: u64,
multicall_batch_size: u64,
) -> Result<()> {
let rpc_provider = Provider::<Http>::try_from(&chain_config.geth_rpc_addr)?;
let rpc_provider = if !chain_config.geth_rpc_addrs.is_empty() {
crate::eth_utils::utils::create_failover_provider(&chain_config.geth_rpc_addrs)?
} else {
Provider::<Http>::try_from(chain_config.geth_rpc_addr.as_str())?
};
let multicall_exists = rpc_provider
.get_code(ethers::contract::MULTICALL_ADDRESS, None)
.await
.expect("Failed to get code")
.len()
> 0;

let contract = PythContract::from_config(chain_config)?;
let contract = PythContract::from_config_basic(chain_config)?;
let entropy_provider = contract.get_default_provider().call().await?;
let provider_info = contract.get_provider_info(entropy_provider).call().await?;
let mut current_request_number = provider_info.sequence_number;
Expand Down
4 changes: 2 additions & 2 deletions apps/fortuna/src/command/register_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ pub async fn register_provider_from_config(
))?;

// Initialize a Provider to interface with the EVM contract.
let contract =
Arc::new(SignablePythContract::from_config(chain_config, &private_key_string).await?);
let contract: Arc<SignablePythContract> =
Arc::new(SignablePythContract::from_config_with_key(chain_config, &private_key_string).await?);
// Create a new random hash chain.
let random = rand::random::<[u8; 32]>();
let secret = provider_config
Expand Down
4 changes: 2 additions & 2 deletions apps/fortuna/src/command/request_randomness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use {
};

pub async fn request_randomness(opts: &RequestRandomnessOptions) -> Result<()> {
let contract = Arc::new(
SignablePythContract::from_config(
let contract: Arc<SignablePythContract> = Arc::new(
SignablePythContract::from_config_with_key(
&Config::load(&opts.config.config)?.get_chain_config(&opts.chain_id)?,
&opts.private_key,
)
Expand Down
24 changes: 14 additions & 10 deletions apps/fortuna/src/command/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
))?;
let (tx_exit, rx_exit) = watch::channel(false);
let metrics_registry = Arc::new(RwLock::new(Registry::default()));
let rpc_metrics = Arc::new(RpcMetrics::new(metrics_registry.clone()).await);
let rpc_metrics: Arc<RpcMetrics> = Arc::new(RpcMetrics::new(metrics_registry.clone()).await);

let mut tasks = Vec::new();
for (chain_id, chain_config) in config.chains.clone() {
Expand Down Expand Up @@ -217,7 +217,7 @@ async fn setup_chain_state(
chain_config: &EthereumConfig,
rpc_metrics: Arc<RpcMetrics>,
) -> Result<BlockchainState> {
let contract = Arc::new(InstrumentedPythContract::from_config(
let contract = Arc::new(InstrumentedPythContract::from_config_with_chain_metrics(
chain_config,
chain_id.clone(),
rpc_metrics,
Expand Down Expand Up @@ -316,14 +316,18 @@ pub async fn check_block_timestamp_lag(
metrics: Family<ChainLabel, Gauge>,
rpc_metrics: Arc<RpcMetrics>,
) {
let provider =
match TracedClient::new(chain_id.clone(), &chain_config.geth_rpc_addr, rpc_metrics) {
Ok(r) => r,
Err(e) => {
tracing::error!("Failed to create provider for chain id - {:?}", e);
return;
}
};
let rpc_addr = if !chain_config.geth_rpc_addrs.is_empty() {
&chain_config.geth_rpc_addrs[0]
} else {
&chain_config.geth_rpc_addr
};
let provider = match TracedClient::new(chain_id.clone(), rpc_addr, rpc_metrics) {
Ok(r) => r,
Err(e) => {
tracing::error!("Failed to create provider for chain id - {:?}", e);
return;
}
};

const INF_LAG: i64 = 1000000; // value that definitely triggers an alert
let lag = match provider.get_block(BlockNumber::Latest).await {
Expand Down
2 changes: 1 addition & 1 deletion apps/fortuna/src/command/setup_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async fn setup_chain_provider(
))?;
let provider_address = private_key.clone().parse::<LocalWallet>()?.address();
// Initialize a Provider to interface with the EVM contract.
let contract = Arc::new(SignablePythContract::from_config(chain_config, &private_key).await?);
let contract: Arc<SignablePythContract> = Arc::new(SignablePythContract::from_config_with_key(chain_config, &private_key).await?);

tracing::info!("Fetching provider info");
let provider_info = contract.get_provider_info(provider_address).call().await?;
Expand Down
6 changes: 3 additions & 3 deletions apps/fortuna/src/command/withdraw_fees.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub async fn withdraw_fees(opts: &WithdrawFeesOptions) -> Result<()> {
Some(chain_id) => {
let chain_config = &config.get_chain_config(&chain_id)?;
let contract =
SignablePythContract::from_config(chain_config, &private_key_string).await?;
SignablePythContract::from_config_with_key(chain_config, &private_key_string).await?;

withdraw_fees_for_chain(
contract,
Expand All @@ -36,7 +36,7 @@ pub async fn withdraw_fees(opts: &WithdrawFeesOptions) -> Result<()> {
for (chain_id, chain_config) in config.chains.iter() {
tracing::info!("Withdrawing fees for chain: {}", chain_id);
let contract =
SignablePythContract::from_config(chain_config, &private_key_string).await?;
SignablePythContract::from_config_with_key(chain_config, &private_key_string).await?;

withdraw_fees_for_chain(
contract,
Expand Down Expand Up @@ -78,7 +78,7 @@ pub async fn withdraw_fees_for_chain(

match &tx_result {
Some(receipt) => {
tracing::info!("Withdrawal transaction hash {:?}", receipt.transaction_hash);
tracing::info!("Withdrawal transaction hash {:?}", receipt.transaction_hash());
}
None => {
tracing::warn!("No transaction receipt. Unclear what happened to the transaction");
Expand Down
11 changes: 9 additions & 2 deletions apps/fortuna/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,11 @@ impl Config {

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct EthereumConfig {
/// URL of a Geth RPC endpoint to use for interacting with the blockchain.
/// TODO: Change type from String to Url
#[serde(default = "default_geth_rpc_addrs")]
pub geth_rpc_addrs: Vec<String>,

#[serde(skip_serializing)]
#[deprecated(note = "Use geth_rpc_addrs instead")]
pub geth_rpc_addr: String,

/// URL of a Geth RPC wss endpoint to use for subscribing to blockchain events.
Expand Down Expand Up @@ -202,6 +205,10 @@ fn default_backlog_range() -> u64 {
1000
}

fn default_geth_rpc_addrs() -> Vec<String> {
Vec::new()
}

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct EscalationPolicyConfig {
// The keeper will perform the callback as long as the tx is within this percentage of the configured gas limit.
Expand Down
132 changes: 132 additions & 0 deletions apps/fortuna/src/eth_utils/failover_middleware.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use {
anyhow::Result,
axum::async_trait,
ethers::{
middleware::{Middleware, MiddlewareError},
prelude::{BlockId, PendingTransaction},
providers::JsonRpcClient,
types::{transaction::eip2718::TypedTransaction, BlockNumber, Filter, Log, U256},
},
thiserror::Error,
tracing,
};

#[derive(Clone, Debug)]
pub struct FailoverMiddleware<M> {
middlewares: Vec<M>,
current_idx: usize,
}

impl<M> FailoverMiddleware<M> {
pub fn new(middlewares: Vec<M>) -> Self {
if middlewares.is_empty() {
panic!("FailoverMiddleware requires at least one middleware");
}

Self {
middlewares,
current_idx: 0,
}
}

fn current(&self) -> &M {
&self.middlewares[self.current_idx]
}

async fn with_failover<F, Fut, R>(&self, operation: F) -> Result<R, FailoverMiddlewareError<M>>
where
F: Fn(&M) -> Fut + Clone,
Fut: std::future::Future<Output = Result<R, <M as Middleware>::Error>>,
M: Middleware,
{
let mut last_error = None;

for (idx, middleware) in self.middlewares.iter().enumerate() {
match operation(middleware).await {
Ok(result) => {
if idx > self.current_idx {
tracing::info!(
"Successfully used fallback RPC endpoint {} after primary endpoint failure",
idx
);
}
return Ok(result);
}
Err(err) => {
tracing::warn!(
"RPC endpoint {} failed with error: {:?}. Trying next endpoint if available.",
idx,
err
);
last_error = Some(FailoverMiddlewareError::MiddlewareError(err));
}
}
}

Err(last_error.unwrap_or_else(|| {
FailoverMiddlewareError::NoMiddlewares
}))
}
}

#[derive(Error, Debug)]
pub enum FailoverMiddlewareError<M: Middleware> {
#[error("{0}")]
MiddlewareError(M::Error),

#[error("No middlewares available")]
NoMiddlewares,
}

impl<M: Middleware> MiddlewareError for FailoverMiddlewareError<M> {
type Inner = M::Error;

fn from_err(src: M::Error) -> Self {
FailoverMiddlewareError::MiddlewareError(src)
}

fn as_inner(&self) -> Option<&Self::Inner> {
match self {
FailoverMiddlewareError::MiddlewareError(e) => Some(e),
_ => None,
}
}
}

#[async_trait]
impl<M: Middleware> Middleware for FailoverMiddleware<M> {
type Error = FailoverMiddlewareError<M>;
type Provider = M::Provider;
type Inner = M;

fn inner(&self) -> &M {
self.current()
}


async fn send_transaction<T: Into<TypedTransaction> + Send + Sync>(
&self,
tx: T,
block: Option<BlockId>,
) -> Result<PendingTransaction<'_, Self::Provider>, Self::Error> {
let tx = tx.into();
self.with_failover(|middleware| middleware.send_transaction(tx.clone(), block))
.await
}

async fn get_block_number(&self) -> Result<U256, Self::Error> {
self.with_failover(|middleware| middleware.get_block_number()).await
}

async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>, Self::Error> {
self.with_failover(|middleware| middleware.get_logs(filter)).await
}

async fn fill_transaction(
&self,
tx: &mut TypedTransaction,
block: Option<BlockId>,
) -> Result<(), Self::Error> {
self.with_failover(|middleware| middleware.fill_transaction(tx, block)).await
}
}
6 changes: 6 additions & 0 deletions apps/fortuna/src/eth_utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub mod eth_gas_oracle;
pub mod failover_middleware;
pub mod legacy_tx_middleware;
pub mod nonce_manager;
pub mod traced_client;
pub mod utils;
Loading
Loading