From c122e0603d6214c87d8b7343642be5d0aa40e28a Mon Sep 17 00:00:00 2001 From: KB Sriram Date: Sat, 9 Mar 2024 07:11:07 -0800 Subject: [PATCH] Add I2C clock stretching. - Also fixed a corner case with I2C checking whether the device acknowledged sent data. - Added tests to cover most of the major operations. Fixes https://github.com/adafruit/Adafruit_CircuitPython_BitbangIO/issues/16 --- adafruit_bitbangio.py | 13 +- tests/simulated_i2c.py | 209 +++++++++++++++++++++++++++ tests/test_adafruit_bitbangio_i2c.py | 188 ++++++++++++++++++++++++ 3 files changed, 406 insertions(+), 4 deletions(-) create mode 100644 tests/simulated_i2c.py create mode 100644 tests/test_adafruit_bitbangio_i2c.py diff --git a/adafruit_bitbangio.py b/adafruit_bitbangio.py index d5a30c2..10bf6f5 100644 --- a/adafruit_bitbangio.py +++ b/adafruit_bitbangio.py @@ -190,13 +190,17 @@ def _sda_low(self) -> None: self._sda.switch_to_output(value=False) def _scl_release(self) -> None: - """Release and let the pullups lift""" - # Use self._timeout to add clock stretching + """Release and wait for the pullups to lift.""" self._scl.switch_to_input() + # Wait at most self._timeout seconds for any clock stretching. + end = monotonic() + self._timeout + while not self._scl.value and end > monotonic(): + pass + if not self._scl.value: + raise RuntimeError("Bus timed out.") def _sda_release(self) -> None: """Release and let the pullups lift""" - # Use self._timeout to add clock stretching self._sda.switch_to_input() def _start(self) -> None: @@ -288,7 +292,8 @@ def _write(self, address: int, buffer: ReadableBuffer, transmit_stop: bool) -> N # raise RuntimeError("Device not responding at 0x{:02X}".format(address)) raise RuntimeError(f"Device not responding at 0x{address:02X}") for byte in buffer: - self._write_byte(byte) + if not self._write_byte(byte): + raise RuntimeError(f"Device not responding at 0x{address:02X}") if transmit_stop: self._stop() diff --git a/tests/simulated_i2c.py b/tests/simulated_i2c.py new file mode 100644 index 0000000..c31a371 --- /dev/null +++ b/tests/simulated_i2c.py @@ -0,0 +1,209 @@ +# SPDX-FileCopyrightText: KB Sriram +# SPDX-License-Identifier: MIT +"""Implementation of testable I2C devices.""" + +from typing import Any, Callable, Optional, Union +import dataclasses +import enum +import signal +import types +from typing_extensions import TypeAlias +import simulator as sim + +_SignalHandler: TypeAlias = Union[ + Callable[[int, Optional[types.FrameType]], Any], int, None +] + + +@enum.unique +class State(enum.Enum): + IDLE = "idle" + ADDRESS = "address" + ACK = "ack" + ACK_DONE = "ack_done" + WAIT_ACK = "wait_ack" + READ = "read" + WRITE = "write" + + +@dataclasses.dataclass(frozen=True) +class I2CBus: + scl: sim.Net + sda: sim.Net + + +def _switch_to_output(pin: sim.FakePin, value: bool) -> None: + pin.mode = sim.Mode.OUT + pin.value(1 if value else 0) + + +def _switch_to_input(pin: sim.FakePin) -> None: + pin.init(mode=sim.Mode.IN) + pin.level = sim.Level.HIGH + + +class Constant: + """I2C device that sinks all data and can send a constant.""" + + # pylint:disable=too-many-instance-attributes + # pylint:disable=too-many-arguments + def __init__( + self, + name: str, + address: int, + bus: I2CBus, + ack_data: bool = True, + clock_stretch_sec: int = 0, + data_to_send: int = 0, + ) -> None: + self._address = address + self._scl = sim.FakePin(f"{name}_scl_pin", bus.scl) + self._sda = sim.FakePin(f"{name}_sda_pin", bus.sda) + self._last_scl_level = bus.scl.level + self._ack_data = ack_data + self._clock_stretch_sec = clock_stretch_sec + self._prev_signal: _SignalHandler = None + self._state = State.IDLE + self._bit_count = 0 + self._received = 0 + self._all_received = bytearray() + self._send_data = data_to_send + self._sent_bit_count = 0 + self._in_write = 0 + + bus.scl.on_level_change(self._on_level_change) + bus.sda.on_level_change(self._on_level_change) + + def _move_state(self, nstate: State) -> None: + self._state = nstate + + def _on_start(self) -> None: + # This resets our state machine unconditionally and + # starts waiting for an address. + self._bit_count = 0 + self._received = 0 + self._move_state(State.ADDRESS) + + def _on_stop(self) -> None: + # Reset and start idling. + self._reset() + + def _reset(self) -> None: + self._bit_count = 0 + self._received = 0 + self._move_state(State.IDLE) + + def _clock_release( + self, ignored_signum: int, ignored_frame: Optional[types.FrameType] = None + ) -> None: + # First release the scl line + _switch_to_input(self._scl) + # Remove alarms + signal.alarm(0) + # Restore any existing signal. + if self._prev_signal: + signal.signal(signal.SIGALRM, self._prev_signal) + self._prev_signal = None + + def _maybe_clock_stretch(self) -> None: + if not self._clock_stretch_sec: + return + if self._state == State.IDLE: + return + # pull the clock line low + _switch_to_output(self._scl, value=False) + # Set an alarm to release the line after some time. + self._prev_signal = signal.signal(signal.SIGALRM, self._clock_release) + signal.alarm(self._clock_stretch_sec) + + def _on_byte_read(self) -> None: + self._all_received.append(self._received) + + def _on_clock_fall(self) -> None: + self._maybe_clock_stretch() + + # Return early unless we need to send data. + if self._state not in (State.ACK, State.ACK_DONE, State.WRITE): + return + + if self._state == State.ACK: + # pull down the data line to start the ack. We want to hold + # it down until the next clock falling edge. + if self._ack_data or not self._all_received: + _switch_to_output(self._sda, value=False) + self._move_state(State.ACK_DONE) + return + if self._state == State.ACK_DONE: + # The data line has been held between one pair of falling edges - we can + # let go now if we need to start reading. + if self._in_write: + # Note: this will also write out the first bit later in this method. + self._move_state(State.WRITE) + else: + _switch_to_input(self._sda) + self._move_state(State.READ) + + if self._state == State.WRITE: + if self._sent_bit_count == 8: + _switch_to_input(self._sda) + self._sent_bit_count = 0 + self._move_state(State.WAIT_ACK) + else: + bit_value = (self._send_data >> (7 - self._sent_bit_count)) & 0x1 + _switch_to_output(self._sda, value=bit_value == 1) + self._sent_bit_count += 1 + + def _on_clock_rise(self) -> None: + if self._state not in (State.ADDRESS, State.READ, State.WAIT_ACK): + return + bit_value = 1 if self._sda.net.level == sim.Level.HIGH else 0 + if self._state == State.WAIT_ACK: + if bit_value: + # NACK, just reset. + self._move_state(State.IDLE) + else: + # ACK, continue writing. + self._move_state(State.ACK_DONE) + return + self._received = (self._received << 1) | bit_value + self._bit_count += 1 + if self._bit_count < 8: + return + + # We've read 8 bits of either address or data sent to us. + if self._state == State.ADDRESS and self._address != (self._received >> 1): + # This message isn't for us, reset and start idling. + self._reset() + return + # This message is for us, ack it. + if self._state == State.ADDRESS: + self._in_write = self._received & 0x1 + elif self._state == State.READ: + self._on_byte_read() + self._bit_count = 0 + self._received = 0 + self._move_state(State.ACK) + + def _on_level_change(self, net: sim.Net) -> None: + # Handle start/stop events directly. + if net == self._sda.net and self._scl.net.level == sim.Level.HIGH: + if net.level == sim.Level.LOW: + # sda hi->low with scl high + self._on_start() + else: + # sda low->hi with scl high + self._on_stop() + return + + # Everything else can be handled as state changes that occur + # either on the clock rising or falling edge. + if net == self._scl.net: + if net.level == sim.Level.HIGH: + # scl low->high + self._on_clock_rise() + else: + # scl high->low + self._on_clock_fall() + + def all_received_data(self) -> bytearray: + return self._all_received diff --git a/tests/test_adafruit_bitbangio_i2c.py b/tests/test_adafruit_bitbangio_i2c.py new file mode 100644 index 0000000..2d8e5df --- /dev/null +++ b/tests/test_adafruit_bitbangio_i2c.py @@ -0,0 +1,188 @@ +# SPDX-FileCopyrightText: KB Sriram +# SPDX-License-Identifier: MIT + +from typing import Sequence +import pytest +import simulated_i2c as si2c +import simulator as sim +import adafruit_bitbangio + + +_SCL_NET = "scl" +_SDA_NET = "sda" + + +class TestBitbangI2C: + def setup_method(self) -> None: + sim.engine.reset() + # Create nets, with a pullup by default. + scl = sim.engine.create_net( + _SCL_NET, monitor=True, default_level=sim.Level.HIGH + ) + sda = sim.engine.create_net( + _SDA_NET, monitor=True, default_level=sim.Level.HIGH + ) + # pylint: disable=attribute-defined-outside-init + self.scl_pin = sim.FakePin("scl_pin", scl) + self.sda_pin = sim.FakePin("sda_pin", sda) + self.i2cbus = si2c.I2CBus(scl=scl, sda=sda) + # pylint: enable=attribute-defined-outside-init + + @sim.stub + @pytest.mark.parametrize("addresses", [[0x42, 0x43]]) + def test_scan(self, addresses: Sequence[int]) -> None: + # Create a set of data sinks, one for each address. + for address in addresses: + si2c.Constant(hex(address), address=address, bus=self.i2cbus) + + with adafruit_bitbangio.I2C( + scl=self.scl_pin, sda=self.sda_pin, frequency=1000, timeout=1 + ) as i2c: + i2c.try_lock() + scanned = i2c.scan() + i2c.unlock() + + assert addresses == scanned + + @sim.stub + @pytest.mark.parametrize( + "data", ["11000011", "00111100", "1010101001010101", "1010101111010100"] + ) + def test_write( + self, + data: str, + ) -> None: + datalen = len(data) // 8 + data_array = bytearray(int(data, 2).to_bytes(datalen, byteorder="big")) + + # attach a device that records whatever we send to it. + device = si2c.Constant("target", address=0x42, bus=self.i2cbus) + + # Write data over the bus and verify the device received it. + with adafruit_bitbangio.I2C( + scl=self.scl_pin, sda=self.sda_pin, frequency=1000 + ) as i2c: + i2c.try_lock() + i2c.writeto(address=0x42, buffer=data_array) + i2c.unlock() + + # Useful to debug signals in pulseview. + # sim.engine.write_vcd(f"/tmp/test_{data}.vcd") + assert data_array == device.all_received_data() + + @sim.stub + def test_write_no_ack(self) -> None: + # attach a device that will ack the address, but not the data. + si2c.Constant("target", address=0x42, bus=self.i2cbus, ack_data=False) + + with adafruit_bitbangio.I2C( + scl=self.scl_pin, sda=self.sda_pin, frequency=1000 + ) as i2c: + i2c.try_lock() + with pytest.raises(RuntimeError) as info: + i2c.writeto(address=0x42, buffer=b"\x42") + i2c.unlock() + + assert "not responding" in str(info.value) + + @sim.stub + @pytest.mark.parametrize("data", ["11000011", "00111100"]) + def test_write_clock_stretching(self, data: str) -> None: + datalen = len(data) // 8 + data_array = bytearray(int(data, 2).to_bytes(datalen, byteorder="big")) + + # attach a device that does clock stretching, but not exceed our timeout. + device = si2c.Constant( + "target", address=0x42, bus=self.i2cbus, clock_stretch_sec=1 + ) + + with adafruit_bitbangio.I2C( + scl=self.scl_pin, sda=self.sda_pin, frequency=1000, timeout=2.0 + ) as i2c: + i2c.try_lock() + i2c.writeto(address=0x42, buffer=data_array) + i2c.unlock() + + assert data_array == device.all_received_data() + + @sim.stub + def test_write_clock_timeout(self) -> None: + # attach a device that does clock stretching, but exceeds our timeout. + si2c.Constant("target", address=0x42, bus=self.i2cbus, clock_stretch_sec=3) + + with adafruit_bitbangio.I2C( + scl=self.scl_pin, sda=self.sda_pin, frequency=1000, timeout=1 + ) as i2c: + i2c.try_lock() + with pytest.raises(RuntimeError) as info: + i2c.writeto(address=0x42, buffer=b"\x42") + i2c.unlock() + + assert "timed out" in str(info.value) + + @sim.stub + @pytest.mark.parametrize("count", [1, 2, 5]) + @pytest.mark.parametrize("data", ["11000011", "00111100", "10101010", "01010101"]) + def test_readfrom(self, count: int, data: str) -> None: + value = int(data, 2) + expected_array = bytearray([value] * count) + data_array = bytearray(count) + + # attach a device that sends a constant byte of data. + si2c.Constant("target", address=0x42, bus=self.i2cbus, data_to_send=value) + + # Confirm we were able to read back the data + with adafruit_bitbangio.I2C( + scl=self.scl_pin, sda=self.sda_pin, frequency=1000 + ) as i2c: + i2c.try_lock() + i2c.readfrom_into(address=0x42, buffer=data_array) + i2c.unlock() + + # Useful to debug signals in pulseview. + # sim.engine.write_vcd(f"/tmp/test_{count}_{data}.vcd") + assert data_array == expected_array + + @sim.stub + @pytest.mark.parametrize( + "send_data", + [ + "11000011", + "00111100", + "10101010", + "0101010", + ], + ) + @pytest.mark.parametrize( + "expect_data", + [ + "11000011", + "00111100", + "10101010", + "01010101", + ], + ) + def test_writeto_readfrom(self, send_data: str, expect_data: str) -> None: + send_array = bytearray(int(send_data, 2).to_bytes(1, byteorder="big")) + expect_value = int(expect_data, 2) + data_array = bytearray(1) + + # attach a device that sends a constant byte of data. + device = si2c.Constant( + "target", address=0x42, bus=self.i2cbus, data_to_send=expect_value + ) + + # Send the send_data, and check we got back expect_data + with adafruit_bitbangio.I2C( + scl=self.scl_pin, sda=self.sda_pin, frequency=1000 + ) as i2c: + i2c.try_lock() + i2c.writeto_then_readfrom( + address=0x42, buffer_out=send_array, buffer_in=data_array + ) + i2c.unlock() + + # Useful to debug signals in pulseview. + # sim.engine.write_vcd(f"/tmp/test_{send_data}_{expect_data}.vcd") + assert send_array == device.all_received_data() + assert data_array == bytearray([expect_value])