Skip to content

fix: updated amqp instrumentation, adapted attributes and unittests #738

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 45 additions & 7 deletions src/instana/instrumentation/aioamqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,31 @@ async def basic_publish_with_instana(
"aioamqp-publisher", span_context=parent_context
) as span:
try:
span.set_attribute("aioamqp.exchange", argv[0])
return await wrapped(*argv, **kwargs)
span.set_attribute("amqp.message", argv[0])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find an entry for amqp.message attribute in the backend/ui-client code. Could you please confirm if we need this here?
If yes, can you decode() the message from bytes to str and then store?

span.set_attribute("amqp.command", "publish")
span.set_attribute("amqp.exchange_name", kwargs.get("exchange_name"))
span.set_attribute("amqp.routing_key", kwargs.get("routing_key"))

protocol = getattr(instance, "protocol", None)
transport = getattr(protocol, "_transport", None)
extra = getattr(transport, "_extra", {}) if transport else {}
peername = extra.get("peername")
if (
peername
and isinstance(peername, (list, tuple))
and len(peername) >= 2
):
connection_info = f"{peername[0]}:{peername[1]}"
else:
connection_info = "unknown"
span.set_attribute("amqp.connection", connection_info)

response = await wrapped(*argv, **kwargs)
except Exception as exc:
span.record_exception(exc)
logger.debug(f"aioamqp basic_publish_with_instana error: {exc}")
else:
return response

@wrapt.patch_function_wrapper("aioamqp.channel", "Channel.basic_consume")
async def basic_consume_with_instana(
Expand Down Expand Up @@ -58,14 +78,32 @@ async def callback_wrapper(
) as span:
try:
span.set_status(StatusCode.OK)
span.set_attribute("aioamqp.callback", callback)
span.set_attribute("aioamqp.message", args[1])
span.set_attribute("aioamqp.exchange_name", args[2].exchange_name)
span.set_attribute("aioamqp.routing_key", args[2].routing_key)
return await wrapped_callback(*args, **kwargs)
span.set_attribute("amqp.callback", callback)
span.set_attribute("amqp.message", args[1])
span.set_attribute("amqp.command", "consume")
span.set_attribute("amqp.exchange_name", args[2].exchange_name)
span.set_attribute("amqp.routing_key", args[2].routing_key)

protocol = getattr(args[0], "protocol", None)
transport = getattr(protocol, "_transport", None)
extra = getattr(transport, "_extra", {}) if transport else {}
peername = extra.get("peername")
if (
peername
and isinstance(peername, (list, tuple))
and len(peername) >= 2
):
connection_info = f"{peername[0]}:{peername[1]}"
else:
connection_info = "unknown"
span.set_attribute("amqp.connection", connection_info)

response = await wrapped_callback(*args, **kwargs)
except Exception as exc:
span.record_exception(exc)
logger.debug(f"aioamqp basic_consume_with_instana error: {exc}")
else:
return response

wrapped_callback = callback_wrapper(callback)
argv = (wrapped_callback,) + argv[1:]
Expand Down
2 changes: 2 additions & 0 deletions src/instana/span/kind.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
)

ENTRY_SPANS = (
"aioamqp-consumer",
"aiohttp-server",
"aws.lambda.entry",
"celery-worker",
Expand All @@ -35,6 +36,7 @@
)

EXIT_SPANS = (
"aioamqp-publisher",
"aiohttp-client",
"boto3",
"cassandra",
Expand Down
33 changes: 33 additions & 0 deletions src/instana/span/registered_span.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ def __init__(
if "kafka" in span.name:
self.n = "kafka"

# unify the span name for aioamqp-producer and aioamqp-consumer
if "amqp" in span.name:
self.n = "amqp"

# Logic to store custom attributes for registered spans (not used yet)
if len(span.attributes) > 0:
self.data["sdk"]["custom"]["tags"] = self._validate_attributes(
Expand All @@ -64,6 +68,21 @@ def _populate_entry_span_data(self, span: "InstanaSpan") -> None:
if span.name in HTTP_SPANS:
self._collect_http_attributes(span)

elif span.name == "aioamqp-consumer":
self.data["amqp"]["callback"] = span.attributes.pop("amqp.callback", None)
self.data["amqp"]["message"] = span.attributes.pop("amqp.message", None)
self.data["amqp"]["command"] = span.attributes.pop("amqp.command", None)
self.data["amqp"]["exchange_name"] = span.attributes.pop(
"amqp.exchange_name", None
)
self.data["amqp"]["routingkey"] = span.attributes.pop(
"amqp.routing_key", None
)
self.data["amqp"]["connection"] = span.attributes.pop(
"amqp.connection", None
)
self.data["amqp"]["error"] = span.attributes.pop("amqp.error", None)

elif span.name == "aws.lambda.entry":
self.data["lambda"]["arn"] = span.attributes.pop("lambda.arn", "Unknown")
self.data["lambda"]["alias"] = None
Expand Down Expand Up @@ -167,6 +186,20 @@ def _populate_exit_span_data(self, span: "InstanaSpan") -> None:
if span.name in HTTP_SPANS:
self._collect_http_attributes(span)

elif span.name == "aioamqp-publisher":
self.data["amqp"]["message"] = span.attributes.pop("amqp.message", None)
self.data["amqp"]["command"] = span.attributes.pop("amqp.command", None)
self.data["amqp"]["exchange_name"] = span.attributes.pop(
"amqp.exchange_name", None
)
self.data["amqp"]["routingkey"] = span.attributes.pop(
"amqp.routing_key", None
)
self.data["amqp"]["connection"] = span.attributes.pop(
"amqp.connection", None
)
self.data["amqp"]["error"] = span.attributes.pop("amqp.error", None)

elif span.name == "boto3":
# boto3 also sends http attributes
self._collect_http_attributes(span)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def _resource(self) -> Generator[None, None, None]:
self.loop.close()

async def delete_queue(self) -> None:
transport, protocol = await aioamqp.connect(
_, protocol = await aioamqp.connect(
testenv["rabbitmq_host"],
testenv["rabbitmq_port"],
)
Expand Down Expand Up @@ -79,8 +79,13 @@ def test_basic_publish(self) -> None:
publisher_span = spans[0]
test_span = spans[1]

assert publisher_span.n == "sdk"
assert publisher_span.data["sdk"]["name"] == "aioamqp-publisher"
assert publisher_span.n == "amqp"
assert publisher_span.data["amqp"]["message"] == b"Instana test message"
assert publisher_span.data["amqp"]["command"] == "publish"
assert not publisher_span.data["amqp"]["exchange_name"]
assert publisher_span.data["amqp"]["routingkey"] == "message_queue"
assert publisher_span.data["amqp"]["connection"] == "127.0.0.1:5672"

assert publisher_span.p == test_span.s

assert test_span.n == "sdk"
Expand All @@ -100,31 +105,30 @@ def test_basic_consumer(self) -> None:
consumer_span = spans[2]
test_span = spans[3]

assert publisher_span.n == "sdk"
assert publisher_span.data["sdk"]["name"] == "aioamqp-publisher"
assert publisher_span.n == "amqp"
assert publisher_span.data["amqp"]["message"] == b"Instana test message"
assert publisher_span.data["amqp"]["command"] == "publish"
assert not publisher_span.data["amqp"]["exchange_name"]
assert publisher_span.data["amqp"]["routingkey"] == "message_queue"
assert publisher_span.data["amqp"]["connection"] == "127.0.0.1:5672"
assert publisher_span.p == test_span.s
assert (
publisher_span.data["sdk"]["custom"]["tags"]["aioamqp.exchange"]
== "b'Instana test message'"
)

assert callback_span.n == "sdk"
assert callback_span.data["sdk"]["name"] == "callback-span"
assert callback_span.data["sdk"]["type"] == "intermediate"
assert callback_span.p == consumer_span.s

assert consumer_span.n == "sdk"
assert consumer_span.data["sdk"]["name"] == "aioamqp-consumer"
assert consumer_span.data["sdk"]["custom"]["tags"]["aioamqp.callback"]
assert (
consumer_span.data["sdk"]["custom"]["tags"]["aioamqp.message"]
== "b'Instana test message'"
)
assert consumer_span.n == "amqp"
assert consumer_span.data["amqp"]["message"] == b"Instana test message"
assert consumer_span.data["amqp"]["command"] == "consume"
assert not consumer_span.data["amqp"]["exchange_name"]
assert consumer_span.data["amqp"]["routingkey"] == "message_queue"

assert consumer_span.data["amqp"]["connection"] == "127.0.0.1:5672"
assert (
consumer_span.data["sdk"]["custom"]["tags"]["aioamqp.routing_key"]
== "message_queue"
consumer_span.data["amqp"]["connection"]
== publisher_span.data["amqp"]["connection"]
)
assert not consumer_span.data["sdk"]["custom"]["tags"]["exchange_name"]
assert consumer_span.p == test_span.s

assert test_span.n == "sdk"
Expand Down
Loading