From 87a4a507cd22ce8d15ecf028edc4f4d1dbd15c8e Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Tue, 18 Feb 2025 15:07:20 +0100 Subject: [PATCH] pass messages from network to manager --- Cargo.lock | 4 + anchor/client/src/lib.rs | 17 ++- anchor/common/ssv_types/Cargo.toml | 1 + anchor/common/ssv_types/src/cluster.rs | 5 + anchor/common/ssv_types/src/committee.rs | 33 +++++ anchor/common/ssv_types/src/consensus.rs | 13 +- anchor/common/ssv_types/src/lib.rs | 2 + anchor/common/ssv_types/src/message.rs | 4 +- anchor/common/ssv_types/src/msgid.rs | 49 +++++-- anchor/common/ssv_types/src/partial_sig.rs | 93 ++++++++++++++ anchor/network/Cargo.toml | 3 + anchor/network/src/network.rs | 142 +++++++++++++++++---- anchor/network/src/validation.rs | 2 +- anchor/qbft_manager/src/lib.rs | 9 +- anchor/qbft_manager/src/tests.rs | 4 +- anchor/signature_collector/src/lib.rs | 3 +- anchor/subnet_tracker/src/lib.rs | 19 +-- anchor/validator_store/src/lib.rs | 6 +- 18 files changed, 326 insertions(+), 83 deletions(-) create mode 100644 anchor/common/ssv_types/src/committee.rs create mode 100644 anchor/common/ssv_types/src/partial_sig.rs diff --git a/Cargo.lock b/Cargo.lock index a560cdae..7bb7dcab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5342,9 +5342,12 @@ dependencies = [ "libp2p-swarm", "libp2p-swarm-test", "lighthouse_network", + "qbft_manager", "quick-protobuf", "serde", "serde_json", + "signature_collector", + "slot_clock", "ssv_types", "ssz_types", "subnet_tracker", @@ -7440,6 +7443,7 @@ dependencies = [ name = "ssv_types" version = "0.1.0" dependencies = [ + "alloy", "base64 0.22.1", "derive_more 1.0.0", "ethereum_ssz", diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index 009af7af..41da3bdf 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -145,11 +145,6 @@ impl Client { let subnet_tracker = start_subnet_tracker(database.watch(), network::SUBNET_COUNT, &executor); - // Start the p2p network - let network = Network::try_new(&config.network, subnet_tracker, executor.clone()).await?; - // Spawn the network listening task - executor.spawn(network.run(), "network"); - // Initialize slashing protection. let slashing_db_path = config.data_dir.join(SLASHING_PROTECTION_FILENAME); let slashing_protection = @@ -362,6 +357,18 @@ impl Client { ) .map_err(|e| format!("Unable to initialize qbft manager: {e:?}"))?; + // Start the p2p network + let network = Network::try_new( + &config.network, + subnet_tracker, + qbft_manager.clone(), + signature_collector.clone(), + executor.clone(), + ) + .await?; + // Spawn the network listening task + executor.spawn(network.run(), "network"); + let validator_store = AnchorValidatorStore::<_, E>::new( database.watch(), signature_collector, diff --git a/anchor/common/ssv_types/Cargo.toml b/anchor/common/ssv_types/Cargo.toml index 982aa8d2..239eb04d 100644 --- a/anchor/common/ssv_types/Cargo.toml +++ b/anchor/common/ssv_types/Cargo.toml @@ -5,6 +5,7 @@ edition = { workspace = true } authors = ["Sigma Prime "] [dependencies] +alloy = { workspace = true } base64 = { workspace = true } derive_more = { workspace = true } ethereum_ssz = { workspace = true } diff --git a/anchor/common/ssv_types/src/cluster.rs b/anchor/common/ssv_types/src/cluster.rs index 403e51b0..5205d44e 100644 --- a/anchor/common/ssv_types/src/cluster.rs +++ b/anchor/common/ssv_types/src/cluster.rs @@ -3,6 +3,7 @@ use derive_more::{Deref, From}; use indexmap::IndexSet; use ssz_derive::{Decode, Encode}; use types::{Address, Graffiti, PublicKeyBytes}; +use crate::committee::CommitteeId; /// Unique identifier for a cluster #[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Hash, From, Deref)] @@ -36,6 +37,10 @@ impl Cluster { pub fn get_f(&self) -> u64 { (self.cluster_members.len().saturating_sub(1) / 3) as u64 } + + pub fn committee_id(&self) -> CommitteeId { + self.cluster_members.iter().cloned().collect::>().into() + } } /// A member of a Cluster. diff --git a/anchor/common/ssv_types/src/committee.rs b/anchor/common/ssv_types/src/committee.rs new file mode 100644 index 00000000..2f98e16d --- /dev/null +++ b/anchor/common/ssv_types/src/committee.rs @@ -0,0 +1,33 @@ +use alloy::primitives::keccak256; +use derive_more::{Deref, From}; +use crate::OperatorId; + +const COMMITTEE_ID_LEN: usize = 32; + +/// Unique identifier for a committee +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Hash, From, Deref)] +pub struct CommitteeId(pub [u8; COMMITTEE_ID_LEN]); + +impl From> for CommitteeId { + fn from(mut operator_ids: Vec) -> Self { + // Sort the operator IDs + operator_ids.sort(); + let mut data: Vec = Vec::with_capacity(operator_ids.len() * 4); + + // Add the operator IDs as 32 byte values + for id in operator_ids { + data.extend_from_slice(&id.to_le_bytes()); + } + + // Hash it all + keccak256(data).0.into() + } +} + +impl TryFrom<&[u8]> for CommitteeId { + type Error = (); + + fn try_from(value: &[u8]) -> Result { + value.try_into().map(CommitteeId).map_err(|_| ()) + } +} diff --git a/anchor/common/ssv_types/src/consensus.rs b/anchor/common/ssv_types/src/consensus.rs index d20f9085..ac238e0f 100644 --- a/anchor/common/ssv_types/src/consensus.rs +++ b/anchor/common/ssv_types/src/consensus.rs @@ -1,6 +1,6 @@ use crate::message::*; use crate::msgid::MessageId; -use crate::{OperatorId, ValidatorIndex}; +use crate::ValidatorIndex; use sha2::{Digest, Sha256}; use ssz::{Decode, DecodeError, Encode}; use ssz_derive::{Decode, Encode}; @@ -26,7 +26,7 @@ use types::{ // MsgType FullData // --------- ----------- // ConsensusMsg QBFTMessage SSZ -// PartialSigMsg PartialSignatureMessage SSZ +// PartialSigMsg PartialSignatureMessages SSZ pub trait QbftData: Debug + Clone + Encode + Decode { type Hash: Debug + Clone + Eq + Hash; @@ -144,15 +144,6 @@ impl Decode for QbftMessageType { } } -// A partial signature specific message -#[derive(Clone, Debug)] -pub struct PartialSignatureMessage { - pub partial_signature: Signature, - pub signing_root: Hash256, - pub signer: OperatorId, - pub validator_index: ValidatorIndex, -} - #[derive(Clone, Debug, PartialEq, Encode, Decode)] pub struct ValidatorConsensusData { pub duty: ValidatorDuty, diff --git a/anchor/common/ssv_types/src/lib.rs b/anchor/common/ssv_types/src/lib.rs index f3acc55c..0378cf2f 100644 --- a/anchor/common/ssv_types/src/lib.rs +++ b/anchor/common/ssv_types/src/lib.rs @@ -2,11 +2,13 @@ pub use cluster::{Cluster, ClusterId, ClusterMember, ValidatorIndex, ValidatorMe pub use operator::{Operator, OperatorId}; pub use share::Share; mod cluster; +pub mod committee; pub mod consensus; pub mod domain_type; pub mod message; pub mod msgid; mod operator; +pub mod partial_sig; mod share; mod sql_conversions; mod util; diff --git a/anchor/common/ssv_types/src/message.rs b/anchor/common/ssv_types/src/message.rs index e2a4d299..abe232d6 100644 --- a/anchor/common/ssv_types/src/message.rs +++ b/anchor/common/ssv_types/src/message.rs @@ -92,7 +92,7 @@ impl SSVMessage { /// /// ``` /// use ssv_types::message::{MessageId, MsgType, SSVMessage}; - /// let message_id = MessageId::new([0u8; 56]); + /// let message_id = MessageId::from([0u8; 56]); /// let msg = SSVMessage::new(MsgType::SSVConsensusMsgType, message_id, vec![1, 2, 3]); /// ``` pub fn new(msg_type: MsgType, msg_id: MessageId, data: Vec) -> Self { @@ -154,7 +154,7 @@ impl SignedSSVMessage { /// ``` /// use ssv_types::message::{MessageId, MsgType, SSVMessage, SignedSSVMessage}; /// use ssv_types::OperatorId; - /// let ssv_msg = SSVMessage::new(MsgType::SSVConsensusMsgType, MessageId::new([0u8; 56]), vec![1,2,3]); + /// let ssv_msg = SSVMessage::new(MsgType::SSVConsensusMsgType, MessageId::from([0u8; 56]), vec![1,2,3]); /// let signed_msg = SignedSSVMessage::new(vec![vec![0; 256]], vec![OperatorId(1)], ssv_msg, vec![4,5,6]).unwrap(); /// ``` pub fn new( diff --git a/anchor/common/ssv_types/src/msgid.rs b/anchor/common/ssv_types/src/msgid.rs index 0f974260..5a33c950 100644 --- a/anchor/common/ssv_types/src/msgid.rs +++ b/anchor/common/ssv_types/src/msgid.rs @@ -1,5 +1,8 @@ +use derive_more::From; +use crate::committee::CommitteeId; use crate::domain_type::DomainType; use ssz::{Decode, DecodeError, Encode}; +use types::PublicKeyBytes; const MESSAGE_ID_LEN: usize = 56; @@ -37,26 +40,50 @@ impl TryFrom<&[u8]> for Role { } #[derive(Debug, Clone, Hash, Eq, PartialEq)] -pub enum Executor { - Committee([u8; 32]), - Validator([u8; 48]), +pub enum DutyExecutor { + Committee(CommitteeId), + Validator(PublicKeyBytes), } -#[derive(Debug, Clone, Hash, Eq, PartialEq)] +#[derive(Debug, Clone, Hash, Eq, PartialEq, From)] pub struct MessageId([u8; 56]); impl MessageId { - pub fn new(domain: &DomainType, role: Role, duty_executor: &Executor) -> Self { + pub fn new(domain: &DomainType, role: Role, duty_executor: &DutyExecutor) -> Self { let mut id = [0; 56]; id[0..4].copy_from_slice(&domain.0); id[4..8].copy_from_slice(&<[u8; 4]>::from(role)); match duty_executor { - Executor::Committee(slice) => id[24..].copy_from_slice(slice), - Executor::Validator(slice) => id[8..].copy_from_slice(slice), + DutyExecutor::Committee(committee_id) => id[24..].copy_from_slice(committee_id.as_slice()), + DutyExecutor::Validator(public_key) => id[8..].copy_from_slice(public_key.as_serialized()), } MessageId(id) } + + pub fn domain(&self) -> DomainType { + DomainType( + self.0[0..4] + .try_into() + .expect("we know the slice has the correct length"), + ) + } + + pub fn role(&self) -> Option { + self.0[4..8].try_into().ok() + } + + pub fn duty_executor(&self) -> Option { + // which kind of executor we need to get depends on the role + match self.role()? { + Role::Committee => { + self.0[24..].try_into().ok().map(DutyExecutor::Committee) + } + Role::Aggregator | Role::Proposer | Role::SyncCommittee => { + PublicKeyBytes::deserialize(&self.0[8..]).ok().map(DutyExecutor::Validator) + } + } + } } impl AsRef<[u8]> for MessageId { @@ -65,9 +92,11 @@ impl AsRef<[u8]> for MessageId { } } -impl From<[u8; MESSAGE_ID_LEN]> for MessageId { - fn from(value: [u8; MESSAGE_ID_LEN]) -> Self { - MessageId(value) +impl TryFrom<&[u8]> for MessageId { + type Error = (); + + fn try_from(value: &[u8]) -> Result { + value.try_into().map(MessageId).map_err(|_| ()) } } diff --git a/anchor/common/ssv_types/src/partial_sig.rs b/anchor/common/ssv_types/src/partial_sig.rs new file mode 100644 index 00000000..c9aa40e5 --- /dev/null +++ b/anchor/common/ssv_types/src/partial_sig.rs @@ -0,0 +1,93 @@ +use ssz::{Decode, DecodeError, Encode}; +use ssz_derive::{Decode, Encode}; +use types::{Hash256, Signature, Slot}; +use crate::{OperatorId, ValidatorIndex}; + +#[derive(Clone, Copy, Debug)] +pub enum PartialSignatureKind { + // PostConsensusPartialSig is a partial signature over a decided duty (attestation data, block, etc) + PostConsensus = 0, + // RandaoPartialSig is a partial signature over randao reveal + RandaoPartialSig = 1, + // SelectionProofPartialSig is a partial signature for aggregator selection proof + SelectionProofPartialSig = 2, + // ContributionProofs is the partial selection proofs for sync committee contributions (it's an array of sigs) + ContributionProofs = 3, + // ValidatorRegistrationPartialSig is a partial signature over a ValidatorRegistration object + ValidatorRegistration = 4, + // VoluntaryExitPartialSig is a partial signature over a VoluntaryExit object + VoluntaryExit = 5, +} + +impl TryFrom for PartialSignatureKind { + type Error = (); + + fn try_from(value: u64) -> Result { + match value { + 0 => Ok(PartialSignatureKind::PostConsensus), + 1 => Ok(PartialSignatureKind::RandaoPartialSig), + 2 => Ok(PartialSignatureKind::SelectionProofPartialSig), + 3 => Ok(PartialSignatureKind::ContributionProofs), + 4 => Ok(PartialSignatureKind::ValidatorRegistration), + 5 => Ok(PartialSignatureKind::VoluntaryExit), + _ => Err(()), + } + } +} + +const U64_SIZE: usize = 8; // u64 is 8 bytes + +impl Encode for PartialSignatureKind { + fn is_ssz_fixed_len() -> bool { + true + } + + fn ssz_append(&self, buf: &mut Vec) { + buf.extend_from_slice(&(*self as u64).to_le_bytes()); + } + + fn ssz_fixed_len() -> usize { + U64_SIZE + } + + fn ssz_bytes_len(&self) -> usize { + U64_SIZE + } +} + +impl Decode for PartialSignatureKind { + fn is_ssz_fixed_len() -> bool { + true + } + + fn ssz_fixed_len() -> usize { + U64_SIZE + } + + fn from_ssz_bytes(bytes: &[u8]) -> Result { + if bytes.len() != U64_SIZE { + return Err(DecodeError::InvalidByteLength { + len: bytes.len(), + expected: U64_SIZE, + }); + } + let value = u64::from_le_bytes(bytes.try_into().unwrap()); + value.try_into().map_err(|_| DecodeError::NoMatchingVariant) + } +} + +// A partial signature specific message +#[derive(Clone, Debug, Encode, Decode)] +pub struct PartialSignatureMessages { + pub kind: PartialSignatureKind, + pub slot: Slot, + pub messages: Vec, +} + +#[derive(Clone, Debug, Encode, Decode)] +pub struct PartialSignatureMessage { + pub partial_signature: Signature, + pub signing_root: Hash256, + pub signer: OperatorId, + pub validator_index: ValidatorIndex, +} diff --git a/anchor/network/Cargo.toml b/anchor/network/Cargo.toml index cbde2bc3..0f10efc1 100644 --- a/anchor/network/Cargo.toml +++ b/anchor/network/Cargo.toml @@ -30,6 +30,9 @@ quick-protobuf = "0.8.1" serde = { workspace = true } serde_json = "1.0.137" ssv_types = { workspace = true } +qbft_manager = { workspace = true } +signature_collector = { workspace = true } +slot_clock = { workspace = true } ssz_types = "0.8" subnet_tracker = { workspace = true } task_executor = { workspace = true } diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 04620a2b..61877630 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -1,6 +1,6 @@ -use crate::network::gossipsub::MessageId; use std::num::{NonZeroU8, NonZeroUsize}; use std::pin::Pin; +use std::sync::Arc; use std::time::Duration; use futures::StreamExt; @@ -8,7 +8,7 @@ use libp2p::core::muxing::StreamMuxerBox; use libp2p::core::transport::Boxed; use libp2p::core::ConnectedPoint; use libp2p::gossipsub::{ - IdentTopic, Message, MessageAcceptance, MessageAuthenticity, ValidationMode, + IdentTopic, Message, MessageAcceptance, MessageAuthenticity, MessageId, ValidationMode, }; use libp2p::identity::Keypair; use libp2p::multiaddr::Protocol; @@ -17,7 +17,14 @@ use libp2p::{futures, gossipsub, identify, ping, PeerId, Swarm, SwarmBuilder}; use lighthouse_network::discovery::DiscoveredPeers; use lighthouse_network::discv5::enr::k256::sha2::{Digest, Sha256}; use lighthouse_network::EnrExt; -use ssv_types::message::SignedSSVMessage; +use qbft_manager::{ + CommitteeInstanceId, QbftManager, ValidatorDutyKind, ValidatorInstanceId, WrappedQbftMessage, +}; +use signature_collector::{SignatureCollectorManager, SignatureRequest}; +use ssv_types::consensus::{BeaconVote, QbftMessage, ValidatorConsensusData}; +use ssv_types::message::{MsgType, SignedSSVMessage}; +use ssv_types::msgid::{DutyExecutor, Role}; +use ssv_types::partial_sig::PartialSignatureMessages; use ssz::Decode; use subnet_tracker::{SubnetEvent, SubnetId}; use task_executor::TaskExecutor; @@ -31,9 +38,12 @@ use crate::handshake::node_info::{NodeInfo, NodeMetadata}; use crate::keypair_utils::load_private_key; use crate::transport::build_transport; use crate::{handshake, Config}; + pub struct Network { swarm: Swarm, subnet_event_receiver: mpsc::Receiver, + qbft_manager: Arc, + signature_collector: Arc, peer_id: PeerId, node_info: NodeInfo, } @@ -44,6 +54,8 @@ impl Network { pub async fn try_new( config: &Config, subnet_event_receiver: mpsc::Receiver, + qbft_manager: Arc, + signature_collector: Arc, executor: TaskExecutor, ) -> Result { let local_keypair: Keypair = load_private_key(&config.network_dir); @@ -70,6 +82,8 @@ impl Network { config, ), subnet_event_receiver, + qbft_manager, + signature_collector, peer_id, node_info, }; @@ -228,11 +242,106 @@ impl Network { .gossipsub .report_message_validation_result(message_id, &propagation_source, acceptance); - let Ok(_message) = result else { + let Ok(message) = result else { return; }; - // todo pass on to app + match message.ssv_message().msg_type() { + MsgType::SSVConsensusMsgType => self.on_consensus_message_received(message), + MsgType::SSVPartialSignatureMsgType => self.on_signature_message_received(message), + } + } + + fn on_consensus_message_received(&mut self, message: SignedSSVMessage) { + // todo would be nice to not have to deserialize, as that also happens in validation? + let qbft_message = match QbftMessage::from_ssz_bytes(message.ssv_message().data()) { + Ok(qbft_message) => qbft_message, + Err(err) => { + error!(?err, "Unable to decode qbft message"); + return; + } + }; + + let msg_id = message.ssv_message().msg_id(); + let instance_height = (qbft_message.height as usize).into(); + let result = match msg_id.duty_executor() { + Some(DutyExecutor::Validator(validator)) => { + let duty = match msg_id.role() { + None | Some(Role::Committee) => { + // should never happen + error!(?msg_id, "Unexpected role/executor combination in msg id"); + return; + } + Some(Role::Proposer) => ValidatorDutyKind::Proposal, + Some(Role::Aggregator) => ValidatorDutyKind::Aggregator, + Some(Role::SyncCommittee) => ValidatorDutyKind::SyncCommitteeAggregator, + }; + let id = ValidatorInstanceId { + validator, + duty, + instance_height, + }; + self.qbft_manager.receive_data::( + id, + WrappedQbftMessage { + signed_message: message, + qbft_message, + }, + ) + } + Some(DutyExecutor::Committee(committee)) => { + let id = CommitteeInstanceId { + committee, + instance_height, + }; + self.qbft_manager.receive_data::( + id, + WrappedQbftMessage { + signed_message: message, + qbft_message, + }, + ) + } + None => { + warn!(?msg_id, "received invalid message id"); + return; + } + }; + + if let Err(err) = result { + error!(?err, "Error sending network message to qbft!"); + } + } + + fn on_signature_message_received(&mut self, message: SignedSSVMessage) { + // todo would be nice to not have to deserialize, as that also happens in validation? + let partial_sig_messages = + match PartialSignatureMessages::from_ssz_bytes(message.ssv_message().data()) { + Ok(partial_sig_messages) => partial_sig_messages, + Err(err) => { + error!(?err, "Unable to decode partial signature message"); + return; + } + }; + + for partial_sig_message in partial_sig_messages.messages { + let request = SignatureRequest { + signing_root: partial_sig_message.signing_root, + threshold: 4, + slot: partial_sig_messages.slot, + }; + let result = self.signature_collector.receive_partial_signature( + request, + partial_sig_message.signer, + Box::new(partial_sig_message.partial_signature), + ); + if let Err(err) = result { + error!( + ?err, + "Error sending network message to signature collector!" + ); + } + } } fn on_subnet_tracker_event(&mut self, event: SubnetEvent) { @@ -401,26 +510,3 @@ fn build_swarm( .with_swarm_config(|_| swarm_config) .build() } - -#[cfg(test)] -mod test { - use crate::network::Network; - use crate::Config; - use std::time::Duration; - use subnet_tracker::test_tracker; - use task_executor::TaskExecutor; - - #[tokio::test] - async fn create_network() { - let handle = tokio::runtime::Handle::current(); - let (_signal, exit) = async_channel::bounded(1); - let (shutdown_tx, _) = futures::channel::mpsc::channel(1); - let task_executor = TaskExecutor::new(handle, exit, shutdown_tx); - let subnet_tracker = test_tracker(task_executor.clone(), vec![], Duration::ZERO); - assert!( - Network::try_new(&Config::default(), subnet_tracker, task_executor) - .await - .is_ok() - ); - } -} diff --git a/anchor/network/src/validation.rs b/anchor/network/src/validation.rs index 4fa962fd..cbbc34bb 100644 --- a/anchor/network/src/validation.rs +++ b/anchor/network/src/validation.rs @@ -1,6 +1,6 @@ -use crate::types::ssv_message::SignedSSVMessage; use crate::Network; use libp2p::gossipsub::{Message, MessageAcceptance}; +use ssv_types::message::SignedSSVMessage; use ssz::Decode; use tracing::debug; diff --git a/anchor/qbft_manager/src/lib.rs b/anchor/qbft_manager/src/lib.rs index aeea3f5e..3309789e 100644 --- a/anchor/qbft_manager/src/lib.rs +++ b/anchor/qbft_manager/src/lib.rs @@ -5,17 +5,18 @@ use openssl::rsa::Rsa; use openssl::sign::Signer; use processor::{DropOnFinish, Senders, WorkItem}; +pub use qbft::WrappedQbftMessage; use qbft::{ Completed, ConfigBuilder, ConfigBuilderError, DefaultLeaderFunction, InstanceHeight, Message, - WrappedQbftMessage, }; use slot_clock::SlotClock; use ssv_types::consensus::{BeaconVote, QbftData, UnsignedSSVMessage, ValidatorConsensusData}; use std::error::Error; +use ssv_types::committee::CommitteeId; use ssv_types::message::SignedSSVMessage; use ssv_types::OperatorId as QbftOperatorId; -use ssv_types::{Cluster, ClusterId, OperatorId}; +use ssv_types::{Cluster, OperatorId}; use ssz::Encode; use std::fmt::Debug; use std::hash::Hash; @@ -40,10 +41,10 @@ const QBFT_SIGNER_NAME: &str = "qbft_signer"; /// Number of slots to keep before the current slot const QBFT_RETAIN_SLOTS: u64 = 1; -// Unique Identifier for a Cluster and its corresponding QBFT instance +// Unique Identifier for a committee and its corresponding QBFT instance #[derive(Debug, Clone, Hash, PartialEq, Eq)] pub struct CommitteeInstanceId { - pub committee: ClusterId, + pub committee: CommitteeId, pub instance_height: InstanceHeight, } diff --git a/anchor/qbft_manager/src/tests.rs b/anchor/qbft_manager/src/tests.rs index 5bcb783c..f8f7e95e 100644 --- a/anchor/qbft_manager/src/tests.rs +++ b/anchor/qbft_manager/src/tests.rs @@ -6,7 +6,7 @@ use processor::Senders; use slot_clock::{ManualSlotClock, SlotClock}; use ssv_types::consensus::{BeaconVote, QbftMessage, QbftMessageType}; use ssv_types::message::SignedSSVMessage; -use ssv_types::{Cluster, ClusterId, OperatorId}; +use ssv_types::{Cluster, committee::CommitteeId, OperatorId, ClusterId}; use ssz::Decode; use std::collections::HashMap; use std::sync::LazyLock; @@ -550,7 +550,7 @@ mod manager_tests { fn generate_test_data(id: usize) -> (BeaconVote, CommitteeInstanceId) { // setup mock data let id = CommitteeInstanceId { - committee: ClusterId([0; 32]), + committee: CommitteeId([0; 32]), instance_height: id.into(), }; diff --git a/anchor/signature_collector/src/lib.rs b/anchor/signature_collector/src/lib.rs index c4ce1b07..4357463a 100644 --- a/anchor/signature_collector/src/lib.rs +++ b/anchor/signature_collector/src/lib.rs @@ -2,7 +2,7 @@ use bls_lagrange::KeyId; use dashmap::DashMap; use processor::{DropOnFinish, Senders, WorkItem}; use slot_clock::SlotClock; -use ssv_types::{ClusterId, OperatorId}; +use ssv_types::OperatorId; use std::collections::{hash_map, HashMap}; use std::mem; use std::sync::Arc; @@ -153,7 +153,6 @@ impl SignatureCollectorManager { #[derive(Debug, Clone)] pub struct SignatureRequest { - pub cluster_id: ClusterId, pub signing_root: Hash256, pub threshold: u64, pub slot: Slot, diff --git a/anchor/subnet_tracker/src/lib.rs b/anchor/subnet_tracker/src/lib.rs index ff301719..62ab4ff4 100644 --- a/anchor/subnet_tracker/src/lib.rs +++ b/anchor/subnet_tracker/src/lib.rs @@ -1,4 +1,3 @@ -use alloy::primitives::keccak256; use alloy::primitives::ruint::aliases::U256; use database::{NetworkState, UniqueIndex}; use log::warn; @@ -11,6 +10,7 @@ use task_executor::TaskExecutor; use tokio::sync::{mpsc, watch}; use tokio::time::sleep; use tracing::debug; +use ssv_types::committee::CommitteeId; #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] #[serde(transparent)] @@ -121,22 +121,13 @@ async fn subnet_tracker( } fn get_committee_id(cluster: &Cluster) -> U256 { - let mut operator_ids = cluster + let operator_ids = cluster .cluster_members .iter() - .map(|x| **x) + .cloned() .collect::>(); - // Sort the operator IDs - operator_ids.sort(); - let mut data: Vec = Vec::with_capacity(operator_ids.len() * 4); - - // Add the operator IDs as 32 byte values - for id in operator_ids { - data.extend_from_slice(&id.to_le_bytes()); - } - - // Hash it all - U256::from_be_bytes(keccak256(data).0) + let id = CommitteeId::from(operator_ids); + U256::from_be_bytes(*id) } /// only useful for testing - introduce feature flag? diff --git a/anchor/validator_store/src/lib.rs b/anchor/validator_store/src/lib.rs index 81fe0147..4bedea4e 100644 --- a/anchor/validator_store/src/lib.rs +++ b/anchor/validator_store/src/lib.rs @@ -256,7 +256,6 @@ impl AnchorValidatorStore { ) -> Result { let collector = self.signature_collector.sign_and_collect( SignatureRequest { - cluster_id: cluster.cluster.cluster_id, signing_root, threshold: cluster .cluster @@ -388,7 +387,7 @@ impl AnchorValidatorStore { .qbft_manager .decide_instance( CommitteeInstanceId { - committee: validator.cluster.cluster_id, + committee: validator.cluster.committee_id(), instance_height: slot.as_usize().into(), }, vote, @@ -723,7 +722,7 @@ impl ValidatorStore for AnchorValidatorStore { .qbft_manager .decide_instance( CommitteeInstanceId { - committee: validator.cluster.cluster_id, + committee: validator.cluster.committee_id(), instance_height: attestation.data().slot.as_usize().into(), }, BeaconVote { @@ -772,7 +771,6 @@ impl ValidatorStore for AnchorValidatorStore { _validator_pubkey: PublicKeyBytes, _voluntary_exit: VoluntaryExit, ) -> Result { - // there should be no situation ever where we want to sign an exit Err(Error::SpecificError(SpecificError::Unsupported)) }