Skip to content

Commit

Permalink
debugging why my clients keep getting lost
Browse files Browse the repository at this point in the history
  • Loading branch information
CamDavidsonPilon committed Dec 7, 2020
1 parent 9dde0b4 commit 5577c4c
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 32 deletions.
2 changes: 1 addition & 1 deletion config.dev.ini
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ observation_database=pioreactor.sqlite3
[logging]
log_file=./pioreactor.log

[network]
[topology]
leader_hostname=localhost


Expand Down
10 changes: 9 additions & 1 deletion config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,17 @@ observation_database=/home/pi/db/pioreactor.sqlite
[logging]
log_file=/var/log/pioreactor.log

[network]
[topology]
# assign a unit to be the leader. In solo mode, this can pioreactor1
leader_hostname=leader

[network]
# leader key can be removed if in solo mode.
leader=192.168.0.10
pioreactor1=192.168.0.17
pioreactor2=192.168.0.20
pioreactor3=192.168.0.18

[dashboard]
filtered_lookback_minutes=240
raw_lookback_minutes=240
Expand Down
41 changes: 24 additions & 17 deletions pioreactor/background_jobs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import signal
import os
import sys
import threading
import atexit
from collections import namedtuple
import logging
Expand Down Expand Up @@ -75,17 +76,22 @@ def disconnect_gracefully(*args):
self.set_state("disconnected")

def exit_python(*args):
self.logger.debug("exit_python")
sys.exit(0)

try:
# signals only work in main thread - and if we set state via MQTT,
# this runs in a thread
if threading.current_thread() is threading.main_thread():
signal.signal(signal.SIGTERM, disconnect_gracefully)
signal.signal(signal.SIGINT, disconnect_gracefully)
signal.signal(signal.SIGUSR1, exit_python)
except Exception:
# if we set "init" state from MQTT, this code runs in a thread and will fail.
pass
atexit.register(disconnect_gracefully)

atexit.register(disconnect_gracefully)
# if we re-init (via MQTT, close previous threads)
for client in self.pubsub_clients:
client.loop_stop()
client.disconnect()
self.pubsub_clients = []

self.declare_settable_properties_to_broker()
self.start_general_passive_listeners()
Expand All @@ -107,7 +113,7 @@ def disconnected(self):
# disconnect from the passive subscription threads
for client in self.pubsub_clients:
client.loop_stop() # takes a second or two.
client.disconnect()
assert client.disconnect() == 0

# set state to disconnect
self.state = self.DISCONNECTED
Expand Down Expand Up @@ -179,6 +185,7 @@ def publish_attr(self, attr: str) -> None:
)

def start_general_passive_listeners(self) -> None:

last_will = {
"topic": f"pioreactor/{self.unit}/{self.experiment}/{self.job_name}/$state",
"payload": self.LOST,
Expand All @@ -188,18 +195,18 @@ def start_general_passive_listeners(self) -> None:

# listen to changes in editable properties
# everyone listens to $BROADCAST (TODO: even leader?)
self.pubsub_clients.append(
subscribe_and_callback(
self.set_attr_from_message,
[
f"pioreactor/{self.unit}/{self.experiment}/{self.job_name}/+/set",
f"pioreactor/{UNIVERSAL_IDENTIFIER}/{self.experiment}/{self.job_name}/+/set",
],
qos=QOS.EXACTLY_ONCE,
last_will=last_will,
job_name=self.job_name,
)
client = subscribe_and_callback(
self.set_attr_from_message,
[
f"pioreactor/{self.unit}/{self.experiment}/{self.job_name}/+/set",
f"pioreactor/{UNIVERSAL_IDENTIFIER}/{self.experiment}/{self.job_name}/+/set",
],
qos=QOS.EXACTLY_ONCE,
last_will=last_will,
job_name=self.job_name,
)
client.name = "test"
self.pubsub_clients.append(client)

def check_for_duplicate_process(self):
if (
Expand Down
6 changes: 3 additions & 3 deletions pioreactor/background_jobs/io_controlling.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,9 @@ def __init__(self, target_growth_rate=None, target_od=None, volume=None, **kwarg
self.set_target_growth_rate(target_growth_rate)
self.target_od = target_od

Kp = config["pid_morbidostat"]["Kp"]
Ki = config["pid_morbidostat"]["Ki"]
Kd = config["pid_morbidostat"]["Kd"]
Kp = float(config["pid_morbidostat"]["Kp"])
Ki = float(config["pid_morbidostat"]["Ki"])
Kd = float(config["pid_morbidostat"]["Kd"])
self.pid = PID(
-Kp,
-Ki,
Expand Down
4 changes: 2 additions & 2 deletions pioreactor/command_line_leader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import click

from pioreactor.whoami import am_I_leader, UNIVERSAL_IDENTIFIER
from pioreactor.config import get_units_and_ips

ALL_UNITS = ["1", "2", "3"] # how is this updated when we add new units?
ALL_WORKER_JOBS = [
"stirring",
"growth_rate_calculating",
Expand All @@ -33,7 +33,7 @@ def unit_to_hostname(unit):

def universal_identifier_to_all_units(units):
if units == (UNIVERSAL_IDENTIFIER,):
return ALL_UNITS
return list(get_units_and_ips().keys())
else:
return units

Expand Down
8 changes: 7 additions & 1 deletion pioreactor/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ def get_config():


def get_leader_hostname():
return config["network"]["leader_hostname"]
return config["topology"]["leader_hostname"]


def get_units_and_ips():
return dict(
[(unit, ip) for unit, ip in config["network"].items() if unit != "leader"]
)


leader_hostname = get_leader_hostname()
2 changes: 1 addition & 1 deletion pioreactor/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def emit(self, record):

# define a Handler which writes INFO messages or higher to the sys.stderr
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(
logging.Formatter(
"%(asctime)s [%(name)s] %(levelname)-2s %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
Expand Down
8 changes: 3 additions & 5 deletions pioreactor/tests/test_background_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@
from pioreactor.whoami import get_unit_from_hostname, get_latest_experiment_name
from pioreactor.pubsub import publish

unit = get_unit_from_hostname()
exp = get_latest_experiment_name()


def pause():
# to avoid race conditions
time.sleep(0.5)


def test_states():
unit = get_unit_from_hostname()
exp = get_latest_experiment_name()

bj = BackgroundJob(job_name="job", unit=unit, experiment=exp)
pause()
Expand All @@ -32,6 +31,5 @@ def test_states():
pause()
assert bj.state == "init"

publish(f"pioreactor/{unit}/{exp}/job/$state/set", "foo")
publish(f"pioreactor/{unit}/{exp}/job/$state/set", "disconnected")
pause()
assert bj.state == "init"
1 change: 0 additions & 1 deletion sql/create_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ CREATE INDEX IF NOT EXISTS od_readings_raw_ix
ON od_readings_raw (experiment, pioreactor_unit, angle);



CREATE TABLE IF NOT EXISTS alt_media_fraction (
timestamp TEXT NOT NULL,
pioreactor_unit TEXT NOT NULL,
Expand Down

0 comments on commit 5577c4c

Please sign in to comment.