|
9 | 9 |
|
10 | 10 | from datadog_checks.base import is_affirmative
|
11 | 11 | from datadog_checks.base.utils.db.sql import compute_sql_signature
|
12 |
| -from datadog_checks.base.utils.db.utils import DBMAsyncJob, default_json_event_encoding, obfuscate_sql_with_metadata |
| 12 | +from datadog_checks.base.utils.db.utils import ( |
| 13 | + DBMAsyncJob, |
| 14 | + RateLimitingTTLCache, |
| 15 | + default_json_event_encoding, |
| 16 | + obfuscate_sql_with_metadata, |
| 17 | +) |
13 | 18 | from datadog_checks.base.utils.serialization import json
|
14 | 19 | from datadog_checks.base.utils.tracking import tracked_method
|
15 | 20 | from datadog_checks.sqlserver.config import SQLServerConfig
|
|
70 | 75 | sess.host_name as host_name,
|
71 | 76 | sess.program_name as program_name,
|
72 | 77 | sess.is_user_process as is_user_process,
|
| 78 | + {input_buffer_columns} |
73 | 79 | {exec_request_columns}
|
74 | 80 | FROM sys.dm_exec_sessions sess
|
75 | 81 | INNER JOIN sys.dm_exec_connections c
|
76 | 82 | ON sess.session_id = c.session_id
|
77 | 83 | INNER JOIN sys.dm_exec_requests req
|
78 | 84 | ON c.connection_id = req.connection_id
|
79 | 85 | CROSS APPLY sys.dm_exec_sql_text(req.sql_handle) qt
|
| 86 | + {input_buffer_join} |
80 | 87 | WHERE
|
81 | 88 | sess.session_id != @@spid AND
|
82 | 89 | sess.status != 'sleeping'
|
|
153 | 160 | "context_info",
|
154 | 161 | ]
|
155 | 162 |
|
# Extra SELECT expressions substituted into ACTIVITY_QUERY's
# {input_buffer_columns} placeholder when raw statement collection is enabled.
INPUT_BUFFER_COLUMNS = ["input_buffer.event_info as raw_statement"]

# FROM-clause fragment substituted into ACTIVITY_QUERY's {input_buffer_join}
# placeholder; it exposes the `input_buffer` alias used by the columns above.
# OUTER APPLY (rather than CROSS APPLY) keeps the request row even when the
# function returns no rows for that session/request.
INPUT_BUFFER_JOIN = "OUTER APPLY sys.dm_exec_input_buffer(req.session_id, req.request_id) input_buffer"
| 168 | + |
156 | 169 |
|
157 | 170 | def _hash_to_hex(hash) -> str:
|
158 | 171 | return binascii.hexlify(hash).decode("utf-8")
|
@@ -195,6 +208,12 @@ def __init__(self, check, config: SQLServerConfig):
|
195 | 208 | )
|
196 | 209 | self._time_since_last_activity_event = 0
|
197 | 210 |
|
| 211 | + self._collect_raw_query_statement = self._config.collect_raw_query_statement.get("enabled", False) |
| 212 | + self._raw_statement_text_cache = RateLimitingTTLCache( |
| 213 | + maxsize=self._config.collect_raw_query_statement["cache_max_size"], |
| 214 | + ttl=60 * 60 / self._config.collect_raw_query_statement["samples_per_hour_per_query"], |
| 215 | + ) |
| 216 | + |
198 | 217 | def _close_db_conn(self):
|
199 | 218 | pass
|
200 | 219 |
|
@@ -245,12 +264,14 @@ def _append_filter(filter: str) -> str:
|
245 | 264 | return rows
|
246 | 265 |
|
247 | 266 | @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True)
|
248 |
| - def _get_activity(self, cursor, exec_request_columns): |
| 267 | + def _get_activity(self, cursor, exec_request_columns, input_buffer_columns, input_buffer_join): |
249 | 268 | self.log.debug("collecting sql server activity")
|
250 | 269 | query = ACTIVITY_QUERY.format(
|
251 | 270 | exec_request_columns=', '.join(['req.{}'.format(r) for r in exec_request_columns]),
|
252 | 271 | proc_char_limit=self._config.stored_procedure_characters_limit,
|
253 | 272 | tail_text_size=TAIL_TEXT_SIZE,
|
| 273 | + input_buffer_columns=input_buffer_columns, |
| 274 | + input_buffer_join=input_buffer_join, |
254 | 275 | )
|
255 | 276 | self.log.debug("Running query [%s]", query)
|
256 | 277 | cursor.execute(query)
|
@@ -293,13 +314,69 @@ def _normalize_queries_and_filter_rows(self, rows, max_bytes_limit):
|
293 | 314 | normalized_rows.append(row)
|
294 | 315 | return normalized_rows
|
295 | 316 |
|
@tracked_method(agent_check_getter=agent_check_getter)
def _rows_to_raw_statement_events(self, rows):
    """Yield one raw-query-text (``dbm_type == "rqt"``) event per eligible row.

    Every row is mutated in place: its ``raw_statement`` entry is always
    removed, and a ``raw_query_signature`` entry is added whenever the row
    has both a query signature and a raw statement.

    A row produces an event only if it has a ``query_signature``, a non-empty
    ``raw_statement``, and ``self._raw_statement_text_cache.acquire`` accepts
    the ``(query_signature, raw_query_signature)`` key.

    :param rows: iterable of dict rows. Keys read: ``query_signature``,
        ``raw_statement``, ``database_name``, ``dd_tables``, ``dd_commands``,
        ``dd_comments``, ``procedure_signature``, ``procedure_name``,
        ``query_hash``, ``query_plan_hash``.
    :return: generator of event dicts. No row is touched until the generator
        is iterated.
    """
    for row in rows:
        # Remove the raw text before any early exit: the same row dicts are
        # reused by the caller to build the activity event, and rows without
        # a query signature must not carry the unobfuscated statement along.
        raw_statement = row.pop("raw_statement", None)

        query_signature = row.get('query_signature')
        if not query_signature:
            continue

        if not raw_statement:
            self.log.debug("No raw statement found for query_signature=%s", query_signature)
            continue

        raw_query_signature = compute_sql_signature(raw_statement)
        row["raw_query_signature"] = raw_query_signature
        raw_statement_key = (query_signature, raw_query_signature)

        # The cache decides whether this (obfuscated, raw) pair may be
        # emitted again yet; a falsy result means skip it this time.
        if not self._raw_statement_text_cache.acquire(raw_statement_key):
            continue

        yield {
            "timestamp": time.time() * 1000,
            "host": self._check.resolved_hostname,
            "ddagentversion": datadog_agent.get_version(),
            "ddsource": "sqlserver",
            "dbm_type": "rqt",
            "ddtags": ",".join(self.tags),
            'service': self._config.service,
            "db": {
                "instance": row.get('database_name', None),
                "query_signature": query_signature,
                "raw_query_signature": raw_query_signature,
                "statement": raw_statement,
                "metadata": {
                    # .get() for all three so a missing metadata key cannot
                    # raise mid-iteration and abort the whole collection.
                    "tables": row.get('dd_tables', None),
                    "commands": row.get('dd_commands', None),
                    "comments": row.get('dd_comments', None),
                },
                "procedure_signature": row.get("procedure_signature"),
                "procedure_name": row.get("procedure_name"),
            },
            "sqlserver": {
                "query_hash": row.get("query_hash"),
                "query_plan_hash": row.get("query_plan_hash"),
            },
        }
| 362 | + |
296 | 363 | def _get_exec_requests_cols_cached(self, cursor, expected_cols):
|
297 | 364 | if self._exec_requests_cols_cached:
|
298 | 365 | return self._exec_requests_cols_cached
|
299 | 366 |
|
300 | 367 | self._exec_requests_cols_cached = self._get_available_requests_columns(cursor, expected_cols)
|
301 | 368 | return self._exec_requests_cols_cached
|
302 | 369 |
|
| 370 | + def _get_input_buffer_columns_and_join(self): |
| 371 | + input_buffer_columns = "" |
| 372 | + input_buffer_join = "" |
| 373 | + |
| 374 | + if self._collect_raw_query_statement: |
| 375 | + input_buffer_columns = ", ".join(INPUT_BUFFER_COLUMNS) + "," |
| 376 | + input_buffer_join = INPUT_BUFFER_JOIN |
| 377 | + |
| 378 | + return input_buffer_columns, input_buffer_join |
| 379 | + |
303 | 380 | def _get_available_requests_columns(self, cursor, all_expected_columns):
|
304 | 381 | cursor.execute("select TOP 0 * from sys.dm_exec_requests")
|
305 | 382 | all_columns = {i[0] for i in cursor.description}
|
@@ -418,8 +495,14 @@ def collect_activity(self):
|
418 | 495 | with self._check.connection.get_managed_cursor(key_prefix=self._conn_key_prefix) as cursor:
|
419 | 496 | connections = self._get_active_connections(cursor)
|
420 | 497 | request_cols = self._get_exec_requests_cols_cached(cursor, DM_EXEC_REQUESTS_COLS)
|
421 |
| - rows = self._get_activity(cursor, request_cols) |
| 498 | + input_buffer_columns, input_buffer_join = self._get_input_buffer_columns_and_join() |
| 499 | + rows = self._get_activity(cursor, request_cols, input_buffer_columns, input_buffer_join) |
422 | 500 | normalized_rows = self._normalize_queries_and_filter_rows(rows, MAX_PAYLOAD_BYTES)
|
| 501 | + if self._collect_raw_query_statement: |
| 502 | + for raw_statement_event in self._rows_to_raw_statement_events(normalized_rows): |
| 503 | + self._check.database_monitoring_query_sample( |
| 504 | + json.dumps(raw_statement_event, default=default_json_event_encoding) |
| 505 | + ) |
423 | 506 | event = self._create_activity_event(normalized_rows, connections)
|
424 | 507 | payload = json.dumps(event, default=default_json_event_encoding)
|
425 | 508 | self._check.database_monitoring_query_activity(payload)
|
|
0 commit comments