diff --git a/auth_jwt/exceptions.py b/auth_jwt/exceptions.py
index d1b5a80d0d..bc81b3ae12 100644
--- a/auth_jwt/exceptions.py
+++ b/auth_jwt/exceptions.py
@@ -30,3 +30,17 @@ class UnauthorizedInvalidToken(Unauthorized):
class UnauthorizedPartnerNotFound(Unauthorized):
pass
+
+
+class CompositeJwtError(Unauthorized):
+ """Indicate that multiple errors occurred during JWT chain validation."""
+
+ def __init__(self, errors):
+ self.errors = errors
+ super().__init__(
+ "Multiple errors occurred during JWT chain validation:\n"
+ + "\n".join(
+ "{}: {}".format(validator_name, error)
+ for validator_name, error in self.errors.items()
+ )
+ )
diff --git a/auth_jwt/models/auth_jwt_validator.py b/auth_jwt/models/auth_jwt_validator.py
index 5d841888be..3d3c9b4d7c 100644
--- a/auth_jwt/models/auth_jwt_validator.py
+++ b/auth_jwt/models/auth_jwt_validator.py
@@ -67,6 +67,12 @@ class AuthJwtValidator(models.Model):
partner_id_strategy = fields.Selection([("email", "From email claim")])
partner_id_required = fields.Boolean()
+ next_validator_id = fields.Many2one(
+ "auth.jwt.validator",
+ domain="[('id', '!=', id)]",
+ help="Next validator to try if this one fails",
+ )
+
_sql_constraints = [
("name_uniq", "unique(name)", "JWT validator names must be unique !"),
]
@@ -79,6 +85,22 @@ def _check_name(self):
_("Name %r is not a valid python identifier.") % (rec.name,)
)
+ @api.constrains("next_validator_id")
+ def _check_next_validator_id(self):
+ # Prevent circular references
+ for rec in self:
+ validator = rec
+ chain = [validator.name]
+ while validator:
+ validator = validator.next_validator_id
+ chain.append(validator.name)
+ if rec == validator:
+ raise ValidationError(
+ _("Validators mustn't make a closed chain: {}.").format(
+ " -> ".join(chain)
+ )
+ )
+
@api.model
def _get_validator_by_name_domain(self, validator_name):
if validator_name:
diff --git a/auth_jwt/models/ir_http.py b/auth_jwt/models/ir_http.py
index f90b5824c2..89bfec6cde 100644
--- a/auth_jwt/models/ir_http.py
+++ b/auth_jwt/models/ir_http.py
@@ -4,10 +4,11 @@
import logging
import re
-from odoo import SUPERUSER_ID, api, models, registry as registry_get
+from odoo import SUPERUSER_ID, api, models
from odoo.http import request
from ..exceptions import (
+ CompositeJwtError,
UnauthorizedMalformedAuthorizationHeader,
UnauthorizedMissingAuthorizationHeader,
UnauthorizedSessionMismatch,
@@ -55,20 +56,33 @@ def _authenticate(cls, endpoint):
@classmethod
def _auth_method_jwt(cls, validator_name=None):
- assert request.db
assert not request.uid
assert not request.session.uid
token = cls._get_bearer_token()
assert token
- registry = registry_get(request.db)
- with registry.cursor() as cr:
- env = api.Environment(cr, SUPERUSER_ID, {})
- validator = env["auth.jwt.validator"]._get_validator_by_name(validator_name)
- assert len(validator) == 1
- payload = validator._decode(token)
- uid = validator._get_and_check_uid(payload)
- assert uid
- partner_id = validator._get_and_check_partner_id(payload)
+ # # Use request cursor to allow partner creation strategy in validator
+ env = api.Environment(request.cr, SUPERUSER_ID, {})
+ validator = env["auth.jwt.validator"]._get_validator_by_name(validator_name)
+ assert len(validator) == 1
+
+ payload = None
+ exceptions = {}
+ while validator:
+ try:
+ payload = validator._decode(token)
+ break
+ except Exception as e:
+ exceptions[validator.name] = e
+ validator = validator.next_validator_id
+
+ if not payload:
+ if len(exceptions) == 1:
+ raise list(exceptions.values())[0]
+ raise CompositeJwtError(exceptions)
+
+ uid = validator._get_and_check_uid(payload)
+ assert uid
+ partner_id = validator._get_and_check_partner_id(payload)
request.uid = uid # this resets request.env
request.jwt_payload = payload
request.jwt_partner_id = partner_id
diff --git a/auth_jwt/tests/test_auth_jwt.py b/auth_jwt/tests/test_auth_jwt.py
index 937778c521..c1e00c660c 100644
--- a/auth_jwt/tests/test_auth_jwt.py
+++ b/auth_jwt/tests/test_auth_jwt.py
@@ -15,6 +15,7 @@
from ..exceptions import (
AmbiguousJwtValidator,
+ CompositeJwtError,
JwtValidatorNotFound,
UnauthorizedInvalidToken,
UnauthorizedMalformedAuthorizationHeader,
@@ -36,6 +37,7 @@ def _mock_request(self, authorization):
httprequest=Mock(environ=environ),
session=DotDict(),
env=self.env,
+ cr=self.env.cr,
)
# These attributes are added upon successful auth, so make sure
# calling hasattr on the mock when they are not yet set returns False.
@@ -63,45 +65,28 @@ def _create_token(
payload["nbf"] = nbf
return jwt.encode(payload, key=key, algorithm="HS256")
- def _create_validator(self, name, audience="me", partner_id_required=False):
+ def _create_validator(
+ self,
+ name,
+ audience="me",
+ issuer="http://the.issuer",
+ secret_key="thesecret",
+ partner_id_required=False,
+ ):
return self.env["auth.jwt.validator"].create(
dict(
name=name,
signature_type="secret",
secret_algorithm="HS256",
- secret_key="thesecret",
+ secret_key=secret_key,
audience=audience,
- issuer="http://the.issuer",
+ issuer=issuer,
user_id_strategy="static",
partner_id_strategy="email",
partner_id_required=partner_id_required,
)
)
- @contextlib.contextmanager
- def _commit_validator(self, name, audience="me", partner_id_required=False):
- validator = self._create_validator(
- name=name, audience=audience, partner_id_required=partner_id_required
- )
-
- def _mocked_get_validator_by_name(self, validator_name):
- if validator_name == name:
- return validator
- return self.env["auth.jwt.validator"]._get_validator_by_name.origin(
- self, validator_name
- )
-
- try:
- # Patch _get_validator_by_name because IrHttp._auth_method_jwt
- # will look for the validator in another transaction,
- # where the validator we created above would not be visible.
- self.env["auth.jwt.validator"]._patch_method(
- "_get_validator_by_name", _mocked_get_validator_by_name
- )
- yield validator
- finally:
- self.env["auth.jwt.validator"]._revert_method("_get_validator_by_name")
-
def test_missing_authorization_header(self):
with self._mock_request(authorization=None):
with self.assertRaises(UnauthorizedMissingAuthorizationHeader):
@@ -121,64 +106,186 @@ def test_malformed_authorization_header(self):
self.env["ir.http"]._auth_method_jwt()
def test_auth_method_valid_token(self):
- with self._commit_validator("validator"):
- authorization = "Bearer " + self._create_token()
- with self._mock_request(authorization=authorization):
- self.env["ir.http"]._auth_method_jwt_validator()
+ self._create_validator("validator")
+ authorization = "Bearer " + self._create_token()
+ with self._mock_request(authorization=authorization):
+ self.env["ir.http"]._auth_method_jwt_validator()
- def test_auth_method_valid_token_two_validators(self):
- with self._commit_validator(
- "validator2", audience="bad"
- ), self._commit_validator("validator3"):
- authorization = "Bearer " + self._create_token()
- with self._mock_request(authorization=authorization):
- # first validator rejects the token because of invalid audience
- with self.assertRaises(UnauthorizedInvalidToken):
- self.env["ir.http"]._auth_method_jwt_validator2()
- # second validator accepts the token
- self.env["ir.http"]._auth_method_jwt_validator3()
+ def test_auth_method_valid_token_two_validators_one_bad_issuer(self):
+ self._create_validator("validator2", issuer="http://other.issuer")
+ self._create_validator("validator3")
+
+ authorization = "Bearer " + self._create_token()
+ with self._mock_request(authorization=authorization):
+ # first validator rejects the token because of invalid audience
+ with self.assertRaises(UnauthorizedInvalidToken):
+ self.env["ir.http"]._auth_method_jwt_validator2()
+ # second validator accepts the token
+ self.env["ir.http"]._auth_method_jwt_validator3()
+
+ def test_auth_method_valid_token_two_validators_one_bad_issuer_chained(self):
+ validator2 = self._create_validator("validator2", issuer="http://other.issuer")
+ validator3 = self._create_validator("validator3")
+ validator2.next_validator_id = validator3
+
+ authorization = "Bearer " + self._create_token()
+ with self._mock_request(authorization=authorization):
+ # Validator2 rejects the token because of invalid issuer but chain
+ # on validator3 which accepts it
+ self.env["ir.http"]._auth_method_jwt_validator2()
+
+ def test_auth_method_valid_token_two_validators_one_bad_audience(self):
+ self._create_validator("validator2", audience="bad")
+ self._create_validator("validator3")
+
+ authorization = "Bearer " + self._create_token()
+ with self._mock_request(authorization=authorization):
+ # first validator rejects the token because of invalid audience
+ with self.assertRaises(UnauthorizedInvalidToken):
+ self.env["ir.http"]._auth_method_jwt_validator2()
+ # second validator accepts the token
+ self.env["ir.http"]._auth_method_jwt_validator3()
+
+ def test_auth_method_valid_token_two_validators_one_bad_audience_chained(self):
+ validator2 = self._create_validator("validator2", audience="bad")
+ validator3 = self._create_validator("validator3")
+
+ validator2.next_validator_id = validator3
+ authorization = "Bearer " + self._create_token()
+ with self._mock_request(authorization=authorization):
+ self.env["ir.http"]._auth_method_jwt_validator2()
def test_auth_method_invalid_token(self):
# Test invalid token via _auth_method_jwt
# Other types of invalid tokens are unit tested elswhere.
- with self._commit_validator("validator4"):
- authorization = "Bearer " + self._create_token(audience="bad")
- with self._mock_request(authorization=authorization):
- with self.assertRaises(UnauthorizedInvalidToken):
- self.env["ir.http"]._auth_method_jwt_validator4()
+ self._create_validator("validator4")
+ authorization = "Bearer " + self._create_token(audience="bad")
+ with self._mock_request(authorization=authorization):
+ with self.assertRaises(UnauthorizedInvalidToken):
+ self.env["ir.http"]._auth_method_jwt_validator4()
+
+ def test_auth_method_invalid_token_on_chain(self):
+ validator1 = self._create_validator("validator", issuer="http://other.issuer")
+ validator2 = self._create_validator("validator2", audience="bad audience")
+ validator3 = self._create_validator("validator3", secret_key="bad key")
+ validator4 = self._create_validator(
+ "validator4", issuer="http://other.issuer", audience="bad audience"
+ )
+ validator5 = self._create_validator(
+ "validator5", issuer="http://other.issuer", secret_key="bad key"
+ )
+ validator6 = self._create_validator(
+ "validator6", audience="bad audience", secret_key="bad key"
+ )
+ validator7 = self._create_validator(
+ "validator7",
+ issuer="http://other.issuer",
+ audience="bad audience",
+ secret_key="bad key",
+ )
+ validator1.next_validator_id = validator2
+ validator2.next_validator_id = validator3
+ validator3.next_validator_id = validator4
+ validator4.next_validator_id = validator5
+ validator5.next_validator_id = validator6
+ validator6.next_validator_id = validator7
+
+ authorization = "Bearer " + self._create_token()
+ with self._mock_request(authorization=authorization):
+ with self.assertRaises(CompositeJwtError) as composite_error:
+ self.env["ir.http"]._auth_method_jwt_validator()
+ self.assertEqual(
+ str(composite_error.exception),
+ "401 Unauthorized: Multiple errors occurred during JWT chain validation:\n"
+ "validator: 401 Unauthorized: "
+ "The server could not verify that you are authorized to "
+ "access the URL requested. You either supplied the wrong "
+ "credentials (e.g. a bad password), or your browser doesn't "
+ "understand how to supply the credentials required.\n"
+ "validator2: 401 Unauthorized: "
+ "The server could not verify that you are authorized to "
+ "access the URL requested. You either supplied the wrong "
+ "credentials (e.g. a bad password), or your browser doesn't "
+ "understand how to supply the credentials required.\n"
+ "validator3: 401 Unauthorized: "
+ "The server could not verify that you are authorized to "
+ "access the URL requested. You either supplied the wrong "
+ "credentials (e.g. a bad password), or your browser doesn't "
+ "understand how to supply the credentials required.\n"
+ "validator4: 401 Unauthorized: "
+ "The server could not verify that you are authorized to "
+ "access the URL requested. You either supplied the wrong "
+ "credentials (e.g. a bad password), or your browser doesn't "
+ "understand how to supply the credentials required.\n"
+ "validator5: 401 Unauthorized: "
+ "The server could not verify that you are authorized to "
+ "access the URL requested. You either supplied the wrong "
+ "credentials (e.g. a bad password), or your browser doesn't "
+ "understand how to supply the credentials required.\n"
+ "validator6: 401 Unauthorized: "
+ "The server could not verify that you are authorized to "
+ "access the URL requested. You either supplied the wrong "
+ "credentials (e.g. a bad password), or your browser doesn't "
+ "understand how to supply the credentials required.\n"
+ "validator7: 401 Unauthorized: "
+ "The server could not verify that you are authorized to "
+ "access the URL requested. You either supplied the wrong "
+ "credentials (e.g. a bad password), or your browser doesn't "
+ "understand how to supply the credentials required.",
+ )
+
+ def test_invalid_validation_chain(self):
+ validator1 = self._create_validator("validator")
+ validator2 = self._create_validator("validator2")
+ validator3 = self._create_validator("validator3")
+
+ validator1.next_validator_id = validator2
+ validator2.next_validator_id = validator3
+ with self.assertRaises(ValidationError) as error:
+ validator3.next_validator_id = validator1
+ self.assertEqual(
+ str(error.exception),
+ "Validators mustn't make a closed chain: "
+ "validator3 -> validator -> validator2 -> validator3.",
+ )
+
+ def test_invalid_validation_auto_chain(self):
+ validator = self._create_validator("validator")
+ with self.assertRaises(ValidationError) as error:
+ validator.next_validator_id = validator
+ self.assertEqual(
+ str(error.exception),
+ "Validators mustn't make a closed chain: " "validator -> validator.",
+ )
def test_user_id_strategy(self):
- with self._commit_validator("validator5") as validator:
- authorization = "Bearer " + self._create_token()
- with self._mock_request(authorization=authorization) as request:
- self.env["ir.http"]._auth_method_jwt_validator5()
- self.assertEqual(request.uid, validator.static_user_id.id)
+ validator = self._create_validator("validator5")
+ authorization = "Bearer " + self._create_token()
+ with self._mock_request(authorization=authorization) as request:
+ self.env["ir.http"]._auth_method_jwt_validator5()
+ self.assertEqual(request.uid, validator.static_user_id.id)
def test_partner_id_strategy_email_found(self):
partner = self.env["res.partner"].search([("email", "!=", False)])[0]
- with self._commit_validator("validator6"):
- authorization = "Bearer " + self._create_token(email=partner.email)
- with self._mock_request(authorization=authorization) as request:
- self.env["ir.http"]._auth_method_jwt_validator6()
- self.assertEqual(request.jwt_partner_id, partner.id)
+ self._create_validator("validator6")
+ authorization = "Bearer " + self._create_token(email=partner.email)
+ with self._mock_request(authorization=authorization) as request:
+ self.env["ir.http"]._auth_method_jwt_validator6()
+ self.assertEqual(request.jwt_partner_id, partner.id)
def test_partner_id_strategy_email_not_found(self):
- with self._commit_validator("validator6"):
- authorization = "Bearer " + self._create_token(
- email="notanemail@example.com"
- )
- with self._mock_request(authorization=authorization) as request:
- self.env["ir.http"]._auth_method_jwt_validator6()
- self.assertFalse(request.jwt_partner_id)
+ self._create_validator("validator6")
+ authorization = "Bearer " + self._create_token(email="notanemail@example.com")
+ with self._mock_request(authorization=authorization) as request:
+ self.env["ir.http"]._auth_method_jwt_validator6()
+ self.assertFalse(request.jwt_partner_id)
def test_partner_id_strategy_email_not_found_partner_required(self):
- with self._commit_validator("validator6", partner_id_required=True):
- authorization = "Bearer " + self._create_token(
- email="notanemail@example.com"
- )
- with self._mock_request(authorization=authorization):
- with self.assertRaises(UnauthorizedPartnerNotFound):
- self.env["ir.http"]._auth_method_jwt_validator6()
+ self._create_validator("validator6", partner_id_required=True)
+ authorization = "Bearer " + self._create_token(email="notanemail@example.com")
+ with self._mock_request(authorization=authorization):
+ with self.assertRaises(UnauthorizedPartnerNotFound):
+ self.env["ir.http"]._auth_method_jwt_validator6()
def test_get_validator(self):
AuthJwtValidator = self.env["auth.jwt.validator"]
@@ -296,8 +403,8 @@ def test_public_or_jwt_no_token(self):
assert not hasattr(request, "jwt_payload")
def test_public_or_jwt_valid_token(self):
- with self._commit_validator("validator"):
- authorization = "Bearer " + self._create_token()
- with self._mock_request(authorization=authorization) as request:
- self.env["ir.http"]._auth_method_public_or_jwt_validator()
- assert request.jwt_payload["aud"] == "me"
+ self._create_validator("validator")
+ authorization = "Bearer " + self._create_token()
+ with self._mock_request(authorization=authorization) as request:
+ self.env["ir.http"]._auth_method_public_or_jwt_validator()
+ assert request.jwt_payload["aud"] == "me"
diff --git a/auth_jwt/views/auth_jwt_validator_views.xml b/auth_jwt/views/auth_jwt_validator_views.xml
index 11c9c42e75..6bafe97978 100644
--- a/auth_jwt/views/auth_jwt_validator_views.xml
+++ b/auth_jwt/views/auth_jwt_validator_views.xml
@@ -10,6 +10,7 @@
+
@@ -69,6 +70,7 @@
+