-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy path__init__.py
90 lines (70 loc) · 3.18 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
"""Client to retrieve data from a Netdata instance."""
import asyncio
import logging
import socket
import aiohttp
import async_timeout
from yarl import URL
from . import exceptions
# Version number interpolated into the default "/api/v{API_VERSION}/" base path.
API_VERSION = 1

# Module-level logger shared by everything in this package module.
_LOGGER = logging.getLogger(__name__)

# Query strings appended to the client's base URL, one per supported endpoint.
# "{resource}" in the data template is substituted with the requested chart.
_DATA_ENDPOINT = "data?chart={resource}&before=0&after=-1&options=seconds"
_ALARMS_ENDPOINT = "alarms?all&format=json"
_ALL_METRIC_ENDPOINT = (
    "allmetrics?format=json&help=no&types=no"
    "&timestamps=yes&names=yes&data=average"
)
class Netdata(object):
    """A class for handling connections with a Netdata instance.

    Each ``get_*`` coroutine issues one GET request and stores the decoded
    JSON payload on the instance instead of returning it:

    * ``get_data``       -> ``values``
    * ``get_alarms``     -> ``alarms``
    * ``get_allmetrics`` -> ``metrics``

    All of them raise ``exceptions.NetdataConnectionError`` when the request
    times out or fails with a client/name-resolution error.
    """

    # Seconds allowed for one request, including reading the response body.
    _REQUEST_TIMEOUT = 5

    def __init__(self, host, loop, session, scheme="http", port=19999, path=None):
        """Initialize the connection to the Netdata instance.

        Args:
            host: Host name or address of the Netdata instance.
            loop: Event loop. Only stored (as ``_loop``) so existing callers
                keep working; it is not passed to ``async_timeout``.
            session: Object whose ``get(url)`` is awaited to perform requests
                (presumably an ``aiohttp.ClientSession`` - confirm with callers).
            scheme: URL scheme used to build the base URL.
            port: Port of the Netdata instance.
            path: Optional base path replacing the default
                ``/api/v{API_VERSION}/``. Endpoint strings are appended
                directly to it.
        """
        self._loop = loop
        self._session = session
        self.host = host
        self.port = port
        self.values = self.alarms = self.metrics = None
        # Template of the most recently requested endpoint; None until the
        # first request is made.
        self.endpoint = None

        if path is None:
            path = f"/api/v{API_VERSION}/"
        # URL.build() only accepts keyword arguments, so the scheme must be
        # passed by name.
        self.base_url = URL.build(scheme=scheme, host=host, port=port, path=path)

    async def _request(self, query):
        """Fetch ``base_url + query`` and return the decoded JSON body.

        Raises:
            exceptions.NetdataConnectionError: On timeout, aiohttp client
                error, or host name resolution failure.
        """
        url = "{}{}".format(self.base_url, query)
        try:
            # The timeout spans the request and the body read so a stalled
            # response cannot block the caller indefinitely.
            async with async_timeout.timeout(self._REQUEST_TIMEOUT):
                response = await self._session.get(url)
                _LOGGER.debug("Response from Netdata: %s", response.status)
                data = await response.json()
        except (asyncio.TimeoutError, aiohttp.ClientError, socket.gaierror) as err:
            _LOGGER.error("Can not load data from Netdata")
            raise exceptions.NetdataConnectionError() from err

        _LOGGER.debug(data)
        return data

    async def get_data(self, resource):
        """Get detail for a resource from the data endpoint.

        Stores a mapping of the response's ``labels`` to the first row of its
        ``data`` in ``self.values``.

        Args:
            resource: Chart identifier substituted into the data query.

        Raises:
            exceptions.NetdataConnectionError: If the request fails.
        """
        self.endpoint = _DATA_ENDPOINT
        data = await self._request(self.endpoint.format(resource=resource))
        self.values = dict(zip(data["labels"], data["data"][0]))

    async def get_alarms(self):
        """Get alarms for a Netdata instance and store them in ``self.alarms``.

        Raises:
            exceptions.NetdataConnectionError: If the request fails.
        """
        self.endpoint = _ALARMS_ENDPOINT
        self.alarms = await self._request(self.endpoint)

    async def get_allmetrics(self):
        """Get all available metrics and store them in ``self.metrics``.

        Raises:
            exceptions.NetdataConnectionError: If the request fails.
        """
        self.endpoint = _ALL_METRIC_ENDPOINT
        self.metrics = await self._request(self.endpoint)