Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions crates/storage/src/versioned/ephemeral_v1/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,11 @@ pub fn entry_count_and_size(content_type: &ContentType) -> String {
table_name(content_type)
)
}

/// Builds the SQL statement that deletes every row whose `slot` is strictly
/// below the `:slot` named parameter.
///
/// The target table is the one `table_name` resolves for `content_type`.
pub fn purge_by_slot(content_type: &ContentType) -> String {
    let table = table_name(content_type);
    format!(
        "DELETE FROM {}
WHERE slot < :slot",
        table
    )
}
232 changes: 229 additions & 3 deletions crates/storage/src/versioned/ephemeral_v1/store.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
use std::marker::PhantomData;
use std::{
marker::PhantomData,
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};

use ethportal_api::{OverlayContentKey, RawContentValue};
use alloy::eips::merge::EPOCH_SLOTS;
use ethportal_api::{jsonrpsee::tokio, OverlayContentKey, RawContentValue};
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::{named_params, types::Type, OptionalExtension};
use tokio::task::JoinHandle;
use tracing::{debug, warn};
use trin_metrics::storage::StorageMetricsReporter;

Expand All @@ -15,6 +21,9 @@ use crate::{
ContentId,
};

/// Unix timestamp, in seconds, that `expected_current_epoch` subtracts from the
/// wall clock to measure the time elapsed since beacon chain genesis.
// NOTE(review): presumably the mainnet genesis time — confirm before reusing on other networks.
pub const BEACON_GENESIS_TIME: u64 = 1606824023;
/// Slot count that, divided by `EPOCH_SLOTS`, gives the period (in epochs)
/// between historical summaries updates used to pick the purge cutoff.
pub const SLOTS_PER_HISTORICAL_ROOT: u64 = 8192;

/// The store for storing ephemeral headers, bodies, and receipts.
#[allow(unused)]
#[derive(Debug)]
Expand All @@ -27,6 +36,8 @@ pub struct EphemeralV1Store<TContentKey: OverlayContentKey> {
metrics: StorageMetricsReporter,
/// Phantom Content Key
_phantom_content_key: PhantomData<TContentKey>,
/// Background task handle for periodic purging
background_purge_task: Option<JoinHandle<()>>,
}

impl<TContentKey: OverlayContentKey> VersionedContentStore for EphemeralV1Store<TContentKey> {
Expand Down Expand Up @@ -59,6 +70,7 @@ impl<TContentKey: OverlayContentKey> VersionedContentStore for EphemeralV1Store<
metrics: StorageMetricsReporter::new(subnetwork),
_phantom_content_key: PhantomData,
config,
background_purge_task: None,
};
store.init()?;
Ok(store)
Expand All @@ -71,13 +83,61 @@ impl<TContentKey: OverlayContentKey> EphemeralV1Store<TContentKey> {
/// Initializes the store: purges content older than the slot of the last
/// historical summaries update, then loads the usage stats.
///
/// # Errors
///
/// Propagates any error returned by the purge or by `init_usage_stats`.
fn init(&mut self) -> Result<(), ContentStoreError> {
    // Purge first so that the usage stats looked up afterwards describe the
    // table as it is after the purge, not the rows that were just deleted.
    let rows_deleted =
        Self::purge_content_before_last_summary_internal(&Arc::new(self.config.clone()))?;

    if rows_deleted > 0 {
        debug!(
            "Purged {} ephemeral content entries during initialization",
            rows_deleted
        );
    }

    self.init_usage_stats()?;

    Ok(())
}

// PUBLIC FUNCTIONS

/// Starts the background task for periodic purging.
/// This can be called explicitly after initialization if needed.
///
/// The task purges once as soon as it starts. It then wakes up once per epoch
/// duration (`12 * EPOCH_SLOTS` seconds) and purges again whenever the next
/// epoch is a multiple of `SLOTS_PER_HISTORICAL_ROOT / EPOCH_SLOTS`. Purge
/// errors are logged with `warn!` and do not stop the task.
///
/// If a task started by an earlier call is still registered, it is aborted
/// before the new one is spawned, so at most one purge task runs per store.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside of a Tokio runtime.
pub fn start_background_purge_task(&mut self) -> Result<(), ContentStoreError> {
    // Overwriting the stored handle would only drop it, which detaches the
    // task instead of cancelling it; abort explicitly to avoid leaking a
    // second, unstoppable purge loop.
    if let Some(previous) = self.background_purge_task.take() {
        previous.abort();
    }

    let config = Arc::new(self.config.clone());

    // NOTE(review): the purge runs a synchronous SQLite statement on the async
    // runtime; consider `spawn_blocking` if purges turn out to be slow.
    let handle = tokio::spawn(async move {
        // Run purge immediately when task starts
        if let Err(e) = Self::purge_content_before_last_summary_internal(&config) {
            warn!("Error purging content in background task: {}", e);
        }

        // Number of epochs between two historical summaries updates.
        let period = SLOTS_PER_HISTORICAL_ROOT / EPOCH_SLOTS;
        // One epoch duration. The first tick of a Tokio interval completes
        // immediately, so the first boundary check happens right away.
        let mut interval = tokio::time::interval(Duration::from_secs(12 * EPOCH_SLOTS));
        loop {
            interval.tick().await;

            // Check if we're at a historical summaries update boundary
            let next_epoch = Self::expected_current_epoch() + 1;
            if next_epoch % period == 0 {
                if let Err(e) = Self::purge_content_before_last_summary_internal(&config) {
                    warn!("Error purging content in background task: {}", e);
                }
            }
        }
    });

    self.background_purge_task = Some(handle);
    Ok(())
}

/// Aborts the background purge task and clears its handle.
///
/// Does nothing when no task handle is stored.
pub fn stop_background_purge_task(&mut self) {
    let Some(task) = self.background_purge_task.take() else {
        return;
    };
    task.abort();
}

/// Returns whether data associated with the content id is already stored.
pub fn has_content(&self, content_id: &ContentId) -> Result<bool, ContentStoreError> {
let timer = self.metrics.start_process_timer("has_content");
Expand Down Expand Up @@ -225,6 +285,15 @@ impl<TContentKey: OverlayContentKey> EphemeralV1Store<TContentKey> {
self.metrics.get_summary()
}

/// Purges content older than the last historical summary right now.
///
/// This gives callers manual control over purging, independent of the
/// background task.
///
/// Returns the number of rows deleted.
pub fn trigger_content_purge(&self) -> Result<usize, ContentStoreError> {
    let config = Arc::new(self.config.clone());
    Self::purge_content_before_last_summary_internal(&config)
}

// INTERNAL FUNCTIONS

/// Lookup and set `usage_stats`.
Expand Down Expand Up @@ -263,6 +332,61 @@ impl<TContentKey: OverlayContentKey> EphemeralV1Store<TContentKey> {
) -> u64 {
(raw_content_id.len() + raw_content_key.len() + raw_content_value.len()) as u64
}

/// Returns the epoch the beacon chain is expected to be in according to the
/// local wall clock.
///
/// The time elapsed since `BEACON_GENESIS_TIME` is divided by the slot
/// duration (12 seconds) and then by `EPOCH_SLOTS`. A clock that reads earlier
/// than genesis yields epoch 0 instead of panicking.
///
/// # Panics
///
/// Panics if the system clock reads earlier than the Unix epoch.
fn expected_current_epoch() -> u64 {
    const SECONDS_PER_SLOT: u64 = 12;

    let since_unix_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    // `Duration - Duration` panics on underflow; saturate so that a clock set
    // before genesis (e.g. an unset RTC) is treated as "at genesis".
    let since_genesis = since_unix_epoch.saturating_sub(Duration::from_secs(BEACON_GENESIS_TIME));

    since_genesis.as_secs() / SECONDS_PER_SLOT / EPOCH_SLOTS
}

/// Deletes every row whose slot is below the slot of the last historical
/// summaries update, as derived from the current wall-clock epoch.
///
/// Takes the configuration instead of `self` so that both the store itself
/// and the background task can call it. Returns the number of rows deleted.
fn purge_content_before_last_summary_internal(
    config: &Arc<EphemeralV1StoreConfig>,
) -> Result<usize, ContentStoreError> {
    let cutoff_slot = Self::last_summaries_slot(Self::expected_current_epoch());

    let connection = config.sql_connection_pool.get()?;
    let statement = sql::purge_by_slot(&config.content_type);

    Ok(connection.execute(&statement, named_params! { ":slot": cutoff_slot })?)
}

/// Computes the slot at which the last historical summary event occurred.
///
/// Historical summary events are appended when the next epoch is a multiple
/// of `period = SLOTS_PER_HISTORICAL_ROOT / EPOCH_SLOTS`, i.e. on the
/// transition into an epoch that is a multiple of `period`. The last event
/// seen from `current_epoch` is therefore `current_epoch` rounded down to a
/// multiple of `period`; during the final epoch of a period the upcoming
/// event has not happened yet, so the start of that period is still the
/// answer.
///
/// If `current_epoch` is less than the first event boundary (and assuming a
/// genesis event at epoch 0), this function returns 0.
fn last_summaries_slot(current_epoch: u64) -> u64 {
    // Period (in epochs) at which events are appended.
    let period = SLOTS_PER_HISTORICAL_ROOT / EPOCH_SLOTS;
    // Round down to the most recent multiple of `period`. This needs no
    // `current_epoch + 1`, so it cannot overflow on the increment.
    let last_summaries_epoch = current_epoch - current_epoch % period;

    last_summaries_epoch * EPOCH_SLOTS
}
}

impl<TContentKey: OverlayContentKey> Drop for EphemeralV1Store<TContentKey> {
    /// Aborts the background purge task, if one is registered, when the store
    /// goes away.
    fn drop(&mut self) {
        self.stop_background_purge_task();
    }
}

/// Creates table and indexes if they don't already exist.
Expand All @@ -280,6 +404,7 @@ mod tests {
use anyhow::Result;
use ethportal_api::{types::network::Subnetwork, IdentityContentKey};
use tempfile::TempDir;
use tokio::time::{sleep, Duration};

use super::*;
use crate::{test_utils::generate_random_bytes, utils::setup_sql};
Expand Down Expand Up @@ -451,4 +576,105 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_background_purge_task() -> Result<()> {
let temp_dir = TempDir::new()?;
let config = create_config(&temp_dir);

// Create store without starting background task
let mut store = EphemeralV1Store::<IdentityContentKey>::create(
ContentType::HistoryEphemeral,
config.clone(),
)?;

// Verify background task is not running initially
assert!(store.background_purge_task.is_none());

// Insert test data with slots before and after the cutoff
let current_epoch = EphemeralV1Store::<IdentityContentKey>::expected_current_epoch();
let cutoff_slot =
EphemeralV1Store::<IdentityContentKey>::last_summaries_slot(current_epoch);

let (key1, value1) = generate_key_value();
let (key2, value2) = generate_key_value();
let (key3, value3) = generate_key_value();

// Insert data with slots before cutoff
store.insert(&key1, value1, 0, cutoff_slot.saturating_sub(100))?;
store.insert(&key2, value2, 0, cutoff_slot.saturating_sub(50))?;

// Insert data with slot after cutoff
store.insert(&key3, value3, 0, cutoff_slot + 100)?;

// Verify data is present before starting background task
assert!(store.has_content(&ContentId::from(key1.content_id()))?);
assert!(store.has_content(&ContentId::from(key2.content_id()))?);
assert!(store.has_content(&ContentId::from(key3.content_id()))?);

// Start the background task
store.start_background_purge_task()?;
// Wait for the background task to run and purge data
sleep(Duration::from_secs(1)).await;
assert!(store.background_purge_task.is_some());

// Verify that content before cutoff was purged
assert!(
!store.has_content(&ContentId::from(key1.content_id()))?,
"key1 should be purged"
);
assert!(
!store.has_content(&ContentId::from(key2.content_id()))?,
"key2 should be purged"
);
assert!(
store.has_content(&ContentId::from(key3.content_id()))?,
"key3 should not be purged"
);

// Stop the background task
store.stop_background_purge_task();
assert!(store.background_purge_task.is_none());

Ok(())
}

/// Creating a second store over the same database purges rows below the
/// cutoff slot during initialization and keeps rows above it.
#[test]
fn test_purge_content_during_init() -> Result<()> {
    type Store = EphemeralV1Store<IdentityContentKey>;

    let temp_dir = TempDir::new()?;
    let config = create_config(&temp_dir);

    // First store instance, used only to populate the database.
    let mut store = Store::create(ContentType::HistoryEphemeral, config.clone())?;

    // Rows below this slot are the ones the purge removes.
    let cutoff_slot = Store::last_summaries_slot(Store::expected_current_epoch());

    let (key1, value1) = generate_key_value();
    let (key2, value2) = generate_key_value();
    let (key3, value3) = generate_key_value();

    // Two entries before the cutoff, one after it.
    store.insert(&key1, value1, 0, cutoff_slot.saturating_sub(100))?;
    store.insert(&key2, value2, 0, cutoff_slot.saturating_sub(50))?;
    store.insert(&key3, value3, 0, cutoff_slot + 100)?;

    // Creating another store runs `init`, which performs the purge.
    let new_store = Store::create(ContentType::HistoryEphemeral, config)?;

    // Only the entry after the cutoff survives.
    let id1 = ContentId::from(key1.content_id());
    let id2 = ContentId::from(key2.content_id());
    let id3 = ContentId::from(key3.content_id());
    assert!(!new_store.has_content(&id1)?);
    assert!(!new_store.has_content(&id2)?);
    assert!(new_store.has_content(&id3)?);

    Ok(())
}
}