Skip to content

Commit

Permalink
final PR touch ups
Browse files Browse the repository at this point in the history
  • Loading branch information
bgunnar5 committed Sep 3, 2024
1 parent 1a4d416 commit 875f137
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 70 deletions.
13 changes: 7 additions & 6 deletions merlin/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ def stop_workers(args):
LOG.warning(f"Worker '{worker_name}' is unexpanded. Target provenance spec instead?")

# Send stop command to router
router.stop_workers(args.task_server, worker_names, args.queues, args.workers)
router.stop_workers(args.task_server, worker_names, args.queues, args.workers, args.level.upper())


def print_info(args):
Expand Down Expand Up @@ -414,12 +414,13 @@ def process_manager(args: Namespace):
if args.command == "run":
run_manager(query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout)
elif args.command == "start":
if start_manager(
query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout
):
try:
start_manager(
query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout
)
LOG.info("Manager started successfully.")
else:
LOG.error("Unable to start manager")
except Exception as e:
LOG.error(f"Unable to start manager.\n{e}")
elif args.command == "stop":
if stop_manager():
LOG.info("Manager stopped successfully.")
Expand Down
64 changes: 7 additions & 57 deletions merlin/managers/celerymanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,62 +50,12 @@ class WorkerStatus:
WORKER_INFO = {
"status": WorkerStatus.running,
"pid": -1,
"monitored": 1,
"monitored": 1, # This setting is for debug mode
"num_unresponsive": 0,
"processing_work": 1,
}


# class RedisConnectionManager:
# """
# A context manager for handling redis connections.
# This will ensure safe opening and closing of Redis connections.
# """

# def __init__(self, db_num: int):
# self.db_num = db_num
# self.connection = None

# def __enter__(self):
# self.connection = self.get_redis_connection()
# return self.connection

# def __exit__(self, exc_type, exc_val, exc_tb):
# if self.connection:
# LOG.debug(f"MANAGER: Closing connection at db_num: {self.db_num}")
# self.connection.close()

# def get_redis_connection(self) -> redis.Redis:
# """
# Generic redis connection function to get the results backend redis server with a given db number increment.

# :return: Redis connection object that can be used to access values for the manager.
# """
# # from merlin.config.results_backend import get_backend_password
# from merlin.config import results_backend
# from merlin.config.configfile import CONFIG

# conn_string = results_backend.get_connection_string()
# base, _ = conn_string.rsplit("/", 1)
# new_db_num = CONFIG.results_backend.db_num + self.db_num
# conn_string = f"{base}/{new_db_num}"
# LOG.debug(f"MANAGER: Connecting to redis at db_num: {new_db_num}")
# return redis.from_url(conn_string, decode_responses=True)
# # password_file = CONFIG.results_backend.password
# # try:
# # password = get_backend_password(password_file)
# # except IOError:
# # password = CONFIG.results_backend.password
# # return redis.Redis(
# # host=CONFIG.results_backend.server,
# # port=CONFIG.results_backend.port,
# # db=CONFIG.results_backend.db_num + self.db_num, # Increment db_num to avoid conflicts
# # username=CONFIG.results_backend.username,
# # password=password,
# # decode_responses=True,
# # )


class CeleryManager:
def __init__(self, query_frequency: int = 60, query_timeout: float = 0.5, worker_timeout: int = 180):
"""
Expand Down Expand Up @@ -156,6 +106,7 @@ def stop_celery_worker(self, worker: str) -> bool:
worker_pid = int(worker_status_connect.hget(worker, "pid"))
worker_status = worker_status_connect.hget(worker, "status")

# TODO be wary of stalled state workers (should not happen since we use psutil.Process.kill())
# Check to see if the pid exists and worker is set as running
if worker_status == WorkerStatus.running and psutil.pid_exists(worker_pid):
# Check to see if the pid is associated with celery
Expand All @@ -174,9 +125,8 @@ def restart_celery_worker(self, worker: str) -> bool:
:return: The result of whether a worker was restarted.
"""

# Stop the worker that is currently running
if not self.stop_celery_worker(worker):
return False
# Stop the worker that is currently running (if possible)
self.stop_celery_worker(worker)

# Start the worker again with the args saved in redis db
with (
Expand Down Expand Up @@ -218,7 +168,7 @@ def run(self):

with self.get_worker_status_redis_connection() as redis_connection:
LOG.debug(f"MANAGER: setting manager key in redis to hold the following info {manager_info}")
redis_connection.hmset(name="manager", mapping=manager_info)
redis_connection.hset("manager", mapping=manager_info)

# TODO figure out what to do with "processing_work" entry for the merlin monitor
while True: # TODO Make it so that it will stop after a list of workers is stopped
Expand Down Expand Up @@ -251,10 +201,10 @@ def run(self):
# If successful set the status to running and reset num_unresponsive
redis_connection.hset(worker, "status", WorkerStatus.running)
redis_connection.hset(worker, "num_unresponsive", 0)
# If failed set the status to stalled
redis_connection.hset(worker, "status", WorkerStatus.stalled)
LOG.info(f"MANAGER: Worker '{worker}' restarted.")
else:
# If failed set the status to stalled
redis_connection.hset(worker, "status", WorkerStatus.stalled)
LOG.error(f"MANAGER: Could not restart worker '{worker}'.")
else:
redis_connection.hset(worker, "num_unresponsive", num_unresponsive)
Expand Down
5 changes: 3 additions & 2 deletions merlin/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,20 +190,21 @@ def get_workers(task_server):
return []


def stop_workers(task_server, spec_worker_names, queues, workers_regex):
def stop_workers(task_server, spec_worker_names, queues, workers_regex, debug_lvl):
"""
Stops workers.
:param `task_server`: The task server from which to stop workers.
:param `spec_worker_names`: Worker names to stop, drawn from a spec.
:param `queues` : The queues to stop
:param `workers_regex` : Regex for workers to stop
:param debug_lvl: The debug level to use (INFO, DEBUG, ERROR, etc.)
"""
LOG.info("Stopping workers...")

if task_server == "celery": # pylint: disable=R1705
# Stop workers
stop_celery_workers(queues, spec_worker_names, workers_regex)
stop_celery_workers(queues, spec_worker_names, workers_regex, debug_lvl)
else:
LOG.error("Celery is not specified as the task server!")

Expand Down
7 changes: 4 additions & 3 deletions merlin/study/celeryadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

from merlin.common.dumper import dump_handler
from merlin.config import Config
from merlin.managers.celerymanager import CeleryManager
from merlin.managers.celerymanager import CeleryManager, WorkerStatus
from merlin.study.batch import batch_check_parallel, batch_worker_launch
from merlin.study.celerymanageradapter import add_monitor_workers, remove_monitor_workers
from merlin.utils import apply_list_of_regex, check_machines, get_procs, get_yaml_var, is_running
Expand Down Expand Up @@ -838,7 +838,7 @@ def purge_celery_tasks(queues, force):
return subprocess.run(purge_command, shell=True).returncode


def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None): # pylint: disable=R0912
def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None, debug_lvl="INFO"): # pylint: disable=R0912
"""Send a stop command to celery workers.
Default behavior is to stop all connected workers.
Expand Down Expand Up @@ -903,7 +903,8 @@ def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None):
if workers_to_stop:
LOG.info(f"Sending stop to these workers: {workers_to_stop}")
app.control.broadcast("shutdown", destination=workers_to_stop)
remove_monitor_workers(workers=workers_to_stop)
remove_entry = False if debug_lvl == "DEBUG" else True
remove_monitor_workers(workers=workers_to_stop, worker_status=WorkerStatus.stopped, remove_entry=remove_entry)
else:
LOG.warning("No workers found to stop")

Expand Down
11 changes: 9 additions & 2 deletions merlin/study/celerymanageradapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ def add_monitor_workers(workers: list):
LOG.info(f"MANAGER: Manager is monitoring the following workers {monitored_workers}.")


def remove_monitor_workers(workers: list):
def remove_monitor_workers(
workers: list,
worker_status: WorkerStatus = None,
remove_entry: bool = True
):
"""
Remove workers from being monitored by the celery manager.
:param list workers: A worker names
Expand All @@ -78,7 +82,10 @@ def remove_monitor_workers(workers: list):
for worker in workers:
if redis_connection.exists(worker):
redis_connection.hset(worker, "monitored", 0)
redis_connection.hset(worker, "status", WorkerStatus.stopped)
if worker_status is not None:
redis_connection.hset(worker, "status", worker_status)
if remove_entry:
redis_connection.delete(worker)


def is_manager_runnning() -> bool:
Expand Down

0 comments on commit 875f137

Please sign in to comment.