-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathbase_service.py
532 lines (448 loc) · 22.2 KB
/
base_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
# coding: utf-8
# Copyright 2019 IBM All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gzip
import io
import json as json_import
import logging
import platform
from http.cookiejar import CookieJar
from os.path import basename
from typing import Dict, List, Optional, Tuple, Union
from urllib.parse import urlparse
import requests
from requests.structures import CaseInsensitiveDict
from requests.exceptions import JSONDecodeError
from urllib3.exceptions import MaxRetryError
from urllib3.util.retry import Retry
from ibm_cloud_sdk_core.authenticators import Authenticator
from .api_exception import ApiException
from .detailed_response import DetailedResponse
from .token_managers.token_manager import TokenManager
from .utils import (
has_bad_first_or_last_char,
is_json_mimetype,
remove_null_values,
cleanup_values,
read_external_sources,
strip_extra_slashes,
SSLHTTPAdapter,
GzipStream,
)
from .version import __version__
# Uncomment this to enable http debugging
# import http.client as http_client
# http_client.HTTPConnection.debuglevel = 1
logger = logging.getLogger(__name__)
MAX_REDIRECTS = 10
SAFE_HEADERS = ['authorization', 'www-authenticate', 'cookie', 'cookie2']
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-locals
class BaseService:
"""Common functionality shared by generated service classes.
The base service authenticates requests via its authenticator, stores cookies, and
wraps responses from the service endpoint in DetailedResponse or APIException objects.
Keyword Arguments:
service_url: Url to the service endpoint. Defaults to None.
authenticator: Adds authentication data to service requests. Defaults to None.
disable_ssl_verification: A flag that indicates whether verification of the server's SSL
certificate should be disabled or not. Defaults to False.
enable_gzip_compression: A flag that indicates whether to enable gzip compression on request bodies
Attributes:
service_url (str): Url to the service endpoint.
authenticator (Authenticator): Adds authentication data to service requests.
disable_ssl_verification (bool): A flag that indicates whether verification of
the server's SSL certificate should be disabled or not.
default_headers (dict): A dictionary of headers to be sent with every HTTP request to the service endpoint.
jar (http.cookiejar.CookieJar): Stores cookies received from the service.
http_config (dict): A dictionary containing values that control the timeout, proxies, and etc of HTTP requests.
http_client (Session): A configurable session which can use Transport Adapters to configure retries, timeouts,
proxies, etc. globally for all requests.
enable_gzip_compression (bool): A flag that indicates whether to enable gzip compression on request bodies
Raises:
ValueError: If Authenticator is not provided or invalid type.
"""
SDK_NAME = 'ibm-python-sdk-core'
ERROR_MSG_DISABLE_SSL = (
'The connection failed because the SSL certificate is not valid. To use a self-signed '
'certificate, disable verification of the server\'s SSL certificate by invoking the '
'set_disable_ssl_verification(True) on your service instance and/ or use the '
'disable_ssl_verification option of the authenticator.'
)
def __init__(
self,
*,
service_url: str = None,
authenticator: Authenticator = None,
disable_ssl_verification: bool = False,
enable_gzip_compression: bool = False,
) -> None:
self.set_service_url(service_url)
self.http_client = requests.Session()
self.http_config = {}
self.jar = CookieJar()
self.authenticator = authenticator
self.disable_ssl_verification = disable_ssl_verification
self.default_headers = None
self.enable_gzip_compression = enable_gzip_compression
self._set_user_agent_header(self._build_user_agent())
self.retry_config = None
self.http_adapter = SSLHTTPAdapter()
if not self.authenticator:
raise ValueError('authenticator must be provided')
if not isinstance(self.authenticator, Authenticator):
raise ValueError('authenticator should be of type Authenticator')
self.http_client.mount('http://', self.http_adapter)
self.http_client.mount('https://', self.http_adapter)
def enable_retries(self, max_retries: int = 4, retry_interval: float = 1.0) -> None:
"""Enable automatic retries on the underlying http client used by the BaseService instance.
Args:
max_retries: the maximum number of retries to attempt for a failed retryable request
retry_interval: the default wait time (in seconds) to use for the first retry attempt.
In general, if a response includes the Retry-After header, that will be used for
the wait time associated with the retry attempt. If the Retry-After header is not
present, then the wait time is based on the retry_interval and retry attempt number:
wait_time = retry_interval * (2 ^ (n-1)), where n is the retry attempt number
"""
self.retry_config = Retry(
total=max_retries,
backoff_factor=retry_interval,
# List of HTTP status codes to retry on in addition to Timeout/Connection Errors
status_forcelist=[429, 500, 502, 503, 504],
# List of HTTP methods to retry on
# Omitting this will default to all methods except POST
allowed_methods=['HEAD', 'GET', 'PUT', 'DELETE', 'OPTIONS', 'TRACE', 'POST'],
)
self.http_adapter = SSLHTTPAdapter(max_retries=self.retry_config)
self.http_client.mount('http://', self.http_adapter)
self.http_client.mount('https://', self.http_adapter)
def disable_retries(self):
"""Remove retry config from http_adapter"""
self.retry_config = None
self.http_adapter = SSLHTTPAdapter()
self.http_client.mount('http://', self.http_adapter)
self.http_client.mount('https://', self.http_adapter)
@staticmethod
def _get_system_info() -> str:
return '{0} {1} {2}'.format(
platform.system(), platform.release(), platform.python_version() # OS # OS version # Python version
)
def _build_user_agent(self) -> str:
return '{0}-{1} {2}'.format(self.SDK_NAME, __version__, self._get_system_info())
def configure_service(self, service_name: str) -> None:
"""Look for external configuration of a service. Set service properties.
Try to get config from external sources, with the following priority:
1. Credentials file(ibm-credentials.env)
2. Environment variables
3. VCAP Services(Cloud Foundry)
Args:
service_name: The service name
Raises:
ValueError: If service_name is not a string.
"""
if not isinstance(service_name, str):
raise ValueError('Service_name must be of type string.')
config = read_external_sources(service_name)
if config.get('URL'):
self.set_service_url(config.get('URL'))
if config.get('DISABLE_SSL'):
self.set_disable_ssl_verification(config.get('DISABLE_SSL').lower() == 'true')
if config.get('ENABLE_GZIP'):
self.set_enable_gzip_compression(config.get('ENABLE_GZIP').lower() == 'true')
if config.get('ENABLE_RETRIES'):
if config.get('ENABLE_RETRIES').lower() == 'true':
kwargs = {}
if config.get('MAX_RETRIES'):
kwargs["max_retries"] = int(config.get('MAX_RETRIES'))
if config.get('RETRY_INTERVAL'):
kwargs["retry_interval"] = float(config.get('RETRY_INTERVAL'))
self.enable_retries(**kwargs)
def _set_user_agent_header(self, user_agent_string: str) -> None:
self.user_agent_header = {'User-Agent': user_agent_string}
def set_http_config(self, http_config: dict) -> None:
"""Sets the http config dictionary.
The dictionary can contain values that control the timeout, proxies, and etc of HTTP requests.
Arguments:
http_config: Configuration values to customize HTTP behaviors.
Raises:
TypeError: http_config is not a dict.
"""
if isinstance(http_config, dict):
self.http_config = http_config
if (
self.authenticator
and hasattr(self.authenticator, 'token_manager')
and isinstance(self.authenticator.token_manager, TokenManager)
):
self.authenticator.token_manager.http_config = http_config
else:
raise TypeError("http_config parameter must be a dictionary")
def set_disable_ssl_verification(self, status: bool = False) -> None:
"""Set the flag that indicates whether verification of
the server's SSL certificate should be disabled or not.
Keyword Arguments:
status: set to true to disable ssl verification (default: {False})
"""
self.disable_ssl_verification = status
def set_service_url(self, service_url: str) -> None:
"""Set the url the service will make HTTP requests too.
Arguments:
service_url: The WHATWG URL standard origin ex. https://example.service.com
Raises:
ValueError: Improperly formatted service_url
"""
if has_bad_first_or_last_char(service_url):
raise ValueError(
'The service url shouldn\'t start or end with curly brackets or quotes. '
'Be sure to remove any {} and \" characters surrounding your service url'
)
if service_url is not None:
service_url = service_url.rstrip('/')
self.service_url = service_url
def get_http_client(self) -> requests.sessions.Session:
"""Get the http client session currently used by the service.
Returns:
The http client session currently used by the service.
"""
return self.http_client
def set_http_client(self, http_client: requests.sessions.Session) -> None:
"""Set current http client session
Arguments:
http_client: A new requests session client
"""
if isinstance(http_client, requests.sessions.Session):
self.http_client = http_client
else:
raise TypeError("http_client parameter must be a requests.sessions.Session")
def get_authenticator(self) -> Authenticator:
"""Get the authenticator currently used by the service.
Returns:
The authenticator currently used by the service.
"""
return self.authenticator
def set_default_headers(self, headers: Dict[str, str]) -> None:
"""Set http headers to be sent in every request.
Arguments:
headers: A dictionary of headers
"""
if isinstance(headers, dict):
self.default_headers = headers
else:
raise TypeError("headers parameter must be a dictionary")
# pylint: disable=too-many-branches
def send(self, request: requests.Request, **kwargs) -> DetailedResponse:
"""Send a request and wrap the response in a DetailedResponse or APIException.
Args:
request: The request to send to the service endpoint.
Raises:
ApiException: The exception from the API.
Returns:
The response from the request.
"""
# Use a one minute timeout when our caller doesn't give a timeout.
# http://docs.python-requests.org/en/master/user/quickstart/#timeouts
# We also disable the default redirection, to have more granular control
# over the headers sent in each request.
kwargs = dict({'timeout': 60, 'allow_redirects': False}, **kwargs)
kwargs = dict(kwargs, **self.http_config)
if self.disable_ssl_verification:
kwargs['verify'] = False
# Check to see if the caller specified the 'stream' argument.
stream_response = kwargs.get('stream') or False
# Remove the keys we set manually, don't let the user to overwrite these.
reserved_keys = ['method', 'url', 'headers', 'params', 'cookies']
silent_keys = ['headers']
for key in reserved_keys:
if key in kwargs:
del kwargs[key]
if key not in silent_keys:
logger.warning('"%s" has been removed from the request', key)
try:
response = self.http_client.request(**request, cookies=self.jar, **kwargs)
# Handle HTTP redirects.
redirects_count = 0
# Check if the response is a redirect to another host.
while response.is_redirect and response.next is not None:
redirects_count += 1
if redirects_count > MAX_REDIRECTS:
# Raise an error if the maximum number of redirects has been reached.
raise MaxRetryError(
None, response.url, reason=f'reached the maximum number of redirects: {MAX_REDIRECTS}'
)
# The `requests` package has already prepared a request that can almost be used as-is.
next_request = response.next
from_host = urlparse(response.request.url).hostname
to_host = urlparse(next_request.url).hostname
same_host = from_host == to_host
safe_domain = from_host.endswith('.cloud.ibm.com') and to_host.endswith('.cloud.ibm.com')
# If both the original and the redirected URL are under the `.cloud.ibm.com` domain,
# copy the safe headers that are used for authentication purposes,
if same_host or safe_domain:
original_headers = request.get('headers')
for header, value in original_headers.items():
if header.lower() in SAFE_HEADERS:
next_request.headers[header] = value
# otherwise remove them manually, because `urllib3` doesn't strip all of them.
else:
for header in SAFE_HEADERS:
next_request.headers.pop(header, None)
response = self.http_client.send(next_request, **kwargs)
# Process a "success" response.
if 200 <= response.status_code <= 299:
if response.status_code == 204 or request['method'] == 'HEAD':
# There is no body content for a HEAD response or a 204 response.
result = None
elif stream_response:
result = response
elif not response.text:
result = None
elif is_json_mimetype(response.headers.get('Content-Type')):
# If this is a JSON response, then try to unmarshal it.
try:
result = response.json(strict=False)
except JSONDecodeError as err:
raise ApiException(
code=response.status_code,
http_response=response,
message='Error processing the HTTP response',
) from err
else:
# Non-JSON response, just use response body as-is.
result = response
return DetailedResponse(response=result, headers=response.headers, status_code=response.status_code)
# Received error status code from server, raise an APIException.
raise ApiException(response.status_code, http_response=response)
except requests.exceptions.SSLError:
logger.exception(self.ERROR_MSG_DISABLE_SSL)
raise
def set_enable_gzip_compression(self, should_enable_compression: bool = False) -> None:
"""Set value to enable gzip compression on request bodies"""
self.enable_gzip_compression = should_enable_compression
def get_enable_gzip_compression(self) -> bool:
"""Get value for enabling gzip compression on request bodies"""
return self.enable_gzip_compression
def prepare_request(
self,
method: str,
url: str,
*,
headers: Optional[dict] = None,
params: Optional[dict] = None,
data: Optional[Union[str, dict]] = None,
files: Optional[Union[Dict[str, Tuple[str]], List[Tuple[str, Tuple[str, ...]]]]] = None,
**kwargs,
) -> dict:
"""Build a dict that represents an HTTP service request.
Clean up headers, add default http configuration, convert data
into json, process files, and merge all into a single request dict.
Args:
method: The HTTP method of the request ex. GET, POST, etc.
url: The origin + pathname according to WHATWG spec.
Keyword Arguments:
headers: Headers of the request.
params: Querystring data to be appended to the url.
data: The request body. Converted to json if a dict.
files: 'files' can be a dictionary (i.e { '<part-name>': (<tuple>)}),
or a list of tuples [ (<part-name>, (<tuple>))... ]
Returns:
Prepared request dictionary.
"""
# pylint: disable=unused-argument; necessary for kwargs
request = {'method': method}
# validate the service url is set
if not self.service_url:
raise ValueError('The service_url is required')
# Combine the service_url and operation path to form the request url.
# Note: we have already stripped any trailing slashes from the service_url
# and we know that the operation path ('url') will start with a slash.
request['url'] = strip_extra_slashes(self.service_url + url)
headers = remove_null_values(headers) if headers else {}
headers = cleanup_values(headers)
headers = CaseInsensitiveDict(headers)
if self.default_headers is not None:
headers.update(self.default_headers)
if 'user-agent' not in headers:
headers.update(self.user_agent_header)
request['headers'] = headers
params = remove_null_values(params)
params = cleanup_values(params)
request['params'] = params
if isinstance(data, str):
data = data.encode('utf-8')
elif isinstance(data, dict) and data:
data = remove_null_values(data)
if headers.get('content-type') is None:
headers.update({'content-type': 'application/json'})
data = json_import.dumps(data).encode('utf-8')
request['data'] = data
self.authenticator.authenticate(request)
# Compress the request body if applicable
if self.get_enable_gzip_compression() and 'content-encoding' not in headers and request['data'] is not None:
headers['content-encoding'] = 'gzip'
request['headers'] = headers
# If the provided data is a file-like object, we create `GzipStream` which will handle
# the compression on-the-fly when the requests package starts reading its content.
# This helps avoid OOM errors when the opened file is too big.
# In any other cases, we use the in memory compression directly from
# the `gzip` package for backward compatibility.
raw_data = request['data']
request['data'] = GzipStream(raw_data) if isinstance(raw_data, io.IOBase) else gzip.compress(raw_data)
# Next, we need to process the 'files' argument to try to fill in
# any missing filenames where possible.
# 'files' can be a dictionary (i.e { '<part-name>': (<tuple>)} )
# or a list of tuples [ (<part-name>, (<tuple>))... ]
# If 'files' is a dictionary we'll convert it to a list of tuples.
new_files = []
if files is not None:
# If 'files' is a dictionary, transform it into a list of tuples.
if isinstance(files, dict):
files = remove_null_values(files)
files = files.items()
# Next, fill in any missing filenames from file tuples.
for part_name, file_tuple in files:
if file_tuple and len(file_tuple) == 3 and file_tuple[0] is None:
file = file_tuple[1]
if file and hasattr(file, 'name'):
filename = basename(file.name)
file_tuple = (filename, file_tuple[1], file_tuple[2])
new_files.append((part_name, file_tuple))
request['files'] = new_files
return request
@staticmethod
def encode_path_vars(*args: str) -> List[str]:
"""Encode path variables to be substituted into a URL path.
Arguments:
args: A list of strings to be URL path encoded
Returns:
A list of encoded strings that are safe to substitute into a URL path.
"""
return (requests.utils.quote(x, safe='') for x in args)
# The methods below are kept for compatibility and should be removed
# in the next major release.
# pylint: disable=protected-access
@staticmethod
def _convert_model(val: str) -> None:
if isinstance(val, str):
val = json_import.loads(val)
if hasattr(val, "_to_dict"):
return val._to_dict()
return val
@staticmethod
def _convert_list(val: list) -> None:
if isinstance(val, list):
return ",".join(val)
return val
@staticmethod
def _encode_path_vars(*args) -> None:
return (requests.utils.quote(x, safe='') for x in args)