-
Notifications
You must be signed in to change notification settings - Fork 179
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(api): Add modules api to hardware_control
Closes #2237
- Loading branch information
Showing
11 changed files
with
616 additions
and
14 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,228 @@ | ||
import os | ||
import logging | ||
import re | ||
import asyncio | ||
import abc | ||
from typing import List, Type, Dict, Union | ||
from .magdeck import MagDeck | ||
from .tempdeck import TempDeck | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
PORT_SEARCH_TIMEOUT = 5.5 | ||
|
||
# avrdude_options | ||
PART_NO = 'atmega32u4' | ||
PROGRAMMER_ID = 'avr109' | ||
BAUDRATE = '57600' | ||
|
||
|
||
class UnsupportedModuleError(Exception): | ||
pass | ||
|
||
|
||
class AbsentModuleError(Exception): | ||
pass | ||
|
||
|
||
class AbstractModule(abc.ABC): | ||
""" Defines the common methods of a module. """ | ||
|
||
@classmethod | ||
@abc.abstractmethod | ||
def build(cls, port: str, simulating: bool = False) -> 'AbstractModule': | ||
""" Modules should always be created using this factory. | ||
This lets the (perhaps blocking) work of connecting to and initializing | ||
a module be in a place that can be async. | ||
""" | ||
pass | ||
|
||
@abc.abstractmethod | ||
def disengage(self): | ||
""" Deactivate the module. """ | ||
pass | ||
|
||
@property | ||
@abc.abstractmethod | ||
def status(self) -> str: | ||
""" Return some string describing status. """ | ||
pass | ||
|
||
@property | ||
@abc.abstractmethod | ||
def device_info(self) -> Dict[str, str]: | ||
""" Return a dict of the module's static information (serial, etc)""" | ||
pass | ||
|
||
@property | ||
@abc.abstractmethod | ||
def live_data(self) -> Dict[str, str]: | ||
""" Return a dict of the module's dynamic information """ | ||
pass | ||
|
||
|
||
AbstractModule.register(MagDeck) | ||
AbstractModule.register(TempDeck) | ||
|
||
SUPPORTED_MODULES: Dict[str, Union[Type[MagDeck], Type[TempDeck]]]\ | ||
= {'magdeck': MagDeck, 'tempdeck': TempDeck} | ||
|
||
|
||
def build_simulated(which: str) -> AbstractModule: | ||
return SUPPORTED_MODULES[which].build('', True) | ||
|
||
|
||
def discover_and_connect() -> List[AbstractModule]: | ||
""" Scan for connected modules and instantiate handler classes | ||
""" | ||
if os.environ.get('RUNNING_ON_PI') and os.path.isdir('/dev/modules'): | ||
devices = os.listdir('/dev/modules') | ||
else: | ||
devices = [] | ||
|
||
discovered_modules = [] | ||
|
||
module_port_regex = re.compile('|'.join(SUPPORTED_MODULES.keys()), re.I) | ||
for port in devices: | ||
match = module_port_regex.search(port) | ||
if match: | ||
name = match.group().lower() | ||
try: | ||
module_class = SUPPORTED_MODULES[name] | ||
except KeyError: | ||
log.warning("Unexpected module connected: {} on {}" | ||
.format(name, port)) | ||
continue | ||
absolute_port = '/dev/modules/{}'.format(port) | ||
discovered_modules.append(module_class.build(absolute_port)) | ||
|
||
log.debug('Discovered modules: {}'.format(discovered_modules)) | ||
|
||
return discovered_modules | ||
|
||
|
||
async def enter_bootloader(module): | ||
""" | ||
Using the driver method, enter bootloader mode of the atmega32u4. | ||
The bootloader mode opens a new port on the uC to upload the hex file. | ||
After receiving a 'dfu' command, the firmware provides a 3-second window to | ||
close the current port so as to do a clean switch to the bootloader port. | ||
The new port shows up as 'ttyn_bootloader' on the pi; upload fw through it. | ||
NOTE: Modules with old bootloader will have the bootloader port show up as | ||
a regular module port- 'ttyn_tempdeck'/ 'ttyn_magdeck' with the port number | ||
being either different or same as the one that the module was originally on | ||
So we check for changes in ports and use the appropriate one | ||
""" | ||
# Required for old bootloader | ||
ports_before_dfu_mode = await _discover_ports() | ||
|
||
module._driver.enter_programming_mode() | ||
module.disconnect() | ||
new_port = '' | ||
try: | ||
new_port = await asyncio.wait_for( | ||
_port_poll(_has_old_bootloader(module), ports_before_dfu_mode), | ||
PORT_SEARCH_TIMEOUT) | ||
except asyncio.TimeoutError: | ||
pass | ||
return new_port | ||
|
||
|
||
async def update_firmware(module, firmware_file_path, config_file_path, loop): | ||
""" | ||
Run avrdude firmware upload command. Switch back to normal module port | ||
Note: For modules with old bootloader, the kernel could assign the module | ||
a new port after the update (since the board is automatically reset). | ||
Scan for such a port change and use the appropriate port | ||
""" | ||
# TODO: Make sure the module isn't in the middle of operation | ||
|
||
ports_before_update = await _discover_ports() | ||
|
||
proc = await asyncio.create_subprocess_exec( | ||
'avrdude', '-C{}'.format(config_file_path), '-v', | ||
'-p{}'.format(PART_NO), | ||
'-c{}'.format(PROGRAMMER_ID), | ||
'-P{}'.format(module.port), | ||
'-b{}'.format(BAUDRATE), '-D', | ||
'-Uflash:w:{}:i'.format(firmware_file_path), | ||
stdout=asyncio.subprocess.PIPE, | ||
stderr=asyncio.subprocess.PIPE, loop=loop) | ||
await proc.wait() | ||
|
||
_result = await proc.communicate() | ||
result = _result[1].decode() | ||
log.debug(result) | ||
log.debug("Switching back to non-bootloader port") | ||
module._port = _port_on_mode_switch(ports_before_update) | ||
|
||
return _format_avrdude_response(result) | ||
|
||
|
||
def _format_avrdude_response(raw_response): | ||
response = {'message': '', 'avrdudeResponse': ''} | ||
avrdude_log = '' | ||
for line in raw_response.splitlines(): | ||
if 'avrdude:' in line and line != raw_response.splitlines()[1]: | ||
avrdude_log += line.lstrip('avrdude:') + '..' | ||
if 'flash verified' in line: | ||
response['message'] = 'Firmware update successful' | ||
response['avrdudeResponse'] = line.lstrip('avrdude: ') | ||
if not response['message']: | ||
response['message'] = 'Firmware update failed' | ||
response['avrdudeResponse'] = avrdude_log | ||
return response | ||
|
||
|
||
async def _port_on_mode_switch(ports_before_switch): | ||
ports_after_switch = await _discover_ports() | ||
new_port = '' | ||
if ports_after_switch and \ | ||
len(ports_after_switch) >= len(ports_before_switch) and \ | ||
not set(ports_before_switch) == set(ports_after_switch): | ||
new_ports = list(filter( | ||
lambda x: x not in ports_before_switch, | ||
ports_after_switch)) | ||
if len(new_ports) > 1: | ||
raise OSError('Multiple new ports found on mode switch') | ||
new_port = '/dev/modules/{}'.format(new_ports[0]) | ||
return new_port | ||
|
||
|
||
async def _port_poll(is_old_bootloader, ports_before_switch=None): | ||
""" | ||
Checks for the bootloader port | ||
""" | ||
new_port = '' | ||
while not new_port: | ||
if is_old_bootloader: | ||
new_port = await _port_on_mode_switch(ports_before_switch) | ||
else: | ||
ports = await _discover_ports() | ||
if ports: | ||
discovered_ports = list(filter( | ||
lambda x: x.endswith('bootloader'), ports)) | ||
if len(discovered_ports) == 1: | ||
new_port = '/dev/modules/{}'.format(discovered_ports[0]) | ||
await asyncio.sleep(0.05) | ||
return new_port | ||
|
||
|
||
def _has_old_bootloader(module): | ||
return True if module.device_info.get('model') == 'temp_deck_v1' or \ | ||
module.device_info.get('model') == 'temp_deck_v2' else False | ||
|
||
|
||
async def _discover_ports(): | ||
if os.environ.get('RUNNING_ON_PI') and os.path.isdir('/dev/modules'): | ||
for attempt in range(2): | ||
# Measure for race condition where port is being switched in | ||
# between calls to isdir() and listdir() | ||
try: | ||
return os.listdir('/dev/modules') | ||
except (FileNotFoundError, OSError): | ||
pass | ||
await asyncio.sleep(2) | ||
raise Exception("No /dev/modules found. Try again") |
Oops, something went wrong.