Skip to content

Commit

Permalink
add functionality to stop taskvine workers
Browse files Browse the repository at this point in the history
  • Loading branch information
bgunnar5 committed Aug 21, 2024
1 parent 9d18eab commit 3481c11
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
5 changes: 4 additions & 1 deletion merlin/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
purge_taskvine_tasks,
query_taskvine_study,
run_taskvine,
start_taskvine_workers
start_taskvine_workers,
stop_taskvine_workers
)
from merlin.study.celeryadapter import (
build_set_of_queues,
Expand Down Expand Up @@ -241,6 +242,8 @@ def stop_workers(task_server, spec_worker_names, queues, workers_regex):
if task_server == "celery": # pylint: disable=R1705
# Stop workers
stop_celery_workers(queues, spec_worker_names, workers_regex)
elif task_server == "taskvine":
stop_taskvine_workers()
else:
LOG.error("Celery is not specified as the task server!")

Expand Down
24 changes: 22 additions & 2 deletions merlin/study/vineadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"""
import logging
import os
import psutil
import socket
import subprocess
import time
Expand Down Expand Up @@ -132,7 +133,9 @@ def start_taskvine_workers(
return command

else:
subprocess.Popen(command)
vine_factory_process = subprocess.Popen(command)
with get_vine_redis_connection() as redis_conn:
redis_conn.hset("vine_factory", "pid", vine_factory_process.pid)


def purge_taskvine_tasks(spec: MerlinSpec, force: bool):
Expand Down Expand Up @@ -193,7 +196,24 @@ def query_taskvine_study(spec: MerlinSpec):
print(headers, values)
print(tabulate(values, headers=headers))
print(study_info)



def stop_taskvine_workers():
"""
Stop the taskvine workers by killing the vine factory process
that's keeping them alive.
"""
with get_vine_redis_connection() as redis_conn:
vine_factory_pid = int(redis_conn.hget("vine_factory", "pid"))

LOG.info(f"Attempting to stop vine factory with pid {vine_factory_pid}")
if psutil.pid_exists(vine_factory_pid):
factory_process = psutil.Process(vine_factory_pid)
factory_process.kill()
LOG.info(f"Vine factory successfully killed. Workers should now be stopped.")
else:
LOG.warning(f"Vine factory with pid {vine_factory_pid} does not exist. Perhaps it has already been stopped?")



def get_running_managers(celery_app_name: str, test_mode: bool = False) -> List[str]:
Expand Down

0 comments on commit 3481c11

Please sign in to comment.