4
4
forward_packet_batches_by_accounts:: ForwardPacketBatchesByAccounts ,
5
5
immutable_deserialized_packet:: { DeserializedPacketError , ImmutableDeserializedPacket } ,
6
6
} ,
7
+ itertools:: Itertools ,
7
8
rand:: { thread_rng, Rng } ,
8
9
solana_perf:: packet:: { Packet , PacketBatch } ,
9
10
solana_runtime:: bank:: Bank ,
@@ -144,7 +145,7 @@ pub struct VoteBatchInsertionMetrics {
144
145
145
146
#[ derive( Debug , Default ) ]
146
147
pub struct LatestUnprocessedVotes {
147
- latest_votes_per_pubkey : RwLock < HashMap < Pubkey , RwLock < LatestValidatorVotePacket > > > ,
148
+ latest_votes_per_pubkey : RwLock < HashMap < Pubkey , Arc < RwLock < LatestValidatorVotePacket > > > > ,
148
149
}
149
150
150
151
impl LatestUnprocessedVotes {
@@ -188,6 +189,14 @@ impl LatestUnprocessedVotes {
188
189
}
189
190
}
190
191
192
+ fn get_entry ( & self , pubkey : Pubkey ) -> Option < Arc < RwLock < LatestValidatorVotePacket > > > {
193
+ self . latest_votes_per_pubkey
194
+ . read ( )
195
+ . unwrap ( )
196
+ . get ( & pubkey)
197
+ . cloned ( )
198
+ }
199
+
191
200
/// If this vote causes an unprocessed vote to be removed, returns Some(old_vote)
192
201
/// If there is a newer vote processed / waiting to be processed returns Some(vote)
193
202
/// Otherwise returns None
@@ -197,7 +206,7 @@ impl LatestUnprocessedVotes {
197
206
) -> Option < LatestValidatorVotePacket > {
198
207
let pubkey = vote. pubkey ( ) ;
199
208
let slot = vote. slot ( ) ;
200
- if let Some ( latest_vote) = self . latest_votes_per_pubkey . read ( ) . unwrap ( ) . get ( & pubkey) {
209
+ if let Some ( latest_vote) = self . get_entry ( pubkey) {
201
210
let latest_slot = latest_vote. read ( ) . unwrap ( ) . slot ( ) ;
202
211
if slot > latest_slot {
203
212
let mut latest_vote = latest_vote. write ( ) . unwrap ( ) ;
@@ -217,7 +226,7 @@ impl LatestUnprocessedVotes {
217
226
// Should have low lock contention because this is only hit on the first few blocks of startup
218
227
// and when a new vote account starts voting.
219
228
let mut latest_votes_per_pubkey = self . latest_votes_per_pubkey . write ( ) . unwrap ( ) ;
220
- latest_votes_per_pubkey. insert ( pubkey, RwLock :: new ( vote) ) ;
229
+ latest_votes_per_pubkey. insert ( pubkey, Arc :: new ( RwLock :: new ( vote) ) ) ;
221
230
None
222
231
}
223
232
@@ -237,33 +246,35 @@ impl LatestUnprocessedVotes {
237
246
forward_packet_batches_by_accounts : & mut ForwardPacketBatchesByAccounts ,
238
247
) -> usize {
239
248
let mut continue_forwarding = true ;
240
- let latest_votes_per_pubkey = self . latest_votes_per_pubkey . read ( ) . unwrap ( ) ;
241
- return weighted_random_order_by_stake (
249
+ let pubkeys_by_stake = weighted_random_order_by_stake (
242
250
& forward_packet_batches_by_accounts. current_bank ,
243
- latest_votes_per_pubkey. keys ( ) ,
251
+ self . latest_votes_per_pubkey . read ( ) . unwrap ( ) . keys ( ) ,
244
252
)
245
- . filter ( |pubkey| {
246
- if !continue_forwarding {
247
- return false ;
248
- }
249
- if let Some ( lock) = latest_votes_per_pubkey. get ( pubkey) {
250
- let mut vote = lock. write ( ) . unwrap ( ) ;
251
- if !vote. is_vote_taken ( ) && !vote. is_forwarded ( ) {
252
- if forward_packet_batches_by_accounts
253
- . add_packet ( vote. vote . as_ref ( ) . unwrap ( ) . clone ( ) )
254
- {
255
- vote. forwarded = true ;
256
- } else {
257
- // To match behavior of regular transactions we stop
258
- // forwarding votes as soon as one fails
259
- continue_forwarding = false ;
253
+ . collect_vec ( ) ;
254
+ pubkeys_by_stake
255
+ . into_iter ( )
256
+ . filter ( |& pubkey| {
257
+ if !continue_forwarding {
258
+ return false ;
259
+ }
260
+ if let Some ( lock) = self . get_entry ( pubkey) {
261
+ let mut vote = lock. write ( ) . unwrap ( ) ;
262
+ if !vote. is_vote_taken ( ) && !vote. is_forwarded ( ) {
263
+ if forward_packet_batches_by_accounts
264
+ . add_packet ( vote. vote . as_ref ( ) . unwrap ( ) . clone ( ) )
265
+ {
266
+ vote. forwarded = true ;
267
+ } else {
268
+ // To match behavior of regular transactions we stop
269
+ // forwarding votes as soon as one fails
270
+ continue_forwarding = false ;
271
+ }
272
+ return true ;
260
273
}
261
- return true ;
262
274
}
263
- }
264
- false
265
- } )
266
- . count ( ) ;
275
+ false
276
+ } )
277
+ . count ( )
267
278
}
268
279
269
280
/// Sometimes we forward and hold the packets, sometimes we forward and clear.
0 commit comments