Sync error handling #133

Merged 25 commits on Feb 19, 2025
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions anchor/client/src/lib.rs
@@ -330,6 +330,10 @@ impl Client {
.await
.map_err(|e| format!("Unable to create syncer: {e}"))?;

// Access to the operational status of the sync. This can be passed around to condition
// duties based on the current status of the sync
let _operational_status = syncer.operational_status();

executor.spawn(
async move {
if let Err(e) = syncer.sync().await {
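The handle returned by `operational_status()` is an `Arc<AtomicBool>`, so downstream components can poll it cheaply. A minimal sketch of one way a duty-scheduling component might consume it; the `maybe_run_duty` helper is hypothetical and not part of this PR:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical consumer: skip or defer a duty while the syncer reports itself as down.
fn maybe_run_duty(operational_status: &Arc<AtomicBool>, duty: impl FnOnce()) {
    if operational_status.load(Ordering::Relaxed) {
        duty();
    } else {
        eprintln!("sync is not operational, deferring duty");
    }
}

fn main() {
    let status = Arc::new(AtomicBool::new(false));
    maybe_run_duty(&status, || println!("running duty")); // deferred
    status.store(true, Ordering::Relaxed);
    maybe_run_duty(&status, || println!("running duty")); // runs
}
```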
1 change: 1 addition & 0 deletions anchor/eth/Cargo.toml
@@ -12,6 +12,7 @@ path = "execution.rs"
alloy = { workspace = true }
base64 = { workspace = true }
database = { workspace = true }
fastrand = "2.3.0"
futures = { workspace = true }
hex = { workspace = true }
indexmap = { workspace = true }
7 changes: 3 additions & 4 deletions anchor/eth/execution.rs
@@ -12,7 +12,7 @@ use tracing_subscriber::{fmt, prelude::*, EnvFilter};
async fn main() {
// Setup a log filter & tracing
let filter = EnvFilter::builder()
.parse("info,hyper=off,hyper_util=off,alloy_transport_http=off,reqwest=off,alloy_rpc_client=off")
.parse("info,hyper=off,hyper_util=off,alloy_transport_http=off,reqwest=off,alloy_rpc_client=off,alloy_transport_ws=off,alloy_pubsub=off")
.expect("filter should be valid");
tracing_subscriber::registry()
.with(fmt::layer())
@@ -21,14 +21,13 @@ async fn main() {

// Dummy configuration with endpoint and network
let rpc_endpoint = "http://127.0.0.1:8545";
let _ws_endpoint = "ws://127.0.0.1:8546";
let ws_endpoint = "wss://eth.merkle.io";
let ws_endpoint = "ws://127.0.0.1:8546";
let beacon_endpoint = "http://127.0.0.1:5052";
let config = Config {
http_url: String::from(rpc_endpoint),
ws_url: String::from(ws_endpoint),
beacon_url: String::from(beacon_endpoint),
network: SsvNetworkConfig::constant("mainnet").unwrap().unwrap(),
network: SsvNetworkConfig::constant("holesky").unwrap().unwrap(),
historic_finished_notify: None,
};

1 change: 1 addition & 0 deletions anchor/eth/src/error.rs
@@ -6,6 +6,7 @@ pub enum ExecutionError {
SyncError(String),
InvalidEvent(String),
RpcError(String),
WsError(String),
DecodeError(String),
Misc(String),
Duplicate(String),
5 changes: 3 additions & 2 deletions anchor/eth/src/event_processor.rs
@@ -147,8 +147,9 @@ impl EventProcessor {
// Make sure the data is the expected length
if data.len() != 704 {
debug!(operator_id = ?operator_id, expected = 704, actual = data.len(), "Invalid public key data length");
return Err(ExecutionError::InvalidEvent(String::from(
"Invalid public key data length. Expected 704, got {data.len()}",
return Err(ExecutionError::InvalidEvent(format!(
"Invalid public key data length. Expected 704, got {}",
data.len()
)));
}

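For context on the fix above: `String::from` keeps the `{data.len()}` placeholder verbatim, while `format!` actually interpolates the value. A small self-contained illustration:

```rust
fn main() {
    let data = vec![0u8; 10];

    // The old code passed a literal to String::from, so the braces were never expanded.
    let broken = String::from("Expected 704, got {data.len()}");
    assert_eq!(broken, "Expected 704, got {data.len()}");

    // format! substitutes the value as intended.
    let fixed = format!("Expected 704, got {}", data.len());
    assert_eq!(fixed, "Expected 704, got 10");
}
```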
165 changes: 111 additions & 54 deletions anchor/eth/src/sync.rs
@@ -8,9 +8,9 @@ use alloy::sol_types::SolEvent;
use database::NetworkDatabase;
use futures::future::{try_join_all, Future};
use futures::StreamExt;
use rand::Rng;
use ssv_network_config::SsvNetworkConfig;
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, LazyLock};
use tokio::sync::oneshot::Sender;
use tokio::time::Duration;
@@ -44,8 +44,9 @@ const BATCH_SIZE: u64 = 10000;
/// Batch size for task groups
const GROUP_SIZE: usize = 50;

/// Retry information for log fetching
const MAX_RETRIES: i32 = 5;
/// Exponential backoff constants
const INITIAL_BACKOFF_MS: u64 = 100; // Start with 100ms delay
const MAX_BACKOFF_MS: u64 = 30_000; // Don't wait longer than 30 seconds

// Block follow distance
const FOLLOW_DISTANCE: u64 = 8;
@@ -81,6 +82,9 @@ pub struct SsvEventSyncer {
network: SsvNetworkConfig,
/// Notify a channel as soon as the historical sync is done
historic_finished_notify: Option<Sender<()>>,
/// Current operational status of sync. If there is an issue with the rpc endpoint or the ws
/// endpoint, the status is considered down. Otherwise, it is up
operational_status: Arc<AtomicBool>,
}

impl SsvEventSyncer {
@@ -115,15 +119,19 @@ impl SsvEventSyncer {
event_processor,
network: config.network,
historic_finished_notify: config.historic_finished_notify,
operational_status: Arc::new(AtomicBool::new(false)),
})
}

// Get access to the current status of the sync
pub fn operational_status(&self) -> Arc<AtomicBool> {
self.operational_status.clone()
}

#[instrument(skip(self))]
/// Initial both a historical sync and a live sync from the chain. This function will transition
/// into a never ending live sync, so it should never return
/// Try to perform both a historical and live sync from the chain
pub async fn sync(&mut self) -> Result<(), ExecutionError> {
info!("Starting SSV event sync");

// Get network specific contract information
let contract_address = self.network.ssv_contract;
let deployment_block = self.network.ssv_contract_block;
@@ -132,7 +140,87 @@
?contract_address,
deployment_block, "Using contract configuration"
);
loop {
match self.try_sync(contract_address, deployment_block).await {
Ok(_) => unreachable!("Sync should never finish successfully"),
Err(e) => {
error!(?e, "Sync failed, attempting recovery");
self.operational_status.store(false, Ordering::Relaxed);

match e {
ExecutionError::WsError(e) => {
warn!("Websocket error: {e}");
self.troubleshoot_ws().await;
}
ExecutionError::RpcError(e) => {
warn!("Rpc error: {e}");
self.troubleshoot_rpc().await
}
_ => {} // these are logged where they occur
}

self.operational_status.store(true, Ordering::Relaxed);
}
}
}
}

// When we encounter an rpc error, keep polling until success
async fn troubleshoot_rpc(&self) {
info!("Attempting to reconnect to rpc");
let mut retry_count = 0;
let mut current_backoff_ms = INITIAL_BACKOFF_MS;

while (self.rpc_client.get_block_number().await).is_err() {
self.apply_backoff(&mut retry_count, &mut current_backoff_ms)
.await;
}
}

// When we encounter a ws error, keep trying to connect until success
pub async fn troubleshoot_ws(&mut self) {
info!("Attempting to reconnect to ws");
let mut retry_count = 0;
let mut current_backoff_ms = INITIAL_BACKOFF_MS;

loop {
let ws = WsConnect::new(&self.ws_url);
if let Ok(ws_client) = ProviderBuilder::default().on_ws(ws).await {
self.ws_client = ws_client;
break;
}
// unsuccessful, back off before retrying
self.apply_backoff(&mut retry_count, &mut current_backoff_ms)
.await;
}
}

// Exponential backoff with cap
pub async fn apply_backoff(&self, retry_count: &mut i32, current_backoff_ms: &mut u64) {
// Calculate next backoff with some jitter
let jitter = fastrand::u64(0..=50); // Random 0-50ms
*current_backoff_ms = (*current_backoff_ms * 2) // Exponential growth
.min(MAX_BACKOFF_MS) // Don't exceed max backoff
.saturating_add(jitter); // Add jitter safely

warn!(
retry_count,
backoff_ms = current_backoff_ms,
"Connection error, backing off before retry"
);
*retry_count += 1;

tokio::time::sleep(Duration::from_millis(*current_backoff_ms)).await;
}
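To make the growth in `apply_backoff` concrete, here is a standalone sketch of the same doubling-and-capping schedule (jitter omitted for clarity; the real code adds 0-50 ms via `fastrand` before sleeping):

```rust
// Self-contained sketch of the capped exponential backoff used above.
const INITIAL_BACKOFF_MS: u64 = 100;
const MAX_BACKOFF_MS: u64 = 30_000;

fn main() {
    let mut backoff_ms = INITIAL_BACKOFF_MS;
    for retry in 1..=10 {
        backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF_MS);
        println!("retry {retry}: wait {backoff_ms} ms");
    }
    // Doubles 200, 400, 800, ... and stays pinned at 30_000 ms from the ninth retry on.
}
```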

#[instrument(skip(self))]
/// Initiate both a historical sync and a live sync from the chain. This function will transition
/// into a never-ending live sync, so it should never return
pub async fn try_sync(
&mut self,
contract_address: Address,
deployment_block: u64,
) -> Result<(), ExecutionError> {
info!("Starting historical sync");
self.historical_sync(contract_address, deployment_block)
.await?;
Expand All @@ -158,13 +246,13 @@ impl SsvEventSyncer {
deployment_block: u64,
) -> Result<(), ExecutionError> {
// Start from the contract deployment block or the last block that has been processed
let last_processed_block = self.event_processor.db.state().get_last_processed_block() + 1;
let mut start_block = std::cmp::max(deployment_block, last_processed_block);
let last_processed_block = self.event_processor.db.state().get_last_processed_block();
let mut start_block = std::cmp::max(deployment_block, last_processed_block + 1);

loop {
let current_block = self.rpc_client.get_block_number().await.map_err(|e| {
error!(?e, "Failed to fetch block number");
ExecutionError::RpcError(format!("Unable to fetch block number {}", e))
ExecutionError::RpcError(format!("Failed to fetch block number: {e}"))
})?;

// Basic verification
Expand All @@ -179,12 +267,12 @@ impl SsvEventSyncer {
}

// Make sure we have blocks to sync
if start_block == end_block {
if start_block == end_block && start_block - 1 != last_processed_block {
info!("Synced up to the tip of the chain, breaking");
break;
}

// Here, we have a start..endblock that we need to sync the logs from. This range gets
// Here, we have a start..end block that we need to sync the logs from. This range gets
// broken up into individual ranges of BATCH_SIZE where the logs are fetched from. The
// individual ranges are further broken up into a set of batches that are sequentially
// processed. This makes it so we don't have a ton of logs that all have to be processed
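A minimal sketch of the range splitting described in the comment above, assuming the same `BATCH_SIZE`/`GROUP_SIZE` constants from this file; the `batch_ranges` helper is illustrative only, not the PR's actual code, which builds a log-fetch future per window:

```rust
const BATCH_SIZE: u64 = 10_000;
const GROUP_SIZE: usize = 50;

fn batch_ranges(start_block: u64, end_block: u64) -> Vec<Vec<(u64, u64)>> {
    // Split the full range into BATCH_SIZE-wide windows.
    let mut windows = Vec::new();
    let mut from = start_block;
    while from <= end_block {
        let to = (from + BATCH_SIZE - 1).min(end_block);
        windows.push((from, to));
        from = to + 1;
    }
    // Each inner Vec is one group: its windows are fetched together and processed
    // before the next group starts, keeping the number of in-flight logs bounded.
    windows.chunks(GROUP_SIZE).map(|c| c.to_vec()).collect()
}

fn main() {
    let groups = batch_ranges(1_000, 600_000);
    assert_eq!(groups.len(), 2);
    assert_eq!(groups[0][0], (1_000, 10_999));
}
```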
@@ -227,7 +315,7 @@

// Await all of the futures.
let event_logs: Vec<Vec<Log>> = try_join_all(group).await.map_err(|e| {
ExecutionError::SyncError(format!("Failed to join log future: {e}"))
ExecutionError::RpcError(format!("Failed to join log future: {e}"))
})?;
let event_logs: Vec<Log> = event_logs.into_iter().flatten().collect();

@@ -274,7 +362,7 @@
from_block: u64,
to_block: u64,
deployment_address: Address,
) -> impl Future<Output = Result<Vec<Log>, ExecutionError>> {
) -> impl Future<Output = Result<Vec<Log>, ExecutionError>> + use<'_> {
// Setup filter and rpc client
let rpc_client = self.rpc_client.clone();
let filter = Filter::new()
Expand All @@ -286,31 +374,14 @@ impl SsvEventSyncer {
// Try to fetch logs with a retry upon error. Try up to MAX_RETRIES times and error if we
// exceed this as we can assume there is some underlying connection issue
async move {
let mut retry_cnt = 0;
loop {
match rpc_client.get_logs(&filter).await {
Ok(logs) => {
debug!(log_count = logs.len(), "Successfully fetched logs");
return Ok(logs);
}
Err(e) => {
if retry_cnt > MAX_RETRIES {
error!(?e, retry_cnt, "Max retries exceeded while fetching logs");
return Err(ExecutionError::RpcError(
"Unable to fetch logs".to_string(),
));
}

warn!(?e, retry_cnt, "Error fetching logs, retrying");

// increment retry_count and jitter retry duration
let jitter = rand::thread_rng().gen_range(0..=100);
let sleep_duration = Duration::from_millis(jitter);
tokio::time::sleep(sleep_duration).await;
retry_cnt += 1;
continue;
}
match rpc_client.get_logs(&filter).await {
Ok(logs) => {
debug!(log_count = logs.len(), "Successfully fetched logs");
Ok(logs)
}
Err(e) => Err(ExecutionError::RpcError(format!(
"Error fetching logs: {e}"
))),
}
}
}
@@ -332,23 +403,9 @@ impl SsvEventSyncer {
Some(sub.into_stream())
}
Err(e) => {
error!(
?e,
"Failed to subscribe to block stream. Retrying in 1 second..."
);

// Backend has closed, need to reconnect
let ws = WsConnect::new(&self.ws_url);
if let Ok(ws_client) = ProviderBuilder::default().on_ws(ws).await {
info!("Successfully reconnected to websocket. Catching back up");
self.ws_client = ws_client;
// Historical sync any missed blocks while down, can pass 0 as deployment
// block since it will use last_processed_block from DB anyways
self.historical_sync(contract_address, 0).await?;
} else {
tokio::time::sleep(Duration::from_secs(1)).await;
}
None
return Err(ExecutionError::WsError(format!(
"Failed to subscribe to block stream: {e}"
)));
}
};
