from __future__ import annotations
import json
import os
from enum import Enum
from typing import Optional
import botocore.session
from botocore import crt
from botocore.awsrequest import AWSRequest
class ServicePrefix(Enum):
    """
    AWS Service Prefixes - Enumerations of the supported service proxy types

    Each member's value is the IAM service prefix that is passed to the signer as
    the service name, i.e. the ``service`` element of the SigV4/SigV4a credential
    scope. A request signed for the wrong prefix is rejected by the target
    service, so the value must match what the endpoint expects.

    URLs:
    https://docs.aws.amazon.com/service-authorization/latest/reference/reference_policies_actions-resources-contextkeys.html
    """

    # Amazon VPC Lattice services (payload signing is disabled for this prefix by the signers below)
    LATTICE = "vpc-lattice-svcs"
    # Amazon API Gateway REST API invocation
    RESTAPI = "execute-api"
    # NOTE(review): "apigateway" is the API Gateway control-plane prefix; invoking
    # an HTTP API with IAM auth is normally signed with "execute-api" — confirm intent
    HTTPAPI = "apigateway"
    # AWS AppSync
    APPSYNC = "appsync"
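class SigV4Auth:
    """
    Authenticating Requests (AWS Signature Version 4)

    Requests that were signed with SigV4 will have SignatureVersion set to AWS4-HMAC-SHA256

    Args:
        url (str): URL
        service (ServicePrefix): AWS service Prefix
        region (str, Optional): AWS region; when ``None`` the ``AWS_REGION`` environment variable is used
        body (dict, optional): Request body; JSON-encoded before signing (an empty JSON object when omitted)
        params (dict, optional): Request query parameters
        headers (dict, optional): Request headers (defaults to ``{"Content-Type": "application/json"}``)
        method (str, optional): Request method

    Returns:
        botocore.awsrequest.AWSPreparedRequest: the signed, prepared request

    Raises:
        ValueError: if no region is given and ``AWS_REGION`` is not set

    Examples
    --------
    >>> from aws_lambda_powertools.utilities.auth import SigV4Auth, ServicePrefix
    >>> prepped = SigV4Auth.prepare_request(region="us-east-2", service=ServicePrefix.LATTICE, url="https://test-fake-service.vpc-lattice-svcs.us-east-2.on.aws")
    """

    @staticmethod
    def prepare_request(
        url: str,
        service: ServicePrefix,
        region: Optional[str],
        body: Optional[dict] = None,
        params: Optional[dict] = None,
        headers: Optional[dict] = None,
        method: Optional[str] = "GET",
    ):
        # Resolve the signing region up front: SigV4 scopes the signature to a
        # region, and a missing one would otherwise only surface as an opaque
        # failure inside the CRT signer.
        if region is None:
            region = os.environ.get("AWS_REGION")
        if not region:
            raise ValueError(
                "A region is required to sign with SigV4: pass `region` or set the AWS_REGION environment variable",
            )

        # The body is always sent JSON-encoded; an omitted body becomes "{}".
        payload = json.dumps(body if body is not None else {})

        if headers is None:
            headers = {"Content-Type": "application/json"}

        # Credentials come from the default botocore provider chain (environment,
        # shared config, container/instance role, ...). When none are found the
        # signer is expected to raise NoCredentialsError from add_auth() — verify
        # against the installed botocore version.
        credentials = botocore.session.Session().get_credentials()
        signer = crt.auth.CrtSigV4Auth(credentials, service.value, region)

        request = AWSRequest(method=method, url=url, data=payload, params=params, headers=headers)
        if service.value == ServicePrefix.LATTICE.value:
            # VPC Lattice does not support payload signing, so the signature is
            # computed with an UNSIGNED-PAYLOAD content hash. Security note: the
            # request body is therefore NOT covered by the signature; only the
            # signed headers and the TLS channel protect its integrity.
            request.context["payload_signing_enabled"] = False
        signer.add_auth(request)
        return request.prepare()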
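class SigV4aAuth:
    """
    Authenticating Requests (AWS Signature Version 4a)

    Requests that were signed with SigV4A will have a SignatureVersion set to AWS4-ECDSA-P256-SHA256

    Args:
        url (str): URL
        service (ServicePrefix): AWS service Prefix
        region (str, Optional): AWS region set the signature is valid for; defaults to ``"*"`` (every region).
            When ``None`` the ``AWS_REGION`` environment variable is used
        body (dict, optional): Request body; JSON-encoded before signing (an empty JSON object when omitted)
        params (dict, optional): Request query parameters
        headers (dict, optional): Request headers (defaults to ``{"Content-Type": "application/json"}``)
        method (str, optional): Request method

    Returns:
        botocore.awsrequest.AWSPreparedRequest: the signed, prepared request

    Raises:
        ValueError: if ``region`` is ``None`` and ``AWS_REGION`` is not set

    Examples
    --------
    >>> from aws_lambda_powertools.utilities.iam import SigV4aAuth, ServicePrefix
    >>> prepped = SigV4aAuth.prepare_request(region="us-east-2", service=ServicePrefix.LATTICE, url="https://test-fake-service.vpc-lattice-svcs.us-east-2.on.aws")
    """

    @staticmethod
    def prepare_request(
        url: str,
        service: ServicePrefix,
        region: Optional[str] = "*",
        body: Optional[dict] = None,
        params: Optional[dict] = None,
        headers: Optional[dict] = None,
        method: Optional[str] = "GET",
    ):
        # SigV4a scopes the signature to a region *set*; the default "*" makes the
        # signed request acceptable in every region. Pass a specific region (or a
        # comma-separated set) when the target is known, to keep the signature's
        # scope as narrow as the use case allows. An explicit None falls back to
        # AWS_REGION, mirroring SigV4Auth, instead of failing inside the signer.
        if region is None:
            region = os.environ.get("AWS_REGION")
        if not region:
            raise ValueError(
                "A region (or region set) is required to sign with SigV4a: pass `region` or set the AWS_REGION environment variable",
            )

        # The body is always sent JSON-encoded; an omitted body becomes "{}".
        payload = json.dumps(body if body is not None else {})

        if headers is None:
            headers = {"Content-Type": "application/json"}

        # Credentials come from the default botocore provider chain. When none are
        # found the signer is expected to raise NoCredentialsError from add_auth()
        # — verify against the installed botocore version.
        credentials = botocore.session.Session().get_credentials()
        signer = crt.auth.CrtSigV4AsymAuth(credentials, service.value, region)

        request = AWSRequest(method=method, url=url, data=payload, params=params, headers=headers)
        if service.value == ServicePrefix.LATTICE.value:
            # VPC Lattice does not support payload signing, so the signature is
            # computed with an UNSIGNED-PAYLOAD content hash. Security note: the
            # request body is therefore NOT covered by the signature; only the
            # signed headers and the TLS channel protect its integrity.
            request.context["payload_signing_enabled"] = False
        signer.add_auth(request)
        return request.prepare()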