Skip to content

Commit fbb2cfd

Browse files
committed
feat: add chainbound watchdog
1 parent a9a558a commit fbb2cfd

File tree

1 file changed

+72
-7
lines changed

1 file changed

+72
-7
lines changed

collector/node_conn_chainbound.go

Lines changed: 72 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@ package collector
55

66
import (
77
"context"
8+
"sync"
89
"time"
910

1011
fiber "github.com/chainbound/fiber-go"
1112
"github.com/flashbots/mempool-dumpster/common"
1213
"go.uber.org/zap"
1314
)
1415

16+
const watchdogTimeout = 1 * time.Minute
17+
1518
type ChainboundNodeOpts struct {
1619
TxC chan common.TxIn
1720
Log *zap.SugaredLogger
@@ -28,6 +31,10 @@ type ChainboundNodeConnection struct {
2831
fiberC chan *fiber.TransactionWithSender
2932
txC chan common.TxIn
3033
backoffSec int
34+
35+
watchdog *time.Timer
36+
watchdogMu sync.Mutex
37+
watchdogStopC chan struct{}
3138
}
3239

3340
func NewChainboundNodeConnection(opts ChainboundNodeOpts) *ChainboundNodeConnection {
@@ -42,22 +49,75 @@ func NewChainboundNodeConnection(opts ChainboundNodeOpts) *ChainboundNodeConnect
4249
}
4350

4451
return &ChainboundNodeConnection{
45-
log: opts.Log.With("src", srcTag),
46-
apiKey: opts.APIKey,
47-
url: url,
48-
srcTag: srcTag,
49-
fiberC: make(chan *fiber.TransactionWithSender),
50-
txC: opts.TxC,
51-
backoffSec: initialBackoffSec,
52+
log: opts.Log.With("src", srcTag),
53+
apiKey: opts.APIKey,
54+
url: url,
55+
srcTag: srcTag,
56+
fiberC: make(chan *fiber.TransactionWithSender),
57+
txC: opts.TxC,
58+
backoffSec: initialBackoffSec,
59+
watchdogStopC: make(chan struct{}),
60+
}
61+
}
62+
63+
func (cbc *ChainboundNodeConnection) resetWatchdog() {
64+
cbc.watchdogMu.Lock()
65+
defer cbc.watchdogMu.Unlock()
66+
67+
if cbc.watchdog == nil {
68+
cbc.watchdog = time.NewTimer(watchdogTimeout)
69+
} else {
70+
if !cbc.watchdog.Stop() {
71+
select {
72+
case <-cbc.watchdog.C:
73+
default:
74+
}
75+
}
76+
cbc.watchdog.Reset(watchdogTimeout)
77+
}
78+
}
79+
80+
func (cbc *ChainboundNodeConnection) startWatchdog() {
81+
cbc.resetWatchdog()
82+
83+
go func() {
84+
for {
85+
select {
86+
case <-cbc.watchdog.C:
87+
cbc.log.Warn("watchdog timeout: no transactions received for 1 minute, reconnecting...")
88+
cbc.reconnect()
89+
return
90+
case <-cbc.watchdogStopC:
91+
cbc.log.Debug("watchdog stopped")
92+
return
93+
}
94+
}
95+
}()
96+
}
97+
98+
func (cbc *ChainboundNodeConnection) shutdownWatchdog() {
99+
cbc.watchdogMu.Lock()
100+
defer cbc.watchdogMu.Unlock()
101+
102+
if cbc.watchdog != nil {
103+
cbc.watchdog.Stop()
104+
}
105+
106+
select {
107+
case cbc.watchdogStopC <- struct{}{}:
108+
default:
52109
}
53110
}
54111

55112
func (cbc *ChainboundNodeConnection) Start() {
56113
cbc.log.Debug("chainbound stream starting...")
57114
cbc.fiberC = make(chan *fiber.TransactionWithSender)
115+
cbc.watchdogStopC = make(chan struct{})
58116
go cbc.connect()
117+
cbc.startWatchdog()
59118

60119
for fiberTx := range cbc.fiberC {
120+
cbc.resetWatchdog()
61121
cbc.txC <- common.TxIn{
62122
T: time.Now().UTC(),
63123
Tx: fiberTx.Transaction,
@@ -69,6 +129,11 @@ func (cbc *ChainboundNodeConnection) Start() {
69129
}
70130

71131
func (cbc *ChainboundNodeConnection) reconnect() {
132+
cbc.shutdownWatchdog()
133+
134+
// Close the existing fiber channel to break out of the loop in Start()
135+
close(cbc.fiberC)
136+
72137
backoffDuration := time.Duration(cbc.backoffSec) * time.Second
73138
cbc.log.Infof("reconnecting to chainbound in %s sec ...", backoffDuration.String())
74139
time.Sleep(backoffDuration)

0 commit comments

Comments
 (0)