Skip to content

Commit

Permalink
address review
Browse files Browse the repository at this point in the history
  • Loading branch information
dknopik committed Feb 25, 2025
1 parent f076a7a commit 4813ec6
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 29 deletions.
11 changes: 9 additions & 2 deletions anchor/message_sender/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,15 @@ pub mod testing;
pub use crate::network::*;
use ssv_types::consensus::UnsignedSSVMessage;
use ssv_types::message::SignedSSVMessage;
use tokio::sync::mpsc::error::TrySendError;

pub trait MessageSender: Send + Sync {
fn sign_and_send(&self, message: UnsignedSSVMessage);
fn send(&self, message: SignedSSVMessage);
fn sign_and_send(&self, message: UnsignedSSVMessage) -> Result<(), Error>;
fn send(&self, message: SignedSSVMessage) -> Result<(), Error>;
}

#[derive(Debug)]
pub enum Error {
Processor(TrySendError<processor::WorkItem>),
NetworkQueueClosed,
}
35 changes: 20 additions & 15 deletions anchor/message_sender/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::MessageSender;
use crate::{Error, MessageSender};
use database::{NetworkState, UniqueIndex};
use openssl::error::ErrorStack;
use openssl::hash::MessageDigest;
Expand All @@ -23,13 +23,17 @@ pub struct NetworkMessageSender {
processor: processor::Senders,
network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>,
private_key: PKey<Private>,
database: watch::Receiver<NetworkState>,
network_state_rx: watch::Receiver<NetworkState>,
operator_id: OperatorId,
subnet_count: usize,
}

impl MessageSender for Arc<NetworkMessageSender> {
fn sign_and_send(&self, message: UnsignedSSVMessage) {
fn sign_and_send(&self, message: UnsignedSSVMessage) -> Result<(), Error> {
if self.network_tx.is_closed() {
return Err(Error::NetworkQueueClosed);
}

let sender = self.clone();
self.processor
.urgent_consensus
Expand All @@ -48,7 +52,7 @@ impl MessageSender for Arc<NetworkMessageSender> {
message.ssv_message,
message.full_data,
) {
Ok(signature) => signature,
Ok(signed_message) => signed_message,
Err(err) => {
error!(?err, "Creating signed message failed!");
return;
Expand All @@ -58,10 +62,14 @@ impl MessageSender for Arc<NetworkMessageSender> {
},
SIGNER_NAME,
)
.unwrap_or_else(|e| warn!("Failed to send to processor: {}", e));
.map_err(Error::Processor)
}

fn send(&self, message: SignedSSVMessage) {
fn send(&self, message: SignedSSVMessage) -> Result<(), Error> {
if self.network_tx.is_closed() {
return Err(Error::NetworkQueueClosed);
}

let sender = self.clone();
self.processor
.urgent_consensus
Expand All @@ -71,7 +79,7 @@ impl MessageSender for Arc<NetworkMessageSender> {
},
SENDER_NAME,
)
.unwrap_or_else(|e| warn!("Failed to send to processor: {}", e));
.map_err(Error::Processor)
}
}

Expand All @@ -80,7 +88,7 @@ impl NetworkMessageSender {
processor: processor::Senders,
network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>,
private_key: Rsa<Private>,
database: watch::Receiver<NetworkState>,
network_state_rx: watch::Receiver<NetworkState>,
operator_id: OperatorId,
subnet_count: usize,
) -> Result<Arc<Self>, String> {
Expand All @@ -90,7 +98,7 @@ impl NetworkMessageSender {
processor,
network_tx,
private_key,
database,
network_state_rx,
operator_id,
subnet_count,
}))
Expand Down Expand Up @@ -123,13 +131,10 @@ impl NetworkMessageSender {
let committee_id = match msg_id.duty_executor() {
Some(DutyExecutor::Committee(committee_id)) => committee_id,
Some(DutyExecutor::Validator(pubkey)) => {
let database = self.database.borrow();
let Some(metadata) = database.metadata().get_by(&pubkey) else {
return Err(format!("Unknown validator: {pubkey}"));
};
let Some(cluster) = database.clusters().get_by(&metadata.cluster_id) else {
let database = self.network_state_rx.borrow();
let Some(cluster) = database.clusters().get_by(&pubkey) else {
return Err(format!(
"Inconsistent database, no cluster for validator: {pubkey}"
"No cluster for validator: {pubkey}"
));
};
cluster.committee_id()
Expand Down
16 changes: 8 additions & 8 deletions anchor/message_sender/src/testing.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
use crate::MessageSender;
use crate::{Error, MessageSender};
use ssv_types::consensus::UnsignedSSVMessage;
use ssv_types::message::SignedSSVMessage;
use ssv_types::OperatorId;
use tokio::sync::mpsc;

pub struct TestingMessageSender {
pub struct MockMessageSender {
message_tx: mpsc::UnboundedSender<SignedSSVMessage>,
operator_id: OperatorId,
}

impl MessageSender for TestingMessageSender {
fn sign_and_send(&self, message: UnsignedSSVMessage) {
impl MessageSender for MockMessageSender {
fn sign_and_send(&self, message: UnsignedSSVMessage) -> Result<(), Error> {
let message = SignedSSVMessage::new(
vec![vec![]],
vec![self.operator_id],
message.ssv_message,
message.full_data,
)
.unwrap();
self.send(message);
self.send(message)
}

fn send(&self, message: SignedSSVMessage) {
self.message_tx.send(message).unwrap();
fn send(&self, message: SignedSSVMessage) -> Result<(), Error> {
self.message_tx.send(message).map_err(|_| Error::NetworkQueueClosed)
}
}

impl TestingMessageSender {
impl MockMessageSender {
pub fn new(
message_tx: mpsc::UnboundedSender<SignedSSVMessage>,
operator_id: OperatorId,
Expand Down
10 changes: 8 additions & 2 deletions anchor/qbft_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,9 @@ async fn qbft_instance<D: QbftData<Hash = Hash256>>(
// Create a new instance and receive any buffered messages
let mut instance = Box::new(Qbft::new(config, initial, |message| {
let (_, unsigned) = message.desugar();
message_sender.clone().sign_and_send(unsigned);
if let Err(err) = message_sender.clone().sign_and_send(unsigned) {
error!(?err, "Unable to send qbft message!");
}
}));
for message in message_buffer {
instance.receive(message);
Expand Down Expand Up @@ -394,7 +396,11 @@ async fn qbft_instance<D: QbftData<Hash = Hash256>>(

// Send the decided message (aggregated commit)
match qbft.get_aggregated_commit() {
Some(msg) => message_sender.clone().send(msg),
Some(msg) => {
if let Err(err) = message_sender.clone().send(msg) {
error!(?err, "Unable to send aggregated commit message");
}
},
None => error!("Aggregated commit does not exist"),
}

Expand Down
4 changes: 2 additions & 2 deletions anchor/qbft_manager/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{
CommitteeInstanceId, Completed, QbftDecidable, QbftError, QbftManager, WrappedQbftMessage,
};
use message_sender::testing::TestingMessageSender;
use message_sender::testing::MockMessageSender;
use processor::Senders;
use slot_clock::{ManualSlotClock, SlotClock};
use ssv_types::consensus::{BeaconVote, QbftMessage, QbftMessageType};
Expand Down Expand Up @@ -240,7 +240,7 @@ where
sender_queues.clone(),
operator_id,
slot_clock.clone(),
TestingMessageSender::new(network_tx.clone(), operator_id),
MockMessageSender::new(network_tx.clone(), operator_id),
)
.expect("Creation should not fail");

Expand Down

0 comments on commit 4813ec6

Please sign in to comment.