Skip to content

Commit 62c7316

Browse files
committed
Fix incorrect handling of token WaitTimeout.
Closes #210.
1 parent 7d628ed commit 62c7316

File tree

2 files changed

+19
-11
lines changed

2 files changed

+19
-11
lines changed

cmd/chirpstack-gateway-bridge/cmd/root.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func init() {
6363
viper.SetDefault("integration.mqtt.state_retained", true)
6464
viper.SetDefault("integration.mqtt.keep_alive", 30*time.Second)
6565
viper.SetDefault("integration.mqtt.max_reconnect_interval", time.Minute)
66-
viper.SetDefault("integration.mqtt.max_token_wait", time.Second)
66+
viper.SetDefault("integration.mqtt.max_token_wait", 5*time.Second)
6767

6868
viper.SetDefault("integration.mqtt.auth.generic.servers", []string{"tcp://127.0.0.1:1883"})
6969
viper.SetDefault("integration.mqtt.auth.generic.clean_session", true)

internal/integration/mqtt/backend.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"text/template"
99
"time"
1010

11+
mqtt "github.com/eclipse/paho.mqtt.golang"
1112
paho "github.com/eclipse/paho.mqtt.golang"
1213
"github.com/pkg/errors"
1314
log "github.com/sirupsen/logrus"
@@ -286,8 +287,8 @@ func (b *Backend) subscribeGateway(gatewayID lorawan.EUI64) error {
286287
"qos": b.qos,
287288
}).Info("integration/mqtt: subscribing to topic")
288289

289-
if token := b.conn.Subscribe(topic.String(), b.qos, b.handleCommand); token.WaitTimeout(b.maxTokenWait) && token.Error() != nil {
290-
return errors.Wrap(token.Error(), "subscribe topic error")
290+
if err := tokenWrapper(b.conn.Subscribe(topic.String(), b.qos, b.handleCommand), b.maxTokenWait); err != nil {
291+
return errors.Wrap(err, "subscribe topic error")
291292
}
292293

293294
log.WithFields(log.Fields{
@@ -307,8 +308,8 @@ func (b *Backend) unsubscribeGateway(gatewayID lorawan.EUI64) error {
307308
"topic": topic.String(),
308309
}).Info("integration/mqtt: unsubscribing from topic")
309310

310-
if token := b.conn.Unsubscribe(topic.String()); token.WaitTimeout(b.maxTokenWait) && token.Error() != nil {
311-
return errors.Wrap(token.Error(), "unsubscribe topic error")
311+
if err := tokenWrapper(b.conn.Unsubscribe(topic.String()), b.maxTokenWait); err != nil {
312+
return errors.Wrap(err, "unsubscribe topic error")
312313
}
313314

314315
log.WithFields(log.Fields{
@@ -365,8 +366,8 @@ func (b *Backend) PublishState(gatewayID lorawan.EUI64, state string, v proto.Me
365366
"state": state,
366367
"gateway_id": gatewayID,
367368
}).Info("integration/mqtt: publishing state")
368-
if token := b.conn.Publish(topic.String(), b.qos, b.stateRetained, bytes); token.WaitTimeout(b.maxTokenWait) && token.Error() != nil {
369-
return token.Error()
369+
if err := tokenWrapper(b.conn.Publish(topic.String(), b.qos, b.stateRetained, bytes), b.maxTokenWait); err != nil {
370+
return err
370371
}
371372
return nil
372373
}
@@ -380,8 +381,8 @@ func (b *Backend) connect() error {
380381
}
381382

382383
b.conn = paho.NewClient(b.clientOpts)
383-
if token := b.conn.Connect(); token.WaitTimeout(b.maxTokenWait) && token.Error() != nil {
384-
return token.Error()
384+
if err := tokenWrapper(b.conn.Connect(), b.maxTokenWait); err != nil {
385+
return err
385386
}
386387

387388
return nil
@@ -661,8 +662,8 @@ func (b *Backend) publishEvent(gatewayID lorawan.EUI64, event string, fields log
661662
fields["event"] = event
662663

663664
log.WithFields(fields).Info("integration/mqtt: publishing event")
664-
if token := b.conn.Publish(topic.String(), b.qos, false, bytes); token.WaitTimeout(b.maxTokenWait) && token.Error() != nil {
665-
return token.Error()
665+
if err := tokenWrapper(b.conn.Publish(topic.String(), b.qos, false, bytes), b.maxTokenWait); err != nil {
666+
return err
666667
}
667668
return nil
668669
}
@@ -673,3 +674,10 @@ func (b *Backend) isClosed() bool {
673674
defer b.connMux.RUnlock()
674675
return b.connClosed
675676
}
677+
678+
func tokenWrapper(token mqtt.Token, timeout time.Duration) error {
679+
if !token.WaitTimeout(timeout) {
680+
return errors.New("token wait timeout error")
681+
}
682+
return token.Error()
683+
}

0 commit comments

Comments
 (0)