Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
CamDavidsonPilon committed Dec 14, 2024
1 parent 0cdf51c commit c9fa9b7
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 130 deletions.
1 change: 0 additions & 1 deletion pioreactor/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from pioreactor.actions import pump
from pioreactor.actions import pump_calibration
from pioreactor.actions import self_test
from pioreactor.actions import stirring_calibration
from pioreactor.actions.leader import backup_database
from pioreactor.actions.leader import experiment_profile
from pioreactor.actions.leader import export_experiment_data
11 changes: 5 additions & 6 deletions pioreactor/actions/pump.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,16 @@
from pioreactor.whoami import get_unit_name

DEFAULT_PWM_CALIBRATION = structs.PumpCalibration(
# TODO: provide better estimates for duration_ and bias_ based on some historical data.
# it can even be a function of voltage
name="default",
pioreactor_unit=get_unit_name(),
created_at=default_datetime_for_pioreactor(),
pump="",
hz=200.0,
dc=100.0,
duration_=1.0,
bias_=0,
voltage=-1,
calibration_name="default_pwm_calibration",
curve_type="poly",
curve_data_ = [1., 0.],
recorded_data = {'x': [], 'y': []},
calibration_subtype="generic"
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,9 @@
import json
from time import sleep

import click

from pioreactor.background_jobs import stirring
from pioreactor.config import config
from pioreactor.logging import create_logger
from pioreactor.pubsub import publish
from pioreactor.utils import is_pio_job_running
from pioreactor.utils import local_persistant_storage
from pioreactor.utils import managed_lifecycle
Expand All @@ -23,20 +20,34 @@
from pioreactor.whoami import get_assigned_experiment_name
from pioreactor.whoami import get_testing_experiment_name
from pioreactor.whoami import get_unit_name
from pioreactor.structs import StirringCalibration
from pioreactor.hardware import voltage_in_aux
from pioreactor.exc import JobPresentError

def run_stirring_calibration(min_dc: int | None = None, max_dc: int | None = None) -> StirringCalibration:


if max_dc is None and min_dc is None:
# seed with initial_duty_cycle
config_initial_duty_cycle = config.getfloat("stirring.config", "initial_duty_cycle")
min_dc, max_dc = round(config_initial_duty_cycle * 0.75), round(config_initial_duty_cycle * 1.33)
elif (max_dc is not None) and (min_dc is not None):
assert min_dc < max_dc, "min_dc >= max_dc"
else:
raise ValueError("min_dc and max_dc must both be set.")


def stirring_calibration(min_dc: int, max_dc: int) -> None:
unit = get_unit_name()
experiment = get_testing_experiment_name()
action_name = "stirring_calibration"
logger = create_logger(action_name)

with managed_lifecycle(unit, get_assigned_experiment_name(unit), action_name):
with managed_lifecycle(unit, get_assigned_experiment_name(unit), action_name) as lc:
logger.info("Starting stirring calibration.")

if is_pio_job_running("stirring"):
logger.error("Make sure Stirring job is off before running stirring calibration. Exiting.")
return
raise JobPresentError("Make sure Stirring job is off before running stirring calibration.")

measured_rpms = []

Expand Down Expand Up @@ -69,7 +80,7 @@ def stirring_calibration(min_dc: int, max_dc: int) -> None:
logger.debug(f"Detected {rpm=:.1f} RPM @ {dc=}%")

# log progress
publish(
lc.mqtt_client.publish(
f"pioreactor/{unit}/{experiment}/{action_name}/percent_progress",
count / n_samples * 100,
)
Expand All @@ -81,14 +92,14 @@ def stirring_calibration(min_dc: int, max_dc: int) -> None:
except ValueError:
# the above can fail if all measured rpms are 0
logger.error("No RPMs were measured. Is the stirring spinning?")
return
raise ValueError("No RPMs were measured. Is the stirring spinning?")

if len(filtered_dcs) <= n_samples * 0.75:
# the above can fail if all measured rpms are 0
logger.warning(
"Not enough RPMs were measured. Is the stirring spinning and working correctly? Try changing your initial_duty_cycle."
)
return
raise ValueError("Not enough RPMs were measured.")

# since in practice, we want a look up from RPM -> required DC, we
# set x=measure_rpms, y=dcs
Expand All @@ -99,51 +110,19 @@ def stirring_calibration(min_dc: int, max_dc: int) -> None:

if rpm_coef <= 0:
logger.warning("Something went wrong - detected negative correlation between RPM and stirring.")
return
raise ValueError("Negative correlation between RPM and stirring.")

elif intercept <= 0:
logger.warning("Something went wrong - the intercept should be greater than 0.")
return

with local_persistant_storage(action_name) as cache:
cache["linear_v1"] = json.dumps(
{
"rpm_coef": rpm_coef,
"intercept": intercept,
"timestamp": current_utc_timestamp(),
}
)
cache["stirring_calibration_data"] = json.dumps(
{
"timestamp": current_utc_timestamp(),
"data": {"dcs": dcs, "measured_rpms": measured_rpms},
}
)


@click.option(
"--min-dc",
help="value between 0 and 100",
type=click.IntRange(0, 100),
)
@click.option(
"--max-dc",
help="value between 0 and 100",
type=click.IntRange(0, 100),
)
@click.command(name="stirring_calibration")
def click_stirring_calibration(min_dc: int, max_dc: int) -> None:
"""
Generate a lookup between stirring and voltage
"""

if max_dc is None and min_dc is None:
# seed with initial_duty_cycle
config_initial_duty_cycle = config.getfloat("stirring.config", "initial_duty_cycle")
min_dc, max_dc = round(config_initial_duty_cycle * 0.75), round(config_initial_duty_cycle * 1.33)
elif (max_dc is not None) and (min_dc is not None):
assert min_dc < max_dc, "min_dc >= max_dc"
else:
raise ValueError("min_dc and max_dc must both be set.")

stirring_calibration(min_dc, max_dc)
raise ValueError("Intercept should be greater than 0.")

return StirringCalibration(
hz=config.getfloat("stirring.config", "hz"),
voltage=voltage_in_aux(),
calibration_name = f"stirring-calibration-{current_utc_timestamp()}",
pioreactor_unit=unit,
created_at=current_utc_timestamp(),
curve_data_= [rpm_coef, intercept],
curve_type = "poly",
recorded_data={"x": filtered_dcs, "y": filtered_measured_rpms}
)
184 changes: 184 additions & 0 deletions pioreactor/cli/calibrations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import click
from pathlib import Path
from pioreactor import structs
from pioreactor.whoami import is_testing_env
from msgspec.yaml import decode as yaml_decode

if not is_testing_env():
CALIBRATION_PATH = Path("/home/pioreactor/.pioreactor/storage/calibrations/")
else:
CALIBRATION_PATH = Path(".pioreactor/storage/calibrations/")

# Lookup table for different calibration assistants
CALIBRATION_ASSISTANTS = {}

class CalibrationAssistant:

def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
CALIBRATION_ASSISTANTS[cls.target_calibration_type] = cls

def run(self):
raise NotImplementedError("Subclasses must implement this method.")

class ODAssistant(CalibrationAssistant):

target_calibration_type = "od"
calibration_struct = structs.ODCalibration

def __init__(self):
pass

class PumpAssistant(CalibrationAssistant):

target_calibration_type = "pump"
calibration_struct = structs.PumpCalibration

def __init__(self):
pass

class StirringAssistant(CalibrationAssistant):

target_calibration_type = "stirring"
calibration_struct = structs.StirringCalibration

def __init__(self):
pass

def run(self):
from pioreactor.calibrations.stirring import run_stirring_calibration
return run_stirring_calibration()



@click.group(short_help="calibration utils")
def calibration():
"""
Calibration CLI - A unified interface for all calibration types.
"""
pass


@calibration.command(name="list")
@click.option("--type", "cal_type", required=True, help="Filter by calibration type.")
def list_calibrations(cal_type: str):
"""
List existing calibrations for the given type.
"""
calibration_dir = CALIBRATION_PATH / cal_type
if not calibration_dir.exists():
click.echo(f"No calibrations found for type '{cal_type}'. Directory does not exist.")
return

assistant = CALIBRATION_ASSISTANTS.get(cal_type)


for file in calibration_dir.glob("*.yaml"):
try:
yaml_decode(file.read_bytes(), type=assistant.calibration_struct)
click.echo(file.stem)
except Exception as e:
click.echo(f"Error reading {file.stem()}: {e}")



@calibration.command(name="run")
@click.option("--type", "cal_type", required=True, help="Type of calibration (e.g. od, pump, stirring).")
def run_calibration(cal_type: str):
"""
Run an interactive calibration assistant for a specific type.
On completion, stores a YAML file in: /home/pioreactor/.pioreactor/storage/calibrations/<type>/<calibration_name>.yaml
"""
calibration_dir = CALIBRATION_PATH / cal_type
calibration_dir.mkdir(parents=True, exist_ok=True)

# Dispatch to the assistant function for that type
assistant = CALIBRATION_ASSISTANTS.get(cal_type)
if assistant is None:
click.echo(f"No assistant found for calibration type '{cal_type}'.")
raise click.Abort()

# Run the assistant function to get the final calibration data
calibration_data, calibration_name = assistant().run()

out_file = calibration_dir / f"{calibration_name}.yaml"

# Serialize to YAML
with out_file.open("wb") as f:
f.write(yaml_encode(calibration_data))

# TODO: send to leader

click.echo(f"Calibration '{calibration_name}' of type '{cal_type}' saved to {out_file}")


@calibration.command(name="display")
@click.option("--type", "cal_type", required=True, help="Calibration type.")
@click.option("--name", "calibration_name", required=True, help="Name of calibration to display.")
def display_calibration(cal_type: str, calibration_name: str):
"""
Display the contents of a calibration YAML file.
"""
file = CALIBRATION_PATH / cal_type / f"{calibration_name}.yaml"
if not file.exists():
click.echo(f"No such calibration file: {file}")
raise click.Abort()


assistant = CALIBRATION_ASSISTANTS.get(cal_type)

try:
data = yaml_decode(file.read_bytes(), type=assistant.calibration_struct)
except Exception as e:
click.echo(f"Error reading {file.stem()}: {e}")

click.echo(data)


@calibration.command(name="set-current")
@click.option("--type", "cal_type", required=True, help="Which calibration type to set as current.")
@click.option("--name", "calibration_name", required=True, help="Which calibration name to set as current.")
def set_current_calibration(cal_type: str, calibration_name: str):
"""
Mark a specific calibration as 'current' for that calibration type.
"""

# Dispatch to the assistant function for that type
assistant = CALIBRATION_ASSISTANTS.get(cal_type)
if assistant is None:
click.echo(f"No assistant found for calibration type '{cal_type}'.")
raise click.Abort()

assistant = CALIBRATION_ASSISTANTS.get(cal_type)

try:
data = yaml_decode(file.read_bytes(), type=assistant.calibration_struct)
except Exception as e:
click.echo(f"Error reading {file.stem()}: {e}")


with local_persistant_storage("current_calibrations") as c:
c[(cal_type, data.calibration_subtype)] = calibration_name

# TODO: post to leader


@calibration.command(name="delete")
@click.option("--type", "cal_type", required=True, help="Which calibration type to delete from.")
@click.option("--name", "calibration_name", required=True, help="Which calibration name to delete.")
@click.confirmation_option(prompt="Are you sure you want to delete this calibration?")
def delete_calibration(cal_type: str, calibration_name: str):
"""
Delete a calibration file from local storage.
Example usage:
calibration delete --type od --name my_od_cal_v1
"""
target_file = CALIBRATION_PATH / cal_type / f"{calibration_name}.yaml"
if not target_file.exists():
click.echo(f"No such calibration file: {target_file}")
raise click.Abort()


target_file.unlink()
click.echo(f"Deleted calibration '{calibration_name}' of type '{cal_type}'.")
2 changes: 1 addition & 1 deletion pioreactor/cli/pio.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
lazy_subcommands = {
"run": "pioreactor.cli.run.run",
"plugins": "pioreactor.cli.plugins.plugins",
"calibrations": "pioreactor.cli.calibrations.calibration",
}

if whoami.am_I_leader():
# add in ability to control workers
lazy_subcommands["workers"] = "pioreactor.cli.workers.workers"


@click.group(
cls=LazyGroup,
lazy_subcommands=lazy_subcommands,
Expand Down
10 changes: 5 additions & 5 deletions pioreactor/exc.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ class JobRequiredError(Exception):
A job should be running, but isn't found.
"""

class JobPresentError(Exception):
"""
A job shouldn't be running, but is.
"""


class CalibrationError(Exception):
"""
Expand Down Expand Up @@ -67,8 +72,3 @@ class RsyncError(OSError):
Syncing files failed
"""


class JobNotRunningError(Exception):
"""
Required job is not running
"""
Loading

0 comments on commit c9fa9b7

Please sign in to comment.