Skip to content

Commit 28a307d

Browse files
authored
Merge pull request #44 from filecoin-project/fix/client-reconnect
Fix client reconnect
2 parents ecb0f58 + d7b9874 commit 28a307d

File tree

3 files changed

+78
-60
lines changed

3 files changed

+78
-60
lines changed

client.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ var (
3232

3333
log = logging.Logger("rpc")
3434

35-
_defaultHTTPClient = &http.Client{
35+
_defaultHTTPClient = &http.Client{
3636
Transport: &http.Transport{
3737
Proxy: http.ProxyFromEnvironment,
3838
DialContext: (&net.Dialer{

server.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const (
1919

2020
// RPCServer provides a jsonrpc 2.0 http server handler
2121
type RPCServer struct {
22-
methods map[string]rpcHandler
22+
methods map[string]rpcHandler
2323

2424
// aliasedMethods contains a map of alias:original method names.
2525
// These are used as fallbacks if a method is not found by the given method name.

websocket.go

+76-58
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ type wsConn struct {
5151
handler *RPCServer
5252
requests <-chan clientRequest
5353
pongs chan struct{}
54+
stopPings func()
5455
stop <-chan struct{}
5556
exiting chan struct{}
5657

@@ -511,6 +512,50 @@ func (c *wsConn) setupPings() func() {
511512
}
512513
}
513514

515+
// returns true if reconnected
516+
func (c *wsConn) tryReconnect(ctx context.Context) bool {
517+
if c.connFactory == nil { // server side
518+
return false
519+
}
520+
521+
// connection dropped unexpectedly, do our best to recover it
522+
c.closeInFlight()
523+
c.closeChans()
524+
c.incoming = make(chan io.Reader) // listen again for responses
525+
go func() {
526+
c.stopPings()
527+
528+
attempts := 0
529+
var conn *websocket.Conn
530+
for conn == nil {
531+
time.Sleep(c.reconnectBackoff.next(attempts))
532+
var err error
533+
if conn, err = c.connFactory(); err != nil {
534+
log.Debugw("websocket connection retry failed", "error", err)
535+
}
536+
select {
537+
case <-ctx.Done():
538+
break
539+
default:
540+
continue
541+
}
542+
attempts++
543+
}
544+
545+
c.writeLk.Lock()
546+
c.conn = conn
547+
c.incomingErr = nil
548+
549+
c.stopPings = c.setupPings()
550+
551+
c.writeLk.Unlock()
552+
553+
go c.nextMessage()
554+
}()
555+
556+
return true
557+
}
558+
514559
func (c *wsConn) handleWsConn(ctx context.Context) {
515560
c.incoming = make(chan io.Reader)
516561
c.inflight = map[int64]clientRequest{}
@@ -530,8 +575,8 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
530575

531576
// setup pings
532577

533-
stopPings := c.setupPings()
534-
defer stopPings()
578+
c.stopPings = c.setupPings()
579+
defer c.stopPings()
535580

536581
var timeoutTimer *time.Timer
537582
if c.timeout != 0 {
@@ -545,7 +590,10 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
545590
var timeoutCh <-chan time.Time
546591
if timeoutTimer != nil {
547592
if !timeoutTimer.Stop() {
548-
<-timeoutTimer.C
593+
select {
594+
case <-timeoutTimer.C:
595+
default:
596+
}
549597
}
550598
timeoutTimer.Reset(c.timeout)
551599

@@ -554,65 +602,30 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
554602

555603
select {
556604
case r, ok := <-c.incoming:
557-
if !ok {
558-
if c.incomingErr != nil {
559-
if !websocket.IsCloseError(c.incomingErr, websocket.CloseNormalClosure) {
560-
log.Debugw("websocket error", "error", c.incomingErr)
561-
// connection dropped unexpectedly, do our best to recover it
562-
c.closeInFlight()
563-
c.closeChans()
564-
c.incoming = make(chan io.Reader) // listen again for responses
565-
go func() {
566-
if c.connFactory == nil { // likely the server side, don't try to reconnect
567-
return
568-
}
569-
570-
stopPings()
571-
572-
attempts := 0
573-
var conn *websocket.Conn
574-
for conn == nil {
575-
time.Sleep(c.reconnectBackoff.next(attempts))
576-
var err error
577-
if conn, err = c.connFactory(); err != nil {
578-
log.Debugw("websocket connection retry failed", "error", err)
579-
}
580-
select {
581-
case <-ctx.Done():
582-
break
583-
default:
584-
continue
585-
}
586-
attempts++
587-
}
588-
589-
c.writeLk.Lock()
590-
c.conn = conn
591-
c.incomingErr = nil
592-
593-
stopPings = c.setupPings()
594-
595-
c.writeLk.Unlock()
596-
597-
go c.nextMessage()
598-
}()
599-
continue
600-
}
605+
err := c.incomingErr
606+
607+
if ok {
608+
// debug util - dump all messages to stderr
609+
// r = io.TeeReader(r, os.Stderr)
610+
611+
var frame frame
612+
err = json.NewDecoder(r).Decode(&frame)
613+
if err == nil {
614+
c.handleFrame(ctx, frame)
615+
go c.nextMessage()
616+
continue
601617
}
602-
return // remote closed
603618
}
604619

605-
// debug util - dump all messages to stderr
606-
// r = io.TeeReader(r, os.Stderr)
607-
608-
var frame frame
609-
if err := json.NewDecoder(r).Decode(&frame); err != nil {
610-
log.Error("handle me:", err)
611-
return
620+
if err == nil {
621+
return // remote closed
612622
}
613623

614-
c.handleFrame(ctx, frame)
615-
go c.nextMessage()
624+
log.Errorw("websocket error", "error", err)
625+
// only client needs to reconnect
626+
if !c.tryReconnect(ctx) {
627+
return // failed to reconnect
628+
}
616629
case req := <-c.requests:
617630
c.writeLk.Lock()
618631
if req.req.ID != nil {
@@ -652,7 +665,12 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
652665
}
653666
c.writeLk.Unlock()
654667
log.Errorw("Connection timeout", "remote", c.conn.RemoteAddr())
655-
return
668+
// The server side does not perform the reconnect operation, so need to exit
669+
if c.connFactory == nil {
670+
return
671+
}
672+
// The client performs the reconnect operation, and if it exits it cannot start a handleWsConn again, so it does not need to exit
673+
continue
656674
case <-c.stop:
657675
c.writeLk.Lock()
658676
cmsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")

0 commit comments

Comments
 (0)