diff --git a/Cargo.lock b/Cargo.lock
index 989655e8..fe30283e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1822,6 +1822,7 @@ dependencies = [
  "http_api",
  "http_metrics",
  "hyper 1.6.0",
+ "message_sender",
  "network",
  "openssl",
  "parking_lot",
@@ -5073,6 +5074,20 @@ dependencies = [
  "safe_arith",
 ]

+[[package]]
+name = "message_sender"
+version = "0.1.0"
+dependencies = [
+ "database",
+ "ethereum_ssz",
+ "openssl",
+ "processor",
+ "ssv_types",
+ "subnet_tracker",
+ "tokio",
+ "tracing",
+]
+
 [[package]]
 name = "metastruct"
 version = "0.1.3"
@@ -6192,10 +6207,9 @@ dependencies = [
  "ethereum_ssz",
  "ethereum_ssz_derive",
  "futures",
- "openssl",
+ "message_sender",
  "processor",
  "qbft",
- "rand 0.8.5",
  "slot_clock",
  "ssv_types",
  "task_executor",
@@ -7262,6 +7276,7 @@ version = "0.1.0"
 dependencies = [
  "bls_lagrange",
  "dashmap",
+ "message_sender",
  "processor",
  "slot_clock",
  "ssv_types",
@@ -7626,7 +7641,6 @@ dependencies = [
  "alloy",
  "database",
  "ethereum_serde_utils",
- "log",
  "serde",
  "ssv_types",
  "task_executor",
diff --git a/Cargo.toml b/Cargo.toml
index a8680459..57f2e891 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,6 +12,7 @@ members = [
     "anchor/eth",
     "anchor/http_api",
     "anchor/http_metrics",
+    "anchor/message_sender",
     "anchor/network",
     "anchor/processor",
     "anchor/qbft_manager",
@@ -33,6 +34,7 @@ database = { path = "anchor/database" }
 eth = { path = "anchor/eth" }
 http_api = { path = "anchor/http_api" }
 http_metrics = { path = "anchor/http_metrics" }
+message_sender = { path = "anchor/message_sender" }
 network = { path = "anchor/network" }
 processor = { path = "anchor/processor" }
 qbft = { path = "anchor/common/qbft" }
diff --git a/anchor/client/Cargo.toml b/anchor/client/Cargo.toml
index 04815613..be64517f 100644
--- a/anchor/client/Cargo.toml
+++ b/anchor/client/Cargo.toml
@@ -22,6 +22,7 @@ fdlimit = "0.3"
 http_api = { workspace = true }
 http_metrics = { workspace = true }
 hyper = { workspace = true }
+message_sender = { workspace = true }
 network = { workspace = true }
 openssl = { workspace = true }
 parking_lot = { workspace = true }
diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs
index 99dac06d..40dcb000 100644
--- a/anchor/client/src/lib.rs
+++ b/anchor/client/src/lib.rs
@@ -13,6 +13,7 @@ use config::Config;
 use database::NetworkDatabase;
 use eth2::reqwest::{Certificate, ClientBuilder};
 use eth2::{BeaconNodeHttpClient, Timeouts};
+use message_sender::NetworkMessageSender;
 use network::Network;
 use openssl::pkey::Private;
 use openssl::rsa::Rsa;
@@ -22,7 +23,6 @@ use sensitive_url::SensitiveUrl;
 use signature_collector::SignatureCollectorManager;
 use slashing_protection::SlashingDatabase;
 use slot_clock::{SlotClock, SystemTimeSlotClock};
-use ssv_types::message::SignedSSVMessage;
 use ssv_types::OperatorId;
 use std::fs::File;
 use std::io::{ErrorKind, Read, Write};
@@ -30,7 +30,7 @@ use std::net::SocketAddr;
 use std::path::Path;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
-use subnet_tracker::start_subnet_tracker;
+use subnet_tracker::{start_subnet_tracker, SubnetId};
 use task_executor::TaskExecutor;
 use tokio::net::TcpListener;
 use tokio::select;
@@ -350,21 +350,32 @@ impl Client {
             .await
             .ok_or("Failed waiting for operator id")?;

-        // Create the signature collector
-        let signature_collector =
-            SignatureCollectorManager::new(processor_senders.clone(), slot_clock.clone())
-                .map_err(|e| format!("Unable to initialize signature collector manager: {e:?}"))?;
-
         // Network sender/receiver
-        let (network_tx, _network_rx) = mpsc::unbounded_channel::<SignedSSVMessage>();
+        let (network_tx, _network_rx) = mpsc::channel::<(SubnetId, Vec<u8>)>(9001);
+
+        let network_message_sender = NetworkMessageSender::new(
+            processor_senders.clone(),
+            network_tx.clone(),
+            key.clone(),
+            database.watch(),
+            operator_id,
+            network::SUBNET_COUNT,
+        )?;
+
+        // Create the signature collector
+        let signature_collector = SignatureCollectorManager::new(
+            processor_senders.clone(),
+            network_message_sender.clone(),
+            slot_clock.clone(),
+        )
+        .map_err(|e| format!("Unable to initialize signature collector manager: {e:?}"))?;

         // Create the qbft manager
         let qbft_manager = QbftManager::new(
             processor_senders.clone(),
             operator_id,
             slot_clock.clone(),
-            key.clone(),
-            network_tx.clone(),
+            network_message_sender,
         )
         .map_err(|e| format!("Unable to initialize qbft manager: {e:?}"))?;
diff --git a/anchor/message_sender/Cargo.toml b/anchor/message_sender/Cargo.toml
new file mode 100644
index 00000000..6a98af43
--- /dev/null
+++ b/anchor/message_sender/Cargo.toml
@@ -0,0 +1,18 @@
+[package]
+name = "message_sender"
+version = "0.1.0"
+edition = { workspace = true }
+authors = ["Sigma Prime <info@sigmaprime.io>"]
+
+[dependencies]
+database = { workspace = true }
+ethereum_ssz = { workspace = true }
+openssl = { workspace = true }
+processor = { workspace = true }
+ssv_types = { workspace = true }
+subnet_tracker = { workspace = true }
+tokio = { workspace = true }
+tracing = { workspace = true }
+
+[features]
+testing = []
diff --git a/anchor/message_sender/src/lib.rs b/anchor/message_sender/src/lib.rs
new file mode 100644
index 00000000..409cd50d
--- /dev/null
+++ b/anchor/message_sender/src/lib.rs
@@ -0,0 +1,13 @@
+mod network;
+
+#[cfg(feature = "testing")]
+pub mod testing;
+
+pub use crate::network::*;
+use ssv_types::consensus::UnsignedSSVMessage;
+use ssv_types::message::SignedSSVMessage;
+
+pub trait MessageSender: Send + Sync {
+    fn sign_and_send(&self, message: UnsignedSSVMessage);
+    fn send(&self, message: SignedSSVMessage);
+}
diff --git a/anchor/message_sender/src/network.rs b/anchor/message_sender/src/network.rs
new file mode 100644
index 00000000..79594c0d
--- /dev/null
+++ b/anchor/message_sender/src/network.rs
@@ -0,0 +1,141 @@
+use crate::MessageSender;
+use database::{NetworkState, UniqueIndex};
+use openssl::error::ErrorStack;
+use openssl::hash::MessageDigest;
+use openssl::pkey::{PKey, Private};
+use openssl::rsa::Rsa;
+use openssl::sign::Signer;
+use ssv_types::consensus::UnsignedSSVMessage;
+use ssv_types::message::SignedSSVMessage;
+use ssv_types::msgid::DutyExecutor;
+use ssv_types::OperatorId;
+use ssz::Encode;
+use std::sync::Arc;
+use subnet_tracker::SubnetId;
+use tokio::sync::mpsc::error::TrySendError;
+use tokio::sync::{mpsc, watch};
+use tracing::{debug, error, warn};
+
+const SIGNER_NAME: &str = "message_sign_and_send";
+const SENDER_NAME: &str = "message_send";
+
+pub struct NetworkMessageSender {
+    processor: processor::Senders,
+    network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>,
+    private_key: PKey<Private>,
+    database: watch::Receiver<NetworkState>,
+    operator_id: OperatorId,
+    subnet_count: usize,
+}
+
+impl MessageSender for Arc<NetworkMessageSender> {
+    fn sign_and_send(&self, message: UnsignedSSVMessage) {
+        let sender = self.clone();
+        self.processor
+            .urgent_consensus
+            .send_blocking(
+                move || {
+                    let signature = match sender.sign(&message) {
+                        Ok(signature) => signature,
+                        Err(err) => {
+                            error!(?err, "Signing message failed!");
+                            return;
+                        }
+                    };
+                    let message = match SignedSSVMessage::new(
+                        vec![signature],
+                        vec![sender.operator_id],
+                        message.ssv_message,
+                        message.full_data,
+                    ) {
+                        Ok(signature) => signature,
+                        Err(err) => {
+                            error!(?err, "Creating signed message failed!");
+                            return;
+                        }
+                    };
+                    sender.do_send(message);
+                },
+                SIGNER_NAME,
+            )
+            .unwrap_or_else(|e| warn!("Failed to send to processor: {}", e));
+    }
+
+    fn send(&self, message: SignedSSVMessage) {
+        let sender = self.clone();
+        self.processor
+            .urgent_consensus
+            .send_blocking(
+                move || {
+                    sender.do_send(message);
+                },
+                SENDER_NAME,
+            )
+            .unwrap_or_else(|e| warn!("Failed to send to processor: {}", e));
+    }
+}
+
+impl NetworkMessageSender {
+    pub fn new(
+        processor: processor::Senders,
+        network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>,
+        private_key: Rsa<Private>,
+        database: watch::Receiver<NetworkState>,
+        operator_id: OperatorId,
+        subnet_count: usize,
+    ) -> Result<Arc<Self>, String> {
+        let private_key = PKey::from_rsa(private_key)
+            .map_err(|err| format!("Failed to create PKey from RSA: {err}"))?;
+        Ok(Arc::new(Self {
+            processor,
+            network_tx,
+            private_key,
+            database,
+            operator_id,
+            subnet_count,
+        }))
+    }
+
+    fn do_send(&self, message: SignedSSVMessage) {
+        let subnet = match self.determine_subnet(&message) {
+            Ok(subnet) => subnet,
+            Err(err) => {
+                error!(?err, "Unable to determine subnet for outgoing message");
+                return;
+            }
+        };
+        match self.network_tx.try_send((subnet, message.as_ssz_bytes())) {
+            Ok(_) => debug!(?subnet, "Successfully sent message to network"),
+            Err(TrySendError::Closed(_)) => warn!("Network queue closed (shutting down?)"),
+            Err(TrySendError::Full(_)) => warn!("Network queue full, unable to send message!"),
+        }
+    }
+
+    fn sign(&self, message: &UnsignedSSVMessage) -> Result<Vec<u8>, ErrorStack> {
+        let serialized = message.ssv_message.as_ssz_bytes();
+        let mut signer = Signer::new(MessageDigest::sha256(), &self.private_key)?;
+        signer.update(&serialized)?;
+        signer.sign_to_vec()
+    }
+
+    fn determine_subnet(&self, message: &SignedSSVMessage) -> Result<SubnetId, String> {
+        let msg_id = message.ssv_message().msg_id();
+        let committee_id = match msg_id.duty_executor() {
+            Some(DutyExecutor::Committee(committee_id)) => committee_id,
+            Some(DutyExecutor::Validator(pubkey)) => {
+                let database = self.database.borrow();
+                let Some(metadata) = database.metadata().get_by(&pubkey) else {
+                    return Err(format!("Unknown validator: {pubkey}"));
+                };
+                let Some(cluster) = database.clusters().get_by(&metadata.cluster_id) else {
+                    return Err(format!(
+                        "Inconsistent database, no cluster for validator: {pubkey}"
+                    ));
+                };
+                cluster.committee_id()
+            }
+            None => return Err(format!("Invalid message id: {msg_id:?}")),
+        };
+        Ok(SubnetId::from_committee(committee_id, self.subnet_count))
+    }
+}
diff --git a/anchor/message_sender/src/testing.rs b/anchor/message_sender/src/testing.rs
new file mode 100644
index 00000000..3d62b341
--- /dev/null
+++ b/anchor/message_sender/src/testing.rs
@@ -0,0 +1,39 @@
+use crate::MessageSender;
+use ssv_types::consensus::UnsignedSSVMessage;
+use ssv_types::message::SignedSSVMessage;
+use ssv_types::OperatorId;
+use tokio::sync::mpsc;
+
+pub struct TestingMessageSender {
+    message_tx: mpsc::UnboundedSender<SignedSSVMessage>,
+    operator_id: OperatorId,
+}
+
+impl MessageSender for TestingMessageSender {
+    fn sign_and_send(&self, message: UnsignedSSVMessage) {
+        let message = SignedSSVMessage::new(
+            vec![vec![]],
+            vec![self.operator_id],
+            message.ssv_message,
+            message.full_data,
+        )
+        .unwrap();
+        self.send(message);
+    }
+
+    fn send(&self, message: SignedSSVMessage) {
+        self.message_tx.send(message).unwrap();
+    }
+}
+
+impl TestingMessageSender {
+    pub fn new(
+        message_tx: mpsc::UnboundedSender<SignedSSVMessage>,
+        operator_id: OperatorId,
+    ) -> Self {
+        Self {
+            message_tx,
+            operator_id,
+        }
+    }
+}
diff --git a/anchor/qbft_manager/Cargo.toml b/anchor/qbft_manager/Cargo.toml
index 6283afcf..5b116fe9 100644
--- a/anchor/qbft_manager/Cargo.toml
+++ b/anchor/qbft_manager/Cargo.toml
@@ -8,7 +8,7 @@ edition = { workspace = true }
 dashmap = { workspace = true }
 ethereum_ssz = { workspace = true }
 ethereum_ssz_derive = { workspace = true }
-openssl = { workspace = true }
+message_sender = { workspace = true }
 processor = { workspace = true }
 qbft = { workspace = true }
 slot_clock = { workspace = true }
@@ -20,8 +20,7 @@ types = { workspace = true }
 [dev-dependencies]
 async-channel = { workspace = true }
 futures = { workspace = true }
-openssl = { workspace = true }
-rand = { workspace = true }
+message_sender = { workspace = true, features = ["testing"] }
 task_executor = { workspace = true }
 tokio = { workspace = true, features = ["test-util"] }
 tracing-subscriber = { workspace = true }
diff --git a/anchor/qbft_manager/src/lib.rs b/anchor/qbft_manager/src/lib.rs
index 8cd67e14..b159d8be 100644
--- a/anchor/qbft_manager/src/lib.rs
+++ b/anchor/qbft_manager/src/lib.rs
@@ -1,22 +1,14 @@
 use dashmap::DashMap;
-use openssl::hash::MessageDigest;
-use openssl::pkey::{PKey, Private};
-use openssl::rsa::Rsa;
-use openssl::sign::Signer;
-
+use message_sender::MessageSender;
 use processor::{DropOnFinish, Senders, WorkItem};
 use qbft::{
     Completed, ConfigBuilder, ConfigBuilderError, DefaultLeaderFunction, InstanceHeight, Message,
     WrappedQbftMessage,
 };
 use slot_clock::SlotClock;
-use ssv_types::consensus::{BeaconVote, QbftData, UnsignedSSVMessage, ValidatorConsensusData};
-use std::error::Error;
-
-use ssv_types::message::SignedSSVMessage;
+use ssv_types::consensus::{BeaconVote, QbftData, ValidatorConsensusData};
 use ssv_types::OperatorId as QbftOperatorId;
 use ssv_types::{Cluster, CommitteeId, OperatorId};
-use ssz::Encode;
 use std::fmt::Debug;
 use std::hash::Hash;
 use std::sync::Arc;
@@ -35,7 +27,6 @@ mod tests;
 const QBFT_INSTANCE_NAME: &str = "qbft_instance";
 const QBFT_MESSAGE_NAME: &str = "qbft_message";
 const QBFT_CLEANER_NAME: &str = "qbft_cleaner";
-const QBFT_SIGNER_NAME: &str = "qbft_signer";

 /// Number of slots to keep before the current slot
 const QBFT_RETAIN_SLOTS: u64 = 1;
@@ -101,10 +92,8 @@ pub struct QbftManager {
     validator_consensus_data_instances: Map<ValidatorInstanceId, ValidatorConsensusData>,
     // All of the QBFT instances that are voting on beacon data
     beacon_vote_instances: Map<CommitteeInstanceId, BeaconVote>,
-    // Private key used for signing messages
-    pkey: Arc<PKey<Private>>,
-    // Channel to pass signed messages along to the network
-    network_tx: mpsc::UnboundedSender<SignedSSVMessage>,
+    // Utility to sign and serialize network messages
+    message_sender: Arc<dyn MessageSender>,
 }
@@ -113,18 +102,14 @@ impl QbftManager {
         processor: Senders,
         operator_id: OperatorId,
         slot_clock: impl SlotClock + 'static,
-        key: Rsa<Private>,
-        network_tx: mpsc::UnboundedSender<SignedSSVMessage>,
+        message_sender: impl MessageSender + 'static,
     ) -> Result<Arc<Self>, QbftError> {
-        let pkey = Arc::new(PKey::from_rsa(key).expect("Failed to create PKey from RSA"));
-
         let manager = Arc::new(QbftManager {
             processor,
             operator_id,
             validator_consensus_data_instances: DashMap::new(),
             beacon_vote_instances: DashMap::new(),
-            pkey,
-            network_tx,
+            message_sender: Arc::new(message_sender),
         });

         // Start a long running task that will clean up old instances
@@ -235,12 +220,7 @@ pub trait QbftDecidable: QbftData + Send + Sync + 'static {
                 let (tx, rx) = mpsc::unbounded_channel();
                 let tx = entry.insert(tx);
                 let _ = manager.processor.permitless.send_async(
-                    Box::pin(qbft_instance(
-                        rx,
-                        manager.network_tx.clone(),
-                        manager.pkey.clone(),
-                        manager.processor.clone(),
-                    )),
+                    Box::pin(qbft_instance(rx, manager.message_sender.clone())),
                     QBFT_INSTANCE_NAME,
                 );
                 tx.clone()
@@ -297,9 +277,7 @@ enum QbftInstance, S: FnMut(Message)> {
 async fn qbft_instance>(
     mut rx: UnboundedReceiver>,
-    network_tx: mpsc::UnboundedSender<SignedSSVMessage>,
-    pkey: Arc<PKey<Private>>,
-    processor: Senders,
+    message_sender: Arc<dyn MessageSender>,
 ) {
     // Signal a new instance that is uninitialized
     let mut instance = QbftInstance::Uninitialized {
@@ -342,24 +320,8 @@ async fn qbft_instance>(
             QbftInstance::Uninitialized { message_buffer } => {
                 // Create a new instance and receive any buffered messages
                 let mut instance = Box::new(Qbft::new(config, initial, |message| {
-                    let (id, unsigned) = message.desugar();
-                    let serialized = unsigned.as_ssz_bytes();
-                    let pkey = pkey.clone();
-                    let network_tx = network_tx.clone();
-
-                    processor
-                        .urgent_consensus
-                        .send_blocking(
-                            move || {
-                                if let Err(e) = sign_and_send_message(
-                                    pkey, id, unsigned, serialized, network_tx,
-                                ) {
-                                    error!("Signing failed: {}", e);
-                                }
-                            },
-                            QBFT_SIGNER_NAME,
-                        )
-                        .unwrap_or_else(|e| warn!("Failed to send to processor: {}", e));
+                    let (_, unsigned) = message.desugar();
+                    message_sender.clone().sign_and_send(unsigned);
                 }));
                 for message in message_buffer {
                     instance.receive(message);
                 }
@@ -432,11 +394,7 @@ async fn qbft_instance>(
                 // Send the decided message (aggregated commit)
                 match qbft.get_aggregated_commit() {
-                    Some(msg) => {
-                        network_tx.send(msg).unwrap_or_else(|e| {
-                            error!("Failed to send signed ssv message to network: {:?}", e)
-                        });
-                    }
+                    Some(msg) => message_sender.clone().send(msg),
                     None => error!("Aggregated commit does not exist"),
                 }
@@ -452,33 +410,6 @@ async fn qbft_instance>(
         }
     }
 }

-// Sign a message and send it to the network via the network_tx
-fn sign_and_send_message(
-    pkey: Arc<PKey<Private>>,
-    id: OperatorId,
-    unsigned: UnsignedSSVMessage,
-    serialized: Vec<u8>,
-    network_tx: UnboundedSender<SignedSSVMessage>,
-) -> Result<(), Box<dyn Error>> {
-    // Create the signature
-    let mut signer = Signer::new(MessageDigest::sha256(), &pkey)?;
-    signer.update(&serialized)?;
-    let sig = signer.sign_to_vec()?;
-
-    // Build the signed ssv message, then serialize it and send to the network
-    let signed = SignedSSVMessage::new(
-        vec![sig],
-        vec![id],
-        unsigned.ssv_message,
-        unsigned.full_data,
-    )?;
-    network_tx
-        .send(signed)
-        .map_err(|e| format!("Failed to send signed ssv message to network: {}", e))?;
-
-    Ok(())
-}
-
 #[derive(Debug, Clone)]
 pub enum QbftError {
     QueueClosedError,
diff --git a/anchor/qbft_manager/src/tests.rs b/anchor/qbft_manager/src/tests.rs
index 30c01170..133bd188 100644
--- a/anchor/qbft_manager/src/tests.rs
+++ b/anchor/qbft_manager/src/tests.rs
@@ -1,7 +1,7 @@
 use super::{
     CommitteeInstanceId, Completed, QbftDecidable, QbftError, QbftManager, WrappedQbftMessage,
 };
-use openssl::rsa::Rsa;
+use message_sender::testing::TestingMessageSender;
 use processor::Senders;
 use slot_clock::{ManualSlotClock, SlotClock};
 use ssv_types::consensus::{BeaconVote, QbftMessage, QbftMessageType};
@@ -229,9 +229,6 @@ where
     // broadcasted back into the instances
     let (network_tx, network_rx) = mpsc::unbounded_channel();

-    // generate a random private key
-    let pkey = Rsa::generate(2048).expect("Should not fail");
-
     // Construct and save a manager for each operator in the committee. By having access to all
    // the managers in the committee, we can direct messages to the proper place and
    // spawn multiple concurrent instances
@@ -243,8 +240,7 @@ where
            sender_queues.clone(),
            operator_id,
            slot_clock.clone(),
-            pkey.clone(),
-            network_tx.clone(),
+            TestingMessageSender::new(network_tx.clone(), operator_id),
        )
        .expect("Creation should not fail");
diff --git a/anchor/signature_collector/Cargo.toml b/anchor/signature_collector/Cargo.toml
index 5fff325e..8950fbd4 100644
--- a/anchor/signature_collector/Cargo.toml
+++ b/anchor/signature_collector/Cargo.toml
@@ -7,6 +7,7 @@ authors = ["Sigma Prime <info@sigmaprime.io>"]
 [dependencies]
 bls_lagrange = { workspace = true }
 dashmap = { workspace = true }
+message_sender = { workspace = true }
 processor = { workspace = true }
 slot_clock = { workspace = true }
 ssv_types = { workspace = true }
diff --git a/anchor/signature_collector/src/lib.rs b/anchor/signature_collector/src/lib.rs
index c4ce1b07..cb48d9dd 100644
--- a/anchor/signature_collector/src/lib.rs
+++ b/anchor/signature_collector/src/lib.rs
@@ -1,5 +1,6 @@
 use bls_lagrange::KeyId;
 use dashmap::DashMap;
+use message_sender::MessageSender;
 use processor::{DropOnFinish, Senders, WorkItem};
 use slot_clock::SlotClock;
 use ssv_types::{ClusterId, OperatorId};
@@ -29,16 +30,19 @@ struct SignatureCollector {

 pub struct SignatureCollectorManager {
     processor: Senders,
+    _message_sender: Arc<dyn MessageSender>,
     signature_collectors: DashMap,
 }

 impl SignatureCollectorManager {
-    pub fn new<T>(processor: Senders, slot_clock: T) -> Result<Arc<Self>, CollectionError>
-    where
-        T: SlotClock + 'static,
-    {
+    pub fn new(
+        processor: Senders,
+        message_sender: impl MessageSender + 'static,
+        slot_clock: impl SlotClock + 'static,
+    ) -> Result<Arc<Self>, CollectionError> {
         let manager = Arc::new(Self {
             processor,
+            _message_sender: Arc::new(message_sender),
             signature_collectors: DashMap::new(),
         });

@@ -77,8 +81,8 @@ impl SignatureCollectorManager {
         self.processor.urgent_consensus.send_blocking(
             move || {
                 let signature = Box::new(our_key.sign(request.signing_root));
+                // todo use: manager.message_sender.sign_and_send();
                 let _ = manager.receive_partial_signature(request, our_operator_id, signature);
-                // TODO send signature over network
             },
             SIGNER_NAME,
         )?;
diff --git a/anchor/subnet_tracker/Cargo.toml b/anchor/subnet_tracker/Cargo.toml
index 3e5d8186..26beb519 100644
--- a/anchor/subnet_tracker/Cargo.toml
+++ b/anchor/subnet_tracker/Cargo.toml
@@ -8,7 +8,6 @@ authors = ["Sigma Prime <info@sigmaprime.io>"]
 alloy = { workspace = true }
 database = { workspace = true }
 ethereum_serde_utils = "0.7.0"
-log = "0.4.25"
 serde = { workspace = true }
 ssv_types = { workspace = true }
 task_executor = { workspace = true }
diff --git a/anchor/subnet_tracker/src/lib.rs b/anchor/subnet_tracker/src/lib.rs
index fd0e0d44..0a40a934 100644
--- a/anchor/subnet_tracker/src/lib.rs
+++ b/anchor/subnet_tracker/src/lib.rs
@@ -1,14 +1,14 @@
 use alloy::primitives::ruint::aliases::U256;
 use database::{NetworkState, UniqueIndex};
-use log::warn;
 use serde::{Deserialize, Serialize};
+use ssv_types::CommitteeId;
 use std::collections::HashSet;
 use std::ops::Deref;
 use std::time::Duration;
 use task_executor::TaskExecutor;
 use tokio::sync::{mpsc, watch};
 use tokio::time::sleep;
-use tracing::debug;
+use tracing::{debug, warn};

 #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
 #[serde(transparent)]
@@ -18,6 +18,16 @@ impl SubnetId {
     pub fn new(id: u64) -> Self {
         id.into()
     }
+
+    pub fn from_committee(committee_id: CommitteeId, subnet_count: usize) -> Self {
+        // Derive a numeric "committee ID" and convert to an index in [0..subnet_count].
ID" and convert to an index in [0..subnet_count]. + let id = U256::from_be_bytes(*committee_id); + SubnetId( + (id % U256::from(subnet_count)) + .try_into() + .expect("modulo must be < subnet_count"), + ) + } } impl From for SubnetId { @@ -73,12 +83,8 @@ async fn subnet_tracker( let state = db.borrow(); for cluster_id in state.get_own_clusters() { if let Some(cluster) = state.clusters().get_by(cluster_id) { - // Derive a numeric "committee ID" and convert to an index in [0..subnet_count]. - let id = U256::from_be_bytes(*cluster.committee_id()); - let index = (id % U256::from(subnet_count)) - .try_into() - .expect("modulo must be < subnet_count"); - current_subnets.insert(index); + let subnet_id = SubnetId::from_committee(cluster.committee_id(), subnet_count); + current_subnets.insert(subnet_id); } } } @@ -87,11 +93,7 @@ async fn subnet_tracker( // send a `Leave` event. for subnet in previous_subnets.difference(¤t_subnets) { debug!(?subnet, "send leave"); - if tx - .send(SubnetEvent::Leave(SubnetId(*subnet))) - .await - .is_err() - { + if tx.send(SubnetEvent::Leave(*subnet)).await.is_err() { warn!("Network no longer listening for subnets"); return; } @@ -101,7 +103,7 @@ async fn subnet_tracker( // send a `Join` event. for subnet in current_subnets.difference(&previous_subnets) { debug!(?subnet, "send join"); - if tx.send(SubnetEvent::Join(SubnetId(*subnet))).await.is_err() { + if tx.send(SubnetEvent::Join(*subnet)).await.is_err() { warn!("Network no longer listening for subnets"); return; }