Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement nonblocking decoder #42

Merged
merged 8 commits into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ bundles
.eggs
dist
**/*.egg-info
*.swp
294 changes: 196 additions & 98 deletions adafruit_irremote.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,8 @@
https://github.com/adafruit/circuitpython/releases

"""

# Pretend self matter because we may add object level config later.
# pylint: disable=no-self-use

import array
from collections import namedtuple
import time

__version__ = "0.0.0-auto.0"
Expand All @@ -69,118 +66,219 @@ class IRNECRepeatException(Exception):
"""Exception when a NEC repeat is decoded"""


class GenericDecode:
"""Generic decoding of infrared signals"""
def bin_data(pulses):
"""Compute bins of pulse lengths where pulses are +-25% of the average.

def bin_data(self, pulses):
"""Compute bins of pulse lengths where pulses are +-25% of the average.
:param list pulses: Input pulse lengths
"""
bins = [[pulses[0], 0]]

for _, pulse in enumerate(pulses):
matchedbin = False
# print(pulse, end=": ")
for b, pulse_bin in enumerate(bins):
if pulse_bin[0] * 0.75 <= pulse <= pulse_bin[0] * 1.25:
# print("matches bin")
bins[b][0] = (pulse_bin[0] + pulse) // 2 # avg em
bins[b][1] += 1 # track it
matchedbin = True
break
if not matchedbin:
bins.append([pulse, 1])
# print(bins)
return bins


def decode_bits(pulses):
"""Decode the pulses into bits."""
# pylint: disable=too-many-branches,too-many-statements

# TODO The name pulses is redefined several times below, so we'll stash the
# original in a separate variable for now. It might be worth refactoring to
# avoid redefining pulses, for the sake of readability.
input_pulses = tuple(pulses)
pulses = list(pulses) # Copy to avoid mutating input.

# special exception for NEC repeat code!
if (
(len(pulses) == 3)
and (8000 <= pulses[0] <= 10000)
and (2000 <= pulses[1] <= 3000)
and (450 <= pulses[2] <= 700)
):
return NECRepeatIRMessage(input_pulses)

if len(pulses) < 10:
msg = UnparseableIRMessage(input_pulses, reason="Too short")
raise FailedToDecode(msg)

# Ignore any header (evens start at 1), and any trailer.
if len(pulses) % 2 == 0:
pulses_end = -1
else:
pulses_end = None

evens = pulses[1:pulses_end:2]
odds = pulses[2:pulses_end:2]

# bin both halves
even_bins = bin_data(evens)
odd_bins = bin_data(odds)

outliers = [b[0] for b in (even_bins + odd_bins) if b[1] == 1]
even_bins = [b for b in even_bins if b[1] > 1]
odd_bins = [b for b in odd_bins if b[1] > 1]

if not even_bins or not odd_bins:
msg = UnparseableIRMessage(input_pulses, reason="Not enough data")
raise FailedToDecode(msg)

if len(even_bins) == 1:
pulses = odds
pulse_bins = odd_bins
elif len(odd_bins) == 1:
pulses = evens
pulse_bins = even_bins
else:
msg = UnparseableIRMessage(input_pulses, reason="Both even/odd pulses differ")
raise FailedToDecode(msg)

if len(pulse_bins) == 1:
msg = UnparseableIRMessage(input_pulses, reason="Pulses do not differ")
raise FailedToDecode(msg)
if len(pulse_bins) > 2:
msg = UnparseableIRMessage(input_pulses, reason="Only mark & space handled")
raise FailedToDecode(msg)

mark = min(pulse_bins[0][0], pulse_bins[1][0])
space = max(pulse_bins[0][0], pulse_bins[1][0])

if outliers:
# skip outliers
pulses = [
p for p in pulses if not (outliers[0] * 0.75) <= p <= (outliers[0] * 1.25)
]
# convert marks/spaces to 0 and 1
for i, pulse_length in enumerate(pulses):
if (space * 0.75) <= pulse_length <= (space * 1.25):
pulses[i] = False
elif (mark * 0.75) <= pulse_length <= (mark * 1.25):
pulses[i] = True
else:
msg = UnparseableIRMessage(input_pulses, reason="Pulses outside mark/space")
raise FailedToDecode(msg)

:param list pulses: Input pulse lengths
"""
bins = [[pulses[0], 0]]

for _, pulse in enumerate(pulses):
matchedbin = False
# print(pulse, end=": ")
for b, pulse_bin in enumerate(bins):
if pulse_bin[0] * 0.75 <= pulse <= pulse_bin[0] * 1.25:
# print("matches bin")
bins[b][0] = (pulse_bin[0] + pulse) // 2 # avg em
bins[b][1] += 1 # track it
matchedbin = True
break
if not matchedbin:
bins.append([pulse, 1])
# print(bins)
return bins

def decode_bits(self, pulses):
"""Decode the pulses into bits."""
# pylint: disable=too-many-branches,too-many-statements

# special exception for NEC repeat code!
if (
(len(pulses) == 3)
and (8000 <= pulses[0] <= 10000)
and (2000 <= pulses[1] <= 3000)
and (450 <= pulses[2] <= 700)
):
raise IRNECRepeatException()
# convert bits to bytes!
output = [0] * ((len(pulses) + 7) // 8)
for i, pulse_length in enumerate(pulses):
output[i // 8] = output[i // 8] << 1
if pulse_length:
output[i // 8] |= 1
return IRMessage(tuple(input_pulses), code=tuple(output))

if len(pulses) < 10:
raise IRDecodeException("10 pulses minimum")

# Ignore any header (evens start at 1), and any trailer.
if len(pulses) % 2 == 0:
pulses_end = -1
else:
pulses_end = None
IRMessage = namedtuple("IRMessage", ("pulses", "code"))
"Pulses and the code they were parsed into"

evens = pulses[1:pulses_end:2]
odds = pulses[2:pulses_end:2]
UnparseableIRMessage = namedtuple("IRMessage", ("pulses", "reason"))
"Pulses and the reason that they could not be parsed into a code"

# bin both halves
even_bins = self.bin_data(evens)
odd_bins = self.bin_data(odds)
NECRepeatIRMessage = namedtuple("NECRepeatIRMessage", ("pulses",))
"Pulses interpreted as an NEC repeat code"

outliers = [b[0] for b in (even_bins + odd_bins) if b[1] == 1]
even_bins = [b for b in even_bins if b[1] > 1]
odd_bins = [b for b in odd_bins if b[1] > 1]

if not even_bins or not odd_bins:
raise IRDecodeException("Not enough data")
class FailedToDecode(Exception):
"Raised by decode_bits. Error argument is UnparseableIRMessage"

if len(even_bins) == 1:
pulses = odds
pulse_bins = odd_bins
elif len(odd_bins) == 1:
pulses = evens
pulse_bins = even_bins
else:
raise IRDecodeException("Both even/odd pulses differ")

if len(pulse_bins) == 1:
raise IRDecodeException("Pulses do not differ")
if len(pulse_bins) > 2:
raise IRDecodeException("Only mark & space handled")

mark = min(pulse_bins[0][0], pulse_bins[1][0])
space = max(pulse_bins[0][0], pulse_bins[1][0])

if outliers:
# skip outliers
pulses = [
p
for p in pulses
if not (outliers[0] * 0.75) <= p <= (outliers[0] * 1.25)
]
# convert marks/spaces to 0 and 1
for i, pulse_length in enumerate(pulses):
if (space * 0.75) <= pulse_length <= (space * 1.25):
pulses[i] = False
elif (mark * 0.75) <= pulse_length <= (mark * 1.25):
pulses[i] = True
else:
raise IRDecodeException("Pulses outside mark/space")

# convert bits to bytes!
output = [0] * ((len(pulses) + 7) // 8)
for i, pulse_length in enumerate(pulses):
output[i // 8] = output[i // 8] << 1
if pulse_length:
output[i // 8] |= 1
return output

class NonblockingGenericDecode:
"""
Decode pulses into bytes in a non-blocking fashion.

:param ~pulseio.PulseIn input_pulses: Object to read pulses from
:param int max_pulse: Pulse duration to end a burst. Units are microseconds.

>>> pulses = PulseIn(...)
>>> decoder = NonblockingGenericDecoder(pulses)
>>> for message in decoder.read():
... if isinstace(message, IRMessage):
... message.code # TA-DA! Do something with this in your application.
... else:
... # message is either NECRepeatIRMessage or
... # UnparseableIRMessage. You may decide to ignore it, raise
... # an error, or log the issue to a file. If you raise or log,
... # it may be helpful to include message.pulses in the error message.
... ...
"""

def __init__(self, pulses, max_pulse=10_000):
self.pulses = pulses # PulseIn
self.max_pulse = max_pulse
self._unparsed_pulses = [] # internal buffer of partial messages

def read(self):
"""
Consume all pulses from PulseIn. Yield decoded messages, if any.

If a partial message is received, this does not block to wait for the
rest. It stashes the partial message, to be continued the next time it
is called.
"""
# Consume from PulseIn.
while self.pulses:
pulse = self.pulses.popleft()
self._unparsed_pulses.append(pulse)
if pulse > self.max_pulse:
# End of message! Decode it and yield a BaseIRMessage.
try:
yield decode_bits(self._unparsed_pulses)
except FailedToDecode as err:
# If you want to debug failed decodes, this would be a good
# place to print/log or (re-)raise.
(unparseable_message,) = err.args
yield unparseable_message
self._unparsed_pulses.clear()
# TODO Do we need to consume and throw away more pulses here?
# I'm unclear about the role that "pruning" plays in the
# original implementation in GenericDecode._read_pulses_non_blocking.
# When we reach here, we have consumed everything from PulseIn.
# If there are some pulses in self._unparsed_pulses, they represent
# partial messages. We'll finish them next time read() is called.


class GenericDecode:
"""Generic decoding of infrared signals"""

# Note: pylint's complaint about the following three methods (no self-use)
# is absolutely correct, which is why the code was refactored, but we need
# this here for back-compat, hence we disable pylint for that specific
# complaint.

def bin_data(self, pulses): # pylint: disable=no-self-use
"Wraps the top-level function bin_data for backward-compatibility."
return bin_data(pulses)

def decode_bits(self, pulses): # pylint: disable=no-self-use
"Wraps the top-level function decode_bits for backward-compatibility."
result = decode_bits(pulses)
if isinstance(result, NECRepeatIRMessage):
raise IRNECRepeatException()
if isinstance(result, UnparseableIRMessage):
raise IRDecodeException("10 pulses minimum")

def _read_pulses_non_blocking(
self, input_pulses, max_pulse=10000, pulse_window=0.10
):
): # pylint: disable=no-self-use
"""Read out a burst of pulses without blocking until pulses stop for a specified
period (pulse_window), pruning pulses after a pulse longer than ``max_pulse``.

:param ~pulseio.PulseIn input_pulses: Object to read pulses from
:param int max_pulse: Pulse duration to end a burst
:param float pulse_window: pulses are collected for this period of time
"""
# Note: pylint's complaint (no self-use) is absolutely correct, which
# is why the code was refactored, but we need this here for
# back-compat, hence we disable pylint.
received = None
recent_count = 0
pruning = False
Expand Down Expand Up @@ -209,7 +307,7 @@ def read_pulses(
max_pulse=10000,
blocking=True,
pulse_window=0.10,
blocking_delay=0.10
blocking_delay=0.10,
):
"""Read out a burst of pulses until pulses stop for a specified
period (pulse_window), pruning pulses after a pulse longer than ``max_pulse``.
Expand Down
35 changes: 35 additions & 0 deletions examples/irremote_nonblocking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
# SPDX-License-Identifier: MIT

# Circuit Playground Express Demo Code
# Adjust the pulseio 'board.PIN' if using something else
import time

import board
import pulseio

import adafruit_irremote

pulsein = pulseio.PulseIn(board.REMOTEIN, maxlen=120, idle_state=True)
decoder = adafruit_irremote.NonblockingGenericDecode(pulsein)


t0 = next_heartbeat = time.monotonic()

while True:
for message in decoder.read():
print(f"t={time.monotonic() - t0:.3} New Message")
print("Heard", len(message.pulses), "Pulses:", message.pulses)
if isinstance(message, adafruit_irremote.IRMessage):
print("Decoded:", message.code)
elif isinstance(message, adafruit_irremote.NECRepeatIRMessage):
print("NEC repeat!")
elif isinstance(message, adafruit_irremote.UnparseableIRMessage):
print("Failed to decode", message.reason)
print("----------------------------")

# This heartbeat confirms that we are not blocked somewhere above.
t = time.monotonic()
if t > next_heartbeat:
print(f"t={time.monotonic() - t0:.3} Heartbeat")
next_heartbeat = t + 0.1