Skip to content

[blockindex] sync indexers to keep sync with master #4591

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 8 additions & 15 deletions blockchain/blockdao/blockindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ type (
PutBlock(context.Context, *block.Block) error
}

// BlockIndexerWithStart defines an interface to accept block to build index from a start height
BlockIndexerWithStart interface {
// BlockIndexerWithActive tells if an indexer is activated
BlockIndexerWithActive interface {
BlockIndexer
// StartHeight returns the start height of the indexer
StartHeight() uint64
// IsActive indicates if the index is activated (passed its start height)
IsActive() bool
}

// BlockIndexerChecker defines a checker of block indexer
Expand All @@ -55,32 +55,25 @@ func (bic *BlockIndexerChecker) CheckIndexer(ctx context.Context, indexer BlockI
if !ok {
return errors.New("failed to find genesis ctx")
}
tipHeight, err := indexer.Height()
indexTip, err := indexer.Height()
if err != nil {
return err
}
daoTip, err := bic.dao.Height()
if err != nil {
return err
}
if tipHeight > daoTip {
if indexTip > daoTip {
return errors.New("indexer tip height cannot by higher than dao tip height")
}
tipBlk, err := bic.dao.GetBlockByHeight(tipHeight)
tipBlk, err := bic.dao.GetBlockByHeight(indexTip)
if err != nil {
return err
}
if targetHeight == 0 || targetHeight > daoTip {
targetHeight = daoTip
}
startHeight := tipHeight + 1
if indexerWS, ok := indexer.(BlockIndexerWithStart); ok {
indexStartHeight := indexerWS.StartHeight()
if indexStartHeight > startHeight {
startHeight = indexStartHeight
}
}
for i := startHeight; i <= targetHeight; i++ {
for i := indexTip + 1; i <= targetHeight; i++ {
// ternimate if context is done
if err := ctx.Err(); err != nil {
return errors.Wrap(err, "terminate the indexer checking")
Expand Down
4 changes: 4 additions & 0 deletions blockindex/contractstaking/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (s *Indexer) StartHeight() uint64 {
return s.config.ContractDeployHeight
}

func (s *Indexer) IsActive() bool {
return s.cache.Height() > s.config.ContractDeployHeight
}

// ContractAddress returns the contract address
func (s *Indexer) ContractAddress() string {
return s.config.ContractAddress
Expand Down
81 changes: 36 additions & 45 deletions blockindex/sync_indexers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,49 @@ package blockindex
import (
"context"

"github.com/pkg/errors"

"github.com/iotexproject/iotex-core/v2/blockchain/block"
"github.com/iotexproject/iotex-core/v2/blockchain/blockdao"
)

// SyncIndexers is a special index that includes multiple indexes,
var (
ErrNotSychronized = errors.New("indexers are not synchronized")
)

// SyncIndexers is a special index that includes a master and multiple indexes,
// which stay in sync when blocks are added.
type SyncIndexers struct {
indexers []blockdao.BlockIndexer
startHeights []uint64 // start height of each indexer, which will be determined when the indexer is started
minStartHeight uint64 // minimum start height of all indexers
master blockdao.BlockIndexer
indexers []blockdao.BlockIndexer
}

// NewSyncIndexers creates a new SyncIndexers
// each indexer will PutBlock one by one in the order of the indexers
func NewSyncIndexers(indexers ...blockdao.BlockIndexer) *SyncIndexers {
return &SyncIndexers{indexers: indexers}
func NewSyncIndexers(master blockdao.BlockIndexer, indexers ...blockdao.BlockIndexer) *SyncIndexers {
return &SyncIndexers{
master: master,
indexers: indexers}
}

// Start starts the indexer group
func (ig *SyncIndexers) Start(ctx context.Context) error {
if err := ig.master.Start(ctx); err != nil {
return err
}
for _, indexer := range ig.indexers {
if err := indexer.Start(ctx); err != nil {
return err
}
}
return ig.initStartHeight()
return ig.checkSync()
}

// Stop stops the indexer group
func (ig *SyncIndexers) Stop(ctx context.Context) error {
if err := ig.master.Stop(ctx); err != nil {
return err
}
for _, indexer := range ig.indexers {
if err := indexer.Stop(ctx); err != nil {
return err
Expand All @@ -48,17 +61,13 @@ func (ig *SyncIndexers) Stop(ctx context.Context) error {

// PutBlock puts a block into the indexers in the group
func (ig *SyncIndexers) PutBlock(ctx context.Context, blk *block.Block) error {
for i, indexer := range ig.indexers {
// check if the block is higher than the indexer's start height
if blk.Height() < ig.startHeights[i] {
continue
}
// check if the block is higher than the indexer's height
for _, indexer := range ig.indexers {
height, err := indexer.Height()
if err != nil {
return err
}
if blk.Height() <= height {
// if the block is lower than the indexer's height, do nothing
continue
}
// put block
Expand All @@ -69,46 +78,28 @@ func (ig *SyncIndexers) PutBlock(ctx context.Context, blk *block.Block) error {
return nil
}

// StartHeight returns the minimum start height of the indexers in the group
func (ig *SyncIndexers) StartHeight() uint64 {
return ig.minStartHeight
}

// Height returns the minimum height of the indexers in the group
// Height returns the height of the indexers in the group
// which must be same as master's
func (ig *SyncIndexers) Height() (uint64, error) {
var height uint64
for i, indexer := range ig.indexers {
h, err := indexer.Height()
if err != nil {
return 0, err
}
if i == 0 || h < height {
height = h
}
}
return height, nil
return ig.master.Height()
}

// initStartHeight initializes the start height of the indexers in the group
// for every indexer, the start height is the maximum of tipheight+1 and startheight
func (ig *SyncIndexers) initStartHeight() error {
ig.minStartHeight = 0
ig.startHeights = make([]uint64, len(ig.indexers))
func (ig *SyncIndexers) checkSync() error {
masterHeight, err := ig.master.Height()
if err != nil {
return err
}
// all other indexers must have same height as master to be in-sync
for i, indexer := range ig.indexers {
if start, ok := indexer.(interface{ IsActive() bool }); ok && !start.IsActive() {
continue
}
tipHeight, err := indexer.Height()
if err != nil {
return err
}
indexStartHeight := tipHeight + 1
if indexerWithStart, ok := indexer.(blockdao.BlockIndexerWithStart); ok {
startHeight := indexerWithStart.StartHeight()
if startHeight > indexStartHeight {
indexStartHeight = startHeight
}
}
ig.startHeights[i] = indexStartHeight
if i == 0 || indexStartHeight < ig.minStartHeight {
ig.minStartHeight = indexStartHeight
if tipHeight != masterHeight {
return errors.Wrapf(ErrNotSychronized, "indexer %d, expecting height = %d, actual height = %d", i, masterHeight, tipHeight)
}
}
return nil
Expand Down
Loading
Loading