Skip to content

Commit

Permalink
pass committee id instead of fetching from network state
Browse files Browse the repository at this point in the history
  • Loading branch information
dknopik committed Feb 25, 2025
1 parent 4813ec6 commit 8a489c0
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 49 deletions.
1 change: 0 additions & 1 deletion anchor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,6 @@ impl Client {
processor_senders.clone(),
network_tx.clone(),
key.clone(),
database.watch(),
operator_id,
network::SUBNET_COUNT,
)?;
Expand Down
9 changes: 7 additions & 2 deletions anchor/message_sender/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@ pub mod testing;
pub use crate::network::*;
use ssv_types::consensus::UnsignedSSVMessage;
use ssv_types::message::SignedSSVMessage;
use ssv_types::CommitteeId;
use tokio::sync::mpsc::error::TrySendError;

pub trait MessageSender: Send + Sync {
fn sign_and_send(&self, message: UnsignedSSVMessage) -> Result<(), Error>;
fn send(&self, message: SignedSSVMessage) -> Result<(), Error>;
fn sign_and_send(
&self,
message: UnsignedSSVMessage,
committee_id: CommitteeId,
) -> Result<(), Error>;
fn send(&self, message: SignedSSVMessage, committee_id: CommitteeId) -> Result<(), Error>;
}

#[derive(Debug)]
Expand Down
49 changes: 12 additions & 37 deletions anchor/message_sender/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
use crate::{Error, MessageSender};
use database::{NetworkState, UniqueIndex};
use openssl::error::ErrorStack;
use openssl::hash::MessageDigest;
use openssl::pkey::{PKey, Private};
use openssl::rsa::Rsa;
use openssl::sign::Signer;
use ssv_types::consensus::UnsignedSSVMessage;
use ssv_types::message::SignedSSVMessage;
use ssv_types::msgid::DutyExecutor;
use ssv_types::OperatorId;
use ssv_types::{CommitteeId, OperatorId};
use ssz::Encode;
use std::sync::Arc;
use subnet_tracker::SubnetId;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{mpsc, watch};
use tracing::{debug, error, warn};

const SIGNER_NAME: &str = "message_sign_and_send";
Expand All @@ -23,13 +21,16 @@ pub struct NetworkMessageSender {
processor: processor::Senders,
network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>,
private_key: PKey<Private>,
network_state_rx: watch::Receiver<NetworkState>,
operator_id: OperatorId,
subnet_count: usize,
}

impl MessageSender for Arc<NetworkMessageSender> {
fn sign_and_send(&self, message: UnsignedSSVMessage) -> Result<(), Error> {
fn sign_and_send(
&self,
message: UnsignedSSVMessage,
committee_id: CommitteeId,
) -> Result<(), Error> {
if self.network_tx.is_closed() {
return Err(Error::NetworkQueueClosed);
}
Expand Down Expand Up @@ -58,14 +59,14 @@ impl MessageSender for Arc<NetworkMessageSender> {
return;
}
};
sender.do_send(message);
sender.do_send(message, committee_id);
},
SIGNER_NAME,
)
.map_err(Error::Processor)
}

fn send(&self, message: SignedSSVMessage) -> Result<(), Error> {
fn send(&self, message: SignedSSVMessage, committee_id: CommitteeId) -> Result<(), Error> {
if self.network_tx.is_closed() {
return Err(Error::NetworkQueueClosed);
}
Expand All @@ -75,7 +76,7 @@ impl MessageSender for Arc<NetworkMessageSender> {
.urgent_consensus
.send_blocking(
move || {
sender.do_send(message);
sender.do_send(message, committee_id);
},
SENDER_NAME,
)
Expand All @@ -88,7 +89,6 @@ impl NetworkMessageSender {
processor: processor::Senders,
network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>,
private_key: Rsa<Private>,
network_state_rx: watch::Receiver<NetworkState>,
operator_id: OperatorId,
subnet_count: usize,
) -> Result<Arc<Self>, String> {
Expand All @@ -98,20 +98,13 @@ impl NetworkMessageSender {
processor,
network_tx,
private_key,
network_state_rx,
operator_id,
subnet_count,
}))
}

fn do_send(&self, message: SignedSSVMessage) {
let subnet = match self.determine_subnet(&message) {
Ok(subnet) => subnet,
Err(err) => {
error!(?err, "Unable to determine subnet for outgoing message");
return;
}
};
fn do_send(&self, message: SignedSSVMessage, committee_id: CommitteeId) {
let subnet = SubnetId::from_committee(committee_id, self.subnet_count);
match self.network_tx.try_send((subnet, message.as_ssz_bytes())) {
Ok(_) => debug!(?subnet, "Successfully sent message to network"),
Err(TrySendError::Closed(_)) => warn!("Network queue closed (shutting down?)"),
Expand All @@ -125,22 +118,4 @@ impl NetworkMessageSender {
signer.update(&serialized)?;
signer.sign_to_vec()
}

fn determine_subnet(&self, message: &SignedSSVMessage) -> Result<SubnetId, String> {
let msg_id = message.ssv_message().msg_id();
let committee_id = match msg_id.duty_executor() {
Some(DutyExecutor::Committee(committee_id)) => committee_id,
Some(DutyExecutor::Validator(pubkey)) => {
let database = self.network_state_rx.borrow();
let Some(cluster) = database.clusters().get_by(&pubkey) else {
return Err(format!(
"No cluster for validator: {pubkey}"
));
};
cluster.committee_id()
}
None => return Err(format!("Invalid message id: {msg_id:?}",)),
};
Ok(SubnetId::from_committee(committee_id, self.subnet_count))
}
}
16 changes: 11 additions & 5 deletions anchor/message_sender/src/testing.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{Error, MessageSender};
use ssv_types::consensus::UnsignedSSVMessage;
use ssv_types::message::SignedSSVMessage;
use ssv_types::OperatorId;
use ssv_types::{CommitteeId, OperatorId};
use tokio::sync::mpsc;

pub struct MockMessageSender {
Expand All @@ -10,19 +10,25 @@ pub struct MockMessageSender {
}

impl MessageSender for MockMessageSender {
fn sign_and_send(&self, message: UnsignedSSVMessage) -> Result<(), Error> {
fn sign_and_send(
&self,
message: UnsignedSSVMessage,
committee_id: CommitteeId,
) -> Result<(), Error> {
let message = SignedSSVMessage::new(
vec![vec![]],
vec![self.operator_id],
message.ssv_message,
message.full_data,
)
.unwrap();
self.send(message)
self.send(message, committee_id)
}

fn send(&self, message: SignedSSVMessage) -> Result<(), Error> {
self.message_tx.send(message).map_err(|_| Error::NetworkQueueClosed)
fn send(&self, message: SignedSSVMessage, _committee_id: CommitteeId) -> Result<(), Error> {
self.message_tx
.send(message)
.map_err(|_| Error::NetworkQueueClosed)
}
}

Expand Down
25 changes: 21 additions & 4 deletions anchor/qbft_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,19 @@ async fn qbft_instance<D: QbftData<Hash = Hash256>>(
// The instance is uninitialized and we have received a manager message to
// initialize it
QbftInstance::Uninitialized { message_buffer } => {
let message_sender = message_sender.clone();
let committee_id = config
.committee_members()
.iter()
.cloned()
.collect::<Vec<_>>()
.into();
// Create a new instance and receive any buffered messages
let mut instance = Box::new(Qbft::new(config, initial, |message| {
let mut instance = Box::new(Qbft::new(config, initial, move |message| {
let (_, unsigned) = message.desugar();
if let Err(err) = message_sender.clone().sign_and_send(unsigned) {
if let Err(err) =
message_sender.clone().sign_and_send(unsigned, committee_id)
{
error!(?err, "Unable to send qbft message!");
}
}));
Expand Down Expand Up @@ -397,10 +406,18 @@ async fn qbft_instance<D: QbftData<Hash = Hash256>>(
// Send the decided message (aggregated commit)
match qbft.get_aggregated_commit() {
Some(msg) => {
if let Err(err) = message_sender.clone().send(msg) {
let committee_id = qbft
.config()
.committee_members()
.iter()
.cloned()
.collect::<Vec<_>>()
.into();

if let Err(err) = message_sender.clone().send(msg, committee_id) {
error!(?err, "Unable to send aggregated commit message");
}
},
}
None => error!("Aggregated commit does not exist"),
}

Expand Down

0 comments on commit 8a489c0

Please sign in to comment.