Skip to content

Commit 0c61b0f

Browse files
committed
fix: updated amqp instrumentation, adapted attributes and unittests
Signed-off-by: Cagri Yonca <[email protected]>
1 parent 284a161 commit 0c61b0f

File tree

4 files changed

+103
-26
lines changed

4 files changed

+103
-26
lines changed

src/instana/instrumentation/aioamqp.py

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,31 @@ async def basic_publish_with_instana(
2626
"aioamqp-publisher", span_context=parent_context
2727
) as span:
2828
try:
29-
span.set_attribute("aioamqp.exchange", argv[0])
30-
return await wrapped(*argv, **kwargs)
29+
span.set_attribute("amqp.message", argv[0])
30+
span.set_attribute("amqp.command", "publish")
31+
span.set_attribute("amqp.exchange_name", kwargs.get("exchange_name"))
32+
span.set_attribute("amqp.routing_key", kwargs.get("routing_key"))
33+
34+
protocol = getattr(instance, "protocol", None)
35+
transport = getattr(protocol, "_transport", None)
36+
extra = getattr(transport, "_extra", {}) if transport else {}
37+
peername = extra.get("peername")
38+
if (
39+
peername
40+
and isinstance(peername, (list, tuple))
41+
and len(peername) >= 2
42+
):
43+
connection_info = f"{peername[0]}:{peername[1]}"
44+
else:
45+
connection_info = "unknown"
46+
span.set_attribute("amqp.connection", connection_info)
47+
48+
response = await wrapped(*argv, **kwargs)
3149
except Exception as exc:
3250
span.record_exception(exc)
3351
logger.debug(f"aioamqp basic_publish_with_instana error: {exc}")
52+
finally:
53+
return response
3454

3555
@wrapt.patch_function_wrapper("aioamqp.channel", "Channel.basic_consume")
3656
async def basic_consume_with_instana(
@@ -58,14 +78,32 @@ async def callback_wrapper(
5878
) as span:
5979
try:
6080
span.set_status(StatusCode.OK)
61-
span.set_attribute("aioamqp.callback", callback)
62-
span.set_attribute("aioamqp.message", args[1])
63-
span.set_attribute("aioamqp.exchange_name", args[2].exchange_name)
64-
span.set_attribute("aioamqp.routing_key", args[2].routing_key)
65-
return await wrapped_callback(*args, **kwargs)
81+
span.set_attribute("amqp.callback", callback)
82+
span.set_attribute("amqp.message", args[1])
83+
span.set_attribute("amqp.command", "consume")
84+
span.set_attribute("amqp.exchange_name", args[2].exchange_name)
85+
span.set_attribute("amqp.routing_key", args[2].routing_key)
86+
87+
protocol = getattr(args[0], "protocol", None)
88+
transport = getattr(protocol, "_transport", None)
89+
extra = getattr(transport, "_extra", {}) if transport else {}
90+
peername = extra.get("peername")
91+
if (
92+
peername
93+
and isinstance(peername, (list, tuple))
94+
and len(peername) >= 2
95+
):
96+
connection_info = f"{peername[0]}:{peername[1]}"
97+
else:
98+
connection_info = "unknown"
99+
span.set_attribute("amqp.connection", connection_info)
100+
101+
response = await wrapped_callback(*args, **kwargs)
66102
except Exception as exc:
67103
span.record_exception(exc)
68104
logger.debug(f"aioamqp basic_consume_with_instana error: {exc}")
105+
finally:
106+
return response
69107

70108
wrapped_callback = callback_wrapper(callback)
71109
argv = (wrapped_callback,) + argv[1:]

src/instana/span/kind.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
)
2222

2323
ENTRY_SPANS = (
24+
"aioamqp-consumer",
2425
"aiohttp-server",
2526
"aws.lambda.entry",
2627
"celery-worker",
@@ -35,6 +36,7 @@
3536
)
3637

3738
EXIT_SPANS = (
39+
"aioamqp-publisher",
3840
"aiohttp-client",
3941
"boto3",
4042
"cassandra",

src/instana/span/registered_span.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ def __init__(
5454
if "kafka" in span.name:
5555
self.n = "kafka"
5656

57+
# unify the span name for aioamqp-producer and aioamqp-consumer
58+
if "amqp" in span.name:
59+
self.n = "amqp"
60+
5761
# Logic to store custom attributes for registered spans (not used yet)
5862
if len(span.attributes) > 0:
5963
self.data["sdk"]["custom"]["tags"] = self._validate_attributes(
@@ -64,6 +68,21 @@ def _populate_entry_span_data(self, span: "InstanaSpan") -> None:
6468
if span.name in HTTP_SPANS:
6569
self._collect_http_attributes(span)
6670

71+
elif span.name == "aioamqp-consumer":
72+
self.data["amqp"]["callback"] = span.attributes.pop("amqp.callback", None)
73+
self.data["amqp"]["message"] = span.attributes.pop("amqp.message", None)
74+
self.data["amqp"]["command"] = span.attributes.pop("amqp.command", None)
75+
self.data["amqp"]["exchange_name"] = span.attributes.pop(
76+
"amqp.exchange_name", None
77+
)
78+
self.data["amqp"]["routingkey"] = span.attributes.pop(
79+
"amqp.routing_key", None
80+
)
81+
self.data["amqp"]["connection"] = span.attributes.pop(
82+
"amqp.connection", None
83+
)
84+
self.data["amqp"]["error"] = span.attributes.pop("amqp.error", None)
85+
6786
elif span.name == "aws.lambda.entry":
6887
self.data["lambda"]["arn"] = span.attributes.pop("lambda.arn", "Unknown")
6988
self.data["lambda"]["alias"] = None
@@ -167,6 +186,20 @@ def _populate_exit_span_data(self, span: "InstanaSpan") -> None:
167186
if span.name in HTTP_SPANS:
168187
self._collect_http_attributes(span)
169188

189+
elif span.name == "aioamqp-publisher":
190+
self.data["amqp"]["message"] = span.attributes.pop("amqp.message", None)
191+
self.data["amqp"]["command"] = span.attributes.pop("amqp.command", None)
192+
self.data["amqp"]["exchange_name"] = span.attributes.pop(
193+
"amqp.exchange_name", None
194+
)
195+
self.data["amqp"]["routingkey"] = span.attributes.pop(
196+
"amqp.routing_key", None
197+
)
198+
self.data["amqp"]["connection"] = span.attributes.pop(
199+
"amqp.connection", None
200+
)
201+
self.data["amqp"]["error"] = span.attributes.pop("amqp.error", None)
202+
170203
elif span.name == "boto3":
171204
# boto3 also sends http attributes
172205
self._collect_http_attributes(span)

tests/frameworks/test_aioamqp.py

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def _resource(self) -> Generator[None, None, None]:
2727
self.loop.close()
2828

2929
async def delete_queue(self) -> None:
30-
transport, protocol = await aioamqp.connect(
30+
_, protocol = await aioamqp.connect(
3131
testenv["rabbitmq_host"],
3232
testenv["rabbitmq_port"],
3333
)
@@ -79,8 +79,13 @@ def test_basic_publish(self) -> None:
7979
publisher_span = spans[0]
8080
test_span = spans[1]
8181

82-
assert publisher_span.n == "sdk"
83-
assert publisher_span.data["sdk"]["name"] == "aioamqp-publisher"
82+
assert publisher_span.n == "amqp"
83+
assert publisher_span.data["amqp"]["message"] == b"Instana test message"
84+
assert publisher_span.data["amqp"]["command"] == "publish"
85+
assert not publisher_span.data["amqp"]["exchange_name"]
86+
assert publisher_span.data["amqp"]["routingkey"] == "message_queue"
87+
assert publisher_span.data["amqp"]["connection"] == "127.0.0.1:5672"
88+
8489
assert publisher_span.p == test_span.s
8590

8691
assert test_span.n == "sdk"
@@ -100,31 +105,30 @@ def test_basic_consumer(self) -> None:
100105
consumer_span = spans[2]
101106
test_span = spans[3]
102107

103-
assert publisher_span.n == "sdk"
104-
assert publisher_span.data["sdk"]["name"] == "aioamqp-publisher"
108+
assert publisher_span.n == "amqp"
109+
assert publisher_span.data["amqp"]["message"] == b"Instana test message"
110+
assert publisher_span.data["amqp"]["command"] == "publish"
111+
assert not publisher_span.data["amqp"]["exchange_name"]
112+
assert publisher_span.data["amqp"]["routingkey"] == "message_queue"
113+
assert publisher_span.data["amqp"]["connection"] == "127.0.0.1:5672"
105114
assert publisher_span.p == test_span.s
106-
assert (
107-
publisher_span.data["sdk"]["custom"]["tags"]["aioamqp.exchange"]
108-
== "b'Instana test message'"
109-
)
110115

111116
assert callback_span.n == "sdk"
112117
assert callback_span.data["sdk"]["name"] == "callback-span"
113118
assert callback_span.data["sdk"]["type"] == "intermediate"
114119
assert callback_span.p == consumer_span.s
115120

116-
assert consumer_span.n == "sdk"
117-
assert consumer_span.data["sdk"]["name"] == "aioamqp-consumer"
118-
assert consumer_span.data["sdk"]["custom"]["tags"]["aioamqp.callback"]
119-
assert (
120-
consumer_span.data["sdk"]["custom"]["tags"]["aioamqp.message"]
121-
== "b'Instana test message'"
122-
)
121+
assert consumer_span.n == "amqp"
122+
assert consumer_span.data["amqp"]["message"] == b"Instana test message"
123+
assert consumer_span.data["amqp"]["command"] == "consume"
124+
assert not consumer_span.data["amqp"]["exchange_name"]
125+
assert consumer_span.data["amqp"]["routingkey"] == "message_queue"
126+
127+
assert consumer_span.data["amqp"]["connection"] == "127.0.0.1:5672"
123128
assert (
124-
consumer_span.data["sdk"]["custom"]["tags"]["aioamqp.routing_key"]
125-
== "message_queue"
129+
consumer_span.data["amqp"]["connection"]
130+
== publisher_span.data["amqp"]["connection"]
126131
)
127-
assert not consumer_span.data["sdk"]["custom"]["tags"]["exchange_name"]
128132
assert consumer_span.p == test_span.s
129133

130134
assert test_span.n == "sdk"

0 commit comments

Comments
 (0)