Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create common component for message serialization and signing #153

Open
wants to merge 4 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ members = [
"anchor/eth",
"anchor/http_api",
"anchor/http_metrics",
"anchor/message_sender",
"anchor/network",
"anchor/processor",
"anchor/qbft_manager",
Expand All @@ -33,6 +34,7 @@ database = { path = "anchor/database" }
eth = { path = "anchor/eth" }
http_api = { path = "anchor/http_api" }
http_metrics = { path = "anchor/http_metrics" }
message_sender = { path = "anchor/message_sender" }
network = { path = "anchor/network" }
processor = { path = "anchor/processor" }
qbft = { path = "anchor/common/qbft" }
Expand Down
1 change: 1 addition & 0 deletions anchor/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ fdlimit = "0.3"
http_api = { workspace = true }
http_metrics = { workspace = true }
hyper = { workspace = true }
message_sender = { workspace = true }
network = { workspace = true }
openssl = { workspace = true }
parking_lot = { workspace = true }
Expand Down
50 changes: 33 additions & 17 deletions anchor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use config::Config;
use database::NetworkDatabase;
use eth2::reqwest::{Certificate, ClientBuilder};
use eth2::{BeaconNodeHttpClient, Timeouts};
use message_sender::NetworkMessageSender;
use network::Network;
use openssl::pkey::Private;
use openssl::rsa::Rsa;
Expand All @@ -22,15 +23,14 @@ use sensitive_url::SensitiveUrl;
use signature_collector::SignatureCollectorManager;
use slashing_protection::SlashingDatabase;
use slot_clock::{SlotClock, SystemTimeSlotClock};
use ssv_types::message::SignedSSVMessage;
use ssv_types::OperatorId;
use std::fs::File;
use std::io::{ErrorKind, Read, Write};
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use subnet_tracker::start_subnet_tracker;
use subnet_tracker::{start_subnet_tracker, SubnetId};
use task_executor::TaskExecutor;
use tokio::net::TcpListener;
use tokio::select;
Expand Down Expand Up @@ -145,13 +145,6 @@ impl Client {
let subnet_tracker =
start_subnet_tracker(database.watch(), network::SUBNET_COUNT, &executor);

// Start the p2p network
let network = Network::try_new(&config.network, subnet_tracker, executor.clone())
.await
.map_err(|e| format!("Unable to start network: {e}"))?;
// Spawn the network listening task
executor.spawn(network.run(), "network");

// Initialize slashing protection.
let slashing_db_path = config.data_dir.join(SLASHING_PROTECTION_FILENAME);
let slashing_protection =
Expand Down Expand Up @@ -350,21 +343,44 @@ impl Client {
.await
.ok_or("Failed waiting for operator id")?;

// Create the signature collector
let signature_collector =
SignatureCollectorManager::new(processor_senders.clone(), slot_clock.clone())
.map_err(|e| format!("Unable to initialize signature collector manager: {e:?}"))?;

// Network sender/receiver
let (network_tx, _network_rx) = mpsc::unbounded_channel::<SignedSSVMessage>();
let (network_tx, network_rx) = mpsc::channel::<(SubnetId, Vec<u8>)>(9001);

let network_message_sender = NetworkMessageSender::new(
processor_senders.clone(),
network_tx.clone(),
key.clone(),
database.watch(),
operator_id,
network::SUBNET_COUNT,
)?;

// Start the p2p network
let network = Network::try_new(
&config.network,
subnet_tracker,
network_rx,
executor.clone(),
)
.await
.map_err(|e| format!("Unable to start network: {e}"))?;
// Spawn the network listening task
executor.spawn(network.run(), "network");

// Create the signature collector
let signature_collector = SignatureCollectorManager::new(
processor_senders.clone(),
network_message_sender.clone(),
slot_clock.clone(),
)
.map_err(|e| format!("Unable to initialize signature collector manager: {e:?}"))?;

// Create the qbft manager
let qbft_manager = QbftManager::new(
processor_senders.clone(),
operator_id,
slot_clock.clone(),
key.clone(),
network_tx.clone(),
network_message_sender,
)
.map_err(|e| format!("Unable to initialize qbft manager: {e:?}"))?;

Expand Down
18 changes: 18 additions & 0 deletions anchor/message_sender/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "message_sender"
version = "0.1.0"
edition = { workspace = true }
authors = ["Sigma Prime <[email protected]>"]

[dependencies]
database = { workspace = true }
ethereum_ssz = { workspace = true }
openssl = { workspace = true }
processor = { workspace = true }
ssv_types = { workspace = true }
subnet_tracker = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }

[features]
testing = []
13 changes: 13 additions & 0 deletions anchor/message_sender/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
mod network;

#[cfg(feature = "testing")]
pub mod testing;

pub use crate::network::*;
use ssv_types::consensus::UnsignedSSVMessage;
use ssv_types::message::SignedSSVMessage;

pub trait MessageSender: Send + Sync {
Copy link

@diegomrsantos diegomrsantos Feb 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice. In my PR, I was thinking about a way to split the "interface" and the implementation. First I created two crates, one with the trait and types and another with the implementation. The component C, which needs the functionality, imports only the interface crate, and who create C imports the implementation. But maybe two crates is too much overhead. I think we can achieve the same using one crate with different modules and features. Wdyt?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was also tempted to do that. This is for example what we do for the validator_store: This only contains the trait and types used in the interface, and there are creates lighthouse_validator_store in LH repo and anchor_validator_store in Anchor repo.

However, as you said, for small stuff like this, it is likely too much noise and too little benefit to split it up.

Copy link

@diegomrsantos diegomrsantos Feb 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One benefit is enforcing modularity/encapsulation and to make sure that clients of the crate can only use what they are supposed to and nothing else. As a crate that provides a functionality grows, it's tempting for people to just use implementation details directly, bypassing the API when it's more convenient.

Copy link

@diegomrsantos diegomrsantos Feb 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the whole point of the module system, right? Consider the message validator. It's harder to do that with only one crate as the network needs access to the interface and the client module needs access to the implementation, so both need to be public. That's why I mentioned features, I guess that's the only option if there's only one crate.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get your point. I am using modules. How are features related? They are only for conditional compilation.

Copy link

@diegomrsantos diegomrsantos Feb 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something like this

// src/lib.rs

#[cfg(feature = "api")]
pub mod api;

#[cfg(feature = "impl")]
pub mod message_sender;

fn sign_and_send(&self, message: UnsignedSSVMessage);
fn send(&self, message: SignedSSVMessage);
}
141 changes: 141 additions & 0 deletions anchor/message_sender/src/network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
use crate::MessageSender;
use database::{NetworkState, UniqueIndex};
use openssl::error::ErrorStack;
use openssl::hash::MessageDigest;
use openssl::pkey::{PKey, Private};
use openssl::rsa::Rsa;
use openssl::sign::Signer;
use ssv_types::consensus::UnsignedSSVMessage;
use ssv_types::message::SignedSSVMessage;
use ssv_types::msgid::DutyExecutor;
use ssv_types::OperatorId;
use ssz::Encode;
use std::sync::Arc;
use subnet_tracker::SubnetId;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{mpsc, watch};
use tracing::{debug, error, warn};

const SIGNER_NAME: &str = "message_sign_and_send";
const SENDER_NAME: &str = "message_send";

pub struct NetworkMessageSender {
processor: processor::Senders,
network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>,
private_key: PKey<Private>,
database: watch::Receiver<NetworkState>,
operator_id: OperatorId,
subnet_count: usize,
}

impl MessageSender for Arc<NetworkMessageSender> {
fn sign_and_send(&self, message: UnsignedSSVMessage) {
let sender = self.clone();
self.processor
.urgent_consensus
.send_blocking(
move || {
let signature = match sender.sign(&message) {
Ok(signature) => signature,
Err(err) => {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know if we need to propagate this error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one of those errors that really really should never happen, and where there is not really a way to fall back. So there is no reason to propagate it somewhere.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to Murphy's law: Anything that can go wrong will go wrong :) It might be useful to test what happens to the rest of the system when this happens. I'll keep this in mind in my tasks.

error!(?err, "Signing message failed!");
return;
}
};
let message = match SignedSSVMessage::new(
vec![signature],
vec![sender.operator_id],
message.ssv_message,
message.full_data,
) {
Ok(signature) => signature,
Err(err) => {
error!(?err, "Creating signed message failed!");
return;
}
};
sender.do_send(message);
},
SIGNER_NAME,
)
.unwrap_or_else(|e| warn!("Failed to send to processor: {}", e));
}

fn send(&self, message: SignedSSVMessage) {
let sender = self.clone();
self.processor
.urgent_consensus
.send_blocking(
move || {
sender.do_send(message);
},
SENDER_NAME,
)
.unwrap_or_else(|e| warn!("Failed to send to processor: {}", e));
}
}

impl NetworkMessageSender {
pub fn new(
processor: processor::Senders,
network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>,
private_key: Rsa<Private>,
database: watch::Receiver<NetworkState>,
operator_id: OperatorId,
subnet_count: usize,
) -> Result<Arc<Self>, String> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to return only Self and let the caller decide if and when to wrap it in an Arc?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually I'd agree, but we impl MessageSender on Arc<NetworkMessageSender>. So it is only useful wrapped.

let private_key = PKey::from_rsa(private_key)
.map_err(|err| format!("Failed to create PKey from RSA: {err}"))?;
Ok(Arc::new(Self {
processor,
network_tx,
private_key,
database,
operator_id,
subnet_count,
}))
}

fn do_send(&self, message: SignedSSVMessage) {
let subnet = match self.determine_subnet(&message) {
Ok(subnet) => subnet,
Err(err) => {
error!(?err, "Unable to determine subnet for outgoing message");
return;
}
};
match self.network_tx.try_send((subnet, message.as_ssz_bytes())) {
Ok(_) => debug!(?subnet, "Successfully sent message to network"),
Err(TrySendError::Closed(_)) => warn!("Network queue closed (shutting down?)"),
Err(TrySendError::Full(_)) => warn!("Network queue full, unable to send message!"),
}
}

fn sign(&self, message: &UnsignedSSVMessage) -> Result<Vec<u8>, ErrorStack> {
let serialized = message.ssv_message.as_ssz_bytes();
let mut signer = Signer::new(MessageDigest::sha256(), &self.private_key)?;
signer.update(&serialized)?;
signer.sign_to_vec()
}

fn determine_subnet(&self, message: &SignedSSVMessage) -> Result<SubnetId, String> {
let msg_id = message.ssv_message().msg_id();
let committee_id = match msg_id.duty_executor() {
Some(DutyExecutor::Committee(committee_id)) => committee_id,
Some(DutyExecutor::Validator(pubkey)) => {
let database = self.database.borrow();
let Some(metadata) = database.metadata().get_by(&pubkey) else {
return Err(format!("Unknown validator: {pubkey}"));
};
let Some(cluster) = database.clusters().get_by(&metadata.cluster_id) else {
return Err(format!(
"Inconsistent database, no cluster for validator: {pubkey}"
));
};
cluster.committee_id()
}
None => return Err(format!("Invalid message id: {msg_id:?}",)),
};
Ok(SubnetId::from_committee(committee_id, self.subnet_count))
}
}
39 changes: 39 additions & 0 deletions anchor/message_sender/src/testing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use crate::MessageSender;
use ssv_types::consensus::UnsignedSSVMessage;
use ssv_types::message::SignedSSVMessage;
use ssv_types::OperatorId;
use tokio::sync::mpsc;

pub struct TestingMessageSender {
message_tx: mpsc::UnboundedSender<SignedSSVMessage>,
operator_id: OperatorId,
}

impl MessageSender for TestingMessageSender {
fn sign_and_send(&self, message: UnsignedSSVMessage) {
let message = SignedSSVMessage::new(
vec![vec![]],
vec![self.operator_id],
message.ssv_message,
message.full_data,
)
.unwrap();
self.send(message);
}

fn send(&self, message: SignedSSVMessage) {
self.message_tx.send(message).unwrap();
}
}

impl TestingMessageSender {
pub fn new(
message_tx: mpsc::UnboundedSender<SignedSSVMessage>,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this could be created inside the constructor if no assertion is needed in the tests

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But then we have to return (Self, Receiver), which is also kind of ugly.

operator_id: OperatorId,
) -> Self {
Self {
message_tx,
operator_id,
}
}
}
Loading