53
53
} ,
54
54
} ;
55
55
56
- /// Limit to 500K PPS
57
- const MAX_STREAMS_PER_100MS : u64 = 500_000 / 10 ;
58
56
const MAX_UNSTAKED_STREAMS_PERCENT : u64 = 20 ;
57
+ const STREAM_THROTTLING_INTERVAL_MS : u64 = 100 ;
59
58
const STREAM_THROTTLING_INTERVAL : Duration = Duration :: from_millis ( 100 ) ;
60
59
const WAIT_FOR_STREAM_TIMEOUT : Duration = Duration :: from_millis ( 100 ) ;
61
60
pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT : Duration = Duration :: from_secs ( 10 ) ;
@@ -75,6 +74,9 @@ const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4;
75
74
const CONNECTION_CLOSE_REASON_TOO_MANY : & [ u8 ] = b"too_many" ;
76
75
const STREAM_STOP_CODE_THROTTLING : u32 = 15 ;
77
76
77
+ /// Limit to 500K PPS
78
+ pub const DEFAULT_MAX_STREAMS_PER_MS : u64 = 500 ;
79
+
78
80
// A sequence of bytes that is part of a packet
79
81
// along with where in the packet it is
80
82
struct PacketChunk {
@@ -111,6 +113,7 @@ pub fn spawn_server(
111
113
staked_nodes : Arc < RwLock < StakedNodes > > ,
112
114
max_staked_connections : usize ,
113
115
max_unstaked_connections : usize ,
116
+ max_streams_per_ms : u64 ,
114
117
wait_for_chunk_timeout : Duration ,
115
118
coalesce : Duration ,
116
119
) -> Result < ( Endpoint , Arc < StreamStats > , JoinHandle < ( ) > ) , QuicServerError > {
@@ -136,6 +139,7 @@ pub fn spawn_server(
136
139
staked_nodes,
137
140
max_staked_connections,
138
141
max_unstaked_connections,
142
+ max_streams_per_ms,
139
143
stats. clone ( ) ,
140
144
wait_for_chunk_timeout,
141
145
coalesce,
@@ -153,6 +157,7 @@ async fn run_server(
153
157
staked_nodes : Arc < RwLock < StakedNodes > > ,
154
158
max_staked_connections : usize ,
155
159
max_unstaked_connections : usize ,
160
+ max_streams_per_ms : u64 ,
156
161
stats : Arc < StreamStats > ,
157
162
wait_for_chunk_timeout : Duration ,
158
163
coalesce : Duration ,
@@ -192,6 +197,7 @@ async fn run_server(
192
197
staked_nodes. clone ( ) ,
193
198
max_staked_connections,
194
199
max_unstaked_connections,
200
+ max_streams_per_ms,
195
201
stats. clone ( ) ,
196
202
wait_for_chunk_timeout,
197
203
) ) ;
@@ -328,6 +334,7 @@ fn handle_and_cache_new_connection(
328
334
params : & NewConnectionHandlerParams ,
329
335
wait_for_chunk_timeout : Duration ,
330
336
max_unstaked_connections : usize ,
337
+ max_streams_per_ms : u64 ,
331
338
) -> Result < ( ) , ConnectionHandlerError > {
332
339
if let Ok ( max_uni_streams) = VarInt :: from_u64 ( compute_max_allowed_uni_streams (
333
340
connection_table_l. peer_type ,
@@ -379,6 +386,7 @@ fn handle_and_cache_new_connection(
379
386
peer_type,
380
387
wait_for_chunk_timeout,
381
388
max_unstaked_connections,
389
+ max_streams_per_ms,
382
390
) ) ;
383
391
Ok ( ( ) )
384
392
} else {
@@ -407,6 +415,7 @@ async fn prune_unstaked_connections_and_add_new_connection(
407
415
max_connections : usize ,
408
416
params : & NewConnectionHandlerParams ,
409
417
wait_for_chunk_timeout : Duration ,
418
+ max_streams_per_ms : u64 ,
410
419
) -> Result < ( ) , ConnectionHandlerError > {
411
420
let stats = params. stats . clone ( ) ;
412
421
if max_connections > 0 {
@@ -420,6 +429,7 @@ async fn prune_unstaked_connections_and_add_new_connection(
420
429
params,
421
430
wait_for_chunk_timeout,
422
431
max_connections,
432
+ max_streams_per_ms,
423
433
)
424
434
} else {
425
435
connection. close (
@@ -484,6 +494,7 @@ async fn setup_connection(
484
494
staked_nodes : Arc < RwLock < StakedNodes > > ,
485
495
max_staked_connections : usize ,
486
496
max_unstaked_connections : usize ,
497
+ max_streams_per_ms : u64 ,
487
498
stats : Arc < StreamStats > ,
488
499
wait_for_chunk_timeout : Duration ,
489
500
) {
@@ -502,8 +513,9 @@ async fn setup_connection(
502
513
) ,
503
514
|( pubkey, stake, total_stake, max_stake, min_stake) | {
504
515
// The heuristic is that the stake should be large engouh to have 1 stream pass throuh within one throttle
505
- // interval during which we allow max MAX_STREAMS_PER_100MS streams.
506
- let min_stake_ratio = 1_f64 / MAX_STREAMS_PER_100MS as f64 ;
516
+ // interval during which we allow max (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) streams.
517
+ let min_stake_ratio =
518
+ 1_f64 / ( max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS ) as f64 ;
507
519
let stake_ratio = stake as f64 / total_stake as f64 ;
508
520
let stake = if stake_ratio < min_stake_ratio {
509
521
// If it is a staked connection with ultra low stake ratio, treat it as unstaked.
@@ -541,6 +553,7 @@ async fn setup_connection(
541
553
& params,
542
554
wait_for_chunk_timeout,
543
555
max_unstaked_connections,
556
+ max_streams_per_ms,
544
557
) {
545
558
stats
546
559
. connection_added_from_staked_peer
@@ -556,6 +569,7 @@ async fn setup_connection(
556
569
max_unstaked_connections,
557
570
& params,
558
571
wait_for_chunk_timeout,
572
+ max_streams_per_ms,
559
573
)
560
574
. await
561
575
{
@@ -577,6 +591,7 @@ async fn setup_connection(
577
591
max_unstaked_connections,
578
592
& params,
579
593
wait_for_chunk_timeout,
594
+ max_streams_per_ms,
580
595
)
581
596
. await
582
597
{
@@ -725,12 +740,14 @@ fn max_streams_for_connection_in_100ms(
725
740
stake : u64 ,
726
741
total_stake : u64 ,
727
742
max_unstaked_connections : usize ,
743
+ max_streams_per_ms : u64 ,
728
744
) -> u64 {
745
+ let max_streams_per_interval = max_streams_per_ms. saturating_mul ( STREAM_THROTTLING_INTERVAL_MS ) ;
729
746
let max_unstaked_streams_per_100ms = if max_unstaked_connections == 0 {
730
747
0
731
748
} else {
732
749
Percentage :: from ( MAX_UNSTAKED_STREAMS_PERCENT )
733
- . apply_to ( MAX_STREAMS_PER_100MS )
750
+ . apply_to ( max_streams_per_interval )
734
751
. saturating_div ( MAX_UNSTAKED_CONNECTIONS as u64 )
735
752
} ;
736
753
@@ -744,8 +761,8 @@ fn max_streams_for_connection_in_100ms(
744
761
if matches ! ( connection_type, ConnectionPeerType :: Unstaked ) || stake == 0 {
745
762
max_unstaked_streams_per_100ms
746
763
} else {
747
- let max_total_staked_streams: u64 = MAX_STREAMS_PER_100MS
748
- - Percentage :: from ( MAX_UNSTAKED_STREAMS_PERCENT ) . apply_to ( MAX_STREAMS_PER_100MS ) ;
764
+ let max_total_staked_streams: u64 = max_streams_per_interval
765
+ - Percentage :: from ( MAX_UNSTAKED_STREAMS_PERCENT ) . apply_to ( max_streams_per_interval ) ;
749
766
std:: cmp:: max (
750
767
min_staked_streams_per_100ms,
751
768
( ( max_total_staked_streams as f64 / total_stake as f64 ) * stake as f64 ) as u64 ,
@@ -762,6 +779,7 @@ fn reset_throttling_params_if_needed(last_instant: &mut tokio::time::Instant) ->
762
779
}
763
780
}
764
781
782
+ #[ allow( clippy:: too_many_arguments) ]
765
783
async fn handle_connection (
766
784
connection : Connection ,
767
785
remote_addr : SocketAddr ,
@@ -772,6 +790,7 @@ async fn handle_connection(
772
790
peer_type : ConnectionPeerType ,
773
791
wait_for_chunk_timeout : Duration ,
774
792
max_unstaked_connections : usize ,
793
+ max_streams_per_ms : u64 ,
775
794
) {
776
795
let stats = params. stats ;
777
796
debug ! (
@@ -787,6 +806,7 @@ async fn handle_connection(
787
806
params. stake ,
788
807
params. total_stake ,
789
808
max_unstaked_connections,
809
+ max_streams_per_ms,
790
810
) ;
791
811
let mut last_throttling_instant = tokio:: time:: Instant :: now ( ) ;
792
812
let mut streams_in_current_interval = 0 ;
@@ -1307,6 +1327,7 @@ pub mod test {
1307
1327
staked_nodes,
1308
1328
MAX_STAKED_CONNECTIONS ,
1309
1329
MAX_UNSTAKED_CONNECTIONS ,
1330
+ DEFAULT_MAX_STREAMS_PER_MS ,
1310
1331
Duration :: from_secs ( 2 ) ,
1311
1332
DEFAULT_TPU_COALESCE ,
1312
1333
)
@@ -1743,6 +1764,7 @@ pub mod test {
1743
1764
staked_nodes,
1744
1765
MAX_STAKED_CONNECTIONS ,
1745
1766
0 , // Do not allow any connection from unstaked clients/nodes
1767
+ DEFAULT_MAX_STREAMS_PER_MS ,
1746
1768
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT ,
1747
1769
DEFAULT_TPU_COALESCE ,
1748
1770
)
@@ -1774,6 +1796,7 @@ pub mod test {
1774
1796
staked_nodes,
1775
1797
MAX_STAKED_CONNECTIONS ,
1776
1798
MAX_UNSTAKED_CONNECTIONS ,
1799
+ DEFAULT_MAX_STREAMS_PER_MS ,
1777
1800
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT ,
1778
1801
DEFAULT_TPU_COALESCE ,
1779
1802
)
@@ -2120,7 +2143,8 @@ pub mod test {
2120
2143
ConnectionPeerType :: Unstaked ,
2121
2144
0 ,
2122
2145
10000 ,
2123
- MAX_UNSTAKED_CONNECTIONS
2146
+ MAX_UNSTAKED_CONNECTIONS ,
2147
+ DEFAULT_MAX_STREAMS_PER_MS ,
2124
2148
) ,
2125
2149
20
2126
2150
) ;
@@ -2131,7 +2155,8 @@ pub mod test {
2131
2155
ConnectionPeerType :: Unstaked ,
2132
2156
10 ,
2133
2157
10000 ,
2134
- MAX_UNSTAKED_CONNECTIONS
2158
+ MAX_UNSTAKED_CONNECTIONS ,
2159
+ DEFAULT_MAX_STREAMS_PER_MS ,
2135
2160
) ,
2136
2161
20
2137
2162
) ;
@@ -2143,7 +2168,8 @@ pub mod test {
2143
2168
ConnectionPeerType :: Staked ,
2144
2169
0 ,
2145
2170
10000 ,
2146
- MAX_UNSTAKED_CONNECTIONS
2171
+ MAX_UNSTAKED_CONNECTIONS ,
2172
+ DEFAULT_MAX_STREAMS_PER_MS ,
2147
2173
) ,
2148
2174
20
2149
2175
) ;
@@ -2155,7 +2181,8 @@ pub mod test {
2155
2181
ConnectionPeerType :: Staked ,
2156
2182
15 ,
2157
2183
10000 ,
2158
- MAX_UNSTAKED_CONNECTIONS
2184
+ MAX_UNSTAKED_CONNECTIONS ,
2185
+ DEFAULT_MAX_STREAMS_PER_MS
2159
2186
) ,
2160
2187
60
2161
2188
) ;
@@ -2167,7 +2194,8 @@ pub mod test {
2167
2194
ConnectionPeerType :: Staked ,
2168
2195
1000 ,
2169
2196
10000 ,
2170
- MAX_UNSTAKED_CONNECTIONS
2197
+ MAX_UNSTAKED_CONNECTIONS ,
2198
+ DEFAULT_MAX_STREAMS_PER_MS
2171
2199
) ,
2172
2200
4000
2173
2201
) ;
@@ -2179,7 +2207,8 @@ pub mod test {
2179
2207
ConnectionPeerType :: Staked ,
2180
2208
1 ,
2181
2209
50000 ,
2182
- MAX_UNSTAKED_CONNECTIONS
2210
+ MAX_UNSTAKED_CONNECTIONS ,
2211
+ DEFAULT_MAX_STREAMS_PER_MS
2183
2212
) ,
2184
2213
21
2185
2214
) ;
0 commit comments