Skip to content

Commit 3411b16

Browse files
committed
feat: added suppression downstream headers
Signed-off-by: Cagri Yonca <[email protected]>
1 parent f9693d9 commit 3411b16

File tree

8 files changed

+867
-205
lines changed

8 files changed

+867
-205
lines changed

src/instana/instrumentation/kafka/confluent_kafka_python.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ def trace_kafka_produce(
5858

5959
tracer, parent_span, _ = get_tracer_tuple()
6060
parent_context = parent_span.get_span_context() if parent_span else None
61+
is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored(
62+
"kafka",
63+
"produce",
64+
args[0],
65+
)
6166

6267
with tracer.start_as_current_span(
6368
"kafka-producer", span_context=parent_context, kind=SpanKind.PRODUCER
@@ -73,15 +78,21 @@ def trace_kafka_produce(
7378
# dictionary. To maintain compatibility with the headers for the
7479
# Kafka Python library, we will use a list of tuples.
7580
headers = args[6] if len(args) > 6 else kwargs.get("headers", [])
81+
suppression_header = {"x_instana_l_s": "0" if is_suppressed else "1"}
82+
headers.append(suppression_header)
83+
7684
tracer.inject(
7785
span.context,
7886
Format.KAFKA_HEADERS,
7987
headers,
8088
disable_w3c_trace_context=True,
8189
)
8290

83-
try:
91+
headers.remove(suppression_header)
92+
93+
if tracer.exporter.options.kafka_trace_correlation:
8494
kwargs["headers"] = headers
95+
try:
8596
res = wrapped(*args, **kwargs)
8697
except Exception as exc:
8798
span.record_exception(exc)

src/instana/instrumentation/kafka/kafka_python.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,11 @@ def trace_kafka_send(
3030

3131
tracer, parent_span, _ = get_tracer_tuple()
3232
parent_context = parent_span.get_span_context() if parent_span else None
33-
33+
is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored(
34+
"kafka",
35+
"send",
36+
args[0],
37+
)
3438
with tracer.start_as_current_span(
3539
"kafka-producer", span_context=parent_context, kind=SpanKind.PRODUCER
3640
) as span:
@@ -39,15 +43,21 @@ def trace_kafka_send(
3943

4044
# context propagation
4145
headers = kwargs.get("headers", [])
46+
suppression_header = {"x_instana_l_s": "0" if is_suppressed else "1"}
47+
headers.append(suppression_header)
48+
4249
tracer.inject(
4350
span.context,
4451
Format.KAFKA_HEADERS,
4552
headers,
4653
disable_w3c_trace_context=True,
4754
)
4855

49-
try:
56+
headers.remove(suppression_header)
57+
58+
if tracer.exporter.options.kafka_trace_correlation:
5059
kwargs["headers"] = headers
60+
try:
5161
res = wrapped(*args, **kwargs)
5262
except Exception as exc:
5363
span.record_exception(exc)

src/instana/options.py

Lines changed: 76 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -37,36 +37,9 @@ def __init__(self, **kwds: Dict[str, Any]) -> None:
3737
self.extra_http_headers = None
3838
self.allow_exit_as_root = False
3939
self.ignore_endpoints = []
40+
self.kafka_trace_correlation = True
4041

41-
if "INSTANA_DEBUG" in os.environ:
42-
self.log_level = logging.DEBUG
43-
self.debug = True
44-
45-
if "INSTANA_EXTRA_HTTP_HEADERS" in os.environ:
46-
self.extra_http_headers = (
47-
str(os.environ["INSTANA_EXTRA_HTTP_HEADERS"]).lower().split(";")
48-
)
49-
50-
if "INSTANA_IGNORE_ENDPOINTS_PATH" in os.environ:
51-
self.ignore_endpoints = parse_ignored_endpoints_from_yaml(
52-
os.environ["INSTANA_IGNORE_ENDPOINTS_PATH"]
53-
)
54-
else:
55-
if "INSTANA_IGNORE_ENDPOINTS" in os.environ:
56-
self.ignore_endpoints = parse_ignored_endpoints(
57-
os.environ["INSTANA_IGNORE_ENDPOINTS"]
58-
)
59-
else:
60-
if (
61-
isinstance(config.get("tracing"), dict)
62-
and "ignore_endpoints" in config["tracing"]
63-
):
64-
self.ignore_endpoints = parse_ignored_endpoints(
65-
config["tracing"]["ignore_endpoints"],
66-
)
67-
68-
if os.environ.get("INSTANA_ALLOW_EXIT_AS_ROOT", None) == "1":
69-
self.allow_exit_as_root = True
42+
self.set_trace_configurations()
7043

7144
# Defaults
7245
self.secrets_matcher = "contains-ignore-case"
@@ -87,6 +60,56 @@ def __init__(self, **kwds: Dict[str, Any]) -> None:
8760

8861
self.__dict__.update(kwds)
8962

63+
def set_trace_configurations(self) -> None:
64+
"""
65+
Set tracing configurations from the environment variables and config file.
66+
@return: None
67+
"""
68+
# Use self.configurations to not read local configuration file
69+
# in set_tracing method
70+
if "INSTANA_DEBUG" in os.environ:
71+
self.log_level = logging.DEBUG
72+
self.debug = True
73+
74+
if "INSTANA_EXTRA_HTTP_HEADERS" in os.environ:
75+
self.extra_http_headers = (
76+
str(os.environ["INSTANA_EXTRA_HTTP_HEADERS"]).lower().split(";")
77+
)
78+
79+
if "1" in [
80+
os.environ.get("INSTANA_ALLOW_EXIT_AS_ROOT", None), # deprecated
81+
os.environ.get("INSTANA_ALLOW_ROOT_EXIT_SPAN", None),
82+
]:
83+
self.allow_exit_as_root = True
84+
85+
# The priority is as follows:
86+
# environment variables > in-code configuration >
87+
# > agent config (configuration.yaml) > default value
88+
if "INSTANA_IGNORE_ENDPOINTS" in os.environ:
89+
self.ignore_endpoints = parse_ignored_endpoints(
90+
os.environ["INSTANA_IGNORE_ENDPOINTS"]
91+
)
92+
elif "INSTANA_IGNORE_ENDPOINTS_PATH" in os.environ:
93+
self.ignore_endpoints = parse_ignored_endpoints_from_yaml(
94+
os.environ["INSTANA_IGNORE_ENDPOINTS_PATH"]
95+
)
96+
elif (
97+
isinstance(config.get("tracing"), dict)
98+
and "ignore_endpoints" in config["tracing"]
99+
):
100+
self.ignore_endpoints = parse_ignored_endpoints(
101+
config["tracing"]["ignore_endpoints"],
102+
)
103+
104+
if "INSTANA_KAFKA_TRACE_CORRELATION" in os.environ:
105+
self.kafka_trace_correlation = (
106+
os.environ["INSTANA_KAFKA_TRACE_CORRELATION"].lower() == "true"
107+
)
108+
elif isinstance(config.get("tracing"), dict) and "kafka" in config["tracing"]:
109+
self.kafka_trace_correlation = config["tracing"]["kafka"].get(
110+
"trace_correlation", True
111+
)
112+
90113

91114
class StandardOptions(BaseOptions):
92115
"""The options class used when running directly on a host/node with an Instana agent"""
@@ -132,12 +155,30 @@ def set_tracing(self, tracing: Dict[str, Any]) -> None:
132155
@param tracing: tracing configuration dictionary
133156
@return: None
134157
"""
135-
if (
136-
"ignore-endpoints" in tracing
137-
and "INSTANA_IGNORE_ENDPOINTS" not in os.environ
138-
and "tracing" not in config
139-
):
158+
if "ignore-endpoints" in tracing and not self.ignore_endpoints:
140159
self.ignore_endpoints = parse_ignored_endpoints(tracing["ignore-endpoints"])
160+
161+
if "kafka" in tracing:
162+
if (
163+
"INSTANA_KAFKA_TRACE_CORRELATION" not in os.environ
164+
and not (
165+
isinstance(config.get("tracing"), dict)
166+
and "kafka" in config["tracing"]
167+
)
168+
and "trace-correlation" in tracing["kafka"]
169+
):
170+
self.kafka_trace_correlation = (
171+
str(tracing["kafka"].get("trace-correlation", True)) == "true"
172+
)
173+
174+
if (
175+
"header-format" in tracing["kafka"]
176+
and tracing["kafka"]["header-format"] == "binary"
177+
):
178+
logger.warning(
179+
"Binary header format for Kafka is deprecated. Please use string header format."
180+
)
181+
141182
if "extra-http-headers" in tracing:
142183
self.extra_http_headers = tracing["extra-http-headers"]
143184

@@ -156,6 +197,7 @@ def set_from(self, res_data: Dict[str, Any]) -> None:
156197

157198
if "tracing" in res_data:
158199
self.set_tracing(res_data["tracing"])
200+
159201
else:
160202
if "extraHeaders" in res_data:
161203
self.set_extra_headers(res_data["extraHeaders"])

src/instana/propagators/kafka_propagator.py

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ def extract(
7373
disable_w3c_trace_context=disable_w3c_trace_context,
7474
)
7575

76-
except Exception:
77-
logger.debug("kafka_propagator extract error:", exc_info=True)
76+
except Exception as e:
77+
logger.debug(f"kafka_propagator extract error: {e}", exc_info=True)
7878

7979
# Assisted by watsonx Code Assistant
8080
def inject(
@@ -98,15 +98,12 @@ def inject(
9898
span_id = span_context.span_id
9999
dictionary_carrier = self.extract_carrier_headers(carrier)
100100

101+
suppression_level = 1
101102
if dictionary_carrier:
102103
# Suppression `level` made in the child context or in the parent context
103104
# has priority over any non-suppressed `level` setting
104-
child_level = int(
105-
self.extract_instana_headers(dictionary_carrier)[2] or "1"
106-
)
107-
span_context.level = min(child_level, span_context.level)
108-
109-
serializable_level = str(span_context.level)
105+
suppression_level = int(self.extract_instana_headers(dictionary_carrier)[2])
106+
span_context.level = min(suppression_level, span_context.level)
110107

111108
def inject_key_value(carrier, key, value):
112109
if isinstance(carrier, list):
@@ -122,18 +119,18 @@ def inject_key_value(carrier, key, value):
122119
inject_key_value(
123120
carrier,
124121
self.KAFKA_HEADER_KEY_L_S,
125-
serializable_level.encode("utf-8"),
126-
)
127-
inject_key_value(
128-
carrier,
129-
self.KAFKA_HEADER_KEY_T,
130-
hex_id_limited(trace_id).encode("utf-8"),
122+
str(suppression_level).encode("utf-8"),
131123
)
132-
inject_key_value(
133-
carrier,
134-
self.KAFKA_HEADER_KEY_S,
135-
format_span_id(span_id).encode("utf-8"),
136-
)
137-
124+
if suppression_level == 1:
125+
inject_key_value(
126+
carrier,
127+
self.KAFKA_HEADER_KEY_T,
128+
hex_id_limited(trace_id).encode("utf-8"),
129+
)
130+
inject_key_value(
131+
carrier,
132+
self.KAFKA_HEADER_KEY_S,
133+
format_span_id(span_id).encode("utf-8"),
134+
)
138135
except Exception:
139136
logger.debug("KafkaPropagator - inject error:", exc_info=True)

0 commit comments

Comments
 (0)