Skip to content

Commit 82d7f1d

Browse files
authored
Merge pull request #7155 from onflow/peter/update-vote-agg-irrecoverable
Remove anti-pattern of ignoring irrecoverable error channel
2 parents 4cabd39 + 9222c10 commit 82d7f1d

File tree

19 files changed

+66
-53
lines changed

19 files changed

+66
-53
lines changed

cmd/scaffold_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func TestLoadSecretsEncryptionKey(t *testing.T) {
6464
// Test the components are started in the correct order, and are run serially
6565
func TestComponentsRunSerially(t *testing.T) {
6666
ctx, cancel := context.WithCancel(context.Background())
67-
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
67+
signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
6868

6969
nb := FlowNode("scaffold test")
7070
nb.componentBuilder = component.NewComponentManagerBuilder()
@@ -165,7 +165,7 @@ func TestPostShutdown(t *testing.T) {
165165

166166
func TestOverrideComponent(t *testing.T) {
167167
ctx, cancel := context.WithCancel(context.Background())
168-
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
168+
signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
169169

170170
nb := FlowNode("scaffold test")
171171
nb.componentBuilder = component.NewComponentManagerBuilder()
@@ -225,7 +225,7 @@ func TestOverrideComponent(t *testing.T) {
225225

226226
func TestOverrideModules(t *testing.T) {
227227
ctx, cancel := context.WithCancel(context.Background())
228-
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
228+
signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
229229

230230
nb := FlowNode("scaffold test")
231231
nb.componentBuilder = component.NewComponentManagerBuilder()
@@ -602,7 +602,7 @@ func TestDependableComponentWaitForDependencies(t *testing.T) {
602602

603603
func testDependableComponentWaitForDependencies(t *testing.T) {
604604
ctx, cancel := context.WithCancel(context.Background())
605-
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
605+
signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
606606

607607
nb := FlowNode("scaffold test")
608608
nb.componentBuilder = component.NewComponentManagerBuilder()

consensus/hotstuff/eventloop/event_loop_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (s *EventLoopTestSuite) SetupTest() {
5252

5353
ctx, cancel := context.WithCancel(context.Background())
5454
s.cancel = cancel
55-
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
55+
signalerCtx := irrecoverable.NewMockSignalerContext(s.T(), ctx)
5656

5757
s.eventLoop.Start(signalerCtx)
5858
unittest.RequireCloseBefore(s.T(), s.eventLoop.Ready(), 100*time.Millisecond, "event loop not started")
@@ -207,7 +207,7 @@ func TestEventLoop_Timeout(t *testing.T) {
207207
eh.On("TimeoutChannel").Return(time.After(100 * time.Millisecond))
208208

209209
ctx, cancel := context.WithCancel(context.Background())
210-
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
210+
signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
211211
eventLoop.Start(signalerCtx)
212212

213213
unittest.RequireCloseBefore(t, eventLoop.Ready(), 100*time.Millisecond, "event loop not stopped")
@@ -264,7 +264,7 @@ func TestReadyDoneWithStartTime(t *testing.T) {
264264
}).Return(nil).Once()
265265

266266
ctx, cancel := context.WithCancel(context.Background())
267-
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
267+
signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
268268
eventLoop.Start(signalerCtx)
269269

270270
unittest.RequireCloseBefore(t, eventLoop.Ready(), 100*time.Millisecond, "event loop not started")

consensus/hotstuff/integration/instance_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -548,13 +548,13 @@ func NewInstance(t *testing.T, options ...Option) *Instance {
548548
return &in
549549
}
550550

551-
func (in *Instance) Run() error {
551+
func (in *Instance) Run(t *testing.T) error {
552552
ctx, cancel := context.WithCancel(context.Background())
553553
defer func() {
554554
cancel()
555555
<-util.AllDone(in.voteAggregator, in.timeoutAggregator)
556556
}()
557-
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
557+
signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
558558
in.voteAggregator.Start(signalerCtx)
559559
in.timeoutAggregator.Start(signalerCtx)
560560
<-util.AllReady(in.voteAggregator, in.timeoutAggregator)

consensus/hotstuff/integration/integration_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func TestSingleInstance(t *testing.T) {
3232
)
3333

3434
// run the event handler until we reach a stop condition
35-
err := in.Run()
35+
err := in.Run(t)
3636
require.ErrorIs(t, err, errStopCondition, "should run until stop condition")
3737

3838
// check if forks and pacemaker are in expected view state
@@ -78,7 +78,7 @@ func TestThreeInstances(t *testing.T) {
7878
for _, in := range instances {
7979
wg.Add(1)
8080
go func(in *Instance) {
81-
err := in.Run()
81+
err := in.Run(t)
8282
require.True(t, errors.Is(err, errStopCondition), "should run until stop condition")
8383
wg.Done()
8484
}(in)
@@ -151,7 +151,7 @@ func TestSevenInstances(t *testing.T) {
151151
for _, in := range instances {
152152
wg.Add(1)
153153
go func(in *Instance) {
154-
err := in.Run()
154+
err := in.Run(t)
155155
require.True(t, errors.Is(err, errStopCondition), "should run until stop condition")
156156
wg.Done()
157157
}(in)

consensus/hotstuff/integration/liveness_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func Test2TimeoutOutof7Instances(t *testing.T) {
7373
for _, in := range instances {
7474
wg.Add(1)
7575
go func(in *Instance) {
76-
err := in.Run()
76+
err := in.Run(t)
7777
require.ErrorIs(t, err, errStopCondition)
7878
wg.Done()
7979
}(in)
@@ -141,7 +141,7 @@ func Test2TimeoutOutof4Instances(t *testing.T) {
141141
for _, in := range instances {
142142
wg.Add(1)
143143
go func(in *Instance) {
144-
err := in.Run()
144+
err := in.Run(t)
145145
require.True(t, errors.Is(err, errStopCondition), "should run until stop condition")
146146
wg.Done()
147147
}(in)
@@ -209,7 +209,7 @@ func Test1TimeoutOutof5Instances(t *testing.T) {
209209
for _, in := range instances {
210210
wg.Add(1)
211211
go func(in *Instance) {
212-
err := in.Run()
212+
err := in.Run(t)
213213
require.ErrorIs(t, err, errStopCondition)
214214
wg.Done()
215215
}(in)
@@ -306,7 +306,7 @@ func TestBlockDelayIsHigherThanTimeout(t *testing.T) {
306306
for _, in := range instances {
307307
wg.Add(1)
308308
go func(in *Instance) {
309-
err := in.Run()
309+
err := in.Run(t)
310310
require.ErrorIs(t, err, errStopCondition)
311311
wg.Done()
312312
}(in)
@@ -388,7 +388,7 @@ func TestAsyncClusterStartup(t *testing.T) {
388388
for _, in := range instances {
389389
wg.Add(1)
390390
go func(in *Instance) {
391-
err := in.Run()
391+
err := in.Run(t)
392392
require.ErrorIs(t, err, errStopCondition)
393393
wg.Done()
394394
}(in)

consensus/hotstuff/timeoutaggregator/timeout_aggregator_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (s *TimeoutAggregatorTestSuite) SetupTest() {
5555
require.NoError(s.T(), err)
5656

5757
ctx, cancel := context.WithCancel(context.Background())
58-
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
58+
signalerCtx := irrecoverable.NewMockSignalerContext(s.T(), ctx)
5959
s.stopAggregator = cancel
6060
s.aggregator.Start(signalerCtx)
6161
unittest.RequireCloseBefore(s.T(), s.aggregator.Ready(), 100*time.Millisecond, "should close before timeout")

consensus/hotstuff/voteaggregator/vote_aggregator.go

+23-10
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/onflow/flow-go/module/irrecoverable"
1919
"github.com/onflow/flow-go/module/mempool"
2020
"github.com/onflow/flow-go/module/metrics"
21+
"github.com/onflow/flow-go/module/util"
2122
)
2223

2324
// defaultVoteAggregatorWorkers number of workers to dispatch events for vote aggregators
@@ -99,24 +100,36 @@ func NewVoteAggregator(
99100
aggregator.queuedMessagesProcessingLoop(ctx)
100101
})
101102
}
102-
componentBuilder.AddWorker(func(_ irrecoverable.SignalerContext, ready component.ReadyFunc) {
103+
componentBuilder.AddWorker(func(parentCtx irrecoverable.SignalerContext, ready component.ReadyFunc) {
103104
// create a new context which is not connected to the parent context
104105
// we need to ensure that our internal workers stop before asking
105106
// vote collectors to stop. We want to avoid delivering events to already stopped vote collectors
106107
ctx, cancel := context.WithCancel(context.Background())
107-
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
108+
signalerCtx, errCh := irrecoverable.WithSignaler(ctx)
109+
108110
// start vote collectors
109111
collectors.Start(signalerCtx)
110-
<-collectors.Ready()
111112

112-
ready()
113+
// Handle the component lifecycle in a separate goroutine so we can capture any errors
114+
// thrown during initialization in the main goroutine.
115+
go func() {
116+
if err := util.WaitClosed(parentCtx, collectors.Ready()); err == nil {
117+
// only signal ready when collectors are ready, but always handle shutdown
118+
ready()
119+
}
113120

114-
// wait for internal workers to stop
115-
wg.Wait()
116-
// signal vote collectors to stop
117-
cancel()
118-
// wait for it to stop
119-
<-collectors.Done()
121+
// wait for internal workers to stop, then signal vote collectors to stop
122+
wg.Wait()
123+
cancel()
124+
}()
125+
126+
// since we are breaking the connection between parentCtx and signalerCtx, we need to
127+
// explicitly rethrow any errors from signalerCtx to parentCtx, otherwise they are dropped.
128+
// Handle errors in the main worker goroutine to guarantee that they are rethrown to the parent
129+
// before the component is marked done.
130+
if err := util.WaitError(errCh, collectors.Done()); err != nil {
131+
parentCtx.Throw(err)
132+
}
120133
})
121134
componentBuilder.AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
122135
ready()

consensus/integration/nodes_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ func createNodes(t *testing.T, participants *ConsensusParticipants, rootSnapshot
226226

227227
// create a context which will be used for all nodes
228228
ctx, cancel := context.WithCancel(context.Background())
229-
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
229+
signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
230230

231231
// create a function to return which the test case can use to run the nodes for some maximum duration
232232
// and gracefully stop after.

engine/access/access_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ func (suite *Suite) TestSendExpiredTransaction() {
262262
Return(referenceBlock, nil).
263263
Twice()
264264

265-
//Advancing final state to expire ref block
265+
// Advancing final state to expire ref block
266266
suite.finalizedBlock = latestBlock
267267

268268
req := &accessproto.SendTransactionRequest{
@@ -728,7 +728,7 @@ func (suite *Suite) TestGetSealedTransaction() {
728728
background, cancel := context.WithCancel(context.Background())
729729
defer cancel()
730730

731-
ctx, _ := irrecoverable.WithSignaler(background)
731+
ctx := irrecoverable.NewMockSignalerContext(suite.T(), background)
732732
ingestEng.Start(ctx)
733733
<-ingestEng.Ready()
734734

@@ -1169,7 +1169,7 @@ func (suite *Suite) TestExecuteScript() {
11691169
require.NoError(suite.T(), err)
11701170
err = db.Update(operation.IndexBlockHeight(lastBlock.Header.Height, lastBlock.ID()))
11711171
require.NoError(suite.T(), err)
1172-
//update latest sealed block
1172+
// update latest sealed block
11731173
suite.sealedBlock = lastBlock.Header
11741174
// create execution receipts for each of the execution node and the last block
11751175
executionReceipts := unittest.ReceiptsForBlockFixture(lastBlock, identities.NodeIDs())

engine/collection/ingest/engine_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ func (suite *Suite) TestComponentShutdown() {
297297

298298
// start then shut down the engine
299299
parentCtx, cancel := context.WithCancel(context.Background())
300-
ctx, _ := irrecoverable.WithSignaler(parentCtx)
300+
ctx := irrecoverable.NewMockSignalerContext(suite.T(), parentCtx)
301301
suite.engine.Start(ctx)
302302
unittest.AssertClosesBefore(suite.T(), suite.engine.Ready(), 10*time.Millisecond)
303303
cancel()

engine/common/worker/worker_builder_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func TestWorkerPool_SingleEvent_SingleWorker(t *testing.T) {
3838

3939
cancelCtx, cancel := context.WithCancel(context.Background())
4040
defer cancel()
41-
ctx, _ := irrecoverable.WithSignaler(cancelCtx)
41+
ctx := irrecoverable.NewMockSignalerContext(t, cancelCtx)
4242
cm := component.NewComponentManagerBuilder().
4343
AddWorker(pool.WorkerLogic()).
4444
Build()
@@ -75,7 +75,7 @@ func TestWorkerBuilder_UnhappyPaths(t *testing.T) {
7575

7676
cancelCtx, cancel := context.WithCancel(context.Background())
7777
defer cancel()
78-
ctx, _ := irrecoverable.WithSignaler(cancelCtx)
78+
ctx := irrecoverable.NewMockSignalerContext(t, cancelCtx)
7979
cm := component.NewComponentManagerBuilder().
8080
AddWorker(pool.WorkerLogic()).
8181
Build()
@@ -138,7 +138,7 @@ func TestWorkerPool_TwoWorkers_ConcurrentEvents(t *testing.T) {
138138

139139
cancelCtx, cancel := context.WithCancel(context.Background())
140140
defer cancel()
141-
ctx, _ := irrecoverable.WithSignaler(cancelCtx)
141+
ctx := irrecoverable.NewMockSignalerContext(t, cancelCtx)
142142
cm := component.NewComponentManagerBuilder().
143143
AddWorker(pool.WorkerLogic()).
144144
AddWorker(pool.WorkerLogic()).

engine/consensus/ingestion/engine_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (s *IngestionSuite) SetupTest() {
5555

5656
ctx, cancel := context.WithCancel(context.Background())
5757
s.cancel = cancel
58-
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
58+
signalerCtx := irrecoverable.NewMockSignalerContext(s.T(), ctx)
5959

6060
metrics := metrics.NewNoopCollector()
6161
ingest, err := New(unittest.Logger(), metrics, s.net, me, s.core)

engine/execution/execution_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func TestExecutionFlow(t *testing.T) {
6767

6868
ctx, cancel := context.WithCancel(context.Background())
6969
unittest.RequireReturnsBefore(t, func() {
70-
exeNode.Ready(ctx)
70+
exeNode.Ready(t, ctx)
7171
}, 1*time.Second, "could not start execution node on time")
7272
defer exeNode.Done(cancel)
7373

@@ -400,7 +400,7 @@ func TestFailedTxWillNotChangeStateCommitment(t *testing.T) {
400400

401401
ctx, cancel := context.WithCancel(context.Background())
402402
unittest.RequireReturnsBefore(t, func() {
403-
exe1Node.Ready(ctx)
403+
exe1Node.Ready(t, ctx)
404404
}, 1*time.Second, "could not start execution node on time")
405405
defer exe1Node.Done(cancel)
406406

@@ -495,7 +495,7 @@ func TestFailedTxWillNotChangeStateCommitment(t *testing.T) {
495495
scExe1Block2, err := exe1Node.ExecutionState.StateCommitmentByBlockID(block2.ID())
496496
assert.NoError(t, err)
497497
// TODO this is no longer valid because the system chunk can change the state
498-
//assert.Equal(t, scExe1Block1, scExe1Block2)
498+
// assert.Equal(t, scExe1Block1, scExe1Block2)
499499
_ = scExe1Block2
500500

501501
collectionEngine.AssertExpectations(t)
@@ -567,7 +567,7 @@ func TestBroadcastToMultipleVerificationNodes(t *testing.T) {
567567
ctx, cancel := context.WithCancel(context.Background())
568568

569569
unittest.RequireReturnsBefore(t, func() {
570-
exeNode.Ready(ctx)
570+
exeNode.Ready(t, ctx)
571571
}, 1*time.Second, "could not start execution node on time")
572572
defer exeNode.Done(cancel)
573573

engine/execution/ingestion/core_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func TestInogestionCoreExecuteBlock(t *testing.T) {
3737

3838
// start the core
3939
ctx, cancel := context.WithCancel(context.Background())
40-
irrecoverableCtx, _ := irrecoverable.WithSignaler(ctx)
40+
irrecoverableCtx := irrecoverable.NewMockSignalerContext(t, ctx)
4141
core.Start(irrecoverableCtx)
4242
<-core.Ready()
4343
defer func() {

engine/execution/provider/engine_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func TestProviderEngine_onChunkDataRequest(t *testing.T) {
4747

4848
cancelCtx, cancel := context.WithCancel(context.Background())
4949
defer cancel()
50-
ctx, _ := irrecoverable.WithSignaler(cancelCtx)
50+
ctx := irrecoverable.NewMockSignalerContext(t, cancelCtx)
5151
e.Start(ctx)
5252
// submit using non-existing origin ID
5353
unittest.RequireCloseBefore(t, e.Ready(), 100*time.Millisecond, "could not start engine")
@@ -96,7 +96,7 @@ func TestProviderEngine_onChunkDataRequest(t *testing.T) {
9696

9797
cancelCtx, cancel := context.WithCancel(context.Background())
9898
defer cancel()
99-
ctx, _ := irrecoverable.WithSignaler(cancelCtx)
99+
ctx := irrecoverable.NewMockSignalerContext(t, cancelCtx)
100100
e.Start(ctx)
101101
// submit using non-existing origin ID
102102
unittest.RequireCloseBefore(t, e.Ready(), 100*time.Millisecond, "could not start engine")

engine/testutil/mock/nodes.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,11 @@ type ExecutionNode struct {
228228
StorehouseEnabled bool
229229
}
230230

231-
func (en ExecutionNode) Ready(ctx context.Context) {
231+
func (en ExecutionNode) Ready(t *testing.T, ctx context.Context) {
232232
// TODO: receipt engine has been migrated to the new component interface, hence
233233
// is using Start. Other engines' startup should be refactored once migrated to
234234
// new interface.
235-
irctx, _ := irrecoverable.WithSignaler(ctx)
235+
irctx := irrecoverable.NewMockSignalerContext(t, ctx)
236236
en.ReceiptsEngine.Start(irctx)
237237
en.IngestionEngine.Start(irctx)
238238
en.FollowerCore.Start(irctx)

module/component/component_manager_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -639,7 +639,7 @@ func TestComponentManagerShutdown(t *testing.T) {
639639
}).Build()
640640

641641
parent, cancel := context.WithCancel(context.Background())
642-
ctx, _ := irrecoverable.WithSignaler(parent)
642+
ctx := irrecoverable.NewMockSignalerContext(t, parent)
643643

644644
mgr.Start(ctx)
645645
unittest.AssertClosesBefore(t, mgr.Ready(), 10*time.Millisecond)

0 commit comments

Comments
 (0)