Skip to content

Commit 9248cf2

Browse files
authored
Merge pull request #4 from FlowFuse/first-tests
First tests
2 parents da7d206 + 6b94085 commit 9248cf2

File tree

5 files changed

+191
-21
lines changed

5 files changed

+191
-21
lines changed

.github/workflows/tests.yml

+28-2
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@ on:
77
pull_request:
88

99
jobs:
10-
unit:
10+
unit-tests:
1111
name: Unit tests
1212
runs-on: ubuntu-latest
13+
strategy:
14+
matrix:
15+
node-version: [18.x]
1316
steps:
1417
- name: Checkout
1518
uses: actions/checkout@v4
@@ -24,4 +27,27 @@ jobs:
2427
- name: Unit tests
2528
run: npm run test
2629

27-
30+
notify-slack:
31+
name: Notify on failure
32+
needs: [ unit-tests ]
33+
if: failure()
34+
runs-on: ubuntu-latest
35+
36+
steps:
37+
- name: Map users
38+
id: map-actor-to-slack
39+
uses: icalia-actions/[email protected]
40+
with:
41+
actor-map: ${{ vars.SLACK_GITHUB_USERS_MAP }}
42+
default-mapping: C067BD0377F
43+
44+
- name: Send notification
45+
uses: ravsamhq/notify-slack-action@v2
46+
with:
47+
status: 'failure'
48+
notification_title: 'FlowFuse Tests Pipeline'
49+
footer: "<{run_url}|View Run>"
50+
mention_users: ${{ steps.map-actor-to-slack.outputs.actor-mapping }}
51+
env:
52+
SLACK_WEBHOOK_URL: ${{ secrets.GH_WORKFLOWS_WEBHOOK }}
53+

index.js

100644100755
+4-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
#!/usr/bin/env node
12
const express = require('express')
23
const bodyParser = require('body-parser')
34
const { API } = require('./lib/api.js')
45
const { healthz } = require('./lib/health.js')
56

6-
const port = 3500
7+
const port = process.env.FORGE_PORT || 3500
78

89
const app = express()
910
app.use(bodyParser.json({}))
@@ -17,6 +18,8 @@ const options = {
1718
token: process.env.FORGE_TEAM_TOKEN
1819
}
1920

21+
// console.log(options)
22+
2023
const api = new API(app, options)
2124

2225
process.on('SIGTERM', async () => {

lib/agent.js

+96-5
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,106 @@
11
const got = require('got')
22
const mqtt = require('mqtt')
33

4+
const REPORT_TIME = 1000 * 60 * 2
5+
46
class Agent {
57
constructor (options) {
68
this.options = options
79
this.topics = {}
10+
this.error = null
11+
this.stopped = true
812
}
913

1014
async connect () {
1115
const agent = this
1216
this.connected = false
17+
this.reconnectCount = 0
18+
this.stopped = false
19+
20+
// console.log(`${this.options.forgeURL}/api/v1/teams/${this.options.team}/brokers/${this.options.broker}/credentials`)
1321

1422
try {
15-
this.creds = await got.get(`${this.options.forgeURL}/api/v1/team/${this.options.team}/broker/${this.options.broker}/creds`, {
23+
this.creds = await got.get(`${this.options.forgeURL}/api/v1/teams/${this.options.team}/brokers/${this.options.broker}/credentials`, {
1624
headers: {
1725
Authorization: `Bearer ${this.options.token}`
1826
}
1927
}).json()
2028

21-
this.client = mqtt.connect(`${this.creds.protocol}//${this.creds.hostname}:${this.creds.port}`, this.creds)
29+
this.creds = Object.assign(this.creds, { reconnectPeriod: 0 })
30+
// console.log(this.creds)
31+
32+
const options = {
33+
protocol: this.creds.protocol,
34+
host: this.creds.host,
35+
port: this.creds.port,
36+
clientId: this.creds.clientId,
37+
protocolVersion: this.creds.protocolVersion,
38+
username: this.creds.credentials.username,
39+
password: this.creds.credentials.password
40+
}
41+
42+
// console.log(options)
43+
44+
// this.client = mqtt.connect(`${this.creds.protocol}//${this.creds.hostname}:${this.creds.port}`, this.creds)
45+
this.client = mqtt.connect(options)
2246
this.client.on('connect', function () {
2347
agent.connected = true
2448
// console.log('connected')
49+
agent.error = null
2550
agent.client.subscribe('#')
2651
})
52+
this.client.on('subscribe', function () {
53+
// on successful subscribe reset reconnect count
54+
agent.reconnectCount = 0
55+
})
2756
this.client.on('reconnect', function () {
28-
console.log('reconnecting')
57+
// console.log('reconnecting')
2958
})
3059
this.client.on('close', function () {
3160
// console.log('closed')
61+
if (!agent.stopped) {
62+
if (agent.reconnectCount < 3) {
63+
agent.reconnectCount++
64+
agent.reconnectTimeout = setTimeout(() => {
65+
agent.client.reconnect()
66+
}, 5000)
67+
}
68+
}
69+
})
70+
this.client.on('disconnect', function (packet) {
71+
// console.log('MQTTv5 disconnected')
72+
// should check the reason code
73+
// console.log(packet)
3274
})
33-
this.client.on('disconnect', function () {})
3475
this.client.on('error', function (error) {
35-
console.log('error', error)
76+
// console.log('error', error.code)
77+
// console.log(error)
78+
switch (error.code) {
79+
case 'ECONNREFUSED': // connection refused
80+
case 'ENOTFOUND': // DNS lookup failed
81+
agent.error = error.code
82+
break
83+
case 1:
84+
agent.error = 'WRONGMQTTVERSION'
85+
break
86+
case 2:
87+
case 133:
88+
agent.error = 'CLIENTIDNOTALLOWED'
89+
break
90+
case 4:
91+
agent.error = 'MALFORMEDCREDS'
92+
break
93+
case 5:
94+
case 135: // not authorized
95+
agent.error = 'NOTAUTHORIZED'
96+
break
97+
case 138:
98+
agent.error = 'BANNED'
99+
break
100+
default:
101+
// console.log('default error')
102+
}
103+
// console.log(`agent.error set to: ${agent.error}`)
36104
})
37105
this.client.on('message', function (topic) {
38106
// console.log(topic)
@@ -42,6 +110,25 @@ class Agent {
42110
console.log(err)
43111
throw err
44112
}
113+
114+
this.reportInterval = setInterval(async () => {
115+
if (agent.connected) {
116+
// console.log(`${agent.options.forgeURL}/api/v1/teams/${agent.options.team}/brokers/${agent.options.broker}/topics`)
117+
// console.log(JSON.stringify(agent.topics, null, 2))
118+
try {
119+
await got.post(`${agent.options.forgeURL}/api/v1/teams/${agent.options.team}/brokers/${agent.options.broker}/topics`, {
120+
headers: {
121+
Authorization: `Bearer ${this.options.token}`
122+
},
123+
json: agent.topics
124+
})
125+
// clear list so only uploading new topics each time
126+
agent.topics = {}
127+
} catch (err) {
128+
console.log(err)
129+
}
130+
}
131+
}, REPORT_TIME)
45132
}
46133

47134
async start () {
@@ -52,9 +139,13 @@ class Agent {
52139
}
53140

54141
async stop () {
142+
this.stopped = true
55143
if (this.client) {
56144
this.client.end()
57145
}
146+
clearTimeout(this.reconnectTimeout)
147+
clearInterval(this.reportInterval)
148+
this.reportInterval = null
58149
this.connected = false
59150
}
60151

lib/api.js

+46-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,55 @@
1+
const { Agent } = require('./agent')
2+
13
class API {
24
constructor (app, options) {
3-
app.get('/', (request, reply) => {
5+
this.options = options
6+
7+
app.get('/api/v1/status', (request, reply) => {
8+
if (this.agent) {
9+
reply.send({
10+
connected: this.agent.connected,
11+
error: this.agent.error
12+
})
13+
} else {
14+
reply.send({
15+
connected: false
16+
})
17+
}
18+
})
19+
20+
app.post('/api/v1/commands/start', async (request, reply) => {
21+
try {
22+
if (!this.agent) {
23+
this.agent = new Agent(this.options)
24+
}
25+
await this.agent.start()
26+
reply.send({})
27+
} catch (err) {
28+
reply.status(400).send({ error: '', message: '' })
29+
}
430
})
31+
32+
app.post('/api/v1/commands/stop', async (request, reply) => {
33+
try {
34+
if (this.agent) {
35+
await this.stop()
36+
} else {
37+
// send error about not active
38+
}
39+
} catch (err) {
40+
reply.status(400).send({ error: '', message: '' })
41+
}
42+
reply.send({})
43+
})
44+
45+
// default to running
46+
this.agent = new Agent(this.options)
547
}
648

749
async stop () {
8-
50+
if (this.agent) {
51+
await this.agent.stop()
52+
}
953
}
1054
}
1155

test/unit/lib/agent_spec.js

+17-11
Original file line numberDiff line numberDiff line change
@@ -13,32 +13,36 @@ describe('Agent', function () {
1313
let aedes
1414
let mqttServer
1515
let httpServer
16+
let lastClientId = ''
1617

1718
const creds = {
1819
foo: {
19-
hostname: 'localhost',
20-
host: `localhost:${BrokerPort}`,
20+
host: 'localhost',
2121
port: BrokerPort,
2222
protocol: 'mqtt:',
23-
username: 'user',
24-
password: 'password',
25-
clientId: 'foo'
23+
clientId: 'foo',
24+
credentials: {
25+
username: 'user',
26+
password: 'password'
27+
}
2628
},
2729
bar: {
28-
hostname: 'localhost',
29-
host: `localhost:${BrokerPort}`,
30+
host: 'localhost',
3031
port: BrokerPort,
3132
protocol: 'mqtt:',
32-
username: 'user',
33-
password: 'password',
34-
clientId: 'bar'
33+
clientId: 'bar',
34+
credentials: {
35+
username: 'user',
36+
password: 'password'
37+
}
3538
}
3639
}
3740

3841
before(async function () {
3942
aedes = new Aedes()
4043
aedes.authenticate = function (client, username, password, cb) {
4144
// console.log(client.id, username, password.toString('utf8'))
45+
lastClientId = client.id
4246
cb(null, true)
4347
}
4448
mqttServer = net.createServer(aedes.handle)
@@ -47,7 +51,7 @@ describe('Agent', function () {
4751
})
4852

4953
const app = express()
50-
app.get('/api/v1/team/:teamId/broker/:brokerId/creds', function (request, reply) {
54+
app.get('/api/v1/teams/:teamId/brokers/:brokerId/credentials', function (request, reply) {
5155
reply.send(creds[request.params.brokerId])
5256
})
5357

@@ -82,6 +86,7 @@ describe('Agent', function () {
8286
await a.start()
8387
await setTimeout(1000)
8488
a.state().should.have.property('connected', true)
89+
lastClientId.should.eql('foo')
8590
await a.stop()
8691
})
8792

@@ -96,6 +101,7 @@ describe('Agent', function () {
96101
await a.start()
97102
await setTimeout(1000)
98103
a.state().should.have.property('connected', true)
104+
lastClientId.should.eql('bar')
99105
aedes.publish({
100106
topic: 'hello/world',
101107
payload: Buffer.from('HelloWorld'),

0 commit comments

Comments
 (0)