Skip to content

Commit

Permalink
protocol/driver: add DigitalInputProtocol, GpioDigitalInputDriver
Browse files Browse the repository at this point in the history
this adds a new protocol `DigitalInputProtocol` and a GPIO driver
that implements it.

The `DigitalOutputProtocol` ABC now also inherits it, ensuring it can be
used in all existing driver implementations.

Additionally the client now automatically uses `DigitalInputProtocol`
when the command is `io get`.

By seperating the input from output protocol it is possible
to make sure the GPIO line is configured in direction `input` in sysfs,
which avoids shorts when reading out digital states  from DUT's.
Otherwise, one would have to rely on the circuitry of the GPIO line.
  • Loading branch information
flxzt committed Jul 31, 2024
1 parent 8599a03 commit dfed151
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 38 deletions.
26 changes: 25 additions & 1 deletion doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2174,9 +2174,33 @@ Implements:
Arguments:
- delay (float, default=2.0): delay in seconds between off and on

GpioDigitalInputDriver
~~~~~~~~~~~~~~~~~~~~~~
The :any:`GpioDigitalInputDriver` reads a digital signal from a GPIO line.

This driver configures GPIO lines via `the sysfs kernel interface <https://www.kernel.org/doc/html/latest/gpio/sysfs.html>`
as an input.


Binds to:
gpio:
- `SysfsGPIO`_
- `MatchedSysfsGPIO`_
- NetworkSysfsGPIO

Implements:
- :any:`DigitalInputProtocol`

.. code-block:: yaml
GpioDigitalInputDriver: {}
Arguments:
- None

GpioDigitalOutputDriver
~~~~~~~~~~~~~~~~~~~~~~~
The :any:`GpioDigitalOutputDriver` writes a digital signal to a GPIO line.
The :any:`GpioDigitalOutputDriver` reads and writes a digital signal from and to a GPIO line.

This driver configures GPIO lines via `the sysfs kernel interface <https://www.kernel.org/doc/html/latest/gpio/sysfs.html>`.
While the driver automatically exports the GPIO, it does not configure it in any other way than as an output.
Expand Down
2 changes: 1 addition & 1 deletion labgrid/driver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from .sigrokdriver import SigrokDriver, SigrokPowerDriver, SigrokDmmDriver
from .usbstoragedriver import USBStorageDriver, NetworkUSBStorageDriver, Mode
from .resetdriver import DigitalOutputResetDriver
from .gpiodriver import GpioDigitalOutputDriver
from .gpiodriver import GpioDigitalInputDriver, GpioDigitalOutputDriver
from .filedigitaloutput import FileDigitalOutputDriver
from .serialdigitaloutput import SerialPortDigitalOutputDriver
from .xenadriver import XenaDriver
Expand Down
33 changes: 32 additions & 1 deletion labgrid/driver/gpiodriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,44 @@
import attr

from ..factory import target_factory
from ..protocol import DigitalOutputProtocol
from ..protocol import DigitalInputProtocol, DigitalOutputProtocol
from ..resource.remote import NetworkSysfsGPIO
from ..step import step
from .common import Driver
from ..util.agentwrapper import AgentWrapper


@target_factory.reg_driver
@attr.s(eq=False)
class GpioDigitalInputDriver(Driver, DigitalInputProtocol):

bindings = {
"gpio": {"SysfsGPIO", "NetworkSysfsGPIO"},
}

def __attrs_post_init__(self):
super().__attrs_post_init__()
self.wrapper = None

def on_activate(self):
if isinstance(self.gpio, NetworkSysfsGPIO):
host = self.gpio.host
else:
host = None
self.wrapper = AgentWrapper(host)
self.proxy = self.wrapper.load('sysfsgpioin')

def on_deactivate(self):
self.wrapper.close()
self.wrapper = None
self.proxy = None

@Driver.check_active
@step(result=True)
def get(self):
return self.proxy.get(self.gpio.index)


@target_factory.reg_driver
@attr.s(eq=False)
class GpioDigitalOutputDriver(Driver, DigitalOutputProtocol):
Expand Down
1 change: 1 addition & 0 deletions labgrid/protocol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .filetransferprotocol import FileTransferProtocol
from .infoprotocol import InfoProtocol
from .digitaloutputprotocol import DigitalOutputProtocol
from .digitalinputprotocol import DigitalInputProtocol
from .mmioprotocol import MMIOProtocol
from .filesystemprotocol import FileSystemProtocol
from .resetprotocol import ResetProtocol
Expand Down
10 changes: 10 additions & 0 deletions labgrid/protocol/digitalinputprotocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import abc


class DigitalInputProtocol(abc.ABC):
"""Abstract class providing the DigitalInputProtocol interface"""

@abc.abstractmethod
def get(self):
"""Implementations should return the status of the digital input."""
raise NotImplementedError
11 changes: 4 additions & 7 deletions labgrid/protocol/digitaloutputprotocol.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import abc
from .digitalinputprotocol import DigitalInputProtocol


class DigitalOutputProtocol(abc.ABC):
"""Abstract class providing the DigitalOutputProtocol interface"""

@abc.abstractmethod
def get(self):
"""Implementations should return the status of the digital output."""
raise NotImplementedError
class DigitalOutputProtocol(DigitalInputProtocol):
"""Abstract class providing the DigitalOutputProtocol interface.
Implies that the set output can be read as well, so requires DigitalInputProtocol"""

@abc.abstractmethod
def set(self, status):
Expand Down
83 changes: 55 additions & 28 deletions labgrid/remote/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -770,36 +770,63 @@ def digital_io(self):
from ..resource import ModbusTCPCoil, OneWirePIO, HttpDigitalOutput
from ..resource.remote import NetworkDeditecRelais8, NetworkSysfsGPIO, NetworkLXAIOBusPIO, NetworkHIDRelay

drv = None
try:
drv = target.get_driver("DigitalOutputProtocol", name=name)
except NoDriverFoundError:
for resource in target.resources:
if isinstance(resource, ModbusTCPCoil):
drv = self._get_driver_or_new(target, "ModbusCoilDriver", name=name)
elif isinstance(resource, OneWirePIO):
drv = self._get_driver_or_new(target, "OneWirePIODriver", name=name)
elif isinstance(resource, HttpDigitalOutput):
drv = self._get_driver_or_new(target, "HttpDigitalOutputDriver", name=name)
elif isinstance(resource, NetworkDeditecRelais8):
drv = self._get_driver_or_new(target, "DeditecRelaisDriver", name=name)
elif isinstance(resource, NetworkSysfsGPIO):
drv = self._get_driver_or_new(target, "GpioDigitalOutputDriver", name=name)
elif isinstance(resource, NetworkLXAIOBusPIO):
drv = self._get_driver_or_new(target, "LXAIOBusPIODriver", name=name)
elif isinstance(resource, NetworkHIDRelay):
drv = self._get_driver_or_new(target, "HIDRelayDriver", name=name)
if drv:
break

if not drv:
raise UserError("target has no compatible resource available")
if action == "get":
drv = None
try:
drv = target.get_driver("DigitalInputProtocol", name=name)
except NoDriverFoundError:
for resource in target.resources:
if isinstance(resource, ModbusTCPCoil):
drv = self._get_driver_or_new(target, "ModbusCoilDriver", name=name)
elif isinstance(resource, OneWirePIO):
drv = self._get_driver_or_new(target, "OneWirePIODriver", name=name)
elif isinstance(resource, HttpDigitalOutput):
drv = self._get_driver_or_new(target, "HttpDigitalOutputDriver", name=name)
elif isinstance(resource, NetworkDeditecRelais8):
drv = self._get_driver_or_new(target, "DeditecRelaisDriver", name=name)
elif isinstance(resource, NetworkSysfsGPIO):
drv = self._get_driver_or_new(target, "GpioDigitalInputDriver", name=name)
elif isinstance(resource, NetworkLXAIOBusPIO):
drv = self._get_driver_or_new(target, "LXAIOBusPIODriver", name=name)
elif isinstance(resource, NetworkHIDRelay):
drv = self._get_driver_or_new(target, "HIDRelayDriver", name=name)
if drv:
break

if not drv:
raise UserError("target has no compatible resource available")

print(f"digital IO{' ' + name if name else ''} for place {place.name} is {'high' if drv.get() else 'low'}")
elif action == "high":
drv.set(True)
elif action == "low":
drv.set(False)
else:
drv = None
try:
drv = target.get_driver("DigitalOutputProtocol", name=name)
except NoDriverFoundError:
for resource in target.resources:
if isinstance(resource, ModbusTCPCoil):
drv = self._get_driver_or_new(target, "ModbusCoilDriver", name=name)
elif isinstance(resource, OneWirePIO):
drv = self._get_driver_or_new(target, "OneWirePIODriver", name=name)
elif isinstance(resource, HttpDigitalOutput):
drv = self._get_driver_or_new(target, "HttpDigitalOutputDriver", name=name)
elif isinstance(resource, NetworkDeditecRelais8):
drv = self._get_driver_or_new(target, "DeditecRelaisDriver", name=name)
elif isinstance(resource, NetworkSysfsGPIO):
drv = self._get_driver_or_new(target, "GpioDigitalOutputDriver", name=name)
elif isinstance(resource, NetworkLXAIOBusPIO):
drv = self._get_driver_or_new(target, "LXAIOBusPIODriver", name=name)
elif isinstance(resource, NetworkHIDRelay):
drv = self._get_driver_or_new(target, "HIDRelayDriver", name=name)
if drv:
break

if not drv:
raise UserError("target has no compatible resource available")

if action == "high":
drv.set(True)
elif action == "low":
drv.set(False)

async def _console(self, place, target, timeout, *, logfile=None, loop=False, listen_only=False):
name = self.args.name
Expand Down
72 changes: 72 additions & 0 deletions labgrid/util/agents/sysfsgpioin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""
This module implements reading GPIOs via sysfs GPIO kernel interface.
Takes an integer property 'index' which refers to the already exported GPIO device.
"""
import logging
import os

class GpioDigitalInput:
_gpio_sysfs_path_prefix = '/sys/class/gpio'

@staticmethod
def _assert_gpio_line_is_exported(index):
gpio_sysfs_path = os.path.join(GpioDigitalInput._gpio_sysfs_path_prefix,
f'gpio{index}')
# Deprecated: the exporter can export on acquire, we are leaving this
# in for now to support exporters which have not been updated yet.
if not os.path.exists(gpio_sysfs_path):
export_sysfs_path = os.path.join(GpioDigitalInput._gpio_sysfs_path_prefix, 'export')
with open(export_sysfs_path, mode='wb') as export:
export.write(str(index).encode('utf-8'))
if not os.path.exists(gpio_sysfs_path):
raise ValueError("Device not found")

def __init__(self, index):
self._logger = logging.getLogger("Device: ")
GpioDigitalInput._assert_gpio_line_is_exported(index)
gpio_sysfs_path = os.path.join(GpioDigitalInput._gpio_sysfs_path_prefix,
f'gpio{index}')

gpio_sysfs_direction_path = os.path.join(gpio_sysfs_path, 'direction')
with open(gpio_sysfs_direction_path, 'rb') as direction_fd:
literal_value = direction_fd.read(2)
if literal_value != b"in":
self._logger.debug("Configuring GPIO %d as input.", index)
with open(gpio_sysfs_direction_path, 'wb') as direction_fd:
direction_fd.write(b'in')

gpio_sysfs_value_path = os.path.join(gpio_sysfs_path, 'value')
self.gpio_sysfs_value_fd = os.open(gpio_sysfs_value_path, flags=(os.O_RDONLY | os.O_SYNC))

def __del__(self):
os.close(self.gpio_sysfs_value_fd)
self.gpio_sysfs_value_fd = None

def get(self):
os.lseek(self.gpio_sysfs_value_fd, 0, os.SEEK_SET)
literal_value = os.read(self.gpio_sysfs_value_fd, 1)
if literal_value == b'0':
return False
elif literal_value == b'1':
return True
raise ValueError("GPIO value is out of range.")


_gpios = {}

def _get_gpio_line(index):
if index not in _gpios:
_gpios[index] = GpioDigitalInput(index=index)
return _gpios[index]


def handle_get(index):
gpio_line = _get_gpio_line(index)
return gpio_line.get()


methods = {
'get': handle_get,
}
3 changes: 3 additions & 0 deletions tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ def test_all_modules():
methods = aw.list()
assert 'sysfsgpio.set' in methods
assert 'sysfsgpio.get' in methods
aw.load('sysfsgpioin')
methods = aw.list()
assert 'sysfsgpio.get' in methods
aw.load('usb_hid_relay')
methods = aw.list()
assert 'usb_hid_relay.set' in methods
Expand Down

0 comments on commit dfed151

Please sign in to comment.