40
40
gossip_error:: GossipError ,
41
41
ping_pong:: { self , PingCache , Pong } ,
42
42
restart_crds_values:: { RestartLastVotedForkSlots , RestartLastVotedForkSlotsError } ,
43
- socketaddr, socketaddr_any,
44
43
weighted_shuffle:: WeightedShuffle ,
45
44
} ,
46
45
bincode:: { serialize, serialized_size} ,
@@ -127,7 +126,7 @@ pub const MAX_INCREMENTAL_SNAPSHOT_HASHES: usize = 25;
127
126
const MAX_PRUNE_DATA_NODES : usize = 32 ;
128
127
/// Number of bytes in the randomly generated token sent with ping messages.
129
128
const GOSSIP_PING_TOKEN_SIZE : usize = 32 ;
130
- const GOSSIP_PING_CACHE_CAPACITY : usize = 65536 ;
129
+ const GOSSIP_PING_CACHE_CAPACITY : usize = 126976 ;
131
130
const GOSSIP_PING_CACHE_TTL : Duration = Duration :: from_secs ( 1280 ) ;
132
131
const GOSSIP_PING_CACHE_RATE_LIMIT_DELAY : Duration = Duration :: from_secs ( 1280 / 64 ) ;
133
132
pub const DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS : u64 = 10_000 ;
@@ -2422,6 +2421,20 @@ impl ClusterInfo {
2422
2421
}
2423
2422
Ok ( ( ) )
2424
2423
} ;
2424
+ let mut pings = Vec :: new ( ) ;
2425
+ let mut rng = rand:: thread_rng ( ) ;
2426
+ let keypair: Arc < Keypair > = self . keypair ( ) . clone ( ) ;
2427
+ let mut verify_gossip_addr = |value : & CrdsValue | {
2428
+ verify_gossip_addr (
2429
+ & mut rng,
2430
+ & keypair,
2431
+ value,
2432
+ stakes,
2433
+ & self . socket_addr_space ,
2434
+ & self . ping_cache ,
2435
+ & mut pings,
2436
+ )
2437
+ } ;
2425
2438
// Split packets based on their types.
2426
2439
let mut pull_requests = vec ! [ ] ;
2427
2440
let mut pull_responses = vec ! [ ] ;
@@ -2432,15 +2445,23 @@ impl ClusterInfo {
2432
2445
for ( from_addr, packet) in packets {
2433
2446
match packet {
2434
2447
Protocol :: PullRequest ( filter, caller) => {
2435
- pull_requests. push ( ( from_addr, filter, caller) )
2448
+ if verify_gossip_addr ( & caller) {
2449
+ pull_requests. push ( ( from_addr, filter, caller) )
2450
+ }
2436
2451
}
2437
2452
Protocol :: PullResponse ( _, mut data) => {
2438
2453
check_duplicate_instance ( & data) ?;
2439
- pull_responses. append ( & mut data) ;
2454
+ data. retain ( & mut verify_gossip_addr) ;
2455
+ if !data. is_empty ( ) {
2456
+ pull_responses. append ( & mut data) ;
2457
+ }
2440
2458
}
2441
- Protocol :: PushMessage ( from, data) => {
2459
+ Protocol :: PushMessage ( from, mut data) => {
2442
2460
check_duplicate_instance ( & data) ?;
2443
- push_messages. push ( ( from, data) ) ;
2461
+ data. retain ( & mut verify_gossip_addr) ;
2462
+ if !data. is_empty ( ) {
2463
+ push_messages. push ( ( from, data) ) ;
2464
+ }
2444
2465
}
2445
2466
Protocol :: PruneMessage ( _from, data) => prune_messages. push ( data) ,
2446
2467
Protocol :: PingMessage ( ping) => ping_messages. push ( ( from_addr, ping) ) ,
@@ -2454,6 +2475,17 @@ impl ClusterInfo {
2454
2475
}
2455
2476
push_messages. retain ( |( _, data) | !data. is_empty ( ) ) ;
2456
2477
}
2478
+ if !pings. is_empty ( ) {
2479
+ self . stats
2480
+ . packets_sent_gossip_requests_count
2481
+ . add_relaxed ( pings. len ( ) as u64 ) ;
2482
+ let packet_batch = PacketBatch :: new_unpinned_with_recycler_data_and_dests (
2483
+ recycler,
2484
+ "ping_contact_infos" ,
2485
+ & pings,
2486
+ ) ;
2487
+ let _ = response_sender. send ( packet_batch) ;
2488
+ }
2457
2489
self . handle_batch_ping_messages ( ping_messages, recycler, response_sender) ;
2458
2490
self . handle_batch_prune_messages ( prune_messages, stakes) ;
2459
2491
self . handle_batch_push_messages (
@@ -2722,9 +2754,12 @@ impl ClusterInfo {
2722
2754
shred_version : u16 ,
2723
2755
) -> ( ContactInfo , UdpSocket , Option < TcpListener > ) {
2724
2756
let bind_ip_addr = IpAddr :: V4 ( Ipv4Addr :: UNSPECIFIED ) ;
2725
- let ( _, gossip_socket) = bind_in_range ( bind_ip_addr, VALIDATOR_PORT_RANGE ) . unwrap ( ) ;
2726
- let contact_info = Self :: gossip_contact_info ( id, socketaddr_any ! ( ) , shred_version) ;
2727
-
2757
+ let ( port, gossip_socket) = bind_in_range ( bind_ip_addr, VALIDATOR_PORT_RANGE ) . unwrap ( ) ;
2758
+ let contact_info = Self :: gossip_contact_info (
2759
+ id,
2760
+ SocketAddr :: new ( IpAddr :: V4 ( Ipv4Addr :: LOCALHOST ) , port) ,
2761
+ shred_version,
2762
+ ) ;
2728
2763
( contact_info, gossip_socket, None )
2729
2764
}
2730
2765
}
@@ -3145,6 +3180,44 @@ fn filter_on_shred_version(
3145
3180
}
3146
3181
}
3147
3182
3183
+ // If the CRDS value is an unstaked contact-info, verifies if
3184
+ // it has responded to ping on its gossip socket address.
3185
+ // Returns false if the CRDS value should be discarded.
3186
+ #[ must_use]
3187
+ fn verify_gossip_addr < R : Rng + CryptoRng > (
3188
+ rng : & mut R ,
3189
+ keypair : & Keypair ,
3190
+ value : & CrdsValue ,
3191
+ stakes : & HashMap < Pubkey , u64 > ,
3192
+ socket_addr_space : & SocketAddrSpace ,
3193
+ ping_cache : & Mutex < PingCache > ,
3194
+ pings : & mut Vec < ( SocketAddr , Protocol /* ::PingMessage */ ) > ,
3195
+ ) -> bool {
3196
+ let ( pubkey, addr) = match & value. data {
3197
+ CrdsData :: ContactInfo ( node) => ( node. pubkey ( ) , node. gossip ( ) ) ,
3198
+ CrdsData :: LegacyContactInfo ( node) => ( node. pubkey ( ) , node. gossip ( ) ) ,
3199
+ _ => return true , // If not a contact-info, nothing to verify.
3200
+ } ;
3201
+ // For (sufficiently) staked nodes, don't bother with ping/pong.
3202
+ if stakes. get ( pubkey) >= Some ( & MIN_STAKE_FOR_GOSSIP ) {
3203
+ return true ;
3204
+ }
3205
+ // Invalid addresses are not verifiable.
3206
+ let Some ( addr) = addr. ok ( ) . filter ( |addr| socket_addr_space. check ( addr) ) else {
3207
+ return false ;
3208
+ } ;
3209
+ let ( out, ping) = {
3210
+ let node = ( * pubkey, addr) ;
3211
+ let mut pingf = move || Ping :: new_rand ( rng, keypair) . ok ( ) ;
3212
+ let mut ping_cache = ping_cache. lock ( ) . unwrap ( ) ;
3213
+ ping_cache. check ( Instant :: now ( ) , node, & mut pingf)
3214
+ } ;
3215
+ if let Some ( ping) = ping {
3216
+ pings. push ( ( addr, Protocol :: PingMessage ( ping) ) ) ;
3217
+ }
3218
+ out
3219
+ }
3220
+
3148
3221
#[ cfg( test) ]
3149
3222
mod tests {
3150
3223
use {
@@ -3153,6 +3226,7 @@ mod tests {
3153
3226
crds_gossip_pull:: tests:: MIN_NUM_BLOOM_FILTERS ,
3154
3227
crds_value:: { CrdsValue , CrdsValueLabel , Vote as CrdsVote } ,
3155
3228
duplicate_shred:: { self , tests:: new_rand_shred, MAX_DUPLICATE_SHREDS } ,
3229
+ socketaddr,
3156
3230
} ,
3157
3231
itertools:: izip,
3158
3232
solana_ledger:: shred:: Shredder ,
0 commit comments