Split out voting and banking threads in banking stage #26722
Conversation
Force-pushed from 49d8e6d to 4d37f06
cool feature to separate votes from transactions. Made first round of review at high level, will dig deeper next round.
core/benches/banking_stage.rs
Outdated
@@ -80,8 +82,10 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
    let transactions = vec![tx; 4194304];
    let batches = transactions_to_deserialized_packets(&transactions).unwrap();
    let batches_len = batches.len();
    let mut transaction_buffer =
        UnprocessedPacketBatches::from_iter(batches.into_iter(), 2 * batches_len);
    let mut transaction_buffer = UnprocessedTransactionStorage::TransactionStorage(
Just a thought: it might be good to set up similar benchmarking for the vote-only threads.
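Purely as an illustration of that suggestion (nightly-only bench harness; the vote-storage type and its methods are hypothetical stand-ins, not the PR's actual API), a vote-only bench could mirror the shape of bench_consume_buffered:

```rust
#![feature(test)]
extern crate test;

use test::Bencher;

// Hypothetical stand-in for a latest-vote buffer; not the PR's real type.
#[derive(Default)]
struct LatestUnprocessedVotes {
    votes: Vec<u64>, // stand-in for buffered vote packets
}

impl LatestUnprocessedVotes {
    fn insert_batch(&mut self, batch: impl Iterator<Item = u64>) {
        self.votes.extend(batch);
    }
    fn drain(&mut self) -> Vec<u64> {
        std::mem::take(&mut self.votes)
    }
}

// Bench skeleton mirroring bench_consume_buffered, but exercising the vote path.
#[bench]
fn bench_consume_buffered_votes(bencher: &mut Bencher) {
    bencher.iter(|| {
        let mut vote_storage = LatestUnprocessedVotes::default();
        vote_storage.insert_batch(0u64..4096);
        test::black_box(vote_storage.drain());
    });
}
```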
@@ -158,7 +176,7 @@ impl Ord for ImmutableDeserializedPacket {
/// Currently each banking_stage thread has a `UnprocessedPacketBatches` buffer to store
/// PacketBatch's received from sigverify. Banking thread continuously scans the buffer
/// to pick proper packets to add to the block.
#[derive(Default)]
#[derive(Default, Debug)]
nit: not sure if Debug is absolutely necessary here
We need it for the Debug derive on the encapsulating enum.
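For context, a minimal illustration of the Rust rule in play (simplified stand-in types, not the PR's actual definitions): deriving Debug on an enum only compiles if every type it wraps also implements Debug.

```rust
// Stand-in for the buffer type carrying the #[derive(Default, Debug)] in the diff above.
#[derive(Default, Debug)]
struct PacketBuffer {
    len: usize,
}

// Stand-in for the encapsulating enum; its Debug derive needs PacketBuffer: Debug.
#[derive(Debug)]
enum Storage {
    TransactionStorage(PacketBuffer),
}

fn main() {
    let storage = Storage::TransactionStorage(PacketBuffer::default());
    // Removing Debug from PacketBuffer would make this line fail to compile.
    println!("{:?}", storage);
}
```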
@@ -39,6 +39,8 @@ pub enum DeserializedPacketError {
    SanitizeError(#[from] SanitizeError),
    #[error("transaction failed prioritization")]
    PrioritizationFailure,
    #[error("vote transaction deserialization error")]
    VoteTransactionError,
Would vote transactions still land in unprocessed_packet_batches?
No, because the banking threads will never see these transactions.
@@ -51,6 +53,22 @@ pub struct ImmutableDeserializedPacket {
}

impl ImmutableDeserializedPacket {
    pub fn new(
nit: doesn't look like this new does much, not sure if it's necessary
Since we access it from outside the module we need it; see latest_unprocessed_votes.
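As a side note, a minimal sketch of the visibility rule being referenced (names simplified, not the PR's real definitions): the struct's fields are private to its module, so code in another module such as latest_unprocessed_votes can only construct it through a pub constructor.

```rust
mod immutable_deserialized_packet {
    pub struct ImmutableDeserializedPacket {
        priority: u64, // private field: struct-literal syntax is unusable outside this module
    }

    impl ImmutableDeserializedPacket {
        pub fn new(priority: u64) -> Self {
            Self { priority }
        }

        pub fn priority(&self) -> u64 {
            self.priority
        }
    }
}

mod latest_unprocessed_votes {
    use super::immutable_deserialized_packet::ImmutableDeserializedPacket;

    pub fn store_vote() -> ImmutableDeserializedPacket {
        // Without `new`, this module could not construct the packet at all:
        // `ImmutableDeserializedPacket { priority: 0 }` would not compile here.
        ImmutableDeserializedPacket::new(0)
    }
}

fn main() {
    assert_eq!(latest_unprocessed_votes::store_vote().priority(), 0);
}
```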
core/src/banking_stage.rs
Outdated
pub fn capacity(&self) -> usize {
    match self {
        Self::VoteStorage(_, _) => usize::MAX,
Just curious, would there be a need to limit the number of queued votes?
Technically it is limited by the number of voters, since only the latest vote from each validator is kept.
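To illustrate why that bound holds, here is a hypothetical sketch (simplified names and fields, not the PR's actual data structures) of a latest-vote buffer keyed by vote pubkey: repeated votes from the same validator replace each other, so the buffer can never hold more entries than there are voters, even though capacity() reports usize::MAX.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

type Pubkey = [u8; 32];

struct LatestVotePacket {
    slot: u64,
}

#[derive(Default)]
struct LatestUnprocessedVotes {
    // one slot per vote pubkey: the map's size is bounded by the number of voters
    latest_votes_per_pubkey: RwLock<HashMap<Pubkey, Arc<RwLock<LatestVotePacket>>>>,
}

impl LatestUnprocessedVotes {
    // A newer vote for a pubkey replaces the old one instead of growing the map.
    fn update(&self, pubkey: Pubkey, vote: LatestVotePacket) {
        let mut map = self.latest_votes_per_pubkey.write().unwrap();
        let is_newer = map
            .get(&pubkey)
            .map(|existing| vote.slot > existing.read().unwrap().slot)
            .unwrap_or(true);
        if is_newer {
            map.insert(pubkey, Arc::new(RwLock::new(vote)));
        }
    }

    fn len(&self) -> usize {
        self.latest_votes_per_pubkey.read().unwrap().len()
    }
}

fn main() {
    let votes = LatestUnprocessedVotes::default();
    let voter = [1u8; 32];
    for slot in 0..100 {
        votes.update(voter, LatestVotePacket { slot });
    }
    // One voter, many votes received, but only one entry is ever buffered.
    assert_eq!(votes.len(), 1);
}
```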
@@ -884,6 +1100,9 @@ impl BankingStage {
    tracer_packet_stats: &mut TracerPacketStats,
    bank_forks: &Arc<RwLock<BankForks>>,
) {
    if unprocessed_transaction_storage.should_not_process() {
        return;
    }
To clarify: banking stage still has a thread to receive and buffer gossip votes, but it does not process them at all?
Read further: gossip and TPU votes are received on separate threads, but are stored in the same LatestUnprocessedVotes. So effectively, the gossip vote thread just does receiving and storing, while the TPU vote thread processes all vote transactions (on top of receiving TPU votes). Hope I got this right.
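For readers following along, a rough sketch of that topology (placeholder types, not the real banking stage code): two threads share one Arc<LatestUnprocessedVotes>; the gossip-vote thread only receives and stores, while the TPU-vote thread also drains and processes.

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use std::thread;

// Placeholder buffer standing in for the shared LatestUnprocessedVotes.
#[derive(Default)]
struct LatestUnprocessedVotes {
    size: AtomicUsize,
}

impl LatestUnprocessedVotes {
    fn store(&self, _vote_packet: u64) {
        self.size.fetch_add(1, Ordering::AcqRel);
    }
    fn drain_and_process(&self) -> usize {
        self.size.swap(0, Ordering::AcqRel)
    }
}

fn main() {
    let shared_votes = Arc::new(LatestUnprocessedVotes::default());

    // Gossip-vote thread: receive + store only.
    let gossip_votes = Arc::clone(&shared_votes);
    let gossip_thread = thread::spawn(move || {
        for packet in 0u64..10 {
            gossip_votes.store(packet);
        }
    });

    // TPU-vote thread: receive + store its own votes, and process whatever is buffered.
    let tpu_votes = Arc::clone(&shared_votes);
    let tpu_thread = thread::spawn(move || {
        for packet in 100u64..110 {
            tpu_votes.store(packet);
        }
        tpu_votes.drain_and_process()
    });

    gossip_thread.join().unwrap();
    // The count depends on thread interleaving; the point is only who drains the buffer.
    let processed = tpu_thread.join().unwrap();
    println!("tpu vote thread processed {processed} buffered votes");
}
```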
let filter_forwarding_result = unprocessed_transaction_storage
    .filter_forwardable_packets_and_add_batches(&mut forward_packet_batches_by_accounts);
The existing logic forwards up to the CU limit, prioritizing transactions that have a higher fee-per-CU weight.
I think the votes should have separate forwarding logic that prioritizes by stake weight up to the same CU limit, since votes do not have a per-CU fee.
This can be done in a separate PR though, feel free to create an issue.
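As a rough illustration of the suggested policy (placeholder types and constants, not the actual forwarding code), votes could be ordered by sender stake and forwarded until an assumed compute-unit budget is used up:

```rust
const FORWARD_CU_LIMIT: u64 = 12_000_000; // assumed budget, not the real limit
const SIMPLE_VOTE_COST: u64 = 3_000; // assumed per-vote CU cost

struct BufferedVote {
    stake: u64,
    // packet bytes, signatures, etc. would live here in the real structure
}

// Returns the indices of the votes that fit in the CU budget, highest stake first.
fn select_votes_to_forward(votes: &[BufferedVote]) -> Vec<usize> {
    let mut order: Vec<usize> = (0..votes.len()).collect();
    order.sort_by_key(|&i| std::cmp::Reverse(votes[i].stake));

    let mut used_cus = 0u64;
    order
        .into_iter()
        .take_while(|_| {
            used_cus += SIMPLE_VOTE_COST;
            used_cus <= FORWARD_CU_LIMIT
        })
        .collect()
}

fn main() {
    let votes = vec![
        BufferedVote { stake: 10 },
        BufferedVote { stake: 1_000 },
        BufferedVote { stake: 100 },
    ];
    // Highest-stake vote is forwarded first.
    assert_eq!(select_votes_to_forward(&votes), vec![1, 2, 0]);
}
```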
Self::push_unprocessed(
    buffered_packet_batches,
    unprocessed_transaction_storage,
Would be good here to augment the LeaderSlotMetricsTracker to track the number of votes dropped for being too old. This will tell us how effective this change actually is.
This can be done by augmenting the leader metrics here: https://github.com/taozhu-chicago/solana/blob/27f4a48a97e35431e22c13292c7ea8e7bdc67255/core/src/leader_slot_banking_stage_metrics.rs#L127.
Once this extra field is released, we can then look up the metric banking_stage-leader_slot_packet_counts (https://github.com/taozhu-chicago/solana/blob/27f4a48a97e35431e22c13292c7ea8e7bdc67255/core/src/leader_slot_banking_stage_metrics.rs#L137), filtered by id = 1 for the tpu vote thread, to see the average dropped old votes per block.
Might also be good to see how many initial votes were drained and correlate that with how many packets were processed, which is already available in leader_slot_packet_counts.committed_transactions_count: https://github.com/taozhu-chicago/solana/blob/27f4a48a97e35431e22c13292c7ea8e7bdc67255/core/src/leader_slot_banking_stage_metrics.rs#L170-L174
On second thought, instead of augmenting the existing banking_stage-leader_slot_packet_counts, we could just create a separate banking_stage-vote_packet_counts, only reported by id = 1, for these additional vote metrics. These can be reported in addition to the banking_stage-leader_slot_packet_counts on the thread with id = 1.
Steps to do this (a sketch follows below):
- Add an additional vote metrics struct here: https://github.com/taozhu-chicago/solana/blob/27f4a48a97e35431e22c13292c7ea8e7bdc67255/core/src/leader_slot_banking_stage_metrics.rs#L254 with a report() method like the other structs.
- Augment this function here: https://github.com/taozhu-chicago/solana/blob/27f4a48a97e35431e22c13292c7ea8e7bdc67255/core/src/leader_slot_banking_stage_metrics.rs#L271-L276 to additionally report the vote metrics.
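A hedged sketch of what that vote metrics struct could look like, loosely following the pattern of the existing structs in leader_slot_banking_stage_metrics.rs; the field names and datapoint layout are assumptions for illustration only.

```rust
use solana_metrics::datapoint_info;

#[derive(Debug, Default)]
struct VotePacketCountMetrics {
    // votes dropped from the buffer because a newer vote from the same validator arrived
    dropped_gossip_votes: u64,
    // same, but for votes received over TPU
    dropped_tpu_votes: u64,
}

impl VotePacketCountMetrics {
    fn report(&self, id: u32, slot: u64) {
        // Reported alongside banking_stage-leader_slot_packet_counts on the thread with id = 1.
        datapoint_info!(
            "banking_stage-vote_packet_counts",
            ("id", id, i64),
            ("slot", slot, i64),
            ("dropped_gossip_votes", self.dropped_gossip_votes, i64),
            ("dropped_tpu_votes", self.dropped_tpu_votes, i64)
        );
    }
}
```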
For those voting metrics, it might also be helpful to get additional insight: if we saw N votes, for instance, how many of those (see the sketch below):
- were from unique vote accounts
- map to what percentage of the total stake
Can also be done in a separate PR 😃
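A small illustrative helper for the kind of insight described above; a plain HashMap stands in for the bank's vote-account stakes, so none of this is a real API.

```rust
use std::collections::{HashMap, HashSet};

type Pubkey = [u8; 32];

// Given the vote-account pubkeys seen in a slot and a stake map, return the number of
// unique vote accounts and the fraction of total stake they represent.
fn vote_coverage(
    seen_vote_accounts: &[Pubkey],
    stakes: &HashMap<Pubkey, u64>,
) -> (usize, f64) {
    let unique: HashSet<&Pubkey> = seen_vote_accounts.iter().collect();
    let total_stake: u64 = stakes.values().sum();
    let seen_stake: u64 = unique
        .iter()
        .filter_map(|pubkey| stakes.get(*pubkey))
        .sum();
    let stake_fraction = if total_stake == 0 {
        0.0
    } else {
        seen_stake as f64 / total_stake as f64
    };
    (unique.len(), stake_fraction)
}

fn main() {
    let (a, b, c) = ([1u8; 32], [2u8; 32], [3u8; 32]);
    let stakes = HashMap::from([(a, 60u64), (b, 30), (c, 10)]);
    // Saw two votes from `a` and one from `b`: 2 unique accounts covering 90% of stake.
    let (unique, fraction) = vote_coverage(&[a, a, b], &stakes);
    assert_eq!(unique, 2);
    assert!((fraction - 0.9).abs() < f64::EPSILON);
}
```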
Force-pushed from 7d4fdad to 8ea2a49
Force-pushed from 067b5a2 to 503a154
Force-pushed from d3db821 to 8100353
core/src/banking_stage.rs
Outdated
                            latest_vote.clear();
                            latest_unprocessed_votes
                                .size
                                .fetch_sub(1, Ordering::AcqRel);
                            return Some(packet);
                        }
                    }
                }
            }
            None
        })
        .chunks(batch_size)
        .into_iter()
        .flat_map(|vote_packets| {
            let vote_packets = vote_packets.into_iter().collect_vec();
            let packets_to_process = vote_packets
                .iter()
                .map(&DeserializedVotePacket::get_vote_packet)
                .collect_vec();
            if let Some(retryable_vote_indices) =
                processing_function(&packets_to_process)
            {
                retryable_vote_indices
                    .iter()
                    .map(|i| vote_packets[*i].clone())
                    .collect_vec()
            } else {
                vote_packets
            }
        })
        .collect_vec();
    // Insert the retryable votes back in
    latest_unprocessed_votes.insert_batch(retryable_votes.into_iter());
This .clear() into .insert_batch() flow is a bit cumbersome to maintain, especially because insert_batch() has an assert!(!vote.is_processed()); and that flag might get cleared by the call to clear().
- It seems like with this mechanism, once we've applied a vote to one fork, we can't apply it to another if we switch forks, which seems off.
  /--2
1
  \--3
I.e. here if the latest vote for a validator is 1, and we apply it to slot 2, but then switch to slot 3 as the major fork, we won't re-apply it to slot 3, whereas today we'll apply whatever cluster_info_vote_listener is sending us.
I think for now we maintain current behavior, and:
- Remove the clear() logic
- Remove the is_processed() checks
- Just apply whatever's in the bank on every slot for now; if it's already been in the bank it will just fail with DuplicateSignature and shouldn't be included in the block, since that's not a retryable error (can add a test for this for the VoteTransactions).
In the future we can do something like what's in cluster_info_vote_listener and parse the vote states in the bank to figure out which votes to apply
solana/core/src/verified_vote_packets.rs
Lines 123 to 150 in ed539d6
self.my_leader_bank
    .vote_accounts()
    .get(&vote_account_key)
    .and_then(|(_stake, vote_account)| {
        vote_account.vote_state().as_ref().ok().map(|vote_state| {
            let start_vote_slot =
                vote_state.last_voted_slot().map(|x| x + 1).unwrap_or(0);
            match validator_gossip_votes {
                FullTowerVote(GossipVote {
                    slot,
                    hash,
                    packet_batch,
                    signature,
                }) => self
                    .filter_vote(slot, hash, packet_batch, signature)
                    .map(|packet| vec![packet])
                    .unwrap_or_default(),
                IncrementalVotes(validator_gossip_votes) => {
                    validator_gossip_votes
                        .range((start_vote_slot, Hash::default())..)
                        .filter_map(|((slot, hash), (packet, tx_signature))| {
                            self.filter_vote(slot, hash, packet, tx_signature)
                        })
                        .collect::<Vec<PacketBatch>>()
                }
            }
        })
    })
It seems like with this mechanism, once we've applied a vote to one fork, we can't apply it to another if we switch forks, which seems off.
I agree, although I think this behavior is already present: UnprocessedPacketBatches are cleared of the votes once successfully written to a bank. We rely on gossip or tpu to send us new votes when switching forks, which is the same here.
Just apply whatever's in the bank on every slot for now, if it's already been in the bank we'll just fail with DuplicateSignature and shouldn't be included in the block since that's not a retryable error (can add a test for this for the VoteTransactions).
Is this not problematic, in that every slot will now contain a vote from every staked validator even if the vote is a no-op?
I agree, although I think this behavior is already present, UnprocessedPacketBatches are cleared of the votes once successfully written to a bank. We rely on gossip or tpu to send us new votes when switching forks which is the same here.
I think the difference is here we keep track of the latest vote, even between forks, so we would filter those earlier slots out that were applied on one fork already, even if gossip sent them again right?
Force-pushed from 4b67e3b to aab14e2
Force-pushed from aab14e2 to 74dd570
Additionally, this allows us to aggressively prune the buffer for the voting threads, since with the new vote state only the latest vote from each validator is necessary.
Force-pushed from 74dd570 to efc0018
    let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());
    let should_split_voting_threads = bank_forks
        .read()
        .map(|bank_forks| {
can just unwrap here, we do that for every other lock anyways
            let bank = bank_forks.root_bank();
            bank.feature_set
                .is_active(&allow_votes_to_directly_update_vote_state::id())
                && bank.feature_set.is_active(&split_banking_threads::id())
Probably don't need a separate feature for this? Might be better to make this a configurable option like the concurrent replay stuff: #27401
    // Many banks that process transactions in parallel.
    let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
        .map(|i| {
            let (verified_receiver, forward_option) = match i {
            let (verified_receiver, transaction_storage_options) = match i {
Not sure I quite understand the issue, can't we call UnprocessedTransactionStorage::new_vote_storage() or UnprocessedTransactionStorage::new_transaction_storage() and pass in the common reference to the Arc<LatestUnprocessedVotes>?
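A rough sketch of that suggestion (placeholder types, constructor names, and thread-index convention are assumptions, not the PR's actual code): the two vote threads share one Arc<LatestUnprocessedVotes> passed into a vote-storage constructor, while the remaining banking threads get their own transaction storage.

```rust
use std::sync::Arc;

#[derive(Default)]
struct LatestUnprocessedVotes;

enum VoteSource {
    Gossip,
    Tpu,
}

enum UnprocessedTransactionStorage {
    VoteStorage(Arc<LatestUnprocessedVotes>, VoteSource),
    TransactionStorage(Vec<u64> /* stand-in for UnprocessedPacketBatches */),
}

impl UnprocessedTransactionStorage {
    fn new_vote_storage(votes: Arc<LatestUnprocessedVotes>, source: VoteSource) -> Self {
        Self::VoteStorage(votes, source)
    }
    fn new_transaction_storage(capacity: usize) -> Self {
        Self::TransactionStorage(Vec::with_capacity(capacity))
    }
}

fn main() {
    let num_threads: usize = 4;
    let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::default());

    let storages: Vec<UnprocessedTransactionStorage> = (0..num_threads)
        .map(|i| match i {
            // Assumed convention for this sketch: thread 0 handles gossip votes,
            // thread 1 handles TPU votes, all others are regular banking threads.
            0 => UnprocessedTransactionStorage::new_vote_storage(
                Arc::clone(&latest_unprocessed_votes),
                VoteSource::Gossip,
            ),
            1 => UnprocessedTransactionStorage::new_vote_storage(
                Arc::clone(&latest_unprocessed_votes),
                VoteSource::Tpu,
            ),
            _ => UnprocessedTransactionStorage::new_transaction_storage(700_000),
        })
        .collect();

    assert_eq!(storages.len(), num_threads);
}
```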
@@ -646,12 +674,111 @@ impl BankingStage {
    (Ok(()), packet_vec_len, Some(leader_pubkey))
}

// TODO: UNIT TEST THIS
is this todo still relevant?
                false
            })
            .for_each(|lock| {
                if let Ok(cell) = lock.write() {
I think we should just unwrap here
    let retryable_votes = weighted_random_order_by_stake(&bank, latest_votes_per_pubkey.keys())
        .filter_map(|pubkey| {
            if let Some(lock) = latest_votes_per_pubkey.get(&pubkey) {
                if let Ok(latest_vote) = lock.write() {
I think we should just unwrap here
                    let packet = latest_vote.clone();
                    latest_vote.clear();
This seems like it could be wrapped in a single method take_clone()
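A hedged sketch of that take_clone() idea with simplified placeholder types (not the PR's real DeserializedVotePacket): clone the buffered packet, clear the slot, and return the clone in one call, so callers don't have to remember to clear() after cloning.

```rust
#[derive(Clone, Default)]
struct DeserializedVotePacket {
    packet: Option<Vec<u8>>, // stand-in for the real deserialized packet contents
}

impl DeserializedVotePacket {
    fn is_empty(&self) -> bool {
        self.packet.is_none()
    }

    /// Returns a clone of this vote packet and clears the slot in place.
    fn take_clone(&mut self) -> Option<Self> {
        if self.is_empty() {
            return None;
        }
        let cloned = self.clone();
        self.packet = None; // equivalent of the separate clear() call
        Some(cloned)
    }
}

fn main() {
    let mut latest_vote = DeserializedVotePacket {
        packet: Some(vec![1, 2, 3]),
    };
    let taken = latest_vote.take_clone();
    assert!(taken.is_some());
    assert!(latest_vote.is_empty()); // slot is cleared after the take
}
```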
I agree, although I think this behavior is already present, UnprocessedPacketBatches are cleared of the votes once successfully written to a bank. We rely on gossip or tpu to send us new votes when switching forks which is the same here.
I think the difference is here we keep track of the latest vote, even between forks, so we would filter those earlier slots out that were applied on one fork already, even if gossip sent them again right?
I think we can have three separate PR's:
Splitting this large PR up as per Carl's suggestion.
Additionally this allows us to aggressively prune the buffer for voting threads, as with the new vote state only the latest vote from each validator is necessary.
Problem
We want to drop extraneous vote transactions from banking stage, but the current setup does not allow it. Initial discussion was had in #25961.
Summary of Changes
Fixes #
Feature Gate Issue: #24717