Skip to content

Commit

Permalink
update checkpoint config and fix log crashes
Browse files Browse the repository at this point in the history
  • Loading branch information
charithabandi committed Feb 27, 2025
1 parent cc32a05 commit aabcba6
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 20 deletions.
6 changes: 3 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func DefaultConfig() *Config {
Extensions: make(map[string]map[string]string),
Checkpoint: Checkpoint{
Height: 0,
Hash: types.Hash{},
Hash: "",
},
Erc20Bridge: ERC20BridgeConfig{
RPC: make(map[string]string),
Expand Down Expand Up @@ -467,8 +467,8 @@ type MigrationConfig struct {

type Checkpoint struct {
// Height 0 indicates no checkpoint is set. The leader will attempt regular block sync.
Height int64 `toml:"height" comment:"checkpoint height for the leader. If the leader is behind this height, it will sync to this height before attempting to propose a new block."`
Hash types.Hash `toml:"hash" comment:"checkpoint block hash."`
Height int64 `toml:"height" comment:"checkpoint height for the leader. If the leader is behind this height, it will sync to this height before attempting to propose a new block"`
Hash string `toml:"hash" comment:"checkpoint block hash"`
}

type ERC20BridgeConfig struct {
Expand Down
12 changes: 6 additions & 6 deletions node/consensus/blocksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (ce *ConsensusEngine) doBlockSync(ctx context.Context) error {
func (ce *ConsensusEngine) VerifyCheckpoint() error {
// verify the checkpoint hash and height. If the checkpoints
// are not set, return
if ce.checkpoint.Height == 0 {
if ce.checkpoint.height == 0 {
return nil
}

Expand All @@ -58,12 +58,12 @@ func (ce *ConsensusEngine) VerifyCheckpoint() error {

height, hash := ce.stateInfo.lastCommit.height, ce.stateInfo.lastCommit.blkHash

if height < ce.checkpoint.Height {
return fmt.Errorf("checkpoint verification failed: height: %d [expected: %d]", height, ce.checkpoint.Height)
if height < ce.checkpoint.height {
return fmt.Errorf("checkpoint verification failed: height: %d [expected: %d]", height, ce.checkpoint.height)
}

if height == ce.checkpoint.Height && hash != ce.checkpoint.Hash {
return fmt.Errorf("checkpoint verification failed: height: %d hash: [expected: %s, curr: %s]", ce.checkpoint.Height, ce.checkpoint.Hash, hash)
if height == ce.checkpoint.height && hash != ce.checkpoint.hash {
return fmt.Errorf("checkpoint verification failed: height: %d hash: [expected: %s, curr: %s]", ce.checkpoint.height, ce.checkpoint.hash, hash)
}

return nil
Expand All @@ -73,7 +73,7 @@ func (ce *ConsensusEngine) leaderBlockSync(ctx context.Context) error {
startHeight := ce.lastCommitHeight()
ce.log.Info("Starting block sync", "height", startHeight+1)

checkpoint := ce.checkpoint.Height
checkpoint := ce.checkpoint.height
if checkpoint <= startHeight {
ce.log.Info("Leader is synced to the checkpoint", "height", startHeight)
return nil
Expand Down
33 changes: 24 additions & 9 deletions node/consensus/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type ConsensusEngine struct {
broadcastTxTimeout time.Duration

// checkpoint is the initial checkpoint for the leader to sync to the network.
checkpoint config.Checkpoint
checkpoint checkpoint

genesisHeight int64 // height of the genesis block
leader crypto.PublicKey // TODO: update with network param updates touching it
Expand Down Expand Up @@ -151,6 +151,11 @@ type ConsensusEngine struct {
catchupTimeout time.Duration
}

// checkpoint is the consensus engine's internal representation of the
// configured checkpoint: the height comes straight from the config and the
// hash is the parsed form of the config's string value.
type checkpoint struct {
// height is the checkpoint block height. A value of 0 means no checkpoint
// is set and checkpoint verification is skipped.
height int64
// hash is the block hash expected at the checkpoint height.
hash types.Hash
}

type leaderUpdate struct {
// Candidate is the new leader candidate
Candidate crypto.PublicKey
Expand Down Expand Up @@ -329,7 +334,6 @@ func New(cfg *Config) (*ConsensusEngine, error) {
blkProposalInterval: cfg.BlockProposalInterval,
blkAnnInterval: cfg.BlockAnnInterval,
broadcastTxTimeout: cfg.BroadcastTxTimeout,
checkpoint: cfg.Checkpoint,
db: cfg.DB,
leaderUpdates: nil,
leaderFile: config.LeaderUpdatesFilePath(cfg.RootDir),
Expand Down Expand Up @@ -378,6 +382,16 @@ func New(cfg *Config) (*ConsensusEngine, error) {
ce.proposeTimeout = defaultProposeTimeout
}

ce.checkpoint.height = cfg.Checkpoint.Height
ce.checkpoint.hash = zeroHash
if cfg.Checkpoint.Hash != "" {
hash, err := ktypes.NewHashFromString(cfg.Checkpoint.Hash)
if err != nil {
return nil, fmt.Errorf("invalid checkpoint hash: %w", err)
}
ce.checkpoint.hash = hash
}

// load the leader updates from the file if any
if err := ce.loadLeaderUpdates(); err != nil {
return nil, fmt.Errorf("error loading leader updates: %w", err)
Expand Down Expand Up @@ -575,7 +589,7 @@ func (ce *ConsensusEngine) resetEventLoop(ctx context.Context) {

// handleConsensusMessages handles the consensus messages based on the message type.
func (ce *ConsensusEngine) handleConsensusMessages(ctx context.Context, msg consensusMessage) {
ce.log.Info("Consensus message received", "type", msg.MsgType, "sender", hex.EncodeToString(msg.Sender))
ce.log.Debug("Consensus message received", "msg", msg.String(), "sender", hex.EncodeToString(msg.Sender))

defer msg.Handled()

Expand Down Expand Up @@ -964,23 +978,24 @@ func (ce *ConsensusEngine) processCurrentBlock(ctx context.Context) error {

// Fetch the block at this height and commit it, if it's the right one,
// otherwise rollback.
blkHash, rawBlk, ci, err := ce.getBlock(ctx, ce.state.blkProp.height)
height := ce.state.blkProp.height
blkHash, rawBlk, ci, err := ce.getBlock(ctx, height)
if err != nil {
return err
}

if blkHash != ce.state.blkProp.blkHash { // processed incorrect block
if err := ce.rollbackState(ctx); err != nil {
return fmt.Errorf("error aborting incorrect block execution: height: %d, blockID: %v, error: %w", ce.state.blkProp.height, blkHash, err)
return fmt.Errorf("error aborting incorrect block execution: height: %d, blockID: %v, error: %w", height, blkHash, err)
}

blk, err := ktypes.DecodeBlock(rawBlk)
if err != nil {
return fmt.Errorf("failed to decode the block, blkHeight: %d, blockID: %v, error: %w", ce.state.blkProp.height, blkHash, err)
return fmt.Errorf("failed to decode the block, blkHeight: %d, blockID: %v, error: %w", height, blkHash, err)
}

if err := ce.processAndCommit(ctx, blk, ci, blkHash); err != nil {
return fmt.Errorf("failed to replay the block: blkHeight: %d, blockID: %v, error: %w", ce.state.blkProp.height, blkHash, err)
return fmt.Errorf("failed to replay the block: blkHeight: %d, blockID: %v, error: %w", height, blkHash, err)
}
// recovered to the correct block -> continue to replay blocks from network
return nil
Expand All @@ -995,11 +1010,11 @@ func (ce *ConsensusEngine) processCurrentBlock(ctx context.Context) error {

// All correct! Commit the block.
if err := ce.acceptCommitInfo(ci, blkHash); err != nil {
return fmt.Errorf("failed to validate the commit info: height: %d, error: %w", ce.state.blkProp.height, err)
return fmt.Errorf("failed to validate the commit info: height: %d, error: %w", height, err)
}

if err := ce.commit(ctx); err != nil {
return fmt.Errorf("failed to commit the block: height: %d, error: %w", ce.state.blkProp.height, err)
return fmt.Errorf("failed to commit the block: height: %d, error: %w", height, err)
}

return ctx.Err()
Expand Down
5 changes: 3 additions & 2 deletions node/consensus/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,10 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg
return nil
}

ce.log.Info("Aborting execution of stale block proposal", "height", blkPropMsg.height, "blockID", ce.state.blkProp.blkHash)
blkHash := ce.state.blkProp.blkHash
ce.log.Info("Aborting execution of stale block proposal", "height", blkPropMsg.height, "blockID", blkHash)
if err := ce.rollbackState(ctx); err != nil {
ce.log.Error("Error aborting execution of block", "height", blkPropMsg.height, "blockID", ce.state.blkProp.blkHash, "error", err)
ce.log.Error("Error aborting execution of block", "height", blkPropMsg.height, "blockID", blkHash, "error", err)
return err
}
}
Expand Down
13 changes: 13 additions & 0 deletions node/consensus/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ func (ce *ConsensusEngine) sendConsensusMessage(msg *consensusMessage) {
ce.msgChan <- *msg
}

// String returns a human-readable description of the wrapped consensus
// message, suitable for log output. It delegates to the String method of the
// concrete payload (*blockProposal, *blockAnnounce or *vote). Any other
// payload, including a nil Msg, yields "Unknown message type: <dynamic type>".
func (cm *consensusMessage) String() string {
	// The type switch already binds the asserted value to v, so each case
	// uses it directly instead of repeating the type assertion on cm.Msg.
	switch v := cm.Msg.(type) {
	case *blockProposal:
		return v.String()
	case *blockAnnounce:
		return v.String()
	case *vote:
		return v.String()
	default:
		return fmt.Sprintf("Unknown message type: %T", v)
	}
}

// BlockProposal is a message that is sent to the consensus engine to notify
// that a new block proposal has been received from the leader.
// Ensure that the source of the block proposal is the leader.
Expand Down

0 comments on commit aabcba6

Please sign in to comment.