Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion oidcauthlib/auth/token_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,17 @@ async def fetch_well_known_config_and_jwks_async(self) -> None:
response.raise_for_status()
jwks_data: Dict[str, Any] = response.json()
for key in jwks_data.get("keys", []):
kid = key.get("kid")
# if there is no matching "kid" in keys then add it
if not any([k.get("kid") == key.get("kid") for k in keys]):
if not any([k.get("kid") == kid for k in keys]):
keys.append(key)
else:
# Log warning if a duplicate kid is found
logger.warning(
f"Duplicate key ID '{kid}' found when fetching JWKS from {jwks_uri}. "
f"This may indicate overlapping keys from different providers. "
f"Skipping duplicate key from {auth_config.auth_provider}."
)

logger.info(
f"Successfully fetched JWKS from {jwks_uri}, keys= {len(keys)}"
Expand Down Expand Up @@ -230,6 +238,52 @@ async def verify_token_async(self, *, token: str) -> Token | None:
"client_id"
) # AWS Cognito does not have aud claim but has client_id

# Require audience to be present (either 'aud' or 'client_id')
if audience is None:
raise AuthorizationBearerTokenInvalidException(
message="Token is missing 'aud' and 'client_id' claims",
token=token,
)

# Validate that the token matches a configured provider securely.
# Require audience to match; if an issuer is configured for that provider, require the issuer to match as well.
token_matches_config = False
for auth_config in self.auth_configs:
audience_matches = audience == auth_config.audience
if not audience_matches:
continue

# Check if both issuer and audience match this provider's configuration
if auth_config.issuer is not None:
if issuer == auth_config.issuer:
token_matches_config = True
logger.debug(
f"Token matched auth config: provider={auth_config.auth_provider}, "
f"issuer_matches=True, audience_matches=True"
)
break
else:
# audience matched, but issuer did not; try the next provider
continue

# No issuer configured for this provider; audience match is sufficient
token_matches_config = True
logger.debug(
f"Token matched auth config: provider={auth_config.auth_provider}, "
f"issuer_matches=False (not configured), audience_matches=True"
)
break

if not token_matches_config:
logger.warning(
f"Token validation failed: issuer '{issuer}' and audience '{audience}' "
f"do not match any configured auth provider"
)
raise AuthorizationBearerTokenInvalidException(
message=f"Token issuer '{issuer}' and audience '{audience}' do not match any configured auth provider",
token=token,
)

exp = verified.claims.get("exp")
now = time.time()
# convert exp and now to ET (America/New_York) for logging
Expand Down Expand Up @@ -275,6 +329,9 @@ def to_eastern_time(ts: Optional[float]) -> str:
issuer=issuer,
audience=audience,
) from e
except AuthorizationBearerTokenInvalidException:
# Re-raise our custom validation exceptions without wrapping them
raise
except Exception as e:
raise AuthorizationBearerTokenInvalidException(
message=f"Invalid token provided. Exp: {exp_str}, Now: {now_str}. Please check the token:\n{token}.",
Expand Down
4 changes: 3 additions & 1 deletion oidcauthlib/utilities/environment/environment_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ def mongo_db_cache_disable_delete(self) -> Optional[bool]:
@property
def auth_providers(self) -> Optional[list[str]]:
auth_providers: str | None = os.environ.get("AUTH_PROVIDERS")
return auth_providers.split(",") if auth_providers else None
return (
[p.strip() for p in auth_providers.split(",")] if auth_providers else None
)

@property
def oauth_referring_email(self) -> Optional[str]:
Expand Down
151 changes: 151 additions & 0 deletions tests/auth/test_token_reader_multi_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
"""
Tests for TokenReader with multiple OAuth providers.
Specifically tests the fix for the enumeration bug where tokens from one provider
could be incorrectly validated when multiple providers are configured.
"""

import pytest
from unittest.mock import patch, MagicMock
from typing import List, Any
from joserfc.jwk import KeySet
from oidcauthlib.auth.token_reader import TokenReader
from oidcauthlib.auth.config.auth_config import AuthConfig
from oidcauthlib.auth.config.auth_config_reader import AuthConfigReader
from oidcauthlib.auth.exceptions.authorization_bearer_token_invalid_exception import (
AuthorizationBearerTokenInvalidException,
)
from oidcauthlib.utilities.environment.abstract_environment_variables import (
AbstractEnvironmentVariables,
)


class MockEnvironmentVariables(AbstractEnvironmentVariables):
"""Mock environment variables for testing"""

def __init__(self, providers: List[str]) -> None:
self._providers = providers

@property
def auth_providers(self) -> List[str]:
return self._providers

@property
def oauth_cache(self) -> str:
return "memory"

@property
def mongo_uri(self) -> str | None:
return None

@property
def mongo_db_name(self) -> str | None:
return None

@property
def mongo_db_username(self) -> str | None:
return None

@property
def mongo_db_password(self) -> str | None:
return None

@property
def mongo_db_auth_cache_collection_name(self) -> str | None:
return None

@property
def mongo_db_cache_disable_delete(self) -> bool | None:
return None

@property
def oauth_referring_email(self) -> str | None:
return None

@property
def oauth_referring_subject(self) -> str | None:
return None

@property
def auth_redirect_uri(self) -> str | None:
return None


def mock_fetch_jwks_for_token_reader(token_reader: TokenReader) -> Any:
"""Helper to mock JWKS fetching for a token reader"""

async def mock_fetch_jwks() -> None:
# Create a mock KeySet that evaluates as truthy
mock_keyset = MagicMock(spec=KeySet)
mock_keyset.keys = [] # Empty keys list
mock_keyset.__bool__ = lambda self: True # Make it truthy
token_reader.jwks = mock_keyset

return mock_fetch_jwks


@pytest.mark.asyncio
async def test_token_validation_rejects_wrong_issuer_and_audience() -> None:
"""
Test that a token with a valid signature but wrong issuer and audience is rejected.
This tests the fix for the enumeration bug where tokens from one provider
would be accepted even when they should be rejected.
"""
# Setup two auth configs
auth_config_1 = AuthConfig(
auth_provider="PROVIDER1",
audience="audience1",
issuer="https://provider1.example.com",
client_id="client1",
client_secret="secret1", # pragma: allowlist secret
well_known_uri="https://provider1.example.com/.well-known/openid-configuration",
)
auth_config_2 = AuthConfig(
auth_provider="PROVIDER2",
audience="audience2",
issuer="https://provider2.example.com",
client_id="client2",
client_secret="secret2", # pragma: allowlist secret
well_known_uri="https://provider2.example.com/.well-known/openid-configuration",
)

# Mock the auth config reader
env_vars = MockEnvironmentVariables(["PROVIDER1", "PROVIDER2"])
auth_config_reader = AuthConfigReader(environment_variables=env_vars)

# Mock get_config_for_auth_provider to return our configs
with patch.object(
auth_config_reader,
"get_config_for_auth_provider",
side_effect=lambda auth_provider: auth_config_1
if auth_provider == "PROVIDER1"
else auth_config_2,
):
# Create token reader
token_reader = TokenReader(
auth_config_reader=auth_config_reader, algorithms=["RS256"]
)

mock_fetch = mock_fetch_jwks_for_token_reader(token_reader)

# Mock jwt.decode to return claims with wrong issuer and audience
with (
patch.object(
token_reader, "fetch_well_known_config_and_jwks_async", mock_fetch
),
patch("oidcauthlib.auth.token_reader.jwt.decode") as mock_decode,
):
mock_verified = MagicMock()
mock_verified.claims = {
"iss": "https://wrong-issuer.example.com", # Wrong issuer
"aud": "wrong-audience", # Wrong audience
"exp": 9999999999, # Far future
"sub": "test-user",
}
mock_decode.return_value = mock_verified

# Try to verify token - should fail because issuer/audience don't match
with pytest.raises(AuthorizationBearerTokenInvalidException) as exc_info:
await token_reader.verify_token_async(token="fake.jwt.token")

# Check error message mentions issuer/audience mismatch
assert "do not match any configured auth provider" in str(exc_info.value)