Skip to content

Add UART receive interrupt handler #147

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from

Conversation

alustig3
Copy link
Contributor

@alustig3 alustig3 commented Apr 8, 2025

I have a custom syringe pump that uses UART serial messages to communicate with pyControl. It is sometimes useful to receive messages from the pump, for instance to tell the task that a syringe is empty, so the task will automatically stop.

Previously, to get this working would require constant polling the syringe pump UART to see if there were any messages available. Something like:

from pyControl.utility import *
from devices import PumpController, Breakout_dseries_1_6

bb = Breakout_dseries_1_6()  # breakout board
syringes = PumpController(port=bb.port_11)

states = ["state_A"]
events = ["check_for_message"]

initial_state = "state_A"


def run_start():
    set_timer("check_for_message", 200, output_event=False)


def all_states(event):
    if event == "check_for_message":
        set_timer("check_for_message", 200, output_event=False)
        msg = syringes.read_serial()
        if msg == "syringe_empty":
            stop_framework()
        else:
            print(msg)


def state_A(event):
    pass

This pull request adds a UART_handler class that can be attached to a UART's RX interrupt. Now a pyControl framework event will be output whenever a UART message is received. So the task code now looks something like this:

from pyControl.utility import *
from devices import PumpController, Breakout_dseries_1_6

bb = Breakout_dseries_1_6()  # breakout board
syringes = PumpController(port=bb.port_11, event_name="msg_received")

states = ["state_A"]
events = ["msg_received"]

initial_state = "mystate"


def all_states(event):
    if event == "msg_received":
        msg = syringes.read_serial()
        if msg == "syringe_empty":
            stop_framework()
        else:
            print(msg)


def state_A(event):
    pass

This gets rid of the constantly polling timer and greatly reduces response time.

Here is a look at my PumpController device class to see how it is implemented:

from machine import UART
from pyControl.hardware import UART_handler


class PumpController:
    def __init__(self, port, event_name, pybv1=False):
        assert port.UART is not None, "! Pump needs port with UART."

        self.uart = UART(port.UART)
        self.uart.init(115200, bits=8, parity=None, stop=1, timeout=100, rxbuf=130)

        handler = UART_handler(event_name, first_char_interrupt=pybv1)
        self.uart.irq(trigger=UART.IRQ_RXIDLE, handler=handler.ISR)

        # default pump settings
        self.configure_pump(
            steps_per_rev=200,
            syringe_id_mm=26.7,
            lead_mm=2,
            acceleration=5000,
            max_velocity=180_000,
        )
        self.uart.read()  # clear buffer
        self._send("reset")

        self._left = "l1"
        self._right = "r1"

    def _send(self, command, payload=""):
        self.uart.write(f"{command};{payload}\n")

    def configure_pump(
        self,
        steps_per_rev,
        syringe_id_mm,
        lead_mm,
        acceleration,
        max_velocity,
        pump="all",
    ):
        if pump == "all":
            for pump in ["l1", "l2", "r1", "r2"]:
                self._send(
                    "eval",
                    f"{pump}.configure({steps_per_rev}, {syringe_id_mm}, {lead_mm}, {acceleration}, {max_velocity})",
                )
        else:
            self._send(
                "eval", f"{pump}.configure({steps_per_rev}, {syringe_id_mm}, {lead_mm}, {acceleration}, {max_velocity})"
            )

    def display_animal_number(self, number):
        self._send("animal", number)

    def stop(self):
        self._send("stop")

    def infuse(self, pump, volume):
        self._send("eval", f"{pump}.dispense({volume})")

    def left_infuse(self, volume):
        self.infuse(self._left, volume)

    def right_infuse(self, volume):
        self.infuse(self._right, volume)

    def get_version(self):
        self._send("version")

    def read_serial(self):
        if self.uart.any():
            return self.uart.readline().decode("utf-8").strip("\n")
        return None

This improves the situation described in the docs

Polling may be necessary if you need to respond to events send over a serial connection (e.g. from an external device that communicates via UART). In this case either poll at the lowest frequency necessary to ensure an acceptable response latency to the serial input, or if possible trigger the serial read using a separate digital input from the external device

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant