Skip to content

Commit fd55010

Browse files
carlaKCelnosh
authored andcommitted
sim-lib: add interceptor that introduces latency to sim-node
1 parent 0c14939 commit fd55010

File tree

2 files changed

+135
-0
lines changed

2 files changed

+135
-0
lines changed

simln-lib/src/latency_interceptor.rs

+134
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
use crate::sim_node::{
2+
CriticalError, CustomRecords, ForwardingError, InterceptRequest, Interceptor,
3+
};
4+
use crate::SimulationError;
5+
use async_trait::async_trait;
6+
use rand_distr::{Distribution, Poisson};
7+
use std::time::Duration;
8+
use tokio::{select, time};
9+
10+
/// LatencyIntercepor is a HTLC interceptor that will delay HTLC forwarding by some randomly chosen delay.
11+
pub struct LatencyIntercepor<D>
12+
where
13+
D: Distribution<f32> + Send + Sync,
14+
{
15+
latency_dist: D,
16+
}
17+
18+
impl LatencyIntercepor<Poisson<f32>> {
19+
pub fn new_poisson(lambda_ms: f32) -> Result<Self, SimulationError> {
20+
let poisson_dist = Poisson::new(lambda_ms).map_err(|e| {
21+
SimulationError::SimulatedNetworkError(format!("Could not create possion: {e}"))
22+
})?;
23+
24+
Ok(Self {
25+
latency_dist: poisson_dist,
26+
})
27+
}
28+
}
29+
30+
#[async_trait]
31+
impl<D> Interceptor for LatencyIntercepor<D>
32+
where
33+
D: Distribution<f32> + Send + Sync,
34+
{
35+
/// Introduces a random sleep time on the HTLC.
36+
async fn intercept_htlc(
37+
&self,
38+
req: InterceptRequest,
39+
) -> Result<Result<CustomRecords, ForwardingError>, CriticalError> {
40+
let latency = self.latency_dist.sample(&mut rand::thread_rng());
41+
42+
select! {
43+
_ = req.shutdown_listener => log::debug!("Latency interceptor exiting due to shutdown signal received."),
44+
_ = time::sleep(Duration::from_millis(latency as u64)) => {}
45+
}
46+
Ok(Ok(CustomRecords::default()))
47+
}
48+
49+
fn name(&self) -> String {
50+
"Latency Interceptor".to_string()
51+
}
52+
}
53+
54+
#[cfg(test)]
55+
mod tests {
56+
use std::time::Duration;
57+
58+
use super::{Interceptor, LatencyIntercepor};
59+
use crate::sim_node::{CustomRecords, HtlcRef, InterceptRequest};
60+
use crate::test_utils::get_random_keypair;
61+
use crate::ShortChannelID;
62+
use lightning::ln::PaymentHash;
63+
use ntest::assert_true;
64+
use rand::distributions::Distribution;
65+
use rand::Rng;
66+
use tokio::time::timeout;
67+
use triggered::Trigger;
68+
69+
/// Always returns the same value, useful for testing.
70+
struct ConstantDistribution {
71+
value: f32,
72+
}
73+
74+
impl Distribution<f32> for ConstantDistribution {
75+
fn sample<R: Rng + ?Sized>(&self, _rng: &mut R) -> f32 {
76+
self.value
77+
}
78+
}
79+
80+
fn test_request() -> (InterceptRequest, Trigger) {
81+
let (shutdown_trigger, shutdown_listener) = triggered::trigger();
82+
83+
let (_, pk) = get_random_keypair();
84+
let request = InterceptRequest {
85+
forwarding_node: pk,
86+
payment_hash: PaymentHash([0; 32]),
87+
incoming_htlc: HtlcRef {
88+
channel_id: ShortChannelID::from(123),
89+
index: 1,
90+
},
91+
incoming_custom_records: CustomRecords::default(),
92+
outgoing_channel_id: None,
93+
incoming_amount_msat: 100,
94+
outgoing_amount_msat: 50,
95+
incoming_expiry_height: 120,
96+
outgoing_expiry_height: 100,
97+
shutdown_listener,
98+
};
99+
100+
(request, shutdown_trigger)
101+
}
102+
103+
/// Tests that the interceptor exits immediately if a shutdown signal is received.
104+
#[tokio::test]
105+
async fn test_shutdown_signal() {
106+
// Set fixed dist to a high value so that the test won't flake.
107+
let latency_dist = ConstantDistribution { value: 1000.0 };
108+
let interceptor = LatencyIntercepor { latency_dist };
109+
110+
let (request, trigger) = test_request();
111+
trigger.trigger();
112+
113+
assert_true!(timeout(Duration::from_secs(10), async {
114+
interceptor.intercept_htlc(request).await
115+
})
116+
.await
117+
.is_ok());
118+
}
119+
120+
/// Tests the happy case where we wait for our latency and then return a result.
121+
#[tokio::test]
122+
async fn test_latency_response() {
123+
let latency_dist = ConstantDistribution { value: 0.0 };
124+
let interceptor = LatencyIntercepor { latency_dist };
125+
126+
let (request, _) = test_request();
127+
// We should return immediately because timeout is zero.
128+
assert_true!(timeout(Duration::from_secs(1), async {
129+
interceptor.intercept_htlc(request).await
130+
})
131+
.await
132+
.is_ok());
133+
}
134+
}

simln-lib/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pub mod cln;
3333
pub mod clock;
3434
mod defined_activity;
3535
pub mod eclair;
36+
pub mod latency_interceptor;
3637
pub mod lnd;
3738
mod random_activity;
3839
pub mod serializers;

0 commit comments

Comments
 (0)