Skip to content

Commit db589f3

Browse files
committed
refactor(aggregator): slave synchronizer updates the next signers in the epoch service
Instead of having to repeat the call on 'inform_new_epoch' in the state machine for the slave.
1 parent 1a1bdbc commit db589f3

File tree

6 files changed

+187
-12
lines changed

6 files changed

+187
-12
lines changed

mithril-aggregator/src/dependency_injection/builder/protocol/certificates.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ impl DependenciesBuilder {
161161
&mut self,
162162
) -> Result<Arc<MithrilSignerRegistrationSlave>> {
163163
let registerer = MithrilSignerRegistrationSlave::new(
164+
self.get_epoch_service().await?,
164165
self.get_verification_key_store().await?,
165166
self.get_signer_store().await?,
166167
self.get_signer_registration_verifier().await?,

mithril-aggregator/src/runtime/state_machine.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -274,8 +274,6 @@ impl AggregatorRuntime {
274274
self.runner
275275
.synchronize_slave_aggregator_signer_registration()
276276
.await?;
277-
// Needed to recompute epoch data for the next signing round on the slave
278-
self.runner.inform_new_epoch(new_time_point.epoch).await?;
279277
}
280278
self.runner.precompute_epoch_data().await?;
281279
}
@@ -940,7 +938,7 @@ mod tests {
940938
runner
941939
.expect_inform_new_epoch()
942940
.with(predicate::eq(new_time_point_clone.clone().epoch))
943-
.times(2)
941+
.once()
944942
.returning(|_| Ok(()));
945943
runner
946944
.expect_update_epoch_settings()

mithril-aggregator/src/services/epoch_service.rs

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ pub trait EpochService: Sync + Send {
4949
/// Note: must be called after `inform_epoch`.
5050
async fn update_epoch_settings(&mut self) -> StdResult<()>;
5151

52+
/// Update the next signers with stake for the next epoch.
53+
async fn update_next_signers_with_stake(&mut self) -> StdResult<()>;
54+
5255
/// Inform the service that it can precompute data for its current epoch.
5356
///
5457
/// Note: must be called after `inform_epoch`.
@@ -383,6 +386,27 @@ impl EpochService for MithrilEpochService {
383386
self.insert_future_epoch_settings(data.epoch).await
384387
}
385388

389+
async fn update_next_signers_with_stake(&mut self) -> StdResult<()> {
390+
debug!(self.logger, ">> update_next_signers_with_stake");
391+
392+
let data = self.unwrap_data().with_context(|| {
393+
"can't update next signers with stake if inform_epoch has not been called first"
394+
})?;
395+
396+
let next_signer_retrieval_epoch = data.epoch.offset_to_next_signer_retrieval_epoch();
397+
let next_signers_with_stake = self
398+
.get_signers_with_stake_at_epoch(next_signer_retrieval_epoch)
399+
.await?;
400+
401+
self.epoch_data.as_mut().unwrap().next_signers_with_stake = next_signers_with_stake;
402+
403+
self.precompute_epoch_data()
404+
.await
405+
.with_context(|| "Epoch service failed to precompute epoch data")?;
406+
407+
Ok(())
408+
}
409+
386410
async fn precompute_epoch_data(&mut self) -> StdResult<()> {
387411
debug!(self.logger, ">> precompute_epoch_data");
388412

@@ -523,6 +547,7 @@ pub(crate) struct FakeEpochService {
523547
inform_epoch_error: bool,
524548
update_epoch_settings_error: bool,
525549
precompute_epoch_data_error: bool,
550+
update_next_signers_with_stake_error: bool,
526551
}
527552

528553
#[cfg(test)]
@@ -615,6 +640,7 @@ impl FakeEpochServiceBuilder {
615640
inform_epoch_error: false,
616641
update_epoch_settings_error: false,
617642
precompute_epoch_data_error: false,
643+
update_next_signers_with_stake_error: false,
618644
}
619645
}
620646
}
@@ -660,19 +686,21 @@ impl FakeEpochService {
660686
inform_epoch_error: false,
661687
update_epoch_settings_error: false,
662688
precompute_epoch_data_error: false,
689+
update_next_signers_with_stake_error: false,
663690
}
664691
}
665692

666-
#[allow(dead_code)]
667693
pub fn toggle_errors(
668694
&mut self,
669695
inform_epoch: bool,
670696
update_protocol_parameters: bool,
671697
precompute_epoch: bool,
698+
update_next_signers_with_stake: bool,
672699
) {
673700
self.inform_epoch_error = inform_epoch;
674701
self.update_epoch_settings_error = update_protocol_parameters;
675702
self.precompute_epoch_data_error = precompute_epoch;
703+
self.update_next_signers_with_stake_error = update_next_signers_with_stake;
676704
}
677705

678706
fn unwrap_data(&self) -> Result<&EpochData, EpochServiceError> {
@@ -714,6 +742,13 @@ impl EpochService for FakeEpochService {
714742
Ok(())
715743
}
716744

745+
async fn update_next_signers_with_stake(&mut self) -> StdResult<()> {
746+
if self.update_next_signers_with_stake_error {
747+
anyhow::bail!("update_next_signers_with_stake fake error");
748+
}
749+
Ok(())
750+
}
751+
717752
fn cardano_era(&self) -> StdResult<CardanoEra> {
718753
Ok(self.unwrap_data()?.cardano_era.clone())
719754
}
@@ -1213,6 +1248,48 @@ mod tests {
12131248
assert!(service.computed_epoch_data.is_none());
12141249
}
12151250

1251+
#[tokio::test]
1252+
async fn update_next_signers_with_stake_succeeds() {
1253+
let fixture = MithrilFixtureBuilder::default().with_signers(3).build();
1254+
let next_fixture = MithrilFixtureBuilder::default().with_signers(5).build();
1255+
let next_avk = next_fixture.compute_avk();
1256+
let epoch = Epoch(4);
1257+
let mut service = EpochServiceBuilder {
1258+
next_signers_with_stake: next_fixture.signers_with_stake().clone(),
1259+
..EpochServiceBuilder::new(epoch, fixture.clone())
1260+
}
1261+
.build()
1262+
.await;
1263+
service
1264+
.inform_epoch(epoch)
1265+
.await
1266+
.expect("inform_epoch should not fail");
1267+
service.epoch_data = Some(EpochData {
1268+
next_signers_with_stake: vec![],
1269+
..service.epoch_data.unwrap()
1270+
});
1271+
service.computed_epoch_data = None;
1272+
1273+
service
1274+
.update_next_signers_with_stake()
1275+
.await
1276+
.expect("update_next_signers_with_stake should not fail");
1277+
1278+
let expected_next_signers_with_stake = next_fixture.signers_with_stake();
1279+
assert_eq!(
1280+
expected_next_signers_with_stake,
1281+
service.epoch_data.unwrap().next_signers_with_stake
1282+
);
1283+
1284+
assert_eq!(
1285+
next_avk,
1286+
service
1287+
.computed_epoch_data
1288+
.unwrap()
1289+
.next_aggregate_verification_key
1290+
);
1291+
}
1292+
12161293
#[tokio::test]
12171294
async fn update_epoch_settings_insert_future_epoch_settings_in_the_store() {
12181295
let future_protocol_parameters = ProtocolParameters::new(6, 89, 0.124);

mithril-aggregator/src/services/signer_registration/error.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,13 @@ pub enum SignerRegistrationError {
3131
#[error("signer already registered")]
3232
ExistingSigner(Box<SignerWithStake>),
3333

34-
/// Store error.
34+
/// Store.
3535
#[error("store error")]
36-
StoreError(#[source] StdError),
36+
Store(#[source] StdError),
37+
38+
/// Epoch service.
39+
#[error("epoch service error")]
40+
EpochService(#[source] StdError),
3741

3842
/// Signer registration failed.
3943
#[error("signer registration failed")]

mithril-aggregator/src/services/signer_registration/master.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ impl SignerRegisterer for MithrilSignerRegistrationMaster {
147147
registration_round.epoch
148148
)
149149
})
150-
.map_err(|e| SignerRegistrationError::StoreError(anyhow!(e)))?
150+
.map_err(|e| SignerRegistrationError::Store(anyhow!(e)))?
151151
{
152152
Some(_) => Err(SignerRegistrationError::ExistingSigner(Box::new(
153153
signer_save,

mithril-aggregator/src/services/signer_registration/slave.rs

Lines changed: 100 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use mithril_common::{
1010
use mithril_persistence::store::StakeStorer;
1111

1212
use crate::{
13+
dependency_injection::EpochServiceWrapper,
1314
services::{AggregatorClient, EpochPruningTask},
1415
SignerRegistrationVerifier, VerificationKeyStorer,
1516
};
@@ -21,6 +22,9 @@ use super::{
2122

2223
/// A [MithrilSignerRegistrationSlave] supports signer registrations in a slave aggregator
2324
pub struct MithrilSignerRegistrationSlave {
25+
/// Epoch service
26+
pub epoch_service: EpochServiceWrapper,
27+
2428
/// Verification key store
2529
verification_key_store: Arc<dyn VerificationKeyStorer>,
2630

@@ -44,6 +48,7 @@ pub struct MithrilSignerRegistrationSlave {
4448
impl MithrilSignerRegistrationSlave {
4549
/// MithrilSignerRegistererSlave factory
4650
pub fn new(
51+
epoch_service: EpochServiceWrapper,
4752
verification_key_store: Arc<dyn VerificationKeyStorer>,
4853
signer_recorder: Arc<dyn SignerRecorder>,
4954
signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
@@ -52,6 +57,7 @@ impl MithrilSignerRegistrationSlave {
5257
verification_key_epoch_retention_limit: Option<u64>,
5358
) -> Self {
5459
Self {
60+
epoch_service,
5561
verification_key_store,
5662
signer_recorder,
5763
signer_registration_verifier,
@@ -90,9 +96,16 @@ impl MithrilSignerRegistrationSlave {
9096
epoch
9197
)
9298
})
93-
.map_err(|e| SignerRegistrationError::StoreError(anyhow!(e)))?;
99+
.map_err(|e| SignerRegistrationError::Store(anyhow!(e)))?;
94100
}
95101

102+
self.epoch_service
103+
.write()
104+
.await
105+
.update_next_signers_with_stake()
106+
.await
107+
.map_err(|e| SignerRegistrationError::EpochService(anyhow!(e)))?;
108+
96109
Ok(())
97110
}
98111
}
@@ -130,8 +143,8 @@ impl SignerSynchronizer for MithrilSignerRegistrationSlave {
130143
.get_stakes(registration_epoch)
131144
.await
132145
.with_context(|| "synchronize_all_signers failed")
133-
.map_err(SignerRegistrationError::StoreError)?
134-
.ok_or(SignerRegistrationError::StoreError(anyhow::anyhow!(
146+
.map_err(SignerRegistrationError::Store)?
147+
.ok_or(SignerRegistrationError::Store(anyhow::anyhow!(
135148
"Slave aggregator did not return any stake distribution"
136149
)))?;
137150
self.synchronize_signers(registration_epoch, &next_signers, &stake_distribution)
@@ -214,8 +227,9 @@ mod tests {
214227
database::{repository::SignerRegistrationStore, test_helper::main_db_connection},
215228
message_adapters::FromEpochSettingsAdapter,
216229
services::{
217-
AggregatorClient, AggregatorClientError, EpochPruningTask, MockAggregatorClient,
218-
MockSignerRecorder, MockSignerRegistrationVerifier, SignerSynchronizer,
230+
AggregatorClient, AggregatorClientError, EpochPruningTask, FakeEpochService,
231+
MockAggregatorClient, MockSignerRecorder, MockSignerRegistrationVerifier,
232+
SignerSynchronizer,
219233
},
220234
store::MockVerificationKeyStorer,
221235
tools::mocks::MockStakeStore,
@@ -226,10 +240,15 @@ mod tests {
226240
use test_utils::*;
227241

228242
mod test_utils {
243+
use tokio::sync::RwLock;
244+
245+
use crate::{dependency_injection::EpochServiceWrapper, services::FakeEpochService};
246+
229247
use super::*;
230248

231249
/// MithrilSignerRegistrationSlaveBuilder is a test builder for [MithrilSignerRegistrationSlave]
232250
pub struct MithrilSignerRegistrationSlaveBuilder {
251+
epoch_service: EpochServiceWrapper,
233252
signer_recorder: Arc<dyn SignerRecorder>,
234253
signer_registration_verifier: Arc<dyn SignerRegistrationVerifier>,
235254
master_aggregator_client: Arc<dyn AggregatorClient>,
@@ -241,6 +260,7 @@ mod tests {
241260
impl Default for MithrilSignerRegistrationSlaveBuilder {
242261
fn default() -> Self {
243262
Self {
263+
epoch_service: Arc::new(RwLock::new(FakeEpochService::without_data())),
244264
signer_recorder: Arc::new(MockSignerRecorder::new()),
245265
signer_registration_verifier: Arc::new(MockSignerRegistrationVerifier::new()),
246266
master_aggregator_client: Arc::new(MockAggregatorClient::new()),
@@ -254,6 +274,13 @@ mod tests {
254274
}
255275

256276
impl MithrilSignerRegistrationSlaveBuilder {
277+
pub fn with_epoch_service(self, epoch_service: FakeEpochService) -> Self {
278+
Self {
279+
epoch_service: Arc::new(RwLock::new(epoch_service)),
280+
..self
281+
}
282+
}
283+
257284
pub fn with_verification_key_store(
258285
self,
259286
verification_key_store: Arc<dyn VerificationKeyStorer>,
@@ -310,6 +337,7 @@ mod tests {
310337

311338
pub fn build(self) -> MithrilSignerRegistrationSlave {
312339
MithrilSignerRegistrationSlave {
340+
epoch_service: self.epoch_service,
313341
verification_key_store: self.verification_key_store,
314342
signer_recorder: self.signer_recorder,
315343
signer_registration_verifier: self.signer_registration_verifier,
@@ -478,6 +506,73 @@ mod tests {
478506
.expect_err("synchronize_all_signers should fail");
479507
}
480508

509+
#[tokio::test]
510+
async fn synchronize_all_signers_fails_if_epoch_service_update_next_signers_fails() {
511+
let registration_epoch = Epoch(1);
512+
let fixture = MithrilFixtureBuilder::default()
513+
.with_signers(5)
514+
.disable_signers_certification()
515+
.build();
516+
let signers = fixture.signers();
517+
let stake_distribution = fixture.stake_distribution();
518+
let epoch_settings_message = FromEpochSettingsAdapter::try_adapt(EpochSettingsMessage {
519+
epoch: registration_epoch,
520+
next_signers: SignerMessagePart::from_signers(signers),
521+
..EpochSettingsMessage::dummy()
522+
})
523+
.unwrap();
524+
525+
let signer_registration_slave = MithrilSignerRegistrationSlaveBuilder::default()
526+
.with_epoch_service({
527+
let mut epoch_service = FakeEpochService::without_data();
528+
epoch_service.toggle_errors(false, false, false, true);
529+
530+
epoch_service
531+
})
532+
.with_signer_recorder({
533+
let mut signer_recorder = MockSignerRecorder::new();
534+
signer_recorder
535+
.expect_record_signer_registration()
536+
.returning(|_| Ok(()))
537+
.times(5);
538+
539+
Arc::new(signer_recorder)
540+
})
541+
.with_signer_registration_verifier({
542+
let mut signer_registration_verifier = MockSignerRegistrationVerifier::new();
543+
signer_registration_verifier
544+
.expect_verify()
545+
.returning(|signer, _| Ok(SignerWithStake::from_signer(signer.to_owned(), 123)))
546+
.times(5);
547+
548+
Arc::new(signer_registration_verifier)
549+
})
550+
.with_master_aggregator_client({
551+
let mut aggregator_client = MockAggregatorClient::new();
552+
aggregator_client
553+
.expect_retrieve_epoch_settings()
554+
.returning(move || Ok(Some(epoch_settings_message.clone())))
555+
.times(1);
556+
557+
Arc::new(aggregator_client)
558+
})
559+
.with_stake_store({
560+
let mut stake_store = MockStakeStore::new();
561+
stake_store
562+
.expect_get_stakes()
563+
.returning(move |_epoch| Ok(Some(stake_distribution.clone())))
564+
.times(1);
565+
566+
Arc::new(stake_store)
567+
})
568+
.build();
569+
570+
signer_registration_slave
571+
.synchronize_all_signers()
572+
.await
573+
.expect_err("synchronize_all_signers should fail");
574+
}
575+
481576
#[tokio::test]
482577
async fn synchronize_all_signers_fails_if_fetching_epoch_settings_fails() {
483578
let signer_registration_slave = MithrilSignerRegistrationSlaveBuilder::default()

0 commit comments

Comments
 (0)