Skip to content

Commit 0473256

Browse files
committed
feat: Add poll instrumentation for kafka-python.
Signed-off-by: Paulo Vital <[email protected]>
1 parent 06b545a commit 0473256

File tree

2 files changed

+80
-4
lines changed

2 files changed

+80
-4
lines changed

src/instana/instrumentation/kafka/kafka_python.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,45 @@ def trace_kafka_consume(
8585
else:
8686
return res
8787

88+
@wrapt.patch_function_wrapper("kafka", "KafkaConsumer.poll")
89+
def trace_kafka_poll(
90+
wrapped: Callable[..., "kafka.KafkaConsumer.poll"],
91+
instance: "kafka.KafkaConsumer",
92+
args: Tuple[int, str, Tuple[Any, ...]],
93+
kwargs: Dict[str, Any],
94+
) -> Dict[str, Any]:
95+
if tracing_is_off():
96+
return wrapped(*args, **kwargs)
97+
98+
tracer, parent_span, _ = get_tracer_tuple()
99+
100+
# The KafkaConsumer.consume() from the kafka-python-ng call the
101+
# KafkaConsumer.poll() internally, so we do not consider it here.
102+
if parent_span and parent_span.name == "kafka-consumer":
103+
return wrapped(*args, **kwargs)
104+
105+
parent_context = (
106+
parent_span.get_span_context()
107+
if parent_span
108+
else tracer.extract(
109+
Format.KAFKA_HEADERS, {}, disable_w3c_trace_context=True
110+
)
111+
)
112+
113+
with tracer.start_as_current_span(
114+
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
115+
) as span:
116+
topic = list(instance.subscription())[0]
117+
span.set_attribute("kafka.service", topic)
118+
span.set_attribute("kafka.access", "poll")
119+
120+
try:
121+
res = wrapped(*args, **kwargs)
122+
except Exception as exc:
123+
span.record_exception(exc)
124+
else:
125+
return res
126+
88127
logger.debug("Instrumenting Kafka (kafka-python)")
89128
except ImportError:
90129
pass

tests/clients/kafka/test_kafka_python.py

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,6 @@ def test_trace_kafka_python_send(self) -> None:
8181
assert kafka_span.data["kafka"]["access"] == "send"
8282

8383
def test_trace_kafka_python_consume(self) -> None:
84-
agent.options.allow_exit_as_root = False
85-
8684
# Produce some events
8785
self.producer.send(testenv["kafka_topic"], b"raw_bytes1")
8886
self.producer.send(testenv["kafka_topic"], b"raw_bytes2")
@@ -125,9 +123,48 @@ def test_trace_kafka_python_consume(self) -> None:
125123
assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"]
126124
assert kafka_span.data["kafka"]["access"] == "consume"
127125

128-
def test_trace_kafka_python_error(self) -> None:
129-
agent.options.allow_exit_as_root = False
126+
def test_trace_kafka_python_poll(self) -> None:
127+
# Produce some events
128+
self.producer.send(testenv["kafka_topic"], b"raw_bytes1")
129+
self.producer.send(testenv["kafka_topic"], b"raw_bytes2")
130+
self.producer.flush()
131+
132+
# Consume the events
133+
consumer = KafkaConsumer(
134+
testenv["kafka_topic"],
135+
bootstrap_servers=testenv["kafka_bootstrap_servers"],
136+
auto_offset_reset="earliest", # consume earliest available messages
137+
enable_auto_commit=False, # do not auto-commit offsets
138+
consumer_timeout_ms=1000,
139+
)
140+
141+
with tracer.start_as_current_span("test"):
142+
msg = consumer.poll() # noqa: F841
143+
144+
consumer.close()
130145

146+
spans = self.recorder.queued_spans()
147+
assert len(spans) == 2
148+
149+
kafka_span = spans[0]
150+
test_span = spans[1]
151+
152+
# Same traceId
153+
assert test_span.t == kafka_span.t
154+
155+
# Parent relationships
156+
assert kafka_span.p == test_span.s
157+
158+
# Error logging
159+
assert not test_span.ec
160+
assert not kafka_span.ec
161+
162+
assert kafka_span.n == "kafka"
163+
assert kafka_span.k == SpanKind.SERVER
164+
assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"]
165+
assert kafka_span.data["kafka"]["access"] == "poll"
166+
167+
def test_trace_kafka_python_error(self) -> None:
131168
# Consume the events
132169
consumer = KafkaConsumer(
133170
"inexistent_kafka_topic",

0 commit comments

Comments
 (0)