Skip to content

Commit

Permalink
move to atomic bool
Browse files Browse the repository at this point in the history
  • Loading branch information
Zacholme7 committed Feb 13, 2025
1 parent ef123d3 commit b5ffff4
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 13 deletions.
3 changes: 3 additions & 0 deletions anchor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::fs::File;
use std::io::{ErrorKind, Read, Write};
use std::net::SocketAddr;
use std::path::Path;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use subnet_tracker::start_subnet_tracker;
Expand Down Expand Up @@ -305,6 +306,7 @@ impl Client {
wait_for_genesis(&beacon_nodes, genesis_time).await?;

// Start syncer
let operational_status = Arc::new(AtomicBool::new(false));
let (historic_finished_tx, historic_finished_rx) = oneshot::channel();
let mut syncer = eth::SsvEventSyncer::new(
database.clone(),
Expand All @@ -326,6 +328,7 @@ impl Client {
network: config.ssv_network,
historic_finished_notify: Some(historic_finished_tx),
},
operational_status.clone(),
)
.await
.map_err(|e| format!("Unable to create syncer: {e}"))?;
Expand Down
8 changes: 5 additions & 3 deletions anchor/eth/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use eth::{Config, SsvEventSyncer};
use openssl::rsa::Rsa;
use ssv_network_config::SsvNetworkConfig;
use std::path::Path;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

Expand Down Expand Up @@ -51,9 +52,10 @@ async fn main() {
// exist. It will communicate with the rest of the system via processor channels and constantly
// keep the database up to date with new data for the rest of the system
let db = Arc::new(NetworkDatabase::new(path, &rsa_pubkey).unwrap());
let mut event_syncer = SsvEventSyncer::new(db.clone(), config)
.await
.expect("Failed to construct event syncer");
let mut event_syncer =
SsvEventSyncer::new(db.clone(), config, Arc::new(AtomicBool::new(false)))
.await
.expect("Failed to construct event syncer");
tokio::spawn(async move {
// this should never return, if it does we should gracefully handle it and shutdown the
// client.
Expand Down
2 changes: 1 addition & 1 deletion anchor/eth/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub use sync::{Config, SsvEventSyncer, OPERATIONAL_STATUS};
pub use sync::{Config, SsvEventSyncer};
mod error;
mod event_parser;
mod event_processor;
Expand Down
22 changes: 13 additions & 9 deletions anchor/eth/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ static SSV_EVENTS: LazyLock<Vec<&str>> = LazyLock::new(|| {
]
});

/// Current operational status of sync. If there is an issue with the rpc endpoint or the ws
/// endpoing, the status is considered down. Otherwise, it is up
pub static OPERATIONAL_STATUS: AtomicBool = AtomicBool::new(true);

/// Batch size for log fetching
const BATCH_SIZE: u64 = 10000;

Expand Down Expand Up @@ -86,12 +82,19 @@ pub struct SsvEventSyncer {
network: SsvNetworkConfig,
/// Notify a channel as soon as the historical sync is done
historic_finished_notify: Option<Sender<()>>,
/// Current operational status of sync. If there is an issue with the rpc endpoint or the ws
/// endpoing, the status is considered down. Otherwise, it is up
operational_status: Arc<AtomicBool>,
}

impl SsvEventSyncer {
#[instrument(skip(db))]
/// Create a new SsvEventSyncer to sync all of the events from the chain
pub async fn new(db: Arc<NetworkDatabase>, config: Config) -> Result<Self, ExecutionError> {
pub async fn new(
db: Arc<NetworkDatabase>,
config: Config,
operational_status: Arc<AtomicBool>,
) -> Result<Self, ExecutionError> {
info!(?config, "Creating new SSV Event Syncer");

// Construct HTTP Provider
Expand Down Expand Up @@ -120,6 +123,7 @@ impl SsvEventSyncer {
event_processor,
network: config.network,
historic_finished_notify: config.historic_finished_notify,
operational_status,
})
}

Expand Down Expand Up @@ -312,7 +316,7 @@ impl SsvEventSyncer {

// When we encounter a rpc error, keep polling until success
async fn troubleshoot_rpc(&self) {
OPERATIONAL_STATUS.store(false, Ordering::Relaxed);
self.operational_status.store(false, Ordering::Relaxed);

let mut retry_count = 0;
let mut current_backoff_ms = INITIAL_BACKOFF_MS;
Expand All @@ -336,7 +340,7 @@ impl SsvEventSyncer {
}

// Success! We can exit the retry loop
OPERATIONAL_STATUS.store(true, Ordering::Relaxed);
self.operational_status.store(true, Ordering::Relaxed);
}

// Once caught up with the chain, start live sync which will stream in live blocks from the
Expand Down Expand Up @@ -369,9 +373,9 @@ impl SsvEventSyncer {
// Historical sync any missed blocks while down, can pass 0 as deployment
// block since it will use last_processed_block from DB anyways
self.historical_sync(contract_address, 0).await?;
OPERATIONAL_STATUS.store(true, Ordering::Relaxed);
self.operational_status.store(true, Ordering::Relaxed);
} else {
OPERATIONAL_STATUS.store(false, Ordering::Relaxed);
self.operational_status.store(false, Ordering::Relaxed);
tokio::time::sleep(Duration::from_secs(1)).await;
}
None
Expand Down

0 comments on commit b5ffff4

Please sign in to comment.