 # (c) Copyright IBM Corp. 2025

+import os
 from typing import Generator

 import pytest

 from confluent_kafka.admin import AdminClient, NewTopic
 from opentelemetry.trace import SpanKind

+from instana.options import StandardOptions
 from instana.singletons import agent, tracer
+from instana.util.config import parse_ignored_endpoints_from_yaml
 from tests.helpers import get_first_span_by_filter, testenv

@@ -29,7 +32,7 @@ def _resource(self) -> Generator[None, None, None]:
         self.kafka_client = AdminClient(self.kafka_config)

         try:
-            topics = self.kafka_client.create_topics(  # noqa: F841
+            _ = self.kafka_client.create_topics(  # noqa: F841
                 [
                     NewTopic(
                         testenv["kafka_topic"],
@@ -187,6 +190,130 @@ def test_trace_confluent_kafka_error(self) -> None:
             == "num_messages must be between 0 and 1000000 (1M)"
         )

+    def test_ignore_confluent_kafka(self) -> None:
+        os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka"
+        agent.options = StandardOptions()
+
+        with tracer.start_as_current_span("test"):
+            self.producer.produce(testenv["kafka_topic"], b"raw_bytes")
+            self.producer.flush(timeout=10)
+
+        spans = self.recorder.queued_spans()
+        assert len(spans) == 2
+
+        filtered_spans = agent.filter_spans(spans)
+        assert len(filtered_spans) == 1
+
+    def test_ignore_confluent_kafka_producer(self) -> None:
+        os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka:produce"
+        agent.options = StandardOptions()
+
+        with tracer.start_as_current_span("test-span"):
+            # Produce some events
+            self.producer.produce(testenv["kafka_topic"], b"raw_bytes1")
+            self.producer.produce(testenv["kafka_topic"], b"raw_bytes2")
+            self.producer.flush()
+
+            # Consume the events
+            consumer_config = self.kafka_config.copy()
+            consumer_config["group.id"] = "my-group"
+            consumer_config["auto.offset.reset"] = "earliest"
+
+            consumer = Consumer(consumer_config)
+            consumer.subscribe([testenv["kafka_topic"]])
+            consumer.consume(num_messages=2, timeout=60)
+
+            consumer.close()
+
+        spans = self.recorder.queued_spans()
+        assert len(spans) == 5
+
+        filtered_spans = agent.filter_spans(spans)
+        assert len(filtered_spans) == 3
+
+    def test_ignore_confluent_kafka_consumer(self) -> None:
+        os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka:consume"
+        agent.options = StandardOptions()
+
+        with tracer.start_as_current_span("test-span"):
+            # Produce some events
+            self.producer.produce(testenv["kafka_topic"], b"raw_bytes1")
+            self.producer.produce(testenv["kafka_topic"], b"raw_bytes2")
+            self.producer.flush()
+
+            # Consume the events
+            consumer_config = self.kafka_config.copy()
+            consumer_config["group.id"] = "my-group"
+            consumer_config["auto.offset.reset"] = "earliest"
+
+            consumer = Consumer(consumer_config)
+            consumer.subscribe([testenv["kafka_topic"]])
+            consumer.consume(num_messages=2, timeout=60)
+
+            consumer.close()
+
+        spans = self.recorder.queued_spans()
+        assert len(spans) == 5
+
+        filtered_spans = agent.filter_spans(spans)
+        assert len(filtered_spans) == 3
+
+    def test_ignore_confluent_specific_topic(self) -> None:
+        os.environ["INSTANA_IGNORE_ENDPOINTS"] = "kafka:consume"
+        os.environ["INSTANA_IGNORE_ENDPOINTS_PATH"] = (
+            "tests/util/test_configuration-1.yaml"
+        )
+
+        agent.options = StandardOptions()
+
+        with tracer.start_as_current_span("test-span"):
+            # Produce some events
+            self.producer.produce(testenv["kafka_topic"], b"raw_bytes1")
+            self.producer.flush()
+
+            # Consume the events
+            consumer_config = self.kafka_config.copy()
+            consumer_config["group.id"] = "my-group"
+            consumer_config["auto.offset.reset"] = "earliest"
+
+            consumer = Consumer(consumer_config)
+            consumer.subscribe([testenv["kafka_topic"]])
+            consumer.consume(num_messages=1, timeout=60)
+
+            consumer.close()
+
+        spans = self.recorder.queued_spans()
+        assert len(spans) == 3
+
+        filtered_spans = agent.filter_spans(spans)
+        assert len(filtered_spans) == 1
+
+    def test_ignore_confluent_specific_topic_with_config_file(self) -> None:
+        agent.options.ignore_endpoints = parse_ignored_endpoints_from_yaml(
+            "tests/util/test_configuration-1.yaml"
+        )
+
+        with tracer.start_as_current_span("test-span"):
+            # Produce some events
+            self.producer.produce(testenv["kafka_topic"], b"raw_bytes1")
+            self.producer.flush()
+
+            # Consume the events
+            consumer_config = self.kafka_config.copy()
+            consumer_config["group.id"] = "my-group"
+            consumer_config["auto.offset.reset"] = "earliest"
+
+            consumer = Consumer(consumer_config)
+            consumer.subscribe([testenv["kafka_topic"]])
+            consumer.consume(num_messages=1, timeout=60)
+            consumer.close()
+
+        spans = self.recorder.queued_spans()
+        assert len(spans) == 3
+
+        filtered_spans = agent.filter_spans(spans)
+        assert len(filtered_spans) == 1
+
     def test_confluent_kafka_consumer_root_exit(self) -> None:
         agent.options.allow_exit_as_root = True

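The added tests all follow the same pattern: configure which Kafka endpoints to ignore (via INSTANA_IGNORE_ENDPOINTS, INSTANA_IGNORE_ENDPOINTS_PATH, or parse_ignored_endpoints_from_yaml), produce and consume some messages, and then assert how many spans survive agent.filter_spans. As a rough illustration of the behaviour being asserted, below is a minimal, self-contained sketch of category/operation filtering. It is not the Instana implementation; FakeSpan, the operation field, and the dictionary rule format are assumptions made only for this example.

# Minimal sketch of ignore-endpoint filtering; not the Instana implementation.
# FakeSpan and the {category: [operations]} rule format are illustrative assumptions.
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class FakeSpan:
    category: str   # e.g. "kafka" or "sdk"
    operation: str  # e.g. "produce" or "consume"


def filter_spans(spans: List[FakeSpan], ignore: Dict[str, List[str]]) -> List[FakeSpan]:
    kept = []
    for span in spans:
        rules = ignore.get(span.category, [])
        # Drop the span when its operation (or every operation, "*") is ignored.
        if "*" in rules or span.operation in rules:
            continue
        kept.append(span)
    return kept


# Mirrors test_ignore_confluent_kafka_producer: one SDK span, two produce spans,
# and two consume spans; ignoring "kafka:produce" leaves three spans.
spans = [
    FakeSpan("sdk", "test-span"),
    FakeSpan("kafka", "produce"),
    FakeSpan("kafka", "produce"),
    FakeSpan("kafka", "consume"),
    FakeSpan("kafka", "consume"),
]
assert len(filter_spans(spans, {"kafka": ["produce"]})) == 3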