Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for KMTronic usb relay controller #1568

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,33 @@ Arguments:
Used by:
- `GpioDigitalOutputDriver`_

KMTronicRelay
+++++++++++++
A :any:`KMTronicRelay` resource describes a single output of an USB Relay Controller from KMTronic.

.. code-block:: yaml

KMTronicRelay:
index: 2
ports: 8
match:
ID_SERIAL_SHORT: 'AB0LBF2U'

Arguments:
- index (int, default=1): number on the relay to use.
- ports: (int, default=1): number of ports on the relay.
- match (dict): key and value pairs for a udev match, see `udev Matching`_

.. note::
IMPORTANT
Set ports=8 if you are using a 8 relay controller. if not the default=1 is fine.
The reason for this is the 8 relay controller does not read the state of relays the same
way as the other controllers.

NetworkKMTronicRelay
++++++++++++++++++++
A :any:`NetworkKMTronicRelay` describes an `KMTronicRelay`_ exported over the network.

NetworkService
~~~~~~~~~~~~~~
A :any:`NetworkService` describes a remote SSH connection.
Expand Down Expand Up @@ -2354,6 +2381,30 @@ Implements:
Arguments:
- None

KMTronicRelayDriver
~~~~~~~~~~~~~~~~~~~
A :any:`KMTronicRelayDriver` controls an `KMTronicRelay`_ or `NetworkKMTronicRelay`_ resource.
It can set and get the current state of the resource.

Binds to:
relay:
- `KMTronicRelay`_
- `NetworkKMTronicRelay`_

Implements:
- :any:`DigitalOutputProtocol`

.. code-block:: yaml

KMTronicRelayDriver: {}

Arguments:
- None

.. note::
In order to be able to use this driver pyserial need to be installed on
the system the relay is connected to.

ManualSwitchDriver
~~~~~~~~~~~~~~~~~~
A :any:`ManualSwitchDriver` requires the user to control a switch or jumper on
Expand Down
7 changes: 7 additions & 0 deletions examples/kmtronic/kmtronic-exporter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
main:
location: Work Desk
KMTronicRelay:
index: 3
ports: 8
match:
ID_SERIAL_SHORT: 'AB0LBF2U'
10 changes: 10 additions & 0 deletions examples/kmtronic/kmtronic.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
targets:
main:
resources:
KMTronicRelay:
index: 2

drivers:
KMTronicRelayDriver: {}
DigitalOutputPowerDriver:
delay: 2.0
1 change: 1 addition & 0 deletions labgrid/driver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@
from .deditecrelaisdriver import DeditecRelaisDriver
from .dediprogflashdriver import DediprogFlashDriver
from .httpdigitaloutput import HttpDigitalOutputDriver
from .kmtronicrelay import KMTronicRelayDriver
44 changes: 44 additions & 0 deletions labgrid/driver/kmtronicrelay.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import attr

from .common import Driver
from ..factory import target_factory
from ..step import step
from ..protocol import DigitalOutputProtocol
from ..util.agentwrapper import AgentWrapper
from ..resource.remote import NetworkKMTronicRelay


@target_factory.reg_driver
@attr.s(eq=False)
class KMTronicRelayDriver(Driver, DigitalOutputProtocol):
bindings = {
"relay": {"KMTronicRelay", "NetworkKMTronicRelay"},
}

def __attrs_post_init__(self):
super().__attrs_post_init__()
self.wrapper = None

def on_activate(self):
if isinstance(self.relay, NetworkKMTronicRelay):
host = self.relay.host
else:
host = None
self.wrapper = AgentWrapper(host)
self.proxy = self.wrapper.load('kmtronic_relay')

def on_deactivate(self):
self.wrapper.close()
self.wrapper = None
self.proxy = None

@Driver.check_active
@step(args=['status'])
def set(self, status):
self.proxy.set(self.relay.path, self.relay.index, status)

@Driver.check_active
@step(result=True)
def get(self):
status = self.proxy.get(self.relay.path, self.relay.index, self.relay.ports)
return status
5 changes: 3 additions & 2 deletions labgrid/remote/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -898,8 +898,7 @@ def digital_io(self):
name = self.args.name
target = self._get_target(place)
from ..resource import ModbusTCPCoil, OneWirePIO, HttpDigitalOutput
from ..resource.remote import NetworkDeditecRelais8, NetworkSysfsGPIO, NetworkLXAIOBusPIO, NetworkHIDRelay

from ..resource.remote import NetworkDeditecRelais8, NetworkSysfsGPIO, NetworkLXAIOBusPIO, NetworkHIDRelay, NetworkKMTronicRelay
drv = None
try:
drv = target.get_driver("DigitalOutputProtocol", name=name)
Expand All @@ -919,6 +918,8 @@ def digital_io(self):
drv = self._get_driver_or_new(target, "LXAIOBusPIODriver", name=name)
elif isinstance(resource, NetworkHIDRelay):
drv = self._get_driver_or_new(target, "HIDRelayDriver", name=name)
elif isinstance(resource, NetworkKMTronicRelay):
drv = self._get_driver_or_new(target, "KMTronicRelayDriver", name=name)
if drv:
break

Expand Down
20 changes: 20 additions & 0 deletions labgrid/remote/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,25 @@ def _get_params(self):
"index": self.local.index,
}

@attr.s(eq=False)
class KMTronicRelayExport(USBGenericExport):
"""ResourceExport for outputs on KMTronic relays"""

def __attrs_post_init__(self):
super().__attrs_post_init__()

def _get_params(self):
"""Helper function to return parameters"""
return {
"host": self.host,
"busnum": self.local.busnum,
"devnum": self.local.devnum,
"path": self.local.path,
"vendor_id": self.local.vendor_id,
"model_id": self.local.model_id,
"index": self.local.index,
"ports": self.local.ports,
}

@attr.s(eq=False)
class USBFlashableExport(USBGenericExport):
Expand Down Expand Up @@ -567,6 +586,7 @@ def __attrs_post_init__(self):
exports["USBPowerPort"] = USBPowerPortExport
exports["DeditecRelais8"] = USBDeditecRelaisExport
exports["HIDRelay"] = USBHIDRelayExport
exports["KMTronicRelay"] = KMTronicRelayExport
exports["USBFlashableDevice"] = USBFlashableExport
exports["LXAUSBMux"] = USBGenericExport

Expand Down
1 change: 1 addition & 0 deletions labgrid/resource/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
USBSerialPort,
USBTMC,
USBVideo,
KMTronicRelay,
)
from .common import Resource, ResourceManager, ManagedResource
from .ykushpowerport import YKUSHPowerPort, NetworkYKUSHPowerPort
Expand Down
10 changes: 10 additions & 0 deletions labgrid/resource/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,16 @@ def __attrs_post_init__(self):
self.timeout = 10.0
super().__attrs_post_init__()

@target_factory.reg_resource
@attr.s(eq=False)
class NetworkKMTronicRelay(RemoteUSBResource):
"""The NetworkKMTronicRelay describes a remotely accessible USB relay port"""
index = attr.ib(default=1, validator=attr.validators.instance_of(int))
ports = attr.ib(default=1, validator=attr.validators.instance_of(int))

def __attrs_post_init__(self):
self.timeout = 10.0
super().__attrs_post_init__()

@target_factory.reg_resource
@attr.s(eq=False)
Expand Down
17 changes: 17 additions & 0 deletions labgrid/resource/udev.py
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,23 @@ def filter_match(self, device):

return super().filter_match(device)

@target_factory.reg_resource
@attr.s(eq=False)
class KMTronicRelay(USBResource):
index = attr.ib(default=1, validator=attr.validators.instance_of(int))
ports = attr.ib(default=1, validator=attr.validators.instance_of(int))

def __attrs_post_init__(self):
self.match['SUBSYSTEM'] = 'tty'
super().__attrs_post_init__()

@property
def path(self):
if self.device is not None:
return self.device.device_node

return None

@target_factory.reg_resource
@attr.s(eq=False)
class USBFlashableDevice(USBResource):
Expand Down
54 changes: 54 additions & 0 deletions labgrid/util/agents/kmtronic_relay.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import serial

class USBKMTronicRelay:
def set_output(self, path, index, status):
# second and third is index and status on/off
# \xFF\x01\x00 = turn relay 1 off
# \xFF\x01\x01 = turn relay 1 on
cmd = bytes([255, index, int(status == True)])
with serial.Serial(path, 9600) as ser:
ser.write(cmd)

def get_output(self, path, index, ports):
Copy link

@cidlik cidlik Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It breaks the working with USB One Relay https://forums.raspberrypi.com/viewtopic.php?t=125621
I don't have four channel relay for testing, and link from PR description is broken, thus I don't know the commands.
If commands differ for one and four channels relays, I suggest to add branch:

        if ports == 1:
            # Branch for KMTronic USB One Relay
            cmd = bytes([255, 1, 3])
            with serial.Serial(path, 9600) as ser:
                ser.write(cmd)
                data = ser.read(3)
            return data[2]

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like there has been made some changes on the site. Here is a new link https://kmtronic.com/product/2786/usb-relay-controller-four-channel-box.html

Originally i used cmd = bytes([255, index, 3]) This is a single read command and only reads from the specified relay.
This works on the 4 port kmtronic I have primarily been testing on. But when a client i'm making this for tried on a 8 port kmtronic this did not work. After writing to kmtronic we found the 8 port version does not support single reads. We where told a read all command could be used. FF 09 00 can be used to get the state from all relays on the kmtronic,
Returned as xx xx xx xx xx xx xx xx where 01 is on and 00 is off. Sadly it seems there are problems with the links to the docs where this is noted.

Now i don't have a 1 or 2 port kmtronic to test on those. but my hope was the FF 09 00 command to read from all would work on the 1 and 2 port as well. If you have a 1 port are you able to test this?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but my hope was the FF 09 00 command to read from all would work on the 1 and 2 port as well.

My case is inverse, I only have one channel relay. And I couldn't work with it.
I tried to add the conditional branch from the previous comment, and it works! FF 09 00 command is ignored, driver hangs on in reading state (there is no response from the device?)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, there's not much to be done about that. Thanks for letting me know.
Do you know if the two port has support for reading from a single relay and or reading from all?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, no, I don't know. The only thing that could have helped me, it's a page [1]. But it doesn't have any information about reading.
I would offer you to add extra branch and to print warning or to raise an exception like "we don't know how it works with 2-channel relay".

[1] https://kmtronic.com/product/2783/usb-relay-controller-two-channels.html

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrote an email to KMTronic Technical support. And they tell me only the FF 0x 03 read command is supported on the usb 2 relay controller. Seems going back to FF 0x 03 being the default is the way to go. Then if port is set to something other that 1 it tries to read from that number of ports using FF 09 00

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe with a warnning on 2 port since it is not supported on that one.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cidlik would you be able to check if the new changes work on the 1 relay controller?

Copy link

@cidlik cidlik Feb 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did it. Verified, it works. TY

# \xFF\x01\x03 will read relay 1 status
# \xFF\x09\x00 will read from all relays, only works on 4 and 8 relay controller
cmd = bytes([255, index, 3])
if ports > 2:
cmd = bytes([255, 9, 0])
with serial.Serial(path, 9600, timeout=10) as ser:
ser.write(cmd)
if ports > 2:
data = ser.read(ports)
if data[0] == 255:
print("WARNING: Unexpected return value from KMTronic relay.")
print("Make sure to configure ports correctly in your config.2")
return -1
state = data[index-1]
else:
data = ser.read(3)
if data[0] != 255:
print("WARNING: Unexpected return value from KMTronic relay.")
print("Make sure to configure ports correctly in your config.1")
return -1
state = data[2]
return state

_relays = {}

def _get_relay(path):
if (path) not in _relays:
_relays[(path)] = USBKMTronicRelay()
return _relays[(path)]

def handle_set(path, index, status):
relay = _get_relay(path)
relay.set_output(path, index, status)

def handle_get(path, index, ports):
relay = _get_relay(path)
return relay.get_output(path, index, ports)

methods = {
"set": handle_set,
"get": handle_get,
}
5 changes: 5 additions & 0 deletions tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ def test_all_modules():
methods = aw.list()
assert 'usb_hid_relay.set' in methods
assert 'usb_hid_relay.get' in methods
aw.load('kmtronic_relay')
methods = aw.list()
assert 'kmtronic_relay.set' in methods
assert 'kmtronic_relay.get' in methods


def test_import_modules():
import labgrid.util.agents
Expand Down
22 changes: 22 additions & 0 deletions tests/test_kmtronicrelay.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from labgrid.resource.udev import KMTronicRelay
from labgrid.driver.kmtronicrelay import KMTronicRelayDriver


def test_kmtronicrelay_resource(target):
r = KMTronicRelay(target, name=None, match={"ID_SERIAL_SHORT": "AB0LBF2U"}, index=2)


def test_kmtronicrelay_driver(target):
r = KMTronicRelay(target, name=None, match={"ID_SERIAL_SHORT": "AB0LBF2U"}, index=2)
d = KMTronicRelayDriver(target, name=None)
target.activate(d)


def test_kmtronicrelay_control(target):
r = KMTronicRelay(target, name=None, match={"ID_SERIAL_SHORT": "AB0LBF2U"}, index=2, ports=4)
d = KMTronicRelayDriver(target, name=None)
target.activate(d)
d.set(1)
assert d.get() == 1
d.set(0)
assert d.get() == 0