
Commit 9244f7f

Improvements to Deneb store upon review (#4693)
* Start testing blob pruning
* Get rid of unnecessary orphaned blob column
* Make random blob tests deterministic
* Test for pruning being blocked by finality
* Fix bugs and test fork boundary
* A few more tweaks to pruning conditions
* Tweak oldest_blob_slot semantics
* Test margin pruning
* Clean up some terminology and lints
* Schema migrations for v18
* Remove FIXME
* Prune blobs on finalization not every slot
* Fix more bugs + tests
* Address review comments
1 parent 5c5afaf commit 9244f7f

File tree: 16 files changed, +701 -247 lines

beacon_node/beacon_chain/src/builder.rs

Lines changed: 11 additions & 1 deletion
@@ -396,6 +396,11 @@ where
                 .init_anchor_info(genesis.beacon_block.message(), retain_historic_states)
                 .map_err(|e| format!("Failed to initialize genesis anchor: {:?}", e))?,
         );
+        self.pending_io_batch.push(
+            store
+                .init_blob_info(genesis.beacon_block.slot())
+                .map_err(|e| format!("Failed to initialize genesis blob info: {:?}", e))?,
+        );
 
         let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &genesis)
             .map_err(|e| format!("Unable to initialize fork choice store: {e:?}"))?;
@@ -519,6 +524,11 @@ where
                 .init_anchor_info(weak_subj_block.message(), retain_historic_states)
                 .map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?,
         );
+        self.pending_io_batch.push(
+            store
+                .init_blob_info(weak_subj_block.slot())
+                .map_err(|e| format!("Failed to initialize blob info: {:?}", e))?,
+        );
 
         // Store pruning checkpoint to prevent attempting to prune before the anchor state.
         self.pending_io_batch
@@ -982,7 +992,7 @@ where
             );
         }
 
-        // Prune blobs sidecars older than the blob data availability boundary in the background.
+        // Prune blobs older than the blob data availability boundary in the background.
         if let Some(data_availability_boundary) = beacon_chain.data_availability_boundary() {
            beacon_chain
                .store_migrator
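
Note: the two new `init_blob_info` calls mirror the existing `init_anchor_info` pattern: the store returns a write op that is deferred onto `pending_io_batch`, so blob info lands in the same atomic batch as the anchor info at genesis or checkpoint-sync startup. A minimal sketch of that pattern follows; `BlobInfo`, `KeyValueStoreOp`, and `Store` are simplified stand-ins (not Lighthouse's actual definitions), and the assumption that `init_blob_info` records the anchor slot as `oldest_blob_slot` is inferred from this diff.

    type Slot = u64;

    #[derive(Clone, Debug, Default)]
    struct BlobInfo {
        // Slot of the oldest blob the store retains, if any (assumed field).
        oldest_blob_slot: Option<Slot>,
    }

    #[derive(Debug)]
    enum KeyValueStoreOp {
        PutBlobInfo(BlobInfo),
    }

    struct Store {
        blob_info: BlobInfo,
    }

    impl Store {
        // Seed blob info at the anchor slot and hand back the write op, so the
        // caller can batch it with the anchor-info op and commit atomically.
        fn init_blob_info(&mut self, anchor_slot: Slot) -> Result<KeyValueStoreOp, String> {
            self.blob_info.oldest_blob_slot = Some(anchor_slot);
            Ok(KeyValueStoreOp::PutBlobInfo(self.blob_info.clone()))
        }
    }

    fn main() {
        let mut store = Store { blob_info: BlobInfo::default() };
        let mut pending_io_batch = Vec::new();
        pending_io_batch.push(store.init_blob_info(0).expect("init blob info"));
        // A real store would now commit `pending_io_batch` in one atomic write.
        println!("{pending_io_batch:?}");
    }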

beacon_node/beacon_chain/src/canonical_head.rs

Lines changed: 6 additions & 6 deletions
@@ -762,12 +762,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // Drop the old cache head nice and early to try and free the memory as soon as possible.
         drop(old_cached_head);
 
-        // Prune blobs in the background.
-        if let Some(data_availability_boundary) = self.data_availability_boundary() {
-            self.store_migrator
-                .process_prune_blobs(data_availability_boundary);
-        }
-
         // If the finalized checkpoint changed, perform some updates.
         //
         // The `after_finalization` function will take a write-lock on `fork_choice`, therefore it
@@ -1064,6 +1058,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             self.head_tracker.clone(),
         )?;
 
+        // Prune blobs in the background.
+        if let Some(data_availability_boundary) = self.data_availability_boundary() {
+            self.store_migrator
+                .process_prune_blobs(data_availability_boundary);
+        }
+
         // Take a write-lock on the canonical head and signal for it to prune.
         self.canonical_head.fork_choice_write_lock().prune()?;
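
Note: this move implements "Prune blobs on finalization not every slot" from the commit message: the pruning trigger leaves the code that runs on every cached-head swap and joins the updates that run only when the finalized checkpoint advances. A condensed, stand-in model of the resulting control flow (method and field names simplified, not Lighthouse's real API):

    struct Chain {
        finalized_epoch: u64,
        // `None` until the Deneb fork is scheduled, which skips pruning entirely.
        data_availability_boundary: Option<u64>,
    }

    impl Chain {
        fn recompute_head(&mut self, new_finalized_epoch: u64) {
            // Head-update work happens on every call, but pruning no longer does.
            if new_finalized_epoch > self.finalized_epoch {
                self.finalized_epoch = new_finalized_epoch;
                self.after_finalization();
            }
        }

        fn after_finalization(&self) {
            // State migration, head pruning, etc. run here; blob pruning now
            // rides along instead of firing on every head recomputation.
            if let Some(boundary) = self.data_availability_boundary {
                println!("process_prune_blobs({boundary})");
            }
        }
    }

    fn main() {
        let mut chain = Chain { finalized_epoch: 0, data_availability_boundary: Some(42) };
        chain.recompute_head(0); // finality unchanged: no pruning
        chain.recompute_head(1); // finality advanced: prunes once
    }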

beacon_node/beacon_chain/src/data_availability_checker.rs

Lines changed: 2 additions & 2 deletions
@@ -293,7 +293,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
             .map(|current_epoch| {
                 std::cmp::max(
                     fork_epoch,
-                    current_epoch.saturating_sub(*MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
+                    current_epoch.saturating_sub(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
                 )
             })
         })
@@ -466,7 +466,7 @@ async fn availability_cache_maintenance_service<T: BeaconChainTypes>(
         let cutoff_epoch = std::cmp::max(
             finalized_epoch + 1,
             std::cmp::max(
-                current_epoch.saturating_sub(*MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
+                current_epoch.saturating_sub(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
                 deneb_fork_epoch,
             ),
         );
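
Note: the only change at these two call sites is the dropped `*` dereference, which suggests `MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS` became a plain `const` (rather than a lazily-initialized static) elsewhere in this commit. For reference, a self-contained model of the boundary these sites compute; 4096 is the Deneb spec's default for this constant, and the free function here is illustrative only:

    type Epoch = u64;

    // Deneb spec default for this constant; illustrative here.
    const MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: Epoch = 4096;

    // Blobs are only served back to this epoch: never before the Deneb fork,
    // otherwise a rolling window trailing the current epoch.
    fn data_availability_boundary(fork_epoch: Epoch, current_epoch: Epoch) -> Epoch {
        std::cmp::max(
            fork_epoch,
            current_epoch.saturating_sub(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
        )
    }

    fn main() {
        // Shortly after the fork the boundary is pinned to the fork epoch...
        assert_eq!(data_availability_boundary(1000, 1010), 1000);
        // ...and later it trails the current epoch by the retention window.
        assert_eq!(data_availability_boundary(1000, 10_000), 10_000 - 4096);
    }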

beacon_node/beacon_chain/src/historical_blocks.rs

Lines changed: 25 additions & 3 deletions
@@ -9,7 +9,7 @@ use state_processing::{
 use std::borrow::Cow;
 use std::iter;
 use std::time::Duration;
-use store::{chunked_vector::BlockRoots, AnchorInfo, ChunkWriter, KeyValueStore};
+use store::{chunked_vector::BlockRoots, AnchorInfo, BlobInfo, ChunkWriter, KeyValueStore};
 use types::{Hash256, Slot};
 
 /// Use a longer timeout on the pubkey cache.
@@ -65,6 +65,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             .store
             .get_anchor_info()
             .ok_or(HistoricalBlockError::NoAnchorInfo)?;
+        let blob_info = self.store.get_blob_info();
 
         // Take all blocks with slots less than the oldest block slot.
         let num_relevant = blocks.partition_point(|available_block| {
@@ -98,6 +99,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         let mut prev_block_slot = anchor_info.oldest_block_slot;
         let mut chunk_writer =
             ChunkWriter::<BlockRoots, _, _>::new(&self.store.cold_db, prev_block_slot.as_usize())?;
+        let mut new_oldest_blob_slot = blob_info.oldest_blob_slot;
 
         let mut cold_batch = Vec::with_capacity(blocks_to_import.len());
         let mut hot_batch = Vec::with_capacity(blocks_to_import.len() + n_blobs_lists_to_import);
@@ -123,6 +125,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
                 .blinded_block_as_kv_store_ops(&block_root, &blinded_block, &mut hot_batch);
             // Store the blobs too
             if let Some(blobs) = maybe_blobs {
+                new_oldest_blob_slot = Some(block.slot());
                 self.store
                     .blobs_as_kv_store_ops(&block_root, blobs, &mut hot_batch);
             }
@@ -206,15 +209,34 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         self.store.hot_db.do_atomically(hot_batch)?;
         self.store.cold_db.do_atomically(cold_batch)?;
 
+        let mut anchor_and_blob_batch = Vec::with_capacity(2);
+
+        // Update the blob info.
+        if new_oldest_blob_slot != blob_info.oldest_blob_slot {
+            if let Some(oldest_blob_slot) = new_oldest_blob_slot {
+                let new_blob_info = BlobInfo {
+                    oldest_blob_slot: Some(oldest_blob_slot),
+                    ..blob_info.clone()
+                };
+                anchor_and_blob_batch.push(
+                    self.store
+                        .compare_and_set_blob_info(blob_info, new_blob_info)?,
+                );
+            }
+        }
+
         // Update the anchor.
         let new_anchor = AnchorInfo {
             oldest_block_slot: prev_block_slot,
             oldest_block_parent: expected_block_root,
             ..anchor_info
         };
         let backfill_complete = new_anchor.block_backfill_complete(self.genesis_backfill_slot);
-        self.store
-            .compare_and_set_anchor_info_with_write(Some(anchor_info), Some(new_anchor))?;
+        anchor_and_blob_batch.push(
+            self.store
+                .compare_and_set_anchor_info(Some(anchor_info), Some(new_anchor))?,
+        );
+        self.store.hot_db.do_atomically(anchor_and_blob_batch)?;
 
         // If backfill has completed and the chain is configured to reconstruct historic states,
         // send a message to the background migrator instructing it to begin reconstruction.
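
Note: two things change in backfill. First, importing blobs now pushes `oldest_blob_slot` backwards as historical blocks land. Second, switching from `compare_and_set_anchor_info_with_write` to `compare_and_set_anchor_info` (and the new `compare_and_set_blob_info`) means both calls return KV ops instead of writing immediately, so the anchor and blob updates commit in one `do_atomically` batch. A minimal sketch of that compare-and-set-returning-an-op pattern, with simplified stand-in types:

    #[derive(Clone, Debug, PartialEq)]
    struct BlobInfo {
        oldest_blob_slot: Option<u64>,
    }

    #[derive(Debug)]
    enum KeyValueStoreOp {
        PutBlobInfo(BlobInfo),
    }

    struct Store {
        blob_info: BlobInfo,
    }

    impl Store {
        // Update the in-memory value only if the caller saw the latest one, and
        // return the write op for the caller to commit with other ops atomically.
        fn compare_and_set_blob_info(
            &mut self,
            prev: BlobInfo,
            new: BlobInfo,
        ) -> Result<KeyValueStoreOp, String> {
            if self.blob_info != prev {
                return Err("blob info changed concurrently".into());
            }
            self.blob_info = new.clone();
            Ok(KeyValueStoreOp::PutBlobInfo(new))
        }
    }

    fn main() {
        let mut store = Store { blob_info: BlobInfo { oldest_blob_slot: Some(100) } };
        let prev = store.blob_info.clone();
        // Backfill imported blobs down to slot 64, so push the marker backwards.
        let new = BlobInfo { oldest_blob_slot: Some(64) };
        let batch = vec![store.compare_and_set_blob_info(prev, new).unwrap()];
        // The anchor-info op would be pushed here too, then both written at once.
        println!("{} op(s) ready to commit", batch.len());
    }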

beacon_node/beacon_chain/src/migrate.rs

Lines changed: 47 additions & 47 deletions
@@ -218,7 +218,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold> {
         if let Err(e) = db.try_prune_blobs(false, data_availability_boundary) {
             error!(
                 log,
-                "Blobs pruning failed";
+                "Blob pruning failed";
                 "error" => ?e,
             );
         }
@@ -390,39 +390,44 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold> {
         let (tx, rx) = mpsc::channel();
         let thread = thread::spawn(move || {
             while let Ok(notif) = rx.recv() {
-                // Read the rest of the messages in the channel, preferring any reconstruction
-                // notification, or the finalization notification with the greatest finalized epoch.
-                let notif =
-                    rx.try_iter()
-                        .fold(notif, |best, other: Notification| match (&best, &other) {
-                            (Notification::Reconstruction, _)
-                            | (_, Notification::Reconstruction) => Notification::Reconstruction,
-                            (
-                                Notification::Finalization(fin1),
-                                Notification::Finalization(fin2),
-                            ) => {
-                                if fin2.finalized_checkpoint.epoch > fin1.finalized_checkpoint.epoch
-                                {
-                                    other
-                                } else {
-                                    best
-                                }
-                            }
-                            (Notification::Finalization(_), Notification::PruneBlobs(_)) => best,
-                            (Notification::PruneBlobs(_), Notification::Finalization(_)) => other,
-                            (Notification::PruneBlobs(dab1), Notification::PruneBlobs(dab2)) => {
-                                if dab2 > dab1 {
-                                    other
-                                } else {
-                                    best
-                                }
-                            }
-                        });
-
-                match notif {
-                    Notification::Reconstruction => Self::run_reconstruction(db.clone(), &log),
-                    Notification::Finalization(fin) => Self::run_migration(db.clone(), fin, &log),
-                    Notification::PruneBlobs(dab) => Self::run_prune_blobs(db.clone(), dab, &log),
-                }
+                let mut reconstruction_notif = None;
+                let mut finalization_notif = None;
+                let mut prune_blobs_notif = None;
+                match notif {
+                    Notification::Reconstruction => reconstruction_notif = Some(notif),
+                    Notification::Finalization(fin) => finalization_notif = Some(fin),
+                    Notification::PruneBlobs(dab) => prune_blobs_notif = Some(dab),
+                }
+                // Read the rest of the messages in the channel, taking the best of each type.
+                for notif in rx.try_iter() {
+                    match notif {
+                        Notification::Reconstruction => reconstruction_notif = Some(notif),
+                        Notification::Finalization(fin) => {
+                            if let Some(current) = finalization_notif.as_mut() {
+                                if fin.finalized_checkpoint.epoch
+                                    > current.finalized_checkpoint.epoch
+                                {
+                                    *current = fin;
+                                }
+                            } else {
+                                finalization_notif = Some(fin);
+                            }
+                        }
+                        Notification::PruneBlobs(dab) => {
+                            prune_blobs_notif = std::cmp::max(prune_blobs_notif, Some(dab));
+                        }
+                    }
+                }
+                // If reconstruction is on-going, ignore finalization migration and blob pruning.
+                if reconstruction_notif.is_some() {
+                    Self::run_reconstruction(db.clone(), &log);
+                } else {
+                    if let Some(fin) = finalization_notif {
+                        Self::run_migration(db.clone(), fin, &log);
+                    }
+                    if let Some(dab) = prune_blobs_notif {
+                        Self::run_prune_blobs(db.clone(), dab, &log);
+                    }
+                }
             }
         });
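
Note: the rewritten loop no longer collapses the channel backlog into a single "best" notification; it coalesces per type, so a burst of messages yields at most one reconstruction run, one migration (newest finalized epoch), and one blob pruning pass (largest boundary), with reconstruction taking priority. A runnable model of that drain-and-coalesce behavior, with the notification payloads simplified to bare epochs:

    use std::sync::mpsc;

    enum Notification {
        Reconstruction,
        Finalization(u64), // simplified: just the finalized epoch
        PruneBlobs(u64),   // simplified: just the data availability boundary
    }

    // Drain the channel and run at most one job per notification type.
    fn drain_and_run(rx: &mpsc::Receiver<Notification>) {
        let mut reconstruction = false;
        let mut finalization: Option<u64> = None;
        let mut prune_blobs: Option<u64> = None;
        for notif in rx.try_iter() {
            match notif {
                Notification::Reconstruction => reconstruction = true,
                Notification::Finalization(epoch) => {
                    // Keep only the newest finalized epoch.
                    finalization = Some(finalization.map_or(epoch, |e| e.max(epoch)));
                }
                Notification::PruneBlobs(dab) => {
                    // Keep only the largest data availability boundary.
                    prune_blobs = std::cmp::max(prune_blobs, Some(dab));
                }
            }
        }
        // Reconstruction preempts the other work, as in the commit.
        if reconstruction {
            println!("run_reconstruction");
        } else {
            if let Some(epoch) = finalization {
                println!("run_migration(finalized_epoch={epoch})");
            }
            if let Some(dab) = prune_blobs {
                println!("run_prune_blobs(boundary={dab})");
            }
        }
    }

    fn main() {
        let (tx, rx) = mpsc::channel();
        for n in [
            Notification::Finalization(10),
            Notification::PruneBlobs(7),
            Notification::Finalization(12),
            Notification::PruneBlobs(9),
        ] {
            tx.send(n).unwrap();
        }
        drain_and_run(&rx); // prints run_migration(...=12), run_prune_blobs(...=9)

        tx.send(Notification::Reconstruction).unwrap();
        tx.send(Notification::Finalization(13)).unwrap();
        drain_and_run(&rx); // prints run_reconstruction only
    }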
@@ -663,22 +668,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold> {
             head_tracker_lock.remove(&head_hash);
         }
 
-        let batch: Vec<StoreOp<E>> = abandoned_blocks
+        let mut batch: Vec<StoreOp<E>> = abandoned_blocks
             .into_iter()
             .map(Into::into)
             .flat_map(|block_root: Hash256| {
-                let mut store_ops = vec![
+                [
                     StoreOp::DeleteBlock(block_root),
                     StoreOp::DeleteExecutionPayload(block_root),
-                ];
-                if store.blobs_sidecar_exists(&block_root).unwrap_or(false) {
-                    // Keep track of non-empty orphaned blobs sidecars.
-                    store_ops.extend([
-                        StoreOp::DeleteBlobs(block_root),
-                        StoreOp::PutOrphanedBlobsKey(block_root),
-                    ]);
-                }
-                store_ops
+                    StoreOp::DeleteBlobs(block_root),
+                ]
             })
             .chain(
                 abandoned_states
@@ -687,8 +685,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold> {
             )
             .collect();
 
-        let mut kv_batch = store.convert_to_kv_batch(batch)?;
-
         // Persist the head in case the process is killed or crashes here. This prevents
         // the head tracker reverting after our mutation above.
         let persisted_head = PersistedBeaconChain {
@@ -697,12 +693,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold> {
             ssz_head_tracker: SszHeadTracker::from_map(&head_tracker_lock),
         };
         drop(head_tracker_lock);
-        kv_batch.push(persisted_head.as_kv_store_op(BEACON_CHAIN_DB_KEY));
+        batch.push(StoreOp::KeyValueOp(
+            persisted_head.as_kv_store_op(BEACON_CHAIN_DB_KEY),
+        ));
 
         // Persist the new finalized checkpoint as the pruning checkpoint.
-        kv_batch.push(store.pruning_checkpoint_store_op(new_finalized_checkpoint));
+        batch.push(StoreOp::KeyValueOp(
+            store.pruning_checkpoint_store_op(new_finalized_checkpoint),
+        ));
 
-        store.hot_db.do_atomically(kv_batch)?;
+        store.do_atomically_with_block_and_blobs_cache(batch)?;
         debug!(log, "Database pruning complete");
 
         Ok(PruningOutcome::Successful {
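
Note: two related changes in the pruning path. `DeleteBlobs` is now issued unconditionally for every abandoned block (the orphaned-blob bookkeeping column is gone), and the batch stays as high-level `StoreOp`s until the final `do_atomically_with_block_and_blobs_cache` call, whose name indicates it commits the write batch while keeping the in-memory block and blob caches consistent. A simplified, stand-in sketch of such a cache-aware commit; the real method's behavior beyond its name is an assumption here:

    use std::collections::HashMap;

    type Hash256 = [u8; 32];

    enum StoreOp {
        DeleteBlock(Hash256),
        DeleteExecutionPayload(Hash256),
        DeleteBlobs(Hash256),
        KeyValueOp(String), // stand-in for a raw key-value write
    }

    #[derive(Default)]
    struct Store {
        blobs_db: HashMap<Hash256, Vec<u8>>,
        blob_cache: HashMap<Hash256, Vec<u8>>,
    }

    impl Store {
        // Apply the batch (a real store uses a single DB write batch) while
        // evicting deleted entries from the in-memory cache in the same pass.
        fn do_atomically_with_block_and_blobs_cache(&mut self, batch: Vec<StoreOp>) {
            for op in batch {
                match op {
                    StoreOp::DeleteBlobs(root) => {
                        self.blobs_db.remove(&root);
                        self.blob_cache.remove(&root); // cache stays in sync
                    }
                    StoreOp::DeleteBlock(_)
                    | StoreOp::DeleteExecutionPayload(_)
                    | StoreOp::KeyValueOp(_) => { /* elided */ }
                }
            }
        }
    }

    fn main() {
        let root = [0u8; 32];
        let mut store = Store::default();
        store.blobs_db.insert(root, vec![1, 2, 3]);
        store.blob_cache.insert(root, vec![1, 2, 3]);
        store.do_atomically_with_block_and_blobs_cache(vec![
            StoreOp::DeleteBlock(root),
            StoreOp::DeleteExecutionPayload(root),
            StoreOp::DeleteBlobs(root),
            StoreOp::KeyValueOp("pruning checkpoint".into()),
        ]);
        assert!(store.blob_cache.is_empty() && store.blobs_db.is_empty());
    }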

beacon_node/beacon_chain/src/schema_change.rs

Lines changed: 9 additions & 0 deletions
@@ -5,6 +5,7 @@ mod migration_schema_v14;
 mod migration_schema_v15;
 mod migration_schema_v16;
 mod migration_schema_v17;
+mod migration_schema_v18;
 
 use crate::beacon_chain::{BeaconChainTypes, ETH1_CACHE_DB_KEY};
 use crate::eth1_chain::SszEth1;
@@ -150,6 +151,14 @@ pub fn migrate_schema<T: BeaconChainTypes>(
             let ops = migration_schema_v17::downgrade_from_v17::<T>(db.clone(), log)?;
             db.store_schema_version_atomically(to, ops)
         }
+        (SchemaVersion(17), SchemaVersion(18)) => {
+            let ops = migration_schema_v18::upgrade_to_v18::<T>(db.clone(), log)?;
+            db.store_schema_version_atomically(to, ops)
+        }
+        (SchemaVersion(18), SchemaVersion(17)) => {
+            let ops = migration_schema_v18::downgrade_from_v18::<T>(db.clone(), log)?;
+            db.store_schema_version_atomically(to, ops)
+        }
         // Anything else is an error.
         (_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
             target_version: to,
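
Note: the new match arms wire v17<->v18 into the one-hop-at-a-time migration dispatcher; each arm computes its ops and commits them together with the version bump via `store_schema_version_atomically`. A toy model of the dispatch shape (the real v18 ops concern the blob-store changes in this commit; here they are elided):

    #[derive(Clone, Copy, Debug)]
    struct SchemaVersion(u64);

    // One migration hop; unrecognized (from, to) pairs are errors, so
    // multi-step migrations must be chained one version at a time.
    fn migrate_one_step(from: SchemaVersion, to: SchemaVersion) -> Result<(), String> {
        match (from, to) {
            (SchemaVersion(17), SchemaVersion(18)) => {
                // upgrade_to_v18 ops would be computed here, then committed
                // atomically together with the new schema version.
                Ok(())
            }
            (SchemaVersion(18), SchemaVersion(17)) => {
                // downgrade_from_v18 ops, likewise.
                Ok(())
            }
            (_, _) => Err(format!("unsupported migration {from:?} -> {to:?}")),
        }
    }

    fn main() {
        assert!(migrate_one_step(SchemaVersion(17), SchemaVersion(18)).is_ok());
        assert!(migrate_one_step(SchemaVersion(16), SchemaVersion(18)).is_err());
    }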
