Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions osc_sdk/problem.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import json

import defusedxml.ElementTree as ET


class ProblemDecoder(json.JSONDecoder):
def decode(self, s):
data = super().decode(s)
if isinstance(data, dict):
return self._make_problem(data)
return data

def _make_problem(self, data):
type_ = data.pop("type", None)
status = data.pop("status", None)
title = data.pop("title", None)
detail = data.pop("detail", None)
instance = data.pop("instance", None)
return Problem(type_, status, title, detail, instance, **data)


class Problem(Exception):
def __init__(self, type_, status, title, detail, instance, **kwargs):
self._type = type_ or "about:blank"
self.status = status
self.title = title
self.detail = detail
self.instance = instance
self.extras = kwargs

for k in self.extras:
if k in ["type", "status", "title", "detail", "instance"]:
raise ValueError(f"Reserved key '{k}' used in Problem extra arguments.")

def __str__(self):
return self.title

def __repr__(self):
return f"{self.__class__.__name__}<type={self._type}; status={self.status}; title={self.title}>"

def msg(self):
msg = (
f"type = {self._type}, "
f"status = {self.status}, "
f"title = {self.title}, "
f"detail = {self.detail}, "
f"instance = {self.instance}, "
f"extras = {self.extras}"
)
return msg

@property
def type(self):
return self._type


class LegacyProblemDecoder(json.JSONDecoder):
def decode(self, s):
data = super().decode(s)
if isinstance(data, dict):
return self._make_legacy_problem(data)
return data

def _make_legacy_problem(self, data):
request_id = None
error_code = None
code_type = None

if "__type" in data:
error_code = data.get("__type")
else:
request_id = (data.get("ResponseContext") or {}).get("RequestId")
errors = data.get("Errors")
if errors:
error = errors[0]
error_code = error.get("Code")
reason = error.get("Type")
if error.get("Details"):
code_type = reason
else:
code_type = None
return LegacyProblem(None, error_code, code_type, request_id, None)


class LegacyProblem(Exception):
def __init__(self, status, error_code, code_type, request_id, url):
self.status = status
self.error_code = error_code
self.code_type = code_type
self.request_id = request_id
self.url = url

def msg(self):
msg = (
f"status = {self.status}, "
f"code = {self.error_code}, "
f"{'code_type = ' if self.code_type is not None else ''}"
f"{self.code_type + ', ' if self.code_type is not None else ''}"
f"request_id = {self.request_id}, "
f"url = {self.url}"
)
return msg


def api_error(response):
try:
problem = None
ct = response.headers.get("content-type") or ""
if "application/json" in ct:
problem = response.json(cls=LegacyProblemDecoder)
problem.status = problem.status or str(response.status_code)
problem.url = response.url
elif "application/problem+json" in ct:
problem = response.json(cls=ProblemDecoder)
problem.status = problem.status or str(response.status_code)

if problem:
return problem
except json.JSONDecodeError:

Check notice

Code scanning / CodeQL

Empty except Note

'except' clause does nothing but pass and there is no explanatory comment.
pass

try:
error = ET.fromstring(response.text)

err = dict()
for key, attr in [
("Code", "error_code"),
("Message", "status"),
("RequestId", "request_id"),
("RequestID", "request_id"),
]:
value = next((x.text for x in error.iter() if x.tag.endswith(key)), None)
if value:
err[attr] = value

return LegacyProblem(**err)
except:
raise Exception(
f"Could not decode error response from {response.url} with status code {response.status_code}"
)
79 changes: 5 additions & 74 deletions osc_sdk/sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from requests.models import Response
from typing_extensions import TypedDict

from .problem import api_error

CANONICAL_URI = "/"
CONFIGURATION_FILE = "config.json"
CONFIGURATION_FOLDER = ".osc"
Expand Down Expand Up @@ -121,77 +123,6 @@ class Tag(TypedDict):
Values: List[str]


@dataclass
class OscApiException(Exception):
http_response: InitVar[Response]

status_code: int = field(init=False)
error_code: Optional[str] = field(default=None, init=False)
message: Optional[str] = field(default=None, init=False)
code_type: Optional[str] = field(default=None, init=False)
request_id: Optional[str] = field(default=None, init=False)

def __post_init__(self, http_response: Response):
super().__init__()
self.status_code = http_response.status_code
# Set error details
self._set(http_response)

def __str__(self) -> str:
return (
f"Error --> status = {self.status_code}, "
f"code = {self.error_code}, "
f"{'code_type = ' if self.code_type is not None else ''}"
f"{self.code_type + ', ' if self.code_type is not None else ''}"
f"Reason = {self.message}, "
f"request_id = {self.request_id}"
)

def _set(self, http_response: Response):
content = http_response.content.decode()
# In case it is JSON error format
try:
error = json.loads(content)
except json.JSONDecodeError:
pass
else:
if "__type" in error:
self.error_code = error.get("__type")
self.message = error.get("message")
self.request_id = http_response.headers.get("x-amz-requestid")
else:
self.request_id = (error.get("ResponseContext") or {}).get("RequestId")
errors = error.get("Errors")
if errors:
error = errors[0]
self.error_code = error.get("Code")
self.message = error.get("Type")
if error.get("Details"):
self.code_type = self.message
self.message = error.get("Details")
else:
self.code_type = None
return

# In case it is XML format
try:
error = ET.fromstring(content)
except ET.ParseError:
return
else:
for key, attr in [
("Code", "error_code"),
("Message", "message"),
("RequestId", "request_id"),
("RequestID", "request_id"),
]:
value = next(
(x.text for x in error.iter() if x.tag.endswith(key)), None
)
if value:
setattr(self, attr, value)


@dataclass
class ApiCall:
profile: str = DEFAULT_PROFILE
Expand Down Expand Up @@ -437,7 +368,7 @@ def make_request(self, call: str, **kwargs: CallParameters):
class XmlApiCall(ApiCall):
def get_response(self, http_response: Response) -> Union[str, ResponseContent]:
if http_response.status_code not in SUCCESS_CODES:
raise OscApiException(http_response)
raise api_error(http_response)
try:
return cast(ResponseContent, xmltodict.parse(http_response.content))
except Exception:
Expand Down Expand Up @@ -502,7 +433,7 @@ def get_parameters(

def get_response(self, http_response: Response) -> ResponseContent:
if http_response.status_code not in SUCCESS_CODES:
raise OscApiException(http_response)
raise api_error(http_response)

return json.loads(http_response.text)

Expand Down Expand Up @@ -641,7 +572,7 @@ class DirectLinkCall(JsonApiCall):

def get_response(self, http_response: Response) -> ResponseContent:
if http_response.status_code not in SUCCESS_CODES:
raise OscApiException(http_response)
raise api_error(http_response)

res = json.loads(http_response.text)
res["requestid"] = http_response.headers["x-amz-requestid"]
Expand Down
14 changes: 7 additions & 7 deletions osc_sdk/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class Env(object):
access_key: str
secret_key: str
endpoint_icu: str
endpoint_api: str
region: str


Expand All @@ -19,17 +19,17 @@ def env() -> Env:
return Env(
access_key=os.getenv("OSC_TEST_ACCESS_KEY", ""),
secret_key=os.getenv("OSC_TEST_SECRET_KEY", ""),
endpoint_icu=os.getenv("OSC_TEST_ENDPOINT_ICU", ""),
endpoint_api=os.getenv("OSC_TEST_ENDPOINT_API", ""),
region=os.getenv("OSC_TEST_REGION", ""),
)


def test_icu_auth_ak_sk(env):
icu = sdk.IcuCall(
def test_api_auth_ak_sk(env):
api = sdk.OSCCall(
access_key=env.access_key,
secret_key=env.secret_key,
endpoint=env.endpoint_icu,
endpoint=env.endpoint_api,
region_name=env.region,
)
icu.make_request("GetAccount")
assert len(icu.response) > 0
api.make_request("ReadAccounts")
assert len(api.response) > 0
5 changes: 3 additions & 2 deletions osc_sdk/test_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest

from . import sdk
from .problem import LegacyProblem


@dataclass
Expand Down Expand Up @@ -32,6 +33,6 @@ def test_bad_filter(env):
endpoint=env.endpoint_api,
region_name=env.region,
)
with pytest.raises(sdk.OscApiException) as e:
with pytest.raises(LegacyProblem) as e:
oapi.make_request("ReadImages", Filters='"bad_filter"')
assert e.value.status_code == 400
assert e.value.status == "400"
31 changes: 0 additions & 31 deletions osc_sdk/test_noauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,6 @@ def env() -> Env:
)


def test_icu_noauth_call_with_auth_env(env):
icu = sdk.IcuCall(
access_key=env.access_key,
secret_key=env.secret_key,
endpoint=env.endpoint_icu,
region_name=env.region,
)
icu.make_request("ReadPublicCatalog")
assert len(icu.response)


def test_icu_noauth_call_with_empty_auth_env(env):
icu = sdk.IcuCall( # nosec
access_key="",
secret_key="",
endpoint=env.endpoint_icu,
region_name=env.region,
)
icu.make_request("ReadPublicCatalog")
assert len(icu.response)


def test_icu_noauth_basic(env):
icu = sdk.IcuCall(
endpoint=env.endpoint_icu,
region_name=env.region,
)
icu.make_request("ReadPublicCatalog")
assert len(icu.response)


def test_api_noauth_call_with_auth_env(env):
api = sdk.OSCCall(
access_key=env.access_key,
Expand Down
2 changes: 1 addition & 1 deletion tests/test_pytest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ fi
PROJECT_ROOT=$(cd "$(dirname $0)/.." && pwd)
cd $PROJECT_ROOT
. .venv/bin/activate
pytest osc_sdk &> /dev/null
pytest osc_sdk
echo "OK"
Loading