-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathup-down.py
executable file
·350 lines (270 loc) · 12.2 KB
/
up-down.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
#!/usr/bin/env python3
from argparse import ArgumentParser, ArgumentTypeError
from datetime import datetime
from os import fork, getpid, kill, makedirs, path, unlink
from signal import SIGINT, SIGTERM, signal
import sqlite3
from time import sleep

from plyer import notification
import requests
from slugify import slugify

from decorators import catching, pipe
from helpers import Call, Chain, Conditional, setInterval
# Paths are anchored at this script's directory so the daemon behaves the
# same regardless of the current working directory.
LOCKFILE = path.join(path.abspath(path.dirname(__file__)), "lock")  # holds the daemon's PID
MEMORY = path.join(path.abspath(path.dirname(__file__)), "memory.db")  # sqlite host database
LOGS_DIR = path.join(path.abspath(path.dirname(__file__)), "logs")  # per-host event log files
# Shared mutable module state, kept as parallel lists (same index = same host):
# RUNTIME_TASKS holds dicts with keys "host", "isdown", "interval";
# CRONTAB holds the matching setInterval timer objects.
RUNTIME_TASKS = []
CRONTAB = []
@catching(OSError, "Disk write failure.")
def log_write(host, event):
    """Append a timestamped *event* line to the per-host log file.

    The log lives under LOGS_DIR and is named after the slugified host.
    Disk failures are reported by the @catching decorator.
    """
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    target = path.join(LOGS_DIR, slugify(host) + ".txt")
    with open(target, "a+") as handle:
        handle.write(f"[{stamp}] {event}\n")
def DaemonControl(args: dict) -> bool:
    """Handle the daemon lifecycle flags --detach and --kill.

    --detach forks the process into the background and writes the child's
    PID to LOCKFILE; --kill sends SIGTERM to the PID read from LOCKFILE
    and deletes the lockfile. Returns True when the caller should exit
    afterwards (i.e. --kill was passed).

    NOTE(review): Conditional/Chain are local helpers; the flows below are
    assumed to defer their callables until executed by the final Chain —
    confirm against helpers.py.
    """
    @catching(OSError, "Failed to switch to daemon mode. Free up your RAM and retry.")
    def switch_to_daemon_mode():
        # Single fork: the parent exits, the child keeps running detached.
        if fork():
            exit(0)
    @catching(OSError, "Disk write failure.")
    def create_lock_file():
        # Persist the daemon's PID so a later --kill can target it.
        with open(LOCKFILE, "w") as lock:
            lock.write(str(getpid()))
    # Passing both --detach and --kill makes no sense; bail out early.
    mutual_exclude_flow = Conditional(lambda fst, snd: fst and snd, bool(args["detach"]), bool(args["kill"]))\
        .then(
            Chain(print, "Mutually exclusive flags.")\
                .then(exit, 0)\
                .execute
        )\
        .end()
    # --detach: refuse when a lockfile already exists (daemon running),
    # otherwise fork into the background and record the PID.
    detach_flow = Conditional(bool, args["detach"])\
        .then(
            Conditional(path.exists, LOCKFILE)\
                .then(
                    Chain(print, "Daemon is already running.")\
                        .then(exit, 0)\
                        .execute
                )\
                .otherwise(
                    Chain(print, "Switching to daemon mode...")\
                        .then(switch_to_daemon_mode)\
                        .then(create_lock_file)\
                        .execute
                )
        )\
        .end()
    # Read the stored PID and SIGTERM it; a vanished process is ignored
    # (ignoring=True), a non-numeric lockfile is reported.
    @catching(ValueError, "Lockfile is malformed.")
    @catching(ProcessLookupError, "No process found.", ignoring=True)
    def terminate_daemon():
        with open(LOCKFILE, "r") as lock:
            kill(int(lock.read()), SIGTERM)
    # --kill: terminate the daemon, then delete the lockfile if still present.
    termination_flow = Conditional(bool, args["kill"])\
        .then(
            Conditional(path.exists, LOCKFILE)\
                .then(
                    Chain(terminate_daemon)\
                        .then(print, "Daemon terminated successfully.")\
                        .then(
                            Conditional(path.exists, LOCKFILE)\
                                .then(catching(OSError, "Unable to delete lockfile.")(unlink), LOCKFILE)\
                                .end()
                        )\
                        .execute
                )\
                .otherwise(print, "Lockfile wasn't found")
        )\
        .end()
    # Run the three flows in order.
    Chain(mutual_exclude_flow).then(detach_flow).then(termination_flow).execute()
    # --kill is a one-shot action: signal the caller to exit.
    return bool(args["kill"]) or False
def DatabaseControl(args: dict) -> bool:
    """Execute the one-shot database flags against the sqlite memory DB.

    Ensures the `memory` table exists, then applies --clear-hosts,
    --remove-host, --add-host and --list-hosts in that order. Returns
    True when any of those flags was given, signalling the caller to exit.
    """
    connection = sqlite3.connect(MEMORY)
    cursor = connection.cursor()
    # Create the schema on first run: one row per host, a 0/1 down flag,
    # and a polling interval in seconds (default 20, must be >= 1).
    create_flow = Chain(
        cursor.execute,
        '''CREATE TABLE IF NOT EXISTS memory (
            host TEXT NOT NULL UNIQUE,
            isdown INTEGER DEFAULT 0 CHECK (isdown == 0 OR isdown == 1),
            interval INTEGER DEFAULT 20 CHECK (interval >= 1)
        );'''
    )\
        .then(connection.commit)\
        .execute
    def clear_flow():
        """--clear-hosts: wipe every row, logging REMOVE per host."""
        if not args["clear_hosts"]:
            return
        sel = cursor.execute('''SELECT host FROM memory;''')
        hosts = list(map(lambda row: row[0], sel))
        cursor.execute('''DELETE FROM memory;''')
        connection.commit()
        for host in hosts:
            log_write(host, "REMOVE")
        print("Memory has been cleared.")
    # --remove-host (skipped when --clear-hosts already wipes everything).
    # NOTE(review): logs/prints success even when the host wasn't in the
    # table — rowcount is never checked.
    remove_flow = Conditional(lambda fst, snd: fst and snd, bool(args["remove_host"]), not bool(args["clear_hosts"]))\
        .then(
            Chain(cursor.execute, '''DELETE FROM memory WHERE host = ?;''', (args["remove_host"],))\
                .then(connection.commit)\
                .then(log_write, args["remove_host"], "REMOVE")\
                .then(print, f"Deleted {args['remove_host']} from memory.")\
                .execute
        )\
        .end()
    def add_flow():
        """--add-host: insert a host, upserting the interval when given."""
        if not args["add_host"]:
            return
        # Add host with interval to memory. If host exists, update interval.
        if args["interval"]:
            cursor.execute(
                '''INSERT INTO memory (host, interval) VALUES (?, ?) ON CONFLICT(host) DO UPDATE SET interval=excluded.interval;''',
                (args["add_host"], args["interval"])
            )
        # Add host with default interval. If host exists, ignore.
        else:
            cursor.execute(
                '''INSERT OR IGNORE INTO memory (host) VALUES (?);''',
                (args["add_host"],)
            )
        connection.commit()
        log_write(args["add_host"], f"ADD [INTERVAL={args['interval'] or 20}]")
        print(f"Added {args['add_host']} with interval {args['interval'] or 20} to memory.")
    # --list-hosts: print "host — every N seconds", sorted case-insensitively.
    list_flow = Conditional(bool, args["list_hosts"])\
        .then(
            pipe(
                # https://michaelsoolee.com/case-insensitive-sorting-sqlite/
                lambda cur: cur.execute('''SELECT host, interval FROM memory ORDER BY host COLLATE NOCASE ASC;'''),
                lambda selection: map(lambda row: f"{row[0]} — every {row[1]} seconds", selection),
                "\n".join,
                print
            ),
            cursor
        )\
        .end()
    Chain(create_flow).then(clear_flow).then(remove_flow).then(add_flow).then(list_flow).then(connection.close).execute()
    # Any of these flags is a one-shot action: signal the caller to exit.
    return bool(args["clear_hosts"]) or bool(args["remove_host"]) or bool(args["add_host"]) or bool(args["list_hosts"]) or False
def main(args: dict) -> None:
    """Entry point: handle one-shot flags, then run the monitoring loop.

    After DaemonControl/DatabaseControl have processed their flags (and
    possibly exited), a sync cronjob runs every 5 seconds: it flushes
    runtime state to the database and (re)spawns one polling cronjob per
    host. Each poll GETs the host and raises a desktop notification when
    the host transitions between up and down.
    """
    if not path.exists(LOGS_DIR):
        catching(OSError, "Unable to create directory.")(makedirs)(LOGS_DIR)
    should_exit = DaemonControl(args) or DatabaseControl(args)
    if should_exit:
        exit(0)
    # check_same_thread=False: this connection is also used from the
    # timer threads created by setInterval below.
    connection = sqlite3.connect(MEMORY, check_same_thread=False)
    cursor = connection.cursor()
    def job(host):
        """Poll one host once; log and notify on an up/down transition."""
        # Find this host's entry (and its index) in RUNTIME_TASKS.
        # NOTE(review): next() raises StopIteration when the host is
        # absent — presumably tasks and cronjobs stay in sync so this
        # can't happen; confirm against update_task_list/setInterval.
        idx, task = next(
            filter(
                lambda tup: tup[1]["host"] == host,
                enumerate(RUNTIME_TASKS)
            )
        )
        try:
            # Throws an exception if response status is 4xx or 5xx or there was a connection error.
            requests.get(host).raise_for_status()
            if task["isdown"] == 1:
                # down -> up transition.
                RUNTIME_TASKS.__setitem__(idx, {**task, "isdown": 0})
                log_write(host, "HOST_UP")
                notification.notify(title="UpDown :)", message=f"{host} is back up again!", timeout=5)
        except requests.exceptions.RequestException:
            if task["isdown"] == 0:
                # up -> down transition.
                RUNTIME_TASKS.__setitem__(idx, {**task, "isdown": 1})
                log_write(host, "HOST_DOWN")
                notification.notify(title="UpDown :(", message=f"{host} is down!", timeout=5)
    def update_db():
        """Write the in-memory isdown flags back to the database."""
        # RUNTIME_TASKS is empty on startup. Therefore, there is nothing to sync.
        if not RUNTIME_TASKS:
            return
        for task_dict in RUNTIME_TASKS:
            cursor.execute('''UPDATE memory SET isdown = ? WHERE host = ?;''', (task_dict["isdown"], task_dict["host"]))
        connection.commit()
    def update_task_list():
        """Reconcile RUNTIME_TASKS/CRONTAB with the hosts in the database."""
        def spawn_cronjob(row_dict):
            # Schedule job(host) every `interval` seconds; fire once now.
            return setInterval(
                Call(job, row_dict["host"]),
                row_dict["interval"],
                immediately=True
            )
        # 1. SELECT returns List[Tuple].
        # 2. zip(["host", "isdown", "interval"], ("https://example.com", 0, 20))
        #    -> [('host', 'https://example.com'), ('isdown', 0), ('interval', 20)]
        # 3. dict() -> {'host': 'https://example.com', 'isdown': 0, 'interval': 20}
        # 4. map returns an iterator. Piping to list to create a reusable list.
        row_dicts = pipe(list)(
            map(
                lambda row: dict(zip(["host", "isdown", "interval"], row)),
                cursor.execute('''SELECT host, isdown, interval FROM memory;''')
            )
        )
        for row_dict in row_dicts:
            # Task search
            task = None
            idx = None
            for task_idx, task_dict in enumerate(RUNTIME_TASKS):
                if task_dict["host"] == row_dict["host"]:
                    task = task_dict
                    idx = task_idx
            # If task wasn't found, add it
            if not task:
                RUNTIME_TASKS.append(row_dict)
                CRONTAB.append(spawn_cronjob(row_dict))
                log_write(row_dict["host"], "START")
            # Otherwise, it's there
            else:
                # If task interval doesn't match the one in memory...
                if task["interval"] != row_dict["interval"]:
                    # ...this task may've been cancelled.
                    # If it is, notify that it's starting again
                    if task["interval"] == -1:
                        log_write(row_dict["host"], "START")
                    # ...sync it
                    CRONTAB[idx].cancel()
                    RUNTIME_TASKS.__setitem__(idx, {**task, "interval": row_dict["interval"]})
                    CRONTAB.__setitem__(idx, spawn_cronjob(row_dict))
                    log_write(row_dict["host"], f"APPLY [INTERVAL={row_dict['interval']}]")
        # Create a list with only hosts
        row_hosts = pipe(list)(
            map(lambda sel: sel["host"], row_dicts)
        )
        for task_idx, task_dict in enumerate(RUNTIME_TASKS):
            # Check if any of tasks in RUNTIME_TASKS is absent in memory.
            # If the one has been found and wasn't already cancelled, cancel it.
            # (interval == -1 marks a task whose cronjob was cancelled.)
            if task_dict["host"] not in row_hosts and task_dict["interval"] != -1:
                CRONTAB[task_idx].cancel()
                RUNTIME_TASKS.__setitem__(task_idx, {**task_dict, "interval": -1})
                log_write(task_dict["host"], "STOP")
    def log_exit_event():
        """Log a STOP event for every host still stored in the database."""
        sel = cursor.execute('''SELECT host FROM memory;''')
        hosts = map(lambda row: row[0], sel)
        for host in hosts:
            log_write(host, "STOP")
    # Re-sync with the database every 5 seconds (and once immediately).
    sync_cronjob = setInterval(
        Chain(update_db).then(update_task_list).execute,
        5,
        immediately=True
    )
    # Shutdown sequence: delete lockfile, stop all timers, flush state,
    # log STOP events, clear shared lists, close the DB and exit.
    clean_exit_flow = Chain(catching(OSError, "Unable to delete lockfile.", ignoring=True)(unlink), LOCKFILE)\
        .then(sync_cronjob.cancel)\
        .then(list, map(lambda cronjob: cronjob.cancel(), CRONTAB))\
        .then(update_db)\
        .then(log_exit_event)\
        .then(CRONTAB.clear)\
        .then(RUNTIME_TASKS.clear)\
        .then(connection.close)\
        .then(exit, 0)\
        .execute
    # NOTE(review): signal handlers are invoked as handler(signum, frame);
    # assumes Chain.execute accepts/ignores extra positional arguments —
    # confirm in helpers.py.
    signal(SIGTERM, clean_exit_flow)
    signal(SIGINT, clean_exit_flow)
    # Idle loop: real work happens in the timer threads; SIGINT/SIGTERM
    # trigger clean_exit_flow via the handlers above. (KeyboardInterrupt
    # is not an Exception subclass, so the except guards only unexpected
    # errors during sleep.)
    while True:
        try:
            sleep(5.1)
        except Exception:
            break
    clean_exit_flow()
def parse_interval(value):
    """argparse ``type`` callable for --with-interval: a natural number.

    Accepts any string parsing to an int >= 1 and returns that int.
    Raises ArgumentTypeError otherwise, so argparse prints a proper
    usage/error message; the original implementation called exit()
    inside the type lambda, which bypassed argparse's error reporting.
    """
    try:
        interval = int(value)
    except ValueError as err:
        raise ArgumentTypeError("interval ∉ Nat") from err
    if interval < 1:
        raise ArgumentTypeError("interval ∉ Nat")
    return interval


if __name__ == "__main__":
    parser = ArgumentParser(description="Monitor website state.")
    # Daemon lifecycle flags (mutual exclusion is enforced in DaemonControl).
    parser.add_argument("--detach", dest="detach", action="store_true", help="Start process as a daemon.")
    parser.add_argument("--kill", dest="kill", action="store_true", help="Terminate daemonized process and exit.")
    # One-shot database operations; each makes the program exit afterwards.
    parser.add_argument("--add-host", dest="add_host", type=str, help="Add host to memory and exit.")
    parser.add_argument("--with-interval", dest="interval", type=parse_interval, help="Specify check interval in seconds. Should be used with --add-host.")
    parser.add_argument("--remove-host", dest="remove_host", type=str, help="Remove host from memory and exit.")
    parser.add_argument("--list-hosts", dest="list_hosts", action="store_true", help="List hosts in memory and exit.")
    parser.add_argument("--clear-hosts", dest="clear_hosts", action="store_true", help="Clear memory and exit.")
    # NOTE: store_true flags already default to False, so the original
    # redundant parser.set_defaults(...) call was dropped.
    args = parser.parse_args()
    main(args.__dict__)