Skip to content

Commit 321c144

Browse files
committed
Offload older epoch start and checkpoint beacon states to storage
1 parent c2470af commit 321c144

File tree

14 files changed

+276
-116
lines changed

14 files changed

+276
-116
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benches/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ binary_utils = { workspace = true }
1313
bls = { workspace = true }
1414
clock = { workspace = true }
1515
criterion = { workspace = true }
16+
database = { workspace = true }
1617
easy-ext = { workspace = true }
1718
eth2_cache_utils = { workspace = true }
1819
eth2_libp2p = { workspace = true }

benches/benches/fork_choice_store.rs

+16-3
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ use allocator as _;
1111
use anyhow::Result;
1212
use clock::Tick;
1313
use criterion::{BatchSize, Criterion, Throughput};
14+
use database::Database;
1415
use easy_ext::ext;
1516
use eth2_cache_utils::holesky::{self, CAPELLA_BEACON_STATE};
1617
use execution_engine::NullExecutionEngine;
18+
use fork_choice_control::{Storage, DEFAULT_ARCHIVAL_EPOCH_INTERVAL};
1719
use fork_choice_store::{
1820
ApplyBlockChanges, ApplyTickChanges, AttestationAction, AttestationItem, AttestationOrigin,
1921
BlockAction, Store, StoreConfig, ValidAttestation,
@@ -63,11 +65,19 @@ impl Criterion {
6365
.into_iter()
6466
.exactly_one()?;
6567

68+
let storage = Arc::new(Storage::new(
69+
config.clone_arc(),
70+
Database::in_memory(),
71+
DEFAULT_ARCHIVAL_EPOCH_INTERVAL,
72+
false,
73+
));
74+
6675
let mut store = Store::new(
6776
config.clone_arc(),
6877
StoreConfig::default(),
6978
anchor_block,
7079
anchor_state,
80+
storage,
7181
false,
7282
);
7383

@@ -137,7 +147,7 @@ impl Criterion {
137147
}
138148
}
139149

140-
fn process_slot(store: &mut Store<impl Preset>, slot: Slot) -> Result<()> {
150+
fn process_slot<P: Preset>(store: &mut Store<P, Storage<P>>, slot: Slot) -> Result<()> {
141151
let Some(changes) = store.apply_tick(Tick::start_of_slot(slot))? else {
142152
panic!("tick at slot {slot} should be later than the current one")
143153
};
@@ -149,7 +159,10 @@ fn process_slot(store: &mut Store<impl Preset>, slot: Slot) -> Result<()> {
149159
Ok(())
150160
}
151161

152-
fn process_block<P: Preset>(store: &mut Store<P>, block: &Arc<SignedBeaconBlock<P>>) -> Result<()> {
162+
fn process_block<P: Preset>(
163+
store: &mut Store<P, Storage<P>>,
164+
block: &Arc<SignedBeaconBlock<P>>,
165+
) -> Result<()> {
153166
let slot = block.message().slot();
154167

155168
let block_action = store.validate_block(
@@ -191,7 +204,7 @@ fn process_block<P: Preset>(store: &mut Store<P>, block: &Arc<SignedBeaconBlock<
191204
}
192205

193206
fn process_attestation<P: Preset>(
194-
store: &mut Store<P>,
207+
store: &mut Store<P, Storage<P>>,
195208
attestation: Arc<Attestation<P>>,
196209
) -> Result<()> {
197210
let slot = attestation.data().slot;

fork_choice_control/src/block_processor.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ use types::{
2727
traits::{BeaconBlock as _, SignedBeaconBlock as _},
2828
};
2929

30+
use crate::Storage;
31+
3032
#[derive(Constructor)]
3133
pub struct BlockProcessor<P: Preset> {
3234
chain_config: Arc<ChainConfig>,
@@ -158,7 +160,7 @@ impl<P: Preset> BlockProcessor<P> {
158160

159161
pub fn validate_block_for_gossip(
160162
&self,
161-
store: &Store<P>,
163+
store: &Store<P, Storage<P>>,
162164
block: &Arc<SignedBeaconBlock<P>>,
163165
) -> Result<Option<BlockAction<P>>> {
164166
store.validate_block_for_gossip(block, |parent| {
@@ -178,7 +180,7 @@ impl<P: Preset> BlockProcessor<P> {
178180

179181
pub fn validate_block<E: ExecutionEngine<P> + Send>(
180182
&self,
181-
store: &Store<P>,
183+
store: &Store<P, Storage<P>>,
182184
block: &Arc<SignedBeaconBlock<P>>,
183185
state_root_policy: StateRootPolicy,
184186
execution_engine: E,

fork_choice_control/src/controller.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ use crate::{
6262

6363
pub struct Controller<P: Preset, E, A, W: Wait> {
6464
// The latest consistent snapshot of the store.
65-
store_snapshot: Arc<ArcSwap<Store<P>>>,
65+
store_snapshot: Arc<ArcSwap<Store<P, Storage<P>>>>,
6666
block_processor: Arc<BlockProcessor<P>>,
6767
execution_engine: E,
6868
state_cache: Arc<StateCacheProcessor<P>>,
@@ -114,6 +114,7 @@ where
114114
store_config,
115115
anchor_block,
116116
anchor_state,
117+
storage.clone_arc(),
117118
finished_initial_forward_sync,
118119
);
119120

@@ -524,11 +525,11 @@ where
524525
self.store_snapshot().store_config()
525526
}
526527

527-
pub(crate) fn store_snapshot(&self) -> Guard<Arc<Store<P>>> {
528+
pub(crate) fn store_snapshot(&self) -> Guard<Arc<Store<P, Storage<P>>>> {
528529
self.store_snapshot.load()
529530
}
530531

531-
pub(crate) fn owned_store_snapshot(&self) -> Arc<Store<P>> {
532+
pub(crate) fn owned_store_snapshot(&self) -> Arc<Store<P, Storage<P>>> {
532533
self.store_snapshot.load_full()
533534
}
534535

fork_choice_control/src/mutator.rs

+40-6
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ use crate::{
8383

8484
#[expect(clippy::struct_field_names)]
8585
pub struct Mutator<P: Preset, E, W, TS, PS, LS, NS, SS, VS> {
86-
store: Arc<Store<P>>,
87-
store_snapshot: Arc<ArcSwap<Store<P>>>,
86+
store: Arc<Store<P, Storage<P>>>,
87+
store_snapshot: Arc<ArcSwap<Store<P, Storage<P>>>>,
8888
state_cache: Arc<StateCacheProcessor<P>>,
8989
block_processor: Arc<BlockProcessor<P>>,
9090
event_channels: Arc<EventChannels>,
@@ -139,7 +139,7 @@ where
139139
{
140140
#[expect(clippy::too_many_arguments)]
141141
pub fn new(
142-
store_snapshot: Arc<ArcSwap<Store<P>>>,
142+
store_snapshot: Arc<ArcSwap<Store<P, Storage<P>>>>,
143143
state_cache: Arc<StateCacheProcessor<P>>,
144144
block_processor: Arc<BlockProcessor<P>>,
145145
event_channels: Arc<EventChannels>,
@@ -1444,8 +1444,42 @@ where
14441444
if misc::is_epoch_start::<P>(head_slot) {
14451445
info!("unloading old beacon states (head slot: {head_slot})");
14461446

1447-
self.store_mut()
1447+
let unloaded = self
1448+
.store_mut()
14481449
.unload_old_states(unfinalized_states_in_memory);
1450+
1451+
let unloaded_checkpoint_states = self
1452+
.store_mut()
1453+
.unload_checkpoint_states(unfinalized_states_in_memory);
1454+
1455+
let store = self.owned_store();
1456+
let storage = self.storage.clone_arc();
1457+
let wait_group = wait_group.clone();
1458+
1459+
Builder::new()
1460+
.name("store-unloader".to_owned())
1461+
.spawn(move || {
1462+
debug!("persisting unloaded old beacon states…");
1463+
1464+
let states_with_block_roots = unloaded
1465+
.iter()
1466+
.map(|chain_link| (chain_link.state(&store), chain_link.block_root))
1467+
.chain(unloaded_checkpoint_states);
1468+
1469+
match storage.append_states(states_with_block_roots) {
1470+
Ok(slots) => {
1471+
debug!(
1472+
"unloaded old beacon states persisted \
1473+
(state slots: {slots:?})",
1474+
)
1475+
}
1476+
Err(error) => {
1477+
error!("persisting unloaded old beacon states to storage failed: {error:?}")
1478+
}
1479+
}
1480+
1481+
drop(wait_group);
1482+
})?;
14491483
}
14501484

14511485
let processing_duration = insertion_time.duration_since(submission_time);
@@ -2374,7 +2408,7 @@ where
23742408
self.thread_pool.spawn(task);
23752409
}
23762410

2377-
fn store_mut(&mut self) -> &mut Store<P> {
2411+
fn store_mut(&mut self) -> &mut Store<P, Storage<P>> {
23782412
self.store.make_mut()
23792413
}
23802414

@@ -2383,7 +2417,7 @@ where
23832417
// faster to clone a `Store` with all the `Arc`s inside it and allocate another `Arc`.
23842418
//
23852419
// As a result, this method should only be called when `Mutator.store` is in a consistent state.
2386-
fn owned_store(&self) -> Arc<Store<P>> {
2420+
fn owned_store(&self) -> Arc<Store<P, Storage<P>>> {
23872421
self.store.clone_arc()
23882422
}
23892423

fork_choice_control/src/queries.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,7 @@ pub struct BlockWithRoot<P: Preset> {
724724
pub struct Snapshot<'storage, P: Preset> {
725725
// Use a `Guard` instead of an owned snapshot unlike in tasks based on the intuition that
726726
// `Snapshot`s will be less common than tasks.
727-
store_snapshot: Guard<Arc<Store<P>>>,
727+
store_snapshot: Guard<Arc<Store<P, Storage<P>>>>,
728728
state_cache: Arc<StateCacheProcessor<P>>,
729729
storage: &'storage Storage<P>,
730730
}

fork_choice_control/src/storage.rs

+35-11
Original file line numberDiff line numberDiff line change
@@ -53,25 +53,26 @@ pub enum StateLoadStrategy<P: Preset> {
5353
}
5454

5555
#[expect(clippy::struct_field_names)]
56+
#[derive(Clone)]
5657
pub struct Storage<P> {
5758
config: Arc<Config>,
58-
pub(crate) database: Database,
59+
pub(crate) database: Arc<Database>,
5960
pub(crate) archival_epoch_interval: NonZeroU64,
6061
prune_storage: bool,
6162
phantom: PhantomData<P>,
6263
}
6364

6465
impl<P: Preset> Storage<P> {
6566
#[must_use]
66-
pub const fn new(
67+
pub fn new(
6768
config: Arc<Config>,
6869
database: Database,
6970
archival_epoch_interval: NonZeroU64,
7071
prune_storage: bool,
7172
) -> Self {
7273
Self {
7374
config,
74-
database,
75+
database: Arc::new(database),
7576
archival_epoch_interval,
7677
prune_storage,
7778
phantom: PhantomData,
@@ -222,7 +223,7 @@ impl<P: Preset> Storage<P> {
222223
&self,
223224
unfinalized: impl Iterator<Item = &'cl ChainLink<P>>,
224225
finalized: impl DoubleEndedIterator<Item = &'cl ChainLink<P>>,
225-
store: &Store<P>,
226+
store: &Store<P, Self>,
226227
) -> Result<AppendedBlockSlots> {
227228
let mut slots = AppendedBlockSlots::default();
228229
let mut store_head_slot = 0;
@@ -353,6 +354,25 @@ impl<P: Preset> Storage<P> {
353354
Ok(persisted_blob_ids)
354355
}
355356

357+
pub(crate) fn append_states(
358+
&self,
359+
states_with_block_roots: impl Iterator<Item = (Arc<BeaconState<P>>, H256)>,
360+
) -> Result<Vec<Slot>> {
361+
let mut slots = vec![];
362+
let mut batch = vec![];
363+
364+
for (state, block_root) in states_with_block_roots {
365+
if !self.contains_key(StateByBlockRoot(block_root))? {
366+
slots.push(state.slot());
367+
batch.push(serialize(StateByBlockRoot(block_root), state)?);
368+
}
369+
}
370+
371+
self.database.put_batch(batch)?;
372+
373+
Ok(slots)
374+
}
375+
356376
pub(crate) fn blob_sidecar_by_id(
357377
&self,
358378
blob_id: BlobIdentifier,
@@ -405,7 +425,7 @@ impl<P: Preset> Storage<P> {
405425
Ok(None)
406426
}
407427

408-
pub(crate) fn genesis_block_root(&self, store: &Store<P>) -> Result<H256> {
428+
pub(crate) fn genesis_block_root(&self, store: &Store<P, Self>) -> Result<H256> {
409429
self.block_root_by_slot_with_store(store, GENESIS_SLOT)?
410430
.ok_or(Error::GenesisBlockRootNotFound)
411431
.map_err(Into::into)
@@ -448,7 +468,7 @@ impl<P: Preset> Storage<P> {
448468
// Like `block_root_by_slot`, but looks for the root in `store` first.
449469
pub(crate) fn block_root_by_slot_with_store(
450470
&self,
451-
store: &Store<P>,
471+
store: &Store<P, Self>,
452472
slot: Slot,
453473
) -> Result<Option<H256>> {
454474
if let Some(chain_link) = store.chain_link_before_or_at(slot) {
@@ -551,7 +571,7 @@ impl<P: Preset> Storage<P> {
551571

552572
pub(crate) fn dependent_root(
553573
&self,
554-
store: &Store<P>,
574+
store: &Store<P, Self>,
555575
state: &BeaconState<P>,
556576
epoch: Epoch,
557577
) -> Result<H256> {
@@ -711,10 +731,8 @@ impl<P: Preset> Storage<P> {
711731

712732
itertools::process_results(results, |pairs| {
713733
pairs
714-
.take_while(|(key_bytes, _)| {
715-
FinalizedBlockByRoot::has_prefix(key_bytes)
716-
&& !UnfinalizedBlockByRoot::has_prefix(key_bytes)
717-
})
734+
.take_while(|(key_bytes, _)| FinalizedBlockByRoot::has_prefix(key_bytes))
735+
.filter(|(key_bytes, _)| !UnfinalizedBlockByRoot::has_prefix(key_bytes))
718736
.count()
719737
})
720738
}
@@ -744,6 +762,12 @@ impl<P: Preset> Storage<P> {
744762
}
745763
}
746764

765+
impl<P: Preset> fork_choice_store::Storage<P> for Storage<P> {
766+
fn stored_state_by_block_root(&self, block_root: H256) -> Result<Option<Arc<BeaconState<P>>>> {
767+
self.state_by_block_root(block_root)
768+
}
769+
}
770+
747771
#[derive(Default, Debug)]
748772
pub struct AppendedBlockSlots {
749773
pub finalized: Vec<Slot>,

0 commit comments

Comments
 (0)