5
5
6
6
import collections
7
7
import json
8
+ import threading
8
9
import requests
9
10
import urllib .parse
10
11
15
16
from .device import Device , DeviceProperties
16
17
from .exception import KarcherHomeAccessDenied , KarcherHomeException , handle_error_code
17
18
from .map import Map
18
- from .mqtt import MqttClient , get_device_topics
19
+ from .mqtt import MqttClient , get_device_topic_property_get_reply , get_device_topics
19
20
from .utils import decrypt , decrypt_map , encrypt , get_nonce , get_random_string , \
20
21
get_timestamp , get_timestamp_ms , is_email , md5
21
22
@@ -33,6 +34,7 @@ def __init__(self, region: Region = Region.EU):
33
34
self ._session = None
34
35
self ._mqtt = None
35
36
self ._device_props = {}
37
+ self ._wait_events = {}
36
38
37
39
d = self .get_urls ()
38
40
# Update base URLs
@@ -126,7 +128,7 @@ def _process_response(self, resp, prop=None):
126
128
return json .loads (decrypt (result [prop ]))
127
129
return result
128
130
129
- def _mqtt_connect (self ):
131
+ def _mqtt_connect (self , wait_for_connect = False ):
130
132
if self ._session is None \
131
133
or self ._session .mqtt_token == '' or self ._session .user_id == '' :
132
134
raise KarcherHomeAccessDenied ('Not authorized' )
@@ -141,9 +143,20 @@ def _mqtt_connect(self):
141
143
port = u .port ,
142
144
username = self ._session .user_id ,
143
145
password = self ._session .mqtt_token )
146
+
147
+ # Special logic for waiting for connection
148
+ event = None
149
+ if wait_for_connect :
150
+ event = threading .Event ()
151
+ self ._mqtt .on_connect = lambda : event .set ()
152
+
144
153
self ._mqtt .connect ()
145
154
self ._mqtt .on_message = self ._process_mqtt_message
146
155
156
+ if wait_for_connect :
157
+ event .wait ()
158
+ self ._mqtt .on_connect = None
159
+
147
160
def get_urls (self ):
148
161
"""Get URLs for API and MQTT."""
149
162
@@ -293,11 +306,15 @@ def _process_mqtt_message(self, topic, msg):
293
306
294
307
if sn is None :
295
308
# Ignore messages for devices we have not subscribed to
309
+ if topic in self ._wait_events :
310
+ self ._wait_events [topic ].set ()
296
311
return
297
312
298
313
if 'thing/event/property/post' in topic \
299
314
or 'thing/event/cur_path/post' in topic \
300
315
or 'upgrade/post' in topic :
316
+ if topic in self ._wait_events :
317
+ self ._wait_events [topic ].set ()
301
318
return
302
319
303
320
if 'thing/service/property/get_reply' in topic :
@@ -306,19 +323,38 @@ def _process_mqtt_message(self, topic, msg):
306
323
# TODO: handle error
307
324
return
308
325
self ._update_device_properties (sn , data ['data' ])
326
+ if topic in self ._wait_events :
327
+ self ._wait_events [topic ].set ()
328
+ return
329
+
330
+ if topic in self ._wait_events :
331
+ self ._wait_events [topic ].set ()
332
+
333
+ def _wait_for_topic (self , topic : str , timeout : float = 5 ):
334
+ if self ._mqtt is None :
335
+ return
336
+
337
+ if topic in self ._wait_events :
309
338
return
310
339
340
+ event = threading .Event ()
341
+ self ._wait_events [topic ] = event
342
+
343
+ event .wait (timeout )
344
+ del self ._wait_events [topic ]
345
+
311
346
def _update_device_properties (self , sn : str , data : dict ):
312
347
if sn not in self ._device_props :
313
348
return
314
349
315
350
self ._device_props [sn ].update (data )
351
+ self ._device_props [sn ].last_update_time = get_timestamp ()
316
352
317
353
def request_device_update (self , dev : Device ):
318
354
"""Request device update."""
319
355
320
356
if self ._session is None \
321
- or self ._session .auth_token == '' or self ._session .user_id == '' :
357
+ or self ._session .mqtt_token == '' or self ._session .user_id == '' :
322
358
raise KarcherHomeAccessDenied ('Not authorized' )
323
359
324
360
self ._mqtt_connect ()
@@ -337,7 +373,24 @@ def request_device_update(self, dev: Device):
337
373
def get_device_properties (self , dev : Device ):
338
374
"""Get device properties if it has subscription."""
339
375
340
- if dev .sn not in self ._device_props :
341
- return None
376
+ if dev .sn in self ._device_props :
377
+ return self ._device_props [dev .sn ]
378
+
379
+ if self ._session is None \
380
+ or self ._session .mqtt_token == '' or self ._session .user_id == '' :
381
+ raise KarcherHomeAccessDenied ('Not authorized' )
382
+
383
+ self ._mqtt_connect (wait_for_connect = True )
384
+ subscr = dev .sn not in self ._device_props
385
+ if subscr :
386
+ self .subscribe_device (dev )
387
+ self .request_device_update (dev )
388
+ self ._wait_for_topic (
389
+ get_device_topic_property_get_reply (dev .product_id , dev .sn ))
390
+
391
+ props = self ._device_props [dev .sn ]
392
+
393
+ if subscr :
394
+ self .unsubscribe_device (dev )
342
395
343
- return self . _device_props [ dev . sn ]
396
+ return props
0 commit comments