Skip to content

Commit 8a7cf2f

Browse files
committed
feat: Add aio-pika instrumentation
Signed-off-by: Varsha GS <[email protected]>
1 parent 284a161 commit 8a7cf2f

File tree

2 files changed

+83
-0
lines changed

2 files changed

+83
-0
lines changed

src/instana/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ def boot_agent() -> None:
189189
starlette, # noqa: F401
190190
urllib3, # noqa: F401
191191
spyne, # noqa: F401
192+
aio_pika, # noqa: F401
192193
)
193194
from instana.instrumentation.aiohttp import (
194195
client as aiohttp_client, # noqa: F401
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
# (c) Copyright IBM Corp. 2025
2+
3+
try:
4+
import aio_pika
5+
import wrapt
6+
7+
from instana.log import logger
8+
from instana.propagators.format import Format
9+
from instana.util.traceutils import get_tracer_tuple, tracing_is_off
10+
from instana.singletons import tracer
11+
12+
def _extract_span_attributes(span, connection, sort, routing_key, exchange) -> None:
13+
span.set_attribute("address", str(connection.url))
14+
15+
span.set_attribute("sort", sort)
16+
span.set_attribute("key", routing_key)
17+
span.set_attribute("exchange", exchange)
18+
19+
@wrapt.patch_function_wrapper("aio_pika", "Exchange.publish")
20+
async def publish_with_instana(wrapped, instance, args, kwargs):
21+
if tracing_is_off():
22+
return await wrapped(*args, **kwargs)
23+
24+
tracer, parent_span, _ = get_tracer_tuple()
25+
parent_context = parent_span.get_span_context() if parent_span else None
26+
27+
with tracer.start_as_current_span(
28+
"rabbitmq", span_context=parent_context
29+
) as span:
30+
connection = instance.channel._connection
31+
_extract_span_attributes(
32+
span, connection, "publish", kwargs["routing_key"], instance.name
33+
)
34+
35+
message = args[0]
36+
tracer.inject(
37+
span.context,
38+
Format.HTTP_HEADERS,
39+
message.properties.headers,
40+
disable_w3c_trace_context=True,
41+
)
42+
try:
43+
response = await wrapped(*args, **kwargs)
44+
except Exception as exc:
45+
span.record_exception(exc)
46+
else:
47+
return response
48+
49+
@wrapt.patch_function_wrapper("aio_pika", "Queue.consume")
50+
async def consume_with_instana(wrapped, instance, args, kwargs):
51+
connection = instance.channel._connection
52+
callback = kwargs["callback"] if kwargs.get("callback") else args[0]
53+
54+
@wrapt.decorator
55+
async def callback_wrapper(wrapped, instance, args, kwargs):
56+
message = args[0]
57+
parent_context = tracer.extract(
58+
Format.HTTP_HEADERS, message.headers, disable_w3c_trace_context=True
59+
)
60+
with tracer.start_as_current_span(
61+
"rabbitmq", span_context=parent_context
62+
) as span:
63+
_extract_span_attributes(span, connection, "consume", message.routing_key, message.exchange)
64+
try:
65+
response = await wrapped(*args, **kwargs)
66+
except Exception as exc:
67+
span.record_exception(exc)
68+
else:
69+
return response
70+
71+
wrapped_callback = callback_wrapper(callback)
72+
if kwargs.get("callback"):
73+
kwargs["callback"] = wrapped_callback
74+
else:
75+
args = (wrapped_callback,) + args[1:]
76+
77+
return await wrapped(*args, **kwargs)
78+
79+
logger.debug("Instrumenting aio-pika")
80+
81+
except ImportError:
82+
pass

0 commit comments

Comments
 (0)