forked from delphiki/AirStatus
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
162 lines (131 loc) · 4.98 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import os
from bleak import discover
from bleak.exc import BleakDBusError
from asyncio import new_event_loop, set_event_loop, get_event_loop
from time import sleep, time_ns
from binascii import hexlify
from json import dumps
from sys import argv
from datetime import datetime
# Configure update duration (update after n seconds)
UPDATE_DURATION = 1
MIN_RSSI = -60
AIRPODS_MANUFACTURER = 76
AIRPODS_DATA_LENGTH = 54
RECENT_BEACONS_MAX_T_NS = 10000000000 # 10 Seconds
recent_beacons = []
# maybe put this in a class...
MODEL_DICT = { 'e': "AirPodsPro",
'f': "AirPods2",
'2': "AirPods1",
'a': "AirPodsMax",
'b': "PowerbeatsPro",
'5': "BeatsX",
'0': "BeatsFlex",
'6': "BeatsSolo3",
'9': "BeatsStudio3",
'3': "Powerbeats3",
'*': "unknown ({})"
}
def identify_model(_model):
return MODEL_DICT.setdefault(_model, MODEL_DICT['*'].format(_model))
def get_best_result(device):
recent_beacons.append({
"time": time_ns(),
"device": device
})
strongest_beacon = None
i = 0
while i < len(recent_beacons):
if(time_ns() - recent_beacons[i]["time"] > RECENT_BEACONS_MAX_T_NS):
recent_beacons.pop(i)
continue
if (strongest_beacon == None or strongest_beacon.rssi < recent_beacons[i]["device"].rssi):
strongest_beacon = recent_beacons[i]["device"]
i += 1
if (strongest_beacon != None and strongest_beacon.address == device.address):
strongest_beacon = device
return strongest_beacon
# Getting data with hex format
async def get_device():
# Scanning for devices
devices = await discover()
for d in devices:
# Checking for AirPods
d = get_best_result(d)
if d.rssi >= MIN_RSSI and AIRPODS_MANUFACTURER in d.metadata['manufacturer_data']:
data_hex = hexlify(bytearray(d.metadata['manufacturer_data'][AIRPODS_MANUFACTURER]))
if len(data_hex) == AIRPODS_DATA_LENGTH:
return data_hex
return False
# Same as get_device() but it's standalone method instead of async
def get_data_hex():
new_loop = new_event_loop()
set_event_loop(new_loop)
loop = get_event_loop()
a = loop.run_until_complete(get_device())
loop.close()
return a
# Getting data from hex string and converting it to dict(json)
def get_data():
raw = get_data_hex()
# Return blank data if airpods not found
if not raw:
return dict(status=0, model="AirPods not found")
flip: bool = is_flipped(raw)
# On 7th position we can get AirPods model, gen1, gen2, Pro or Max
model = identify_model(chr(raw[7]))
# Checking left AirPod for availability and storing charge in variable
status_tmp = int("" + chr(raw[12 if flip else 13]), 16)
left_status = (100 if status_tmp == 10 else (status_tmp * 10 + 5 if status_tmp <= 10 else -1))
# Checking right AirPod for availability and storing charge in variable
status_tmp = int("" + chr(raw[13 if flip else 12]), 16)
right_status = (100 if status_tmp == 10 else (status_tmp * 10 + 5 if status_tmp <= 10 else -1))
# Checking AirPods case for availability and storing charge in variable
status_tmp = int("" + chr(raw[15]), 16)
case_status = (100 if status_tmp == 10 else (status_tmp * 10 + 5 if status_tmp <= 10 else -1))
# On 14th position we can get charge status of AirPods
charging_status = int("" + chr(raw[14]), 16)
charging_left:bool = (charging_status & (0b00000010 if flip else 0b00000001)) != 0
charging_right:bool = (charging_status & (0b00000001 if flip else 0b00000010)) != 0
charging_case:bool = (charging_status & 0b00000100) != 0
# Return result info in dict format
return dict(
status=1,
charge=dict(
left=left_status,
right=right_status,
case=case_status
),
charging_left=charging_left,
charging_right=charging_right,
charging_case=charging_case,
model=model,
date=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
raw=raw.decode("utf-8")
)
# Return if left and right is flipped in the data
def is_flipped(raw):
return (int("" + chr(raw[10]), 16) & 0x02) == 0
def run():
output_file = argv[-1]
while True:
try:
data = get_data()
if data["status"] == 1:
json_data = dumps(data)
if len(argv) > 1:
f = open(output_file, "a")
f.write(json_data+"\n")
f.close()
else:
print(json_data)
status = open("/run/user/{}/airpods_status.json".format(os.getuid()), "w")
status.write(json_data)
status.close()
except BleakDBusError:
# do nothing
sleep(10)
sleep(UPDATE_DURATION)
if __name__ == '__main__':
run()