Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions ethmonitor/ethmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,10 @@ func (m *Monitor) listenNewHead() <-chan uint64 {
return

case <-time.After(time.Duration(m.pollInterval.Load())):
nextBlock <- 0
select {
case nextBlock <- 0:
case <-m.ctx.Done():
}
}
}
}
Expand Down Expand Up @@ -474,12 +477,24 @@ func (m *Monitor) listenNewHead() <-chan uint64 {
if nextBlockNumber == 0 || latestBlockNum > nextBlockNumber {
// monitor is behind, so we just push to keep going without
// waiting on the nextBlock channel
ch <- nextBlockNumber
select {
case ch <- nextBlockNumber:
case <-m.ctx.Done():
return
}
continue
} else {
// wait for the next block
<-nextBlock
ch <- latestBlockNum
select {
case <-nextBlock:
case <-m.ctx.Done():
return
}
select {
case ch <- latestBlockNum:
case <-m.ctx.Done():
return
}
Comment on lines +493 to +497
Copy link

Copilot AI Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second select statement at line 493-497 is redundant and creates an unnecessary race condition. After successfully receiving from the nextBlock channel (line 488-492), the code immediately tries to send on ch. However, if the context is cancelled between these two select statements, the goroutine will return without sending on ch, which could cause the receiver to miss the block number. Consider combining these into a single select that handles both the receive and the send atomically, or ensure that the value is sent before checking context cancellation again.

Suggested change
select {
case ch <- latestBlockNum:
case <-m.ctx.Done():
return
}
// once we've observed the next block, always forward latestBlockNum
// to ch before checking context cancellation again
ch <- latestBlockNum

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

@xiam xiam Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the comment is accurate in the sense that the goroutine returns without sending on ch, but if we're shutting down that's acceptable. I don't think we should act on this comment, but I'd defer that decision to the reviewers.

}
}
}()
Expand Down Expand Up @@ -1008,7 +1023,10 @@ func (m *Monitor) publish(ctx context.Context, events Blocks) error {
// Publish events existing in the queue
pubEvents, ok := m.publishQueue.dequeue(maxBlockNum)
if ok {
m.publishCh <- pubEvents
select {
case m.publishCh <- pubEvents:
case <-m.ctx.Done():
}
}

return nil
Expand Down
66 changes: 66 additions & 0 deletions ethmonitor/goroutine_leak_poc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package ethmonitor_test

//go:generate mockgen -destination=internal/mocks/mock_provider.go -package=mocks github.com/0xsequence/ethkit/ethrpc RawInterface

import (
"context"
"fmt"
"math/big"
"runtime"
"testing"
"time"

"github.com/0xsequence/ethkit/ethmonitor"
"github.com/0xsequence/ethkit/ethmonitor/internal/mocks"
"go.uber.org/mock/gomock"
)

// TestMonitorShutdownNoGoroutineLeak verifies that the monitor shuts down cleanly without leaking goroutines.
func TestMonitorShutdownNoGoroutineLeak(t *testing.T) {
if testing.Short() {
t.Skip("Skipping in short mode")
}

ctrl := gomock.NewController(t)
defer ctrl.Finish()

provider := mocks.NewMockRawInterface(ctrl)
provider.EXPECT().ChainID(gomock.Any()).Return(big.NewInt(1), nil).AnyTimes()
provider.EXPECT().IsStreamingEnabled().Return(false).AnyTimes()
provider.EXPECT().RawBlockByNumber(gomock.Any(), gomock.Any()).
Return(nil, fmt.Errorf("simulated network error")).AnyTimes()

baseline := runtime.NumGoroutine()

opts := ethmonitor.DefaultOptions
opts.PollingInterval = 10 * time.Millisecond
opts.Timeout = 50 * time.Millisecond

monitor, err := ethmonitor.NewMonitor(provider, opts)
if err != nil {
t.Fatalf("failed to create monitor: %v", err)
}

ctx, cancel := context.WithCancel(context.Background())

done := make(chan error, 1)
go func() {
done <- monitor.Run(ctx)
}()

time.Sleep(200 * time.Millisecond)
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sleep duration of 200ms is arbitrary and may be insufficient on slower systems to reliably trigger the goroutine leak, or may be excessive on faster systems. Consider either increasing the duration to ensure the monitor enters its run loop, or using a more deterministic synchronization mechanism such as checking monitor.IsRunning() in a loop with a timeout.

Suggested change
time.Sleep(200 * time.Millisecond)
waitCtx, waitCancel := context.WithTimeout(context.Background(), time.Second)
defer waitCancel()
for {
if monitor.IsRunning() {
break
}
select {
case <-waitCtx.Done():
t.Fatalf("monitor did not start running within timeout: %v", waitCtx.Err())
case <-time.After(10 * time.Millisecond):
}
}

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not worth it in this context.

cancel()

select {
case <-done:
case <-time.After(5 * time.Second):
t.Fatal("Monitor.Run() didn't return within timeout")
}

runtime.GC()
time.Sleep(100 * time.Millisecond)

Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test doesn't explicitly call runtime.GC() and give goroutines time to be garbage collected before checking for leaks. The Go runtime may not immediately clean up exited goroutines. Consider adding runtime.GC() and a small sleep (e.g., 100ms) before the leak check to ensure more accurate results.

Suggested change
// Force garbage collection and give it a moment to complete before checking for leaks.
runtime.GC()
time.Sleep(100 * time.Millisecond)

Copilot uses AI. Check for mistakes.
Comment on lines +61 to +62
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sleep duration of 100ms after GC may be insufficient for goroutines to fully clean up. Goroutine leak detection is inherently racy, and a more robust approach would be to poll runtime.NumGoroutine() in a loop with a reasonable timeout (e.g., 1 second) until the count returns to baseline or the timeout expires.

Suggested change
time.Sleep(100 * time.Millisecond)
// Poll for goroutine count to return to baseline with a reasonable timeout
deadline := time.Now().Add(1 * time.Second)
for time.Now().Before(deadline) {
if runtime.NumGoroutine() <= baseline {
break
}
time.Sleep(10 * time.Millisecond)
}

Copilot uses AI. Check for mistakes.
if leaked := runtime.NumGoroutine() - baseline; leaked > 0 {
t.Fatalf("%d goroutine(s) leaked", leaked)
}
}
Loading
Loading