Skip to content

Commit d1e45ab

Browse files
committed
feat: Add support to Kafka spans.
This commit adds the essential support to handle Kafka spans independently of which supported Python package is used. Signed-off-by: Paulo Vital <[email protected]>
1 parent bf16226 commit d1e45ab

File tree

5 files changed

+93
-10
lines changed

5 files changed

+93
-10
lines changed

src/instana/span/kind.py

+2
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
"tornado-server",
3232
"gcps-consumer",
3333
"asgi",
34+
"kafka-consumer",
3435
)
3536

3637
EXIT_SPANS = (
@@ -53,6 +54,7 @@
5354
"pymongo",
5455
"gcs",
5556
"gcps-producer",
57+
"kafka-producer",
5658
)
5759

5860
REGISTERED_SPANS = LOCAL_SPANS + ENTRY_SPANS + EXIT_SPANS

src/instana/span/registered_span.py

+46-10
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,31 @@
11
# (c) Copyright IBM Corp. 2024
22

3+
from typing import TYPE_CHECKING, Any, Dict
4+
5+
from opentelemetry.semconv.trace import SpanAttributes
6+
from opentelemetry.trace import SpanKind
7+
38
from instana.log import logger
49
from instana.span.base_span import BaseSpan
5-
from instana.span.kind import ENTRY_SPANS, EXIT_SPANS, HTTP_SPANS, LOCAL_SPANS
10+
from instana.span.kind import (
11+
ENTRY_SPANS,
12+
EXIT_SPANS,
13+
HTTP_SPANS,
14+
LOCAL_SPANS,
15+
)
616

7-
from opentelemetry.trace import SpanKind
8-
from opentelemetry.semconv.trace import SpanAttributes
17+
if TYPE_CHECKING:
18+
from instana.span.span import InstanaSpan
919

1020

1121
class RegisteredSpan(BaseSpan):
12-
def __init__(self, span, source, service_name, **kwargs) -> None:
22+
def __init__(
23+
self,
24+
span: "InstanaSpan",
25+
source: Dict[str, Any],
26+
service_name: str,
27+
**kwargs: Dict[str, Any],
28+
) -> None:
1329
# pylint: disable=invalid-name
1430
super(RegisteredSpan, self).__init__(span, source, **kwargs)
1531
self.n = span.name
@@ -34,13 +50,17 @@ def __init__(self, span, source, service_name, **kwargs) -> None:
3450
if "gcps" in span.name:
3551
self.n = "gcps"
3652

53+
# unify the span name for kafka-producer and kafka-consumer
54+
if "kafka" in span.name:
55+
self.n = "kafka"
56+
3757
# Logic to store custom attributes for registered spans (not used yet)
3858
if len(span.attributes) > 0:
3959
self.data["sdk"]["custom"]["tags"] = self._validate_attributes(
4060
span.attributes
4161
)
4262

43-
def _populate_entry_span_data(self, span) -> None:
63+
def _populate_entry_span_data(self, span: "InstanaSpan") -> None:
4464
if span.name in HTTP_SPANS:
4565
self._collect_http_attributes(span)
4666

@@ -127,10 +147,14 @@ def _populate_entry_span_data(self, span) -> None:
127147
self.data["rpc"]["params"] = span.attributes.pop("rpc.params", None)
128148
# self.data["rpc"]["baggage"] = span.attributes.pop("rpc.baggage", None)
129149
self.data["rpc"]["error"] = span.attributes.pop("rpc.error", None)
150+
151+
elif span.name.startswith("kafka"):
152+
self._collect_kafka_attributes(span)
153+
130154
else:
131155
logger.debug("SpanRecorder: Unknown entry span: %s" % span.name)
132156

133-
def _populate_local_span_data(self, span) -> None:
157+
def _populate_local_span_data(self, span: "InstanaSpan") -> None:
134158
if span.name == "render":
135159
self.data["render"]["name"] = span.attributes.pop("name", None)
136160
self.data["render"]["type"] = span.attributes.pop("type", None)
@@ -139,7 +163,7 @@ def _populate_local_span_data(self, span) -> None:
139163
else:
140164
logger.debug("SpanRecorder: Unknown local span: %s" % span.name)
141165

142-
def _populate_exit_span_data(self, span) -> None:
166+
def _populate_exit_span_data(self, span: "InstanaSpan") -> None:
143167
if span.name in HTTP_SPANS:
144168
self._collect_http_attributes(span)
145169

@@ -239,8 +263,12 @@ def _populate_exit_span_data(self, span) -> None:
239263
self.data["mysql"]["host"] = span.attributes.pop("host", None)
240264
self.data["mysql"]["port"] = span.attributes.pop("port", None)
241265
self.data["mysql"]["db"] = span.attributes.pop(SpanAttributes.DB_NAME, None)
242-
self.data["mysql"]["user"] = span.attributes.pop(SpanAttributes.DB_USER, None)
243-
self.data["mysql"]["stmt"] = span.attributes.pop(SpanAttributes.DB_STATEMENT, None)
266+
self.data["mysql"]["user"] = span.attributes.pop(
267+
SpanAttributes.DB_USER, None
268+
)
269+
self.data["mysql"]["stmt"] = span.attributes.pop(
270+
SpanAttributes.DB_STATEMENT, None
271+
)
244272
self.data["mysql"]["error"] = span.attributes.pop("mysql.error", None)
245273

246274
elif span.name == "postgres":
@@ -303,10 +331,14 @@ def _populate_exit_span_data(self, span) -> None:
303331
self.data["log"]["parameters"] = event.attributes.pop(
304332
"parameters", None
305333
)
334+
335+
elif span.name.startswith("kafka"):
336+
self._collect_kafka_attributes(span)
337+
306338
else:
307339
logger.debug("SpanRecorder: Unknown exit span: %s" % span.name)
308340

309-
def _collect_http_attributes(self, span) -> None:
341+
def _collect_http_attributes(self, span: "InstanaSpan") -> None:
310342
self.data["http"]["host"] = span.attributes.pop("http.host", None)
311343
self.data["http"]["url"] = span.attributes.pop("http.url", None)
312344
self.data["http"]["path"] = span.attributes.pop("http.path", None)
@@ -325,3 +357,7 @@ def _collect_http_attributes(self, span) -> None:
325357
for key in custom_headers:
326358
trimmed_key = key[12:]
327359
self.data["http"]["header"][trimmed_key] = span.attributes.pop(key)
360+
361+
def _collect_kafka_attributes(self, span: "InstanaSpan") -> None:
362+
self.data["kafka"]["service"] = span.attributes.pop("kafka.service", None)
363+
self.data["kafka"]["access"] = span.attributes.pop("kafka.access", None)

src/instana/span/span.py

+2
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,8 @@ def record_exception(
165165
self.set_attribute("sqlalchemy.err", message)
166166
elif self.name == "aws.lambda.entry":
167167
self.set_attribute("lambda.error", message)
168+
elif self.name.startswith("kafka"):
169+
self.set_attribute("kafka.error", message)
168170
else:
169171
_attributes = {"message": message}
170172
if attributes:

tests/span/test_registered_span.py

+42
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,14 @@ def test_populate_local_span_data_with_other_name(
146146
"rpc.port": 1234,
147147
},
148148
),
149+
(
150+
"kafka-consumer",
151+
"kafka",
152+
{
153+
"kafka.service": "my-topic",
154+
"kafka.access": "consume",
155+
},
156+
),
149157
],
150158
)
151159
def test_populate_entry_span_data(
@@ -350,6 +358,14 @@ def test_populate_entry_span_data_AWSlambda(
350358
"gcps.top": "MY_SUBSCRIPTION_NAME",
351359
},
352360
),
361+
(
362+
"kafka-producer",
363+
"kafka",
364+
{
365+
"kafka.service": "my-topic",
366+
"kafka.access": "send",
367+
},
368+
),
353369
],
354370
)
355371
def test_populate_exit_span_data(
@@ -453,3 +469,29 @@ def test_populate_exit_span_data_log(
453469

454470
while self.span._events:
455471
self.span._events.pop()
472+
473+
def test_collect_kafka_attributes(
474+
self,
475+
span_context: SpanContext,
476+
span_processor: StanRecorder,
477+
) -> None:
478+
span_name = "test-kafka-registered-span"
479+
attributes = {
480+
"kafka.service": "my-topic",
481+
"kafka.access": "send",
482+
}
483+
service_name = "test-kafka-registered-service"
484+
self.span = InstanaSpan(
485+
span_name, span_context, span_processor, attributes=attributes
486+
)
487+
reg_span = RegisteredSpan(self.span, None, service_name)
488+
489+
excepted_result = {
490+
"kafka.service": attributes["kafka.service"],
491+
"kafka.access": attributes["kafka.access"],
492+
}
493+
494+
reg_span._collect_kafka_attributes(self.span)
495+
496+
assert excepted_result["kafka.service"] == reg_span.data["kafka"]["service"]
497+
assert excepted_result["kafka.access"] == reg_span.data["kafka"]["access"]

tests/span/test_span.py

+1
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,7 @@ def test_span_add_event(
569569
("celery-worker", "error"),
570570
("sqlalchemy", "sqlalchemy.err"),
571571
("aws.lambda.entry", "lambda.error"),
572+
("kafka", "kafka.error"),
572573
],
573574
)
574575
def test_span_record_exception_default(

0 commit comments

Comments
 (0)