Skip to content

Offload older epoch start beacon states to storage #86

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ binary_utils = { workspace = true }
bls = { workspace = true }
clock = { workspace = true }
criterion = { workspace = true }
database = { workspace = true }
easy-ext = { workspace = true }
eth2_cache_utils = { workspace = true }
eth2_libp2p = { workspace = true }
Expand Down
19 changes: 16 additions & 3 deletions benches/benches/fork_choice_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ use allocator as _;
use anyhow::Result;
use clock::Tick;
use criterion::{BatchSize, Criterion, Throughput};
use database::Database;
use easy_ext::ext;
use eth2_cache_utils::holesky::{self, CAPELLA_BEACON_STATE};
use execution_engine::NullExecutionEngine;
use fork_choice_control::{Storage, StorageMode, DEFAULT_ARCHIVAL_EPOCH_INTERVAL};
use fork_choice_store::{
ApplyBlockChanges, ApplyTickChanges, AttestationAction, AttestationItem, AttestationOrigin,
BlockAction, DataAvailabilityPolicy, Store, StoreConfig, ValidAttestation,
Expand Down Expand Up @@ -63,11 +65,19 @@ impl Criterion {
.into_iter()
.exactly_one()?;

let storage = Arc::new(Storage::new(
config.clone_arc(),
Database::in_memory(),
DEFAULT_ARCHIVAL_EPOCH_INTERVAL,
StorageMode::Standard,
));

let mut store = Store::new(
config.clone_arc(),
StoreConfig::default(),
anchor_block,
anchor_state,
storage,
false,
false,
);
Expand Down Expand Up @@ -138,7 +148,7 @@ impl Criterion {
}
}

fn process_slot(store: &mut Store<impl Preset>, slot: Slot) -> Result<()> {
fn process_slot<P: Preset>(store: &mut Store<P, Storage<P>>, slot: Slot) -> Result<()> {
let Some(changes) = store.apply_tick(Tick::start_of_slot(slot))? else {
panic!("tick at slot {slot} should be later than the current one")
};
Expand All @@ -150,7 +160,10 @@ fn process_slot(store: &mut Store<impl Preset>, slot: Slot) -> Result<()> {
Ok(())
}

fn process_block<P: Preset>(store: &mut Store<P>, block: &Arc<SignedBeaconBlock<P>>) -> Result<()> {
fn process_block<P: Preset>(
store: &mut Store<P, Storage<P>>,
block: &Arc<SignedBeaconBlock<P>>,
) -> Result<()> {
let slot = block.message().slot();

let block_action = store.validate_block(
Expand Down Expand Up @@ -193,7 +206,7 @@ fn process_block<P: Preset>(store: &mut Store<P>, block: &Arc<SignedBeaconBlock<
}

fn process_attestation<P: Preset>(
store: &mut Store<P>,
store: &mut Store<P, Storage<P>>,
attestation: Arc<Attestation<P>>,
) -> Result<()> {
let slot = attestation.data().slot;
Expand Down
6 changes: 4 additions & 2 deletions fork_choice_control/src/block_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use types::{
traits::{BeaconBlock as _, SignedBeaconBlock as _},
};

use crate::Storage;

#[derive(Constructor)]
pub struct BlockProcessor<P: Preset> {
chain_config: Arc<ChainConfig>,
Expand Down Expand Up @@ -159,7 +161,7 @@ impl<P: Preset> BlockProcessor<P> {

pub fn validate_block_for_gossip(
&self,
store: &Store<P>,
store: &Store<P, Storage<P>>,
block: &Arc<SignedBeaconBlock<P>>,
) -> Result<Option<BlockAction<P>>> {
store.validate_block_for_gossip(block, |parent| {
Expand All @@ -179,7 +181,7 @@ impl<P: Preset> BlockProcessor<P> {

pub fn validate_block<E: ExecutionEngine<P> + Send>(
&self,
store: &Store<P>,
store: &Store<P, Storage<P>>,
block: &Arc<SignedBeaconBlock<P>>,
state_root_policy: StateRootPolicy,
data_availability_policy: DataAvailabilityPolicy,
Expand Down
7 changes: 4 additions & 3 deletions fork_choice_control/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use crate::{

pub struct Controller<P: Preset, E, A, W: Wait> {
// The latest consistent snapshot of the store.
store_snapshot: Arc<ArcSwap<Store<P>>>,
store_snapshot: Arc<ArcSwap<Store<P, Storage<P>>>>,
block_processor: Arc<BlockProcessor<P>>,
execution_engine: E,
state_cache: Arc<StateCacheProcessor<P>>,
Expand Down Expand Up @@ -115,6 +115,7 @@ where
store_config,
anchor_block,
anchor_state,
storage.clone_arc(),
finished_initial_forward_sync,
finished_back_sync,
);
Expand Down Expand Up @@ -551,11 +552,11 @@ where
self.store_snapshot().store_config()
}

pub(crate) fn store_snapshot(&self) -> Guard<Arc<Store<P>>> {
pub(crate) fn store_snapshot(&self) -> Guard<Arc<Store<P, Storage<P>>>> {
self.store_snapshot.load()
}

pub(crate) fn owned_store_snapshot(&self) -> Arc<Store<P>> {
pub(crate) fn owned_store_snapshot(&self) -> Arc<Store<P, Storage<P>>> {
self.store_snapshot.load_full()
}

Expand Down
3 changes: 0 additions & 3 deletions fork_choice_control/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@
//! - Notifying other components of the application about changes to the fork choice store.
//! - Testing.
//!
//! This crate exists primarily to separate [`fork_choice_store`] from persistence.
//! [`fork_choice_store`] should never depend on [`storage`] or any other databases.
//!
//! [`storage`]: ::storage

pub use crate::{
Expand Down
41 changes: 35 additions & 6 deletions fork_choice_control/src/mutator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ use crate::{

#[expect(clippy::struct_field_names)]
pub struct Mutator<P: Preset, E, W, TS, PS, LS, NS, SS, VS> {
store: Arc<Store<P>>,
store_snapshot: Arc<ArcSwap<Store<P>>>,
store: Arc<Store<P, Storage<P>>>,
store_snapshot: Arc<ArcSwap<Store<P, Storage<P>>>>,
state_cache: Arc<StateCacheProcessor<P>>,
block_processor: Arc<BlockProcessor<P>>,
event_channels: Arc<EventChannels>,
Expand Down Expand Up @@ -138,7 +138,7 @@ where
{
#[expect(clippy::too_many_arguments)]
pub fn new(
store_snapshot: Arc<ArcSwap<Store<P>>>,
store_snapshot: Arc<ArcSwap<Store<P, Storage<P>>>>,
state_cache: Arc<StateCacheProcessor<P>>,
block_processor: Arc<BlockProcessor<P>>,
event_channels: Arc<EventChannels>,
Expand Down Expand Up @@ -1487,8 +1487,37 @@ where
if misc::is_epoch_start::<P>(block.message().slot()) {
info!("unloading old beacon states (head slot: {head_slot})");

self.store_mut()
let unloaded = self
.store_mut()
.unload_old_states(unfinalized_states_in_memory);

let store = self.owned_store();
let storage = self.storage.clone_arc();
let wait_group = wait_group.clone();

Builder::new()
.name("store-unloader".to_owned())
.spawn(move || {
debug!("persisting unloaded old beacon states…");

let states_with_block_roots = unloaded
.iter()
.map(|chain_link| (chain_link.state(&store), chain_link.block_root));

match storage.append_states(states_with_block_roots) {
Ok(slots) => {
debug!(
"unloaded old beacon states persisted \
(state slots: {slots:?})",
)
}
Err(error) => {
error!("persisting unloaded old beacon states to storage failed: {error:?}")
}
}

drop(wait_group);
})?;
}

let processing_duration = insertion_time.duration_since(submission_time);
Expand Down Expand Up @@ -2452,7 +2481,7 @@ where
self.thread_pool.spawn(task);
}

fn store_mut(&mut self) -> &mut Store<P> {
fn store_mut(&mut self) -> &mut Store<P, Storage<P>> {
self.store.make_mut()
}

Expand All @@ -2461,7 +2490,7 @@ where
// faster to clone a `Store` with all the `Arc`s inside it and allocate another `Arc`.
//
// As a result, this method should only be called when `Mutator.store` is in a consistent state.
fn owned_store(&self) -> Arc<Store<P>> {
fn owned_store(&self) -> Arc<Store<P, Storage<P>>> {
self.store.clone_arc()
}

Expand Down
2 changes: 1 addition & 1 deletion fork_choice_control/src/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ pub struct BlockWithRoot<P: Preset> {
pub struct Snapshot<'storage, P: Preset> {
// Use a `Guard` instead of an owned snapshot unlike in tasks based on the intuition that
// `Snapshot`s will be less common than tasks.
store_snapshot: Guard<Arc<Store<P>>>,
store_snapshot: Guard<Arc<Store<P, Storage<P>>>>,
state_cache: Arc<StateCacheProcessor<P>>,
storage: &'storage Storage<P>,
}
Expand Down
42 changes: 34 additions & 8 deletions fork_choice_control/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,25 +53,26 @@ pub enum StateLoadStrategy<P: Preset> {
}

#[expect(clippy::struct_field_names)]
#[derive(Clone)]
pub struct Storage<P> {
config: Arc<Config>,
pub(crate) database: Database,
pub(crate) database: Arc<Database>,
pub(crate) archival_epoch_interval: NonZeroU64,
storage_mode: StorageMode,
phantom: PhantomData<P>,
}

impl<P: Preset> Storage<P> {
#[must_use]
pub const fn new(
pub fn new(
config: Arc<Config>,
database: Database,
archival_epoch_interval: NonZeroU64,
storage_mode: StorageMode,
) -> Self {
Self {
config,
database,
database: Arc::new(database),
archival_epoch_interval,
storage_mode,
phantom: PhantomData,
Expand Down Expand Up @@ -232,7 +233,7 @@ impl<P: Preset> Storage<P> {
&self,
unfinalized: impl Iterator<Item = &'cl ChainLink<P>>,
finalized: impl DoubleEndedIterator<Item = &'cl ChainLink<P>>,
store: &Store<P>,
store: &Store<P, Self>,
) -> Result<AppendedBlockSlots> {
let mut slots = AppendedBlockSlots::default();
let mut store_head_slot = 0;
Expand Down Expand Up @@ -311,7 +312,7 @@ impl<P: Preset> Storage<P> {
}
}

if !(archival_state_appended || self.prune_storage_enabled()) {
if !archival_state_appended && !self.prune_storage_enabled() {
let state_epoch = Self::epoch_at_slot(state_slot);
let append_state = misc::is_epoch_start::<P>(state_slot)
&& state_epoch.is_multiple_of(self.archival_epoch_interval);
Expand Down Expand Up @@ -367,6 +368,25 @@ impl<P: Preset> Storage<P> {
Ok(persisted_blob_ids)
}

pub(crate) fn append_states(
&self,
states_with_block_roots: impl Iterator<Item = (Arc<BeaconState<P>>, H256)>,
) -> Result<Vec<Slot>> {
let mut slots = vec![];
let mut batch = vec![];

for (state, block_root) in states_with_block_roots {
if !self.contains_key(StateByBlockRoot(block_root))? {
slots.push(state.slot());
batch.push(serialize(StateByBlockRoot(block_root), state)?);
}
}

self.database.put_batch(batch)?;

Ok(slots)
}

pub(crate) fn blob_sidecar_by_id(
&self,
blob_id: BlobIdentifier,
Expand Down Expand Up @@ -484,7 +504,7 @@ impl<P: Preset> Storage<P> {
Ok(None)
}

pub(crate) fn genesis_block_root(&self, store: &Store<P>) -> Result<H256> {
pub(crate) fn genesis_block_root(&self, store: &Store<P, Self>) -> Result<H256> {
self.block_root_by_slot_with_store(store, GENESIS_SLOT)?
.ok_or(Error::GenesisBlockRootNotFound)
.map_err(Into::into)
Expand Down Expand Up @@ -527,7 +547,7 @@ impl<P: Preset> Storage<P> {
// Like `block_root_by_slot`, but looks for the root in `store` first.
pub(crate) fn block_root_by_slot_with_store(
&self,
store: &Store<P>,
store: &Store<P, Self>,
slot: Slot,
) -> Result<Option<H256>> {
if let Some(chain_link) = store.chain_link_before_or_at(slot) {
Expand Down Expand Up @@ -645,7 +665,7 @@ impl<P: Preset> Storage<P> {

pub(crate) fn dependent_root(
&self,
store: &Store<P>,
store: &Store<P, Self>,
state: &BeaconState<P>,
epoch: Epoch,
) -> Result<H256> {
Expand Down Expand Up @@ -887,6 +907,12 @@ impl<P: Preset> Storage<P> {
}
}

impl<P: Preset> fork_choice_store::Storage<P> for Storage<P> {
fn stored_state_by_block_root(&self, block_root: H256) -> Result<Option<Arc<BeaconState<P>>>> {
self.state_by_block_root(block_root)
}
}

#[derive(Default, Debug)]
pub struct AppendedBlockSlots {
pub finalized: Vec<Slot>,
Expand Down
Loading
Loading