Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
CamDavidsonPilon committed Jan 24, 2025
1 parent 6e2f25b commit 4865e59
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

- experiment profiles start now use the unit_api directly. I think this mitigates the huey workers stampeding on each other when try to start many jobs.
- improved chart colors in the UI
- /jobs/running/<job>

### 25.1.21

Expand Down
48 changes: 34 additions & 14 deletions pioreactor/actions/leader/experiment_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
from pioreactor.experiment_profiles import profile_struct as struct
from pioreactor.logging import create_logger
from pioreactor.logging import CustomLogger
from pioreactor.mureq import HTTPException
from pioreactor.pubsub import Client
from pioreactor.pubsub import get_from
from pioreactor.pubsub import patch_into
from pioreactor.pubsub import patch_into_leader
from pioreactor.utils import ClusterJobManager
Expand All @@ -31,7 +33,6 @@
from pioreactor.whoami import get_unit_name
from pioreactor.whoami import is_testing_env


BoolExpression = str | bool
Env = dict[str, Any]

Expand Down Expand Up @@ -115,12 +116,22 @@ def check_syntax_of_bool_expression(bool_expression: BoolExpression) -> bool:
if is_bracketed_expression(bool_expression):
bool_expression = strip_expression_brackets(bool_expression)

# in a common expressions, users can use ::word:work which is technically not allowed. For checking, we replace with garbage
# TODO: in a common expressions, users can use ::word:work which is technically not valid syntax. For checking, we replace with garbage
bool_expression = bool_expression.replace("::", "dummy:", 1)

return check_syntax(bool_expression)


def check_if_job_running(unit: str, job: str) -> bool:
try:
r = get_from(resolve_to_address(unit), f"/unit_api/jobs/running/{job}")
r.raise_for_status()
return len(r.json()) > 0
except HTTPException:
# an http hiccup shouldn't stall commands
return True


def _led_intensity_hack(action: struct.Action) -> struct.Action:
# we do this hack because led_intensity doesn't really behave like a background job, but its useful to
# treat it as one.
Expand Down Expand Up @@ -586,10 +597,13 @@ def _callable() -> None:
logger.info(f"Dry-run: Pausing {job_name} on {unit}.")
else:
logger.debug(f"Pausing {job_name} on {unit}.")
patch_into_leader(
f"/api/workers/{unit}/jobs/update/job_name/{job_name}/experiments/{experiment}",
json={"settings": {"$state": "sleeping"}},
).raise_for_status()
if check_if_job_running(unit, job_name):
patch_into_leader(
f"/api/workers/{unit}/jobs/update/job_name/{job_name}/experiments/{experiment}",
json={"settings": {"$state": "sleeping"}},
).raise_for_status()
else:
logger.debug(f"Job {job_name} not running on {unit}.")
else:
logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.")

Expand Down Expand Up @@ -624,10 +638,13 @@ def _callable() -> None:
logger.info(f"Dry-run: Resuming {job_name} on {unit}.")
else:
logger.debug(f"Resuming {job_name} on {unit}.")
patch_into_leader(
f"/api/workers/{unit}/jobs/update/job_name/{job_name}/experiments/{experiment}",
json={"settings": {"$state": "ready"}},
).raise_for_status()
if check_if_job_running(unit, job_name):
patch_into_leader(
f"/api/workers/{unit}/jobs/update/job_name/{job_name}/experiments/{experiment}",
json={"settings": {"$state": "ready"}},
).raise_for_status()
else:
logger.debug(f"Job {job_name} not running on {unit}.")
else:
logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.")

Expand Down Expand Up @@ -703,10 +720,13 @@ def _callable() -> None:
else:
for setting, value in evaluate_options(options, env).items():
logger.debug(f"Updating {setting} to {value} in {job_name} on {unit}.")
patch_into_leader(
f"/api/workers/{unit}/jobs/update/job_name/{job_name}/experiments/{experiment}",
json={"settings": {setting: value}},
).raise_for_status()
if check_if_job_running(unit, job_name):
patch_into_leader(
f"/api/workers/{unit}/jobs/update/job_name/{job_name}/experiments/{experiment}",
json={"settings": {setting: value}},
).raise_for_status()
else:
logger.debug(f"Job {job_name} not running on {unit}.")
else:
logger.debug(f"Action's `if` condition, `{if_}`, evaluated False. Skipping action.")

Expand Down
4 changes: 1 addition & 3 deletions pioreactor/utils/timing.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,7 @@ def __init__(
self.args = args
self.kwargs = kwargs
if logger is None:
self.logger = create_logger(
job_name or "RepeatedTimer"
) # TODO: I don't think this works as expected.
self.logger = create_logger(job_name or "RepeatedTimer")
else:
self.logger = logger
self.is_paused = False
Expand Down

0 comments on commit 4865e59

Please sign in to comment.