-
Notifications
You must be signed in to change notification settings - Fork 403
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #763 from veloxidSchweiz/feature/configureableGPIO…
…Buttons RotaryEncoder based on gpiozero
- Loading branch information
Showing
29 changed files
with
1,589 additions
and
179 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from .rotary_encoder import RotaryEncoder | ||
from .two_button_control import TwoButtonControl | ||
from .shutdown_button import ShutdownButton | ||
from .simple_button import SimpleButton | ||
from .two_button_control import TwoButtonControl |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
#!/usr/bin/python3 | ||
# rotary volume knob | ||
# these files belong all together: | ||
# RPi-Jukebox-RFID/scripts/rotary-encoder.py | ||
# RPi-Jukebox-RFID/scripts/rotary_encoder.py | ||
# RPi-Jukebox-RFID/misc/sampleconfigs/phoniebox-rotary-encoder.service.stretch-default.sample | ||
# See wiki for more info: https://github.com/MiczFlor/RPi-Jukebox-RFID/wiki | ||
|
||
import RPi.GPIO as GPIO | ||
from timeit import default_timer as timer | ||
import ctypes | ||
import logging | ||
from signal import pause | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
c_uint8 = ctypes.c_uint8 | ||
|
||
|
||
class Flags_bits(ctypes.LittleEndianStructure): | ||
_fields_ = [ | ||
("A", c_uint8, 1), # asByte & 1 | ||
("B", c_uint8, 1), # asByte & 2 | ||
] | ||
|
||
|
||
class Flags(ctypes.Union): | ||
_anonymous_ = ("bit",) | ||
_fields_ = [ | ||
("bit", Flags_bits), | ||
("asByte", c_uint8) | ||
] | ||
|
||
|
||
class RotaryEncoder: | ||
# select Enocder state bits | ||
KeyIncr = 0b00000010 | ||
KeyDecr = 0b00000001 | ||
|
||
tblEncoder = [ | ||
0b00000011, 0b00000111, 0b00010011, 0b00000011, | ||
0b00001011, 0b00000111, 0b00000011, 0b00000011, | ||
0b00001011, 0b00000111, 0b00001111, 0b00000011, | ||
0b00001011, 0b00000011, 0b00001111, 0b00000001, | ||
0b00010111, 0b00000011, 0b00010011, 0b00000011, | ||
0b00010111, 0b00011011, 0b00010011, 0b00000011, | ||
0b00010111, 0b00011011, 0b00000011, 0b00000010] | ||
|
||
def __init__(self, pinA, pinB, functionCallIncr=None, functionCallDecr=None, timeBase=0.1, | ||
name='RotaryEncoder'): | ||
logger.debug('Initialize {name} RotaryEncoder({arg_Apin}, {arg_Bpin})'.format( | ||
arg_Apin=pinA, | ||
arg_Bpin=pinB, | ||
name=name if name is not None else '' | ||
)) | ||
self.name = name | ||
# persist values | ||
self.pinA = pinA | ||
self.pinB = pinB | ||
self.functionCallbackIncr = functionCallIncr | ||
self.functionCallbackDecr = functionCallDecr | ||
self.timeBase = timeBase | ||
|
||
self.encoderState = Flags() # stores the encoder state machine state | ||
self.startTime = timer() | ||
|
||
# setup pins | ||
GPIO.setup(self.pinA, GPIO.IN, pull_up_down=GPIO.PUD_UP) | ||
GPIO.setup(self.pinB, GPIO.IN, pull_up_down=GPIO.PUD_UP) | ||
self._is_active = False | ||
self.start() | ||
|
||
def __repr__(self): | ||
repr_str = '<{class_name}{object_name} on pin_a {pin_a},' + \ | ||
' pin_b {pin_b},timBase {time_base} is_active={is_active}%s>' | ||
return repr_str.format( | ||
class_name=self.__class__.__name__, | ||
object_name=':{}'.format(self.name) if self.name is not None else '', | ||
pin_a=self.pinA, | ||
pin_b=self.pinB, | ||
time_base=self.timeBase, | ||
is_active=self.is_active) | ||
|
||
def start(self): | ||
logger.debug('Start Event Detection on {} and {}'.format(self.pinA, self.pinB)) | ||
self._is_active = True | ||
GPIO.add_event_detect(self.pinA, GPIO.BOTH, callback=self._Callback) | ||
GPIO.add_event_detect(self.pinB, GPIO.BOTH, callback=self._Callback) | ||
|
||
def stop(self): | ||
logger.debug('Stop Event Detection on {} and {}'.format(self.pinA, self.pinB)) | ||
GPIO.remove_event_detect(self.pinA) | ||
GPIO.remove_event_detect(self.pinB) | ||
self._is_active = False | ||
|
||
def __del__(self): | ||
if self.is_active: | ||
self.stop() | ||
|
||
@property | ||
def is_active(self): | ||
return self._is_active | ||
|
||
def _StepSize(self): | ||
end = timer() | ||
duration = end - self.startTime | ||
self.startTime = end | ||
return int(self.timeBase / duration) + 1 | ||
|
||
def _Callback(self, pin): | ||
logger.debug('EventDetection Called') | ||
# construct new state machine input from encoder state and old state | ||
statusA = GPIO.input(self.pinA) | ||
statusB = GPIO.input(self.pinB) | ||
|
||
self.encoderState.A = statusA | ||
self.encoderState.B = statusB | ||
logger.debug('new encoderState: "{}" -> {}, {},{}'.format( | ||
self.encoderState.asByte, | ||
self.tblEncoder[self.encoderState.asByte],statusA,statusB | ||
)) | ||
current_state = self.encoderState.asByte | ||
self.encoderState.asByte = self.tblEncoder[current_state] | ||
|
||
if self.KeyIncr == self.encoderState.asByte: | ||
steps = self._StepSize() | ||
logger.info('{name}: Calling functionIncr {steps}'.format( | ||
name=self.name,steps=steps)) | ||
self.functionCallbackIncr(steps) | ||
elif self.KeyDecr == self.encoderState.asByte: | ||
steps = self._StepSize() | ||
logger.info('{name}: Calling functionDecr {steps}'.format( | ||
name=self.name, steps=steps)) | ||
self.functionCallbackDecr(steps) | ||
else: | ||
logger.debug('Ignoring encoderState: "{}"'.format(self.encoderState.asByte)) | ||
|
||
if __name__ == "__main__": | ||
logging.basicConfig(level='INFO') | ||
GPIO.setmode(GPIO.BCM) | ||
pin1 = int(input('please enter first pin')) | ||
pin2 = int(input('please enter second pin')) | ||
func1 = lambda *args: print('Function Incr executed with {}'.format(args)) | ||
func2 = lambda *args: print('Function Decr executed with {}'.format(args)) | ||
rotarty_encoder = RotaryEncoder(pin1, pin2, func1, func2) | ||
|
||
print('running') | ||
pause() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
import math | ||
import time | ||
from RPi import GPIO | ||
import logging | ||
from .simple_button import SimpleButton | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
class ShutdownButton(SimpleButton): | ||
|
||
def __init__(self, pin, action=lambda *args: None, name=None, bouncetime=500, edge=GPIO.FALLING, | ||
hold_time=.1, led_pin=None, time_pressed=2): | ||
self.led_pin = led_pin | ||
self.time_pressed = 2 | ||
self.iteration_time = .2 | ||
super(ShutdownButton, self).__init__(pin=pin, action=action, name=name, bouncetime=bouncetime, edge=edge, | ||
hold_time=hold_time, hold_repeat=False) | ||
|
||
|
||
# function to provide user feedback (= flashing led) while the shutdown button is pressed | ||
# do not directly call shutdown, in case it was hit accedently | ||
# shutdown is only issued when the button remains pressed for all interations of the for loop | ||
def set_led(self, status): | ||
if self.led_pin is not None: | ||
logger.debug('set LED on pin {} to {}'.format(self.led_pin, status)) | ||
GPIO.output(self.led_pin, status) | ||
else: | ||
logger.debug('cannot set LED to {}: no LED pin defined'.format(status)) | ||
|
||
def callbackFunctionHandler(self, *args): | ||
status = False | ||
n_checks = math.ceil(self.time_pressed/self.iteration_time) | ||
logger.debug('ShutdownButton pressed, ensuring long press for {} seconds, checking each {}s: {}'.format( | ||
self.time_pressed, self.iteration_time, n_checks | ||
)) | ||
for x in range(n_checks): | ||
self.set_led(x & 1) | ||
time.sleep(.2) | ||
status = not self.is_pressed | ||
if status: | ||
break | ||
self.set_led(status) | ||
if not status: | ||
# trippel off period to indicate command accepted | ||
time.sleep(.6) | ||
self.set_led(GPIO.HIGH) | ||
# leave it on for the moment, it will be off when the system is down | ||
self.when_pressed(*args) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
import time | ||
from signal import pause | ||
import logging | ||
import RPi.GPIO as GPIO | ||
GPIO.setmode(GPIO.BCM) | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
def parse_edge_key(edge): | ||
if edge in [GPIO.FALLING, GPIO.RISING, GPIO.BOTH]: | ||
edge | ||
elif edge.lower() == 'falling': | ||
edge = GPIO.FALLING | ||
elif edge.lower() == 'raising': | ||
edge = GPIO.RISING | ||
elif edge.lower() == 'both': | ||
edge = GPIO.BOTH | ||
else: | ||
raise KeyError('Unknown Edge type {edge}'.format(edge=edge)) | ||
return edge | ||
|
||
|
||
|
||
# This function takes a holding time (fractional seconds), a channel, a GPIO state and an action reference (function). | ||
# It checks if the GPIO is in the state since the function was called. If the state | ||
# changes it return False. If the time is over the function returns True. | ||
def checkGpioStaysInState(holdingTime, gpioChannel, gpioHoldingState): | ||
# Get a reference start time (https://docs.python.org/3/library/time.html#time.perf_counter) | ||
startTime = time.perf_counter() | ||
# Continously check if time is not over | ||
while True: | ||
currentState = GPIO.input(gpioChannel) | ||
if holdingTime < (time.perf_counter() - startTime): | ||
break | ||
# Return if state does not match holding state | ||
if (gpioHoldingState != currentState): | ||
return False | ||
# Else: Wait | ||
|
||
if (gpioHoldingState != currentState): | ||
return False | ||
return True | ||
|
||
|
||
class SimpleButton: | ||
def __init__(self, pin, action=lambda *args: None, name=None, bouncetime=500, edge=GPIO.FALLING, | ||
hold_time=.1, hold_repeat=False): | ||
self.edge = parse_edge_key(edge) | ||
self.hold_time = hold_time | ||
self.hold_repeat = hold_repeat | ||
self.pull_up = True | ||
|
||
self.pin = pin | ||
self.name = name | ||
self.bouncetime = bouncetime | ||
GPIO.setup(self.pin, GPIO.IN, pull_up_down=GPIO.PUD_UP) | ||
self._action = action | ||
GPIO.add_event_detect(self.pin, edge=self.edge, callback=self.callbackFunctionHandler, | ||
bouncetime=self.bouncetime) | ||
|
||
def callbackFunctionHandler(self, *args): | ||
if self.hold_repeat: | ||
return self.holdAndRepeatHandler(*args) | ||
logger.info('{}: executre callback'.format(self.name)) | ||
return self.when_pressed(*args) | ||
|
||
|
||
@property | ||
def when_pressed(self): | ||
logger.info('{}: action'.format(self.name)) | ||
return self._action | ||
|
||
@when_pressed.setter | ||
def when_pressed(self, func): | ||
logger.info('{}: set when_pressed') | ||
self._action = func | ||
|
||
GPIO.remove_event_detect(self.pin) | ||
self._action = func | ||
logger.info('add new action') | ||
GPIO.add_event_detect(self.pin, edge=self.edge, callback=self.callbackFunctionHandler, bouncetime=self.bouncetime) | ||
|
||
|
||
|
||
def set_callbackFunction(self, callbackFunction): | ||
self.when_pressed = callbackFunction | ||
|
||
def holdAndRepeatHandler(self, *args): | ||
logger.info('{}: holdAndRepeatHandler'.format(self.name)) | ||
# Rise volume as requested | ||
self.when_pressed(*args) | ||
# Detect holding of button | ||
while checkGpioStaysInState(self.hold_time, self.pin, GPIO.LOW): | ||
self.when_pressed(*args) | ||
|
||
def __del__(self): | ||
logger.debug('remove event detection') | ||
GPIO.remove_event_detect(self.pin) | ||
|
||
@property | ||
def is_pressed(self): | ||
if self.pull_up: | ||
return not GPIO.input(self.pin) | ||
return GPIO.input(self.pin) | ||
|
||
def __repr__(self): | ||
return '<SimpleButton-{}(pin {},hold_repeat={},hold_time={})>'.format( | ||
self.name, self.pin, self.hold_repeat, self.hold_time | ||
) | ||
|
||
|
||
if __name__ == "__main__": | ||
print('please enter pin no to test') | ||
pin = int(input()) | ||
func = lambda *args: print('FunctionCall with {}'.format(args)) | ||
btn = SimpleButton(pin=pin, action=func, hold_repeat=True) | ||
pause() |
Oops, something went wrong.