From 7d53c6cc535fe785b772766148ed144d9efc62fe Mon Sep 17 00:00:00 2001 From: CamDavidsonPilon Date: Tue, 28 Jan 2025 19:51:21 -0500 Subject: [PATCH] start of multi profiles --- CHANGELOG.md | 4 +- .../actions/leader/experiment_profile.py | 63 ++++++++++++++----- .../background_jobs/temperature_automation.py | 1 + pioreactor/cli/pio.py | 9 ++- .../tests/test_execute_experiment_profile.py | 10 ++- pioreactor/utils/__init__.py | 6 +- update_scripts/25.1.21/update.sh | 4 +- 7 files changed, 67 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fc990acb..e7341c2f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,12 +12,14 @@ ``` pio run pumps --waste 2 --media 1 --waste 2 ``` - - support for ?? + - support for 40ml + - Run multiple experiment profiles per experiment. #### Web API changes - GET `/unit_api/jobs/running/` introduced + - GET `/api/experiment_profiles/running/experiments/` introduced #### Breaking changes - plugins should migrate from `click_some_name` to autodiscover plugins, to importing `run`. Example: diff --git a/pioreactor/actions/leader/experiment_profile.py b/pioreactor/actions/leader/experiment_profile.py index 57891891..ea02dc61 100644 --- a/pioreactor/actions/leader/experiment_profile.py +++ b/pioreactor/actions/leader/experiment_profile.py @@ -191,6 +191,7 @@ def wrapped_execute_action( elapsed_seconds_func: Callable[[], float], client: Client, action: struct.Action, + job_id: int, dry_run: bool = False, ) -> Callable[..., None]: # hack... 
@@ -211,28 +212,39 @@ def wrapped_execute_action( env, logger, elapsed_seconds_func, + job_id, options, args, ) case struct.Pause(_, if_): return pause_job( - unit, experiment, client, job_name, dry_run, if_, env, logger, elapsed_seconds_func + unit, experiment, client, job_name, dry_run, if_, env, logger, elapsed_seconds_func, job_id ) case struct.Resume(_, if_): return resume_job( - unit, experiment, client, job_name, dry_run, if_, env, logger, elapsed_seconds_func + unit, experiment, client, job_name, dry_run, if_, env, logger, elapsed_seconds_func, job_id ) case struct.Stop(_, if_): return stop_job( - unit, experiment, client, job_name, dry_run, if_, env, logger, elapsed_seconds_func + unit, experiment, client, job_name, dry_run, if_, env, logger, elapsed_seconds_func, job_id ) case struct.Update(_, if_, options): return update_job( - unit, experiment, client, job_name, dry_run, if_, env, logger, elapsed_seconds_func, options + unit, + experiment, + client, + job_name, + dry_run, + if_, + env, + logger, + elapsed_seconds_func, + job_id, + options, ) case struct.Log(_, options, if_): @@ -246,6 +258,7 @@ def wrapped_execute_action( env, logger, elapsed_seconds_func, + job_id, options, ) @@ -260,6 +273,7 @@ def wrapped_execute_action( env, logger, elapsed_seconds_func, + job_id, action, while_, repeat_every_hours, @@ -279,6 +293,7 @@ def wrapped_execute_action( env, logger, elapsed_seconds_func, + job_id, condition, action, actions, @@ -307,6 +322,7 @@ def common_wrapped_execute_action( elapsed_seconds_func: Callable[[], float], client: Client, action: struct.Action, + job_id: int, dry_run: bool = False, ) -> Callable[..., None]: actions_to_execute = [] @@ -322,6 +338,7 @@ def common_wrapped_execute_action( elapsed_seconds_func, client, action, + job_id, dry_run, ) ) @@ -339,6 +356,7 @@ def when( env: dict, logger: CustomLogger, elapsed_seconds_func: Callable[[], float], + job_id: int, condition: BoolExpression, when_action: struct.When, actions: 
list[struct.Action], @@ -373,6 +391,7 @@ def _callable() -> None: elapsed_seconds_func, client, action, + job_id, dry_run, ), ) @@ -392,6 +411,7 @@ def _callable() -> None: elapsed_seconds_func, client, when_action, + job_id, dry_run, ), ) @@ -412,6 +432,7 @@ def repeat( env: dict, logger: CustomLogger, elapsed_seconds_func: Callable[[], float], + job_id: int, repeat_action: struct.Repeat, while_: Optional[BoolExpression], repeat_every_hours: float, @@ -455,6 +476,7 @@ def _callable() -> None: elapsed_seconds_func, client, action, + job_id, dry_run, ), ) @@ -479,6 +501,7 @@ def _callable() -> None: elapsed_seconds_func, client, repeat_action, + job_id, dry_run, ), ) @@ -503,6 +526,7 @@ def log( env: dict, logger: CustomLogger, elapsed_seconds_func: Callable[[], float], + job_id: int, options: struct._LogOptions, ) -> Callable[..., None]: def _callable() -> None: @@ -536,6 +560,7 @@ def start_job( env: dict, logger: CustomLogger, elapsed_seconds_func: Callable[[], float], + job_id: int, options: dict, args: list, ) -> Callable[..., None]: @@ -560,7 +585,7 @@ def _callable() -> None: f"/unit_api/jobs/run/job_name/{job_name}", json={ "options": evaluate_options(options, env), - "env": {"JOB_SOURCE": "experiment_profile", "EXPERIMENT": experiment}, + "env": {"JOB_SOURCE": f"experiment_profile:{job_id}", "EXPERIMENT": experiment}, "args": args, }, ).raise_for_status() @@ -580,6 +605,7 @@ def pause_job( env: dict, logger: CustomLogger, elapsed_seconds_func: Callable[[], float], + job_id: int, ) -> Callable[..., None]: def _callable() -> None: # first check if the Pioreactor is still part of the experiment. @@ -620,6 +646,7 @@ def resume_job( env: dict, logger: CustomLogger, elapsed_seconds_func: Callable[[], float], + job_id: int, ) -> Callable[..., None]: def _callable() -> None: # first check if the Pioreactor is still part of the experiment. 
@@ -661,6 +688,7 @@ def stop_job( env: dict, logger: CustomLogger, elapsed_seconds_func: Callable[[], float], + job_id: int, ) -> Callable[..., None]: def _callable() -> None: # first check if the Pioreactor is still part of the experiment. @@ -698,6 +726,7 @@ def update_job( env: dict, logger: CustomLogger, elapsed_seconds_func: Callable[[], float], + job_id: int, options: dict, ) -> Callable[..., None]: def _callable() -> None: @@ -846,22 +875,24 @@ def execute_experiment_profile(profile_filename: str, experiment: str, dry_run: logger = create_logger(action_name, unit=unit, experiment=experiment) with managed_lifecycle( unit, experiment, action_name, ignore_is_active_state=True, is_long_running_job=True - ) as state: + ) as managed_job: try: profile = load_and_verify_profile(profile_filename) except Exception as e: logger.error(e) raise e - state.publish_setting( + managed_job.publish_setting( "experiment_profile_name", profile.experiment_profile_name, ) - state.publish_setting( + managed_job.publish_setting( "start_time_utc", current_utc_timestamp(), ) + job_id = managed_job.job_id + if dry_run: logger.notice( # type: ignore f"Executing DRY-RUN of profile {profile.experiment_profile_name}, sourced from {Path(profile_filename).name}. See logs." @@ -896,8 +927,9 @@ def execute_experiment_profile(profile_filename: str, experiment: str, dry_run: logger, sched, elapsed_seconds_func, - state.mqtt_client, + managed_job.mqtt_client, action, + job_id, dry_run, ), ) @@ -933,8 +965,9 @@ def execute_experiment_profile(profile_filename: str, experiment: str, dry_run: logger, sched, elapsed_seconds_func, - state.mqtt_client, + managed_job.mqtt_client, action, + job_id, dry_run, ), ) @@ -945,14 +978,14 @@ def execute_experiment_profile(profile_filename: str, experiment: str, dry_run: # try / finally to handle keyboard interrupts # the below is so the schedule can be canceled by setting the event. 
- while not state.exit_event.wait(timeout=0): + while not managed_job.exit_event.wait(timeout=0): next_event_in = sched.run(blocking=False) if next_event_in is not None: time.sleep(min(0.25, next_event_in)) else: break finally: - if state.exit_event.is_set(): + if managed_job.exit_event.is_set(): # ended early logger.notice(f"Stopping profile {profile.experiment_profile_name} early: {len(sched.queue)} action(s) not started, and stopping all started action(s).") # type: ignore @@ -960,7 +993,7 @@ def execute_experiment_profile(profile_filename: str, experiment: str, dry_run: # we can use active workers in experiment, since if a worker leaves an experiment or goes inactive, it's jobs are stopped workers = get_active_workers_in_experiment(experiment) with ClusterJobManager() as cjm: - cjm.kill_jobs(workers, experiment=experiment, job_source="experiment_profile") + cjm.kill_jobs(workers, experiment=experiment, job_source=f"experiment_profile:{job_id}") else: if dry_run: @@ -971,8 +1004,8 @@ def execute_experiment_profile(profile_filename: str, experiment: str, dry_run: else: logger.info(f"Finished executing profile {profile.experiment_profile_name}.") # type: ignore - state.publish_setting("experiment_profile_name", None) - state.publish_setting("start_time_utc", None) + managed_job.publish_setting("experiment_profile_name", None) + managed_job.publish_setting("start_time_utc", None) logger.clean_up() diff --git a/pioreactor/background_jobs/temperature_automation.py b/pioreactor/background_jobs/temperature_automation.py index 0d65059c..7edecc3a 100644 --- a/pioreactor/background_jobs/temperature_automation.py +++ b/pioreactor/background_jobs/temperature_automation.py @@ -389,6 +389,7 @@ def infer_temperature(self) -> None: sleep(time_between_samples) if self.state != self.READY: + # TODO: does this # if our state changes in this loop, exit. Note that the finally block is still called. 
return diff --git a/pioreactor/cli/pio.py b/pioreactor/cli/pio.py index 25179244..2d329d66 100644 --- a/pioreactor/cli/pio.py +++ b/pioreactor/cli/pio.py @@ -143,17 +143,20 @@ def blink() -> None: @click.option("--job-name", type=click.STRING) @click.option("--experiment", type=click.STRING) @click.option("--job-source", type=click.STRING) +@click.option("--job-id", type=click.INT) @click.option("--all-jobs", is_flag=True, help="kill all Pioreactor jobs running") -def kill(job_name: str | None, experiment: str | None, job_source: str | None, all_jobs: bool) -> None: +def kill( + job_name: str | None, experiment: str | None, job_source: str | None, job_id: int | None, all_jobs: bool +) -> None: """ stop job(s). """ - if not (job_name or experiment or job_source or all_jobs): + if not (job_name or experiment or job_source or all_jobs or job_id): raise click.Abort("Provide an option to kill. See --help") with JobManager() as jm: count = jm.kill_jobs( - all_jobs=all_jobs, job_name=job_name, experiment=experiment, job_source=job_source + all_jobs=all_jobs, job_name=job_name, experiment=experiment, job_source=job_source, id=job_id ) click.echo(f"Killed {count} job{'s' if count != 1 else ''}.") diff --git a/pioreactor/tests/test_execute_experiment_profile.py b/pioreactor/tests/test_execute_experiment_profile.py index ef67ba7c..cd0d68b2 100644 --- a/pioreactor/tests/test_execute_experiment_profile.py +++ b/pioreactor/tests/test_execute_experiment_profile.py @@ -73,9 +73,9 @@ def test_execute_experiment_profile_order( assert bucket[0].path == "/api/experiments/_testing_experiment/unit_labels" assert bucket[0].json == {"label": "label1", "unit": "unit1"} - assert bucket[1].path == "/api/workers/unit1/jobs/run/job_name/job1/experiments/_testing_experiment" - assert bucket[2].path == "/api/workers/unit2/jobs/run/job_name/job1/experiments/_testing_experiment" - assert bucket[3].path == "/api/workers/unit1/jobs/run/job_name/job2/experiments/_testing_experiment" + assert 
bucket[1].path == "/unit_api/jobs/run/job_name/job1" + assert bucket[2].path == "/unit_api/jobs/run/job_name/job1" + assert bucket[3].path == "/unit_api/jobs/run/job_name/job2" assert bucket[4].path == "/api/workers/unit1/jobs/stop/job_name/job2/experiments/_testing_experiment" @@ -99,9 +99,7 @@ def test_execute_experiment_profile_hack_for_led_intensity(mock__load_experiment with capture_requests() as bucket: execute_experiment_profile("profile.yaml", experiment) - assert ( - bucket[0].path == "/api/workers/unit1/jobs/run/job_name/led_intensity/experiments/_testing_experiment" - ) + assert bucket[0].path == "/unit_api/jobs/run/job_name/led_intensity" assert bucket[0].json == { "options": {"A": 50}, "args": [], diff --git a/pioreactor/utils/__init__.py b/pioreactor/utils/__init__.py index ee838b5c..667877e1 100644 --- a/pioreactor/utils/__init__.py +++ b/pioreactor/utils/__init__.py @@ -211,7 +211,7 @@ def __enter__(self) -> managed_lifecycle: pass with JobManager() as jm: - self._job_id = jm.register_and_set_running( + self.job_id = jm.register_and_set_running( self.unit, self.experiment, self.name, @@ -234,7 +234,7 @@ def __exit__(self, *args) -> None: self.mqtt_client.disconnect() with JobManager() as jm: - jm.set_not_running(self._job_id) + jm.set_not_running(self.job_id) return @@ -263,7 +263,7 @@ def publish_setting(self, setting: str, value: Any) -> None: f"pioreactor/{self.unit}/{self.experiment}/{self.name}/{setting}", value, retain=True ) with JobManager() as jm: - jm.upsert_setting(self._job_id, setting, value) + jm.upsert_setting(self.job_id, setting, value) class cache: diff --git a/update_scripts/25.1.21/update.sh b/update_scripts/25.1.21/update.sh index d39f9a19..1d740036 100644 --- a/update_scripts/25.1.21/update.sh +++ b/update_scripts/25.1.21/update.sh @@ -36,8 +36,8 @@ sudo -u pioreactor python "$SCRIPT_DIR"/cal_convert.py "$STORAGE_DIR"/od_calibra sudo -u pioreactor python "$SCRIPT_DIR"/cal_convert.py "$STORAGE_DIR"/pump_calibrations/cache.db 
chown -R pioreactor:www-data "$STORAGE_DIR"/calibrations/ -sudo -u pioreactor python "$SCRIPT_DIR"/cal_active.py "$STORAGE_DIR"/current_pump_calibrations/cache.db -sudo -u pioreactor python "$SCRIPT_DIR"/cal_active.py "$STORAGE_DIR"/current_od_calibrations/cache.db +sudo -u pioreactor python "$SCRIPT_DIR"/cal_active.py "$STORAGE_DIR"/current_pump_calibration/cache.db +sudo -u pioreactor python "$SCRIPT_DIR"/cal_active.py "$STORAGE_DIR"/current_od_calibration/cache.db # if leader if [ "$HOSTNAME" = "$LEADER_HOSTNAME" ]; then