diff --git a/auth_oauth_multi_token/models/auth_oauth_multi_token.py b/auth_oauth_multi_token/models/auth_oauth_multi_token.py index a1c11f04b8..12b1d6bbdd 100644 --- a/auth_oauth_multi_token/models/auth_oauth_multi_token.py +++ b/auth_oauth_multi_token/models/auth_oauth_multi_token.py @@ -46,8 +46,8 @@ def _oauth_validate_multi_token(self): user_tokens = self._oauth_user_tokens(token.user_id.id) max_token = token.user_id.oauth_access_max_token if user_tokens and len(user_tokens) > max_token: - # clear last token - user_tokens[max_token - 1]._oauth_clear_token() + # clear tokens beyond the max + user_tokens[max_token:]._oauth_clear_token() def _oauth_clear_token(self): """Disable current token records.""" diff --git a/auth_oauth_multi_token/models/res_users.py b/auth_oauth_multi_token/models/res_users.py index 53c4bff115..8a74af0a91 100644 --- a/auth_oauth_multi_token/models/res_users.py +++ b/auth_oauth_multi_token/models/res_users.py @@ -5,10 +5,6 @@ from odoo import api, exceptions, fields, models -from odoo.addons import base - -base.models.res_users.USER_PRIVATE_FIELDS.append("oauth_master_uuid") - class ResUsers(models.Model): _inherit = "res.users" @@ -27,10 +23,9 @@ def _generate_oauth_master_uuid(self): oauth_access_max_token = fields.Integer( string="Max Number of Simultaneous Connections", default=10, required=True ) - oauth_master_uuid = fields.Char( - string="Master UUID", - copy=False, - readonly=True, + + # use the oauth_access_token field as oauth_master_uuid + oauth_access_token = fields.Char( required=True, default=lambda self: self._generate_oauth_master_uuid(), ) @@ -39,22 +34,36 @@ def _generate_oauth_master_uuid(self): def multi_token_model(self): return self.env["auth.oauth.multi.token"] + @api.model + def _generate_signup_values(self, provider, validation, params): + """Because access_token was replaced in + _auth_oauth_signin we need to replace it here.""" + res = super()._generate_signup_values(provider, validation, params) + res["oauth_access_token"] = params["access_token_multi"] + return res + @api.model def _auth_oauth_signin(self, provider, validation, params): """Override to handle sign-in with multi token.""" - res = super()._auth_oauth_signin(provider, validation, params) + params["access_token_multi"] = params["access_token"] - oauth_uid = validation["user_id"] # Lookup for user by oauth uid and provider + oauth_uid = validation["user_id"] user = self.search( [("oauth_uid", "=", oauth_uid), ("oauth_provider_id", "=", provider)] ) + + # Because access_token is automatically written to the user, we need to replace + # this by the existing oauth_access_token which acts as oauth_master_uuid + params["access_token"] = user.oauth_access_token + res = super()._auth_oauth_signin(provider, validation, params) + if not user: raise exceptions.AccessDenied() user.ensure_one() # user found and unique: create a token self.multi_token_model.create( - {"user_id": user.id, "oauth_access_token": params["access_token"]} + {"user_id": user.id, "oauth_access_token": params["access_token_multi"]} ) return res @@ -62,8 +71,7 @@ def action_oauth_clear_token(self): """Inactivate current user tokens.""" self.mapped("oauth_access_token_ids")._oauth_clear_token() for res in self: - res.oauth_access_token = False - res.oauth_master_uuid = self._generate_oauth_master_uuid() + res.oauth_access_token = self._generate_oauth_master_uuid() @api.model def _check_credentials(self, password, env): @@ -71,13 +79,17 @@ def _check_credentials(self, password, env): try: return super()._check_credentials(password, env) except exceptions.AccessDenied: - res = self.multi_token_model.sudo().search( - [("user_id", "=", self.env.uid), ("oauth_access_token", "=", password)] + passwd_allowed = ( + env["interactive"] or not self.env.user._rpc_api_keys_only() ) - if not res: - raise + if passwd_allowed and self.env.user.active: + res = self.multi_token_model.sudo().search( + [ + ("user_id", "=", self.env.uid), + ("oauth_access_token", "=", password), + ] + ) + if res: + return - def _get_session_token_fields(self): - res = super()._get_session_token_fields() - res.remove("oauth_access_token") - return res | {"oauth_master_uuid"} + raise diff --git a/auth_oauth_multi_token/tests/test_multi_token.py b/auth_oauth_multi_token/tests/test_multi_token.py index f63affd9b8..6aec2ffb42 100644 --- a/auth_oauth_multi_token/tests/test_multi_token.py +++ b/auth_oauth_multi_token/tests/test_multi_token.py @@ -2,8 +2,10 @@ # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl) import json +import uuid from odoo import exceptions +from odoo.tests import users from odoo.tests.common import TransactionCase @@ -26,9 +28,10 @@ def setUpClass(cls): ) def _fake_params(self, **kw): + random = uuid.uuid4().hex params = { - "state": json.dumps({"t": "FAKE_TOKEN"}), - "access_token": "FAKE_ACCESS_TOKEN", + "state": json.dumps({"t": f"FAKE_TOKEN_{random}"}), + "access_token": f"FAKE_ACCESS_TOKEN_{random}", } params.update(kw) return params @@ -48,8 +51,10 @@ def _test_one_token(self): "user_id": "oauth_uid_johndoe", } params = self._fake_params() - login = self.user_model._auth_oauth_signin( - self.provider_google.id, validation, params + login = ( + self.env["res.users"] + .sudo() + ._auth_oauth_signin(self.provider_google.id, validation, params) ) self.assertEqual(login, "johndoe") @@ -80,10 +85,54 @@ def test_access_multi_token(self): len(self.user.oauth_access_token_ids), self.user.oauth_access_max_token ) - def test_remove_oauth_access_token(self): + @users("johndoe") + def test_access_multi_token_first_removed(self): + # no token yet + self.assertFalse(self.user.oauth_access_token_ids) + + # login the first token + validation = { + "user_id": "oauth_uid_johndoe", + } + params = self._fake_params() + login = ( + self.env["res.users"] + .sudo() + ._auth_oauth_signin(self.provider_google.id, validation, params) + ) + self.assertEqual(login, "johndoe") + + # login is working + self.env["res.users"]._check_credentials( + params["access_token_multi"], {"interactive": False} + ) + + # use as many token as max allowed + for token_count in range(2, self.user.oauth_access_max_token + 1): + self._test_one_token() + self.assertEqual(len(self.user.oauth_access_token_ids), token_count) + self.assertEqual( + len(self.token_model._oauth_user_tokens(self.user.id)), token_count + ) + + # exceed the number, token removed and login blocked + self._test_one_token() + with self.assertRaises(exceptions.AccessDenied): + self.env["res.users"]._check_credentials( + params["access_token_multi"], {"interactive": False} + ) + + # token count does not exceed max number + self.assertEqual( + len(self.user.oauth_access_token_ids), self.user.oauth_access_max_token + ) + + def test_oauth_access_token_odoo_sh(self): + # do not change the _get_session_token_fields result to stay compatible + # with odoo.sh res = self.user._get_session_token_fields() - self.assertFalse("oauth_access_token" in res) - self.assertTrue("oauth_master_uuid" in res) + self.assertTrue("oauth_access_token" in res) + self.assertFalse("oauth_master_uuid" in res) def test_action_oauth_clear_token(self): self.user.action_oauth_clear_token()