diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index a7313757..e958172f 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -350,7 +350,6 @@ impl Client { processor_senders.clone(), network_tx.clone(), key.clone(), - database.watch(), operator_id, network::SUBNET_COUNT, )?; diff --git a/anchor/message_sender/src/lib.rs b/anchor/message_sender/src/lib.rs index c13e005f..5e091039 100644 --- a/anchor/message_sender/src/lib.rs +++ b/anchor/message_sender/src/lib.rs @@ -6,11 +6,16 @@ pub mod testing; pub use crate::network::*; use ssv_types::consensus::UnsignedSSVMessage; use ssv_types::message::SignedSSVMessage; +use ssv_types::CommitteeId; use tokio::sync::mpsc::error::TrySendError; pub trait MessageSender: Send + Sync { - fn sign_and_send(&self, message: UnsignedSSVMessage) -> Result<(), Error>; - fn send(&self, message: SignedSSVMessage) -> Result<(), Error>; + fn sign_and_send( + &self, + message: UnsignedSSVMessage, + committee_id: CommitteeId, + ) -> Result<(), Error>; + fn send(&self, message: SignedSSVMessage, committee_id: CommitteeId) -> Result<(), Error>; } #[derive(Debug)] diff --git a/anchor/message_sender/src/network.rs b/anchor/message_sender/src/network.rs index 5b62e8e2..84858380 100644 --- a/anchor/message_sender/src/network.rs +++ b/anchor/message_sender/src/network.rs @@ -1,5 +1,4 @@ use crate::{Error, MessageSender}; -use database::{NetworkState, UniqueIndex}; use openssl::error::ErrorStack; use openssl::hash::MessageDigest; use openssl::pkey::{PKey, Private}; @@ -7,13 +6,12 @@ use openssl::rsa::Rsa; use openssl::sign::Signer; use ssv_types::consensus::UnsignedSSVMessage; use ssv_types::message::SignedSSVMessage; -use ssv_types::msgid::DutyExecutor; -use ssv_types::OperatorId; +use ssv_types::{CommitteeId, OperatorId}; use ssz::Encode; use std::sync::Arc; use subnet_tracker::SubnetId; +use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; -use tokio::sync::{mpsc, watch}; use tracing::{debug, error, warn}; const SIGNER_NAME: &str = "message_sign_and_send"; @@ -23,13 +21,16 @@ pub struct NetworkMessageSender { processor: processor::Senders, network_tx: mpsc::Sender<(SubnetId, Vec)>, private_key: PKey, - network_state_rx: watch::Receiver, operator_id: OperatorId, subnet_count: usize, } impl MessageSender for Arc { - fn sign_and_send(&self, message: UnsignedSSVMessage) -> Result<(), Error> { + fn sign_and_send( + &self, + message: UnsignedSSVMessage, + committee_id: CommitteeId, + ) -> Result<(), Error> { if self.network_tx.is_closed() { return Err(Error::NetworkQueueClosed); } @@ -58,14 +59,14 @@ impl MessageSender for Arc { return; } }; - sender.do_send(message); + sender.do_send(message, committee_id); }, SIGNER_NAME, ) .map_err(Error::Processor) } - fn send(&self, message: SignedSSVMessage) -> Result<(), Error> { + fn send(&self, message: SignedSSVMessage, committee_id: CommitteeId) -> Result<(), Error> { if self.network_tx.is_closed() { return Err(Error::NetworkQueueClosed); } @@ -75,7 +76,7 @@ impl MessageSender for Arc { .urgent_consensus .send_blocking( move || { - sender.do_send(message); + sender.do_send(message, committee_id); }, SENDER_NAME, ) @@ -88,7 +89,6 @@ impl NetworkMessageSender { processor: processor::Senders, network_tx: mpsc::Sender<(SubnetId, Vec)>, private_key: Rsa, - network_state_rx: watch::Receiver, operator_id: OperatorId, subnet_count: usize, ) -> Result, String> { @@ -98,20 +98,13 @@ impl NetworkMessageSender { processor, network_tx, private_key, - network_state_rx, operator_id, subnet_count, })) } - fn do_send(&self, message: SignedSSVMessage) { - let subnet = match self.determine_subnet(&message) { - Ok(subnet) => subnet, - Err(err) => { - error!(?err, "Unable to determine subnet for outgoing message"); - return; - } - }; + fn do_send(&self, message: SignedSSVMessage, committee_id: CommitteeId) { + let subnet = SubnetId::from_committee(committee_id, self.subnet_count); match self.network_tx.try_send((subnet, message.as_ssz_bytes())) { Ok(_) => debug!(?subnet, "Successfully sent message to network"), Err(TrySendError::Closed(_)) => warn!("Network queue closed (shutting down?)"), @@ -125,22 +118,4 @@ impl NetworkMessageSender { signer.update(&serialized)?; signer.sign_to_vec() } - - fn determine_subnet(&self, message: &SignedSSVMessage) -> Result { - let msg_id = message.ssv_message().msg_id(); - let committee_id = match msg_id.duty_executor() { - Some(DutyExecutor::Committee(committee_id)) => committee_id, - Some(DutyExecutor::Validator(pubkey)) => { - let database = self.network_state_rx.borrow(); - let Some(cluster) = database.clusters().get_by(&pubkey) else { - return Err(format!( - "No cluster for validator: {pubkey}" - )); - }; - cluster.committee_id() - } - None => return Err(format!("Invalid message id: {msg_id:?}",)), - }; - Ok(SubnetId::from_committee(committee_id, self.subnet_count)) - } } diff --git a/anchor/message_sender/src/testing.rs b/anchor/message_sender/src/testing.rs index c354e683..46c83182 100644 --- a/anchor/message_sender/src/testing.rs +++ b/anchor/message_sender/src/testing.rs @@ -1,7 +1,7 @@ use crate::{Error, MessageSender}; use ssv_types::consensus::UnsignedSSVMessage; use ssv_types::message::SignedSSVMessage; -use ssv_types::OperatorId; +use ssv_types::{CommitteeId, OperatorId}; use tokio::sync::mpsc; pub struct MockMessageSender { @@ -10,7 +10,11 @@ pub struct MockMessageSender { } impl MessageSender for MockMessageSender { - fn sign_and_send(&self, message: UnsignedSSVMessage) -> Result<(), Error> { + fn sign_and_send( + &self, + message: UnsignedSSVMessage, + committee_id: CommitteeId, + ) -> Result<(), Error> { let message = SignedSSVMessage::new( vec![vec![]], vec![self.operator_id], @@ -18,11 +22,13 @@ impl MessageSender for MockMessageSender { message.full_data, ) .unwrap(); - self.send(message) + self.send(message, committee_id) } - fn send(&self, message: SignedSSVMessage) -> Result<(), Error> { - self.message_tx.send(message).map_err(|_| Error::NetworkQueueClosed) + fn send(&self, message: SignedSSVMessage, _committee_id: CommitteeId) -> Result<(), Error> { + self.message_tx + .send(message) + .map_err(|_| Error::NetworkQueueClosed) } } diff --git a/anchor/qbft_manager/src/lib.rs b/anchor/qbft_manager/src/lib.rs index 22e9533f..898c4c7f 100644 --- a/anchor/qbft_manager/src/lib.rs +++ b/anchor/qbft_manager/src/lib.rs @@ -318,10 +318,19 @@ async fn qbft_instance>( // The instance is uninitialized and we have received a manager message to // initialize it QbftInstance::Uninitialized { message_buffer } => { + let message_sender = message_sender.clone(); + let committee_id = config + .committee_members() + .iter() + .cloned() + .collect::>() + .into(); // Create a new instance and receive any buffered messages - let mut instance = Box::new(Qbft::new(config, initial, |message| { + let mut instance = Box::new(Qbft::new(config, initial, move |message| { let (_, unsigned) = message.desugar(); - if let Err(err) = message_sender.clone().sign_and_send(unsigned) { + if let Err(err) = + message_sender.clone().sign_and_send(unsigned, committee_id) + { error!(?err, "Unable to send qbft message!"); } })); @@ -397,10 +406,18 @@ async fn qbft_instance>( // Send the decided message (aggregated commit) match qbft.get_aggregated_commit() { Some(msg) => { - if let Err(err) = message_sender.clone().send(msg) { + let committee_id = qbft + .config() + .committee_members() + .iter() + .cloned() + .collect::>() + .into(); + + if let Err(err) = message_sender.clone().send(msg, committee_id) { error!(?err, "Unable to send aggregated commit message"); } - }, + } None => error!("Aggregated commit does not exist"), }