Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,22 @@
from aio_pika.abc import AbstractChannel, AbstractMessage

from opentelemetry.instrumentation.utils import is_instrumentation_enabled
from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
MESSAGING_MESSAGE_ID,
MESSAGING_OPERATION,
MESSAGING_SYSTEM,
)
from opentelemetry.semconv._incubating.attributes.net_attributes import (
NET_PEER_NAME,
NET_PEER_PORT,
)
from opentelemetry.semconv.trace import (
MessagingOperationValues,
SpanAttributes,
)
from opentelemetry.trace import Span, SpanKind, Tracer

_DEFAULT_ATTRIBUTES = {SpanAttributes.MESSAGING_SYSTEM: "rabbitmq"}
_DEFAULT_ATTRIBUTES = {MESSAGING_SYSTEM: "rabbitmq"}


class SpanBuilder:
Expand All @@ -44,6 +53,8 @@ def set_operation(self, operation: MessagingOperationValues):

def set_destination(self, destination: str):
self._destination = destination
# TODO: Update this implementation once the semantic conventions for messaging stabilize
# See: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/registry/attributes/messaging.md
self._attributes[SpanAttributes.MESSAGING_DESTINATION] = destination

def set_channel(self, channel: AbstractChannel):
Expand All @@ -61,17 +72,15 @@ def set_channel(self, channel: AbstractChannel):
url = connection.url
self._attributes.update(
{
SpanAttributes.NET_PEER_NAME: url.host,
SpanAttributes.NET_PEER_PORT: url.port or 5672,
NET_PEER_NAME: url.host,
NET_PEER_PORT: url.port or 5672,
}
)

def set_message(self, message: AbstractMessage):
properties = message.properties
if properties.message_id:
self._attributes[SpanAttributes.MESSAGING_MESSAGE_ID] = (
properties.message_id
)
self._attributes[MESSAGING_MESSAGE_ID] = properties.message_id
if properties.correlation_id:
self._attributes[SpanAttributes.MESSAGING_CONVERSATION_ID] = (
properties.correlation_id
Expand All @@ -81,10 +90,10 @@ def build(self) -> Optional[Span]:
if not is_instrumentation_enabled():
return None
if self._operation:
self._attributes[SpanAttributes.MESSAGING_OPERATION] = (
self._operation.value
)
self._attributes[MESSAGING_OPERATION] = self._operation.value
else:
# TODO: Update this implementation once the semantic conventions for messaging stabilize
# See: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/registry/attributes/messaging.md
self._attributes[SpanAttributes.MESSAGING_TEMP_DESTINATION] = True
span = self._tracer.start_span(
self._generate_span_name(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
AIOPIKA_VERSION_INFO = tuple(int(v) for v in aiopika_version.split("."))
MESSAGE_ID = "meesage_id"
CORRELATION_ID = "correlation_id"
MESSAGING_SYSTEM = "rabbitmq"
MESSAGING_SYSTEM_VALUE = "rabbitmq"
EXCHANGE_NAME = "exchange_name"
QUEUE_NAME = "queue_name"
ROUTING_KEY = "routing_key"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@
from opentelemetry.instrumentation.aio_pika.callback_decorator import (
CallbackDecorator,
)
from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
MESSAGING_MESSAGE_ID,
MESSAGING_OPERATION,
MESSAGING_SYSTEM,
)
from opentelemetry.semconv._incubating.attributes.net_attributes import (
NET_PEER_NAME,
NET_PEER_PORT,
)
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import SpanKind, get_tracer

Expand All @@ -30,7 +39,7 @@
EXCHANGE_NAME,
MESSAGE,
MESSAGE_ID,
MESSAGING_SYSTEM,
MESSAGING_SYSTEM_VALUE,
QUEUE_NAME,
SERVER_HOST,
SERVER_PORT,
Expand All @@ -40,13 +49,13 @@
@skipIf(AIOPIKA_VERSION_INFO >= (8, 0), "Only for aio_pika 7")
class TestInstrumentedQueueAioRmq7(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
MESSAGING_SYSTEM: MESSAGING_SYSTEM_VALUE,
SpanAttributes.MESSAGING_DESTINATION: EXCHANGE_NAME,
SpanAttributes.NET_PEER_NAME: SERVER_HOST,
SpanAttributes.NET_PEER_PORT: SERVER_PORT,
SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID,
NET_PEER_NAME: SERVER_HOST,
NET_PEER_PORT: SERVER_PORT,
MESSAGING_MESSAGE_ID: MESSAGE_ID,
SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID,
SpanAttributes.MESSAGING_OPERATION: "receive",
MESSAGING_OPERATION: "receive",
}

def setUp(self):
Expand Down Expand Up @@ -80,13 +89,13 @@ def test_decorate_callback(self):
@skipIf(AIOPIKA_VERSION_INFO <= (8, 0), "Only for aio_pika 8")
class TestInstrumentedQueueAioRmq8(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
MESSAGING_SYSTEM: MESSAGING_SYSTEM_VALUE,
SpanAttributes.MESSAGING_DESTINATION: EXCHANGE_NAME,
SpanAttributes.NET_PEER_NAME: SERVER_HOST,
SpanAttributes.NET_PEER_PORT: SERVER_PORT,
SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID,
NET_PEER_NAME: SERVER_HOST,
NET_PEER_PORT: SERVER_PORT,
MESSAGING_MESSAGE_ID: MESSAGE_ID,
SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID,
SpanAttributes.MESSAGING_OPERATION: "receive",
MESSAGING_OPERATION: "receive",
}

def setUp(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@
from opentelemetry.instrumentation.aio_pika.publish_decorator import (
PublishDecorator,
)
from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
MESSAGING_MESSAGE_ID,
MESSAGING_SYSTEM,
)
from opentelemetry.semconv._incubating.attributes.net_attributes import (
NET_PEER_NAME,
NET_PEER_PORT,
)
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import SpanKind, get_tracer

Expand All @@ -34,7 +42,7 @@
EXCHANGE_NAME,
MESSAGE,
MESSAGE_ID,
MESSAGING_SYSTEM,
MESSAGING_SYSTEM_VALUE,
ROUTING_KEY,
SERVER_HOST,
SERVER_PORT,
Expand All @@ -44,11 +52,11 @@
@skipIf(AIOPIKA_VERSION_INFO >= (8, 0), "Only for aio_pika 7")
class TestInstrumentedExchangeAioRmq7(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
MESSAGING_SYSTEM: MESSAGING_SYSTEM_VALUE,
SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}",
SpanAttributes.NET_PEER_NAME: SERVER_HOST,
SpanAttributes.NET_PEER_PORT: SERVER_PORT,
SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID,
NET_PEER_NAME: SERVER_HOST,
NET_PEER_PORT: SERVER_PORT,
MESSAGING_MESSAGE_ID: MESSAGE_ID,
SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID,
SpanAttributes.MESSAGING_TEMP_DESTINATION: True,
}
Expand Down Expand Up @@ -123,11 +131,11 @@ def test_publish_works_with_not_recording_span_robust(self):
@skipIf(AIOPIKA_VERSION_INFO <= (8, 0), "Only for aio_pika 8")
class TestInstrumentedExchangeAioRmq8(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
MESSAGING_SYSTEM: MESSAGING_SYSTEM_VALUE,
SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}",
SpanAttributes.NET_PEER_NAME: SERVER_HOST,
SpanAttributes.NET_PEER_PORT: SERVER_PORT,
SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID,
NET_PEER_NAME: SERVER_HOST,
NET_PEER_PORT: SERVER_PORT,
MESSAGING_MESSAGE_ID: MESSAGE_ID,
SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID,
SpanAttributes.MESSAGING_TEMP_DESTINATION: True,
}
Expand Down