Skip to content

Commit f05e217

Browse files
committed
replace mqtt library "eclipse/paho" with "goiiot/libmqtt"
1 parent 5c84500 commit f05e217

File tree

11 files changed

+134
-98
lines changed

11 files changed

+134
-98
lines changed

actor/mqtt.go

+7-17
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,16 @@ package actor
22

33
import (
44
"bytes"
5-
mqtt "github.com/eclipse/paho.mqtt.golang"
5+
"context"
66
"io"
77
)
88

9+
type MQTTPublisher interface {
10+
Publish(ctx context.Context, topic string, qos byte, retained bool, payload []byte) error
11+
}
12+
913
type Mqtt struct {
10-
Client mqtt.Client
14+
Client MQTTPublisher
1115
Topic string
1216
QOS byte
1317
Retained bool
@@ -21,19 +25,5 @@ func (m *Mqtt) Do(ctx Context) error {
2125
return err
2226
}
2327

24-
token := m.Client.Publish(m.Topic, m.QOS, m.Retained, byteBuff.Bytes())
25-
26-
waitChan := make(chan error, 1)
27-
28-
go func() {
29-
token.Wait()
30-
waitChan <- token.Error()
31-
}()
32-
33-
select {
34-
case err := <-waitChan:
35-
return err
36-
case <-ctx.Context.Done():
37-
return ctx.Context.Err()
38-
}
28+
return m.Client.Publish(ctx.Context, m.Topic, m.QOS, m.Retained, byteBuff.Bytes())
3929
}

cmd/lst/config/setup.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package config
22

33
import (
4+
"context"
45
"github.com/rainu/launchpad-super-trigger/config"
56
configActor "github.com/rainu/launchpad-super-trigger/config/actor"
67
connectionMqtt "github.com/rainu/launchpad-super-trigger/config/connection/mqtt"
@@ -28,8 +29,8 @@ func ConfigureDispatcher(configReader ...io.Reader) (*pad.TriggerDispatcher, map
2829

2930
//establish mqtt connections after callbacks were registered
3031
for _, connection := range connections {
31-
if token := connection.Connect(); token.Wait() && token.Error() != nil {
32-
zap.L().Fatal("Error while connecting to mqtt broker: %s", zap.Error(token.Error()))
32+
if err := connection.Connect(context.Background()); err != nil {
33+
zap.L().Fatal("Error while connecting to mqtt broker: %s", zap.Error(err))
3334
}
3435
}
3536

cmd/lst/main.go

-6
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package main
33
import (
44
"context"
55
"fmt"
6-
MQTT "github.com/eclipse/paho.mqtt.golang"
76
"github.com/rainu/launchpad-super-trigger/cmd/lst/config"
87
triggerConf "github.com/rainu/launchpad-super-trigger/config"
98
launchpad "github.com/rainu/launchpad-super-trigger/pad"
@@ -31,11 +30,6 @@ func main() {
3130
zap.ReplaceGlobals(logger)
3231
defer zap.L().Sync()
3332

34-
MQTT.ERROR, _ = zap.NewStdLogAt(zap.L(), zap.ErrorLevel)
35-
MQTT.CRITICAL, _ = zap.NewStdLogAt(zap.L(), zap.ErrorLevel)
36-
MQTT.WARN, _ = zap.NewStdLogAt(zap.L(), zap.WarnLevel)
37-
MQTT.DEBUG, _ = zap.NewStdLogAt(zap.L(), zap.DebugLevel)
38-
3933
//start pprof if needed
4034
if *Args.DebugPort > 0 {
4135
zap.L().Info(fmt.Sprintf("Start pprof debug endpoint :%d", *Args.DebugPort))

config/connection/mqtt/builder.go

+17-12
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
package mqtt
22

33
import (
4-
MQTT "github.com/eclipse/paho.mqtt.golang"
4+
"context"
5+
MQTT "github.com/goiiot/libmqtt"
56
"github.com/rainu/launchpad-super-trigger/config"
7+
"go.uber.org/zap"
68
)
79

810
type Client interface {
9-
MQTT.Client
11+
Connect(ctx context.Context) error
12+
Publish(ctx context.Context, topic string, qos byte, retained bool, payload []byte) error
13+
Subscribe(topic string, qos byte, clb func(string, byte, []byte))
1014
AddListener(cl ConnectionListener)
1115
}
1216

@@ -25,17 +29,18 @@ func buildMqttConnection(connection config.MQTTConnection) Client {
2529
conObserver: connectionObserver{},
2630
}
2731

28-
opts := MQTT.NewClientOptions()
29-
opts.AddBroker(connection.Broker)
30-
31-
opts.SetAutoReconnect(true)
32-
opts.SetClientID(connection.ClientId)
33-
opts.SetUsername(connection.Username)
34-
opts.SetPassword(connection.Password)
35-
opts.SetOnConnectHandler(result.conObserver.OnConnectHandler())
36-
opts.SetConnectionLostHandler(result.conObserver.ConnectionLostHandler())
32+
client, err := MQTT.NewClient(
33+
MQTT.WithClientID(connection.ClientId),
34+
MQTT.WithIdentity(connection.Username, connection.Password),
35+
MQTT.WithCleanSession(true),
36+
MQTT.WithAutoReconnect(true),
37+
)
38+
if err != nil {
39+
zap.L().Fatal("Error build connecting to mqtt broker: %s", zap.Error(err))
40+
}
3741

38-
result.client = MQTT.NewClient(opts)
42+
result.broker = connection.Broker
43+
result.client = client
3944
result.conObserver.AddListener(&logConnectionListener{}) //for logging purposes
4045

4146
return result

config/connection/mqtt/client.go

+47-26
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,69 @@
11
package mqtt
22

3-
import MQTT "github.com/eclipse/paho.mqtt.golang"
3+
import (
4+
"context"
5+
MQTT "github.com/goiiot/libmqtt"
6+
)
47

58
type mqttClient struct {
9+
broker string
610
client MQTT.Client
711
conObserver connectionObserver
812
}
913

10-
func (m *mqttClient) IsConnected() bool {
11-
return m.client.IsConnected()
14+
type connectionListener struct {
15+
errChan chan error
1216
}
1317

14-
func (m *mqttClient) IsConnectionOpen() bool {
15-
return m.client.IsConnectionOpen()
18+
func (c *connectionListener) OnConnect(client MQTT.Client) {
19+
c.errChan <- nil
1620
}
1721

18-
func (m *mqttClient) Connect() MQTT.Token {
19-
return m.client.Connect()
22+
func (c *connectionListener) OnConnectionLost(client MQTT.Client, err error) {
23+
c.errChan <- err
2024
}
2125

22-
func (m *mqttClient) Disconnect(quiesce uint) {
23-
m.client.Disconnect(quiesce)
24-
}
26+
func (m *mqttClient) Connect(ctx context.Context) error {
27+
errChan := make(chan error, 1)
28+
defer close(errChan)
2529

26-
func (m *mqttClient) Publish(topic string, qos byte, retained bool, payload interface{}) MQTT.Token {
27-
return m.client.Publish(topic, qos, retained, payload)
28-
}
30+
listener := &connectionListener{errChan}
31+
defer func() {
32+
//we can remove our listener -> the only purpose was to wait for connection!
33+
m.conObserver.RemoveListener(listener)
34+
}()
2935

30-
func (m *mqttClient) Subscribe(topic string, qos byte, callback MQTT.MessageHandler) MQTT.Token {
31-
return m.client.Subscribe(topic, qos, callback)
32-
}
33-
34-
func (m *mqttClient) SubscribeMultiple(filters map[string]byte, callback MQTT.MessageHandler) MQTT.Token {
35-
return m.client.SubscribeMultiple(filters, callback)
36-
}
36+
m.conObserver.AddListener(listener)
37+
err := m.client.ConnectServer(m.broker, MQTT.WithConnHandleFunc(m.conObserver.OnConnectHandler()))
38+
if err != nil {
39+
return err
40+
}
3741

38-
func (m *mqttClient) Unsubscribe(topics ...string) MQTT.Token {
39-
return m.client.Unsubscribe(topics...)
42+
//wait until connection established
43+
select {
44+
case err := <-errChan:
45+
return err
46+
case <-ctx.Done():
47+
return ctx.Err()
48+
}
4049
}
4150

42-
func (m *mqttClient) AddRoute(topic string, callback MQTT.MessageHandler) {
43-
m.client.AddRoute(topic, callback)
51+
func (m *mqttClient) Subscribe(topic string, qos byte, clb func(string, byte, []byte)) {
52+
m.client.HandleTopic(topic, func(client MQTT.Client, topic string, qos MQTT.QosLevel, msg []byte) {
53+
clb(topic, qos, msg)
54+
})
55+
m.client.Subscribe(&MQTT.Topic{
56+
Name: topic,
57+
Qos: qos,
58+
})
4459
}
4560

46-
func (m *mqttClient) OptionsReader() MQTT.ClientOptionsReader {
47-
return m.client.OptionsReader()
61+
func (m *mqttClient) Publish(ctx context.Context, topic string, qos byte, retained bool, payload []byte) error {
62+
m.client.Publish(&MQTT.PublishPacket{
63+
TopicName: topic,
64+
Qos: qos,
65+
IsRetain: retained,
66+
Payload: payload,
67+
})
68+
return nil
4869
}

config/connection/mqtt/log.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package mqtt
22

33
import (
4-
MQTT "github.com/eclipse/paho.mqtt.golang"
4+
MQTT "github.com/goiiot/libmqtt"
55
"go.uber.org/zap"
66
)
77

config/connection/mqtt/observer.go

+21-18
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package mqtt
22

33
import (
4-
MQTT "github.com/eclipse/paho.mqtt.golang"
4+
MQTT "github.com/goiiot/libmqtt"
55
"sync"
66
)
77

@@ -17,38 +17,41 @@ type ConnectionListener interface {
1717
type connectionObserver struct {
1818
listenerMutex sync.RWMutex
1919
eventMutex sync.Mutex
20-
listener []ConnectionListener
20+
listener map[interface{}]ConnectionListener
2121
}
2222

2323
func (c *connectionObserver) AddListener(cl ConnectionListener) {
2424
c.listenerMutex.Lock()
2525
defer c.listenerMutex.Unlock()
2626

27-
c.listener = append(c.listener, cl)
28-
}
27+
if c.listener == nil {
28+
c.listener = map[interface{}]ConnectionListener{}
29+
}
2930

30-
func (c *connectionObserver) OnConnectHandler() MQTT.OnConnectHandler {
31-
return func(client MQTT.Client) {
32-
c.listenerMutex.RLock()
33-
c.eventMutex.Lock()
34-
defer c.listenerMutex.RUnlock()
35-
defer c.eventMutex.Unlock()
31+
c.listener[cl] = cl
32+
}
33+
func (c *connectionObserver) RemoveListener(cl ConnectionListener) {
34+
c.listenerMutex.Lock()
35+
defer c.listenerMutex.Unlock()
3636

37-
for _, listener := range c.listener {
38-
listener.OnConnect(client)
39-
}
40-
}
37+
delete(c.listener, cl)
4138
}
4239

43-
func (c *connectionObserver) ConnectionLostHandler() MQTT.ConnectionLostHandler {
44-
return func(client MQTT.Client, err error) {
40+
func (c *connectionObserver) OnConnectHandler() MQTT.ConnHandleFunc {
41+
return func(client MQTT.Client, server string, code byte, err error) {
4542
c.listenerMutex.RLock()
4643
c.eventMutex.Lock()
4744
defer c.listenerMutex.RUnlock()
4845
defer c.eventMutex.Unlock()
4946

50-
for _, listener := range c.listener {
51-
listener.OnConnectionLost(client, err)
47+
if code == MQTT.CodeSuccess {
48+
for _, listener := range c.listener {
49+
listener.OnConnect(client)
50+
}
51+
} else {
52+
for _, listener := range c.listener {
53+
listener.OnConnectionLost(client, err)
54+
}
5255
}
5356
}
5457
}

config/sensor/mqtt.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package sensor
22

33
import (
44
"fmt"
5-
MQTT "github.com/eclipse/paho.mqtt.golang"
5+
MQTT "github.com/goiiot/libmqtt"
66
"github.com/rainu/launchpad-super-trigger/config"
77
"github.com/rainu/launchpad-super-trigger/config/connection/mqtt"
88
"github.com/rainu/launchpad-super-trigger/sensor"

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ go 1.14
44

55
require (
66
github.com/boltdb/bolt v1.3.1
7-
github.com/eclipse/paho.mqtt.golang v1.2.0
87
github.com/go-playground/universal-translator v0.17.0 // indirect
8+
github.com/goiiot/libmqtt v0.9.5
99
github.com/itchyny/gojq v0.11.0
1010
github.com/leodido/go-urn v1.2.0 // indirect
1111
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect

0 commit comments

Comments
 (0)