Skip to content

Commit

Permalink
give queue to network and publish
Browse files Browse the repository at this point in the history
  • Loading branch information
dknopik committed Feb 21, 2025
1 parent 82411fb commit f076a7a
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 20 deletions.
21 changes: 13 additions & 8 deletions anchor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,6 @@ impl Client {
let subnet_tracker =
start_subnet_tracker(database.watch(), network::SUBNET_COUNT, &executor);

// Start the p2p network
let network = Network::try_new(&config.network, subnet_tracker, executor.clone())
.await
.map_err(|e| format!("Unable to start network: {e}"))?;
// Spawn the network listening task
executor.spawn(network.run(), "network");

// Initialize slashing protection.
let slashing_db_path = config.data_dir.join(SLASHING_PROTECTION_FILENAME);
let slashing_protection =
Expand Down Expand Up @@ -351,7 +344,7 @@ impl Client {
.ok_or("Failed waiting for operator id")?;

// Network sender/receiver
let (network_tx, _network_rx) = mpsc::channel::<(SubnetId, Vec<u8>)>(9001);
let (network_tx, network_rx) = mpsc::channel::<(SubnetId, Vec<u8>)>(9001);

let network_message_sender = NetworkMessageSender::new(
processor_senders.clone(),
Expand All @@ -362,6 +355,18 @@ impl Client {
network::SUBNET_COUNT,
)?;

// Start the p2p network
let network = Network::try_new(
&config.network,
subnet_tracker,
network_rx,
executor.clone(),
)
.await
.map_err(|e| format!("Unable to start network: {e}"))?;
// Spawn the network listening task
executor.spawn(network.run(), "network");

// Create the signature collector
let signature_collector = SignatureCollectorManager::new(
processor_senders.clone(),
Expand Down
43 changes: 31 additions & 12 deletions anchor/network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub enum NetworkError {
pub struct Network {
swarm: Swarm<AnchorBehaviour>,
subnet_event_receiver: mpsc::Receiver<SubnetEvent>,
message_rx: mpsc::Receiver<(SubnetId, Vec<u8>)>,
peer_id: PeerId,
node_info: NodeInfo,
}
Expand All @@ -70,6 +71,7 @@ impl Network {
pub async fn try_new(
config: &Config,
subnet_event_receiver: mpsc::Receiver<SubnetEvent>,
message_rx: mpsc::Receiver<(SubnetId, Vec<u8>)>,
executor: TaskExecutor,
) -> Result<Network, NetworkError> {
let local_keypair: Keypair = load_private_key(&config.network_dir);
Expand Down Expand Up @@ -99,6 +101,7 @@ impl Network {
config,
)?,
subnet_event_receiver,
message_rx,
peer_id,
node_info,
};
Expand Down Expand Up @@ -212,7 +215,19 @@ impl Network {
}
}
}
// TODO match input channels
event = self.message_rx.recv() => {
match event {
Some((subnet_id, message)) => {
if let Err(err) = self.gossipsub().publish(subnet_to_topic(subnet_id), message) {
error!(?err, "Failed to publish message");
}
}
None => {
error!("message queue was closed");
return;
}
}
}
}
}
}
Expand All @@ -233,12 +248,7 @@ impl Network {
fn on_subnet_tracker_event(&mut self, event: SubnetEvent) {
match event {
SubnetEvent::Join(subnet) => {
if let Err(err) = self
.swarm
.behaviour_mut()
.gossipsub
.subscribe(&subnet_to_topic(subnet))
{
if let Err(err) = self.gossipsub().subscribe(&subnet_to_topic(subnet)) {
error!(?err, subnet = *subnet, "can't subscribe");
}
let SubnetConnectActions { dial, discover } =
Expand Down Expand Up @@ -266,6 +276,10 @@ impl Network {
&mut self.swarm.behaviour_mut().peer_manager
}

fn gossipsub(&mut self) -> &mut gossipsub::Behaviour {
&mut self.swarm.behaviour_mut().gossipsub
}

fn handle_handshake_result(&mut self, result: Result<handshake::Completed, handshake::Failed>) {
match result {
Ok(handshake::Completed {
Expand Down Expand Up @@ -395,6 +409,7 @@ mod test {
use std::time::Duration;
use subnet_tracker::test_tracker;
use task_executor::TaskExecutor;
use tokio::sync::mpsc;

#[tokio::test]
async fn create_network() {
Expand All @@ -403,10 +418,14 @@ mod test {
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let task_executor = TaskExecutor::new(handle, exit, shutdown_tx);
let subnet_tracker = test_tracker(task_executor.clone(), vec![], Duration::ZERO);
assert!(
Network::try_new(&Config::default(), subnet_tracker, task_executor)
.await
.is_ok()
);
let (_, message_rx) = mpsc::channel(1);
assert!(Network::try_new(
&Config::default(),
subnet_tracker,
message_rx,
task_executor
)
.await
.is_ok());
}
}

0 comments on commit f076a7a

Please sign in to comment.