Skip to content

Commit

Permalink
Merge branch 'unstable' into fix-handshake
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos authored Feb 18, 2025
2 parents 41608d0 + fff476e commit aa1eae6
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 44 deletions.
39 changes: 17 additions & 22 deletions anchor/qbft_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,11 @@ type Qbft<D, S> = qbft::Qbft<DefaultLeaderFunction, D, S>;
type Map<I, D> = DashMap<I, UnboundedSender<QbftMessage<D>>>;

// Top level QBFTManager structure
pub struct QbftManager<T: SlotClock + 'static> {
pub struct QbftManager {
// Senders to send work off to the central processor
processor: Senders,
// OperatorID
operator_id: QbftOperatorId,
// The slot clock for timing
slot_clock: T,
// All of the QBFT instances that are voting on validator consensus data
validator_consensus_data_instances: Map<ValidatorInstanceId, ValidatorConsensusData>,
// All of the QBFT instances that are voting on beacon data
Expand All @@ -109,12 +107,12 @@ pub struct QbftManager<T: SlotClock + 'static> {
network_tx: mpsc::UnboundedSender<SignedSSVMessage>,
}

impl<T: SlotClock> QbftManager<T> {
impl QbftManager {
// Construct a new QBFT Manager
pub fn new(
processor: Senders,
operator_id: OperatorId,
slot_clock: T,
slot_clock: impl SlotClock + 'static,
key: Rsa<Private>,
network_tx: mpsc::UnboundedSender<SignedSSVMessage>,
) -> Result<Arc<Self>, QbftError> {
Expand All @@ -123,7 +121,6 @@ impl<T: SlotClock> QbftManager<T> {
let manager = Arc::new(QbftManager {
processor,
operator_id,
slot_clock,
validator_consensus_data_instances: DashMap::new(),
beacon_vote_instances: DashMap::new(),
pkey,
Expand All @@ -134,13 +131,13 @@ impl<T: SlotClock> QbftManager<T> {
manager
.processor
.permitless
.send_async(Arc::clone(&manager).cleaner(), QBFT_CLEANER_NAME)?;
.send_async(Arc::clone(&manager).cleaner(slot_clock), QBFT_CLEANER_NAME)?;

Ok(manager)
}

// Decide a brand new qbft instance
pub async fn decide_instance<D: QbftDecidable<T>>(
pub async fn decide_instance<D: QbftDecidable>(
&self,
id: D::Id,
initial: D,
Expand Down Expand Up @@ -182,7 +179,7 @@ impl<T: SlotClock> QbftManager<T> {
}

/// Send a new network message to the instance
pub fn receive_data<D: QbftDecidable<T>>(
pub fn receive_data<D: QbftDecidable>(
&self,
id: D::Id,
data: WrappedQbftMessage,
Expand All @@ -201,15 +198,15 @@ impl<T: SlotClock> QbftManager<T> {
}

// Long running cleaner that will remove instances that are no longer relevant
async fn cleaner(self: Arc<Self>) {
async fn cleaner(self: Arc<Self>, slot_clock: impl SlotClock) {
while !self.processor.permitless.is_closed() {
sleep(
self.slot_clock
slot_clock
.duration_to_next_slot()
.unwrap_or(self.slot_clock.slot_duration()),
.unwrap_or(slot_clock.slot_duration()),
)
.await;
let Some(slot) = self.slot_clock.now() else {
let Some(slot) = slot_clock.now() else {
continue;
};
let cutoff = slot.saturating_sub(QBFT_RETAIN_SLOTS);
Expand All @@ -220,15 +217,13 @@ impl<T: SlotClock> QbftManager<T> {
}

// Trait that describes any data that is able to be decided upon during a qbft instance
pub trait QbftDecidable<T: SlotClock + 'static>:
QbftData<Hash = Hash256> + Send + Sync + 'static
{
pub trait QbftDecidable: QbftData<Hash = Hash256> + Send + Sync + 'static {
type Id: Hash + Eq + Send;

fn get_map(manager: &QbftManager<T>) -> &Map<Self::Id, Self>;
fn get_map(manager: &QbftManager) -> &Map<Self::Id, Self>;

fn get_or_spawn_instance(
manager: &QbftManager<T>,
manager: &QbftManager,
id: Self::Id,
) -> UnboundedSender<QbftMessage<Self>> {
let map = Self::get_map(manager);
Expand Down Expand Up @@ -257,9 +252,9 @@ pub trait QbftDecidable<T: SlotClock + 'static>:
fn instance_height(&self, id: &Self::Id) -> InstanceHeight;
}

impl<T: SlotClock + 'static> QbftDecidable<T> for ValidatorConsensusData {
impl QbftDecidable for ValidatorConsensusData {
type Id = ValidatorInstanceId;
fn get_map(manager: &QbftManager<T>) -> &Map<Self::Id, Self> {
fn get_map(manager: &QbftManager) -> &Map<Self::Id, Self> {
&manager.validator_consensus_data_instances
}

Expand All @@ -268,9 +263,9 @@ impl<T: SlotClock + 'static> QbftDecidable<T> for ValidatorConsensusData {
}
}

impl<T: SlotClock + 'static> QbftDecidable<T> for BeaconVote {
impl QbftDecidable for BeaconVote {
type Id = CommitteeInstanceId;
fn get_map(manager: &QbftManager<T>) -> &Map<Self::Id, Self> {
fn get_map(manager: &QbftManager) -> &Map<Self::Id, Self> {
&manager.beacon_vote_instances
}

Expand Down
10 changes: 5 additions & 5 deletions anchor/qbft_manager/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ static TRACING: LazyLock<()> = LazyLock::new(|| {
// Top level Testing Context to provide clean wrapper around testing framework
pub struct TestContext<D>
where
D: QbftDecidable<ManualSlotClock>,
D: QbftDecidable,
D::Id: Send + Sync + Clone,
{
pub tester: Arc<QbftTester<D>>,
Expand All @@ -36,7 +36,7 @@ where

impl<D> TestContext<D>
where
D: QbftDecidable<ManualSlotClock>,
D: QbftDecidable,
D::Id: Send + Sync + Clone,
{
// Create a new test context with default setup
Expand Down Expand Up @@ -131,13 +131,13 @@ impl CommitteeSize {
/// The main test coordinator that manages multiple QBFT instances
pub struct QbftTester<D>
where
D: QbftDecidable<ManualSlotClock>,
D: QbftDecidable,
D::Id: Send + Sync + Clone,
{
// Senders to the processor
senders: Senders,
// Track mapping from operator id to the respective manager
managers: HashMap<OperatorId, Arc<QbftManager<ManualSlotClock>>>,
managers: HashMap<OperatorId, Arc<QbftManager>>,
// The size of the committee
pub size: CommitteeSize,
// Mapping of the data hash to the data identifier. This is to send data to the proper instance
Expand Down Expand Up @@ -211,7 +211,7 @@ impl OperatorBehavior {

impl<D> QbftTester<D>
where
D: QbftDecidable<ManualSlotClock> + 'static,
D: QbftDecidable + 'static,
D::Id: Send + Sync + Clone,
{
/// Create a new QBFT tester instance
Expand Down
27 changes: 14 additions & 13 deletions anchor/signature_collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,25 @@ struct SignatureCollector {
for_slot: Slot,
}

pub struct SignatureCollectorManager<T: SlotClock> {
pub struct SignatureCollectorManager {
processor: Senders,
slot_clock: T,
signature_collectors: DashMap<Hash256, SignatureCollector>,
}

impl<T: SlotClock + 'static> SignatureCollectorManager<T> {
pub fn new(processor: Senders, slot_clock: T) -> Result<Arc<Self>, CollectionError> {
impl SignatureCollectorManager {
pub fn new<T>(processor: Senders, slot_clock: T) -> Result<Arc<Self>, CollectionError>
where
T: SlotClock + 'static,
{
let manager = Arc::new(Self {
processor,
slot_clock,
signature_collectors: DashMap::new(),
});

manager
.processor
.permitless
.send_async(Arc::clone(&manager).cleaner(), COLLECTOR_CLEANER_NAME)?;
manager.processor.permitless.send_async(
Arc::clone(&manager).cleaner(slot_clock),
COLLECTOR_CLEANER_NAME,
)?;

Ok(manager)
}
Expand Down Expand Up @@ -132,15 +133,15 @@ impl<T: SlotClock + 'static> SignatureCollectorManager<T> {
}
}

async fn cleaner(self: Arc<Self>) {
async fn cleaner(self: Arc<Self>, slot_clock: impl SlotClock) {
while !self.processor.permitless.is_closed() {
sleep(
self.slot_clock
slot_clock
.duration_to_next_slot()
.unwrap_or(self.slot_clock.slot_duration()),
.unwrap_or(slot_clock.slot_duration()),
)
.await;
let Some(slot) = self.slot_clock.now() else {
let Some(slot) = slot_clock.now() else {
continue;
};
let cutoff = slot.saturating_sub(SIGNATURE_COLLECTOR_RETAIN_SLOTS);
Expand Down
8 changes: 4 additions & 4 deletions anchor/validator_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ struct InitializedValidator {

pub struct AnchorValidatorStore<T: SlotClock + 'static, E: EthSpec> {
validators: DashMap<PublicKeyBytes, InitializedValidator>,
signature_collector: Arc<SignatureCollectorManager<T>>,
qbft_manager: Arc<QbftManager<T>>,
signature_collector: Arc<SignatureCollectorManager>,
qbft_manager: Arc<QbftManager>,
slashing_protection: SlashingDatabase,
slashing_protection_last_prune: Mutex<Epoch>,
slot_clock: T,
Expand All @@ -89,8 +89,8 @@ impl<T: SlotClock, E: EthSpec> AnchorValidatorStore<T, E> {
#[allow(clippy::too_many_arguments)]
pub fn new(
database_state: Receiver<NetworkState>,
signature_collector: Arc<SignatureCollectorManager<T>>,
qbft_manager: Arc<QbftManager<T>>,
signature_collector: Arc<SignatureCollectorManager>,
qbft_manager: Arc<QbftManager>,
slashing_protection: SlashingDatabase,
slot_clock: T,
spec: Arc<ChainSpec>,
Expand Down

0 comments on commit aa1eae6

Please sign in to comment.