diff --git a/telegraf/telegraf.conf b/telegraf/telegraf.conf
index 3e6f8ce7..36577de6 100644
--- a/telegraf/telegraf.conf
+++ b/telegraf/telegraf.conf
@@ -218,3 +218,97 @@
 #   ## Fixed time-window for the available payload size e.g. "5m"
 #   # rate_limit_period = "0s"
 
+
+###############################################################################
+#                            INPUT PLUGINS                                    #
+###############################################################################
+
+
+# Read metrics about cpu usage
+[[inputs.cpu]]
+  ## Whether to report per-cpu stats or not
+  percpu = true
+  ## Whether to report total system cpu stats or not
+  totalcpu = true
+  ## If true, collect raw CPU time metrics
+  collect_cpu_time = false
+  ## If true, compute and report the sum of all non-idle CPU states
+  ## NOTE: The resulting 'time_active' field INCLUDES 'iowait'!
+  report_active = false
+  ## If true and the info is available then add core_id and physical_id tags
+  core_tags = false
+
+
+# Read metrics about disk usage by mount point
+[[inputs.disk]]
+  ## By default stats will be gathered for all mount points.
+  ## Setting mount_points will restrict the stats to only the specified mount points.
+  # mount_points = ["/"]
+
+  ## Ignore mount points by filesystem type.
+  ignore_fs = ["tmpfs", "devtmpfs", "devfs", "iso9660", "overlay", "aufs", "squashfs"]
+
+  ## Ignore mount points by mount options.
+  ## The 'mount' command reports options of all mounts in parentheses.
+  ## Bind mounts can be ignored with the special 'bind' option.
+  # ignore_mount_opts = []
+
+
+# Read metrics about disk IO by device
+[[inputs.diskio]]
+  ## Devices to collect stats for
+  ## Wildcards are supported except for disk synonyms like '/dev/disk/by-id'.
+  ## ex. devices = ["sda", "sdb", "vd*", "/dev/disk/by-id/nvme-eui.00123deadc0de123"]
+  # devices = ["*"]
+
+  ## Skip gathering of the disk's serial numbers.
+  # skip_serial_number = true
+
+  ## Device metadata tags to add on systems supporting it (Linux only)
+  ## Use 'udevadm info -q property -n <device>' to get a list of properties.
+  ## Note: Most, but not all, udev properties can be accessed this way. Properties
+  ## that are currently inaccessible include DEVTYPE, DEVNAME, and DEVPATH.
+  # device_tags = ["ID_FS_TYPE", "ID_FS_USAGE"]
+
+  ## Using the same metadata source as device_tags, you can also customize the
+  ## name of the device via templates.
+  ## The 'name_templates' parameter is a list of templates to try and apply to
+  ## the device. The template may contain variables in the form of '$PROPERTY' or
+  ## '${PROPERTY}'. The first template which does not contain any variables not
+  ## present for the device is used as the device name tag.
+  ## The typical use case is for LVM volumes, to get the VG/LV name instead of
+  ## the near-meaningless DM-0 name.
+  # name_templates = ["$ID_FS_LABEL","$DM_VG_NAME/$DM_LV_NAME"]
+
+
+# Plugin to collect various Linux kernel statistics.
+# This plugin ONLY supports Linux
+[[inputs.kernel]]
+  ## Additional gather options
+  ## Possible options include:
+  ##   * ksm - kernel same-page merging
+  ##   * psi - pressure stall information
+  # collect = []
+
+
+# Read metrics about memory usage
+[[inputs.mem]]
+  # no configuration
+
+
+# Get the number of processes and group them by status
+# This plugin ONLY supports non-Windows
+[[inputs.processes]]
+  ## Use sudo to run ps command on *BSD systems. Linux systems will read
+  ## /proc, so this does not apply there.
+  # use_sudo = false
+
+
+# Read metrics about swap memory usage
+[[inputs.swap]]
+  # no configuration
+
+
+# Read metrics about system load & uptime
+[[inputs.system]]
+  # no configuration
diff --git a/tip-router-operator-cli/src/backup_snapshots.rs b/tip-router-operator-cli/src/backup_snapshots.rs
index c5a4b9a6..ea0827a0 100644
--- a/tip-router-operator-cli/src/backup_snapshots.rs
+++ b/tip-router-operator-cli/src/backup_snapshots.rs
@@ -7,17 +7,19 @@ use tokio::time;
 
 use crate::process_epoch::get_previous_epoch_last_slot;
 
+const MAXIMUM_BACKUP_INCREMENTAL_SNAPSHOTS_PER_EPOCH: usize = 3;
+
 /// Represents a parsed incremental snapshot filename
 #[derive(Debug)]
-struct SnapshotInfo {
+pub struct SnapshotInfo {
     path: PathBuf,
     _start_slot: u64,
-    end_slot: u64,
+    pub end_slot: u64,
 }
 
 impl SnapshotInfo {
     /// Try to parse a snapshot filename into slot information
-    fn from_path(path: PathBuf) -> Option<Self> {
+    pub fn from_path(path: PathBuf) -> Option<Self> {
         let file_name = path.file_name()?.to_str()?;
 
         // Only try to parse if it's an incremental snapshot
@@ -67,25 +69,32 @@ impl BackupSnapshotMonitor {
     }
 
     /// Gets target slot for current epoch
-    fn get_target_slot(&self) -> Result<u64> {
+    fn get_target_slots(&self) -> Result<(u64, u64)> {
+        // Get the last slot of the current epoch
+        let (_, last_epoch_target_slot) = get_previous_epoch_last_slot(&self.rpc_client)?;
+        let next_epoch_target_slot = last_epoch_target_slot + DEFAULT_SLOTS_PER_EPOCH;
+
         if let Some(target_slot) = self.override_target_slot {
-            return Ok(target_slot);
+            return Ok((last_epoch_target_slot, target_slot));
         }
 
-        // Get the last slot of the current epoch
-        let (_, last_slot) = get_previous_epoch_last_slot(&self.rpc_client)?;
-        Ok(last_slot + DEFAULT_SLOTS_PER_EPOCH)
+        Ok((last_epoch_target_slot, next_epoch_target_slot))
     }
 
     /// Finds the most recent incremental snapshot that's before our target slot
     fn find_closest_incremental(&self, target_slot: u64) -> Option<PathBuf> {
         let dir_entries = std::fs::read_dir(&self.snapshots_dir).ok()?;
 
-        // Find the snapshot that ends closest to but not after target_slot
+        // Find the snapshot that ends closest to but not after target_slot, in the same epoch
        dir_entries
             .filter_map(Result::ok)
             .filter_map(|entry| SnapshotInfo::from_path(entry.path()))
-            .filter(|snap| snap.end_slot <= target_slot)
+            .filter(|snap| {
+                let before_target_slot = snap.end_slot <= target_slot;
+                let in_same_epoch = (snap.end_slot / DEFAULT_SLOTS_PER_EPOCH)
+                    == (target_slot / DEFAULT_SLOTS_PER_EPOCH);
+                before_target_slot && in_same_epoch
+            })
             .max_by_key(|snap| snap.end_slot)
             .map(|snap| snap.path)
     }
@@ -135,7 +144,7 @@ impl BackupSnapshotMonitor {
             );
         }
 
-        log::info!(
+        log::debug!(
             "Successfully backed up incremental snapshot ({} bytes)",
             source_size
         );
@@ -143,13 +152,34 @@ impl BackupSnapshotMonitor {
         Ok(())
     }
 
+    fn evict_all_epoch_snapshots(&self, epoch: u64) -> Result<()> {
+        let dir_entries = std::fs::read_dir(&self.backup_dir)?;
+
+        // Find all snapshots for the given epoch and remove them
+        dir_entries
+            .filter_map(Result::ok)
+            .filter_map(|entry| SnapshotInfo::from_path(entry.path()))
+            .filter(|snap| snap.end_slot / DEFAULT_SLOTS_PER_EPOCH == epoch)
+            .try_for_each(|snapshot| {
+                log::debug!(
+                    "Removing old snapshot from epoch {} with slot {}: {:?}",
+                    epoch,
+                    snapshot.end_slot,
+                    snapshot.path
+                );
+                std::fs::remove_file(snapshot.path.as_path())
+            })?;
+
+        Ok(())
+    }
+
     fn evict_same_epoch_incremental(&self, target_slot: u64) -> Result<()> {
         let slots_per_epoch = DEFAULT_SLOTS_PER_EPOCH;
         let target_epoch = target_slot / slots_per_epoch;
         let dir_entries = std::fs::read_dir(&self.backup_dir)?;
 
-        // Find the snapshot that ends closest to but not after target_slot
+        // Find all snapshots for the given epoch
         let mut same_epoch_snapshots: Vec<SnapshotInfo> = dir_entries
             .filter_map(Result::ok)
             .filter_map(|entry| SnapshotInfo::from_path(entry.path()))
@@ -159,58 +189,87 @@ impl BackupSnapshotMonitor {
         // Sort by end_slot ascending so we can remove oldest
         same_epoch_snapshots.sort_by_key(|snap| snap.end_slot);
 
-        // Remove oldest snapshot
-        if let Some(oldest_snapshot) = same_epoch_snapshots.first() {
-            log::debug!(
-                "Removing old snapshot from epoch {} with slot {}: {:?}",
-                target_epoch,
-                oldest_snapshot.end_slot,
-                oldest_snapshot.path
-            );
-            std::fs::remove_file(oldest_snapshot.path.as_path())?;
+        // Remove oldest snapshots if we have more than MAXIMUM_BACKUP_INCREMENTAL_SNAPSHOTS_PER_EPOCH
+        while same_epoch_snapshots.len() > MAXIMUM_BACKUP_INCREMENTAL_SNAPSHOTS_PER_EPOCH {
+            if let Some(oldest_snapshot) = same_epoch_snapshots.first() {
+                log::debug!(
+                    "Removing old snapshot from epoch {} with slot {}: {:?}",
+                    target_epoch,
+                    oldest_snapshot.end_slot,
+                    oldest_snapshot.path
+                );
+                std::fs::remove_file(oldest_snapshot.path.as_path())?;
+                same_epoch_snapshots.remove(0);
+            }
         }
 
         Ok(())
     }
 
+    async fn backup_latest_for_target_slot(
+        &self,
+        mut current_backup_path: Option<PathBuf>,
+        target_slot: u64,
+    ) -> Option<PathBuf> {
+        if let Some(snapshot) = self.find_closest_incremental(target_slot) {
+            if current_backup_path.as_ref() != Some(&snapshot) {
+                log::debug!(
+                    "Found new best snapshot for slot {}: {:?}",
+                    target_slot,
+                    snapshot
+                );
+
+                if let Err(e) = self.backup_incremental_snapshot(&snapshot).await {
+                    log::error!("Failed to backup snapshot: {}", e);
+                    return current_backup_path;
+                }
+
+                current_backup_path = Some(snapshot);
+
+                // After saving best snapshot, evict oldest one from same epoch
+                if let Err(e) = self.evict_same_epoch_incremental(target_slot) {
+                    log::error!("Failed to evict old snapshots: {}", e);
+                }
+            }
+        }
+
+        current_backup_path
+    }
+
+    /// Runs the snapshot backup process to continually back up the latest incremental snapshot for the previous epoch and the current epoch
+    /// Keeps at most MAXIMUM_BACKUP_INCREMENTAL_SNAPSHOTS_PER_EPOCH snapshots per epoch in the backup
+    /// Purges old incremental snapshots in the backup after 2 epochs
     pub async fn run(&self) -> Result<()> {
         let mut interval = time::interval(Duration::from_secs(10));
-        let mut last_target_slot = None;
-        let mut last_backup_path = None;
+        let mut current_target_slot = None;
+        let mut last_epoch_backup_path = None;
+        let mut this_epoch_backup_path = None;
 
         loop {
             interval.tick().await;
 
-            let target_slot = self.get_target_slot()?;
+            let (last_epoch_target_slot, this_epoch_target_slot) = self.get_target_slots()?;
 
-            // Only search for new snapshot if target slot has changed
-            if last_target_slot != Some(target_slot) {
-                log::info!("New target slot: {}", target_slot);
-            }
-
-            if let Some(snapshot) = self.find_closest_incremental(target_slot) {
-                if last_backup_path.as_ref() != Some(&snapshot) {
-                    log::debug!(
-                        "Found new best snapshot for slot {}: {:?}",
-                        target_slot,
-                        snapshot
-                    );
-
-                    if let Err(e) = self.backup_incremental_snapshot(&snapshot).await {
-                        log::error!("Failed to backup snapshot: {}", e);
-                        continue;
-                    }
-
-                    last_backup_path = Some(snapshot);
-
-                    // After saving best snapshot, evict oldest one from same epoch
-                    if let Err(e) = self.evict_same_epoch_incremental(target_slot) {
-                        log::error!("Failed to evict old snapshots: {}", e);
-                    }
+            // Detect new epoch
+            if current_target_slot != Some(this_epoch_target_slot) {
+                log::info!("New target slot: {}", this_epoch_target_slot);
+                last_epoch_backup_path = this_epoch_backup_path;
+                this_epoch_backup_path = None;
+                let current_epoch = this_epoch_target_slot / DEFAULT_SLOTS_PER_EPOCH;
+                if let Err(e) = self.evict_all_epoch_snapshots(current_epoch - 2) {
+                    log::error!("Failed to evict old snapshots: {}", e);
                 }
             }
 
-            last_target_slot = Some(target_slot);
+            // Backup latest snapshot for last epoch and this epoch
+            last_epoch_backup_path = self
+                .backup_latest_for_target_slot(last_epoch_backup_path, last_epoch_target_slot)
+                .await;
+            this_epoch_backup_path = self
+                .backup_latest_for_target_slot(this_epoch_backup_path, this_epoch_target_slot)
+                .await;
+
+            current_target_slot = Some(this_epoch_target_slot);
         }
     }
 }
diff --git a/tip-router-operator-cli/src/ledger_utils.rs b/tip-router-operator-cli/src/ledger_utils.rs
index 8341dafe..9c8586c7 100644
--- a/tip-router-operator-cli/src/ledger_utils.rs
+++ b/tip-router-operator-cli/src/ledger_utils.rs
@@ -60,7 +60,7 @@ pub fn get_bank_from_ledger(
                 ("status", "error", String),
                 ("state", "load_genesis", String),
                 ("step", 1, i64),
-                ("error", e.to_string(), String),
+                ("error", format!("{:?}", e), String),
             );
             panic!("Failed to load genesis config: {}", e); // TODO should panic here?
         }
@@ -195,7 +195,7 @@ pub fn get_bank_from_ledger(
             ("state", "load_bank_forks", String),
             ("status", "error", String),
             ("step", 4, i64),
-            ("error", e.to_string(), String),
+            ("error", format!("{:?}", e), String),
             ("duration_ms", start_time.elapsed().as_millis() as i64, i64),
         );
         panic!("Failed to load bank forks: {}", e);
@@ -230,7 +230,7 @@ pub fn get_bank_from_ledger(
             ("status", "error", String),
             ("state", "process_blockstore_from_root", String),
             ("step", 5, i64),
-            ("error", e.to_string(), String),
+            ("error", format!("{:?}", e), String),
             ("duration_ms", start_time.elapsed().as_millis() as i64, i64),
         );
         panic!("Failed to process blockstore from root: {}", e);
@@ -268,7 +268,7 @@ pub fn get_bank_from_ledger(
             ("status", "error", String),
             ("state", "bank_to_full_snapshot_archive", String),
             ("step", 6, i64),
-            ("error", e.to_string(), String),
+            ("error", format!("{:?}", e), String),
             ("duration_ms", start_time.elapsed().as_millis() as i64, i64),
         );
         panic!("Failed to create snapshot: {}", e);
diff --git a/tip-router-operator-cli/src/lib.rs b/tip-router-operator-cli/src/lib.rs
index 27497e09..c03f2236 100644
--- a/tip-router-operator-cli/src/lib.rs
+++ b/tip-router-operator-cli/src/lib.rs
@@ -29,9 +29,9 @@ use solana_sdk::{account::AccountSharedData, pubkey::Pubkey, slot_history::Slot}
 
 #[derive(Debug)]
 pub enum MerkleRootError {
-    StakeMetaGeneratorError(&'static str),
-    MerkleRootGeneratorError(&'static str),
-    MerkleTreeError(&'static str),
+    StakeMetaGeneratorError(String),
+    MerkleRootGeneratorError(String),
+    MerkleTreeError(String),
 }
 
 // TODO where did these come from?
@@ -114,7 +114,9 @@ pub fn get_meta_merkle_root(
         tip_payment_program_id,
         snapshots_enabled,
     )
-    .map_err(|_| MerkleRootError::StakeMetaGeneratorError("Failed to generate stake meta"))?;
+    .map_err(|e| {
+        MerkleRootError::StakeMetaGeneratorError(format!("Failed to generate stake meta: {:?}", e))
+    })?;
 
     info!(
         "Created StakeMetaCollection:\n - epoch: {:?}\n - slot: {:?}\n - num stake metas: {:?}\n - bank_hash: {:?}",
@@ -141,7 +143,9 @@ pub fn get_meta_merkle_root(
         protocol_fee_bps,
     )
     .map_err(|_| {
-        MerkleRootError::MerkleRootGeneratorError("Failed to generate merkle tree collection")
+        MerkleRootError::MerkleRootGeneratorError(
+            "Failed to generate merkle tree collection".to_string(),
+        )
     })?;
 
     info!(
@@ -206,8 +210,7 @@ pub fn get_meta_merkle_root(
         merkle_tree_coll,
     )
     .map_err(|e| {
-        info!("Meta merkle tree creation error: {:?}", e);
-        MerkleRootError::MerkleTreeError("Failed to create meta merkle tree")
+        MerkleRootError::MerkleTreeError(format!("Failed to create meta merkle tree: {:?}", e))
     })?;
 
     info!(
diff --git a/tip-router-operator-cli/src/main.rs b/tip-router-operator-cli/src/main.rs
index e2d09b42..12ec6437 100644
--- a/tip-router-operator-cli/src/main.rs
+++ b/tip-router-operator-cli/src/main.rs
@@ -108,7 +108,7 @@ async fn main() -> Result<()> {
             {
                 error!("Error submitting to NCN: {}", e);
             }
-            sleep(Duration::from_secs(60)).await;
+            sleep(Duration::from_secs(600)).await;
         }
     });
 
@@ -133,6 +133,9 @@ async fn main() -> Result<()> {
                 wait_for_next_epoch(&rpc_client).await?;
             }
 
+            // Track runs that are starting right at the beginning of a new epoch
+            let mut new_epoch_rollover = start_next_epoch;
+
             loop {
                 // Get the last slot of the previous epoch
                 let (previous_epoch, previous_epoch_slot) =
@@ -155,6 +158,7 @@ async fn main() -> Result<()> {
                     &tip_router_program_id,
                     &ncn_address,
                     enable_snapshots,
+                    new_epoch_rollover,
                    &cli,
                 )
                 .await
@@ -170,6 +174,8 @@ async fn main() -> Result<()> {
                     error!("Error waiting for next epoch: {}", e);
                     sleep(Duration::from_secs(60)).await;
                 }
+
+                new_epoch_rollover = true;
             }
         }
         Commands::SnapshotSlot {
@@ -192,6 +198,7 @@ async fn main() -> Result<()> {
                 &tip_router_program_id,
                 &ncn_address,
                 enable_snapshots,
+                false,
                 &cli,
             )
             .await
diff --git a/tip-router-operator-cli/src/process_epoch.rs b/tip-router-operator-cli/src/process_epoch.rs
index 07f1da2e..de3e531e 100644
--- a/tip-router-operator-cli/src/process_epoch.rs
+++ b/tip-router-operator-cli/src/process_epoch.rs
@@ -1,4 +1,5 @@
 use std::{
+    path::PathBuf,
     str::FromStr,
     time::{Duration, Instant},
 };
@@ -9,8 +10,14 @@ use log::info;
 use solana_metrics::{datapoint_error, datapoint_info};
 use solana_rpc_client::rpc_client::RpcClient;
 use solana_sdk::pubkey::Pubkey;
+use tokio::time;
 
-use crate::{get_meta_merkle_root, tip_router::get_ncn_config, Cli};
+use crate::{
+    backup_snapshots::SnapshotInfo, get_meta_merkle_root, tip_router::get_ncn_config, Cli,
+};
+
+const MAX_WAIT_FOR_INCREMENTAL_SNAPSHOT_TICKS: u64 = 1200; // Experimentally determined
+const OPTIMAL_INCREMENTAL_SNAPSHOT_SLOT_RANGE: u64 = 800; // Experimentally determined
 
 pub async fn wait_for_next_epoch(rpc_client: &RpcClient) -> Result<()> {
     let current_epoch = rpc_client.get_epoch_info()?.epoch;
@@ -45,6 +52,35 @@ pub fn get_previous_epoch_last_slot(rpc_client: &RpcClient) -> Result<(u64, u64)
     Ok((previous_epoch, previous_epoch_final_slot))
 }
 
+/// Wait for the optimal incremental snapshot to be available to speed up full snapshot generation
+/// Automatically returns after MAX_WAIT_FOR_INCREMENTAL_SNAPSHOT_TICKS seconds
+pub async fn wait_for_optimal_incremental_snapshot(
+    incremental_snapshots_dir: PathBuf,
+    target_slot: u64,
+) -> Result<()> {
+    let mut interval = time::interval(Duration::from_secs(1));
+    let mut ticks = 0;
+
+    while ticks < MAX_WAIT_FOR_INCREMENTAL_SNAPSHOT_TICKS {
+        let dir_entries = std::fs::read_dir(&incremental_snapshots_dir)?;
+
+        for entry in dir_entries {
+            if let Some(snapshot_info) = SnapshotInfo::from_path(entry?.path()) {
+                if target_slot - OPTIMAL_INCREMENTAL_SNAPSHOT_SLOT_RANGE < snapshot_info.end_slot
+                    && snapshot_info.end_slot <= target_slot
+                {
+                    return Ok(());
+                }
+            }
+        }
+
+        interval.tick().await;
+        ticks += 1;
+    }
+
+    Ok(())
+}
+
 #[allow(clippy::too_many_arguments)]
 pub async fn process_epoch(
     client: &EllipsisClient,
@@ -55,6 +91,7 @@ pub async fn process_epoch(
     tip_router_program_id: &Pubkey,
     ncn_address: &Pubkey,
     snapshots_enabled: bool,
+    new_epoch_rollover: bool,
     cli_args: &Cli,
 ) -> Result<()> {
     info!("Processing epoch {:?}", target_epoch);
@@ -77,6 +114,12 @@ pub async fn process_epoch(
     let account_paths = account_paths.map_or_else(|| vec![ledger_path.clone()], |paths| paths);
     let full_snapshots_path = full_snapshots_path.map_or(ledger_path, |path| path);
 
+    // Wait for an optimal incremental snapshot to be available, since incremental snapshots can be delayed in a new epoch
+    if new_epoch_rollover {
+        wait_for_optimal_incremental_snapshot(incremental_snapshots_path.clone(), target_slot)
+            .await?;
+    }
+
     // Generate merkle root from ledger
     let meta_merkle_tree = match get_meta_merkle_root(
         cli_args.ledger_path.as_path(),
diff --git a/tip-router-operator-cli/src/stake_meta_generator.rs b/tip-router-operator-cli/src/stake_meta_generator.rs
index d977e0c7..8391513f 100644
--- a/tip-router-operator-cli/src/stake_meta_generator.rs
+++ b/tip-router-operator-cli/src/stake_meta_generator.rs
@@ -63,7 +63,7 @@ pub enum StakeMetaGeneratorError {
 
     GenesisConfigError(#[from] OpenGenesisConfigError),
 
-    PanicError,
+    PanicError(String),
 }
 
 impl Display for StakeMetaGeneratorError {
@@ -102,8 +102,18 @@ pub fn generate_stake_meta(
     let bank = match res {
         Ok(bank) => bank,
         Err(e) => {
-            error!("Panicked while creating bank from ledger: {:?}", e);
-            let error_str = format!("{:?}", e);
+            let error_str = if let Some(s) = e.downcast_ref::<String>() {
+                s.to_string()
+            } else if let Some(s) = e.downcast_ref::<&'static str>() {
+                s.to_string()
+            } else {
+                // If we can't get a string, try to get any Debug implementation
+                match e.downcast_ref::<Box<dyn std::fmt::Debug>>() {
+                    Some(debug_val) => format!("{:?}", debug_val),
+                    None => "Unknown panic payload".to_string(),
+                }
+            };
+            error!("Panicked while creating bank from ledger: {}", error_str);
             datapoint_error!(
                 "tip_router_cli.get_bank",
                 ("operator", operator_address.to_string(), String),
@@ -111,7 +121,7 @@ pub fn generate_stake_meta(
                 ("state", "get_bank_from_ledger", String),
                 ("error", error_str, String),
             );
-            return Err(StakeMetaGeneratorError::PanicError);
+            return Err(StakeMetaGeneratorError::PanicError(error_str));
        }
    };
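
A note on the retention policy the backup_snapshots.rs changes introduce: a snapshot is a backup candidate only if its end slot is at or before the target slot and falls in the same epoch as the target slot, and each epoch keeps at most MAXIMUM_BACKUP_INCREMENTAL_SNAPSHOTS_PER_EPOCH backups, evicting the oldest first. The following minimal, self-contained Rust sketch models those two rules; it is illustrative only, not code from the repository, and the slot values are hypothetical (432_000 matches solana_sdk's DEFAULT_SLOTS_PER_EPOCH on mainnet).

// Sketch only: mirrors the filter in find_closest_incremental and the
// eviction loop in evict_same_epoch_incremental from the patch above.
const SLOTS_PER_EPOCH: u64 = 432_000; // stand-in for DEFAULT_SLOTS_PER_EPOCH
const MAX_PER_EPOCH: usize = 3; // MAXIMUM_BACKUP_INCREMENTAL_SNAPSHOTS_PER_EPOCH

/// A snapshot qualifies only if it ends at or before the target slot
/// and lies in the same epoch as the target slot.
fn is_candidate(end_slot: u64, target_slot: u64) -> bool {
    end_slot <= target_slot
        && end_slot / SLOTS_PER_EPOCH == target_slot / SLOTS_PER_EPOCH
}

/// Drop the oldest end slots until at most MAX_PER_EPOCH remain.
fn evict_oldest(mut end_slots: Vec<u64>) -> Vec<u64> {
    end_slots.sort_unstable();
    while end_slots.len() > MAX_PER_EPOCH {
        end_slots.remove(0);
    }
    end_slots
}

fn main() {
    let target = 2 * SLOTS_PER_EPOCH - 1; // last slot of epoch 1
    // An epoch-0 snapshot is rejected even though it precedes the target...
    assert!(!is_candidate(SLOTS_PER_EPOCH - 10, target));
    // ...while a same-epoch snapshot just before the target qualifies.
    assert!(is_candidate(target - 500, target));
    // Of five backups in one epoch, only the three newest survive.
    assert_eq!(
        evict_oldest(vec![100, 200, 300, 400, 500]),
        vec![300, 400, 500]
    );
}

Under these rules, the reworked run() loop tracks two target slots at once, so a late-arriving snapshot for the previous epoch can still be backed up until that epoch falls two behind, at which point evict_all_epoch_snapshots purges it entirely.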