Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node,consensus: no duplicate proposals allowed #1421

Merged
merged 1 commit into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions node/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) {
n.log.Warn("invalid height in blk ann request", "height", height)
return
}
n.log.Debug("blk announcement received", "blockID", blkid, "height", height)

peerID := s.Conn().RemotePeer()

n.log.Info("Accept commit?", "height", height, "blockID", blkid, "appHash", ci.AppHash,
"from_peer", peers.PeerIDStringer(peerID))

// If we are a validator and this is the commit ann for a proposed block
// that we already started executing, consensus engine will handle it.
Expand All @@ -133,12 +137,16 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) {
n.log.Debug("ALREADY HAVE OR FETCHING BLOCK")
return // we have or are currently fetching it, do nothing, assuming we have already re-announced
}
var ceProcessing bool
defer func() {
if !ceProcessing {
done() // we did not hand off to CE, release the pre-fetch lock
}
}()

n.log.Debug("retrieving new block", "blockID", blkid)
t0 := time.Now()

peerID := s.Conn().RemotePeer()

// First try to get from this stream.
rawBlk, err := request(s, []byte(getMsg), blkReadLimit)
if err != nil {
Expand All @@ -151,17 +159,14 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) {
gotHeight, rawBlk, gotCI, id, err = n.getBlkWithRetry(ctx, blkHash, 500*time.Millisecond, 10)
if err != nil {
n.log.Errorf("unable to retrieve tx %v: %v", blkid, err)
done()
return
}
if gotHeight != height {
n.log.Errorf("getblk response had unexpected height: wanted %d, got %d", height, gotHeight)
done()
return
}
if gotCI != nil && gotCI.AppHash != ci.AppHash {
n.log.Errorf("getblk response had unexpected appHash: wanted %v, got %v", ci.AppHash, gotCI.AppHash)
done()
return
}
// Ensure that the peerID from which the block was downloaded is a valid one.
Expand All @@ -176,23 +181,21 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) {
blk, err := ktypes.DecodeBlock(rawBlk)
if err != nil {
n.log.Infof("decodeBlock failed for %v: %v", blkid, err)
done()
return
}
if blk.Header.Height != height {
n.log.Infof("getblk response had unexpected height: wanted %d, got %d", height, blk.Header.Height)
done()
return
}
gotBlkHash := blk.Header.Hash()
if gotBlkHash != blkHash {
n.log.Infof("invalid block hash: wanted %v, got %x", blkHash, gotBlkHash)
done()
return
}

// re-announce
n.log.Infof("downloaded block %v of height %d from %v, notifying ce of the block", blkid, height, peerID)
ceProcessing = true // neuter the deferred done, CE will call it now
n.ce.NotifyBlockCommit(blk, ci, blkHash, done)
go func() {
n.announceRawBlk(context.Background(), blkHash, height, rawBlk, blk.Header, ci, peerID, reqMsg.LeaderSig) // re-announce with the leader's signature
Expand Down
36 changes: 26 additions & 10 deletions node/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"errors"
"fmt"
"io"
"slices"
"sync"

ktypes "github.com/kwilteam/kwil-db/core/types"
"github.com/kwilteam/kwil-db/node/peers"
Expand Down Expand Up @@ -160,7 +162,7 @@ func (bp *blockProp) WriteTo(w io.Writer) (int64, error) {
return n, nil*/
}

func (n *Node) announceBlkProp(ctx context.Context, blk *ktypes.Block) {
func (n *Node) announceBlkProp(ctx context.Context, blk *ktypes.Block, skipPeers ...peer.ID) {
rawBlk := ktypes.EncodeBlock(blk)
blkHash := blk.Hash()
height := blk.Header.Height
Expand All @@ -174,9 +176,9 @@ func (n *Node) announceBlkProp(ctx context.Context, blk *ktypes.Block) {
return
}

me := n.host.ID()
skipPeers = append(skipPeers, n.host.ID()) // always skip self
for _, peerID := range peers {
if peerID == me {
if slices.Contains(skipPeers, peerID) {
continue
}
prop := blockProp{Height: height, Hash: blkHash, PrevHash: blk.Header.PrevHash,
Expand Down Expand Up @@ -217,13 +219,23 @@ func (n *Node) blkPropStreamHandler(s network.Stream) {
return
}

height := prop.Height

// This requires atomicity of AcceptProposal -> download -> NotifyBlockProposal.
// We also must not ignore any proposal messages since some may be real
// (signed by the leader) while others may be spam.
n.blkPropHandlerMtx.Lock()
defer n.blkPropHandlerMtx.Unlock()
n.blkPropHandling <- struct{}{} // block until it's our turn
done := func() { <-n.blkPropHandling }
var ceProcessing bool // true once we've handed off to CE to handle it and call when done
defer func() {
if !ceProcessing {
done()
}
}()

height := prop.Height
from := s.Conn().RemotePeer()
n.log.Info("Accept proposal?", "height", height, "blockID", prop.Hash, "prevHash", prop.PrevHash,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AcceptProposal logs the same, so this can be removed.

"from_peer", peers.PeerIDStringer(from))

if !n.ce.AcceptProposal(height, prop.Hash, prop.PrevHash, prop.LeaderSig, prop.Stamp) {
// NOTE: if this is ahead of our last commit height, we have to try to catch up
Expand Down Expand Up @@ -264,11 +276,15 @@ func (n *Node) blkPropStreamHandler(s network.Stream) {
return
}

n.log.Info("processing block proposal", "height", height, "hash", hash)
n.log.Info("processing block proposal", "height", height, "hash", hash,
"from", peers.PeerIDStringer(from))

ceProcessing = true // ce will call done now, neuter the defer

n.ce.NotifyBlockProposal(blk)
// valid new prop => reannounce
go n.announceBlkProp(context.Background(), blk)
n.ce.NotifyBlockProposal(blk, sync.OnceFunc(func() { // make the callback idempotent, and trigger reannounce
done()
go n.announceBlkProp(context.Background(), blk, s.Conn().RemotePeer())
}))
}

// sendACK is a callback for the result of validator block execution/precommit.
Expand Down
2 changes: 2 additions & 0 deletions node/consensus/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,8 @@ func (ce *ConsensusEngine) resetEventLoop(ctx context.Context) {
func (ce *ConsensusEngine) handleConsensusMessages(ctx context.Context, msg consensusMessage) {
ce.log.Info("Consensus message received", "type", msg.MsgType, "sender", hex.EncodeToString(msg.Sender))

defer msg.Handled()

switch v := msg.Msg.(type) {
case *blockProposal:
if err := ce.processBlockProposal(ctx, v); err != nil {
Expand Down
30 changes: 15 additions & 15 deletions node/consensus/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkProp",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp1.blk)
val.NotifyBlockProposal(blkProp1.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
Expand All @@ -306,7 +306,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkProp",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp1.blk)
val.NotifyBlockProposal(blkProp1.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
Expand Down Expand Up @@ -335,7 +335,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropOld",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp1.blk)
val.NotifyBlockProposal(blkProp1.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
Expand All @@ -344,7 +344,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp2.blk)
val.NotifyBlockProposal(blkProp2.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp2.blkHash)
Expand All @@ -371,7 +371,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp2.blk)
val.NotifyBlockProposal(blkProp2.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp2.blkHash)
Expand All @@ -380,7 +380,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropOld",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp1.blk)
val.NotifyBlockProposal(blkProp1.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp2.blkHash)
Expand All @@ -407,7 +407,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropOld",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp1.blk)
val.NotifyBlockProposal(blkProp1.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
Expand All @@ -426,7 +426,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropNew (ignored)",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp1.blk)
val.NotifyBlockProposal(blkProp1.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Committed, 1, zeroHash)
Expand All @@ -443,7 +443,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropOld",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp1.blk)
val.NotifyBlockProposal(blkProp1.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
Expand All @@ -461,7 +461,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp2.blk)
val.NotifyBlockProposal(blkProp2.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp2.blkHash)
Expand All @@ -488,7 +488,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropOld",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp1.blk)
val.NotifyBlockProposal(blkProp1.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
Expand Down Expand Up @@ -524,7 +524,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropOld",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp1.blk)
val.NotifyBlockProposal(blkProp1.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
Expand Down Expand Up @@ -569,7 +569,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkProp",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp1.blk)
val.NotifyBlockProposal(blkProp1.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp1.blkHash)
Expand Down Expand Up @@ -614,7 +614,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp2.blk)
val.NotifyBlockProposal(blkProp2.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp2.blkHash)
Expand Down Expand Up @@ -653,7 +653,7 @@ func TestValidatorStateMachine(t *testing.T) {
{
name: "blkPropNew",
trigger: func(t *testing.T, leader, val *ConsensusEngine) {
val.NotifyBlockProposal(blkProp2.blk)
val.NotifyBlockProposal(blkProp2.blk, nil)
},
verify: func(t *testing.T, leader, val *ConsensusEngine) error {
return verifyStatus(t, val, Executed, 0, blkProp2.blkHash)
Expand Down
28 changes: 14 additions & 14 deletions node/consensus/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ func (ce *ConsensusEngine) AcceptProposal(height int64, blkID, prevBlockID types
return false
}

ce.log.Info("Accept proposal?", "height", height, "blockID", blkID, "prevHash", prevBlockID)

// check if the blkProposal is from the leader
valid, err := ce.leader.Verify(blkID[:], leaderSig)
if err != nil {
Expand Down Expand Up @@ -119,14 +117,16 @@ func (ce *ConsensusEngine) AcceptProposal(height int64, blkID, prevBlockID types
// 1. If the node is a sentry node and doesn't have the block.
// 2. If the node is a validator and missed the block proposal message.
func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, hdr *ktypes.BlockHeader, ci *types.CommitInfo, leaderSig []byte) bool {
if ce.stateInfo.hasBlock.Load() == height { // ce is notified about the blkAnn message already
return false
}

ce.stateInfo.mtx.RLock()
defer ce.stateInfo.mtx.RUnlock()

ce.log.Infof("Accept commit? height: %d, blockID: %s, appHash: %s, lastCommitHeight: %d", height, blkID, ci.AppHash, ce.stateInfo.height)
if ce.stateInfo.hasBlock.Load() == height { // ce is notified about the blkAnn message already
// check that we processed the correct proposal
if ce.stateInfo.blkProp != nil && ce.stateInfo.blkProp.blkHash == blkID {
ce.log.Debug("Already processed the block proposal", "height", height, "blockID", blkID)
return false
}
}

// check if we already downloaded the block through the block proposal message
if (ce.stateInfo.blkProp != nil && ce.stateInfo.blkProp.blkHash == blkID) && (ce.stateInfo.status == Proposed || ce.stateInfo.status == Executed) {
Expand Down Expand Up @@ -176,6 +176,8 @@ func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, hdr *kty
// report the result back to the leader.
// Only accept the block proposals from the node that this node considers as a leader.
func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg *blockProposal) error {
defer blkPropMsg.done()

if ce.role.Load() != types.RoleValidator {
ce.log.Warn("Only validators can process block proposals")
return nil
Expand Down Expand Up @@ -246,6 +248,9 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg
ce.stateInfo.blkProp = blkPropMsg
ce.stateInfo.mtx.Unlock()

// allow new proposals to be checked
blkPropMsg.done()

// execCtx is applicable only for the duration of the block execution
// This is used to react to the leader's reset message by cancelling the block execution.
execCtx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -294,16 +299,11 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg
// If the validator node processed a different block, it should rollback and reprocess the block.
// Validator nodes can skip the block execution and directly commit the block if they have already processed the block.
// The nodes should only commit the block if the appHash is valid, else halt the node.
func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash, doneFn func()) error {
func (ce *ConsensusEngine) commitBlock(ctx context.Context, blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash, done func()) error {
ce.state.mtx.Lock()
defer ce.state.mtx.Unlock()

defer func() {
if doneFn != nil && ce.state.lc.height == blk.Header.Height {
// Block has been committed, release the prefetch lock on the block
doneFn()
}
}()
defer done()

if ce.state.lc.height+1 != blk.Header.Height { // only accept/process the block if it is for the next height
return nil
Expand Down
Loading
Loading