Skip to content
4 changes: 4 additions & 0 deletions nmdc_runtime/api/core/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ def get_access_token_expiration(token) -> datetime:


class OAuth2PasswordOrClientCredentialsBearer(OAuth2):
"""
TODO: Document this undocumented class.
"""

def __init__(
self,
tokenUrl: str,
Expand Down
105 changes: 86 additions & 19 deletions nmdc_runtime/api/models/user.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import logging
from typing import List, Optional, Union

import pymongo.database
from fastapi import Depends, HTTPException
from jose import JWTError, jwt
from fastapi import Depends, HTTPException, status
from jose import jwt
from pydantic import BaseModel
from jose.exceptions import ExpiredSignatureError, JWTClaimsError, JWTError

from nmdc_runtime.api.core.auth import (
verify_password,
Expand Down Expand Up @@ -71,36 +73,101 @@ async def get_current_user(
whose username is the site client's `client_id`.

Raises an exception if the token is invalid.

Reference: The following web page contains information about JWT claims:
https://auth0.com/docs/secure/tokens/json-web-tokens/json-web-token-claims
"""

if mdb.invalidated_tokens.find_one({"_id": token}):
raise credentials_exception
# Define some exceptions, which contain actionable—but not sensitive—information.
invalid_subject_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Access token is invalid. Please log in again.",
headers={"WWW-Authenticate": "Bearer"},
)
invalid_claims_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Access token is invalid. Please log in again.",
headers={"WWW-Authenticate": "Bearer"},
)
invalid_token_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Access token is invalid. Please log in again.",
headers={"WWW-Authenticate": "Bearer"},
)
invalidated_token_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Access token has been invalidated. Please log in again.",
headers={"WWW-Authenticate": "Bearer"},
)
expired_token_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Access token has expired. Please log in again.",
headers={"WWW-Authenticate": "Bearer"},
)
invalid_or_missing_token_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Access token is invalid or missing. Please log in again.",
headers={"WWW-Authenticate": "Bearer"},
)

# Check whether there is a token, and whether it has been invalidated.
if token is None:
raise invalid_or_missing_token_exception
elif mdb.invalidated_tokens.find_one({"_id": token}):
raise invalidated_token_exception

# Validate the signature of the JWT and extract its payload.
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
subject: str = payload.get("sub")
if subject is None:
raise credentials_exception
if not subject.startswith("user:") and not subject.startswith("client:"):
raise credentials_exception

# subject is in the form "user:foo" or "client:bar"
username = subject.split(":", 1)[1]
token_data = TokenData(subject=username)
except ExpiredSignatureError as e:
logging.exception(e)
raise expired_token_exception
except JWTClaimsError as e:
logging.exception(e)
raise invalid_claims_exception
except (JWTError, AttributeError) as e:
print(f"jwt error: {e}")
raise credentials_exception
logging.exception(e)
raise invalid_token_exception

# Extract the prefix and the username from the subject.
subject: Optional[str] = payload.get("sub", None)
if isinstance(subject, str):
if subject.startswith("user:"):
subject_prefix = "user:"
elif subject.startswith("client:"):
subject_prefix = "client:"
else:
logging.warning("The subject contains an invalid prefix.")
raise invalid_subject_exception
username = subject.removeprefix(subject_prefix)
if username == "":
logging.warning("The subject contains nothing after the prefix.")
raise invalid_subject_exception
else:
logging.warning("The subject is not a string.")
raise invalid_subject_exception
token_data = TokenData(subject=username)

# Coerce a "client" into a "user"
# TODO: consolidate the client/user distinction.
if subject.startswith("user:"):
if not isinstance(token_data.subject, str):
logging.warning("The subject is not a string.")
raise invalid_subject_exception
elif subject_prefix == "user:":
user = get_user(mdb, username=token_data.subject)
elif subject.startswith("client:"):
elif subject_prefix == "client:":
# construct a user from the client_id
user = get_client_user(mdb, client_id=token_data.subject)
else:
raise credentials_exception
# Note: We already validate the subject's prefix above, so we expect this case to never occur.
logging.warning("The subject prefix is not something we recognize.")
user = None

if user is None:
raise credentials_exception
logging.warning(
f"Failed to resolve token subject '{token_data.subject}' to a user."
)
raise invalid_subject_exception
return user


Expand Down
Loading