Skip to content

Commit

Permalink
start of multi profiles
Browse files Browse the repository at this point in the history
  • Loading branch information
CamDavidsonPilon committed Jan 29, 2025
1 parent 84e12a1 commit 7d53c6c
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 30 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
```
pio run pumps --waste 2 --media 1 --waste 2
```
- support for ??
- support for 40ml
- Run multiple experiment profiles per experiment.


#### Web API changes

- GET `/unit_api/jobs/running/<job>` introduced
- GET `/api/experiment_profiles/running/experiments/<experiment>` introduced

#### Breaking changes
- plugins should migrate from `click_some_name` to autodiscover plugins, to importing `run`. Example:
Expand Down
63 changes: 48 additions & 15 deletions pioreactor/actions/leader/experiment_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ def wrapped_execute_action(
elapsed_seconds_func: Callable[[], float],
client: Client,
action: struct.Action,
job_id: int,
dry_run: bool = False,
) -> Callable[..., None]:
# hack...
Expand All @@ -211,28 +212,39 @@ def wrapped_execute_action(
env,
logger,
elapsed_seconds_func,
job_id,
options,
args,
)

case struct.Pause(_, if_):
return pause_job(
unit, experiment, client, job_name, dry_run, if_, env, logger, elapsed_seconds_func
unit, experiment, client, job_name, dry_run, if_, env, logger, elapsed_seconds_func, job_id
)

case struct.Resume(_, if_):
return resume_job(
unit, experiment, client, job_name, dry_run, if_, env, logger, elapsed_seconds_func
unit, experiment, client, job_name, dry_run, if_, env, logger, elapsed_seconds_func, job_id
)

case struct.Stop(_, if_):
return stop_job(
unit, experiment, client, job_name, dry_run, if_, env, logger, elapsed_seconds_func
unit, experiment, client, job_name, dry_run, if_, env, logger, elapsed_seconds_func, job_id
)

case struct.Update(_, if_, options):
return update_job(
unit, experiment, client, job_name, dry_run, if_, env, logger, elapsed_seconds_func, options
unit,
experiment,
client,
job_name,
dry_run,
if_,
env,
logger,
elapsed_seconds_func,
job_id,
options,
)

case struct.Log(_, options, if_):
Expand All @@ -246,6 +258,7 @@ def wrapped_execute_action(
env,
logger,
elapsed_seconds_func,
job_id,
options,
)

Expand All @@ -260,6 +273,7 @@ def wrapped_execute_action(
env,
logger,
elapsed_seconds_func,
job_id,
action,
while_,
repeat_every_hours,
Expand All @@ -279,6 +293,7 @@ def wrapped_execute_action(
env,
logger,
elapsed_seconds_func,
job_id,
condition,
action,
actions,
Expand Down Expand Up @@ -307,6 +322,7 @@ def common_wrapped_execute_action(
elapsed_seconds_func: Callable[[], float],
client: Client,
action: struct.Action,
job_id: int,
dry_run: bool = False,
) -> Callable[..., None]:
actions_to_execute = []
Expand All @@ -322,6 +338,7 @@ def common_wrapped_execute_action(
elapsed_seconds_func,
client,
action,
job_id,
dry_run,
)
)
Expand All @@ -339,6 +356,7 @@ def when(
env: dict,
logger: CustomLogger,
elapsed_seconds_func: Callable[[], float],
job_id: int,
condition: BoolExpression,
when_action: struct.When,
actions: list[struct.Action],
Expand Down Expand Up @@ -373,6 +391,7 @@ def _callable() -> None:
elapsed_seconds_func,
client,
action,
job_id,
dry_run,
),
)
Expand All @@ -392,6 +411,7 @@ def _callable() -> None:
elapsed_seconds_func,
client,
when_action,
job_id,
dry_run,
),
)
Expand All @@ -412,6 +432,7 @@ def repeat(
env: dict,
logger: CustomLogger,
elapsed_seconds_func: Callable[[], float],
job_id: int,
repeat_action: struct.Repeat,
while_: Optional[BoolExpression],
repeat_every_hours: float,
Expand Down Expand Up @@ -455,6 +476,7 @@ def _callable() -> None:
elapsed_seconds_func,
client,
action,
job_id,
dry_run,
),
)
Expand All @@ -479,6 +501,7 @@ def _callable() -> None:
elapsed_seconds_func,
client,
repeat_action,
job_id,
dry_run,
),
)
Expand All @@ -503,6 +526,7 @@ def log(
env: dict,
logger: CustomLogger,
elapsed_seconds_func: Callable[[], float],
job_id: int,
options: struct._LogOptions,
) -> Callable[..., None]:
def _callable() -> None:
Expand Down Expand Up @@ -536,6 +560,7 @@ def start_job(
env: dict,
logger: CustomLogger,
elapsed_seconds_func: Callable[[], float],
job_id: int,
options: dict,
args: list,
) -> Callable[..., None]:
Expand All @@ -560,7 +585,7 @@ def _callable() -> None:
f"/unit_api/jobs/run/job_name/{job_name}",
json={
"options": evaluate_options(options, env),
"env": {"JOB_SOURCE": "experiment_profile", "EXPERIMENT": experiment},
"env": {"JOB_SOURCE": f"experiment_profile:{job_id}", "EXPERIMENT": experiment},
"args": args,
},
).raise_for_status()
Expand All @@ -580,6 +605,7 @@ def pause_job(
env: dict,
logger: CustomLogger,
elapsed_seconds_func: Callable[[], float],
job_id: int,
) -> Callable[..., None]:
def _callable() -> None:
# first check if the Pioreactor is still part of the experiment.
Expand Down Expand Up @@ -620,6 +646,7 @@ def resume_job(
env: dict,
logger: CustomLogger,
elapsed_seconds_func: Callable[[], float],
job_id: int,
) -> Callable[..., None]:
def _callable() -> None:
# first check if the Pioreactor is still part of the experiment.
Expand Down Expand Up @@ -661,6 +688,7 @@ def stop_job(
env: dict,
logger: CustomLogger,
elapsed_seconds_func: Callable[[], float],
job_id: int,
) -> Callable[..., None]:
def _callable() -> None:
# first check if the Pioreactor is still part of the experiment.
Expand Down Expand Up @@ -698,6 +726,7 @@ def update_job(
env: dict,
logger: CustomLogger,
elapsed_seconds_func: Callable[[], float],
job_id: int,
options: dict,
) -> Callable[..., None]:
def _callable() -> None:
Expand Down Expand Up @@ -846,22 +875,24 @@ def execute_experiment_profile(profile_filename: str, experiment: str, dry_run:
logger = create_logger(action_name, unit=unit, experiment=experiment)
with managed_lifecycle(
unit, experiment, action_name, ignore_is_active_state=True, is_long_running_job=True
) as state:
) as mananged_job:
try:
profile = load_and_verify_profile(profile_filename)
except Exception as e:
logger.error(e)
raise e

state.publish_setting(
mananged_job.publish_setting(
"experiment_profile_name",
profile.experiment_profile_name,
)
state.publish_setting(
mananged_job.publish_setting(
"start_time_utc",
current_utc_timestamp(),
)

job_id = mananged_job.job_id

if dry_run:
logger.notice( # type: ignore
f"Executing DRY-RUN of profile {profile.experiment_profile_name}, sourced from {Path(profile_filename).name}. See logs."
Expand Down Expand Up @@ -896,8 +927,9 @@ def execute_experiment_profile(profile_filename: str, experiment: str, dry_run:
logger,
sched,
elapsed_seconds_func,
state.mqtt_client,
mananged_job.mqtt_client,
action,
job_id,
dry_run,
),
)
Expand Down Expand Up @@ -933,8 +965,9 @@ def execute_experiment_profile(profile_filename: str, experiment: str, dry_run:
logger,
sched,
elapsed_seconds_func,
state.mqtt_client,
mananged_job.mqtt_client,
action,
job_id,
dry_run,
),
)
Expand All @@ -945,22 +978,22 @@ def execute_experiment_profile(profile_filename: str, experiment: str, dry_run:
# try / finally to handle keyboard interrupts

# the below is so the schedule can be canceled by setting the event.
while not state.exit_event.wait(timeout=0):
while not mananged_job.exit_event.wait(timeout=0):
next_event_in = sched.run(blocking=False)
if next_event_in is not None:
time.sleep(min(0.25, next_event_in))
else:
break
finally:
if state.exit_event.is_set():
if mananged_job.exit_event.is_set():
# ended early

logger.notice(f"Stopping profile {profile.experiment_profile_name} early: {len(sched.queue)} action(s) not started, and stopping all started action(s).") # type: ignore
# stop all jobs started
# we can use active workers in experiment, since if a worker leaves an experiment or goes inactive, it's jobs are stopped
workers = get_active_workers_in_experiment(experiment)
with ClusterJobManager() as cjm:
cjm.kill_jobs(workers, experiment=experiment, job_source="experiment_profile")
cjm.kill_jobs(workers, experiment=experiment, job_source=f"experiment_profile:{job_id}")

else:
if dry_run:
Expand All @@ -971,8 +1004,8 @@ def execute_experiment_profile(profile_filename: str, experiment: str, dry_run:
else:
logger.info(f"Finished executing profile {profile.experiment_profile_name}.") # type: ignore

state.publish_setting("experiment_profile_name", None)
state.publish_setting("start_time_utc", None)
mananged_job.publish_setting("experiment_profile_name", None)
mananged_job.publish_setting("start_time_utc", None)

logger.clean_up()

Expand Down
1 change: 1 addition & 0 deletions pioreactor/background_jobs/temperature_automation.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ def infer_temperature(self) -> None:
sleep(time_between_samples)

if self.state != self.READY:
# TODO: does this
# if our state changes in this loop, exit. Note that the finally block is still called.
return

Expand Down
9 changes: 6 additions & 3 deletions pioreactor/cli/pio.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,20 @@ def blink() -> None:
@click.option("--job-name", type=click.STRING)
@click.option("--experiment", type=click.STRING)
@click.option("--job-source", type=click.STRING)
@click.option("--job-id", type=click.INT)
@click.option("--all-jobs", is_flag=True, help="kill all Pioreactor jobs running")
def kill(job_name: str | None, experiment: str | None, job_source: str | None, all_jobs: bool) -> None:
def kill(
job_name: str | None, experiment: str | None, job_source: str | None, job_id: int | None, all_jobs: bool
) -> None:
"""
stop job(s).
"""
if not (job_name or experiment or job_source or all_jobs):
if not (job_name or experiment or job_source or all_jobs or job_id):
raise click.Abort("Provide an option to kill. See --help")

with JobManager() as jm:
count = jm.kill_jobs(
all_jobs=all_jobs, job_name=job_name, experiment=experiment, job_source=job_source
all_jobs=all_jobs, job_name=job_name, experiment=experiment, job_source=job_source, id=job_id
)
click.echo(f"Killed {count} job{'s' if count != 1 else ''}.")

Expand Down
10 changes: 4 additions & 6 deletions pioreactor/tests/test_execute_experiment_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ def test_execute_experiment_profile_order(

assert bucket[0].path == "/api/experiments/_testing_experiment/unit_labels"
assert bucket[0].json == {"label": "label1", "unit": "unit1"}
assert bucket[1].path == "/api/workers/unit1/jobs/run/job_name/job1/experiments/_testing_experiment"
assert bucket[2].path == "/api/workers/unit2/jobs/run/job_name/job1/experiments/_testing_experiment"
assert bucket[3].path == "/api/workers/unit1/jobs/run/job_name/job2/experiments/_testing_experiment"
assert bucket[1].path == "/unit_api/jobs/run/job_name/job1"
assert bucket[2].path == "/unit_api/jobs/run/job_name/job1"
assert bucket[3].path == "/unit_api/jobs/run/job_name/job2"
assert bucket[4].path == "/api/workers/unit1/jobs/stop/job_name/job2/experiments/_testing_experiment"


Expand All @@ -99,9 +99,7 @@ def test_execute_experiment_profile_hack_for_led_intensity(mock__load_experiment
with capture_requests() as bucket:
execute_experiment_profile("profile.yaml", experiment)

assert (
bucket[0].path == "/api/workers/unit1/jobs/run/job_name/led_intensity/experiments/_testing_experiment"
)
assert bucket[0].path == "/unit_api/jobs/run/job_name/led_intensity"
assert bucket[0].json == {
"options": {"A": 50},
"args": [],
Expand Down
6 changes: 3 additions & 3 deletions pioreactor/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def __enter__(self) -> managed_lifecycle:
pass

with JobManager() as jm:
self._job_id = jm.register_and_set_running(
self.job_id = jm.register_and_set_running(
self.unit,
self.experiment,
self.name,
Expand All @@ -234,7 +234,7 @@ def __exit__(self, *args) -> None:
self.mqtt_client.disconnect()

with JobManager() as jm:
jm.set_not_running(self._job_id)
jm.set_not_running(self.job_id)

return

Expand Down Expand Up @@ -263,7 +263,7 @@ def publish_setting(self, setting: str, value: Any) -> None:
f"pioreactor/{self.unit}/{self.experiment}/{self.name}/{setting}", value, retain=True
)
with JobManager() as jm:
jm.upsert_setting(self._job_id, setting, value)
jm.upsert_setting(self.job_id, setting, value)


class cache:
Expand Down
4 changes: 2 additions & 2 deletions update_scripts/25.1.21/update.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ sudo -u pioreactor python "$SCRIPT_DIR"/cal_convert.py "$STORAGE_DIR"/od_calibra
sudo -u pioreactor python "$SCRIPT_DIR"/cal_convert.py "$STORAGE_DIR"/pump_calibrations/cache.db
chown -R pioreactor:www-data "$STORAGE_DIR"/calibrations/

sudo -u pioreactor python "$SCRIPT_DIR"/cal_active.py "$STORAGE_DIR"/current_pump_calibrations/cache.db
sudo -u pioreactor python "$SCRIPT_DIR"/cal_active.py "$STORAGE_DIR"/current_od_calibrations/cache.db
sudo -u pioreactor python "$SCRIPT_DIR"/cal_active.py "$STORAGE_DIR"/current_pump_calibration/cache.db
sudo -u pioreactor python "$SCRIPT_DIR"/cal_active.py "$STORAGE_DIR"/current_od_calibration/cache.db

# if leader
if [ "$HOSTNAME" = "$LEADER_HOSTNAME" ]; then
Expand Down

0 comments on commit 7d53c6c

Please sign in to comment.