Skip to content

Commit 841ecf0

Browse files
committed
use exponential backoff in reconnection code
1 parent cc44c65 commit 841ecf0

File tree

5 files changed

+118
-41
lines changed

5 files changed

+118
-41
lines changed

client.go

Lines changed: 14 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ import (
66
"encoding/base64"
77
"encoding/json"
88
"fmt"
9-
"math"
10-
"math/rand"
119
"net/http"
1210
"reflect"
1311
"sync/atomic"
@@ -86,7 +84,7 @@ type client struct {
8684
// NewMergeClient is like NewClient, but allows to specify multiple structs
8785
// to be filled in the same namespace, using one connection
8886
func NewMergeClient(addr string, namespace string, outs []interface{}, requestHeader http.Header, opts ...Option) (ClientCloser, error) {
89-
var config Config
87+
config := defaultConfig
9088
for _, o := range opts {
9189
o(&config)
9290
}
@@ -116,13 +114,13 @@ func NewMergeClient(addr string, namespace string, outs []interface{}, requestHe
116114

117115
handlers := map[string]rpcHandler{}
118116
go (&wsConn{
119-
conn: conn,
120-
connFactory: connFactory,
121-
reconnectInterval: config.ReconnectInterval,
122-
handler: handlers,
123-
requests: c.requests,
124-
stop: stop,
125-
exiting: exiting,
117+
conn: conn,
118+
connFactory: connFactory,
119+
reconnectBackoff: config.reconnectBackoff,
120+
handler: handlers,
121+
requests: c.requests,
122+
stop: stop,
123+
exiting: exiting,
126124
}).handleWsConn(context.TODO())
127125

128126
for _, handler := range outs {
@@ -386,6 +384,11 @@ func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value)
386384
}
387385
}
388386

387+
b := backoff{
388+
maxDelay: methodMaxRetryDelay,
389+
minDelay: methodMinRetryDelay,
390+
}
391+
389392
var resp clientResponse
390393
var err error
391394
// keep retrying if got a forced closed websocket conn and calling method
@@ -418,30 +421,12 @@ func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value)
418421
break
419422
}
420423

421-
time.Sleep(backoff(attempt))
424+
time.Sleep(b.next(attempt))
422425
}
423426

424427
return fn.processResponse(resp, retVal())
425428
}
426429

427-
func backoff(attempt int) time.Duration {
428-
if attempt < 0 {
429-
return methodMinRetryDelay
430-
}
431-
432-
minf := float64(methodMinRetryDelay)
433-
durf := minf * math.Pow(1.5, float64(attempt))
434-
durf = durf + rand.Float64()*minf
435-
436-
delay := time.Duration(durf)
437-
438-
if delay > methodMaxRetryDelay {
439-
return methodMaxRetryDelay
440-
}
441-
442-
return delay
443-
}
444-
445430
func (c *client) makeRpcFunc(f reflect.StructField) (reflect.Value, error) {
446431
ftyp := f.Type
447432
if ftyp.Kind() != reflect.Func {

options.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,25 @@ import (
77
)
88

99
type Config struct {
10-
ReconnectInterval time.Duration
10+
reconnectBackoff backoff
1111

1212
proxyConnFactory func(func() (*websocket.Conn, error)) func() (*websocket.Conn, error) // for testing
1313
}
1414

1515
var defaultConfig = Config{
16-
ReconnectInterval: time.Second * 5,
16+
reconnectBackoff: backoff{
17+
minDelay: 100 * time.Millisecond,
18+
maxDelay: 5 * time.Second,
19+
},
1720
}
1821

1922
type Option func(c *Config)
2023

21-
func WithReconnectInterval(d time.Duration) func(c *Config) {
24+
func WithReconnectBackoff(minDelay, maxDelay time.Duration) func(c *Config) {
2225
return func(c *Config) {
23-
c.ReconnectInterval = d
26+
c.reconnectBackoff = backoff{
27+
minDelay: minDelay,
28+
maxDelay: maxDelay,
29+
}
2430
}
2531
}

rpc_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/gorilla/websocket"
1616
logging "github.com/ipfs/go-log/v2"
17+
"github.com/stretchr/testify/assert"
1718
"github.com/stretchr/testify/require"
1819
)
1920

@@ -62,6 +63,63 @@ func (h *SimpleServerHandler) StringMatch(t TestType, i2 int64) (out TestOut, er
6263
return
6364
}
6465

66+
func TestReconnection(t *testing.T) {
67+
var rpcClient struct {
68+
Add func(int) error
69+
}
70+
71+
rpcHandler := SimpleServerHandler{}
72+
73+
rpcServer := NewServer()
74+
rpcServer.Register("SimpleServerHandler", &rpcHandler)
75+
76+
testServ := httptest.NewServer(rpcServer)
77+
defer testServ.Close()
78+
79+
// capture connection attempts for this duration
80+
captureDuration := 3 * time.Second
81+
82+
// run the test until the timer expires
83+
timer := time.NewTimer(captureDuration)
84+
85+
// record the number of connection attempts during this test
86+
connectionAttempts := 1
87+
88+
closer, err := NewMergeClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", []interface{}{&rpcClient}, nil, func(c *Config) {
89+
c.proxyConnFactory = func(f func() (*websocket.Conn, error)) func() (*websocket.Conn, error) {
90+
return func() (*websocket.Conn, error) {
91+
defer func() {
92+
connectionAttempts++
93+
}()
94+
95+
if connectionAttempts > 1 {
96+
return nil, errors.New("simulates a failed reconnect attempt")
97+
}
98+
99+
c, err := f()
100+
if err != nil {
101+
return nil, err
102+
}
103+
104+
// closing the connection here triggers the reconnect logic
105+
_ = c.Close()
106+
107+
return c, nil
108+
}
109+
}
110+
})
111+
require.NoError(t, err)
112+
defer closer()
113+
114+
// let the JSON-RPC library attempt to reconnect until the timer runs out
115+
<-timer.C
116+
117+
// do some math
118+
attemptsPerSecond := int64(connectionAttempts) / int64(captureDuration/time.Second)
119+
120+
assert.Less(t, attemptsPerSecond, int64(50))
121+
}
122+
65123
func TestRPC(t *testing.T) {
66124
// setup server
67125

util.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ package jsonrpc
33
import (
44
"encoding/json"
55
"fmt"
6+
"math"
7+
"math/rand"
68
"reflect"
9+
"time"
710
)
811

912
type param struct {
@@ -53,3 +56,26 @@ func processFuncOut(funcType reflect.Type) (valOut int, errOut int, n int) {
5356

5457
return
5558
}
59+
60+
type backoff struct {
61+
minDelay time.Duration
62+
maxDelay time.Duration
63+
}
64+
65+
func (b *backoff) next(attempt int) time.Duration {
66+
if attempt < 0 {
67+
return b.minDelay
68+
}
69+
70+
minf := float64(b.minDelay)
71+
durf := minf * math.Pow(1.5, float64(attempt))
72+
durf = durf + rand.Float64()*minf
73+
74+
delay := time.Duration(durf)
75+
76+
if delay > b.maxDelay {
77+
return b.maxDelay
78+
}
79+
80+
return delay
81+
}

websocket.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,13 @@ type outChanReg struct {
4343

4444
type wsConn struct {
4545
// outside params
46-
conn *websocket.Conn
47-
connFactory func() (*websocket.Conn, error)
48-
reconnectInterval time.Duration
49-
handler handlers
50-
requests <-chan clientRequest
51-
stop <-chan struct{}
52-
exiting chan struct{}
46+
conn *websocket.Conn
47+
connFactory func() (*websocket.Conn, error)
48+
reconnectBackoff backoff
49+
handler handlers
50+
requests <-chan clientRequest
51+
stop <-chan struct{}
52+
exiting chan struct{}
5353

5454
// incoming messages
5555
incoming chan io.Reader
@@ -490,13 +490,15 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
490490
return
491491
}
492492

493+
attempts := 0
493494
var conn *websocket.Conn
494495
for conn == nil {
495-
time.Sleep(c.reconnectInterval)
496+
time.Sleep(c.reconnectBackoff.next(attempts))
496497
var err error
497498
if conn, err = c.connFactory(); err != nil {
498499
log.Debugw("websocket connection retry failed", "error", err)
499500
}
501+
attempts++
500502
}
501503

502504
c.writeLk.Lock()

0 commit comments

Comments
 (0)