Skip to content

Commit

Permalink
add DD_SERVICE to agent operations
Browse files Browse the repository at this point in the history
  • Loading branch information
lu-zhengda committed Jan 22, 2025
1 parent fd08ef9 commit e52c48a
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 42 deletions.
50 changes: 29 additions & 21 deletions mongo/datadog_checks/mongo/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
ProtocolError,
)

DD_SERVICE = f'service={DD_APP_NAME}'


class MongoApi(object):
"""Mongodb connection through pymongo.MongoClient
Expand Down Expand Up @@ -83,21 +85,21 @@ def connect(self):
raise

def ping(self):
return self['admin'].command('ping')
return self['admin'].command('ping', comment=DD_SERVICE)

def server_info(self, session=None):
return self._cli.server_info(session)

def list_database_names(self, session=None):
return self._cli.list_database_names(session)
return self._cli.list_database_names(session, comment=DD_SERVICE)

def current_op(self, session=None):
# Use $currentOp stage to get all users and idle sessions.
# Note: Why not use the `currentOp` command?
# Because the currentOp command and db.currentOp() helper method return the results in a single document,
# the total size of the currentOp result set is subject to the maximum 16MB BSON size limit for documents.
# The $currentOp stage returns a cursor over a stream of documents, each of which reports a single operation.
return self["admin"].aggregate([{'$currentOp': {'allUsers': True}}], session=session)
return self["admin"].aggregate([{'$currentOp': {'allUsers': True}}], session=session, comment=DD_SERVICE)

def get_collection_stats(self, db_name, coll_name, stats=None, session=None):
if not self.coll_stats_pipeline_supported:
Expand Down Expand Up @@ -129,31 +131,32 @@ def coll_stats(self, db_name, coll_name, stats=None, session=None):
},
],
session=session,
comment=DD_SERVICE,
)

def coll_stats_compatible(self, db_name, coll_name, session=None):
# collStats is deprecated in MongoDB 6.2. Use the $collStats aggregation stage instead.
return self[db_name].command({'collStats': coll_name}, session=session)
return self[db_name].command({'collStats': coll_name}, session=session, comment=DD_SERVICE)

def index_stats(self, db_name, coll_name, session=None):
return self[db_name][coll_name].aggregate([{"$indexStats": {}}], session=session)
return self[db_name][coll_name].aggregate([{"$indexStats": {}}], session=session, comment=DD_SERVICE)

def index_information(self, db_name, coll_name, session=None):
return self[db_name][coll_name].index_information(session=session)
return self[db_name][coll_name].index_information(session=session, comment=DD_SERVICE)

def list_search_indexes(self, db_name, coll_name, session=None):
return self[db_name][coll_name].list_search_indexes(session=session)
return self[db_name][coll_name].list_search_indexes(session=session, comment=DD_SERVICE)

def sharded_data_distribution_stats(self, session=None):
return self["admin"].aggregate([{"$shardedDataDistribution": {}}], session=session)
return self["admin"].aggregate([{"$shardedDataDistribution": {}}], session=session, comment=DD_SERVICE)

def _is_auth_required(self, options):
# Check if the node is an arbiter. If it is, usually it does not require authentication.
# However this is a best-effort check as the replica set might focce authentication.
try:
# Try connect to the admin database to run the isMaster command without authentication.
cli = MongoClient(**options)
is_master_payload = cli['admin'].command('isMaster')
is_master_payload = cli['admin'].command('isMaster', comment=DD_SERVICE)
is_arbiter = is_master_payload.get('arbiterOnly', False)
# If the node is an arbiter and we are able to connect without authentication
# we can assume that the node does not require authentication.
Expand All @@ -162,35 +165,37 @@ def _is_auth_required(self, options):
return True

def get_profiling_level(self, db_name, session=None):
return self[db_name].command('profile', -1, session=session)
return self[db_name].command('profile', -1, session=session, comment=DD_SERVICE)

def get_profiling_data(self, db_name, ts, session=None):
filter = {'ts': {'$gt': ts}}
return self[db_name]['system.profile'].find(filter, session=session).sort('ts', 1)
return self[db_name]['system.profile'].find(filter, session=session, comment=DD_SERVICE).sort('ts', 1)

def get_log_data(self, session=None):
return self['admin'].command("getLog", "global", session=session)
return self['admin'].command("getLog", "global", session=session, comment=DD_SERVICE)

def sample(self, db_name, coll_name, sample_size, session=None):
return self[db_name][coll_name].aggregate([{"$sample": {"size": sample_size}}], session=session)
return self[db_name][coll_name].aggregate(
[{"$sample": {"size": sample_size}}], session=session, comment=DD_SERVICE
)

def get_cmdline_opts(self):
return self["admin"].command("getCmdLineOpts")["parsed"]
return self["admin"].command("getCmdLineOpts", comment=DD_SERVICE)["parsed"]

def replset_get_status(self):
return self["admin"].command("replSetGetStatus")
return self["admin"].command("replSetGetStatus", comment=DD_SERVICE)

def is_master(self):
return self["admin"].command("isMaster")
return self["admin"].command("isMaster", comment=DD_SERVICE)

def sharding_state_is_enabled(self):
return self["admin"].command("shardingState").get("enabled", False)
return self["admin"].command("shardingState", comment=DD_SERVICE).get("enabled", False)

def get_shard_map(self):
return self['admin'].command('getShardMap')
return self['admin'].command('getShardMap', comment=DD_SERVICE)

def server_status(self):
return self['admin'].command('serverStatus')
return self['admin'].command('serverStatus', comment=DD_SERVICE)

def list_authorized_collections(
self,
Expand All @@ -202,11 +207,12 @@ def list_authorized_collections(
coll_names = self[db_name].list_collection_names(
filter={"type": "collection"}, # Only return collections, not views
authorizedCollections=True,
comment=DD_SERVICE,
)
except OperationFailure as e:
if e.code == 303 and e.details.get("errmsg") == "Field 'type' is currently not supported":
# Filter by type is not supported on AWS DocumentDB
coll_names = self[db_name].list_collection_names(authorizedCollections=True)
coll_names = self[db_name].list_collection_names(authorizedCollections=True, comment=DD_SERVICE)
else:
# The user is not authorized to run listCollections on this database.
# This is NOT a critical error, so we log it as a warning.
Expand All @@ -224,7 +230,9 @@ def is_collection_sharded(self, db_name, coll_name):
try:
# Check if the collection is sharded by looking for the collection config
# in the config.collections collection.
collection_config = self["config"]["collections"].find_one({"_id": f"{db_name}.{coll_name}"})
collection_config = self["config"]["collections"].find_one(
{"_id": f"{db_name}.{coll_name}"}, comment=DD_SERVICE
)
return collection_config is not None
except OperationFailure as e:
self._log.warning("Could not determine if collection %s.%s is sharded: %s", db_name, coll_name, e)
Expand Down
2 changes: 1 addition & 1 deletion mongo/tests/mocked_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def __init__(self, db_name, deployment):
def __getitem__(self, coll_name):
return MockedCollection(self._db_name, coll_name, self.deployment)

def command(self, command, *args, **_):
def command(self, command, *args, **kwargs):
filename = command
if "dbStats" in command:
filename = f"dbstats-{self._db_name}"
Expand Down
62 changes: 42 additions & 20 deletions mongo/tests/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from datadog_checks.base import ConfigurationError
from datadog_checks.base.utils.db.sql import compute_exec_plan_signature
from datadog_checks.mongo.api import CRITICAL_FAILURE, MongoApi
from datadog_checks.mongo.api import CRITICAL_FAILURE, DD_SERVICE, MongoApi
from datadog_checks.mongo.collectors import MongoCollector
from datadog_checks.mongo.common import MongosDeployment, ReplicaSetDeployment, get_state_name
from datadog_checks.mongo.dbm.utils import should_explain_operation
Expand Down Expand Up @@ -146,7 +146,13 @@ def test_emits_ok_service_check_when_alibaba_mongos_deployment(
dd_run_check(check)
# Then
aggregator.assert_service_check('mongodb.can_connect', MongoDb.OK)
mock_command.assert_has_calls([mock.call('serverStatus'), mock.call('getCmdLineOpts'), mock.call('isMaster')])
mock_command.assert_has_calls(
[
mock.call('serverStatus', comment=DD_SERVICE),
mock.call('getCmdLineOpts', comment=DD_SERVICE),
mock.call('isMaster', comment=DD_SERVICE),
]
)
mock_server_info.assert_called_once()
mock_list_database_names.assert_called_once()
assert check._resolved_hostname == 'test-hostname:27017'
Expand Down Expand Up @@ -176,10 +182,10 @@ def test_emits_ok_service_check_when_alibaba_replicaset_role_configsvr_deploymen
aggregator.assert_service_check('mongodb.can_connect', MongoDb.OK)
mock_command.assert_has_calls(
[
mock.call('serverStatus'),
mock.call('getCmdLineOpts'),
mock.call('isMaster'),
mock.call('replSetGetStatus'),
mock.call('serverStatus', comment=DD_SERVICE),
mock.call('getCmdLineOpts', comment=DD_SERVICE),
mock.call('isMaster', comment=DD_SERVICE),
mock.call('replSetGetStatus', comment=DD_SERVICE),
]
)
mock_server_info.assert_called_once()
Expand Down Expand Up @@ -209,10 +215,10 @@ def test_when_replicaset_state_recovering_then_database_names_not_called(
aggregator.assert_service_check('mongodb.can_connect', MongoDb.OK)
mock_command.assert_has_calls(
[
mock.call('serverStatus'),
mock.call('getCmdLineOpts'),
mock.call('isMaster'),
mock.call('replSetGetStatus'),
mock.call('serverStatus', comment=DD_SERVICE),
mock.call('getCmdLineOpts', comment=DD_SERVICE),
mock.call('isMaster', comment=DD_SERVICE),
mock.call('replSetGetStatus', comment=DD_SERVICE),
]
)
mock_server_info.assert_called_once()
Expand Down Expand Up @@ -499,8 +505,12 @@ def test_collector_submit_payload(check, aggregator):

def test_api_alibaba_mongos(check, aggregator):
payload = {'isMaster': {'msg': 'isdbgrid'}}

def mocked_command(command, *args, **kwargs):
return payload[command]

mocked_client = mock.MagicMock()
mocked_client.__getitem__ = mock.MagicMock(return_value=mock.MagicMock(command=payload.__getitem__))
mocked_client.__getitem__ = mock.MagicMock(return_value=mock.MagicMock(command=mocked_command))
mocked_client.get_cmdline_opts.side_effect = OperationFailure('getCmdLineOpts is not supported')

with mock.patch('datadog_checks.mongo.api.MongoClient', mock.MagicMock(return_value=mocked_client)):
Expand All @@ -517,8 +527,12 @@ def test_api_alibaba_mongod_shard(check, aggregator):
'replSetGetStatus': {'myState': 1, 'set': 'foo', 'configsvr': False},
'shardingState': {'enabled': True},
}

def mocked_command(command, *args, **kwargs):
return payload[command]

mocked_client = mock.MagicMock()
mocked_client.__getitem__ = mock.MagicMock(return_value=mock.MagicMock(command=payload.__getitem__))
mocked_client.__getitem__ = mock.MagicMock(return_value=mock.MagicMock(command=mocked_command))
mocked_client.get_cmdline_opts.side_effect = OperationFailure('getCmdLineOpts is not supported')

with mock.patch('datadog_checks.mongo.api.MongoClient', mock.MagicMock(return_value=mocked_client)):
Expand All @@ -539,8 +553,12 @@ def test_api_alibaba_mongod_shard(check, aggregator):

def test_api_alibaba_configsvr(check, aggregator):
payload = {'isMaster': {}, 'replSetGetStatus': {'myState': 2, 'set': 'config', 'configsvr': True}}

def mocked_command(command, *args, **kwargs):
return payload[command]

mocked_client = mock.MagicMock()
mocked_client.__getitem__ = mock.MagicMock(return_value=mock.MagicMock(command=payload.__getitem__))
mocked_client.__getitem__ = mock.MagicMock(return_value=mock.MagicMock(command=mocked_command))
mocked_client.get_cmdline_opts.side_effect = OperationFailure('getCmdLineOpts is not supported')

with mock.patch('datadog_checks.mongo.api.MongoClient', mock.MagicMock(return_value=mocked_client)):
Expand All @@ -565,8 +583,12 @@ def test_api_alibaba_mongod(check, aggregator):
'replSetGetStatus': {'myState': 1, 'set': 'foo', 'configsvr': False},
'shardingState': {'enabled': False},
}

def mocked_command(command, *args, **kwargs):
return payload[command]

mocked_client = mock.MagicMock()
mocked_client.__getitem__ = mock.MagicMock(return_value=mock.MagicMock(command=payload.__getitem__))
mocked_client.__getitem__ = mock.MagicMock(return_value=mock.MagicMock(command=mocked_command))

with mock.patch('datadog_checks.mongo.api.MongoClient', mock.MagicMock(return_value=mocked_client)):
check = check(common.INSTANCE_BASIC)
Expand Down Expand Up @@ -694,10 +716,10 @@ def test_emits_ok_service_check_for_documentdb_deployment(
aggregator.assert_service_check('mongodb.can_connect', MongoDb.OK)
mock_command.assert_has_calls(
[
mock.call('serverStatus'),
mock.call('getCmdLineOpts'),
mock.call('isMaster'),
mock.call('replSetGetStatus'),
mock.call('serverStatus', comment=DD_SERVICE),
mock.call('getCmdLineOpts', comment=DD_SERVICE),
mock.call('isMaster', comment=DD_SERVICE),
mock.call('replSetGetStatus', comment=DD_SERVICE),
]
)
mock_server_info.assert_called_once()
Expand Down Expand Up @@ -726,8 +748,8 @@ def test_emits_ok_service_check_for_mongodb_atlas_deployment(
aggregator.assert_service_check('mongodb.can_connect', MongoDb.OK)
mock_command.assert_has_calls(
[
mock.call('serverStatus'),
mock.call('getCmdLineOpts'),
mock.call('serverStatus', comment=DD_SERVICE),
mock.call('getCmdLineOpts', comment=DD_SERVICE),
]
)
mock_server_info.assert_called_once()
Expand Down

0 comments on commit e52c48a

Please sign in to comment.