-
Notifications
You must be signed in to change notification settings - Fork 61
/
Copy pathidentity_request.py
147 lines (130 loc) · 5.1 KB
/
identity_request.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#
# Copyright 2019 Red Hat, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
"""Test Case extension to collect common test data."""
import uuid
import os
from base64 import b64encode
from json import dumps as json_dumps
from unittest.mock import Mock
from django.test import TestCase, override_settings
from faker import Faker
from api.models import Tenant
from api.common import RH_IDENTITY_HEADER
@override_settings(REPLICATION_TO_RELATION_ENABLED=True, PRINCIPAL_USER_DOMAIN="redhat")
class IdentityRequest(TestCase):
"""Parent Class for IAM test cases."""
fake = Faker()
@classmethod
def setUpClass(cls):
"""Set up each test class."""
super().setUpClass()
cls.customer_data = cls._create_customer_data()
cls.user_data = cls._create_user_data()
cls.request_context = cls._create_request_context(cls.customer_data, cls.user_data)
cls.tenant_name = cls.customer_data.get("tenant_name")
cls.tenant = Tenant(
tenant_name=cls.tenant_name,
account_id=cls.customer_data["account_id"],
org_id=cls.customer_data["org_id"],
ready=True,
)
cls.tenant.save()
cls.headers = cls.request_context["request"].META
@classmethod
def tearDownClass(cls):
"""Tear down the class."""
# TODO: avoid deletion of cls.tenant tests ?
if cls.tenant.id is not None:
cls.tenant.delete()
super().tearDownClass()
@classmethod
def _create_customer_data(cls):
"""Create customer data."""
account = cls.fake.ean8()
tenant = f"acct{account}"
org_id = cls.fake.ean8()
customer = {"account_id": account, "tenant_name": tenant, "org_id": org_id}
return customer
@classmethod
def _create_user_data(cls):
"""Create user data."""
user_data = {"username": cls.fake.user_name(), "email": cls.fake.email(), "user_id": cls.fake.ean8()}
return user_data
def _create_service_account_data(cls) -> dict[str, str]:
"""Create service account data"""
client_id = str(uuid.uuid4())
return {"client_id": client_id, "username": f"service-account-{client_id}"}
@classmethod
def _create_request_context(
cls,
customer_data: dict[str, str],
user_data: dict[str, str] = None,
is_org_admin: bool = True,
is_internal: bool = False,
cross_account: bool = False,
service_account_data: dict[str, str] = None,
):
"""Create the request context for a user."""
customer = customer_data
account = customer.get("account_id")
org_id = customer.get("org_id", None)
identity = cls._build_identity(
user_data, account, org_id, is_org_admin, is_internal, service_account_data=service_account_data
)
if cross_account:
identity["identity"]["internal"] = {"cross_access": True}
json_identity = json_dumps(identity)
mock_header = b64encode(json_identity.encode("utf-8"))
request = Mock()
request.headers = {RH_IDENTITY_HEADER: mock_header}
request.META = {RH_IDENTITY_HEADER: mock_header}
request.scope = {}
request_context = {"request": request}
return request_context
@classmethod
def _build_identity(
cls,
user_data: dict[str, str],
account: str,
org_id: str,
is_org_admin: bool,
is_internal: bool,
service_account_data: dict[str, str] = None,
):
identity = {"identity": {"account_number": account, "org_id": org_id}}
if user_data is not None:
identity["identity"]["user"] = {
"username": user_data.get("username"),
"email": user_data.get("email"),
"is_org_admin": is_org_admin,
"user_id": "1111111",
}
if service_account_data:
identity["identity"]["service_account"] = {
"client_id": service_account_data.get("client_id"),
"username": service_account_data.get("username"),
}
if is_internal:
identity["identity"]["type"] = "Associate"
identity["identity"]["associate"] = identity.get("identity").get("user")
identity["identity"]["user"]["is_internal"] = True
else:
if user_data:
identity["identity"]["type"] = "User"
else:
identity["identity"]["type"] = "ServiceAccount"
return identity