Skip to content

Commit

Permalink
Adc and x_to_y
Browse files Browse the repository at this point in the history
  • Loading branch information
CamDavidsonPilon committed Feb 9, 2025
1 parent 36f669e commit 003887a
Show file tree
Hide file tree
Showing 13 changed files with 86 additions and 77 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
- Specify which Pioreactor to update on the Updates page (option is only available with release archives.)
- Choose the level of detail on the new Event Logs page.
- Previously, when a worker's web server is down, it would halt an update from proceeding (since it can't send the command). Now, leader will try the webserver, and if it observes a 5xx error, will attempt an SSH communication.
- stirring calibration is run as part of self-test now.
- improvements to stirring job when OD readings have a long pause between.

#### Web API changes

- GET `/unit_api/jobs/running/<job>` introduced
- GET `/api/experiment_profiles/running/experiments/<experiment>` introduced

#### Breaking changes

- Calbration structs `predict` is now `x_to_y`, `ipredict` is now `y_to_x`. This is just more clear!
- (Eventually) plugins should migrate from `click_some_name` to autodiscover plugins, to importing `run`. Example:
```
import click
Expand All @@ -44,6 +46,7 @@
- experiment profiles start now use the `unit_api/` directly. This may mitigate the issue where huey workers stampeding on each other when try to start many jobs.
- fix `pio calibrations run ... -y` not saving as active.
- fix manual dosing in the UI
- fix recording logs manually via the UI.

### 25.1.21

Expand Down
6 changes: 3 additions & 3 deletions pioreactor/actions/pump.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,9 @@ def _get_pump_action(pump_device: PumpCalibrationDevices) -> str:

if not is_default_calibration(waste_calibration) and not is_default_calibration(media_calibration):
# provided with calibrations, we can compute if media_rate > waste_rate, which is a danger zone!
# `predict(1)` asks "how much lqd is moved in 1 second"
if media_calibration.predict(1) > waste_calibration.predict(1):
ratio = min(waste_calibration.predict(1) / media_calibration.predict(1), ratio)
# `x_to_y(1)` asks "how much lqd is moved in 1 second"
if media_calibration.x_to_y(1) > waste_calibration.x_to_y(1):
ratio = min(waste_calibration.x_to_y(1) / media_calibration.x_to_y(1), ratio)
else:
logger.warning(
"Calibrations don't exist for pump(s). Keep an eye on the liquid level to avoid overflowing!"
Expand Down
1 change: 1 addition & 0 deletions pioreactor/background_jobs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,7 @@ def __init__(self, *args, source="app", **kwargs) -> None:
self._event_is_dodging_od = threading.Event()

def __post__init__(self):
# this method runs after the subclasses init
self.set_enable_dodging_od(
config.getboolean(f"{self.job_name}.config", "enable_dodging_od", fallback="False")
)
Expand Down
2 changes: 1 addition & 1 deletion pioreactor/background_jobs/leader/mqtt_to_db_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ def parse_logs(topic: str, payload: pt.MQTTMessagePayload) -> dict:
"timestamp": log.timestamp,
"message": log.message,
"task": log.task,
"level": log.level,
"level": log.level.upper(),
"source": log.source, # should be app, ui, etc.
}

Expand Down
2 changes: 1 addition & 1 deletion pioreactor/background_jobs/od_reading.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ def calibration(observed_voltage: pt.Voltage) -> pt.OD:
)

try:
return calibration_data.ipredict(observed_voltage, enforce_bounds=True)
return calibration_data.y_to_x(observed_voltage, enforce_bounds=True)
except exc.NoSolutionsFoundError:
if observed_voltage <= min_voltage:
return min_OD
Expand Down
26 changes: 16 additions & 10 deletions pioreactor/background_jobs/stirring.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,13 +321,13 @@ def initialize_rpm_to_dc_lookup(self) -> Callable:

# since we have calibration data, and the initial_duty_cycle could be
# far off, giving the below equation a bad "first step". We set it here.
self._estimate_duty_cycle = calibration.ipredict(self.target_rpm)
self._estimate_duty_cycle = calibration.y_to_x(self.target_rpm)

# we scale this by 90% to make sure the PID + prediction doesn't overshoot,
# better to be conservative here.
# equivalent to a weighted average: 0.1 * current + 0.9 * predicted
return lambda rpm: self._estimate_duty_cycle - 0.90 * (
self._estimate_duty_cycle - (calibration.ipredict(rpm))
self._estimate_duty_cycle - (calibration.y_to_x(rpm))
)
else:
return lambda rpm: self._estimate_duty_cycle
Expand Down Expand Up @@ -367,7 +367,7 @@ def kick_stirring(self) -> None:
def kick_stirring_but_avoid_od_reading(self) -> None:
"""
This will determine when the next od reading occurs (if possible), and
wait until it completes before kicking stirring.
wait until it completes before kicking stirring or sneak in early.
"""
with JobManager() as jm:
interval = float(jm.get_setting_from_running_job("od_reading", "interval", timeout=5))
Expand All @@ -376,9 +376,13 @@ def kick_stirring_but_avoid_od_reading(self) -> None:
)

seconds_to_next_reading = interval - (time() - first_od_obs_time) % interval
sleep(
seconds_to_next_reading + 2
) # add an additional 2 seconds to make sure we wait long enough for OD reading to complete.

# if seconds_to_next_reading is like 50s (high duration between ODs), let's kick now and not wait.
if seconds_to_next_reading <= 2:
sleep(
seconds_to_next_reading + 2
) # add an additional 2 seconds to make sure we wait long enough for OD reading to complete.

self.kick_stirring()
return

Expand All @@ -403,8 +407,10 @@ def poll(self, poll_for_seconds: float) -> Optional[structs.MeasuredRPM]:
self.blink_error_code(error_codes.STIRRING_FAILED)

is_od_running = is_pio_job_running("od_reading")
is_dodging = self.currently_dodging_od

if not is_od_running:
if not is_od_running or is_dodging:
# if dodging, poll only runs when needed (outside od readings), so it's always safe to kick.
self.kick_stirring()
else:
self.kick_stirring_but_avoid_od_reading()
Expand All @@ -418,9 +424,9 @@ def poll_and_update_dc(self, poll_for_seconds: Optional[float] = None) -> None:
if poll_for_seconds is None:
target_n_data_points = 12
rps = self.target_rpm / 60.0
poll_for_seconds = min(
target_n_data_points / rps, 5
) # things can break if this function takes too long.
poll_for_seconds = max(
1, min(target_n_data_points / rps, 5)
) # things can break if this function takes too long, but always get _some_ data.

self.poll(poll_for_seconds)

Expand Down
66 changes: 33 additions & 33 deletions pioreactor/calibrations/stirring_calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
from time import sleep

from pioreactor.background_jobs import stirring
from pioreactor.calibrations.utils import linspace
from pioreactor.config import config
from pioreactor.config import temporary_config_change
from pioreactor.exc import JobPresentError
from pioreactor.hardware import voltage_in_aux
from pioreactor.logging import create_logger
from pioreactor.structs import SimpleStirringCalibration
from pioreactor.utils import clamp
from pioreactor.utils import is_pio_job_running
from pioreactor.utils import managed_lifecycle
from pioreactor.utils.math_helpers import simple_linear_regression
Expand All @@ -28,7 +31,7 @@ def run_stirring_calibration(
if max_dc is None and min_dc is None:
# seed with initial_duty_cycle
config_initial_duty_cycle = config.getfloat("stirring.config", "initial_duty_cycle", fallback=30)
min_dc, max_dc = round(config_initial_duty_cycle * 0.75), round(config_initial_duty_cycle * 1.33)
min_dc, max_dc = config_initial_duty_cycle * 0.75, clamp(0, config_initial_duty_cycle * 1.5, 100)
elif (max_dc is not None) and (min_dc is not None):
assert min_dc < max_dc, "min_dc >= max_dc"
else:
Expand All @@ -49,46 +52,43 @@ def run_stirring_calibration(
measured_rpms = []

# go up and down to observe any hysteresis.
dcs = (
list(range(round(max_dc), round(min_dc) - 2, -3))
+ list(range(round(min_dc), round(max_dc) + 3, 3))
+ list(range(round(max_dc), round(min_dc) - 2, -3))
)
dcs = linspace(max_dc, min_dc, 5) + linspace(min_dc, min_dc, 5) + linspace(max_dc, min_dc, 5)
n_samples = len(dcs)

with stirring.RpmFromFrequency() as rpm_calc, stirring.Stirrer(
target_rpm=0,
unit=unit,
experiment=experiment,
rpm_calculator=None,
) as st:
rpm_calc.setup()
st.duty_cycle = (
max_dc + min_dc
) / 2 # we start with a somewhat low value, s.t. the stir bar is caught.
st.start_stirring()
sleep(5)

for count, dc in enumerate(dcs, start=1):
st.set_duty_cycle(dc)
sleep(1.5)
rpm = rpm_calc.estimate(2)
measured_rpms.append(rpm)
logger.debug(f"Detected {rpm=:.1f} RPM @ {dc=}%")

# log progress
lc.mqtt_client.publish(
f"pioreactor/{unit}/{experiment}/{action_name}/percent_progress",
count / n_samples * 100,
)
logger.debug(f"Progress: {count/n_samples:.0%}")
with temporary_config_change(config, "stirring.config", "enable_dodging_od", "False"):
with stirring.RpmFromFrequency() as rpm_calc, stirring.Stirrer(
target_rpm=0,
unit=unit,
experiment=experiment,
rpm_calculator=None,
) as st:
rpm_calc.setup()
st.duty_cycle = (
max_dc + min_dc
) / 2 # we start with a somewhat low value, s.t. the stir bar is caught.
st.start_stirring()
sleep(3)

for count, dc in enumerate(dcs, start=1):
st.set_duty_cycle(dc)
sleep(2.0)
rpm = rpm_calc.estimate(2)
measured_rpms.append(rpm)
logger.debug(f"Detected {rpm=:.1f} RPM @ {dc=}%")

# log progress
lc.mqtt_client.publish(
f"pioreactor/{unit}/{experiment}/{action_name}/percent_progress",
count / n_samples * 100,
)
logger.debug(f"Progress: {count/n_samples:.0%}")

# drop any 0 in RPM, too little DC
try:
filtered_dcs, filtered_measured_rpms = zip(*filter(lambda d: d[1] > 0, zip(dcs, measured_rpms)))
except ValueError:
# the above can fail if all measured rpms are 0
logger.error("No RPMs were measured. Is the stirring spinning?")
logger.warning("No RPMs were measured. Is the stirring spinning?")
raise ValueError("No RPMs were measured. Is the stirring spinning?")

if len(filtered_dcs) <= n_samples * 0.75:
Expand Down
5 changes: 3 additions & 2 deletions pioreactor/cli/calibrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,16 @@ def delete_calibration(device: str, calibration_name: str) -> None:
calibration delete --device od --name my_od_cal_v1
"""
target_file = CALIBRATION_PATH / device / f"{calibration_name}.yaml"

if not target_file.exists():
click.echo(f"No such calibration file: {target_file}")
raise click.Abort()

target_file.unlink()

cal = load_calibration(device, calibration_name)
cal.remove_as_active_calibration_for_device(device)

target_file.unlink()

click.echo(f"Deleted calibration '{calibration_name}' of device '{device}'.")


Expand Down
4 changes: 2 additions & 2 deletions pioreactor/cli/pios.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def _thread_function(unit: str) -> bool:
logger.debug(f"Error occurred: {e}.", exc_info=True)
return False

with ThreadPoolExecutor(max_workers=len(units)) as executor:
with ThreadPoolExecutor(max_workers=min(len(units), 6)) as executor:
results = executor.map(_thread_function, units)

if not all(results):
Expand Down Expand Up @@ -627,7 +627,7 @@ def _thread_function(unit: str) -> bool:
# save config.inis to database
save_config_files_to_db(units, shared, specific)

with ThreadPoolExecutor(max_workers=len(units)) as executor:
with ThreadPoolExecutor(max_workers=min(len(units), 6)) as executor:
results = executor.map(_thread_function, units)

if not all(results):
Expand Down
12 changes: 4 additions & 8 deletions pioreactor/structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,12 @@ def calibration_type(self):

def save_to_disk_for_device(self, device: str) -> str:
from pioreactor.calibrations import CALIBRATION_PATH
import shutil

logger = create_logger("calibrations")

calibration_dir = CALIBRATION_PATH / device
calibration_dir.mkdir(parents=True, exist_ok=True)

# Set ownership to pioreactor:www-data using shutil
# shutil.chown(calibration_dir, user="pioreactor", group="www-data")

out_file = calibration_dir / f"{self.calibration_name}.yaml"

# Serialize to YAML
Expand Down Expand Up @@ -214,14 +210,14 @@ def exists_on_disk_for_device(self, device: str) -> bool:

return target_file.exists()

def predict(self, x: X) -> Y:
def x_to_y(self, x: X) -> Y:
"""
Predict y given x
"""
assert self.curve_type == "poly"
return sum([c * x**i for i, c in enumerate(reversed(self.curve_data_))])

def ipredict(self, y: Y, enforce_bounds=False) -> X:
def y_to_x(self, y: Y, enforce_bounds=False) -> X:
"""
predict x given y
"""
Expand Down Expand Up @@ -297,10 +293,10 @@ class SimplePeristalticPumpCalibration(CalibrationBase, kw_only=True, tag="simpl
y: str = "Volume"

def ml_to_duration(self, ml: pt.mL) -> pt.Seconds:
return t.cast(pt.Seconds, self.ipredict(ml))
return t.cast(pt.Seconds, self.y_to_x(ml))

def duration_to_ml(self, duration: pt.Seconds) -> pt.mL:
return t.cast(pt.mL, self.predict(duration))
return t.cast(pt.mL, self.x_to_y(duration))


class SimpleStirringCalibration(CalibrationBase, kw_only=True, tag="simple_stirring"):
Expand Down
Loading

0 comments on commit 003887a

Please sign in to comment.