diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index 40dcb000..a7313757 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -145,13 +145,6 @@ impl Client { let subnet_tracker = start_subnet_tracker(database.watch(), network::SUBNET_COUNT, &executor); - // Start the p2p network - let network = Network::try_new(&config.network, subnet_tracker, executor.clone()) - .await - .map_err(|e| format!("Unable to start network: {e}"))?; - // Spawn the network listening task - executor.spawn(network.run(), "network"); - // Initialize slashing protection. let slashing_db_path = config.data_dir.join(SLASHING_PROTECTION_FILENAME); let slashing_protection = @@ -351,7 +344,7 @@ impl Client { .ok_or("Failed waiting for operator id")?; // Network sender/receiver - let (network_tx, _network_rx) = mpsc::channel::<(SubnetId, Vec)>(9001); + let (network_tx, network_rx) = mpsc::channel::<(SubnetId, Vec)>(9001); let network_message_sender = NetworkMessageSender::new( processor_senders.clone(), @@ -362,6 +355,18 @@ impl Client { network::SUBNET_COUNT, )?; + // Start the p2p network + let network = Network::try_new( + &config.network, + subnet_tracker, + network_rx, + executor.clone(), + ) + .await + .map_err(|e| format!("Unable to start network: {e}"))?; + // Spawn the network listening task + executor.spawn(network.run(), "network"); + // Create the signature collector let signature_collector = SignatureCollectorManager::new( processor_senders.clone(), diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 51bac41d..b338de2a 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -60,6 +60,7 @@ pub enum NetworkError { pub struct Network { swarm: Swarm, subnet_event_receiver: mpsc::Receiver, + message_rx: mpsc::Receiver<(SubnetId, Vec)>, peer_id: PeerId, node_info: NodeInfo, } @@ -70,6 +71,7 @@ impl Network { pub async fn try_new( config: &Config, subnet_event_receiver: mpsc::Receiver, + message_rx: mpsc::Receiver<(SubnetId, Vec)>, executor: TaskExecutor, ) -> Result { let local_keypair: Keypair = load_private_key(&config.network_dir); @@ -99,6 +101,7 @@ impl Network { config, )?, subnet_event_receiver, + message_rx, peer_id, node_info, }; @@ -212,7 +215,19 @@ impl Network { } } } - // TODO match input channels + event = self.message_rx.recv() => { + match event { + Some((subnet_id, message)) => { + if let Err(err) = self.gossipsub().publish(subnet_to_topic(subnet_id), message) { + error!(?err, "Failed to publish message"); + } + } + None => { + error!("message queue was closed"); + return; + } + } + } } } } @@ -233,12 +248,7 @@ impl Network { fn on_subnet_tracker_event(&mut self, event: SubnetEvent) { match event { SubnetEvent::Join(subnet) => { - if let Err(err) = self - .swarm - .behaviour_mut() - .gossipsub - .subscribe(&subnet_to_topic(subnet)) - { + if let Err(err) = self.gossipsub().subscribe(&subnet_to_topic(subnet)) { error!(?err, subnet = *subnet, "can't subscribe"); } let SubnetConnectActions { dial, discover } = @@ -266,6 +276,10 @@ impl Network { &mut self.swarm.behaviour_mut().peer_manager } + fn gossipsub(&mut self) -> &mut gossipsub::Behaviour { + &mut self.swarm.behaviour_mut().gossipsub + } + fn handle_handshake_result(&mut self, result: Result) { match result { Ok(handshake::Completed { @@ -395,6 +409,7 @@ mod test { use std::time::Duration; use subnet_tracker::test_tracker; use task_executor::TaskExecutor; + use tokio::sync::mpsc; #[tokio::test] async fn create_network() { @@ -403,10 +418,14 @@ mod test { let (shutdown_tx, _) = futures::channel::mpsc::channel(1); let task_executor = TaskExecutor::new(handle, exit, shutdown_tx); let subnet_tracker = test_tracker(task_executor.clone(), vec![], Duration::ZERO); - assert!( - Network::try_new(&Config::default(), subnet_tracker, task_executor) - .await - .is_ok() - ); + let (_, message_rx) = mpsc::channel(1); + assert!(Network::try_new( + &Config::default(), + subnet_tracker, + message_rx, + task_executor + ) + .await + .is_ok()); } }