Skip to content

Commit e46923d

Browse files
committed
feat: added span filtering for kafka-python module
Signed-off-by: Cagri Yonca <[email protected]>
1 parent 3f9798d commit e46923d

16 files changed

+482
-142
lines changed

pyproject.toml

+6-5
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,19 @@ dependencies = [
5252
"opentelemetry-api>=1.27.0",
5353
"opentelemetry-semantic-conventions>=0.48b0",
5454
"typing_extensions>=4.12.2",
55+
"pyyaml>=6.0.2",
5556
]
5657

5758
[project.entry-points."instana"]
5859
string = "instana:load"
5960

6061
[project.optional-dependencies]
6162
dev = [
62-
"pytest",
63-
"pytest-cov",
64-
"pytest-mock",
65-
"pre-commit>=3.0.0",
66-
"ruff"
63+
"pytest",
64+
"pytest-cov",
65+
"pytest-mock",
66+
"pre-commit>=3.0.0",
67+
"ruff",
6768
]
6869

6970
[project.urls]

src/instana/agent/host.py

+30-12
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from instana.options import StandardOptions
2323
from instana.util import to_json
2424
from instana.util.runtime import get_py_source
25-
from instana.util.span_utils import get_operation_specifier
25+
from instana.util.span_utils import get_operation_specifiers
2626
from instana.version import VERSION
2727

2828

@@ -351,13 +351,18 @@ def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
351351
Filters given span list using ignore-endpoint variable and returns the list of filtered spans.
352352
"""
353353
filtered_spans = []
354+
endpoint = ""
354355
for span in spans:
355356
if (hasattr(span, "n") or hasattr(span, "name")) and hasattr(span, "data"):
356357
service = span.n
357-
operation_specifier = get_operation_specifier(service)
358-
endpoint = span.data[service][operation_specifier]
359-
if isinstance(endpoint, str) and self.__is_service_or_endpoint_ignored(
360-
service, endpoint
358+
operation_specifier_key, service_specifier_key = (
359+
get_operation_specifiers(service)
360+
)
361+
if service == "kafka":
362+
endpoint = span.data[service][service_specifier_key]
363+
method = span.data[service][operation_specifier_key]
364+
if isinstance(method, str) and self.__is_endpoint_ignored(
365+
service, method, endpoint
361366
):
362367
continue
363368
else:
@@ -366,15 +371,28 @@ def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
366371
filtered_spans.append(span)
367372
return filtered_spans
368373

369-
def __is_service_or_endpoint_ignored(
370-
self, service: str, endpoint: str = ""
374+
def __is_endpoint_ignored(
375+
self,
376+
service: str,
377+
method: str = "",
378+
endpoint: str = "",
371379
) -> bool:
372380
"""Check if the given service and endpoint combination should be ignored."""
373-
374-
return (
375-
service.lower() in self.options.ignore_endpoints
376-
or f"{service.lower()}.{endpoint.lower()}" in self.options.ignore_endpoints
377-
)
381+
service = service.lower()
382+
method = method.lower()
383+
endpoint = endpoint.lower()
384+
filter_rules = [
385+
f"{service}.{method}", # service.method
386+
f"{service}.*", # service.*
387+
]
388+
389+
if service == "kafka" and endpoint:
390+
filter_rules += [
391+
f"{service}.{method}.{endpoint}", # service.method.endpoint
392+
f"{service}.*.{endpoint}", # service.*.endpoint
393+
f"{service}.{method}.*", # service.method.*
394+
]
395+
return any(rule in self.options.ignore_endpoints for rule in filter_rules)
378396

379397
def handle_agent_tasks(self, task: Dict[str, Any]) -> None:
380398
"""

src/instana/options.py

+17-9
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
from typing import Any, Dict
2020

2121
from instana.log import logger
22-
from instana.util.config import parse_ignored_endpoints
22+
from instana.util.config import (
23+
parse_ignored_endpoints,
24+
parse_ignored_endpoints_from_yaml,
25+
)
2326
from instana.util.runtime import determine_service_name
2427
from instana.configurator import config
2528

@@ -44,18 +47,23 @@ def __init__(self, **kwds: Dict[str, Any]) -> None:
4447
str(os.environ["INSTANA_EXTRA_HTTP_HEADERS"]).lower().split(";")
4548
)
4649

47-
if "INSTANA_IGNORE_ENDPOINTS" in os.environ:
48-
self.ignore_endpoints = parse_ignored_endpoints(
49-
os.environ["INSTANA_IGNORE_ENDPOINTS"]
50+
if "INSTANA_IGNORE_ENDPOINTS_PATH" in os.environ:
51+
self.ignore_endpoints = parse_ignored_endpoints_from_yaml(
52+
os.environ["INSTANA_IGNORE_ENDPOINTS_PATH"]
5053
)
5154
else:
52-
if (
53-
isinstance(config.get("tracing"), dict)
54-
and "ignore_endpoints" in config["tracing"]
55-
):
55+
if "INSTANA_IGNORE_ENDPOINTS" in os.environ:
5656
self.ignore_endpoints = parse_ignored_endpoints(
57-
config["tracing"]["ignore_endpoints"],
57+
os.environ["INSTANA_IGNORE_ENDPOINTS"]
5858
)
59+
else:
60+
if (
61+
isinstance(config.get("tracing"), dict)
62+
and "ignore_endpoints" in config["tracing"]
63+
):
64+
self.ignore_endpoints = parse_ignored_endpoints(
65+
config["tracing"]["ignore_endpoints"],
66+
)
5967

6068
if os.environ.get("INSTANA_ALLOW_EXIT_AS_ROOT", None) == "1":
6169
self.allow_exit_as_root = True

src/instana/util/config.py

+72-19
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,28 @@
1+
import itertools
12
from typing import Any, Dict, List, Union
3+
24
from instana.log import logger
5+
from instana.util.config_reader import ConfigReader
36

47

58
def parse_service_pair(pair: str) -> List[str]:
69
"""
710
Parses a pair string to prepare a list of ignored endpoints.
811
912
@param pair: String format:
10-
- "service1:endpoint1,endpoint2" or "service1:endpoint1" or "service1"
11-
@return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"]
13+
- "service1:method1,method2" or "service1:method1" or "service1"
14+
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*"]
1215
"""
1316
pair_list = []
1417
if ":" in pair:
15-
service, endpoints = pair.split(":", 1)
18+
service, methods = pair.split(":", 1)
1619
service = service.strip()
17-
endpoint_list = [ep.strip() for ep in endpoints.split(",") if ep.strip()]
20+
method_list = [ep.strip() for ep in methods.split(",") if ep.strip()]
1821

19-
for endpoint in endpoint_list:
20-
pair_list.append(f"{service}.{endpoint}")
22+
for method in method_list:
23+
pair_list.append(f"{service}.{method}")
2124
else:
22-
pair_list.append(pair)
25+
pair_list.append(f"{pair}.*")
2326
return pair_list
2427

2528

@@ -28,8 +31,8 @@ def parse_ignored_endpoints_string(params: str) -> List[str]:
2831
Parses a string to prepare a list of ignored endpoints.
2932
3033
@param params: String format:
31-
- "service1:endpoint1,endpoint2;service2:endpoint3" or "service1;service2"
32-
@return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"]
34+
- "service1:method1,method2;service2:method3" or "service1;service2"
35+
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*"]
3336
"""
3437
ignore_endpoints = []
3538
if params:
@@ -46,18 +49,45 @@ def parse_ignored_endpoints_dict(params: Dict[str, Any]) -> List[str]:
4649
Parses a dictionary to prepare a list of ignored endpoints.
4750
4851
@param params: Dict format:
49-
- {"service1": ["endpoint1", "endpoint2"], "service2": ["endpoint3"]}
50-
@return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"]
52+
- {"service1": ["method1", "method2"], "service2": ["method3"]}
53+
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*"]
5154
"""
5255
ignore_endpoints = []
5356

54-
for service, endpoints in params.items():
55-
if not endpoints: # filtering all service
56-
ignore_endpoints.append(service.lower())
57+
for service, methods in params.items():
58+
if not methods: # filtering all service
59+
ignore_endpoints.append(f"{service.lower()}.*")
5760
else: # filtering specific endpoints
58-
for endpoint in endpoints:
59-
ignore_endpoints.append(f"{service.lower()}.{endpoint.lower()}")
61+
ignore_endpoints = parse_endpoints_of_service(
62+
ignore_endpoints, service, methods
63+
)
64+
65+
return ignore_endpoints
66+
6067

68+
def parse_endpoints_of_service(
69+
ignore_endpoints: List[str],
70+
service: str,
71+
methods: Union[str, List[str]],
72+
) -> List[str]:
73+
"""
74+
Parses endpoints of each service.
75+
76+
@param ignore_endpoints: A list of rules for endpoints to be filtered.
77+
@param service: The name of the service to be filtered.
78+
@param methods: A list of specific endpoints of the service to be filtered.
79+
"""
80+
if service == "kafka" and isinstance(methods, list):
81+
for rule in methods:
82+
for method, endpoint in itertools.product(
83+
rule["methods"], rule["endpoints"]
84+
):
85+
ignore_endpoints.append(
86+
f"{service.lower()}.{method.lower()}.{endpoint.lower()}"
87+
)
88+
else:
89+
for method in methods:
90+
ignore_endpoints.append(f"{service.lower()}.{method.lower()}")
6191
return ignore_endpoints
6292

6393

@@ -66,9 +96,9 @@ def parse_ignored_endpoints(params: Union[Dict[str, Any], str]) -> List[str]:
6696
Parses input to prepare a list for ignored endpoints.
6797
6898
@param params: Can be either:
69-
- String: "service1:endpoint1,endpoint2;service2:endpoint3" or "service1;service2"
70-
- Dict: {"service1": ["endpoint1", "endpoint2"], "service2": ["endpoint3"]}
71-
@return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"]
99+
- String: "service1:method1,method2;service2:method3" or "service1;service2"
100+
- Dict: {"service1": ["method1", "method2"], "service2": ["method3"]}
101+
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*"]
72102
"""
73103
try:
74104
if isinstance(params, str):
@@ -80,3 +110,26 @@ def parse_ignored_endpoints(params: Union[Dict[str, Any], str]) -> List[str]:
80110
except Exception as e:
81111
logger.debug("Error parsing ignored endpoints: %s", str(e))
82112
return []
113+
114+
115+
def parse_ignored_endpoints_from_yaml(file_path: str) -> List[str]:
116+
"""
117+
Parses configuration yaml file and prepares a list of ignored endpoints.
118+
119+
@param file_path: Path of the file as a string
120+
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*", "kafka.method.topic", "kafka.*.topic", "kafka.method.*"]
121+
"""
122+
config_reader = ConfigReader(file_path)
123+
ignore_endpoints_dict = None
124+
if "tracing" in config_reader.data:
125+
ignore_endpoints_dict = config_reader.data["tracing"].get("ignore-endpoints")
126+
elif "com.instana.tracing" in config_reader.data:
127+
logger.debug('Please use "tracing" instead of "com.instana.tracing"')
128+
ignore_endpoints_dict = config_reader.data["com.instana.tracing"].get(
129+
"ignore-endpoints"
130+
)
131+
if ignore_endpoints_dict:
132+
ignored_endpoints = parse_ignored_endpoints(ignore_endpoints_dict)
133+
return ignored_endpoints
134+
else:
135+
return []

src/instana/util/config_reader.py

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from instana.log import logger
2+
import yaml
3+
4+
5+
class ConfigReader:
6+
def __init__(self, file_path: str) -> None:
7+
self.file_path = file_path
8+
self.data = None
9+
self.load_file()
10+
11+
def load_file(self) -> None:
12+
"""Loads and parses the YAML file"""
13+
try:
14+
with open(self.file_path, "r") as file:
15+
self.data = yaml.safe_load(file)
16+
except FileNotFoundError:
17+
logger.error(f"Configuration file has not found: {self.file_path}")
18+
except yaml.YAMLError as e:
19+
logger.error(f"Error parsing YAML file: {e}")

src/instana/util/span_utils.py

+10-6
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
# (c) Copyright IBM Corp. 2025
22

3-
from typing import Optional
3+
from typing import Tuple
44

55

6-
def get_operation_specifier(span_name: str) -> Optional[str]:
6+
def get_operation_specifiers(span_name: str) -> Tuple[str, str]:
77
"""Get the specific operation specifier for the given span."""
8-
operation_specifier = ""
8+
operation_specifier_key = ""
9+
service_specifier_key = ""
910
if span_name == "redis":
10-
operation_specifier = "command"
11+
operation_specifier_key = "command"
1112
elif span_name == "dynamodb":
12-
operation_specifier = "op"
13-
return operation_specifier
13+
operation_specifier_key = "op"
14+
elif span_name == "kafka":
15+
operation_specifier_key = "access"
16+
service_specifier_key = "service"
17+
return operation_specifier_key, service_specifier_key

tests/agent/test_host.py

+10-20
Original file line numberDiff line numberDiff line change
@@ -692,31 +692,21 @@ def test_diagnostics(self, caplog: pytest.LogCaptureFixture) -> None:
692692
assert "should_send_snapshot_data: True" in caplog.messages
693693

694694
def test_is_service_or_endpoint_ignored(self) -> None:
695-
self.agent.options.ignore_endpoints.append("service1")
696-
self.agent.options.ignore_endpoints.append("service2.endpoint1")
695+
self.agent.options.ignore_endpoints.append("service1.*")
696+
self.agent.options.ignore_endpoints.append("service2.method1")
697697

698698
# ignore all endpoints of service1
699-
assert self.agent._HostAgent__is_service_or_endpoint_ignored("service1")
700-
assert self.agent._HostAgent__is_service_or_endpoint_ignored(
701-
"service1", "endpoint1"
702-
)
703-
assert self.agent._HostAgent__is_service_or_endpoint_ignored(
704-
"service1", "endpoint2"
705-
)
699+
assert self.agent._HostAgent__is_endpoint_ignored("service1")
700+
assert self.agent._HostAgent__is_endpoint_ignored("service1", "method1")
701+
assert self.agent._HostAgent__is_endpoint_ignored("service1", "method2")
706702

707703
# case-insensitive
708-
assert self.agent._HostAgent__is_service_or_endpoint_ignored("SERVICE1")
709-
assert self.agent._HostAgent__is_service_or_endpoint_ignored(
710-
"service1", "ENDPOINT1"
711-
)
704+
assert self.agent._HostAgent__is_endpoint_ignored("SERVICE1")
705+
assert self.agent._HostAgent__is_endpoint_ignored("service1", "METHOD1")
712706

713707
# ignore only endpoint1 of service2
714-
assert self.agent._HostAgent__is_service_or_endpoint_ignored(
715-
"service2", "endpoint1"
716-
)
717-
assert not self.agent._HostAgent__is_service_or_endpoint_ignored(
718-
"service2", "endpoint2"
719-
)
708+
assert self.agent._HostAgent__is_endpoint_ignored("service2", "method1")
709+
assert not self.agent._HostAgent__is_endpoint_ignored("service2", "method2")
720710

721711
# don't ignore other services
722-
assert not self.agent._HostAgent__is_service_or_endpoint_ignored("service3")
712+
assert not self.agent._HostAgent__is_endpoint_ignored("service3")

tests/clients/boto3/test_boto3_dynamodb.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def test_ignore_dynamodb(self) -> None:
9494
assert dynamodb_span not in filtered_spans
9595

9696
def test_ignore_create_table(self) -> None:
97-
os.environ["INSTANA_IGNORE_ENDPOINTS"] = "dynamodb.createtable"
97+
os.environ["INSTANA_IGNORE_ENDPOINTS"] = "dynamodb:createtable"
9898
agent.options = StandardOptions()
9999

100100
with tracer.start_as_current_span("test"):

0 commit comments

Comments
 (0)