From fff476e0a87089a0c673112487b78c7f7f67924f Mon Sep 17 00:00:00 2001
From: Daniel Knopik <107140945+dknopik@users.noreply.github.com>
Date: Tue, 18 Feb 2025 15:52:06 +0100
Subject: [PATCH 1/2] Make QBFT and signature collection managers nongeneric (#146)

---
 anchor/qbft_manager/src/lib.rs        | 39 ++++++++++++---------------
 anchor/qbft_manager/src/tests.rs      | 10 +++----
 anchor/signature_collector/src/lib.rs | 27 ++++++++++---------
 anchor/validator_store/src/lib.rs     |  8 +++---
 4 files changed, 40 insertions(+), 44 deletions(-)

diff --git a/anchor/qbft_manager/src/lib.rs b/anchor/qbft_manager/src/lib.rs
index 3250c12f..aeea3f5e 100644
--- a/anchor/qbft_manager/src/lib.rs
+++ b/anchor/qbft_manager/src/lib.rs
@@ -92,13 +92,11 @@ type Qbft = qbft::Qbft;
 type Map = DashMap>>;
 
 // Top level QBFTManager structure
-pub struct QbftManager<T: SlotClock> {
+pub struct QbftManager {
     // Senders to send work off to the central processor
     processor: Senders,
     // OperatorID
     operator_id: QbftOperatorId,
-    // The slot clock for timing
-    slot_clock: T,
     // All of the QBFT instances that are voting on validator consensus data
     validator_consensus_data_instances: Map,
     // All of the QBFT instances that are voting on beacon data
@@ -109,12 +107,12 @@ pub struct QbftManager {
     network_tx: mpsc::UnboundedSender,
 }
 
-impl<T: SlotClock + 'static> QbftManager<T> {
+impl QbftManager {
     // Construct a new QBFT Manager
     pub fn new(
         processor: Senders,
         operator_id: OperatorId,
-        slot_clock: T,
+        slot_clock: impl SlotClock + 'static,
         key: Rsa,
         network_tx: mpsc::UnboundedSender,
     ) -> Result<Arc<Self>, QbftError> {
@@ -123,7 +121,6 @@ impl QbftManager {
         let manager = Arc::new(QbftManager {
             processor,
             operator_id,
-            slot_clock,
             validator_consensus_data_instances: DashMap::new(),
             beacon_vote_instances: DashMap::new(),
             pkey,
@@ -134,13 +131,13 @@ impl QbftManager {
         manager
             .processor
             .permitless
-            .send_async(Arc::clone(&manager).cleaner(), QBFT_CLEANER_NAME)?;
+            .send_async(Arc::clone(&manager).cleaner(slot_clock), QBFT_CLEANER_NAME)?;
         Ok(manager)
     }
 
     // Decide a brand new qbft instance
-    pub async fn decide_instance<D: QbftDecidable<T>>(
+    pub async fn decide_instance<D: QbftDecidable>(
         &self,
         id: D::Id,
         initial: D,
@@ -182,7 +179,7 @@ impl QbftManager {
     }
 
     /// Send a new network message to the instance
-    pub fn receive_data<D: QbftDecidable<T>>(
+    pub fn receive_data<D: QbftDecidable>(
         &self,
         id: D::Id,
         data: WrappedQbftMessage,
@@ -201,15 +198,15 @@ impl QbftManager {
     }
 
     // Long running cleaner that will remove instances that are no longer relevant
-    async fn cleaner(self: Arc<Self>) {
+    async fn cleaner(self: Arc<Self>, slot_clock: impl SlotClock) {
         while !self.processor.permitless.is_closed() {
             sleep(
-                self.slot_clock
+                slot_clock
                     .duration_to_next_slot()
-                    .unwrap_or(self.slot_clock.slot_duration()),
+                    .unwrap_or(slot_clock.slot_duration()),
             )
             .await;
-            let Some(slot) = self.slot_clock.now() else {
+            let Some(slot) = slot_clock.now() else {
                 continue;
             };
             let cutoff = slot.saturating_sub(QBFT_RETAIN_SLOTS);
@@ -220,15 +217,13 @@
 }
 
 // Trait that describes any data that is able to be decided upon during a qbft instance
-pub trait QbftDecidable<T: SlotClock + 'static>:
-    QbftData + Send + Sync + 'static
-{
+pub trait QbftDecidable: QbftData + Send + Sync + 'static {
     type Id: Hash + Eq + Send;
 
-    fn get_map(manager: &QbftManager<T>) -> &Map;
+    fn get_map(manager: &QbftManager) -> &Map;
 
     fn get_or_spawn_instance(
-        manager: &QbftManager<T>,
+        manager: &QbftManager,
         id: Self::Id,
     ) -> UnboundedSender> {
        let map = Self::get_map(manager);
@@ -257,9 +252,9 @@ pub trait QbftDecidable:
     fn instance_height(&self, id: &Self::Id) -> InstanceHeight;
 }
 
-impl<T: SlotClock + 'static> QbftDecidable<T> for ValidatorConsensusData {
+impl QbftDecidable for ValidatorConsensusData {
     type Id = ValidatorInstanceId;
 
-    fn get_map(manager: &QbftManager<T>) -> &Map {
+    fn get_map(manager: &QbftManager) -> &Map {
         &manager.validator_consensus_data_instances
     }
@@ -268,9 +263,9 @@ impl QbftDecidable for ValidatorConsensusData {
     }
 }
 
-impl<T: SlotClock + 'static> QbftDecidable<T> for BeaconVote {
+impl QbftDecidable for BeaconVote {
     type Id = CommitteeInstanceId;
 
-    fn get_map(manager: &QbftManager<T>) -> &Map {
+    fn get_map(manager: &QbftManager) -> &Map {
         &manager.beacon_vote_instances
     }
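The core idea of this patch, shown as a minimal sketch rather than the actual anchor code: instead of storing the slot clock in the manager (which forced a `T: SlotClock` type parameter onto every holder of the manager), the clock is taken by value in `new` and moved into the long-running cleaner task, so callers can hold a plain `Arc` of a nongeneric type. The `Clock`, `Manager`, and `cleaner` names below are illustrative stand-ins, and `tokio::spawn` stands in for anchor's processor queue:

// Illustrative sketch only, not anchor's API.
use std::{sync::Arc, time::Duration};

// Stand-in for the real SlotClock trait; only what the cleaner needs.
trait Clock: Send + 'static {
    fn until_next_tick(&self) -> Duration;
}

struct Manager {
    // ... maps of running instances, senders, etc. (omitted)
}

impl Manager {
    // The clock is consumed here instead of being stored in a field,
    // so Manager (and every Arc<Manager> held downstream) stays nongeneric.
    fn new(clock: impl Clock) -> Arc<Self> {
        let manager = Arc::new(Manager {});
        tokio::spawn(Arc::clone(&manager).cleaner(clock));
        manager
    }

    // The spawned task owns the clock for its whole lifetime.
    async fn cleaner(self: Arc<Self>, clock: impl Clock) {
        loop {
            tokio::time::sleep(clock.until_next_tick()).await;
            // drop instances older than the retention window ...
        }
    }
}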
diff --git a/anchor/qbft_manager/src/tests.rs b/anchor/qbft_manager/src/tests.rs
index 4e304fd8..5bcb783c 100644
--- a/anchor/qbft_manager/src/tests.rs
+++ b/anchor/qbft_manager/src/tests.rs
@@ -27,7 +27,7 @@ static TRACING: LazyLock<()> = LazyLock::new(|| {
 // Top level Testing Context to provide clean wrapper around testing framework
 pub struct TestContext
 where
-    D: QbftDecidable,
+    D: QbftDecidable,
     D::Id: Send + Sync + Clone,
 {
     pub tester: Arc>,
@@ -36,7 +36,7 @@ where
 
 impl TestContext
 where
-    D: QbftDecidable,
+    D: QbftDecidable,
     D::Id: Send + Sync + Clone,
 {
     // Create a new test context with default setup
@@ -131,13 +131,13 @@ impl CommitteeSize {
 /// The main test coordinator that manages multiple QBFT instances
 pub struct QbftTester
 where
-    D: QbftDecidable,
+    D: QbftDecidable,
     D::Id: Send + Sync + Clone,
 {
     // Senders to the processor
     senders: Senders,
     // Track mapping from operator id to the respective manager
-    managers: HashMap>>,
+    managers: HashMap>,
     // The size of the committee
     pub size: CommitteeSize,
     // Mapping of the data hash to the data identifier. This is to send data to the proper instance
@@ -211,7 +211,7 @@ impl OperatorBehavior {
 
 impl QbftTester
 where
-    D: QbftDecidable + 'static,
+    D: QbftDecidable + 'static,
     D::Id: Send + Sync + Clone,
 {
     /// Create a new QBFT tester instance
diff --git a/anchor/signature_collector/src/lib.rs b/anchor/signature_collector/src/lib.rs
index f98d38fe..c4ce1b07 100644
--- a/anchor/signature_collector/src/lib.rs
+++ b/anchor/signature_collector/src/lib.rs
@@ -27,24 +27,25 @@ struct SignatureCollector {
     for_slot: Slot,
 }
 
-pub struct SignatureCollectorManager<T: SlotClock> {
+pub struct SignatureCollectorManager {
     processor: Senders,
-    slot_clock: T,
     signature_collectors: DashMap,
 }
 
-impl<T: SlotClock + 'static> SignatureCollectorManager<T> {
-    pub fn new(processor: Senders, slot_clock: T) -> Result<Arc<Self>, CollectionError> {
+impl SignatureCollectorManager {
+    pub fn new<T>(processor: Senders, slot_clock: T) -> Result<Arc<Self>, CollectionError>
+    where
+        T: SlotClock + 'static,
+    {
         let manager = Arc::new(Self {
             processor,
-            slot_clock,
             signature_collectors: DashMap::new(),
         });
 
-        manager
-            .processor
-            .permitless
-            .send_async(Arc::clone(&manager).cleaner(), COLLECTOR_CLEANER_NAME)?;
+        manager.processor.permitless.send_async(
+            Arc::clone(&manager).cleaner(slot_clock),
+            COLLECTOR_CLEANER_NAME,
+        )?;
 
         Ok(manager)
     }
@@ -132,15 +133,15 @@ impl SignatureCollectorManager {
     }
 
-    async fn cleaner(self: Arc<Self>) {
+    async fn cleaner(self: Arc<Self>, slot_clock: impl SlotClock) {
         while !self.processor.permitless.is_closed() {
             sleep(
-                self.slot_clock
+                slot_clock
                     .duration_to_next_slot()
-                    .unwrap_or(self.slot_clock.slot_duration()),
+                    .unwrap_or(slot_clock.slot_duration()),
             )
             .await;
-            let Some(slot) = self.slot_clock.now() else {
+            let Some(slot) = slot_clock.now() else {
                 continue;
             };
             let cutoff = slot.saturating_sub(SIGNATURE_COLLECTOR_RETAIN_SLOTS);
diff --git a/anchor/validator_store/src/lib.rs b/anchor/validator_store/src/lib.rs
index 36bf4034..81fe0147 100644
--- a/anchor/validator_store/src/lib.rs
+++ b/anchor/validator_store/src/lib.rs
@@ -73,8 +73,8 @@ struct InitializedValidator {
 
 pub struct AnchorValidatorStore {
     validators: DashMap,
-    signature_collector: Arc<SignatureCollectorManager<T>>,
-    qbft_manager: Arc<QbftManager<T>>,
+    signature_collector: Arc<SignatureCollectorManager>,
+    qbft_manager: Arc<QbftManager>,
     slashing_protection: SlashingDatabase,
     slashing_protection_last_prune: Mutex,
     slot_clock: T,
@@ -89,8 +89,8 @@ impl AnchorValidatorStore {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         database_state: Receiver,
-        signature_collector: Arc<SignatureCollectorManager<T>>,
-        qbft_manager: Arc<QbftManager<T>>,
+        signature_collector: Arc<SignatureCollectorManager>,
+        qbft_manager: Arc<QbftManager>,
         slashing_protection: SlashingDatabase,
         slot_clock: T,
         spec: Arc,
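An aside on the two spellings used above: `QbftManager::new` takes the clock as argument-position `impl SlotClock + 'static`, while `SignatureCollectorManager::new` introduces a named type parameter with a `where` clause. For callers they behave the same here; the generic stops at the constructor, so consumers such as `AnchorValidatorStore` hold plain `Arc<QbftManager>` and `Arc<SignatureCollectorManager>` handles. A tiny sketch of the two equivalent forms, with a stand-in `Clock` trait rather than the real `SlotClock`:

// Illustrative only: two ways to accept the clock by value without storing it.
trait Clock: Send + 'static {}

struct A;
impl A {
    // qbft_manager style: argument-position impl Trait.
    fn new(_clock: impl Clock) -> A {
        A
    }
}

struct B;
impl B {
    // signature_collector style: named type parameter plus where clause,
    // easier to read when the bound list grows.
    fn new<T>(_clock: T) -> B
    where
        T: Clock,
    {
        B
    }
}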
From cca70761d552ec2887328af899659660d47c077c Mon Sep 17 00:00:00 2001
From: diegomrsantos
Date: Tue, 18 Feb 2025 16:38:08 +0100
Subject: [PATCH 2/2] Fix handshake network mismatch (#145)

---
 anchor/network/src/handshake/node_info.rs | 29 +++++++++++++++--------
 1 file changed, 19 insertions(+), 10 deletions(-)

diff --git a/anchor/network/src/handshake/node_info.rs b/anchor/network/src/handshake/node_info.rs
index 87e84e98..e35f3725 100644
--- a/anchor/network/src/handshake/node_info.rs
+++ b/anchor/network/src/handshake/node_info.rs
@@ -57,8 +57,8 @@ impl NodeInfo {
     /// Serialize `NodeInfo` to JSON bytes.
     fn marshal(&self) -> Result<Vec<u8>, Error> {
         let mut entries = vec![
-            "".to_string(),          // formerly forkVersion, now deprecated
-            self.network_id.clone(), // network id
+            "".to_string(),                           // formerly forkVersion, now deprecated
+            format!("0x{}", self.network_id.clone()), // network id
         ];
 
         if let Some(meta) = &self.metadata {
@@ -78,7 +78,12 @@ impl NodeInfo {
             return Err(Validation("node info must have at least 2 entries".into()));
         }
         // skip ser.entries[0]: old forkVersion
-        let network_id = ser.entries[1].clone();
+        let network_id = ser.entries[1]
+            .clone()
+            .strip_prefix("0x")
+            .ok_or_else(|| Validation("network id must be prefixed with 0x".into()))?
+            .to_string();
+
         let metadata = if ser.entries.len() >= 3 {
             let meta = serde_json::from_slice(ser.entries[2].as_bytes())?;
             Some(meta)
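As an aside (not part of the patch): the round trip established above is that marshal now writes the network id `0x`-prefixed, and unmarshal only accepts that form and strips the prefix back off, so a peer on a differently formatted network id is rejected instead of silently mismatching. A minimal self-contained sketch of that logic, with plain `String` errors standing in for the real `Error::Validation`:

// Hedged sketch of the prefix round trip; the real code operates on the
// JSON "Entries" array shown in the hunks above.
fn write_network_entry(network_id: &str) -> String {
    format!("0x{}", network_id)
}

fn read_network_entry(entry: &str) -> Result<String, String> {
    entry
        .strip_prefix("0x")
        .map(|id| id.to_string())
        .ok_or_else(|| "network id must be prefixed with 0x".to_string())
}

fn main() {
    // What marshal writes is exactly what unmarshal accepts and strips.
    assert_eq!(read_network_entry(&write_network_entry("00000502")).unwrap(), "00000502");
    // An entry without the prefix is rejected instead of being compared verbatim.
    assert!(read_network_entry("holesky").is_err());
}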
@@ -119,11 +124,14 @@ mod tests {
     use crate::handshake::node_info::{NodeInfo, NodeMetadata};
     use libp2p::identity::Keypair;
 
+    const HOLESKY_WITH_PREFIX: &str = "0x00000502";
+    const HOLESKY: &str = "00000502";
+
     #[test]
     fn test_node_info_seal_consume() {
         // Create a sample NodeInfo instance
         let node_info = NodeInfo::new(
-            "holesky".to_string(),
+            HOLESKY_WITH_PREFIX.to_string(),
             Some(NodeMetadata {
                 node_version: "geth/x".to_string(),
                 execution_node: "geth/x".to_string(),
@@ -146,9 +154,7 @@ fn test_node_info_seal_consume() {
         assert_eq!(node_info, parsed_node_info);
 
         let encoded=
-            hex::decode("0a250802122102ba6a707dcec6c60ba2793d52123d34b22556964fc798d4aa88ffc41\
-            a00e42407120c7373762f6e6f6465696e666f1aa5017b22456e7472696573223a5b22222c22686f6c65736b7\
-            9222c227b5c224e6f646556657273696f6e5c223a5c22676574682f785c222c5c22457865637574696f6e4e6f64655c223a5c22676574682f785c222c5c22436f6e73656e7375734e6f64655c223a5c22707279736d2f785c222c5c225375626e6574735c223a5c2230303030303030303030303030303030303030303030303030303030303030305c227d225d7d2a473045022100b8a2a668113330369e74b86ec818a87009e2a351f7ee4c0e431e1f659dd1bc3f02202b1ebf418efa7fb0541f77703bea8563234a1b70b8391d43daa40b6e7c3fcc84").unwrap();
+            hex::decode("0a2508021221037f3a82b9c83139f3e2c26850d688783ec779e7ca3f7824557d2e72af1f8ffeed120c7373762f6e6f6465696e666f1aaa017b22456e7472696573223a5b22222c22307830783030303030353032222c227b5c224e6f646556657273696f6e5c223a5c22676574682f785c222c5c22457865637574696f6e4e6f64655c223a5c22676574682f785c222c5c22436f6e73656e7375734e6f64655c223a5c22707279736d2f785c222c5c225375626e6574735c223a5c2230303030303030303030303030303030303030303030303030303030303030305c227d225d7d2a473045022100b362c2d4f1a32ee3d1503bfa83019d9273bdfed12ba9fced1c3e168848568b5202203e47cb6958f917613bf6022cf5b46ee1e1a628bee331e8ec1fa3acaa1f19d383").unwrap();
 
         let parsed_env = Envelope::parse_and_verify(&encoded).expect("Consume failed");
         let parsed_node_info =
@@ -161,11 +167,14 @@ mod tests {
     fn test_node_info_marshal_unmarshal() {
         // The old serialized data from the Go code
         // (note the "Subnets":"ffffffffffffffffffffffffffffffff")
-        let old_serialized_data = br#"{"Entries":["", "testnet", "{\"NodeVersion\":\"v0.1.12\",\"ExecutionNode\":\"geth/x\",\"ConsensusNode\":\"prysm/x\",\"Subnets\":\"ffffffffffffffffffffffffffffffff\"}"]}"#;
+        let old_serialized_data = format!(
+            r#"{{"Entries":["", "{}", "{{\"NodeVersion\":\"v0.1.12\",\"ExecutionNode\":\"geth/x\",\"ConsensusNode\":\"prysm/x\",\"Subnets\":\"ffffffffffffffffffffffffffffffff\"}}"]}}"#,
+            HOLESKY_WITH_PREFIX
+        ).into_bytes();
 
         // The "current" NodeInfo data
         let current_data = NodeInfo {
-            network_id: "testnet".to_string(),
+            network_id: HOLESKY.to_string(),
             metadata: Some(NodeMetadata {
                 node_version: "v0.1.12".into(),
                 execution_node: "geth/x".into(),
@@ -184,7 +193,7 @@ mod tests {
 
         // 3) Now unmarshal the old format data into the same struct
         let old_format =
-            NodeInfo::unmarshal(old_serialized_data).expect("unmarshal old data should succeed");
+            NodeInfo::unmarshal(&old_serialized_data).expect("unmarshal old data should succeed");
 
         // 4) Compare
         // The Go test checks reflect.DeepEqual(currentSerializedData, parsedRec)