Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add probe EZSP method. #223

Merged
merged 2 commits into from
Mar 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions bellows/exception.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
from zigpy.exceptions import ZigbeeException
from zigpy.exceptions import APIException, ControllerException


class BellowsException(ZigbeeException):
class EzspError(APIException):
pass


class EzspError(BellowsException):
pass


class ControllerError(BellowsException):
class ControllerError(ControllerException):
pass
24 changes: 23 additions & 1 deletion bellows/ezsp.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import logging

from bellows.commands import COMMANDS
from bellows.exception import EzspError
from bellows.exception import APIException, EzspError
import bellows.types as t
import bellows.uart as uart
import serial

EZSP_CMD_TIMEOUT = 10
LOGGER = logging.getLogger(__name__)
PROBE_TIMEOUT = 3


class EZSP:
Expand Down Expand Up @@ -37,6 +39,26 @@ async def connect(self, device, baudrate):
self._device = device
self._gw = await uart.connect(device, baudrate, self)

@classmethod
async def probe(cls, device: str, baudrate: int) -> bool:
"""Probe port for the device presence."""
ezsp = cls()
try:
await asyncio.wait_for(ezsp._probe(device, baudrate), timeout=PROBE_TIMEOUT)
return True
except (asyncio.TimeoutError, serial.SerialException, APIException) as exc:
LOGGER.debug("Unsuccessful radio probe of '%s' port", exc_info=exc)
finally:
ezsp.close()

return False

async def _probe(self, device: str, baudrate: int) -> None:
"""Open port and try sending a command"""
await self.connect(device, baudrate)
await self.reset()
self.close()

def reconnect(self):
"""Reconnect using saved parameters."""
LOGGER.debug(
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"pure_pcapy3==1.0.1",
"pyserial-asyncio",
"voluptuous",
"zigpy-homeassistant>=0.12.0",
"zigpy-homeassistant>=0.17.0",
],
dependency_links=["https://codeload.github.com/rcloran/pure-pcapy-3/zip/master"],
tests_require=["asynctest", "pytest", "pytest-asyncio"],
Expand Down
50 changes: 49 additions & 1 deletion tests/test_ezsp.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import asyncio
import functools
from unittest import mock

from asynctest import CoroutineMock, mock
from bellows import ezsp, uart
from bellows.exception import EzspError
import pytest
import serial


@pytest.fixture
Expand Down Expand Up @@ -297,3 +298,50 @@ def test_ezsp_frame(ezsp_f):
ezsp_f._ezsp_version = 5
data = ezsp_f._ezsp_frame("version", 6)
assert data == b"\x22\x00\xff\x00\x00\x06"


@pytest.mark.asyncio
@mock.patch.object(ezsp.EZSP, "reset", new_callable=CoroutineMock)
@mock.patch.object(uart, "connect")
async def test_probe_success(mock_connect, mock_reset):
"""Test device probing."""

res = await ezsp.EZSP.probe(mock.sentinel.uart, mock.sentinel.baud)
assert res is True
assert mock_connect.call_count == 1
assert mock_connect.await_count == 1
assert mock_connect.call_args[0][0] is mock.sentinel.uart
assert mock_reset.call_count == 1
assert mock_connect.return_value.close.call_count == 1

mock_connect.reset_mock()
mock_reset.reset_mock()
mock_connect.reset_mock()
res = await ezsp.EZSP.probe(mock.sentinel.uart, mock.sentinel.baud)
assert res is True
assert mock_connect.call_count == 1
assert mock_connect.await_count == 1
assert mock_connect.call_args[0][0] is mock.sentinel.uart
assert mock_reset.call_count == 1
assert mock_connect.return_value.close.call_count == 1


@pytest.mark.asyncio
@mock.patch.object(ezsp.EZSP, "reset", new_callable=CoroutineMock)
@mock.patch.object(uart, "connect")
@pytest.mark.parametrize(
"exception", (asyncio.TimeoutError, serial.SerialException, EzspError)
)
async def test_probe_fail(mock_connect, mock_reset, exception):
"""Test device probing fails."""

mock_reset.side_effect = exception
mock_reset.reset_mock()
mock_connect.reset_mock()
res = await ezsp.EZSP.probe(mock.sentinel.uart, mock.sentinel.baud)
assert res is False
assert mock_connect.call_count == 1
assert mock_connect.await_count == 1
assert mock_connect.call_args[0][0] is mock.sentinel.uart
assert mock_reset.call_count == 1
assert mock_connect.return_value.close.call_count == 1