Skip to content

Commit bf16226

Browse files
committed
feat: Add KafkaPropagator to handle context propagation.
Signed-off-by: Paulo Vital <[email protected]>
1 parent 6e59e9b commit bf16226

File tree

5 files changed

+94
-7
lines changed

5 files changed

+94
-7
lines changed

src/instana/propagators/format.py

+12
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,15 @@ class Format(object):
5050
should use a prefix or other convention to distinguish tracer-specific
5151
key:value pairs.
5252
"""
53+
54+
KAFKA_HEADERS = "kafka_headers"
55+
"""
56+
The KAFKA_HEADERS format represents :class:`SpanContext`\\ s in a python
57+
``dict`` mapping from character-restricted strings to strings.
58+
59+
Keys and values in the KAFKA_HEADERS carrier must be suitable for use as
60+
HTTP headers (without modification or further escaping). That is, the
61+
keys have a greatly restricted character set, casing for the keys may not
62+
be preserved by various intermediaries, and the values should be
63+
URL-escaped.
64+
"""
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# (c) Copyright IBM Corp. 2025
2+
from typing import TYPE_CHECKING
3+
4+
from opentelemetry.trace.span import format_span_id
5+
6+
from instana.log import logger
7+
from instana.propagators.base_propagator import BasePropagator, CarrierT
8+
from instana.util.ids import hex_id_limited
9+
10+
if TYPE_CHECKING:
11+
from instana.span_context import SpanContext
12+
13+
14+
class KafkaPropagator(BasePropagator):
15+
"""
16+
Instana Propagator for Format.KAFKA_HEADERS.
17+
18+
The KAFKA_HEADERS format deals with key-values with string to string mapping.
19+
The character set should be restricted to HTTP compatible.
20+
"""
21+
22+
def __init__(self) -> None:
23+
super(KafkaPropagator, self).__init__()
24+
25+
def inject(
26+
self,
27+
span_context: "SpanContext",
28+
carrier: CarrierT,
29+
disable_w3c_trace_context: bool = True,
30+
) -> None:
31+
trace_id = span_context.trace_id
32+
span_id = span_context.span_id
33+
dictionary_carrier = self.extract_headers_dict(carrier)
34+
35+
if dictionary_carrier:
36+
# Suppression `level` made in the child context or in the parent context
37+
# has priority over any non-suppressed `level` setting
38+
child_level = int(
39+
self.extract_instana_headers(dictionary_carrier)[2] or "1"
40+
)
41+
span_context.level = min(child_level, span_context.level)
42+
43+
serializable_level = str(span_context.level)
44+
45+
def inject_key_value(carrier, key, value):
46+
if isinstance(carrier, list):
47+
carrier.append((key, value))
48+
elif isinstance(carrier, dict) or "__setitem__" in dir(carrier):
49+
carrier[key] = value
50+
else:
51+
raise Exception(
52+
f"KafkaPropagator: Unsupported carrier type {type(carrier)}",
53+
)
54+
55+
try:
56+
inject_key_value(carrier, "X_INSTANA_L_S", serializable_level)
57+
inject_key_value(carrier, "X_INSTANA_T", hex_id_limited(trace_id))
58+
inject_key_value(carrier, "X_INSTANA_S", format_span_id(span_id))
59+
60+
except Exception:
61+
logger.debug("KafkaPropagator - inject error:", exc_info=True)

src/instana/tracer.py

+17-5
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from instana.propagators.exceptions import UnsupportedFormatException
2727
from instana.propagators.format import Format
2828
from instana.propagators.http_propagator import HTTPPropagator
29+
from instana.propagators.kafka_propagator import KafkaPropagator
2930
from instana.propagators.text_propagator import TextPropagator
3031
from instana.recorder import StanRecorder
3132
from instana.sampling import InstanaSampler, Sampler
@@ -53,6 +54,7 @@ def __init__(
5354
self._propagators[Format.HTTP_HEADERS] = HTTPPropagator()
5455
self._propagators[Format.TEXT_MAP] = TextPropagator()
5556
self._propagators[Format.BINARY] = BinaryPropagator()
57+
self._propagators[Format.KAFKA_HEADERS] = KafkaPropagator()
5658

5759
def get_tracer(
5860
self,
@@ -118,7 +120,9 @@ def start_span(
118120
record_exception: bool = True,
119121
set_status_on_exception: bool = True,
120122
) -> InstanaSpan:
121-
parent_context = span_context if span_context else get_current_span().get_span_context()
123+
parent_context = (
124+
span_context if span_context else get_current_span().get_span_context()
125+
)
122126

123127
if parent_context and not isinstance(parent_context, SpanContext):
124128
raise TypeError("parent_context must be an Instana SpanContext or None.")
@@ -224,9 +228,13 @@ def _create_span_context(self, parent_context: SpanContext) -> SpanContext:
224228
level=(parent_context.level if parent_context else 1),
225229
synthetic=(parent_context.synthetic if parent_context else False),
226230
trace_parent=(parent_context.trace_parent if parent_context else None),
227-
instana_ancestor=(parent_context.instana_ancestor if parent_context else None),
231+
instana_ancestor=(
232+
parent_context.instana_ancestor if parent_context else None
233+
),
228234
long_trace_id=(parent_context.long_trace_id if parent_context else None),
229-
correlation_type=(parent_context.correlation_type if parent_context else None),
235+
correlation_type=(
236+
parent_context.correlation_type if parent_context else None
237+
),
230238
correlation_id=(parent_context.correlation_id if parent_context else None),
231239
traceparent=(parent_context.traceparent if parent_context else None),
232240
tracestate=(parent_context.tracestate if parent_context else None),
@@ -237,7 +245,9 @@ def _create_span_context(self, parent_context: SpanContext) -> SpanContext:
237245
def inject(
238246
self,
239247
span_context: SpanContext,
240-
format: Union[Format.BINARY, Format.HTTP_HEADERS, Format.TEXT_MAP],
248+
format: Union[
249+
Format.BINARY, Format.HTTP_HEADERS, Format.TEXT_MAP, Format.KAFKA_HEADERS
250+
],
241251
carrier: "CarrierT",
242252
disable_w3c_trace_context: bool = False,
243253
) -> Optional["CarrierT"]:
@@ -250,7 +260,9 @@ def inject(
250260

251261
def extract(
252262
self,
253-
format: Union[Format.BINARY, Format.HTTP_HEADERS, Format.TEXT_MAP],
263+
format: Union[
264+
Format.BINARY, Format.HTTP_HEADERS, Format.TEXT_MAP, Format.KAFKA_HEADERS
265+
],
254266
carrier: "CarrierT",
255267
disable_w3c_trace_context: bool = False,
256268
) -> Optional[Context]:

tests/test_tracer.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def test_tracer_defaults(tracer_provider: InstanaTracerProvider) -> None:
2727
assert isinstance(tracer._sampler, InstanaSampler)
2828
assert isinstance(tracer.span_processor, StanRecorder)
2929
assert isinstance(tracer.exporter, HostAgent)
30-
assert len(tracer._propagators) == 3
30+
assert len(tracer._propagators) == 4
3131

3232

3333
def test_tracer_start_span(

tests/test_tracer_provider.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from instana.propagators.binary_propagator import BinaryPropagator
88
from instana.propagators.format import Format
99
from instana.propagators.http_propagator import HTTPPropagator
10+
from instana.propagators.kafka_propagator import KafkaPropagator
1011
from instana.propagators.text_propagator import TextPropagator
1112
from instana.recorder import StanRecorder
1213
from instana.sampling import InstanaSampler
@@ -18,10 +19,11 @@ def test_tracer_provider_defaults() -> None:
1819
assert isinstance(provider.sampler, InstanaSampler)
1920
assert isinstance(provider._span_processor, StanRecorder)
2021
assert isinstance(provider._exporter, HostAgent)
21-
assert len(provider._propagators) == 3
22+
assert len(provider._propagators) == 4
2223
assert isinstance(provider._propagators[Format.HTTP_HEADERS], HTTPPropagator)
2324
assert isinstance(provider._propagators[Format.TEXT_MAP], TextPropagator)
2425
assert isinstance(provider._propagators[Format.BINARY], BinaryPropagator)
26+
assert isinstance(provider._propagators[Format.KAFKA_HEADERS], KafkaPropagator)
2527

2628

2729
def test_tracer_provider_get_tracer() -> None:

0 commit comments

Comments
 (0)