From b7ac9badfc24ec7960dd7b6847d621f4555c2043 Mon Sep 17 00:00:00 2001 From: steffyP Date: Mon, 11 Jul 2022 19:15:52 +0200 Subject: [PATCH] add feature for collecting parity metrics (#6305) Co-authored-by: Dominik Schubert Co-authored-by: Thomas Rausch --- Makefile | 3 +- localstack/aws/app.py | 5 + localstack/aws/handlers/metric_handler.py | 173 ++++++++ localstack/aws/mocking.py | 409 ++++++++++++++++++ localstack/config.py | 6 + localstack/constants.py | 3 + .../testing/pytest/metric_collection.py | 63 +++ scripts/.gitignore | 2 + scripts/capture_notimplemented_responses.py | 325 ++++++++++++++ scripts/metric_aggregator.py | 230 ++++++++++ setup.cfg | 2 + .../awslambda/test_lambda_size_lims.py | 38 +- tests/integration/conftest.py | 4 + 13 files changed, 1242 insertions(+), 21 deletions(-) create mode 100644 localstack/aws/handlers/metric_handler.py create mode 100644 localstack/aws/mocking.py create mode 100644 localstack/testing/pytest/metric_collection.py create mode 100644 scripts/.gitignore create mode 100644 scripts/capture_notimplemented_responses.py create mode 100644 scripts/metric_aggregator.py diff --git a/Makefile b/Makefile index 7986ef7730179..d34eb4b8f8a95 100644 --- a/Makefile +++ b/Makefile @@ -184,7 +184,7 @@ docker-create-push-manifests-light: ## Create and push manifests for the light d docker-run-tests: ## Initializes the test environment and runs the tests in a docker container # Remove argparse and dataclasses to fix https://github.com/pytest-dev/pytest/issues/5594 # Note: running "install-test-only" below, to avoid pulling in [runtime] extras from transitive dependencies - docker run --entrypoint= -v `pwd`/tests/:/opt/code/localstack/tests/ -v `pwd`/target/:/opt/code/localstack/target/ \ + docker run -e LOCALSTACK_INTERNAL_TEST_COLLECT_METRIC=1 --entrypoint= -v `pwd`/tests/:/opt/code/localstack/tests/ -v `pwd`/target/:/opt/code/localstack/target/ \ $(IMAGE_NAME_FULL) \ bash -c "make install-test-only && make init-testlibs && pip uninstall -y argparse dataclasses && DEBUG=$(DEBUG) LAMBDA_EXECUTOR=local PYTEST_LOGLEVEL=debug PYTEST_ARGS='$(PYTEST_ARGS)' COVERAGE_FILE='$(COVERAGE_FILE)' TEST_PATH='$(TEST_PATH)' make test-coverage" @@ -207,6 +207,7 @@ test: ## Run automated tests test-coverage: ## Run automated tests and create coverage report ($(VENV_RUN); python -m coverage --version; \ DEBUG=$(DEBUG) \ + LOCALSTACK_INTERNAL_TEST_COLLECT_METRIC=1 \ python -m coverage run $(COVERAGE_ARGS) -m \ pytest --durations=10 --log-cli-level=$(PYTEST_LOGLEVEL) -s $(PYTEST_ARGS) $(TEST_PATH)) diff --git a/localstack/aws/app.py b/localstack/aws/app.py index 100ab12fe75bc..8a3e6a207481d 100644 --- a/localstack/aws/app.py +++ b/localstack/aws/app.py @@ -1,6 +1,7 @@ import logging from localstack.aws import handlers +from localstack.aws.handlers.metric_handler import MetricHandler from localstack.aws.handlers.service_plugin import ServiceLoader from localstack.services.plugins import SERVICE_PLUGINS, ServiceManager, ServicePluginManager @@ -21,10 +22,12 @@ def __init__(self, service_manager: ServiceManager = None) -> None: # lazy-loads services into the router load_service = ServiceLoader(self.service_manager, self.service_request_router) + metric_collector = MetricHandler() # the main request handler chain self.request_handlers.extend( [ handlers.push_request_context, + metric_collector.create_metric_handler_item, handlers.parse_service_name, # enforce_cors and content_decoder depend on the service name handlers.enforce_cors, handlers.content_decoder, @@ -36,6 +39,7 @@ def 
__init__(self, service_manager: ServiceManager = None) -> None: handlers.add_region_from_header, handlers.add_account_id, handlers.parse_service_request, + metric_collector.record_parsed_request, handlers.serve_custom_service_request_handlers, load_service, # once we have the service request we can make sure we load the service self.service_request_router, # once we know the service is loaded we can route the request @@ -62,6 +66,7 @@ def __init__(self, service_manager: ServiceManager = None) -> None: handlers.log_response, handlers.count_service_request, handlers.pop_request_context, + metric_collector.update_metric_collection, ] ) diff --git a/localstack/aws/handlers/metric_handler.py b/localstack/aws/handlers/metric_handler.py new file mode 100644 index 0000000000000..302d0196645bb --- /dev/null +++ b/localstack/aws/handlers/metric_handler.py @@ -0,0 +1,173 @@ +import copy +import logging +from typing import List, Optional + +from localstack import config +from localstack.aws.api import RequestContext, ServiceRequest +from localstack.aws.chain import HandlerChain +from localstack.http import Response +from localstack.utils.aws.aws_stack import is_internal_call_context + +LOG = logging.getLogger(__name__) + + +class MetricHandlerItem: + """ + MetricHandlerItem to reference and update requests by the MetricHandler + """ + + request_id: str + request_context: RequestContext + request_after_parse: Optional[ServiceRequest] + + def __init__(self, request_contex: RequestContext) -> None: + super().__init__() + self.request_id = str(hash(request_contex)) + self.request_context = request_contex + self.request_after_parse = None + + +class Metric: + """ + Data object to store relevant information for a metric entry in the raw-data collection (csv) + """ + + service: str + operation: str + headers: str + parameters: str + status_code: int + response_code: Optional[str] + exception: str + origin: str + xfail: bool + aws_validated: bool + snapshot: bool + node_id: str + + RAW_DATA_HEADER = [ + "service", + "operation", + "request_headers", + "parameters", + "response_code", + "response_data", + "exception", + "origin", + "test_node_id", + "xfail", + "aws_validated", + "snapshot", + ] + + def __init__( + self, + service: str, + operation: str, + headers: str, + parameters: str, + response_code: int, + response_data: str, + exception: str, + origin: str, + node_id: str = "", + xfail: bool = False, + aws_validated: bool = False, + snapshot: bool = False, + ) -> None: + self.service = service + self.operation = operation + self.headers = headers + self.parameters = parameters + self.response_code = response_code + self.response_data = response_data + self.exception = exception + self.origin = origin + self.node_id = node_id + self.xfail = xfail + self.aws_validated = aws_validated + self.snapshot = snapshot + + def __iter__(self): + return iter( + [ + self.service, + self.operation, + self.headers, + self.parameters, + self.response_code, + self.response_data, + self.exception, + self.origin, + self.node_id, + self.xfail, + self.aws_validated, + self.snapshot, + ] + ) + + +class MetricHandler: + metric_data: List[Metric] = [] + + def __init__(self) -> None: + self.metrics_handler_items = {} + + def create_metric_handler_item( + self, chain: HandlerChain, context: RequestContext, response: Response + ): + if not config.is_collect_metrics_mode(): + return + item = MetricHandlerItem(context) + self.metrics_handler_items[context] = item + + def _get_metric_handler_item_for_context(self, context: 
RequestContext) -> MetricHandlerItem: + return self.metrics_handler_items[context] + + def record_parsed_request( + self, chain: HandlerChain, context: RequestContext, response: Response + ): + if not config.is_collect_metrics_mode(): + return + item = self._get_metric_handler_item_for_context(context) + item.request_after_parse = copy.deepcopy(context.service_request) + + def record_exception( + self, chain: HandlerChain, exception: Exception, context: RequestContext, response: Response + ): + if not config.is_collect_metrics_mode(): + return + item = self._get_metric_handler_item_for_context(context) + item.caught_exception_name = exception.__class__.__name__ + + def update_metric_collection( + self, chain: HandlerChain, context: RequestContext, response: Response + ): + if not config.is_collect_metrics_mode() or not context.service_operation: + return + + is_internal = is_internal_call_context(context.request.headers) + item = self._get_metric_handler_item_for_context(context) + + # parameters might get changed when dispatched to the service - we use the params stored in request_after_parse + parameters = ",".join(item.request_after_parse or "") + + response_data = response.data.decode("utf-8") if response.status_code >= 300 else "" + + MetricHandler.metric_data.append( + Metric( + service=context.service_operation.service, + operation=context.service_operation.operation, + headers=context.request.headers, + parameters=parameters, + response_code=response.status_code, + response_data=response_data, + exception=context.service_exception.__class__.__name__ + if context.service_exception + else "", + origin="internal" if is_internal else "external", + ) + ) + + # cleanup + del self.metrics_handler_items[context] diff --git a/localstack/aws/mocking.py b/localstack/aws/mocking.py new file mode 100644 index 0000000000000..3b6e1609ecd0c --- /dev/null +++ b/localstack/aws/mocking.py @@ -0,0 +1,409 @@ +import logging +import math +import random +import re +from datetime import date, datetime +from functools import lru_cache, singledispatch +from typing import Dict, List, Optional, Set, Tuple, Union, cast + +import botocore +import networkx +import rstr +from botocore.model import ListShape, MapShape, OperationModel, Shape, StringShape, StructureShape + +from localstack.aws.api import RequestContext, ServiceRequest, ServiceResponse +from localstack.aws.skeleton import DispatchTable, ServiceRequestDispatcher, Skeleton +from localstack.aws.spec import load_service +from localstack.utils.sync import retry + +LOG = logging.getLogger(__name__) + +types = { + "timestamp", + "string", + "blob", + "map", + "list", + "long", + "structure", + "integer", + "double", + "float", + "boolean", +} + +Instance = Union[ + Dict[str, "Instance"], + List["Instance"], + str, + bytes, + map, + list, + float, + int, + bool, + date, +] + +# https://github.com/boto/botocore/issues/2623 +StringShape.METADATA_ATTRS.append("pattern") + +words = [ + # a few snazzy six-letter words + "snazzy", + "mohawk", + "poncho", + "proton", + "foobar", + "python", + "umlaut", + "except", + "global", + "latest", +] + + +class ShapeGraph(networkx.DiGraph): + root: Union[ListShape, StructureShape, MapShape] + cycle: List[Tuple[str, str]] + cycle_shapes: List[str] + + +def populate_graph(graph: networkx.DiGraph, root: Shape): + stack: List[Shape] = [root] + visited: Set[str] = set() + + while stack: + cur = stack.pop() + if cur is None: + continue + + if cur.name in visited: + continue + + visited.add(cur.name) + graph.add_node(cur.name, 
shape=cur) + + if isinstance(cur, ListShape): + graph.add_edge(cur.name, cur.member.name) + stack.append(cur.member) + elif isinstance(cur, StructureShape): + for member in cur.members.values(): + stack.append(member) + graph.add_edge(cur.name, member.name) + elif isinstance(cur, MapShape): + stack.append(cur.key) + stack.append(cur.value) + graph.add_edge(cur.name, cur.key.name) + graph.add_edge(cur.name, cur.value.name) + + else: # leaf types (int, string, bool, ...) + pass + + +def shape_graph(root: Shape) -> ShapeGraph: + graph = networkx.DiGraph() + graph.root = root + populate_graph(graph, root) + + cycles = list() + shapes = set() + for node in graph.nodes: + try: + cycle = networkx.find_cycle(graph, source=node) + for k, v in cycle: + shapes.add(k) + shapes.add(v) + + if cycle not in cycles: + cycles.append(cycle) + except networkx.NetworkXNoCycle: + pass + + graph.cycles = cycles + graph.cycle_shapes = list(shapes) + + return cast(ShapeGraph, graph) + + +def sanitize_pattern(pattern: str) -> str: + pattern = pattern.replace("\\p{XDigit}", "[A-Fa-f0-9]") + pattern = pattern.replace("\\p{P}", "[.,;]") + pattern = pattern.replace("\\p{Punct}", "[.,;]") + pattern = pattern.replace("\\p{N}", "[0-9]") + pattern = pattern.replace("\\p{L}", "[A-Z]") + pattern = pattern.replace("\\p{LD}", "[A-Z]") + pattern = pattern.replace("\\p{Z}", "[ ]") + pattern = pattern.replace("\\p{S}", "[+\\u-*]") + pattern = pattern.replace("\\p{M}", "[`]") + pattern = pattern.replace("\\p{IsLetter}", "[a-zA-Z]") + pattern = pattern.replace("[:alnum:]", "[a-zA-Z0-9]") + return pattern + + +def sanitize_arn_pattern(pattern: str) -> str: + # clown emoji + + # some devs were just lazy ... + if pattern in [ + ".*", + "arn:.*", + "arn:.+", + "^arn:.+", + "arn:aws.*:*", + "^arn:aws.*", + "^arn:.*", + ".*\\S.*", + "^[A-Za-z0-9:\\/_-]*$", + "^arn[\\/\\:\\-\\_\\.a-zA-Z0-9]+$", + ".{0,1600}", + "^arn:[!-~]+$", + "[\\S]+", + "[\\s\\S]*", + "^([\\p{L}\\p{Z}\\p{N}_.:/=+\\-@]*)$", + "[a-zA-Z0-9_:\\-\\/]+", + ]: + pattern = "arn:aws:[a-z]{4}:us-east-1:[0-9]{12}:[a-z]{8}" + + # common pattern to describe a partition + pattern = pattern.replace("arn:[^:]*:", "arn:aws:") + pattern = pattern.replace("arn:[a-z\\d-]+", "arn:aws") + pattern = pattern.replace("arn:[\\w+=\\/,.@-]+", "arn:aws") + pattern = pattern.replace("arn:[a-z-]+?", "arn:aws") + pattern = pattern.replace("arn:[a-z0-9][-.a-z0-9]{0,62}", "arn:aws") + pattern = pattern.replace(":aws(-\\w+)*", ":aws") + pattern = pattern.replace(":aws[a-z\\-]*", ":aws") + pattern = pattern.replace(":aws(-[\\w]+)*", ":aws") + pattern = pattern.replace(":aws[^:\\s]*", ":aws") + pattern = pattern.replace(":aws[A-Za-z0-9-]{0,64}", ":aws") + # often the account-id + pattern = pattern.replace(":[0-9]+:", ":[0-9]{13}:") + pattern = pattern.replace(":\\w{12}:", ":[0-9]{13}:") + # substitutions + pattern = pattern.replace("[a-z\\-\\d]", "[a-z0-9]") + pattern = pattern.replace( + "[\\u0020-\\uD7FF\\uE000-\\uFFFD\\uD800\\uDC00-\\uDBFF\\uDFFF\\r\\n\\t]", "[a-z0-9]" + ) + pattern = pattern.replace("[\\w\\d-]", "[a-z0-9]") + pattern = pattern.replace("[\\w+=/,.@-]", "[a-z]") + pattern = pattern.replace("[^:]", "[a-z]") + pattern = pattern.replace("[^/]", "[a-z]") + pattern = pattern.replace("\\d+", "[0-9]+") + pattern = pattern.replace("\\d*", "[0-9]*") + pattern = pattern.replace("\\S+", "[a-z]{4}") + pattern = pattern.replace("\\d]", "0-9]") + pattern = pattern.replace("[a-z\\d", "[a-z0-9") + pattern = pattern.replace("[a-zA-Z\\d", "[a-z0-9") + pattern = pattern.replace("^$|", "") + pattern = 
pattern.replace("(^$)|", "") + pattern = pattern.replace("[:/]", "[a-z]") + pattern = pattern.replace("/.{", "/[a-z]{") + pattern = pattern.replace(".{", "[a-z]{") + pattern = pattern.replace("-*", "-") + pattern = pattern.replace("\\n", "") + pattern = pattern.replace("\\r", "") + # quantifiers + pattern = pattern.replace("{11}{0,1011}", "{11}") + pattern = pattern.replace("}+", "}") + pattern = pattern.replace("]*", "]{6}") + pattern = pattern.replace("]+", "]{6}") + pattern = pattern.replace(".*", "[a-z]{6}") + pattern = pattern.replace(".+", "[a-z]{6}") + + return pattern + + +custom_arns = { + "DeviceFarmArn": "arn:aws:devicefarm:us-east-1:1234567890123:mydevicefarm", + "KmsKeyArn": "arn:aws:kms:us-east-1:1234567890123:key/somekmskeythatisawesome", +} + + +@singledispatch +def generate_instance(shape: Shape, graph: ShapeGraph) -> Optional[Instance]: + if shape is None: + return None + raise ValueError("could not generate shape for type %s" % shape.type_name) + + +@generate_instance.register +def _(shape: StructureShape, graph: ShapeGraph) -> Dict[str, Instance]: + if shape.is_tagged_union: + k, v = random.choice(list(shape.members.items())) + members = {k: v} + else: + members = shape.members + + if shape.name in graph.cycle_shapes: + return {} + + return { + name: generate_instance(member_shape, graph) + for name, member_shape in members.items() + if member_shape.name != shape.name + } + + +@generate_instance.register +def _(shape: ListShape, graph: ShapeGraph) -> List[Instance]: + if shape.name in graph.cycle_shapes: + return [] + return [generate_instance(shape.member, graph) for _ in range(shape.metadata.get("min", 1))] + + +@generate_instance.register +def _(shape: MapShape, graph: ShapeGraph) -> Dict[str, Instance]: + if shape.name in graph.cycle_shapes: + return {} + return {generate_instance(shape.key, graph): generate_instance(shape.value, graph)} + + +def generate_arn(shape: StringShape): + if not shape.metadata: + return "arn:aws:ec2:us-east-1:1234567890123:instance/i-abcde0123456789f" + + def _generate_arn(): + # some custom hacks + if shape.name in custom_arns: + return custom_arns[shape.name] + + max_len = shape.metadata.get("max") or math.inf + min_len = shape.metadata.get("min") or 0 + + pattern = shape.metadata.get("pattern") + if pattern: + # FIXME: also conforming to length may be difficult + pattern = sanitize_arn_pattern(pattern) + arn = rstr.xeger(pattern) + else: + arn = "arn:aws:ec2:us-east-1:1234567890123:instance/i-abcde0123456789f" + + # if there's a value set for the region, replace with a randomly picked region + # TODO: splitting the ARNs here by ":" sometimes fails for some reason (e.g. 
or dynamodb for some reason) + arn_parts = arn.split(":") + if len(arn_parts) >= 4: + region = arn_parts[3] + if region: + # TODO: check service in ARN and try to get the actual region for the service + regions = botocore.session.Session().get_available_regions("lambda") + picked_region = random.choice(regions) + arn_parts[3] = picked_region + arn = ":".join(arn_parts) + + if len(arn) > max_len: + arn = arn[:max_len] + + if len(arn) < min_len or len(arn) > max_len: + raise ValueError( + f"generated arn {arn} for shape {shape.name} does not match constraints {shape.metadata}" + ) + + return arn + + return retry(_generate_arn, retries=10, sleep_before=0, sleep=0) + + +custom_strings = {"DailyTime": "12:10", "WeeklyTime": "1:12:10"} + + +@generate_instance.register +def _(shape: StringShape, graph: ShapeGraph) -> str: + if shape.enum: + return shape.enum[0] + + if shape.name in custom_strings: + return custom_strings[shape.name] + + if ( + shape.name.endswith("ARN") + or shape.name.endswith("Arn") + or shape.name == "AmazonResourceName" + ): + return generate_arn(shape) + + max_len: int = shape.metadata.get("max") or 256 + min_len: int = shape.metadata.get("min") or 0 + str_len = min(min_len or 6, max_len) + + pattern = shape.metadata.get("pattern") + + if not pattern or pattern in [".*", "^.*$", ".+"]: + if min_len <= 6 and max_len >= 6: + # pick a random six-letter word, to spice things up. this will be the case most of the time. + return random.choice(words) + else: + return "a" * str_len + + pattern = sanitize_pattern(pattern) + + try: + # try to return something simple first + random_string = "a" * str_len + if re.match(pattern, random_string): + return random_string + + val = rstr.xeger(pattern) + # TODO: this will break the pattern if the string needs to end with something that we may cut off. 
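+ # e.g. if a sanitized pattern is "[a-z]{4}-suffix", rstr.xeger may produce "abcd-suffix"; + # truncating that to a max_len of 6 yields "abcd-s", which no longer matches the pattern + # (illustrative values, not taken from a real service spec)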
+ return val[: min(max_len, len(val))] + except re.error: + # TODO: this will likely break the pattern + return "0" * str_len + + +@generate_instance.register +def _(shape: Shape, graph: ShapeGraph) -> Union[int, float, bool, bytes, date]: + if shape.type_name in ["integer", "long"]: + return shape.metadata.get("min", 1) + if shape.type_name in ["float", "double"]: + return shape.metadata.get("min", 1.0) + if shape.type_name == "boolean": + return True + if shape.type_name == "blob": + # TODO: better blob generator + return b"0" * shape.metadata.get("min", 1) + if shape.type_name == "timestamp": + return datetime.now() + + raise ValueError("unknown type %s" % shape.type_name) + + +def is_cyclic_shape(shape: Shape) -> bool: + return True if shape_graph(shape).cycle else False + + +def generate_response(operation: OperationModel): + graph = shape_graph(operation.output_shape) + response = generate_instance(graph.root, graph) + response.pop("nextToken", None) + return response + + +def generate_request(operation: OperationModel): + graph = shape_graph(operation.input_shape) + return generate_instance(graph.root, graph) + + +def return_mock_response(context: RequestContext, request: ServiceRequest) -> ServiceResponse: + return generate_response(context.operation) + + +def create_mocking_dispatch_table(service) -> DispatchTable: + dispatch_table = {} + + for operation in service.operation_names: + # resolve the bound function of the delegate + # create a dispatcher + dispatch_table[operation] = ServiceRequestDispatcher( + return_mock_response, + operation=operation, + pass_context=True, + expand_parameters=False, + ) + + return dispatch_table + + +@lru_cache() +def get_mocking_skeleton(service: str) -> Skeleton: + service = load_service(service) + return Skeleton(service, create_mocking_dispatch_table(service)) diff --git a/localstack/config.py b/localstack/config.py index 4a21b0c78978a..ec747d3a4f433 100644 --- a/localstack/config.py +++ b/localstack/config.py @@ -16,6 +16,7 @@ DEFAULT_PORT_EDGE, DEFAULT_SERVICE_PORTS, DEFAULT_VOLUME_DIR, + ENV_INTERNAL_TEST_COLLECT_METRIC, ENV_INTERNAL_TEST_RUN, FALSE_STRINGS, LOCALHOST, @@ -787,6 +788,11 @@ def is_local_test_mode() -> bool: return is_env_true(ENV_INTERNAL_TEST_RUN) +def is_collect_metrics_mode() -> bool: + """Returns True if metric collection is enabled.""" + return is_env_true(ENV_INTERNAL_TEST_COLLECT_METRIC) + + def collect_config_items() -> List[Tuple[str, Any]]: """Returns a list of key-value tuples of LocalStack configuration values.""" none = object() # sentinel object diff --git a/localstack/constants.py b/localstack/constants.py index a2a80eb181d66..ac105e12c3aab 100644 --- a/localstack/constants.py +++ b/localstack/constants.py @@ -82,6 +82,9 @@ # environment variable name to tag local test runs ENV_INTERNAL_TEST_RUN = "LOCALSTACK_INTERNAL_TEST_RUN" +# environment variable name to tag collect metrics during a test run +ENV_INTERNAL_TEST_COLLECT_METRIC = "LOCALSTACK_INTERNAL_TEST_COLLECT_METRIC" + # environment variable that flags whether pro was activated. do not use for security purposes! 
ENV_PRO_ACTIVATED = "PRO_ACTIVATED" diff --git a/localstack/testing/pytest/metric_collection.py b/localstack/testing/pytest/metric_collection.py new file mode 100644 index 0000000000000..371a6b3a70902 --- /dev/null +++ b/localstack/testing/pytest/metric_collection.py @@ -0,0 +1,63 @@ +import csv +import os +import re +from datetime import datetime +from pathlib import Path +from typing import Optional + +import pytest +from _pytest.main import Session +from _pytest.nodes import Item + +from localstack.aws.handlers.metric_handler import Metric, MetricHandler +from localstack.utils.strings import short_uid + +BASE_PATH = os.path.join(os.path.dirname(__file__), "../../../target/metric_reports") +FNAME_RAW_DATA_CSV = os.path.join( + BASE_PATH, + f"metric-report-raw-data-{datetime.utcnow().strftime('%Y-%m-%d__%H_%M_%S')}-{short_uid()}.csv", +) + + +@pytest.hookimpl() +def pytest_sessionstart(session: "Session") -> None: + Path(BASE_PATH).mkdir(parents=True, exist_ok=True) + pattern = re.compile("--junitxml=(.*)\\.xml") + if session.config.invocation_params: + for ip in session.config.invocation_params.args: + if m := pattern.match(ip): + report_file_name = m.groups()[-1].split("/")[-1] + global FNAME_RAW_DATA_CSV + FNAME_RAW_DATA_CSV = os.path.join( + BASE_PATH, + f"metric-report-raw-data-{datetime.utcnow().strftime('%Y-%m-%d__%H_%M_%S')}-{report_file_name}.csv", + ) + + with open(FNAME_RAW_DATA_CSV, "w") as fd: + writer = csv.writer(fd) + writer.writerow(Metric.RAW_DATA_HEADER) + + +@pytest.hookimpl() +def pytest_runtest_teardown(item: "Item", nextitem: Optional["Item"]) -> None: + node_id = item.nodeid + xfail = False + aws_validated = False + snapshot = False + + for _ in item.iter_markers(name="xfail"): + xfail = True + for _ in item.iter_markers(name="aws_validated"): + aws_validated = True + if hasattr(item, "fixturenames") and "snapshot" in item.fixturenames: + snapshot = True + for metric in MetricHandler.metric_data: + metric.xfail = xfail + metric.aws_validated = aws_validated + metric.snapshot = snapshot + metric.node_id = node_id + + with open(FNAME_RAW_DATA_CSV, "a") as fd: + writer = csv.writer(fd) + writer.writerows(MetricHandler.metric_data) + MetricHandler.metric_data.clear() diff --git a/scripts/.gitignore b/scripts/.gitignore new file mode 100644 index 0000000000000..343a25cb84882 --- /dev/null +++ b/scripts/.gitignore @@ -0,0 +1,2 @@ +*.csv +*.json diff --git a/scripts/capture_notimplemented_responses.py b/scripts/capture_notimplemented_responses.py new file mode 100644 index 0000000000000..5f27aa37942d5 --- /dev/null +++ b/scripts/capture_notimplemented_responses.py @@ -0,0 +1,325 @@ +import csv +import logging +from typing import TypedDict + +import botocore.config +import click +from botocore.exceptions import ( + ClientError, + ConnectTimeoutError, + EndpointConnectionError, + ReadTimeoutError, +) +from botocore.parsers import ResponseParserError + +from localstack.aws.mocking import generate_request +from localstack.aws.spec import ServiceCatalog +from localstack.utils.aws import aws_stack + +logging.basicConfig(level=logging.INFO) +service_models = ServiceCatalog() + +STATUS_TIMEOUT_ERROR = 901 +STATUS_PARSING_ERROR = 902 +STATUS_CONNECTION_ERROR = 903 + +# TODO: generate these via a script in PRO +# generate with e.g. 
http http://localhost:4566/health | jq ".services | keys[]" | pbcopy +latest_services_pro = [ + "acm", + "amplify", + "apigateway", + "apigatewaymanagementapi", + "apigatewayv2", + "appconfig", + "application-autoscaling", + "appsync", + "athena", + "autoscaling", + "azure", + "backup", + "batch", + "ce", + "cloudformation", + "cloudfront", + "cloudtrail", + "cloudwatch", + "codecommit", + "cognito-identity", + "cognito-idp", + "config", + "docdb", + "dynamodb", + "dynamodbstreams", + "ec2", + "ecr", + "ecs", + "efs", + "eks", + "elasticache", + "elasticbeanstalk", + "elb", + "elbv2", + "emr", + "es", + "events", + "firehose", + "glacier", + "glue", + "iam", + "iot", + "iot-data", + "iotanalytics", + "iotwireless", + "kafka", + "kinesis", + "kinesisanalytics", + "kinesisanalyticsv2", + "kms", + "lakeformation", + "lambda", + "logs", + "mediastore", + "mediastore-data", + "mwaa", + "neptune", + "opensearch", + "organizations", + "qldb", + "qldb-session", + "rds", + "rds-data", + "redshift", + "redshift-data", + "resource-groups", + "resourcegroupstaggingapi", + "route53", + "route53resolver", + "s3", + "s3control", + "sagemaker", + "secretsmanager", + "serverlessrepo", + "servicediscovery", + "ses", + "sesv2", + "sns", + "sqs", + "ssm", + "stepfunctions", + "sts", + "support", + "swf", + "timestream-query", + "timestream-write", + "transfer", + "xray", +] +exclude_services = {"azure"} +latest_services_pro = [s for s in latest_services_pro if s not in exclude_services] +latest_services_pro.sort() + + +class RowEntry(TypedDict, total=False): + service: str + operation: str + status_code: int + error_code: str + error_message: str + is_implemented: bool + + +def simulate_call(service: str, op: str) -> RowEntry: + """generates a mock request based on the service and operation model and sends it to the API""" + client = aws_stack.create_external_boto_client( + service, + config=botocore.config.Config( + parameter_validation=False, + retries={"max_attempts": 0, "total_max_attempts": 1}, + connect_timeout=1, + read_timeout=1, + ), + ) + + service_model = service_models.get(service) + op_model = service_model.operation_model(op) + parameters = generate_request(op_model) # should be generate_parameters I guess + + result = RowEntry(service=service, operation=op, status_code=0) + logging.debug(parameters) + try: + response = client._make_api_call(op, parameters) + result["status_code"] = response["ResponseMetadata"]["HTTPStatusCode"] + except ClientError as ce: + result["status_code"] = ce.response["ResponseMetadata"]["HTTPStatusCode"] + result["error_code"] = ce.response.get("Error", {}).get("Code", "Unknown?") + result["error_message"] = ce.response.get("Error", {}).get("Message", "Unknown?") + except (ReadTimeoutError, ConnectTimeoutError): + logging.warning("Reached timeout. Assuming it is implemented.") + result["status_code"] = STATUS_TIMEOUT_ERROR + except EndpointConnectionError: + # TODO: investigate further;for now assuming not implemented + logging.warning("Connection failed. Assuming it is not implemented.") + result["status_code"] = STATUS_CONNECTION_ERROR + except ResponseParserError: + # TODO: this is actually a bit tricky and might have to be handled on a service by service basis again + logging.warning("Parsing issue. 
Assuming it is implemented.") + result["status_code"] = STATUS_PARSING_ERROR + except Exception as e: + logging.exception(e) + return result + + +def map_to_notimplemented(row: RowEntry) -> bool: + """ + Some simple heuristics to check the API responses and classify them into implemented/notimplemented + + Ideally they all should behave the same way when receiving requests for not yet implemented endpoints + (501 with a "not yet implemented" message) + + :param row: the RowEntry + :return: True if we assume it is not implemented, False otherwise + """ + + if row["status_code"] in [STATUS_TIMEOUT_ERROR, STATUS_PARSING_ERROR]: + # parsing or timeout issue, interpreted as implemented until there's a better heuristic + return False + if row["status_code"] == STATUS_CONNECTION_ERROR: + # affected services: + # lakeformation, GetQueryStat + # lakeformation, GetQueryStatistics + # lakeformation, GetWorkUnitResults + # lakeformation, GetWorkUnits + # lakeformation, StartQueryPlanning + # servicediscovery, DiscoverInstances + # stepfunctions, StartSyncExecution + return True + if row["service"] == "dynamodb" and row.get("error_code") == "UnknownOperationException": + return True + if row["service"] == "lambda" and row["status_code"] == 404 and row.get("error_code") == "404": + return True + if ( + row["service"] == "apigateway" + and row["status_code"] == 404 + and row.get("error_code") == "404" + and row.get("error_message") is not None + and "The requested URL was not found on the server" in row.get("error_message", "") + ): + return True + if ( + row["service"] == "apigatewayv2" + and row["status_code"] == 501 + and row.get("error_message") is not None + and "not yet implemented" in row.get("error_message", "") + ): + return True + if row.get("error_message") is not None and "not yet implemented" in row.get( + "error_message", "" + ): + return True + if row["status_code"] == 501: + return True + return False + + +def run_script(services: list[str]): + """send requests against all APIs""" + with ( + open("implementation_coverage_full.csv", "w") as csvfile, + open("implementation_coverage_aggregated.csv", "w") as aggregatefile, + ): + full_w = csv.DictWriter( + csvfile, + fieldnames=[ + "service", + "operation", + "status_code", + "error_code", + "error_message", + "is_implemented", + ], + ) + aggregated_w = csv.DictWriter( + aggregatefile, + fieldnames=["service", "operation", "implemented_count", "full_count", "percentage"], + ) + + full_w.writeheader() + aggregated_w.writeheader() + + responses = {} + for service_name in services: + service = service_models.get(service_name) + for op_name in service.operation_names: + # here's the important part (the actual service call!) 
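+ # note: simulate_call (defined above) derives the parameters from the operation's + # botocore shape model via generate_request() and disables client-side parameter + # validation, so even rough mock values reach the server under test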
+ response = simulate_call(service_name, op_name) + + responses.setdefault(service_name, {})[op_name] = response + is_implemented = str(not map_to_notimplemented(response)) + full_w.writerow(response | {"is_implemented": is_implemented}) + + # calculate aggregate for service + all_count = len(responses[service_name].values()) + implemented_count = len( + [r for r in responses[service_name].values() if not map_to_notimplemented(r)] + ) + implemented_percentage = implemented_count / all_count + + aggregated_w.writerow( + { + "service": response["service"], + "operation": response["operation"], + "implemented_count": implemented_count, + "full_count": all_count, + "percentage": f"{implemented_percentage * 100:.1f}", + } + ) + + +def calculate_percentages(): + aggregate = {} + + implemented_aggregate = {} + aggregate_list = [] + + with open("./output-notimplemented.csv", "r") as fd: + reader = csv.DictReader(fd, fieldnames=["service", "operation", "implemented"]) + for line in reader: + if line["implemented"] == "implemented": + continue + aggregate.setdefault(line["service"], {}).setdefault(line["operation"], line) + + for service in aggregate.keys(): + vals = aggregate[service].values() + all_count = len(vals) + implemented_count = len([v for v in vals if v["implemented"] == "True"]) + implemented_aggregate[service] = implemented_count / all_count + aggregate_list.append( + { + "service": service, + "count": all_count, + "implemented": implemented_count, + "percentage": implemented_count / all_count, + } + ) + + aggregate_list.sort(key=lambda k: k["percentage"]) + + with open("implementation_coverage_aggregated.csv", "w") as csv_fd: + writer = csv.DictWriter( + csv_fd, fieldnames=["service", "percentage", "implemented", "count"] + ) + writer.writeheader() + + for agg in aggregate_list: + agg["percentage"] = f"{agg['percentage'] * 100:.1f}" + writer.writerow(agg) + + +@click.command() +def main(): + run_script(latest_services_pro) + + +if __name__ == "__main__": + main() diff --git a/scripts/metric_aggregator.py b/scripts/metric_aggregator.py new file mode 100644 index 0000000000000..af99f723c9d6b --- /dev/null +++ b/scripts/metric_aggregator.py @@ -0,0 +1,230 @@ +import copy +import csv +import datetime +import json +import logging +import os +import sys +from pathlib import Path +from typing import Dict, Optional + +from localstack.aws.handlers.metric_handler import Metric +from localstack.services.plugins import SERVICE_PLUGINS + +LOG = logging.getLogger(__name__) + +template_implemented_item = "- [X] " +template_not_implemented_item = "- [ ] " + + +def _generate_details_block(details_title: str, details: dict) -> str: + output = f"
<details><summary>{details_title}</summary>\n\n" + for e, count in details.items(): + if count > 0: + output += f" {template_implemented_item}{e}\n" + else: + output += f" {template_not_implemented_item}{e}\n" + output += "
\n" + return output + + +def create_readable_report(file_name: str, metrics: dict): + output = "# Metric Collection Report of Integration Tests #\n\n" + output += "**__Disclaimer__**: naive calculation of test coverage - if operation is called at least once, it is considered as 'covered'.\n" + for service in sorted(metrics.keys()): + output += f"## {service} ##\n" + details = metrics[service] + if not details["service_attributes"]["pro"]: + output += "community\n" + elif not details["service_attributes"]["community"]: + output += "pro only\n" + else: + output += "community, and pro features\n" + del metrics[service]["service_attributes"] + + operation_counter = len(details) + operation_tested = 0 + + tmp = "" + for operation in sorted(details.keys()): + op_details = details[operation] + if op_details.get("invoked", 0) > 0: + operation_tested += 1 + tmp += f"{template_implemented_item}{operation}\n" + else: + tmp += f"{template_not_implemented_item}{operation}\n" + if op_details.get("parameters"): + parameters = op_details.get("parameters") + if parameters: + tmp += _generate_details_block("parameters hit", parameters) + if op_details.get("errors"): + tmp += _generate_details_block("errors hit", op_details["errors"]) + + output += f"
{operation_tested/operation_counter*100:.2f}% test coverage</summary>\n\n{tmp}\n</details>
\n" + + with open(file_name, "a") as fd: + fd.write(f"{output}\n") + output = "" + + +def _init_service_metric_counter() -> Dict: + metric_recorder = {} + from localstack.aws.spec import load_service + + for s, provider in SERVICE_PLUGINS.api_provider_specs.items(): + try: + service = load_service(s) + ops = {} + service_attributes = {"pro": "pro" in provider, "community": "default" in provider} + ops["service_attributes"] = service_attributes + for op in service.operation_names: + attributes = {} + attributes["invoked"] = 0 + if hasattr(service.operation_model(op).input_shape, "members"): + params = {} + for n in service.operation_model(op).input_shape.members: + params[n] = 0 + attributes["parameters"] = params + if hasattr(service.operation_model(op), "error_shapes"): + exceptions = {} + for e in service.operation_model(op).error_shapes: + exceptions[e.name] = 0 + attributes["errors"] = exceptions + ops[op] = attributes + + metric_recorder[s] = ops + except Exception: + LOG.debug(f"cannot load service '{s}'") + return metric_recorder + + +def print_usage(): + print("missing argument: directory") + print("usage: python metric_aggregator.py [amd64|arch64]") + + +def write_json(file_name: str, metric_dict: dict): + with open(file_name, "w") as fd: + fd.write(json.dumps(metric_dict, indent=2, sort_keys=True)) + + +def _print_diff(metric_recorder_internal, metric_recorder_external): + for key, val in metric_recorder_internal.items(): + for subkey, val in val.items(): + if isinstance(val, dict) and val.get("invoked"): + if val["invoked"] > 0 and not metric_recorder_external[key][subkey]["invoked"]: + print(f"found invocation mismatch: {key}.{subkey}") + + +def append_row_to_raw_collection(collection_raw_csv_file_name, row, arch): + with open(collection_raw_csv_file_name, "a") as fd: + writer = csv.writer(fd) + row.append(arch) + writer.writerow(row) + + +def aggregate_recorded_raw_data( + base_dir: str, collection_raw_csv: Optional[str] = None, collect_for_arch: Optional[str] = "" +) -> dict: + pathlist = Path(base_dir).rglob("metric-report-raw-data-*.csv") + recorded = _init_service_metric_counter() + for path in pathlist: + print(f"checking {str(path)}") + with open(path, "r") as csv_obj: + csv_dict_reader = csv.reader(csv_obj) + # skip the header + next(csv_dict_reader) + for row in csv_dict_reader: + if collection_raw_csv: + arch = "" + if "arm64" in str(path): + arch = "arm64" + elif "amd64" in str(path): + arch = "amd64" + append_row_to_raw_collection(collection_raw_csv, copy.deepcopy(row), arch) + + metric: Metric = Metric(*row) + if metric.xfail == "True": + print(f"test {metric.node_id} marked as xfail") + continue + if collect_for_arch and collect_for_arch not in str(path): + continue + + service = recorded[metric.service] + ops = service[metric.operation] + + errors = ops.setdefault("errors", {}) + if metric.exception: + exception = metric.exception + errors[exception] = ops.get(exception, 0) + 1 + elif int(metric.response_code) >= 300: + for expected_error in ops.get("errors", {}).keys(): + if expected_error in metric.response_data: + # assume we have a match + errors[expected_error] += 1 + LOG.warning( + f"Exception assumed for {metric.service}.{metric.operation}: code {metric.response_code}" + ) + break + + ops["invoked"] += 1 + if not metric.parameters: + params = ops.setdefault("parameters", {}) + params["_none_"] = params.get("_none_", 0) + 1 + else: + for p in metric.parameters.split(","): + ops["parameters"][p] += 1 + + test_list = ops.setdefault("tests", []) + if 
metric.node_id not in test_list: + test_list.append(metric.node_id) + + return recorded + + +def main(): + if not len(sys.argv) >= 2 or not Path(sys.argv[1]).is_dir(): + print_usage() + return + + base_dir = sys.argv[1] + collect_for_arch = "" + if len(sys.argv) == 3: + collect_for_arch = sys.argv[2] + if collect_for_arch not in ("amd64", "arm64"): + print_usage() + return + print( + f"Set target to '{collect_for_arch}' - will only aggregate for these test results. Raw collection of all files.\n" + ) + + # TODO: removed splitting of internal/external recorded calls, as some pro tests use 'internals' to connect to service + + metrics_path = os.path.join(base_dir, "metrics") + Path(metrics_path).mkdir(parents=True, exist_ok=True) + dtime = datetime.datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%s") + + collection_raw_csv = os.path.join(metrics_path, f"raw-collected-data-{dtime}.csv") + + with open(collection_raw_csv, "w") as fd: + writer = csv.writer(fd) + header = Metric.RAW_DATA_HEADER.copy() + header.append("arch") + writer.writerow(header) + + recorded_metrics = aggregate_recorded_raw_data(base_dir, collection_raw_csv, collect_for_arch) + + write_json( + os.path.join( + metrics_path, + f"metric-report-{dtime}{collect_for_arch}.json", + ), + recorded_metrics, + ) + + filename = os.path.join(metrics_path, f"metric-report-{dtime}{collect_for_arch}.md") + create_readable_report(filename, recorded_metrics) + + +if __name__ == "__main__": + main() diff --git a/setup.cfg b/setup.cfg index 5dc41fa78ea25..294f2aa8bf734 100644 --- a/setup.cfg +++ b/setup.cfg @@ -119,3 +119,5 @@ dev = pandoc pypandoc autoflake + networkx>=2.8.4 + rstr>=3.2.0 diff --git a/tests/integration/awslambda/test_lambda_size_lims.py b/tests/integration/awslambda/test_lambda_size_lims.py index 9b71e1ce66b10..e16ee727ab14e 100644 --- a/tests/integration/awslambda/test_lambda_size_lims.py +++ b/tests/integration/awslambda/test_lambda_size_lims.py @@ -3,8 +3,7 @@ import pytest -from localstack.aws.accounts import get_aws_account_id -from localstack.services.awslambda.lambda_api import LAMBDA_DEFAULT_HANDLER, LAMBDA_TEST_ROLE +from localstack.services.awslambda.lambda_api import LAMBDA_DEFAULT_HANDLER from localstack.services.awslambda.lambda_utils import LAMBDA_RUNTIME_PYTHON37 from localstack.utils import testutil from localstack.utils.common import short_uid @@ -26,7 +25,8 @@ def generate_sized_python_str(size): class TestLambdaSizeLimits: - def test_oversized_lambda(self, lambda_client, s3_client, s3_bucket): + @pytest.mark.aws_validated + def test_oversized_lambda(self, lambda_client, s3_client, s3_bucket, lambda_su_role): function_name = f"test_lambda_{short_uid()}" bucket_key = "test_lambda.zip" code_str = generate_sized_python_str(FUNCTION_MAX_UNZIPPED_SIZE) @@ -43,7 +43,7 @@ def test_oversized_lambda(self, lambda_client, s3_client, s3_bucket): FunctionName=function_name, Runtime=LAMBDA_RUNTIME_PYTHON37, Handler=LAMBDA_DEFAULT_HANDLER, - Role=LAMBDA_TEST_ROLE.format(account_id=get_aws_account_id()), + Role=lambda_su_role, Code={"S3Bucket": s3_bucket, "S3Key": bucket_key}, Timeout=10, ) @@ -51,7 +51,8 @@ def test_oversized_lambda(self, lambda_client, s3_client, s3_bucket): r"An error occurred \(InvalidParameterValueException\) when calling the CreateFunction operation\: Unzipped size must be smaller than [0-9]* bytes" ) - def test_large_lambda(self, lambda_client, s3_client, s3_bucket): + @pytest.mark.aws_validated + def test_large_lambda(self, lambda_client, s3_client, s3_bucket, lambda_su_role): function_name = 
f"test_lambda_{short_uid()}" bucket_key = "test_lambda.zip" code_str = generate_sized_python_str(FUNCTION_MAX_UNZIPPED_SIZE - 1000) @@ -63,20 +64,17 @@ def test_large_lambda(self, lambda_client, s3_client, s3_bucket): s3_client.upload_fileobj(BytesIO(zip_file), s3_bucket, bucket_key) # create lambda function - result = lambda_client.create_function( - FunctionName=function_name, - Runtime=LAMBDA_RUNTIME_PYTHON37, - Handler=LAMBDA_DEFAULT_HANDLER, - Role=LAMBDA_TEST_ROLE, - Code={"S3Bucket": s3_bucket, "S3Key": bucket_key}, - Timeout=10, - ) - - function_arn = result["FunctionArn"] - assert testutil.response_arn_matches_partition(lambda_client, function_arn) + try: + result = lambda_client.create_function( + FunctionName=function_name, + Runtime=LAMBDA_RUNTIME_PYTHON37, + Handler=LAMBDA_DEFAULT_HANDLER, + Role=lambda_su_role, + Code={"S3Bucket": s3_bucket, "S3Key": bucket_key}, + Timeout=10, + ) - # clean up - lambda_client.delete_function(FunctionName=function_name) - with pytest.raises(Exception) as exc: + function_arn = result["FunctionArn"] + assert testutil.response_arn_matches_partition(lambda_client, function_arn) + finally: lambda_client.delete_function(FunctionName=function_name) - exc.match("ResourceNotFoundException") diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 40be53e6ecf6c..c94461ed81f96 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -4,6 +4,7 @@ It is thread/process safe to run with pytest-parallel, however not for pytest-xdist. """ + import logging import multiprocessing as mp import os @@ -31,6 +32,9 @@ # collection of functions that should be executed to initialize tests test_init_functions = set() +if config.is_collect_metrics_mode(): + pytest_plugins = "localstack.testing.pytest.metric_collection" + @pytest.hookimpl() def pytest_configure(config):