From 4813ec67f75060806eef1c28611ca46d0e5ddf31 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Tue, 25 Feb 2025 14:39:00 +0100 Subject: [PATCH] address review --- anchor/message_sender/src/lib.rs | 11 +++++++-- anchor/message_sender/src/network.rs | 35 ++++++++++++++++------------ anchor/message_sender/src/testing.rs | 16 ++++++------- anchor/qbft_manager/src/lib.rs | 10 ++++++-- anchor/qbft_manager/src/tests.rs | 4 ++-- 5 files changed, 47 insertions(+), 29 deletions(-) diff --git a/anchor/message_sender/src/lib.rs b/anchor/message_sender/src/lib.rs index 409cd50d..c13e005f 100644 --- a/anchor/message_sender/src/lib.rs +++ b/anchor/message_sender/src/lib.rs @@ -6,8 +6,15 @@ pub mod testing; pub use crate::network::*; use ssv_types::consensus::UnsignedSSVMessage; use ssv_types::message::SignedSSVMessage; +use tokio::sync::mpsc::error::TrySendError; pub trait MessageSender: Send + Sync { - fn sign_and_send(&self, message: UnsignedSSVMessage); - fn send(&self, message: SignedSSVMessage); + fn sign_and_send(&self, message: UnsignedSSVMessage) -> Result<(), Error>; + fn send(&self, message: SignedSSVMessage) -> Result<(), Error>; +} + +#[derive(Debug)] +pub enum Error { + Processor(TrySendError), + NetworkQueueClosed, } diff --git a/anchor/message_sender/src/network.rs b/anchor/message_sender/src/network.rs index 79594c0d..5b62e8e2 100644 --- a/anchor/message_sender/src/network.rs +++ b/anchor/message_sender/src/network.rs @@ -1,4 +1,4 @@ -use crate::MessageSender; +use crate::{Error, MessageSender}; use database::{NetworkState, UniqueIndex}; use openssl::error::ErrorStack; use openssl::hash::MessageDigest; @@ -23,13 +23,17 @@ pub struct NetworkMessageSender { processor: processor::Senders, network_tx: mpsc::Sender<(SubnetId, Vec)>, private_key: PKey, - database: watch::Receiver, + network_state_rx: watch::Receiver, operator_id: OperatorId, subnet_count: usize, } impl MessageSender for Arc { - fn sign_and_send(&self, message: UnsignedSSVMessage) { + fn sign_and_send(&self, message: UnsignedSSVMessage) -> Result<(), Error> { + if self.network_tx.is_closed() { + return Err(Error::NetworkQueueClosed); + } + let sender = self.clone(); self.processor .urgent_consensus @@ -48,7 +52,7 @@ impl MessageSender for Arc { message.ssv_message, message.full_data, ) { - Ok(signature) => signature, + Ok(signed_message) => signed_message, Err(err) => { error!(?err, "Creating signed message failed!"); return; @@ -58,10 +62,14 @@ impl MessageSender for Arc { }, SIGNER_NAME, ) - .unwrap_or_else(|e| warn!("Failed to send to processor: {}", e)); + .map_err(Error::Processor) } - fn send(&self, message: SignedSSVMessage) { + fn send(&self, message: SignedSSVMessage) -> Result<(), Error> { + if self.network_tx.is_closed() { + return Err(Error::NetworkQueueClosed); + } + let sender = self.clone(); self.processor .urgent_consensus @@ -71,7 +79,7 @@ impl MessageSender for Arc { }, SENDER_NAME, ) - .unwrap_or_else(|e| warn!("Failed to send to processor: {}", e)); + .map_err(Error::Processor) } } @@ -80,7 +88,7 @@ impl NetworkMessageSender { processor: processor::Senders, network_tx: mpsc::Sender<(SubnetId, Vec)>, private_key: Rsa, - database: watch::Receiver, + network_state_rx: watch::Receiver, operator_id: OperatorId, subnet_count: usize, ) -> Result, String> { @@ -90,7 +98,7 @@ impl NetworkMessageSender { processor, network_tx, private_key, - database, + network_state_rx, operator_id, subnet_count, })) @@ -123,13 +131,10 @@ impl NetworkMessageSender { let committee_id = match msg_id.duty_executor() { Some(DutyExecutor::Committee(committee_id)) => committee_id, Some(DutyExecutor::Validator(pubkey)) => { - let database = self.database.borrow(); - let Some(metadata) = database.metadata().get_by(&pubkey) else { - return Err(format!("Unknown validator: {pubkey}")); - }; - let Some(cluster) = database.clusters().get_by(&metadata.cluster_id) else { + let database = self.network_state_rx.borrow(); + let Some(cluster) = database.clusters().get_by(&pubkey) else { return Err(format!( - "Inconsistent database, no cluster for validator: {pubkey}" + "No cluster for validator: {pubkey}" )); }; cluster.committee_id() diff --git a/anchor/message_sender/src/testing.rs b/anchor/message_sender/src/testing.rs index 3d62b341..c354e683 100644 --- a/anchor/message_sender/src/testing.rs +++ b/anchor/message_sender/src/testing.rs @@ -1,16 +1,16 @@ -use crate::MessageSender; +use crate::{Error, MessageSender}; use ssv_types::consensus::UnsignedSSVMessage; use ssv_types::message::SignedSSVMessage; use ssv_types::OperatorId; use tokio::sync::mpsc; -pub struct TestingMessageSender { +pub struct MockMessageSender { message_tx: mpsc::UnboundedSender, operator_id: OperatorId, } -impl MessageSender for TestingMessageSender { - fn sign_and_send(&self, message: UnsignedSSVMessage) { +impl MessageSender for MockMessageSender { + fn sign_and_send(&self, message: UnsignedSSVMessage) -> Result<(), Error> { let message = SignedSSVMessage::new( vec![vec![]], vec![self.operator_id], @@ -18,15 +18,15 @@ impl MessageSender for TestingMessageSender { message.full_data, ) .unwrap(); - self.send(message); + self.send(message) } - fn send(&self, message: SignedSSVMessage) { - self.message_tx.send(message).unwrap(); + fn send(&self, message: SignedSSVMessage) -> Result<(), Error> { + self.message_tx.send(message).map_err(|_| Error::NetworkQueueClosed) } } -impl TestingMessageSender { +impl MockMessageSender { pub fn new( message_tx: mpsc::UnboundedSender, operator_id: OperatorId, diff --git a/anchor/qbft_manager/src/lib.rs b/anchor/qbft_manager/src/lib.rs index b159d8be..22e9533f 100644 --- a/anchor/qbft_manager/src/lib.rs +++ b/anchor/qbft_manager/src/lib.rs @@ -321,7 +321,9 @@ async fn qbft_instance>( // Create a new instance and receive any buffered messages let mut instance = Box::new(Qbft::new(config, initial, |message| { let (_, unsigned) = message.desugar(); - message_sender.clone().sign_and_send(unsigned); + if let Err(err) = message_sender.clone().sign_and_send(unsigned) { + error!(?err, "Unable to send qbft message!"); + } })); for message in message_buffer { instance.receive(message); @@ -394,7 +396,11 @@ async fn qbft_instance>( // Send the decided message (aggregated commit) match qbft.get_aggregated_commit() { - Some(msg) => message_sender.clone().send(msg), + Some(msg) => { + if let Err(err) = message_sender.clone().send(msg) { + error!(?err, "Unable to send aggregated commit message"); + } + }, None => error!("Aggregated commit does not exist"), } diff --git a/anchor/qbft_manager/src/tests.rs b/anchor/qbft_manager/src/tests.rs index 133bd188..8fdfd6c0 100644 --- a/anchor/qbft_manager/src/tests.rs +++ b/anchor/qbft_manager/src/tests.rs @@ -1,7 +1,7 @@ use super::{ CommitteeInstanceId, Completed, QbftDecidable, QbftError, QbftManager, WrappedQbftMessage, }; -use message_sender::testing::TestingMessageSender; +use message_sender::testing::MockMessageSender; use processor::Senders; use slot_clock::{ManualSlotClock, SlotClock}; use ssv_types::consensus::{BeaconVote, QbftMessage, QbftMessageType}; @@ -240,7 +240,7 @@ where sender_queues.clone(), operator_id, slot_clock.clone(), - TestingMessageSender::new(network_tx.clone(), operator_id), + MockMessageSender::new(network_tx.clone(), operator_id), ) .expect("Creation should not fail");