Skip to content

Commit a6e63a8

Browse files
committed
wip: some congestion trickery
1 parent a89e698 commit a6e63a8

File tree

5 files changed

+114
-9
lines changed

5 files changed

+114
-9
lines changed

iroh/src/magicsock.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,19 @@ const ENDPOINTS_FRESH_ENOUGH_DURATION: Duration = Duration::from_secs(27);
9595

9696
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
9797

98+
/// Jitter range for heartbeat intervals (±25% of base interval).
99+
/// This prevents synchronized heartbeat storms across many nodes.
100+
const HEARTBEAT_JITTER_PCT: f64 = 0.25;
101+
102+
/// Create a jittered interval to prevent synchronized heartbeat storms.
103+
fn jittered_interval(base: Duration) -> time::Interval {
104+
let jitter_range = base.as_secs_f64() * HEARTBEAT_JITTER_PCT;
105+
let jitter = rand::thread_rng().gen_range(-jitter_range..=jitter_range);
106+
let jittered = base.as_secs_f64() + jitter;
107+
let duration = Duration::from_secs_f64(jittered.max(0.1));
108+
time::interval(duration)
109+
}
110+
98111
/// Contains options for `MagicSock::listen`.
99112
#[derive(derive_more::Debug)]
100113
pub(crate) struct Options {
@@ -1850,7 +1863,7 @@ impl Actor {
18501863
let mut current_netmon_state = self.netmon_watcher.get();
18511864

18521865
#[cfg(not(wasm_browser))]
1853-
let mut direct_addr_heartbeat_timer = time::interval(HEARTBEAT_INTERVAL);
1866+
let mut direct_addr_heartbeat_timer = jittered_interval(HEARTBEAT_INTERVAL);
18541867

18551868
#[cfg(not(wasm_browser))]
18561869
let mut portmap_watcher = self

iroh/src/magicsock/node_map/node_state.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -460,11 +460,21 @@ impl NodeState {
460460
// which we should have received the pong, clear best addr and
461461
// pong. Both are used to select this path again, but we know
462462
// it's not a usable path now.
463-
path_state.validity = PathValidity::empty();
463+
// Record ping failure and only clear validity after threshold
464+
path_state.validity.record_ping_failure();
464465
metrics.path_ping_failures.inc();
465466

466467
path_state.validity.record_metrics(metrics);
467-
metrics.path_marked_outdated.inc();
468+
469+
if path_state.validity.should_mark_outdated() {
470+
debug!(
471+
"path {} marked outdated after {} consecutive failures",
472+
addr,
473+
path_state.validity.consecutive_failures()
474+
);
475+
metrics.path_marked_outdated.inc();
476+
path_state.validity = PathValidity::empty();
477+
}
468478
}
469479
}
470480
}

iroh/src/magicsock/node_map/path_state.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,14 @@ impl PathState {
125125
}
126126
}
127127

128+
let had_failures = self.validity.consecutive_failures() > 0;
129+
128130
self.validity.update_pong(r.pong_at, r.latency);
131+
self.validity.reset_failures();
132+
133+
if had_failures {
134+
metrics.path_failure_resets.inc();
135+
}
129136

130137
self.validity.record_metrics(metrics);
131138
}
@@ -134,17 +141,20 @@ impl PathState {
134141
self.last_payload_msg = Some(now);
135142
self.validity
136143
.receive_payload(now, path_validity::Source::QuicPayload);
144+
self.validity.reset_failures();
137145
}
138146

139147
#[cfg(test)]
140148
pub(super) fn with_pong_reply(node_id: NodeId, r: PongReply) -> Self {
149+
let mut validity = PathValidity::new(r.pong_at, r.latency);
150+
validity.reset_failures();
141151
PathState {
142152
node_id,
143153
path: r.from.clone(),
144154
last_ping: None,
145155
last_got_ping: None,
146156
call_me_maybe_time: None,
147-
validity: PathValidity::new(r.pong_at, r.latency),
157+
validity,
148158
last_payload_msg: None,
149159
sources: HashMap::new(),
150160
}

iroh/src/magicsock/node_map/path_validity.rs

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,14 @@ use crate::magicsock::Metrics as MagicsockMetrics;
99
/// currently trusted.
1010
///
1111
/// If trust goes away, it can be brought back with another valid DISCO UDP pong.
12-
const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_millis(6500);
12+
///
13+
/// Increased from 6.5s to 12s to be more resilient under congestion.
14+
const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_secs(12);
15+
16+
/// Number of consecutive ping failures required before marking a path as outdated.
17+
///
18+
/// This implements a tolerance to prevent temporary packet loss from causing path degradation.
19+
const PING_FAILURE_THRESHOLD: u8 = 3;
1320

1421
/// Tracks a path's validity.
1522
///
@@ -27,6 +34,7 @@ struct Inner {
2734
latest_pong: Instant,
2835
latency: Duration,
2936
trust_until: Instant,
37+
consecutive_failures: u8,
3038
congestion_metrics: CongestionMetrics,
3139
}
3240

@@ -150,6 +158,7 @@ impl PathValidity {
150158
trust_until: pong_at + Source::ReceivedPong.trust_duration(),
151159
latest_pong: pong_at,
152160
latency,
161+
consecutive_failures: 0,
153162
congestion_metrics: metrics,
154163
}))
155164
}
@@ -161,6 +170,7 @@ impl PathValidity {
161170
inner.trust_until = pong_at + Source::ReceivedPong.trust_duration();
162171
inner.latest_pong = pong_at;
163172
inner.latency = latency;
173+
inner.consecutive_failures = 0;
164174
inner.congestion_metrics.add_latency_sample(latency);
165175
}
166176
None => {
@@ -226,6 +236,31 @@ impl PathValidity {
226236
Some(self.0.as_ref()?.latest_pong)
227237
}
228238

239+
/// Record a ping failure (timeout or no response).
240+
///
241+
/// Only marks the path as outdated after PING_FAILURE_THRESHOLD consecutive failures.
242+
pub(super) fn record_ping_failure(&mut self) {
243+
let Some(state) = self.0.as_mut() else {
244+
return;
245+
};
246+
state.consecutive_failures = state.consecutive_failures.saturating_add(1);
247+
}
248+
249+
/// Check if path should be considered outdated based on consecutive failures.
250+
pub(super) fn should_mark_outdated(&self) -> bool {
251+
self.0
252+
.as_ref()
253+
.map(|state| state.consecutive_failures >= PING_FAILURE_THRESHOLD)
254+
.unwrap_or(false)
255+
}
256+
257+
/// Reset consecutive failure counter (called when we receive activity).
258+
pub(super) fn reset_failures(&mut self) {
259+
if let Some(state) = self.0.as_mut() {
260+
state.consecutive_failures = 0;
261+
}
262+
}
263+
229264
/// Record that a ping was sent on this path.
230265
pub(super) fn record_ping_sent(&mut self) {
231266
if let Some(state) = self.0.as_mut() {
@@ -267,6 +302,14 @@ impl PathValidity {
267302
.and_then(|state| state.congestion_metrics.avg_latency())
268303
}
269304

305+
/// Get the number of consecutive failures.
306+
pub(super) fn consecutive_failures(&self) -> u8 {
307+
self.0
308+
.as_ref()
309+
.map(|state| state.consecutive_failures)
310+
.unwrap_or(0)
311+
}
312+
270313
/// Record congestion metrics to the metrics system.
271314
/// Should be called periodically or on significant events.
272315
pub(super) fn record_metrics(&self, metrics: &MagicsockMetrics) {
@@ -302,7 +345,7 @@ impl Inner {
302345
mod tests {
303346
use n0_future::time::{Duration, Instant};
304347

305-
use super::{PathValidity, Source, TRUST_UDP_ADDR_DURATION};
348+
use super::{PING_FAILURE_THRESHOLD, PathValidity, Source, TRUST_UDP_ADDR_DURATION};
306349

307350
#[tokio::test(start_paused = true)]
308351
async fn test_basic_path_validity_lifetime() {
@@ -330,6 +373,32 @@ mod tests {
330373
assert!(!validity.is_valid(Instant::now()));
331374
assert!(validity.is_outdated(Instant::now()));
332375
}
376+
377+
#[tokio::test]
378+
async fn test_multiple_ping_failures() {
379+
let mut validity = PathValidity::new(Instant::now(), Duration::from_millis(20));
380+
381+
// First failure should not mark as outdated
382+
validity.record_ping_failure();
383+
assert!(!validity.should_mark_outdated());
384+
assert_eq!(validity.consecutive_failures(), 1);
385+
386+
// Second failure should not mark as outdated
387+
validity.record_ping_failure();
388+
assert!(!validity.should_mark_outdated());
389+
assert_eq!(validity.consecutive_failures(), 2);
390+
391+
// Third failure should mark as outdated (threshold = 3)
392+
validity.record_ping_failure();
393+
assert!(validity.should_mark_outdated());
394+
assert_eq!(validity.consecutive_failures(), PING_FAILURE_THRESHOLD);
395+
396+
// Receiving pong should reset failures
397+
validity.update_pong(Instant::now(), Duration::from_millis(20));
398+
assert_eq!(validity.consecutive_failures(), 0);
399+
assert!(!validity.should_mark_outdated());
400+
}
401+
333402
#[tokio::test]
334403
async fn test_congestion_metrics() {
335404
let mut validity = PathValidity::new(Instant::now(), Duration::from_millis(10));

iroh/src/magicsock/transports/relay.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,14 @@ pub(crate) struct RelayTransport {
3939

4040
impl RelayTransport {
4141
pub(crate) fn new(config: RelayActorConfig) -> Self {
42-
let (relay_datagram_send_tx, relay_datagram_send_rx) = mpsc::channel(256);
42+
// Increased from 256 to 1024 to better handle congestion and burst traffic
43+
let (relay_datagram_send_tx, relay_datagram_send_rx) = mpsc::channel(1024);
4344

44-
let (relay_datagram_recv_tx, relay_datagram_recv_rx) = mpsc::channel(512);
45+
// Increased from 512 to 2048 to reduce drops under load
46+
let (relay_datagram_recv_tx, relay_datagram_recv_rx) = mpsc::channel(2048);
4547

46-
let (actor_sender, actor_receiver) = mpsc::channel(256);
48+
// Increased from 256 to 512 for actor control messages
49+
let (actor_sender, actor_receiver) = mpsc::channel(512);
4750

4851
let my_node_id = config.secret_key.public();
4952
let my_relay = config.my_relay.clone();

0 commit comments

Comments
 (0)