Skip to content

simln-lib: add htlc interceptor for simulated nodes #261

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion sim-cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::sync::Arc;

use clap::Parser;
use log::LevelFilter;
use sim_cli::parsing::{create_simulation, create_simulation_with_network, parse_sim_params, Cli};
use simln_lib::{latency_interceptor::LatencyIntercepor, sim_node::Interceptor};
use simple_logger::SimpleLogger;
use tokio_util::task::TaskTracker;

Expand Down Expand Up @@ -29,7 +32,13 @@ async fn main() -> anyhow::Result<()> {
let (sim, validated_activities) = if sim_params.sim_network.is_empty() {
create_simulation(&cli, &sim_params, tasks.clone()).await?
} else {
create_simulation_with_network(&cli, &sim_params, tasks.clone()).await?
let latency = cli.latency_ms.unwrap_or(0);
let interceptors = if latency > 0 {
vec![Arc::new(LatencyIntercepor::new_poisson(latency as f32)?) as Arc<dyn Interceptor>]
} else {
vec![]
};
create_simulation_with_network(&cli, &sim_params, tasks.clone(), interceptors).await?
};
let sim2 = sim.clone();

Expand Down
23 changes: 20 additions & 3 deletions sim-cli/src/parsing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use log::LevelFilter;
use serde::{Deserialize, Serialize};
use simln_lib::clock::SimulationClock;
use simln_lib::sim_node::{
ln_node_from_graph, populate_network_graph, ChannelPolicy, SimGraph, SimulatedChannel,
ln_node_from_graph, populate_network_graph, ChannelPolicy, Interceptor, SimGraph,
SimulatedChannel,
};
use simln_lib::{
cln, cln::ClnNode, eclair, eclair::EclairNode, lnd, lnd::LndNode, serializers,
Expand Down Expand Up @@ -87,6 +88,10 @@ pub struct Cli {
/// simulated nodes.
#[clap(long)]
pub speedup_clock: Option<u16>,
/// Latency to optionally introduce for payments in a simulated network expressed in
/// milliseconds.
#[clap(long)]
pub latency_ms: Option<u32>,
}

impl Cli {
Expand All @@ -112,6 +117,12 @@ impl Cli {
));
}

if !sim_params.nodes.is_empty() && self.latency_ms.is_some() {
return Err(anyhow!(
"Latency for payments is only allowed when running on a simulated network"
));
}

Ok(())
}
}
Expand Down Expand Up @@ -217,6 +228,7 @@ pub async fn create_simulation_with_network(
cli: &Cli,
sim_params: &SimParams,
tasks: TaskTracker,
interceptors: Vec<Arc<dyn Interceptor>>,
) -> Result<(Simulation<SimulationClock>, Vec<ActivityDefinition>), anyhow::Error> {
let cfg: SimulationCfg = SimulationCfg::try_from(cli)?;
let SimParams {
Expand All @@ -243,8 +255,13 @@ pub async fn create_simulation_with_network(

// Setup a simulation graph that will handle propagation of payments through the network
let simulation_graph = Arc::new(Mutex::new(
SimGraph::new(channels.clone(), tasks.clone(), shutdown_trigger.clone())
.map_err(|e| SimulationError::SimulatedNetworkError(format!("{:?}", e)))?,
SimGraph::new(
channels.clone(),
tasks.clone(),
interceptors,
(shutdown_trigger.clone(), shutdown_listener.clone()),
)
.map_err(|e| SimulationError::SimulatedNetworkError(format!("{:?}", e)))?,
));

let clock = Arc::new(SimulationClock::new(cli.speedup_clock.unwrap_or(1))?);
Expand Down
3 changes: 2 additions & 1 deletion simln-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ tokio-util = { version = "0.7.13", features = ["rt"] }

[dev-dependencies]
ntest = "0.9.0"
mockall = "0.13.1"
mockall = "0.13.1"
futures = "0.3.31"
134 changes: 134 additions & 0 deletions simln-lib/src/latency_interceptor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use crate::sim_node::{
CriticalError, CustomRecords, ForwardingError, InterceptRequest, Interceptor,
};
use crate::SimulationError;
use async_trait::async_trait;
use rand_distr::{Distribution, Poisson};
use std::time::Duration;
use tokio::{select, time};

/// LatencyIntercepor is a HTLC interceptor that will delay HTLC forwarding by some randomly chosen delay.
pub struct LatencyIntercepor<D>
where
D: Distribution<f32> + Send + Sync,
{
latency_dist: D,
}

impl LatencyIntercepor<Poisson<f32>> {
pub fn new_poisson(lambda_ms: f32) -> Result<Self, SimulationError> {
let poisson_dist = Poisson::new(lambda_ms).map_err(|e| {
SimulationError::SimulatedNetworkError(format!("Could not create possion: {e}"))
})?;

Ok(Self {
latency_dist: poisson_dist,
})
}
}

#[async_trait]
impl<D> Interceptor for LatencyIntercepor<D>
where
D: Distribution<f32> + Send + Sync,
{
/// Introduces a random sleep time on the HTLC.
async fn intercept_htlc(
&self,
req: InterceptRequest,
) -> Result<Result<CustomRecords, ForwardingError>, CriticalError> {
let latency = self.latency_dist.sample(&mut rand::thread_rng());

select! {
_ = req.shutdown_listener => log::debug!("Latency interceptor exiting due to shutdown signal received."),
_ = time::sleep(Duration::from_millis(latency as u64)) => {}
}
Ok(Ok(CustomRecords::default()))
}

fn name(&self) -> String {
"Latency Interceptor".to_string()
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::{Interceptor, LatencyIntercepor};
use crate::sim_node::{CustomRecords, HtlcRef, InterceptRequest};
use crate::test_utils::get_random_keypair;
use crate::ShortChannelID;
use lightning::ln::PaymentHash;
use ntest::assert_true;
use rand::distributions::Distribution;
use rand::Rng;
use tokio::time::timeout;
use triggered::Trigger;

/// Always returns the same value, useful for testing.
struct ConstantDistribution {
value: f32,
}

impl Distribution<f32> for ConstantDistribution {
fn sample<R: Rng + ?Sized>(&self, _rng: &mut R) -> f32 {
self.value
}
}

fn test_request() -> (InterceptRequest, Trigger) {
let (shutdown_trigger, shutdown_listener) = triggered::trigger();

let (_, pk) = get_random_keypair();
let request = InterceptRequest {
forwarding_node: pk,
payment_hash: PaymentHash([0; 32]),
incoming_htlc: HtlcRef {
channel_id: ShortChannelID::from(123),
index: 1,
},
incoming_custom_records: CustomRecords::default(),
outgoing_channel_id: None,
incoming_amount_msat: 100,
outgoing_amount_msat: 50,
incoming_expiry_height: 120,
outgoing_expiry_height: 100,
shutdown_listener,
};

(request, shutdown_trigger)
}

/// Tests that the interceptor exits immediately if a shutdown signal is received.
#[tokio::test]
async fn test_shutdown_signal() {
// Set fixed dist to a high value so that the test won't flake.
let latency_dist = ConstantDistribution { value: 1000.0 };
let interceptor = LatencyIntercepor { latency_dist };

let (request, trigger) = test_request();
trigger.trigger();

assert_true!(timeout(Duration::from_secs(10), async {
interceptor.intercept_htlc(request).await
})
.await
.is_ok());
}

/// Tests the happy case where we wait for our latency and then return a result.
#[tokio::test]
async fn test_latency_response() {
let latency_dist = ConstantDistribution { value: 0.0 };
let interceptor = LatencyIntercepor { latency_dist };

let (request, _) = test_request();
// We should return immediately because timeout is zero.
assert_true!(timeout(Duration::from_secs(1), async {
interceptor.intercept_htlc(request).await
})
.await
.is_ok());
}
}
1 change: 1 addition & 0 deletions simln-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub mod cln;
pub mod clock;
mod defined_activity;
pub mod eclair;
pub mod latency_interceptor;
pub mod lnd;
mod random_activity;
pub mod serializers;
Expand Down
Loading