From 975e27b2b687cf4cf10d091b003f4ca51ae03e9f Mon Sep 17 00:00:00 2001 From: CamDavidsonPilon Date: Sat, 30 Nov 2024 21:29:56 -0500 Subject: [PATCH] duration is added in the super classes afterwards --- pioreactor/background_jobs/base.py | 46 +++++++++++++++----------- pioreactor/background_jobs/stirring.py | 8 +++-- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/pioreactor/background_jobs/base.py b/pioreactor/background_jobs/base.py index 15c0eb20..5a134ea0 100644 --- a/pioreactor/background_jobs/base.py +++ b/pioreactor/background_jobs/base.py @@ -339,12 +339,6 @@ def __post__init__(self) -> None: # task in on_ready, which delays writing to the db, which means `pio kill` might not see it. self.set_state(self.READY) - # now start listening to confirm our state is correct in mqtt - self.subscribe_and_callback( - self._confirm_state_in_broker, - f"pioreactor/{self.unit}/{self.experiment}/{self.job_name}/$state", - ) - def start_passive_listeners(self) -> None: # overwrite this to in subclasses to subscribe to topics in MQTT # using this handles reconnects correctly. @@ -885,6 +879,13 @@ def _start_general_passive_listeners(self) -> None: allow_retained=False, ) + # TODO: previously this was in __post_init__ - why? + # now start listening to confirm our state is correct in mqtt + self.subscribe_and_callback( + self._confirm_state_in_broker, + f"pioreactor/{self.unit}/{self.experiment}/{self.job_name}/$state", + ) + def _confirm_state_in_broker(self, message: pt.MQTTMessage) -> None: if message.payload is None: return @@ -978,6 +979,10 @@ def __init__(self, unit: str, experiment: str, plugin_name: str) -> None: super().__init__(unit, experiment, source=plugin_name) +def _noop(): + pass + + class BackgroundJobWithDodging(_BackgroundJob): """ This utility class allows for a change in behaviour when an OD reading is about to taken. Example: shutting @@ -1039,7 +1044,7 @@ def __init__(self, *args, source="app", **kwargs) -> None: ) self.sneak_in_timer = RepeatedTimer( - 5, self._noop, job_name=self.job_name, logger=self.logger + 5, _noop, job_name=self.job_name, logger=self.logger ) # placeholder? self.add_to_published_settings("enable_dodging_od", {"datatype": "boolean", "settable": True}) self.add_to_published_settings("currently_dodging_od", {"datatype": "boolean", "settable": False}) @@ -1051,9 +1056,6 @@ def __post__init__(self): self.start_passive_listeners() super().__post__init__() - def _noop(self): - pass - def set_currently_dodging_od(self, value: bool): self.currently_dodging_od = value if self.currently_dodging_od: @@ -1063,14 +1065,14 @@ def set_currently_dodging_od(self, value: bool): self._setup_timer() else: self.initialize_continuous_operation() # user defined - self._action_to_do_before_od_reading = self._noop - self._action_to_do_after_od_reading = self._noop + self._action_to_do_before_od_reading = _noop + self._action_to_do_after_od_reading = _noop self.sneak_in_timer.cancel() def set_enable_dodging_od(self, value: bool): self.enable_dodging_od = value if self.enable_dodging_od: - if self.is_od_job_running(): + if is_pio_job_running("od_reading"): self.logger.debug("Will attempt to dodge OD readings.") self.set_currently_dodging_od(True) else: @@ -1080,14 +1082,11 @@ def set_enable_dodging_od(self, value: bool): self.logger.debug("Running continuously through OD readings.") self.set_currently_dodging_od(False) - def is_od_job_running(self) -> bool: - return is_pio_job_running("od_reading") - def action_to_do_after_od_reading(self) -> None: - raise NotImplementedError() + pass def action_to_do_before_od_reading(self) -> None: - raise NotImplementedError() + pass def initialize_dodging_operation(self) -> None: pass @@ -1095,7 +1094,8 @@ def initialize_dodging_operation(self) -> None: def initialize_continuous_operation(self) -> None: pass - def start_passive_listeners(self) -> None: + def _start_general_passive_listeners(self) -> None: + super()._start_general_passive_listeners() self.subscribe_and_callback( self._od_reading_changed_status, f"pioreactor/{self.unit}/{self.experiment}/od_reading/interval", @@ -1130,13 +1130,19 @@ def sneak_in(ads_interval, post_delay, pre_delay) -> None: with catchtime() as timer: self._action_to_do_after_od_reading() + action_after_duration = timer() + if ads_interval - self.OD_READING_DURATION - (post_delay + pre_delay) - action_after_duration < 0: raise ValueError( "samples_per_second is too high, or post_delay is too high, or pre_delay is too high, or action_to_do_after_od_reading takes too long." ) sleep(ads_interval - self.OD_READING_DURATION - (post_delay + pre_delay) - action_after_duration) + + if self.state != self.READY: + return + self._action_to_do_before_od_reading() # this could fail in the following way: @@ -1146,7 +1152,7 @@ def sneak_in(ads_interval, post_delay, pre_delay) -> None: ads_interval = jm.get_setting_from_running_job("od_reading", "interval", timeout=5) ads_start_time = jm.get_setting_from_running_job( "od_reading", "first_od_obs_time", timeout=5 - ) # this is populated later in the job... + ) # this is populated later in the OD job... # get interval, and confirm that the requirements are possible: post_delay + pre_delay <= ADS interval - (od reading duration) if not (ads_interval - self.OD_READING_DURATION > (post_delay + pre_delay)): diff --git a/pioreactor/background_jobs/stirring.py b/pioreactor/background_jobs/stirring.py index e721b995..0c688f95 100644 --- a/pioreactor/background_jobs/stirring.py +++ b/pioreactor/background_jobs/stirring.py @@ -277,10 +277,10 @@ def action_to_do_before_od_reading(self): def action_to_do_after_od_reading(self): self.start_stirring() sleep(1) - self.poll_and_update_dc(2) + self.poll_and_update_dc() def initialize_dodging_operation(self): - if config.getfloat("od_reading.config", "samples_per_second") > 0.12: + if config.getfloat("od_reading.config", "samples_per_second") > 0.121: self.logger.warning( "Recommended to decrease `samples_per_second` to ensure there is time to start/stop stirring. Try 0.12 or less." ) @@ -416,7 +416,9 @@ def poll_and_update_dc(self, poll_for_seconds: Optional[float] = None) -> None: if poll_for_seconds is None: target_n_data_points = 12 rps = self.target_rpm / 60.0 - poll_for_seconds = target_n_data_points / rps + poll_for_seconds = min( + target_n_data_points / rps, 5 + ) # things can break if this function takes too long. self.poll(poll_for_seconds)