@@ -273,10 +273,322 @@ impl LatestUnprocessedVotes {
273
273
. for_each ( |lock| {
274
274
if let Ok ( cell) = lock. write ( ) {
275
275
if let Ok ( mut vote) = cell. try_borrow_mut ( ) {
276
- vote. clear ( ) ;
276
+ if vote. is_forwarded ( ) {
277
+ vote. clear ( ) ;
278
+ self . size . fetch_sub ( 1 , Ordering :: Relaxed ) ;
279
+ }
277
280
}
278
281
}
279
282
} ) ;
280
283
}
281
284
}
282
285
}
286
+
287
+ #[ cfg( test) ]
288
+ mod tests {
289
+ use {
290
+ super :: * ,
291
+ itertools:: Itertools ,
292
+ rand:: { thread_rng, Rng } ,
293
+ solana_perf:: packet:: { Packet , PacketFlags } ,
294
+ solana_runtime:: genesis_utils:: ValidatorVoteKeypairs ,
295
+ solana_runtime:: bank:: Bank ,
296
+ solana_sdk:: { hash:: Hash , signature:: Signer , system_transaction:: transfer} ,
297
+ solana_vote_program:: {
298
+ vote_state:: VoteStateUpdate ,
299
+ vote_transaction:: { new_vote_state_update_transaction, new_vote_transaction} ,
300
+ } ,
301
+ std:: thread:: Builder ,
302
+ } ;
303
+
304
+ fn from_slots (
305
+ slots : Vec < ( u64 , u32 ) > ,
306
+ vote_source : VoteSource ,
307
+ keypairs : & ValidatorVoteKeypairs ,
308
+ ) -> DeserializedVotePacket {
309
+ let vote = VoteStateUpdate :: from ( slots) ;
310
+ let vote_tx = new_vote_state_update_transaction (
311
+ vote,
312
+ Hash :: new_unique ( ) ,
313
+ & keypairs. node_keypair ,
314
+ & keypairs. vote_keypair ,
315
+ & keypairs. vote_keypair ,
316
+ None ,
317
+ ) ;
318
+ let mut packet = Packet :: from_data ( None , vote_tx) . unwrap ( ) ;
319
+ packet. meta . flags . set ( PacketFlags :: SIMPLE_VOTE_TX , true ) ;
320
+ DeserializedVotePacket :: new ( packet, vote_source) . unwrap ( )
321
+ }
322
+
323
+ #[ test]
324
+ fn test_deserialize_vote_packets ( ) {
325
+ let keypairs = ValidatorVoteKeypairs :: new_rand ( ) ;
326
+ let bankhash = Hash :: new_unique ( ) ;
327
+ let blockhash = Hash :: new_unique ( ) ;
328
+ let switch_proof = Hash :: new_unique ( ) ;
329
+ let mut vote = Packet :: from_data (
330
+ None ,
331
+ new_vote_transaction (
332
+ vec ! [ 0 , 1 , 2 ] ,
333
+ bankhash,
334
+ blockhash,
335
+ & keypairs. node_keypair ,
336
+ & keypairs. vote_keypair ,
337
+ & keypairs. vote_keypair ,
338
+ None ,
339
+ ) ,
340
+ )
341
+ . unwrap ( ) ;
342
+ vote. meta . flags . set ( PacketFlags :: SIMPLE_VOTE_TX , true ) ;
343
+ let mut vote_switch = Packet :: from_data (
344
+ None ,
345
+ new_vote_transaction (
346
+ vec ! [ 0 , 1 , 2 ] ,
347
+ bankhash,
348
+ blockhash,
349
+ & keypairs. node_keypair ,
350
+ & keypairs. vote_keypair ,
351
+ & keypairs. vote_keypair ,
352
+ Some ( switch_proof) ,
353
+ ) ,
354
+ )
355
+ . unwrap ( ) ;
356
+ vote_switch
357
+ . meta
358
+ . flags
359
+ . set ( PacketFlags :: SIMPLE_VOTE_TX , true ) ;
360
+ let mut vote_state_update = Packet :: from_data (
361
+ None ,
362
+ new_vote_state_update_transaction (
363
+ VoteStateUpdate :: from ( vec ! [ ( 0 , 3 ) , ( 1 , 2 ) , ( 2 , 1 ) ] ) ,
364
+ blockhash,
365
+ & keypairs. node_keypair ,
366
+ & keypairs. vote_keypair ,
367
+ & keypairs. vote_keypair ,
368
+ None ,
369
+ ) ,
370
+ )
371
+ . unwrap ( ) ;
372
+ vote_state_update
373
+ . meta
374
+ . flags
375
+ . set ( PacketFlags :: SIMPLE_VOTE_TX , true ) ;
376
+ let mut vote_state_update_switch = Packet :: from_data (
377
+ None ,
378
+ new_vote_state_update_transaction (
379
+ VoteStateUpdate :: from ( vec ! [ ( 0 , 3 ) , ( 1 , 2 ) , ( 3 , 1 ) ] ) ,
380
+ blockhash,
381
+ & keypairs. node_keypair ,
382
+ & keypairs. vote_keypair ,
383
+ & keypairs. vote_keypair ,
384
+ Some ( switch_proof) ,
385
+ ) ,
386
+ )
387
+ . unwrap ( ) ;
388
+ vote_state_update_switch
389
+ . meta
390
+ . flags
391
+ . set ( PacketFlags :: SIMPLE_VOTE_TX , true ) ;
392
+ let random_transaction = Packet :: from_data (
393
+ None ,
394
+ transfer (
395
+ & keypairs. node_keypair ,
396
+ & Pubkey :: new_unique ( ) ,
397
+ 1000 ,
398
+ blockhash,
399
+ ) ,
400
+ )
401
+ . unwrap ( ) ;
402
+ let packet_batch = PacketBatch :: new ( vec ! [
403
+ vote,
404
+ vote_switch,
405
+ vote_state_update,
406
+ vote_state_update_switch,
407
+ random_transaction,
408
+ ] ) ;
409
+
410
+ let deserialized_packets = deserialize_packets (
411
+ & packet_batch,
412
+ & ( 0 ..packet_batch. len ( ) ) . collect_vec ( ) ,
413
+ VoteSource :: Gossip ,
414
+ )
415
+ . collect_vec ( ) ;
416
+
417
+ assert_eq ! ( 2 , deserialized_packets. len( ) ) ;
418
+ assert_eq ! ( VoteSource :: Gossip , deserialized_packets[ 0 ] . vote_source) ;
419
+ assert_eq ! ( VoteSource :: Gossip , deserialized_packets[ 1 ] . vote_source) ;
420
+
421
+ assert_eq ! (
422
+ keypairs. node_keypair. pubkey( ) ,
423
+ deserialized_packets[ 0 ] . pubkey
424
+ ) ;
425
+ assert_eq ! (
426
+ keypairs. node_keypair. pubkey( ) ,
427
+ deserialized_packets[ 1 ] . pubkey
428
+ ) ;
429
+
430
+ assert ! ( deserialized_packets[ 0 ] . vote. is_some( ) ) ;
431
+ assert ! ( deserialized_packets[ 1 ] . vote. is_some( ) ) ;
432
+ }
433
+
434
+ #[ test]
435
+ fn test_update_latest_vote ( ) {
436
+ let latest_unprocessed_votes = LatestUnprocessedVotes :: new ( ) ;
437
+ let keypair_a = ValidatorVoteKeypairs :: new_rand ( ) ;
438
+ let keypair_b = ValidatorVoteKeypairs :: new_rand ( ) ;
439
+
440
+ let vote_a = from_slots ( vec ! [ ( 0 , 2 ) , ( 1 , 1 ) ] , VoteSource :: Gossip , & keypair_a) ;
441
+ let vote_b = from_slots ( vec ! [ ( 0 , 5 ) , ( 4 , 2 ) , ( 9 , 1 ) ] , VoteSource :: Gossip , & keypair_b) ;
442
+
443
+ assert ! ( latest_unprocessed_votes
444
+ . update_latest_vote( vote_a)
445
+ . is_none( ) ) ;
446
+ assert ! ( latest_unprocessed_votes
447
+ . update_latest_vote( vote_b)
448
+ . is_none( ) ) ;
449
+ assert_eq ! ( 2 , latest_unprocessed_votes. len( ) ) ;
450
+
451
+ assert_eq ! (
452
+ Some ( 1 ) ,
453
+ latest_unprocessed_votes. get_latest_vote_slot( keypair_a. node_keypair. pubkey( ) )
454
+ ) ;
455
+ assert_eq ! (
456
+ Some ( 9 ) ,
457
+ latest_unprocessed_votes. get_latest_vote_slot( keypair_b. node_keypair. pubkey( ) )
458
+ ) ;
459
+
460
+ let vote_a = from_slots (
461
+ vec ! [ ( 0 , 5 ) , ( 1 , 4 ) , ( 3 , 3 ) , ( 10 , 1 ) ] ,
462
+ VoteSource :: Gossip ,
463
+ & keypair_a,
464
+ ) ;
465
+ let vote_b = from_slots ( vec ! [ ( 0 , 5 ) , ( 4 , 2 ) , ( 6 , 1 ) ] , VoteSource :: Gossip , & keypair_a) ;
466
+
467
+ // Evict previous vote
468
+ assert_eq ! (
469
+ 1 ,
470
+ latest_unprocessed_votes
471
+ . update_latest_vote( vote_a)
472
+ . unwrap( )
473
+ . slot
474
+ ) ;
475
+ // Drop current vote
476
+ assert_eq ! (
477
+ 6 ,
478
+ latest_unprocessed_votes
479
+ . update_latest_vote( vote_b)
480
+ . unwrap( )
481
+ . slot
482
+ ) ;
483
+
484
+ assert_eq ! ( 2 , latest_unprocessed_votes. len( ) ) ;
485
+ }
486
+
487
+ #[ test]
488
+ fn test_simulate_threads ( ) {
489
+ let latest_unprocessed_votes = Arc :: new ( LatestUnprocessedVotes :: new ( ) ) ;
490
+ let latest_unprocessed_votes_tpu = latest_unprocessed_votes. clone ( ) ;
491
+ let keypairs = Arc :: new (
492
+ ( 0 ..10 )
493
+ . map ( |_| ValidatorVoteKeypairs :: new_rand ( ) )
494
+ . collect_vec ( ) ,
495
+ ) ;
496
+ let keypairs_tpu = keypairs. clone ( ) ;
497
+ let vote_limit = 1000 ;
498
+
499
+ let gossip = Builder :: new ( )
500
+ . spawn ( move || {
501
+ let mut rng = thread_rng ( ) ;
502
+ for i in 0 ..vote_limit {
503
+ let vote = from_slots (
504
+ vec ! [ ( i, 1 ) ] ,
505
+ VoteSource :: Gossip ,
506
+ & keypairs[ rng. gen_range ( 0 , 10 ) ] ,
507
+ ) ;
508
+ latest_unprocessed_votes. update_latest_vote ( vote) ;
509
+ }
510
+ } )
511
+ . unwrap ( ) ;
512
+
513
+ let tpu = Builder :: new ( )
514
+ . spawn ( move || {
515
+ let mut rng = thread_rng ( ) ;
516
+ for i in 0 ..vote_limit {
517
+ let vote = from_slots (
518
+ vec ! [ ( i, 1 ) ] ,
519
+ VoteSource :: Tpu ,
520
+ & keypairs_tpu[ rng. gen_range ( 0 , 10 ) ] ,
521
+ ) ;
522
+ latest_unprocessed_votes_tpu. update_latest_vote ( vote) ;
523
+ if i % 214 == 0 {
524
+ // Simulate draining and processing packets
525
+ let latest_votes_per_pubkey = latest_unprocessed_votes_tpu
526
+ . latest_votes_per_pubkey
527
+ . read ( )
528
+ . unwrap ( ) ;
529
+ latest_votes_per_pubkey. iter ( ) . for_each ( |( _pubkey, lock) | {
530
+ let latest_vote = lock. write ( ) . unwrap ( ) ;
531
+ let mut latest_vote = latest_vote. try_borrow_mut ( ) . unwrap ( ) ;
532
+ if !latest_vote. is_processed ( ) {
533
+ latest_vote. clear ( ) ;
534
+ latest_unprocessed_votes_tpu
535
+ . size
536
+ . fetch_sub ( 1 , Ordering :: AcqRel ) ;
537
+ }
538
+ } ) ;
539
+ }
540
+ }
541
+ } )
542
+ . unwrap ( ) ;
543
+ gossip. join ( ) . unwrap ( ) ;
544
+ tpu. join ( ) . unwrap ( ) ;
545
+ }
546
+
547
+ #[ test]
548
+ fn test_forwardable_packets ( ) {
549
+ let latest_unprocessed_votes = LatestUnprocessedVotes :: new ( ) ;
550
+ let mut forward_packet_batches_by_accounts = ForwardPacketBatchesByAccounts :: new_with_default_batch_limits ( Arc :: new ( Bank :: default_for_tests ( ) ) ) ;
551
+ todo ! ( )
552
+ }
553
+
554
+ #[ test]
555
+ fn test_clear_forwarded_packets ( ) {
556
+ let latest_unprocessed_votes = LatestUnprocessedVotes :: new ( ) ;
557
+ let keypair_a = ValidatorVoteKeypairs :: new_rand ( ) ;
558
+ let keypair_b = ValidatorVoteKeypairs :: new_rand ( ) ;
559
+ let keypair_c = ValidatorVoteKeypairs :: new_rand ( ) ;
560
+ let keypair_d = ValidatorVoteKeypairs :: new_rand ( ) ;
561
+
562
+ let vote_a = from_slots ( vec ! [ ( 1 , 1 ) ] , VoteSource :: Gossip , & keypair_a) ;
563
+ let mut vote_b = from_slots ( vec ! [ ( 2 , 1 ) ] , VoteSource :: Tpu , & keypair_b) ;
564
+ vote_b. forwarded = true ;
565
+ let vote_c = from_slots ( vec ! [ ( 3 , 1 ) ] , VoteSource :: Tpu , & keypair_c) ;
566
+ let vote_d = from_slots ( vec ! [ ( 4 , 1 ) ] , VoteSource :: Gossip , & keypair_d) ;
567
+
568
+ latest_unprocessed_votes. update_latest_vote ( vote_a) ;
569
+ latest_unprocessed_votes. update_latest_vote ( vote_b) ;
570
+ latest_unprocessed_votes. update_latest_vote ( vote_c) ;
571
+ latest_unprocessed_votes. update_latest_vote ( vote_d) ;
572
+ assert_eq ! ( 4 , latest_unprocessed_votes. len( ) ) ;
573
+
574
+ latest_unprocessed_votes. clear_forwarded_packets ( ) ;
575
+ assert_eq ! ( 1 , latest_unprocessed_votes. len( ) ) ;
576
+
577
+ assert_eq ! (
578
+ Some ( 1 ) ,
579
+ latest_unprocessed_votes. get_latest_vote_slot( keypair_a. node_keypair. pubkey( ) )
580
+ ) ;
581
+ assert_eq ! (
582
+ Some ( 2 ) ,
583
+ latest_unprocessed_votes. get_latest_vote_slot( keypair_b. node_keypair. pubkey( ) )
584
+ ) ;
585
+ assert_eq ! (
586
+ Some ( 3 ) ,
587
+ latest_unprocessed_votes. get_latest_vote_slot( keypair_c. node_keypair. pubkey( ) )
588
+ ) ;
589
+ assert_eq ! (
590
+ Some ( 4 ) ,
591
+ latest_unprocessed_votes. get_latest_vote_slot( keypair_d. node_keypair. pubkey( ) )
592
+ ) ;
593
+ }
594
+ }
0 commit comments