Skip to content

Commit 9ef40bc

Browse files
committed
tests: Add tests for aio-pika instrumentation
Signed-off-by: Varsha GS <[email protected]>
1 parent 8a7cf2f commit 9ef40bc

File tree

2 files changed

+169
-0
lines changed

2 files changed

+169
-0
lines changed

tests/clients/test_aio_pika.py

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
# (c) Copyright IBM Corp. 2025
2+
3+
import pytest
4+
from typing import Generator
5+
import asyncio
6+
from aio_pika import Message, connect, connect_robust
7+
8+
from instana.singletons import agent, tracer
9+
10+
11+
class TestAioPika:
12+
@pytest.fixture(autouse=True)
13+
def _resource(self) -> Generator[None, None, None]:
14+
"""SetUp and TearDown"""
15+
# setup
16+
self.recorder = tracer.span_processor
17+
self.recorder.clear_spans()
18+
19+
self.loop = asyncio.new_event_loop()
20+
asyncio.set_event_loop(None)
21+
self.queue_name = "test.queue"
22+
yield
23+
# teardown
24+
self.loop.run_until_complete(self.delete_queue())
25+
if self.loop.is_running():
26+
self.loop.close()
27+
# Ensure that allow_exit_as_root has the default value
28+
agent.options.allow_exit_as_root = False
29+
30+
async def publish_message(self) -> None:
31+
# Perform connection
32+
connection = await connect()
33+
34+
async with connection:
35+
# Creating a channel
36+
channel = await connection.channel()
37+
38+
# Declaring queue
39+
queue_name = self.queue_name
40+
queue = await channel.declare_queue(queue_name)
41+
42+
# Declaring exchange
43+
exchange = await channel.declare_exchange("test.exchange")
44+
await queue.bind(exchange, routing_key=queue_name)
45+
46+
# Sending the message
47+
await exchange.publish(
48+
Message(f"Hello {queue_name}".encode()),
49+
routing_key=queue_name,
50+
)
51+
52+
async def delete_queue(self) -> None:
53+
connection = await connect()
54+
55+
async with connection:
56+
channel = await connection.channel()
57+
await channel.queue_delete(self.queue_name)
58+
59+
async def consume_message(self, connect_method) -> None:
60+
connection = await connect_method()
61+
62+
async with connection:
63+
# Creating channel
64+
channel = await connection.channel()
65+
66+
# Declaring queue
67+
queue = await channel.declare_queue(self.queue_name)
68+
69+
async with queue.iterator() as queue_iter:
70+
async for message in queue_iter:
71+
async with message.process():
72+
if queue.name in message.body.decode():
73+
break
74+
75+
def test_basic_publish(self) -> None:
76+
with tracer.start_as_current_span("test"):
77+
self.loop.run_until_complete(self.publish_message())
78+
79+
spans = self.recorder.queued_spans()
80+
assert len(spans) == 2
81+
82+
rabbitmq_span = spans[0]
83+
test_span = spans[1]
84+
85+
# Same traceId
86+
assert test_span.t == rabbitmq_span.t
87+
88+
# Parent relationships
89+
assert rabbitmq_span.p == test_span.s
90+
91+
# Error logging
92+
assert not test_span.ec
93+
assert not rabbitmq_span.ec
94+
95+
# Span attributes
96+
assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange"
97+
assert rabbitmq_span.data["rabbitmq"]["sort"] == "publish"
98+
assert rabbitmq_span.data["rabbitmq"]["address"]
99+
assert rabbitmq_span.data["rabbitmq"]["key"] == "test.queue"
100+
assert rabbitmq_span.stack
101+
assert isinstance(rabbitmq_span.stack, list)
102+
assert len(rabbitmq_span.stack) > 0
103+
104+
def test_basic_publish_as_root_exit_span(self) -> None:
105+
agent.options.allow_exit_as_root = True
106+
self.loop.run_until_complete(self.publish_message())
107+
108+
spans = self.recorder.queued_spans()
109+
assert len(spans) == 1
110+
111+
rabbitmq_span = spans[0]
112+
113+
# Parent relationships
114+
assert not rabbitmq_span.p
115+
116+
# Error logging
117+
assert not rabbitmq_span.ec
118+
119+
# Span attributes
120+
assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange"
121+
assert rabbitmq_span.data["rabbitmq"]["sort"] == "publish"
122+
assert rabbitmq_span.data["rabbitmq"]["address"]
123+
assert rabbitmq_span.data["rabbitmq"]["key"] == "test.queue"
124+
assert rabbitmq_span.stack
125+
assert isinstance(rabbitmq_span.stack, list)
126+
assert len(rabbitmq_span.stack) > 0
127+
128+
@pytest.mark.parametrize(
129+
"connect_method",
130+
[connect, connect_robust],
131+
)
132+
def test_basic_consume(self, connect_method) -> None:
133+
with tracer.start_as_current_span("test"):
134+
self.loop.run_until_complete(self.publish_message())
135+
self.loop.run_until_complete(self.consume_message(connect_method))
136+
137+
spans = self.recorder.queued_spans()
138+
assert len(spans) == 3
139+
140+
rabbitmq_publisher_span = spans[0]
141+
rabbitmq_consumer_span = spans[1]
142+
test_span = spans[2]
143+
144+
# Same traceId
145+
assert test_span.t == rabbitmq_publisher_span.t
146+
assert rabbitmq_publisher_span.t == rabbitmq_consumer_span.t
147+
148+
# Parent relationships
149+
assert rabbitmq_publisher_span.p == test_span.s
150+
assert rabbitmq_consumer_span.p == rabbitmq_publisher_span.s
151+
152+
# Error logging
153+
assert not rabbitmq_publisher_span.ec
154+
assert not rabbitmq_consumer_span.ec
155+
assert not test_span.ec
156+
157+
# Span attributes
158+
def assert_span_info(rabbitmq_span, sort) -> None:
159+
assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange"
160+
assert rabbitmq_span.data["rabbitmq"]["sort"] == sort
161+
assert rabbitmq_span.data["rabbitmq"]["address"]
162+
assert rabbitmq_span.data["rabbitmq"]["key"] == "test.queue"
163+
assert rabbitmq_span.stack
164+
assert isinstance(rabbitmq_span.stack, list)
165+
assert len(rabbitmq_span.stack) > 0
166+
167+
assert_span_info(rabbitmq_publisher_span, "publish")
168+
assert_span_info(rabbitmq_consumer_span, "consume")

tests/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
aioamqp>=0.15.0
33
aiofiles>=0.5.0
44
aiohttp>=3.8.3
5+
aio-pika>=9.5.5
56
boto3>=1.17.74
67
bottle>=0.12.25
78
celery>=5.2.7

0 commit comments

Comments
 (0)