-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathFMF.py
302 lines (245 loc) · 7.86 KB
/
FMF.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# FriendMyFriend Server Side Tracker API
# Intended to be used in a constantly running script
# Made by Clay Shieh
# Referenced Vladimir Smirnov's iCloud API Implementation code for general iCloud authentication workflow and cookie implementation
# https://github.com/mindcollapse/iCloud-API/
import os
import uuid
import json
import time
import requests
import logging
class FMFException(Exception):
    """Error raised by the FMF client for login and service-availability problems.

    The offending value (normally a human-readable message) is kept on
    ``self.value``; ``str(exc)`` returns ``repr(self.value)``.
    """

    def __init__(self, value):
        # Forward the value to Exception so that ``args`` is populated;
        # without this, repr() is uninformative and pickling loses the message.
        super(FMFException, self).__init__(value)
        self.value = value

    def __str__(self):
        return repr(self.value)
class FMF(object):
    """Client for the iCloud "fmf" web service.

    Construction logs in immediately (see ``authenticate``).  After that,
    ``update`` polls the service and keeps two dictionaries current:

    * ``contacts``: ``"<firstName> <lastName>"`` -> contact id
    * ``fmf_map``:  contact id -> ``[formatted timestamp, address]``

    ``get_user_by_name`` / ``get_user_by_id`` look results up in those maps.
    """

    def __init__(self, aid, password, cache=True, verbose=True):
        """Store credentials, load any cached data and authenticate.

        aid      -- Apple ID used for the login request.
        password -- password for that Apple ID.
        cache    -- when true, contacts/locations are read from and written
                    to ``contacts.json`` / ``fmf.json`` next to this file.
        verbose  -- selects the logger level (see note below).

        Raises FMFException when the login fails or the fmf service is not
        active on the account.
        """
        # Logger (set up first so cache loading can report problems)
        logging.basicConfig()
        self.logger = logging.getLogger(__name__)
        # NOTE(review): verbose=True selects INFO and verbose=False selects
        # DEBUG, i.e. the "quiet" setting logs more.  Kept as-is because the
        # intent cannot be confirmed from this file.
        self.logger.setLevel(logging.DEBUG)
        if verbose:
            self.logger.setLevel(logging.INFO)

        # credentials
        self.aid = aid
        self.password = password
        self.build_id = "17DProject104"
        self.client_id = str(uuid.uuid1()).upper()
        self.dsid = None

        # connection
        self.cookies = None
        self.http = requests.Session()

        # local
        self.fmf_base_url = None
        self.contacts = None
        self.fmf_map = None
        # NOTE(review): nothing in this class ever sets first_run to False,
        # so update() runs the "init" request on every call.
        self.first_run = True

        # cached info
        self.cache = cache
        if self.cache:
            self.path = os.path.dirname(os.path.abspath(__file__))
            self.cpath = os.path.join(self.path, "contacts.json")
            self.fpath = os.path.join(self.path, "fmf.json")
            self.contacts = self._load_cache(self.cpath)
            self.fmf_map = self._load_cache(self.fpath)

        self.authenticate()

    def _load_cache(self, fname):
        """Return the JSON content of ``fname``, or None if absent/unreadable."""
        if not os.path.isfile(fname):
            return None
        try:
            return self.persistant_read(fname)
        except (IOError, ValueError) as e:
            # A truncated or corrupt cache must not prevent start-up.
            self.logger.debug(
                "Ignoring unreadable cache file {0}: {1}".format(fname, e))
            return None

    def persistant_write(self, fname, data):
        """Serialise ``data`` (a dictionary) as JSON into ``fname``."""
        with open(fname, 'w') as f:
            json.dump(data, f)

    def persistant_read(self, fname):
        """Return the object decoded from the JSON file ``fname``."""
        with open(fname, 'r') as f:
            return json.load(f)

    def update_cookies(self, r):
        """Remember the cookies of response ``r`` for subsequent requests."""
        self.cookies = r.cookies

    def request(self, url, method="GET", headers=None, body=None, wait_time=10):
        """Send a request, retrying until a truthy response is obtained.

        url       -- target URL.
        method    -- "GET" or "POST"; anything else raises ValueError.
        headers   -- optional header dictionary.
        body      -- optional object sent as the JSON body.
        wait_time -- seconds to sleep after a failed send before retrying.

        Returns the response, or None once the retry budget is exhausted.

        Two failure modes are handled differently:

        * The send itself raises (I/O level failure): sleep ``wait_time`` and
          try again, without limit.
        * A response arrives but is falsy (for ``requests`` responses that
          means a non-OK status): back off exponentially.  The delay doubles
          until it passes ``max_backoff``; from then on ``max_tries`` further
          attempts are made before giving up.

        NOTE(review): because every falsy response is retried, a rejected
        login is also retried with back-off rather than reported right away.
        """
        senders = {
            "POST": self.http.post,
            "GET": self.http.get
        }
        # Validate up front: previously an unknown method raised KeyError
        # inside the retry loop, was swallowed, and retried forever.
        if method not in senders:
            raise ValueError("Unsupported HTTP method: {0}".format(method))
        send = senders[method]

        max_tries = 3
        max_backoff = 16384
        backoff = 0.0625
        capped_tries = 0

        while True:
            if capped_tries > max_tries:
                self.logger.info("Max tries reached")
                return None

            try:
                r = send(url, headers=headers, json=body, cookies=self.cookies)
            except IOError as e:
                # requests' RequestException derives from IOError, so this
                # covers connection errors and timeouts while letting
                # programming errors propagate.
                self.logger.debug("Error in request")
                self.logger.debug("Error: " + str(e))
                time.sleep(wait_time)
                continue

            if r:
                # Success: return immediately instead of sleeping first.
                return r

            # exponential back off
            if backoff <= max_backoff:
                backoff *= 2
                capped_tries = 0
            capped_tries += 1
            time.sleep(backoff)

    def get_service_url(self, resp, service):
        """Store the base URL of ``service`` from the login response.

        resp    -- decoded login response (a dictionary).
        service -- key to look up under ``resp["webservices"]``.

        Sets ``self.fmf_base_url``.  Raises FMFException when the service
        entry is missing, is not marked "active", or carries no URL.
        """
        webservices = (resp.get("webservices") if resp else None) or {}
        entry = webservices.get(service)
        if entry and entry.get("status") == "active" and "url" in entry:
            self.logger.info("FMF service enabled on account")
            self.fmf_base_url = entry["url"]
            return
        raise FMFException("Please check that FMF is enabled on your iCloud account.")

    def get_dsid(self, resp):
        """Store ``resp["dsInfo"]["dsid"]`` on ``self.dsid``.

        Raises FMFException when the response is empty or carries no dsid,
        e.g. an error payload, which previously surfaced as a KeyError.
        """
        ds_info = (resp.get("dsInfo") if resp else None) or {}
        if "dsid" not in ds_info:
            raise FMFException("Please check that your login information is correct.")
        self.logger.info("Login succesful")
        self.dsid = ds_info["dsid"]

    def authenticate(self):
        """Log in to iCloud and record cookies, dsid and the fmf base URL.

        Raises FMFException when no response is obtained, when the response
        holds no dsid, or when the fmf service is not active.
        """
        self.logger.info("Authenticating FMF service")
        auth_url = "https://setup.icloud.com/setup/ws/1/login?clientBuildNumber={0}&clientId={1}"
        auth_url = auth_url.format(self.build_id, self.client_id)
        headers = {
            "Origin": "https://www.icloud.com",
            "Referer": "https://www.icloud.com"
        }
        data = {
            "apple_id": self.aid,
            "password": self.password,
            "extended_login": False
        }

        r = self.request(auth_url, "POST", headers=headers, body=data)
        if r is None:
            # request() gave up; fail with a clear error instead of an
            # AttributeError on None.
            raise FMFException("No response from the iCloud login service.")
        self.update_cookies(r)

        auth_resp = r.json()
        self.get_dsid(auth_resp)
        self.get_service_url(auth_resp, "fmf")

    @staticmethod
    def _parse_contacts(data):
        """Map "<firstName> <lastName>" to contact id from a service reply."""
        name2id = {}
        for contact in data.get("contactDetails") or []:
            first = contact.get("firstName") or ""
            last = contact.get("lastName") or ""
            name2id[first + " " + last] = contact["id"]
        return name2id

    @staticmethod
    def _parse_locations(data):
        """Map contact id to [formatted timestamp, address] from a reply."""
        fmf_map = {}
        for entry in data.get("locations") or []:
            fix = entry.get("location")
            if not fix:
                continue
            address = fix.get("address")
            if not address:
                # sometimes address isn't ready yet
                continue
            # The timestamp is divided by 1000 before conversion, i.e. it
            # is treated as milliseconds; rendered in local time.
            stamp = time.strftime('%Y-%m-%d %H:%M:%S',
                                  time.localtime(int(fix["timestamp"]) // 1000))
            fmf_map[entry["id"]] = [stamp, address]
        return fmf_map

    def refresh(self, init=False):
        """Query the service once and return ``(name2id, fmf_map)``.

        init -- when true the "initClient" endpoint is called, otherwise
                "refreshClient".

        name2id maps "<firstName> <lastName>" to contact id; fmf_map maps
        contact id to ``[formatted local timestamp, address]``.  Locations
        without a fix or without an address are skipped.

        Raises FMFException when no response could be obtained.
        """
        self.logger.info("Refresh called")
        action = "init" if init else "refresh"

        fmf_url = "{0}/fmipservice/client/fmfWeb/{1}Client?clientBuildNumber={2}&clientId={3}&dsid={4}"
        fmf_url = fmf_url.format(self.fmf_base_url, action, self.build_id, self.client_id, self.dsid)
        headers = {
            "Origin": "https://www.icloud.com",
            "Referer": "https://www.icloud.com"
        }
        payload = {
            "clientContext": {
                "productType": "fmfWeb",
                "appVersion": "1.0",
                "contextApp": "com.icloud.web.fmf",
                "userInactivityTimeInMS": 1,
                "tileServer": "Apple"
            }
        }

        r = self.request(fmf_url, "POST", headers=headers, body=payload)
        if r is None:
            raise FMFException("No response from the FMF service.")

        # update the cookies
        self.update_cookies(r)

        # process data
        data = r.json()
        return self._parse_contacts(data), self._parse_locations(data)

    def update(self, tries=7, min_tries=2, wait_time=3):
        """Poll the service and merge the results into contacts/fmf_map.

        tries     -- maximum number of refresh calls.
        min_tries -- refresh calls made before an unchanged result may end
                     the polling early.
        wait_time -- seconds to sleep between refresh calls.

        Always returns None.  When caching is enabled and both maps are
        non-empty they are written to disk at the end.
        """
        self.logger.info("Find called")
        if self.first_run:
            # run init first
            self.contacts, self.fmf_map = self.refresh(init=True)
        # Guard against maps that were never populated (no cache, no init).
        if self.contacts is None:
            self.contacts = {}
        if self.fmf_map is None:
            self.fmf_map = {}

        for i in range(tries):
            new_contacts, new_fmf_map = self.refresh()

            # update if anything changed in contacts
            if new_contacts != self.contacts:
                self.logger.info("Contacts are different")
                self.logger.info("Old contacts:")
                self.logger.info(self.contacts)
                self.contacts.update(new_contacts)
                self.logger.info("Updated contacts:")
                self.logger.info(self.contacts)

            self.logger.info("Updating fmf_map")
            different = False
            for uid, entry in new_fmf_map.items():
                known = self.fmf_map.get(uid)
                # Only the address (index 1) decides whether an entry
                # changed; a newer timestamp alone does not count.
                if known is None or entry[1] != known[1]:
                    self.fmf_map[uid] = entry
                    different = True

            # nothing changed and the minimum number of polls was made
            if not different and i > min_tries - 1:
                self.logger.info("nothing changed")
                break

            # No point in sleeping after the final attempt.
            if i < tries - 1:
                time.sleep(wait_time)

        # error handling
        if not self.contacts:
            self.logger.debug("Contacts is empty")
            return None
        if not self.fmf_map:
            self.logger.debug("FMF map is empty")
            return None

        if self.cache:
            self.persistant_write(self.cpath, self.contacts)
            self.persistant_write(self.fpath, self.fmf_map)
        return None

    def get_user_by_name(self, user, update=False, hook=None):
        """Return the ``[timestamp, address]`` entry for contact ``user``.

        user   -- contact name in "<firstName> <lastName>" form.
        update -- when true, ``update()`` is called first.
        hook   -- optional callable invoked as ``hook(user, result)`` when
                  a result is found.

        Returns None when the name is unknown or has no location entry.
        """
        self.logger.info("Finding user: {0}".format(user))
        if update:
            # update data
            self.update()

        contacts = self.contacts or {}
        fmf_map = self.fmf_map or {}

        uid = contacts.get(user)
        if user in contacts and uid in fmf_map:
            result = fmf_map[uid]
            # use hooks as functions to run other utilities with that information
            if hook:
                hook(user, result)
            return result

        # Report the requested name; indexing contacts here raised KeyError
        # for exactly the users this message is about.
        self.logger.debug("User {0} not in contacts or can't be found right now".format(user))
        return None

    def get_user_by_id(self, uid, update=False, reverse=True, hook=None):
        """Return the ``[timestamp, address]`` entry for contact id ``uid``.

        uid     -- contact id to look up.
        update  -- when true, ``update()`` is called first.
        reverse -- when true the hook receives the contact's name,
                   otherwise it receives the id.
        hook    -- optional callable invoked as ``hook(user, result)`` when
                   a result is found.

        Returns None when the id is not among the contacts or has no
        location entry.
        """
        self.logger.info("Finding user id: {0}".format(uid))
        if update:
            # update data
            self.update()

        contacts = self.contacts or {}
        fmf_map = self.fmf_map or {}

        # find the user name associated with the user id
        # (items() works on Python 2 and 3; iteritems() is Python 2 only)
        user = None
        for name, contact_id in contacts.items():
            if contact_id == uid:
                user = name if reverse else contact_id

        if user and uid in fmf_map:
            result = fmf_map[uid]
            if hook:
                hook(user, result)
            return result

        self.logger.debug("UserID {0} not in contacts or can't be found right now".format(uid))
        return None