Skip to content

Commit

Permalink
improve network error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos committed Feb 18, 2025
1 parent cca7076 commit b41027f
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 38 deletions.
4 changes: 3 additions & 1 deletion anchor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ impl Client {
start_subnet_tracker(database.watch(), network::SUBNET_COUNT, &executor);

// Start the p2p network
let network = Network::try_new(&config.network, subnet_tracker, executor.clone()).await?;
let network = Network::try_new(&config.network, subnet_tracker, executor.clone())
.await
.map_err(|e| format!("Unable to start network: {e}"))?;
// Spawn the network listening task
executor.spawn(network.run(), "network");

Expand Down
40 changes: 34 additions & 6 deletions anchor/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use discv5::multiaddr::Multiaddr;
use discv5::{Discv5, Enr, ProtocolIdentity};
use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::{StreamExt, TryFutureExt};
use futures::StreamExt;
use libp2p::bytes::Bytes;
use libp2p::core::transport::PortUse;
use libp2p::core::Endpoint;
Expand Down Expand Up @@ -43,6 +43,25 @@ const TARGET_PEERS_FOR_GROUPED_QUERY: usize = 6;
/// make it easier for peers to eclipse this node. Kademlia suggests a value of 16.
pub const FIND_NODE_QUERY_CLOSEST_PEERS: usize = 16;

use crate::discovery::DiscoveryError::{Discv5Init, EnrBuild, EnrKey};
use thiserror::Error;

/// Errors that can occur while setting up and running discovery.
///
/// Every variant carries a `String` payload. At the construction sites in
/// `Discovery::new` that payload is the underlying error rendered with
/// `to_string()`, so the original error value itself is not retained.
#[derive(Debug, Error)]
pub enum DiscoveryError {
/// `CombinedKey::from_libp2p` rejected the local libp2p keypair.
#[error("Failed to parse keypair into an ENR key: {0}")]
EnrKey(String),

/// `build_enr` failed to produce the local ENR from the ENR key and the network config.
#[error("Failed to build ENR: {0}")]
EnrBuild(String),

/// `Discv5::new` returned an error while constructing the discv5 service.
#[error("Discv5 initialization error: {0}")]
Discv5Init(String),

/// `discv5.start()` returned an error. The start is only attempted when
/// `disable_discovery` is not set in the network config.
#[error("Discv5 start error: {0}")]
Discv5Start(String),
}

#[derive(Debug, Clone, PartialEq)]
struct SubnetQuery {
subnet: SubnetId,
Expand Down Expand Up @@ -109,7 +128,10 @@ pub struct Discovery {
}

impl Discovery {
pub async fn new(local_keypair: Keypair, network_config: &Config) -> Result<Self, String> {
pub async fn new(
local_keypair: Keypair,
network_config: &Config,
) -> Result<Self, DiscoveryError> {
let _enr_dir = match network_config.network_dir.to_str() {
Some(path) => String::from(path),
None => String::from(""),
Expand All @@ -124,11 +146,13 @@ impl Discovery {
let discv5_config = discv5::ConfigBuilder::new(discv5_listen_config).build();

// convert the keypair into an ENR key
let enr_key: CombinedKey = CombinedKey::from_libp2p(local_keypair)?;
let enr_key: CombinedKey =
CombinedKey::from_libp2p(local_keypair).map_err(|e| EnrKey(e.to_string()))?;

let enr = build_enr(&enr_key, network_config).map_err(|e| EnrBuild(e.to_string()))?;

let enr = build_enr(&enr_key, network_config).unwrap();
let mut discv5 = Discv5::<ProtocolId>::new(enr, enr_key, discv5_config)
.map_err(|e| format!("Discv5 service failed. Error: {:?}", e))?;
.map_err(|e| Discv5Init(e.to_string()))?;

// Add bootnodes to routing table
for bootnode_enr in network_config.boot_nodes_enr.clone() {
Expand Down Expand Up @@ -158,7 +182,11 @@ impl Discovery {

// Start the discv5 service and obtain an event stream
let event_stream = if !network_config.disable_discovery {
discv5.start().map_err(|e| e.to_string()).await?;
discv5
.start()
.await
.map_err(|e| DiscoveryError::Discv5Start(e.to_string()))?;

debug!("Discovery service started");
EventStream::Awaiting(Box::pin(discv5.event_stream()))
} else {
Expand Down
95 changes: 64 additions & 31 deletions anchor/network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ use futures::StreamExt;
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::transport::Boxed;
use libp2p::core::ConnectedPoint;
use libp2p::gossipsub::{IdentTopic, MessageAuthenticity, ValidationMode};
use libp2p::gossipsub::{ConfigBuilderError, IdentTopic, MessageAuthenticity, ValidationMode};
use libp2p::identity::Keypair;
use libp2p::multiaddr::Protocol;
use libp2p::swarm::SwarmEvent;
use libp2p::{futures, gossipsub, identify, ping, PeerId, Swarm, SwarmBuilder};
use libp2p::{
futures, gossipsub, identify, ping, Multiaddr, PeerId, Swarm, SwarmBuilder, TransportError,
};
use lighthouse_network::discovery::DiscoveredPeers;
use lighthouse_network::discv5::enr::k256::sha2::{Digest, Sha256};
use lighthouse_network::EnrExt;
Expand All @@ -23,12 +25,37 @@ use tracing::{debug, error, info, trace, warn};

use crate::behaviour::AnchorBehaviour;
use crate::behaviour::AnchorBehaviourEvent;
use crate::discovery::{Discovery, FIND_NODE_QUERY_CLOSEST_PEERS};
use crate::discovery::{Discovery, DiscoveryError, FIND_NODE_QUERY_CLOSEST_PEERS};
use crate::handshake::node_info::{NodeInfo, NodeMetadata};
use crate::keypair_utils::load_private_key;
use crate::transport::build_transport;
use crate::{handshake, Config};

use crate::network::NetworkError::SwarmConfig;
use thiserror::Error;

/// Errors returned while building the network (`Network::try_new` and the
/// helpers it calls, `build_anchor_behaviour` and `build_swarm`).
#[derive(Debug, Error)]
pub enum NetworkError {
/// `Swarm::listen_on` failed for `address`.
///
/// NOTE(review): the transport error is both interpolated into the message
/// (`{source}`) and exposed through `#[source]`; reporters that walk the
/// source chain may print it twice — confirm this is intended.
#[error("Unable to listen on address {address}: {source}")]
Listen {
// The multiaddr that was passed to `listen_on`.
address: Multiaddr,
// The transport-level failure reported by `listen_on`.
#[source]
source: TransportError<std::io::Error>,
},

/// Building the gossipsub configuration failed. `#[from]` lets `?` convert a
/// `ConfigBuilderError` into this variant automatically.
#[error("Gossipsub config error: {0}")]
GossipsubConfig(#[from] ConfigBuilderError),

/// `gossipsub::Behaviour::new` failed; the payload is that error rendered
/// with `to_string()`.
#[error("Gossipsub error: {0}")]
Gossipsub(String),

/// Setting up discovery failed. `#[from]` lets `?` convert a `DiscoveryError`
/// into this variant automatically.
#[error("Discovery error: {0}")]
Discovery(#[from] DiscoveryError),

/// A swarm configuration value was invalid, or a `SwarmBuilder` step
/// (transport / behaviour) returned an error; the payload is a
/// human-readable description.
#[error("Swarm config error: {0}")]
SwarmConfig(String),
}

pub struct Network {
swarm: Swarm<AnchorBehaviour>,
subnet_event_receiver: mpsc::Receiver<SubnetEvent>,
Expand All @@ -43,10 +70,13 @@ impl Network {
config: &Config,
subnet_event_receiver: mpsc::Receiver<SubnetEvent>,
executor: TaskExecutor,
) -> Result<Network, String> {
) -> Result<Network, NetworkError> {
let local_keypair: Keypair = load_private_key(&config.network_dir);

let transport = build_transport(local_keypair.clone(), !config.disable_quic_support);
let behaviour = build_anchor_behaviour(local_keypair.clone(), config).await;

let behaviour = build_anchor_behaviour(local_keypair.clone(), config).await?;

let peer_id = local_keypair.public().to_peer_id();
let domain_type: String = config.domain_type.clone().into();
let node_info = NodeInfo::new(
Expand All @@ -66,7 +96,7 @@ impl Network {
transport,
behaviour,
config,
),
)?,
subnet_event_receiver,
peer_id,
node_info,
Expand All @@ -84,12 +114,11 @@ impl Network {
network
.swarm
.listen_on(listen_multiaddr.clone())
.map_err(|e| {
format!(
"Unable to listen on libp2p address: {} : {}",
listen_multiaddr, e
)
.map_err(|transport_err| NetworkError::Listen {
address: listen_multiaddr.clone(),
source: transport_err,
})?;

let mut log_address = listen_multiaddr;
log_address.push(Protocol::P2p(peer_id));
info!(address = %log_address, "Listening established");
Expand Down Expand Up @@ -263,8 +292,7 @@ fn subnet_to_topic(subnet: SubnetId) -> IdentTopic {
async fn build_anchor_behaviour(
local_keypair: Keypair,
network_config: &Config,
) -> AnchorBehaviour {
// TODO setup discv5
) -> Result<AnchorBehaviour, NetworkError> {
let identify = {
let local_public_key = local_keypair.public();
let identify_config = identify::Config::new("anchor".into(), local_public_key)
Expand Down Expand Up @@ -297,32 +325,29 @@ async fn build_anchor_behaviour(
.history_gossip(4)
.max_ihave_length(1500)
.max_ihave_messages(32)
.build()
.unwrap();
.build()?;

let gossipsub =
gossipsub::Behaviour::new(MessageAuthenticity::Signed(local_keypair.clone()), config)
.unwrap();
.map_err(|e| NetworkError::Gossipsub(e.to_string()))?;

let discovery = {
// Build and start the discovery sub-behaviour
let mut discovery = Discovery::new(local_keypair.clone(), network_config)
.await
.unwrap();
let mut discovery = Discovery::new(local_keypair.clone(), network_config).await?;
// start searching for peers
discovery.discover_peers(FIND_NODE_QUERY_CLOSEST_PEERS);
discovery
};

let handshake = handshake::create_behaviour(local_keypair.clone());

AnchorBehaviour {
Ok(AnchorBehaviour {
identify,
ping: ping::Behaviour::default(),
gossipsub,
discovery,
handshake,
}
})
}

fn build_swarm(
Expand All @@ -331,15 +356,20 @@ fn build_swarm(
transport: Boxed<(PeerId, StreamMuxerBox)>,
behaviour: AnchorBehaviour,
_config: &Config,
) -> Swarm<AnchorBehaviour> {
// use the executor for libp2p
) -> Result<Swarm<AnchorBehaviour>, NetworkError> {
struct Executor(task_executor::TaskExecutor);
impl libp2p::swarm::Executor for Executor {
fn exec(&self, f: Pin<Box<dyn futures::Future<Output = ()> + Send>>) {
self.0.spawn(f, "libp2p");
}
}

let notify_handler_buffer_size = NonZeroUsize::new(7)
.ok_or_else(|| SwarmConfig("notify_handler_buffer_size must be > 0".to_string()))?;

let dial_concurrency_factor = NonZeroU8::new(1)
.ok_or_else(|| SwarmConfig("dial_concurrency_factor cannot be 0".to_string()))?;

// TODO: revisit once peer manager is integrated
// let connection_limits = {
// let limits = libp2p::connection_limits::ConnectionLimits::default()
Expand All @@ -363,19 +393,22 @@ fn build_swarm(
// };

let swarm_config = libp2p::swarm::Config::with_executor(Executor(executor))
.with_notify_handler_buffer_size(NonZeroUsize::new(7).expect("Not zero"))
.with_notify_handler_buffer_size(notify_handler_buffer_size)
.with_per_connection_event_buffer_size(4)
.with_dial_concurrency_factor(NonZeroU8::new(1).unwrap());
.with_dial_concurrency_factor(dial_concurrency_factor);

// TODO Add metrics later
SwarmBuilder::with_existing_identity(local_keypair)
// Build swarm using builder pattern
let swarm_builder = SwarmBuilder::with_existing_identity(local_keypair)
.with_tokio()
.with_other_transport(|_key| transport)
.expect("infalible")
.map_err(|e| SwarmConfig(format!("Failed to set transport: {e}")))?
.with_behaviour(|_| behaviour)
.expect("infalible")
.with_swarm_config(|_| swarm_config)
.build()
.map_err(|e| SwarmConfig(format!("Failed to set behaviour: {e}")))?
.with_swarm_config(|_| swarm_config);

let swarm = swarm_builder.build();

Ok(swarm)
}

#[cfg(test)]
Expand Down

0 comments on commit b41027f

Please sign in to comment.