From 875f13792f811dd693143f30b6ab192c8abefc32 Mon Sep 17 00:00:00 2001 From: Brian Gunnarson Date: Tue, 3 Sep 2024 14:11:29 -0700 Subject: [PATCH] final PR touch ups --- merlin/main.py | 13 +++--- merlin/managers/celerymanager.py | 64 +++------------------------- merlin/router.py | 5 ++- merlin/study/celeryadapter.py | 7 +-- merlin/study/celerymanageradapter.py | 11 ++++- 5 files changed, 30 insertions(+), 70 deletions(-) diff --git a/merlin/main.py b/merlin/main.py index 46683d27..56fcbd5a 100644 --- a/merlin/main.py +++ b/merlin/main.py @@ -360,7 +360,7 @@ def stop_workers(args): LOG.warning(f"Worker '{worker_name}' is unexpanded. Target provenance spec instead?") # Send stop command to router - router.stop_workers(args.task_server, worker_names, args.queues, args.workers) + router.stop_workers(args.task_server, worker_names, args.queues, args.workers, args.level.upper()) def print_info(args): @@ -414,12 +414,13 @@ def process_manager(args: Namespace): if args.command == "run": run_manager(query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout) elif args.command == "start": - if start_manager( - query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout - ): + try: + start_manager( + query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout + ) LOG.info("Manager started successfully.") - else: - LOG.error("Unable to start manager") + except Exception as e: + LOG.error(f"Unable to start manager.\n{e}") elif args.command == "stop": if stop_manager(): LOG.info("Manager stopped successfully.") diff --git a/merlin/managers/celerymanager.py b/merlin/managers/celerymanager.py index 99914545..e6b82085 100644 --- a/merlin/managers/celerymanager.py +++ b/merlin/managers/celerymanager.py @@ -50,62 +50,12 @@ class WorkerStatus: WORKER_INFO = { "status": WorkerStatus.running, "pid": -1, - "monitored": 1, + "monitored": 1, # This setting is for debug mode "num_unresponsive": 0, "processing_work": 1, } -# class RedisConnectionManager: -# """ -# A context manager for handling redis connections. -# This will ensure safe opening and closing of Redis connections. -# """ - -# def __init__(self, db_num: int): -# self.db_num = db_num -# self.connection = None - -# def __enter__(self): -# self.connection = self.get_redis_connection() -# return self.connection - -# def __exit__(self, exc_type, exc_val, exc_tb): -# if self.connection: -# LOG.debug(f"MANAGER: Closing connection at db_num: {self.db_num}") -# self.connection.close() - -# def get_redis_connection(self) -> redis.Redis: -# """ -# Generic redis connection function to get the results backend redis server with a given db number increment. - -# :return: Redis connection object that can be used to access values for the manager. 
-# """ -# # from merlin.config.results_backend import get_backend_password -# from merlin.config import results_backend -# from merlin.config.configfile import CONFIG - -# conn_string = results_backend.get_connection_string() -# base, _ = conn_string.rsplit("/", 1) -# new_db_num = CONFIG.results_backend.db_num + self.db_num -# conn_string = f"{base}/{new_db_num}" -# LOG.debug(f"MANAGER: Connecting to redis at db_num: {new_db_num}") -# return redis.from_url(conn_string, decode_responses=True) -# # password_file = CONFIG.results_backend.password -# # try: -# # password = get_backend_password(password_file) -# # except IOError: -# # password = CONFIG.results_backend.password -# # return redis.Redis( -# # host=CONFIG.results_backend.server, -# # port=CONFIG.results_backend.port, -# # db=CONFIG.results_backend.db_num + self.db_num, # Increment db_num to avoid conflicts -# # username=CONFIG.results_backend.username, -# # password=password, -# # decode_responses=True, -# # ) - - class CeleryManager: def __init__(self, query_frequency: int = 60, query_timeout: float = 0.5, worker_timeout: int = 180): """ @@ -156,6 +106,7 @@ def stop_celery_worker(self, worker: str) -> bool: worker_pid = int(worker_status_connect.hget(worker, "pid")) worker_status = worker_status_connect.hget(worker, "status") + # TODO be wary of stalled state workers (should not happen since we use psutil.Process.kill()) # Check to see if the pid exists and worker is set as running if worker_status == WorkerStatus.running and psutil.pid_exists(worker_pid): # Check to see if the pid is associated with celery @@ -174,9 +125,8 @@ def restart_celery_worker(self, worker: str) -> bool: :return: The result of whether a worker was restarted. """ - # Stop the worker that is currently running - if not self.stop_celery_worker(worker): - return False + # Stop the worker that is currently running (if possible) + self.stop_celery_worker(worker) # Start the worker again with the args saved in redis db with ( @@ -218,7 +168,7 @@ def run(self): with self.get_worker_status_redis_connection() as redis_connection: LOG.debug(f"MANAGER: setting manager key in redis to hold the following info {manager_info}") - redis_connection.hmset(name="manager", mapping=manager_info) + redis_connection.hset("manager", mapping=manager_info) # TODO figure out what to do with "processing_work" entry for the merlin monitor while True: # TODO Make it so that it will stop after a list of workers is stopped @@ -251,10 +201,10 @@ def run(self): # If successful set the status to running and reset num_unresponsive redis_connection.hset(worker, "status", WorkerStatus.running) redis_connection.hset(worker, "num_unresponsive", 0) - # If failed set the status to stalled - redis_connection.hset(worker, "status", WorkerStatus.stalled) LOG.info(f"MANAGER: Worker '{worker}' restarted.") else: + # If failed set the status to stalled + redis_connection.hset(worker, "status", WorkerStatus.stalled) LOG.error(f"MANAGER: Could not restart worker '{worker}'.") else: redis_connection.hset(worker, "num_unresponsive", num_unresponsive) diff --git a/merlin/router.py b/merlin/router.py index d9114bbc..9747b7c4 100644 --- a/merlin/router.py +++ b/merlin/router.py @@ -190,7 +190,7 @@ def get_workers(task_server): return [] -def stop_workers(task_server, spec_worker_names, queues, workers_regex): +def stop_workers(task_server, spec_worker_names, queues, workers_regex, debug_lvl): """ Stops workers. 
@@ -198,12 +198,13 @@ def stop_workers(task_server, spec_worker_names, queues, workers_regex): :param `spec_worker_names`: Worker names to stop, drawn from a spec. :param `queues` : The queues to stop :param `workers_regex` : Regex for workers to stop + :param debug_lvl: The debug level to use (INFO, DEBUG, ERROR, etc.) """ LOG.info("Stopping workers...") if task_server == "celery": # pylint: disable=R1705 # Stop workers - stop_celery_workers(queues, spec_worker_names, workers_regex) + stop_celery_workers(queues, spec_worker_names, workers_regex, debug_lvl) else: LOG.error("Celery is not specified as the task server!") diff --git a/merlin/study/celeryadapter.py b/merlin/study/celeryadapter.py index 651cd7a4..510e5a04 100644 --- a/merlin/study/celeryadapter.py +++ b/merlin/study/celeryadapter.py @@ -46,7 +46,7 @@ from merlin.common.dumper import dump_handler from merlin.config import Config -from merlin.managers.celerymanager import CeleryManager +from merlin.managers.celerymanager import CeleryManager, WorkerStatus from merlin.study.batch import batch_check_parallel, batch_worker_launch from merlin.study.celerymanageradapter import add_monitor_workers, remove_monitor_workers from merlin.utils import apply_list_of_regex, check_machines, get_procs, get_yaml_var, is_running @@ -838,7 +838,7 @@ def purge_celery_tasks(queues, force): return subprocess.run(purge_command, shell=True).returncode -def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None): # pylint: disable=R0912 +def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None, debug_lvl="INFO"): # pylint: disable=R0912 """Send a stop command to celery workers. Default behavior is to stop all connected workers. @@ -903,7 +903,8 @@ def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None): if workers_to_stop: LOG.info(f"Sending stop to these workers: {workers_to_stop}") app.control.broadcast("shutdown", destination=workers_to_stop) - remove_monitor_workers(workers=workers_to_stop) + remove_entry = False if debug_lvl == "DEBUG" else True + remove_monitor_workers(workers=workers_to_stop, worker_status=WorkerStatus.stopped, remove_entry=remove_entry) else: LOG.warning("No workers found to stop") diff --git a/merlin/study/celerymanageradapter.py b/merlin/study/celerymanageradapter.py index 31072d23..81f1a324 100644 --- a/merlin/study/celerymanageradapter.py +++ b/merlin/study/celerymanageradapter.py @@ -67,7 +67,11 @@ def add_monitor_workers(workers: list): LOG.info(f"MANAGER: Manager is monitoring the following workers {monitored_workers}.") -def remove_monitor_workers(workers: list): +def remove_monitor_workers( + workers: list, + worker_status: WorkerStatus = None, + remove_entry: bool = True +): """ Remove workers from being monitored by the celery manager. :param list workers: A worker names @@ -78,7 +82,10 @@ def remove_monitor_workers(workers: list): for worker in workers: if redis_connection.exists(worker): redis_connection.hset(worker, "monitored", 0) - redis_connection.hset(worker, "status", WorkerStatus.stopped) + if worker_status is not None: + redis_connection.hset(worker, "status", worker_status) + if remove_entry: + redis_connection.delete(worker) def is_manager_runnning() -> bool:
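
A consolidated sketch of the debug-level gating introduced above, since the change is split across stop_celery_workers() in celeryadapter.py and remove_monitor_workers() in celerymanageradapter.py: when Merlin runs with --level DEBUG the worker's Redis hash is kept (with its status recorded) so it can be inspected afterwards; at any other level the entry is deleted. This is an illustration only; the connection URL, worker name, and status string below are placeholders, not values from this PR.

    import redis

    STOPPED = "Stopped"  # stand-in for WorkerStatus.stopped

    def remove_monitor_workers_sketch(connection, workers, worker_status=None, remove_entry=True):
        # Mirrors the updated remove_monitor_workers(): unmark the worker as monitored,
        # optionally record a final status, and drop the hash entirely unless debugging.
        for worker in workers:
            if connection.exists(worker):
                connection.hset(worker, "monitored", 0)
                if worker_status is not None:
                    connection.hset(worker, "status", worker_status)
                if remove_entry:
                    connection.delete(worker)

    connection = redis.from_url("redis://localhost:6379/1", decode_responses=True)  # placeholder URL
    debug_lvl = "INFO"
    remove_monitor_workers_sketch(
        connection,
        ["celery@default_worker"],            # placeholder worker name
        worker_status=STOPPED,
        remove_entry=(debug_lvl != "DEBUG"),  # same gating as the stop_celery_workers() change
    )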
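
Note on the hmset -> hset change in merlin/managers/celerymanager.py: redis-py deprecated Redis.hmset() in favor of Redis.hset() with the mapping keyword, which writes all hash fields in a single call. A minimal standalone sketch of that call, assuming a reachable Redis server at the placeholder URL below (the field values are illustrative, not taken from the manager):

    import redis

    # Placeholder URL; the real manager derives its connection from the Merlin results backend config.
    connection = redis.from_url("redis://localhost:6379/0", decode_responses=True)

    manager_info = {
        "status": "Running",  # illustrative values only
        "pid": 12345,
    }

    # Deprecated: connection.hmset("manager", manager_info)
    # Supported:  one call that sets every field of the hash.
    connection.hset("manager", mapping=manager_info)

    print(connection.hgetall("manager"))  # -> {'status': 'Running', 'pid': '12345'}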