Skip to content

Commit 4a65072

Browse files
committed
Fix incorrect handling of token WaitTimeout.
Closes #210.
1 parent c7a2543 commit 4a65072

File tree

2 files changed

+19
-11
lines changed

2 files changed

+19
-11
lines changed

cmd/chirpstack-gateway-bridge/cmd/root.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func init() {
6363
viper.SetDefault("integration.mqtt.state_retained", true)
6464
viper.SetDefault("integration.mqtt.keep_alive", 30*time.Second)
6565
viper.SetDefault("integration.mqtt.max_reconnect_interval", time.Minute)
66-
viper.SetDefault("integration.mqtt.max_token_wait", time.Second)
66+
viper.SetDefault("integration.mqtt.max_token_wait", 5*time.Second)
6767

6868
viper.SetDefault("integration.mqtt.auth.generic.servers", []string{"tcp://127.0.0.1:1883"})
6969
viper.SetDefault("integration.mqtt.auth.generic.clean_session", true)

internal/integration/mqtt/backend.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"text/template"
99
"time"
1010

11+
mqtt "github.com/eclipse/paho.mqtt.golang"
1112
paho "github.com/eclipse/paho.mqtt.golang"
1213
"github.com/gofrs/uuid"
1314
"github.com/golang/protobuf/jsonpb"
@@ -290,8 +291,8 @@ func (b *Backend) subscribeGateway(gatewayID lorawan.EUI64) error {
290291
"qos": b.qos,
291292
}).Info("integration/mqtt: subscribing to topic")
292293

293-
if token := b.conn.Subscribe(topic.String(), b.qos, b.handleCommand); token.WaitTimeout(b.maxTokenWait) && token.Error() != nil {
294-
return errors.Wrap(token.Error(), "subscribe topic error")
294+
if err := tokenWrapper(b.conn.Subscribe(topic.String(), b.qos, b.handleCommand), b.maxTokenWait); err != nil {
295+
return errors.Wrap(err, "subscribe topic error")
295296
}
296297

297298
log.WithFields(log.Fields{
@@ -311,8 +312,8 @@ func (b *Backend) unsubscribeGateway(gatewayID lorawan.EUI64) error {
311312
"topic": topic.String(),
312313
}).Info("integration/mqtt: unsubscribing from topic")
313314

314-
if token := b.conn.Unsubscribe(topic.String()); token.WaitTimeout(b.maxTokenWait) && token.Error() != nil {
315-
return errors.Wrap(token.Error(), "unsubscribe topic error")
315+
if err := tokenWrapper(b.conn.Unsubscribe(topic.String()), b.maxTokenWait); err != nil {
316+
return errors.Wrap(err, "unsubscribe topic error")
316317
}
317318

318319
log.WithFields(log.Fields{
@@ -368,8 +369,8 @@ func (b *Backend) PublishState(gatewayID lorawan.EUI64, state string, v proto.Me
368369
"state": state,
369370
"gateway_id": gatewayID,
370371
}).Info("integration/mqtt: publishing state")
371-
if token := b.conn.Publish(topic.String(), b.qos, b.stateRetained, bytes); token.WaitTimeout(b.maxTokenWait) && token.Error() != nil {
372-
return token.Error()
372+
if err := tokenWrapper(b.conn.Publish(topic.String(), b.qos, b.stateRetained, bytes), b.maxTokenWait); err != nil {
373+
return err
373374
}
374375
return nil
375376
}
@@ -383,8 +384,8 @@ func (b *Backend) connect() error {
383384
}
384385

385386
b.conn = paho.NewClient(b.clientOpts)
386-
if token := b.conn.Connect(); token.WaitTimeout(b.maxTokenWait) && token.Error() != nil {
387-
return token.Error()
387+
if err := tokenWrapper(b.conn.Connect(), b.maxTokenWait); err != nil {
388+
return err
388389
}
389390

390391
return nil
@@ -676,8 +677,8 @@ func (b *Backend) publishEvent(gatewayID lorawan.EUI64, event string, fields log
676677
fields["event"] = event
677678

678679
log.WithFields(fields).Info("integration/mqtt: publishing event")
679-
if token := b.conn.Publish(topic.String(), b.qos, false, bytes); token.WaitTimeout(b.maxTokenWait) && token.Error() != nil {
680-
return token.Error()
680+
if err := tokenWrapper(b.conn.Publish(topic.String(), b.qos, false, bytes), b.maxTokenWait); err != nil {
681+
return err
681682
}
682683
return nil
683684
}
@@ -688,3 +689,10 @@ func (b *Backend) isClosed() bool {
688689
defer b.connMux.RUnlock()
689690
return b.connClosed
690691
}
692+
693+
func tokenWrapper(token mqtt.Token, timeout time.Duration) error {
694+
if !token.WaitTimeout(timeout) {
695+
return errors.New("token wait timeout error")
696+
}
697+
return token.Error()
698+
}

0 commit comments

Comments
 (0)