-
Notifications
You must be signed in to change notification settings - Fork 188
/
Copy pathevent_loop_test.go
280 lines (228 loc) · 9.83 KB
/
event_loop_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
package eventloop
import (
"context"
"io"
"sync"
"testing"
"time"
"github.com/rs/zerolog"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/consensus/hotstuff/helper"
"github.com/onflow/flow-go/consensus/hotstuff/mocks"
"github.com/onflow/flow-go/consensus/hotstuff/model"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/utils/unittest"
)
// TestEventLoop performs unit testing of event loop, checks if submitted events are propagated
// to event handler as well as handling of timeouts.
func TestEventLoop(t *testing.T) {
suite.Run(t, new(EventLoopTestSuite))
}
type EventLoopTestSuite struct {
suite.Suite
eh *mocks.EventHandler
cancel context.CancelFunc
eventLoop *EventLoop
}
func (s *EventLoopTestSuite) SetupTest() {
s.eh = mocks.NewEventHandler(s.T())
s.eh.On("Start", mock.Anything).Return(nil).Maybe()
s.eh.On("TimeoutChannel").Return(make(<-chan time.Time, 1)).Maybe()
s.eh.On("OnLocalTimeout").Return(nil).Maybe()
log := zerolog.New(io.Discard)
eventLoop, err := NewEventLoop(log, metrics.NewNoopCollector(), metrics.NewNoopCollector(), s.eh, time.Time{})
require.NoError(s.T(), err)
s.eventLoop = eventLoop
ctx, cancel := context.WithCancel(context.Background())
s.cancel = cancel
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
s.eventLoop.Start(signalerCtx)
unittest.RequireCloseBefore(s.T(), s.eventLoop.Ready(), 100*time.Millisecond, "event loop not started")
}
func (s *EventLoopTestSuite) TearDownTest() {
s.cancel()
unittest.RequireCloseBefore(s.T(), s.eventLoop.Done(), 100*time.Millisecond, "event loop not stopped")
}
// TestReadyDone tests if event loop stops internal worker thread
func (s *EventLoopTestSuite) TestReadyDone() {
time.Sleep(1 * time.Second)
go func() {
s.cancel()
}()
unittest.RequireCloseBefore(s.T(), s.eventLoop.Done(), 100*time.Millisecond, "event loop not stopped")
}
// Test_SubmitQC tests that submitted proposal is eventually sent to event handler for processing
func (s *EventLoopTestSuite) Test_SubmitProposal() {
proposal := helper.MakeSignedProposal()
processed := atomic.NewBool(false)
s.eh.On("OnReceiveProposal", proposal).Run(func(args mock.Arguments) {
processed.Store(true)
}).Return(nil).Once()
s.eventLoop.SubmitProposal(proposal)
require.Eventually(s.T(), processed.Load, time.Millisecond*100, time.Millisecond*10)
}
// Test_SubmitQC tests that submitted QC is eventually sent to `EventHandler.OnReceiveQc` for processing
func (s *EventLoopTestSuite) Test_SubmitQC() {
// qcIngestionFunction is the archetype for EventLoop.OnQcConstructedFromVotes and EventLoop.OnNewQcDiscovered
type qcIngestionFunction func(*flow.QuorumCertificate)
testQCIngestionFunction := func(f qcIngestionFunction, qcView uint64) {
qc := helper.MakeQC(helper.WithQCView(qcView))
processed := atomic.NewBool(false)
s.eh.On("OnReceiveQc", qc).Run(func(args mock.Arguments) {
processed.Store(true)
}).Return(nil).Once()
f(qc)
require.Eventually(s.T(), processed.Load, time.Millisecond*100, time.Millisecond*10)
}
s.Run("QCs handed to EventLoop.OnQcConstructedFromVotes are forwarded to EventHandler", func() {
testQCIngestionFunction(s.eventLoop.OnQcConstructedFromVotes, 100)
})
s.Run("QCs handed to EventLoop.OnNewQcDiscovered are forwarded to EventHandler", func() {
testQCIngestionFunction(s.eventLoop.OnNewQcDiscovered, 101)
})
}
// Test_SubmitTC tests that submitted TC is eventually sent to `EventHandler.OnReceiveTc` for processing
func (s *EventLoopTestSuite) Test_SubmitTC() {
// tcIngestionFunction is the archetype for EventLoop.OnTcConstructedFromTimeouts and EventLoop.OnNewTcDiscovered
type tcIngestionFunction func(*flow.TimeoutCertificate)
testTCIngestionFunction := func(f tcIngestionFunction, tcView uint64) {
tc := helper.MakeTC(helper.WithTCView(tcView))
processed := atomic.NewBool(false)
s.eh.On("OnReceiveTc", tc).Run(func(args mock.Arguments) {
processed.Store(true)
}).Return(nil).Once()
f(tc)
require.Eventually(s.T(), processed.Load, time.Millisecond*100, time.Millisecond*10)
}
s.Run("TCs handed to EventLoop.OnTcConstructedFromTimeouts are forwarded to EventHandler", func() {
testTCIngestionFunction(s.eventLoop.OnTcConstructedFromTimeouts, 100)
})
s.Run("TCs handed to EventLoop.OnNewTcDiscovered are forwarded to EventHandler", func() {
testTCIngestionFunction(s.eventLoop.OnNewTcDiscovered, 101)
})
}
// Test_SubmitTC_IngestNewestQC tests that included QC in TC is eventually sent to `EventHandler.OnReceiveQc` for processing
func (s *EventLoopTestSuite) Test_SubmitTC_IngestNewestQC() {
// tcIngestionFunction is the archetype for EventLoop.OnTcConstructedFromTimeouts and EventLoop.OnNewTcDiscovered
type tcIngestionFunction func(*flow.TimeoutCertificate)
testTCIngestionFunction := func(f tcIngestionFunction, tcView, qcView uint64) {
tc := helper.MakeTC(helper.WithTCView(tcView),
helper.WithTCNewestQC(helper.MakeQC(helper.WithQCView(qcView))))
processed := atomic.NewBool(false)
s.eh.On("OnReceiveQc", tc.NewestQC).Run(func(args mock.Arguments) {
processed.Store(true)
}).Return(nil).Once()
f(tc)
require.Eventually(s.T(), processed.Load, time.Millisecond*100, time.Millisecond*10)
}
// process initial TC, this will track the newest TC
s.eh.On("OnReceiveTc", mock.Anything).Return(nil).Once()
s.eventLoop.OnTcConstructedFromTimeouts(helper.MakeTC(
helper.WithTCView(100),
helper.WithTCNewestQC(
helper.MakeQC(
helper.WithQCView(80),
),
),
))
s.Run("QCs handed to EventLoop.OnTcConstructedFromTimeouts are forwarded to EventHandler", func() {
testTCIngestionFunction(s.eventLoop.OnTcConstructedFromTimeouts, 100, 99)
})
s.Run("QCs handed to EventLoop.OnNewTcDiscovered are forwarded to EventHandler", func() {
testTCIngestionFunction(s.eventLoop.OnNewTcDiscovered, 100, 100)
})
}
// Test_OnPartialTcCreated tests that event loop delivers partialTcCreated events to event handler.
func (s *EventLoopTestSuite) Test_OnPartialTcCreated() {
view := uint64(1000)
newestQC := helper.MakeQC(helper.WithQCView(view - 10))
lastViewTC := helper.MakeTC(helper.WithTCView(view-1), helper.WithTCNewestQC(newestQC))
processed := atomic.NewBool(false)
partialTcCreated := &hotstuff.PartialTcCreated{
View: view,
NewestQC: newestQC,
LastViewTC: lastViewTC,
}
s.eh.On("OnPartialTcCreated", partialTcCreated).Run(func(args mock.Arguments) {
processed.Store(true)
}).Return(nil).Once()
s.eventLoop.OnPartialTcCreated(view, newestQC, lastViewTC)
require.Eventually(s.T(), processed.Load, time.Millisecond*100, time.Millisecond*10)
}
// TestEventLoop_Timeout tests that event loop delivers timeout events to event handler under pressure
func TestEventLoop_Timeout(t *testing.T) {
eh := &mocks.EventHandler{}
processed := atomic.NewBool(false)
eh.On("Start", mock.Anything).Return(nil).Once()
eh.On("OnReceiveQc", mock.Anything).Return(nil).Maybe()
eh.On("OnReceiveProposal", mock.Anything).Return(nil).Maybe()
eh.On("OnLocalTimeout").Run(func(args mock.Arguments) {
processed.Store(true)
}).Return(nil).Once()
log := zerolog.New(io.Discard)
metricsCollector := metrics.NewNoopCollector()
eventLoop, err := NewEventLoop(log, metricsCollector, metricsCollector, eh, time.Time{})
require.NoError(t, err)
eh.On("TimeoutChannel").Return(time.After(100 * time.Millisecond))
ctx, cancel := context.WithCancel(context.Background())
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
eventLoop.Start(signalerCtx)
unittest.RequireCloseBefore(t, eventLoop.Ready(), 100*time.Millisecond, "event loop not stopped")
time.Sleep(10 * time.Millisecond)
var wg sync.WaitGroup
wg.Add(2)
// spam with proposals and QCs
go func() {
defer wg.Done()
for !processed.Load() {
qc := unittest.QuorumCertificateFixture()
eventLoop.OnQcConstructedFromVotes(qc)
}
}()
go func() {
defer wg.Done()
for !processed.Load() {
eventLoop.SubmitProposal(helper.MakeSignedProposal())
}
}()
require.Eventually(t, processed.Load, time.Millisecond*200, time.Millisecond*10)
unittest.AssertReturnsBefore(t, func() { wg.Wait() }, time.Millisecond*200)
cancel()
unittest.RequireCloseBefore(t, eventLoop.Done(), 100*time.Millisecond, "event loop not stopped")
}
// TestReadyDoneWithStartTime tests that event loop correctly starts and schedules start of processing
// when startTime argument is used
func TestReadyDoneWithStartTime(t *testing.T) {
eh := &mocks.EventHandler{}
eh.On("Start", mock.Anything).Return(nil)
eh.On("TimeoutChannel").Return(make(<-chan time.Time, 1))
eh.On("OnLocalTimeout").Return(nil)
metrics := metrics.NewNoopCollector()
log := zerolog.New(io.Discard)
startTimeDuration := 2 * time.Second
startTime := time.Now().Add(startTimeDuration)
eventLoop, err := NewEventLoop(log, metrics, metrics, eh, startTime)
require.NoError(t, err)
done := make(chan struct{})
eh.On("OnReceiveProposal", mock.AnythingOfType("*model.SignedProposal")).Run(func(args mock.Arguments) {
require.True(t, time.Now().After(startTime))
close(done)
}).Return(nil).Once()
ctx, cancel := context.WithCancel(context.Background())
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
eventLoop.Start(signalerCtx)
unittest.RequireCloseBefore(t, eventLoop.Ready(), 100*time.Millisecond, "event loop not started")
parentBlock := unittest.BlockHeaderFixture()
header := unittest.BlockHeaderWithParentFixture(parentBlock)
proposal := &flow.Proposal{Header: header, ProposerSigData: unittest.SignatureFixture()}
eventLoop.SubmitProposal(model.SignedProposalFromFlow(proposal))
unittest.RequireCloseBefore(t, done, startTimeDuration+100*time.Millisecond, "proposal wasn't received")
cancel()
unittest.RequireCloseBefore(t, eventLoop.Done(), 100*time.Millisecond, "event loop not stopped")
}