Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sqlserver] Emit raw query statements and plans for non prepared statements #19421

Merged
merged 12 commits into from
Jan 27, 2025
16 changes: 16 additions & 0 deletions sqlserver/assets/configuration/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,22 @@ files:
type: boolean
example: false
display_default: false
- name: collect_raw_query_statement
hidden: true
description: |
Configure the collection of raw query statements in query activity and execution plans.
Raw query statements and execution plans may contain sensitive information in query text.
Enabling this option will allow the collection and ingestion of raw query statements and
execution plans into Datadog. This option is disabled by default.
Note: This option only applies when `dbm` is enabled.
options:
- name: enabled
description: |
Set to `true` to collect the raw query statements.
value:
type: boolean
example: false
display_default: false
- name: log_unobfuscated_queries
hidden: true
description: |
Expand Down
1 change: 1 addition & 0 deletions sqlserver/changelog.d/19421.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for collecting raw query statements and explain plans when `collect_raw_query_statement.enabled` is true.
89 changes: 86 additions & 3 deletions sqlserver/datadog_checks/sqlserver/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@

from datadog_checks.base import is_affirmative
from datadog_checks.base.utils.db.sql import compute_sql_signature
from datadog_checks.base.utils.db.utils import DBMAsyncJob, default_json_event_encoding, obfuscate_sql_with_metadata
from datadog_checks.base.utils.db.utils import (
DBMAsyncJob,
RateLimitingTTLCache,
default_json_event_encoding,
obfuscate_sql_with_metadata,
)
from datadog_checks.base.utils.serialization import json
from datadog_checks.base.utils.tracking import tracked_method
from datadog_checks.sqlserver.config import SQLServerConfig
Expand Down Expand Up @@ -70,13 +75,15 @@
sess.host_name as host_name,
sess.program_name as program_name,
sess.is_user_process as is_user_process,
{input_buffer_columns}
{exec_request_columns}
FROM sys.dm_exec_sessions sess
INNER JOIN sys.dm_exec_connections c
ON sess.session_id = c.session_id
INNER JOIN sys.dm_exec_requests req
ON c.connection_id = req.connection_id
CROSS APPLY sys.dm_exec_sql_text(req.sql_handle) qt
{input_buffer_join}
WHERE
sess.session_id != @@spid AND
sess.status != 'sleeping'
Expand Down Expand Up @@ -145,6 +152,12 @@
"context_info",
]

INPUT_BUFFER_COLUMNS = [
"input_buffer.event_info as raw_statement",
]

INPUT_BUFFER_JOIN = "OUTER APPLY sys.dm_exec_input_buffer(req.session_id, req.request_id) input_buffer"


def _hash_to_hex(hash) -> str:
return binascii.hexlify(hash).decode("utf-8")
Expand Down Expand Up @@ -183,6 +196,12 @@ def __init__(self, check, config: SQLServerConfig):
self._activity_payload_max_bytes = MAX_PAYLOAD_BYTES
self._exec_requests_cols_cached = None

self._collect_raw_query_statement = self._config.collect_raw_query_statement.get("enabled", False)
self._raw_statement_text_cache = RateLimitingTTLCache(
maxsize=self._config.collect_raw_query_statement["cache_max_size"],
ttl=60 * 60 / self._config.collect_raw_query_statement["samples_per_hour_per_query"],
)

def _close_db_conn(self):
pass

Expand Down Expand Up @@ -214,12 +233,14 @@ def _get_idle_blocking_sessions(self, cursor, blocking_session_ids):
return rows

@tracked_method(agent_check_getter=agent_check_getter, track_result_length=True)
def _get_activity(self, cursor, exec_request_columns):
def _get_activity(self, cursor, exec_request_columns, input_buffer_columns, input_buffer_join):
self.log.debug("collecting sql server activity")
query = ACTIVITY_QUERY.format(
exec_request_columns=', '.join(['req.{}'.format(r) for r in exec_request_columns]),
proc_char_limit=self._config.stored_procedure_characters_limit,
tail_text_size=TAIL_TEXT_SIZE,
input_buffer_columns=input_buffer_columns,
input_buffer_join=input_buffer_join,
)
self.log.debug("Running query [%s]", query)
cursor.execute(query)
Expand Down Expand Up @@ -262,13 +283,69 @@ def _normalize_queries_and_filter_rows(self, rows, max_bytes_limit):
normalized_rows.append(row)
return normalized_rows

@tracked_method(agent_check_getter=agent_check_getter)
def _rows_to_raw_statement_events(self, rows):
for row in rows:
query_signature = row.get('query_signature')
if not query_signature:
continue

raw_statement = row.pop("raw_statement", None)
if not raw_statement:
self.log.debug("No raw statement found for query_signature=%s", query_signature)
continue

raw_query_signature = compute_sql_signature(raw_statement)
row["raw_query_signature"] = raw_query_signature
raw_statement_key = (query_signature, raw_query_signature)

if not self._raw_statement_text_cache.acquire(raw_statement_key):
continue

yield {
"timestamp": time.time() * 1000,
"host": self._check.resolved_hostname,
"ddagentversion": datadog_agent.get_version(),
"ddsource": "sqlserver",
"dbm_type": "rqt",
"ddtags": ",".join(self.tags),
'service': self._config.service,
"db": {
"instance": row.get('database_name', None),
"query_signature": query_signature,
"raw_query_signature": raw_query_signature,
"statement": raw_statement,
"metadata": {
"tables": row['dd_tables'],
"commands": row['dd_commands'],
"comments": row.get('dd_comments', None),
},
"procedure_signature": row.get("procedure_signature"),
"procedure_name": row.get("procedure_name"),
},
"sqlserver": {
"query_hash": row.get("query_hash"),
"query_plan_hash": row.get("query_plan_hash"),
},
}

def _get_exec_requests_cols_cached(self, cursor, expected_cols):
if self._exec_requests_cols_cached:
return self._exec_requests_cols_cached

self._exec_requests_cols_cached = self._get_available_requests_columns(cursor, expected_cols)
return self._exec_requests_cols_cached

def _get_input_buffer_columns_and_join(self):
input_buffer_columns = ""
input_buffer_join = ""

if self._collect_raw_query_statement:
input_buffer_columns = ", ".join(INPUT_BUFFER_COLUMNS) + ","
input_buffer_join = INPUT_BUFFER_JOIN

return input_buffer_columns, input_buffer_join

def _get_available_requests_columns(self, cursor, all_expected_columns):
cursor.execute("select TOP 0 * from sys.dm_exec_requests")
all_columns = {i[0] for i in cursor.description}
Expand Down Expand Up @@ -386,8 +463,14 @@ def collect_activity(self):
with self._check.connection.get_managed_cursor(key_prefix=self._conn_key_prefix) as cursor:
connections = self._get_active_connections(cursor)
request_cols = self._get_exec_requests_cols_cached(cursor, DM_EXEC_REQUESTS_COLS)
rows = self._get_activity(cursor, request_cols)
input_buffer_columns, input_buffer_join = self._get_input_buffer_columns_and_join()
rows = self._get_activity(cursor, request_cols, input_buffer_columns, input_buffer_join)
normalized_rows = self._normalize_queries_and_filter_rows(rows, MAX_PAYLOAD_BYTES)
if self._collect_raw_query_statement:
for raw_statement_event in self._rows_to_raw_statement_events(normalized_rows):
self._check.database_monitoring_query_sample(
json.dumps(raw_statement_event, default=default_json_event_encoding)
)
event = self._create_activity_event(normalized_rows, connections)
payload = json.dumps(event, default=default_json_event_encoding)
self._check.database_monitoring_query_activity(payload)
Expand Down
21 changes: 16 additions & 5 deletions sqlserver/datadog_checks/sqlserver/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@
class SQLServerConfig:
def __init__(self, init_config, instance, log):
self.log = log
self.tags: list[str] = self._build_tags(
custom_tags=instance.get('tags', []),
propagate_agent_tags=self._should_propagate_agent_tags(instance, init_config),
)
self.reported_hostname: str = instance.get('reported_hostname')
self.autodiscovery: bool = is_affirmative(instance.get('database_autodiscovery'))
self.autodiscovery_include: list[str] = instance.get('autodiscovery_include', ['.*']) or ['.*']
Expand Down Expand Up @@ -107,13 +103,25 @@ def __init__(self, init_config, instance, log):
}
)
)
collect_raw_query_statement_config: dict = instance.get('collect_raw_query_statement', {}) or {}
self.collect_raw_query_statement = {
"enabled": is_affirmative(collect_raw_query_statement_config.get('enabled', False)),
"cache_max_size": int(collect_raw_query_statement_config.get('cache_max_size', 10000)),
"samples_per_hour_per_query": int(collect_raw_query_statement_config.get('samples_per_hour_per_query', 1)),
}
self.log_unobfuscated_queries: bool = is_affirmative(instance.get('log_unobfuscated_queries', False))
self.log_unobfuscated_plans: bool = is_affirmative(instance.get('log_unobfuscated_plans', False))
self.stored_procedure_characters_limit: int = instance.get('stored_procedure_characters_limit', PROC_CHAR_LIMIT)
self.connection_host: str = instance['host']
self.service = instance.get('service') or init_config.get('service') or ''
self.db_fragmentation_object_names = instance.get('db_fragmentation_object_names', []) or []

self.tags: list[str] = self._build_tags(
custom_tags=instance.get('tags', []),
propagate_agent_tags=self._should_propagate_agent_tags(instance, init_config),
additional_tags=["raw_query_statement:enabled"] if self.collect_raw_query_statement["enabled"] else [],
)

def _compile_valid_patterns(self, patterns: list[str]) -> re.Pattern:
valid_patterns = []

Expand All @@ -135,7 +143,7 @@ def _compile_valid_patterns(self, patterns: list[str]) -> re.Pattern:
# create unmatchable regex - https://stackoverflow.com/a/1845097/2157429
return re.compile(r'(?!x)x')

def _build_tags(self, custom_tags, propagate_agent_tags):
def _build_tags(self, custom_tags, propagate_agent_tags, additional_tags):
# Clean up tags in case there was a None entry in the instance
# e.g. if the yaml contains tags: but no actual tags
if custom_tags is None:
Expand All @@ -151,6 +159,9 @@ def _build_tags(self, custom_tags, propagate_agent_tags):
raise ConfigurationError(
'propagate_agent_tags enabled but there was an error fetching agent tags {}'.format(e)
)

if additional_tags:
tags.extend(additional_tags)
return tags

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ class Azure(BaseModel):
fully_qualified_domain_name: Optional[str] = None


class CollectRawQueryStatement(BaseModel):
model_config = ConfigDict(
arbitrary_types_allowed=True,
frozen=True,
)
enabled: Optional[bool] = None


class CollectSettings(BaseModel):
model_config = ConfigDict(
arbitrary_types_allowed=True,
Expand Down Expand Up @@ -343,6 +351,7 @@ class InstanceConfig(BaseModel):
autodiscovery_include: Optional[tuple[str, ...]] = None
aws: Optional[Aws] = None
azure: Optional[Azure] = None
collect_raw_query_statement: Optional[CollectRawQueryStatement] = None
collect_settings: Optional[CollectSettings] = None
command_timeout: Optional[int] = None
connection_string: Optional[str] = None
Expand Down
11 changes: 10 additions & 1 deletion sqlserver/datadog_checks/sqlserver/statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Licensed under a 3-clause BSD style license (see LICENSE)

import binascii
import copy
import math
import time

Expand Down Expand Up @@ -234,6 +235,8 @@ def __init__(self, check, config: SQLServerConfig):
self._last_stats_query_time = None
self._max_query_metrics = self._config.statement_metrics_config.get("max_queries", 250)

self._collect_raw_query_statement = self._config.collect_raw_query_statement.get("enabled", False)

def _init_caches(self):
# full_statement_text_cache: limit the ingestion rate of full statement text events per query_signature
self._full_statement_text_cache = TTLCache(
Expand Down Expand Up @@ -585,7 +588,7 @@ def _collect_plans(self, rows, cursor, deadline):
query_signature = None
if 'database_name' in row:
tags += ["db:{}".format(row['database_name'])]
yield {
obfuscated_plan_event = {
"host": self._check.resolved_hostname,
"ddagentversion": datadog_agent.get_version(),
"ddsource": "sqlserver",
Expand Down Expand Up @@ -623,3 +626,9 @@ def _collect_plans(self, rows, cursor, deadline):
'total_elapsed_time': row.get('total_elapsed_time', None),
},
}
yield obfuscated_plan_event
if self._collect_raw_query_statement:
raw_plan_event = copy.deepcopy(obfuscated_plan_event)
raw_plan_event["dbm_type"] = "rqp" # raw query plan
raw_plan_event["db"]["plan"]["definition"] = raw_plan
yield raw_plan_event
7 changes: 7 additions & 0 deletions sqlserver/tests/compose-ha/sql/aoag_primary.sql
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,15 @@ BEGIN
END;
GO

CREATE PROCEDURE fredProcParams @Name nvarchar(8) = NULL AS
BEGIN
SELECT * FROM ϑings WHERE name like @Name;
END;
GO

GRANT EXECUTE on bobProcParams to bob;
GRANT EXECUTE on bobProc to bob;
GRANT EXECUTE on fredProcParams to fred;
GRANT EXECUTE on bobProc to fred;
GO

Expand Down
7 changes: 7 additions & 0 deletions sqlserver/tests/compose-high-cardinality-windows/setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,15 @@ BEGIN
END;
GO

CREATE PROCEDURE fredProcParams @Name nvarchar(8) = NULL AS
BEGIN
SELECT * FROM ϑings WHERE name like @Name;
END;
GO

GRANT EXECUTE on bobProcParams to bob;
GRANT EXECUTE on bobProc to bob;
GRANT EXECUTE on fredProcParams to fred;
GRANT EXECUTE on bobProc to fred;
GO

Expand Down
7 changes: 7 additions & 0 deletions sqlserver/tests/compose-high-cardinality/setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,15 @@ BEGIN
END;
GO

CREATE PROCEDURE fredProcParams @Name nvarchar(8) = NULL AS
BEGIN
SELECT * FROM ϑings WHERE name like @Name;
END;
GO

GRANT EXECUTE on bobProcParams to bob;
GRANT EXECUTE on bobProc to bob;
GRANT EXECUTE on fredProcParams to fred;
GRANT EXECUTE on bobProc to fred;
GO

Expand Down
8 changes: 8 additions & 0 deletions sqlserver/tests/compose-windows/setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,16 @@ BEGIN
SELECT id FROM ϑings WHERE name = @P2;
END;
GO

CREATE PROCEDURE fredProcParams @Name nvarchar(8) = NULL AS
BEGIN
SELECT * FROM ϑings WHERE name like @Name;
END;
GO

GRANT EXECUTE on bobProcParams to bob;
GRANT EXECUTE on bobProc to bob;
GRANT EXECUTE on fredProcParams to fred;
GRANT EXECUTE on bobProc to fred;
GO

Expand Down
7 changes: 7 additions & 0 deletions sqlserver/tests/compose/setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,15 @@ BEGIN
END;
GO

CREATE PROCEDURE fredProcParams @Name nvarchar(8) = NULL AS
BEGIN
SELECT * FROM ϑings WHERE name like @Name;
END;
GO

GRANT EXECUTE on bobProcParams to bob;
GRANT EXECUTE on bobProc to bob;
GRANT EXECUTE on fredProcParams to fred;
GRANT EXECUTE on bobProc to fred;
GO

Expand Down
Loading
Loading