-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathspeedtest_wrapper.py
executable file
·122 lines (100 loc) · 4.08 KB
/
speedtest_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/env python3
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
# coding=utf8
import argparse
import json
import logging
import sys
import time
from socket import getfqdn
from subprocess import CompletedProcess, PIPE, run
from typing import Dict, Generator, Union
from prometheus_client.core import GaugeMetricFamily, REGISTRY
from prometheus_client import start_http_server
# Default TCP port for the metrics web server; overridable with -p/--port.
DEFAULT_PORT = 6970
# Fully qualified name of this machine, looked up once at import time and
# used as the "hostname" label value on every exported metric.
HOSTNAME = getfqdn()
# Module-level logger; format and level are set up through
# logging.basicConfig() in _handle_debug().
LOG = logging.getLogger(__name__)
class SpeedtestCollector:
    """Collector that runs the ``speedtest`` CLI and exposes its results as gauges.

    Every call to :meth:`collect` executes one speedtest run and turns each
    numeric value of the JSON result into a gauge named
    ``speedtest_<category>[_<subcategory>...]``. Every gauge carries two
    labels: this machine's hostname and the host of the speedtest server.
    """

    # Top-level JSON sections whose nested values are never exported.
    IGNORE_CATEGORIES = ("interface", "server", "result")
    # Prefix shared by every metric name.
    key_prefix = "speedtest"
    # Label names; _handle_counter supplies the values in the same order.
    labels = ["hostname", "speedtest_host"]

    def __init__(self, debug: bool) -> None:
        """Create a collector.

        Args:
            debug: When true, the raw speedtest result is printed to stdout
                on every collection.
        """
        self.debug = debug
        # Overwritten on every successful collect(); initialised here so
        # _handle_counter never fails on a missing attribute.
        self.speedtest_host = "unknown"

    def _handle_counter(self, category: str, value: float) -> GaugeMetricFamily:
        """Build a single-sample gauge for *category* holding *value*.

        Spaces in *category* are replaced with underscores before the name
        is prefixed with ``key_prefix``.
        """
        normalized_category = category.replace(" ", "_")
        key = f"{self.key_prefix}_{normalized_category}"
        g = GaugeMetricFamily(key, "Speedtest Metric", labels=self.labels)
        g.add_metric([HOSTNAME, self.speedtest_host], value)
        return g

    def _nested_metrics(
        self, prefix: str, values: Dict
    ) -> Generator[GaugeMetricFamily, None, None]:
        """Yield one gauge per numeric leaf of *values*, recursing into dicts.

        Metric names are built by joining *prefix* and the nested keys with
        underscores. Leaves that cannot be converted to ``float`` are skipped
        instead of aborting the whole collection.
        """
        for name, value in values.items():
            category = f"{prefix}_{name}"
            if isinstance(value, dict):
                yield from self._nested_metrics(category, value)
                continue
            try:
                number = float(value)
            except (TypeError, ValueError):
                LOG.debug("Skipping non-numeric value for %s: %r", category, value)
                continue
            yield self._handle_counter(category, number)

    def collect(self) -> Generator[GaugeMetricFamily, None, None]:
        """Run a speedtest and yield one gauge per numeric result value.

        Nothing is yielded (and an error is logged) when the speedtest
        command fails or produces no usable data. The "finished" log line is
        only emitted once the caller has consumed the whole generator.
        """
        start_time = time.time()
        LOG.info("Collection started")

        speedtest_data = self.run_speedtest()
        if isinstance(speedtest_data, CompletedProcess):
            # errors="replace" so odd bytes on stderr cannot break the log call.
            stderr_text = speedtest_data.stderr.decode("utf8", errors="replace")
            LOG.error(
                "Speedtest failed: %s (returned %s)",
                stderr_text,
                speedtest_data.returncode,
            )
            return
        if not speedtest_data:
            LOG.error("Got no speedtest data")
            return

        if self.debug:
            print(speedtest_data, flush=True)

        # Tolerate a missing or malformed "server" section instead of raising.
        server = speedtest_data.get("server")
        host = server.get("host") if isinstance(server, dict) else None
        self.speedtest_host = str(host) if host else "unknown"

        for category, value in speedtest_data.items():
            if isinstance(value, (float, int)):
                yield self._handle_counter(category, float(value))
            elif category not in self.IGNORE_CATEGORIES and isinstance(value, dict):
                yield from self._nested_metrics(category, value)

        run_time = time.time() - start_time
        LOG.info("Collection finished in %ss", run_time)

    def run_speedtest(self) -> Union[Dict, CompletedProcess]:
        """Execute the ``speedtest`` CLI and return its parsed JSON output.

        Returns:
            The decoded JSON object on success, the ``CompletedProcess`` when
            the command exits with a non-zero status, or an empty dict when
            the output is not a JSON object.
        """
        cmd = ["speedtest", "--accept-license", "-f", "json", "-u", "bps"]
        cp = run(cmd, stderr=PIPE, stdout=PIPE)
        if cp.returncode:
            return cp
        try:
            data = json.loads(cp.stdout)
        except ValueError as err:
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError; treat either as "no data" rather than crashing.
            LOG.error("Could not parse speedtest output as JSON: %s", err)
            return {}
        if not isinstance(data, dict):
            LOG.error("Unexpected speedtest output (not a JSON object): %r", data)
            return {}
        return data
def _handle_debug(debug: bool) -> None:
"""Turn on debugging if asked otherwise INFO default"""
log_level = logging.DEBUG if debug else logging.INFO
logging.basicConfig(
format="[%(asctime)s] %(levelname)s: %(message)s (%(filename)s:%(lineno)d)",
level=log_level,
)
def main() -> int:
    """Parse the command line, start the metrics server and wait for Ctrl-C.

    Returns:
        0 once a keyboard interrupt has ended the wait loop.
    """
    cli = argparse.ArgumentParser(description="Speedtest Wrapper")
    cli.add_argument(
        "-d",
        "--debug",
        help="Verbose debug output",
        action="store_true",
    )
    cli.add_argument(
        "-p",
        "--port",
        help=f"Port to run webserver on [Default = {DEFAULT_PORT}]",
        default=DEFAULT_PORT,
        type=int,
    )
    options = cli.parse_args()
    debug, port = options.debug, options.port

    _handle_debug(debug)
    LOG.info(f"Starting {sys.argv[0]}")

    start_http_server(port)
    REGISTRY.register(SpeedtestCollector(debug))
    LOG.info(f"Speedtest Prometheus Exporter - listening on {port}")

    # start_http_server() has already returned, so idle here to keep the
    # process alive until the user interrupts it.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        LOG.info("Shutting down ...")
    return 0
# Script entry point: propagate main()'s return value as the exit status.
if __name__ == "__main__":  # pragma: no cover
    raise SystemExit(main())