Skip to content

Commit d6f33e5

Browse files
committed
Refactor validation logic into a mutex-free pluggable struct
Refactor the validation logic out of `Participant` and into its own dedicated struct that is mutex-free and listens to the progress made by the participant to infer the correct validation path. The change above significantly reduces the need for mutex control over current instance, which makes it easier to plug in extra conditional behaviour, e.g. #583. Fixes #561
1 parent 95179da commit d6f33e5

8 files changed

+346
-259
lines changed

gpbft/gpbft.go

+16-3
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,8 @@ type instance struct {
209209
// independently of protocol phases/rounds.
210210
decision *quorumState
211211
// tracer traces logic logs for debugging and simulation purposes.
212-
tracer Tracer
212+
tracer Tracer
213+
progress ProgressObserver
213214
}
214215

215216
func newInstance(
@@ -218,7 +219,8 @@ func newInstance(
218219
input ECChain,
219220
data *SupplementalData,
220221
powerTable PowerTable,
221-
beacon []byte) (*instance, error) {
222+
beacon []byte,
223+
progress ProgressObserver) (*instance, error) {
222224
if input.IsZero() {
223225
return nil, fmt.Errorf("input is empty")
224226
}
@@ -247,6 +249,7 @@ func newInstance(
247249
},
248250
decision: newQuorumState(powerTable),
249251
tracer: participant.tracer,
252+
progress: progress,
250253
}, nil
251254
}
252255

@@ -482,6 +485,7 @@ func (i *instance) beginQuality() error {
482485
i.phaseTimeout = i.alarmAfterSynchrony()
483486
i.resetRebroadcastParams()
484487
i.broadcast(i.round, QUALITY_PHASE, i.proposal, false, nil)
488+
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
485489
metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrQualityPhase))
486490
metrics.currentPhase.Record(context.TODO(), int64(QUALITY_PHASE))
487491
return nil
@@ -542,6 +546,7 @@ func (i *instance) beginConverge(justification *Justification) {
542546
i.getRound(i.round).converged.SetSelfValue(i.proposal, justification)
543547

544548
i.broadcast(i.round, CONVERGE_PHASE, i.proposal, true, justification)
549+
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
545550
metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrConvergePhase))
546551
metrics.currentPhase.Record(context.TODO(), int64(CONVERGE_PHASE))
547552
}
@@ -599,6 +604,7 @@ func (i *instance) beginPrepare(justification *Justification) {
599604
i.resetRebroadcastParams()
600605

601606
i.broadcast(i.round, PREPARE_PHASE, i.value, false, justification)
607+
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
602608
metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrPreparePhase))
603609
metrics.currentPhase.Record(context.TODO(), int64(PREPARE_PHASE))
604610
}
@@ -652,6 +658,7 @@ func (i *instance) beginCommit() {
652658
}
653659

654660
i.broadcast(i.round, COMMIT_PHASE, i.value, false, justification)
661+
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
655662
metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrCommitPhase))
656663
metrics.currentPhase.Record(context.TODO(), int64(COMMIT_PHASE))
657664
}
@@ -712,6 +719,7 @@ func (i *instance) tryCommit(round uint64) error {
712719
func (i *instance) beginDecide(round uint64) {
713720
i.phase = DECIDE_PHASE
714721
i.resetRebroadcastParams()
722+
715723
var justification *Justification
716724
// Value cannot be empty here.
717725
if quorum, ok := i.getRound(round).committed.FindStrongQuorumFor(i.value.Key()); ok {
@@ -727,6 +735,7 @@ func (i *instance) beginDecide(round uint64) {
727735
// Since each node sends only one DECIDE message, they must share the same vote
728736
// in order to be aggregated.
729737
i.broadcast(0, DECIDE_PHASE, i.value, false, justification)
738+
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
730739
metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrDecidePhase))
731740
metrics.currentPhase.Record(context.TODO(), int64(DECIDE_PHASE))
732741
}
@@ -740,6 +749,7 @@ func (i *instance) skipToDecide(value ECChain, justification *Justification) {
740749
i.value = i.proposal
741750
i.resetRebroadcastParams()
742751
i.broadcast(0, DECIDE_PHASE, i.value, false, justification)
752+
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
743753
metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrDecidePhase))
744754
metrics.currentPhase.Record(context.TODO(), int64(DECIDE_PHASE))
745755
metrics.skipCounter.Add(context.TODO(), 1, metric.WithAttributes(attrSkipToDecide))
@@ -775,6 +785,7 @@ func (i *instance) getRound(r uint64) *roundState {
775785
func (i *instance) beginNextRound() {
776786
i.log("moving to round %d with %s", i.round+1, i.proposal.String())
777787
i.round += 1
788+
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
778789
metrics.currentRound.Record(context.TODO(), int64(i.round))
779790

780791
prevRound := i.getRound(i.round - 1)
@@ -802,6 +813,7 @@ func (i *instance) beginNextRound() {
802813
func (i *instance) skipToRound(round uint64, chain ECChain, justification *Justification) {
803814
i.log("skipping from round %d to round %d with %s", i.round, round, i.proposal.String())
804815
i.round = round
816+
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
805817
metrics.currentRound.Record(context.TODO(), int64(i.round))
806818
metrics.skipCounter.Add(context.TODO(), 1, metric.WithAttributes(attrSkipToRound))
807819

@@ -843,6 +855,7 @@ func (i *instance) terminate(decision *Justification) {
843855
i.value = decision.Vote.Value
844856
i.terminationValue = decision
845857
i.resetRebroadcastParams()
858+
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
846859
metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrTerminatedPhase))
847860
metrics.roundHistogram.Record(context.TODO(), int64(i.round))
848861
metrics.currentPhase.Record(context.TODO(), int64(TERMINATED_PHASE))
@@ -940,7 +953,7 @@ func (i *instance) rebroadcastTimeoutElapsed() bool {
940953

941954
func (i *instance) rebroadcast() error {
942955
// Rebroadcast quality and all messages from the current and previous rounds, unless the
943-
// instance has progressed to DECIDE phase. In which case, only DECIDE message is
956+
// instance has progress to DECIDE phase. In which case, only DECIDE message is
944957
// rebroadcasted.
945958
//
946959
// Note that the implementation here rebroadcasts more messages than FIP-0086

gpbft/gpbft_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ func TestGPBFT_WithEvenPowerDistribution(t *testing.T) {
281281
t.Run("Queues future instance messages during current instance", func(t *testing.T) {
282282
instance, driver := newInstanceAndDriver(t)
283283
futureInstance := emulator.NewInstance(t,
284-
42,
284+
8,
285285
gpbft.PowerEntries{
286286
gpbft.PowerEntry{
287287
ID: 0,

gpbft/options.go

+13-1
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@ import (
88
"time"
99
)
1010

11-
var (
11+
const (
1212
defaultDelta = 3 * time.Second
1313
defaultDeltaBackOffExponent = 2.0
1414
defaultMaxCachedInstances = 10
1515
defaultMaxCachedMessagesPerInstance = 25_000
16+
defaultCommitteeLookback = 10
1617
)
1718

1819
// Option represents a configurable parameter.
@@ -22,6 +23,7 @@ type options struct {
2223
delta time.Duration
2324
deltaBackOffExponent float64
2425

26+
committeeLookback uint64
2527
maxLookaheadRounds uint64
2628
rebroadcastAfter func(int) time.Duration
2729

@@ -36,6 +38,7 @@ func newOptions(o ...Option) (*options, error) {
3638
opts := &options{
3739
delta: defaultDelta,
3840
deltaBackOffExponent: defaultDeltaBackOffExponent,
41+
committeeLookback: defaultCommitteeLookback,
3942
rebroadcastAfter: defaultRebroadcastAfter,
4043
maxCachedInstances: defaultMaxCachedInstances,
4144
maxCachedMessagesPerInstance: defaultMaxCachedMessagesPerInstance,
@@ -118,6 +121,15 @@ func WithMaxCachedMessagesPerInstance(v int) Option {
118121
}
119122
}
120123

124+
// WithCommitteeLookback sets the number of instances in the past from which the
125+
// committee for the latest instance is derived. Defaults to 10 if unset.
126+
func WithCommitteeLookback(lookback uint64) Option {
127+
return func(o *options) error {
128+
o.committeeLookback = lookback
129+
return nil
130+
}
131+
}
132+
121133
var defaultRebroadcastAfter = exponentialBackoffer(1.3, 0.1, 3*time.Second, 30*time.Second)
122134

123135
// WithRebroadcastBackoff sets the duration after the gPBFT timeout has elapsed, at

0 commit comments

Comments
 (0)