Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove anti-pattern of ignoring irrecoverable error channel #7155

Merged
merged 6 commits into from
Mar 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/scaffold_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestLoadSecretsEncryptionKey(t *testing.T) {
// Test the components are started in the correct order, and are run serially
func TestComponentsRunSerially(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)

nb := FlowNode("scaffold test")
nb.componentBuilder = component.NewComponentManagerBuilder()
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestPostShutdown(t *testing.T) {

func TestOverrideComponent(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)

nb := FlowNode("scaffold test")
nb.componentBuilder = component.NewComponentManagerBuilder()
Expand Down Expand Up @@ -225,7 +225,7 @@ func TestOverrideComponent(t *testing.T) {

func TestOverrideModules(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)

nb := FlowNode("scaffold test")
nb.componentBuilder = component.NewComponentManagerBuilder()
Expand Down Expand Up @@ -602,7 +602,7 @@ func TestDependableComponentWaitForDependencies(t *testing.T) {

func testDependableComponentWaitForDependencies(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)

nb := FlowNode("scaffold test")
nb.componentBuilder = component.NewComponentManagerBuilder()
Expand Down
6 changes: 3 additions & 3 deletions consensus/hotstuff/eventloop/event_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *EventLoopTestSuite) SetupTest() {

ctx, cancel := context.WithCancel(context.Background())
s.cancel = cancel
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
signalerCtx := irrecoverable.NewMockSignalerContext(s.T(), ctx)

s.eventLoop.Start(signalerCtx)
unittest.RequireCloseBefore(s.T(), s.eventLoop.Ready(), 100*time.Millisecond, "event loop not started")
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestEventLoop_Timeout(t *testing.T) {
eh.On("TimeoutChannel").Return(time.After(100 * time.Millisecond))

ctx, cancel := context.WithCancel(context.Background())
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
eventLoop.Start(signalerCtx)

unittest.RequireCloseBefore(t, eventLoop.Ready(), 100*time.Millisecond, "event loop not stopped")
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestReadyDoneWithStartTime(t *testing.T) {
}).Return(nil).Once()

ctx, cancel := context.WithCancel(context.Background())
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
eventLoop.Start(signalerCtx)

unittest.RequireCloseBefore(t, eventLoop.Ready(), 100*time.Millisecond, "event loop not started")
Expand Down
4 changes: 2 additions & 2 deletions consensus/hotstuff/integration/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,13 +548,13 @@ func NewInstance(t *testing.T, options ...Option) *Instance {
return &in
}

func (in *Instance) Run() error {
func (in *Instance) Run(t *testing.T) error {
ctx, cancel := context.WithCancel(context.Background())
defer func() {
cancel()
<-util.AllDone(in.voteAggregator, in.timeoutAggregator)
}()
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
in.voteAggregator.Start(signalerCtx)
in.timeoutAggregator.Start(signalerCtx)
<-util.AllReady(in.voteAggregator, in.timeoutAggregator)
Expand Down
6 changes: 3 additions & 3 deletions consensus/hotstuff/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestSingleInstance(t *testing.T) {
)

// run the event handler until we reach a stop condition
err := in.Run()
err := in.Run(t)
require.ErrorIs(t, err, errStopCondition, "should run until stop condition")

// check if forks and pacemaker are in expected view state
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestThreeInstances(t *testing.T) {
for _, in := range instances {
wg.Add(1)
go func(in *Instance) {
err := in.Run()
err := in.Run(t)
require.True(t, errors.Is(err, errStopCondition), "should run until stop condition")
wg.Done()
}(in)
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestSevenInstances(t *testing.T) {
for _, in := range instances {
wg.Add(1)
go func(in *Instance) {
err := in.Run()
err := in.Run(t)
require.True(t, errors.Is(err, errStopCondition), "should run until stop condition")
wg.Done()
}(in)
Expand Down
10 changes: 5 additions & 5 deletions consensus/hotstuff/integration/liveness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func Test2TimeoutOutof7Instances(t *testing.T) {
for _, in := range instances {
wg.Add(1)
go func(in *Instance) {
err := in.Run()
err := in.Run(t)
require.ErrorIs(t, err, errStopCondition)
wg.Done()
}(in)
Expand Down Expand Up @@ -141,7 +141,7 @@ func Test2TimeoutOutof4Instances(t *testing.T) {
for _, in := range instances {
wg.Add(1)
go func(in *Instance) {
err := in.Run()
err := in.Run(t)
require.True(t, errors.Is(err, errStopCondition), "should run until stop condition")
wg.Done()
}(in)
Expand Down Expand Up @@ -209,7 +209,7 @@ func Test1TimeoutOutof5Instances(t *testing.T) {
for _, in := range instances {
wg.Add(1)
go func(in *Instance) {
err := in.Run()
err := in.Run(t)
require.ErrorIs(t, err, errStopCondition)
wg.Done()
}(in)
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestBlockDelayIsHigherThanTimeout(t *testing.T) {
for _, in := range instances {
wg.Add(1)
go func(in *Instance) {
err := in.Run()
err := in.Run(t)
require.ErrorIs(t, err, errStopCondition)
wg.Done()
}(in)
Expand Down Expand Up @@ -388,7 +388,7 @@ func TestAsyncClusterStartup(t *testing.T) {
for _, in := range instances {
wg.Add(1)
go func(in *Instance) {
err := in.Run()
err := in.Run(t)
require.ErrorIs(t, err, errStopCondition)
wg.Done()
}(in)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *TimeoutAggregatorTestSuite) SetupTest() {
require.NoError(s.T(), err)

ctx, cancel := context.WithCancel(context.Background())
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
signalerCtx := irrecoverable.NewMockSignalerContext(s.T(), ctx)
s.stopAggregator = cancel
s.aggregator.Start(signalerCtx)
unittest.RequireCloseBefore(s.T(), s.aggregator.Ready(), 100*time.Millisecond, "should close before timeout")
Expand Down
33 changes: 23 additions & 10 deletions consensus/hotstuff/voteaggregator/vote_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/mempool"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/module/util"
)

// defaultVoteAggregatorWorkers number of workers to dispatch events for vote aggregators
Expand Down Expand Up @@ -99,24 +100,36 @@ func NewVoteAggregator(
aggregator.queuedMessagesProcessingLoop(ctx)
})
}
componentBuilder.AddWorker(func(_ irrecoverable.SignalerContext, ready component.ReadyFunc) {
componentBuilder.AddWorker(func(parentCtx irrecoverable.SignalerContext, ready component.ReadyFunc) {
// create new context which is not connected to parent
// we need to ensure that our internal workers stop before asking
// vote collectors to stop. We want to avoid delivering events to already stopped vote collectors
ctx, cancel := context.WithCancel(context.Background())
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
signalerCtx, errCh := irrecoverable.WithSignaler(ctx)

// start vote collectors
collectors.Start(signalerCtx)
<-collectors.Ready()

ready()
// Handle the component lifecycle in a separate goroutine so we can capture any errors
// thrown during initialization in the main goroutine.
go func() {
if err := util.WaitClosed(parentCtx, collectors.Ready()); err == nil {
// only signal ready when collectors are ready, but always handle shutdown
ready()
}

// wait for internal workers to stop
wg.Wait()
// signal vote collectors to stop
cancel()
// wait for it to stop
<-collectors.Done()
// wait for internal workers to stop, then signal vote collectors to stop
wg.Wait()
cancel()
}()

// since we are breaking the connection between parentCtx and signalerCtx, we need to
// explicitly rethrow any errors from signalerCtx to parentCtx, otherwise they are dropped.
// Handle errors in the main worker goroutine to guarantee that they are rethrown to the parent
// before the component is marked done.
if err := util.WaitError(errCh, collectors.Done()); err != nil {
parentCtx.Throw(err)
}
})
componentBuilder.AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
ready()
Expand Down
2 changes: 1 addition & 1 deletion consensus/integration/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func createNodes(t *testing.T, participants *ConsensusParticipants, rootSnapshot

// create a context which will be used for all nodes
ctx, cancel := context.WithCancel(context.Background())
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)

// create a function to return which the test case can use to run the nodes for some maximum duration
// and gracefully stop after.
Expand Down
6 changes: 3 additions & 3 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (suite *Suite) TestSendExpiredTransaction() {
Return(referenceBlock, nil).
Twice()

//Advancing final state to expire ref block
// Advancing final state to expire ref block
suite.finalizedBlock = latestBlock

req := &accessproto.SendTransactionRequest{
Expand Down Expand Up @@ -728,7 +728,7 @@ func (suite *Suite) TestGetSealedTransaction() {
background, cancel := context.WithCancel(context.Background())
defer cancel()

ctx, _ := irrecoverable.WithSignaler(background)
ctx := irrecoverable.NewMockSignalerContext(suite.T(), background)
ingestEng.Start(ctx)
<-ingestEng.Ready()

Expand Down Expand Up @@ -1169,7 +1169,7 @@ func (suite *Suite) TestExecuteScript() {
require.NoError(suite.T(), err)
err = db.Update(operation.IndexBlockHeight(lastBlock.Header.Height, lastBlock.ID()))
require.NoError(suite.T(), err)
//update latest sealed block
// update latest sealed block
suite.sealedBlock = lastBlock.Header
// create execution receipts for each of the execution node and the last block
executionReceipts := unittest.ReceiptsForBlockFixture(lastBlock, identities.NodeIDs())
Expand Down
2 changes: 1 addition & 1 deletion engine/collection/ingest/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (suite *Suite) TestComponentShutdown() {

// start then shut down the engine
parentCtx, cancel := context.WithCancel(context.Background())
ctx, _ := irrecoverable.WithSignaler(parentCtx)
ctx := irrecoverable.NewMockSignalerContext(suite.T(), parentCtx)
suite.engine.Start(ctx)
unittest.AssertClosesBefore(suite.T(), suite.engine.Ready(), 10*time.Millisecond)
cancel()
Expand Down
6 changes: 3 additions & 3 deletions engine/common/worker/worker_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestWorkerPool_SingleEvent_SingleWorker(t *testing.T) {

cancelCtx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx, _ := irrecoverable.WithSignaler(cancelCtx)
ctx := irrecoverable.NewMockSignalerContext(t, cancelCtx)
cm := component.NewComponentManagerBuilder().
AddWorker(pool.WorkerLogic()).
Build()
Expand Down Expand Up @@ -75,7 +75,7 @@ func TestWorkerBuilder_UnhappyPaths(t *testing.T) {

cancelCtx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx, _ := irrecoverable.WithSignaler(cancelCtx)
ctx := irrecoverable.NewMockSignalerContext(t, cancelCtx)
cm := component.NewComponentManagerBuilder().
AddWorker(pool.WorkerLogic()).
Build()
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestWorkerPool_TwoWorkers_ConcurrentEvents(t *testing.T) {

cancelCtx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx, _ := irrecoverable.WithSignaler(cancelCtx)
ctx := irrecoverable.NewMockSignalerContext(t, cancelCtx)
cm := component.NewComponentManagerBuilder().
AddWorker(pool.WorkerLogic()).
AddWorker(pool.WorkerLogic()).
Expand Down
2 changes: 1 addition & 1 deletion engine/consensus/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *IngestionSuite) SetupTest() {

ctx, cancel := context.WithCancel(context.Background())
s.cancel = cancel
signalerCtx, _ := irrecoverable.WithSignaler(ctx)
signalerCtx := irrecoverable.NewMockSignalerContext(s.T(), ctx)

metrics := metrics.NewNoopCollector()
ingest, err := New(unittest.Logger(), metrics, s.net, me, s.core)
Expand Down
8 changes: 4 additions & 4 deletions engine/execution/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestExecutionFlow(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
unittest.RequireReturnsBefore(t, func() {
exeNode.Ready(ctx)
exeNode.Ready(t, ctx)
}, 1*time.Second, "could not start execution node on time")
defer exeNode.Done(cancel)

Expand Down Expand Up @@ -400,7 +400,7 @@ func TestFailedTxWillNotChangeStateCommitment(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
unittest.RequireReturnsBefore(t, func() {
exe1Node.Ready(ctx)
exe1Node.Ready(t, ctx)
}, 1*time.Second, "could not start execution node on time")
defer exe1Node.Done(cancel)

Expand Down Expand Up @@ -495,7 +495,7 @@ func TestFailedTxWillNotChangeStateCommitment(t *testing.T) {
scExe1Block2, err := exe1Node.ExecutionState.StateCommitmentByBlockID(block2.ID())
assert.NoError(t, err)
// TODO this is no longer valid because the system chunk can change the state
//assert.Equal(t, scExe1Block1, scExe1Block2)
// assert.Equal(t, scExe1Block1, scExe1Block2)
_ = scExe1Block2

collectionEngine.AssertExpectations(t)
Expand Down Expand Up @@ -567,7 +567,7 @@ func TestBroadcastToMultipleVerificationNodes(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

unittest.RequireReturnsBefore(t, func() {
exeNode.Ready(ctx)
exeNode.Ready(t, ctx)
}, 1*time.Second, "could not start execution node on time")
defer exeNode.Done(cancel)

Expand Down
2 changes: 1 addition & 1 deletion engine/execution/ingestion/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestInogestionCoreExecuteBlock(t *testing.T) {

// start the core
ctx, cancel := context.WithCancel(context.Background())
irrecoverableCtx, _ := irrecoverable.WithSignaler(ctx)
irrecoverableCtx := irrecoverable.NewMockSignalerContext(t, ctx)
core.Start(irrecoverableCtx)
<-core.Ready()
defer func() {
Expand Down
4 changes: 2 additions & 2 deletions engine/execution/provider/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestProviderEngine_onChunkDataRequest(t *testing.T) {

cancelCtx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx, _ := irrecoverable.WithSignaler(cancelCtx)
ctx := irrecoverable.NewMockSignalerContext(t, cancelCtx)
e.Start(ctx)
// submit using non-existing origin ID
unittest.RequireCloseBefore(t, e.Ready(), 100*time.Millisecond, "could not start engine")
Expand Down Expand Up @@ -96,7 +96,7 @@ func TestProviderEngine_onChunkDataRequest(t *testing.T) {

cancelCtx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx, _ := irrecoverable.WithSignaler(cancelCtx)
ctx := irrecoverable.NewMockSignalerContext(t, cancelCtx)
e.Start(ctx)
// submit using non-existing origin ID
unittest.RequireCloseBefore(t, e.Ready(), 100*time.Millisecond, "could not start engine")
Expand Down
4 changes: 2 additions & 2 deletions engine/testutil/mock/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,11 @@ type ExecutionNode struct {
StorehouseEnabled bool
}

func (en ExecutionNode) Ready(ctx context.Context) {
func (en ExecutionNode) Ready(t *testing.T, ctx context.Context) {
// TODO: receipt engine has been migrated to the new component interface, hence
// is using Start. Other engines' startup should be refactored once migrated to
// new interface.
irctx, _ := irrecoverable.WithSignaler(ctx)
irctx := irrecoverable.NewMockSignalerContext(t, ctx)
en.ReceiptsEngine.Start(irctx)
en.IngestionEngine.Start(irctx)
en.FollowerCore.Start(irctx)
Expand Down
2 changes: 1 addition & 1 deletion module/component/component_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ func TestComponentManagerShutdown(t *testing.T) {
}).Build()

parent, cancel := context.WithCancel(context.Background())
ctx, _ := irrecoverable.WithSignaler(parent)
ctx := irrecoverable.NewMockSignalerContext(t, parent)

mgr.Start(ctx)
unittest.AssertClosesBefore(t, mgr.Ready(), 10*time.Millisecond)
Expand Down
Loading
Loading