From b5ffff4901ee45d89cc1523f13221591c30f5788 Mon Sep 17 00:00:00 2001 From: Zacholme7 Date: Thu, 13 Feb 2025 15:06:58 +0000 Subject: [PATCH] move to atomic bool --- anchor/client/src/lib.rs | 3 +++ anchor/eth/execution.rs | 8 +++++--- anchor/eth/src/lib.rs | 2 +- anchor/eth/src/sync.rs | 22 +++++++++++++--------- 4 files changed, 22 insertions(+), 13 deletions(-) diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index 009af7af..6ad9c663 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -28,6 +28,7 @@ use std::fs::File; use std::io::{ErrorKind, Read, Write}; use std::net::SocketAddr; use std::path::Path; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use subnet_tracker::start_subnet_tracker; @@ -305,6 +306,7 @@ impl Client { wait_for_genesis(&beacon_nodes, genesis_time).await?; // Start syncer + let operational_status = Arc::new(AtomicBool::new(false)); let (historic_finished_tx, historic_finished_rx) = oneshot::channel(); let mut syncer = eth::SsvEventSyncer::new( database.clone(), @@ -326,6 +328,7 @@ impl Client { network: config.ssv_network, historic_finished_notify: Some(historic_finished_tx), }, + operational_status.clone(), ) .await .map_err(|e| format!("Unable to create syncer: {e}"))?; diff --git a/anchor/eth/execution.rs b/anchor/eth/execution.rs index ccc8ed88..5b129778 100644 --- a/anchor/eth/execution.rs +++ b/anchor/eth/execution.rs @@ -4,6 +4,7 @@ use eth::{Config, SsvEventSyncer}; use openssl::rsa::Rsa; use ssv_network_config::SsvNetworkConfig; use std::path::Path; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; @@ -51,9 +52,10 @@ async fn main() { // exist. It will communicate with the rest of the system via processor channels and constantly // keep the database up to date with new data for the rest of the system let db = Arc::new(NetworkDatabase::new(path, &rsa_pubkey).unwrap()); - let mut event_syncer = SsvEventSyncer::new(db.clone(), config) - .await - .expect("Failed to construct event syncer"); + let mut event_syncer = + SsvEventSyncer::new(db.clone(), config, Arc::new(AtomicBool::new(false))) + .await + .expect("Failed to construct event syncer"); tokio::spawn(async move { // this should never return, if it does we should gracefully handle it and shutdown the // client. diff --git a/anchor/eth/src/lib.rs b/anchor/eth/src/lib.rs index 80db6414..d698f54f 100644 --- a/anchor/eth/src/lib.rs +++ b/anchor/eth/src/lib.rs @@ -1,4 +1,4 @@ -pub use sync::{Config, SsvEventSyncer, OPERATIONAL_STATUS}; +pub use sync::{Config, SsvEventSyncer}; mod error; mod event_parser; mod event_processor; diff --git a/anchor/eth/src/sync.rs b/anchor/eth/src/sync.rs index 6350ea1d..f714310d 100644 --- a/anchor/eth/src/sync.rs +++ b/anchor/eth/src/sync.rs @@ -38,10 +38,6 @@ static SSV_EVENTS: LazyLock> = LazyLock::new(|| { ] }); -/// Current operational status of sync. If there is an issue with the rpc endpoint or the ws -/// endpoing, the status is considered down. Otherwise, it is up -pub static OPERATIONAL_STATUS: AtomicBool = AtomicBool::new(true); - /// Batch size for log fetching const BATCH_SIZE: u64 = 10000; @@ -86,12 +82,19 @@ pub struct SsvEventSyncer { network: SsvNetworkConfig, /// Notify a channel as soon as the historical sync is done historic_finished_notify: Option>, + /// Current operational status of sync. If there is an issue with the rpc endpoint or the ws + /// endpoing, the status is considered down. Otherwise, it is up + operational_status: Arc, } impl SsvEventSyncer { #[instrument(skip(db))] /// Create a new SsvEventSyncer to sync all of the events from the chain - pub async fn new(db: Arc, config: Config) -> Result { + pub async fn new( + db: Arc, + config: Config, + operational_status: Arc, + ) -> Result { info!(?config, "Creating new SSV Event Syncer"); // Construct HTTP Provider @@ -120,6 +123,7 @@ impl SsvEventSyncer { event_processor, network: config.network, historic_finished_notify: config.historic_finished_notify, + operational_status, }) } @@ -312,7 +316,7 @@ impl SsvEventSyncer { // When we encounter a rpc error, keep polling until success async fn troubleshoot_rpc(&self) { - OPERATIONAL_STATUS.store(false, Ordering::Relaxed); + self.operational_status.store(false, Ordering::Relaxed); let mut retry_count = 0; let mut current_backoff_ms = INITIAL_BACKOFF_MS; @@ -336,7 +340,7 @@ impl SsvEventSyncer { } // Success! We can exit the retry loop - OPERATIONAL_STATUS.store(true, Ordering::Relaxed); + self.operational_status.store(true, Ordering::Relaxed); } // Once caught up with the chain, start live sync which will stream in live blocks from the @@ -369,9 +373,9 @@ impl SsvEventSyncer { // Historical sync any missed blocks while down, can pass 0 as deployment // block since it will use last_processed_block from DB anyways self.historical_sync(contract_address, 0).await?; - OPERATIONAL_STATUS.store(true, Ordering::Relaxed); + self.operational_status.store(true, Ordering::Relaxed); } else { - OPERATIONAL_STATUS.store(false, Ordering::Relaxed); + self.operational_status.store(false, Ordering::Relaxed); tokio::time::sleep(Duration::from_secs(1)).await; } None