Skip to content

Commit 0c5c7c2

Browse files
apollo_consensus: move proposal storage to SM; dedupe in SHC (#10337)
1 parent 9b4f79d commit 0c5c7c2

File tree

2 files changed

+36
-31
lines changed

2 files changed

+36
-31
lines changed

crates/apollo_consensus/src/single_height_consensus.rs

Lines changed: 28 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@
88
#[path = "single_height_consensus_test.rs"]
99
mod single_height_consensus_test;
1010

11-
use std::collections::hash_map::Entry;
12-
use std::collections::{HashMap, VecDeque};
11+
use std::collections::{HashSet, VecDeque};
1312
use std::sync::{Arc, Mutex};
1413
use std::time::Duration;
1514

@@ -138,7 +137,8 @@ pub(crate) struct SingleHeightConsensus {
138137
validators: Vec<ValidatorId>,
139138
timeouts: TimeoutsConfig,
140139
state_machine: StateMachine,
141-
proposals: HashMap<Round, Option<ProposalCommitment>>,
140+
// Tracks rounds for which we started validating a proposal to avoid duplicate validations.
141+
pending_validation_rounds: HashSet<Round>,
142142
last_prevote: Option<Vote>,
143143
last_precommit: Option<Vote>,
144144
height_voted_storage: Arc<Mutex<dyn HeightVotedStorageTrait>>,
@@ -162,7 +162,7 @@ impl SingleHeightConsensus {
162162
validators,
163163
timeouts,
164164
state_machine,
165-
proposals: HashMap::new(),
165+
pending_validation_rounds: HashSet::new(),
166166
last_prevote: None,
167167
last_precommit: None,
168168
height_voted_storage,
@@ -209,20 +209,25 @@ impl SingleHeightConsensus {
209209
warn!("Invalid proposer: expected {:?}, got {:?}", proposer_id, init.proposer);
210210
return Ok(ShcReturn::Tasks(Vec::new()));
211211
}
212-
let Entry::Vacant(proposal_entry) = self.proposals.entry(init.round) else {
213-
warn!("Round {} already has a proposal, ignoring", init.round);
212+
// Avoid duplicate validations:
213+
// - If SM already has an entry for this round, a (re)proposal was already recorded.
214+
// - If we already started validating this round, ignore repeats.
215+
if self.state_machine.has_proposal_for_round(init.round)
216+
|| self.pending_validation_rounds.contains(&init.round)
217+
{
218+
warn!("Round {} already handled a proposal, ignoring", init.round);
214219
return Ok(ShcReturn::Tasks(Vec::new()));
215-
};
220+
}
216221
let timeout = self.timeouts.get_proposal_timeout(init.round);
217222
info!(
218223
"Accepting {init:?}. node_round: {}, timeout: {timeout:?}",
219224
self.state_machine.round()
220225
);
221226
CONSENSUS_PROPOSALS_VALID_INIT.increment(1);
222227

223-
// Since validating the proposal is non-blocking, we want to avoid validating the same round
224-
// twice in parallel. This could be caused by a network repeat or a malicious spam attack.
225-
proposal_entry.insert(None);
228+
// Since validating the proposal is non-blocking, avoid validating the same round twice in
229+
// parallel (e.g., due to repeats or spam).
230+
self.pending_validation_rounds.insert(init.round);
226231
let block_receiver = context.validate_proposal(init, timeout, p2p_messages_receiver).await;
227232
context.set_height_and_round(height, self.state_machine.round()).await;
228233
Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver)]))
@@ -292,23 +297,22 @@ impl SingleHeightConsensus {
292297
CONSENSUS_PROPOSALS_INVALID.increment(1);
293298
}
294299

295-
// Retaining the entry for this round prevents us from receiving another proposal on
296-
// this round. While this prevents spam attacks it also prevents re-receiving after
297-
// a network issue.
298-
let old = self.proposals.insert(round, proposal_id);
299-
assert!(
300-
old.is_some_and(|p| p.is_none()),
301-
"Proposal entry for round {round} should exist and be empty: {old:?}"
302-
);
300+
// Cleanup: validation for round {round} finished, so remove it from the pending
301+
// set. This doesn't affect logic.
302+
self.pending_validation_rounds.remove(&round);
303303
let sm_events = self.state_machine.handle_event(event, &leader_fn);
304304
self.handle_state_machine_events(context, sm_events).await
305305
}
306306
StateMachineEvent::GetProposal(proposal_id, round) => {
307307
if proposal_id.is_none() {
308308
CONSENSUS_BUILD_PROPOSAL_FAILED.increment(1);
309309
}
310-
let old = self.proposals.insert(round, proposal_id);
311-
assert!(old.is_none(), "There should be no entry for round {round} when proposing");
310+
// Ensure SM has no proposal recorded yet for this round when proposing.
311+
assert!(
312+
!self.state_machine.has_proposal_for_round(round),
313+
"There should be no entry for round {round} when proposing"
314+
);
315+
312316
assert_eq!(
313317
round,
314318
self.state_machine.round(),
@@ -477,15 +481,13 @@ impl SingleHeightConsensus {
477481

478482
// Make sure there is an existing proposal for the valid round and it matches the proposal
479483
// ID.
480-
let existing = self.proposals.get(&valid_round).and_then(|&inner| inner);
484+
let existing = self.state_machine.proposal_id_for_round(valid_round);
481485
assert!(
482486
existing.is_some_and(|id| id == proposal_id),
483487
"A proposal with ID {proposal_id:?} should exist for valid_round: {valid_round}. \
484488
Found: {existing:?}",
485489
);
486490

487-
let old = self.proposals.insert(round, Some(proposal_id));
488-
assert!(old.is_none(), "There should be no proposal for round {round}.");
489491
let init = ProposalInit {
490492
height: self.state_machine.height(),
491493
round,
@@ -558,14 +560,9 @@ impl SingleHeightConsensus {
558560
))
559561
};
560562
let block = self
561-
.proposals
562-
.remove(&round)
563-
.ok_or_else(|| invalid_decision("No proposal entry for this round".to_string()))?
564-
.ok_or_else(|| {
565-
invalid_decision(
566-
"Proposal is invalid or validations haven't yet completed".to_string(),
567-
)
568-
})?;
563+
.state_machine
564+
.proposal_id_for_round(round)
565+
.ok_or_else(|| invalid_decision("No proposal entry for this round".to_string()))?;
569566
if block != proposal_id {
570567
return Err(invalid_decision(format!(
571568
"StateMachine proposal commitment should match the stored block. Shc.block_id: \

crates/apollo_consensus/src/state_machine.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,14 @@ impl StateMachine {
162162
&self.precommits
163163
}
164164

165+
pub(crate) fn has_proposal_for_round(&self, round: Round) -> bool {
166+
self.proposals.contains_key(&round)
167+
}
168+
169+
pub(crate) fn proposal_id_for_round(&self, round: Round) -> Option<ProposalCommitment> {
170+
self.proposals.get(&round).and_then(|(id, _)| *id)
171+
}
172+
165173
fn make_self_vote(
166174
&self,
167175
vote_type: VoteType,

0 commit comments

Comments
 (0)