Skip to content

Commit 06ea30d

Browse files
committed
Implement raw events and commands for BasicStation backend.
1 parent 7b8d7b2 commit 06ea30d

File tree

10 files changed

+295
-22
lines changed

10 files changed

+295
-22
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ require (
1212
github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect
1313
github.com/goreleaser/goreleaser v0.106.0
1414
github.com/goreleaser/nfpm v0.11.0
15-
github.com/gorilla/websocket v1.4.0
15+
github.com/gorilla/websocket v1.4.1
1616
github.com/jacobsa/crypto v0.0.0-20190317225127-9f44e2d11115 // indirect
1717
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
1818
github.com/pkg/errors v0.8.1

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ github.com/goreleaser/nfpm v0.11.0 h1:YJ3wyfTJbqHrE3Ym0b4odQYzGuLdemx09wPLafeZAK
8989
github.com/goreleaser/nfpm v0.11.0/go.mod h1:F2yzin6cBAL9gb+mSiReuXdsfTrOQwDMsuSpULof+y4=
9090
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
9191
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
92+
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
93+
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
9294
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
9395
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
9496
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=

internal/backend/backend.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ import (
55

66
"github.com/pkg/errors"
77

8+
"github.com/brocaar/chirpstack-api/go/v3/gw"
89
"github.com/brocaar/chirpstack-gateway-bridge/internal/backend/basicstation"
910
"github.com/brocaar/chirpstack-gateway-bridge/internal/backend/semtechudp"
1011
"github.com/brocaar/chirpstack-gateway-bridge/internal/config"
11-
"github.com/brocaar/chirpstack-api/go/v3/gw"
1212
"github.com/brocaar/lorawan"
1313
)
1414

@@ -53,6 +53,9 @@ type Backend interface {
5353
// GetUplinkFrameChan returns the channel for received uplinks.
5454
GetUplinkFrameChan() chan gw.UplinkFrame
5555

56+
// GetRawPacketForwarderEventChan returns the raw packet-forwarder command channel.
57+
GetRawPacketForwarderEventChan() chan gw.RawPacketForwarderEvent
58+
5659
// GetConnectChan returns the channel for received gateway connections.
5760
GetConnectChan() chan lorawan.EUI64
5861

@@ -64,4 +67,7 @@ type Backend interface {
6467

6568
// ApplyConfiguration applies the given configuration to the gateway.
6669
ApplyConfiguration(gw.GatewayConfiguration) error
70+
71+
// RawPacketForwarderCommand sends the given raw command to the packet-forwarder.
72+
RawPacketForwarderCommand(gw.RawPacketForwarderCommand) error
6773
}

internal/backend/basicstation/backend.go

Lines changed: 107 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"crypto/rand"
55
"crypto/tls"
66
"crypto/x509"
7+
"encoding/base64"
78
"encoding/binary"
89
"encoding/json"
910
"fmt"
@@ -20,9 +21,9 @@ import (
2021
"github.com/pkg/errors"
2122
log "github.com/sirupsen/logrus"
2223

24+
"github.com/brocaar/chirpstack-api/go/v3/gw"
2325
"github.com/brocaar/chirpstack-gateway-bridge/internal/backend/basicstation/structs"
2426
"github.com/brocaar/chirpstack-gateway-bridge/internal/config"
25-
"github.com/brocaar/chirpstack-api/go/v3/gw"
2627
"github.com/brocaar/lorawan"
2728
"github.com/brocaar/lorawan/band"
2829
)
@@ -48,9 +49,10 @@ type Backend struct {
4849

4950
gateways gateways
5051

51-
downlinkTXAckChan chan gw.DownlinkTXAck
52-
uplinkFrameChan chan gw.UplinkFrame
53-
gatewayStatsChan chan gw.GatewayStats
52+
downlinkTXAckChan chan gw.DownlinkTXAck
53+
uplinkFrameChan chan gw.UplinkFrame
54+
gatewayStatsChan chan gw.GatewayStats
55+
rawPacketForwarderEventChan chan gw.RawPacketForwarderEvent
5456

5557
band band.Band
5658
region band.Name
@@ -77,9 +79,10 @@ func NewBackend(conf config.Config) (*Backend, error) {
7779
disconnectChan: make(chan lorawan.EUI64),
7880
},
7981

80-
downlinkTXAckChan: make(chan gw.DownlinkTXAck),
81-
uplinkFrameChan: make(chan gw.UplinkFrame),
82-
gatewayStatsChan: make(chan gw.GatewayStats),
82+
downlinkTXAckChan: make(chan gw.DownlinkTXAck),
83+
uplinkFrameChan: make(chan gw.UplinkFrame),
84+
gatewayStatsChan: make(chan gw.GatewayStats),
85+
rawPacketForwarderEventChan: make(chan gw.RawPacketForwarderEvent),
8386

8487
pingInterval: conf.Backend.BasicStation.PingInterval,
8588
readTimeout: conf.Backend.BasicStation.ReadTimeout,
@@ -210,6 +213,11 @@ func (b *Backend) GetDisconnectChan() chan lorawan.EUI64 {
210213
return b.gateways.disconnectChan
211214
}
212215

216+
// GetRawPacketForwarderEventChan returns the raw packet-forwarder command channel.
217+
func (b *Backend) GetRawPacketForwarderEventChan() chan gw.RawPacketForwarderEvent {
218+
return b.rawPacketForwarderEventChan
219+
}
220+
213221
func (b *Backend) SendDownlinkFrame(df gw.DownlinkFrame) error {
214222
b.Lock()
215223
defer b.Unlock()
@@ -269,6 +277,36 @@ func (b *Backend) ApplyConfiguration(gwConfig gw.GatewayConfiguration) error {
269277
return nil
270278
}
271279

280+
// RawPacketForwarderCommand sends the given raw command to the packet-forwarder.
281+
func (b *Backend) RawPacketForwarderCommand(pl gw.RawPacketForwarderCommand) error {
282+
var gatewayID lorawan.EUI64
283+
var rawID uuid.UUID
284+
285+
copy(gatewayID[:], pl.GatewayId)
286+
copy(rawID[:], pl.RawId)
287+
288+
if len(pl.Payload) == 0 {
289+
return errors.New("raw packet-forwarder command payload is empty")
290+
}
291+
292+
mt := websocket.BinaryMessage
293+
if strings.HasPrefix(string(pl.Payload), "{") {
294+
mt = websocket.TextMessage
295+
}
296+
297+
websocketSendCounter("raw").Inc()
298+
if err := b.sendRawToGateway(gatewayID, mt, pl.Payload); err != nil {
299+
return errors.Wrap(err, "send raw packet-forwarder command to gateway error")
300+
}
301+
302+
log.WithFields(log.Fields{
303+
"gateway_id": gatewayID,
304+
"raw_id": rawID,
305+
}).Info("backend/basicstation: raw packet-forwarder command sent to gateway")
306+
307+
return nil
308+
}
309+
272310
// Close closes the backend.
273311
func (b *Backend) Close() error {
274312
b.isClosed = true
@@ -302,8 +340,14 @@ func (b *Backend) handleRouterInfo(r *http.Request, c *websocket.Conn) {
302340
}
303341
}
304342

343+
bb, err := json.Marshal(resp)
344+
if err != nil {
345+
log.WithError(err).Error("backend/basicstation: marshal json error")
346+
return
347+
}
348+
305349
c.SetWriteDeadline(time.Now().Add(b.writeTimeout))
306-
if err := c.WriteJSON(resp); err != nil {
350+
if err := c.WriteMessage(websocket.TextMessage, bb); err != nil {
307351
log.WithError(err).Error("backend/basicstation: websocket send message error")
308352
return
309353
}
@@ -367,7 +411,7 @@ func (b *Backend) handleGateway(r *http.Request, c *websocket.Conn) {
367411

368412
// receive data
369413
for {
370-
_, msg, err := c.ReadMessage()
414+
mt, msg, err := c.ReadMessage()
371415
if err != nil {
372416
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
373417
log.WithField("gateway_id", gatewayID).WithError(err).Error("backend/basicstation: read message error")
@@ -378,6 +422,16 @@ func (b *Backend) handleGateway(r *http.Request, c *websocket.Conn) {
378422
// reset the read deadline as the Basic Station doesn't respond to PONG messages (yet)
379423
c.SetReadDeadline(time.Now().Add(b.readTimeout))
380424

425+
if mt == websocket.BinaryMessage {
426+
log.WithFields(log.Fields{
427+
"gateway_id": gatewayID,
428+
"message_base64": base64.StdEncoding.EncodeToString(msg),
429+
}).Debug("backend/basicstation: binary message received")
430+
431+
b.handleRawPacketForwarderEvent(gatewayID, msg)
432+
continue
433+
}
434+
381435
log.WithFields(log.Fields{
382436
"gateway_id": gatewayID,
383437
"message": string(msg),
@@ -458,11 +512,7 @@ func (b *Backend) handleGateway(r *http.Request, c *websocket.Conn) {
458512
}
459513
b.handleDownlinkTransmittedMessage(gatewayID, pl)
460514
default:
461-
log.WithFields(log.Fields{
462-
"message_type": msgType,
463-
"gateway_id": gatewayID,
464-
"payload": string(msg),
465-
}).Warning("backend/basicstation: unexpected message-type")
515+
b.handleRawPacketForwarderEvent(gatewayID, msg)
466516
}
467517
}
468518
}
@@ -616,14 +666,56 @@ func (b *Backend) handleUplinkDataFrame(gatewayID lorawan.EUI64, v structs.Uplin
616666
b.uplinkFrameChan <- uplinkFrame
617667
}
618668

669+
func (b *Backend) handleRawPacketForwarderEvent(gatewayID lorawan.EUI64, pl []byte) {
670+
rawID, err := uuid.NewV4()
671+
if err != nil {
672+
log.WithError(err).WithFields(log.Fields{
673+
"gateway_id": gatewayID,
674+
}).Error("backend/basicstation: get random raw id error")
675+
return
676+
}
677+
678+
rawEvent := gw.RawPacketForwarderEvent{
679+
GatewayId: gatewayID[:],
680+
RawId: rawID[:],
681+
Payload: pl,
682+
}
683+
684+
log.WithFields(log.Fields{
685+
"gateway_id": gatewayID,
686+
"raw_id": rawID,
687+
}).Info("backend/basicstation: raw packet-forwarder event received")
688+
689+
b.rawPacketForwarderEventChan <- rawEvent
690+
}
691+
619692
func (b *Backend) sendToGateway(gatewayID lorawan.EUI64, v interface{}) error {
620693
gw, err := b.gateways.get(gatewayID)
621694
if err != nil {
622695
return errors.Wrap(err, "get gateway error")
623696
}
624697

698+
bb, err := json.Marshal(v)
699+
if err != nil {
700+
return errors.Wrap(err, "marshal json error")
701+
}
702+
703+
gw.conn.SetWriteDeadline(time.Now().Add(b.writeTimeout))
704+
if err := gw.conn.WriteMessage(websocket.TextMessage, bb); err != nil {
705+
return errors.Wrap(err, "send message to gateway error")
706+
}
707+
708+
return nil
709+
}
710+
711+
func (b *Backend) sendRawToGateway(gatewayID lorawan.EUI64, messageType int, data []byte) error {
712+
gw, err := b.gateways.get(gatewayID)
713+
if err != nil {
714+
return errors.Wrap(err, "get gateway error")
715+
}
716+
625717
gw.conn.SetWriteDeadline(time.Now().Add(b.writeTimeout))
626-
if err := gw.conn.WriteJSON(v); err != nil {
718+
if err := gw.conn.WriteMessage(messageType, data); err != nil {
627719
return errors.Wrap(err, "send message to gateway error")
628720
}
629721

internal/backend/basicstation/backend_test.go

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ import (
1212
"github.com/stretchr/testify/require"
1313
"github.com/stretchr/testify/suite"
1414

15-
"github.com/brocaar/chirpstack-gateway-bridge/internal/backend/basicstation/structs"
16-
"github.com/brocaar/chirpstack-gateway-bridge/internal/config"
1715
"github.com/brocaar/chirpstack-api/go/v3/common"
1816
"github.com/brocaar/chirpstack-api/go/v3/gw"
17+
"github.com/brocaar/chirpstack-gateway-bridge/internal/backend/basicstation/structs"
18+
"github.com/brocaar/chirpstack-gateway-bridge/internal/config"
1919
"github.com/brocaar/lorawan"
2020
)
2121

@@ -455,6 +455,78 @@ func (ts *BackendTestSuite) TestSendDownlinkFrame() {
455455
}, df)
456456
}
457457

458+
func (ts *BackendTestSuite) TestRawPacketForwarderCommand() {
459+
assert := require.New(ts.T())
460+
id, err := uuid.NewV4()
461+
assert.NoError(err)
462+
463+
ts.T().Run("JSON", func(t *testing.T) {
464+
assert := require.New(t)
465+
pl := gw.RawPacketForwarderCommand{
466+
GatewayId: []byte{1, 2, 3, 4, 5, 6, 7, 8},
467+
RawId: id[:],
468+
Payload: []byte(`{"foo": "bar"}`),
469+
}
470+
assert.NoError(ts.backend.RawPacketForwarderCommand(pl))
471+
472+
mt, msg, err := ts.wsClient.ReadMessage()
473+
assert.NoError(err)
474+
assert.Equal(websocket.TextMessage, mt)
475+
assert.Equal(pl.Payload, msg)
476+
})
477+
478+
ts.T().Run("Binary", func(t *testing.T) {
479+
assert := require.New(t)
480+
pl := gw.RawPacketForwarderCommand{
481+
GatewayId: []byte{1, 2, 3, 4, 5, 6, 7, 8},
482+
RawId: id[:],
483+
Payload: []byte{0x01, 0x02, 0x03, 0x04},
484+
}
485+
assert.NoError(ts.backend.RawPacketForwarderCommand(pl))
486+
487+
mt, msg, err := ts.wsClient.ReadMessage()
488+
assert.NoError(err)
489+
assert.Equal(websocket.BinaryMessage, mt)
490+
assert.Equal(pl.Payload, msg)
491+
})
492+
}
493+
494+
func (ts *BackendTestSuite) TestRawPacketForwarderEvent() {
495+
ts.T().Run("Binary", func(t *testing.T) {
496+
assert := require.New(t)
497+
498+
assert.NoError(ts.wsClient.WriteMessage(websocket.BinaryMessage, []byte{0x01, 0x02, 0x03, 0x04}))
499+
500+
pl := <-ts.backend.GetRawPacketForwarderEventChan()
501+
assert.Equal([]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}, pl.GatewayId)
502+
assert.NotNil(pl.RawId)
503+
assert.Equal([]byte{0x01, 0x02, 0x03, 0x04}, pl.Payload)
504+
})
505+
506+
ts.T().Run("JSON rmtsh", func(t *testing.T) {
507+
assert := require.New(t)
508+
509+
jsonMsg := `{
510+
"msgtype" : "rmtsh",
511+
"rmtsh" : [
512+
{
513+
"user" : "foo",
514+
"started" : true,
515+
"age" : 1,
516+
"pid" : 2
517+
}
518+
]
519+
}`
520+
521+
assert.NoError(ts.wsClient.WriteMessage(websocket.TextMessage, []byte(jsonMsg)))
522+
523+
pl := <-ts.backend.GetRawPacketForwarderEventChan()
524+
assert.Equal([]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}, pl.GatewayId)
525+
assert.NotNil(pl.RawId)
526+
assert.Equal([]byte(jsonMsg), pl.Payload)
527+
})
528+
}
529+
458530
func TestBackend(t *testing.T) {
459531
suite.Run(t, new(BackendTestSuite))
460532
}

internal/backend/semtechudp/backend.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ import (
1414
"github.com/pkg/errors"
1515
log "github.com/sirupsen/logrus"
1616

17+
"github.com/brocaar/chirpstack-api/go/v3/gw"
1718
"github.com/brocaar/chirpstack-gateway-bridge/internal/backend/semtechudp/packets"
1819
"github.com/brocaar/chirpstack-gateway-bridge/internal/config"
1920
"github.com/brocaar/chirpstack-gateway-bridge/internal/filters"
20-
"github.com/brocaar/chirpstack-api/go/v3/gw"
2121
"github.com/brocaar/lorawan"
2222
)
2323

@@ -173,6 +173,12 @@ func (b *Backend) GetDisconnectChan() chan lorawan.EUI64 {
173173
return b.gateways.disconnectChan
174174
}
175175

176+
// GetRawPacketForwarderEventChan returns the raw packet-forwarder command channel.
177+
func (b *Backend) GetRawPacketForwarderEventChan() chan gw.RawPacketForwarderEvent {
178+
// not provided by the Semtech packet-forwarder.
179+
return nil
180+
}
181+
176182
// SendDownlinkFrame sends the given downlink frame to the gateway.
177183
func (b *Backend) SendDownlinkFrame(frame gw.DownlinkFrame) error {
178184
// mutex is needed in order to write to tokenMap
@@ -238,6 +244,11 @@ func (b *Backend) ApplyConfiguration(config gw.GatewayConfiguration) error {
238244
return b.applyConfiguration(*pfConfig, config)
239245
}
240246

247+
// RawPacketForwarderCommand sends the given raw command to the packet-forwarder.
248+
func (b *Backend) RawPacketForwarderCommand(gw.RawPacketForwarderCommand) error {
249+
return errors.New("raw packet-forwarder command not implemented by Semtech packet-forwarder")
250+
}
251+
241252
func (b *Backend) applyConfiguration(pfConfig pfConfiguration, config gw.GatewayConfiguration) error {
242253
gwConfig, err := getGatewayConfig(config)
243254
if err != nil {

0 commit comments

Comments
 (0)