Skip to content

Commit

Permalink
Add tests for nodes crashing right after a timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Yacov Manevich <[email protected]>
  • Loading branch information
yacovm committed Feb 26, 2025
1 parent c61d085 commit 4962488
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 31 deletions.
2 changes: 2 additions & 0 deletions epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"runtime/debug"
"simplex/record"
"slices"
"sync"
Expand Down Expand Up @@ -952,6 +953,7 @@ func (e *Epoch) persistEmptyNotarization(emptyNotarization *EmptyNotarization, s
return err
}

debug.PrintStack()
e.Logger.Debug("Persisted empty block to WAL",
zap.Int("size", len(emptyNotarizationRecord)),
zap.Uint64("round", emptyNotarization.Vote.Round))
Expand Down
142 changes: 111 additions & 31 deletions epoch_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package simplex_test

import (
"context"
"fmt"
. "simplex"
"simplex/testutil"
"sync/atomic"
Expand Down Expand Up @@ -117,9 +118,20 @@ func TestEpochLeaderFailoverWithEmptyNotarization(t *testing.T) {
nextBlockSeqToCommit := uint64(3)
nextRoundToCommit := uint64(4)

// Ensure our node proposes block with sequence 3 for round 4
notarizeAndFinalizeRound(t, nodes, nextRoundToCommit, nextBlockSeqToCommit, e, bb, quorum, storage, false)
require.Equal(t, uint64(4), storage.Height())
bbAfterCrash := &testBlockBuilder{
out: make(chan *testBlock, 1),
in: make(chan *testBlock, 1),
blockShouldBeBuilt: make(chan struct{}, 1),
}

bbAfterCrash.out <- block3.(*testBlock)
bbAfterCrash.in <- block3.(*testBlock)

runCrashAndRestartExecution(t, e, nodes, bb, wal, storage, func(t *testing.T, e *Epoch, bb *testBlockBuilder, storage *InMemStorage, wal *testWAL) {
// Ensure our node proposes block with sequence 3 for round 4
notarizeAndFinalizeRound(t, nodes, nextRoundToCommit, nextBlockSeqToCommit, e, bb, quorum, storage, false)
require.Equal(t, uint64(4), storage.Height())
})
}

func TestEpochLeaderFailoverReceivesEmptyVotesEarly(t *testing.T) {
Expand Down Expand Up @@ -188,40 +200,45 @@ func TestEpochLeaderFailoverReceivesEmptyVotesEarly(t *testing.T) {

waitForBlockProposerTimeout(t, e, start)

wal.lock.Lock()
walContent, err := wal.ReadAll()
require.NoError(t, err)
wal.lock.Unlock()
runCrashAndRestartExecution(t, e, nodes, bb, wal, storage, func(t *testing.T, e *Epoch, bb *testBlockBuilder, storage *InMemStorage, wal *testWAL) {
wal.lock.Lock()
walContent, err := wal.ReadAll()
require.NoError(t, err)
wal.lock.Unlock()

rawEmptyVote, rawEmptyNotarization, rawProposal := walContent[len(walContent)-3], walContent[len(walContent)-2], walContent[len(walContent)-1]
emptyVote, err := ParseEmptyVoteRecord(rawEmptyVote)
require.NoError(t, err)
require.Equal(t, createEmptyVote(emptyBlockMd, nodes[0]).Vote, emptyVote)
fmt.Println(">>>>>", len(walContent))

emptyNotarization, err := EmptyNotarizationFromRecord(rawEmptyNotarization, &testQCDeserializer{t: t})
require.NoError(t, err)
require.Equal(t, emptyVoteFrom1.Vote, emptyNotarization.Vote)
require.Equal(t, uint64(3), emptyNotarization.Vote.Round)
require.Equal(t, uint64(2), emptyNotarization.Vote.Seq)
require.Equal(t, uint64(3), storage.Height())
rawEmptyVote, rawEmptyNotarization, rawProposal := walContent[len(walContent)-3], walContent[len(walContent)-2], walContent[len(walContent)-1]
emptyVote, err := ParseEmptyVoteRecord(rawEmptyVote)
require.NoError(t, err)
require.Equal(t, createEmptyVote(emptyBlockMd, nodes[0]).Vote, emptyVote)

header, _, err := ParseBlockRecord(rawProposal)
require.NoError(t, err)
require.Equal(t, uint64(4), header.Round)
require.Equal(t, uint64(3), header.Seq)
emptyNotarization, err := EmptyNotarizationFromRecord(rawEmptyNotarization, &testQCDeserializer{t: t})
require.NoError(t, err)
require.Equal(t, emptyVoteFrom1.Vote, emptyNotarization.Vote)
require.Equal(t, uint64(3), emptyNotarization.Vote.Round)
require.Equal(t, uint64(2), emptyNotarization.Vote.Seq)
require.Equal(t, uint64(3), storage.Height())

// Ensure our node proposes block with sequence 3 for round 4
block := <-bb.out
header, _, err := ParseBlockRecord(rawProposal)
require.NoError(t, err)
require.Equal(t, uint64(4), header.Round)
require.Equal(t, uint64(3), header.Seq)

for i := 1; i <= quorum; i++ {
injectTestFinalization(t, e, block, nodes[i])
}
// Ensure our node proposes block with sequence 3 for round 4
block := <-bb.out

for i := 1; i <= quorum; i++ {
injectTestFinalization(t, e, block, nodes[i])
}

block2 := storage.waitForBlockCommit(3)
require.Equal(t, block, block2)
require.Equal(t, uint64(4), storage.Height())
require.Equal(t, uint64(4), block2.BlockHeader().Round)
require.Equal(t, uint64(3), block2.BlockHeader().Seq)
})

block2 := storage.waitForBlockCommit(3)
require.Equal(t, block, block2)
require.Equal(t, uint64(4), storage.Height())
require.Equal(t, uint64(4), block2.BlockHeader().Round)
require.Equal(t, uint64(3), block2.BlockHeader().Seq)
}

func TestEpochLeaderFailover(t *testing.T) {
Expand Down Expand Up @@ -746,6 +763,69 @@ func TestEpochLeaderFailoverNotNeeded(t *testing.T) {
require.False(t, timedOut.Load())
}

func runCrashAndRestartExecution(t *testing.T, e *Epoch, nodes []NodeID, bb *testBlockBuilder, wal *testWAL, storage *InMemStorage, f func(t *testing.T, e *Epoch, bb *testBlockBuilder, storage *InMemStorage, wal *testWAL)) {
// Split the test into two scenarios:
// 1) The node proceeds as usual.
// 2) The node crashes and restarts.
cloneWAL := wal.Clone()
cloneStorage := storage.Clone()

// Clone the block builder
bbAfterCrash := &testBlockBuilder{
out: cloneBlockChan(bb.out),
in: cloneBlockChan(bb.in),
blockShouldBeBuilt: make(chan struct{}, cap(bb.blockShouldBeBuilt)),
}

// Case 1:
t.Run(fmt.Sprintf("%s-no-crash", t.Name()), func(t *testing.T) {
f(t, e, bb, storage, wal)
})

// Case 2:
t.Run(fmt.Sprintf("%s-with-crash", t.Name()), func(t *testing.T) {
conf := EpochConfig{
QCDeserializer: &testQCDeserializer{t: t},
BlockDeserializer: &blockDeserializer{},
MaxProposalWait: DefaultMaxProposalWaitTime,
StartTime: time.Now(),
Logger: testutil.MakeLogger(t, 1),
ID: nodes[0],
Signer: &testSigner{},
WAL: cloneWAL,
Verifier: &testVerifier{},
Storage: cloneStorage,
Comm: noopComm(nodes),
BlockBuilder: bbAfterCrash,
SignatureAggregator: &testSignatureAggregator{},
}

e, err := NewEpoch(conf)
require.NoError(t, err)

fmt.Println("-------------")
require.NoError(t, e.Start())
f(t, e, bbAfterCrash, cloneStorage, cloneWAL)
})
}

func cloneBlockChan(in chan *testBlock) chan *testBlock {
tmp := make(chan *testBlock, cap(in))
out := make(chan *testBlock, cap(in))

for len(in) > 0 {
block := <-in
tmp <- block
out <- block
}

for len(tmp) > 0 {
in <- <-tmp
}

return out
}

type recordingComm struct {
Communication
BroadcastMessages chan *Message
Expand Down
13 changes: 13 additions & 0 deletions epoch_multinode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,19 @@ func newTestWAL(t *testing.T) *testWAL {
return &tw
}

func (tw *testWAL) Clone() *testWAL {
rawWAL, err := tw.ReadAll()
require.NoError(tw.t, err)

wal := newTestWAL(tw.t)

for _, entry := range rawWAL {
wal.Append(entry)
}

return wal
}

func (tw *testWAL) Append(b []byte) error {
tw.lock.Lock()
defer tw.lock.Unlock()
Expand Down
12 changes: 12 additions & 0 deletions epoch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,6 +921,18 @@ func newInMemStorage() *InMemStorage {
return s
}

func (mem *InMemStorage) Clone() *InMemStorage {
clone := newInMemStorage()
for seq := uint64(0); seq < mem.Height(); seq++ {
block, fCert, ok := mem.Retrieve(seq)
if !ok {
panic(fmt.Sprintf("failed retrieving block %d", seq))
}
clone.Index(block, fCert)
}
return clone
}

func (mem *InMemStorage) waitForBlockCommit(seq uint64) Block {
mem.lock.Lock()
defer mem.lock.Unlock()
Expand Down

0 comments on commit 4962488

Please sign in to comment.