-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathclient.py
164 lines (133 loc) · 4.29 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import asyncio
import aiohttp
from hbmqtt.client import MQTTClient, ClientException
from hbmqtt.mqtt.constants import QOS_1 as MQTT_QOS_1
import json
import logging
from aiohttp import web
import async_timeout
# import sys
# argv = sys.argv
# if len(argv) < 2:
# sys.exit()
# --- Deployment settings (hard-coded) ---
# NOTE(review): the broker credentials, the message key and the API header
# value are embedded in source; they look like placeholders -- confirm they
# are replaced (or loaded from configuration) before deployment.
# Broker host and path, without scheme or credentials; mqtt_init assembles
# the full connection URI from the four MQTT_* values below.
MQTT_SERVER = 'ha.ifts.ml/mqtt'
MQTT_USER = 'ha_user'
MQTT_PASS = 'client'
# URI scheme used for the broker connection.
MQTT_PROTOCOL = 'wss'
# Base64 text of the AES key; decoded into real_key further down.
MSG_PASS = b'encrypt+key+in+base64/=='
# Default headers for the aiohttp session created in main_client.
API_HEADERS = { 'x-ha-access': 'test' }
# Suffix shared by the request ('req/...') and response ('resp/...') topics.
MQTT_TOPIC = 'test1'
# Base URL that http_process prepends to every requested path.
HTTP_PREFIX = 'http://127.0.0.1:8123/'
# Module-level logger used by the receive and dispatch loops.
logger = logging.getLogger(__name__)
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
import zlib
import base64
# Raw AES key bytes, decoded once from the base64 text in MSG_PASS and shared
# by encrypt() and decrypt().
real_key = base64.b64decode(MSG_PASS)
async def decrypt(data):
    """Authenticate, decrypt and decompress one incoming message.

    The expected layout of ``data`` is ``nonce (16 bytes) | tag (16 bytes) |
    ciphertext``. The ciphertext is decrypted with AES-GCM using ``real_key``
    and the plaintext is inflated as a raw deflate stream (``wbits=-15``).

    Args:
        data: Bytes-like message as received from the broker, or ``None``.

    Returns:
        The decompressed plaintext bytes, or ``None`` when the message is
        missing, too short, fails verification, or does not decompress.
    """
    # 16-byte nonce + 16-byte tag must leave at least two bytes of ciphertext.
    if data is None or len(data) < 34:
        return None
    nonce, tag, ciphertext = data[:16], data[16:32], data[32:]
    cipher = AES.new(real_key, AES.MODE_GCM, nonce=nonce)
    try:
        compressed = cipher.decrypt_and_verify(ciphertext, tag)
    except ValueError:
        # Verification failed: treat the message as not addressed to us.
        return None
    inflater = zlib.decompressobj(wbits=-15)
    try:
        return inflater.decompress(compressed) + inflater.flush()
    except zlib.error as exc:
        # Authenticated but not valid deflate data: report it and drop it the
        # same way as every other undecodable message instead of raising.
        logger.warning('Dropping message with invalid compressed payload: %s', exc)
        return None
async def encrypt(data):
    """Compress and encrypt ``data`` for publication.

    The payload is deflated as a raw stream (``wbits=-15``) and then sealed
    with AES-GCM under ``real_key`` using a library-generated nonce.

    Args:
        data: Plaintext bytes to protect.

    Returns:
        ``nonce + tag + ciphertext`` as one bytes value, which is the layout
        decrypt() slices apart (it assumes a 16-byte nonce and 16-byte tag).
    """
    deflater = zlib.compressobj(wbits=-15)
    packed = deflater.compress(data) + deflater.flush()

    cipher = AES.new(real_key, AES.MODE_GCM)
    ciphertext, tag = cipher.encrypt_and_digest(packed)

    return cipher.nonce + tag + ciphertext
# Topic subscribed to for incoming requests (passed to mqtt_init by main_client).
MQTT_TOPIC_REQ = 'req/' + MQTT_TOPIC
# Topic on which proc_req publishes the encrypted responses.
MQTT_TOPIC_RESP = 'resp/' + MQTT_TOPIC
# Process-wide shared state; main_client stores the aiohttp session under 'session'.
_G = {}
# Fix bug in HBMQTT when using websocket
async def _hbmqtt_hook_conn(self, *args):
if hasattr(self.session, 'broker_uri_old'):
self.session.broker_uri = self.session.broker_uri_old
else:
self.session.broker_uri_old = self.session.broker_uri
return await self._old_connect_coro(*args)
# Install the hook. Order matters: the original coroutine must be saved under
# _old_connect_coro before _connect_coro is replaced, because the hook
# delegates to _old_connect_coro.
MQTTClient._old_connect_coro = MQTTClient._connect_coro
MQTTClient._connect_coro = _hbmqtt_hook_conn
# Options handed to MQTTClient(config=...) in mqtt_init.
# NOTE(review): keep_alive is presumably in seconds -- confirm against hbmqtt.
mqtt_cfg = {
    'default_qos': 1,
    'keep_alive': 45,
    # Reconnection is driven explicitly by mqtt_checkreconnect instead.
    'auto_reconnect': False,
}

# Shared MQTT client; stays None until mqtt_init creates it.
mqtt = None

# (topic, qos) pairs subscribed so far; re-sent to the broker after a reconnect.
mqtttopics = []
async def mqtt_init(topic):
    """Connect the shared MQTT client if needed and subscribe to ``topic``.

    The first call creates the module-level ``mqtt`` client and connects it
    using the MQTT_* settings. Every call records ``topic`` (at QoS 1) in
    ``mqtttopics`` and then subscribes with the complete list.

    Args:
        topic: Topic name to add to the subscriptions.
    """
    global mqtt
    if not mqtt:
        mqtt = MQTTClient(config=mqtt_cfg)
        # scheme://user:password@host/path
        broker_uri = '{}://{}:{}@{}'.format(
            MQTT_PROTOCOL, MQTT_USER, MQTT_PASS, MQTT_SERVER
        )
        await mqtt.connect(broker_uri)

    subscription = (topic, MQTT_QOS_1)
    mqtttopics.append(subscription)
    await mqtt.subscribe(mqtttopics)
async def mqtt_checkreconnect():
    """Re-establish the broker connection if the session reports it is down.

    When the shared client is not connected, it is reconnected and every
    topic recorded in ``mqtttopics`` is subscribed again. Does nothing while
    the session is connected.
    """
    if mqtt.session.transitions.is_connected():
        return
    await mqtt.reconnect()
    # Restore the subscriptions recorded by mqtt_init.
    await mqtt.subscribe(mqtttopics)
async def mqtt_publish(topic, data):
    """Publish ``data`` to ``topic`` at QoS 1, reconnecting first if needed.

    Args:
        topic: Destination topic name.
        data: Payload bytes to publish.
    """
    # Make sure the session is up before attempting to send.
    await mqtt_checkreconnect()
    await mqtt.publish(topic, data, MQTT_QOS_1)
async def mqtt_receive():
    """Wait for and return the next message delivered by the broker.

    The connection is checked (and restored if necessary) before waiting.

    Returns:
        The message object produced by ``mqtt.deliver_message()``.
    """
    await mqtt_checkreconnect()
    message = await mqtt.deliver_message()
    return message
async def http_process(method, url, headers, body):
    """Forward one request to the local HTTP endpoint and return its reply.

    Args:
        method: HTTP verb; upper-cased before use.
        url: Path appended to ``HTTP_PREFIX``.
        headers: Per-request headers passed to the shared aiohttp session.
        body: Request body passed as ``data``.

    Returns:
        A ``(status, text)`` tuple from the response, or
        ``(503, 'HA not accessable')`` if the request or reading the body
        raises any exception.
    """
    verb = method.upper()
    target = HTTP_PREFIX + url
    http = _G['session']
    try:
        async with http.request(verb, target, headers=headers, data=body) as response:
            text = await response.text()
    except Exception:
        # Any failure talking to the endpoint is reported as "unavailable".
        return 503, 'HA not accessable'
    else:
        return response.status, text
async def proc_req(reqdata):
    """Execute one decrypted request and publish the encrypted response.

    ``reqdata`` is expected to be a UTF-8 JSON object with the keys
    ``method``, ``url``, ``headers``, ``body`` and ``callid``. The HTTP call
    is made through http_process and the result is published to
    ``MQTT_TOPIC_RESP`` as ``{'callid', 'code', 'result'}``.

    Requests that cannot be decoded, are not JSON objects, or lack a string
    ``method``/``url`` are logged and dropped: this coroutine runs as a
    fire-and-forget task, so raising here would only surface as an unhandled
    task exception and no response could be produced anyway.

    Args:
        reqdata: Decrypted request bytes; empty or ``None`` is ignored.
    """
    if not reqdata:
        return

    try:
        request = json.loads(reqdata.decode())
    except ValueError as exc:
        # Both UnicodeDecodeError and json.JSONDecodeError derive from ValueError.
        logger.warning('Dropping undecodable request: %s', exc)
        return
    if not isinstance(request, dict):
        logger.warning('Dropping request that is not a JSON object')
        return

    method = request.get('method')
    url = request.get('url')
    if not isinstance(method, str) or not isinstance(url, str):
        # http_process calls method.upper() and concatenates url; both must be str.
        logger.warning(
            'Dropping request without a valid method/url (callid=%r)',
            request.get('callid'),
        )
        return

    code, resp = await http_process(
        method,
        url,
        request.get('headers'),
        request.get('body'),
    )
    reply = {'callid': request.get('callid'), 'code': code, 'result': resp}
    data = await encrypt(json.dumps(reply).encode())
    await mqtt_publish(MQTT_TOPIC_RESP, data)
# Decrypted request payloads: filled by queue_loop(), drained by main_client().
q_req = asyncio.Queue()
async def queue_loop():
    """Receive broker messages forever and queue the valid requests.

    Messages whose topic starts with ``'req'`` are decrypted; payloads that
    decrypt successfully are put on ``q_req``, the rest are skipped. Messages
    on any other topic produce a warning. Any exception raised while handling
    a message is logged and the loop continues.
    """
    while True:
        try:
            message = await mqtt_receive()
            packet = message.publish_packet
            topic_name = packet.variable_header.topic_name

            if not topic_name.startswith('req'):
                logger.warning('Topic not recongize: ' + topic_name)
                continue

            payload = await decrypt(packet.payload.data)
            if payload is not None:
                await q_req.put(payload)
        except Exception as e:
            logger.error(e)
async def main_client():
    """Run the bridge: connect, start the receiver, dispatch requests forever.

    Creates the shared aiohttp session (stored in ``_G['session']``),
    subscribes to ``MQTT_TOPIC_REQ``, starts queue_loop in the background and
    then spawns one proc_req task per request taken from ``q_req``.

    Background tasks are kept in a local set until they finish: the event
    loop itself only holds weak references to tasks, so an unreferenced task
    may be garbage-collected before it completes. A task that ends with an
    exception is logged instead of being left unobserved.
    """
    _G['session'] = aiohttp.ClientSession(headers=API_HEADERS)
    await mqtt_init(MQTT_TOPIC_REQ)

    background = set()

    def _reap(task):
        # Drop the finished task and report its failure, if any.
        background.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.error('Background task failed: %r', error)

    def _spawn(coro):
        # Schedule coro while holding a strong reference until it completes.
        task = asyncio.ensure_future(coro)
        background.add(task)
        task.add_done_callback(_reap)
        return task

    _spawn(queue_loop())
    while True:
        try:
            req = await q_req.get()
            _spawn(proc_req(req))
        except asyncio.CancelledError:
            # On Python < 3.8 CancelledError is an Exception subclass; let it
            # propagate so the loop can actually be cancelled.
            raise
        except Exception as e:
            logger.error(e)
# Script entry: schedule main_client on the event loop and run it until the
# loop is stopped.
loop = asyncio.get_event_loop()
asyncio.ensure_future(main_client())
loop.run_forever()
# Only reached once run_forever() returns, i.e. after the loop has been stopped.
loop.close()