From 5d50a38f49dabb64c5064a7d32b66b1a64cccc12 Mon Sep 17 00:00:00 2001 From: bt-dlagin Date: Tue, 7 Jan 2025 14:12:15 +0100 Subject: [PATCH] [18.0][MIG] auth_saml: Migration to 18.0 --- auth_saml/__manifest__.py | 2 +- auth_saml/controllers/main.py | 35 ++-- auth_saml/models/auth_saml_provider.py | 2 +- auth_saml/models/res_users.py | 19 +- auth_saml/tests/test_pysaml.py | 231 +++++++++++++++++++++++-- auth_saml/views/auth_saml.xml | 10 +- auth_saml/views/res_users.xml | 4 +- 7 files changed, 259 insertions(+), 44 deletions(-) diff --git a/auth_saml/__manifest__.py b/auth_saml/__manifest__.py index f3596d6d52..bf6d3cc792 100644 --- a/auth_saml/__manifest__.py +++ b/auth_saml/__manifest__.py @@ -4,7 +4,7 @@ { "name": "SAML2 Authentication", - "version": "17.0.1.0.0", + "version": "18.0.1.0.0", "category": "Tools", "author": "XCG Consulting, Odoo Community Association (OCA)", "maintainers": ["vincent-hatakeyama"], diff --git a/auth_saml/controllers/main.py b/auth_saml/controllers/main.py index f241d3d503..7df39f2b77 100644 --- a/auth_saml/controllers/main.py +++ b/auth_saml/controllers/main.py @@ -17,9 +17,7 @@ exceptions, http, models, -) -from odoo import ( - registry as registry_get, + modules, ) from odoo.http import request from odoo.tools.misc import clean_context @@ -173,10 +171,9 @@ def _get_saml_extra_relaystate(self): } return state - @http.route("/auth_saml/get_auth_request", type="http", auth="none") + @http.route("/auth_saml/get_auth_request", type="http", auth="none", readonly=False) def get_auth_request(self, pid): provider_id = int(pid) - provider = request.env["auth.saml.provider"].sudo().browse(provider_id) redirect_url = provider._get_auth_request( self._get_saml_extra_relaystate(), request.httprequest.url_root.rstrip("/") @@ -191,7 +188,9 @@ def get_auth_request(self, pid): redirect.autocorrect_location_header = True return redirect - @http.route("/auth_saml/signin", type="http", auth="none", csrf=False) + @http.route( + "/auth_saml/signin", type="http", auth="none", csrf=False, readonly=False + ) @fragment_to_query_string def signin(self, **kw): """ @@ -241,16 +240,20 @@ def signin(self, **kw): url = f"/#action={action}" elif menu: url = f"/#menu_id={menu}" - pre_uid = request.session.authenticate(*credentials) + + credentials_dict = { + "login": credentials[1], + "token": credentials[2], + "type": "saml_token", + } + pre_uid = request.session.authenticate(dbname, credentials_dict) resp = request.redirect(_get_login_redirect_url(pre_uid, url), 303) resp.autocorrect_location_header = False return resp except exceptions.AccessDenied: - # saml credentials not valid, - # user could be on a temporary session + # saml credentials not valid, user could be on a temporary session _logger.info("SAML2: access denied") - url = "/web/login?saml_error=expired" redirect = werkzeug.utils.redirect(url, 303) redirect.autocorrect_location_header = False @@ -265,7 +268,9 @@ def signin(self, **kw): redirect.autocorrect_location_header = False return redirect - @http.route("/auth_saml/metadata", type="http", auth="none", csrf=False) + @http.route( + "/auth_saml/metadata", type="http", auth="none", csrf=False, readonly=False + ) def saml_metadata(self, **kw): provider = kw.get("p") dbname = kw.get("d") @@ -273,17 +278,15 @@ def saml_metadata(self, **kw): if not dbname or not provider: _logger.debug("Metadata page asked without database name or provider id") - return request.not_found(_("Missing parameters")) + raise request.not_found(_("Missing parameters")) provider = int(provider) - registry = registry_get(dbname) - - with registry.cursor() as cr: + with modules.registry.Registry(dbname).cursor() as cr: env = api.Environment(cr, SUPERUSER_ID, {}) client = env["auth.saml.provider"].sudo().browse(provider) if not client.exists(): - return request.not_found(_("Unknown provider")) + raise request.not_found(_("Unknown provider")) return request.make_response( client._metadata_string( diff --git a/auth_saml/models/auth_saml_provider.py b/auth_saml/models/auth_saml_provider.py index fc75664eee..05f26ccfd4 100644 --- a/auth_saml/models/auth_saml_provider.py +++ b/auth_saml/models/auth_saml_provider.py @@ -164,7 +164,7 @@ def _compute_sp_metadata_url(self): qs = urllib.parse.urlencode({"p": record.id, "d": self.env.cr.dbname}) record.sp_metadata_url = urllib.parse.urljoin( - base_url, f"/auth_saml/metadata?{qs}" + base_url, (f"/auth_saml/metadata?{qs}") ) def _get_cert_key_path(self, field="sp_pem_public"): diff --git a/auth_saml/models/res_users.py b/auth_saml/models/res_users.py index 412b5c6994..90b4eb222a 100644 --- a/auth_saml/models/res_users.py +++ b/auth_saml/models/res_users.py @@ -7,7 +7,7 @@ import passlib -from odoo import SUPERUSER_ID, _, api, fields, models, registry, tools +from odoo import SUPERUSER_ID, _, api, fields, models, modules, tools from odoo.exceptions import AccessDenied, ValidationError from .ir_config_parameter import ALLOW_SAML_UID_AND_PASSWORD @@ -48,7 +48,7 @@ def _auth_saml_signin(self, provider: int, validation: dict, saml_response) -> s if len(user) != 1: raise AccessDenied() - with registry(self.env.cr.dbname).cursor() as new_cr: + with modules.registry.Registry(self.env.cr.dbname).cursor() as new_cr: new_env = api.Environment(new_cr, self.env.uid, self.env.context) # Update the token. Need to be committed, otherwise the token is not visible # to other envs, like the one used in login_and_redirect @@ -76,7 +76,7 @@ def auth_saml(self, provider: int, saml_response: str, base_url: str = None): # return user credentials return self.env.cr.dbname, login, saml_response - def _check_credentials(self, password, env): + def _check_credentials(self, credential, env): """Override to handle SAML auths. The token can be a password if the user has used the normal form... @@ -85,9 +85,10 @@ def _check_credentials(self, password, env): """ try: # Attempt a regular login (via other auth addons) first. - return super()._check_credentials(password, env) - + return super()._check_credentials(credential, env) except (AccessDenied, passlib.exc.PasswordSizeError): + if not (credential["type"] == "saml_token" and credential["token"]): + raise passwd_allowed = ( env["interactive"] or not self.env.user._rpc_api_keys_only() ) @@ -100,12 +101,16 @@ def _check_credentials(self, password, env): .search( [ ("user_id", "=", self.env.user.id), - ("saml_access_token", "=", password), + ("saml_access_token", "=", credential["token"]), ] ) ) if token: - return + return { + "uid": self.env.user.id, + "auth_method": "saml", + "mfa": "default", + } raise AccessDenied() from None @api.model diff --git a/auth_saml/tests/test_pysaml.py b/auth_saml/tests/test_pysaml.py index 9abccf5765..1e5e4cf966 100644 --- a/auth_saml/tests/test_pysaml.py +++ b/auth_saml/tests/test_pysaml.py @@ -8,6 +8,8 @@ from odoo.exceptions import AccessDenied, UserError, ValidationError from odoo.tests import HttpCase, tagged +from odoo.addons.auth_saml.controllers.main import fragment_to_query_string + from .fake_idp import DummyResponse, FakeIDP @@ -78,6 +80,13 @@ def setUp(self): ) ) + # Define a sample function to test the decorator + def dummy_function(self, **kw): + return "Function executed" + + # Apply the decorator to the dummy function + self.decorated_function = fragment_to_query_string(dummy_function) + def test_ensure_provider_appears_on_login(self): # SAML provider should be listed in the login page response = self.url_open("/web/login") @@ -125,7 +134,7 @@ def test__compute_sp_metadata_url__provider_has_sp_baseurl(self): {"p": self.saml_provider.id, "d": self.env.cr.dbname} ) expected_url = urllib.parse.urljoin( - "http://example.com", f"/auth_saml/metadata?{expected_qs}" + "http://example.com", (f"/auth_saml/metadata?{expected_qs}") ) # Assert that sp_metadata_url is set correctly self.assertEqual(self.saml_provider.sp_metadata_url, expected_url) @@ -200,7 +209,10 @@ def test_login_no_saml(self): # Try to log in with a non-existing SAML token with self.assertRaises(AccessDenied): - self.authenticate(user="test@example.com", password="test_saml_token") + self.user._check_credentials( + {"type": "password", "password": "test_saml_token"}, + {"interactive": True}, + ) redirect_url = self.saml_provider._get_auth_request() self.assertIn("http://localhost:8000/sso/redirect?SAMLRequest=", redirect_url) @@ -254,7 +266,10 @@ def test_login_with_saml(self): # We should not be able to log in with the wrong token with self.assertRaises(AccessDenied): - self.authenticate(user="test@example.com", password=f"{token}-WRONG") + self.user._check_credentials( + {"type": "password", "password": "WRONG_TOKEN"}, + {"interactive": True}, + ) # User should now be able to log in with the token self.authenticate(user="test@example.com", password=token) @@ -268,8 +283,9 @@ def test_disallow_user_password_when_changing_ir_config_parameter(self): ).value = "False" # The password should be blank and the user should not be able to connect with self.assertRaises(AccessDenied): - self.authenticate( - user="user@example.com", password="NesTNSte9340D720te>/-A" + self.user._check_credentials( + {"type": "password", "password": "NesTNSte9340D720te>/-A"}, + {"interactive": True}, ) def test_disallow_user_password_new_user(self): @@ -332,18 +348,19 @@ def test_disallow_user_password_no_password_set(self): with self.assertRaises(ValidationError): user.password = "new password" - def test_disallow_user_password(self): + def test_disallow_user_password_on_option_disable(self): """Test that existing user password is deleted when adding an SAML provider when the disallow option is set.""" + self.authenticate(user="test@example.com", password="Lu,ums-7vRU>0i]=YDLa") # change the option self.browse_ref( "auth_saml.allow_saml_uid_and_internal_password" ).value = "False" - # Test that existing user password is deleted when adding an SAML provider - self.authenticate(user="test@example.com", password="Lu,ums-7vRU>0i]=YDLa") - self.add_provider_to_user() with self.assertRaises(AccessDenied): - self.authenticate(user="test@example.com", password="Lu,ums-7vRU>0i]=YDLa") + self.user._check_credentials( + {"type": "password", "password": "Lu,ums-7vRU>0i]=YDLa"}, + {"interactive": True}, + ) def test_disallow_user_admin_can_have_password(self): """Test that admin can have its password set @@ -417,6 +434,196 @@ def test_disallow_user_password_when_changing_settings(self): ).execute() with self.assertRaises(AccessDenied): - self.authenticate( - user="user@example.com", password="NesTNSte9340D720te>/-A" + self.user._check_credentials( + {"type": "password", "password": "NesTNSte9340D720te>/-A"}, + {"interactive": True}, ) + + def test_fragment_to_query_string_no_kw(self): + """Test the case where no keyword arguments are passed.""" + response = self.decorated_function(self) + expected_html = """""" + self.assertEqual(response.strip(), expected_html.strip()) + + def test_fragment_to_query_string_with_kw(self): + """Test the case where keyword arguments are passed.""" + response = self.decorated_function(self, key="value") + self.assertEqual(response, "Function executed") + + def test_sig_alg_selection(self): + """Test that _sig_alg_selection is returning correct selection.""" + expected_selection = [ + ("SIG_RSA_SHA1", "SIG_RSA_SHA1"), + ("SIG_RSA_SHA224", "SIG_RSA_SHA224"), + ("SIG_RSA_SHA256", "SIG_RSA_SHA256"), + ("SIG_RSA_SHA384", "SIG_RSA_SHA384"), + ("SIG_RSA_SHA512", "SIG_RSA_SHA512"), + ] + self.assertEqual(self.saml_provider._sig_alg_selection(), expected_selection) + + def test_saml_metadata_invalid_provider(self): + """Accessing SAML metadata with an invalid provider ID should return 404.""" + response = self.url_open(f"/auth_saml/metadata?p=999999&d={self.env.cr.dbname}") + self.assertEqual(response.status_code, 404) + self.assertIn("Unknown provider", response.text) + + def test_saml_metadata_missing_parameters(self): + """Accessing the SAML metadata endpoint without params should return 404.""" + response = self.url_open("/auth_saml/metadata") + self.assertEqual(response.status_code, 404) + self.assertIn("Missing parameters", response.text) + + def test_saml_provider_deactivation(self): + """A deactivated SAML provider should not be usable for authentication.""" + self.saml_provider.active = False + + redirect_url = self.saml_provider._get_auth_request() + response = self.idp.fake_login(redirect_url) + unpacked_response = response._unpack() + + with self.assertRaises(AccessDenied): + self.env["res.users"].sudo().auth_saml( + self.saml_provider.id, unpacked_response.get("SAMLResponse"), None + ) + + def test_compute_sp_metadata_url_for_new_record(self): + """Test that sp_metadata_url is set to False for a new (unsaved) provider.""" + new_provider = self.env["auth.saml.provider"].new( + {"name": "New SAML Provider", "sp_baseurl": "http://example.com"} + ) + new_provider._compute_sp_metadata_url() + self.assertFalse(new_provider.sp_metadata_url) + + def test_store_outstanding_request(self): + """Test that the SAML request ID is stored in the auth_saml.request model.""" + reqid = "test-request-id" + self.saml_provider._store_outstanding_request(reqid) + + request = self.env["auth_saml.request"].search( + [("saml_request_id", "=", reqid)] + ) + self.assertTrue(request) + self.assertEqual(request.saml_provider_id.id, self.saml_provider.id) + + def test_get_auth_request_redirect_url(self): + """Test that _get_auth_request returns a valid redirect URL.""" + redirect_url = self.saml_provider._get_auth_request() + self.assertIsNotNone(redirect_url) + self.assertIn("SAMLRequest=", redirect_url) + + def test_get_auth_request_valid_provider(self): + """Test that get_auth_request returns a redirect for a valid provider.""" + response = self.url_open( + f"/auth_saml/get_auth_request?pid={self.saml_provider.id}", + allow_redirects=False, + ) + self.assertEqual(response.status_code, 303) + self.assertIn("Location", response.headers) + self.assertIn("SAMLRequest=", response.headers["Location"]) + + def test_create_res_users_saml(self): + """Test that creating a SAML mapping removes the password when disallowed.""" + user = self.env["res.users"].create( + { + "name": "Test User", + "login": "testuser@example.com", + "password": "securepassword", + } + ) + self.env["ir.config_parameter"].set_param( + "auth_saml.allow_saml_uid_and_internal_password", "False" + ) + self.env["res.users.saml"].create( + { + "user_id": user.id, + "saml_provider_id": self.env["auth.saml.provider"] + .create( + { + "name": "Demo Provider", + "sig_alg": "SIG_RSA_SHA1", + "idp_metadata": "fake_metadata", + "sp_pem_public": base64.b64encode(b"public_key"), + "sp_pem_private": base64.b64encode(b"private_key"), + } + ) + .id, + "saml_uid": "testuser@example.com", + } + ) + self.assertFalse(user.password) + + def test_missing_parameters_in_metadata(self): + """Test that missing parameters in the SAML metadata request return a 404.""" + response = self.url_open("/auth_saml/metadata") + self.assertEqual(response.status_code, 404) + self.assertIn("Missing parameters", response.text) + + def test_saml_request_creation(self): + """Test that a SAML request is correctly stored in the model.""" + self.env["auth_saml.request"].create( + { + "saml_provider_id": self.saml_provider.id, + "saml_request_id": "test-request-id", + } + ) + request = self.env["auth_saml.request"].search( + [("saml_request_id", "=", "test-request-id")] + ) + self.assertTrue(request) + self.assertEqual(request.saml_provider_id.id, self.saml_provider.id) + + def test_get_cert_key_path_tempfile(self): + """Test _get_cert_key_path for non-file storage locations.""" + # Create a mock attachment with base64-encoded data + self.env["ir.attachment"].create( + { + "name": "test.pem", + "datas": base64.b64encode(b"dummy_cert_key_content").decode("utf-8"), + "res_model": "auth.saml.provider", + "res_field": "sp_pem_public", + "res_id": 1, + } + ) + # Mock the _storage method to return a non-file location + with patch( + "odoo.addons.base.models.ir_attachment.IrAttachment._storage", + return_value="db", + ): + provider = self.env["auth.saml.provider"].browse(1) + cert_key_path = provider._get_cert_key_path("sp_pem_public") + + # Verify the temporary file was created and contains the expected content + with open(cert_key_path, "rb") as f: + content = f.read() + self.assertEqual(content, b"dummy_cert_key_content") + + # Clean up the temporary file + os.remove(cert_key_path) + + def test_signin_no_relaystate_redirect(self): + """Test redirect to /?type=signup when RelayState is missing.""" + self.add_provider_to_user() + + redirect_url = self.saml_provider._get_auth_request() + self.assertIn("http://localhost:8000/sso/redirect?SAMLRequest=", redirect_url) + + # Simulate the login request and remove RelayState from the response + response = self.idp.fake_login(redirect_url) + response.text = response.text.replace('name="RelayState" value="', "") + + signin_response = self.url_open( + "/auth_saml/signin", + data={"SAMLResponse": response.text}, + allow_redirects=False, + ) + self.assertEqual(signin_response.status_code, 303) + self.assertIn("/?type=signup", signin_response.headers["Location"]) diff --git a/auth_saml/views/auth_saml.xml b/auth_saml/views/auth_saml.xml index 9ee7dc0335..e9ed50207a 100644 --- a/auth_saml/views/auth_saml.xml +++ b/auth_saml/views/auth_saml.xml @@ -48,12 +48,12 @@ auth.saml.provider.list auth.saml.provider - + - + @@ -168,10 +168,10 @@ name="attribute_mapping_ids" context="{'default_provider_id': id}" > - + - +

Mapped attributes are copied from the SAML response at every logon, if available. If multiple values are returned (i.e. a list) then the first value is used. @@ -191,7 +191,7 @@ Providers auth.saml.provider - tree,form + list,form - + - +