Skip to content

Commit

Permalink
change ODCal schema, get pio calibrations analyze working
Browse files Browse the repository at this point in the history
  • Loading branch information
CamDavidsonPilon committed Jan 13, 2025
1 parent 361a987 commit e9d819c
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 76 deletions.
15 changes: 9 additions & 6 deletions pioreactor/background_jobs/od_reading.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
from pioreactor.utils.streaming_calculations import ExponentialMovingAverage
from pioreactor.utils.streaming_calculations import ExponentialMovingStd
from pioreactor.utils.timing import catchtime
from pioreactor.utils.math_helpers import closest_point_to_domain

ALL_PD_CHANNELS: list[pt.PdChannel] = ["1", "2"]
VALID_PD_ANGLES: list[pt.PdAngle] = ["45", "90", "135", "180"]
Expand Down Expand Up @@ -673,20 +672,24 @@ def _hydrate_model(self, calibration_data: structs.ODCalibration) -> Callable[[f
this procedure effectively ignores it.
"""
from numpy import roots, zeros_like, real, imag

def calibration(observed_voltage: pt.Voltage) -> pt.OD:
poly = calibration_data.curve_data_
min_OD, max_OD = min(calibration_data.recorded_data['y']), max(calibration_data.recorded_data['y'])
min_voltage, max_voltage = min(calibration_data.recorded_data['x']), max(calibration_data.recorded_data['x'])
min_OD, max_OD = min(calibration_data.recorded_data["y"]), max(
calibration_data.recorded_data["y"]
)
min_voltage, max_voltage = min(calibration_data.recorded_data["x"]), max(
calibration_data.recorded_data["x"]
)

try:
return calibration_data.ipredict(observed_voltage)
except exc.NoSolutionsFoundError as e:
except exc.NoSolutionsFoundError:
if observed_voltage <= min_voltage:
return min_OD
elif observed_voltage > max_voltage:
return max_OD
else:
raise exc.NoSolutionsFoundError
except exc.SolutionBelowDomainError:
self.logger.warning(
f"Signal outside suggested calibration range. Trimming signal. Calibrated for OD=[{min_OD:0.3g}, {max_OD:0.3g}], V=[{min_voltage:0.3g}, {max_voltage:0.3g}]. Observed {observed_voltage:0.3f}V."
Expand Down
63 changes: 36 additions & 27 deletions pioreactor/calibrations/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
from __future__ import annotations

from typing import Callable

import click

from pioreactor import structs


def green(string: str) -> str:
return click.style(string, fg="green")


def red(string: str) -> str:
return click.style(string, fg="red")

Expand All @@ -17,25 +20,24 @@ def bold(string: str) -> str:
return click.style(string, bold=True)


def calculate_curve_of_best_fit(
x: list[float], y: list[float], degree: int
) -> tuple[list[float], str]:
def calculate_poly_curve_of_best_fit(x: list[float], y: list[float], degree: int) -> list[float]:
import numpy as np

# weigh the last point, the "blank measurement", more.
# 1. It's far away from the other points
# 2. We have prior knowledge that OD~0 when V~0.
n = len(voltages)
weights = np.ones_like(voltages)
n = len(x)
weights = np.ones_like(x)
weights[-1] = n / 2

try:
coefs = np.polyfit(inferred_od600s, voltages, deg=degree, w=weights).tolist()
coefs = np.polyfit(x, y, deg=degree, w=weights).tolist()
except Exception:
echo("Unable to fit.")
click.echo("Unable to fit.")
coefs = np.zeros(degree).tolist()

return coefs, "poly"
return coefs


def curve_to_functional_form(curve_type: str, curve_data) -> str:
if curve_type == "poly":
Expand Down Expand Up @@ -98,49 +100,56 @@ def plot_data(
plt.show()



def crunch_data_and_confirm_with_user(
calibration
) -> bool:

click.clear()

def crunch_data_and_confirm_with_user(calibration: structs.AnyCalibration) -> structs.AnyCalibration:
y, x = calibration.recorded_data["y"], calibration.recorded_data["x"]
candidate_curve = calibration.curve_data_

while True:
click.clear()

if candidate_curve is not None:
if candidate_curve is None:
degree = 1
candidate_curve = calculate_curve_of_best_fit(x, y, degree)

if calibration.curve_type == "poly":
candidate_curve = calculate_poly_curve_of_best_fit(x, y, degree)
else:
raise ValueError("only poly supported")

curve_callable = curve_to_callable("poly", candidate_curve)
plot_data(
x,
y,
interpolation_curve=curve_callable,
highlight_recent_point=False,
title="Calibration Curve",
x_label=calibration.x,
y_label=calibration.y,
)
click.echo()
click.echo(f"Calibration curve: {curve_to_functional_form(curve_type, candidate_curve)}")

click.echo(f"Calibration curve: {curve_to_functional_form(calibration.curve_type, candidate_curve)}")
r = click.prompt(
green(
f"""
y: confirm and save to disk
n: exit completely
d: choose a new degree for polynomial fit (currently {len(candidate_curve)-1})
"""
y: confirm and save to disk
q: exit completely
d: choose a new degree for polynomial fit (currently {len(candidate_curve)-1})
"""
),
type=click.Choice(["y", "n", "d"]),
type=click.Choice(["y", "q", "d"]),
)
if r == "y":
calibration.curve_data_ = candidate_curve
return True
return calibration
elif r == "n":
return False
return calibration
elif r == "d":
degree = click.prompt(green("Enter new degree"), type=click.IntRange(1, 5, clamp=True))

if calibration.curve_type == "poly":
candidate_curve = calculate_poly_curve_of_best_fit(x, y, degree)
else:
raise ValueError("only poly supported")

else:
return False
return calibration
15 changes: 6 additions & 9 deletions pioreactor/cli/calibrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pioreactor.calibrations import list_devices
from pioreactor.calibrations import list_of_calibrations_by_device
from pioreactor.calibrations import load_calibration
from pioreactor.calibrations.utils import crunch_data_and_confirm_with_user
from pioreactor.calibrations.utils import curve_to_callable
from pioreactor.calibrations.utils import plot_data
from pioreactor.utils import local_persistent_storage
Expand Down Expand Up @@ -197,25 +198,21 @@ def delete_calibration(device: str, calibration_name: str) -> None:
if is_present:
c.pop(device)


click.echo(f"Deleted calibration '{calibration_name}' of device '{device}'.")


@calibration.command(name="analyze")
@click.option("--device", required=True, help="Which calibration device to delete from.")
@click.option("--name", "calibration_name", required=True, help="Which calibration name to delete.")
def analyse_calibration(device: str, calibration_name: str) -> None:
"""
Delete a calibration file from local storage.
Example usage:
calibration delete --device od --name my_od_cal_v1
Analyze a calibration file from local storage.
"""
target_file = CALIBRATION_PATH / device / f"{calibration_name}.yaml"
if not target_file.exists():
click.echo(f"No such calibration file: {target_file}")
raise click.Abort()


data = load_calibration(device, calibration_name)
finsished, degree = show_results_and_confirm_with_user(...)
# TODO finish this
calibration = load_calibration(device, calibration_name)
calibration = crunch_data_and_confirm_with_user(calibration)
calibration.save_to_disk_for_device(device)
2 changes: 2 additions & 0 deletions pioreactor/exc.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class RsyncError(OSError):
Syncing files failed
"""


class NoSolutionsFoundError(ValueError):
"""
No solutions found
Expand All @@ -84,6 +85,7 @@ class SolutionBelowDomainError(ValueError):
Outside minimum range
"""


class SolutionAboveDomainError(ValueError):
"""
Outside maximum range
Expand Down
9 changes: 5 additions & 4 deletions pioreactor/structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@

from pioreactor import exc
from pioreactor import types as pt
from pioreactor.utils.math_helpers import closest_point_to_domain


T = t.TypeVar("T")



def subclass_union(cls: t.Type[T]) -> t.Type[T]:
"""
Returns a Union of all subclasses of `cls` (excluding `cls` itself)
Expand Down Expand Up @@ -141,9 +139,11 @@ class Voltage(JSONPrintedStruct):
timestamp: t.Annotated[datetime, Meta(tz=True)]
voltage: pt.Voltage


X = float
Y = float


class CalibrationBase(Struct, tag_field="calibration_type", kw_only=True):
calibration_name: str
calibrated_on_pioreactor_unit: str
Expand Down Expand Up @@ -192,8 +192,7 @@ def predict(self, x: X) -> Y:
Predict y given x
"""
assert self.curve_type == "poly"
return sum([c * x ** i for i, c in enumerate(reversed(self.curve_data_))])

return sum([c * x**i for i, c in enumerate(reversed(self.curve_data_))])

def ipredict(self, y: Y) -> X:
assert self.curve_type == "poly"
Expand All @@ -206,6 +205,7 @@ def ipredict(self, y: Y) -> X:

# complex case: we have to solve the polynomial roots numerically, possibly with complex roots
from numpy import roots, zeros_like, real, imag
from pioreactor.utils.math_helpers import closest_point_to_domain

min_X, max_X = min(self.recorded_data["x"]), max(self.recorded_data["x"])

Expand Down Expand Up @@ -239,6 +239,7 @@ def ipredict(self, y: Y) -> X:
else:
raise exc.SolutionAboveDomainError("Solution below domain")


class ODCalibration(CalibrationBase, kw_only=True, tag="od"):
ir_led_intensity: float
angle: t.Literal["45", "90", "135", "180"]
Expand Down
17 changes: 13 additions & 4 deletions pioreactor/tests/test_calibrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
import pytest
from msgspec import ValidationError

from pioreactor import exc
from pioreactor.calibrations import CALIBRATION_PATH
from pioreactor.calibrations import load_active_calibration
from pioreactor.calibrations import load_calibration
from pioreactor.structs import ODCalibration
from pioreactor.structs import CalibrationBase
from pioreactor.structs import ODCalibration
from pioreactor.utils import local_persistent_storage
from pioreactor import exc


@pytest.fixture
Expand Down Expand Up @@ -97,9 +97,9 @@ def test_load_calibration_validation_error(temp_calibration_dir) -> None:
assert "Error reading bad_cal" in str(exc_info.value)



# test calibration structs


@pytest.fixture
def calibration():
return CalibrationBase(
Expand All @@ -113,56 +113,65 @@ def calibration():
recorded_data={"x": [0.1, 0.2, 0.3], "y": [1.0, 2.0, 3.0]},
)


def test_predict_linear(calibration) -> None:
calibration.curve_data_ = [3, 2] # 3x + 2
x = 4
expected_y = 3 * x + 2
assert calibration.predict(x) == expected_y


def test_predict_quadratic(calibration) -> None:
calibration.curve_data_ = [5, 3, 2] # 5x^2 + 3x + 2
x = 2
expected_y = 5 * x ** 2 + 3 * x + 2
expected_y = 5 * x**2 + 3 * x + 2
assert calibration.predict(x) == expected_y


def test_ipredict_linear(calibration) -> None:
calibration.curve_data_ = [3, 2] # 3x + 2
y = 14
expected_x = (y - 2) / 3
assert calibration.ipredict(y) == pytest.approx(expected_x)


def test_ipredict_quadratic_single_solution(calibration) -> None:
calibration.curve_data_ = [5, 3, 2] # 5x^2 + 3x + 2
calibration.recorded_data = {"x": [0, 2], "y": [2, 20]}
y = 12
expected_x = 1.145683229480096 # Solves 5x^2 + 3x + 2 = 12
assert calibration.ipredict(y) == pytest.approx(expected_x)


def test_ipredict_no_solution(calibration) -> None:
calibration.curve_data_ = [1, 0, 5] # x^2 + 5, no solution for y = -10
with pytest.raises(exc.NoSolutionsFoundError):
calibration.ipredict(-10)


def test_ipredict_multiple_solutions(calibration) -> None:
calibration.curve_data_ = [1, 0, -6] # x^2 - 6, solutions for y=0 are +- 2.45
calibration.recorded_data = {"x": [0, 3], "y": [0, 9]}
y = 0
assert calibration.ipredict(y) == pytest.approx(2.44948974)


def test_ipredict_solution_below_domain(calibration) -> None:
calibration.curve_data_ = [5, 3, 2] # 5x^2 + 3x + 2
calibration.recorded_data = {"x": [0, 1], "y": [10, 20]}
y = 100 # Solution below domain
with pytest.raises(exc.SolutionBelowDomainError):
calibration.ipredict(y)


def test_ipredict_solution_above_domain(calibration) -> None:
calibration.curve_data_ = [25, -10, 1] # 25x^2 - 10x + 1
calibration.recorded_data = {"x": [0, 1], "y": [0, 100]}
y = 50 # Solution above domain
with pytest.raises(exc.SolutionAboveDomainError):
calibration.ipredict(y)


def test_predict_ipredict_consistency(calibration) -> None:
calibration.curve_data_ = [2, -3, 1] # 2x^2 - 3x + 1
calibration.recorded_data = {"x": [0, 3], "y": [1, 16]}
Expand Down
23 changes: 23 additions & 0 deletions pioreactor/tests/test_math_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import pytest

from pioreactor.utils.math_helpers import closest_point_to_domain
from pioreactor.utils.math_helpers import simple_linear_regression
from pioreactor.utils.math_helpers import trimmed_mean

Expand Down Expand Up @@ -43,3 +44,25 @@ def test_trimmed_mean() -> None:

assert trimmed_mean([-100, -50, 1, 2, 3, 50, 10], cut_off_n=2) == 2
assert trimmed_mean([100, -50, -1, 0, 1, -50, 10], cut_off_n=2) == 0


def test_closest_point_single_point_in_domain():
assert closest_point_to_domain([1.0], (0.5, 1.5)) == 1.0


def test_closest_point_multiple_points_in_domain():
assert closest_point_to_domain([0.6, 0.8, 1.2], (0.5, 1.5)) == 0.6


def test_closest_point_all_outside_domain():
assert closest_point_to_domain([2.0, 3.0], (0.5, 1.5)) == 2.0
assert closest_point_to_domain([-1.0, -2.0], (0.5, 1.5)) == -1.0


def test_closest_point_empty_list():
with pytest.raises(AssertionError):
closest_point_to_domain([], (0.5, 1.5))


def test_closest_point_on_boundaries():
assert closest_point_to_domain([0.5, 1.5], (0.5, 1.5)) == 0.5
Loading

0 comments on commit e9d819c

Please sign in to comment.