Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.

Commit 4b67e3b

Browse files
committed
pr comments and hotswap logic
1 parent d51b326 commit 4b67e3b

5 files changed

+146
-68
lines changed

core/benches/banking_stage.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,11 @@ use {
99
rand::{thread_rng, Rng},
1010
rayon::prelude::*,
1111
solana_core::{
12-
banking_stage::{
13-
BankingStage, BankingStageStats, ThreadType, UnprocessedTransactionStorage,
14-
},
12+
banking_stage::{BankingStage, BankingStageStats},
1513
leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker,
1614
qos_service::QosService,
1715
unprocessed_packet_batches::*,
16+
unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage},
1817
},
1918
solana_entry::entry::{next_hash, Entry},
2019
solana_gossip::cluster_info::{ClusterInfo, Node},
@@ -85,6 +84,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
8584
let mut transaction_buffer = UnprocessedTransactionStorage::TransactionStorage(
8685
UnprocessedPacketBatches::from_iter(batches.into_iter(), 2 * batches_len),
8786
ThreadType::Transactions,
87+
None,
8888
);
8989
let (s, _r) = unbounded();
9090
// This tests the performance of buffering packets.

core/src/banking_stage.rs

Lines changed: 53 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use {
66
crate::{
77
forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts,
88
immutable_deserialized_packet::ImmutableDeserializedPacket,
9-
latest_unprocessed_votes::{self, LatestUnprocessedVotes, VoteSource},
9+
latest_unprocessed_votes::{LatestUnprocessedVotes, VoteSource},
1010
leader_slot_banking_stage_metrics::{LeaderSlotMetricsTracker, ProcessTransactionsSummary},
1111
leader_slot_banking_stage_timing_metrics::{
1212
LeaderExecuteAndCommitTimings, RecordTransactionsTimings,
@@ -15,9 +15,7 @@ use {
1515
sigverify::SigverifyTracerPacketStats,
1616
tracer_packet_stats::TracerPacketStats,
1717
unprocessed_packet_batches::{self, *},
18-
unprocessed_transaction_storage::{
19-
InsertPacketBatchesSummary, ThreadType, UnprocessedTransactionStorage,
20-
},
18+
unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage},
2119
},
2220
core::iter::repeat,
2321
crossbeam_channel::{
@@ -385,9 +383,9 @@ pub enum ForwardOption {
385383

386384
#[derive(Debug, Default)]
387385
pub struct FilterForwardingResults {
388-
total_forwardable_packets: usize,
389-
total_tracer_packets_in_buffer: usize,
390-
total_forwardable_tracer_packets: usize,
386+
pub(crate) total_forwardable_packets: usize,
387+
pub(crate) total_tracer_packets_in_buffer: usize,
388+
pub(crate) total_forwardable_tracer_packets: usize,
391389
}
392390

393391
impl BankingStage {
@@ -447,9 +445,10 @@ impl BankingStage {
447445
TOTAL_BUFFERED_PACKETS / ((num_threads - NUM_VOTE_PROCESSING_THREADS) as usize);
448446
// Keeps track of extraneous vote transactions for the vote threads
449447
let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());
450-
let bank = poh_recorder.read().unwrap().bank();
451-
let split_voting_threads = bank
452-
.map(|bank| {
448+
let should_split_voting_threads = bank_forks
449+
.read()
450+
.map(|bank_forks| {
451+
let bank = bank_forks.root_bank();
453452
bank.feature_set
454453
.is_active(&allow_votes_to_directly_update_vote_state::id())
455454
&& bank.feature_set.is_active(&split_banking_threads::id())
@@ -465,7 +464,7 @@ impl BankingStage {
465464
(
466465
verified_vote_receiver.clone(),
467466
(
468-
split_voting_threads,
467+
should_split_voting_threads,
469468
Some(latest_unprocessed_votes.clone()),
470469
ThreadType::Voting(VoteSource::Gossip),
471470
),
@@ -474,14 +473,14 @@ impl BankingStage {
474473
1 => (
475474
tpu_verified_vote_receiver.clone(),
476475
(
477-
split_voting_threads,
476+
should_split_voting_threads,
478477
Some(latest_unprocessed_votes.clone()),
479478
ThreadType::Voting(VoteSource::Tpu),
480479
),
481480
),
482481
_ => (
483482
verified_receiver.clone(),
484-
(split_voting_threads, None, ThreadType::Transactions),
483+
(should_split_voting_threads, None, ThreadType::Transactions),
485484
),
486485
};
487486

@@ -681,15 +680,15 @@ impl BankingStage {
681680
poh_recorder: &Arc<RwLock<PohRecorder>>,
682681
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
683682
recorder: &TransactionRecorder,
684-
transaction_status_sender: Option<TransactionStatusSender>,
683+
transaction_status_sender: &Option<TransactionStatusSender>,
685684
gossip_vote_sender: &ReplayVoteSender,
686685
banking_stage_stats: &BankingStageStats,
687686
qos_service: &QosService,
688687
log_messages_bytes_limit: Option<usize>,
689688
consumed_buffered_packets_count: &mut usize,
690689
rebuffered_packet_count: &mut usize,
691690
reached_end_of_slot: &mut bool,
692-
test_fn: Option<impl Fn()>,
691+
test_fn: &Option<impl Fn()>,
693692
packets_to_process: &Vec<Rc<ImmutableDeserializedPacket>>,
694693
) -> Option<Vec<usize>> {
695694
// TODO: Right now we iterate through buffer and try the highest weighted transaction once
@@ -714,7 +713,7 @@ impl BankingStage {
714713
&bank_creation_time,
715714
recorder,
716715
packets_to_process.iter().map(|p| &**p),
717-
transaction_status_sender.clone(),
716+
transaction_status_sender,
718717
gossip_vote_sender,
719718
banking_stage_stats,
720719
qos_service,
@@ -755,15 +754,15 @@ impl BankingStage {
755754
// Out of the buffered packets just retried, collect any still unprocessed
756755
// transactions in this batch for forwarding
757756
*rebuffered_packet_count += retryable_transaction_indexes.len();
758-
if let Some(test_fn) = &test_fn {
757+
if let Some(test_fn) = test_fn {
759758
test_fn();
760759
}
761760

762761
slot_metrics_tracker
763762
.increment_retryable_packets_count(retryable_transaction_indexes.len() as u64);
764763

765764
Some(retryable_transaction_indexes)
766-
} else if reached_end_of_slot {
765+
} else if *reached_end_of_slot {
767766
None
768767
} else {
769768
// mark as end-of-slot to avoid aggressively lock poh for the remaining for
@@ -806,15 +805,15 @@ impl BankingStage {
806805
poh_recorder,
807806
slot_metrics_tracker,
808807
recorder,
809-
transaction_status_sender,
808+
&transaction_status_sender,
810809
gossip_vote_sender,
811810
banking_stage_stats,
812811
qos_service,
813812
log_messages_bytes_limit,
814813
&mut consumed_buffered_packets_count,
815814
&mut rebuffered_packet_count,
816815
&mut reached_end_of_slot,
817-
test_fn,
816+
&test_fn,
818817
packets_to_process,
819818
)
820819
},
@@ -1135,27 +1134,19 @@ impl BankingStage {
11351134
vote_source,
11361135
)
11371136
}
1138-
(_, _, thread_type) => UnprocessedTransactionStorage::new_transaction_storage(
1137+
(_, hot_swap, thread_type) => UnprocessedTransactionStorage::new_transaction_storage(
11391138
UnprocessedPacketBatches::with_capacity(batch_limit),
11401139
thread_type,
1140+
hot_swap,
11411141
),
11421142
};
11431143

11441144
loop {
11451145
// Hotswap
1146-
unprocessed_transaction_storage = match unprocessed_transaction_storage {
1147-
UnprocessedTransactionStorage::TransactionStorage(transaction_storage) => {
1148-
if let ThreadType::Voting(vote_source) = transaction_storage.thread_type {
1149-
UnprocessedTransactionStorage::new_vote_storage(
1150-
latest_unprocessed_votes,
1151-
vote_source,
1152-
)
1153-
} else {
1154-
UnprocessedTransactionStorage::TransactionStorage(transaction_storage)
1155-
}
1156-
}
1157-
_ => unprocessed_transaction_storage,
1158-
};
1146+
// TODO: figure out how often to check this as it grabs the read lock on bank forks
1147+
// when checking feature_set
1148+
unprocessed_transaction_storage =
1149+
unprocessed_transaction_storage.maybe_hot_swap(bank_forks);
11591150

11601151
let my_pubkey = cluster_info.id();
11611152
if !unprocessed_transaction_storage.is_empty()
@@ -1287,7 +1278,7 @@ impl BankingStage {
12871278
bank: &Arc<Bank>,
12881279
poh: &TransactionRecorder,
12891280
batch: &TransactionBatch,
1290-
transaction_status_sender: Option<TransactionStatusSender>,
1281+
transaction_status_sender: &Option<TransactionStatusSender>,
12911282
gossip_vote_sender: &ReplayVoteSender,
12921283
log_messages_bytes_limit: Option<usize>,
12931284
) -> ExecuteAndCommitTransactionsOutput {
@@ -1529,7 +1520,7 @@ impl BankingStage {
15291520
txs: &[SanitizedTransaction],
15301521
poh: &TransactionRecorder,
15311522
chunk_offset: usize,
1532-
transaction_status_sender: Option<TransactionStatusSender>,
1523+
transaction_status_sender: &Option<TransactionStatusSender>,
15331524
gossip_vote_sender: &ReplayVoteSender,
15341525
qos_service: &QosService,
15351526
log_messages_bytes_limit: Option<usize>,
@@ -1722,7 +1713,7 @@ impl BankingStage {
17221713
bank_creation_time: &Instant,
17231714
transactions: &[SanitizedTransaction],
17241715
poh: &TransactionRecorder,
1725-
transaction_status_sender: Option<TransactionStatusSender>,
1716+
transaction_status_sender: &Option<TransactionStatusSender>,
17261717
gossip_vote_sender: &ReplayVoteSender,
17271718
qos_service: &QosService,
17281719
log_messages_bytes_limit: Option<usize>,
@@ -1754,7 +1745,7 @@ impl BankingStage {
17541745
&transactions[chunk_start..chunk_end],
17551746
poh,
17561747
chunk_start,
1757-
transaction_status_sender.clone(),
1748+
transaction_status_sender,
17581749
gossip_vote_sender,
17591750
qos_service,
17601751
log_messages_bytes_limit,
@@ -1919,7 +1910,7 @@ impl BankingStage {
19191910
bank_creation_time: &Instant,
19201911
poh: &'a TransactionRecorder,
19211912
deserialized_packets: impl Iterator<Item = &'a ImmutableDeserializedPacket>,
1922-
transaction_status_sender: Option<TransactionStatusSender>,
1913+
transaction_status_sender: &Option<TransactionStatusSender>,
19231914
gossip_vote_sender: &'a ReplayVoteSender,
19241915
banking_stage_stats: &'a BankingStageStats,
19251916
qos_service: &'a QosService,
@@ -2187,12 +2178,15 @@ impl BankingStage {
21872178
}
21882179
}
21892180

2190-
pub(crate) fn weighted_random_order_by_stake(bank: &Arc<Bank>) -> impl Iterator<Item = Pubkey> {
2181+
pub(crate) fn weighted_random_order_by_stake<'a>(
2182+
bank: &Arc<Bank>,
2183+
pubkeys: impl Iterator<Item = &'a Pubkey>,
2184+
) -> impl Iterator<Item = Pubkey> {
21912185
// Efraimidis and Spirakis algo for weighted random sample without replacement
2192-
let mut pubkey_with_weight: Vec<(f64, Pubkey)> = bank
2193-
.staked_nodes()
2194-
.iter()
2195-
.filter_map(|(&pubkey, &stake)| {
2186+
let staked_nodes = bank.staked_nodes();
2187+
let mut pubkey_with_weight: Vec<(f64, Pubkey)> = pubkeys
2188+
.filter_map(|&pubkey| {
2189+
let stake = staked_nodes.get(&pubkey).copied().unwrap_or(0);
21962190
if stake == 0 {
21972191
None // Ignore votes from unstaked validators
21982192
} else {
@@ -2270,7 +2264,6 @@ mod tests {
22702264
solana_runtime::{bank_forks::BankForks, genesis_utils::activate_feature},
22712265
solana_sdk::{
22722266
account::AccountSharedData,
2273-
feature_set,
22742267
hash::Hash,
22752268
instruction::InstructionError,
22762269
message::{
@@ -2931,7 +2924,7 @@ mod tests {
29312924
&transactions,
29322925
&recorder,
29332926
0,
2934-
None,
2927+
&None,
29352928
&gossip_vote_sender,
29362929
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
29372930
None,
@@ -2984,7 +2977,7 @@ mod tests {
29842977
&transactions,
29852978
&recorder,
29862979
0,
2987-
None,
2980+
&None,
29882981
&gossip_vote_sender,
29892982
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
29902983
None,
@@ -3068,7 +3061,7 @@ mod tests {
30683061
&transactions,
30693062
&recorder,
30703063
0,
3071-
None,
3064+
&None,
30723065
&gossip_vote_sender,
30733066
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
30743067
None,
@@ -3160,7 +3153,7 @@ mod tests {
31603153
&transactions,
31613154
&recorder,
31623155
0,
3163-
None,
3156+
&None,
31643157
&gossip_vote_sender,
31653158
&qos_service,
31663159
None,
@@ -3200,7 +3193,7 @@ mod tests {
32003193
&transactions,
32013194
&recorder,
32023195
0,
3203-
None,
3196+
&None,
32043197
&gossip_vote_sender,
32053198
&qos_service,
32063199
None,
@@ -3297,7 +3290,7 @@ mod tests {
32973290
&transactions,
32983291
&recorder,
32993292
0,
3300-
None,
3293+
&None,
33013294
&gossip_vote_sender,
33023295
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
33033296
None,
@@ -3463,7 +3456,7 @@ mod tests {
34633456
&Instant::now(),
34643457
&transactions,
34653458
&recorder,
3466-
None,
3459+
&None,
34673460
&gossip_vote_sender,
34683461
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
34693462
None,
@@ -3530,7 +3523,7 @@ mod tests {
35303523
&Instant::now(),
35313524
&transactions,
35323525
&recorder,
3533-
None,
3526+
&None,
35343527
&gossip_vote_sender,
35353528
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
35363529
None,
@@ -3751,7 +3744,7 @@ mod tests {
37513744
&transactions,
37523745
&recorder,
37533746
0,
3754-
Some(TransactionStatusSender {
3747+
&Some(TransactionStatusSender {
37553748
sender: transaction_status_sender,
37563749
}),
37573750
&gossip_vote_sender,
@@ -3913,7 +3906,7 @@ mod tests {
39133906
&[sanitized_tx.clone()],
39143907
&recorder,
39153908
0,
3916-
Some(TransactionStatusSender {
3909+
&Some(TransactionStatusSender {
39173910
sender: transaction_status_sender,
39183911
}),
39193912
&gossip_vote_sender,
@@ -4025,6 +4018,7 @@ mod tests {
40254018
num_conflicting_transactions,
40264019
),
40274020
ThreadType::Transactions,
4021+
None,
40284022
);
40294023

40304024
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
@@ -4125,6 +4119,7 @@ mod tests {
41254119
num_conflicting_transactions,
41264120
),
41274121
ThreadType::Transactions,
4122+
None,
41284123
);
41294124
let all_packet_message_hashes: HashSet<Hash> = buffered_packet_batches
41304125
.iter()
@@ -4244,6 +4239,7 @@ mod tests {
42444239
&mut UnprocessedTransactionStorage::new_transaction_storage(
42454240
unprocessed_packet_batches,
42464241
ThreadType::Transactions,
4242+
None,
42474243
),
42484244
&poh_recorder,
42494245
&socket,
@@ -4301,6 +4297,7 @@ mod tests {
43014297
2,
43024298
),
43034299
ThreadType::Transactions,
4300+
None,
43044301
);
43054302

43064303
let genesis_config_info = create_slow_genesis_config(10_000);
@@ -4520,6 +4517,7 @@ mod tests {
45204517
let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage(
45214518
UnprocessedPacketBatches::with_capacity(100),
45224519
thread_type,
4520+
None,
45234521
);
45244522
transaction_storage
45254523
.deserialize_and_insert_batch(&packet_batch, &(0..3_usize).collect_vec());

0 commit comments

Comments
 (0)