Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,18 @@ pub enum Litep2pEvent {
},
}

/// Configuration for adding protocols at runtime (protocols that are started after the Litep2p
/// instance is created).
struct RuntimeConfigs {
/// The keep-alive timeout for protocols.
///
/// Needed for supporting protocols at runtime.
keep_alive_timeout: std::time::Duration,

/// Executor used to run protocols.
executor: Arc<dyn crate::executor::Executor>,
}

/// [`Litep2p`] object.
pub struct Litep2p {
/// Local peer ID.
Expand All @@ -149,6 +161,11 @@ pub struct Litep2p {

/// Bandwidth sink.
bandwidth_sink: BandwidthSink,

/// The keep-alive timeout for protocols.
///
/// Needed for supporting protocols at runtime.
runtime_configs: RuntimeConfigs,
}

impl Litep2p {
Expand Down Expand Up @@ -415,6 +432,10 @@ impl Litep2p {
bandwidth_sink,
listen_addresses,
transport_manager,
runtime_configs: RuntimeConfigs {
keep_alive_timeout: litep2p_config.keep_alive_timeout,
executor: litep2p_config.executor,
},
})
}

Expand Down Expand Up @@ -491,6 +512,45 @@ impl Litep2p {
self.transport_manager.add_known_address(peer, address)
}

/// Register a request-response protocol at runtime.
pub fn register_request_response(
&mut self,
config: crate::protocol::request_response::Config,
) -> crate::Result<()> {
let service = self.transport_manager.register_protocol(
config.protocol_name.clone(),
config.fallback_names.clone(),
config.codec,
self.runtime_configs.keep_alive_timeout,
);

self.runtime_configs.executor.run(Box::pin(async move {
RequestResponseProtocol::new(service, config).run().await
}));

Ok(())
}

/// Register a notification protocol at runtime.
pub fn register_notification(
&mut self,
config: crate::protocol::notification::Config,
) -> crate::Result<()> {
let service = self.transport_manager.register_protocol(
config.protocol_name.clone(),
config.fallback_names.clone(),
config.codec,
self.runtime_configs.keep_alive_timeout,
);

let executor = Arc::clone(&self.runtime_configs.executor);
self.runtime_configs.executor.run(Box::pin(async move {
NotificationProtocol::new(service, config, executor).run().await
}));

Ok(())
}

/// Poll next event.
///
/// This function must be called in order for litep2p to make progress.
Expand Down
70 changes: 52 additions & 18 deletions tests/connection/stability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use litep2p::{
utils::futures_stream::FuturesStream,
Litep2p, PeerId,
};
use std::sync::{atomic::AtomicUsize, Arc};

use futures::{future::BoxFuture, StreamExt};

Expand Down Expand Up @@ -59,12 +60,19 @@ const LOG_TARGET: &str = "litep2p::stability";
pub struct StabilityProtocol {
/// The number of identical packets to send / receive on a substream.
total_packets: usize,
inbound: FuturesStream<BoxFuture<'static, Result<(), String>>>,
outbound: FuturesStream<BoxFuture<'static, Result<(), String>>>,
inbound: FuturesStream<BoxFuture<'static, Result<Substream, String>>>,
outbound: FuturesStream<BoxFuture<'static, Result<Substream, String>>>,
/// Peer Id for logging purposes.
peer_id: PeerId,
/// The sender to notify the test that the protocol finished.
tx: Option<tokio::sync::oneshot::Sender<()>>,

debug_interval: tokio::time::Interval,

inbound_num: usize,
outbound_num: usize,
inbound_debug: Vec<Arc<AtomicUsize>>,
outbound_debug: Vec<Arc<AtomicUsize>>,
}

impl StabilityProtocol {
Expand All @@ -78,6 +86,11 @@ impl StabilityProtocol {
outbound: FuturesStream::new(),
peer_id,
tx: Some(tx),
debug_interval: tokio::time::interval(std::time::Duration::from_secs(10)),
inbound_debug: (0..16).map(|_| Arc::new(AtomicUsize::new(0))).collect(),
outbound_debug: (0..16).map(|_| Arc::new(AtomicUsize::new(0))).collect(),
inbound_num: 0,
outbound_num: 0,
},
rx,
)
Expand All @@ -87,6 +100,10 @@ impl StabilityProtocol {
let mut total_packets = self.total_packets;
match direction {
Direction::Inbound => {
let index = self.inbound_num;
self.inbound_num += 1;
let atomic = self.inbound_debug.get(index).expect("Index is valid; qed").clone();

self.inbound.push(Box::pin(async move {
while total_packets > 0 {
let _payload = substream
Expand All @@ -100,28 +117,31 @@ impl StabilityProtocol {
tracing::warn!(target: LOG_TARGET, "Failed to read from substream {:?}", err);
"Failed to read from substream".to_string()
})?;
atomic.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
total_packets -= 1;
}

Ok(())
Ok(substream)
}));
}
Direction::Outbound { .. } => {
let index = self.outbound_num;
self.outbound_num += 1;
let atomic = self.outbound_debug.get(index).expect("Index is valid; qed").clone();
self.outbound.push(Box::pin(async move {
let mut frame = vec![0; 128];
for i in 0..frame.len() {
frame[i] = i as u8;
}
let mut frame = vec![0; 1 * 1024 * 1024];

while total_packets > 0 {
substream.send_framed(frame.clone().into()).await.map_err(|err| {
tracing::warn!("Failed to send to substream {:?}", err);
"Failed to send to substream".to_string()
})?;
total_packets -= 1;

atomic.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}

Ok(())
Ok(substream)
}));
}
}
Expand All @@ -136,18 +156,19 @@ impl UserProtocol for StabilityProtocol {

fn codec(&self) -> ProtocolCodec {
// Similar to the identify payload size.
ProtocolCodec::UnsignedVarint(Some(4096))
// 1 * 1024 * 1024
ProtocolCodec::UnsignedVarint(Some(10 * 1024 * 1024))
}

async fn run(mut self: Box<Self>, mut service: TransportService) -> litep2p::Result<()> {
let num_substreams = 16;
let mut handled_substreams = 0;
let mut handled_substreams = Vec::new();

loop {
if handled_substreams == 2 * num_substreams {
if handled_substreams.len() == 2 * num_substreams {
tracing::info!(
target: LOG_TARGET,
handled_substreams,
len = handled_substreams.len(),
peer_id = %self.peer_id,
"StabilityProtocol finished to handle packets",
);
Expand All @@ -160,6 +181,18 @@ impl UserProtocol for StabilityProtocol {
}

tokio::select! {
_ = self.debug_interval.tick() => {
let inbound_total: usize = self.inbound_debug.iter().map(|a| a.load(std::sync::atomic::Ordering::Relaxed)).sum();
let outbound_total: usize = self.outbound_debug.iter().map(|a| a.load(std::sync::atomic::Ordering::Relaxed)).sum();

tracing::info!(
target: LOG_TARGET,
peer_id = %self.peer_id,
"StabilityProtocol debug stats: inbound_total = {}, outbound_total = {}",
inbound_total, outbound_total
);
},

event = service.next() => match event {
Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
for i in 0..num_substreams {
Expand Down Expand Up @@ -201,8 +234,8 @@ impl UserProtocol for StabilityProtocol {

inbound = self.inbound.next(), if !self.inbound.is_empty() => {
match inbound {
Some(Ok(())) => {
handled_substreams += 1;
Some(Ok(substream)) => {
handled_substreams.push(substream);
}
Some(Err(err)) => {
tracing::error!(
Expand All @@ -227,8 +260,9 @@ impl UserProtocol for StabilityProtocol {

outbound = self.outbound.next(), if !self.outbound.is_empty() => {
match outbound {
Some(Ok(())) => {
handled_substreams += 1;
Some(Ok(substream
)) => {
handled_substreams.push(substream);
}
Some(Err(err)) => {
tracing::error!(
Expand Down Expand Up @@ -263,7 +297,7 @@ async fn stability_litep2p_transport(transport1: Transport, transport2: Transpor
let (ping_config1, _ping_event_stream1) = PingConfig::default();
let keypair = Keypair::generate();
let peer_id = keypair.public().to_peer_id();
let (stability_protocol, mut exit1) = StabilityProtocol::new(1000, peer_id);
let (stability_protocol, mut exit1) = StabilityProtocol::new(10000000, peer_id);
let config1 = ConfigBuilder::new()
.with_keypair(keypair)
.with_libp2p_ping(ping_config1)
Expand All @@ -274,7 +308,7 @@ async fn stability_litep2p_transport(transport1: Transport, transport2: Transpor
let (ping_config2, _ping_event_stream2) = PingConfig::default();
let keypair = Keypair::generate();
let peer_id = keypair.public().to_peer_id();
let (stability_protocol, mut exit2) = StabilityProtocol::new(1000, peer_id);
let (stability_protocol, mut exit2) = StabilityProtocol::new(10000000, peer_id);
let config2 = ConfigBuilder::new()
.with_keypair(keypair)
.with_libp2p_ping(ping_config2)
Expand Down
Loading