Skip to content

Commit

Permalink
create common component for message serialization and signing
Browse files Browse the repository at this point in the history
  • Loading branch information
dknopik committed Feb 20, 2025
1 parent 8d2ae13 commit 82411fb
Show file tree
Hide file tree
Showing 15 changed files with 293 additions and 122 deletions.
20 changes: 17 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ members = [
"anchor/eth",
"anchor/http_api",
"anchor/http_metrics",
"anchor/message_sender",
"anchor/network",
"anchor/processor",
"anchor/qbft_manager",
Expand All @@ -33,6 +34,7 @@ database = { path = "anchor/database" }
eth = { path = "anchor/eth" }
http_api = { path = "anchor/http_api" }
http_metrics = { path = "anchor/http_metrics" }
message_sender = { path = "anchor/message_sender" }
network = { path = "anchor/network" }
processor = { path = "anchor/processor" }
qbft = { path = "anchor/common/qbft" }
Expand Down
1 change: 1 addition & 0 deletions anchor/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ fdlimit = "0.3"
http_api = { workspace = true }
http_metrics = { workspace = true }
hyper = { workspace = true }
message_sender = { workspace = true }
network = { workspace = true }
openssl = { workspace = true }
parking_lot = { workspace = true }
Expand Down
31 changes: 21 additions & 10 deletions anchor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use config::Config;
use database::NetworkDatabase;
use eth2::reqwest::{Certificate, ClientBuilder};
use eth2::{BeaconNodeHttpClient, Timeouts};
use message_sender::NetworkMessageSender;
use network::Network;
use openssl::pkey::Private;
use openssl::rsa::Rsa;
Expand All @@ -22,15 +23,14 @@ use sensitive_url::SensitiveUrl;
use signature_collector::SignatureCollectorManager;
use slashing_protection::SlashingDatabase;
use slot_clock::{SlotClock, SystemTimeSlotClock};
use ssv_types::message::SignedSSVMessage;
use ssv_types::OperatorId;
use std::fs::File;
use std::io::{ErrorKind, Read, Write};
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use subnet_tracker::start_subnet_tracker;
use subnet_tracker::{start_subnet_tracker, SubnetId};
use task_executor::TaskExecutor;
use tokio::net::TcpListener;
use tokio::select;
Expand Down Expand Up @@ -350,21 +350,32 @@ impl Client {
.await
.ok_or("Failed waiting for operator id")?;

// Create the signature collector
let signature_collector =
SignatureCollectorManager::new(processor_senders.clone(), slot_clock.clone())
.map_err(|e| format!("Unable to initialize signature collector manager: {e:?}"))?;

// Network sender/receiver
let (network_tx, _network_rx) = mpsc::unbounded_channel::<SignedSSVMessage>();
let (network_tx, _network_rx) = mpsc::channel::<(SubnetId, Vec<u8>)>(9001);

let network_message_sender = NetworkMessageSender::new(
processor_senders.clone(),
network_tx.clone(),
key.clone(),
database.watch(),
operator_id,
network::SUBNET_COUNT,
)?;

// Create the signature collector
let signature_collector = SignatureCollectorManager::new(
processor_senders.clone(),
network_message_sender.clone(),
slot_clock.clone(),
)
.map_err(|e| format!("Unable to initialize signature collector manager: {e:?}"))?;

// Create the qbft manager
let qbft_manager = QbftManager::new(
processor_senders.clone(),
operator_id,
slot_clock.clone(),
key.clone(),
network_tx.clone(),
network_message_sender,
)
.map_err(|e| format!("Unable to initialize qbft manager: {e:?}"))?;

Expand Down
18 changes: 18 additions & 0 deletions anchor/message_sender/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "message_sender"
version = "0.1.0"
edition = { workspace = true }
authors = ["Sigma Prime <[email protected]>"]

[dependencies]
database = { workspace = true }
ethereum_ssz = { workspace = true }
openssl = { workspace = true }
processor = { workspace = true }
ssv_types = { workspace = true }
subnet_tracker = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }

[features]
testing = []
13 changes: 13 additions & 0 deletions anchor/message_sender/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
mod network;

#[cfg(feature = "testing")]
pub mod testing;

pub use crate::network::*;
use ssv_types::consensus::UnsignedSSVMessage;
use ssv_types::message::SignedSSVMessage;

pub trait MessageSender: Send + Sync {
fn sign_and_send(&self, message: UnsignedSSVMessage);
fn send(&self, message: SignedSSVMessage);
}
141 changes: 141 additions & 0 deletions anchor/message_sender/src/network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
use crate::MessageSender;
use database::{NetworkState, UniqueIndex};
use openssl::error::ErrorStack;
use openssl::hash::MessageDigest;
use openssl::pkey::{PKey, Private};
use openssl::rsa::Rsa;
use openssl::sign::Signer;
use ssv_types::consensus::UnsignedSSVMessage;
use ssv_types::message::SignedSSVMessage;
use ssv_types::msgid::DutyExecutor;
use ssv_types::OperatorId;
use ssz::Encode;
use std::sync::Arc;
use subnet_tracker::SubnetId;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{mpsc, watch};
use tracing::{debug, error, warn};

const SIGNER_NAME: &str = "message_sign_and_send";
const SENDER_NAME: &str = "message_send";

pub struct NetworkMessageSender {
processor: processor::Senders,
network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>,
private_key: PKey<Private>,
database: watch::Receiver<NetworkState>,
operator_id: OperatorId,
subnet_count: usize,
}

impl MessageSender for Arc<NetworkMessageSender> {
fn sign_and_send(&self, message: UnsignedSSVMessage) {
let sender = self.clone();
self.processor
.urgent_consensus
.send_blocking(
move || {
let signature = match sender.sign(&message) {
Ok(signature) => signature,
Err(err) => {
error!(?err, "Signing message failed!");
return;
}
};
let message = match SignedSSVMessage::new(
vec![signature],
vec![sender.operator_id],
message.ssv_message,
message.full_data,
) {
Ok(signature) => signature,
Err(err) => {
error!(?err, "Creating signed message failed!");
return;
}
};
sender.do_send(message);
},
SIGNER_NAME,
)
.unwrap_or_else(|e| warn!("Failed to send to processor: {}", e));
}

fn send(&self, message: SignedSSVMessage) {
let sender = self.clone();
self.processor
.urgent_consensus
.send_blocking(
move || {
sender.do_send(message);
},
SENDER_NAME,
)
.unwrap_or_else(|e| warn!("Failed to send to processor: {}", e));
}
}

impl NetworkMessageSender {
pub fn new(
processor: processor::Senders,
network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>,
private_key: Rsa<Private>,
database: watch::Receiver<NetworkState>,
operator_id: OperatorId,
subnet_count: usize,
) -> Result<Arc<Self>, String> {
let private_key = PKey::from_rsa(private_key)
.map_err(|err| format!("Failed to create PKey from RSA: {err}"))?;
Ok(Arc::new(Self {
processor,
network_tx,
private_key,
database,
operator_id,
subnet_count,
}))
}

fn do_send(&self, message: SignedSSVMessage) {
let subnet = match self.determine_subnet(&message) {
Ok(subnet) => subnet,
Err(err) => {
error!(?err, "Unable to determine subnet for outgoing message");
return;
}
};
match self.network_tx.try_send((subnet, message.as_ssz_bytes())) {
Ok(_) => debug!(?subnet, "Successfully sent message to network"),
Err(TrySendError::Closed(_)) => warn!("Network queue closed (shutting down?)"),
Err(TrySendError::Full(_)) => warn!("Network queue full, unable to send message!"),
}
}

fn sign(&self, message: &UnsignedSSVMessage) -> Result<Vec<u8>, ErrorStack> {
let serialized = message.ssv_message.as_ssz_bytes();
let mut signer = Signer::new(MessageDigest::sha256(), &self.private_key)?;
signer.update(&serialized)?;
signer.sign_to_vec()
}

fn determine_subnet(&self, message: &SignedSSVMessage) -> Result<SubnetId, String> {
let msg_id = message.ssv_message().msg_id();
let committee_id = match msg_id.duty_executor() {
Some(DutyExecutor::Committee(committee_id)) => committee_id,
Some(DutyExecutor::Validator(pubkey)) => {
let database = self.database.borrow();
let Some(metadata) = database.metadata().get_by(&pubkey) else {
return Err(format!("Unknown validator: {pubkey}"));
};
let Some(cluster) = database.clusters().get_by(&metadata.cluster_id) else {
return Err(format!(
"Inconsistent database, no cluster for validator: {pubkey}"
));
};
cluster.committee_id()
}
None => return Err(format!("Invalid message id: {msg_id:?}",)),
};
Ok(SubnetId::from_committee(committee_id, self.subnet_count))
}
}
39 changes: 39 additions & 0 deletions anchor/message_sender/src/testing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use crate::MessageSender;
use ssv_types::consensus::UnsignedSSVMessage;
use ssv_types::message::SignedSSVMessage;
use ssv_types::OperatorId;
use tokio::sync::mpsc;

pub struct TestingMessageSender {
message_tx: mpsc::UnboundedSender<SignedSSVMessage>,
operator_id: OperatorId,
}

impl MessageSender for TestingMessageSender {
fn sign_and_send(&self, message: UnsignedSSVMessage) {
let message = SignedSSVMessage::new(
vec![vec![]],
vec![self.operator_id],
message.ssv_message,
message.full_data,
)
.unwrap();
self.send(message);
}

fn send(&self, message: SignedSSVMessage) {
self.message_tx.send(message).unwrap();
}
}

impl TestingMessageSender {
pub fn new(
message_tx: mpsc::UnboundedSender<SignedSSVMessage>,
operator_id: OperatorId,
) -> Self {
Self {
message_tx,
operator_id,
}
}
}
5 changes: 2 additions & 3 deletions anchor/qbft_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = { workspace = true }
dashmap = { workspace = true }
ethereum_ssz = { workspace = true }
ethereum_ssz_derive = { workspace = true }
openssl = { workspace = true }
message_sender = { workspace = true }
processor = { workspace = true }
qbft = { workspace = true }
slot_clock = { workspace = true }
Expand All @@ -20,8 +20,7 @@ types = { workspace = true }
[dev-dependencies]
async-channel = { workspace = true }
futures = { workspace = true }
openssl = { workspace = true }
rand = { workspace = true }
message_sender = { workspace = true, features = ["testing"] }
task_executor = { workspace = true }
tokio = { workspace = true, features = ["test-util"] }
tracing-subscriber = { workspace = true }
Loading

0 comments on commit 82411fb

Please sign in to comment.