Skip to content

Commit

Permalink
Merge branch 'unstable' into keysplitter
Browse files Browse the repository at this point in the history
  • Loading branch information
Zacholme7 committed Feb 18, 2025
2 parents 045b7af + cca7076 commit eca2089
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 54 deletions.
29 changes: 19 additions & 10 deletions anchor/network/src/handshake/node_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ impl NodeInfo {
/// Serialize `NodeInfo` to JSON bytes.
fn marshal(&self) -> Result<Vec<u8>, Error> {
let mut entries = vec![
"".to_string(), // formerly forkVersion, now deprecated
self.network_id.clone(), // network id
"".to_string(), // formerly forkVersion, now deprecated
format!("0x{}", self.network_id.clone()), // network id
];

if let Some(meta) = &self.metadata {
Expand All @@ -78,7 +78,12 @@ impl NodeInfo {
return Err(Validation("node info must have at least 2 entries".into()));
}
// skip ser.entries[0]: old forkVersion
let network_id = ser.entries[1].clone();
let network_id = ser.entries[1]
.clone()
.strip_prefix("0x")
.ok_or_else(|| Validation("network id must be prefixed with 0x".into()))?
.to_string();

let metadata = if ser.entries.len() >= 3 {
let meta = serde_json::from_slice(ser.entries[2].as_bytes())?;
Some(meta)
Expand Down Expand Up @@ -119,11 +124,14 @@ mod tests {
use crate::handshake::node_info::{NodeInfo, NodeMetadata};
use libp2p::identity::Keypair;

const HOLESKY_WITH_PREFIX: &str = "0x00000502";
const HOLESKY: &str = "00000502";

#[test]
fn test_node_info_seal_consume() {
// Create a sample NodeInfo instance
let node_info = NodeInfo::new(
"holesky".to_string(),
HOLESKY_WITH_PREFIX.to_string(),
Some(NodeMetadata {
node_version: "geth/x".to_string(),
execution_node: "geth/x".to_string(),
Expand All @@ -146,9 +154,7 @@ mod tests {
assert_eq!(node_info, parsed_node_info);

let encoded=
hex::decode("0a250802122102ba6a707dcec6c60ba2793d52123d34b22556964fc798d4aa88ffc41\
a00e42407120c7373762f6e6f6465696e666f1aa5017b22456e7472696573223a5b22222c22686f6c65736b7\
9222c227b5c224e6f646556657273696f6e5c223a5c22676574682f785c222c5c22457865637574696f6e4e6f64655c223a5c22676574682f785c222c5c22436f6e73656e7375734e6f64655c223a5c22707279736d2f785c222c5c225375626e6574735c223a5c2230303030303030303030303030303030303030303030303030303030303030305c227d225d7d2a473045022100b8a2a668113330369e74b86ec818a87009e2a351f7ee4c0e431e1f659dd1bc3f02202b1ebf418efa7fb0541f77703bea8563234a1b70b8391d43daa40b6e7c3fcc84").unwrap();
hex::decode("0a2508021221037f3a82b9c83139f3e2c26850d688783ec779e7ca3f7824557d2e72af1f8ffeed120c7373762f6e6f6465696e666f1aaa017b22456e7472696573223a5b22222c22307830783030303030353032222c227b5c224e6f646556657273696f6e5c223a5c22676574682f785c222c5c22457865637574696f6e4e6f64655c223a5c22676574682f785c222c5c22436f6e73656e7375734e6f64655c223a5c22707279736d2f785c222c5c225375626e6574735c223a5c2230303030303030303030303030303030303030303030303030303030303030305c227d225d7d2a473045022100b362c2d4f1a32ee3d1503bfa83019d9273bdfed12ba9fced1c3e168848568b5202203e47cb6958f917613bf6022cf5b46ee1e1a628bee331e8ec1fa3acaa1f19d383").unwrap();

let parsed_env = Envelope::parse_and_verify(&encoded).expect("Consume failed");
let parsed_node_info =
Expand All @@ -161,11 +167,14 @@ mod tests {
fn test_node_info_marshal_unmarshal() {
// The old serialized data from the Go code
// (note the "Subnets":"ffffffffffffffffffffffffffffffff")
let old_serialized_data = br#"{"Entries":["", "testnet", "{\"NodeVersion\":\"v0.1.12\",\"ExecutionNode\":\"geth/x\",\"ConsensusNode\":\"prysm/x\",\"Subnets\":\"ffffffffffffffffffffffffffffffff\"}"]}"#;
let old_serialized_data = format!(
r#"{{"Entries":["", "{}", "{{\"NodeVersion\":\"v0.1.12\",\"ExecutionNode\":\"geth/x\",\"ConsensusNode\":\"prysm/x\",\"Subnets\":\"ffffffffffffffffffffffffffffffff\"}}"]}}"#,
HOLESKY_WITH_PREFIX
).into_bytes();

// The "current" NodeInfo data
let current_data = NodeInfo {
network_id: "testnet".to_string(),
network_id: HOLESKY.to_string(),
metadata: Some(NodeMetadata {
node_version: "v0.1.12".into(),
execution_node: "geth/x".into(),
Expand All @@ -184,7 +193,7 @@ mod tests {

// 3) Now unmarshal the old format data into the same struct
let old_format =
NodeInfo::unmarshal(old_serialized_data).expect("unmarshal old data should succeed");
NodeInfo::unmarshal(&old_serialized_data).expect("unmarshal old data should succeed");

// 4) Compare
// The Go test checks reflect.DeepEqual(currentSerializedData, parsedRec)
Expand Down
39 changes: 17 additions & 22 deletions anchor/qbft_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,11 @@ type Qbft<D, S> = qbft::Qbft<DefaultLeaderFunction, D, S>;
type Map<I, D> = DashMap<I, UnboundedSender<QbftMessage<D>>>;

// Top level QBFTManager structure
pub struct QbftManager<T: SlotClock + 'static> {
pub struct QbftManager {
// Senders to send work off to the central processor
processor: Senders,
// OperatorID
operator_id: QbftOperatorId,
// The slot clock for timing
slot_clock: T,
// All of the QBFT instances that are voting on validator consensus data
validator_consensus_data_instances: Map<ValidatorInstanceId, ValidatorConsensusData>,
// All of the QBFT instances that are voting on beacon data
Expand All @@ -109,12 +107,12 @@ pub struct QbftManager<T: SlotClock + 'static> {
network_tx: mpsc::UnboundedSender<SignedSSVMessage>,
}

impl<T: SlotClock> QbftManager<T> {
impl QbftManager {
// Construct a new QBFT Manager
pub fn new(
processor: Senders,
operator_id: OperatorId,
slot_clock: T,
slot_clock: impl SlotClock + 'static,
key: Rsa<Private>,
network_tx: mpsc::UnboundedSender<SignedSSVMessage>,
) -> Result<Arc<Self>, QbftError> {
Expand All @@ -123,7 +121,6 @@ impl<T: SlotClock> QbftManager<T> {
let manager = Arc::new(QbftManager {
processor,
operator_id,
slot_clock,
validator_consensus_data_instances: DashMap::new(),
beacon_vote_instances: DashMap::new(),
pkey,
Expand All @@ -134,13 +131,13 @@ impl<T: SlotClock> QbftManager<T> {
manager
.processor
.permitless
.send_async(Arc::clone(&manager).cleaner(), QBFT_CLEANER_NAME)?;
.send_async(Arc::clone(&manager).cleaner(slot_clock), QBFT_CLEANER_NAME)?;

Ok(manager)
}

// Decide a brand new qbft instance
pub async fn decide_instance<D: QbftDecidable<T>>(
pub async fn decide_instance<D: QbftDecidable>(
&self,
id: D::Id,
initial: D,
Expand Down Expand Up @@ -182,7 +179,7 @@ impl<T: SlotClock> QbftManager<T> {
}

/// Send a new network message to the instance
pub fn receive_data<D: QbftDecidable<T>>(
pub fn receive_data<D: QbftDecidable>(
&self,
id: D::Id,
data: WrappedQbftMessage,
Expand All @@ -201,15 +198,15 @@ impl<T: SlotClock> QbftManager<T> {
}

// Long running cleaner that will remove instances that are no longer relevant
async fn cleaner(self: Arc<Self>) {
async fn cleaner(self: Arc<Self>, slot_clock: impl SlotClock) {
while !self.processor.permitless.is_closed() {
sleep(
self.slot_clock
slot_clock
.duration_to_next_slot()
.unwrap_or(self.slot_clock.slot_duration()),
.unwrap_or(slot_clock.slot_duration()),
)
.await;
let Some(slot) = self.slot_clock.now() else {
let Some(slot) = slot_clock.now() else {
continue;
};
let cutoff = slot.saturating_sub(QBFT_RETAIN_SLOTS);
Expand All @@ -220,15 +217,13 @@ impl<T: SlotClock> QbftManager<T> {
}

// Trait that describes any data that is able to be decided upon during a qbft instance
pub trait QbftDecidable<T: SlotClock + 'static>:
QbftData<Hash = Hash256> + Send + Sync + 'static
{
pub trait QbftDecidable: QbftData<Hash = Hash256> + Send + Sync + 'static {
type Id: Hash + Eq + Send;

fn get_map(manager: &QbftManager<T>) -> &Map<Self::Id, Self>;
fn get_map(manager: &QbftManager) -> &Map<Self::Id, Self>;

fn get_or_spawn_instance(
manager: &QbftManager<T>,
manager: &QbftManager,
id: Self::Id,
) -> UnboundedSender<QbftMessage<Self>> {
let map = Self::get_map(manager);
Expand Down Expand Up @@ -257,9 +252,9 @@ pub trait QbftDecidable<T: SlotClock + 'static>:
fn instance_height(&self, id: &Self::Id) -> InstanceHeight;
}

impl<T: SlotClock + 'static> QbftDecidable<T> for ValidatorConsensusData {
impl QbftDecidable for ValidatorConsensusData {
type Id = ValidatorInstanceId;
fn get_map(manager: &QbftManager<T>) -> &Map<Self::Id, Self> {
fn get_map(manager: &QbftManager) -> &Map<Self::Id, Self> {
&manager.validator_consensus_data_instances
}

Expand All @@ -268,9 +263,9 @@ impl<T: SlotClock + 'static> QbftDecidable<T> for ValidatorConsensusData {
}
}

impl<T: SlotClock + 'static> QbftDecidable<T> for BeaconVote {
impl QbftDecidable for BeaconVote {
type Id = CommitteeInstanceId;
fn get_map(manager: &QbftManager<T>) -> &Map<Self::Id, Self> {
fn get_map(manager: &QbftManager) -> &Map<Self::Id, Self> {
&manager.beacon_vote_instances
}

Expand Down
10 changes: 5 additions & 5 deletions anchor/qbft_manager/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ static TRACING: LazyLock<()> = LazyLock::new(|| {
// Top level Testing Context to provide clean wrapper around testing framework
pub struct TestContext<D>
where
D: QbftDecidable<ManualSlotClock>,
D: QbftDecidable,
D::Id: Send + Sync + Clone,
{
pub tester: Arc<QbftTester<D>>,
Expand All @@ -36,7 +36,7 @@ where

impl<D> TestContext<D>
where
D: QbftDecidable<ManualSlotClock>,
D: QbftDecidable,
D::Id: Send + Sync + Clone,
{
// Create a new test context with default setup
Expand Down Expand Up @@ -131,13 +131,13 @@ impl CommitteeSize {
/// The main test coordinator that manages multiple QBFT instances
pub struct QbftTester<D>
where
D: QbftDecidable<ManualSlotClock>,
D: QbftDecidable,
D::Id: Send + Sync + Clone,
{
// Senders to the processor
senders: Senders,
// Track mapping from operator id to the respective manager
managers: HashMap<OperatorId, Arc<QbftManager<ManualSlotClock>>>,
managers: HashMap<OperatorId, Arc<QbftManager>>,
// The size of the committee
pub size: CommitteeSize,
// Mapping of the data hash to the data identifier. This is to send data to the proper instance
Expand Down Expand Up @@ -211,7 +211,7 @@ impl OperatorBehavior {

impl<D> QbftTester<D>
where
D: QbftDecidable<ManualSlotClock> + 'static,
D: QbftDecidable + 'static,
D::Id: Send + Sync + Clone,
{
/// Create a new QBFT tester instance
Expand Down
27 changes: 14 additions & 13 deletions anchor/signature_collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,25 @@ struct SignatureCollector {
for_slot: Slot,
}

pub struct SignatureCollectorManager<T: SlotClock> {
pub struct SignatureCollectorManager {
processor: Senders,
slot_clock: T,
signature_collectors: DashMap<Hash256, SignatureCollector>,
}

impl<T: SlotClock + 'static> SignatureCollectorManager<T> {
pub fn new(processor: Senders, slot_clock: T) -> Result<Arc<Self>, CollectionError> {
impl SignatureCollectorManager {
pub fn new<T>(processor: Senders, slot_clock: T) -> Result<Arc<Self>, CollectionError>
where
T: SlotClock + 'static,
{
let manager = Arc::new(Self {
processor,
slot_clock,
signature_collectors: DashMap::new(),
});

manager
.processor
.permitless
.send_async(Arc::clone(&manager).cleaner(), COLLECTOR_CLEANER_NAME)?;
manager.processor.permitless.send_async(
Arc::clone(&manager).cleaner(slot_clock),
COLLECTOR_CLEANER_NAME,
)?;

Ok(manager)
}
Expand Down Expand Up @@ -132,15 +133,15 @@ impl<T: SlotClock + 'static> SignatureCollectorManager<T> {
}
}

async fn cleaner(self: Arc<Self>) {
async fn cleaner(self: Arc<Self>, slot_clock: impl SlotClock) {
while !self.processor.permitless.is_closed() {
sleep(
self.slot_clock
slot_clock
.duration_to_next_slot()
.unwrap_or(self.slot_clock.slot_duration()),
.unwrap_or(slot_clock.slot_duration()),
)
.await;
let Some(slot) = self.slot_clock.now() else {
let Some(slot) = slot_clock.now() else {
continue;
};
let cutoff = slot.saturating_sub(SIGNATURE_COLLECTOR_RETAIN_SLOTS);
Expand Down
8 changes: 4 additions & 4 deletions anchor/validator_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ struct InitializedValidator {

pub struct AnchorValidatorStore<T: SlotClock + 'static, E: EthSpec> {
validators: DashMap<PublicKeyBytes, InitializedValidator>,
signature_collector: Arc<SignatureCollectorManager<T>>,
qbft_manager: Arc<QbftManager<T>>,
signature_collector: Arc<SignatureCollectorManager>,
qbft_manager: Arc<QbftManager>,
slashing_protection: SlashingDatabase,
slashing_protection_last_prune: Mutex<Epoch>,
slot_clock: T,
Expand All @@ -89,8 +89,8 @@ impl<T: SlotClock, E: EthSpec> AnchorValidatorStore<T, E> {
#[allow(clippy::too_many_arguments)]
pub fn new(
database_state: Receiver<NetworkState>,
signature_collector: Arc<SignatureCollectorManager<T>>,
qbft_manager: Arc<QbftManager<T>>,
signature_collector: Arc<SignatureCollectorManager>,
qbft_manager: Arc<QbftManager>,
slashing_protection: SlashingDatabase,
slot_clock: T,
spec: Arc<ChainSpec>,
Expand Down

0 comments on commit eca2089

Please sign in to comment.