diff --git a/Makefile b/Makefile
index 7986ef7730179..d34eb4b8f8a95 100644
--- a/Makefile
+++ b/Makefile
@@ -184,7 +184,7 @@ docker-create-push-manifests-light: ## Create and push manifests for the light d
docker-run-tests: ## Initializes the test environment and runs the tests in a docker container
# Remove argparse and dataclasses to fix https://github.com/pytest-dev/pytest/issues/5594
# Note: running "install-test-only" below, to avoid pulling in [runtime] extras from transitive dependencies
- docker run --entrypoint= -v `pwd`/tests/:/opt/code/localstack/tests/ -v `pwd`/target/:/opt/code/localstack/target/ \
+ docker run -e LOCALSTACK_INTERNAL_TEST_COLLECT_METRIC=1 --entrypoint= -v `pwd`/tests/:/opt/code/localstack/tests/ -v `pwd`/target/:/opt/code/localstack/target/ \
bash -c "make install-test-only && make init-testlibs && pip uninstall -y argparse dataclasses && DEBUG=$(DEBUG) LAMBDA_EXECUTOR=local PYTEST_LOGLEVEL=debug PYTEST_ARGS='$(PYTEST_ARGS)' COVERAGE_FILE='$(COVERAGE_FILE)' TEST_PATH='$(TEST_PATH)' make test-coverage"
@@ -207,6 +207,7 @@ test: ## Run automated tests
test-coverage: ## Run automated tests and create coverage report
($(VENV_RUN); python -m coverage --version; \
python -m coverage run $(COVERAGE_ARGS) -m \
pytest --durations=10 --log-cli-level=$(PYTEST_LOGLEVEL) -s $(PYTEST_ARGS) $(TEST_PATH))
diff --git a/localstack/aws/app.py b/localstack/aws/app.py
index 100ab12fe75bc..8a3e6a207481d 100644
--- a/localstack/aws/app.py
+++ b/localstack/aws/app.py
@@ -1,6 +1,7 @@
import logging
from localstack.aws import handlers
+from localstack.aws.handlers.metric_handler import MetricHandler
from localstack.aws.handlers.service_plugin import ServiceLoader
from localstack.services.plugins import SERVICE_PLUGINS, ServiceManager, ServicePluginManager
@@ -21,10 +22,12 @@ def __init__(self, service_manager: ServiceManager = None) -> None:
# lazy-loads services into the router
load_service = ServiceLoader(self.service_manager, self.service_request_router)
+ metric_collector = MetricHandler()
# the main request handler chain
+ metric_collector.create_metric_handler_item,
handlers.parse_service_name, # enforce_cors and content_decoder depend on the service name
@@ -36,6 +39,7 @@ def __init__(self, service_manager: ServiceManager = None) -> None:
+ metric_collector.record_parsed_request,
load_service, # once we have the service request we can make sure we load the service
self.service_request_router, # once we know the service is loaded we can route the request
@@ -62,6 +66,7 @@ def __init__(self, service_manager: ServiceManager = None) -> None:
+ metric_collector.update_metric_collection,
diff --git a/localstack/aws/handlers/metric_handler.py b/localstack/aws/handlers/metric_handler.py
new file mode 100644
index 0000000000000..302d0196645bb
--- /dev/null
+++ b/localstack/aws/handlers/metric_handler.py
@@ -0,0 +1,173 @@
+import copy
+import logging
+from typing import List, Optional
+from localstack import config
+from localstack.aws.api import RequestContext, ServiceRequest
+from localstack.aws.chain import HandlerChain
+from localstack.http import Response
+from localstack.utils.aws.aws_stack import is_internal_call_context
+LOG = logging.getLogger(__name__)
+class MetricHandlerItem:
+ """
+ MetricHandlerItem to reference and update requests by the MetricHandler
+ """
+ request_id: str
+ request_context: RequestContext
+ request_after_parse: Optional[ServiceRequest]
+ def __init__(self, request_contex: RequestContext) -> None:
+ super().__init__()
+ self.request_id = str(hash(request_contex))
+ self.request_context = request_contex
+ self.request_after_parse = None
+class Metric:
+ """
+ Data object to store relevant information for a metric entry in the raw-data collection (csv)
+ """
+ service: str
+ operation: str
+ headers: str
+ parameters: str
+ status_code: int
+ response_code: Optional[str]
+ exception: str
+ origin: str
+ xfail: bool
+ aws_validated: bool
+ snapshot: bool
+ node_id: str
+ "service",
+ "operation",
+ "request_headers",
+ "parameters",
+ "response_code",
+ "response_data",
+ "exception",
+ "origin",
+ "test_node_id",
+ "xfail",
+ "aws_validated",
+ "snapshot",
+ ]
+ def __init__(
+ self,
+ service: str,
+ operation: str,
+ headers: str,
+ parameters: str,
+ response_code: int,
+ response_data: str,
+ exception: str,
+ origin: str,
+ node_id: str = "",
+ xfail: bool = False,
+ aws_validated: bool = False,
+ snapshot: bool = False,
+ ) -> None:
+ self.service = service
+ self.operation = operation
+ self.headers = headers
+ self.parameters = parameters
+ self.response_code = response_code
+ self.response_data = response_data
+ self.exception = exception
+ self.origin = origin
+ self.node_id = node_id
+ self.xfail = xfail
+ self.aws_validated = aws_validated
+ self.snapshot = snapshot
+ def __iter__(self):
+ return iter(
+ [
+ self.service,
+ self.operation,
+ self.headers,
+ self.parameters,
+ self.response_code,
+ self.response_data,
+ self.exception,
+ self.origin,
+ self.node_id,
+ self.xfail,
+ self.aws_validated,
+ self.snapshot,
+ ]
+ )
+class MetricHandler:
+ metric_data: List[Metric] = []
+ def __init__(self) -> None:
+ self.metrics_handler_items = {}
+ def create_metric_handler_item(
+ self, chain: HandlerChain, context: RequestContext, response: Response
+ ):
+ if not config.is_collect_metrics_mode():
+ return
+ item = MetricHandlerItem(context)
+ self.metrics_handler_items[context] = item
+ def _get_metric_handler_item_for_context(self, context: RequestContext) -> MetricHandlerItem:
+ return self.metrics_handler_items[context]
+ def record_parsed_request(
+ self, chain: HandlerChain, context: RequestContext, response: Response
+ ):
+ if not config.is_collect_metrics_mode():
+ return
+ item = self._get_metric_handler_item_for_context(context)
+ item.request_after_parse = copy.deepcopy(context.service_request)
+ def record_exception(
+ self, chain: HandlerChain, exception: Exception, context: RequestContext, response: Response
+ ):
+ if not config.is_collect_metrics_mode():
+ return
+ item = self._get_metric_handler_item_for_context(context)
+ item.caught_exception_name = exception.__class__.__name__
+ def update_metric_collection(
+ self, chain: HandlerChain, context: RequestContext, response: Response
+ ):
+ if not config.is_collect_metrics_mode() or not context.service_operation:
+ return
+ is_internal = is_internal_call_context(context.request.headers)
+ item = self._get_metric_handler_item_for_context(context)
+ # parameters might get changed when dispatched to the service - we use the params stored in request_after_parse
+ parameters = ",".join(item.request_after_parse or "")
+ response_data = response.data.decode("utf-8") if response.status_code >= 300 else ""
+ MetricHandler.metric_data.append(
+ Metric(
+ service=context.service_operation.service,
+ operation=context.service_operation.operation,
+ headers=context.request.headers,
+ parameters=parameters,
+ response_code=response.status_code,
+ response_data=response_data,
+ exception=context.service_exception.__class__.__name__
+ if context.service_exception
+ else "",
+ origin="internal" if is_internal else "external",
+ )
+ )
+ # cleanup
+ del self.metrics_handler_items[context]
diff --git a/localstack/aws/mocking.py b/localstack/aws/mocking.py
new file mode 100644
index 0000000000000..3b6e1609ecd0c
--- /dev/null
+++ b/localstack/aws/mocking.py
@@ -0,0 +1,409 @@
+import logging
+import math
+import random
+import re
+from datetime import date, datetime
+from functools import lru_cache, singledispatch
+from typing import Dict, List, Optional, Set, Tuple, Union, cast
+import botocore
+import networkx
+import rstr
+from botocore.model import ListShape, MapShape, OperationModel, Shape, StringShape, StructureShape
+from localstack.aws.api import RequestContext, ServiceRequest, ServiceResponse
+from localstack.aws.skeleton import DispatchTable, ServiceRequestDispatcher, Skeleton
+from localstack.aws.spec import load_service
+from localstack.utils.sync import retry
+LOG = logging.getLogger(__name__)
+types = {
+ "timestamp",
+ "string",
+ "blob",
+ "map",
+ "list",
+ "long",
+ "structure",
+ "integer",
+ "double",
+ "float",
+ "boolean",
+Instance = Union[
+ Dict[str, "Instance"],
+ List["Instance"],
+ str,
+ bytes,
+ map,
+ list,
+ float,
+ int,
+ bool,
+ date,
+# https://github.com/boto/botocore/issues/2623
+words = [
+ # a few snazzy six-letter words
+ "snazzy",
+ "mohawk",
+ "poncho",
+ "proton",
+ "foobar",
+ "python",
+ "umlaut",
+ "except",
+ "global",
+ "latest",
+class ShapeGraph(networkx.DiGraph):
+ root: Union[ListShape, StructureShape, MapShape]
+ cycle: List[Tuple[str, str]]
+ cycle_shapes: List[str]
+def populate_graph(graph: networkx.DiGraph, root: Shape):
+ stack: List[Shape] = [root]
+ visited: Set[str] = set()
+ while stack:
+ cur = stack.pop()
+ if cur is None:
+ continue
+ if cur.name in visited:
+ continue
+ visited.add(cur.name)
+ graph.add_node(cur.name, shape=cur)
+ if isinstance(cur, ListShape):
+ graph.add_edge(cur.name, cur.member.name)
+ stack.append(cur.member)
+ elif isinstance(cur, StructureShape):
+ for member in cur.members.values():
+ stack.append(member)
+ graph.add_edge(cur.name, member.name)
+ elif isinstance(cur, MapShape):
+ stack.append(cur.key)
+ stack.append(cur.value)
+ graph.add_edge(cur.name, cur.key.name)
+ graph.add_edge(cur.name, cur.value.name)
+ else: # leaf types (int, string, bool, ...)
+ pass
+def shape_graph(root: Shape) -> ShapeGraph:
+ graph = networkx.DiGraph()
+ graph.root = root
+ populate_graph(graph, root)
+ cycles = list()
+ shapes = set()
+ for node in graph.nodes:
+ try:
+ cycle = networkx.find_cycle(graph, source=node)
+ for k, v in cycle:
+ shapes.add(k)
+ shapes.add(v)
+ if cycle not in cycles:
+ cycles.append(cycle)
+ except networkx.NetworkXNoCycle:
+ pass
+ graph.cycles = cycles
+ graph.cycle_shapes = list(shapes)
+ return cast(ShapeGraph, graph)
+def sanitize_pattern(pattern: str) -> str:
+ pattern = pattern.replace("\\p{XDigit}", "[A-Fa-f0-9]")
+ pattern = pattern.replace("\\p{P}", "[.,;]")
+ pattern = pattern.replace("\\p{Punct}", "[.,;]")
+ pattern = pattern.replace("\\p{N}", "[0-9]")
+ pattern = pattern.replace("\\p{L}", "[A-Z]")
+ pattern = pattern.replace("\\p{LD}", "[A-Z]")
+ pattern = pattern.replace("\\p{Z}", "[ ]")
+ pattern = pattern.replace("\\p{S}", "[+\\u-*]")
+ pattern = pattern.replace("\\p{M}", "[`]")
+ pattern = pattern.replace("\\p{IsLetter}", "[a-zA-Z]")
+ pattern = pattern.replace("[:alnum:]", "[a-zA-Z0-9]")
+ return pattern
+def sanitize_arn_pattern(pattern: str) -> str:
+ # clown emoji
+ # some devs were just lazy ...
+ if pattern in [
+ ".*",
+ "arn:.*",
+ "arn:.+",
+ "^arn:.+",
+ "arn:aws.*:*",
+ "^arn:aws.*",
+ "^arn:.*",
+ ".*\\S.*",
+ "^[A-Za-z0-9:\\/_-]*$",
+ "^arn[\\/\\:\\-\\_\\.a-zA-Z0-9]+$",
+ ".{0,1600}",
+ "^arn:[!-~]+$",
+ "[\\S]+",
+ "[\\s\\S]*",
+ "^([\\p{L}\\p{Z}\\p{N}_.:/=+\\-@]*)$",
+ "[a-zA-Z0-9_:\\-\\/]+",
+ ]:
+ pattern = "arn:aws:[a-z]{4}:us-east-1:[0-9]{12}:[a-z]{8}"
+ # common pattern to describe a partition
+ pattern = pattern.replace("arn:[^:]*:", "arn:aws:")
+ pattern = pattern.replace("arn:[a-z\\d-]+", "arn:aws")
+ pattern = pattern.replace("arn:[\\w+=\\/,.@-]+", "arn:aws")
+ pattern = pattern.replace("arn:[a-z-]+?", "arn:aws")
+ pattern = pattern.replace("arn:[a-z0-9][-.a-z0-9]{0,62}", "arn:aws")
+ pattern = pattern.replace(":aws(-\\w+)*", ":aws")
+ pattern = pattern.replace(":aws[a-z\\-]*", ":aws")
+ pattern = pattern.replace(":aws(-[\\w]+)*", ":aws")
+ pattern = pattern.replace(":aws[^:\\s]*", ":aws")
+ pattern = pattern.replace(":aws[A-Za-z0-9-]{0,64}", ":aws")
+ # often the account-id
+ pattern = pattern.replace(":[0-9]+:", ":[0-9]{13}:")
+ pattern = pattern.replace(":\\w{12}:", ":[0-9]{13}:")
+ # substitutions
+ pattern = pattern.replace("[a-z\\-\\d]", "[a-z0-9]")
+ pattern = pattern.replace(
+ "[\\u0020-\\uD7FF\\uE000-\\uFFFD\\uD800\\uDC00-\\uDBFF\\uDFFF\\r\\n\\t]", "[a-z0-9]"
+ )
+ pattern = pattern.replace("[\\w\\d-]", "[a-z0-9]")
+ pattern = pattern.replace("[\\w+=/,.@-]", "[a-z]")
+ pattern = pattern.replace("[^:]", "[a-z]")
+ pattern = pattern.replace("[^/]", "[a-z]")
+ pattern = pattern.replace("\\d+", "[0-9]+")
+ pattern = pattern.replace("\\d*", "[0-9]*")
+ pattern = pattern.replace("\\S+", "[a-z]{4}")
+ pattern = pattern.replace("\\d]", "0-9]")
+ pattern = pattern.replace("[a-z\\d", "[a-z0-9")
+ pattern = pattern.replace("[a-zA-Z\\d", "[a-z0-9")
+ pattern = pattern.replace("^$|", "")
+ pattern = pattern.replace("(^$)|", "")
+ pattern = pattern.replace("[:/]", "[a-z]")
+ pattern = pattern.replace("/.{", "/[a-z]{")
+ pattern = pattern.replace(".{", "[a-z]{")
+ pattern = pattern.replace("-*", "-")
+ pattern = pattern.replace("\\n", "")
+ pattern = pattern.replace("\\r", "")
+ # quantifiers
+ pattern = pattern.replace("{11}{0,1011}", "{11}")
+ pattern = pattern.replace("}+", "}")
+ pattern = pattern.replace("]*", "]{6}")
+ pattern = pattern.replace("]+", "]{6}")
+ pattern = pattern.replace(".*", "[a-z]{6}")
+ pattern = pattern.replace(".+", "[a-z]{6}")
+ return pattern
+custom_arns = {
+ "DeviceFarmArn": "arn:aws:devicefarm:us-east-1:1234567890123:mydevicefarm",
+ "KmsKeyArn": "arn:aws:kms:us-east-1:1234567890123:key/somekmskeythatisawesome",
+def generate_instance(shape: Shape, graph: ShapeGraph) -> Optional[Instance]:
+ if shape is None:
+ return None
+ raise ValueError("could not generate shape for type %s" % shape.type_name)
+def _(shape: StructureShape, graph: ShapeGraph) -> Dict[str, Instance]:
+ if shape.is_tagged_union:
+ k, v = random.choice(list(shape.members.items()))
+ members = {k: v}
+ else:
+ members = shape.members
+ if shape.name in graph.cycle_shapes:
+ return {}
+ return {
+ name: generate_instance(member_shape, graph)
+ for name, member_shape in members.items()
+ if member_shape.name != shape.name
+ }
+def _(shape: ListShape, graph: ShapeGraph) -> List[Instance]:
+ if shape.name in graph.cycle_shapes:
+ return []
+ return [generate_instance(shape.member, graph) for _ in range(shape.metadata.get("min", 1))]
+def _(shape: MapShape, graph: ShapeGraph) -> Dict[str, Instance]:
+ if shape.name in graph.cycle_shapes:
+ return {}
+ return {generate_instance(shape.key, graph): generate_instance(shape.value, graph)}
+def generate_arn(shape: StringShape):
+ if not shape.metadata:
+ return "arn:aws:ec2:us-east-1:1234567890123:instance/i-abcde0123456789f"
+ def _generate_arn():
+ # some custom hacks
+ if shape.name in custom_arns:
+ return custom_arns[shape.name]
+ max_len = shape.metadata.get("max") or math.inf
+ min_len = shape.metadata.get("min") or 0
+ pattern = shape.metadata.get("pattern")
+ if pattern:
+ # FIXME: also conforming to length may be difficult
+ pattern = sanitize_arn_pattern(pattern)
+ arn = rstr.xeger(pattern)
+ else:
+ arn = "arn:aws:ec2:us-east-1:1234567890123:instance/i-abcde0123456789f"
+ # if there's a value set for the region, replace with a randomly picked region
+ # TODO: splitting the ARNs here by ":" sometimes fails for some reason (e.g. or dynamodb for some reason)
+ arn_parts = arn.split(":")
+ if len(arn_parts) >= 4:
+ region = arn_parts[3]
+ if region:
+ # TODO: check service in ARN and try to get the actual region for the service
+ regions = botocore.session.Session().get_available_regions("lambda")
+ picked_region = random.choice(regions)
+ arn_parts[3] = picked_region
+ arn = ":".join(arn_parts)
+ if len(arn) > max_len:
+ arn = arn[:max_len]
+ if len(arn) < min_len or len(arn) > max_len:
+ raise ValueError(
+ f"generated arn {arn} for shape {shape.name} does not match constraints {shape.metadata}"
+ )
+ return arn
+ return retry(_generate_arn, retries=10, sleep_before=0, sleep=0)
+custom_strings = {"DailyTime": "12:10", "WeeklyTime": "1:12:10"}
+def _(shape: StringShape, graph: ShapeGraph) -> str:
+ if shape.enum:
+ return shape.enum[0]
+ if shape.name in custom_strings:
+ return custom_strings[shape.name]
+ if (
+ shape.name.endswith("ARN")
+ or shape.name.endswith("Arn")
+ or shape.name == "AmazonResourceName"
+ ):
+ return generate_arn(shape)
+ max_len: int = shape.metadata.get("max") or 256
+ min_len: int = shape.metadata.get("min") or 0
+ str_len = min(min_len or 6, max_len)
+ pattern = shape.metadata.get("pattern")
+ if not pattern or pattern in [".*", "^.*$", ".+"]:
+ if min_len <= 6 and max_len >= 6:
+ # pick a random six-letter word, to spice things up. this will be the case most of the time.
+ return random.choice(words)
+ else:
+ return "a" * str_len
+ pattern = sanitize_pattern(pattern)
+ try:
+ # try to return something simple first
+ random_string = "a" * str_len
+ if re.match(pattern, random_string):
+ return random_string
+ val = rstr.xeger(pattern)
+ # TODO: this will break the pattern if the string needs to end with something that we may cut off.
+ return val[: min(max_len, len(val))]
+ except re.error:
+ # TODO: this will likely break the pattern
+ return "0" * str_len
+def _(shape: Shape, graph: ShapeGraph) -> Union[int, float, bool, bytes, date]:
+ if shape.type_name in ["integer", "long"]:
+ return shape.metadata.get("min", 1)
+ if shape.type_name in ["float", "double"]:
+ return shape.metadata.get("min", 1.0)
+ if shape.type_name == "boolean":
+ return True
+ if shape.type_name == "blob":
+ # TODO: better blob generator
+ return b"0" * shape.metadata.get("min", 1)
+ if shape.type_name == "timestamp":
+ return datetime.now()
+ raise ValueError("unknown type %s" % shape.type_name)
+def is_cyclic_shape(shape: Shape) -> bool:
+ return True if shape_graph(shape).cycle else False
+def generate_response(operation: OperationModel):
+ graph = shape_graph(operation.output_shape)
+ response = generate_instance(graph.root, graph)
+ response.pop("nextToken", None)
+ return response
+def generate_request(operation: OperationModel):
+ graph = shape_graph(operation.input_shape)
+ return generate_instance(graph.root, graph)
+def return_mock_response(context: RequestContext, request: ServiceRequest) -> ServiceResponse:
+ return generate_response(context.operation)
+def create_mocking_dispatch_table(service) -> DispatchTable:
+ dispatch_table = {}
+ for operation in service.operation_names:
+ # resolve the bound function of the delegate
+ # create a dispatcher
+ dispatch_table[operation] = ServiceRequestDispatcher(
+ return_mock_response,
+ operation=operation,
+ pass_context=True,
+ expand_parameters=False,
+ )
+ return dispatch_table
+def get_mocking_skeleton(service: str) -> Skeleton:
+ service = load_service(service)
+ return Skeleton(service, create_mocking_dispatch_table(service))
diff --git a/localstack/config.py b/localstack/config.py
index 4a21b0c78978a..ec747d3a4f433 100644
--- a/localstack/config.py
+++ b/localstack/config.py
@@ -16,6 +16,7 @@
@@ -787,6 +788,11 @@ def is_local_test_mode() -> bool:
return is_env_true(ENV_INTERNAL_TEST_RUN)
+def is_collect_metrics_mode() -> bool:
+ """Returns True if metric collection is enabled."""
def collect_config_items() -> List[Tuple[str, Any]]:
"""Returns a list of key-value tuples of LocalStack configuration values."""
none = object() # sentinel object
diff --git a/localstack/constants.py b/localstack/constants.py
index a2a80eb181d66..ac105e12c3aab 100644
--- a/localstack/constants.py
+++ b/localstack/constants.py
@@ -82,6 +82,9 @@
# environment variable name to tag local test runs
+# environment variable name to tag collect metrics during a test run
# environment variable that flags whether pro was activated. do not use for security purposes!
diff --git a/localstack/testing/pytest/metric_collection.py b/localstack/testing/pytest/metric_collection.py
new file mode 100644
index 0000000000000..371a6b3a70902
--- /dev/null
+++ b/localstack/testing/pytest/metric_collection.py
@@ -0,0 +1,63 @@
+import csv
+import os
+import re
+from datetime import datetime
+from pathlib import Path
+from typing import Optional
+import pytest
+from _pytest.main import Session
+from _pytest.nodes import Item
+from localstack.aws.handlers.metric_handler import Metric, MetricHandler
+from localstack.utils.strings import short_uid
+BASE_PATH = os.path.join(os.path.dirname(__file__), "../../../target/metric_reports")
+FNAME_RAW_DATA_CSV = os.path.join(
+ f"metric-report-raw-data-{datetime.utcnow().strftime('%Y-%m-%d__%H_%M_%S')}-{short_uid()}.csv",
+def pytest_sessionstart(session: "Session") -> None:
+ Path(BASE_PATH).mkdir(parents=True, exist_ok=True)
+ pattern = re.compile("--junitxml=(.*)\\.xml")
+ if session.config.invocation_params:
+ for ip in session.config.invocation_params.args:
+ if m := pattern.match(ip):
+ report_file_name = m.groups()[-1].split("/")[-1]
+ FNAME_RAW_DATA_CSV = os.path.join(
+ f"metric-report-raw-data-{datetime.utcnow().strftime('%Y-%m-%d__%H_%M_%S')}-{report_file_name}.csv",
+ )
+ with open(FNAME_RAW_DATA_CSV, "w") as fd:
+ writer = csv.writer(fd)
+ writer.writerow(Metric.RAW_DATA_HEADER)
+def pytest_runtest_teardown(item: "Item", nextitem: Optional["Item"]) -> None:
+ node_id = item.nodeid
+ xfail = False
+ aws_validated = False
+ snapshot = False
+ for _ in item.iter_markers(name="xfail"):
+ xfail = True
+ for _ in item.iter_markers(name="aws_validated"):
+ aws_validated = True
+ if hasattr(item, "fixturenames") and "snapshot" in item.fixturenames:
+ snapshot = True
+ for metric in MetricHandler.metric_data:
+ metric.xfail = xfail
+ metric.aws_validated = aws_validated
+ metric.snapshot = snapshot
+ metric.node_id = node_id
+ with open(FNAME_RAW_DATA_CSV, "a") as fd:
+ writer = csv.writer(fd)
+ writer.writerows(MetricHandler.metric_data)
+ MetricHandler.metric_data.clear()
diff --git a/scripts/.gitignore b/scripts/.gitignore
new file mode 100644
index 0000000000000..343a25cb84882
--- /dev/null
+++ b/scripts/.gitignore
@@ -0,0 +1,2 @@
diff --git a/scripts/capture_notimplemented_responses.py b/scripts/capture_notimplemented_responses.py
new file mode 100644
index 0000000000000..5f27aa37942d5
--- /dev/null
+++ b/scripts/capture_notimplemented_responses.py
@@ -0,0 +1,325 @@
+import csv
+import logging
+from typing import TypedDict
+import botocore.config
+import click
+from botocore.exceptions import (
+ ClientError,
+ ConnectTimeoutError,
+ EndpointConnectionError,
+ ReadTimeoutError,
+from botocore.parsers import ResponseParserError
+from localstack.aws.mocking import generate_request
+from localstack.aws.spec import ServiceCatalog
+from localstack.utils.aws import aws_stack
+service_models = ServiceCatalog()
+# TODO: generate these via a script in PRO
+# generate with e.g. http http://localhost:4566/health | jq ".services | keys[]" | pbcopy
+latest_services_pro = [
+ "acm",
+ "amplify",
+ "apigateway",
+ "apigatewaymanagementapi",
+ "apigatewayv2",
+ "appconfig",
+ "application-autoscaling",
+ "appsync",
+ "athena",
+ "autoscaling",
+ "azure",
+ "backup",
+ "batch",
+ "ce",
+ "cloudformation",
+ "cloudfront",
+ "cloudtrail",
+ "cloudwatch",
+ "codecommit",
+ "cognito-identity",
+ "cognito-idp",
+ "config",
+ "docdb",
+ "dynamodb",
+ "dynamodbstreams",
+ "ec2",
+ "ecr",
+ "ecs",
+ "efs",
+ "eks",
+ "elasticache",
+ "elasticbeanstalk",
+ "elb",
+ "elbv2",
+ "emr",
+ "es",
+ "events",
+ "firehose",
+ "glacier",
+ "glue",
+ "iam",
+ "iot",
+ "iot-data",
+ "iotanalytics",
+ "iotwireless",
+ "kafka",
+ "kinesis",
+ "kinesisanalytics",
+ "kinesisanalyticsv2",
+ "kms",
+ "lakeformation",
+ "lambda",
+ "logs",
+ "mediastore",
+ "mediastore-data",
+ "mwaa",
+ "neptune",
+ "opensearch",
+ "organizations",
+ "qldb",
+ "qldb-session",
+ "rds",
+ "rds-data",
+ "redshift",
+ "redshift-data",
+ "resource-groups",
+ "resourcegroupstaggingapi",
+ "route53",
+ "route53resolver",
+ "s3",
+ "s3control",
+ "sagemaker",
+ "secretsmanager",
+ "serverlessrepo",
+ "servicediscovery",
+ "ses",
+ "sesv2",
+ "sns",
+ "sqs",
+ "ssm",
+ "stepfunctions",
+ "sts",
+ "support",
+ "swf",
+ "timestream-query",
+ "timestream-write",
+ "transfer",
+ "xray",
+exclude_services = {"azure"}
+latest_services_pro = [s for s in latest_services_pro if s not in exclude_services]
+class RowEntry(TypedDict, total=False):
+ service: str
+ operation: str
+ status_code: int
+ error_code: str
+ error_message: str
+ is_implemented: bool
+def simulate_call(service: str, op: str) -> RowEntry:
+ """generates a mock request based on the service and operation model and sends it to the API"""
+ client = aws_stack.create_external_boto_client(
+ service,
+ config=botocore.config.Config(
+ parameter_validation=False,
+ retries={"max_attempts": 0, "total_max_attempts": 1},
+ connect_timeout=1,
+ read_timeout=1,
+ ),
+ )
+ service_model = service_models.get(service)
+ op_model = service_model.operation_model(op)
+ parameters = generate_request(op_model) # should be generate_parameters I guess
+ result = RowEntry(service=service, operation=op, status_code=0)
+ logging.debug(parameters)
+ try:
+ response = client._make_api_call(op, parameters)
+ result["status_code"] = response["ResponseMetadata"]["HTTPStatusCode"]
+ except ClientError as ce:
+ result["status_code"] = ce.response["ResponseMetadata"]["HTTPStatusCode"]
+ result["error_code"] = ce.response.get("Error", {}).get("Code", "Unknown?")
+ result["error_message"] = ce.response.get("Error", {}).get("Message", "Unknown?")
+ except (ReadTimeoutError, ConnectTimeoutError):
+ logging.warning("Reached timeout. Assuming it is implemented.")
+ result["status_code"] = STATUS_TIMEOUT_ERROR
+ except EndpointConnectionError:
+ # TODO: investigate further;for now assuming not implemented
+ logging.warning("Connection failed. Assuming it is not implemented.")
+ result["status_code"] = STATUS_CONNECTION_ERROR
+ except ResponseParserError:
+ # TODO: this is actually a bit tricky and might have to be handled on a service by service basis again
+ logging.warning("Parsing issue. Assuming it is implemented.")
+ result["status_code"] = STATUS_PARSING_ERROR
+ except Exception as e:
+ logging.exception(e)
+ return result
+def map_to_notimplemented(row: RowEntry) -> bool:
+ """
+ Some simple heuristics to check the API responses and classify them into implemented/notimplemented
+ Ideally they all should behave the same way when receiving requests for not yet implemented endpoints
+ (501 with a "not yet implemented" message)
+ :param row: the RowEntry
+ :return: True if we assume it is not implemented, False otherwise
+ """
+ if row["status_code"] in [STATUS_TIMEOUT_ERROR, STATUS_PARSING_ERROR]:
+ # parsing or timeout issue, interpreted as implemented until there's a better heuristic
+ return False
+ if row["status_code"] == STATUS_CONNECTION_ERROR:
+ # affected services:
+ # lakeformation, GetQueryStat
+ # lakeformation, GetQueryStatistics
+ # lakeformation, GetWorkUnitResults
+ # lakeformation, GetWorkUnits
+ # lakeformation, StartQueryPlanning
+ # servicediscovery, DiscoverInstances
+ # stepfunctions, StartSyncExecution
+ return True
+ if row["service"] == "dynamodb" and row.get("error_code") == "UnknownOperationException":
+ return True
+ if row["service"] == "lambda" and row["status_code"] == 404 and row.get("error_code") == "404":
+ return True
+ if (
+ row["service"] == "apigateway"
+ and row["status_code"] == 404
+ and row.get("error_code") == "404"
+ and row.get("error_message") is not None
+ and "The requested URL was not found on the server" in row.get("error_message", "")
+ ):
+ return True
+ if (
+ row["service"] == "apigatewayv2"
+ and row["status_code"] == 501
+ and row.get("error_message") is not None
+ and "not yet implemented" in row.get("error_message", "")
+ ):
+ return True
+ if row.get("error_message") is not None and "not yet implemented" in row.get(
+ "error_message", ""
+ ):
+ return True
+ if row["status_code"] == 501:
+ return True
+ return False
+def run_script(services: list[str]):
+ """send requests against all APIs"""
+ with (
+ open("implementation_coverage_full.csv", "w") as csvfile,
+ open("implementation_coverage_aggregated.csv", "w") as aggregatefile,
+ ):
+ full_w = csv.DictWriter(
+ csvfile,
+ fieldnames=[
+ "service",
+ "operation",
+ "status_code",
+ "error_code",
+ "error_message",
+ "is_implemented",
+ ],
+ )
+ aggregated_w = csv.DictWriter(
+ aggregatefile,
+ fieldnames=["service", "operation", "implemented_count", "full_count", "percentage"],
+ )
+ full_w.writeheader()
+ aggregated_w.writeheader()
+ responses = {}
+ for service_name in services:
+ service = service_models.get(service_name)
+ for op_name in service.operation_names:
+ # here's the important part (the actual service call!)
+ response = simulate_call(service_name, op_name)
+ responses.setdefault(service_name, {})[op_name] = response
+ is_implemented = str(not map_to_notimplemented(response))
+ full_w.writerow(response | {"is_implemented": is_implemented})
+ # calculate aggregate for service
+ all_count = len(responses[service_name].values())
+ implemented_count = len(
+ [r for r in responses[service_name].values() if not map_to_notimplemented(r)]
+ )
+ implemented_percentage = implemented_count / all_count
+ aggregated_w.writerow(
+ {
+ "service": response["service"],
+ "operation": response["operation"],
+ "implemented_count": implemented_count,
+ "full_count": all_count,
+ "percentage": f"{implemented_percentage * 100:.1f}",
+ }
+ )
+def calculate_percentages():
+ aggregate = {}
+ implemented_aggregate = {}
+ aggregate_list = []
+ with open("./output-notimplemented.csv", "r") as fd:
+ reader = csv.DictReader(fd, fieldnames=["service", "operation", "implemented"])
+ for line in reader:
+ if line["implemented"] == "implemented":
+ continue
+ aggregate.setdefault(line["service"], {}).setdefault(line["operation"], line)
+ for service in aggregate.keys():
+ vals = aggregate[service].values()
+ all_count = len(vals)
+ implemented_count = len([v for v in vals if v["implemented"] == "True"])
+ implemented_aggregate[service] = implemented_count / all_count
+ aggregate_list.append(
+ {
+ "service": service,
+ "count": all_count,
+ "implemented": implemented_count,
+ "percentage": implemented_count / all_count,
+ }
+ )
+ aggregate_list.sort(key=lambda k: k["percentage"])
+ with open("implementation_coverage_aggregated.csv", "w") as csv_fd:
+ writer = csv.DictWriter(
+ csv_fd, fieldnames=["service", "percentage", "implemented", "count"]
+ )
+ writer.writeheader()
+ for agg in aggregate_list:
+ agg["percentage"] = f"{agg['percentage'] * 100:.1f}"
+ writer.writerow(agg)
+def main():
+ run_script(latest_services_pro)
+if __name__ == "__main__":
+ main()
diff --git a/scripts/metric_aggregator.py b/scripts/metric_aggregator.py
new file mode 100644
index 0000000000000..af99f723c9d6b
--- /dev/null
+++ b/scripts/metric_aggregator.py
@@ -0,0 +1,230 @@
+import copy
+import csv
+import datetime
+import json
+import logging
+import os
+import sys
+from pathlib import Path
+from typing import Dict, Optional
+from localstack.aws.handlers.metric_handler import Metric
+from localstack.services.plugins import SERVICE_PLUGINS
+LOG = logging.getLogger(__name__)
+template_implemented_item = "- [X] "
+template_not_implemented_item = "- [ ] "
+def _generate_details_block(details_title: str, details: dict) -> str:
+ output = f" {details_title}
+ for e, count in details.items():
+ if count > 0:
+ output += f" {template_implemented_item}{e}\n"
+ else:
+ output += f" {template_not_implemented_item}{e}\n"
+ output += " \n"
+ return output
+def create_readable_report(file_name: str, metrics: dict):
+ output = "# Metric Collection Report of Integration Tests #\n\n"
+ output += "**__Disclaimer__**: naive calculation of test coverage - if operation is called at least once, it is considered as 'covered'.\n"
+ for service in sorted(metrics.keys()):
+ output += f"## {service} ##\n"
+ details = metrics[service]
+ if not details["service_attributes"]["pro"]:
+ output += "community\n"
+ elif not details["service_attributes"]["community"]:
+ output += "pro only\n"
+ else:
+ output += "community, and pro features\n"
+ del metrics[service]["service_attributes"]
+ operation_counter = len(details)
+ operation_tested = 0
+ tmp = ""
+ for operation in sorted(details.keys()):
+ op_details = details[operation]
+ if op_details.get("invoked", 0) > 0:
+ operation_tested += 1
+ tmp += f"{template_implemented_item}{operation}\n"
+ else:
+ tmp += f"{template_not_implemented_item}{operation}\n"
+ if op_details.get("parameters"):
+ parameters = op_details.get("parameters")
+ if parameters:
+ tmp += _generate_details_block("parameters hit", parameters)
+ if op_details.get("errors"):
+ tmp += _generate_details_block("errors hit", op_details["errors"])
+ output += f"{operation_tested/operation_counter*100:.2f}% test coverage
\n\n{tmp}\n \n"
+ with open(file_name, "a") as fd:
+ fd.write(f"{output}\n")
+ output = ""
+def _init_service_metric_counter() -> Dict:
+ metric_recorder = {}
+ from localstack.aws.spec import load_service
+ for s, provider in SERVICE_PLUGINS.api_provider_specs.items():
+ try:
+ service = load_service(s)
+ ops = {}
+ service_attributes = {"pro": "pro" in provider, "community": "default" in provider}
+ ops["service_attributes"] = service_attributes
+ for op in service.operation_names:
+ attributes = {}
+ attributes["invoked"] = 0
+ if hasattr(service.operation_model(op).input_shape, "members"):
+ params = {}
+ for n in service.operation_model(op).input_shape.members:
+ params[n] = 0
+ attributes["parameters"] = params
+ if hasattr(service.operation_model(op), "error_shapes"):
+ exceptions = {}
+ for e in service.operation_model(op).error_shapes:
+ exceptions[e.name] = 0
+ attributes["errors"] = exceptions
+ ops[op] = attributes
+ metric_recorder[s] = ops
+ except Exception:
+ LOG.debug(f"cannot load service '{s}'")
+ return metric_recorder
+def print_usage():
+ print("missing argument: directory")
+ print("usage: python metric_aggregator.py [amd64|arch64]")
+def write_json(file_name: str, metric_dict: dict):
+ with open(file_name, "w") as fd:
+ fd.write(json.dumps(metric_dict, indent=2, sort_keys=True))
+def _print_diff(metric_recorder_internal, metric_recorder_external):
+ for key, val in metric_recorder_internal.items():
+ for subkey, val in val.items():
+ if isinstance(val, dict) and val.get("invoked"):
+ if val["invoked"] > 0 and not metric_recorder_external[key][subkey]["invoked"]:
+ print(f"found invocation mismatch: {key}.{subkey}")
+def append_row_to_raw_collection(collection_raw_csv_file_name, row, arch):
+ with open(collection_raw_csv_file_name, "a") as fd:
+ writer = csv.writer(fd)
+ row.append(arch)
+ writer.writerow(row)
+def aggregate_recorded_raw_data(
+ base_dir: str, collection_raw_csv: Optional[str] = None, collect_for_arch: Optional[str] = ""
+) -> dict:
+ pathlist = Path(base_dir).rglob("metric-report-raw-data-*.csv")
+ recorded = _init_service_metric_counter()
+ for path in pathlist:
+ print(f"checking {str(path)}")
+ with open(path, "r") as csv_obj:
+ csv_dict_reader = csv.reader(csv_obj)
+ # skip the header
+ next(csv_dict_reader)
+ for row in csv_dict_reader:
+ if collection_raw_csv:
+ arch = ""
+ if "arm64" in str(path):
+ arch = "arm64"
+ elif "amd64" in str(path):
+ arch = "amd64"
+ append_row_to_raw_collection(collection_raw_csv, copy.deepcopy(row), arch)
+ metric: Metric = Metric(*row)
+ if metric.xfail == "True":
+ print(f"test {metric.node_id} marked as xfail")
+ continue
+ if collect_for_arch and collect_for_arch not in str(path):
+ continue
+ service = recorded[metric.service]
+ ops = service[metric.operation]
+ errors = ops.setdefault("errors", {})
+ if metric.exception:
+ exception = metric.exception
+ errors[exception] = ops.get(exception, 0) + 1
+ elif int(metric.response_code) >= 300:
+ for expected_error in ops.get("errors", {}).keys():
+ if expected_error in metric.response_data:
+ # assume we have a match
+ errors[expected_error] += 1
+ LOG.warning(
+ f"Exception assumed for {metric.service}.{metric.operation}: code {metric.response_code}"
+ )
+ break
+ ops["invoked"] += 1
+ if not metric.parameters:
+ params = ops.setdefault("parameters", {})
+ params["_none_"] = params.get("_none_", 0) + 1
+ else:
+ for p in metric.parameters.split(","):
+ ops["parameters"][p] += 1
+ test_list = ops.setdefault("tests", [])
+ if metric.node_id not in test_list:
+ test_list.append(metric.node_id)
+ return recorded
+def main():
+ if not len(sys.argv) >= 2 or not Path(sys.argv[1]).is_dir():
+ print_usage()
+ return
+ base_dir = sys.argv[1]
+ collect_for_arch = ""
+ if len(sys.argv) == 3:
+ collect_for_arch = sys.argv[2]
+ if collect_for_arch not in ("amd64", "arm64"):
+ print_usage()
+ return
+ print(
+ f"Set target to '{collect_for_arch}' - will only aggregate for these test results. Raw collection of all files.\n"
+ )
+ # TODO: removed splitting of internal/external recorded calls, as some pro tests use 'internals' to connect to service
+ metrics_path = os.path.join(base_dir, "metrics")
+ Path(metrics_path).mkdir(parents=True, exist_ok=True)
+ dtime = datetime.datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%s")
+ collection_raw_csv = os.path.join(metrics_path, f"raw-collected-data-{dtime}.csv")
+ with open(collection_raw_csv, "w") as fd:
+ writer = csv.writer(fd)
+ header = Metric.RAW_DATA_HEADER.copy()
+ header.append("arch")
+ writer.writerow(header)
+ recorded_metrics = aggregate_recorded_raw_data(base_dir, collection_raw_csv, collect_for_arch)
+ write_json(
+ os.path.join(
+ metrics_path,
+ f"metric-report-{dtime}{collect_for_arch}.json",
+ ),
+ recorded_metrics,
+ )
+ filename = os.path.join(metrics_path, f"metric-report-{dtime}{collect_for_arch}.md")
+ create_readable_report(filename, recorded_metrics)
+if __name__ == "__main__":
+ main()
diff --git a/setup.cfg b/setup.cfg
index 5dc41fa78ea25..294f2aa8bf734 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -119,3 +119,5 @@ dev =
+ networkx>=2.8.4
+ rstr>=3.2.0
diff --git a/tests/integration/awslambda/test_lambda_size_lims.py b/tests/integration/awslambda/test_lambda_size_lims.py
index 9b71e1ce66b10..e16ee727ab14e 100644
--- a/tests/integration/awslambda/test_lambda_size_lims.py
+++ b/tests/integration/awslambda/test_lambda_size_lims.py
@@ -3,8 +3,7 @@
import pytest
-from localstack.aws.accounts import get_aws_account_id
-from localstack.services.awslambda.lambda_api import LAMBDA_DEFAULT_HANDLER, LAMBDA_TEST_ROLE
+from localstack.services.awslambda.lambda_api import LAMBDA_DEFAULT_HANDLER
from localstack.services.awslambda.lambda_utils import LAMBDA_RUNTIME_PYTHON37
from localstack.utils import testutil
from localstack.utils.common import short_uid
@@ -26,7 +25,8 @@ def generate_sized_python_str(size):
class TestLambdaSizeLimits:
- def test_oversized_lambda(self, lambda_client, s3_client, s3_bucket):
+ @pytest.mark.aws_validated
+ def test_oversized_lambda(self, lambda_client, s3_client, s3_bucket, lambda_su_role):
function_name = f"test_lambda_{short_uid()}"
bucket_key = "test_lambda.zip"
code_str = generate_sized_python_str(FUNCTION_MAX_UNZIPPED_SIZE)
@@ -43,7 +43,7 @@ def test_oversized_lambda(self, lambda_client, s3_client, s3_bucket):
- Role=LAMBDA_TEST_ROLE.format(account_id=get_aws_account_id()),
+ Role=lambda_su_role,
Code={"S3Bucket": s3_bucket, "S3Key": bucket_key},
@@ -51,7 +51,8 @@ def test_oversized_lambda(self, lambda_client, s3_client, s3_bucket):
r"An error occurred \(InvalidParameterValueException\) when calling the CreateFunction operation\: Unzipped size must be smaller than [0-9]* bytes"
- def test_large_lambda(self, lambda_client, s3_client, s3_bucket):
+ @pytest.mark.aws_validated
+ def test_large_lambda(self, lambda_client, s3_client, s3_bucket, lambda_su_role):
function_name = f"test_lambda_{short_uid()}"
bucket_key = "test_lambda.zip"
code_str = generate_sized_python_str(FUNCTION_MAX_UNZIPPED_SIZE - 1000)
@@ -63,20 +64,17 @@ def test_large_lambda(self, lambda_client, s3_client, s3_bucket):
s3_client.upload_fileobj(BytesIO(zip_file), s3_bucket, bucket_key)
# create lambda function
- result = lambda_client.create_function(
- FunctionName=function_name,
- Code={"S3Bucket": s3_bucket, "S3Key": bucket_key},
- Timeout=10,
- )
- function_arn = result["FunctionArn"]
- assert testutil.response_arn_matches_partition(lambda_client, function_arn)
+ try:
+ result = lambda_client.create_function(
+ FunctionName=function_name,
+ Role=lambda_su_role,
+ Code={"S3Bucket": s3_bucket, "S3Key": bucket_key},
+ Timeout=10,
+ )
- # clean up
- lambda_client.delete_function(FunctionName=function_name)
- with pytest.raises(Exception) as exc:
+ function_arn = result["FunctionArn"]
+ assert testutil.response_arn_matches_partition(lambda_client, function_arn)
+ finally:
- exc.match("ResourceNotFoundException")
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index 40be53e6ecf6c..c94461ed81f96 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -4,6 +4,7 @@
It is thread/process safe to run with pytest-parallel, however not for pytest-xdist.
import logging
import multiprocessing as mp
import os
@@ -31,6 +32,9 @@
# collection of functions that should be executed to initialize tests
test_init_functions = set()
+if config.is_collect_metrics_mode():
+ pytest_plugins = "localstack.testing.pytest.metric_collection"
def pytest_configure(config):