Skip to content

Commit

Permalink
suggested changes plus beginning work on monitor/manager collab
Browse files Browse the repository at this point in the history
  • Loading branch information
bgunnar5 committed Aug 17, 2024
1 parent ddc7614 commit 353a66b
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 218 deletions.
96 changes: 48 additions & 48 deletions merlin/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,13 +402,24 @@ def process_example(args: Namespace) -> None:


def process_manager(args: Namespace):
"""
Process the command for managing the workers.
This function interprets the command provided in the `args` namespace and
executes the corresponding manager function. It supports three commands:
"run", "start", and "stop".
:param args: parsed CLI arguments
"""
if args.command == "run":
run_manager(query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout)
elif args.command == "start":
if start_manager(
query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout
):
LOG.info("Manager started successfully.")
else:
LOG.error("Unable to start manager")
elif args.command == "stop":
if stop_manager():
LOG.info("Manager stopped successfully.")
Expand Down Expand Up @@ -924,68 +935,57 @@ def generate_worker_touching_parsers(subparsers: ArgumentParser) -> None:
)
manager.set_defaults(func=process_manager)

def add_manager_options(manager_parser: ArgumentParser):
    """
    Add shared options for manager subcommands.

    The `manager run` and `manager start` subcommands have the same options.
    Rather than writing duplicate code for these we'll use this function
    to add the arguments to these subcommands.

    :param manager_parser: The ArgumentParser object to add these options to
    """
    manager_parser.add_argument(
        "-qf",
        "--query_frequency",
        action="store",
        type=int,
        default=60,
        help="The frequency at which workers will be queried for response.",
    )
    manager_parser.add_argument(
        "-qt",
        "--query_timeout",
        action="store",
        type=float,
        default=0.5,
        # Fixed subject-verb agreement in the help text ("response that are" -> "responses that are")
        help="The timeout for the query responses that are sent to workers.",
    )
    manager_parser.add_argument(
        "-wt",
        "--worker_timeout",
        action="store",
        type=int,
        default=180,
        help="The sum total (query_frequency*tries) time before an attempt is made to restart worker.",
    )

# ---- `manager` subcommand wiring ("run", "start"; "stop" follows below) ----
# NOTE(review): add_subparsers() returns a subparsers *action* object, not an
# ArgumentParser — the annotation below looks inaccurate; confirm before relying on it.
manager_commands: ArgumentParser = manager.add_subparsers(dest="command")

# `manager run`: run the manager process in the foreground.
manager_run = manager_commands.add_parser(
    "run",
    help="Run the daemon process",
    description="Run manager",
    formatter_class=ArgumentDefaultsHelpFormatter,  # show default values in --help output
)
# NOTE(review): the three inline add_argument() calls below register the same
# option strings that add_manager_options(manager_run) registers afterwards;
# argparse raises ArgumentError on duplicate option strings, so only one of the
# two registrations can survive — this looks like pre-refactor residue to remove.
manager_run.add_argument(
    "-qf",
    "--query_frequency",
    action="store",
    type=int,
    default=60,
    help="The frequency at which workers will be queried for response.",
)
manager_run.add_argument(
    "-qt",
    "--query_timeout",
    action="store",
    type=float,
    default=0.5,
    help="The timeout for the query response that are sent to workers.",
)
manager_run.add_argument(
    "-wt",
    "--worker_timeout",
    action="store",
    type=int,
    default=180,
    help="The sum total(query_frequency*tries) time before an attempt is made to restart worker.",
)
# Shared -qf/-qt/-wt options added via the helper (see add_manager_options above).
add_manager_options(manager_run)
manager_run.set_defaults(func=process_manager)  # dispatch `manager run` to process_manager

# `manager start`: start the manager as a daemon process.
manager_start = manager_commands.add_parser(
    "start",
    help="Start the daemon process",
    description="Start manager",
    formatter_class=ArgumentDefaultsHelpFormatter,
)
# NOTE(review): same duplicated-option concern as for `manager run` above.
manager_start.add_argument(
    "-qf",
    "--query_frequency",
    action="store",
    type=int,
    default=60,
    help="The frequency at which workers will be queried for response.",
)
manager_start.add_argument(
    "-qt",
    "--query_timeout",
    action="store",
    type=float,
    default=0.5,
    help="The timeout for the query response that are sent to workers.",
)
manager_start.add_argument(
    "-wt",
    "--worker_timeout",
    action="store",
    type=int,
    default=180,
    help="The sum total(query_frequency*tries) time before an attempt is made to restart worker.",
)
# Shared -qf/-qt/-wt options added via the helper.
add_manager_options(manager_start)
manager_start.set_defaults(func=process_manager)  # dispatch `manager start` to process_manager
manager_stop = manager_commands.add_parser(
"stop",
Expand Down
58 changes: 32 additions & 26 deletions merlin/study/celeryadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,15 +502,22 @@ def check_celery_workers_processing(queues_in_spec: List[str], app: Celery) -> b
"""
# Query celery for active tasks
active_tasks = app.control.inspect().active()
result = False

# Search for the queues we provided if necessary
if active_tasks is not None:
for tasks in active_tasks.values():
for task in tasks:
if task["delivery_info"]["routing_key"] in queues_in_spec:
return True
with CeleryManager.get_worker_status_redis_connection() as redis_connection:
# Search for the queues we provided if necessary
if active_tasks is not None:
for worker, tasks in active_tasks.items():
for task in tasks:
if task["delivery_info"]["routing_key"] in queues_in_spec:
result = True

return False
# Set the entry in the Redis DB for the manager to signify if the worker
# is still doing work
worker_still_processing = 1 if result else 0
redis_connection.hset(worker, "processing_work", worker_still_processing)

return result


def _get_workers_to_start(spec, steps):
Expand Down Expand Up @@ -771,25 +778,24 @@ def launch_celery_worker(worker_cmd, worker_list, kwargs):
worker_list.append(worker_cmd)

# Adding the worker args to redis db
redis_connection = CeleryManager.get_worker_args_redis_connection()
args = kwargs
# Save worker command with the arguments
args["worker_cmd"] = worker_cmd
# Store the nested dictionaries into a separate key with a link.
# Note: This only supports single nested dicts (for simplicity) and
# further nesting can be accomplished by making this recursive.
for key in kwargs:
if type(kwargs[key]) is dict:
key_name = worker_name + "_" + key
redis_connection.hmset(name=key_name, mapping=kwargs[key])
args[key] = "link:" + key_name
if type(kwargs[key]) is bool:
if kwargs[key]:
args[key] = "True"
else:
args[key] = "False"
redis_connection.hmset(name=worker_name, mapping=args)
redis_connection.quit()
with CeleryManager.get_worker_args_redis_connection() as redis_connection:
args = kwargs
# Save worker command with the arguments
args["worker_cmd"] = worker_cmd
# Store the nested dictionaries into a separate key with a link.
# Note: This only supports single nested dicts (for simplicity) and
# further nesting can be accomplished by making this recursive.
for key in kwargs:
if type(kwargs[key]) is dict:
key_name = worker_name + "_" + key
redis_connection.hmset(name=key_name, mapping=kwargs[key])
args[key] = "link:" + key_name
if type(kwargs[key]) is bool:
if kwargs[key]:
args[key] = "True"
else:
args[key] = "False"
redis_connection.hmset(name=worker_name, mapping=args)

# Adding the worker to redis db to be monitored
add_monitor_workers(workers=((worker_name, process.pid),))
Expand Down
Loading

0 comments on commit 353a66b

Please sign in to comment.