Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simple joystick example #1200

Merged
merged 4 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions examples/joystick.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#!/usr/bin/env python3
import time
import threading
import argparse
import numpy as np
from inputs import get_gamepad

from kbhit import KBHit

from opendbc.car.structs import CarControl
from opendbc.car.panda_runner import PandaRunner
from opendbc.car.can_definitions import CanData

class Keyboard:
def __init__(self):
self.kb = KBHit()
self.axis_increment = 0.05 # 5% of full actuation each key press
self.axes_map = {'w': 'gb', 's': 'gb',
'a': 'steer', 'd': 'steer'}
self.axes_values = {'gb': 0., 'steer': 0.}
self.axes_order = ['gb', 'steer']
self.cancel = False

def update(self):
key = self.kb.getch().lower()
print(key)
self.cancel = False
if key == 'r':
self.axes_values = {ax: 0. for ax in self.axes_values}
elif key == 'c':
self.cancel = True
elif key in self.axes_map:
axis = self.axes_map[key]
incr = self.axis_increment if key in ['w', 'a'] else -self.axis_increment
self.axes_values[axis] = float(np.clip(self.axes_values[axis] + incr, -1, 1))
else:
return False
return True

class Joystick:
def __init__(self, gamepad=False):
# TODO: find a way to get this from API, perhaps "inputs" doesn't support it
if gamepad:
self.cancel_button = 'BTN_NORTH' # (BTN_NORTH=X, ABS_RZ=Right Trigger)
accel_axis = 'ABS_Y'
steer_axis = 'ABS_RX'
else:
self.cancel_button = 'BTN_TRIGGER'
accel_axis = 'ABS_Y'
steer_axis = 'ABS_RX'
self.min_axis_value = {accel_axis: 0., steer_axis: 0.}
self.max_axis_value = {accel_axis: 255., steer_axis: 255.}
self.axes_values = {accel_axis: 0., steer_axis: 0.}
self.axes_order = [accel_axis, steer_axis]
self.cancel = False

def update(self):
joystick_event = get_gamepad()[0]
event = (joystick_event.code, joystick_event.state)
if event[0] == self.cancel_button:
if event[1] == 1:
self.cancel = True
elif event[1] == 0: # state 0 is falling edge
self.cancel = False
elif event[0] in self.axes_values:
self.max_axis_value[event[0]] = max(event[1], self.max_axis_value[event[0]])
self.min_axis_value[event[0]] = min(event[1], self.min_axis_value[event[0]])

norm = -float(np.interp(event[1], [self.min_axis_value[event[0]], self.max_axis_value[event[0]]], [-1., 1.]))
self.axes_values[event[0]] = norm if abs(norm) > 0.05 else 0. # center can be noisy, deadzone of 5%
else:
return False
return True

def joystick_thread(joystick):
while True:
joystick.update()

def main(joystick):
threading.Thread(target=joystick_thread, args=(joystick,), daemon=True).start()
with PandaRunner() as (p, CI):
CC = CarControl(enabled=False)
while True:
cd = [CanData(addr, dat, bus) for addr, dat, bus in p.can_recv()]
CI.update([0, cd])

CC.actuators.accel = float(4.0*np.clip(joystick.axes_values['gb'], -1, 1))
CC.actuators.steer = float(np.clip(joystick.axes_values['steer'], -1, 1))

from pprint import pprint
pprint(CC)

_, can_sends = CI.apply(CC)
p.can_send_many(can_sends, timeout=1000)

# 100Hz
time.sleep(0.01)


if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Test the car interface with a joystick. Uses keyboard by default.',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)

parser.add_argument('--mode', choices=['keyboard', 'gamepad', 'joystick'], default='keyboard')
args = parser.parse_args()

print()
joystick: Keyboard | Joystick
if args.mode == 'keyboard':
print('Gas/brake control: `W` and `S` keys')
print('Steering control: `A` and `D` keys')
print('Buttons')
print('- `R`: Resets axes')
print('- `C`: Cancel cruise control')
joystick = Keyboard()
else:
joystick = Joystick(gamepad=(args.mode == 'gamepad'))
main(joystick)
59 changes: 59 additions & 0 deletions examples/kbhit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#!/usr/bin/env python3
import sys
import termios
import atexit
from select import select

STDIN_FD = sys.stdin.fileno()

class KBHit:
def __init__(self) -> None:
self.set_kbhit_terminal()

def set_kbhit_terminal(self) -> None:
# Save the terminal settings
self.old_term = termios.tcgetattr(STDIN_FD)
self.new_term = self.old_term.copy()

# New terminal setting unbuffered
self.new_term[3] &= ~(termios.ICANON | termios.ECHO)
termios.tcsetattr(STDIN_FD, termios.TCSAFLUSH, self.new_term)

# Support normal-terminal reset at exit
atexit.register(self.set_normal_term)

def set_normal_term(self) -> None:
termios.tcsetattr(STDIN_FD, termios.TCSAFLUSH, self.old_term)

@staticmethod
def getch() -> str:
return sys.stdin.read(1)

@staticmethod
def getarrow() -> int:
c = sys.stdin.read(3)[2]
vals = [65, 67, 66, 68]
return vals.index(ord(c))

@staticmethod
def kbhit():
''' Returns True if keyboard character was hit, False otherwise.
'''
return select([sys.stdin], [], [], 0)[0] != []


if __name__ == "__main__":

kb = KBHit()

print('Hit any key, or ESC to exit')

while True:

if kb.kbhit():
c = kb.getch()
if c == '\x1b': # ESC
break
print(c)

kb.set_normal_term()
35 changes: 35 additions & 0 deletions opendbc/car/panda_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from contextlib import contextmanager

from panda import Panda
from opendbc.car.car_helpers import get_car
from opendbc.car.can_definitions import CanData

@contextmanager
def PandaRunner():
p = Panda()

def _can_recv(wait_for_one: bool = False) -> list[list[CanData]]:
recv = p.can_recv()
while len(recv) == 0 and wait_for_one:
recv = p.can_recv()
return [[CanData(addr, dat, bus) for addr, dat, bus in recv], ]

try:
# setup + fingerprinting
p.set_safety_mode(Panda.SAFETY_ELM327, 1)
CI = get_car(_can_recv, p.can_send_many, p.set_obd, True)
print("fingerprinted", CI.CP.carName)
assert CI.CP.carFingerprint != "mock", "Unable to identify car. Check connections and ensure car is supported."

p.set_safety_mode(Panda.SAFETY_ELM327, 1)
CI.init(CI.CP, _can_recv, p.can_send_many)
p.set_safety_mode(Panda.SAFETY_TOYOTA, CI.CP.safetyConfigs[0].safetyParam)

yield p, CI
finally:
p.set_safety_mode(Panda.SAFETY_NOOUTPUT)


if __name__ == "__main__":
with PandaRunner() as (p, CI):
print(p.can_recv())
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ docs = [
"Jinja2",
"natsort",
]
examples = [
"inputs",
]

[tool.pytest.ini_options]
addopts = "--ignore=panda/ -Werror --strict-config --strict-markers --durations=10 -n auto"
Expand Down
Loading