Skip to content

Commit 7794f4e

Browse files
committed
feature: adding aioamqp instrumentation
Signed-off-by: Cagri Yonca <[email protected]>
1 parent 65ecb98 commit 7794f4e

File tree

6 files changed

+219
-0
lines changed

6 files changed

+219
-0
lines changed

src/instana/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ def boot_agent() -> None:
166166

167167
# Import & initialize instrumentation
168168
from instana.instrumentation import (
169+
aioamqp, # noqa: F401
169170
asyncio, # noqa: F401
170171
boto3_inst, # noqa: F401
171172
cassandra_inst, # noqa: F401
@@ -208,6 +209,7 @@ def boot_agent() -> None:
208209
client as tornado_client, # noqa: F401
209210
)
210211
from instana.instrumentation.tornado import (
212+
client as tornado_client, # noqa: F401
211213
server as tornado_server, # noqa: F401
212214
)
213215

+78
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
# (c) Copyright IBM Corp. 2025
2+
3+
try:
4+
import aioamqp
5+
from typing import Any, Callable, Dict, Tuple
6+
7+
import wrapt
8+
from opentelemetry.trace.status import StatusCode
9+
10+
from instana.log import logger
11+
from instana.util.traceutils import get_tracer_tuple, tracing_is_off
12+
13+
@wrapt.patch_function_wrapper("aioamqp.channel", "Channel.basic_publish")
14+
async def basic_publish_with_instana(
15+
wrapped: Callable[..., aioamqp.connect],
16+
instance: object,
17+
argv: Tuple[object, Tuple[object, ...]],
18+
kwargs: Dict[str, Any],
19+
) -> object:
20+
if tracing_is_off():
21+
return await wrapped(*argv, **kwargs)
22+
23+
tracer, parent_span, _ = get_tracer_tuple()
24+
parent_context = parent_span.get_span_context() if parent_span else None
25+
with tracer.start_as_current_span(
26+
"aioamqp-publisher", span_context=parent_context
27+
) as span:
28+
try:
29+
span.set_attribute("aioamqp.exchange", argv[0])
30+
return await wrapped(*argv, **kwargs)
31+
except Exception as exc:
32+
span.record_exception(exc)
33+
logger.debug(f"aioamqp basic_publish_with_instana error: {exc}")
34+
35+
@wrapt.patch_function_wrapper("aioamqp.channel", "Channel.basic_consume")
36+
def basic_consume_with_instana(
37+
wrapped: Callable[..., aioamqp.connect],
38+
instance: object,
39+
argv: Tuple[object, Tuple[object, ...]],
40+
kwargs: Dict[str, Any],
41+
) -> object:
42+
if tracing_is_off():
43+
return wrapped(*argv, **kwargs)
44+
45+
callback = argv[0]
46+
tracer, parent_span, _ = get_tracer_tuple()
47+
parent_context = parent_span.get_span_context() if parent_span else None
48+
49+
@wrapt.decorator
50+
async def callback_wrapper(
51+
wrapped_callback: Callable[..., aioamqp.connect],
52+
instance: Any,
53+
args: Tuple,
54+
kwargs: Dict,
55+
) -> object:
56+
with tracer.start_as_current_span(
57+
"aioamqp-consumer", span_context=parent_context
58+
) as span:
59+
try:
60+
span.set_status(StatusCode.OK)
61+
span.set_attribute("aioamqp.callback", callback)
62+
span.set_attribute("aioamqp.message", args[1])
63+
span.set_attribute("aioamqp.exchange_name", args[2].exchange_name)
64+
span.set_attribute("aioamqp.routing_key", args[2].routing_key)
65+
return await wrapped_callback(*args, **kwargs)
66+
except Exception as exc:
67+
span.record_exception(exc)
68+
logger.debug(f"aioamqp basic_consume_with_instana error: {exc}")
69+
70+
wrapped_callback = callback_wrapper(callback)
71+
argv = (wrapped_callback,) + argv[1:]
72+
73+
return wrapped(*argv, **kwargs)
74+
75+
logger.debug("Instrumenting aioamqp")
76+
77+
except ImportError:
78+
pass

tests/frameworks/test_aioamqp.py

+131
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
import asyncio
2+
from typing import Any, Generator
3+
4+
import aioamqp
5+
import pytest
6+
7+
from instana.singletons import tracer
8+
from tests.helpers import testenv
9+
from aioamqp.properties import Properties
10+
from aioamqp.envelope import Envelope
11+
12+
testenv["rabbitmq_host"] = "127.0.0.1"
13+
testenv["rabbitmq_port"] = 5672
14+
15+
16+
class TestAioamqp:
17+
@pytest.fixture(autouse=True)
18+
def _resource(self) -> Generator[None, None, None]:
19+
self.recorder = tracer.span_processor
20+
self.recorder.clear_spans()
21+
22+
self.loop = asyncio.new_event_loop()
23+
asyncio.set_event_loop(None)
24+
yield
25+
self.loop.run_until_complete(self.delete_queue())
26+
if self.loop.is_running():
27+
self.loop.close()
28+
29+
async def delete_queue(self) -> None:
30+
transport, protocol = await aioamqp.connect(
31+
testenv["rabbitmq_host"],
32+
testenv["rabbitmq_port"],
33+
)
34+
channel = await protocol.channel()
35+
await channel.queue_delete("message_queue")
36+
await asyncio.sleep(1)
37+
38+
async def publish_message(self) -> None:
39+
transport, protocol = await aioamqp.connect(
40+
testenv["rabbitmq_host"],
41+
testenv["rabbitmq_port"],
42+
)
43+
channel = await protocol.channel()
44+
45+
await channel.queue_declare(queue_name="message_queue")
46+
47+
message = "Instana test message"
48+
await channel.basic_publish(
49+
message.encode(), exchange_name="", routing_key="message_queue"
50+
)
51+
52+
await protocol.close()
53+
transport.close()
54+
55+
async def consume_message(self) -> None:
56+
async def callback(
57+
channel: Any,
58+
body: bytes,
59+
envelope: Envelope,
60+
properties: Properties,
61+
) -> None:
62+
with tracer.start_as_current_span("callback-span"):
63+
await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)
64+
65+
_, protocol = await aioamqp.connect(
66+
testenv["rabbitmq_host"], testenv["rabbitmq_port"]
67+
)
68+
channel = await protocol.channel()
69+
await channel.queue_declare(queue_name="message_queue")
70+
await channel.basic_consume(callback, queue_name="message_queue", no_ack=False)
71+
72+
def test_basic_publish(self) -> None:
73+
with tracer.start_as_current_span("test-span"):
74+
self.loop.run_until_complete(self.publish_message())
75+
76+
spans = self.recorder.queued_spans()
77+
78+
assert len(spans) == 2
79+
publisher_span = spans[0]
80+
test_span = spans[1]
81+
82+
assert publisher_span.n == "sdk"
83+
assert publisher_span.data["sdk"]["name"] == "aioamqp-publisher"
84+
assert publisher_span.p == test_span.s
85+
86+
assert test_span.n == "sdk"
87+
assert not test_span.p
88+
89+
def test_basic_consumer(self) -> None:
90+
with tracer.start_as_current_span("test-span"):
91+
self.loop.run_until_complete(self.publish_message())
92+
self.loop.run_until_complete(self.consume_message())
93+
94+
spans = self.recorder.queued_spans()
95+
96+
assert len(spans) == 4
97+
98+
publisher_span = spans[0]
99+
callback_span = spans[1]
100+
consumer_span = spans[2]
101+
test_span = spans[3]
102+
103+
assert publisher_span.n == "sdk"
104+
assert publisher_span.data["sdk"]["name"] == "aioamqp-publisher"
105+
assert publisher_span.p == test_span.s
106+
assert (
107+
publisher_span.data["sdk"]["custom"]["tags"]["aioamqp.exchange"]
108+
== "b'Instana test message'"
109+
)
110+
111+
assert callback_span.n == "sdk"
112+
assert callback_span.data["sdk"]["name"] == "callback-span"
113+
assert callback_span.data["sdk"]["type"] == "intermediate"
114+
assert callback_span.p == consumer_span.s
115+
116+
assert consumer_span.n == "sdk"
117+
assert consumer_span.data["sdk"]["name"] == "aioamqp-consumer"
118+
assert consumer_span.data["sdk"]["custom"]["tags"]["aioamqp.callback"]
119+
assert (
120+
consumer_span.data["sdk"]["custom"]["tags"]["aioamqp.message"]
121+
== "b'Instana test message'"
122+
)
123+
assert (
124+
consumer_span.data["sdk"]["custom"]["tags"]["aioamqp.routing_key"]
125+
== "message_queue"
126+
)
127+
assert not consumer_span.data["sdk"]["custom"]["tags"]["exchange_name"]
128+
assert consumer_span.p == test_span.s
129+
130+
assert test_span.n == "sdk"
131+
assert test_span.data["sdk"]["name"] == "test-span"

tests/helpers.py

+6
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@
5757
testenv["mongodb_user"] = os.environ.get("MONGO_USER", None)
5858
testenv["mongodb_pw"] = os.environ.get("MONGO_PW", None)
5959

60+
"""
61+
RabbitMQ Environment
62+
"""
63+
testenv["rabbitmq_host"] = os.environ.get("RABBITMQ_HOST", "127.0.0.1")
64+
testenv["rabbitmq_port"] = os.environ.get("RABBITMQ_PORT", 5672)
65+
6066

6167
"""
6268
Kafka Environment

tests/requirements-pre314.txt

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
aioamqp>=0.15.0
12
aiofiles>=0.5.0
23
aiohttp>=3.8.3
34
boto3>=1.17.74

tests/requirements.txt

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
aioamqp>=0.15.0
12
aiofiles>=0.5.0
23
aiohttp>=3.8.3
34
boto3>=1.17.74

0 commit comments

Comments
 (0)