Skip to content

chore(telemetry): integration error telemetry logs are sent through a function call #13510

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 1 addition & 55 deletions ddtrace/internal/telemetry/logging.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
import logging
import os
import traceback
from typing import Union

from ddtrace.internal.telemetry.constants import TELEMETRY_LOG_LEVEL


class DDTelemetryLogHandler(logging.Handler):
class DDTelemetryErrorHandler(logging.Handler):
CWD = os.getcwd()

def __init__(self, telemetry_writer):
Expand All @@ -16,58 +12,8 @@ def __init__(self, telemetry_writer):
def emit(self, record: logging.LogRecord) -> None:
    """Forward qualifying log records to the telemetry writer.

    This function will:
    - Log all records with a level of ERROR or higher with telemetry
    - Log all caught exceptions originating from ddtrace.contrib modules

    :param record: the log record handed to this handler by the logging machinery.
    """
    if record.levelno >= logging.ERROR:
        # Capture start up errors.
        # getMessage() only applies %-interpolation when args are present and
        # stringifies non-str messages, so a literal "%" in an argument-less
        # message (or an exception object used as msg) no longer raises here.
        # record.pathname is already the full path of the source file;
        # joining record.filename onto it would repeat the file name.
        self.telemetry_writer.add_error(1, record.getMessage(), record.pathname, record.lineno)

    # Capture errors logged in the ddtrace integrations
    if record.name.startswith("ddtrace.contrib"):
        if record.levelno >= logging.ERROR:
            telemetry_level = TELEMETRY_LOG_LEVEL.ERROR
        elif record.levelno == logging.WARNING:
            telemetry_level = TELEMETRY_LOG_LEVEL.WARNING
        else:
            telemetry_level = TELEMETRY_LOG_LEVEL.DEBUG

        # Only collect telemetry for logs with a traceback
        stack_trace = self._format_stack_trace(record.exc_info)
        if stack_trace is not None:
            # Report only exceptions with a stack trace
            self.telemetry_writer.add_log(
                telemetry_level,
                record.msg,
                stack_trace=stack_trace,
            )

def _format_stack_trace(self, exc_info) -> Union[str, None]:
if exc_info is None:
return None

exc_type, exc_value, exc_traceback = exc_info
if exc_traceback:
tb = traceback.extract_tb(exc_traceback)
formatted_tb = ["Traceback (most recent call last):"]
for filename, lineno, funcname, srcline in tb:
if self._should_redact(filename):
formatted_tb.append(" <REDACTED>")
else:
relative_filename = self._format_file_path(filename)
formatted_line = f' File "{relative_filename}", line {lineno}, in {funcname}\n {srcline}'
formatted_tb.append(formatted_line)
if exc_type:
formatted_tb.append(f"{exc_type.__module__}.{exc_type.__name__}: {exc_value}")
return "\n".join(formatted_tb)

return None

def _should_redact(self, filename: str) -> bool:
return "ddtrace" not in filename

def _format_file_path(self, filename):
try:
return os.path.relpath(filename, start=self.CWD)
except ValueError:
return filename
45 changes: 42 additions & 3 deletions ddtrace/internal/telemetry/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import sys
import time
import traceback
from typing import TYPE_CHECKING # noqa:F401
from typing import Any # noqa:F401
from typing import Dict # noqa:F401
Expand Down Expand Up @@ -38,7 +39,7 @@
from .data import get_host_info
from .data import get_python_config_vars
from .data import update_imported_dependencies
from .logging import DDTelemetryLogHandler
from .logging import DDTelemetryErrorHandler
from .metrics_namespaces import MetricNamespace
from .metrics_namespaces import MetricTagType
from .metrics_namespaces import MetricType
Expand Down Expand Up @@ -145,6 +146,7 @@ class TelemetryWriter(PeriodicService):
# payloads is only used in tests and is not required to process Telemetry events.
_sequence = itertools.count(1)
_ORIGINAL_EXCEPTHOOK = staticmethod(sys.excepthook)
CWD = os.getcwd()

def __init__(self, is_periodic=True, agentless=None):
# type: (bool, Optional[bool]) -> None
Expand Down Expand Up @@ -204,8 +206,8 @@ def __init__(self, is_periodic=True, agentless=None):
# Force app started for unit tests
if config.FORCE_START:
self._app_started()
if config.LOG_COLLECTION_ENABLED:
get_logger("ddtrace").addHandler(DDTelemetryLogHandler(self))
# Send logged error to telemetry
get_logger("ddtrace").addHandler(DDTelemetryErrorHandler(self))

def enable(self):
# type: () -> bool
Expand Down Expand Up @@ -503,6 +505,43 @@ def add_log(self, level, message, stack_trace="", tags=None):
# Logs are hashed using the message, level, tags, and stack_trace. This should prevent duplication.
self._logs.add(data)

def add_integration_error_log(self, msg: str, exc: BaseException) -> None:
    """Queue an ERROR-level telemetry log for an integration exception.

    Does nothing unless ``config.LOG_COLLECTION_ENABLED`` is truthy.

    :param msg: message to attach to the telemetry log.
    :param exc: exception whose traceback is formatted into the log's
        ``stack_trace``; an empty string is sent when no traceback is
        available.
    """
    if not config.LOG_COLLECTION_ENABLED:
        return

    formatted = self._format_stack_trace(exc)
    if formatted is None:
        formatted = ""
    self.add_log(TELEMETRY_LOG_LEVEL.ERROR, msg, stack_trace=formatted)

def _format_stack_trace(self, exc: BaseException) -> Optional[str]:
exc_type, exc_value, exc_traceback = type(exc), exc, exc.__traceback__
if exc_traceback:
tb = traceback.extract_tb(exc_traceback)
formatted_tb = ["Traceback (most recent call last):"]
for filename, lineno, funcname, srcline in tb:
if self._should_redact(filename):
formatted_tb.append(" <REDACTED>")
formatted_tb.append(" <REDACTED>")
else:
relative_filename = self._format_file_path(filename)
formatted_line = f' File "{relative_filename}", line {lineno}, in {funcname}\n {srcline}'
formatted_tb.append(formatted_line)
if exc_type:
formatted_tb.append(f"{exc_type.__module__}.{exc_type.__name__}: {exc_value}")
return "\n".join(formatted_tb)

return None

def _should_redact(self, filename: str) -> bool:
return "ddtrace" not in filename

def _format_file_path(self, filename: str) -> str:
try:
return os.path.relpath(filename, start=self.CWD)
except ValueError:
return filename

def add_gauge_metric(self, namespace: TELEMETRY_NAMESPACE, name: str, value: float, tags: MetricTagType = None):
"""
Queues gauge metric
Expand Down
152 changes: 0 additions & 152 deletions tests/telemetry/test_telemetry_log_handler.py

This file was deleted.

64 changes: 64 additions & 0 deletions tests/telemetry/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
from ddtrace import config
import ddtrace.internal.telemetry
from ddtrace.internal.telemetry.constants import TELEMETRY_APM_PRODUCT
from ddtrace.internal.telemetry.constants import TELEMETRY_LOG_LEVEL
from ddtrace.internal.telemetry.data import get_application
from ddtrace.internal.telemetry.data import get_host_info
from ddtrace.internal.telemetry.writer import TelemetryWriter
from ddtrace.internal.telemetry.writer import get_runtime_id
from ddtrace.internal.utils.version import _pep440_to_semver
from ddtrace.settings._config import DD_TRACE_OBFUSCATION_QUERY_STRING_REGEXP_DEFAULT
from ddtrace.settings._telemetry import config as telemetry_config
from tests.conftest import DEFAULT_DDTRACE_SUBPROCESS_TEST_SERVICE_NAME
from tests.utils import call_program
from tests.utils import override_global_config
Expand Down Expand Up @@ -952,3 +955,64 @@ def test_otel_config_telemetry(test_agent_session, run_python_code_in_subprocess
env_invalid_metrics = test_agent_session.get_metrics("otel.env.invalid")
tags = [m["tags"] for m in env_invalid_metrics]
assert tags == [["config_opentelemetry:otel_logs_exporter"]]


def test_add_integration_error_log(mock_time, telemetry_writer, test_agent_session):
    """Check that a caught exception is flushed as one ERROR telemetry log with a redacted trace."""
    try:
        raise ValueError("Test exception")
    except ValueError as caught:
        telemetry_writer.add_integration_error_log("Test error message", caught)
        telemetry_writer.periodic(force_flush=True)

    events = test_agent_session.get_events("logs")
    assert len(events) == 1

    payload_logs = events[0]["payload"]["logs"]
    assert len(payload_logs) == 1

    entry = payload_logs[0]
    assert entry["level"] == TELEMETRY_LOG_LEVEL.ERROR.value
    assert entry["message"] == "Test error message"

    # The raising frame lives in this test file, so it is expected to be redacted.
    rendered_trace = entry["stack_trace"]
    for fragment in (
        "Traceback (most recent call last):",
        " <REDACTED>",
        " <REDACTED>",
        "builtins.ValueError: Test exception",
    ):
        assert fragment in rendered_trace


def test_add_integration_error_log_with_log_collection_disabled(mock_time, telemetry_writer, test_agent_session):
    """Check that no telemetry log is emitted while LOG_COLLECTION_ENABLED is False."""
    saved_setting = telemetry_config.LOG_COLLECTION_ENABLED
    telemetry_config.LOG_COLLECTION_ENABLED = False
    try:
        try:
            raise ValueError("Test exception")
        except ValueError as caught:
            telemetry_writer.add_integration_error_log("Test error message", caught)
            telemetry_writer.periodic(force_flush=True)

        # NOTE(review): the sibling test queries get_events("logs") without
        # subprocess=True -- confirm this filter still sees in-process events.
        events = test_agent_session.get_events("logs", subprocess=True)
        assert len(events) == 0
    finally:
        # Always restore the global setting so other tests are unaffected.
        telemetry_config.LOG_COLLECTION_ENABLED = saved_setting


_REDACTION_CASES = [
    ("/path/to/file.py", True),
    ("/path/to/ddtrace/contrib/flask/file.py", False),
    ("/path/to/dd-trace-something/file.py", True),
]


@pytest.mark.parametrize("filename, is_redacted", _REDACTION_CASES)
def test_redact_filename(filename, is_redacted):
    """Check which file paths the writer marks for redaction."""
    verdict = TelemetryWriter(is_periodic=False)._should_redact(filename)
    assert verdict == is_redacted
Loading