Assume DO or pH measurements are available over I2C. Users would like to do the following:
pH and DO can be two different plugins. Below, I'll use pH, but DO should look very similar.
Database table

Open up `pio db` and paste the following:

```sql
CREATE TABLE IF NOT EXISTS pH_readings (
    experiment TEXT NOT NULL,
    pioreactor_unit TEXT NOT NULL,
    timestamp TEXT NOT NULL,
    pH_reading REAL NOT NULL
);

CREATE INDEX IF NOT EXISTS pH_measurements_ix ON pH_readings (experiment);
```

Then enter `.q` to quit.
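To check the schema before touching the real database, the same statements can be run against a throwaway SQLite database. This is a minimal sketch using Python's standard `sqlite3` module; the experiment name, unit name, timestamp, and reading are made-up sample values, and the in-memory database is an assumption made only so the example is safe to run anywhere.

```python
import sqlite3

# In-memory database, so the sketch never touches the real Pioreactor database.
conn = sqlite3.connect(":memory:")

# Same statements as in the issue.
conn.executescript(
    """
    CREATE TABLE IF NOT EXISTS pH_readings (
        experiment TEXT NOT NULL,
        pioreactor_unit TEXT NOT NULL,
        timestamp TEXT NOT NULL,
        pH_reading REAL NOT NULL
    );
    CREATE INDEX IF NOT EXISTS pH_measurements_ix ON pH_readings (experiment);
    """
)

# Hypothetical row with the four columns the table expects.
conn.execute(
    "INSERT INTO pH_readings VALUES (?, ?, ?, ?)",
    ("demo_experiment", "unit1", "2024-01-01T00:00:00.000000Z", 7.02),
)

# Query by experiment, which is the column the index covers.
rows = conn.execute(
    "SELECT pioreactor_unit, pH_reading FROM pH_readings WHERE experiment = ?",
    ("demo_experiment",),
).fetchall()
print(rows)
```

Filtering on `experiment` is the access pattern the index is built for, which is presumably how the chart would select its data.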
Chart yaml for UI

Add the following to `~/.pioreactor/plugins/ui/contrib/charts/ph.yaml`:

```yaml
---
data_source: pH_readings # SQL table
data_source_column: ph_reading
title: pH
mqtt_topic: pH_reading/pH
chart_key: ph_readings
source: pH_readings_plugin
y_axis_label: pH
interpolation: stepAfter
y_axis_domain: [6, 8]
lookback: 100000
fixed_decimals: 2
```

Then add `ph_readings=1` under `[ui.overview.charts]` in your configuration.
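Besides the chart entry, the reading job in this issue looks up `time_between_readings` and `i2c_channel_hex` in a `ph_reading.config` section. The sketch below shows what such a configuration fragment might look like and how the values parse. The section and key names come from this issue; the values (5.0 seconds, address `0x63`) are placeholders, and the standard-library `configparser` stands in for Pioreactor's own config object.

```python
import configparser

# Hypothetical config fragment. Section and key names are taken from this issue;
# the values are placeholders, not recommendations.
EXAMPLE_CONFIG = """
[ui.overview.charts]
ph_readings=1

[ph_reading.config]
time_between_readings=5.0
i2c_channel_hex=0x63
"""

config = configparser.ConfigParser()
config.read_string(EXAMPLE_CONFIG)

# "1" is read as a boolean flag that enables the chart.
chart_enabled = config.getboolean("ui.overview.charts", "ph_readings")

# The job asserts that this value is at least 2.0 seconds.
time_between_readings = config.getfloat("ph_reading.config", "time_between_readings")

# The address is stored as a hex string and converted with base=16.
i2c_address = int(config.get("ph_reading.config", "i2c_channel_hex"), base=16)

print(chart_enabled, time_between_readings, i2c_address)
```

`int(..., base=16)` accepts the value with or without the `0x` prefix, so `63` and `0x63` parse to the same address.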
pH measurement reading job

Add the following to `~/.pioreactor/plugins/ph_reading.py`:

```python
# -*- coding: utf-8 -*-
from time import sleep

import busio
import click

from pioreactor.background_jobs.base import BackgroundJobContrib
from pioreactor.background_jobs.leader.mqtt_to_db_streaming import produce_metadata
from pioreactor.background_jobs.leader.mqtt_to_db_streaming import register_source_to_sink
from pioreactor.background_jobs.leader.mqtt_to_db_streaming import TopicToParserToTable
from pioreactor.config import config
from pioreactor.utils.timing import current_utc_timestamp
from pioreactor.utils.timing import RepeatedTimer
from pioreactor.whoami import get_assigned_experiment_name
from pioreactor.whoami import get_unit_name


def __dir__():
    return ['click_pH_reading']


def parser(topic, payload) -> dict:
    # Turn an MQTT message into a row for the pH_readings table.
    metadata = produce_metadata(topic)
    return {
        "experiment": metadata.experiment,
        "pioreactor_unit": metadata.pioreactor_unit,
        "timestamp": current_utc_timestamp(),
        "pH_reading": float(payload),
    }


register_source_to_sink(
    TopicToParserToTable(
        ["pioreactor/+/+/pH_reading/pH"],
        parser,
        "pH_readings",
    )
)


class PHReader(BackgroundJobContrib):

    job_name = "ph_reading"
    published_settings = {
        "pH": {"datatype": "float", "settable": False},
    }

    def __init__(self, unit, experiment, **kwargs) -> None:
        super().__init__(unit=unit, experiment=experiment, plugin_name="ph_reading", **kwargs)

        time_between_readings = config.getfloat("ph_reading.config", "time_between_readings")
        assert time_between_readings >= 2.0

        self.i2c_channel = int(config.get("ph_reading.config", "i2c_channel_hex"), base=16)
        self.i2c = busio.I2C(3, 2)  # (scl, sda)
        self.timer_thread = RepeatedTimer(
            time_between_readings,
            self.read_pH,
            job_name=self.job_name,
            run_immediately=True,
        ).start()

    def read_pH(self):
        # Average a couple of samples; assigning self.pH publishes the value.
        samples = 2
        running_sum = 0.0
        for _ in range(samples):
            running_sum += self.query("R")
            sleep(0.05)
        self.pH = running_sum / samples
        return self.pH

    def on_ready_to_sleeping(self) -> None:
        self.timer_thread.pause()

    def on_sleeping_to_ready(self) -> None:
        self.timer_thread.unpause()

    def on_disconnect(self) -> None:
        self.timer_thread.cancel()

    def write(self, cmd):
        cmd_bytes = bytes(cmd + "\x00", "latin-1")  # Null-terminated command
        self.i2c.writeto(self.i2c_channel, cmd_bytes)

    @staticmethod
    def handle_raspi_glitch(response):
        # Clear the high bit of every byte before converting to characters.
        return [chr(x & ~0x80) for x in response]

    def read(self, num_of_bytes=31):
        result = bytearray(num_of_bytes)
        self.i2c.readfrom_into(self.i2c_channel, result)
        response = self.get_response(result)
        char_list = self.handle_raspi_glitch(response[1:])
        return float(''.join(char_list))

    def query(self, command):
        self.write(command)
        current_timeout = 1.5
        sleep(current_timeout)
        return self.read()

    @staticmethod
    def get_response(raw_data):
        # Drop the zero padding from the fixed-size read buffer.
        return [i for i in raw_data if i != 0]


@click.command(name="pH_reading")
def click_pH_reading():
    """
    Start pH reading
    """
    unit = get_unit_name()
    job = PHReader(
        unit=unit,
        experiment=get_assigned_experiment_name(unit),
    )
    job.block_until_disconnected()
```
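The response decoding in `read` can be tried without any hardware. The sketch below copies the logic of `get_response` and `handle_raspi_glitch` into plain functions and runs them on a made-up 31-byte buffer. The buffer contents are an assumption for illustration: a leading byte that `read` skips (the issue does not say what it means), the text `7.02` with the high bit set on its first character, and zero padding. `decode_reading` is a hypothetical helper name, not part of the plugin.

```python
def get_response(raw_data):
    # Drop the zero padding from the fixed-size read buffer.
    return [b for b in raw_data if b != 0]


def handle_raspi_glitch(response):
    # Clear the high bit of every byte, then convert to characters.
    return [chr(b & ~0x80) for b in response]


def decode_reading(raw_data):
    # Hypothetical helper mirroring PHReader.read without the I2C call.
    response = get_response(raw_data)
    chars = handle_raspi_glitch(response[1:])  # skip the leading byte
    return float("".join(chars))


# Made-up buffer: leading byte 1, then "7.02" where the "7" has its high bit set.
payload = bytes([1, ord("7") | 0x80]) + b".02"
raw = bytearray(payload.ljust(31, b"\x00"))

reading = decode_reading(raw)
print(reading)  # 7.02
```

Masking with `~0x80` leaves ordinary ASCII bytes unchanged, so the same path handles both clean and glitched responses.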
Dosing automation to target a pH
Open questions