Skip to content

Drop messages that are no longer useful for GPBFT progression #649

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions gpbft/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ var (
//
// See SupplementalData.
ErrValidationWrongSupplement = newValidationError("unexpected supplemental data")
// ErrValidationNotRelevant signals that a message is not relevant at the current
// instance, and is not worth propagating to others.
ErrValidationNotRelevant = newValidationError("message is valid but not relevant")

// ErrReceivedWrongInstance signals that a message is received with mismatching instance ID.
ErrReceivedWrongInstance = errors.New("received message for wrong instance")
Expand Down
1 change: 1 addition & 0 deletions gpbft/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func TestValidationError_SentinelValues(t *testing.T) {
{name: "ErrValidationInvalid", subject: ErrValidationInvalid},
{name: "ErrValidationWrongBase", subject: ErrValidationWrongBase},
{name: "ErrValidationWrongSupplement", subject: ErrValidationWrongSupplement},
{name: "ErrValidationNotRelevant", subject: ErrValidationNotRelevant},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion gpbft/gpbft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1609,7 +1609,8 @@ func TestGPBFT_DropOld(t *testing.T) {
}
driver.RequireDeliverMessage(newQuality)
driver.RequireDeliverMessage(newDecide0)
driver.RequireDeliverMessage(newCommit0) // no quorum of decides, should still accept it
// No quorum of decides yet; the commit is valid but is reported as not relevant.
driver.RequireErrOnDeliverMessage(newCommit0, gpbft.ErrValidationNotRelevant, "not relevant")
driver.RequireDeliverMessage(newDecide1)

// Once we've received two decides, we should reject messages from the "new" instance.
Expand Down
2 changes: 1 addition & 1 deletion gpbft/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Participant struct {
// if both are to be taken.
apiMutex sync.Mutex
// Mutex protecting currentInstance and committees cache for concurrent validation.
// Note that not every access need be protected:
// Note that not every access needs to be protected:
// - writes to currentInstance, and reads from it during validation,
// - reads from or writes to committees (which is written during validation).
instanceMutex sync.Mutex
Expand Down
29 changes: 25 additions & 4 deletions gpbft/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,36 @@ func (v *cachingValidator) ValidateMessage(msg *GMessage) (valid ValidatedMessag
}

// Infer whether to proceed validating the message relative to the current instance.
switch currentInstance := v.progress.Load().id; {
case msg.Vote.Instance >= currentInstance+v.committeeLookback:
switch current := v.progress.Load(); {
case msg.Vote.Instance >= current.id+v.committeeLookback:
// Message is beyond current + committee lookback.
return nil, ErrValidationNoCommittee
case msg.Vote.Instance >= currentInstance,
msg.Vote.Instance == currentInstance-1 && msg.Vote.Phase == DECIDE_PHASE:
case msg.Vote.Instance > current.id,
msg.Vote.Instance+1 == current.id && msg.Vote.Phase == DECIDE_PHASE:
// Only proceed to validate the message if it:
// * belongs to an instance after the current one and before current + committee lookback, or
// * is a DECIDE message belonging to previous instance.
case msg.Vote.Instance == current.id:
// Message belongs to current instance. Only validate messages that are relevant,
// i.e.:
// * When current instance is at DECIDE phase only validate DECIDE messages.
// * Otherwise, only validate messages that would be rebroadcast, i.e. QUALITY,
// DECIDE, messages from the previous round, and messages from the current round or later.
// Anything else is not relevant.
switch {
case current.phase == DECIDE_PHASE && msg.Vote.Phase != DECIDE_PHASE:
return nil, ErrValidationNotRelevant
case msg.Vote.Phase == QUALITY_PHASE,
msg.Vote.Phase == DECIDE_PHASE,
// Check if message round is larger than or equal to current round.
msg.Vote.Round >= current.round,
// Check if message round is equal to previous round. Note that we increment the
// message round to check this in order to avoid uint64 wrapping.
msg.Vote.Round+1 == current.round:
// Message is relevant. Progress to further validation.
default:
return nil, ErrValidationNotRelevant
}
default:
// Message belongs to an instance older than the previous instance.
return nil, ErrValidationTooOld
Expand Down
4 changes: 4 additions & 0 deletions host.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ func (h *gpbftRunner) validatePubsubMessage(ctx context.Context, _ peer.ID, msg
case errors.Is(err, gpbft.ErrValidationTooOld):
// we got the message too late
return pubsub.ValidationIgnore
case errors.Is(err, gpbft.ErrValidationNotRelevant):
// The message is valid but will not effectively aid progress of GPBFT. Ignore it
// to stop its further propagation across the network.
return pubsub.ValidationIgnore
case errors.Is(err, gpbft.ErrValidationNoCommittee):
log.Debugf("commitee error during validation: %+v", err)
return pubsub.ValidationIgnore
Expand Down
24 changes: 15 additions & 9 deletions sim/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,24 +142,30 @@ func (n *Network) SetAlarm(sender gpbft.ActorID, at time.Time) {
)
}

// HasMoreTicks reports whether the network queue still holds at least one
// message awaiting delivery. Each call to Tick removes one queued message.
func (n *Network) HasMoreTicks() bool {
	pending := n.queue.Len()
	return pending > 0
}

// Tick disseminates one message among participants. Use HasMoreTicks to check
// whether there are any more messages to process.
func (n *Network) Tick(adv *adversary.Adversary) (bool, error) {
func (n *Network) Tick(adv *adversary.Adversary) error {
msg := n.queue.Remove()
n.clock = msg.deliverAt

receiver, found := n.participants[msg.dest]
if !found {
return false, fmt.Errorf("message destined to unknown participant ID: %d", msg.dest)
return fmt.Errorf("message destined to unknown participant ID: %d", msg.dest)
}
switch payload := msg.payload.(type) {
case string:
if payload != "ALARM" {
return false, fmt.Errorf("unknwon string message payload: %s", payload)
return fmt.Errorf("unknwon string message payload: %s", payload)
}
n.log(TraceRecvd, "P%d %s", msg.source, payload)
if err := receiver.ReceiveAlarm(); err != nil {
return false, fmt.Errorf("failed to deliver alarm from %d to %d: %w", msg.source, msg.dest, err)
return fmt.Errorf("failed to deliver alarm from %d to %d: %w", msg.source, msg.dest, err)
}
case gpbft.GMessage:
// If GST has not elapsed, check if adversary allows the propagation of message.
Expand All @@ -170,7 +176,7 @@ func (n *Network) Tick(adv *adversary.Adversary) (bool, error) {
} else if !adv.AllowMessage(msg.source, msg.dest, payload) {
// GST has not passed and adversary blocks the delivery of message; proceed to
// next tick.
return n.queue.Len() > 0, nil
return nil
}
}
validated, err := receiver.ValidateMessage(&payload)
Expand All @@ -179,16 +185,16 @@ func (n *Network) Tick(adv *adversary.Adversary) (bool, error) {
// Silently drop old messages.
break
}
return false, fmt.Errorf("invalid message from %d to %d: %w", msg.source, msg.dest, err)
return fmt.Errorf("invalid message from %d to %d: %w", msg.source, msg.dest, err)
}
n.log(TraceRecvd, "P%d ← P%d: %v", msg.dest, msg.source, msg.payload)
if err := receiver.ReceiveMessage(validated); err != nil {
return false, fmt.Errorf("failed to deliver message from %d to %d: %w", msg.source, msg.dest, err)
return fmt.Errorf("failed to deliver message from %d to %d: %w", msg.source, msg.dest, err)
}
default:
return false, fmt.Errorf("unknown message payload: %v", payload)
return fmt.Errorf("unknown message payload: %v", payload)
}
return n.queue.Len() > 0, nil
return nil
}

func (n *Network) log(level int, format string, args ...interface{}) {
Expand Down
16 changes: 11 additions & 5 deletions sim/sim.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sim

import (
"errors"
"fmt"
"strings"

Expand Down Expand Up @@ -71,8 +72,7 @@ func (s *Simulation) Run(instanceCount uint64, maxRounds uint64) error {
}

// Run until there are no more messages, meaning termination or deadlock.
moreTicks := true
for moreTicks {
for s.network.HasMoreTicks() {
if err := s.ec.Err(); err != nil {
return fmt.Errorf("error in decision: %w", err)
}
Expand Down Expand Up @@ -129,9 +129,15 @@ func (s *Simulation) Run(instanceCount uint64, maxRounds uint64) error {
break
}
}
var err error
moreTicks, err = s.network.Tick(s.adversary)
if err != nil {

switch err := s.network.Tick(s.adversary); {
case errors.Is(err, gpbft.ErrValidationNotRelevant):
// Ignore error signalling valid messages that are no longer useful for the
// progress of GPBFT. This can occur in normal operation depending on the order
// of delivered messages. In a production deployment, this error is used to signal
// that the message does not need to be propagated among participants. In
// simulation, we simply ignore it.
case err != nil:
return fmt.Errorf("error performing simulation phase: %w", err)
}
}
Expand Down
8 changes: 5 additions & 3 deletions test/withhold_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,11 @@ func TestWitholdCommitAdversary(t *testing.T) {
}
// The adversary could convince the victim to decide a, so all must decide a.
require.NoError(t, err)
decision := sm.GetInstance(0).GetDecision(0)
require.NotNil(t, decision)
require.Equal(t, &a, decision)
for _, victim := range victims {
decision := sm.GetInstance(0).GetDecision(victim)
require.NotNil(t, decision)
require.Equal(t, &a, decision)
}
})
}
}
Loading