Skip to content

Fix username/password authentication with 2-step web login #83

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 57 additions & 63 deletions pyecobee/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,39 +173,76 @@ def request_tokens(self) -> bool:
except (KeyError, TypeError) as err:
_LOGGER.debug(f"Error obtaining tokens from ecobee: {err}")
return False

def request_tokens_web(self) -> bool:
assert self.auth0_token is not None, "auth0 token must be set before calling request_tokens_web"
# Keep all cookies in a session
session = requests.Session()

resp = requests.get(ECOBEE_AUTH_BASE_URL + "/" + ECOBEE_ENDPOINT_AUTH, cookies={"auth0": self.auth0_token}, params={
"client_id": ECOBEE_WEB_CLIENT_ID,
"scope": "smartWrite",
"response_type": "token",
"response_mode": "form_post",
"redirect_uri": "https://www.ecobee.com/home/authCallback",
"audience": "https://prod.ecobee.com/api/v1",
}, timeout=ECOBEE_DEFAULT_TIMEOUT)
# Get the auth0 token and redirect to the identifier step of the login flow
auth0_url = f"{ECOBEE_AUTH_BASE_URL}/{ECOBEE_ENDPOINT_AUTH}"
resp = session.get(
auth0_url,
params={
"response_type": "token",
"response_mode": "form_post",
"client_id": ECOBEE_WEB_CLIENT_ID,
"redirect_uri": "https://www.ecobee.com/home/authCallback",
"audience": "https://prod.ecobee.com/api/v1",
"scope": "openid smartWrite piiWrite piiRead smartRead deleteGrants",
},
)
if "auth0" not in session.cookies:
_LOGGER.error(
f"Failed to obtain auth0 token from {auth0_url}: {resp.status_code} {resp.text}"
)
return False
else:
self.auth0_token = session.cookies["auth0"]

# Submit the identifier/username and redirect to the password step
identifier_url = resp.url
resp = session.post(
identifier_url,
data={
"username": self.username,
},
)
if resp.status_code != 200:
_LOGGER.error(f"Failed to refresh access token: {resp.status_code} {resp.text}")
_LOGGER.error(f"Failed to submit username: {resp.status_code} {resp.text}")
return False

# Submit the password and get the access_token
password_url = resp.url
resp = session.post(
password_url,
data={
"username": self.username,
"password": self.password,
},
)
if resp.status_code != 200:
_LOGGER.error(f"Failed to submit password: {resp.status_code} {resp.text}")
return False

if (auth0 := resp.cookies.get("auth0")) is None:
_LOGGER.error("Failed to refresh access token: no auth0 cookie in response")
self.auth0_token = auth0

# Parse the response HTML for the access token and expiration
if (access_token := resp.text.split('name="access_token" value="')[1].split('"')[0]) is None:
if (
access_token := resp.text.split('name="access_token" value="')[1].split(
'"'
)[0]
) is None:
_LOGGER.error("Failed to refresh bearer token: no access token in response")
return False

self.access_token = access_token

if (expires_in := resp.text.split('name="expires_in" value="')[1].split('"')[0]) is None:
if (
expires_in := resp.text.split('name="expires_in" value="')[1].split('"')[0]
) is None:
_LOGGER.error("Failed to refresh bearer token: no expiration in response")
return False

expires_at = datetime.datetime.now() + datetime.timedelta(seconds=int(expires_in))
expires_at = datetime.datetime.now() + datetime.timedelta(
seconds=int(expires_in)
)
_LOGGER.debug(f"Access token expires at {expires_at}")

self._write_config()
Expand All @@ -214,9 +251,6 @@ def request_tokens_web(self) -> bool:

def refresh_tokens(self) -> bool:
if self.username and self.password:
self.request_auth0_token()

if self.auth0_token is not None:
return self.request_tokens_web()

"""Refreshes ecobee API tokens."""
Expand Down Expand Up @@ -246,46 +280,6 @@ def refresh_tokens(self) -> bool:
_LOGGER.debug(f"Error refreshing tokens from ecobee: {err}")
return False

def request_auth0_token(self) -> bool:
"""Get the auth0 token via username/password."""
session = requests.Session()
url = f"{ECOBEE_AUTH_BASE_URL}/{ECOBEE_ENDPOINT_AUTH}"
resp = session.get(
url,
params = {
"response_type": "token",
"response_mode": "form_post",
"client_id": ECOBEE_WEB_CLIENT_ID,
"redirect_uri": "https://www.ecobee.com/home/authCallback",
"audience": "https://prod.ecobee.com/api/v1",
"scope": "openid smartWrite piiWrite piiRead smartRead deleteGrants",
}
)
if resp.status_code != 200:
_LOGGER.error(f"Failed to obtain auth0 token from {url}: {resp.status_code} {resp.text}")
return False

redirect_url = resp.url
resp = session.post(
redirect_url,
data={
"username": self.username,
"password": self.password,
"action": "default"
}
)
if resp.status_code != 200:
_LOGGER.error(f"Failed to obtain auth0 token from {redirect_url}: {resp.status_code} {resp.text}")
return False
if (auth0 := resp.cookies.get("auth0")) is None:
_LOGGER.error(f"Failed to obtain auth0 token from {redirect_url}: no auth0 cookie in response")
self.auth0_token = None
return False

_LOGGER.debug(f"Obtained auth0 token: {auth0}")
self.auth0_token = auth0
return True

def get_thermostats(self) -> bool:
"""Gets a json-list of thermostats from ecobee and caches in self.thermostats."""
param_string = {
Expand Down