12
12
13
13
from paho .mqtt .client import Client as PahoClient
14
14
15
- from pioreactor .config import leader_address
15
+ from pioreactor .config import mqtt_address
16
+ from pioreactor .config import mqtt_port
17
+ from pioreactor .config import config
16
18
from pioreactor .types import MQTTMessage
17
19
18
20
@@ -70,7 +72,7 @@ class QOS:
70
72
71
73
72
74
def create_client (
73
- hostname : Optional [ str ] = None ,
75
+ hostname : str = mqtt_address ,
74
76
last_will : Optional [dict ] = None ,
75
77
client_id : str = "" ,
76
78
keepalive = 60 ,
@@ -80,6 +82,7 @@ def create_client(
80
82
on_disconnect : Optional [Callable ] = None ,
81
83
on_message : Optional [Callable ] = None ,
82
84
userdata : Optional [dict ] = None ,
85
+ port : int = mqtt_port
83
86
):
84
87
"""
85
88
Create a MQTT client and connect to a host.
@@ -98,7 +101,10 @@ def default_on_connect(client: Client, userdata, flags, rc: int, properties=None
98
101
clean_session = clean_session ,
99
102
userdata = userdata ,
100
103
)
101
- client .username_pw_set ("pioreactor" , "raspberry" )
104
+ client .username_pw_set (
105
+ config .get ("mqtt" , "username" , fallback = "pioreactor" ),
106
+ config .get ("mqtt" , "password" , fallback = "raspberry" )
107
+ )
102
108
103
109
if on_connect :
104
110
client .on_connect = on_connect # type: ignore
@@ -114,12 +120,9 @@ def default_on_connect(client: Client, userdata, flags, rc: int, properties=None
114
120
if last_will is not None :
115
121
client .will_set (** last_will )
116
122
117
- if hostname is None :
118
- hostname = leader_address
119
-
120
123
for retries in range (1 , max_connection_attempts + 1 ):
121
124
try :
122
- client .connect (hostname , keepalive = keepalive )
125
+ client .connect (hostname , port , keepalive = keepalive )
123
126
except (socket .gaierror , OSError ):
124
127
if retries == max_connection_attempts :
125
128
break
@@ -131,7 +134,7 @@ def default_on_connect(client: Client, userdata, flags, rc: int, properties=None
131
134
return client
132
135
133
136
134
- def publish (topic : str , message , hostname : str = leader_address , retries : int = 10 , ** mqtt_kwargs ) -> None :
137
+ def publish (topic : str , message , hostname : str = mqtt_address , retries : int = 10 , port : int = mqtt_port , ** mqtt_kwargs ) -> None :
135
138
from paho .mqtt import publish as mqtt_publish
136
139
import socket
137
140
@@ -141,6 +144,7 @@ def publish(topic: str, message, hostname: str = leader_address, retries: int =
141
144
topic ,
142
145
payload = message ,
143
146
hostname = hostname ,
147
+ port = port ,
144
148
auth = {"username" : "pioreactor" , "password" : "raspberry" },
145
149
** mqtt_kwargs ,
146
150
)
@@ -164,11 +168,12 @@ def publish(topic: str, message, hostname: str = leader_address, retries: int =
164
168
165
169
def subscribe (
166
170
topics : str | list [str ],
167
- hostname : str = leader_address ,
171
+ hostname : str = mqtt_address ,
168
172
retries : int = 5 ,
169
173
timeout : Optional [float ] = None ,
170
174
allow_retained : bool = True ,
171
175
name : Optional [str ] = None ,
176
+ port : int = mqtt_port ,
172
177
** mqtt_kwargs ,
173
178
) -> Optional [MQTTMessage ]:
174
179
"""
@@ -222,7 +227,7 @@ def on_message(client: Client, userdata, message: MQTTMessage) -> None:
222
227
client .username_pw_set ("pioreactor" , "raspberry" )
223
228
client .on_connect = on_connect # type: ignore
224
229
client .on_message = on_message # type: ignore
225
- client .connect (hostname )
230
+ client .connect (hostname , port = port )
226
231
227
232
if timeout is None :
228
233
client .loop_forever ()
@@ -255,11 +260,12 @@ def on_message(client: Client, userdata, message: MQTTMessage) -> None:
255
260
def subscribe_and_callback (
256
261
callback : Callable [[MQTTMessage ], Any ],
257
262
topics : str | list [str ],
258
- hostname : str = leader_address ,
263
+ hostname : str = mqtt_address ,
259
264
last_will : Optional [dict ] = None ,
260
265
name : Optional [str ] = None ,
261
266
allow_retained : bool = True ,
262
267
client : Optional [Client ] = None ,
268
+ port : int = mqtt_port ,
263
269
** mqtt_kwargs ,
264
270
) -> Client :
265
271
"""
@@ -313,6 +319,7 @@ def on_connect(client: Client, userdata: dict, *args):
313
319
on_connect = on_connect ,
314
320
on_message = wrap_callback (callback ),
315
321
userdata = userdata ,
322
+ port = port ,
316
323
** mqtt_kwargs ,
317
324
)
318
325
@@ -325,7 +332,7 @@ def on_connect(client: Client, userdata: dict, *args):
325
332
return client
326
333
327
334
328
- def prune_retained_messages (topics_to_prune : str = "#" , hostname = leader_address ):
335
+ def prune_retained_messages (topics_to_prune : str = "#" , hostname = mqtt_address ):
329
336
topics = []
330
337
331
338
def on_message (message ):
0 commit comments