-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmonitor.py
More file actions
219 lines (178 loc) · 7.4 KB
/
monitor.py
File metadata and controls
219 lines (178 loc) · 7.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
"""Background service health monitoring."""
import asyncio
import socket
import subprocess
import time
from datetime import datetime, timezone
from typing import Callable, Optional
import httpx
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from database import async_session, Service
from config import CHECK_INTERVAL, CHECK_TIMEOUT, CONSECUTIVE_FAILURES_THRESHOLD
class HealthMonitor:
    """Background health checker for services.

    While running, the monitor repeatedly loads every ``Service`` row, runs
    the check selected by ``service.check_type`` ("http", "tcp" or
    "command") against ``service.target``, writes the outcome back onto the
    row and commits. Whenever a check changes ``service.status`` the
    registered event callback (if any) is awaited with a ``service_status``
    event whose ``data`` is ``service.to_dict()``.
    """

    def __init__(self):
        """Create an idle monitor; nothing runs until ``start()`` is awaited."""
        self._running = False
        self._task: Optional[asyncio.Task] = None
        self._event_callback: Optional[Callable] = None
        self._http_client: Optional[httpx.AsyncClient] = None

    def set_event_callback(self, callback: Callable):
        """Set callback for broadcasting status changes.

        The callback is awaited with a single dict argument, so it must
        return an awaitable (e.g. be an ``async def`` function).
        """
        self._event_callback = callback

    def _get_http_client(self) -> httpx.AsyncClient:
        """Return the shared HTTP client, creating it on first use.

        Creating the client lazily lets ``check_single_service`` work even
        when the background loop has not been started (or was stopped).
        """
        if self._http_client is None:
            # NOTE(review): verify=False turns off TLS certificate
            # verification for every HTTP check. Kept from the original,
            # presumably for self-signed internal endpoints - confirm.
            self._http_client = httpx.AsyncClient(
                timeout=CHECK_TIMEOUT, verify=False
            )
        return self._http_client

    @staticmethod
    def _mark_up(service: Service):
        """Record a successful check on *service*."""
        service.status = "up"
        service.consecutive_failures = 0
        service.last_error = None

    async def start(self):
        """Start the background monitoring loop (no-op if already running)."""
        if self._running:
            return
        # Build the client first so a construction error cannot leave the
        # monitor flagged as running without a task.
        self._get_http_client()
        self._running = True
        self._task = asyncio.create_task(self._monitor_loop())
        print(f"Health monitor started (interval: {CHECK_INTERVAL}s)")

    async def stop(self):
        """Stop the background monitoring loop and release the HTTP client."""
        self._running = False

        # Detach before awaiting so the instance never holds a finished
        # task or a closed client.
        task, self._task = self._task, None
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass  # expected: we cancelled the task ourselves

        client, self._http_client = self._http_client, None
        if client is not None:
            await client.aclose()
        print("Health monitor stopped")

    async def _monitor_loop(self):
        """Main monitoring loop: check everything, sleep, repeat."""
        while self._running:
            try:
                await self._check_all_services()
            except Exception as e:
                # Top-level boundary: one bad pass must not kill the loop.
                print(f"Monitor loop error: {e}")
            await asyncio.sleep(CHECK_INTERVAL)

    async def _check_all_services(self):
        """Check all services in the database concurrently, then commit."""
        async with async_session() as session:
            result = await session.execute(select(Service))
            services = result.scalars().all()

            # The checks only mutate the already-loaded Service objects;
            # _check_service itself never issues queries on the session.
            outcomes = await asyncio.gather(
                *(self._check_service(session, service) for service in services),
                return_exceptions=True,
            )
            # Check failures are recorded on the service inside
            # _check_service; anything surfacing here (e.g. a failing event
            # callback) would otherwise vanish silently.
            for service, outcome in zip(services, outcomes):
                if isinstance(outcome, BaseException):
                    print(
                        f"Health check for service {service.id} raised: {outcome!r}"
                    )

            await session.commit()

    async def _check_service(self, session: AsyncSession, service: Service):
        """Check a single service and update its status.

        Runs the check matching ``service.check_type``. A failing check
        increments ``consecutive_failures`` and stores the error text; the
        status only becomes "down" once the count reaches
        ``CONSECUTIVE_FAILURES_THRESHOLD``. ``response_time_ms`` and
        ``last_checked`` are always updated. If the status changed, the event
        callback is awaited; this is the only place the event is emitted.

        ``session`` is accepted for the callers' convenience and is not used.
        """
        checks = {
            "http": self._check_http,
            "tcp": self._check_tcp,
            "command": self._check_command,
        }
        old_status = service.status
        # Monotonic clock: elapsed time must not jump with wall-clock changes.
        started = time.monotonic()

        check = checks.get(service.check_type)
        if check is None:
            service.status = "unknown"
            service.last_error = f"Unknown check type: {service.check_type}"
        else:
            try:
                await check(service)
            except Exception as e:
                # Tolerate a row whose counter was never initialised.
                service.consecutive_failures = (service.consecutive_failures or 0) + 1
                service.last_error = str(e)
                # Flapping prevention: only mark as down after threshold
                if service.consecutive_failures >= CONSECUTIVE_FAILURES_THRESHOLD:
                    service.status = "down"

        service.response_time_ms = int((time.monotonic() - started) * 1000)
        service.last_checked = datetime.now(timezone.utc)

        # If status changed, broadcast event
        if old_status != service.status and self._event_callback:
            await self._event_callback({
                "event_type": "service_status",
                "data": service.to_dict()
            })

    async def _check_http(self, service: Service):
        """Perform HTTP health check.

        A GET to ``service.target`` answering with a status in [200, 400)
        counts as up.

        Raises:
            Exception: on a transport error or any other status code.
        """
        try:
            response = await self._get_http_client().get(service.target)
        except httpx.RequestError as e:
            raise Exception(f"Connection failed: {type(e).__name__}") from e

        service.status_code = response.status_code
        if not 200 <= response.status_code < 400:
            raise Exception(f"HTTP {response.status_code}")
        self._mark_up(service)

    async def _check_tcp(self, service: Service):
        """Perform TCP connection check.

        ``service.target`` must be ``host:port``; a bracketed IPv6 literal
        such as ``[::1]:5432`` is accepted. The service is up if a connection
        can be opened within ``CHECK_TIMEOUT`` seconds.

        Raises:
            ValueError: if the target is not ``host:port``.
            Exception: if the connection times out or fails.
        """
        # Parse host:port (split on the last colon so IPv6 hosts survive).
        host, sep, port_text = service.target.rpartition(":")
        if not sep:
            raise ValueError("TCP target must be host:port")
        try:
            port = int(port_text)
        except ValueError:
            raise ValueError("TCP target must be host:port") from None
        if host.startswith("[") and host.endswith("]"):
            host = host[1:-1]

        # Native asyncio connect: no worker thread, cancellable on timeout,
        # and it resolves both IPv4 and IPv6 addresses.
        try:
            _reader, writer = await asyncio.wait_for(
                asyncio.open_connection(host, port),
                timeout=CHECK_TIMEOUT
            )
        except asyncio.TimeoutError:
            raise Exception("Connection timed out") from None
        except OSError as e:
            raise Exception(f"Connection refused: {e}") from e

        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            pass  # the connect succeeded; a reset while closing is irrelevant

        self._mark_up(service)
        service.status_code = None

    async def _check_command(self, service: Service):
        """Perform command execution check.

        Runs ``service.target`` through the shell in a worker thread; exit
        code 0 counts as up. The exit code is stored in ``status_code``.

        Raises:
            Exception: if the command times out or exits non-zero.
        """
        # Read the attribute here, on the event-loop thread, instead of
        # touching the ORM object from the worker thread.
        command = service.target

        def run_command():
            # NOTE(review): shell=True executes the stored target verbatim
            # in a shell. Only safe if targets are written by trusted
            # operators - confirm how Service rows are created.
            completed = subprocess.run(
                command,
                shell=True,
                capture_output=True,
                timeout=CHECK_TIMEOUT
            )
            return completed.returncode

        loop = asyncio.get_running_loop()
        try:
            # One extra second lets subprocess.run hit its own timeout (and
            # kill the child) before the outer wait gives up.
            returncode = await asyncio.wait_for(
                loop.run_in_executor(None, run_command),
                timeout=CHECK_TIMEOUT + 1
            )
        except (asyncio.TimeoutError, subprocess.TimeoutExpired):
            raise Exception("Command timed out") from None

        service.status_code = returncode
        if returncode != 0:
            raise Exception(f"Command exited with code {returncode}")
        self._mark_up(service)

    async def check_single_service(self, service_id: int) -> dict:
        """Manually trigger a health check for a single service.

        Returns:
            ``service.to_dict()`` after the check has been committed, or
            ``{"error": "Service not found"}`` if no row has that id.
        """
        async with async_session() as session:
            result = await session.execute(
                select(Service).where(Service.id == service_id)
            )
            service = result.scalar_one_or_none()
            if service is None:
                return {"error": "Service not found"}

            # _check_service already emits the status-change event, so it
            # must not be broadcast a second time here.
            await self._check_service(session, service)
            await session.commit()
            return service.to_dict()
# Global monitor instance, constructed when this module is imported.
# Construction only initialises attributes; monitoring begins once
# `await monitor.start()` is called.
monitor = HealthMonitor()