Skip to content

Commit dcb35bb

Browse files
committed
Make zmq socket reconnect more robust for concentratord backend.
1 parent 9fb00e1 commit dcb35bb

File tree

1 file changed

+44
-11
lines changed

1 file changed

+44
-11
lines changed

internal/backend/concentratord/concentratord.go

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ import (
1919

2020
// Backend implements a ConcentratorD backend.
2121
type Backend struct {
22-
eventSock zmq4.Socket
23-
commandSock zmq4.Socket
24-
commandMux sync.Mutex
22+
eventSockCancel func()
23+
commandSockCancel func()
24+
eventSock zmq4.Socket
25+
commandSock zmq4.Socket
26+
commandMux sync.Mutex
2527

2628
downlinkTXAckChan chan gw.DownlinkTXAck
2729
uplinkFrameChan chan gw.UplinkFrame
@@ -45,9 +47,6 @@ func NewBackend(conf config.Config) (*Backend, error) {
4547
}).Info("backend/concentratord: setting up backend")
4648

4749
b := Backend{
48-
eventSock: zmq4.NewSub(context.Background()),
49-
commandSock: zmq4.NewReq(context.Background()),
50-
5150
downlinkTXAckChan: make(chan gw.DownlinkTXAck, 1),
5251
uplinkFrameChan: make(chan gw.UplinkFrame, 1),
5352
gatewayStatsChan: make(chan gw.GatewayStats, 1),
@@ -62,6 +61,12 @@ func NewBackend(conf config.Config) (*Backend, error) {
6261
b.dialEventSockLoop()
6362
b.dialCommandSockLoop()
6463

64+
var err error
65+
b.gatewayID, err = b.getGatewayID()
66+
if err != nil {
67+
return nil, errors.Wrap(err, "get gateway id error")
68+
}
69+
6570
b.subscribeEventChan <- events.Subscribe{Subscribe: true, GatewayID: b.gatewayID}
6671

6772
go b.eventLoop()
@@ -70,6 +75,10 @@ func NewBackend(conf config.Config) (*Backend, error) {
7075
}
7176

7277
func (b *Backend) dialEventSock() error {
78+
ctx := context.Background()
79+
ctx, b.eventSockCancel = context.WithCancel(ctx)
80+
81+
b.eventSock = zmq4.NewSub(ctx)
7382
err := b.eventSock.Dial(b.eventURL)
7483
if err != nil {
7584
return errors.Wrap(err, "dial event api url error")
@@ -80,19 +89,26 @@ func (b *Backend) dialEventSock() error {
8089
return errors.Wrap(err, "set event option error")
8190
}
8291

92+
log.WithFields(log.Fields{
93+
"event_url": b.eventURL,
94+
}).Info("backend/concentratord: connected to event socket")
95+
8396
return nil
8497
}
8598

8699
func (b *Backend) dialCommandSock() error {
100+
ctx := context.Background()
101+
ctx, b.commandSockCancel = context.WithCancel(ctx)
102+
103+
b.commandSock = zmq4.NewReq(ctx)
87104
err := b.commandSock.Dial(b.commandURL)
88105
if err != nil {
89106
return errors.Wrap(err, "dial command api url error")
90107
}
91108

92-
b.gatewayID, err = b.getGatewayID()
93-
if err != nil {
94-
return errors.Wrap(err, "get gateway id error")
95-
}
109+
log.WithFields(log.Fields{
110+
"command_url": b.eventURL,
111+
}).Info("backend/concentratord: connected to command socket")
96112

97113
return nil
98114
}
@@ -135,6 +151,11 @@ func (b *Backend) getGatewayID() (lorawan.EUI64, error) {
135151
// Close closes the backend.
136152
func (b *Backend) Close() error {
137153
b.eventSock.Close()
154+
b.commandSock.Close()
155+
156+
b.eventSockCancel()
157+
b.commandSockCancel()
158+
138159
return nil
139160
}
140161

@@ -223,12 +244,14 @@ func (b *Backend) commandRequest(command string, v proto.Message) ([]byte, error
223244

224245
msg := zmq4.NewMsgFrom([]byte(command), bb)
225246
if err = b.commandSock.SendMulti(msg); err != nil {
247+
b.commandSockCancel()
226248
b.dialCommandSock()
227249
return nil, errors.Wrap(err, "send command request error")
228250
}
229251

230252
reply, err := b.commandSock.Recv()
231253
if err != nil {
254+
b.commandSockCancel()
232255
b.dialCommandSock()
233256
return nil, errors.Wrap(err, "receive command request reply error")
234257
}
@@ -241,7 +264,17 @@ func (b *Backend) eventLoop() {
241264
msg, err := b.eventSock.Recv()
242265
if err != nil {
243266
log.WithError(err).Error("backend/concentratord: receive event message error")
244-
b.dialEventSockLoop()
267+
268+
// We need to recover both the event and command sockets.
269+
func() {
270+
b.commandMux.Lock()
271+
defer b.commandMux.Unlock()
272+
273+
b.eventSockCancel()
274+
b.commandSockCancel()
275+
b.dialEventSockLoop()
276+
b.dialCommandSockLoop()
277+
}()
245278
continue
246279
}
247280

0 commit comments

Comments
 (0)