From 4865e59ac409437290f3922c62855428bd59a3cb Mon Sep 17 00:00:00 2001 From: CamDavidsonPilon Date: Fri, 24 Jan 2025 10:13:21 -0500 Subject: [PATCH] more --- CHANGELOG.md | 1 + .../actions/leader/experiment_profile.py | 48 +++++++++++++------ pioreactor/utils/timing.py | 4 +- 3 files changed, 36 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 479a266c..2f3acad4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ - experiment profiles start now use the unit_api directly. I think this mitigates the huey workers stampeding on each other when try to start many jobs. - improved chart colors in the UI + - /jobs/running/ ### 25.1.21 diff --git a/pioreactor/actions/leader/experiment_profile.py b/pioreactor/actions/leader/experiment_profile.py index a6a558dc..57891891 100644 --- a/pioreactor/actions/leader/experiment_profile.py +++ b/pioreactor/actions/leader/experiment_profile.py @@ -19,7 +19,9 @@ from pioreactor.experiment_profiles import profile_struct as struct from pioreactor.logging import create_logger from pioreactor.logging import CustomLogger +from pioreactor.mureq import HTTPException from pioreactor.pubsub import Client +from pioreactor.pubsub import get_from from pioreactor.pubsub import patch_into from pioreactor.pubsub import patch_into_leader from pioreactor.utils import ClusterJobManager @@ -31,7 +33,6 @@ from pioreactor.whoami import get_unit_name from pioreactor.whoami import is_testing_env - BoolExpression = str | bool Env = dict[str, Any] @@ -115,12 +116,22 @@ def check_syntax_of_bool_expression(bool_expression: BoolExpression) -> bool: if is_bracketed_expression(bool_expression): bool_expression = strip_expression_brackets(bool_expression) - # in a common expressions, users can use ::word:work which is technically not allowed. For checking, we replace with garbage + # TODO: in a common expressions, users can use ::word:work which is technically not valid syntax. For checking, we replace with garbage bool_expression = bool_expression.replace("::", "dummy:", 1) return check_syntax(bool_expression) +def check_if_job_running(unit: str, job: str) -> bool: + try: + r = get_from(resolve_to_address(unit), f"/unit_api/jobs/running/{job}") + r.raise_for_status() + return len(r.json()) > 0 + except HTTPException: + # an http hiccup shouldn't stall commands + return True + + def _led_intensity_hack(action: struct.Action) -> struct.Action: # we do this hack because led_intensity doesn't really behave like a background job, but its useful to # treat it as one. @@ -586,10 +597,13 @@ def _callable() -> None: logger.info(f"Dry-run: Pausing {job_name} on {unit}.") else: logger.debug(f"Pausing {job_name} on {unit}.") - patch_into_leader( - f"/api/workers/{unit}/jobs/update/job_name/{job_name}/experiments/{experiment}", - json={"settings": {"$state": "sleeping"}}, - ).raise_for_status() + if check_if_job_running(unit, job_name): + patch_into_leader( + f"/api/workers/{unit}/jobs/update/job_name/{job_name}/experiments/{experiment}", + json={"settings": {"$state": "sleeping"}}, + ).raise_for_status() + else: + logger.debug(f"Job {job_name} not running on {unit}.") else: logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.") @@ -624,10 +638,13 @@ def _callable() -> None: logger.info(f"Dry-run: Resuming {job_name} on {unit}.") else: logger.debug(f"Resuming {job_name} on {unit}.") - patch_into_leader( - f"/api/workers/{unit}/jobs/update/job_name/{job_name}/experiments/{experiment}", - json={"settings": {"$state": "ready"}}, - ).raise_for_status() + if check_if_job_running(unit, job_name): + patch_into_leader( + f"/api/workers/{unit}/jobs/update/job_name/{job_name}/experiments/{experiment}", + json={"settings": {"$state": "ready"}}, + ).raise_for_status() + else: + logger.debug(f"Job {job_name} not running on {unit}.") else: logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.") @@ -703,10 +720,13 @@ def _callable() -> None: else: for setting, value in evaluate_options(options, env).items(): logger.debug(f"Updating {setting} to {value} in {job_name} on {unit}.") - patch_into_leader( - f"/api/workers/{unit}/jobs/update/job_name/{job_name}/experiments/{experiment}", - json={"settings": {setting: value}}, - ).raise_for_status() + if check_if_job_running(unit, job_name): + patch_into_leader( + f"/api/workers/{unit}/jobs/update/job_name/{job_name}/experiments/{experiment}", + json={"settings": {setting: value}}, + ).raise_for_status() + else: + logger.debug(f"Job {job_name} not running on {unit}.") else: logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.") diff --git a/pioreactor/utils/timing.py b/pioreactor/utils/timing.py index 37fe03f1..5cb65bc7 100644 --- a/pioreactor/utils/timing.py +++ b/pioreactor/utils/timing.py @@ -112,9 +112,7 @@ def __init__( self.args = args self.kwargs = kwargs if logger is None: - self.logger = create_logger( - job_name or "RepeatedTimer" - ) # TODO: I don't think this works as expected. + self.logger = create_logger(job_name or "RepeatedTimer") else: self.logger = logger self.is_paused = False