12
12
13
13
#include "paho_mqtt.h"
14
14
15
- /**
16
- * MQTT URI farmat:
17
- * domain mode
18
- * tcp://iot.eclipse.org:1883
19
- *
20
- * ipv4 mode
21
- * tcp://192.168.10.1:1883
22
- * ssl://192.168.10.1:1884
23
- *
24
- * ipv6 mode
25
- * tcp://[fe80::20c:29ff:fe9a:a07e]:1883
26
- * ssl://[fe80::20c:29ff:fe9a:a07e]:1884
27
- */
28
- #define MQTT_URI "tcp://iot.eclipse.org:1883"
29
- #define MQTT_USERNAME "admin"
30
- #define MQTT_PASSWORD "admin"
31
- #define MQTT_SUBTOPIC "/mqtt/test"
32
- #define MQTT_PUBTOPIC "/mqtt/test"
15
+ #define MQTT_URI PKG_USING_IOTSHARP_DEVICE_SERVER
16
+ #define MQTT_USERNAME PKG_USING_IOTSHARP_DEVICE_NAME
17
+ #define MQTT_PASSWORD PKG_USING_IOTSHARP_DEVICE_SECRET
18
+
33
19
#define MQTT_WILLMSG "Goodbye!"
34
20
35
21
/* define MQTT client context */
36
22
static MQTTClient client ;
37
23
static int is_started = 0 ;
38
24
39
- static void mqtt_sub_callback (MQTTClient * c , MessageData * msg_data )
25
+ void mqtt_sub_callback (MQTTClient * c , MessageData * msg_data )
40
26
{
41
27
* ((char * )msg_data -> message -> payload + msg_data -> message -> payloadlen ) = '\0' ;
42
28
LOG_D ("mqtt sub callback: %.*s %.*s" ,
@@ -46,7 +32,8 @@ static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data)
46
32
(char * )msg_data -> message -> payload );
47
33
}
48
34
49
- static void mqtt_sub_default_callback (MQTTClient * c , MessageData * msg_data )
35
+
36
+ void mqtt_sub_default_callback (MQTTClient * c , MessageData * msg_data )
50
37
{
51
38
* ((char * )msg_data -> message -> payload + msg_data -> message -> payloadlen ) = '\0' ;
52
39
LOG_D ("mqtt sub default callback: %.*s %.*s" ,
@@ -59,29 +46,47 @@ static void mqtt_sub_default_callback(MQTTClient *c, MessageData *msg_data)
59
46
static void mqtt_connect_callback (MQTTClient * c )
60
47
{
61
48
LOG_D ("inter mqtt_connect_callback!" );
49
+
50
+ }
51
+ static void mqtt_new_sub_callback (MQTTClient * client , MessageData * msg_data )
52
+ {
53
+ * ((char * )msg_data -> message -> payload + msg_data -> message -> payloadlen ) = '\0' ;
54
+ LOG_D ("mqtt new subscribe callback: %.*s %.*s" ,
55
+ msg_data -> topicName -> lenstring .len ,
56
+ msg_data -> topicName -> lenstring .data ,
57
+ msg_data -> message -> payloadlen ,
58
+ (char * )msg_data -> message -> payload );
62
59
}
63
60
64
61
static void mqtt_online_callback (MQTTClient * c )
65
62
{
66
63
LOG_D ("inter mqtt_online_callback!" );
64
+ char _rpc_topic [200 ] = { 0 };
65
+ char _attup_topic [200 ] = { 0 };
66
+ sprintf (_rpc_topic , "devices/" PKG_USING_IOTSHARP_DEVICE_NAME "/rpc/request/+/+" );
67
+ sprintf (_attup_topic , "devices/" PKG_USING_IOTSHARP_DEVICE_NAME "/attributes/update/" );
68
+ paho_mqtt_subscribe (& client , QOS1 , _rpc_topic , mqtt_new_sub_callback );
69
+ paho_mqtt_subscribe (& client , QOS1 , _rpc_topic , mqtt_new_sub_callback );
70
+
67
71
}
68
72
69
73
static void mqtt_offline_callback (MQTTClient * c )
70
74
{
71
75
LOG_D ("inter mqtt_offline_callback!" );
76
+ char _rpc_topic [200 ] = { 0 };
77
+ char _attup_topic [200 ] = { 0 };
78
+ sprintf (_rpc_topic , "devices/" PKG_USING_IOTSHARP_DEVICE_NAME "/rpc/request/+/+" );
79
+ sprintf (_attup_topic , "devices/" PKG_USING_IOTSHARP_DEVICE_NAME "/attributes/update/" );
80
+ paho_mqtt_unsubscribe (& client , _rpc_topic );
81
+ paho_mqtt_unsubscribe (& client , _rpc_topic );
72
82
}
73
83
74
- static int mqtt_start ( int argc , char * * argv )
84
+ int iotsharp_start ( void )
75
85
{
76
86
/* init condata param by using MQTTPacket_connectData_initializer */
77
87
MQTTPacket_connectData condata = MQTTPacket_connectData_initializer ;
78
88
static char cid [20 ] = { 0 };
79
89
80
- if (argc != 1 )
81
- {
82
- rt_kprintf ("mqtt_start --start a mqtt worker thread.\n" );
83
- return -1 ;
84
- }
85
90
86
91
if (is_started )
87
92
{
@@ -94,7 +99,7 @@ static int mqtt_start(int argc, char **argv)
94
99
client .uri = MQTT_URI ;
95
100
96
101
/* generate the random client ID */
97
- rt_snprintf (cid , sizeof (cid ), "rtthread %d" , rt_tick_get ());
102
+ rt_snprintf (cid , sizeof (cid ), "iotsharp %d" , rt_tick_get ());
98
103
/* config connect param */
99
104
memcpy (& client .condata , & condata , sizeof (condata ));
100
105
client .condata .clientID .cstring = cid ;
@@ -107,7 +112,7 @@ static int mqtt_start(int argc, char **argv)
107
112
client .condata .willFlag = 1 ;
108
113
client .condata .will .qos = 1 ;
109
114
client .condata .will .retained = 0 ;
110
- client .condata .will .topicName .cstring = MQTT_PUBTOPIC ;
115
+ client .condata .will .topicName .cstring = "/device/me/disconnect" ;
111
116
client .condata .will .message .cstring = MQTT_WILLMSG ;
112
117
113
118
/* malloc buffer. */
@@ -126,9 +131,9 @@ static int mqtt_start(int argc, char **argv)
126
131
client .offline_callback = mqtt_offline_callback ;
127
132
128
133
/* set subscribe table and event callback */
129
- client .messageHandlers [0 ].topicFilter = rt_strdup (MQTT_SUBTOPIC );
130
- client .messageHandlers [0 ].callback = mqtt_sub_callback ;
131
- client .messageHandlers [0 ].qos = QOS1 ;
134
+ // client.messageHandlers[0].topicFilter = rt_strdup(MQTT_SUBTOPIC);
135
+ // client.messageHandlers[0].callback = mqtt_sub_callback;
136
+ // client.messageHandlers[0].qos = QOS1;
132
137
133
138
/* set default subscribe event callback */
134
139
client .defaultMessageHandler = mqtt_sub_default_callback ;
@@ -141,92 +146,62 @@ static int mqtt_start(int argc, char **argv)
141
146
return 0 ;
142
147
}
143
148
144
- static int mqtt_stop ( int argc , char * * argv )
149
+ int iotsahrp_stop ( void )
145
150
{
146
- if (argc != 1 )
147
- {
148
- rt_kprintf ("mqtt_stop --stop mqtt worker thread and free mqtt client object.\n" );
149
- }
150
-
151
151
is_started = 0 ;
152
152
153
153
return paho_mqtt_stop (& client );
154
154
}
155
155
156
- static int mqtt_publish (int argc , char * * argv )
156
+
157
+ static int mqtt_publish_for_gateway (char * subdevicename , int datatype , char * playload )
157
158
{
159
+ char _telemetry_topic [200 ] = { 0 };
160
+ char attributes_topic [200 ] = { 0 };
161
+ sprintf (_telemetry_topic , "devices/%s/telemetry" ,subdevicename );
162
+ sprintf (attributes_topic , "devices/%s/attributes" ,subdevicename );
158
163
if (is_started == 0 )
159
164
{
160
165
LOG_E ("mqtt client is not connected." );
161
166
return -1 ;
162
167
}
163
168
164
- if (argc == 2 )
169
+ if (datatype == 1 )
165
170
{
166
- paho_mqtt_publish (& client , QOS1 , MQTT_PUBTOPIC , argv [ 1 ] );
171
+ paho_mqtt_publish (& client , QOS1 , _telemetry_topic , playload );
167
172
}
168
- else if (argc == 3 )
173
+ else if (datatype == 2 )
169
174
{
170
- paho_mqtt_publish (& client , QOS1 , argv [ 1 ], argv [ 2 ] );
175
+ paho_mqtt_publish (& client , QOS1 , attributes_topic , playload );
171
176
}
172
177
else
173
178
{
174
- rt_kprintf ("mqtt_publish <topic> [message] --mqtt publish message to specified topic .\n" );
179
+ rt_kprintf ("publish message to specified datatype .\n" );
175
180
return -1 ;
176
181
}
177
-
178
182
return 0 ;
179
183
}
180
-
181
- static void mqtt_new_sub_callback (MQTTClient * client , MessageData * msg_data )
184
+ int iotsharp_upload_telemetry_for_gateway (char * _devname ,char * playload )
185
+ {
186
+ return mqtt_publish_for_gateway (_devname ,1 ,playload );
187
+ }
188
+ int iotsharp_upload_telemetry_to_device (char * playload )
189
+ {
190
+ return mqtt_publish_for_gateway ("me" ,1 ,playload );
191
+ }
192
+ int iotsharp_upload_attribute_for_gateway (char * _devname ,char * playload )
182
193
{
183
- * ((char * )msg_data -> message -> payload + msg_data -> message -> payloadlen ) = '\0' ;
184
- LOG_D ("mqtt new subscribe callback: %.*s %.*s" ,
185
- msg_data -> topicName -> lenstring .len ,
186
- msg_data -> topicName -> lenstring .data ,
187
- msg_data -> message -> payloadlen ,
188
- (char * )msg_data -> message -> payload );
194
+ return mqtt_publish_for_gateway (_devname ,2 ,playload );
189
195
}
190
-
191
- static int mqtt_subscribe (int argc , char * * argv )
196
+ int iotsharp_upload_attribute_to_device (char * playload )
192
197
{
193
- if (argc != 2 )
194
- {
195
- rt_kprintf ("mqtt_subscribe [topic] --send an mqtt subscribe packet and wait for suback before returning.\n" );
196
- return -1 ;
197
- }
198
-
199
- if (is_started == 0 )
200
- {
201
- LOG_E ("mqtt client is not connected." );
202
- return -1 ;
203
- }
204
-
205
- return paho_mqtt_subscribe (& client , QOS1 , argv [1 ], mqtt_new_sub_callback );
198
+ return mqtt_publish_for_gateway ("me" ,2 ,playload );
206
199
}
207
200
208
- static int mqtt_unsubscribe (int argc , char * * argv )
209
- {
210
- if (argc != 2 )
211
- {
212
- rt_kprintf ("mqtt_unsubscribe [topic] --send an mqtt unsubscribe packet and wait for suback before returning.\n" );
213
- return -1 ;
214
- }
215
-
216
- if (is_started == 0 )
217
- {
218
- LOG_E ("mqtt client is not connected." );
219
- return -1 ;
220
- }
221
201
222
- return paho_mqtt_unsubscribe (& client , argv [1 ]);
223
- }
224
202
225
203
#ifdef FINSH_USING_MSH
226
- MSH_CMD_EXPORT (mqtt_start , startup mqtt client );
227
- MSH_CMD_EXPORT (mqtt_stop , stop mqtt client );
228
- MSH_CMD_EXPORT (mqtt_publish , mqtt publish message to specified topic );
229
- MSH_CMD_EXPORT (mqtt_subscribe , mqtt subscribe topic );
230
- MSH_CMD_EXPORT (mqtt_unsubscribe , mqtt unsubscribe topic );
204
+
205
+
231
206
#endif /* FINSH_USING_MSH */
232
207
0 commit comments