Skip to content

Commit

Permalink
fix issue with sending event from lambda to eventbus (localstack#8844)
Browse files Browse the repository at this point in the history
  • Loading branch information
steffyP authored Aug 8, 2023
1 parent 3fae602 commit 570f085
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 18 deletions.
14 changes: 9 additions & 5 deletions localstack/utils/aws/message_forwarding.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ def send_event_to_target(
queue_url = get_sqs_queue_url(target_arn)
msg_group_id = collections.get_safe(target_attributes, "$.SqsParameters.MessageGroupId")
kwargs = {"MessageGroupId": msg_group_id} if msg_group_id else {}
sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps(event), **kwargs)
sqs_client.send_message(
QueueUrl=queue_url, MessageBody=json.dumps(event, separators=(",", ":")), **kwargs
)

elif ":states:" in target_arn:
stepfunctions_client = connect_to(region_name=region).stepfunctions
Expand All @@ -96,14 +98,16 @@ def send_event_to_target(
service_principal=source_service, source_arn=source_arn
)
eventbus_name = target_arn.split(":")[-1].split("/")[-1]
detail = event.get("detail") or event
resources = event.get("resources") or [source_arn] if source_arn else []
events_client.put_events(
Entries=[
{
"EventBusName": eventbus_name,
"Source": event.get("source"),
"DetailType": event.get("detail-type"),
"Detail": json.dumps(event.get("detail", {})),
"Resources": event.get("resources", []),
"Source": event.get("source", source_service) or "",
"DetailType": event.get("detail-type", ""),
"Detail": json.dumps(detail),
"Resources": resources,
}
]
)
Expand Down
174 changes: 161 additions & 13 deletions tests/aws/awslambda/test_lambda_destinations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,32 @@
import json
import os
import time

from typing import TYPE_CHECKING

import aws_cdk as cdk
import aws_cdk.aws_events as events
import aws_cdk.aws_events_targets as targets
import aws_cdk.aws_lambda as awslambda
import aws_cdk.aws_lambda_destinations as destinations
import aws_cdk.aws_sqs as sqs
import pytest
from aws_cdk.aws_events import EventPattern, Rule, RuleTargetInput
from aws_cdk.aws_lambda_event_sources import SqsEventSource

from localstack import config
from localstack.aws.api.lambda_ import Runtime
from localstack.testing.aws.lambda_utils import is_old_provider
from localstack.testing.aws.util import is_aws_cloud
from localstack.testing.pytest import markers
from localstack.testing.scenario.provisioning import InfraProvisioner
from localstack.utils.strings import short_uid, to_bytes, to_str
from localstack.utils.sync import retry, wait_until
from tests.aws.awslambda.functions import lambda_integration
from tests.aws.awslambda.test_lambda import TEST_LAMBDA_PYTHON

if TYPE_CHECKING:
from mypy_boto3_s3 import CloudWatchLogsClient


class TestLambdaDLQ:
@markers.snapshot.skip_snapshot_verify(paths=["$..DeadLetterConfig", "$..result"])
Expand Down Expand Up @@ -93,6 +106,20 @@ def receive_dlq():
)


def wait_until_log_group_exists(fn_name: str, logs_client: "CloudWatchLogsClient"):
def log_group_exists():
return (
len(
logs_client.describe_log_groups(logGroupNamePrefix=f"/aws/lambda/{fn_name}")[
"logGroups"
]
)
== 1
)

wait_until(log_group_exists, max_retries=30 if is_aws_cloud() else 10)


class TestLambdaDestinationSqs:
@markers.snapshot.skip_snapshot_verify(
condition=is_old_provider,
Expand Down Expand Up @@ -398,17 +425,8 @@ def test_maxeventage(
)

# wait for log group to exist
def log_group_exists():
return (
len(
aws_client.logs.describe_log_groups(
logGroupNamePrefix=f"/aws/lambda/{fn_name}"
)["logGroups"]
)
== 1
)

wait_until(log_group_exists)
wait_until_log_group_exists(fn_name, aws_client.logs)

def get_filtered_event_count() -> int:
filter_result = retry(
Expand Down Expand Up @@ -476,5 +494,135 @@ def _assert_event_count(count: int):
# ... # TODO
#
#
# class TestLambdaDestinationEventbridge:
# ... # TODO
class TestLambdaDestinationEventbridge:
EVENT_BRIDGE_STACK = "EventbridgeStack"
INPUT_FUNCTION_NAME = "InputFunc"
TRIGGERED_FUNCTION_NAME = "TriggeredFunc"
TEST_QUEUE_NAME = "TestQueueName"

INPUT_LAMBDA_CODE = """
def handler(event, context):
return {
"hello": "world",
"test": "abc",
"val": 5,
"success": True
}
"""
TRIGGERED_LAMBDA_CODE = """
import json
def handler(event, context):
print(json.dumps(event))
return {"invocation": True}
"""

@pytest.fixture(scope="class", autouse=True)
def infrastructure(self, aws_client):
infra = InfraProvisioner(aws_client)
input_fn_name = f"input-fn-{short_uid()}"
triggered_fn_name = f"triggered-fn-{short_uid()}"
app = cdk.App()

# setup a stack with two lambdas:
# - input-lambda will be invoked manually
# - its output is written to SQS queue by using an EventBridge
# - triggered lambda invoked by SQS event source
stack = cdk.Stack(app, self.EVENT_BRIDGE_STACK)
event_bus = events.EventBus(
stack, "MortgageQuotesEventBus", event_bus_name="MortgageQuotesEventBus"
)

test_queue = sqs.Queue(
stack,
"TestQueue",
retention_period=cdk.Duration.minutes(5),
removal_policy=cdk.RemovalPolicy.DESTROY,
)

message_filter_rule = Rule(
stack,
"EmptyFilterRule",
event_bus=event_bus,
rule_name="CustomRule",
event_pattern=EventPattern(version=["0"]),
)

message_filter_rule.add_target(
targets.SqsQueue(
queue=test_queue,
message=RuleTargetInput.from_event_path("$.detail.responsePayload"),
)
)

input_func = awslambda.Function(
stack,
"InputLambda",
runtime=awslambda.Runtime.PYTHON_3_10,
handler="index.handler",
code=awslambda.InlineCode(code=self.INPUT_LAMBDA_CODE),
function_name=input_fn_name,
on_success=destinations.EventBridgeDestination(event_bus=event_bus),
)

triggered_func = awslambda.Function(
stack,
"TriggeredLambda",
runtime=awslambda.Runtime.PYTHON_3_10,
code=awslambda.InlineCode(code=self.TRIGGERED_LAMBDA_CODE),
handler="index.handler",
function_name=triggered_fn_name,
)

triggered_func.add_event_source(SqsEventSource(test_queue, batch_size=10))

cdk.CfnOutput(stack, self.INPUT_FUNCTION_NAME, value=input_func.function_name)
cdk.CfnOutput(stack, self.TRIGGERED_FUNCTION_NAME, value=triggered_func.function_name)
cdk.CfnOutput(stack, self.TEST_QUEUE_NAME, value=test_queue.queue_name)
infra.add_cdk_stack(stack)

with infra.provisioner(skip_teardown=False) as prov:
yield prov

@markers.aws.validated
@markers.snapshot.skip_snapshot_verify(paths=["$..AWSTraceHeader", "$..SenderId"])
def test_invoke_lambda_eventbridge(self, infrastructure, aws_client, snapshot):
outputs = infrastructure.get_stack_outputs(self.EVENT_BRIDGE_STACK)
input_fn_name = outputs.get(self.INPUT_FUNCTION_NAME)
triggered_fn_name = outputs.get(self.TRIGGERED_FUNCTION_NAME)
test_queue_name = outputs.get(self.TEST_QUEUE_NAME)

snapshot.add_transformer(snapshot.transform.key_value("messageId"))
snapshot.add_transformer(snapshot.transform.key_value("receiptHandle"))
snapshot.add_transformer(
snapshot.transform.key_value("SenderId"), priority=2
) # TODO currently on LS sender-id == account-id -> replaces part of the eventSourceARN without the priority
snapshot.add_transformer(
snapshot.transform.key_value(
"AWSTraceHeader", "trace-header", reference_replacement=False
)
)
snapshot.add_transformer(
snapshot.transform.key_value("md5OfBody", reference_replacement=False)
)
snapshot.add_transformer(snapshot.transform.regex(test_queue_name, "TestQueue"))

aws_client.awslambda.invoke(
FunctionName=input_fn_name,
Payload=b"{}",
InvocationType="Event", # important, otherwise destinations won't be triggered
)
# wait until triggered lambda was invoked
wait_until_log_group_exists(triggered_fn_name, aws_client.logs)

def _filter_message_triggered():
log_events = aws_client.logs.filter_log_events(
logGroupName=f"/aws/lambda/{triggered_fn_name}"
)["events"]
filtered_logs = [event for event in log_events if event["message"].startswith("{")]
assert len(filtered_logs) >= 1
filtered_logs.sort(key=lambda e: e["timestamp"], reverse=True)
return filtered_logs[0]

log = retry(_filter_message_triggered, retries=50 if is_aws_cloud() else 10)
snapshot.match("filtered_message_event_bus_sqs", log["message"])
26 changes: 26 additions & 0 deletions tests/aws/awslambda/test_lambda_destinations.snapshot.json
Original file line number Diff line number Diff line change
Expand Up @@ -591,5 +591,31 @@
}
}
}
},
"tests/aws/awslambda/test_lambda_destinations.py::TestLambdaDestinationEventbridge::test_invoke_lambda_eventbridge": {
"recorded-date": "07-08-2023, 17:39:40",
"recorded-content": {
"filtered_message_event_bus_sqs": {
"Records": [
{
"messageId": "<message-id:1>",
"receiptHandle": "<receipt-handle:1>",
"body": "{\"hello\":\"world\",\"test\":\"abc\",\"val\":5,\"success\":true}",
"attributes": {
"ApproximateReceiveCount": "1",
"AWSTraceHeader": "trace-header",
"SentTimestamp": "timestamp",
"SenderId": "<sender-id:1>",
"ApproximateFirstReceiveTimestamp": "timestamp"
},
"messageAttributes": {},
"md5OfBody": "md5-of-body",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:<region>:111111111111:TestQueue",
"awsRegion": "<region>"
}
]
}
}
}
}

0 comments on commit 570f085

Please sign in to comment.