Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype update for SNS tests in #334 #342

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 106 additions & 44 deletions aws/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,63 +103,93 @@ def cache_key(call):
)


def get_aws_resource(
def get_single_region(
service_name,
method_name,
call_args,
call_kwargs,
cache,
profiles,
regions,
profile,
region,
result_from_error=None,
debug_calls=False,
debug_cache=False,
):
"""
Fetches and yields AWS API JSON responses for all profiles and regions (list params)
Fetches AWS API JSON responses for a single profile and region.
"""
for profile, region in itertools.product(profiles, regions):
call = default_call._replace(
profile=profile,
region=region,
service=service_name,
method=method_name,
args=call_args,
kwargs=call_kwargs,
)
call = default_call._replace(
profile=profile,
region=region,
service=service_name,
method=method_name,
args=call_args,
kwargs=call_kwargs,
)

if debug_calls:
print("calling", call)
if debug_calls:
print("calling", call)

result = None
if cache is not None:
ckey = cache_key(call)
result = cache.get(ckey, None)
result = None
if cache is not None:
ckey = cache_key(call)
result = cache.get(ckey, None)

if debug_cache and result is not None:
print("found cached value for", ckey)

if debug_cache and result is not None:
print("found cached value for", ckey)
if result is None:
client = get_client(call.profile, call.region, call.service)
try:
result = full_results(client, call.method, call.args, call.kwargs)
result["__pytest_meta"] = dict(profile=call.profile, region=call.region)
except botocore.exceptions.ClientError as error:
if result_from_error is None:
raise error
else:
if debug_calls:
print("error fetching resource", error, call)

if result is None:
client = get_client(call.profile, call.region, call.service)
try:
result = full_results(client, call.method, call.args, call.kwargs)
result["__pytest_meta"] = dict(profile=call.profile, region=call.region)
except botocore.exceptions.ClientError as error:
if result_from_error is None:
raise error
else:
if debug_calls:
print("error fetching resource", error, call)
result = result_from_error(error, call)

result = result_from_error(error, call)
if cache is not None:
if debug_cache:
print("setting cache value for", ckey)

if cache is not None:
if debug_cache:
print("setting cache value for", ckey)
cache.set(ckey, result)

cache.set(ckey, result)
return result

yield result

def get_aws_resource(
service_name,
method_name,
call_args,
call_kwargs,
cache,
profiles,
regions,
result_from_error=None,
debug_calls=False,
debug_cache=False,
):
"""
Fetches and yields AWS API JSON responses for all profiles and regions (list params)
"""
for profile in profiles:
for region in regions:
yield get_single_region(
service_name,
method_name,
call_args,
call_kwargs,
cache,
profile,
region,
result_from_error=None,
debug_calls=False,
debug_cache=False,
)


class BotocoreClient:
Expand All @@ -176,8 +206,6 @@ def __init__(self, profiles, cache, debug_calls, debug_cache, offline):
else:
self.regions = get_available_regions()

self.results = []

def get_regions(self):
if self.offline:
return []
Expand All @@ -204,9 +232,9 @@ def get(
regions = ["us-east-1"]

if self.offline:
self.results = []
results = []
else:
self.results = list(
results = list(
get_aws_resource(
service_name,
method_name,
Expand All @@ -221,7 +249,41 @@ def get(
)
)

return self
return BotocoreClientResult(results)

def get_details(
self,
resource,
service_name,
method_name,
call_args,
call_kwargs,
result_from_error=None,
do_not_cache=False,
):
if self.offline:
return None
return get_single_region(
service_name,
method_name,
call_args,
call_kwargs,
profile=resource["__pytest_meta"]["profile"],
region=resource["__pytest_meta"]["region"],
cache=self.cache if not do_not_cache else None,
result_from_error=result_from_error,
debug_calls=self.debug_calls,
debug_cache=self.debug_cache,
)


class BotocoreClientResult:
"""
A result list returned by BotocoreClient.get().
"""

def __init__(self, results):
self.results = results

def values(self):
"""Returns the wrapped value
Expand Down Expand Up @@ -315,7 +377,7 @@ def flatten(self):
...
TypeError: can only concatenate list (not "dict") to list
"""
self.results = sum(self.results, [])
self.results = list(itertools.chain.from_iterable(self.results))
return self

def debug(self):
Expand Down
14 changes: 7 additions & 7 deletions aws/sns/resources.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
from conftest import botocore_client


def sns_subscriptions():
"https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns.html#subscription"
return(
return (
botocore_client.get("sns", "list_subscriptions", [], {})
.extract_key("Subscriptions")
.flatten()
.values()
)


def sns_subscription_attributes():
"https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns.html#subscription"
return [
botocore_client.get(
for subscription in sns_subscriptions():
yield botocore_client.get_details(
resource=subscription,
service_name="sns",
method_name="get_subscription_attributes",
call_args=[],
call_kwargs={"SubscriptionArn": subscription["SubscriptionArn"]},
)
.extract_key("Attributes")
for subscription in sns_subscriptions()
]
)["Attributes"]
11 changes: 6 additions & 5 deletions aws/sns/test_sns_pending_verified.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

from aws.sns.resources import sns_subscription_attributes


@pytest.mark.sns
@pytest.mark.parametrize(
"pending_verification",
sns_subscription_attributes(),
ids=lambda subscription: subscription["PendingVerification"],
"subscription_attrs",
sns_subscription_attributes(),
ids=lambda subscription: subscription["SubscriptionArn"],
)
def test_sns_pending_verified(pending_verification):
assert pending_verification == "false"
def test_sns_pending_verified(subscription_attrs):
assert subscription_attrs["PendingConfirmation"] == "false"