Skip to content

Commit 984caa9

Browse files
committed
Drop messages that are no longer useful for GPBFT progression
As a GPBFT instance progresses some messages become irrelevant, in that they do not effectively aid the progress of the instance for participants. Instead, GPBFT offers other built-in mechanisms to aid progress of lagging participants such as selective rebroadcast, propagation of DECIDE messages from the previous instance and finality certificate exchange. The changes here introduce a dedicated error type returned as part of message validation to signal that although a message is valid it is no longer relevant. This error type is then checked by pubsub to avoid further propagation of those messages. This reduces the redundant pubsub traffic for the network participants. Fixes #583
1 parent 76ecca1 commit 984caa9

9 files changed

+64
-23
lines changed

gpbft/errors.go

+3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ var (
2525
//
2626
// See SupplementalData.
2727
ErrValidationWrongSupplement = newValidationError("unexpected supplemental data")
28+
// ErrValidationNotRelevant signals that a message is valid but not relevant at the current instance,
29+
// and is not worth propagating to others.
30+
ErrValidationNotRelevant = newValidationError("message is valid but not relevant")
2831

2932
// ErrReceivedWrongInstance signals that a message is received with mismatching instance ID.
3033
ErrReceivedWrongInstance = errors.New("received message for wrong instance")

gpbft/errors_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ func TestValidationError_SentinelValues(t *testing.T) {
1717
{name: "ErrValidationInvalid", subject: ErrValidationInvalid},
1818
{name: "ErrValidationWrongBase", subject: ErrValidationWrongBase},
1919
{name: "ErrValidationWrongSupplement", subject: ErrValidationWrongSupplement},
20+
{name: "ErrValidationNotRelevant", subject: ErrValidationNotRelevant},
2021
}
2122
for _, test := range tests {
2223
t.Run(test.name, func(t *testing.T) {

gpbft/gpbft_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -1609,7 +1609,8 @@ func TestGPBFT_DropOld(t *testing.T) {
16091609
}
16101610
driver.RequireDeliverMessage(newQuality)
16111611
driver.RequireDeliverMessage(newDecide0)
1612-
driver.RequireDeliverMessage(newCommit0) // no quorum of decides, should still accept it
1612+
// No quorum of decides, should still accept it but be considered not relevant
1613+
driver.RequireErrOnDeliverMessage(newCommit0, gpbft.ErrValidationNotRelevant, "not relevant")
16131614
driver.RequireDeliverMessage(newDecide1)
16141615

16151616
// Once we've received two decides, we should reject messages from the "new" instance.

gpbft/participant.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type Participant struct {
3030
// if both are to be taken.
3131
apiMutex sync.Mutex
3232
// Mutex protecting currentInstance and committees cache for concurrent validation.
33-
// Note that not every access need be protected:
33+
// Note that not every access needs to be protected:
3434
// - writes to currentInstance, and reads from it during validation,
3535
// - reads from or writes to committees (which is written during validation).
3636
instanceMutex sync.Mutex

gpbft/validator.go

+22-4
Original file line numberDiff line numberDiff line change
@@ -77,15 +77,33 @@ func (v *cachingValidator) ValidateMessage(msg *GMessage) (valid ValidatedMessag
7777
}
7878

7979
// Infer whether to proceed validating the message relative to the current instance.
80-
switch currentInstance := v.progress.Load().id; {
81-
case msg.Vote.Instance >= currentInstance+v.committeeLookback:
80+
switch current := v.progress.Load(); {
81+
case msg.Vote.Instance >= current.id+v.committeeLookback:
8282
// Message is beyond current + committee lookback.
8383
return nil, ErrValidationNoCommittee
84-
case msg.Vote.Instance >= currentInstance,
85-
msg.Vote.Instance == currentInstance-1 && msg.Vote.Phase == DECIDE_PHASE:
84+
case msg.Vote.Instance > current.id,
85+
msg.Vote.Instance == current.id-1 && msg.Vote.Phase == DECIDE_PHASE:
8686
// Only proceed to validate the message if it:
8787
// * belongs to an instance within the range of current to current + committee lookback, or
8888
// * is a DECIDE message belonging to previous instance.
89+
case msg.Vote.Instance == current.id:
90+
// Message belongs to current instance. Only validate messages that are relevant,
91+
// i.e.:
92+
// * When current instance is at DECIDE phase only validate DECIDE messages.
93+
// * Otherwise, only validate messages that would be rebroadcasted, i.e. QUALITY,
94+
// DECIDE, messages from previous round, and messages from current round.
95+
// Anything else is not relevant.
96+
switch {
97+
case current.phase == DECIDE_PHASE && msg.Vote.Phase != DECIDE_PHASE:
98+
return nil, ErrValidationNotRelevant
99+
case msg.Vote.Phase == QUALITY_PHASE,
100+
msg.Vote.Phase == DECIDE_PHASE,
101+
msg.Vote.Round == current.round-1, // Use explicit case for previous round to avoid unt64 overflow.
102+
msg.Vote.Round >= current.round:
103+
// Message is relevant. Progress to further validation.
104+
default:
105+
return nil, ErrValidationNotRelevant
106+
}
89107
default:
90108
// Message belongs to an instance older than the previous instance.
91109
return nil, ErrValidationTooOld

host.go

+4
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,10 @@ func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, _ peer.ID, msg
335335
case errors.Is(err, gpbft.ErrValidationTooOld):
336336
// we got the message too late
337337
return pubsub.ValidationIgnore
338+
case errors.Is(err, gpbft.ErrValidationNotRelevant):
339+
// The message is valid but will not effectively aid progress of GPBFT. Ignore it
340+
// to stop its further propagation across the network.
341+
return pubsub.ValidationIgnore
338342
case errors.Is(err, gpbft.ErrValidationNoCommittee):
339343
log.Debugf("commitee error during validation: %+v", err)
340344
return pubsub.ValidationIgnore

sim/network.go

+15-9
Original file line numberDiff line numberDiff line change
@@ -142,24 +142,30 @@ func (n *Network) SetAlarm(sender gpbft.ActorID, at time.Time) {
142142
)
143143
}
144144

145+
// HasMoreTicks checks whether there are any messages left to propagate across
146+
// the network participants. See Tick.
147+
func (n *Network) HasMoreTicks() bool {
148+
return n.queue.Len() > 0
149+
}
150+
145151
// Tick disseminates one message among participants and returns whether there are
146152
// any more messages to process.
147-
func (n *Network) Tick(adv *adversary.Adversary) (bool, error) {
153+
func (n *Network) Tick(adv *adversary.Adversary) error {
148154
msg := n.queue.Remove()
149155
n.clock = msg.deliverAt
150156

151157
receiver, found := n.participants[msg.dest]
152158
if !found {
153-
return false, fmt.Errorf("message destined to unknown participant ID: %d", msg.dest)
159+
return fmt.Errorf("message destined to unknown participant ID: %d", msg.dest)
154160
}
155161
switch payload := msg.payload.(type) {
156162
case string:
157163
if payload != "ALARM" {
158-
return false, fmt.Errorf("unknwon string message payload: %s", payload)
164+
return fmt.Errorf("unknwon string message payload: %s", payload)
159165
}
160166
n.log(TraceRecvd, "P%d %s", msg.source, payload)
161167
if err := receiver.ReceiveAlarm(); err != nil {
162-
return false, fmt.Errorf("failed to deliver alarm from %d to %d: %w", msg.source, msg.dest, err)
168+
return fmt.Errorf("failed to deliver alarm from %d to %d: %w", msg.source, msg.dest, err)
163169
}
164170
case gpbft.GMessage:
165171
// If GST has not elapsed, check if adversary allows the propagation of message.
@@ -170,7 +176,7 @@ func (n *Network) Tick(adv *adversary.Adversary) (bool, error) {
170176
} else if !adv.AllowMessage(msg.source, msg.dest, payload) {
171177
// GST has not passed and adversary blocks the delivery of message; proceed to
172178
// next tick.
173-
return n.queue.Len() > 0, nil
179+
return nil
174180
}
175181
}
176182
validated, err := receiver.ValidateMessage(&payload)
@@ -179,16 +185,16 @@ func (n *Network) Tick(adv *adversary.Adversary) (bool, error) {
179185
// Silently drop old messages.
180186
break
181187
}
182-
return false, fmt.Errorf("invalid message from %d to %d: %w", msg.source, msg.dest, err)
188+
return fmt.Errorf("invalid message from %d to %d: %w", msg.source, msg.dest, err)
183189
}
184190
n.log(TraceRecvd, "P%d ← P%d: %v", msg.dest, msg.source, msg.payload)
185191
if err := receiver.ReceiveMessage(validated); err != nil {
186-
return false, fmt.Errorf("failed to deliver message from %d to %d: %w", msg.source, msg.dest, err)
192+
return fmt.Errorf("failed to deliver message from %d to %d: %w", msg.source, msg.dest, err)
187193
}
188194
default:
189-
return false, fmt.Errorf("unknown message payload: %v", payload)
195+
return fmt.Errorf("unknown message payload: %v", payload)
190196
}
191-
return n.queue.Len() > 0, nil
197+
return nil
192198
}
193199

194200
func (n *Network) log(level int, format string, args ...interface{}) {

sim/sim.go

+11-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package sim
22

33
import (
4+
"errors"
45
"fmt"
56
"strings"
67

@@ -71,8 +72,7 @@ func (s *Simulation) Run(instanceCount uint64, maxRounds uint64) error {
7172
}
7273

7374
// Run until there are no more messages, meaning termination or deadlock.
74-
moreTicks := true
75-
for moreTicks {
75+
for s.network.HasMoreTicks() {
7676
if err := s.ec.Err(); err != nil {
7777
return fmt.Errorf("error in decision: %w", err)
7878
}
@@ -129,9 +129,15 @@ func (s *Simulation) Run(instanceCount uint64, maxRounds uint64) error {
129129
break
130130
}
131131
}
132-
var err error
133-
moreTicks, err = s.network.Tick(s.adversary)
134-
if err != nil {
132+
133+
switch err := s.network.Tick(s.adversary); {
134+
case errors.Is(err, gpbft.ErrValidationNotRelevant):
135+
// Ignore error signalling valid messages that are no longer useful for the
136+
// progress of GPBFT. This can occur in normal operation depending on the order
137+
// of delivered messages. In production, deployment this error is used to signal
138+
// that the message does not need to be propagated among participants. In
139+
// simulation, we simply ignore it.
140+
case err != nil:
135141
return fmt.Errorf("error performing simulation phase: %w", err)
136142
}
137143
}

test/withhold_test.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,11 @@ func TestWitholdCommitAdversary(t *testing.T) {
7373
}
7474
// The adversary could convince the victim to decide a, so all must decide a.
7575
require.NoError(t, err)
76-
decision := sm.GetInstance(0).GetDecision(0)
77-
require.NotNil(t, decision)
78-
require.Equal(t, &a, decision)
76+
for _, victim := range victims {
77+
decision := sm.GetInstance(0).GetDecision(victim)
78+
require.NotNil(t, decision)
79+
require.Equal(t, &a, decision)
80+
}
7981
})
8082
}
8183
}

0 commit comments

Comments
 (0)