Skip to content

Commit

Permalink
kmtronicrelay: add resource and driver
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin B. Frost <[email protected]>
  • Loading branch information
benjamin1313 committed Jan 16, 2025
1 parent fff35ee commit d1e57e1
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 2 deletions.
43 changes: 43 additions & 0 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,25 @@ Arguments:
Used by:
- `GpioDigitalOutputDriver`_

KMTronicRelay
+++++++++++++
A :any:`KMTronicRelay` resource describes a single output of an USB Relay Controller from KMTronic.

.. code-block:: yaml
KMTronicRelay:
index: 2
match:
ID_SERIAL_SHORT: 'AB0LBF2U'
Arguments:
- index (int): number of the relay to use.
- match (dict): key and value pairs for a udev match, see `udev Matching`_

NetworkKMTronicRelay
++++++++++++++++++++
A :any:`NetworkKMTronicRelay` describes an `KMTronicRelay`_ exported over the network.

NetworkService
~~~~~~~~~~~~~~
A :any:`NetworkService` describes a remote SSH connection.
Expand Down Expand Up @@ -2354,6 +2373,30 @@ Implements:
Arguments:
- None

KMTronicRelayDriver
~~~~~~~~~~~~~~~~~~~
A :any:`KMTronicRelayDriver` controls an `KMTronicRelay`_ or `NetworkKMTronicRelay`_ resource.
It can set and get the current state of the resource.

Binds to:
relay:
- `KMTronicRelay`_
- `NetworkKMTronicRelay`_

Implements:
- :any:`DigitalOutputProtocol`

.. code-block:: yaml
KMTronicRelayDriver: {}
Arguments:
- None

.. note::
In order to be able to use this driver pyserial need to be installed on
the system the relay is connected to.

ManualSwitchDriver
~~~~~~~~~~~~~~~~~~
A :any:`ManualSwitchDriver` requires the user to control a switch or jumper on
Expand Down
1 change: 1 addition & 0 deletions labgrid/driver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@
from .deditecrelaisdriver import DeditecRelaisDriver
from .dediprogflashdriver import DediprogFlashDriver
from .httpdigitaloutput import HttpDigitalOutputDriver
from .kmtronicrelay import KMTronicRelayDriver
44 changes: 44 additions & 0 deletions labgrid/driver/kmtronicrelay.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import attr

from .common import Driver
from ..factory import target_factory
from ..step import step
from ..protocol import DigitalOutputProtocol
from ..util.agentwrapper import AgentWrapper
from ..resource.remote import NetworkKMTronicRelay


@target_factory.reg_driver
@attr.s(eq=False)
class KMTronicRelayDriver(Driver, DigitalOutputProtocol):
bindings = {
"relay": {"KMTronicRelay", "NetworkKMTronicRelay"},
}

def __attrs_post_init__(self):
super().__attrs_post_init__()
self.wrapper = None

def on_activate(self):
if isinstance(self.relay, NetworkKMTronicRelay):
host = self.relay.host
else:
host = None
self.wrapper = AgentWrapper(host)
self.proxy = self.wrapper.load('kmtronic_relay')

def on_deactivate(self):
self.wrapper.close()
self.wrapper = None
self.proxy = None

@Driver.check_active
@step(args=['status'])
def set(self, status):
self.proxy.set(self.relay.path, self.relay.index, status)

@Driver.check_active
@step(result=True)
def get(self):
status = self.proxy.get(self.relay.path, self.relay.index)
return status
5 changes: 3 additions & 2 deletions labgrid/remote/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -891,8 +891,7 @@ def digital_io(self):
name = self.args.name
target = self._get_target(place)
from ..resource import ModbusTCPCoil, OneWirePIO, HttpDigitalOutput
from ..resource.remote import NetworkDeditecRelais8, NetworkSysfsGPIO, NetworkLXAIOBusPIO, NetworkHIDRelay

from ..resource.remote import NetworkDeditecRelais8, NetworkSysfsGPIO, NetworkLXAIOBusPIO, NetworkHIDRelay, NetworkKMTronicRelay
drv = None
try:
drv = target.get_driver("DigitalOutputProtocol", name=name)
Expand All @@ -912,6 +911,8 @@ def digital_io(self):
drv = self._get_driver_or_new(target, "LXAIOBusPIODriver", name=name)
elif isinstance(resource, NetworkHIDRelay):
drv = self._get_driver_or_new(target, "HIDRelayDriver", name=name)
elif isinstance(resource, NetworkKMTronicRelay):
drv = self._get_driver_or_new(target, "KMTronicRelayDriver", name=name)
if drv:
break

Expand Down
19 changes: 19 additions & 0 deletions labgrid/remote/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,24 @@ def _get_params(self):
"index": self.local.index,
}

@attr.s(eq=False)
class KMTronicRelayExport(USBGenericExport):
"""ResourceExport for outputs on KMtronic relays"""

def __attrs_post_init__(self):
super().__attrs_post_init__()

def _get_params(self):
"""Helper function to return parameters"""
return {
"host": self.host,
"busnum": self.local.busnum,
"devnum": self.local.devnum,
"path": self.local.path,
"vendor_id": self.local.vendor_id,
"model_id": self.local.model_id,
"index": self.local.index,
}

@attr.s(eq=False)
class USBFlashableExport(USBGenericExport):
Expand Down Expand Up @@ -559,6 +577,7 @@ def __attrs_post_init__(self):
exports["USBPowerPort"] = USBPowerPortExport
exports["DeditecRelais8"] = USBDeditecRelaisExport
exports["HIDRelay"] = USBHIDRelayExport
exports["KMTronicRelay"] = KMTronicRelayExport
exports["USBFlashableDevice"] = USBFlashableExport
exports["LXAUSBMux"] = USBGenericExport

Expand Down
1 change: 1 addition & 0 deletions labgrid/resource/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
USBSerialPort,
USBTMC,
USBVideo,
KMTronicRelay,
)
from .common import Resource, ResourceManager, ManagedResource
from .ykushpowerport import YKUSHPowerPort, NetworkYKUSHPowerPort
Expand Down
8 changes: 8 additions & 0 deletions labgrid/resource/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,14 @@ def __attrs_post_init__(self):
self.timeout = 10.0
super().__attrs_post_init__()

@target_factory.reg_resource
@attr.s(eq=False)
class NetworkKMTronicRelay(RemoteUSBResource):
"""The NetworkKMTronicRelay describes a remotely accessible USB relay port"""
index = attr.ib(default=1, validator=attr.validators.instance_of(int))
def __attrs_post_init__(self):
self.timeout = 10.0
super().__attrs_post_init__()

@target_factory.reg_resource
@attr.s(eq=False)
Expand Down
16 changes: 16 additions & 0 deletions labgrid/resource/udev.py
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,22 @@ def filter_match(self, device):

return super().filter_match(device)

@target_factory.reg_resource
@attr.s(eq=False)
class KMTronicRelay(USBResource):
index = attr.ib(default=1, validator=attr.validators.instance_of(int))

def __attrs_post_init__(self):
self.match['SUBSYSTEM'] = 'tty'
super().__attrs_post_init__()

@property
def path(self):
if self.device is not None:
return self.device.device_node

return None

@target_factory.reg_resource
@attr.s(eq=False)
class USBFlashableDevice(USBResource):
Expand Down
38 changes: 38 additions & 0 deletions labgrid/util/agents/kmtronic_relay.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import serial

class USBKMTronicRelay:
def set_output(self, path, index, status):
# second and third is index and status on/off
# \xFF\x01\x00 = turn relay 1 off
# \xFF\x01\x00 = turn relay 1 on
cmd = bytes([255, index, int(status == True)])
with serial.Serial(path, 9600) as ser:
ser.write(cmd)

def get_output(self, path, index):
# \xFF\x01\x03 will read relay 1 status
cmd = bytes([255, index, 3])
with serial.Serial(path, 9600) as ser:
ser.write(cmd)
data = ser.read(3)
return data[2]

_relays = {}

def _get_relay(path):
if (path) not in _relays:
_relays[(path)] = USBKMTronicRelay()
return _relays[(path)]

def handle_set(path, index, status):
relay = _get_relay(path)
relay.set_output(path, index, status)

def handle_get(path, index):
relay = _get_relay(path)
return relay.get_output(path, index)

methods = {
"set": handle_set,
"get": handle_get,
}
5 changes: 5 additions & 0 deletions tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ def test_all_modules():
methods = aw.list()
assert 'usb_hid_relay.set' in methods
assert 'usb_hid_relay.get' in methods
aw.load('kmtronix_relay')
methods = aw.list()
assert 'kmtronic_relay.set' in methods
assert 'kmtronic_relay.get' in methods


def test_import_modules():
import labgrid.util.agents
Expand Down
22 changes: 22 additions & 0 deletions tests/test_kmtronicrelay.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from labgrid.resource.udev import KMTronicRelay
from labgrid.driver.kmtronicrelay import KMTronicRelayDriver


def test_kmtronicrelay_resource(target):
r = KMTronicRelay(target, name=None, match={"ID_SERIAL_SHORT": "AB0LBF2U"})


def test_kmtronicrelay_driver(target):
r = KMTronicRelay(target, name=None, match={"ID_SERIAL_SHORT": "AB0LBF2U"})
d = KMTronicRelayDriver(target, name=None)
target.activate(d)


def test_kmtronicrelay_control(target):
r = KMTronicRelay(target, name=None, match={"ID_SERIAL_SHORT": "AB0LBF2U"})
d = KMTronicRelayDriver(target, name=None)
target.activate(d)
d.set(1)
assert d.get() == 1
d.set(0)
assert d.get() == 0

0 comments on commit d1e57e1

Please sign in to comment.