From 353a66b8497d43771bc03cf8d48cd68a1b62f590 Mon Sep 17 00:00:00 2001
From: Brian Gunnarson
Date: Fri, 16 Aug 2024 17:12:47 -0700
Subject: [PATCH] suggested changes plus beginning work on monitor/manager
 collab

---
 merlin/main.py                       |  96 +++++-----
 merlin/study/celeryadapter.py        |  58 +++---
 merlin/study/celerymanager.py        | 268 +++++++++++++++------------
 merlin/study/celerymanageradapter.py |  72 ++++---
 4 files changed, 276 insertions(+), 218 deletions(-)

diff --git a/merlin/main.py b/merlin/main.py
index e6bb7c0c..46683d27 100644
--- a/merlin/main.py
+++ b/merlin/main.py
@@ -402,6 +402,15 @@ def process_example(args: Namespace) -> None:
 
 
 def process_manager(args: Namespace):
+    """
+    Process the command for managing the workers.
+
+    This function interprets the command provided in the `args` namespace and
+    executes the corresponding manager function. It supports three commands:
+    "run", "start", and "stop".
+
+    :param args: parsed CLI arguments
+    """
     if args.command == "run":
         run_manager(query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout)
     elif args.command == "start":
@@ -409,6 +418,8 @@ def process_manager(args: Namespace):
             query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout
         ):
             LOG.info("Manager started successfully.")
+        else:
+            LOG.error("Unable to start manager.")
     elif args.command == "stop":
         if stop_manager():
             LOG.info("Manager stopped successfully.")
@@ -924,6 +935,41 @@ def generate_worker_touching_parsers(subparsers: ArgumentParser) -> None:
     )
     manager.set_defaults(func=process_manager)
 
+    def add_manager_options(manager_parser: ArgumentParser):
+        """
+        Add shared options for manager subcommands.
+
+        The `manager run` and `manager start` subcommands have the same options.
+        Rather than writing duplicate code for these, we'll use this function
+        to add the arguments to these subcommands.
+
+        :param manager_parser: The ArgumentParser object to add these options to
+        """
+        manager_parser.add_argument(
+            "-qf",
+            "--query_frequency",
+            action="store",
+            type=int,
+            default=60,
+            help="The frequency at which workers will be queried for a response.",
+        )
+        manager_parser.add_argument(
+            "-qt",
+            "--query_timeout",
+            action="store",
+            type=float,
+            default=0.5,
+            help="The timeout for the query pings that are sent to workers.",
+        )
+        manager_parser.add_argument(
+            "-wt",
+            "--worker_timeout",
+            action="store",
+            type=int,
+            default=180,
+            help="The total time (query_frequency * tries) before an attempt is made to restart the worker.",
+        )
+
     manager_commands: ArgumentParser = manager.add_subparsers(dest="command")
     manager_run = manager_commands.add_parser(
         "run",
@@ -931,30 +977,7 @@ def generate_worker_touching_parsers(subparsers: ArgumentParser) -> None:
         description="Run manager",
         formatter_class=ArgumentDefaultsHelpFormatter,
     )
-    manager_run.add_argument(
-        "-qf",
-        "--query_frequency",
-        action="store",
-        type=int,
-        default=60,
-        help="The frequency at which workers will be queried for response.",
-    )
-    manager_run.add_argument(
-        "-qt",
-        "--query_timeout",
-        action="store",
-        type=float,
-        default=0.5,
-        help="The timeout for the query response that are sent to workers.",
-    )
-    manager_run.add_argument(
-        "-wt",
-        "--worker_timeout",
-        action="store",
-        type=int,
-        default=180,
-        help="The sum total(query_frequency*tries) time before an attempt is made to restart worker.",
-    )
+    add_manager_options(manager_run)
     manager_run.set_defaults(func=process_manager)
     manager_start = manager_commands.add_parser(
         "start",
@@ -962,30 +985,7 @@ def generate_worker_touching_parsers(subparsers: ArgumentParser) -> None:
         description="Start manager",
         formatter_class=ArgumentDefaultsHelpFormatter,
     )
-    manager_start.add_argument(
-        "-qf",
-        "--query_frequency",
-        action="store",
-        type=int,
-        default=60,
-        help="The frequency at which workers will be queried for response.",
-    )
-    manager_start.add_argument(
-        "-qt",
-        "--query_timeout",
-        action="store",
-        type=float,
-        default=0.5,
-        help="The timeout for the query response that are sent to workers.",
-    )
-    manager_start.add_argument(
-        "-wt",
-        "--worker_timeout",
-        action="store",
-        type=int,
-        default=180,
-        help="The sum total(query_frequency*tries) time before an attempt is made to restart worker.",
-    )
+    add_manager_options(manager_start)
     manager_start.set_defaults(func=process_manager)
     manager_stop = manager_commands.add_parser(
         "stop",
diff --git a/merlin/study/celeryadapter.py b/merlin/study/celeryadapter.py
index 6c09590a..e392b179 100644
--- a/merlin/study/celeryadapter.py
+++ b/merlin/study/celeryadapter.py
@@ -502,15 +502,22 @@ def check_celery_workers_processing(queues_in_spec: List[str], app: Celery) -> b
     """
     # Query celery for active tasks
    active_tasks = app.control.inspect().active()
+    result = False
 
-    # Search for the queues we provided if necessary
-    if active_tasks is not None:
-        for tasks in active_tasks.values():
-            for task in tasks:
-                if task["delivery_info"]["routing_key"] in queues_in_spec:
-                    return True
+    with CeleryManager.get_worker_status_redis_connection() as redis_connection:
+        # Search for the queues we provided if necessary
+        if active_tasks is not None:
+            for worker, tasks in active_tasks.items():
+                for task in tasks:
+                    if task["delivery_info"]["routing_key"] in queues_in_spec:
+                        result = True
 
-    return False
+                # Set the entry in the Redis DB for the manager to signify if the worker
+                # is still doing work
+                worker_still_processing = 1 if result else 0
+                redis_connection.hset(worker, "processing_work", worker_still_processing)
+
+    return result
 
 
 def _get_workers_to_start(spec, steps):
@@ -771,25 +778,24 @@ def launch_celery_worker(worker_cmd, worker_list, kwargs):
     worker_list.append(worker_cmd)
 
     # Adding the worker args to redis db
-    redis_connection = CeleryManager.get_worker_args_redis_connection()
-    args = kwargs
-    # Save worker command with the arguements
-    args["worker_cmd"] = worker_cmd
-    # Store the nested dictionaries into a separate key with a link.
-    # Note: This only support single nested dicts(for simplicity) and
-    # further nesting can be accomplished by making this recursive.
-    for key in kwargs:
-        if type(kwargs[key]) is dict:
-            key_name = worker_name + "_" + key
-            redis_connection.hmset(name=key_name, mapping=kwargs[key])
-            args[key] = "link:" + key_name
-        if type(kwargs[key]) is bool:
-            if kwargs[key]:
-                args[key] = "True"
-            else:
-                args[key] = "False"
-    redis_connection.hmset(name=worker_name, mapping=args)
-    redis_connection.quit()
+    with CeleryManager.get_worker_args_redis_connection() as redis_connection:
+        args = kwargs
+        # Save the worker command with the arguments
+        args["worker_cmd"] = worker_cmd
+        # Store the nested dictionaries into a separate key with a link.
+        # Note: This only supports single nested dicts (for simplicity) and
+        # further nesting can be accomplished by making this recursive.
+        for key in kwargs:
+            if type(kwargs[key]) is dict:
+                key_name = worker_name + "_" + key
+                redis_connection.hmset(name=key_name, mapping=kwargs[key])
+                args[key] = "link:" + key_name
+            if type(kwargs[key]) is bool:
+                if kwargs[key]:
+                    args[key] = "True"
+                else:
+                    args[key] = "False"
+        redis_connection.hmset(name=worker_name, mapping=args)
 
     # Adding the worker to redis db to be monitored
     add_monitor_workers(workers=((worker_name, process.pid),))
diff --git a/merlin/study/celerymanager.py b/merlin/study/celerymanager.py
index 6f2e697a..ddefab02 100644
--- a/merlin/study/celerymanager.py
+++ b/merlin/study/celerymanager.py
@@ -27,7 +27,7 @@
 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 # SOFTWARE.
 ###############################################################################
-
+import logging
 import os
 import subprocess
 import time
@@ -36,6 +36,9 @@
 import redis
 
 
+LOG = logging.getLogger(__name__)
+
+
 class WorkerStatus:
     running = "Running"
     stalled = "Stalled"
@@ -48,68 +51,89 @@ class WorkerStatus:
     "pid": -1,
     "monitored": 1,
     "num_unresponsive": 0,
+    "processing_work": 1,
 }
 
 
+class RedisConnectionManager:
+    """
+    A context manager for handling Redis connections.
+    This will ensure safe opening and closing of Redis connections.
+    """
+
+    def __init__(self, db_num: int):
+        self.db_num = db_num
+        self.connection = None
+
+    def __enter__(self):
+        self.connection = self.get_redis_connection()
+        return self.connection
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        if self.connection:
+            LOG.debug(f"MANAGER: Closing connection at db_num: {self.db_num}")
+            self.connection.close()
+
+    def get_redis_connection(self) -> redis.Redis:
+        """
+        Generic redis connection function to get the results backend redis server with a given db number increment.
+
+        :return: Redis connection object that can be used to access values for the manager.
+ """ + # from merlin.config.results_backend import get_backend_password + from merlin.config import results_backend + from merlin.config.configfile import CONFIG + + conn_string = results_backend.get_connection_string() + base, _ = conn_string.rsplit("/", 1) + new_db_num = CONFIG.results_backend.db_num + self.db_num + conn_string = f"{base}/{new_db_num}" + LOG.debug(f"MANAGER: Connecting to redis at db_num: {new_db_num}") + return redis.from_url(conn_string, decode_responses=True) + # password_file = CONFIG.results_backend.password + # try: + # password = get_backend_password(password_file) + # except IOError: + # password = CONFIG.results_backend.password + # return redis.Redis( + # host=CONFIG.results_backend.server, + # port=CONFIG.results_backend.port, + # db=CONFIG.results_backend.db_num + self.db_num, # Increment db_num to avoid conflicts + # username=CONFIG.results_backend.username, + # password=password, + # decode_responses=True, + # ) + + class CeleryManager: def __init__(self, query_frequency: int = 60, query_timeout: float = 0.5, worker_timeout: int = 180): """ Initializer for Celery Manager - @param int query_frequency: The frequency at which workers will be queried with ping commands - @param float query_timeout: The timeout for the query pings that are sent to workers - @param int worker_timeout: The sum total(query_frequency*tries) time before an attempt is made to restart worker. + + :param query_frequency: The frequency at which workers will be queried with ping commands + :param query_timeout: The timeout for the query pings that are sent to workers + :param worker_timeout: The sum total(query_frequency*tries) time before an attempt is made to restart worker. """ - self.redis_connection = self.get_worker_status_redis_connection() self.query_frequency = query_frequency self.query_timeout = query_timeout self.worker_timeout = worker_timeout @staticmethod - def get_worker_status_redis_connection(): - """ - Get the redis connection for info regarding the worker and manager status. - """ - return CeleryManager.get_redis_connection(1) - - @staticmethod - def get_worker_args_redis_connection(): - """ - Get the redis connection for info regarding the args used to generate each worker. - """ - return CeleryManager.get_redis_connection(2) + def get_worker_status_redis_connection() -> RedisConnectionManager: + """Get the redis connection for info regarding the worker and manager status.""" + return RedisConnectionManager(1) @staticmethod - def get_redis_connection(db_num): - """ - Generic redis connection function to get the results backend redis server with a given db number increment. - :param int db_num: Increment number for the db from the one provided in the config file. + def get_worker_args_redis_connection() -> RedisConnectionManager: + """Get the redis connection for info regarding the args used to generate each worker.""" + return RedisConnectionManager(2) - :return Redis: Redis connections object that can be used to access values for the manager. 
- """ - from merlin.config.configfile import CONFIG - from merlin.config.results_backend import get_backend_password - - password_file = CONFIG.results_backend.password - try: - password = get_backend_password(password_file) - except IOError: - password = CONFIG.results_backend.password - return redis.Redis( - host=CONFIG.results_backend.server, - port=CONFIG.results_backend.port, - db=CONFIG.results_backend.db_num + db_num, # Increment db_num to avoid conflicts - username=CONFIG.results_backend.username, - password=password, - decode_responses=True, - ) - - def get_celery_workers_status(self, workers): + def get_celery_workers_status(self, workers: list) -> dict: """ Get the worker status of a current worker that is being managed - :param CeleryManager self: CeleryManager attempting the stop. - :param list workers: Workers that are checked. - :return dict: The result dictionary for each worker and the response. + :param workers: Workers that are checked. + :return: The result dictionary for each worker and the response. """ from merlin.celery import app @@ -118,20 +142,19 @@ def get_celery_workers_status(self, workers): worker_results = {worker: status for d in ping_result for worker, status in d.items()} return worker_results - def stop_celery_worker(self, worker): + def stop_celery_worker(self, worker: str) -> bool: """ Stop a celery worker by kill the worker with pid - :param CeleryManager self: CeleryManager attempting the stop. - :param str worker: Worker that is being stopped. - :return bool: The result of whether a worker was stopped. + :param worker: Worker that is being stopped. + :return: The result of whether a worker was stopped. """ - # Get the PID associated with the pid - worker_status_connect = self.get_worker_status_redis_connection() - worker_pid = int(worker_status_connect.hget(worker, "pid")) - worker_status = worker_status_connect.hget(worker, "status") - worker_status_connect.quit() + # Get the PID associated with the worker + with self.get_worker_status_redis_connection() as worker_status_connect: + worker_pid = int(worker_status_connect.hget(worker, "pid")) + worker_status = worker_status_connect.hget(worker, "status") + # Check to see if the pid exists and worker is set as running if worker_status == WorkerStatus.running and psutil.pid_exists(worker_pid): # Check to see if the pid is associated with celery @@ -142,87 +165,100 @@ def stop_celery_worker(self, worker): return True return False - def restart_celery_worker(self, worker): + def restart_celery_worker(self, worker: str) -> bool: """ Restart a celery worker with the same arguements and parameters during its creation - :param CeleryManager self: CeleryManager attempting the stop. - :param str worker: Worker that is being restarted. - :return bool: The result of whether a worker was restarted. + :param worker: Worker that is being restarted. + :return: The result of whether a worker was restarted. 
""" # Stop the worker that is currently running if not self.stop_celery_worker(worker): return False + # Start the worker again with the args saved in redis db - worker_args_connect = self.get_worker_args_redis_connection() - worker_status_connect = self.get_worker_status_redis_connection() - # Get the args and remove the worker_cmd from the hash set - args = worker_args_connect.hgetall(worker) - worker_cmd = args["worker_cmd"] - del args["worker_cmd"] - kwargs = args - for key in args: - if args[key].startswith("link:"): - kwargs[key] = worker_args_connect.hgetall(args[key].split(":", 1)[1]) - elif args[key] == "True": - kwargs[key] = True - elif args[key] == "False": - kwargs[key] = False - - # Run the subprocess for the worker and save the PID - process = subprocess.Popen(worker_cmd, **kwargs) - worker_status_connect.hset(worker, "pid", process.pid) - - worker_args_connect.quit() - worker_status_connect.quit() + with ( + self.get_worker_args_redis_connection() as worker_args_connect, + self.get_worker_status_redis_connection() as worker_status_connect, + ): + # Get the args and remove the worker_cmd from the hash set + args = worker_args_connect.hgetall(worker) + worker_cmd = args["worker_cmd"] + del args["worker_cmd"] + kwargs = args + for key in args: + if args[key].startswith("link:"): + kwargs[key] = worker_args_connect.hgetall(args[key].split(":", 1)[1]) + elif args[key] == "True": + kwargs[key] = True + elif args[key] == "False": + kwargs[key] = False + + # Run the subprocess for the worker and save the PID + process = subprocess.Popen(worker_cmd, **kwargs) + worker_status_connect.hset(worker, "pid", process.pid) return True - # TODO add some logs def run(self): """ - Main manager loop - """ + Main manager loop for monitoring and managing Celery workers. + This method continuously monitors the status of Celery workers by + checking their health and attempting to restart any that are + unresponsive. It updates the Redis database with the current + status of the manager and the workers. 
+ """ manager_info = { "status": "Running", - "process id": os.getpid(), + "pid": os.getpid(), } - self.redis_connection.hmset(name="manager", mapping=manager_info) - - while True: # TODO Make it so that it will stop after a list of workers is stopped - # Get the list of running workers - workers = self.redis_connection.keys() - workers.remove("manager") - workers = [worker for worker in workers if int(self.redis_connection.hget(worker, "monitored"))] - print(f"Monitoring {workers} workers") - - # Check/ Ping each worker to see if they are still running - if workers: - worker_results = self.get_celery_workers_status(workers) - - # If running set the status on redis that it is running - for worker in list(worker_results.keys()): - self.redis_connection.hset(worker, "status", WorkerStatus.running) - - # If not running attempt to restart it - for worker in workers: - if worker not in worker_results: - # If time where the worker is unresponsive is less than the worker time out then just increment - num_unresponsive = int(self.redis_connection.hget(worker, "num_unresponsive")) + 1 - if num_unresponsive * self.query_frequency < self.worker_timeout: - # Attempt to restart worker - if self.restart_celery_worker(worker): - # If successful set the status to running and reset num_unresponsive - self.redis_connection.hset(worker, "status", WorkerStatus.running) - self.redis_connection.hset(worker, "num_unresponsive", 0) - # If failed set the status to stalled - self.redis_connection.hset(worker, "status", WorkerStatus.stalled) - else: - self.redis_connection.hset(worker, "num_unresponsive", num_unresponsive) - # Sleep for the query_frequency for the next iteration - time.sleep(self.query_frequency) + + with self.get_worker_status_redis_connection() as redis_connection: + LOG.debug(f"MANAGER: setting manager key in redis to hold the following info {manager_info}") + redis_connection.hmset(name="manager", mapping=manager_info) + + # TODO figure out what to do with "processing_work" entry for the merlin monitor + while True: # TODO Make it so that it will stop after a list of workers is stopped + # Get the list of running workers + workers = redis_connection.keys() + LOG.debug(f"MANAGER: workers: {workers}") + workers.remove("manager") + workers = [worker for worker in workers if int(redis_connection.hget(worker, "monitored"))] + LOG.info(f"MANAGER: Monitoring {workers} workers") + + # Check/ Ping each worker to see if they are still running + if workers: + worker_results = self.get_celery_workers_status(workers) + + # If running set the status on redis that it is running + LOG.info(f"MANAGER: Responsive workers: {worker_results.keys()}") + for worker in list(worker_results.keys()): + redis_connection.hset(worker, "status", WorkerStatus.running) + + # If not running attempt to restart it + for worker in workers: + if worker not in worker_results: + LOG.info(f"MANAGER: Worker '{worker}' is unresponsive.") + # If time where the worker is unresponsive is less than the worker time out then just increment + num_unresponsive = int(redis_connection.hget(worker, "num_unresponsive")) + 1 + if num_unresponsive * self.query_frequency < self.worker_timeout: + # Attempt to restart worker + LOG.info(f"MANAGER: Attempting to restart worker '{worker}'...") + if self.restart_celery_worker(worker): + # If successful set the status to running and reset num_unresponsive + redis_connection.hset(worker, "status", WorkerStatus.running) + redis_connection.hset(worker, "num_unresponsive", 0) + # If failed set the status to 
+                                else:
+                                    # If failed set the status to stalled
+                                    redis_connection.hset(worker, "status", WorkerStatus.stalled)
+                                    LOG.error(f"MANAGER: Could not restart worker '{worker}'.")
+                            else:
+                                redis_connection.hset(worker, "num_unresponsive", num_unresponsive)
+                # Sleep for the query_frequency for the next iteration
+                time.sleep(self.query_frequency)
 
 
 if __name__ == "__main__":
diff --git a/merlin/study/celerymanageradapter.py b/merlin/study/celerymanageradapter.py
index a433a8ca..d195eb96 100644
--- a/merlin/study/celerymanageradapter.py
+++ b/merlin/study/celerymanageradapter.py
@@ -27,6 +27,7 @@
 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 # SOFTWARE.
 ###############################################################################
+import logging
 import subprocess
 
 import psutil
@@ -34,6 +35,9 @@
 from merlin.study.celerymanager import WORKER_INFO, CeleryManager, WorkerStatus
 
 
+LOG = logging.getLogger(__name__)
+
+
 def add_monitor_workers(workers: list):
     """
     Adds workers to be monitored by the celery manager.
@@ -42,15 +46,25 @@ def add_monitor_workers(workers: list):
     if workers is None or len(workers) <= 0:
         return
 
-    redis_connection = CeleryManager.get_worker_status_redis_connection()
-    for worker in workers:
-        if redis_connection.exists(worker[0]):
-            redis_connection.hset(worker[0], "monitored", 1)
-            redis_connection.hset(worker[0], "pid", worker[1])
-        worker_info = WORKER_INFO
-        worker_info["pid"] = worker[1]
-        redis_connection.hmset(name=worker[0], mapping=worker_info)
-    redis_connection.quit()
+    LOG.info(
+        f"MANAGER: Attempting to have the manager monitor the following workers {[worker_name for worker_name, _ in workers]}."
+    )
+    monitored_workers = []
+
+    with CeleryManager.get_worker_status_redis_connection() as redis_connection:
+        for worker in workers:
+            LOG.debug(f"MANAGER: Checking if connection for worker '{worker}' exists...")
+            if redis_connection.exists(worker[0]):
+                LOG.debug(f"MANAGER: Connection for worker '{worker}' exists. Setting this worker to be monitored.")
+                redis_connection.hset(worker[0], "monitored", 1)
+                redis_connection.hset(worker[0], "pid", worker[1])
+                monitored_workers.append(worker[0])
+            else:
+                LOG.debug(f"MANAGER: Connection for worker '{worker}' does not exist. Not monitoring this worker.")
+            worker_info = WORKER_INFO
+            worker_info["pid"] = worker[1]
+            redis_connection.hmset(name=worker[0], mapping=worker_info)
+    LOG.info(f"MANAGER: Manager is monitoring the following workers {monitored_workers}.")
 
 
 def remove_monitor_workers(workers: list):
@@ -60,13 +74,11 @@ def remove_monitor_workers(workers: list):
     """
     if workers is None or len(workers) <= 0:
         return
-    redis_connection = CeleryManager.get_worker_status_redis_connection()
-    for worker in workers:
-        if redis_connection.exists(worker):
-            redis_connection.hset(worker, "monitored", 0)
-            redis_connection.hset(worker, "status", WorkerStatus.stopped)
-
-    redis_connection.quit()
+    with CeleryManager.get_worker_status_redis_connection() as redis_connection:
+        for worker in workers:
+            if redis_connection.exists(worker):
+                redis_connection.hset(worker, "monitored", 0)
+                redis_connection.hset(worker, "status", WorkerStatus.stopped)
 
 
 def is_manager_runnning() -> bool:
@@ -75,16 +87,18 @@ def is_manager_runnning() -> bool:
 
     :return: True if manager is running and False if not.
""" - redis_connection = CeleryManager.get_worker_args_redis_connection() - manager_status = redis_connection.hgetall("manager") - redis_connection.quit() + with CeleryManager.get_worker_args_redis_connection() as redis_connection: + manager_status = redis_connection.hgetall("manager") return manager_status["status"] == WorkerStatus.running and psutil.pid_exists(manager_status["pid"]) def run_manager(query_frequency: int = 60, query_timeout: float = 0.5, worker_timeout: int = 180) -> bool: """ A process locking function that calls the celery manager with proper arguments. - :params: See CeleryManager for more information regarding the parameters + + :param query_frequency: The frequency at which workers will be queried with ping commands + :param query_timeout: The timeout for the query pings that are sent to workers + :param worker_timeout: The sum total(query_frequency*tries) time before an attempt is made to restart worker. """ celerymanager = CeleryManager(query_frequency=query_frequency, query_timeout=query_timeout, worker_timeout=worker_timeout) celerymanager.run() @@ -93,9 +107,11 @@ def run_manager(query_frequency: int = 60, query_timeout: float = 0.5, worker_ti def start_manager(query_frequency: int = 60, query_timeout: float = 0.5, worker_timeout: int = 180) -> bool: """ A Non-locking function that calls the celery manager with proper arguments. - :params: See CeleryManager for more information regarding the parameters - :return bool: True if the manager was started successfully. + :param query_frequency: The frequency at which workers will be queried with ping commands + :param query_timeout: The timeout for the query pings that are sent to workers + :param worker_timeout: The sum total(query_frequency*tries) time before an attempt is made to restart worker. + :return bool: True if the manager was started successfully. """ subprocess.Popen( f"merlin manager run -qf {query_frequency} -qt {query_timeout} -wt {worker_timeout}", @@ -112,13 +128,13 @@ def stop_manager() -> bool: :return bool: True if the manager was stopped successfully and False otherwise. """ - redis_connection = CeleryManager.get_worker_status_redis_connection() - manager_pid = int(redis_connection.hget("manager", "pid")) - manager_status = redis_connection.hget("manager", "status") - print(redis_connection.hgetall("manager")) - redis_connection.quit() + with CeleryManager.get_worker_status_redis_connection() as redis_connection: + LOG.debug(f"MANAGER: manager keys: {redis_connection.hgetall('manager')}") + manager_pid = int(redis_connection.hget("manager", "pid")) + manager_status = redis_connection.hget("manager", "status") + LOG.debug(f"MANAGER: manager_status: {manager_status}") + LOG.debug(f"MANAGER: pid exists: {psutil.pid_exists(manager_pid)}") - print(manager_status, psutil.pid_exists(manager_pid)) # Check to make sure that the manager is running and the pid exists if manager_status == WorkerStatus.running and psutil.pid_exists(manager_pid): psutil.Process(manager_pid).terminate()