Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make QBFT and signature collection managers nongeneric #146

Merged
merged 4 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 17 additions & 22 deletions anchor/qbft_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,11 @@ type Qbft<D, S> = qbft::Qbft<DefaultLeaderFunction, D, S>;
type Map<I, D> = DashMap<I, UnboundedSender<QbftMessage<D>>>;

// Top level QBFTManager structure
pub struct QbftManager<T: SlotClock + 'static> {
pub struct QbftManager {
// Senders to send work off to the central processor
processor: Senders,
// OperatorID
operator_id: QbftOperatorId,
// The slot clock for timing
slot_clock: T,
// All of the QBFT instances that are voting on validator consensus data
validator_consensus_data_instances: Map<ValidatorInstanceId, ValidatorConsensusData>,
// All of the QBFT instances that are voting on beacon data
Expand All @@ -109,12 +107,12 @@ pub struct QbftManager<T: SlotClock + 'static> {
network_tx: mpsc::UnboundedSender<SignedSSVMessage>,
}

impl<T: SlotClock> QbftManager<T> {
impl QbftManager {
// Construct a new QBFT Manager
pub fn new(
processor: Senders,
operator_id: OperatorId,
slot_clock: T,
slot_clock: impl SlotClock + 'static,
key: Rsa<Private>,
network_tx: mpsc::UnboundedSender<SignedSSVMessage>,
) -> Result<Arc<Self>, QbftError> {
Expand All @@ -123,7 +121,6 @@ impl<T: SlotClock> QbftManager<T> {
let manager = Arc::new(QbftManager {
processor,
operator_id,
slot_clock,
validator_consensus_data_instances: DashMap::new(),
beacon_vote_instances: DashMap::new(),
pkey,
Expand All @@ -134,13 +131,13 @@ impl<T: SlotClock> QbftManager<T> {
manager
.processor
.permitless
.send_async(Arc::clone(&manager).cleaner(), QBFT_CLEANER_NAME)?;
.send_async(Arc::clone(&manager).cleaner(slot_clock), QBFT_CLEANER_NAME)?;

Ok(manager)
}

// Decide a brand new qbft instance
pub async fn decide_instance<D: QbftDecidable<T>>(
pub async fn decide_instance<D: QbftDecidable>(
&self,
id: D::Id,
initial: D,
Expand Down Expand Up @@ -182,7 +179,7 @@ impl<T: SlotClock> QbftManager<T> {
}

/// Send a new network message to the instance
pub fn receive_data<D: QbftDecidable<T>>(
pub fn receive_data<D: QbftDecidable>(
&self,
id: D::Id,
data: WrappedQbftMessage,
Expand All @@ -201,15 +198,15 @@ impl<T: SlotClock> QbftManager<T> {
}

// Long running cleaner that will remove instances that are no longer relevant
async fn cleaner(self: Arc<Self>) {
async fn cleaner(self: Arc<Self>, slot_clock: impl SlotClock) {
while !self.processor.permitless.is_closed() {
sleep(
self.slot_clock
slot_clock
.duration_to_next_slot()
.unwrap_or(self.slot_clock.slot_duration()),
.unwrap_or(slot_clock.slot_duration()),
)
.await;
let Some(slot) = self.slot_clock.now() else {
let Some(slot) = slot_clock.now() else {
continue;
};
let cutoff = slot.saturating_sub(QBFT_RETAIN_SLOTS);
Expand All @@ -220,15 +217,13 @@ impl<T: SlotClock> QbftManager<T> {
}

// Trait that describes any data that is able to be decided upon during a qbft instance
pub trait QbftDecidable<T: SlotClock + 'static>:
QbftData<Hash = Hash256> + Send + Sync + 'static
{
pub trait QbftDecidable: QbftData<Hash = Hash256> + Send + Sync + 'static {
type Id: Hash + Eq + Send;

fn get_map(manager: &QbftManager<T>) -> &Map<Self::Id, Self>;
fn get_map(manager: &QbftManager) -> &Map<Self::Id, Self>;

fn get_or_spawn_instance(
manager: &QbftManager<T>,
manager: &QbftManager,
id: Self::Id,
) -> UnboundedSender<QbftMessage<Self>> {
let map = Self::get_map(manager);
Expand Down Expand Up @@ -257,9 +252,9 @@ pub trait QbftDecidable<T: SlotClock + 'static>:
fn instance_height(&self, id: &Self::Id) -> InstanceHeight;
}

impl<T: SlotClock + 'static> QbftDecidable<T> for ValidatorConsensusData {
impl QbftDecidable for ValidatorConsensusData {
type Id = ValidatorInstanceId;
fn get_map(manager: &QbftManager<T>) -> &Map<Self::Id, Self> {
fn get_map(manager: &QbftManager) -> &Map<Self::Id, Self> {
&manager.validator_consensus_data_instances
}

Expand All @@ -268,9 +263,9 @@ impl<T: SlotClock + 'static> QbftDecidable<T> for ValidatorConsensusData {
}
}

impl<T: SlotClock + 'static> QbftDecidable<T> for BeaconVote {
impl QbftDecidable for BeaconVote {
type Id = CommitteeInstanceId;
fn get_map(manager: &QbftManager<T>) -> &Map<Self::Id, Self> {
fn get_map(manager: &QbftManager) -> &Map<Self::Id, Self> {
&manager.beacon_vote_instances
}

Expand Down
10 changes: 5 additions & 5 deletions anchor/qbft_manager/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ static TRACING: LazyLock<()> = LazyLock::new(|| {
// Top level Testing Context to provide clean wrapper around testing framework
pub struct TestContext<D>
where
D: QbftDecidable<ManualSlotClock>,
D: QbftDecidable,
D::Id: Send + Sync + Clone,
{
pub tester: Arc<QbftTester<D>>,
Expand All @@ -36,7 +36,7 @@ where

impl<D> TestContext<D>
where
D: QbftDecidable<ManualSlotClock>,
D: QbftDecidable,
D::Id: Send + Sync + Clone,
{
// Create a new test context with default setup
Expand Down Expand Up @@ -131,13 +131,13 @@ impl CommitteeSize {
/// The main test coordinator that manages multiple QBFT instances
pub struct QbftTester<D>
where
D: QbftDecidable<ManualSlotClock>,
D: QbftDecidable,
D::Id: Send + Sync + Clone,
{
// Senders to the processor
senders: Senders,
// Track mapping from operator id to the respective manager
managers: HashMap<OperatorId, Arc<QbftManager<ManualSlotClock>>>,
managers: HashMap<OperatorId, Arc<QbftManager>>,
// The size of the committee
pub size: CommitteeSize,
// Mapping of the data hash to the data identifier. This is to send data to the proper instance
Expand Down Expand Up @@ -211,7 +211,7 @@ impl OperatorBehavior {

impl<D> QbftTester<D>
where
D: QbftDecidable<ManualSlotClock> + 'static,
D: QbftDecidable + 'static,
D::Id: Send + Sync + Clone,
{
/// Create a new QBFT tester instance
Expand Down
27 changes: 14 additions & 13 deletions anchor/signature_collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,25 @@ struct SignatureCollector {
for_slot: Slot,
}

pub struct SignatureCollectorManager<T: SlotClock> {
pub struct SignatureCollectorManager {
processor: Senders,
slot_clock: T,
signature_collectors: DashMap<Hash256, SignatureCollector>,
}

impl<T: SlotClock + 'static> SignatureCollectorManager<T> {
pub fn new(processor: Senders, slot_clock: T) -> Result<Arc<Self>, CollectionError> {
impl SignatureCollectorManager {
pub fn new<T>(processor: Senders, slot_clock: T) -> Result<Arc<Self>, CollectionError>
where
T: SlotClock + 'static,
{
let manager = Arc::new(Self {
processor,
slot_clock,
signature_collectors: DashMap::new(),
});

manager
.processor
.permitless
.send_async(Arc::clone(&manager).cleaner(), COLLECTOR_CLEANER_NAME)?;
manager.processor.permitless.send_async(
Arc::clone(&manager).cleaner(slot_clock),
COLLECTOR_CLEANER_NAME,
)?;

Ok(manager)
}
Expand Down Expand Up @@ -132,15 +133,15 @@ impl<T: SlotClock + 'static> SignatureCollectorManager<T> {
}
}

async fn cleaner(self: Arc<Self>) {
async fn cleaner(self: Arc<Self>, slot_clock: impl SlotClock) {
while !self.processor.permitless.is_closed() {
sleep(
self.slot_clock
slot_clock
.duration_to_next_slot()
.unwrap_or(self.slot_clock.slot_duration()),
.unwrap_or(slot_clock.slot_duration()),
)
.await;
let Some(slot) = self.slot_clock.now() else {
let Some(slot) = slot_clock.now() else {
continue;
};
let cutoff = slot.saturating_sub(SIGNATURE_COLLECTOR_RETAIN_SLOTS);
Expand Down
8 changes: 4 additions & 4 deletions anchor/validator_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ struct InitializedValidator {

pub struct AnchorValidatorStore<T: SlotClock + 'static, E: EthSpec> {
validators: DashMap<PublicKeyBytes, InitializedValidator>,
signature_collector: Arc<SignatureCollectorManager<T>>,
qbft_manager: Arc<QbftManager<T>>,
signature_collector: Arc<SignatureCollectorManager>,
qbft_manager: Arc<QbftManager>,
slashing_protection: SlashingDatabase,
slashing_protection_last_prune: Mutex<Epoch>,
slot_clock: T,
Expand All @@ -89,8 +89,8 @@ impl<T: SlotClock, E: EthSpec> AnchorValidatorStore<T, E> {
#[allow(clippy::too_many_arguments)]
pub fn new(
database_state: Receiver<NetworkState>,
signature_collector: Arc<SignatureCollectorManager<T>>,
qbft_manager: Arc<QbftManager<T>>,
signature_collector: Arc<SignatureCollectorManager>,
qbft_manager: Arc<QbftManager>,
slashing_protection: SlashingDatabase,
slot_clock: T,
spec: Arc<ChainSpec>,
Expand Down