Skip to content

Commit 9a91f75

Browse files
authored
Check if MQTT updates being received if not then alert and reboot
1 parent 3046b24 commit 9a91f75

File tree

1 file changed

+303
-0
lines changed

1 file changed

+303
-0
lines changed

sensor_monitor.py

+303
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
#! /usr/bin/env -S python3 -u
2+
#force unbuffered stdout output for piping to another program
3+
#####! /usr/bin/env python3
4+
#
5+
# sensor_monitor.py
6+
# 202012151112
7+
#
8+
# check to see if sensors are publishing to MQTT on a periodic basic, if not then alert
9+
10+
#
11+
PROGRAM_NAME = "sensor_monitor"
12+
VERSION_MAJOR = "1"
13+
VERSION_MINOR = "2"
14+
WORKING_DIRECTORY = "/home/user/sensor_monitor/"
15+
16+
#
17+
#
18+
#
19+
20+
import sys
21+
import cProfile
22+
23+
# check version of python
24+
if not (sys.version_info.major == 3 and sys.version_info.minor >= 7):
25+
print("This script requires Python 3.7 or higher!")
26+
print("You are using Python {}.{}.".format(sys.version_info.major, sys.version_info.minor))
27+
sys.exit(1)
28+
#print("{} {} is using Python {}.{}.".format(PROGRAM_NAME, PROGRAM_VERSION, sys.version_info.major, sys.version_info.minor))
29+
30+
31+
import json
32+
from urllib import request
33+
34+
import traceback
35+
from pathlib import Path
36+
import yaml
37+
import queue
38+
from dateutil.parser import parse
39+
import paho.mqtt.client as mqtt
40+
import time
41+
from datetime import datetime
42+
from timeloop import Timeloop
43+
from datetime import timedelta
44+
from dateutil import tz
45+
import http.client, urllib
46+
import logging
47+
import logging.handlers
48+
import paramiko
49+
50+
# Logging setup
51+
52+
# select logging level
53+
logging_level_file = logging.getLevelName("DEBUG")
54+
#level_file = logging.getLevelName('DEBUG')
55+
logging_level_rsyslog = logging.getLevelName("INFO")
56+
57+
# set local logging
58+
LOG_FILENAME = PROGRAM_NAME + '.log'
59+
60+
root_logger = logging.getLogger()
61+
62+
#set loggers
63+
64+
# file logger
65+
handler_file = logging.handlers.RotatingFileHandler(WORKING_DIRECTORY + LOG_FILENAME, backupCount=5)
66+
handler_file.setFormatter(logging.Formatter(fmt='%(asctime)s %(levelname)-8s ' + PROGRAM_NAME + ' ' + '%(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
67+
handler_file.setLevel(logging_level_file)
68+
69+
root_logger.addHandler(handler_file)
70+
71+
# Roll over on application start
72+
handler_file.doRollover()
73+
74+
# configure highest level combo logger, this is what we log to and it automagically goes to the log receivers that we have configured
75+
# logging.getLogger("timeloop").setLevel(logging.CRITICAL)
76+
my_logger = logging.getLogger(PROGRAM_NAME)
77+
78+
# read yaml config file which lists the air purifer units
79+
try :
80+
raw_yaml = Path(WORKING_DIRECTORY + PROGRAM_NAME + ".yaml").read_text()
81+
except Exception as e:
82+
my_logger.error("Error : configuration file : " + WORKING_DIRECTORY + PROGRAM_NAME + ".yaml" + " not found.")
83+
sys.exit(1)
84+
85+
try :
86+
PROGRAM_CONFIG = yaml.load(Path(WORKING_DIRECTORY + PROGRAM_NAME + ".yaml").read_text(), Loader=yaml.FullLoader)
87+
except Exception as e :
88+
my_logger.error("Error : YAML syntax problem in configuration file : " + WORKING_DIRECTORY + PROGRAM_NAME + ".yaml" + " .")
89+
sys.exit(1)
90+
91+
# read debug from YAML config file
92+
# simple key value pair in YAML file : debug_level: "level" and set debug level
93+
DEBUG_LEVEL = PROGRAM_CONFIG.get("debug_level", "")
94+
if ( DEBUG_LEVEL == "" ) :
95+
DEBUG_LEVEL = "INFO"
96+
97+
logging_level_file = logging.getLevelName(DEBUG_LEVEL)
98+
handler_file.setLevel(logging_level_file)
99+
100+
# get pushover notification information
101+
PUSHOVER_TOKEN = PROGRAM_CONFIG.get("pushover_token", "")
102+
PUSHOVER_USER = PROGRAM_CONFIG.get("pushover_user", "")
103+
PUSHOVER_ALERT = PROGRAM_CONFIG.get("pushover_sound", "")
104+
105+
# read MQTT server info from YAML config file
106+
# simple key value pair in YAML file : mqtt: "<mqtt server info>"
107+
MQTT_SERVER = PROGRAM_CONFIG.get("mqtt", "")
108+
if ( MQTT_SERVER == "" ) :
109+
MQTT_SERVER = "192.168.2.242"
110+
111+
# read MQTT server info from YAML config file
112+
# simple key value pair in YAML file : mqtt: "<mqtt server info>"
113+
MQTT_TOPIC_BASE = PROGRAM_CONFIG.get("mqtt_topic", "")
114+
115+
# read rsyslog info from YAML config file
116+
# simple key value pair in YAML file : rsyslog: "<rsyslog server info>"
117+
# simple string
118+
RSYSLOG_SERVER = PROGRAM_CONFIG.get("rsyslog", "")
119+
LOG_RSYSLOG = (RSYSLOG_SERVER, 514)
120+
121+
# rsyslog handler, if an IP address was specified in the YAML config file that configure to log to a RSYSLOG server
122+
if (RSYSLOG_SERVER != "") :
123+
handler_rsyslog = logging.handlers.SysLogHandler(address = LOG_RSYSLOG)
124+
handler_rsyslog.setFormatter(logging.Formatter(fmt='%(asctime)s %(levelname)-8s ' + PROGRAM_NAME + ' ' + '%(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
125+
handler_rsyslog.setLevel(logging_level_rsyslog)
126+
root_logger.addHandler(handler_rsyslog)
127+
128+
logging_level_file = logging.getLevelName('DEBUG')
129+
root_logger.setLevel(logging_level_file)
130+
# how often to check the winix cloud for updated from each unit, be careful to not be to quick at updates
131+
# this is in minutes
132+
CHECK_PERIOD_MINUTES = PROGRAM_CONFIG.get("check_interval", 5)
133+
134+
REBOOT_SERVER = "192.168.88.10"
135+
REBOOT_USER = "pi"
136+
REBOOT_PASSWORD = "raspberry"
137+
REBOOT_COMMAND = "sudo reboot"
138+
139+
# debug, check that the YAML reads and messaging are correct
140+
my_logger.debug("MQTT_SERVER :" + str(MQTT_SERVER))
141+
my_logger.debug("MQTT_TOPIC_BASE :" + str(MQTT_TOPIC_BASE))
142+
my_logger.debug("LOG_RSYSLOG :" + str(LOG_RSYSLOG))
143+
my_logger.debug("CHECK_PERIOD_MINUTES :" + str(CHECK_PERIOD_MINUTES))
144+
my_logger.debug("PUSHOVER_ALERT :" + str(PUSHOVER_ALERT))
145+
146+
# setup timeloop, this allows to schedule the pull of units current status from winix cloud on regular basis
147+
# https://github.com/sankalpjonn/timeloop
148+
tl = Timeloop()
149+
150+
# create MQTT client globally
151+
# connect to MQTT server
152+
mqttc = mqtt.Client(PROGRAM_NAME) # Create instance of client with client ID
153+
mqttc.connect(MQTT_SERVER, 1883) # Connect to (broker, port, keepalive-time)
154+
155+
# flag to indicate if sensors did update during period
156+
pat_the_watchdog = False
157+
# counter of interval between notifications
158+
alert_number = 0
159+
# flag to check if we rebooted server with sensors
160+
pat_rebooted = False
161+
162+
# functions to handle the command messages from MQTT sources
163+
164+
def message_received(mosq, obj, msg) :
165+
# we have to check all MQTT messages for either a command to the unit from an MQTT source
166+
# or a status up MQTT message that was published by this routine (because the current state of a control,
167+
# might have been change by a interaction with front panel of unit, or command from the mobile phone app)
168+
169+
global pat_the_watchdog
170+
171+
msg_text = msg.payload.decode("utf-8")
172+
173+
my_logger.debug("in messages_received, message topic, qos, text: " + msg.topic + " " + str(msg.qos) + " " + msg_text)
174+
175+
if ( DEBUG_LEVEL == "DEBUG" ) :
176+
print(".", end="")
177+
178+
# if the message is a status message, aka has subtopic of "$SYS" we can skip
179+
if ( "$SYS" in msg.topic ) :
180+
return
181+
182+
pat_the_watchdog = True
183+
184+
return
185+
186+
# using the timeloop scheduling tool
187+
@tl.job(interval=timedelta(minutes=CHECK_PERIOD_MINUTES))
188+
def periodic_update_units():
189+
190+
global pat_the_watchdog, alert_number, pat_rebooted
191+
192+
# if server was rebooted and sensor data now being received, note this
193+
if (pat_rebooted == True and pat_the_watchdog == True) :
194+
my_logger.info("server rebooted, sensor updates being received")
195+
pat_rebooted = False
196+
197+
# if no update in check period, raise error
198+
if (pat_the_watchdog == False) :
199+
if ( DEBUG_LEVEL == "DEBUG" ) :
200+
print("!", end="")
201+
my_logger.error("no update for any bluetooth sensors in check period (minutes) : " + str(CHECK_PERIOD_MINUTES))
202+
203+
# don't send alert every time
204+
if (alert_number == 0) :
205+
# send pushover alert
206+
conn = http.client.HTTPSConnection("api.pushover.net:443")
207+
conn.request("POST", "/1/messages.json",
208+
urllib.parse.urlencode({
209+
"token": PUSHOVER_TOKEN,
210+
"user": PUSHOVER_USER,
211+
"sound": PUSHOVER_ALERT,
212+
"priority": "1",
213+
"message": "rebooting server, no update for any bluetooth sensors in check period : " + str(CHECK_PERIOD_MINUTES),
214+
}), { "Content-type": "application/x-www-form-urlencoded" })
215+
pushover_result = conn.getresponse()
216+
my_logger.error("notification sent to pushover : " + str(pushover_result.read().decode()))
217+
218+
# reboot the server
219+
ssh = paramiko.SSHClient()
220+
ssh.load_system_host_keys()
221+
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
222+
ssh.connect(REBOOT_SERVER, username=REBOOT_USER, password=REBOOT_PASSWORD, look_for_keys=False, allow_agent=False)
223+
ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(REBOOT_COMMAND)
224+
my_logger.error("server reboot command sent : " + REBOOT_SERVER + " " + str(ssh_stdout) + " " + str(ssh_stderr))
225+
alert_number = 1
226+
pat_rebooted = True
227+
else :
228+
# increment alert counter and reset if alert period has been run through
229+
pat_rebooted = False
230+
alert_number = alert_number + 1
231+
if (alert_number > 4) :
232+
alert_number = 0
233+
234+
else :
235+
pat_rebooted = False
236+
if ( DEBUG_LEVEL == "DEBUG" ) :
237+
print("*", end="")
238+
239+
# reset the pat, either way
240+
pat_the_watchdog = False
241+
242+
return
243+
244+
def main():
245+
246+
# keep track of transition to new day at midnight local time
247+
# at rollover, reset the tracking of duplicate incident id
248+
current_day = datetime.now().timetuple().tm_yday
249+
250+
try :
251+
# # connect to MQTT server
252+
# mqttc = mqtt.Client(PROGRAM_NAME) # Create instance of client with client ID
253+
# mqttc.connect(MQTT_SERVER, 1883) # Connect to (broker, port, keepalive-time)
254+
# Add message callbacks that will only trigger on a specific subscription match.
255+
mqttc.message_callback_add(MQTT_TOPIC_BASE + "/" + "#", message_received)
256+
mqttc.subscribe(MQTT_TOPIC_BASE + "/" + "#", 0)
257+
258+
my_logger.info("Program start : " + PROGRAM_NAME + " Version : " + VERSION_MAJOR + "." + VERSION_MINOR)
259+
260+
# Start mqtt
261+
mqttc.loop_start()
262+
263+
# start timeloop thread to update units on periodic basis
264+
tl.start()
265+
266+
# loop forever waiting for keyboard interrupt, seeing if there are unit update requests queued
267+
while True :
268+
269+
# check if it is a new day, if so clear out the record of duplicate incidents published during prior day
270+
# publish to MQTT a stat about how many unique incidents were published in prior day
271+
if current_day != datetime.now().timetuple().tm_yday :
272+
my_logger.info("24 hour rollover")
273+
current_day = datetime.now().timetuple().tm_yday
274+
275+
time.sleep(1)
276+
# end loop forever
277+
278+
except KeyboardInterrupt :
279+
tl.stop()
280+
message = {"timestamp": "{:d}".format(int(datetime.now().timestamp()))}
281+
message["program_version"] = PROGRAM_NAME + " Version : " + VERSION_MAJOR + "." + VERSION_MINOR
282+
message["status"] = "STOP"
283+
mqttc.publish(MQTT_TOPIC_BASE + "$SYS/STATUS", json.dumps(message))
284+
mqttc.disconnect()
285+
mqttc.loop_stop()
286+
my_logger.info("Keyboard interrupt.")
287+
# sys.exit(0)
288+
289+
except :
290+
tl.stop()
291+
my_logger.critical("Unhandled error : " + traceback.format_exc())
292+
sys.exit(1)
293+
294+
# proper exit
295+
print("")
296+
print("")
297+
my_logger.info("Program end : " + PROGRAM_NAME + " Version : " + VERSION_MAJOR + "." + VERSION_MINOR)
298+
sys.exit(0)
299+
300+
if __name__ == '__main__':
301+
main()
302+
# if __name__ == '__main__':
303+
# cProfile.run('main()')

0 commit comments

Comments
 (0)