Skip to content

Commit

Permalink
review revision
Browse files Browse the repository at this point in the history
  • Loading branch information
jchappelow committed Feb 27, 2025
1 parent aa6e655 commit f0c1628
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 24 deletions.
21 changes: 12 additions & 9 deletions node/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) {
n.log.Warn("invalid height in blk ann request", "height", height)
return
}
n.log.Debug("blk announcement received", "blockID", blkid, "height", height)

peerID := s.Conn().RemotePeer()

n.log.Info("Accept commit?", "height", height, "blockID", blkid, "appHash", ci.AppHash,
"from_peer", peers.PeerIDStringer(peerID))

// If we are a validator and this is the commit ann for a proposed block
// that we already started executing, consensus engine will handle it.
Expand All @@ -129,12 +133,16 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) {
n.log.Debug("ALREADY HAVE OR FETCHING BLOCK")
return // we have or are currently fetching it, do nothing, assuming we have already re-announced
}
var ceProcessing bool
defer func() {
if !ceProcessing {
done() // we did not hand off to CE, release the pre-fetch lock
}
}()

n.log.Debug("retrieving new block", "blockID", blkid)
t0 := time.Now()

peerID := s.Conn().RemotePeer()

// First try to get from this stream.
rawBlk, err := request(s, []byte(getMsg), blkReadLimit)
if err != nil {
Expand All @@ -147,17 +155,14 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) {
gotHeight, rawBlk, gotCI, id, err = n.getBlkWithRetry(ctx, blkHash, 500*time.Millisecond, 10)
if err != nil {
n.log.Errorf("unable to retrieve tx %v: %v", blkid, err)
done()
return
}
if gotHeight != height {
n.log.Errorf("getblk response had unexpected height: wanted %d, got %d", height, gotHeight)
done()
return
}
if gotCI != nil && gotCI.AppHash != ci.AppHash {
n.log.Errorf("getblk response had unexpected appHash: wanted %v, got %v", ci.AppHash, gotCI.AppHash)
done()
return
}
// Ensure that the peerID from which the block was downloaded is a valid one.
Expand All @@ -172,23 +177,21 @@ func (n *Node) blkAnnStreamHandler(s network.Stream) {
blk, err := ktypes.DecodeBlock(rawBlk)
if err != nil {
n.log.Infof("decodeBlock failed for %v: %v", blkid, err)
done()
return
}
if blk.Header.Height != height {
n.log.Infof("getblk response had unexpected height: wanted %d, got %d", height, blk.Header.Height)
done()
return
}
gotBlkHash := blk.Header.Hash()
if gotBlkHash != blkHash {
n.log.Infof("invalid block hash: wanted %v, got %x", blkHash, gotBlkHash)
done()
return
}

// re-announce
n.log.Infof("downloaded block %v of height %d from %v, notifying ce of the block", blkid, height, peerID)
ceProcessing = true // neuter the deferred done, CE will call it now
n.ce.NotifyBlockCommit(blk, ci, blkHash, done)
go func() {
n.announceRawBlk(context.Background(), blkHash, height, rawBlk, blk.Header, ci, peerID, reqMsg.LeaderSig) // re-announce with the leader's signature
Expand Down
18 changes: 9 additions & 9 deletions node/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,11 @@ func (n *Node) blkPropStreamHandler(s network.Stream) {
// We also must not ignore any proposal messages since they may be real
// (signed by leader) leader while others may be spam.
n.blkPropHandling <- struct{}{} // block until it's our turn

var ceAsyncHandling bool // true once we've handed off to CE to asynchronously handle it, and CE takes the done func
done := func() { <-n.blkPropHandling }
var ceProcessing bool // true once we've handed off to CE to handle it and call when done
defer func() {
if !ceAsyncHandling {
<-n.blkPropHandling
if !ceProcessing {
done()
}
}()

Expand Down Expand Up @@ -279,12 +279,12 @@ func (n *Node) blkPropStreamHandler(s network.Stream) {
n.log.Info("processing block proposal", "height", height, "hash", hash,
"from", peers.PeerIDStringer(from))

done := sync.OnceFunc(func() {
<-n.blkPropHandling
ceProcessing = true // ce will call done now, neuter the defer

n.ce.NotifyBlockProposal(blk, sync.OnceFunc(func() { // make the callback idempotent, and trigger reannounce
done()
go n.announceBlkProp(context.Background(), blk, s.Conn().RemotePeer())
})
ceAsyncHandling = true // ce will call done now, neuter the defer
n.ce.NotifyBlockProposal(blk, done)
}))
}

// sendACK is a callback for the result of validator block execution/precommit.
Expand Down
8 changes: 3 additions & 5 deletions node/consensus/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ func (ce *ConsensusEngine) AcceptProposal(height int64, blkID, prevBlockID types
// 1. If the node is a sentry node and doesn't have the block.
// 2. If the node is a validator and missed the block proposal message.
func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, hdr *ktypes.BlockHeader, ci *types.CommitInfo, leaderSig []byte) bool {
ce.stateInfo.mtx.RLock()
defer ce.stateInfo.mtx.RUnlock()

if ce.stateInfo.hasBlock.Load() == height { // ce is notified about the blkAnn message already
// that we processed correct proposal
if ce.stateInfo.blkProp != nil && ce.stateInfo.blkProp.blkHash == blkID {
Expand All @@ -125,11 +128,6 @@ func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, hdr *kty
}
}

ce.stateInfo.mtx.RLock()
defer ce.stateInfo.mtx.RUnlock()

ce.log.Infof("Accept commit? height: %d, blockID: %s, appHash: %s, lastCommitHeight: %d", height, blkID, ci.AppHash, ce.stateInfo.height)

// check if we already downloaded the block through the block proposal message
if (ce.stateInfo.blkProp != nil && ce.stateInfo.blkProp.blkHash == blkID) && (ce.stateInfo.status == Proposed || ce.stateInfo.status == Executed) {
// block is already downloaded and/being processed, accept the commit, don't request the block again
Expand Down
2 changes: 1 addition & 1 deletion node/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type ConsensusEngine interface {
InCatchup() bool

AcceptProposal(height int64, blkID, prevBlkID types.Hash, leaderSig []byte, timestamp int64) bool
NotifyBlockProposal(blk *ktypes.Block, doen func())
NotifyBlockProposal(blk *ktypes.Block, done func())

AcceptCommit(height int64, blkID types.Hash, hdr *ktypes.BlockHeader, ci *types.CommitInfo, leaderSig []byte) bool
NotifyBlockCommit(blk *ktypes.Block, ci *types.CommitInfo, blkID types.Hash, doneFn func())
Expand Down

0 comments on commit f0c1628

Please sign in to comment.