Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(token): token endpoint is now configurable #421

Merged
merged 3 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 29 additions & 13 deletions config/clients/python/template/src/credentials.py.mustache
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{{>partial_header}}

from urllib.parse import urlparse
from urllib.parse import urlparse, urlunparse

from {{packageName}}.exceptions import ApiValueError

Expand Down Expand Up @@ -150,6 +150,31 @@ class Credentials:
"""
self._configuration = value

def _parse_issuer(self, issuer: str):
default_endpoint_path = '/oauth/token'

parsed_url = urlparse(issuer.strip())

try:
parsed_url.port
except ValueError as e:
raise ApiValueError(e)

if parsed_url.netloc is None and parsed_url.path is None:
raise ApiValueError("Invalid issuer")

if parsed_url.scheme == "":
parsed_url = urlparse(f"https://{issuer}")
elif parsed_url.scheme not in ("http", "https"):
raise ApiValueError(f"Invalid issuer scheme {parsed_url.scheme} must be HTTP or HTTPS")

if parsed_url.path in ("", "/"):
parsed_url = parsed_url._replace(path=default_endpoint_path)

valid_url = urlunparse(parsed_url)

return valid_url

def validate_credentials_config(self):
"""
Check whether credentials configuration is valid
Expand All @@ -164,15 +189,6 @@ class Credentials:
if self.configuration is None or none_or_empty(self.configuration.client_id) or none_or_empty(self.configuration.client_secret) or none_or_empty(self.configuration.api_audience) or none_or_empty(self.configuration.api_issuer):
raise ApiValueError('configuration `{}` requires client_id, client_secret, api_audience and api_issuer defined for client_credentials method.')
# validate token issuer
combined_url = 'https://' + self.configuration.api_issuer
parsed_url = None
try:
parsed_url = urlparse(combined_url)
except ValueError:
raise ApiValueError(
f"api_issuer `{self.configuration.api_issuer}` is invalid"
)
if (parsed_url.netloc == ''):
raise ApiValueError(
f"api_issuer `{self.configuration.api_issuer}` is invalid"
)

parsed_issuer_url = self._parse_issuer(self.configuration.api_issuer)

Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
{{>partial_header}}

from unittest import IsolatedAsyncioTestCase
import unittest

import {{packageName}}{{^asyncio}} as openfga_sdk{{/asyncio}}

from {{packageName}}.credentials import CredentialConfiguration, Credentials

from {{packageName}}.exceptions import ApiValueError

class TestCredentials(IsolatedAsyncioTestCase):
"""Credentials unit test"""
Expand Down Expand Up @@ -131,3 +132,58 @@ class TestCredentials(IsolatedAsyncioTestCase):
with self.assertRaises(openfga_sdk.ApiValueError):
credential.validate_credentials_config()


class TestCredentialsIssuer(unittest.TestCase):
def setUp(self):
# Setup a basic configuration that can be modified per test case
self.configuration = CredentialConfiguration(api_issuer="https://example.com")
self.credentials = Credentials(method="client_credentials", configuration=self.configuration)

def test_valid_issuer_https(self):
# Test a valid HTTPS URL
self.configuration.api_issuer = "issuer.fga.example "
result = self.credentials._parse_issuer(self.configuration.api_issuer)
self.assertEqual(result, "https://issuer.fga.example/oauth/token")

def test_valid_issuer_with_oauth_endpoint_https(self):
# Test a valid HTTPS URL
self.configuration.api_issuer = "https://example.com/oauth/token"
result = self.credentials._parse_issuer(self.configuration.api_issuer)
self.assertEqual(result, "https://example.com/oauth/token")

def test_valid_issuer_with_some_endpoint_https(self):
# Test a valid HTTPS URL
self.configuration.api_issuer = "https://example.com/oauth/some/endpoint"
result = self.credentials._parse_issuer(self.configuration.api_issuer)
self.assertEqual(result, "https://example.com/oauth/some/endpoint")

def test_valid_issuer_http(self):
# Test a valid HTTP URL
self.configuration.api_issuer = "fga.example/some_endpoint"
result = self.credentials._parse_issuer(self.configuration.api_issuer)
self.assertEqual(result, "https://fga.example/some_endpoint")

def test_invalid_issuer_no_scheme(self):
# Test an issuer URL without a scheme
self.configuration.api_issuer = "https://issuer.fga.example:8080/some_endpoint "
result = self.credentials._parse_issuer(self.configuration.api_issuer)
self.assertEqual(result, "https://issuer.fga.example:8080/some_endpoint")

def test_invalid_issuer_bad_scheme(self):
# Test an issuer with an unsupported scheme
self.configuration.api_issuer = "ftp://example.com"
with self.assertRaises(ApiValueError):
self.credentials._parse_issuer(self.configuration.api_issuer)

def test_invalid_issuer_with_port(self):
# Test an issuer with an unsupported scheme
self.configuration.api_issuer = "https://issuer.fga.example:8080 "
result = self.credentials._parse_issuer(self.configuration.api_issuer)
self.assertEqual(result, "https://issuer.fga.example:8080/oauth/token")

# this should raise error
def test_invalid_issuer_bad_hostname(self):
# Test an issuer with an invalid hostname
self.configuration.api_issuer = "https://example?.com"
with self.assertRaises(ApiValueError):
self.credentials._parse_issuer(self.configuration.api_issuer)
Loading