// MQTTClient.h
#ifndef MQTT_CLIENT_H
#define MQTT_CLIENT_H
// Feature detection for the C++ <functional> header.
//
// After this section MQTT_HAS_FUNCTIONAL is always defined: 1 when <functional>
// has been included, 0 otherwise. The rest of the header uses it to decide
// whether the std::function based callback types and overloads are declared.
//
// On platforms probed through __has_include, any `min`/`max` macros are removed
// before including <functional>; per the original note they will be defined
// again by Arduino later.
#if defined(ESP8266) || (defined ESP32)
// ESP8266 / ESP32: <functional> is included without probing.
#include <functional>
#define MQTT_HAS_FUNCTIONAL 1
#elif defined(__has_include)
// Other toolchains that provide __has_include: probe for the header.
#if __has_include(<functional>)
#if defined(min)
#undef min
#endif
#if defined(max)
#undef max
#endif
#include <functional>
#define MQTT_HAS_FUNCTIONAL 1
#else
// Header not found: only the plain function-pointer callbacks are available.
#define MQTT_HAS_FUNCTIONAL 0
#endif
#else
// No __has_include support: the header cannot be probed, so it is not used.
#define MQTT_HAS_FUNCTIONAL 0
#endif
#include <Arduino.h>
#include <Client.h>
#include <Stream.h>
extern "C" {
#include "lwmqtt/lwmqtt.h"
}
// Pointer to a parameterless function that returns a uint32_t clock value.
// This is the type accepted by MQTTClient::setClockSource() and stored in the
// `millis` field of lwmqtt_arduino_timer_t (presumably a millisecond counter,
// going by that field name — confirm against the implementation).
using MQTTClientClockSource = uint32_t (*)();
// Timer record used by the Arduino glue code.
// Holds a start value, a timeout value and the clock function used to read the
// current time. How `start` and `timeout` are interpreted is decided by code
// outside this header.
struct lwmqtt_arduino_timer_t {
  uint32_t start;
  uint32_t timeout;
  MQTTClientClockSource millis;  // clock callback; MQTTClient initializes it to nullptr
};
// Network record used by the Arduino glue code.
// Bundles the Arduino `Client` pointer with the two write-tuning values that
// MQTTClient::setNetworkSegmentedWrite() takes as parameters (a segment length
// and a write delay in milliseconds, going by the field names).
struct lwmqtt_arduino_network_t {
  Client *client;
  size_t segmentLength;
  uint32_t writeDelayMs;
};
class MQTTClient;
typedef void (*MQTTClientCallbackSimple)(String &topic, String &payload);
typedef void (*MQTTClientCallbackAdvanced)(MQTTClient *client, char topic[], char bytes[], int length);
#if MQTT_HAS_FUNCTIONAL
typedef std::function<void(String &topic, String &payload)> MQTTClientCallbackSimpleFunction;
typedef std::function<void(MQTTClient *client, char topic[], char bytes[], int length)>
MQTTClientCallbackAdvancedFunction;
#endif
typedef struct {
MQTTClient *client = nullptr;
MQTTClientCallbackSimple simple = nullptr;
MQTTClientCallbackAdvanced advanced = nullptr;
#if MQTT_HAS_FUNCTIONAL
MQTTClientCallbackSimpleFunction functionSimple = nullptr;
MQTTClientCallbackAdvancedFunction functionAdvanced = nullptr;
#endif
} MQTTClientCallback;
/**
 * MQTT client for Arduino-style `Client` transports, built around an
 * lwmqtt_client_t instance.
 *
 * This header declares the full interface. Only the thin convenience overloads
 * are defined inline; each forwards to the most general overload of the same
 * name, which is defined outside this header.
 *
 * NOTE(review): the class holds raw buffer pointers and declares a destructor
 * but does not declare copy/move operations. If the destructor releases those
 * buffers (its body is not visible here — confirm), copying an MQTTClient would
 * be unsafe and the copy operations should be deleted.
 */
class MQTTClient {
 private:
  // Sizes of, and pointers to, the read and write buffers.
  size_t readBufSize = 0;
  size_t writeBufSize = 0;
  uint8_t *readBuf = nullptr;
  uint8_t *writeBuf = nullptr;

  // Connection options and their defaults (see setOptions()). Units are not
  // visible in this header; presumably seconds for keepAlive and milliseconds
  // for timeout — confirm against the implementation.
  uint16_t keepAlive = 10;
  bool cleanSession = true;
  uint32_t timeout = 1000;
  bool _sessionPresent = false;  // returned by sessionPresent()

  // Transport and broker endpoint (hostname or IP address, plus port).
  Client *netClient = nullptr;
  const char *hostname = nullptr;
  IPAddress address;
  int port = 0;

  lwmqtt_will_t *will = nullptr;  // will configuration, unset by default
  MQTTClientCallback callback;    // registered message callbacks

  // lwmqtt-related state: network record, two timers and the lwmqtt client.
  lwmqtt_arduino_network_t network = {nullptr};
  lwmqtt_arduino_timer_t timer1 = {0, 0, nullptr};
  lwmqtt_arduino_timer_t timer2 = {0, 0, nullptr};
  lwmqtt_client_t client = lwmqtt_client_t();

  // Status values; the underscore-prefixed ones are exposed through the
  // accessors at the end of the public section.
  bool _connected = false;
  uint16_t nextDupPacketID = 0;
  lwmqtt_return_code_t _returnCode = (lwmqtt_return_code_t)0;
  lwmqtt_err_t _lastError = (lwmqtt_err_t)0;
  uint32_t _droppedMessages = 0;

  // Write tuning defaults (see setNetworkSegmentedWrite()).
  size_t _segmentLength = 65535;
  uint32_t _writeDelayMs = 0;

 public:
  // Opaque pointer slot, nullptr by default; not interpreted anywhere in this header.
  void *ref = nullptr;

  /// Constructs a client using `bufSize` for both the read and the write buffer size.
  explicit MQTTClient(int bufSize = 128) : MQTTClient(bufSize, bufSize) {}
  /// Constructs a client with separate read and write buffer sizes.
  MQTTClient(int readBufSize, int writeBufSize);

  ~MQTTClient();

  /// Starts the client on the given network client. The host overloads below
  /// call this and then setHost(); the port defaults to 1883 when omitted.
  void begin(Client &_client);
  void begin(const char _hostname[], Client &_client) { this->begin(_hostname, 1883, _client); }
  void begin(const char _hostname[], int _port, Client &_client) {
    this->begin(_client);
    this->setHost(_hostname, _port);
  }
  void begin(IPAddress _address, Client &_client) { this->begin(_address, 1883, _client); }
  void begin(IPAddress _address, int _port, Client &_client) {
    this->begin(_client);
    this->setHost(_address, _port);
  }

  /// Registers a message callback (function-pointer flavours).
  void onMessage(MQTTClientCallbackSimple cb);
  void onMessageAdvanced(MQTTClientCallbackAdvanced cb);
#if MQTT_HAS_FUNCTIONAL
  /// Registers a message callback (std::function flavours, only with <functional>).
  void onMessage(MQTTClientCallbackSimpleFunction cb);
  void onMessageAdvanced(MQTTClientCallbackAdvancedFunction cb);
#endif

  /// Sets the clock function used by the client.
  void setClockSource(MQTTClientClockSource cb);

  /// Sets the broker host by name or IP address; the port defaults to 1883.
  void setHost(const char _hostname[]) { this->setHost(_hostname, 1883); }
  void setHost(const char hostname[], int port);
  void setHost(IPAddress _address) { this->setHost(_address, 1883); }
  void setHost(IPAddress _address, int port);

  /// Sets the will. Omitted arguments default to an empty payload,
  /// retained = false and qos = 0.
  void setWill(const char topic[]) { this->setWill(topic, ""); }
  void setWill(const char topic[], const char payload[]) { this->setWill(topic, payload, false, 0); }
  void setWill(const char topic[], const char payload[], bool retained, int qos);
  void clearWill();

  void setKeepAlive(int keepAlive);
  void setCleanSession(bool cleanSession);
  void setTimeout(int timeout);
  /// Convenience wrapper: applies keep-alive, clean-session and timeout in that order.
  void setOptions(int _keepAlive, bool _cleanSession, int _timeout) {
    this->setKeepAlive(_keepAlive);
    this->setCleanSession(_cleanSession);
    this->setTimeout(_timeout);
  }

  void setNetworkSegmentedWrite(size_t segmentLength, uint32_t writeDelayMs);

  void dropOverflow(bool enabled);
  /// Returns the dropped-message counter.
  uint32_t droppedMessages() const { return this->_droppedMessages; }

  /// Connects with the given client id. Omitted username/password are passed
  /// as nullptr; `skip` is forwarded unchanged (its meaning is defined by the
  /// out-of-line overload).
  bool connect(const char clientId[], bool skip = false) { return this->connect(clientId, nullptr, nullptr, skip); }
  bool connect(const char clientId[], const char username[], bool skip = false) {
    return this->connect(clientId, username, nullptr, skip);
  }
  bool connect(const char clientID[], const char username[], const char password[], bool skip = false);

  /// Publishes a message. All overloads end up in the five-argument overload;
  /// omitted arguments default to an empty payload, retained = false, qos = 0.
  bool publish(const String &topic) { return this->publish(topic.c_str(), ""); }
  bool publish(const char topic[]) { return this->publish(topic, ""); }
  bool publish(const String &topic, const String &payload) { return this->publish(topic.c_str(), payload.c_str()); }
  bool publish(const String &topic, const String &payload, bool retained, int qos) {
    return this->publish(topic.c_str(), payload.c_str(), retained, qos);
  }
  bool publish(const char topic[], const String &payload) { return this->publish(topic, payload.c_str()); }
  bool publish(const char topic[], const String &payload, bool retained, int qos) {
    return this->publish(topic, payload.c_str(), retained, qos);
  }
  bool publish(const char topic[], const char payload[]) {
    // strlen() on a null pointer is undefined; publish a null payload as empty.
    if (payload == nullptr) payload = "";
    return this->publish(topic, payload, static_cast<int>(strlen(payload)));
  }
  bool publish(const char topic[], const char payload[], bool retained, int qos) {
    // strlen() on a null pointer is undefined; publish a null payload as empty.
    if (payload == nullptr) payload = "";
    return this->publish(topic, payload, static_cast<int>(strlen(payload)), retained, qos);
  }
  bool publish(const char topic[], const char payload[], int length) {
    return this->publish(topic, payload, length, false, 0);
  }
  bool publish(const char topic[], const char payload[], int length, bool retained, int qos);

  uint16_t lastPacketID();
  void prepareDuplicate(uint16_t packetID);

  /// Subscribes to a topic; qos defaults to 0 when omitted.
  bool subscribe(const String &topic) { return this->subscribe(topic.c_str()); }
  bool subscribe(const String &topic, int qos) { return this->subscribe(topic.c_str(), qos); }
  bool subscribe(const char topic[]) { return this->subscribe(topic, 0); }
  bool subscribe(const char topic[], int qos);

  /// Unsubscribes from a topic.
  bool unsubscribe(const String &topic) { return this->unsubscribe(topic.c_str()); }
  bool unsubscribe(const char topic[]);

  bool loop();
  bool connected();
  /// Read-only accessors for the stored session-present flag, last error and return code.
  bool sessionPresent() const { return this->_sessionPresent; }
  lwmqtt_err_t lastError() const { return this->_lastError; }
  lwmqtt_return_code_t returnCode() const { return this->_returnCode; }

  bool disconnect();

 private:
  void close();
};
#endif