Skip to content

Commit

Permalink
Merge branch 'unstable' into sync-error-handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Zacholme7 committed Feb 12, 2025
2 parents a8310dc + 72b2c24 commit ef123d3
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 79 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions anchor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ use network::Network;
use openssl::pkey::Private;
use openssl::rsa::Rsa;
use parking_lot::RwLock;
use qbft::Message;
use qbft_manager::QbftManager;
use sensitive_url::SensitiveUrl;
use signature_collector::SignatureCollectorManager;
use slashing_protection::SlashingDatabase;
use slot_clock::{SlotClock, SystemTimeSlotClock};
use ssv_types::message::SignedSSVMessage;
use ssv_types::OperatorId;
use std::fs::File;
use std::io::{ErrorKind, Read, Write};
Expand Down Expand Up @@ -349,13 +349,16 @@ impl Client {
SignatureCollectorManager::new(processor_senders.clone(), slot_clock.clone())
.map_err(|e| format!("Unable to initialize signature collector manager: {e:?}"))?;

// Network sender/receiver
let (network_tx, _network_rx) = mpsc::unbounded_channel::<SignedSSVMessage>();

// Create the qbft manager
let (qbft_sender, _qbft_receiver) = mpsc::channel::<Message>(500);
let qbft_manager = QbftManager::new(
processor_senders.clone(),
operator_id,
slot_clock.clone(),
qbft_sender,
key.clone(),
network_tx.clone(),
)
.map_err(|e| format!("Unable to initialize qbft manager: {e:?}"))?;

Expand Down
9 changes: 1 addition & 8 deletions anchor/common/qbft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ where
// Ensure that this message is for the correct round
let current_round = self.current_round.get();
if (wrapped_msg.qbft_message.round < current_round as u64)
|| (current_round > self.config.max_rounds())
|| (wrapped_msg.qbft_message.round > self.config.max_rounds() as u64)
{
warn!(
propose_round = wrapped_msg.qbft_message.round,
Expand Down Expand Up @@ -322,13 +322,6 @@ where

// Send the initial proposal and then the following prepare
self.send_proposal(data_hash, data);
self.send_prepare(data_hash);

// Since we are the leader and sent the proposal, switch to prepare state and accept
// proposal
self.state = InstanceState::Prepare;
self.proposal_accepted_for_current_round = true;
self.proposal_root = Some(data_hash);
}
}

Expand Down
11 changes: 11 additions & 0 deletions anchor/common/qbft/src/qbft_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,17 @@ pub enum Message {
RoundChange(OperatorId, UnsignedSSVMessage),
}

impl Message {
pub fn desugar(&self) -> (OperatorId, UnsignedSSVMessage) {
match self {
Message::Propose(id, msg)
| Message::Prepare(id, msg)
| Message::Commit(id, msg)
| Message::RoundChange(id, msg) => (*id, msg.clone()),
}
}
}

/// Type definitions for the allowable messages
/// This holds the consensus data for a given round.
#[derive(Debug, Clone)]
Expand Down
2 changes: 1 addition & 1 deletion anchor/common/ssv_types/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub trait QbftData: Debug + Clone + Encode + Decode {
}

/// A SSV Message that has not been signed yet.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Encode)]
pub struct UnsignedSSVMessage {
/// The SSV Message to be send. This is either a consensus message which contains a serialized
/// QbftMessage, or a partial signature message which contains a PartialSignatureMessage
Expand Down
2 changes: 2 additions & 0 deletions anchor/qbft_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = { workspace = true }
dashmap = { workspace = true }
ethereum_ssz = { workspace = true }
ethereum_ssz_derive = { workspace = true }
openssl = { workspace = true }
processor = { workspace = true }
qbft = { workspace = true }
slot_clock = { workspace = true }
Expand All @@ -19,6 +20,7 @@ types = { workspace = true }
[dev-dependencies]
async-channel = { workspace = true }
futures = { workspace = true }
openssl = { workspace = true }
rand = { workspace = true }
task_executor = { workspace = true }
tokio = { workspace = true, features = ["test-util"] }
Expand Down
105 changes: 79 additions & 26 deletions anchor/qbft_manager/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
use dashmap::DashMap;
use openssl::hash::MessageDigest;
use openssl::pkey::{PKey, Private};
use openssl::rsa::Rsa;
use openssl::sign::Signer;

use processor::{DropOnFinish, Senders, WorkItem};
use qbft::{
Completed, ConfigBuilder, ConfigBuilderError, DefaultLeaderFunction, InstanceHeight, Message,
WrappedQbftMessage,
};
use slot_clock::SlotClock;
use ssv_types::consensus::{BeaconVote, QbftData, ValidatorConsensusData};
use ssv_types::consensus::{BeaconVote, QbftData, UnsignedSSVMessage, ValidatorConsensusData};
use std::error::Error;

use ssv_types::message::SignedSSVMessage;
use ssv_types::OperatorId as QbftOperatorId;
use ssv_types::{Cluster, ClusterId, OperatorId};
use ssz::Encode;
use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;
Expand All @@ -27,6 +35,7 @@ mod tests;
const QBFT_INSTANCE_NAME: &str = "qbft_instance";
const QBFT_MESSAGE_NAME: &str = "qbft_message";
const QBFT_CLEANER_NAME: &str = "qbft_cleaner";
const QBFT_SIGNER_NAME: &str = "qbft_signer";

/// Number of slots to keep before the current slot
const QBFT_RETAIN_SLOTS: u64 = 1;
Expand Down Expand Up @@ -94,9 +103,10 @@ pub struct QbftManager<T: SlotClock + 'static> {
validator_consensus_data_instances: Map<ValidatorInstanceId, ValidatorConsensusData>,
// All of the QBFT instances that are voting on beacon data
beacon_vote_instances: Map<CommitteeInstanceId, BeaconVote>,
// Takes messages from qbft instances and sends them to be signed
// TODO!(). This will be the network channel for passing signatures from processor -> network
qbft_out: mpsc::Sender<Message>,
// Private key used for signing messages
pkey: Arc<PKey<Private>>,
// Channel to pass signed messages along to the network
network_tx: mpsc::UnboundedSender<SignedSSVMessage>,
}

impl<T: SlotClock> QbftManager<T> {
Expand All @@ -105,15 +115,19 @@ impl<T: SlotClock> QbftManager<T> {
processor: Senders,
operator_id: OperatorId,
slot_clock: T,
qbft_out: mpsc::Sender<Message>,
key: Rsa<Private>,
network_tx: mpsc::UnboundedSender<SignedSSVMessage>,
) -> Result<Arc<Self>, QbftError> {
let pkey = Arc::new(PKey::from_rsa(key).expect("Failed to create PKey from RSA"));

let manager = Arc::new(QbftManager {
processor,
operator_id,
slot_clock,
validator_consensus_data_instances: DashMap::new(),
beacon_vote_instances: DashMap::new(),
qbft_out,
pkey,
network_tx,
});

// Start a long running task that will clean up old instances
Expand Down Expand Up @@ -147,7 +161,7 @@ impl<T: SlotClock> QbftManager<T> {

// Get or spawn a new qbft instance. This will return the sender that we can use to send
// new messages to the specific instance
let sender = D::get_or_spawn_instance(self, id, self.qbft_out.clone());
let sender = D::get_or_spawn_instance(self, id);
self.processor.urgent_consensus.send_immediate(
move |drop_on_finish: DropOnFinish| {
// A message to initialize this instance
Expand All @@ -173,7 +187,7 @@ impl<T: SlotClock> QbftManager<T> {
id: D::Id,
data: WrappedQbftMessage,
) -> Result<(), QbftError> {
let sender = D::get_or_spawn_instance(self, id, self.qbft_out.clone());
let sender = D::get_or_spawn_instance(self, id);
self.processor.urgent_consensus.send_immediate(
move |drop_on_finish: DropOnFinish| {
let _ = sender.send(QbftMessage {
Expand Down Expand Up @@ -214,7 +228,6 @@ pub trait QbftDecidable<T: SlotClock + 'static>: QbftData<Hash = Hash256> + Send
fn get_or_spawn_instance(
manager: &QbftManager<T>,
id: Self::Id,
qbft_out: mpsc::Sender<Message>,
) -> UnboundedSender<QbftMessage<Self>> {
let map = Self::get_map(manager);
let ret = match map.entry(id) {
Expand All @@ -224,10 +237,15 @@ pub trait QbftDecidable<T: SlotClock + 'static>: QbftData<Hash = Hash256> + Send
// with the reeiver
let (tx, rx) = mpsc::unbounded_channel();
let tx = entry.insert(tx);
let _ = manager
.processor
.permitless
.send_async(Box::pin(qbft_instance(rx, qbft_out)), QBFT_INSTANCE_NAME);
let _ = manager.processor.permitless.send_async(
Box::pin(qbft_instance(
rx,
manager.network_tx.clone(),
manager.pkey.clone(),
manager.processor.clone(),
)),
QBFT_INSTANCE_NAME,
);
tx.clone()
}
};
Expand Down Expand Up @@ -282,7 +300,9 @@ enum QbftInstance<D: QbftData<Hash = Hash256>, S: FnMut(Message)> {

async fn qbft_instance<D: QbftData<Hash = Hash256>>(
mut rx: UnboundedReceiver<QbftMessage<D>>,
tx: mpsc::Sender<Message>,
network_tx: mpsc::UnboundedSender<SignedSSVMessage>,
pkey: Arc<PKey<Private>>,
processor: Senders,
) {
// Signal a new instance that is uninitialized
let mut instance = QbftInstance::Uninitialized {
Expand Down Expand Up @@ -325,18 +345,24 @@ async fn qbft_instance<D: QbftData<Hash = Hash256>>(
QbftInstance::Uninitialized { message_buffer } => {
// Create a new instance and receive any buffered messages
let mut instance = Box::new(Qbft::new(config, initial, |message| {
match tx.try_send(message) {
Ok(()) => (),
Err(TrySendError::Full(msg)) => {
// Queue is full - drop message under constrained bandwidth
warn!(?msg, "Dropping QBFT message due to full queue");
}
Err(TrySendError::Closed(_)) => {
// Channel closed - critical failure or shutdown
error!("QBFT message channel closed - initiating shutdown");
// todo!() need some sort of shutdown
}
}
let (id, unsigned) = message.desugar();
let serialized = unsigned.as_ssz_bytes();
let pkey = pkey.clone();
let network_tx = network_tx.clone();

processor
.urgent_consensus
.send_blocking(
move || {
if let Err(e) = sign_and_send_message(
pkey, id, unsigned, serialized, network_tx,
) {
error!("Signing failed: {}", e);
}
},
QBFT_SIGNER_NAME,
)
.unwrap_or_else(|e| warn!("Failed to send to processor: {}", e));
}));
for message in message_buffer {
instance.receive(message);
Expand Down Expand Up @@ -417,6 +443,33 @@ async fn qbft_instance<D: QbftData<Hash = Hash256>>(
}
}

// Sign a message and send it to the network via the network_tx
fn sign_and_send_message(
pkey: Arc<PKey<Private>>,
id: OperatorId,
unsigned: UnsignedSSVMessage,
serialized: Vec<u8>,
network_tx: UnboundedSender<SignedSSVMessage>,
) -> Result<(), Box<dyn Error>> {
// Create the signature
let mut signer = Signer::new(MessageDigest::sha256(), &pkey)?;
signer.update(&serialized)?;
let sig = signer.sign_to_vec()?;

// Build the signed ssv message, then serialize it and send to the network
let signed = SignedSSVMessage::new(
vec![sig],
vec![*id],
unsigned.ssv_message,
unsigned.full_data,
)?;
network_tx
.send(signed)
.map_err(|e| format!("Failed to send signed ssv message to network: {}", e))?;

Ok(())
}

#[derive(Debug, Clone)]
pub enum QbftError {
QueueClosedError,
Expand Down
Loading

0 comments on commit ef123d3

Please sign in to comment.