Skip to content

Commit

Permalink
Merge branch 'lite-operator' into add-mev-claim
Browse files Browse the repository at this point in the history
  • Loading branch information
tomjohn1028 committed Jan 28, 2025
2 parents 46cff84 + 26dc401 commit c4d38bd
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 66 deletions.
94 changes: 94 additions & 0 deletions telegraf/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,97 @@
# ## Fixed time-window for the available payload size e.g. "5m"
# # rate_limit_period = "0s"


###############################################################################
# INPUT PLUGINS #
###############################################################################


# Read metrics about cpu usage
[[inputs.cpu]]
## Whether to report per-cpu stats or not
percpu = true
## Whether to report total system cpu stats or not
totalcpu = true
## If true, collect raw CPU time metrics
collect_cpu_time = false
## If true, compute and report the sum of all non-idle CPU states
## NOTE: The resulting 'time_active' field INCLUDES 'iowait'!
report_active = false
## If true and the info is available then add core_id and physical_id tags
core_tags = false


# Read metrics about disk usage by mount point
[[inputs.disk]]
## By default stats will be gathered for all mount points.
## Set mount_points will restrict the stats to only the specified mount points.
# mount_points = ["/"]

## Ignore mount points by filesystem type.
ignore_fs = ["tmpfs", "devtmpfs", "devfs", "iso9660", "overlay", "aufs", "squashfs"]

## Ignore mount points by mount options.
## The 'mount' command reports options of all mounts in parentheses.
## Bind mounts can be ignored with the special 'bind' option.
# ignore_mount_opts = []


# Read metrics about disk IO by device
[[inputs.diskio]]
## Devices to collect stats for
## Wildcards are supported except for disk synonyms like '/dev/disk/by-id'.
## ex. devices = ["sda", "sdb", "vd*", "/dev/disk/by-id/nvme-eui.00123deadc0de123"]
# devices = ["*"]

## Skip gathering of the disk's serial numbers.
# skip_serial_number = true

## Device metadata tags to add on systems supporting it (Linux only)
## Use 'udevadm info -q property -n <device>' to get a list of properties.
## Note: Most, but not all, udev properties can be accessed this way. Properties
## that are currently inaccessible include DEVTYPE, DEVNAME, and DEVPATH.
# device_tags = ["ID_FS_TYPE", "ID_FS_USAGE"]

## Using the same metadata source as device_tags, you can also customize the
## name of the device via templates.
## The 'name_templates' parameter is a list of templates to try and apply to
## the device. The template may contain variables in the form of '$PROPERTY' or
## '${PROPERTY}'. The first template which does not contain any variables not
## present for the device is used as the device name tag.
## The typical use case is for LVM volumes, to get the VG/LV name instead of
## the near-meaningless DM-0 name.
# name_templates = ["$ID_FS_LABEL","$DM_VG_NAME/$DM_LV_NAME"]


# Plugin to collect various Linux kernel statistics.
# This plugin ONLY supports Linux
[[inputs.kernel]]
## Additional gather options
## Possible options include:
## * ksm - kernel same-page merging
## * psi - pressure stall information
# collect = []


# Read metrics about memory usage
[[inputs.mem]]
# no configuration


# Get the number of processes and group them by status
# This plugin ONLY supports non-Windows
[[inputs.processes]]
## Use sudo to run ps command on *BSD systems. Linux systems will read
## /proc, so this does not apply there.
# use_sudo = false


# Read metrics about swap memory usage
[[inputs.swap]]
# no configuration


# Read metrics about system load & uptime
[[inputs.system]]
# no configuration
157 changes: 108 additions & 49 deletions tip-router-operator-cli/src/backup_snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,19 @@ use tokio::time;

use crate::process_epoch::get_previous_epoch_last_slot;

const MAXIMUM_BACKUP_INCREMENTAL_SNAPSHOTS_PER_EPOCH: usize = 3;

/// Represents a parsed incremental snapshot filename
#[derive(Debug)]
struct SnapshotInfo {
pub struct SnapshotInfo {
path: PathBuf,
_start_slot: u64,
end_slot: u64,
pub end_slot: u64,
}

impl SnapshotInfo {
/// Try to parse a snapshot filename into slot information
fn from_path(path: PathBuf) -> Option<Self> {
pub fn from_path(path: PathBuf) -> Option<Self> {
let file_name = path.file_name()?.to_str()?;

// Only try to parse if it's an incremental snapshot
Expand Down Expand Up @@ -67,25 +69,32 @@ impl BackupSnapshotMonitor {
}

/// Gets target slot for current epoch
fn get_target_slot(&self) -> Result<u64> {
fn get_target_slots(&self) -> Result<(u64, u64)> {
// Get the last slot of the current epoch
let (_, last_epoch_target_slot) = get_previous_epoch_last_slot(&self.rpc_client)?;
let next_epoch_target_slot = last_epoch_target_slot + DEFAULT_SLOTS_PER_EPOCH;

if let Some(target_slot) = self.override_target_slot {
return Ok(target_slot);
return Ok((last_epoch_target_slot, target_slot));
}

// Get the last slot of the current epoch
let (_, last_slot) = get_previous_epoch_last_slot(&self.rpc_client)?;
Ok(last_slot + DEFAULT_SLOTS_PER_EPOCH)
Ok((last_epoch_target_slot, next_epoch_target_slot))
}

/// Finds the most recent incremental snapshot that's before our target slot
fn find_closest_incremental(&self, target_slot: u64) -> Option<PathBuf> {
let dir_entries = std::fs::read_dir(&self.snapshots_dir).ok()?;

// Find the snapshot that ends closest to but not after target_slot
// Find the snapshot that ends closest to but not after target_slot, in the same epoch
dir_entries
.filter_map(Result::ok)
.filter_map(|entry| SnapshotInfo::from_path(entry.path()))
.filter(|snap| snap.end_slot <= target_slot)
.filter(|snap| {
let before_target_slot = snap.end_slot <= target_slot;
let in_same_epoch = (snap.end_slot / DEFAULT_SLOTS_PER_EPOCH)
== (target_slot / DEFAULT_SLOTS_PER_EPOCH);
before_target_slot && in_same_epoch
})
.max_by_key(|snap| snap.end_slot)
.map(|snap| snap.path)
}
Expand Down Expand Up @@ -135,21 +144,42 @@ impl BackupSnapshotMonitor {
);
}

log::info!(
log::debug!(
"Successfully backed up incremental snapshot ({} bytes)",
source_size
);

Ok(())
}

/// Deletes every backed-up incremental snapshot whose end slot falls inside
/// the given `epoch`, clearing that epoch out of the backup directory.
///
/// Unreadable directory entries and non-snapshot files are silently skipped;
/// the first failed file removal aborts the sweep and is returned to the caller.
fn evict_all_epoch_snapshots(&self, epoch: u64) -> Result<()> {
    for entry in std::fs::read_dir(&self.backup_dir)? {
        // Skip entries we cannot stat and files that are not incremental snapshots.
        let Ok(entry) = entry else { continue };
        let Some(snapshot) = SnapshotInfo::from_path(entry.path()) else {
            continue;
        };
        // Only evict snapshots belonging to the requested epoch.
        if snapshot.end_slot / DEFAULT_SLOTS_PER_EPOCH != epoch {
            continue;
        }
        log::debug!(
            "Removing old snapshot from epoch {} with slot {}: {:?}",
            epoch,
            snapshot.end_slot,
            snapshot.path
        );
        std::fs::remove_file(snapshot.path.as_path())?;
    }

    Ok(())
}

fn evict_same_epoch_incremental(&self, target_slot: u64) -> Result<()> {
let slots_per_epoch = DEFAULT_SLOTS_PER_EPOCH;
let target_epoch = target_slot / slots_per_epoch;

let dir_entries = std::fs::read_dir(&self.backup_dir)?;

// Find the snapshot that ends closest to but not after target_slot
// Find all snapshots for the given epoch
let mut same_epoch_snapshots: Vec<SnapshotInfo> = dir_entries
.filter_map(Result::ok)
.filter_map(|entry| SnapshotInfo::from_path(entry.path()))
Expand All @@ -159,58 +189,87 @@ impl BackupSnapshotMonitor {
// Sort by end_slot ascending so we can remove oldest
same_epoch_snapshots.sort_by_key(|snap| snap.end_slot);

// Remove oldest snapshot
if let Some(oldest_snapshot) = same_epoch_snapshots.first() {
log::debug!(
"Removing old snapshot from epoch {} with slot {}: {:?}",
target_epoch,
oldest_snapshot.end_slot,
oldest_snapshot.path
);
std::fs::remove_file(oldest_snapshot.path.as_path())?;
// Remove oldest snapshots if we have more than MAXIMUM_BACKUP_INCREMENTAL_SNAPSHOTS_PER_EPOCH
while same_epoch_snapshots.len() > MAXIMUM_BACKUP_INCREMENTAL_SNAPSHOTS_PER_EPOCH {
if let Some(oldest_snapshot) = same_epoch_snapshots.first() {
log::debug!(
"Removing old snapshot from epoch {} with slot {}: {:?}",
target_epoch,
oldest_snapshot.end_slot,
oldest_snapshot.path
);
std::fs::remove_file(oldest_snapshot.path.as_path())?;
same_epoch_snapshots.remove(0);
}
}

Ok(())
}

/// Backs up the best (closest, not-past-target) incremental snapshot for
/// `target_slot`, returning the path that is now considered backed up.
///
/// Returns the existing `current_backup_path` unchanged when no candidate
/// snapshot exists, when the candidate is already the backed-up one, or when
/// copying the candidate fails. On a successful copy, older snapshots from
/// the same epoch are evicted (eviction failures are logged, not propagated).
async fn backup_latest_for_target_slot(
    &self,
    mut current_backup_path: Option<PathBuf>,
    target_slot: u64,
) -> Option<PathBuf> {
    // No snapshot in range — nothing to do.
    let Some(snapshot) = self.find_closest_incremental(target_slot) else {
        return current_backup_path;
    };

    // Already backed up this exact snapshot; avoid redundant copies.
    if current_backup_path.as_ref() == Some(&snapshot) {
        return current_backup_path;
    }

    log::debug!(
        "Found new best snapshot for slot {}: {:?}",
        target_slot,
        snapshot
    );

    match self.backup_incremental_snapshot(&snapshot).await {
        Ok(_) => {
            current_backup_path = Some(snapshot);
            // After saving best snapshot, evict oldest one from same epoch.
            if let Err(e) = self.evict_same_epoch_incremental(target_slot) {
                log::error!("Failed to evict old snapshots: {}", e);
            }
            current_backup_path
        }
        Err(e) => {
            // Keep pointing at the previous backup so we retry next tick.
            log::error!("Failed to backup snapshot: {}", e);
            current_backup_path
        }
    }
}

/// Runs the snapshot backup process to continually back up the latest incremental snapshot for the previous epoch and the current epoch
/// Keeps at most MAXIMUM_BACKUP_INCREMENTAL_SNAPSHOTS_PER_EPOCH snapshots per epoch in the backup
/// Purges old incremental snapshots in the backup after 2 epochs
pub async fn run(&self) -> Result<()> {
let mut interval = time::interval(Duration::from_secs(10));
let mut last_target_slot = None;
let mut last_backup_path = None;
let mut current_target_slot = None;
let mut last_epoch_backup_path = None;
let mut this_epoch_backup_path = None;

loop {
interval.tick().await;

let target_slot = self.get_target_slot()?;
let (last_epoch_target_slot, this_epoch_target_slot) = self.get_target_slots()?;

// Only search for new snapshot if target slot has changed
if last_target_slot != Some(target_slot) {
log::info!("New target slot: {}", target_slot);
}

if let Some(snapshot) = self.find_closest_incremental(target_slot) {
if last_backup_path.as_ref() != Some(&snapshot) {
log::debug!(
"Found new best snapshot for slot {}: {:?}",
target_slot,
snapshot
);

if let Err(e) = self.backup_incremental_snapshot(&snapshot).await {
log::error!("Failed to backup snapshot: {}", e);
continue;
}

last_backup_path = Some(snapshot);

// After saving best snapshot, evict oldest one from same epoch
if let Err(e) = self.evict_same_epoch_incremental(target_slot) {
log::error!("Failed to evict old snapshots: {}", e);
}
// Detect new epoch
if current_target_slot != Some(this_epoch_target_slot) {
log::info!("New target slot: {}", this_epoch_target_slot);
last_epoch_backup_path = this_epoch_backup_path;
this_epoch_backup_path = None;
let current_epoch = this_epoch_target_slot / DEFAULT_SLOTS_PER_EPOCH;
if let Err(e) = self.evict_all_epoch_snapshots(current_epoch - 2) {
log::error!("Failed to evict old snapshots: {}", e);
}
}

last_target_slot = Some(target_slot);
// Backup latest snapshot for last epoch and this epoch
last_epoch_backup_path = self
.backup_latest_for_target_slot(last_epoch_backup_path, last_epoch_target_slot)
.await;
this_epoch_backup_path = self
.backup_latest_for_target_slot(this_epoch_backup_path, this_epoch_target_slot)
.await;

current_target_slot = Some(this_epoch_target_slot);
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions tip-router-operator-cli/src/ledger_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub fn get_bank_from_ledger(
("status", "error", String),
("state", "load_genesis", String),
("step", 1, i64),
("error", e.to_string(), String),
("error", format!("{:?}", e), String),
);
panic!("Failed to load genesis config: {}", e); // TODO should panic here?
}
Expand Down Expand Up @@ -195,7 +195,7 @@ pub fn get_bank_from_ledger(
("state", "load_bank_forks", String),
("status", "error", String),
("step", 4, i64),
("error", e.to_string(), String),
("error", format!("{:?}", e), String),
("duration_ms", start_time.elapsed().as_millis() as i64, i64),
);
panic!("Failed to load bank forks: {}", e);
Expand Down Expand Up @@ -230,7 +230,7 @@ pub fn get_bank_from_ledger(
("status", "error", String),
("state", "process_blockstore_from_root", String),
("step", 5, i64),
("error", e.to_string(), String),
("error", format!("{:?}", e), String),
("duration_ms", start_time.elapsed().as_millis() as i64, i64),
);
panic!("Failed to process blockstore from root: {}", e);
Expand Down Expand Up @@ -268,7 +268,7 @@ pub fn get_bank_from_ledger(
("status", "error", String),
("state", "bank_to_full_snapshot_archive", String),
("step", 6, i64),
("error", e.to_string(), String),
("error", format!("{:?}", e), String),
("duration_ms", start_time.elapsed().as_millis() as i64, i64),
);
panic!("Failed to create snapshot: {}", e);
Expand Down
Loading

0 comments on commit c4d38bd

Please sign in to comment.