Skip to content

Commit

Permalink
duration is added in the super classes afterwards
Browse files Browse the repository at this point in the history
  • Loading branch information
CamDavidsonPilon committed Dec 1, 2024
1 parent 4eb6b2b commit 975e27b
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 23 deletions.
46 changes: 26 additions & 20 deletions pioreactor/background_jobs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,12 +339,6 @@ def __post__init__(self) -> None:
# task in on_ready, which delays writing to the db, which means `pio kill` might not see it.
self.set_state(self.READY)

# now start listening to confirm our state is correct in mqtt
self.subscribe_and_callback(
self._confirm_state_in_broker,
f"pioreactor/{self.unit}/{self.experiment}/{self.job_name}/$state",
)

def start_passive_listeners(self) -> None:
# overwrite this to in subclasses to subscribe to topics in MQTT
# using this handles reconnects correctly.
Expand Down Expand Up @@ -885,6 +879,13 @@ def _start_general_passive_listeners(self) -> None:
allow_retained=False,
)

# TODO: previously this was in __post_init__ - why?
# now start listening to confirm our state is correct in mqtt
self.subscribe_and_callback(
self._confirm_state_in_broker,
f"pioreactor/{self.unit}/{self.experiment}/{self.job_name}/$state",
)

def _confirm_state_in_broker(self, message: pt.MQTTMessage) -> None:
if message.payload is None:
return
Expand Down Expand Up @@ -978,6 +979,10 @@ def __init__(self, unit: str, experiment: str, plugin_name: str) -> None:
super().__init__(unit, experiment, source=plugin_name)


def _noop():
pass


class BackgroundJobWithDodging(_BackgroundJob):
"""
This utility class allows for a change in behaviour when an OD reading is about to taken. Example: shutting
Expand Down Expand Up @@ -1039,7 +1044,7 @@ def __init__(self, *args, source="app", **kwargs) -> None:
)

self.sneak_in_timer = RepeatedTimer(
5, self._noop, job_name=self.job_name, logger=self.logger
5, _noop, job_name=self.job_name, logger=self.logger
) # placeholder?
self.add_to_published_settings("enable_dodging_od", {"datatype": "boolean", "settable": True})
self.add_to_published_settings("currently_dodging_od", {"datatype": "boolean", "settable": False})
Expand All @@ -1051,9 +1056,6 @@ def __post__init__(self):
self.start_passive_listeners()
super().__post__init__()

def _noop(self):
pass

def set_currently_dodging_od(self, value: bool):
self.currently_dodging_od = value
if self.currently_dodging_od:
Expand All @@ -1063,14 +1065,14 @@ def set_currently_dodging_od(self, value: bool):
self._setup_timer()
else:
self.initialize_continuous_operation() # user defined
self._action_to_do_before_od_reading = self._noop
self._action_to_do_after_od_reading = self._noop
self._action_to_do_before_od_reading = _noop
self._action_to_do_after_od_reading = _noop
self.sneak_in_timer.cancel()

def set_enable_dodging_od(self, value: bool):
self.enable_dodging_od = value
if self.enable_dodging_od:
if self.is_od_job_running():
if is_pio_job_running("od_reading"):
self.logger.debug("Will attempt to dodge OD readings.")
self.set_currently_dodging_od(True)
else:
Expand All @@ -1080,22 +1082,20 @@ def set_enable_dodging_od(self, value: bool):
self.logger.debug("Running continuously through OD readings.")
self.set_currently_dodging_od(False)

def is_od_job_running(self) -> bool:
return is_pio_job_running("od_reading")

def action_to_do_after_od_reading(self) -> None:
raise NotImplementedError()
pass

def action_to_do_before_od_reading(self) -> None:
raise NotImplementedError()
pass

def initialize_dodging_operation(self) -> None:
pass

def initialize_continuous_operation(self) -> None:
pass

def start_passive_listeners(self) -> None:
def _start_general_passive_listeners(self) -> None:
super()._start_general_passive_listeners()
self.subscribe_and_callback(
self._od_reading_changed_status,
f"pioreactor/{self.unit}/{self.experiment}/od_reading/interval",
Expand Down Expand Up @@ -1130,13 +1130,19 @@ def sneak_in(ads_interval, post_delay, pre_delay) -> None:

with catchtime() as timer:
self._action_to_do_after_od_reading()

action_after_duration = timer()

if ads_interval - self.OD_READING_DURATION - (post_delay + pre_delay) - action_after_duration < 0:
raise ValueError(
"samples_per_second is too high, or post_delay is too high, or pre_delay is too high, or action_to_do_after_od_reading takes too long."
)

sleep(ads_interval - self.OD_READING_DURATION - (post_delay + pre_delay) - action_after_duration)

if self.state != self.READY:
return

self._action_to_do_before_od_reading()

# this could fail in the following way:
Expand All @@ -1146,7 +1152,7 @@ def sneak_in(ads_interval, post_delay, pre_delay) -> None:
ads_interval = jm.get_setting_from_running_job("od_reading", "interval", timeout=5)
ads_start_time = jm.get_setting_from_running_job(
"od_reading", "first_od_obs_time", timeout=5
) # this is populated later in the job...
) # this is populated later in the OD job...

# get interval, and confirm that the requirements are possible: post_delay + pre_delay <= ADS interval - (od reading duration)
if not (ads_interval - self.OD_READING_DURATION > (post_delay + pre_delay)):
Expand Down
8 changes: 5 additions & 3 deletions pioreactor/background_jobs/stirring.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,10 @@ def action_to_do_before_od_reading(self):
def action_to_do_after_od_reading(self):
self.start_stirring()
sleep(1)
self.poll_and_update_dc(2)
self.poll_and_update_dc()

def initialize_dodging_operation(self):
if config.getfloat("od_reading.config", "samples_per_second") > 0.12:
if config.getfloat("od_reading.config", "samples_per_second") > 0.121:
self.logger.warning(
"Recommended to decrease `samples_per_second` to ensure there is time to start/stop stirring. Try 0.12 or less."
)
Expand Down Expand Up @@ -416,7 +416,9 @@ def poll_and_update_dc(self, poll_for_seconds: Optional[float] = None) -> None:
if poll_for_seconds is None:
target_n_data_points = 12
rps = self.target_rpm / 60.0
poll_for_seconds = target_n_data_points / rps
poll_for_seconds = min(
target_n_data_points / rps, 5
) # things can break if this function takes too long.

self.poll(poll_for_seconds)

Expand Down

0 comments on commit 975e27b

Please sign in to comment.