node,consensus: no duplicate proposals allowed
This addresses a strange proposal announcement amplification that can occur given the right timing.

In short, to support reannouncement of block proposals, as well as to prevent malicious spam
proposals from stacking up behind the consensus event loop, we made the entire proposal
stream handler atomic w.r.t. AcceptProposal => download => NotifyBlockProposal. However,
NotifyBlockProposal is mostly asynchronous, meaning the next call to AcceptProposal will
return true and allow the same proposal, which includes a full block, to be downloaded
again and queued in another consensus message for NotifyBlockProposal.

To resolve this, we take a nearly identical approach to the one used with AcceptCommit and
its stream handler. The main difference is that we completely block any subsequent
proposal until the prior one is processed (either fully rejected, or fully accepted with block
execution begun).
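
The gate introduced below is a single-slot channel: the stream handler sends into it before AcceptProposal, and the receive happens only when the proposal is fully processed. A minimal, self-contained sketch of that pattern (names like `gate` and `handleProposal` are illustrative, not part of this commit):

```go
package main

import (
	"fmt"
	"sync"
)

// gate serializes proposal handling: only one holder at a time.
type gate chan struct{}

// acquire blocks until the previous proposal is fully processed, then
// returns an idempotent release function (safe to call more than once).
func (g gate) acquire() func() {
	g <- struct{}{} // single-slot channel: blocks while another proposal is in flight
	return sync.OnceFunc(func() { <-g })
}

func handleProposal(g gate, id int) {
	done := g.acquire()
	defer done() // release on early return if we never hand off
	fmt.Println("processing proposal", id)
	done() // e.g. the consensus engine signals completion; a second call is a no-op
}

func main() {
	g := gate(make(chan struct{}, 1))
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			handleProposal(g, id)
		}(i)
	}
	wg.Wait()
}
```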
jchappelow authored Feb 27, 2025
1 parent c2d4b51 commit cc32a05
Showing 9 changed files with 105 additions and 54 deletions.
node/block.go: 21 changes (12 additions & 9 deletions)
@@ -114,7 +114,11 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) {
 		n.log.Warn("invalid height in blk ann request", "height", height)
 		return
 	}
-	n.log.Debug("blk announcement received", "blockID", blkid, "height", height)
+
+	peerID := s.Conn().RemotePeer()
+
+	n.log.Info("Accept commit?", "height", height, "blockID", blkid, "appHash", ci.AppHash,
+		"from_peer", peers.PeerIDStringer(peerID))
 
 	// If we are a validator and this is the commit ann for a proposed block
 	// that we already started executing, consensus engine will handle it.
@@ -133,12 +137,16 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) {
 		n.log.Debug("ALREADY HAVE OR FETCHING BLOCK")
 		return // we have or are currently fetching it, do nothing, assuming we have already re-announced
 	}
+	var ceProcessing bool
+	defer func() {
+		if !ceProcessing {
+			done() // we did not hand off to CE, release the pre-fetch lock
+		}
+	}()
 
 	n.log.Debug("retrieving new block", "blockID", blkid)
 	t0 := time.Now()
 
-	peerID := s.Conn().RemotePeer()
-
 	// First try to get from this stream.
 	rawBlk, err := request(s, []byte(getMsg), blkReadLimit)
 	if err != nil {
@@ -151,17 +159,14 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) {
 		gotHeight, rawBlk, gotCI, id, err = n.getBlkWithRetry(ctx, blkHash, 500*time.Millisecond, 10)
 		if err != nil {
 			n.log.Errorf("unable to retrieve tx %v: %v", blkid, err)
-			done()
 			return
 		}
 		if gotHeight != height {
 			n.log.Errorf("getblk response had unexpected height: wanted %d, got %d", height, gotHeight)
-			done()
 			return
 		}
 		if gotCI != nil && gotCI.AppHash != ci.AppHash {
 			n.log.Errorf("getblk response had unexpected appHash: wanted %v, got %v", ci.AppHash, gotCI.AppHash)
-			done()
 			return
 		}
 		// Ensure that the peerID from which the block was downloaded is a valid one.
@@ -176,23 +181,21 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) {
 	blk, err := ktypes.DecodeBlock(rawBlk)
 	if err != nil {
 		n.log.Infof("decodeBlock failed for %v: %v", blkid, err)
-		done()
 		return
 	}
 	if blk.Header.Height != height {
 		n.log.Infof("getblk response had unexpected height: wanted %d, got %d", height, blk.Header.Height)
-		done()
 		return
 	}
 	gotBlkHash := blk.Header.Hash()
 	if gotBlkHash != blkHash {
 		n.log.Infof("invalid block hash: wanted %v, got %x", blkHash, gotBlkHash)
-		done()
 		return
 	}
 
 	// re-announce
 	n.log.Infof("downloaded block %v of height %d from %v, notifying ce of the block", blkid, height, peerID)
+	ceProcessing = true // neuter the deferred done, CE will call it now
 	n.ce.NotifyBlockCommit(blk, ci, blkHash, done)
 	go func() {
 		n.announceRawBlk(context.Background(), blkHash, height, rawBlk, blk.Header, ci, peerID, reqMsg.LeaderSig) // re-announce with the leader's signature
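
The block.go change threads the pre-fetch lock's release through every exit path with one deferred check instead of repeated done() calls. A condensed sketch of the handoff, with hypothetical stand-ins (fetchBlock and notifyCE are not the real calls, which are the retrieval helpers and NotifyBlockCommit above):

```go
// Handoff sketch: done releases the pre-fetch lock; the consensus engine
// (CE) owns that release once it accepts the block.
func handleBlockAnn(done func(), fetchBlock func() ([]byte, error),
	notifyCE func(blk []byte, done func())) {

	var ceProcessing bool
	defer func() {
		if !ceProcessing {
			done() // any early return releases the lock here
		}
	}()

	blk, err := fetchBlock()
	if err != nil {
		return // deferred done() fires
	}

	ceProcessing = true // neuter the defer; CE calls done when it finishes
	notifyCE(blk, done)
}
```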
node/consensus.go: 36 changes (26 additions & 10 deletions)
@@ -10,6 +10,8 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"slices"
+	"sync"
 
 	ktypes "github.com/kwilteam/kwil-db/core/types"
 	"github.com/kwilteam/kwil-db/node/peers"
@@ -160,7 +162,7 @@ func (bp *blockProp) WriteTo(w io.Writer) (int64, error) {
 	return n, nil*/
 }
 
-func (n *Node) announceBlkProp(ctx context.Context, blk *ktypes.Block) {
+func (n *Node) announceBlkProp(ctx context.Context, blk *ktypes.Block, skipPeers ...peer.ID) {
 	rawBlk := ktypes.EncodeBlock(blk)
 	blkHash := blk.Hash()
 	height := blk.Header.Height
@@ -174,9 +176,9 @@ func (n *Node) announceBlkProp(ctx context.Context, blk *ktypes.Block) {
 		return
 	}
 
-	me := n.host.ID()
+	skipPeers = append(skipPeers, n.host.ID()) // always skip self
 	for _, peerID := range peers {
-		if peerID == me {
+		if slices.Contains(skipPeers, peerID) {
 			continue
 		}
 		prop := blockProp{Height: height, Hash: blkHash, PrevHash: blk.Header.PrevHash,
@@ -217,13 +219,23 @@ func (n *Node) blkPropStreamHandler(s network.Stream) {
 		return
 	}
 
-	height := prop.Height
-
 	// This requires atomicity of AcceptProposal -> download -> NotifyBlockProposal.
 	// We also must not ignore any proposal messages since they may be real
 	// (signed by leader) leader while others may be spam.
-	n.blkPropHandlerMtx.Lock()
-	defer n.blkPropHandlerMtx.Unlock()
+	n.blkPropHandling <- struct{}{} // block until it's our turn
+	done := func() { <-n.blkPropHandling }
+	var ceProcessing bool // true once we've handed off to CE to handle it and call when done
+	defer func() {
+		if !ceProcessing {
+			done()
+		}
+	}()
+
+	height := prop.Height
+	from := s.Conn().RemotePeer()
+	n.log.Info("Accept proposal?", "height", height, "blockID", prop.Hash, "prevHash", prop.PrevHash,
+		"from_peer", peers.PeerIDStringer(from))
 
 	if !n.ce.AcceptProposal(height, prop.Hash, prop.PrevHash, prop.LeaderSig, prop.Stamp) {
 		// NOTE: if this is ahead of our last commit height, we have to try to catch up
@@ -264,11 +276,15 @@ func (n *Node) blkPropStreamHandler(s network.Stream) {
 		return
 	}
 
-	n.log.Info("processing block proposal", "height", height, "hash", hash)
+	n.log.Info("processing block proposal", "height", height, "hash", hash,
+		"from", peers.PeerIDStringer(from))
+
+	ceProcessing = true // ce will call done now, neuter the defer
 
-	n.ce.NotifyBlockProposal(blk)
-	// valid new prop => reannounce
-	go n.announceBlkProp(context.Background(), blk)
+	n.ce.NotifyBlockProposal(blk, sync.OnceFunc(func() { // make the callback idempotent, and trigger reannounce
+		done()
+		go n.announceBlkProp(context.Background(), blk, s.Conn().RemotePeer())
+	}))
 }
 
 // sendACK is a callback for the result of validator block execution/precommit.
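
With the handler deferring done() and the consensus engine also invoking the callback when it finishes (processBlockProposal in follower.go below calls done() both eagerly and in a defer), the release can run more than once. sync.OnceFunc collapses those calls into one; a small standalone demonstration:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	release := sync.OnceFunc(func() {
		fmt.Println("gate released; reannounce proposal")
	})

	release() // explicit call once the proposal is accepted for execution
	release() // deferred call at the end of processing: a no-op the second time
	// The message prints exactly once.
}
```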
node/consensus/engine.go: 2 changes (2 additions & 0 deletions)
@@ -577,6 +577,8 @@ func (ce *ConsensusEngine) resetEventLoop(ctx context.Context) {
 func (ce *ConsensusEngine) handleConsensusMessages(ctx context.Context, msg consensusMessage) {
 	ce.log.Info("Consensus message received", "type", msg.MsgType, "sender", hex.EncodeToString(msg.Sender))
 
+	defer msg.Handled()
+
 	switch v := msg.Msg.(type) {
 	case *blockProposal:
 		if err := ce.processBlockProposal(ctx, v); err != nil {
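
Every consensus message is now acknowledged exactly once via the deferred msg.Handled(). Since the tests below pass nil callbacks, Handled presumably tolerates a missing callback; one hedged guess at the wrapper (the real consensusMessage fields may differ from this sketch):

```go
// Hypothetical shape; not the actual struct from this commit.
type consensusMessage struct {
	MsgType string
	Sender  []byte
	Msg     any
	handled func() // nil when the producer has nothing to release
}

// Handled is safe to defer even when no callback was supplied.
func (m *consensusMessage) Handled() {
	if m.handled != nil {
		m.handled()
	}
}
```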
node/consensus/engine_test.go: 30 changes (15 additions & 15 deletions)
@@ -279,7 +279,7 @@ func TestValidatorStateMachine(t *testing.T) {
 		{
 			name: "blkProp",
 			trigger: func(t *testing.T, leader, val *ConsensusEngine) {
-				val.NotifyBlockProposal(blkProp1.blk)
+				val.NotifyBlockProposal(blkProp1.blk, nil)
 			},
 			verify: func(t *testing.T, leader, val *ConsensusEngine) error {
 				return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
@@ -306,7 +306,7 @@ func TestValidatorStateMachine(t *testing.T) {
 		{
 			name: "blkProp",
 			trigger: func(t *testing.T, leader, val *ConsensusEngine) {
-				val.NotifyBlockProposal(blkProp1.blk)
+				val.NotifyBlockProposal(blkProp1.blk, nil)
 			},
 			verify: func(t *testing.T, leader, val *ConsensusEngine) error {
 				return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
@@ -335,7 +335,7 @@ func TestValidatorStateMachine(t *testing.T) {
 		{
 			name: "blkPropOld",
 			trigger: func(t *testing.T, leader, val *ConsensusEngine) {
-				val.NotifyBlockProposal(blkProp1.blk)
+				val.NotifyBlockProposal(blkProp1.blk, nil)
 			},
 			verify: func(t *testing.T, leader, val *ConsensusEngine) error {
 				return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
@@ -344,7 +344,7 @@ func TestValidatorStateMachine(t *testing.T) {
 		{
 			name: "blkPropNew",
 			trigger: func(t *testing.T, leader, val *ConsensusEngine) {
-				val.NotifyBlockProposal(blkProp2.blk)
+				val.NotifyBlockProposal(blkProp2.blk, nil)
 			},
 			verify: func(t *testing.T, leader, val *ConsensusEngine) error {
 				return verifyStatus(t, val, Executed, 0, blkProp2.blkHash)
@@ -371,7 +371,7 @@ func TestValidatorStateMachine(t *testing.T) {
 		{
 			name: "blkPropNew",
 			trigger: func(t *testing.T, leader, val *ConsensusEngine) {
-				val.NotifyBlockProposal(blkProp2.blk)
+				val.NotifyBlockProposal(blkProp2.blk, nil)
 			},
 			verify: func(t *testing.T, leader, val *ConsensusEngine) error {
 				return verifyStatus(t, val, Executed, 0, blkProp2.blkHash)
@@ -380,7 +380,7 @@ func TestValidatorStateMachine(t *testing.T) {
 		{
 			name: "blkPropOld",
 			trigger: func(t *testing.T, leader, val *ConsensusEngine) {
-				val.NotifyBlockProposal(blkProp1.blk)
+				val.NotifyBlockProposal(blkProp1.blk, nil)
 			},
 			verify: func(t *testing.T, leader, val *ConsensusEngine) error {
 				return verifyStatus(t, val, Executed, 0, blkProp2.blkHash)
@@ -407,7 +407,7 @@ func TestValidatorStateMachine(t *testing.T) {
 		{
 			name: "blkPropOld",
 			trigger: func(t *testing.T, leader, val *ConsensusEngine) {
-				val.NotifyBlockProposal(blkProp1.blk)
+				val.NotifyBlockProposal(blkProp1.blk, nil)
 			},
 			verify: func(t *testing.T, leader, val *ConsensusEngine) error {
 				return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
@@ -426,7 +426,7 @@ func TestValidatorStateMachine(t *testing.T) {
 		{
 			name: "blkPropNew (ignored)",
 			trigger: func(t *testing.T, leader, val *ConsensusEngine) {
-				val.NotifyBlockProposal(blkProp1.blk)
+				val.NotifyBlockProposal(blkProp1.blk, nil)
 			},
 			verify: func(t *testing.T, leader, val *ConsensusEngine) error {
 				return verifyStatus(t, val, Committed, 1, zeroHash)
@@ -443,7 +443,7 @@ func TestValidatorStateMachine(t *testing.T) {
 		{
 			name: "blkPropOld",
 			trigger: func(t *testing.T, leader, val *ConsensusEngine) {
-				val.NotifyBlockProposal(blkProp1.blk)
+				val.NotifyBlockProposal(blkProp1.blk, nil)
 			},
 			verify: func(t *testing.T, leader, val *ConsensusEngine) error {
 				return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
@@ -461,7 +461,7 @@ func TestValidatorStateMachine(t *testing.T) {
 		{
 			name: "blkPropNew",
 			trigger: func(t *testing.T, leader, val *ConsensusEngine) {
-				val.NotifyBlockProposal(blkProp2.blk)
+				val.NotifyBlockProposal(blkProp2.blk, nil)
 			},
 			verify: func(t *testing.T, leader, val *ConsensusEngine) error {
 				return verifyStatus(t, val, Executed, 0, blkProp2.blkHash)
@@ -488,7 +488,7 @@ func TestValidatorStateMachine(t *testing.T) {
 		{
 			name: "blkPropOld",
 			trigger: func(t *testing.T, leader, val *ConsensusEngine) {
-				val.NotifyBlockProposal(blkProp1.blk)
+				val.NotifyBlockProposal(blkProp1.blk, nil)
 			},
 			verify: func(t *testing.T, leader, val *ConsensusEngine) error {
 				return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
@@ -524,7 +524,7 @@ func TestValidatorStateMachine(t *testing.T) {
 		{
 			name: "blkPropOld",
 			trigger: func(t *testing.T, leader, val *ConsensusEngine) {
-				val.NotifyBlockProposal(blkProp1.blk)
+				val.NotifyBlockProposal(blkProp1.blk, nil)
 			},
 			verify: func(t *testing.T, leader, val *ConsensusEngine) error {
 				return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
@@ -569,7 +569,7 @@ func TestValidatorStateMachine(t *testing.T) {
 		{
 			name: "blkProp",
 			trigger: func(t *testing.T, leader, val *ConsensusEngine) {
-				val.NotifyBlockProposal(blkProp1.blk)
+				val.NotifyBlockProposal(blkProp1.blk, nil)
 			},
 			verify: func(t *testing.T, leader, val *ConsensusEngine) error {
 				return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
@@ -614,7 +614,7 @@ func TestValidatorStateMachine(t *testing.T) {
 		{
 			name: "blkPropNew",
 			trigger: func(t *testing.T, leader, val *ConsensusEngine) {
-				val.NotifyBlockProposal(blkProp2.blk)
+				val.NotifyBlockProposal(blkProp2.blk, nil)
 			},
 			verify: func(t *testing.T, leader, val *ConsensusEngine) error {
 				return verifyStatus(t, val, Executed, 0, blkProp2.blkHash)
@@ -653,7 +653,7 @@ func TestValidatorStateMachine(t *testing.T) {
 		{
 			name: "blkPropNew",
 			trigger: func(t *testing.T, leader, val *ConsensusEngine) {
-				val.NotifyBlockProposal(blkProp2.blk)
+				val.NotifyBlockProposal(blkProp2.blk, nil)
 			},
 			verify: func(t *testing.T, leader, val *ConsensusEngine) error {
 				return verifyStatus(t, val, Executed, 0, blkProp2.blkHash)
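
The test updates are mechanical: NotifyBlockProposal now takes a completion callback, and nil is passed where the tests hold no gate to release. Both call shapes side by side (the first line mirrors node/consensus.go above; the second assumes nil remains a valid callback):

```go
// Production path: idempotent callback releases the gate and reannounces.
n.ce.NotifyBlockProposal(blk, sync.OnceFunc(func() {
	done()
	go n.announceBlkProp(context.Background(), blk, s.Conn().RemotePeer())
}))

// Test path: no gate held, so no callback is needed.
val.NotifyBlockProposal(blkProp1.blk, nil)
```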
node/consensus/follower.go: 28 changes (14 additions & 14 deletions)
@@ -24,8 +24,6 @@ func (ce *ConsensusEngine) AcceptProposal(height int64, blkID, prevBlockID types
 		return false
 	}
 
-	ce.log.Info("Accept proposal?", "height", height, "blockID", blkID, "prevHash", prevBlockID)
-
 	// check if the blkProposal is from the leader
 	valid, err := ce.leader.Verify(blkID[:], leaderSig)
 	if err != nil {
@@ -119,14 +117,16 @@ func (ce *ConsensusEngine) AcceptProposal(height int64, blkID, prevBlockID types
 // 1. If the node is a sentry node and doesn't have the block.
 // 2. If the node is a validator and missed the block proposal message.
 func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, hdr *ktypes.BlockHeader, ci *types.CommitInfo, leaderSig []byte) bool {
-	if ce.stateInfo.hasBlock.Load() == height { // ce is notified about the blkAnn message already
-		return false
-	}
-
 	ce.stateInfo.mtx.RLock()
 	defer ce.stateInfo.mtx.RUnlock()
 
-	ce.log.Infof("Accept commit? height: %d, blockID: %s, appHash: %s, lastCommitHeight: %d", height, blkID, ci.AppHash, ce.stateInfo.height)
+	if ce.stateInfo.hasBlock.Load() == height { // ce is notified about the blkAnn message already
+		// that we processed correct proposal
+		if ce.stateInfo.blkProp != nil && ce.stateInfo.blkProp.blkHash == blkID {
+			ce.log.Debug("Already processed the block proposal", "height", height, "blockID", blkID)
+			return false
+		}
+	}
 
 	// check if we already downloaded the block through the block proposal message
 	if (ce.stateInfo.blkProp != nil && ce.stateInfo.blkProp.blkHash == blkID) && (ce.stateInfo.status == Proposed || ce.stateInfo.status == Executed) {
@@ -176,6 +176,8 @@ func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, hdr *kty
 // report the result back to the leader.
 // Only accept the block proposals from the node that this node considers as a leader.
 func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg *blockProposal) error {
+	defer blkPropMsg.done()
+
 	if ce.role.Load() != types.RoleValidator {
 		ce.log.Warn("Only validators can process block proposals")
 		return nil
@@ -246,6 +248,9 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg
 	ce.stateInfo.blkProp = blkPropMsg
 	ce.stateInfo.mtx.Unlock()
 
+	// allow new proposals to be checked
+	blkPropMsg.done()
+
 	// execCtx is applicable only for the duration of the block execution
 	// This is used to react to the leader's reset message by cancelling the block execution.
 	execCtx, cancel := context.WithCancel(ctx)
@@ -294,16 +299,11 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg
 // If the validator node processed a different block, it should rollback and reprocess the block.
 // Validator nodes can skip the block execution and directly commit the block if they have already processed the block.
 // The nodes should only commit the block if the appHash is valid, else halt the node.
-func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash, doneFn func()) error {
+func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash, done func()) error {
 	ce.state.mtx.Lock()
 	defer ce.state.mtx.Unlock()
 
-	defer func() {
-		if doneFn != nil && ce.state.lc.height == blk.Header.Height {
-			// Block has been committed, release the prefetch lock on the block
-			doneFn()
-		}
-	}()
+	defer done()
 
 	if ce.state.lc.height+1 != blk.Header.Height { // only accept/process the block if it is for the next height
 		return nil
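
commitBlock's release simplifies accordingly: the old conditional guarded a possibly-nil doneFn and fired only when the commit landed, while the new contract guarantees a callback (made idempotent upstream where it matters), so the release is unconditional. The contrast as a sketch, with committedThisBlock standing in for the old height check:

```go
// Before (sketch): conditional release, dependent on a nil check and commit success.
defer func() {
	if doneFn != nil && committedThisBlock { // committedThisBlock ~ ce.state.lc.height == blk.Header.Height
		doneFn()
	}
}()

// After: the caller always supplies done (wrapped with sync.OnceFunc upstream),
// so every exit path releases exactly once.
defer done()
```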