Skip to content

Commit

Permalink
Merge pull request #18 from wolfmanjm/fix/i2c-timing
Browse files Browse the repository at this point in the history
bitbang I2C running under Blinka did not work with the peripherals I tested.
  • Loading branch information
makermelissa authored Jun 4, 2021
2 parents 7b150a3 + 496b0d6 commit fa8dfc2
Showing 1 changed file with 47 additions and 42 deletions.
89 changes: 47 additions & 42 deletions adafruit_bitbangio.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"""

# imports
from time import monotonic, sleep
from time import monotonic
from digitalio import DigitalInOut

__version__ = "0.0.0-auto.0"
Expand Down Expand Up @@ -84,22 +84,27 @@ def __init__(self, scl, sda, *, frequency=400000, timeout=1):

# Set pins as outputs/inputs.
self._scl = DigitalInOut(scl)
self._scl.switch_to_output()
self._scl.value = 1
# rpi gpio does not support OPEN_DRAIN, so we have to emulate it
# by setting the pin to input for high and output 0 for low
self._scl.switch_to_input()

# SDA flips between being input and output
self._sda = DigitalInOut(sda)
self._sda.switch_to_output()
self._sda.value = 1
self._sda.switch_to_input()

self._delay = 1 / frequency / 2
self._delay = (1 / frequency) / 2 # half period
self._timeout = timeout

def deinit(self):
"""Free any hardware used by the object."""
self._sda.deinit()
self._scl.deinit()

def _wait(self):
end = monotonic() + self._delay # half period
while end > monotonic():
pass

def scan(self):
"""Perform an I2C Device Scan"""
found = []
Expand Down Expand Up @@ -145,100 +150,100 @@ def writeto_then_readfrom(
if in_end is None:
in_end = len(buffer_in)
if self._check_lock():
self.writeto(address, buffer_out, start=out_start, end=out_end)
self._write(address, buffer_out[out_start:out_end], False)
self.readfrom_into(address, buffer_in, start=in_start, end=in_end)

def _scl_low(self):
self._scl.value = 0
self._scl.switch_to_output(value=False)

def _sda_low(self):
self._sda.value = 0
self._sda.switch_to_output(value=False)

def _scl_release(self):
"""Release and let the pullups lift"""
# Use self._timeout to add clock stretching
self._scl.value = 1
self._scl.switch_to_input()

def _sda_release(self):
"""Release and let the pullups lift"""
# Use self._timeout to add clock stretching
self._sda.value = 1
self._sda.switch_to_input()

def _start(self):
self._sda_release()
self._scl_release()
sleep(self._delay)
self._wait()
self._sda_low()
sleep(self._delay)
self._wait()

def _stop(self):
self._scl_low()
sleep(self._delay)
self._wait()
self._sda_low()
sleep(self._delay)
self._wait()
self._scl_release()
sleep(self._delay)
self._wait()
self._sda_release()
sleep(self._delay)
self._wait()

def _repeated_start(self):
self._scl_low()
sleep(self._delay)
self._wait()
self._sda_release()
sleep(self._delay)
self._wait()
self._scl_release()
sleep(self._delay)
self._wait()
self._sda_low()
sleep(self._delay)
self._wait()

def _write_byte(self, byte):
for bit_position in range(8):
self._scl_low()
sleep(self._delay)

if byte & (0x80 >> bit_position):
self._sda_release()
else:
self._sda_low()
sleep(self._delay)
self._wait()
self._scl_release()
sleep(self._delay)
self._wait()

self._scl_low()
sleep(self._delay * 2)
self._sda.switch_to_input() # SDA may go high, but SCL is low
self._wait()

self._scl_release()
sleep(self._delay)

self._sda.switch_to_input()
ack = self._sda.value
self._sda.switch_to_output()
sleep(self._delay)
self._wait()
ack = self._sda.value # read the ack

self._scl_low()
self._sda_release()
self._wait()

return not ack

def _read_byte(self, ack=False):
self._scl_low()
sleep(self._delay)

self._wait()
# sda will already be an input as we are simulating open drain
data = 0
self._sda.switch_to_input()
for _ in range(8):
self._scl_release()
sleep(self._delay)
self._wait()
data = (data << 1) | int(self._sda.value)
sleep(self._delay)
self._scl_low()
sleep(self._delay)
self._sda.switch_to_output()
self._wait()

if ack:
self._sda_low()
else:
self._sda_release()
sleep(self._delay)
# else sda will already be in release (open drain) mode

self._wait()
self._scl_release()
sleep(self._delay)
self._wait()
self._scl_low()
self._sda_release()

return data & 0xFF

def _probe(self, address):
Expand Down

0 comments on commit fa8dfc2

Please sign in to comment.