Skip to content

Commit

Permalink
node,consensus: no duplicate proposals allowed
Browse files Browse the repository at this point in the history
  • Loading branch information
jchappelow committed Feb 27, 2025
1 parent 510ede1 commit aa6e655
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 41 deletions.
36 changes: 26 additions & 10 deletions node/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"errors"
"fmt"
"io"
"slices"
"sync"

ktypes "github.com/kwilteam/kwil-db/core/types"
"github.com/kwilteam/kwil-db/node/peers"
Expand Down Expand Up @@ -160,7 +162,7 @@ func (bp *blockProp) WriteTo(w io.Writer) (int64, error) {
return n, nil*/
}

func (n *Node) announceBlkProp(ctx context.Context, blk *ktypes.Block) {
func (n *Node) announceBlkProp(ctx context.Context, blk *ktypes.Block, skipPeers ...peer.ID) {
rawBlk := ktypes.EncodeBlock(blk)
blkHash := blk.Hash()
height := blk.Header.Height
Expand All @@ -174,9 +176,9 @@ func (n *Node) announceBlkProp(ctx context.Context, blk *ktypes.Block) {
return
}

me := n.host.ID()
skipPeers = append(skipPeers, n.host.ID()) // always skip self
for _, peerID := range peers {
if peerID == me {
if slices.Contains(skipPeers, peerID) {
continue
}
prop := blockProp{Height: height, Hash: blkHash, PrevHash: blk.Header.PrevHash,
Expand Down Expand Up @@ -217,13 +219,23 @@ func (n *Node) blkPropStreamHandler(s network.Stream) {
return
}

height := prop.Height

// This requires atomicity of AcceptProposal -> download -> NotifyBlockProposal.
// We also must not ignore any proposal messages since they may be real
// (signed by leader) leader while others may be spam.
n.blkPropHandlerMtx.Lock()
defer n.blkPropHandlerMtx.Unlock()
n.blkPropHandling <- struct{}{} // block until it's our turn

height := prop.Height
var ceAsyncHandling bool // true once we've handed off to CE to asynchronously handle it, and CE takes the done func
defer func() {
if !ceAsyncHandling {
<-n.blkPropHandling
}
}()

from := s.Conn().RemotePeer()
n.log.Info("Accept proposal?", "height", height, "blockID", prop.Hash, "prevHash", prop.PrevHash,
"from_peer", peers.PeerIDStringer(from))

if !n.ce.AcceptProposal(height, prop.Hash, prop.PrevHash, prop.LeaderSig, prop.Stamp) {
// NOTE: if this is ahead of our last commit height, we have to try to catch up
Expand Down Expand Up @@ -264,11 +276,15 @@ func (n *Node) blkPropStreamHandler(s network.Stream) {
return
}

n.log.Info("processing block proposal", "height", height, "hash", hash)
n.log.Info("processing block proposal", "height", height, "hash", hash,
"from", peers.PeerIDStringer(from))

n.ce.NotifyBlockProposal(blk)
// valid new prop => reannounce
go n.announceBlkProp(context.Background(), blk)
done := sync.OnceFunc(func() {
<-n.blkPropHandling
go n.announceBlkProp(context.Background(), blk, s.Conn().RemotePeer())
})
ceAsyncHandling = true // ce will call done now, neuter the defer
n.ce.NotifyBlockProposal(blk, done)
}

// sendACK is a callback for the result of validator block execution/precommit.
Expand Down
2 changes: 2 additions & 0 deletions node/consensus/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,8 @@ func (ce *ConsensusEngine) resetEventLoop(ctx context.Context) {
func (ce *ConsensusEngine) handleConsensusMessages(ctx context.Context, msg consensusMessage) {
ce.log.Info("Consensus message received", "type", msg.MsgType, "sender", hex.EncodeToString(msg.Sender))

defer msg.Handled()

switch v := msg.Msg.(type) {
case *blockProposal:
if err := ce.processBlockProposal(ctx, v); err != nil {
Expand Down
30 changes: 15 additions & 15 deletions node/consensus/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkProp",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp1.blk)
val.NotifyBlockProposal(blkProp1.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
Expand All @@ -306,7 +306,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkProp",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp1.blk)
val.NotifyBlockProposal(blkProp1.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
Expand Down Expand Up @@ -335,7 +335,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropOld",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp1.blk)
val.NotifyBlockProposal(blkProp1.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
Expand All @@ -344,7 +344,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp2.blk)
val.NotifyBlockProposal(blkProp2.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp2.blkHash)
Expand All @@ -371,7 +371,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp2.blk)
val.NotifyBlockProposal(blkProp2.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp2.blkHash)
Expand All @@ -380,7 +380,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropOld",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp1.blk)
val.NotifyBlockProposal(blkProp1.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp2.blkHash)
Expand All @@ -407,7 +407,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropOld",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp1.blk)
val.NotifyBlockProposal(blkProp1.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
Expand All @@ -426,7 +426,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropNew (ignored)",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp1.blk)
val.NotifyBlockProposal(blkProp1.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 1, zeroHash)
Expand All @@ -443,7 +443,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropOld",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp1.blk)
val.NotifyBlockProposal(blkProp1.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
Expand All @@ -461,7 +461,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp2.blk)
val.NotifyBlockProposal(blkProp2.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp2.blkHash)
Expand All @@ -488,7 +488,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropOld",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp1.blk)
val.NotifyBlockProposal(blkProp1.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
Expand Down Expand Up @@ -524,7 +524,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropOld",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp1.blk)
val.NotifyBlockProposal(blkProp1.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
Expand Down Expand Up @@ -569,7 +569,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkProp",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp1.blk)
val.NotifyBlockProposal(blkProp1.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
Expand Down Expand Up @@ -614,7 +614,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp2.blk)
val.NotifyBlockProposal(blkProp2.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp2.blkHash)
Expand Down Expand Up @@ -653,7 +653,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp2.blk)
val.NotifyBlockProposal(blkProp2.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp2.blkHash)
Expand Down
22 changes: 12 additions & 10 deletions node/consensus/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ func (ce *ConsensusEngine) AcceptProposal(height int64, blkID, prevBlockID types
return false
}

ce.log.Info("Accept proposal?", "height", height, "blockID", blkID, "prevHash", prevBlockID)

// check if the blkProposal is from the leader
valid, err := ce.leader.Verify(blkID[:], leaderSig)
if err != nil {
Expand Down Expand Up @@ -120,7 +118,11 @@ func (ce *ConsensusEngine) AcceptProposal(height int64, blkID, prevBlockID types
// 2. If the node is a validator and missed the block proposal message.
func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, hdr *ktypes.BlockHeader, ci *types.CommitInfo, leaderSig []byte) bool {
if ce.stateInfo.hasBlock.Load() == height { // ce is notified about the blkAnn message already
return false
// that we processed correct proposal
if ce.stateInfo.blkProp != nil && ce.stateInfo.blkProp.blkHash == blkID {
ce.log.Debug("Already processed the block proposal", "height", height, "blockID", blkID)
return false
}
}

ce.stateInfo.mtx.RLock()
Expand Down Expand Up @@ -176,6 +178,8 @@ func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, hdr *kty
// report the result back to the leader.
// Only accept the block proposals from the node that this node considers as a leader.
func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg *blockProposal) error {
defer blkPropMsg.done()

if ce.role.Load() != types.RoleValidator {
ce.log.Warn("Only validators can process block proposals")
return nil
Expand Down Expand Up @@ -246,6 +250,9 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg
ce.stateInfo.blkProp = blkPropMsg
ce.stateInfo.mtx.Unlock()

// allow new proposals to be checked
blkPropMsg.done()

// execCtx is applicable only for the duration of the block execution
// This is used to react to the leader's reset message by cancelling the block execution.
execCtx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -294,16 +301,11 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg
// If the validator node processed a different block, it should rollback and reprocess the block.
// Validator nodes can skip the block execution and directly commit the block if they have already processed the block.
// The nodes should only commit the block if the appHash is valid, else halt the node.
func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash, doneFn func()) error {
func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash, done func()) error {
ce.state.mtx.Lock()
defer ce.state.mtx.Unlock()

defer func() {
if doneFn != nil && ce.state.lc.height == blk.Header.Height {
// Block has been committed, release the prefetch lock on the block
doneFn()
}
}()
defer done()

if ce.state.lc.height+1 != blk.Header.Height { // only accept/process the block if it is for the next height
return nil
Expand Down
30 changes: 28 additions & 2 deletions node/consensus/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ type consensusMessage struct {
Sender []byte
MsgType consensusMsgType
Msg any

// done is a callback provided by the message sender to signal that the
// message has been handled. It is optional. Do NOT call directly; use
// Handled() instead.
done func()
}

func (msg *consensusMessage) Handled() {
if msg.done != nil {
msg.done()
msg.done = nil
}
}

func (ce *ConsensusEngine) sendConsensusMessage(msg *consensusMessage) {
Expand All @@ -39,6 +51,7 @@ type blockProposal struct {
height int64
blkHash types.Hash
blk *ktypes.Block
done func()
}

func (bpm *blockProposal) Type() consensusMsgType {
Expand Down Expand Up @@ -104,21 +117,28 @@ func (ce *ConsensusEngine) sendResetMsg(msg *resetMsg) {

// NotifyBlockProposal is used by the p2p stream handler to notify the consensus engine of a block proposal.
// Only a validator should receive block proposals and notify the consensus engine, whereas others should ignore this message.
func (ce *ConsensusEngine) NotifyBlockProposal(blk *ktypes.Block) {
func (ce *ConsensusEngine) NotifyBlockProposal(blk *ktypes.Block, doneFn func()) {
if ce.role.Load() == types.RoleLeader {
return
}

done := doneFn
if done == nil {
done = func() {}
}

blkProp := &blockProposal{
height: blk.Header.Height,
blkHash: blk.Header.Hash(),
blk: blk,
done: done, // to unblock at some earlier point
}

go ce.sendConsensusMessage(&consensusMessage{
MsgType: blkProp.Type(),
Msg: blkProp,
Sender: ce.leader.Bytes(),
done: done,
})
}

Expand All @@ -132,6 +152,11 @@ func (ce *ConsensusEngine) NotifyBlockCommit(blk *ktypes.Block, ci *types.Commit
ce.log.Info("Notifying consensus engine of block with leader update", "newLeader", hex.EncodeToString(leader.Bytes()), "blkHash", blk.Hash().String())
}

done := doneFn
if done == nil {
done = func() {}
}

// if leader receives a block announce message, with OfflineLeaderUpdate, let
// the leader process it and relinquish leadership to the new leader.
// AcceptCommit() already verified the correctness of the votes, no need to
Expand All @@ -140,7 +165,7 @@ func (ce *ConsensusEngine) NotifyBlockCommit(blk *ktypes.Block, ci *types.Commit
blk: blk,
blkID: blkID,
ci: ci,
done: doneFn,
done: done,
}

// only notify if the leader doesn't already know about the block
Expand All @@ -149,6 +174,7 @@ func (ce *ConsensusEngine) NotifyBlockCommit(blk *ktypes.Block, ci *types.Commit
MsgType: blkCommit.Type(),
Msg: blkCommit,
Sender: leader.Bytes(),
done: done,
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion node/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type ConsensusEngine interface {
InCatchup() bool

AcceptProposal(height int64, blkID, prevBlkID types.Hash, leaderSig []byte, timestamp int64) bool
NotifyBlockProposal(blk *ktypes.Block)
NotifyBlockProposal(blk *ktypes.Block, doen func())

AcceptCommit(height int64, blkID types.Hash, hdr *ktypes.BlockHeader, ci *types.CommitInfo, leaderSig []byte) bool
NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash, doneFn func())
Expand Down
Loading

0 comments on commit aa6e655

Please sign in to comment.