Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the AWS client to facilitate follow-up requests. #356

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 117 additions & 45 deletions aws/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,63 +103,93 @@ def cache_key(call):
)


def get_aws_resource(
def get_single_region(
service_name,
method_name,
call_args,
call_kwargs,
cache,
profiles,
regions,
profile,
region,
result_from_error=None,
debug_calls=False,
debug_cache=False,
):
"""
Fetches and yields AWS API JSON responses for all profiles and regions (list params)
Fetches AWS API JSON responses for a single profile and region.
"""
for profile, region in itertools.product(profiles, regions):
call = default_call._replace(
profile=profile,
region=region,
service=service_name,
method=method_name,
args=call_args,
kwargs=call_kwargs,
)
call = default_call._replace(
profile=profile,
region=region,
service=service_name,
method=method_name,
args=call_args,
kwargs=call_kwargs,
)

if debug_calls:
print("calling", call)
if debug_calls:
print("calling", call)

result = None
if cache is not None:
ckey = cache_key(call)
result = cache.get(ckey, None)
result = None
if cache is not None:
ckey = cache_key(call)
result = cache.get(ckey, None)

if debug_cache and result is not None:
print("found cached value for", ckey)
if debug_cache and result is not None:
print("found cached value for", ckey)

if result is None:
client = get_client(call.profile, call.region, call.service)
try:
result = full_results(client, call.method, call.args, call.kwargs)
result["__pytest_meta"] = dict(profile=call.profile, region=call.region)
except botocore.exceptions.ClientError as error:
if result_from_error is None:
raise error
else:
if debug_calls:
print("error fetching resource", error, call)
if result is None:
client = get_client(call.profile, call.region, call.service)
try:
result = full_results(client, call.method, call.args, call.kwargs)
result["__pytest_meta"] = dict(profile=call.profile, region=call.region)
except botocore.exceptions.ClientError as error:
if result_from_error is None:
raise error
else:
if debug_calls:
print("error fetching resource", error, call)

result = result_from_error(error, call)
result = result_from_error(error, call)

if cache is not None:
if debug_cache:
print("setting cache value for", ckey)
if cache is not None:
if debug_cache:
print("setting cache value for", ckey)

cache.set(ckey, result)

return result

cache.set(ckey, result)

yield result
def get_aws_resource(
service_name,
method_name,
call_args,
call_kwargs,
cache,
profiles,
regions,
result_from_error=None,
debug_calls=False,
debug_cache=False,
):
"""
Fetches and yields AWS API JSON responses for all profiles and regions (list params)
"""
for profile in profiles:
for region in regions:
yield get_single_region(
service_name,
method_name,
call_args,
call_kwargs,
cache,
profile,
region,
result_from_error=None,
debug_calls=False,
debug_cache=False,
)


class BotocoreClient:
Expand All @@ -178,8 +208,6 @@ def __init__(self, profiles, regions, cache, debug_calls, debug_cache, offline):
else:
self.regions = get_available_regions()

self.results = []

def get_regions(self):
if self.offline:
return []
Expand All @@ -196,7 +224,9 @@ def get(
result_from_error=None,
do_not_cache=False,
):

"""
Retrieve resources across the given regions and profiles.
"""
# TODO:
# For services that don't have the concept of regions,
# we don't want to do the exact same test N times where
Expand All @@ -206,9 +236,9 @@ def get(
regions = ["us-east-1"]

if self.offline:
self.results = []
results = []
else:
self.results = list(
results = list(
get_aws_resource(
service_name,
method_name,
Expand All @@ -223,7 +253,49 @@ def get(
)
)

return self
return BotocoreClientResult(results)

def get_details(
self,
resource,
service_name,
method_name,
call_args,
call_kwargs,
result_from_error=None,
do_not_cache=False,
):
"""
Retrieve details for the given resource.

The resource should be a dictionary returned by the `get()` method,
containing the `__pytest_meta` dictionary with metadata. You still need
to pass whatever argument the botocore function requires to identify
the resource you are requesting details of.
"""
if self.offline:
return None
return get_single_region(
service_name,
method_name,
call_args,
call_kwargs,
profile=resource["__pytest_meta"]["profile"],
region=resource["__pytest_meta"]["region"],
cache=self.cache if not do_not_cache else None,
result_from_error=result_from_error,
debug_calls=self.debug_calls,
debug_cache=self.debug_cache,
)


class BotocoreClientResult:
"""
A result list returned by BotocoreClient.get().
"""

def __init__(self, results):
self.results = results

def values(self):
"""Returns the wrapped value
Expand Down Expand Up @@ -317,7 +389,7 @@ def flatten(self):
...
TypeError: can only concatenate list (not "dict") to list
"""
self.results = sum(self.results, [])
self.results = list(itertools.chain.from_iterable(self.results))
return self

def debug(self):
Expand Down
29 changes: 13 additions & 16 deletions aws/ec2/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,19 @@ def ec2_ebs_snapshots():

def ec2_ebs_snapshots_create_permission():
"https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/ec2.html#EC2.Client.describe_snapshot_attribute"
return sum(
[
botocore_client.get(
service_name="ec2",
method_name="describe_snapshot_attribute",
call_args=[],
call_kwargs={
"Attribute": "createVolumePermission",
"SnapshotId": snapshot["SnapshotId"],
},
regions=[snapshot["__pytest_meta"]["region"]],
).values()
for snapshot in ec2_ebs_snapshots()
],
[],
)
return [
botocore_client.get_details(
resource=snapshot,
service_name="ec2",
method_name="describe_snapshot_attribute",
call_args=[],
call_kwargs={
"Attribute": "createVolumePermission",
"SnapshotId": snapshot["SnapshotId"],
},
)
for snapshot in ec2_ebs_snapshots()
]


def ec2_flow_logs():
Expand Down
2 changes: 2 additions & 0 deletions aws/elasticsearch/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
def elasticsearch_domains():
"""
http://botocore.readthedocs.io/en/latest/reference/services/es.html#ElasticsearchService.Client.describe_elasticsearch_domains

This test does not work properly when using multiple AWS accounts.
"""
# You can only get 5 at a time.
domains_list = list_elasticsearch_domains()
Expand Down
47 changes: 15 additions & 32 deletions aws/elb/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,19 @@ def elbs(with_tags=True):
.values()
)

if not with_tags:
return elbs

elbs_with_tags = []
for elb in elbs:
tags = (
botocore_client.get(
if with_tags:
for elb in elbs:
tags = botocore_client.get_details(
resource=elb,
service_name="elb",
method_name="describe_tags",
call_args=[],
call_kwargs={"LoadBalancerNames": [elb["LoadBalancerName"]]},
regions=[elb["__pytest_meta"]["region"]],
)
.extract_key("TagDescriptions")
.flatten()
.values()
)
# This check is probably unneeded
if len(tags) >= 1:
tags = tags[0]
if "Tags" in tags:
elb["Tags"] = tags["Tags"]
elbs_with_tags.append(elb)
)["TagDescriptions"]
if "Tags" in tags:
elb["Tags"] = tags["Tags"]

return elbs_with_tags
return elbs


def elbs_v2():
Expand All @@ -55,18 +43,13 @@ def elb_attributes(elb):
"""
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/elb.html#ElasticLoadBalancing.Client.describe_load_balancer_attributes
"""
return (
botocore_client.get(
"elb",
"describe_load_balancer_attributes",
[],
call_kwargs={"LoadBalancerName": elb["LoadBalancerName"]},
regions=[elb["__pytest_meta"]["region"]],
)
.extract_key("LoadBalancerAttributes")
.debug()
.values()
)[0]
return botocore_client.get_details(
elb,
"elb",
"describe_load_balancer_attributes",
[],
call_kwargs={"LoadBalancerName": elb["LoadBalancerName"]},
)["LoadBalancerAttributes"]


def elbs_with_attributes():
Expand Down
Loading