Skip to content

Commit 57c8876

Browse files
committed
Offload older epoch start and checkpoint beacon states to storage
1 parent 7e161fc commit 57c8876

File tree

14 files changed

+274
-112
lines changed

14 files changed

+274
-112
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benches/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ binary_utils = { workspace = true }
1313
bls = { workspace = true }
1414
clock = { workspace = true }
1515
criterion = { workspace = true }
16+
database = { workspace = true }
1617
easy-ext = { workspace = true }
1718
eth2_cache_utils = { workspace = true }
1819
eth2_libp2p = { workspace = true }

benches/benches/fork_choice_store.rs

+16-3
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ use allocator as _;
1111
use anyhow::Result;
1212
use clock::Tick;
1313
use criterion::{BatchSize, Criterion, Throughput};
14+
use database::Database;
1415
use easy_ext::ext;
1516
use eth2_cache_utils::holesky::{self, CAPELLA_BEACON_STATE};
1617
use execution_engine::NullExecutionEngine;
18+
use fork_choice_control::{Storage, StorageMode, DEFAULT_ARCHIVAL_EPOCH_INTERVAL};
1719
use fork_choice_store::{
1820
ApplyBlockChanges, ApplyTickChanges, AttestationAction, AttestationItem, AttestationOrigin,
1921
BlockAction, Store, StoreConfig, ValidAttestation,
@@ -63,11 +65,19 @@ impl Criterion {
6365
.into_iter()
6466
.exactly_one()?;
6567

68+
let storage = Arc::new(Storage::new(
69+
config.clone_arc(),
70+
Database::in_memory(),
71+
DEFAULT_ARCHIVAL_EPOCH_INTERVAL,
72+
StorageMode::Standard,
73+
));
74+
6675
let mut store = Store::new(
6776
config.clone_arc(),
6877
StoreConfig::default(),
6978
anchor_block,
7079
anchor_state,
80+
storage,
7181
false,
7282
false,
7383
);
@@ -138,7 +148,7 @@ impl Criterion {
138148
}
139149
}
140150

141-
fn process_slot(store: &mut Store<impl Preset>, slot: Slot) -> Result<()> {
151+
fn process_slot<P: Preset>(store: &mut Store<P, Storage<P>>, slot: Slot) -> Result<()> {
142152
let Some(changes) = store.apply_tick(Tick::start_of_slot(slot))? else {
143153
panic!("tick at slot {slot} should be later than the current one")
144154
};
@@ -150,7 +160,10 @@ fn process_slot(store: &mut Store<impl Preset>, slot: Slot) -> Result<()> {
150160
Ok(())
151161
}
152162

153-
fn process_block<P: Preset>(store: &mut Store<P>, block: &Arc<SignedBeaconBlock<P>>) -> Result<()> {
163+
fn process_block<P: Preset>(
164+
store: &mut Store<P, Storage<P>>,
165+
block: &Arc<SignedBeaconBlock<P>>,
166+
) -> Result<()> {
154167
let slot = block.message().slot();
155168

156169
let block_action = store.validate_block(
@@ -192,7 +205,7 @@ fn process_block<P: Preset>(store: &mut Store<P>, block: &Arc<SignedBeaconBlock<
192205
}
193206

194207
fn process_attestation<P: Preset>(
195-
store: &mut Store<P>,
208+
store: &mut Store<P, Storage<P>>,
196209
attestation: Arc<Attestation<P>>,
197210
) -> Result<()> {
198211
let slot = attestation.data().slot;

fork_choice_control/src/block_processor.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ use types::{
2727
traits::{BeaconBlock as _, SignedBeaconBlock as _},
2828
};
2929

30+
use crate::Storage;
31+
3032
#[derive(Constructor)]
3133
pub struct BlockProcessor<P: Preset> {
3234
chain_config: Arc<ChainConfig>,
@@ -158,7 +160,7 @@ impl<P: Preset> BlockProcessor<P> {
158160

159161
pub fn validate_block_for_gossip(
160162
&self,
161-
store: &Store<P>,
163+
store: &Store<P, Storage<P>>,
162164
block: &Arc<SignedBeaconBlock<P>>,
163165
) -> Result<Option<BlockAction<P>>> {
164166
store.validate_block_for_gossip(block, |parent| {
@@ -178,7 +180,7 @@ impl<P: Preset> BlockProcessor<P> {
178180

179181
pub fn validate_block<E: ExecutionEngine<P> + Send>(
180182
&self,
181-
store: &Store<P>,
183+
store: &Store<P, Storage<P>>,
182184
block: &Arc<SignedBeaconBlock<P>>,
183185
state_root_policy: StateRootPolicy,
184186
execution_engine: E,

fork_choice_control/src/controller.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ use crate::{
6262

6363
pub struct Controller<P: Preset, E, A, W: Wait> {
6464
// The latest consistent snapshot of the store.
65-
store_snapshot: Arc<ArcSwap<Store<P>>>,
65+
store_snapshot: Arc<ArcSwap<Store<P, Storage<P>>>>,
6666
block_processor: Arc<BlockProcessor<P>>,
6767
execution_engine: E,
6868
state_cache: Arc<StateCacheProcessor<P>>,
@@ -115,6 +115,7 @@ where
115115
store_config,
116116
anchor_block,
117117
anchor_state,
118+
storage.clone_arc(),
118119
finished_initial_forward_sync,
119120
finished_back_sync,
120121
);
@@ -546,11 +547,11 @@ where
546547
self.store_snapshot().store_config()
547548
}
548549

549-
pub(crate) fn store_snapshot(&self) -> Guard<Arc<Store<P>>> {
550+
pub(crate) fn store_snapshot(&self) -> Guard<Arc<Store<P, Storage<P>>>> {
550551
self.store_snapshot.load()
551552
}
552553

553-
pub(crate) fn owned_store_snapshot(&self) -> Arc<Store<P>> {
554+
pub(crate) fn owned_store_snapshot(&self) -> Arc<Store<P, Storage<P>>> {
554555
self.store_snapshot.load_full()
555556
}
556557

fork_choice_control/src/mutator.rs

+40-6
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ use crate::{
8080

8181
#[expect(clippy::struct_field_names)]
8282
pub struct Mutator<P: Preset, E, W, TS, PS, LS, NS, SS, VS> {
83-
store: Arc<Store<P>>,
84-
store_snapshot: Arc<ArcSwap<Store<P>>>,
83+
store: Arc<Store<P, Storage<P>>>,
84+
store_snapshot: Arc<ArcSwap<Store<P, Storage<P>>>>,
8585
state_cache: Arc<StateCacheProcessor<P>>,
8686
block_processor: Arc<BlockProcessor<P>>,
8787
event_channels: Arc<EventChannels>,
@@ -136,7 +136,7 @@ where
136136
{
137137
#[expect(clippy::too_many_arguments)]
138138
pub fn new(
139-
store_snapshot: Arc<ArcSwap<Store<P>>>,
139+
store_snapshot: Arc<ArcSwap<Store<P, Storage<P>>>>,
140140
state_cache: Arc<StateCacheProcessor<P>>,
141141
block_processor: Arc<BlockProcessor<P>>,
142142
event_channels: Arc<EventChannels>,
@@ -1456,8 +1456,42 @@ where
14561456
if misc::is_epoch_start::<P>(block.message().slot()) {
14571457
info!("unloading old beacon states (head slot: {head_slot})");
14581458

1459-
self.store_mut()
1459+
let unloaded = self
1460+
.store_mut()
14601461
.unload_old_states(unfinalized_states_in_memory);
1462+
1463+
let unloaded_checkpoint_states = self
1464+
.store_mut()
1465+
.unload_checkpoint_states(unfinalized_states_in_memory);
1466+
1467+
let store = self.owned_store();
1468+
let storage = self.storage.clone_arc();
1469+
let wait_group = wait_group.clone();
1470+
1471+
Builder::new()
1472+
.name("store-unloader".to_owned())
1473+
.spawn(move || {
1474+
debug!("persisting unloaded old beacon states…");
1475+
1476+
let states_with_block_roots = unloaded
1477+
.iter()
1478+
.map(|chain_link| (chain_link.state(&store), chain_link.block_root))
1479+
.chain(unloaded_checkpoint_states);
1480+
1481+
match storage.append_states(states_with_block_roots) {
1482+
Ok(slots) => {
1483+
debug!(
1484+
"unloaded old beacon states persisted \
1485+
(state slots: {slots:?})",
1486+
)
1487+
}
1488+
Err(error) => {
1489+
error!("persisting unloaded old beacon states to storage failed: {error:?}")
1490+
}
1491+
}
1492+
1493+
drop(wait_group);
1494+
})?;
14611495
}
14621496

14631497
let processing_duration = insertion_time.duration_since(submission_time);
@@ -2419,7 +2453,7 @@ where
24192453
self.thread_pool.spawn(task);
24202454
}
24212455

2422-
fn store_mut(&mut self) -> &mut Store<P> {
2456+
fn store_mut(&mut self) -> &mut Store<P, Storage<P>> {
24232457
self.store.make_mut()
24242458
}
24252459

@@ -2428,7 +2462,7 @@ where
24282462
// faster to clone a `Store` with all the `Arc`s inside it and allocate another `Arc`.
24292463
//
24302464
// As a result, this method should only be called when `Mutator.store` is in a consistent state.
2431-
fn owned_store(&self) -> Arc<Store<P>> {
2465+
fn owned_store(&self) -> Arc<Store<P, Storage<P>>> {
24322466
self.store.clone_arc()
24332467
}
24342468

fork_choice_control/src/queries.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -754,7 +754,7 @@ pub struct BlockWithRoot<P: Preset> {
754754
pub struct Snapshot<'storage, P: Preset> {
755755
// Use a `Guard` instead of an owned snapshot unlike in tasks based on the intuition that
756756
// `Snapshot`s will be less common than tasks.
757-
store_snapshot: Guard<Arc<Store<P>>>,
757+
store_snapshot: Guard<Arc<Store<P, Storage<P>>>>,
758758
state_cache: Arc<StateCacheProcessor<P>>,
759759
storage: &'storage Storage<P>,
760760
}

fork_choice_control/src/storage.rs

+33-7
Original file line numberDiff line numberDiff line change
@@ -53,25 +53,26 @@ pub enum StateLoadStrategy<P: Preset> {
5353
}
5454

5555
#[expect(clippy::struct_field_names)]
56+
#[derive(Clone)]
5657
pub struct Storage<P> {
5758
config: Arc<Config>,
58-
pub(crate) database: Database,
59+
pub(crate) database: Arc<Database>,
5960
pub(crate) archival_epoch_interval: NonZeroU64,
6061
storage_mode: StorageMode,
6162
phantom: PhantomData<P>,
6263
}
6364

6465
impl<P: Preset> Storage<P> {
6566
#[must_use]
66-
pub const fn new(
67+
pub fn new(
6768
config: Arc<Config>,
6869
database: Database,
6970
archival_epoch_interval: NonZeroU64,
7071
storage_mode: StorageMode,
7172
) -> Self {
7273
Self {
7374
config,
74-
database,
75+
database: Arc::new(database),
7576
archival_epoch_interval,
7677
storage_mode,
7778
phantom: PhantomData,
@@ -232,7 +233,7 @@ impl<P: Preset> Storage<P> {
232233
&self,
233234
unfinalized: impl Iterator<Item = &'cl ChainLink<P>>,
234235
finalized: impl DoubleEndedIterator<Item = &'cl ChainLink<P>>,
235-
store: &Store<P>,
236+
store: &Store<P, Self>,
236237
) -> Result<AppendedBlockSlots> {
237238
let mut slots = AppendedBlockSlots::default();
238239
let mut store_head_slot = 0;
@@ -363,6 +364,25 @@ impl<P: Preset> Storage<P> {
363364
Ok(persisted_blob_ids)
364365
}
365366

367+
pub(crate) fn append_states(
368+
&self,
369+
states_with_block_roots: impl Iterator<Item = (Arc<BeaconState<P>>, H256)>,
370+
) -> Result<Vec<Slot>> {
371+
let mut slots = vec![];
372+
let mut batch = vec![];
373+
374+
for (state, block_root) in states_with_block_roots {
375+
if !self.contains_key(StateByBlockRoot(block_root))? {
376+
slots.push(state.slot());
377+
batch.push(serialize(StateByBlockRoot(block_root), state)?);
378+
}
379+
}
380+
381+
self.database.put_batch(batch)?;
382+
383+
Ok(slots)
384+
}
385+
366386
pub(crate) fn blob_sidecar_by_id(
367387
&self,
368388
blob_id: BlobIdentifier,
@@ -480,7 +500,7 @@ impl<P: Preset> Storage<P> {
480500
Ok(None)
481501
}
482502

483-
pub(crate) fn genesis_block_root(&self, store: &Store<P>) -> Result<H256> {
503+
pub(crate) fn genesis_block_root(&self, store: &Store<P, Self>) -> Result<H256> {
484504
self.block_root_by_slot_with_store(store, GENESIS_SLOT)?
485505
.ok_or(Error::GenesisBlockRootNotFound)
486506
.map_err(Into::into)
@@ -523,7 +543,7 @@ impl<P: Preset> Storage<P> {
523543
// Like `block_root_by_slot`, but looks for the root in `store` first.
524544
pub(crate) fn block_root_by_slot_with_store(
525545
&self,
526-
store: &Store<P>,
546+
store: &Store<P, Self>,
527547
slot: Slot,
528548
) -> Result<Option<H256>> {
529549
if let Some(chain_link) = store.chain_link_before_or_at(slot) {
@@ -641,7 +661,7 @@ impl<P: Preset> Storage<P> {
641661

642662
pub(crate) fn dependent_root(
643663
&self,
644-
store: &Store<P>,
664+
store: &Store<P, Self>,
645665
state: &BeaconState<P>,
646666
epoch: Epoch,
647667
) -> Result<H256> {
@@ -880,6 +900,12 @@ impl<P: Preset> Storage<P> {
880900
}
881901
}
882902

903+
impl<P: Preset> fork_choice_store::Storage<P> for Storage<P> {
904+
fn stored_state_by_block_root(&self, block_root: H256) -> Result<Option<Arc<BeaconState<P>>>> {
905+
self.state_by_block_root(block_root)
906+
}
907+
}
908+
883909
#[derive(Default, Debug)]
884910
pub struct AppendedBlockSlots {
885911
pub finalized: Vec<Slot>,

0 commit comments

Comments
 (0)