diff --git a/bellows/exception.py b/bellows/exception.py index 17114faa..25fbb311 100644 --- a/bellows/exception.py +++ b/bellows/exception.py @@ -1,13 +1,9 @@ -from zigpy.exceptions import ZigbeeException +from zigpy.exceptions import APIException, ControllerException -class BellowsException(ZigbeeException): +class EzspError(APIException): pass -class EzspError(BellowsException): - pass - - -class ControllerError(BellowsException): +class ControllerError(ControllerException): pass diff --git a/bellows/ezsp.py b/bellows/ezsp.py index a28b1d47..0c0c1e7f 100644 --- a/bellows/ezsp.py +++ b/bellows/ezsp.py @@ -4,12 +4,14 @@ import logging from bellows.commands import COMMANDS -from bellows.exception import EzspError +from bellows.exception import APIException, EzspError import bellows.types as t import bellows.uart as uart +import serial EZSP_CMD_TIMEOUT = 10 LOGGER = logging.getLogger(__name__) +PROBE_TIMEOUT = 3 class EZSP: @@ -37,6 +39,26 @@ async def connect(self, device, baudrate): self._device = device self._gw = await uart.connect(device, baudrate, self) + @classmethod + async def probe(cls, device: str, baudrate: int) -> bool: + """Probe port for the device presence.""" + ezsp = cls() + try: + await asyncio.wait_for(ezsp._probe(device, baudrate), timeout=PROBE_TIMEOUT) + return True + except (asyncio.TimeoutError, serial.SerialException, APIException) as exc: + LOGGER.debug("Unsuccessful radio probe of '%s' port", exc_info=exc) + finally: + ezsp.close() + + return False + + async def _probe(self, device: str, baudrate: int) -> None: + """Open port and try sending a command""" + await self.connect(device, baudrate) + await self.reset() + self.close() + def reconnect(self): """Reconnect using saved parameters.""" LOGGER.debug( diff --git a/setup.py b/setup.py index 09de73b9..61fbcd64 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,7 @@ "pure_pcapy3==1.0.1", "pyserial-asyncio", "voluptuous", - "zigpy-homeassistant>=0.12.0", + "zigpy-homeassistant>=0.17.0", ], dependency_links=["https://codeload.github.com/rcloran/pure-pcapy-3/zip/master"], tests_require=["asynctest", "pytest", "pytest-asyncio"], diff --git a/tests/test_ezsp.py b/tests/test_ezsp.py index 54645da3..09678979 100644 --- a/tests/test_ezsp.py +++ b/tests/test_ezsp.py @@ -1,10 +1,11 @@ import asyncio import functools -from unittest import mock +from asynctest import CoroutineMock, mock from bellows import ezsp, uart from bellows.exception import EzspError import pytest +import serial @pytest.fixture @@ -297,3 +298,50 @@ def test_ezsp_frame(ezsp_f): ezsp_f._ezsp_version = 5 data = ezsp_f._ezsp_frame("version", 6) assert data == b"\x22\x00\xff\x00\x00\x06" + + +@pytest.mark.asyncio +@mock.patch.object(ezsp.EZSP, "reset", new_callable=CoroutineMock) +@mock.patch.object(uart, "connect") +async def test_probe_success(mock_connect, mock_reset): + """Test device probing.""" + + res = await ezsp.EZSP.probe(mock.sentinel.uart, mock.sentinel.baud) + assert res is True + assert mock_connect.call_count == 1 + assert mock_connect.await_count == 1 + assert mock_connect.call_args[0][0] is mock.sentinel.uart + assert mock_reset.call_count == 1 + assert mock_connect.return_value.close.call_count == 1 + + mock_connect.reset_mock() + mock_reset.reset_mock() + mock_connect.reset_mock() + res = await ezsp.EZSP.probe(mock.sentinel.uart, mock.sentinel.baud) + assert res is True + assert mock_connect.call_count == 1 + assert mock_connect.await_count == 1 + assert mock_connect.call_args[0][0] is mock.sentinel.uart + assert mock_reset.call_count == 1 + assert mock_connect.return_value.close.call_count == 1 + + +@pytest.mark.asyncio +@mock.patch.object(ezsp.EZSP, "reset", new_callable=CoroutineMock) +@mock.patch.object(uart, "connect") +@pytest.mark.parametrize( + "exception", (asyncio.TimeoutError, serial.SerialException, EzspError) +) +async def test_probe_fail(mock_connect, mock_reset, exception): + """Test device probing fails.""" + + mock_reset.side_effect = exception + mock_reset.reset_mock() + mock_connect.reset_mock() + res = await ezsp.EZSP.probe(mock.sentinel.uart, mock.sentinel.baud) + assert res is False + assert mock_connect.call_count == 1 + assert mock_connect.await_count == 1 + assert mock_connect.call_args[0][0] is mock.sentinel.uart + assert mock_reset.call_count == 1 + assert mock_connect.return_value.close.call_count == 1