Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor/distributed tests #16

Open
wants to merge 219 commits into
base: develop
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
219 commits
Select commit Hold shift + click to select a range
ec13acf
Merge branch 'develop' of https://github.com/LLNL/merlin into develop
bgunnar5 Aug 4, 2023
7c59fc2
merge latest changes from develop
bgunnar5 Nov 20, 2023
29573d4
remove a merge conflict statement that was missed
bgunnar5 Nov 20, 2023
f10c896
add pytest coverage library and add sample_index coverage
bgunnar5 Dec 11, 2023
362478e
run fix style and add module header
bgunnar5 Dec 12, 2023
9339b6b
add tests for encryption modules
bgunnar5 Dec 12, 2023
54a31bc
add unit tests for util_sampling
bgunnar5 Dec 12, 2023
be02611
run fix-style and fix typo
bgunnar5 Dec 12, 2023
a015e3c
pull latest changes from develop
bgunnar5 Dec 12, 2023
63d22f0
create directory for context managers and fix issue with an encryptio…
bgunnar5 Dec 12, 2023
b2a9976
add a context manager for spinning up/down the redis server
bgunnar5 Dec 13, 2023
0ef09bc
Merge remote-tracking branch 'origin/develop' into develop
bgunnar5 Dec 13, 2023
35c614a
fix issue with path in one test
bgunnar5 Dec 13, 2023
638a27e
rework CONFIG functionality for testing
bgunnar5 Dec 13, 2023
9b342ab
refactor config fixture so it doesn't depend on redis server to be st…
bgunnar5 Dec 14, 2023
661ab71
split CONFIG fixtures into rabbit and redis configs, run fix-style
bgunnar5 Dec 14, 2023
db1f20a
add unit tests for broker.py
bgunnar5 Dec 14, 2023
896898e
add unit tests for the Config object
bgunnar5 Dec 14, 2023
2a5148c
update CHANGELOG
bgunnar5 Dec 14, 2023
3d7228e
make CONFIG fixtures more flexible for tests
bgunnar5 Dec 18, 2023
525e403
add tests for results_backend.py
bgunnar5 Dec 18, 2023
47a0b4e
fix lint issues for most recent changes
bgunnar5 Dec 18, 2023
91a3f2f
fix filename issue in setup.cfg and move celeryadapter tests to integ…
bgunnar5 Dec 19, 2023
78b019b
add ssl filepaths to mysql config object
bgunnar5 Dec 19, 2023
275fbd4
add unit tests for configfile.py
bgunnar5 Dec 19, 2023
9669bd0
add tests for the utils.py file in config/
bgunnar5 Dec 19, 2023
2a209e5
create utilities file and constants file
bgunnar5 Dec 19, 2023
ddb0588
move create_dir function to utils.py
bgunnar5 Dec 20, 2023
e5bc0fe
add tests for merlin/examples/generator.py
bgunnar5 Dec 20, 2023
681bd71
run fix-style and update changelog
bgunnar5 Dec 20, 2023
fb6058c
Merge remote-tracking branch 'upstream-main/develop' into develop
bgunnar5 Jan 24, 2024
2a6ec3a
Merge remote-tracking branch 'upstream-main/develop' into develop
bgunnar5 Jan 25, 2024
b22e3a1
Merge remote-tracking branch 'upstream-main/develop' into develop
bgunnar5 Feb 14, 2024
4b8fab5
add a 'pip freeze' call in github workflow to view reqs versions
bgunnar5 Feb 14, 2024
714dafe
Merge remote-tracking branch 'upstream-main/develop' into develop
bgunnar5 Feb 15, 2024
d00aff0
Merge remote-tracking branch 'upstream-main/develop' into develop
bgunnar5 Mar 12, 2024
9a9f2c0
Merge remote-tracking branch 'upstream-main/develop' into develop
bgunnar5 Apr 15, 2024
074e031
Merge remote-tracking branch 'upstream-main/develop' into develop
bgunnar5 Apr 15, 2024
124e40f
pull changes from 1.12.0 and 1.12.1
bgunnar5 Apr 25, 2024
3099d4c
re-delete the old config test files
bgunnar5 Apr 25, 2024
37839f6
fix tests/bugs introduced by merging in develop
bgunnar5 Apr 25, 2024
b8185cc
add a unit test file for the dumper module
bgunnar5 Apr 25, 2024
e48fe32
begin work on server tests and modular fixtures
bgunnar5 May 7, 2024
812459f
pull changes from develop
bgunnar5 May 10, 2024
e1f667d
start work on tests for RedisConfig
bgunnar5 May 23, 2024
9997d8e
add tests for RedisConfig object
bgunnar5 Jun 4, 2024
52213f2
add tests for RedisUsers class
bgunnar5 Jun 4, 2024
a59243f
change server fixtures to use redis config files
bgunnar5 Jun 6, 2024
0ef586e
add tests for AppYaml class
bgunnar5 Jun 6, 2024
bde9079
final cleanup of server_utils
bgunnar5 Jun 6, 2024
da94020
fix lint issues
bgunnar5 Jun 6, 2024
2997de6
parametrize setup examples tests
bgunnar5 Jun 6, 2024
2f24577
sort example output
bgunnar5 Jun 6, 2024
5e0a5f7
ensure directory is changed back on no outdir test
bgunnar5 Jun 6, 2024
d8fa77c
sort the specs in examples output
bgunnar5 Jun 6, 2024
8421d74
fix lint issues
bgunnar5 Jun 6, 2024
0543ae4
start writing tests for server config
bgunnar5 Jun 10, 2024
3b0ccde
add pytest coverage library and add sample_index coverage
bgunnar5 Dec 11, 2023
4ad3729
run fix style and add module header
bgunnar5 Dec 12, 2023
089e2bd
add tests for encryption modules
bgunnar5 Dec 12, 2023
2a3bc69
add unit tests for util_sampling
bgunnar5 Dec 12, 2023
30fd21d
run fix-style and fix typo
bgunnar5 Dec 12, 2023
76b3a55
create directory for context managers and fix issue with an encryptio…
bgunnar5 Dec 12, 2023
fa33cb2
add a context manager for spinning up/down the redis server
bgunnar5 Dec 13, 2023
78a298a
fix issue with path in one test
bgunnar5 Dec 13, 2023
5b6878c
rework CONFIG functionality for testing
bgunnar5 Dec 13, 2023
7aaa832
refactor config fixture so it doesn't depend on redis server to be st…
bgunnar5 Dec 14, 2023
acb6d43
split CONFIG fixtures into rabbit and redis configs, run fix-style
bgunnar5 Dec 14, 2023
c1bfc6a
add unit tests for broker.py
bgunnar5 Dec 14, 2023
3300291
add unit tests for the Config object
bgunnar5 Dec 14, 2023
58a3044
update CHANGELOG
bgunnar5 Dec 14, 2023
9f16701
make CONFIG fixtures more flexible for tests
bgunnar5 Dec 18, 2023
17be237
add tests for results_backend.py
bgunnar5 Dec 18, 2023
d54f750
fix lint issues for most recent changes
bgunnar5 Dec 18, 2023
8e9d1e2
fix filename issue in setup.cfg and move celeryadapter tests to integ…
bgunnar5 Dec 19, 2023
6dae836
add ssl filepaths to mysql config object
bgunnar5 Dec 19, 2023
89caa21
add unit tests for configfile.py
bgunnar5 Dec 19, 2023
ed3a660
add tests for the utils.py file in config/
bgunnar5 Dec 19, 2023
45dfa51
create utilities file and constants file
bgunnar5 Dec 19, 2023
598ffb7
move create_dir function to utils.py
bgunnar5 Dec 20, 2023
a8d0d1d
add tests for merlin/examples/generator.py
bgunnar5 Dec 20, 2023
7728893
run fix-style and update changelog
bgunnar5 Dec 20, 2023
6b97b0a
fix tests/bugs introduced by merging in develop
bgunnar5 Apr 25, 2024
57f0446
add a unit test file for the dumper module
bgunnar5 Apr 25, 2024
963fed1
begin work on server tests and modular fixtures
bgunnar5 May 7, 2024
8649ca1
start work on tests for RedisConfig
bgunnar5 May 23, 2024
91ad49b
add tests for RedisConfig object
bgunnar5 Jun 4, 2024
1df0e44
add tests for RedisUsers class
bgunnar5 Jun 4, 2024
921c38b
change server fixtures to use redis config files
bgunnar5 Jun 6, 2024
2cad8cb
add tests for AppYaml class
bgunnar5 Jun 6, 2024
f28cad3
final cleanup of server_utils
bgunnar5 Jun 6, 2024
716dc32
fix lint issues
bgunnar5 Jun 6, 2024
c1b71f0
parametrize setup examples tests
bgunnar5 Jun 6, 2024
bd598f4
sort example output
bgunnar5 Jun 6, 2024
0487a9e
ensure directory is changed back on no outdir test
bgunnar5 Jun 6, 2024
c11e82f
sort the specs in examples output
bgunnar5 Jun 6, 2024
a39e65c
fix lint issues
bgunnar5 Jun 6, 2024
031fb0e
start writing tests for server config
bgunnar5 Jun 10, 2024
ed3b4ea
bake in LC_ALL env variable setting for server cmds
bgunnar5 Jun 11, 2024
9dc2bd8
add tests for parse_redis_output
bgunnar5 Jun 11, 2024
459aa21
fix issue with scope of fixture after rebase
bgunnar5 Jun 26, 2024
a6b52d0
resolve merge conflicts after rebase
bgunnar5 Jun 26, 2024
5945c36
run fix-style
bgunnar5 Jun 26, 2024
78ef619
Include celerymanager and update celeryadapter to check the status of…
ryannova Jul 23, 2024
8561a18
Fixed issue where the update status was outside of if statement for c…
ryannova Jul 23, 2024
1120dd7
Include worker status stop and add template for merlin restart
ryannova Aug 1, 2024
f41938f
Added comment to the CeleryManager init
ryannova Aug 2, 2024
690115e
Increment db_num instead of being fixed
ryannova Aug 2, 2024
de4ffd0
Added other subprocess parameters and created a linking system for re…
ryannova Aug 2, 2024
67e9268
Implemented stopping of celery workers and restarting workers properly
ryannova Aug 6, 2024
406e4c2
Update stopped to stalled for when the worker doesn't respond to restart
ryannova Aug 6, 2024
78e4525
Working merlin manager run but start and stop not working properly
ryannova Aug 7, 2024
eca74ac
Made fix for subprocess to start new shell and fixed manager start an…
ryannova Aug 7, 2024
ec8aa78
Added comments and update changelog
ryannova Aug 7, 2024
3f04d24
Include style fixes
ryannova Aug 7, 2024
5538f4b
Fix style for black
ryannova Aug 7, 2024
b6bcd33
Revert launch_job script that was edited when doing automated lint
ryannova Aug 7, 2024
9b97f8b
Move importing of CONFIG to be within redis_connection due to error o…
ryannova Aug 7, 2024
c9dfd31
Added space to fix style
ryannova Aug 7, 2024
a9bd865
Revert launch_jobs.py:
ryannova Aug 7, 2024
ddc7614
Update import of all merlin.config to be in the function
ryannova Aug 7, 2024
353a66b
suggested changes plus beginning work on monitor/manager collab
bgunnar5 Aug 17, 2024
1a4d416
move managers to their own folder and fix ssl problems
bgunnar5 Aug 22, 2024
875f137
final PR touch ups
bgunnar5 Sep 3, 2024
9020aa0
Merge pull request #2 from bgunnar5/monitor_manager_collab
ryannova Sep 3, 2024
58da9bc
Fix lint style changes
ryannova Sep 3, 2024
e75dcc2
Fixed issue with context manager
ryannova Sep 4, 2024
11f9e7c
Reset file that was incorrect changed
ryannova Sep 4, 2024
7204e46
Check for ssl cert before applying to Redis connection
ryannova Sep 4, 2024
53d8f32
Comment out Active tests for celerymanager
ryannova Sep 4, 2024
6bf1fe6
split up create_server_config and write tests for it
bgunnar5 Sep 5, 2024
cf307bb
add tests for config_merlin_server function
bgunnar5 Sep 5, 2024
a5ccb2d
Fix lint issue with unused import after commenting out Active celery …
ryannova Sep 9, 2024
2b0e8a6
Fixed style for import
ryannova Sep 9, 2024
6ba91a6
add tests for pull_server_config
bgunnar5 Sep 9, 2024
28e5040
add tests for pull_server_image
bgunnar5 Sep 10, 2024
24470e5
finish writing tests for server_config.py
bgunnar5 Sep 11, 2024
e49f378
Fixed kwargs being modified when making a copy for saving to redis wo…
ryannova Sep 12, 2024
72398e6
add tests for server_commands.py
bgunnar5 Sep 13, 2024
ff4f649
run fix-style
bgunnar5 Sep 13, 2024
7050822
update README for testing directory
bgunnar5 Sep 13, 2024
0c74021
update the temp_output_directory to include python version
bgunnar5 Sep 13, 2024
42c121b
mock the open.write to try to fix github CI
bgunnar5 Sep 13, 2024
f5e8671
ensure config dir is created
bgunnar5 Sep 13, 2024
3ce140e
update CHANGELOG
bgunnar5 Sep 13, 2024
73e4cf5
add print of exception to OSError catch in pull_server_image
bgunnar5 Sep 13, 2024
5b36b41
change name of config_file in test that's failing
bgunnar5 Sep 13, 2024
352e7df
Added password check and omit if a password doesn't exist
ryannova Sep 13, 2024
9782d58
update CHANGELOG
bgunnar5 Sep 13, 2024
75a9972
change testing log level to debug
bgunnar5 Sep 16, 2024
c27a208
add debug statement for redis_connection
bgunnar5 Sep 17, 2024
97a9cf1
change debug log to info so github ci will display it
bgunnar5 Sep 17, 2024
ce8bf37
attempt to fix password missing from Namespace error
bgunnar5 Sep 17, 2024
5851d9d
run checks for all necessary configurations
bgunnar5 Sep 17, 2024
97d075e
convert stop-workers tests to pytest format
bgunnar5 Sep 20, 2024
04e9122
update github wf and comment out stop-workers tests in definitions.py
bgunnar5 Sep 20, 2024
f93c7f6
add missing key to GH wf file
bgunnar5 Sep 20, 2024
835399c
fix invalid syntax in definitions.py
bgunnar5 Sep 20, 2024
176ff4d
comment out stop_workers tests
bgunnar5 Sep 24, 2024
e38cc93
playing with new caches for workflow CI
bgunnar5 Sep 24, 2024
c136058
fix yaml syntax error
bgunnar5 Sep 24, 2024
56a6a05
fix typo for getting runner os
bgunnar5 Sep 24, 2024
f45a798
fix test and add python version to CI cache
bgunnar5 Sep 24, 2024
290d350
add in common-setup step again with caches this time
bgunnar5 Sep 24, 2024
8a1bc14
run fix-style
bgunnar5 Sep 24, 2024
58622ba
update CHANGELOG
bgunnar5 Sep 24, 2024
917f8d7
fix remaining style issues
bgunnar5 Sep 24, 2024
91c7505
run without caches to compare execution time of test suite
bgunnar5 Sep 24, 2024
c7adb96
resolve merge conflict
bgunnar5 Sep 24, 2024
608e00e
allow redis config to not use ssl
bgunnar5 Sep 25, 2024
bf41a2d
remove stop-workers and query-workers tests from definitions.py
bgunnar5 Sep 26, 2024
630c9c9
create helper_funcs file with common testing functions
bgunnar5 Sep 26, 2024
17889fd
move query-workers to pytest and add base class w/ stop-workers tests
bgunnar5 Sep 26, 2024
5c28b49
update CHANGELOG
bgunnar5 Sep 26, 2024
643b4d1
final changes for the stop-workers & query-workers tests
bgunnar5 Sep 27, 2024
0f0264c
run fix-style
bgunnar5 Sep 27, 2024
6340604
move stop and query workers tests to the same file
bgunnar5 Sep 30, 2024
19c4bf7
run fix-style
bgunnar5 Sep 30, 2024
99257d8
go back to original cache setup
bgunnar5 Sep 30, 2024
f947eae
try new cache for singularity install
bgunnar5 Sep 30, 2024
beafb22
fix syntax issue in github workflow
bgunnar5 Sep 30, 2024
fac2892
attempt to fix singularity cache
bgunnar5 Sep 30, 2024
c49660a
remove ls statement that breaks workflow
bgunnar5 Sep 30, 2024
ecb1762
revert back to no common setup
bgunnar5 Sep 30, 2024
39d09d6
remove unnecessary dependency
bgunnar5 Sep 30, 2024
5f4673b
update github actions versions to use latest
bgunnar5 Sep 30, 2024
70e540f
update action versions that didn't save
bgunnar5 Sep 30, 2024
94f8f72
merge in new tests for stop and query workers commands
bgunnar5 Sep 30, 2024
d2e85ec
run fix-style
bgunnar5 Sep 30, 2024
9c4fca4
move distributed test suite actions back to v2
bgunnar5 Oct 3, 2024
3a2cafa
add 'merlin run' tests and port existing ones to pytest
bgunnar5 Oct 15, 2024
15d6665
update CHANGELOG
bgunnar5 Oct 15, 2024
5844e91
add aliased fixture types for typehinting
bgunnar5 Oct 15, 2024
085d4b2
add tests for the purge command
bgunnar5 Oct 16, 2024
b36dedc
update CHANGELOG
bgunnar5 Oct 16, 2024
6758282
update run command tests to use conditions when appropriate
bgunnar5 Oct 16, 2024
466c276
start work on adding workflow tests
bgunnar5 Oct 28, 2024
144246f
create function and class scoped config fixtures
bgunnar5 Oct 31, 2024
0518a48
add Tuple fixture type
bgunnar5 Oct 31, 2024
d577653
get e2e test of feature_demo workflow running
bgunnar5 Oct 31, 2024
983215b
add check for proper variable substitution in e2e test
bgunnar5 Oct 31, 2024
977e91c
generalize functionality to run workflows
bgunnar5 Nov 4, 2024
ba3f066
add create_testing_dir fixture
bgunnar5 Nov 5, 2024
b30d796
port chord error workflow to pytest
bgunnar5 Nov 5, 2024
8b76db8
create dataclasses to house common fixtures and reduce fixture import…
bgunnar5 Nov 8, 2024
c1b39e4
fix lint issues
bgunnar5 Nov 8, 2024
c450cbd
remove hard requirement of Annotated type for python 3.7 and 3.8
bgunnar5 Nov 8, 2024
6a9ede4
remove distributed test CI and add unit test CI
bgunnar5 Nov 8, 2024
711ce06
fix typo in fixture_types and fix lint issues
bgunnar5 Nov 8, 2024
088ecc1
run fix-style
bgunnar5 Nov 8, 2024
f544644
add check for python2 before adding that condition check
bgunnar5 Nov 8, 2024
28cae91
convert local run test to use StepFinishedFilesCount condition
bgunnar5 Nov 8, 2024
916b5f8
update CHANGELOG.md
bgunnar5 Nov 8, 2024
e108fa9
pull changes from develop
bgunnar5 Nov 8, 2024
3bc1fa6
fix problem created by merge conflict when mergin develop
bgunnar5 Nov 8, 2024
3be8963
remove manager functionality from this PR
bgunnar5 Nov 12, 2024
29b39d3
update README for test suite
bgunnar5 Nov 12, 2024
6dab66b
change SIGTERM to SIGKILL
bgunnar5 Nov 12, 2024
dcefd22
update Makefile to include new changes to test suite
bgunnar5 Nov 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix style for black
ryannova committed Aug 7, 2024
commit 5538f4ba6d3898e716748ff1f32aab3e2f8a42d2
4 changes: 1 addition & 3 deletions merlin/examples/workflows/null_spec/scripts/launch_jobs.py
Original file line number Diff line number Diff line change
@@ -78,9 +78,7 @@
if real_time > 1440:
real_time = 1440
submit: str = "submit.sbatch"
command: str = (
f"sbatch -J c{concurrency}s{sample}r{args.run_id} --time {real_time} -N {nodes[ii]} -p {partition} -A {account} {submit} {sample} {int(concurrency/nodes[ii])} {args.run_id} {concurrency}"
)
command: str = f"sbatch -J c{concurrency}s{sample}r{args.run_id} --time {real_time} -N {nodes[ii]} -p {partition} -A {account} {submit} {sample} {int(concurrency/nodes[ii])} {args.run_id} {concurrency}"
shutil.copyfile(os.path.join(submit_path, submit), submit)
shutil.copyfile(args.spec_path, "spec.yaml")
shutil.copyfile(args.script_path, os.path.join("scripts", "make_samples.py"))
15 changes: 6 additions & 9 deletions merlin/main.py
Original file line number Diff line number Diff line change
@@ -401,15 +401,13 @@ def process_example(args: Namespace) -> None:
setup_example(args.workflow, args.path)


def process_manager(args : Namespace):
def process_manager(args: Namespace):
if args.command == "run":
run_manager(query_frequency=args.query_frequency,
query_timeout=args.query_timeout,
worker_timeout=args.worker_timeout)
run_manager(query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout)
elif args.command == "start":
if start_manager(query_frequency=args.query_frequency,
query_timeout=args.query_timeout,
worker_timeout=args.worker_timeout):
if start_manager(
query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout
):
LOG.info("Manager started successfully.")
elif args.command == "stop":
if stop_manager():
@@ -420,7 +418,6 @@ def process_manager(args : Namespace):
print("Run manager with a command. Try 'merlin manager -h' for more details")



def process_monitor(args):
"""
CLI command to monitor merlin workers and queues to keep
@@ -919,7 +916,7 @@ def generate_worker_touching_parsers(subparsers: ArgumentParser) -> None:
)

# merlin manager
manager : ArgumentParser = subparsers.add_parser(
manager: ArgumentParser = subparsers.add_parser(
"manager",
help="Watchdog application to manage workers",
description="A daemon process that helps to restart and communicate with workers while running.",
8 changes: 4 additions & 4 deletions merlin/study/celeryadapter.py
Original file line number Diff line number Diff line change
@@ -766,7 +766,7 @@ def launch_celery_worker(worker_cmd, worker_list, kwargs):
process = subprocess.Popen(worker_cmd, **kwargs) # pylint: disable=R1732
# Get the worker name from worker_cmd and add to be monitored by celery manager
worker_cmd_list = worker_cmd.split()
worker_name = worker_cmd_list[worker_cmd_list.index("-n")+1].replace("%h", kwargs["env"]["HOSTNAME"])
worker_name = worker_cmd_list[worker_cmd_list.index("-n") + 1].replace("%h", kwargs["env"]["HOSTNAME"])
worker_name = "celery@" + worker_name
worker_list.append(worker_cmd)

@@ -780,9 +780,9 @@ def launch_celery_worker(worker_cmd, worker_list, kwargs):
# further nesting can be accomplished by making this recursive.
for key in kwargs:
if type(kwargs[key]) is dict:
key_name = worker_name+"_"+key
key_name = worker_name + "_" + key
redis_connection.hmset(name=key_name, mapping=kwargs[key])
args[key] = "link:"+key_name
args[key] = "link:" + key_name
if type(kwargs[key]) is bool:
if kwargs[key]:
args[key] = "True"
@@ -792,7 +792,7 @@ def launch_celery_worker(worker_cmd, worker_list, kwargs):
redis_connection.quit()

# Adding the worker to redis db to be monitored
add_monitor_workers(workers=((worker_name, process.pid), ))
add_monitor_workers(workers=((worker_name, process.pid),))
LOG.info(f"Added {worker_name} to be monitored")
except Exception as e: # pylint: disable=C0103
LOG.error(f"Cannot start celery workers, {e}")
40 changes: 22 additions & 18 deletions merlin/study/celerymanager.py
Original file line number Diff line number Diff line change
@@ -45,27 +45,28 @@ class WorkerStatus:
stopped = "Stopped"
rebooting = "Rebooting"


WORKER_INFO = {
"status" : WorkerStatus.running,
"status": WorkerStatus.running,
"pid": -1,
"monitored": 1,
"num_unresponsive": 0,
}

class CeleryManager():

def __init__(self, query_frequency:int=60, query_timeout:float=0.5, worker_timeout:int=180):
class CeleryManager:
def __init__(self, query_frequency: int = 60, query_timeout: float = 0.5, worker_timeout: int = 180):
"""
Initializer for Celery Manager
@param int query_frequency: The frequency at which workers will be queried with ping commands
@param float query_timeout: The timeout for the query pings that are sent to workers
@param int worker_timeout: The sum total(query_frequency*tries) time before an attempt is made to restart worker.
@param int worker_timeout: The sum total(query_frequency*tries) time before an attempt is made to restart worker.
"""
self.redis_connection = self.get_worker_status_redis_connection()
self.query_frequency = query_frequency
self.query_timeout = query_timeout
self.worker_timeout = worker_timeout

@staticmethod
def get_worker_status_redis_connection():
"""
@@ -93,12 +94,14 @@ def get_redis_connection(db_num):
password = get_backend_password(password_file)
except IOError:
password = CONFIG.results_backend.password
return redis.Redis(host=CONFIG.results_backend.server,
port=CONFIG.results_backend.port,
db=CONFIG.results_backend.db_num+db_num, #Increment db_num to avoid conflicts
username=CONFIG.results_backend.username,
password=password,
decode_responses=True)
return redis.Redis(
host=CONFIG.results_backend.server,
port=CONFIG.results_backend.port,
db=CONFIG.results_backend.db_num + db_num, # Increment db_num to avoid conflicts
username=CONFIG.results_backend.username,
password=password,
decode_responses=True,
)

def get_celery_workers_status(self, workers):
"""
@@ -108,7 +111,7 @@ def get_celery_workers_status(self, workers):
:return dict: The result dictionary for each worker and the response.
"""
from merlin.celery import app
from merlin.celery import app

celery_app = app.control
ping_result = celery_app.ping(workers, timeout=self.query_timeout)
@@ -176,7 +179,7 @@ def restart_celery_worker(self, worker):

return True

#TODO add some logs
# TODO add some logs
def run(self):
"""
Main manager loop
@@ -188,7 +191,7 @@ def run(self):
}
self.redis_connection.hmset(name="manager", mapping=manager_info)

while True: #TODO Make it so that it will stop after a list of workers is stopped
while True: # TODO Make it so that it will stop after a list of workers is stopped
# Get the list of running workers
workers = self.redis_connection.keys()
workers.remove("manager")
@@ -207,8 +210,8 @@ def run(self):
for worker in workers:
if worker not in worker_results:
# If time where the worker is unresponsive is less than the worker time out then just increment
num_unresponsive = int(self.redis_connection.hget(worker, "num_unresponsive"))+1
if num_unresponsive*self.query_frequency < self.worker_timeout:
num_unresponsive = int(self.redis_connection.hget(worker, "num_unresponsive")) + 1
if num_unresponsive * self.query_frequency < self.worker_timeout:
# Attempt to restart worker
if self.restart_celery_worker(worker):
# If successful set the status to running and reset num_unresponsive
@@ -220,7 +223,8 @@ def run(self):
self.redis_connection.hset(worker, "num_unresponsive", num_unresponsive)
# Sleep for the query_frequency for the next iteration
time.sleep(self.query_frequency)



if __name__ == "__main__":
cm = CeleryManager()
cm.run()
cm.run()
24 changes: 13 additions & 11 deletions merlin/study/celerymanageradapter.py
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@ def add_monitor_workers(workers: list):
"""
if workers is None or len(workers) <= 0:
return

redis_connection = CeleryManager.get_worker_status_redis_connection()
for worker in workers:
if redis_connection.exists(worker[0]):
@@ -52,6 +52,7 @@ def add_monitor_workers(workers: list):
redis_connection.hmset(name=worker[0], mapping=worker_info)
redis_connection.quit()


def remove_monitor_workers(workers: list):
"""
Remove workers from being monitored by the celery manager.
@@ -64,9 +65,10 @@ def remove_monitor_workers(workers: list):
if redis_connection.exists(worker):
redis_connection.hset(worker, "monitored", 0)
redis_connection.hset(worker, "status", WorkerStatus.stopped)

redis_connection.quit()


def is_manager_runnning() -> bool:
"""
Check to see if the manager is running
@@ -78,31 +80,32 @@ def is_manager_runnning() -> bool:
redis_connection.quit()
return manager_status["status"] == WorkerStatus.running and psutil.pid_exists(manager_status["pid"])

def run_manager(query_frequency:int = 60, query_timeout:float = 0.5, worker_timeout:int = 180) -> bool:

def run_manager(query_frequency: int = 60, query_timeout: float = 0.5, worker_timeout: int = 180) -> bool:
"""
A process locking function that calls the celery manager with proper arguments.
:params: See CeleryManager for more information regarding the parameters
"""
celerymanager = CeleryManager(query_frequency=query_frequency,
query_timeout=query_timeout,
worker_timeout=worker_timeout)
celerymanager = CeleryManager(query_frequency=query_frequency, query_timeout=query_timeout, worker_timeout=worker_timeout)
celerymanager.run()


def start_manager(query_frequency:int = 60, query_timeout:float = 0.5, worker_timeout:int = 180) -> bool:

def start_manager(query_frequency: int = 60, query_timeout: float = 0.5, worker_timeout: int = 180) -> bool:
"""
A Non-locking function that calls the celery manager with proper arguments.
:params: See CeleryManager for more information regarding the parameters
:return bool: True if the manager was started successfully.
"""
subprocess.Popen(f"merlin manager run -qf {query_frequency} -qt {query_timeout} -wt {worker_timeout}",
subprocess.Popen(
f"merlin manager run -qf {query_frequency} -qt {query_timeout} -wt {worker_timeout}",
shell=True,
close_fds=True,
stdout=subprocess.PIPE,
)
return True


def stop_manager() -> bool:
"""
Stop the manager process using it's pid.
@@ -114,11 +117,10 @@ def stop_manager() -> bool:
manager_status = redis_connection.hget("manager", "status")
print(redis_connection.hgetall("manager"))
redis_connection.quit()

print(manager_status, psutil.pid_exists(manager_pid))
# Check to make sure that the manager is running and the pid exists
if manager_status == WorkerStatus.running and psutil.pid_exists(manager_pid):
psutil.Process(manager_pid).terminate()
return True
return False