Commit 57d357d

Run fork choice before block proposal (#3168)
## Issue Addressed

Upcoming spec change ethereum/consensus-specs#2878

## Proposed Changes

1. Run fork choice at the start of every slot, and wait for this run to complete before proposing a block.
2. As an optimisation, also run fork choice 3/4 of the way through the slot (at 9s), _dequeueing attestations for the next slot_.
3. Remove the fork choice run from the state advance timer that occurred before advancing the state.

## Additional Info

### Block Proposal Accuracy

This change makes us more likely to propose on top of the correct head in the presence of re-orgs with proposer boost in play. The main scenario that this change is designed to address is described in the linked spec issue.

### Attestation Accuracy

This change _also_ makes us more likely to attest to the correct head. Currently in the case of a skipped slot at `slot` we only run fork choice 9s into `slot - 1`. This means the attestations from `slot - 1` aren't taken into consideration, and any boost applied to the block from `slot - 1` is not removed (it should be). In the language of the linked spec issue, this means we are liable to attest to C, even when the majority voting weight has already caused a re-org to B.

### Why remove the call before the state advance?

If we've run fork choice at the start of the slot then it has already dequeued all the attestations from the previous slot, which are the only ones eligible to influence the head in the current slot. Running fork choice again is unnecessary (unless we run it for the next slot and try to pre-empt a re-org, but I don't currently think this is a great idea).

### Performance

Based on Prater testing this adds about 5-25ms of runtime to block proposal times, which are 500-1000ms on average (and spike to 5s+ sometimes due to state handling issues 😢 ). I believe this is a small enough penalty to enable it by default, with the option to disable it via the new flag `--fork-choice-before-proposal-timeout 0`. Upcoming work on block packing and state representation will also reduce block production times in general, while removing the spikes.

### Implementation

Fork choice gets invoked at the start of the slot via the `per_slot_task` function called from the slot timer. It then uses a condition variable to signal to block production that fork choice has been updated. This is a bit funky, but it seems to work. One downside of the timer-based approach is that it doesn't happen automatically in most of the tests. The test added by this PR has to trigger the run manually.
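
To make the timing in the proposed changes concrete, here is a minimal sketch of the two fork choice run points within a slot. The names `ForkChoiceSchedule` and `fork_choice_schedule` are hypothetical and do not appear in this commit; the 12s slot duration is assumed from the "at 9s" figure in the description.

```rust
use std::time::Duration;

/// Offsets from the start of a slot at which fork choice would run.
/// Hypothetical type, for illustration only.
#[derive(Debug, PartialEq, Eq)]
pub struct ForkChoiceSchedule {
    /// Run at the start of the slot; block proposal waits on this run.
    pub slot_start: Duration,
    /// Optimisation run 3/4 of the way through the slot.
    pub pre_next_slot: Duration,
}

/// Compute the two run points for a given slot duration.
pub fn fork_choice_schedule(slot_duration: Duration) -> ForkChoiceSchedule {
    ForkChoiceSchedule {
        slot_start: Duration::ZERO,
        // `Duration` supports multiplication and division by `u32`.
        pre_next_slot: slot_duration * 3 / 4,
    }
}
```

With an assumed 12s slot, `fork_choice_schedule(Duration::from_secs(12)).pre_next_slot` is 9s, matching the figure quoted in the description.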
1 parent 54b58fd commit 57d357d

15 files changed: 458 additions and 47 deletions.

Cargo.lock

Lines changed: 1 addition & 0 deletions

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 100 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::errors::{BeaconChainError as Error, BlockProductionError};
1818
use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
1919
use crate::events::ServerSentEventHandler;
2020
use crate::execution_payload::get_execution_payload;
21+
use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult};
2122
use crate::head_tracker::HeadTracker;
2223
use crate::historical_blocks::HistoricalBlockError;
2324
use crate::migrate::BackgroundMigrator;
@@ -339,6 +340,10 @@ pub struct BeaconChain<T: BeaconChainTypes> {
339340
/// A state-machine that is updated with information from the network and chooses a canonical
340341
/// head block.
341342
pub fork_choice: RwLock<BeaconForkChoice<T>>,
343+
/// Transmitter used to indicate that slot-start fork choice has completed running.
344+
pub fork_choice_signal_tx: Option<ForkChoiceSignalTx>,
345+
/// Receiver used by block production to wait on slot-start fork choice.
346+
pub fork_choice_signal_rx: Option<ForkChoiceSignalRx>,
342347
/// A handler for events generated by the beacon chain. This is only initialized when the
343348
/// HTTP server is enabled.
344349
pub event_handler: Option<ServerSentEventHandler<T::EthSpec>>,
@@ -2952,12 +2957,64 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
29522957
Ok(block_root)
29532958
}
29542959

2960+
/// If configured, wait for the fork choice run at the start of the slot to complete.
2961+
fn wait_for_fork_choice_before_block_production(
2962+
self: &Arc<Self>,
2963+
slot: Slot,
2964+
) -> Result<(), BlockProductionError> {
2965+
if let Some(rx) = &self.fork_choice_signal_rx {
2966+
let current_slot = self
2967+
.slot()
2968+
.map_err(|_| BlockProductionError::UnableToReadSlot)?;
2969+
2970+
let timeout = Duration::from_millis(self.config.fork_choice_before_proposal_timeout_ms);
2971+
2972+
if slot == current_slot || slot == current_slot + 1 {
2973+
match rx.wait_for_fork_choice(slot, timeout) {
2974+
ForkChoiceWaitResult::Success(fc_slot) => {
2975+
debug!(
2976+
self.log,
2977+
"Fork choice successfully updated before block production";
2978+
"slot" => slot,
2979+
"fork_choice_slot" => fc_slot,
2980+
);
2981+
}
2982+
ForkChoiceWaitResult::Behind(fc_slot) => {
2983+
warn!(
2984+
self.log,
2985+
"Fork choice notifier out of sync with block production";
2986+
"fork_choice_slot" => fc_slot,
2987+
"slot" => slot,
2988+
"message" => "this block may be orphaned",
2989+
);
2990+
}
2991+
ForkChoiceWaitResult::TimeOut => {
2992+
warn!(
2993+
self.log,
2994+
"Timed out waiting for fork choice before proposal";
2995+
"message" => "this block may be orphaned",
2996+
);
2997+
}
2998+
}
2999+
} else {
3000+
error!(
3001+
self.log,
3002+
"Producing block at incorrect slot";
3003+
"block_slot" => slot,
3004+
"current_slot" => current_slot,
3005+
"message" => "check clock sync, this block may be orphaned",
3006+
);
3007+
}
3008+
}
3009+
Ok(())
3010+
}
3011+
29553012
/// Produce a new block at the given `slot`.
29563013
///
29573014
/// The produced block will not be inherently valid, it must be signed by a block producer.
29583015
/// Block signing is out of the scope of this function and should be done by a separate program.
29593016
pub fn produce_block<Payload: ExecPayload<T::EthSpec>>(
2960-
&self,
3017+
self: &Arc<Self>,
29613018
randao_reveal: Signature,
29623019
slot: Slot,
29633020
validator_graffiti: Option<Graffiti>,
@@ -2972,7 +3029,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
29723029

29733030
/// Same as `produce_block` but allowing for configuration of RANDAO-verification.
29743031
pub fn produce_block_with_verification<Payload: ExecPayload<T::EthSpec>>(
2975-
&self,
3032+
self: &Arc<Self>,
29763033
randao_reveal: Signature,
29773034
slot: Slot,
29783035
validator_graffiti: Option<Graffiti>,
@@ -2981,6 +3038,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
29813038
metrics::inc_counter(&metrics::BLOCK_PRODUCTION_REQUESTS);
29823039
let _complete_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_TIMES);
29833040

3041+
let fork_choice_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_FORK_CHOICE_TIMES);
3042+
self.wait_for_fork_choice_before_block_production(slot)?;
3043+
drop(fork_choice_timer);
3044+
29843045
// Producing a block requires the tree hash cache, so clone a full state corresponding to
29853046
// the head from the snapshot cache. Unfortunately we can't move the snapshot out of the
29863047
// cache (which would be fast), because we need to re-process the block after it has been
@@ -3362,10 +3423,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
33623423

33633424
/// Execute the fork choice algorithm and enthrone the result as the canonical head.
33643425
pub fn fork_choice(self: &Arc<Self>) -> Result<(), Error> {
3426+
self.fork_choice_at_slot(self.slot()?)
3427+
}
3428+
3429+
/// Execute fork choice at `slot`, processing queued attestations from `slot - 1` and earlier.
3430+
///
3431+
/// The `slot` is not verified in any way, callers should ensure it corresponds to at most
3432+
/// one slot ahead of the current wall-clock slot.
3433+
pub fn fork_choice_at_slot(self: &Arc<Self>, slot: Slot) -> Result<(), Error> {
33653434
metrics::inc_counter(&metrics::FORK_CHOICE_REQUESTS);
33663435
let _timer = metrics::start_timer(&metrics::FORK_CHOICE_TIMES);
33673436

3368-
let result = self.fork_choice_internal();
3437+
let result = self.fork_choice_internal(slot);
33693438

33703439
if result.is_err() {
33713440
metrics::inc_counter(&metrics::FORK_CHOICE_ERRORS);
@@ -3374,13 +3443,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
33743443
result
33753444
}
33763445

3377-
fn fork_choice_internal(self: &Arc<Self>) -> Result<(), Error> {
3446+
fn fork_choice_internal(self: &Arc<Self>, slot: Slot) -> Result<(), Error> {
33783447
// Atomically obtain the head block root and the finalized block.
33793448
let (beacon_block_root, finalized_block) = {
33803449
let mut fork_choice = self.fork_choice.write();
33813450

33823451
// Determine the root of the block that is the head of the chain.
3383-
let beacon_block_root = fork_choice.get_head(self.slot()?, &self.spec)?;
3452+
let beacon_block_root = fork_choice.get_head(slot, &self.spec)?;
33843453

33853454
(beacon_block_root, fork_choice.get_finalized_block()?)
33863455
};
@@ -3752,6 +3821,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
37523821
}
37533822

37543823
// Update the execution layer.
3824+
// Always use the wall-clock slot to update the execution engine rather than the `slot`
3825+
// passed in.
37553826
if let Err(e) = self.update_execution_engine_forkchoice_blocking(self.slot()?) {
37563827
crit!(
37573828
self.log,
@@ -4005,8 +4076,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
40054076
"prepare_slot" => prepare_slot
40064077
);
40074078

4008-
// Use the blocking method here so that we don't form a queue of these functions when
4009-
// routinely calling them.
40104079
self.update_execution_engine_forkchoice_async(current_slot)
40114080
.await?;
40124081
}
@@ -4336,11 +4405,32 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
43364405
}
43374406

43384407
/// Called by the timer on every slot.
4339-
///
4340-
/// Performs slot-based pruning.
4341-
pub fn per_slot_task(&self) {
4408+
pub fn per_slot_task(self: &Arc<Self>) {
43424409
trace!(self.log, "Running beacon chain per slot tasks");
43434410
if let Some(slot) = self.slot_clock.now() {
4411+
// Run fork choice and signal to any waiting task that it has completed.
4412+
if let Err(e) = self.fork_choice() {
4413+
error!(
4414+
self.log,
4415+
"Fork choice error at slot start";
4416+
"error" => ?e,
4417+
"slot" => slot,
4418+
);
4419+
}
4420+
4421+
// Send the notification regardless of fork choice success, this is a "best effort"
4422+
// notification and we don't want block production to hit the timeout in case of error.
4423+
if let Some(tx) = &self.fork_choice_signal_tx {
4424+
if let Err(e) = tx.notify_fork_choice_complete(slot) {
4425+
warn!(
4426+
self.log,
4427+
"Error signalling fork choice waiter";
4428+
"error" => ?e,
4429+
"slot" => slot,
4430+
);
4431+
}
4432+
}
4433+
43444434
self.naive_aggregation_pool.write().prune(slot);
43454435
self.block_times_cache.write().prune(slot);
43464436
}
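
The slot check in `wait_for_fork_choice_before_block_production` can be read in isolation as a small pure function. The sketch below is illustrative only: `ProposalSlotCheck` and `check_proposal_slot` are hypothetical names, and plain `u64` values stand in for the `Slot` type.

```rust
/// Outcome of comparing the proposal slot with the wall-clock slot.
/// Hypothetical type, for illustration only.
#[derive(Debug, PartialEq, Eq)]
pub enum ProposalSlotCheck {
    /// The block is for the current or next slot, so waiting on the
    /// slot-start fork choice run makes sense.
    WaitForForkChoice,
    /// The block is for some other slot; the diff logs an error and
    /// proceeds without waiting.
    IncorrectSlot,
}

/// Mirror the `slot == current_slot || slot == current_slot + 1` condition.
pub fn check_proposal_slot(block_slot: u64, current_slot: u64) -> ProposalSlotCheck {
    // `saturating_add` avoids overflow at `u64::MAX` in this sketch.
    if block_slot == current_slot || block_slot == current_slot.saturating_add(1) {
        ProposalSlotCheck::WaitForForkChoice
    } else {
        ProposalSlotCheck::IncorrectSlot
    }
}
```

Note that in the diff every branch still returns `Ok(())`: a failed wait or a mismatched slot is logged, but block production continues.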

beacon_node/beacon_chain/src/builder.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::beacon_chain::{BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY};
22
use crate::eth1_chain::{CachingEth1Backend, SszEth1};
3+
use crate::fork_choice_signal::ForkChoiceSignalTx;
34
use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary};
45
use crate::head_tracker::HeadTracker;
56
use crate::migrate::{BackgroundMigrator, MigratorConfig};
@@ -694,6 +695,16 @@ where
694695
);
695696
}
696697

698+
// If enabled, set up the fork choice signaller.
699+
let (fork_choice_signal_tx, fork_choice_signal_rx) =
700+
if self.chain_config.fork_choice_before_proposal_timeout_ms != 0 {
701+
let tx = ForkChoiceSignalTx::new();
702+
let rx = tx.get_receiver();
703+
(Some(tx), Some(rx))
704+
} else {
705+
(None, None)
706+
};
707+
697708
// Store the `PersistedBeaconChain` in the database atomically with the metadata so that on
698709
// restart we can correctly detect the presence of an initialized database.
699710
//
@@ -752,6 +763,8 @@ where
752763
genesis_block_root,
753764
genesis_state_root,
754765
fork_choice: RwLock::new(fork_choice),
766+
fork_choice_signal_tx,
767+
fork_choice_signal_rx,
755768
event_handler: self.event_handler,
756769
head_tracker,
757770
snapshot_cache: TimeoutRwLock::new(SnapshotCache::new(

beacon_node/beacon_chain/src/chain_config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use serde_derive::{Deserialize, Serialize};
22
use types::Checkpoint;
33

4+
pub const DEFAULT_FORK_CHOICE_BEFORE_PROPOSAL_TIMEOUT: u64 = 250;
5+
46
#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
57
pub struct ChainConfig {
68
/// Maximum number of slots to skip when importing a consensus message (e.g., block,
@@ -18,6 +20,10 @@ pub struct ChainConfig {
1820
pub enable_lock_timeouts: bool,
1921
/// The max size of a message that can be sent over the network.
2022
pub max_network_size: usize,
23+
/// Number of milliseconds to wait for fork choice before proposing a block.
24+
///
25+
/// If set to 0 then block proposal will not wait for fork choice at all.
26+
pub fork_choice_before_proposal_timeout_ms: u64,
2127
}
2228

2329
impl Default for ChainConfig {
@@ -28,6 +34,7 @@ impl Default for ChainConfig {
2834
reconstruct_historic_states: false,
2935
enable_lock_timeouts: true,
3036
max_network_size: 10 * 1_048_576, // 10M
37+
fork_choice_before_proposal_timeout_ms: DEFAULT_FORK_CHOICE_BEFORE_PROPOSAL_TIMEOUT,
3138
}
3239
}
3340
}
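
As a rough illustration of how the `0` value disables the wait, the following sketch maps the configured milliseconds to an optional timeout. The helper `proposal_wait_timeout` and the constant name `DEFAULT_TIMEOUT_MS` are hypothetical; in the commit itself the decision is made in `builder.rs` by creating the signal pair only when the value is non-zero.

```rust
use std::time::Duration;

/// Stand-in for `DEFAULT_FORK_CHOICE_BEFORE_PROPOSAL_TIMEOUT` (250 ms in the diff).
pub const DEFAULT_TIMEOUT_MS: u64 = 250;

/// Translate the configured value into an optional wait.
/// `None` means block proposal does not wait for fork choice at all.
pub fn proposal_wait_timeout(timeout_ms: u64) -> Option<Duration> {
    if timeout_ms == 0 {
        None
    } else {
        Some(Duration::from_millis(timeout_ms))
    }
}
```

Under this sketch, the default yields a 250 ms wait and `--fork-choice-before-proposal-timeout 0` yields `None`.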

beacon_node/beacon_chain/src/errors.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,10 @@ pub enum BeaconChainError {
185185
},
186186
RuntimeShutdown,
187187
ProcessInvalidExecutionPayload(JoinError),
188+
ForkChoiceSignalOutOfOrder {
189+
current: Slot,
190+
latest: Slot,
191+
},
188192
}
189193

190194
easy_from_to!(SlotProcessingError, BeaconChainError);
@@ -234,6 +238,7 @@ pub enum BlockProductionError {
234238
FailedToReadFinalizedBlock(store::Error),
235239
MissingFinalizedBlock(Hash256),
236240
BlockTooLarge(usize),
241+
ForkChoiceError(BeaconChainError),
237242
}
238243

239244
easy_from_to!(BlockProcessingError, BlockProductionError);

beacon_node/beacon_chain/src/fork_choice_signal.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
//! Concurrency helpers for synchronising block proposal with fork choice.
2+
//!
3+
//! The transmitter provides a way for a thread running fork choice on a schedule to signal
4+
//! to the receiver that fork choice has been updated for a given slot.
5+
use crate::BeaconChainError;
6+
use parking_lot::{Condvar, Mutex};
7+
use std::sync::Arc;
8+
use std::time::Duration;
9+
use types::Slot;
10+
11+
/// Sender, for use by the per-slot task timer.
12+
pub struct ForkChoiceSignalTx {
13+
pair: Arc<(Mutex<Slot>, Condvar)>,
14+
}
15+
16+
/// Receiver, for use by the beacon chain waiting on fork choice to complete.
17+
pub struct ForkChoiceSignalRx {
18+
pair: Arc<(Mutex<Slot>, Condvar)>,
19+
}
20+
21+
pub enum ForkChoiceWaitResult {
22+
/// Successfully reached a slot greater than or equal to the awaited slot.
23+
Success(Slot),
24+
/// Fork choice was updated to a lower slot, indicative of lag or processing delays.
25+
Behind(Slot),
26+
/// Timed out waiting for the fork choice update from the sender.
27+
TimeOut,
28+
}
29+
30+
impl ForkChoiceSignalTx {
31+
pub fn new() -> Self {
32+
let pair = Arc::new((Mutex::new(Slot::new(0)), Condvar::new()));
33+
Self { pair }
34+
}
35+
36+
pub fn get_receiver(&self) -> ForkChoiceSignalRx {
37+
ForkChoiceSignalRx {
38+
pair: self.pair.clone(),
39+
}
40+
}
41+
42+
/// Signal to the receiver that fork choice has been updated to `slot`.
43+
///
44+
/// Return an error if the provided `slot` is strictly less than any previously provided slot.
45+
pub fn notify_fork_choice_complete(&self, slot: Slot) -> Result<(), BeaconChainError> {
46+
let &(ref lock, ref condvar) = &*self.pair;
47+
48+
let mut current_slot = lock.lock();
49+
50+
if slot < *current_slot {
51+
return Err(BeaconChainError::ForkChoiceSignalOutOfOrder {
52+
current: *current_slot,
53+
latest: slot,
54+
});
55+
} else {
56+
*current_slot = slot;
57+
}
58+
59+
// We use `notify_all` because there may be multiple block proposals waiting simultaneously.
60+
// Usually there'll be 0-1.
61+
condvar.notify_all();
62+
63+
Ok(())
64+
}
65+
}
66+
67+
impl Default for ForkChoiceSignalTx {
68+
fn default() -> Self {
69+
Self::new()
70+
}
71+
}
72+
73+
impl ForkChoiceSignalRx {
74+
pub fn wait_for_fork_choice(&self, slot: Slot, timeout: Duration) -> ForkChoiceWaitResult {
75+
let &(ref lock, ref condvar) = &*self.pair;
76+
77+
let mut current_slot = lock.lock();
78+
79+
// Wait for `current_slot >= slot`.
80+
//
81+
// Do not loop and wait, if we receive an update for the wrong slot then something is
82+
// quite out of whack and we shouldn't waste more time waiting.
83+
if *current_slot < slot {
84+
let timeout_result = condvar.wait_for(&mut current_slot, timeout);
85+
86+
if timeout_result.timed_out() {
87+
return ForkChoiceWaitResult::TimeOut;
88+
}
89+
}
90+
91+
if *current_slot >= slot {
92+
ForkChoiceWaitResult::Success(*current_slot)
93+
} else {
94+
ForkChoiceWaitResult::Behind(*current_slot)
95+
}
96+
}
97+
}
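
The handshake above can be tried outside the beacon chain with a standard-library-only sketch. This is not the commit's code: it swaps `parking_lot` for `std::sync`, uses `u64` in place of `Slot`, merges sender and receiver into one hypothetical `SlotSignal` type, and reports out-of-order slots as a plain tuple. One behavioural difference to keep in mind: `std::sync::Condvar` may wake spuriously, and because the wait is deliberately not looped, such a wake-up would surface as `Behind`.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// Simplified stand-in for `ForkChoiceWaitResult`.
#[derive(Debug, PartialEq, Eq)]
pub enum WaitResult {
    Success(u64),
    Behind(u64),
    TimeOut,
}

/// Hypothetical combined sender/receiver sharing the latest signalled slot.
#[derive(Clone)]
pub struct SlotSignal {
    pair: Arc<(Mutex<u64>, Condvar)>,
}

impl SlotSignal {
    pub fn new() -> Self {
        Self { pair: Arc::new((Mutex::new(0), Condvar::new())) }
    }

    /// Record `slot` and wake all waiters.
    /// Returns `Err((latest, slot))` if `slot` is lower than a previous one.
    pub fn notify(&self, slot: u64) -> Result<(), (u64, u64)> {
        let (lock, condvar) = &*self.pair;
        let mut latest = lock.lock().unwrap();
        if slot < *latest {
            return Err((*latest, slot));
        }
        *latest = slot;
        condvar.notify_all();
        Ok(())
    }

    /// Wait at most once, for up to `timeout`, for a slot `>= slot`.
    pub fn wait_for(&self, slot: u64, timeout: Duration) -> WaitResult {
        let (lock, condvar) = &*self.pair;
        let mut latest = lock.lock().unwrap();
        if *latest < slot {
            // `wait_timeout` consumes the guard and hands it back.
            let (guard, result) = condvar.wait_timeout(latest, timeout).unwrap();
            latest = guard;
            if result.timed_out() {
                return WaitResult::TimeOut;
            }
        }
        if *latest >= slot {
            WaitResult::Success(*latest)
        } else {
            WaitResult::Behind(*latest)
        }
    }
}
```

A caller would typically clone the `SlotSignal`, call `notify(slot)` from the timer thread after fork choice runs, and call `wait_for(slot, timeout)` from the thread producing the block.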

beacon_node/beacon_chain/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ mod errors;
1515
pub mod eth1_chain;
1616
pub mod events;
1717
mod execution_payload;
18+
pub mod fork_choice_signal;
1819
pub mod fork_revert;
1920
mod head_tracker;
2021
pub mod historical_blocks;

beacon_node/beacon_chain/src/metrics.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ lazy_static! {
9090
);
9191
pub static ref BLOCK_PRODUCTION_TIMES: Result<Histogram> =
9292
try_create_histogram("beacon_block_production_seconds", "Full runtime of block production");
93+
pub static ref BLOCK_PRODUCTION_FORK_CHOICE_TIMES: Result<Histogram> = try_create_histogram(
94+
"beacon_block_production_fork_choice_seconds",
95+
"Time taken to run fork choice before block production"
96+
);
9397
pub static ref BLOCK_PRODUCTION_STATE_LOAD_TIMES: Result<Histogram> = try_create_histogram(
9498
"beacon_block_production_state_load_seconds",
9599
"Time taken to load the base state for block production"
