Skip to content

Commit

Permalink
Move OD normalization into the growth-rate job; add a script to download the UI without git
Browse files Browse the repository at this point in the history
  • Loading branch information
CamDavidsonPilon committed Dec 27, 2020
1 parent b561528 commit 41dea34
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 55 deletions.
11 changes: 9 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,18 @@ configure-rpi:


install-ui:
# install NPM and Node
wget -O - https://raw.githubusercontent.com/audstanley/NodeJs-Raspberry-Pi/master/Install-Node.sh | sudo bash
git clone https://github.com/Pioreactor/pioreactorui.git /home/pi/

# get the latest pioreactorUI release from GitHub.
cd /home/pi/
mkdir /home/pi/pioreactorui
curl -L https://api.github.com/repos/pioreactor/pioreactorui/tarball | tar -zxv -C /home/pi/pioreactorui --strip-components=1

# install required libraries
npm --prefix /home/pi/pioreactorui/client install
npm --prefix /home/pi/pioreactorui/backend install
sudo cnpm install pm2@latest -g
sudo npm install pm2@latest -g

install-worker: install-python configure-rpi systemd-worker install-i2c install-pioreactor-worker logging-files

Expand Down
19 changes: 4 additions & 15 deletions config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -29,31 +29,20 @@ log_file=/var/log/pioreactor.log
[network.topology]
# This should be a hostname defined in [network.ips].
# In solo mode, this can be pioreactor1 (as an example).
leader_hostname=leader
# See docs: https://github.com/Pioreactor/pioreactor/wiki/Leaders,-workers-and-inventory
leader_hostname=pioreactor1

[network.ips]
leader=192.168.0.100
pioreactor1=192.168.0.1
pioreactor2=192.168.0.2
pioreactor3=192.168.0.3


[inventory]
# This controls what is available to be used as workers, i.e. what `pios` will talk to.
# This also controls what shows up in the dashboard as active.
# A value of 1 means available, and 0 means not available.
# See docs: https://github.com/Pioreactor/pioreactor/wiki/Leaders,-workers-and-inventory
pioreactor1=1
pioreactor2=1
pioreactor3=1
pioreactor4=1
pioreactor5=1
pioreactor6=1
pioreactor7=1
pioreactor8=1
pioreactor9=1
pioreactor10=1
pioreactor11=1
pioreactor12=1


[dashboard.settings]
# changing these requires a power cycle of the leader unit.
Expand Down
7 changes: 4 additions & 3 deletions pioreactor/actions/od_normalization.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@
logger = logging.getLogger("od_normalization")


def od_normalization(od_angle_channel, unit=None, experiment=None):
def od_normalization(od_angle_channel=None, unit=None, experiment=None, N_samples=35):
logger.info("Starting OD normalization")
if "stirring" not in pio_jobs_running():
logger.error("stirring jobs should be running. Run `mb stirring -b` first.")
raise ValueError("stirring jobs should be running. Run `mb stirring -b` first. ")

if "od_reading" not in pio_jobs_running():
# we sample faster, because we can...
# TODO: write tests for this
assert od_angle_channel is not None
sampling_rate = 0.5
signal = od_reading(od_angle_channel, sampling_rate)
else:
# not tested
# TODO: write tests for this
def yield_from_mqtt():
while True:
msg = pubsub.subscribe(f"pioreactor/{unit}/{experiment}/od_raw_batched")
Expand All @@ -37,7 +39,6 @@ def yield_from_mqtt():

time.sleep(0.5)
readings = defaultdict(list)
N_samples = 35

try:

Expand Down
19 changes: 11 additions & 8 deletions pioreactor/background_jobs/growth_rate_calculating.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os
import signal
import logging
from collections import defaultdict

import click

Expand All @@ -13,6 +12,7 @@
from pioreactor.whoami import get_unit_name, get_latest_experiment_name
from pioreactor.config import config
from pioreactor.background_jobs.base import BackgroundJob
from pioreactor.actions.od_normalization import od_normalization

JOB_NAME = os.path.splitext(os.path.basename((__file__)))[0]

Expand All @@ -25,6 +25,7 @@ def __init__(self, ignore_cache=False, unit=None, experiment=None):
super(GrowthRateCalculator, self).__init__(
job_name=JOB_NAME, unit=unit, experiment=experiment
)

self.ignore_cache = ignore_cache
self.initial_growth_rate = self.set_initial_growth_rate()
self.od_normalization_factors = self.set_od_normalization_factors()
Expand Down Expand Up @@ -125,31 +126,33 @@ def set_initial_growth_rate(self):
return 0

def set_od_normalization_factors(self):

# we check if the broker has variance/median stats, and if not, run it ourselves.
message = subscribe(
f"pioreactor/{self.unit}/{self.experiment}/od_normalization/median",
timeout=2,
qos=QOS.EXACTLY_ONCE,
)
if message:
if message and not self.ignore_cache:
return self.json_to_sorted_dict(message.payload)
else:
return defaultdict(lambda: 1)
od_normalization(unit=self.unit, experiment=self.experiment)
return self.set_od_normalization_factors()

def set_od_variances(self):

# we check if the broker has variance/median stats, and if not, run it ourselves.
message = subscribe(
f"pioreactor/{self.unit}/{self.experiment}/od_normalization/variance",
timeout=2,
qos=QOS.EXACTLY_ONCE,
)
if message:
if message and not self.ignore_cache:
return self.json_to_sorted_dict(message.payload)
else:
return defaultdict(lambda: 1e-5)
od_normalization(unit=self.unit, experiment=self.experiment)
return self.set_od_normalization_factors()

def update_ekf_variance_after_io_event(self, message):
self.ekf.scale_OD_variance_for_next_n_steps(2e4, 2 * self.samples_per_minute)
self.ekf.scale_OD_variance_for_next_n_steps(2e4, 1 * self.samples_per_minute)

def scale_raw_observations(self, observations):
return {
Expand Down
3 changes: 1 addition & 2 deletions pioreactor/background_jobs/io_controlling.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from pioreactor.actions.remove_waste import remove_waste
from pioreactor.actions.add_alt_media import add_alt_media
from pioreactor.pubsub import publish, subscribe_and_callback, QOS

from pioreactor.utils.timing import RepeatedTimer
from pioreactor.utils.streaming_calculations import PID
from pioreactor.whoami import get_unit_name, get_latest_experiment_name
Expand Down Expand Up @@ -131,7 +130,7 @@ def set_duration(self, value):

def send_details_to_mqtt(self):
publish(
f"pioreactor/{self.unit}/{self.experiment}/{self.job_name}/io_details",
f"pioreactor/{self.unit}/{self.experiment}/{self.job_name}/io_algorithm_settings",
json.dumps(
{
"pioreactor_unit": self.unit,
Expand Down
15 changes: 9 additions & 6 deletions pioreactor/background_jobs/leader/mqtt_to_db_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ def start_passive_listeners(self):

@click.command(name="mqtt_to_db_streaming")
def click_mqtt_to_db_streaming():
# start the job sending MQTT streams to the database (as defined in config.ini)
# start the job sending MQTT streams to the database
# parsers should return a dict of all the entries in the corresponding table.

def parse_od(topic, payload):
# should return a dict
metadata = produce_metadata(topic)

return {
Expand All @@ -92,7 +93,6 @@ def parse_od(topic, payload):
}

def parse_io_events(topic, payload):
# should return a dict
payload = json.loads(payload)
metadata = produce_metadata(topic)

Expand All @@ -106,7 +106,6 @@ def parse_io_events(topic, payload):
}

def parse_growth_rate(topic, payload):
# should return a dict
metadata = produce_metadata(topic)

return {
Expand Down Expand Up @@ -137,7 +136,6 @@ def parse_pid_logs(topic, payload):
}

def parse_alt_media_fraction(topic, payload):
# should return a dict
metadata = produce_metadata(topic)

return {
Expand All @@ -156,7 +154,7 @@ def parse_logs(topic, payload):
"message": payload.decode(),
}

def parse_experiment_details(topic, payload):
def parse_io_algorithm_settings(topic, payload):
payload = json.loads(payload.decode())
return payload

Expand Down Expand Up @@ -192,6 +190,11 @@ def parse_experiment_details(topic, payload):
"parser": parse_alt_media_fraction,
},
{"topic": "pioreactor/+/+/log", "table": "logs", "parser": parse_logs},
{
"topic": "pioreactor/+/+/io_algorithm_settings",
"table": "io_algorithm_settings",
"parser": parse_io_algorithm_settings,
},
]

streamer = MqttToDBStreamer( # noqa: F841
Expand Down
18 changes: 9 additions & 9 deletions pioreactor/background_jobs/leader/time_series_aggregating.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def __init__(
topic,
output_dir,
extract_label,
skip_cache=False,
ignore_cache=False,
job_name=DEFAULT_JOB_NAME, # this is overwritten importantly
record_every_n_seconds=None, # controls how often we should sample data. Ex: growth_rate is ~5min
write_every_n_seconds=None, # controls how often we write to disk. Ex: about 30seconds
Expand All @@ -46,7 +46,7 @@ def __init__(
super(TimeSeriesAggregation, self).__init__(job_name=job_name, **kwargs)
self.topic = topic
self.output_dir = output_dir
self.aggregated_time_series = self.read(skip_cache)
self.aggregated_time_series = self.read(ignore_cache)
self.extract_label = extract_label
self.time_window_seconds = time_window_seconds
self.cache = {}
Expand All @@ -66,8 +66,8 @@ def on_disconnect(self):
def output(self):
return self.output_dir + self.job_name + ".json.gz"

def read(self, skip_cache):
if skip_cache:
def read(self, ignore_cache):
if ignore_cache:
return {"series": [], "data": []}
try:
# try except hell
Expand Down Expand Up @@ -154,7 +154,7 @@ def start_passive_listeners(self):
help="the output directory",
)
@click.option("--skip-cache", is_flag=True, help="skip using the saved data on disk")
def click_time_series_aggregating(output_dir, skip_cache):
def click_time_series_aggregating(output_dir, ignore_cache):
# start the job that aggregates time series data and caches it for the PioreactorUI
unit = get_unit_name()

Expand All @@ -172,7 +172,7 @@ def unit_from_topic(topic):
experiment=UNIVERSAL_EXPERIMENT,
job_name="od_raw_time_series_aggregating",
unit=unit,
skip_cache=skip_cache,
ignore_cache=ignore_cache,
extract_label=single_sensor_label_from_topic,
write_every_n_seconds=30,
time_window_seconds=60
Expand All @@ -186,7 +186,7 @@ def unit_from_topic(topic):
experiment=UNIVERSAL_EXPERIMENT,
job_name="od_filtered_time_series_aggregating",
unit=unit,
skip_cache=skip_cache,
ignore_cache=ignore_cache,
extract_label=single_sensor_label_from_topic,
write_every_n_seconds=15,
time_window_seconds=60
Expand All @@ -200,7 +200,7 @@ def unit_from_topic(topic):
experiment=UNIVERSAL_EXPERIMENT,
job_name="growth_rate_time_series_aggregating",
unit=unit,
skip_cache=skip_cache,
ignore_cache=ignore_cache,
extract_label=unit_from_topic,
write_every_n_seconds=15,
record_every_n_seconds=5 * 60, # TODO: move this to a config param
Expand All @@ -212,7 +212,7 @@ def unit_from_topic(topic):
experiment=UNIVERSAL_EXPERIMENT,
job_name="alt_media_fraction_time_series_aggregating",
unit=unit,
skip_cache=skip_cache,
ignore_cache=ignore_cache,
extract_label=unit_from_topic,
write_every_n_seconds=15,
record_every_n_seconds=1,
Expand Down
58 changes: 54 additions & 4 deletions pioreactor/tests/test_growth_rate_calculating.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,23 @@ def pause():


def test_subscribing(monkeypatch):
publish(f"pioreactor/{unit}/{experiment}/od_normalization/median", None, retain=True)
publish(
f"pioreactor/{unit}/{experiment}/od_normalization/variance", None, retain=True
f"pioreactor/{unit}/{experiment}/od_normalization/median",
'{"135/A": 1, "90/A": 1}',
retain=True,
)
publish(
f"pioreactor/{unit}/{experiment}/od_normalization/variance",
'{"135/A": 1, "90/A": 1}',
retain=True,
)
publish(f"pioreactor/{unit}/{experiment}/growth_rate", None, retain=True)

publish(
f"pioreactor/{unit}/{experiment}/od_raw_batched",
'{"135/A": 0.778586260567034, "90/A": 0.20944389172032837}',
retain=True,
)
publish(f"pioreactor/{unit}/{experiment}/growth_rate", 1.0, retain=True)
calc = GrowthRateCalculator(unit=unit, experiment=experiment)
pause()
Expand Down Expand Up @@ -58,9 +69,15 @@ def test_subscribing(monkeypatch):


def test_same_angles(monkeypatch):
publish(f"pioreactor/{unit}/{experiment}/od_normalization/median", None, retain=True)
publish(
f"pioreactor/{unit}/{experiment}/od_normalization/variance", None, retain=True
f"pioreactor/{unit}/{experiment}/od_normalization/median",
'{"135/A": 1, "135/B": 1, "90/A": 1}',
retain=True,
)
publish(
f"pioreactor/{unit}/{experiment}/od_normalization/variance",
'{"135/A": 1, "135/B":1, "90/A": 1}',
retain=True,
)
publish(f"pioreactor/{unit}/{experiment}/growth_rate", None, retain=True)

Expand All @@ -86,6 +103,18 @@ def test_same_angles(monkeypatch):


def test_mis_shapen_data(monkeypatch):

publish(
f"pioreactor/{unit}/{experiment}/od_normalization/median",
'{"135/A": 1, "90/A": 1}',
retain=True,
)
publish(
f"pioreactor/{unit}/{experiment}/od_normalization/variance",
'{"135/A": 1, "90/A": 1}',
retain=True,
)

publish(
f"pioreactor/{unit}/{experiment}/od_raw_batched",
'{"135/A": 0.778586260567034, "90/A": 0.1}',
Expand All @@ -107,6 +136,16 @@ def test_mis_shapen_data(monkeypatch):


def test_restart():
publish(
f"pioreactor/{unit}/{experiment}/od_normalization/median",
'{"135/A": 1, "135/B": 1, "90/A": 1}',
retain=True,
)
publish(
f"pioreactor/{unit}/{experiment}/od_normalization/variance",
'{"135/A": 1, "135/B": 1, "90/A": 1}',
retain=True,
)
publish(
f"pioreactor/{unit}/{experiment}/od_raw_batched",
'{"135/A": 0.778586260567034, "135/B": 0.20944389172032837, "90/A": 0.1}',
Expand Down Expand Up @@ -152,6 +191,17 @@ def test_restart():


def test_skip_180():
publish(
f"pioreactor/{unit}/{experiment}/od_normalization/median",
'{"135/A": 1, "180/A": 1, "90/A": 1}',
retain=True,
)
publish(
f"pioreactor/{unit}/{experiment}/od_normalization/variance",
'{"135/A": 1, "180/A": 1, "90/A": 1}',
retain=True,
)

publish(f"pioreactor/{unit}/{experiment}/growth_rate", None, retain=True)
publish(
f"pioreactor/{unit}/{experiment}/od_raw_batched",
Expand Down
Loading

0 comments on commit 41dea34

Please sign in to comment.