Skip to content

Commit

Permalink
add method to switch tls verification, move to single quotes
Browse files Browse the repository at this point in the history
  • Loading branch information
blind-oracle committed Feb 3, 2025
1 parent ab5089b commit 8e24a77
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 245 deletions.
8 changes: 4 additions & 4 deletions cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@


if len(argv) < 3:
print("Incorrect arguments. Pass brand, login and password as arguments.")
print('Incorrect arguments. Pass brand, login and password as arguments.')
exit(1)

brand, login, password = argv[1], argv[2], argv[3]

brand = BRANDS.get(brand, None)
if brand is None:
print(f"Incorrect brand. Possible values are: {
", ".join([x for x in BRANDS.keys()])}")
print(f'Incorrect brand. Possible values are: {
', '.join([x for x in BRANDS.keys()])}')
exit(1)

c = Client(login, password, "", brand=brand, debug=len(argv) > 4)
c = Client(login, password, '', brand=brand, debug=len(argv) > 4)
c.refresh()
v = c.get_vehicles()

Expand Down
153 changes: 78 additions & 75 deletions pyfiat/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,126 +30,129 @@ def __init__(self, email: str, password: str, pin: str, brand: Brand, disable_tl
self.expire_time: datetime = None

if disable_tls_verification:
import urllib3
self.sess.verify = False
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

if debug:
http_client.HTTPConnection.debuglevel = 1
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log = logging.getLogger('requests.packages.urllib3')
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True

def _with_default_params(self, params: dict):
return params | {
"targetEnv": "jssdk",
"loginMode": "standard",
"sdk": "js_latest",
"authMode": "cookie",
"sdkBuild": "12234",
"format": "json",
"APIKey": self.brand.login_api_key,
'targetEnv': 'jssdk',
'loginMode': 'standard',
'sdk': 'js_latest',
'authMode': 'cookie',
'sdkBuild': '12234',
'format': 'json',
'APIKey': self.brand.login_api_key,
}

def _default_aws_headers(self, key: str):
return {
"x-clientapp-name": "CWP",
"x-clientapp-version": "1.0",
"clientrequestid": uuid.uuid4().hex.upper()[0:16],
"x-api-key": key,
"locale": self.brand.locale,
"x-originator-type": "web",
'x-clientapp-name': 'CWP',
'x-clientapp-version': '1.0',
'clientrequestid': uuid.uuid4().hex.upper()[0:16],
'x-api-key': key,
'locale': self.brand.locale,
'x-originator-type': 'web',
}

def set_tls_verification(self, verify: bool):
self.sess.verify = verify

def login(self):
"""Logs into the Fiat Cloud and caches the auth tokens"""
'''Logs into the Fiat Cloud and caches the auth tokens'''

if self.cognito_client is None:
self.cognito_client = boto3.client(
'cognito-identity', self.brand.region)

r = self.sess.request(
method="GET",
url=self.brand.login_url + "/accounts.webSdkBootstrap",
params={"apiKey": self.brand.login_api_key}
method='GET',
url=self.brand.login_url + '/accounts.webSdkBootstrap',
params={'apiKey': self.brand.login_api_key}
)

r.raise_for_status()
r = r.json()

if r['statusCode'] != 200:
raise Exception(f"bootstrap failed: {r}")
raise Exception(f'bootstrap failed: {r}')

r = self.sess.request(
method="POST",
url=self.brand.login_url + "/accounts.login",
method='POST',
url=self.brand.login_url + '/accounts.login',
params=self._with_default_params({
"loginID": self.email,
"password": self.password,
"sessionExpiration": 300,
"include": "profile,data,emails,subscriptions,preferences"
'loginID': self.email,
'password': self.password,
'sessionExpiration': 300,
'include': 'profile,data,emails,subscriptions,preferences'
})
)

r.raise_for_status()
r = r.json()

if r['statusCode'] != 200:
raise Exception(f"account login failed: {r}")
raise Exception(f'account login failed: {r}')

self.uid = r['UID']
login_token = r['sessionInfo']['login_token']

r = self.sess.request(
method="POST",
url=self.brand.login_url + "/accounts.getJWT",
method='POST',
url=self.brand.login_url + '/accounts.getJWT',
params=self._with_default_params({
"login_token": login_token,
"fields": "profile.firstName,profile.lastName,profile.email,country,locale,data.disclaimerCodeGSDP"
'login_token': login_token,
'fields': 'profile.firstName,profile.lastName,profile.email,country,locale,data.disclaimerCodeGSDP'
})
)

r.raise_for_status()
r = r.json()

if r['statusCode'] != 200:
raise Exception(f"unable to obtain JWT: {r}")
raise Exception(f'unable to obtain JWT: {r}')

exc = None
for url in self.brand.token_url:
try:
r = self.sess.request(
method="POST",
method='POST',
url=url,
headers=self._default_aws_headers(
self.brand.api.key) | {"content-type": "application/json"},
json={"gigya_token": r['id_token']}
self.brand.api.key) | {'content-type': 'application/json'},
json={'gigya_token': r['id_token']}
)
except Exception as e:
exc = e
else:
break
else:
raise Exception(f"unable to obtain token: {exc}")
raise Exception(f'unable to obtain token: {exc}')

r.raise_for_status()
r = r.json()

token = r.get('Token', None)
identity_id = r.get('IdentityId', None)
if token is None or identity_id is None:
raise Exception(f"unable to obtain identity & token: {r}")
raise Exception(f'unable to obtain identity & token: {r}')

r = self.cognito_client.get_credentials_for_identity(
IdentityId=identity_id,
Logins={"cognito-identity.amazonaws.com": token},
Logins={'cognito-identity.amazonaws.com': token},
)

creds = r.get('Credentials', None)
if not creds:
raise Exception(f"unable to obtain AWS credentials: {r}")
raise Exception(f'unable to obtain AWS credentials: {r}')

self.aws_auth = AWSSigV4(
'execute-api',
Expand All @@ -162,7 +165,7 @@ def login(self):
self.expire_time = creds['Expiration']

def _refresh_token_if_needed(self):
"""Checks if token is available and fresh, refreshes it otherwise"""
'''Checks if token is available and fresh, refreshes it otherwise'''

if self.dev_mode:
return
Expand All @@ -171,88 +174,88 @@ def _refresh_token_if_needed(self):
try:
self.login()
except Exception as e:
raise Exception(f"unable to login: {e}")
raise Exception(f'unable to login: {e}')

def list_vehicles(self) -> list[dict]:
"""Loads a list of vehicles with general info"""
'''Loads a list of vehicles with general info'''

if self.dev_mode:
with open("test_list.json") as f:
with open('test_list.json') as f:
return json.load(f)['vehicles']

self._refresh_token_if_needed()

return self.sess.request(
method="GET",
url=self.brand.api.url + f"/v4/accounts/{self.uid}/vehicles",
method='GET',
url=self.brand.api.url + f'/v4/accounts/{self.uid}/vehicles',
headers=self._default_aws_headers(
self.brand.api.key) | {"content-type": "application/json"},
params={"stage": "ALL"},
self.brand.api.key) | {'content-type': 'application/json'},
params={'stage': 'ALL'},
auth=self.aws_auth,
).json()['vehicles']

def get_vehicle(self, vin: str) -> dict:
"""Gets a more detailed info abount a vehicle with a given VIN"""
'''Gets a more detailed info abount a vehicle with a given VIN'''

if self.dev_mode:
with open(f"test_vehicle_{vin}.json") as f:
with open(f'test_vehicle_{vin}.json') as f:
return json.load(f)

self._refresh_token_if_needed()

return self.sess.request(
method="GET",
url=self.brand.api.url + f"/v2/accounts/{self.uid}/vehicles/{vin}/status",
method='GET',
url=self.brand.api.url + f'/v2/accounts/{self.uid}/vehicles/{vin}/status',
headers=self._default_aws_headers(
self.brand.api.key) | {"content-type": "application/json"},
self.brand.api.key) | {'content-type': 'application/json'},
auth=self.aws_auth,
).json()

def get_vehicle_status(self, vin: str) -> dict:
"""Loads another part of status of a vehicle with a given VIN"""
'''Loads another part of status of a vehicle with a given VIN'''

if self.dev_mode:
with open(f"test_vehicle_status_{vin}.json") as f:
with open(f'test_vehicle_status_{vin}.json') as f:
return json.load(f)

self._refresh_token_if_needed()

return self.sess.request(
method="GET",
method='GET',
url=self.brand.api.url +
f"/v1/accounts/{self.uid}/vehicles/{vin}/remote/status",
f'/v1/accounts/{self.uid}/vehicles/{vin}/remote/status',
headers=self._default_aws_headers(
self.brand.api.key) | {"content-type": "application/json"},
self.brand.api.key) | {'content-type': 'application/json'},
auth=self.aws_auth,
).json()

def get_vehicle_location(self, vin: str) -> dict:
"""Gets last known location of a vehicle with a given VIN"""
'''Gets last known location of a vehicle with a given VIN'''

if self.dev_mode:
with open(f"test_vehicle_location_{vin}.json") as f:
with open(f'test_vehicle_location_{vin}.json') as f:
return json.load(f)

self._refresh_token_if_needed()

return self.sess.request(
method="GET",
method='GET',
url=self.brand.api.url +
f"/v1/accounts/{self.uid}/vehicles/{vin}/location/lastknown",
f'/v1/accounts/{self.uid}/vehicles/{vin}/location/lastknown',
headers=self._default_aws_headers(
self.brand.api.key) | {"content-type": "application/json"},
self.brand.api.key) | {'content-type': 'application/json'},
auth=self.aws_auth,
).json()

def command(self,
vin: str, cmd: Command):
"""Sends given command to the vehicle with a given VIN"""
'''Sends given command to the vehicle with a given VIN'''

if self.dev_mode:
return

data = {
'pin': base64.b64encode(self.pin.encode()).decode(encoding="utf-8"),
'pin': base64.b64encode(self.pin.encode()).decode(encoding='utf-8'),
}

self._refresh_token_if_needed()
Expand All @@ -261,11 +264,11 @@ def command(self,
for auth in self.brand.auth:
try:
r = self.sess.request(
method="POST",
method='POST',
url=auth.url +
f"/v1/accounts/{self.uid}/ignite/pin/authenticate",
f'/v1/accounts/{self.uid}/ignite/pin/authenticate',
headers=self._default_aws_headers(auth.token) | {
"content-type": "application/json"},
'content-type': 'application/json'},
auth=self.aws_auth,
json=data,
).json()
Expand All @@ -274,26 +277,26 @@ def command(self,
else:
break
else:
raise Exception(f"Authentication failed: {exc}")
raise Exception(f'Authentication failed: {exc}')

if not 'token' in r:
raise Exception("authentication failed: no token found")
raise Exception('authentication failed: no token found')

data = {
"command": cmd.name,
"pinAuth": r['token'],
'command': cmd.name,
'pinAuth': r['token'],
}

r = self.sess.request(
method="POST",
method='POST',
url=self.brand.api.url +
f"/v1/accounts/{self.uid}/vehicles/{vin}/{cmd.url}",
f'/v1/accounts/{self.uid}/vehicles/{vin}/{cmd.url}',
headers=self._default_aws_headers(
self.brand.api.key) | {"content-type": "application/json"},
self.brand.api.key) | {'content-type': 'application/json'},
auth=self.aws_auth,
json=data,
).json()

if not 'responseStatus' in r or r['responseStatus'] != 'pending':
error = r.get('debugMsg', 'unknown error')
raise Exception(f"command queuing failed: {error}")
raise Exception(f'command queuing failed: {error}')
Loading

0 comments on commit 8e24a77

Please sign in to comment.