Skip to content
This repository was archived by the owner on Dec 4, 2024. It is now read-only.

Commit 87c97e8

Browse files
Fix
1 parent 34e08f1 commit 87c97e8

File tree

5 files changed

+66
-6
lines changed

5 files changed

+66
-6
lines changed

consensus/polybft/mocks_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,12 @@ func (tp *syncerMock) Sync(func(*types.FullBlock) bool) error {
395395
return args.Error(0)
396396
}
397397

398+
func (tp *syncerMock) IsSyncingWithPeer() bool {
399+
args := tp.Called()
400+
401+
return args.Bool(0)
402+
}
403+
398404
func init() {
399405
// setup custom hash header func
400406
setupHeaderHashFunc()

consensus/polybft/polybft.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -599,12 +599,24 @@ func (p *Polybft) startConsensusProtocol() {
599599
if ev.Source == "syncer" && ev.NewChain[0].Number >= p.blockchain.CurrentHeader().Number {
600600
p.logger.Info("sync block notification received", "block height", ev.NewChain[0].Number,
601601
"current height", p.blockchain.CurrentHeader().Number)
602-
syncerBlockCh <- struct{}{}
602+
603+
select {
604+
case syncerBlockCh <- struct{}{}:
605+
default:
606+
}
603607
}
604608
}
605609
}
606610
}()
607611

612+
// wait until he stops syncing
613+
p.logger.Info("waiting to stop syncing so that we can try to join consensus if node is a validator")
614+
615+
for p.syncer.IsSyncingWithPeer() {
616+
}
617+
618+
p.logger.Info("node synced up on start. Trying to join consensus if validator")
619+
608620
var (
609621
sequenceCh <-chan struct{}
610622
stopSequence func()

e2e-polybft/e2e/consensus_test.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package e2e
22

33
import (
4+
"bytes"
45
"fmt"
56
"math/big"
67
"path"
@@ -53,22 +54,40 @@ func TestE2E_Consensus_Basic_WithNonValidators(t *testing.T) {
5354
})
5455

5556
t.Run("sync protocol, drop single validator node", func(t *testing.T) {
57+
validatorSrv := cluster.Servers[0]
58+
validatorAcc, err := sidechain.GetAccountFromDir(validatorSrv.DataDir())
59+
require.NoError(t, err)
60+
5661
// query the current block number, as it is a starting point for the test
57-
currentBlockNum, err := cluster.Servers[0].JSONRPC().Eth().BlockNumber()
62+
currentBlockNum, err := validatorSrv.JSONRPC().Eth().BlockNumber()
5863
require.NoError(t, err)
5964

65+
// wait for 2 epochs to elapse, before we stop the node
66+
require.NoError(t, cluster.WaitForBlock(currentBlockNum+2*epochSize, 2*time.Minute))
67+
6068
// stop one node
61-
node := cluster.Servers[0]
62-
node.Stop()
69+
validatorSrv.Stop()
70+
71+
// check what is the current block on the running nodes
72+
currentBlockNum, err = cluster.Servers[1].JSONRPC().Eth().BlockNumber()
73+
require.NoError(t, err)
6374

6475
// wait for 2 epochs to elapse, so that rest of the network progresses
6576
require.NoError(t, cluster.WaitForBlock(currentBlockNum+2*epochSize, 2*time.Minute))
6677

6778
// start the node again
68-
node.Start()
79+
validatorSrv.Start()
6980

7081
// wait 2 more epochs to elapse and make sure that stopped node managed to catch up
7182
require.NoError(t, cluster.WaitForBlock(currentBlockNum+4*epochSize, 2*time.Minute))
83+
84+
// wait until the validator mines one block to check if he is back in consensus
85+
require.NoError(t, cluster.WaitUntil(3*time.Minute, 2*time.Second, func() bool {
86+
latestBlock, err := cluster.Servers[0].JSONRPC().Eth().GetBlockByNumber(ethgo.Latest, false)
87+
require.NoError(t, err)
88+
89+
return bytes.Equal(validatorAcc.Address().Bytes(), latestBlock.Miner.Bytes())
90+
}))
7291
})
7392

7493
t.Run("sync protocol, drop single non-validator node", func(t *testing.T) {
@@ -87,7 +106,12 @@ func TestE2E_Consensus_Basic_WithNonValidators(t *testing.T) {
87106
node.Start()
88107

89108
// wait 2 more epochs to elapse and make sure that stopped node managed to catch up
90-
require.NoError(t, cluster.WaitForBlock(currentBlockNum+4*epochSize, 2*time.Minute))
109+
blockToWait := currentBlockNum + 4*epochSize
110+
require.NoError(t, cluster.WaitForBlock(blockToWait, 2*time.Minute))
111+
112+
latestBlockOnDroppedNode, err := node.JSONRPC().Eth().GetBlockByNumber(ethgo.Latest, false)
113+
require.NoError(t, err)
114+
require.GreaterOrEqual(t, latestBlockOnDroppedNode.Number, blockToWait)
91115
})
92116
}
93117

syncer/syncer.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package syncer
33
import (
44
"errors"
55
"fmt"
6+
"sync/atomic"
67
"time"
78

89
"github.com/0xPolygon/polygon-edge/helper/progress"
@@ -38,6 +39,8 @@ type syncer struct {
3839

3940
// Channel to notify Sync that a new status arrived
4041
newStatusCh chan struct{}
42+
43+
isSyncing atomic.Bool
4144
}
4245

4346
func NewSyncer(
@@ -219,6 +222,9 @@ func (s *syncer) bulkSyncWithPeer(peerID peer.ID, peerLatestBlock uint64,
219222
return 0, false, err
220223
}
221224

225+
s.setIsSyncing(true)
226+
defer s.setIsSyncing(false)
227+
222228
// Create a blockchain subscription for the sync progression and start tracking
223229
subscription := s.blockchain.SubscribeEvents()
224230
s.syncProgression.StartProgression(localLatest+1, subscription)
@@ -272,6 +278,16 @@ func (s *syncer) bulkSyncWithPeer(peerID peer.ID, peerLatestBlock uint64,
272278
}
273279
}
274280

281+
// setIsSyncing updates the isSyncing field
282+
func (s *syncer) setIsSyncing(isSyncing bool) {
283+
s.isSyncing.Store(isSyncing)
284+
}
285+
286+
// IsSyncingWithPeer indicates if node is syncing with peer
287+
func (s *syncer) IsSyncingWithPeer() bool {
288+
return s.isSyncing.Load()
289+
}
290+
275291
func updateMetrics(fullBlock *types.FullBlock) {
276292
metrics.SetGauge([]string{syncerMetrics, "tx_num"}, float32(len(fullBlock.Block.Transactions)))
277293
metrics.SetGauge([]string{syncerMetrics, "receipts_num"}, float32(len(fullBlock.Receipts)))

syncer/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ type Syncer interface {
7070
HasSyncPeer() bool
7171
// Sync starts routine to sync blocks
7272
Sync(func(*types.FullBlock) bool) error
73+
74+
IsSyncingWithPeer() bool
7375
}
7476

7577
type Progression interface {

0 commit comments

Comments
 (0)