Skip to content

Commit f641954

Browse files
committed
feat: added span filtering for kafka-python module
Signed-off-by: Cagri Yonca <[email protected]>
1 parent fdaaf3e commit f641954

16 files changed

+733
-329
lines changed

pyproject.toml

+6-5
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,19 @@ dependencies = [
5252
"opentelemetry-api>=1.27.0",
5353
"opentelemetry-semantic-conventions>=0.48b0",
5454
"typing_extensions>=4.12.2",
55+
"pyyaml>=6.0.2",
5556
]
5657

5758
[project.entry-points."instana"]
5859
string = "instana:load"
5960

6061
[project.optional-dependencies]
6162
dev = [
62-
"pytest",
63-
"pytest-cov",
64-
"pytest-mock",
65-
"pre-commit>=3.0.0",
66-
"ruff"
63+
"pytest",
64+
"pytest-cov",
65+
"pytest-mock",
66+
"pre-commit>=3.0.0",
67+
"ruff",
6768
]
6869

6970
[project.urls]

src/instana/agent/host.py

+30-12
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from instana.options import StandardOptions
2323
from instana.util import to_json
2424
from instana.util.runtime import get_py_source
25-
from instana.util.span_utils import get_operation_specifier
25+
from instana.util.span_utils import get_operation_specifiers
2626
from instana.version import VERSION
2727

2828

@@ -351,13 +351,18 @@ def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
351351
Filters given span list using ignore-endpoint variable and returns the list of filtered spans.
352352
"""
353353
filtered_spans = []
354+
endpoint = ""
354355
for span in spans:
355356
if (hasattr(span, "n") or hasattr(span, "name")) and hasattr(span, "data"):
356357
service = span.n
357-
operation_specifier = get_operation_specifier(service)
358-
endpoint = span.data[service][operation_specifier]
359-
if isinstance(endpoint, str) and self.__is_service_or_endpoint_ignored(
360-
service, endpoint
358+
operation_specifier_key, service_specifier_key = (
359+
get_operation_specifiers(service)
360+
)
361+
if service == "kafka":
362+
endpoint = span.data[service][service_specifier_key]
363+
method = span.data[service][operation_specifier_key]
364+
if isinstance(method, str) and self.__is_endpoint_ignored(
365+
service, method, endpoint
361366
):
362367
continue
363368
else:
@@ -366,15 +371,28 @@ def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
366371
filtered_spans.append(span)
367372
return filtered_spans
368373

369-
def __is_service_or_endpoint_ignored(
370-
self, service: str, endpoint: str = ""
374+
def __is_endpoint_ignored(
375+
self,
376+
service: str,
377+
method: str = "",
378+
endpoint: str = "",
371379
) -> bool:
372380
"""Check if the given service and endpoint combination should be ignored."""
373-
374-
return (
375-
service.lower() in self.options.ignore_endpoints
376-
or f"{service.lower()}.{endpoint.lower()}" in self.options.ignore_endpoints
377-
)
381+
service = service.lower()
382+
method = method.lower()
383+
endpoint = endpoint.lower()
384+
filter_rules = [
385+
f"{service}.{method}", # service.method
386+
f"{service}.*", # service.*
387+
]
388+
389+
if service == "kafka" and endpoint:
390+
filter_rules += [
391+
f"{service}.{method}.{endpoint}", # service.method.endpoint
392+
f"{service}.*.{endpoint}", # service.*.endpoint
393+
f"{service}.{method}.*", # service.method.*
394+
]
395+
return any(rule in self.options.ignore_endpoints for rule in filter_rules)
378396

379397
def handle_agent_tasks(self, task: Dict[str, Any]) -> None:
380398
"""

src/instana/options.py

+18-10
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
from typing import Any, Dict
2020

2121
from instana.log import logger
22-
from instana.util.config import parse_ignored_endpoints
22+
from instana.util.config import (
23+
parse_ignored_endpoints,
24+
parse_ignored_endpoints_from_yaml,
25+
)
2326
from instana.util.runtime import determine_service_name
2427
from instana.configurator import config
2528

@@ -44,18 +47,23 @@ def __init__(self, **kwds: Dict[str, Any]) -> None:
4447
str(os.environ["INSTANA_EXTRA_HTTP_HEADERS"]).lower().split(";")
4548
)
4649

47-
if "INSTANA_IGNORE_ENDPOINTS" in os.environ:
48-
self.ignore_endpoints = parse_ignored_endpoints(
49-
os.environ["INSTANA_IGNORE_ENDPOINTS"]
50+
if "INSTANA_IGNORE_ENDPOINTS_PATH" in os.environ:
51+
self.ignore_endpoints = parse_ignored_endpoints_from_yaml(
52+
os.environ["INSTANA_IGNORE_ENDPOINTS_PATH"]
5053
)
5154
else:
52-
if (
53-
isinstance(config.get("tracing"), dict)
54-
and "ignore_endpoints" in config["tracing"]
55-
):
55+
if "INSTANA_IGNORE_ENDPOINTS" in os.environ:
5656
self.ignore_endpoints = parse_ignored_endpoints(
57-
config["tracing"]["ignore_endpoints"],
57+
os.environ["INSTANA_IGNORE_ENDPOINTS"]
5858
)
59+
else:
60+
if (
61+
isinstance(config.get("tracing"), dict)
62+
and "ignore_endpoints" in config["tracing"]
63+
):
64+
self.ignore_endpoints = parse_ignored_endpoints(
65+
config["tracing"]["ignore_endpoints"],
66+
)
5967

6068
if os.environ.get("INSTANA_ALLOW_EXIT_AS_ROOT", None) == "1":
6169
self.allow_exit_as_root = True
@@ -141,7 +149,7 @@ def set_from(self, res_data: Dict[str, Any]) -> None:
141149
"""
142150
if not res_data or not isinstance(res_data, dict):
143151
logger.debug(f"options.set_from: Wrong data type - {type(res_data)}")
144-
return
152+
return
145153

146154
if "secrets" in res_data:
147155
self.set_secrets(res_data["secrets"])

src/instana/util/config.py

+78-20
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,41 @@
1+
# (c) Copyright IBM Corp. 2025
2+
3+
import itertools
4+
import os
15
from typing import Any, Dict, List, Union
6+
27
from instana.log import logger
8+
from instana.util.config_reader import ConfigReader
39

410

511
def parse_service_pair(pair: str) -> List[str]:
612
"""
713
Parses a pair string to prepare a list of ignored endpoints.
814
915
@param pair: String format:
10-
- "service1:endpoint1,endpoint2" or "service1:endpoint1" or "service1"
11-
@return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"]
16+
- "service1:method1,method2" or "service1:method1" or "service1"
17+
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*"]
1218
"""
1319
pair_list = []
1420
if ":" in pair:
15-
service, endpoints = pair.split(":", 1)
21+
service, methods = pair.split(":", 1)
1622
service = service.strip()
17-
endpoint_list = [ep.strip() for ep in endpoints.split(",") if ep.strip()]
23+
method_list = [ep.strip() for ep in methods.split(",") if ep.strip()]
1824

19-
for endpoint in endpoint_list:
20-
pair_list.append(f"{service}.{endpoint}")
25+
for method in method_list:
26+
pair_list.append(f"{service}.{method}")
2127
else:
22-
pair_list.append(pair)
28+
pair_list.append(f"{pair}.*")
2329
return pair_list
2430

2531

26-
def parse_ignored_endpoints_string(params: str) -> List[str]:
32+
def parse_ignored_endpoints_string(params: Union[str, os.PathLike]) -> List[str]:
2733
"""
2834
Parses a string to prepare a list of ignored endpoints.
2935
3036
@param params: String format:
31-
- "service1:endpoint1,endpoint2;service2:endpoint3" or "service1;service2"
32-
@return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"]
37+
- "service1:method1,method2;service2:method3" or "service1;service2"
38+
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*"]
3339
"""
3440
ignore_endpoints = []
3541
if params:
@@ -46,18 +52,45 @@ def parse_ignored_endpoints_dict(params: Dict[str, Any]) -> List[str]:
4652
Parses a dictionary to prepare a list of ignored endpoints.
4753
4854
@param params: Dict format:
49-
- {"service1": ["endpoint1", "endpoint2"], "service2": ["endpoint3"]}
50-
@return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"]
55+
- {"service1": ["method1", "method2"], "service2": ["method3"]}
56+
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*"]
5157
"""
5258
ignore_endpoints = []
5359

54-
for service, endpoints in params.items():
55-
if not endpoints: # filtering all service
56-
ignore_endpoints.append(service.lower())
60+
for service, methods in params.items():
61+
if not methods: # filtering all service
62+
ignore_endpoints.append(f"{service.lower()}.*")
5763
else: # filtering specific endpoints
58-
for endpoint in endpoints:
59-
ignore_endpoints.append(f"{service.lower()}.{endpoint.lower()}")
64+
ignore_endpoints = parse_endpoints_of_service(
65+
ignore_endpoints, service, methods
66+
)
67+
68+
return ignore_endpoints
6069

70+
71+
def parse_endpoints_of_service(
72+
ignore_endpoints: List[str],
73+
service: str,
74+
methods: Union[str, List[str]],
75+
) -> List[str]:
76+
"""
77+
Parses endpoints of each service.
78+
79+
@param ignore_endpoints: A list of rules for endpoints to be filtered.
80+
@param service: The name of the service to be filtered.
81+
@param methods: A list of specific endpoints of the service to be filtered.
82+
"""
83+
if service == "kafka" and isinstance(methods, list):
84+
for rule in methods:
85+
for method, endpoint in itertools.product(
86+
rule["methods"], rule["endpoints"]
87+
):
88+
ignore_endpoints.append(
89+
f"{service.lower()}.{method.lower()}.{endpoint.lower()}"
90+
)
91+
else:
92+
for method in methods:
93+
ignore_endpoints.append(f"{service.lower()}.{method.lower()}")
6194
return ignore_endpoints
6295

6396

@@ -66,9 +99,9 @@ def parse_ignored_endpoints(params: Union[Dict[str, Any], str]) -> List[str]:
6699
Parses input to prepare a list for ignored endpoints.
67100
68101
@param params: Can be either:
69-
- String: "service1:endpoint1,endpoint2;service2:endpoint3" or "service1;service2"
70-
- Dict: {"service1": ["endpoint1", "endpoint2"], "service2": ["endpoint3"]}
71-
@return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"]
102+
- String: "service1:method1,method2;service2:method3" or "service1;service2"
103+
- Dict: {"service1": ["method1", "method2"], "service2": ["method3"]}
104+
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*"]
72105
"""
73106
try:
74107
if isinstance(params, str):
@@ -80,3 +113,28 @@ def parse_ignored_endpoints(params: Union[Dict[str, Any], str]) -> List[str]:
80113
except Exception as e:
81114
logger.debug("Error parsing ignored endpoints: %s", str(e))
82115
return []
116+
117+
118+
def parse_ignored_endpoints_from_yaml(file_path: str) -> List[str]:
119+
"""
120+
Parses configuration yaml file and prepares a list of ignored endpoints.
121+
122+
@param file_path: Path of the file as a string
123+
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*", "kafka.method.topic", "kafka.*.topic", "kafka.method.*"]
124+
"""
125+
config_reader = ConfigReader(file_path)
126+
ignore_endpoints_dict = None
127+
if "tracing" in config_reader.data:
128+
ignore_endpoints_dict = config_reader.data["tracing"].get("ignore-endpoints")
129+
elif "com.instana.tracing" in config_reader.data:
130+
logger.warning(
131+
'Please use "tracing" instead of "com.instana.tracing" for local configuration file.'
132+
)
133+
ignore_endpoints_dict = config_reader.data["com.instana.tracing"].get(
134+
"ignore-endpoints"
135+
)
136+
if ignore_endpoints_dict:
137+
ignored_endpoints = parse_ignored_endpoints(ignore_endpoints_dict)
138+
return ignored_endpoints
139+
else:
140+
return []

src/instana/util/config_reader.py

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# (c) Copyright IBM Corp. 2025
2+
3+
from typing import Union
4+
from instana.log import logger
5+
import yaml
6+
7+
8+
class ConfigReader:
9+
def __init__(self, file_path: Union[str]) -> None:
10+
self.file_path = file_path
11+
self.data = None
12+
self.load_file()
13+
14+
def load_file(self) -> None:
15+
"""Loads and parses the YAML file"""
16+
try:
17+
with open(self.file_path, "r") as file:
18+
self.data = yaml.safe_load(file)
19+
except FileNotFoundError:
20+
logger.error(f"Configuration file has not found: {self.file_path}")
21+
except yaml.YAMLError as e:
22+
logger.error(f"Error parsing YAML file: {e}")

src/instana/util/span_utils.py

+10-6
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
# (c) Copyright IBM Corp. 2025
22

3-
from typing import Optional
3+
from typing import Tuple
44

55

6-
def get_operation_specifier(span_name: str) -> Optional[str]:
6+
def get_operation_specifiers(span_name: str) -> Tuple[str, str]:
77
"""Get the specific operation specifier for the given span."""
8-
operation_specifier = ""
8+
operation_specifier_key = ""
9+
service_specifier_key = ""
910
if span_name == "redis":
10-
operation_specifier = "command"
11+
operation_specifier_key = "command"
1112
elif span_name == "dynamodb":
12-
operation_specifier = "op"
13-
return operation_specifier
13+
operation_specifier_key = "op"
14+
elif span_name == "kafka":
15+
operation_specifier_key = "access"
16+
service_specifier_key = "service"
17+
return operation_specifier_key, service_specifier_key

tests/agent/test_host.py

+10-20
Original file line numberDiff line numberDiff line change
@@ -692,31 +692,21 @@ def test_diagnostics(self, caplog: pytest.LogCaptureFixture) -> None:
692692
assert "should_send_snapshot_data: True" in caplog.messages
693693

694694
def test_is_service_or_endpoint_ignored(self) -> None:
695-
self.agent.options.ignore_endpoints.append("service1")
696-
self.agent.options.ignore_endpoints.append("service2.endpoint1")
695+
self.agent.options.ignore_endpoints.append("service1.*")
696+
self.agent.options.ignore_endpoints.append("service2.method1")
697697

698698
# ignore all endpoints of service1
699-
assert self.agent._HostAgent__is_service_or_endpoint_ignored("service1")
700-
assert self.agent._HostAgent__is_service_or_endpoint_ignored(
701-
"service1", "endpoint1"
702-
)
703-
assert self.agent._HostAgent__is_service_or_endpoint_ignored(
704-
"service1", "endpoint2"
705-
)
699+
assert self.agent._HostAgent__is_endpoint_ignored("service1")
700+
assert self.agent._HostAgent__is_endpoint_ignored("service1", "method1")
701+
assert self.agent._HostAgent__is_endpoint_ignored("service1", "method2")
706702

707703
# case-insensitive
708-
assert self.agent._HostAgent__is_service_or_endpoint_ignored("SERVICE1")
709-
assert self.agent._HostAgent__is_service_or_endpoint_ignored(
710-
"service1", "ENDPOINT1"
711-
)
704+
assert self.agent._HostAgent__is_endpoint_ignored("SERVICE1")
705+
assert self.agent._HostAgent__is_endpoint_ignored("service1", "METHOD1")
712706

713707
# ignore only endpoint1 of service2
714-
assert self.agent._HostAgent__is_service_or_endpoint_ignored(
715-
"service2", "endpoint1"
716-
)
717-
assert not self.agent._HostAgent__is_service_or_endpoint_ignored(
718-
"service2", "endpoint2"
719-
)
708+
assert self.agent._HostAgent__is_endpoint_ignored("service2", "method1")
709+
assert not self.agent._HostAgent__is_endpoint_ignored("service2", "method2")
720710

721711
# don't ignore other services
722-
assert not self.agent._HostAgent__is_service_or_endpoint_ignored("service3")
712+
assert not self.agent._HostAgent__is_endpoint_ignored("service3")

tests/clients/boto3/test_boto3_dynamodb.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def test_ignore_dynamodb(self) -> None:
9494
assert dynamodb_span not in filtered_spans
9595

9696
def test_ignore_create_table(self) -> None:
97-
os.environ["INSTANA_IGNORE_ENDPOINTS"] = "dynamodb.createtable"
97+
os.environ["INSTANA_IGNORE_ENDPOINTS"] = "dynamodb:createtable"
9898
agent.options = StandardOptions()
9999

100100
with tracer.start_as_current_span("test"):

0 commit comments

Comments
 (0)