Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement new zigpy radio API #123

Merged
merged 18 commits into from
Jun 21, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
repos:
- repo: https://github.com/psf/black
rev: 19.10b0
rev: 22.3.0
hooks:
- id: black
args:
- --safe
- --quiet
- repo: https://gitlab.com/pycqa/flake8
rev: 3.7.9
rev: 4.0.1
hooks:
- id: flake8
- repo: https://github.com/pre-commit/mirrors-isort
rev: v4.3.21
- repo: https://github.com/PyCQA/isort
rev: 5.10.1
hooks:
- id: isort
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.4.0
rev: v4.1.0
hooks:
- id: no-commit-to-branch
args:
7 changes: 4 additions & 3 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -20,12 +20,13 @@ force_grid_wrap=0
use_parentheses=True
line_length=88
indent = " "
# by default isort don't check module indexes
not_skip = __init__.py
# will group `import x` and `from x import` of the same module.
force_sort_within_sections = true
sections = FUTURE,STDLIB,INBETWEENS,THIRDPARTY,FIRSTPARTY,LOCALFOLDER
sections = FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,LOCALFOLDER
default_section = THIRDPARTY
known_first_party = zigpy_xbee,tests
forced_separate = tests
combine_as_imports = true

[tool:pytest]
asyncio_mode = auto
12 changes: 4 additions & 8 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,22 @@
"""Setup module for zigpy-xbee"""

import os
import pathlib

from setuptools import find_packages, setup

import zigpy_xbee

this_directory = os.path.join(os.path.abspath(os.path.dirname(__file__)))
with open(os.path.join(this_directory, "README.md"), encoding="utf-8") as f:
long_description = f.read()

setup(
name="zigpy-xbee",
version=zigpy_xbee.__version__,
description="A library which communicates with XBee radios for zigpy",
long_description=long_description,
long_description=(pathlib.Path(__file__).parent / "README.md").read_text(),
long_description_content_type="text/markdown",
url="http://github.com/zigpy/zigpy-xbee",
author="Russell Cloran",
author_email="[email protected]",
license="GPL-3.0",
packages=find_packages(exclude=["*.tests"]),
install_requires=["pyserial-asyncio", "zigpy>= 0.23.0"],
tests_require=["pytest"],
install_requires=["pyserial-asyncio", "zigpy>=0.47.0"],
tests_require=["pytest", "asynctest", "pytest-asyncio"],
)
Empty file added tests/__init__.py
Empty file.
9 changes: 9 additions & 0 deletions tests/async_mock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""Mock utilities that are async aware."""
import sys

if sys.version_info[:2] < (3, 8):
from asynctest.mock import * # noqa

AsyncMock = CoroutineMock # noqa: F405
else:
from unittest.mock import * # noqa
67 changes: 17 additions & 50 deletions tests/test_api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import logging

from asynctest import CoroutineMock, mock
import pytest
import serial
import zigpy.exceptions
@@ -10,6 +9,8 @@
import zigpy_xbee.config
from zigpy_xbee.zigbee.application import ControllerApplication

import tests.async_mock as mock

DEVICE_CONFIG = zigpy_xbee.config.SCHEMA_DEVICE(
{zigpy_xbee.config.CONF_DEVICE_PATH: "/dev/null"}
)
@@ -22,10 +23,9 @@ def api():
return api


@pytest.mark.asyncio
async def test_connect(monkeypatch):
api = xbee_api.XBee(DEVICE_CONFIG)
monkeypatch.setattr(uart, "connect", CoroutineMock())
monkeypatch.setattr(uart, "connect", mock.AsyncMock())
await api.connect()


@@ -52,7 +52,6 @@ def test_commands():
assert reply is None or isinstance(reply, int)


@pytest.mark.asyncio
async def test_command(api):
def mock_api_frame(name, *args):
c = xbee_api.COMMAND_REQUESTS[name]
@@ -90,7 +89,6 @@ def mock_api_frame(name, *args):
api._uart.send.reset_mock()


@pytest.mark.asyncio
async def test_command_not_connected(api):
api._uart = None

@@ -135,20 +133,17 @@ def mock_command(name, *args):
api._command.reset_mock()


@pytest.mark.asyncio
async def test_at_command(api, monkeypatch):
await _test_at_or_queued_at_command(api, api._at_command, monkeypatch)


@pytest.mark.asyncio
async def test_at_command_no_response(api, monkeypatch):
with pytest.raises(asyncio.TimeoutError):
await _test_at_or_queued_at_command(
api, api._at_command, monkeypatch, do_reply=False
)


@pytest.mark.asyncio
async def test_queued_at_command(api, monkeypatch):
await _test_at_or_queued_at_command(api, api._queued_at, monkeypatch)

@@ -191,12 +186,10 @@ def mock_command(name, *args):
api._command.reset_mock()


@pytest.mark.asyncio
async def test_remote_at_cmd(api, monkeypatch):
await _test_remote_at_command(api, monkeypatch)


@pytest.mark.asyncio
async def test_remote_at_cmd_no_rsp(api, monkeypatch):
monkeypatch.setattr(xbee_api, "REMOTE_AT_COMMAND_TIMEOUT", 0.1)
with pytest.raises(asyncio.TimeoutError):
@@ -417,7 +410,6 @@ def test_handle_tx_status_duplicate(api):
assert send_fut.set_exception.call_count == 0


@pytest.mark.asyncio
async def test_command_mode_at_cmd(api):
command = "+++"

@@ -430,7 +422,6 @@ def cmd_mode_send(cmd):
assert result


@pytest.mark.asyncio
async def test_command_mode_at_cmd_timeout(api):
command = "+++"

@@ -462,21 +453,15 @@ def test_handle_command_mode_rsp(api):
assert api._cmd_mode_future.result() == data


@pytest.mark.asyncio
async def test_enter_at_command_mode(api):
api.command_mode_at_cmd = mock.MagicMock(
side_effect=asyncio.coroutine(lambda x: mock.sentinel.at_response)
)
api.command_mode_at_cmd = mock.AsyncMock(return_value=mock.sentinel.at_response)

res = await api.enter_at_command_mode()
assert res == mock.sentinel.at_response


@pytest.mark.asyncio
async def test_api_mode_at_commands(api):
api.command_mode_at_cmd = mock.MagicMock(
side_effect=asyncio.coroutine(lambda x: mock.sentinel.api_mode)
)
api.command_mode_at_cmd = mock.AsyncMock(return_value=mock.sentinel.api_mode)

res = await api.api_mode_at_commands(57600)
assert res is True
@@ -491,20 +476,15 @@ async def mock_at_cmd(cmd):
assert res is None


@pytest.mark.asyncio
async def test_init_api_mode(api, monkeypatch):
monkeypatch.setattr(api._uart, "baudrate", 57600)
api.enter_at_command_mode = mock.MagicMock(
side_effect=asyncio.coroutine(mock.MagicMock(return_value=True))
)
api.enter_at_command_mode = mock.AsyncMock(return_value=True)

res = await api.init_api_mode()
assert res is None
assert api.enter_at_command_mode.call_count == 1

api.enter_at_command_mode = mock.MagicMock(
side_effect=asyncio.coroutine(mock.MagicMock(return_value=False))
)
api.enter_at_command_mode = mock.AsyncMock(return_value=False)

res = await api.init_api_mode()
assert res is False
@@ -517,9 +497,7 @@ async def enter_at_mode():

api._uart.baudrate = 57600
api.enter_at_command_mode = mock.MagicMock(side_effect=enter_at_mode)
api.api_mode_at_commands = mock.MagicMock(
side_effect=asyncio.coroutine(mock.MagicMock(return_value=True))
)
api.api_mode_at_commands = mock.AsyncMock(return_value=True)

res = await api.init_api_mode()
assert res is True
@@ -542,21 +520,16 @@ def test_handle_many_to_one_rri(api):
api._handle_many_to_one_rri(ieee, nwk, 0)


@pytest.mark.asyncio
async def test_reconnect_multiple_disconnects(monkeypatch, caplog):
api = xbee_api.XBee(DEVICE_CONFIG)
connect_mock = CoroutineMock()
connect_mock.return_value = asyncio.Future()
connect_mock.return_value.set_result(True)
connect_mock = mock.AsyncMock(return_value=True)
monkeypatch.setattr(uart, "connect", connect_mock)

await api.connect()

caplog.set_level(logging.DEBUG)
connected = asyncio.Future()
connected.set_result(mock.sentinel.uart_reconnect)
connect_mock.reset_mock()
connect_mock.side_effect = [asyncio.Future(), connected]
connect_mock.side_effect = [OSError, mock.sentinel.uart_reconnect]
api.connection_lost("connection lost")
await asyncio.sleep(0.3)
api.connection_lost("connection lost 2")
@@ -567,21 +540,20 @@ async def test_reconnect_multiple_disconnects(monkeypatch, caplog):
assert connect_mock.call_count == 2


@pytest.mark.asyncio
async def test_reconnect_multiple_attempts(monkeypatch, caplog):
api = xbee_api.XBee(DEVICE_CONFIG)
connect_mock = CoroutineMock()
connect_mock.return_value = asyncio.Future()
connect_mock.return_value.set_result(True)
connect_mock = mock.AsyncMock(return_value=True)
monkeypatch.setattr(uart, "connect", connect_mock)

await api.connect()

caplog.set_level(logging.DEBUG)
connected = asyncio.Future()
connected.set_result(mock.sentinel.uart_reconnect)
connect_mock.reset_mock()
connect_mock.side_effect = [asyncio.TimeoutError, OSError, connected]
connect_mock.side_effect = [
asyncio.TimeoutError,
OSError,
mock.sentinel.uart_reconnect,
]

with mock.patch("asyncio.sleep"):
api.connection_lost("connection lost")
@@ -591,8 +563,7 @@ async def test_reconnect_multiple_attempts(monkeypatch, caplog):
assert connect_mock.call_count == 3


@pytest.mark.asyncio
@mock.patch.object(xbee_api.XBee, "_at_command", new_callable=CoroutineMock)
@mock.patch.object(xbee_api.XBee, "_at_command", new_callable=mock.AsyncMock)
@mock.patch.object(uart, "connect")
async def test_probe_success(mock_connect, mock_at_cmd):
"""Test device probing."""
@@ -606,7 +577,6 @@ async def test_probe_success(mock_connect, mock_at_cmd):
assert mock_connect.return_value.close.call_count == 1


@pytest.mark.asyncio
@mock.patch.object(xbee_api.XBee, "init_api_mode", return_value=True)
@mock.patch.object(xbee_api.XBee, "_at_command", side_effect=asyncio.TimeoutError)
@mock.patch.object(uart, "connect")
@@ -623,7 +593,6 @@ async def test_probe_success_api_mode(mock_connect, mock_at_cmd, mock_api_mode):
assert mock_connect.return_value.close.call_count == 1


@pytest.mark.asyncio
@mock.patch.object(xbee_api.XBee, "init_api_mode")
@mock.patch.object(xbee_api.XBee, "_at_command", side_effect=asyncio.TimeoutError)
@mock.patch.object(uart, "connect")
@@ -648,7 +617,6 @@ async def test_probe_fail(mock_connect, mock_at_cmd, mock_api_mode, exception):
assert mock_connect.return_value.close.call_count == 1


@pytest.mark.asyncio
@mock.patch.object(xbee_api.XBee, "init_api_mode", return_value=False)
@mock.patch.object(xbee_api.XBee, "_at_command", side_effect=asyncio.TimeoutError)
@mock.patch.object(uart, "connect")
@@ -668,7 +636,6 @@ async def test_probe_fail_api_mode(mock_connect, mock_at_cmd, mock_api_mode):
assert mock_connect.return_value.close.call_count == 1


@pytest.mark.asyncio
@mock.patch.object(xbee_api.XBee, "connect")
async def test_xbee_new(conn_mck):
"""Test new class method."""
169 changes: 76 additions & 93 deletions tests/test_application.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import asyncio

from asynctest import CoroutineMock, mock
import pytest
from zigpy import types as t
import zigpy.exceptions
from zigpy.zdo.types import ZDOCmd

from zigpy_xbee.api import ModemStatus, XBee
import zigpy_xbee.config as config
import zigpy_xbee.types as xbee_t
from zigpy_xbee.zigbee import application

import tests.async_mock as mock

APP_CONFIG = {
config.CONF_DEVICE: {
config.CONF_DEVICE_PATH: "/dev/null",
@@ -26,8 +28,11 @@ def app(monkeypatch):
monkeypatch.setattr(application, "TIMEOUT_REPLY_EXTENDED", 0.1)
app = application.ControllerApplication(APP_CONFIG)
api = XBee(APP_CONFIG[config.CONF_DEVICE])
monkeypatch.setattr(api, "_command", CoroutineMock())
monkeypatch.setattr(api, "_command", mock.AsyncMock())
app._api = api

app.state.node_info.nwk = 0x0000
app.state.node_info.ieee = t.EUI64.convert("aa:bb:cc:dd:ee:ff:00:11")
return app


@@ -92,7 +97,7 @@ def test_rx_nwk_0000(app):
b"",
)
assert app.handle_message.call_count == 1
assert app.get_device.call_count == 1
assert app.get_device.call_count == 2


def test_rx_unknown_device(app, device):
@@ -101,7 +106,8 @@ def test_rx_unknown_device(app, device):
app.handle_join = mock.MagicMock()
dev = device(nwk=0x1234)
app.devices[dev.ieee] = dev
app.get_device = mock.MagicMock(side_effect=[KeyError, dev])

num_before_rx = len(app.devices)
app.handle_rx(
b"\x08\x07\x06\x05\x04\x03\x02\x01",
0x3334,
@@ -113,8 +119,8 @@ def test_rx_unknown_device(app, device):
b"",
)
assert app.handle_join.call_count == 1
assert app.get_device.call_count == 2
assert app.handle_message.call_count == 1
assert len(app.devices) == num_before_rx


def test_rx_unknown_device_ieee(app):
@@ -133,7 +139,7 @@ def test_rx_unknown_device_ieee(app):
b"",
)
assert app.handle_join.call_count == 0
assert app.get_device.call_count == 1
assert app.get_device.call_count == 2
assert app.handle_message.call_count == 0


@@ -188,7 +194,6 @@ def test_device_join_inconsistent_ieee(app, device):
_device_join(app, dev, data)


@pytest.mark.asyncio
async def test_broadcast(app):
(profile, cluster, src_ep, dst_ep, grpid, radius, tsn, data) = (
0x260,
@@ -220,19 +225,17 @@ async def test_broadcast(app):
assert r[0] != xbee_t.TXStatus.SUCCESS


@pytest.mark.asyncio
async def test_get_association_state(app):
ai_results = (0xFF, 0xFF, 0xFF, 0xFF, mock.sentinel.ai)
app._api._at_command = mock.MagicMock(
app._api._at_command = mock.AsyncMock(
spec=XBee._at_command,
side_effect=asyncio.coroutine(mock.MagicMock(side_effect=ai_results)),
side_effect=ai_results,
)
ai = await app._get_association_state()
assert app._api._at_command.call_count == len(ai_results)
assert ai is mock.sentinel.ai


@pytest.mark.asyncio
async def test_form_network(app):
legacy_module = False

@@ -251,29 +254,38 @@ async def mock_at_command(cmd, *args):
app._api._queued_at = mock.MagicMock(
spec=XBee._at_command, side_effect=mock_at_command
)
app._get_association_state = mock.MagicMock(
app._get_association_state = mock.AsyncMock(
spec=application.ControllerApplication._get_association_state,
side_effect=asyncio.coroutine(mock.MagicMock(return_value=0x00)),
return_value=0x00,
)

app.write_network_info = mock.MagicMock(wraps=app.write_network_info)

await app.form_network()
assert app._api._at_command.call_count >= 1
assert app._api._queued_at.call_count >= 7
assert app._nwk == 0x0000

network_info = app.write_network_info.mock_calls[0][2]["network_info"]

app._api._queued_at.assert_any_call("SC", 1 << (network_info.channel - 11))
app._api._queued_at.assert_any_call("KY", b"ZigBeeAlliance09")

app._api._at_command.reset_mock()
app._api._queued_at.reset_mock()
legacy_module = True
await app.form_network()
assert app._api._at_command.call_count >= 1
assert app._api._queued_at.call_count >= 7
assert app._nwk == 0x0000

network_info = app.write_network_info.mock_calls[0][2]["network_info"]

app._api._queued_at.assert_any_call("SC", 1 << (network_info.channel - 11))
app._api._queued_at.assert_any_call("KY", b"ZigBeeAlliance09")


async def _test_startup(
async def _test_start_network(
app,
ai_status=0xFF,
auto_form=False,
api_mode=True,
api_config_succeeds=True,
ee=1,
@@ -282,9 +294,9 @@ async def _test_startup(
legacy_module=False,
):
ai_tries = 5
app._nwk = mock.sentinel.nwk
app.state.node_info.nwk = mock.sentinel.nwk

async def _at_command_mock(cmd, *args):
def _at_command_mock(cmd, *args):
nonlocal ai_tries
if not api_mode:
raise asyncio.TimeoutError
@@ -297,105 +309,86 @@ async def _at_command_mock(cmd, *args):
"CE": 1 if ai_status == 0 else 0,
"EO": eo,
"EE": ee,
"ID": mock.sentinel.at_id,
"ID": 0x25DCF87E03EA5906,
"MY": 0xFFFE if ai_status else 0x0000,
"NJ": mock.sentinel.at_nj,
"OI": mock.sentinel.at_oi,
"OI": 0xDD94,
"OP": mock.sentinel.at_op,
"SH": 0x08070605,
"SL": 0x04030201,
"ZS": zs,
}.get(cmd, None)

async def init_api_mode_mock():
def init_api_mode_mock():
nonlocal api_mode
api_mode = api_config_succeeds
return api_config_succeeds

app.form_network = CoroutineMock()
with mock.patch("zigpy_xbee.api.XBee") as XBee_mock:
api_mock = mock.MagicMock()
api_mock._at_command = mock.AsyncMock(side_effect=_at_command_mock)
api_mock.init_api_mode = mock.AsyncMock(side_effect=init_api_mode_mock)

with mock.patch.object(XBee, "new") as api:
api.return_value._at_command = CoroutineMock(side_effect=_at_command_mock)
api.return_value.init_api_mode = CoroutineMock(side_effect=init_api_mode_mock)
await app.startup(auto_form=auto_form)
return app
XBee_mock.new = mock.AsyncMock(return_value=api_mock)

await app.connect()

@pytest.mark.asyncio
async def test_startup_ai(app):
auto_form = True
await _test_startup(app, 0x00, auto_form)
assert app._nwk == 0x0000
assert app._ieee == t.EUI64(range(1, 9))
assert app.form_network.call_count == 0
app.form_network = mock.AsyncMock()
await app.load_network_info()
await app.start_network()
return app

auto_form = False
await _test_startup(app, 0x00, auto_form)
assert app._nwk == 0x0000
assert app._ieee == t.EUI64(range(1, 9))
assert app.form_network.call_count == 0

auto_form = True
await _test_startup(app, 0x06, auto_form)
assert app._nwk == 0xFFFE
assert app._ieee == t.EUI64(range(1, 9))
assert app.form_network.call_count == 1
async def test_start_network(app):
await _test_start_network(app, ai_status=0x00)
assert app.state.node_info.nwk == 0x0000
assert app.state.node_info.ieee == t.EUI64(range(1, 9))
assert app.state.network_info.pan_id == 0xDD94
assert app.state.network_info.extended_pan_id == t.ExtendedPanId.convert(
"25:dc:f8:7e:03:ea:59:06"
)

auto_form = False
await _test_startup(app, 0x06, auto_form)
assert app._nwk == 0xFFFE
assert app._ieee == t.EUI64(range(1, 9))
await _test_start_network(app, ai_status=0x00)
assert app.state.node_info.nwk == 0x0000
assert app.state.node_info.ieee == t.EUI64(range(1, 9))
assert app.form_network.call_count == 0

auto_form = True
await _test_startup(app, 0x00, auto_form, zs=1)
assert app._nwk == 0x0000
assert app._ieee == t.EUI64(range(1, 9))
assert app.form_network.call_count == 1
with pytest.raises(zigpy.exceptions.NetworkNotFormed):
await _test_start_network(app, ai_status=0x06)

auto_form = False
await _test_startup(app, 0x06, auto_form, legacy_module=True)
assert app._nwk == 0xFFFE
assert app._ieee == t.EUI64(range(1, 9))
assert app.form_network.call_count == 0
with pytest.raises(zigpy.exceptions.NetworkNotFormed):
await _test_start_network(app, ai_status=0x00, zs=1)

auto_form = True
await _test_startup(app, 0x00, auto_form, zs=1, legacy_module=True)
assert app._nwk == 0x0000
assert app._ieee == t.EUI64(range(1, 9))
assert app.form_network.call_count == 1
with pytest.raises(zigpy.exceptions.NetworkNotFormed):
await _test_start_network(app, ai_status=0x06, legacy_module=True)

with pytest.raises(zigpy.exceptions.NetworkNotFormed):
await _test_start_network(app, ai_status=0x00, zs=1, legacy_module=True)

@pytest.mark.asyncio
async def test_startup_no_api_mode(app):
auto_form = True
await _test_startup(app, 0x00, auto_form, api_mode=False)
assert app._nwk == 0x0000
assert app._ieee == t.EUI64(range(1, 9))
assert app.form_network.call_count == 0

async def test_start_network_no_api_mode(app):
await _test_start_network(app, ai_status=0x00, api_mode=False)
assert app.state.node_info.nwk == 0x0000
assert app.state.node_info.ieee == t.EUI64(range(1, 9))
assert app._api.init_api_mode.call_count == 1
assert app._api._at_command.call_count >= 16


@pytest.mark.asyncio
async def test_startup_api_mode_config_fails(app):
auto_form = True
await _test_startup(app, 0x00, auto_form, api_mode=False, api_config_succeeds=False)
assert app._nwk == mock.sentinel.nwk
assert app._ieee is None
assert app.form_network.call_count == 0
async def test_start_network_api_mode_config_fails(app):
with pytest.raises(zigpy.exceptions.ControllerException):
await _test_start_network(
app, ai_status=0x00, api_mode=False, api_config_succeeds=False
)

assert app._api.init_api_mode.call_count == 1
assert app._api._at_command.call_count == 1


@pytest.mark.asyncio
async def test_permit(app):
app._api._at_command = mock.MagicMock(
side_effect=asyncio.coroutine(mock.MagicMock())
)
app._api._at_command = mock.AsyncMock()
time_s = 30
await app.permit_ncp(time_s)
assert app._api._at_command.call_count == 3
assert app._api._at_command.call_count == 2
assert app._api._at_command.call_args_list[0][0][1] == time_s


@@ -444,31 +437,26 @@ def _mock_command(
)


@pytest.mark.asyncio
async def test_request_with_reply(app):
r = await _test_request(app, expect_reply=True, send_success=True)
assert r[0] == 0


@pytest.mark.asyncio
async def test_request_without_node_desc(app):
r = await _test_request(app, expect_reply=True, send_success=True, node_desc=False)
assert r[0] == 0


@pytest.mark.asyncio
async def test_request_send_timeout(app):
r = await _test_request(app, send_timeout=True)
assert r[0] != 0


@pytest.mark.asyncio
async def test_request_send_fail(app):
r = await _test_request(app, send_success=False)
assert r[0] != 0


@pytest.mark.asyncio
async def test_request_extended_timeout(app):
is_end_device = False
r = await _test_request(app, True, True, is_end_device=is_end_device)
@@ -491,12 +479,10 @@ async def test_request_extended_timeout(app):
app._api._command.reset_mock()


@pytest.mark.asyncio
async def test_force_remove(app):
await app.force_remove(mock.sentinel.device)


@pytest.mark.asyncio
async def test_shutdown(app):
app._api.close = mock.MagicMock()
await app.shutdown()
@@ -578,19 +564,16 @@ def _mock_command(
return await app.mrequest(group_id, 0x0260, 1, 2, seq, b"\xaa\x55\xbe\xef")


@pytest.mark.asyncio
async def test_mrequest_with_reply(app):
r = await _test_mrequest(app, send_success=True)
assert r[0] == 0


@pytest.mark.asyncio
async def test_mrequest_send_timeout(app):
r = await _test_mrequest(app, send_timeout=True)
assert r[0] != 0


@pytest.mark.asyncio
async def test_mrequest_send_fail(app):
r = await _test_mrequest(app, send_success=False)
assert r[0] != 0
6 changes: 3 additions & 3 deletions tests/test_types.py
Original file line number Diff line number Diff line change
@@ -24,20 +24,20 @@ def test_serialize():


def test_bytes_serialize():
data = 0x89AB .to_bytes(4, "big")
data = 0x89AB.to_bytes(4, "big")
result = t.Bytes(data).serialize()
assert result == data


def test_bytes_deserialize():
data, rest = t.Bytes.deserialize(0x89AB .to_bytes(3, "big"))
data, rest = t.Bytes.deserialize(0x89AB.to_bytes(3, "big"))
assert data == b"\x00\x89\xAB"
assert rest == b""


def test_atcommand():
cmd = "AI".encode("ascii")
data = 0x06 .to_bytes(4, "big")
data = 0x06.to_bytes(4, "big")
r_cmd, r_data = t.ATCommand.deserialize(cmd + data)

assert r_cmd == cmd
1 change: 0 additions & 1 deletion tests/test_uart.py
Original file line number Diff line number Diff line change
@@ -31,7 +31,6 @@ def test_baudrate_fail(gw):
gw.baudrate = 3333


@pytest.mark.asyncio
async def test_connect(monkeypatch):
api = mock.MagicMock()

5 changes: 3 additions & 2 deletions zigpy_xbee/api.py
Original file line number Diff line number Diff line change
@@ -284,7 +284,7 @@ async def new(
application: "zigpy_xbee.zigbee.application.ControllerApplication",
config: Dict[str, Any],
) -> "XBee":
"""Create new instance from """
"""Create new instance from"""
xbee_api = cls(config)
await xbee_api.connect()
xbee_api.set_application(application)
@@ -313,14 +313,15 @@ def connection_lost(self, exc: Exception) -> None:
self._uart = None
if self._conn_lost_task and not self._conn_lost_task.done():
self._conn_lost_task.cancel()
self._conn_lost_task = asyncio.ensure_future(self._connection_lost())
self._conn_lost_task = asyncio.create_task(self._connection_lost())

async def _connection_lost(self) -> None:
"""Reconnect serial port."""
try:
await self._reconnect_till_done()
except asyncio.CancelledError:
LOGGER.debug("Cancelling reconnection attempt")
raise

async def _reconnect_till_done(self) -> None:
attempt = 1
194 changes: 121 additions & 73 deletions zigpy_xbee/zigbee/application.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from __future__ import annotations

import asyncio
import binascii
import logging
import time
from typing import Any, Dict, Optional
from typing import Any

import zigpy.application
import zigpy.config
@@ -11,9 +13,11 @@
import zigpy.quirks
import zigpy.types
import zigpy.util
from zigpy.zcl import foundation
from zigpy.zcl.clusters.general import Groups
from zigpy.zdo.types import NodeDescriptor, ZDOCmd
import zigpy.zdo.types as zdo_t

import zigpy_xbee
import zigpy_xbee.api
from zigpy_xbee.config import CONF_DEVICE, CONFIG_SCHEMA, SCHEMA_DEVICE
from zigpy_xbee.types import EUI64, UNKNOWN_IEEE, UNKNOWN_NWK, TXStatus
@@ -37,110 +41,132 @@ class ControllerApplication(zigpy.application.ControllerApplication):

probe = zigpy_xbee.api.XBee.probe

def __init__(self, config: Dict[str, Any]):
def __init__(self, config: dict[str, Any]):
super().__init__(config=zigpy.config.ZIGPY_SCHEMA(config))
self._api: Optional[zigpy_xbee.api.XBee] = None
self._nwk = 0
self._api: zigpy_xbee.api.XBee | None = None

async def shutdown(self):
async def disconnect(self):
"""Shutdown application."""
if self._api:
self._api.close()

async def startup(self, auto_form=False):
"""Perform a complete application startup"""
async def connect(self):
self._api = await zigpy_xbee.api.XBee.new(self, self._config[CONF_DEVICE])
try:
# Ensure we have escaped commands
await self._api._at_command("AP", 2)
except asyncio.TimeoutError:
LOGGER.debug("No response to API frame. Configure API mode")
if not await self._api.init_api_mode():
LOGGER.error("Failed to configure XBee API mode.")
return False
raise zigpy.exceptions.ControllerException(
"Failed to configure XBee API mode."
)

await self._api._at_command("AO", 0x03)
async def start_network(self):
association_state = await asyncio.wait_for(
self._get_association_state(), timeout=4
)

serial_high = await self._api._at_command("SH")
serial_low = await self._api._at_command("SL")
ieee = EUI64.deserialize(
serial_high.to_bytes(4, "big") + serial_low.to_bytes(4, "big")
)[0]
self._ieee = zigpy.types.EUI64(ieee)
LOGGER.debug("Read local IEEE address as %s", self._ieee)
# Enable ZDO passthrough
await self._api._at_command("AO", 0x03)

try:
association_state = await asyncio.wait_for(
self._get_association_state(), timeout=4
)
except asyncio.TimeoutError:
association_state = 0xFF
self._nwk = await self._api._at_command("MY")
enc_enabled = await self._api._at_command("EE")
enc_options = await self._api._at_command("EO")
zb_profile = await self._api._at_command("ZS")

should_form = (
enc_enabled != 1,
enc_options != 2,
zb_profile != 2,
association_state != 0,
self._nwk != 0,
)
if auto_form and any(should_form):
await self.form_network()
if (
enc_enabled != 1
or enc_options != 2
or zb_profile != 2
or association_state != 0
or self.state.node_info.nwk != 0x0000
):
raise zigpy.exceptions.NetworkNotFormed("Network is not formed")

# Disable joins
await self._api._at_command("NJ", 0)
await self._api._at_command("SP", CONF_CYCLIC_SLEEP_PERIOD)
await self._api._at_command("SN", CONF_POLL_TIMEOUT)
id = await self._api._at_command("ID")
LOGGER.debug("Extended PAN ID: 0x%016x", id)
id = await self._api._at_command("OP")
LOGGER.debug("Operating Extended PAN ID: 0x%016x", id)
id = await self._api._at_command("OI")
LOGGER.debug("PAN ID: 0x%04x", id)
try:
ce = await self._api._at_command("CE")
LOGGER.debug("Coordinator %s", "enabled" if ce else "disabled")
except RuntimeError as exc:
LOGGER.debug("sending CE command: %s", exc)

dev = zigpy.device.Device(self, self.ieee, self.nwk)
dev = zigpy.device.Device(
self, self.state.node_info.ieee, self.state.node_info.nwk
)
dev.status = zigpy.device.Status.ENDPOINTS_INIT
dev.add_endpoint(XBEE_ENDPOINT_ID)
xbee_dev = XBeeCoordinator(self, self.ieee, self.nwk, dev)

xbee_dev = XBeeCoordinator(
self, self.state.node_info.ieee, self.state.node_info.nwk, dev
)
self.listener_event("raw_device_initialized", xbee_dev)
self.devices[dev.ieee] = xbee_dev

async def force_remove(self, dev):
"""Forcibly remove device from NCP."""
pass
async def load_network_info(self, *, load_devices=False):
# Load node info
node_info = self.state.node_info
node_info.nwk = zigpy.types.NWK(await self._api._at_command("MY"))
serial_high = await self._api._at_command("SH")
serial_low = await self._api._at_command("SL")
node_info.ieee = zigpy.types.EUI64(
(serial_high.to_bytes(4, "big") + serial_low.to_bytes(4, "big"))[::-1]
)

try:
if await self._api._at_command("CE") == 0x01:
node_info.logical_type = zdo_t.LogicalType.Coordinator
else:
node_info.logical_type = zdo_t.LogicalType.EndDevice
except RuntimeError:
LOGGER.warning("CE command failed, assuming node is coordinator")
node_info.logical_type = zdo_t.LogicalType.Coordinator

# Load network info
pan_id = await self._api._at_command("OI")
extended_pan_id = await self._api._at_command("ID")

network_info = self.state.network_info
network_info.source = f"zigpy-xbee@{zigpy_xbee.__version__}"
network_info.pan_id = zigpy.types.PanId(pan_id)
network_info.extended_pan_id = zigpy.types.ExtendedPanId(
zigpy.types.uint64_t(extended_pan_id).serialize()
)
network_info.channel = await self._api._at_command("CH")

async def write_network_info(self, *, network_info, node_info):
scan_bitmask = 1 << (network_info.channel - 11)

async def form_network(self, channel=15, pan_id=None, extended_pan_id=None):
LOGGER.info("Forming network on channel %s", channel)
scan_bitmask = 1 << (channel - 11)
await self._api._queued_at("ZS", 2)
await self._api._queued_at("SC", scan_bitmask)
await self._api._queued_at("EE", 1)
await self._api._queued_at("EO", 2)
await self._api._queued_at("NK", 0)
await self._api._queued_at("KY", b"ZigBeeAlliance09")

await self._api._queued_at("NK", network_info.network_key.key.serialize())
await self._api._queued_at("KY", network_info.tc_link_key.key.serialize())

await self._api._queued_at("NJ", 0)
await self._api._queued_at("SP", CONF_CYCLIC_SLEEP_PERIOD)
await self._api._queued_at("SN", CONF_POLL_TIMEOUT)

try:
await self._api._queued_at("CE", 1)
except RuntimeError:
pass

await self._api._at_command("WR")

await asyncio.wait_for(self._api.coordinator_started_event.wait(), timeout=10)
association_state = await asyncio.wait_for(
self._get_association_state(), timeout=10
)
LOGGER.debug("Association state: %s", association_state)
self._nwk = await self._api._at_command("MY")
assert self._nwk == 0x0000

async def force_remove(self, dev):
"""Forcibly remove device from NCP."""
pass

async def add_endpoint(self, descriptor):
"""Register a new endpoint on the device."""
# This is not provided by the XBee API
pass

async def _get_association_state(self):
"""Wait for Zigbee to start."""
@@ -161,7 +187,7 @@ async def mrequest(
data,
*,
hops=0,
non_member_radius=3
non_member_radius=3,
):
"""Submit and send data out as a multicast transmission.
:param group_id: destination multicast address
@@ -190,8 +216,8 @@ async def mrequest(
return TXStatus.NETWORK_ACK_FAILURE, "Timeout waiting for ACK"

if v != TXStatus.SUCCESS:
return v, "Error sending tsn #%s: %s".format(sequence, v.name)
return v, "Successfully sent tsn #%s: %s".format(sequence, v.name)
return v, f"Error sending tsn #{sequence}: {v.name}"
return v, f"Successfully sent tsn #{sequence}: {v.name}"

async def request(
self,
@@ -244,8 +270,8 @@ async def request(
return TXStatus.NETWORK_ACK_FAILURE, "Timeout waiting for ACK"

if v != TXStatus.SUCCESS:
return v, "Error sending tsn #%s: %s".format(sequence, v.name)
return v, "Succesfuly sent tsn #%s: %s".format(sequence, v.name)
return v, f"Error sending tsn #{sequence}: {v.name}"
return v, f"Succesfuly sent tsn #{sequence}: {v.name}"

@zigpy.util.retryable_request
def remote_at_command(
@@ -264,7 +290,9 @@ async def permit_ncp(self, time_s=60):
assert 0 <= time_s <= 254
await self._api._at_command("NJ", time_s)
await self._api._at_command("AC")
await self._api._at_command("CB", 2)

async def permit_with_key(self, node, code, time_s=60):
raise NotImplementedError("XBee does not support install codes")

def handle_modem_status(self, status):
LOGGER.info("Modem status update: %s (%s)", status.name, status.value)
@@ -276,7 +304,7 @@ def handle_rx(
LOGGER.info("handle_rx self addressed")

ember_ieee = zigpy.types.EUI64(src_ieee)
if dst_ep == 0 and cluster_id == ZDOCmd.Device_annce:
if dst_ep == 0 and cluster_id == zdo_t.ZDOCmd.Device_annce:
# ZDO Device announce request
nwk, rest = zigpy.types.NWK.deserialize(data[1:])
ieee, rest = zigpy.types.EUI64.deserialize(rest)
@@ -296,9 +324,10 @@ def handle_rx(
self.handle_join(nwk, ieee, 0)

try:
self.devices[self.ieee].last_seen = time.time()
self._device.last_seen = time.time()
except KeyError:
pass

try:
device = self.get_device(nwk=src_nwk)
except KeyError:
@@ -363,27 +392,46 @@ async def broadcast(
return TXStatus.NETWORK_ACK_FAILURE, "Timeout waiting for ACK"

if v != TXStatus.SUCCESS:
return v, "Error sending broadcast tsn #%s: %s".format(sequence, v.name)
return v, "Succesfuly sent broadcast tsn #%s: %s".format(sequence, v.name)
return v, f"Error sending broadcast tsn #{sequence}: {v.name}"
return v, f"Succesfuly sent broadcast tsn #{sequence}: {v.name}"


class XBeeCoordinator(zigpy.quirks.CustomDevice):
class XBeeGroup(zigpy.quirks.CustomCluster, Groups):
cluster_id = 0x0006

class XBeeGroupResponse(zigpy.quirks.CustomCluster, Groups):
import zigpy.zcl.foundation as f

cluster_id = 0x8006
ep_attribute = "xbee_groups_response"

client_commands = {**Groups.client_commands}
client_commands[0x0004] = ("remove_all_response", (f.Status,), True)
client_commands = {
**Groups.client_commands,
0x04: foundation.ZCLCommandDef(
"remove_all_response", {"status": foundation.Status}, is_reply=True
),
}

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.node_desc = NodeDescriptor(
0x00, 0x40, 0x8E, 0x101E, 0x52, 0x00FF, 0x2C00, 0x00FF, 0x00
self.node_desc = zdo_t.NodeDescriptor(
logical_type=zdo_t.LogicalType.Coordinator,
complex_descriptor_available=0,
user_descriptor_available=0,
reserved=0,
aps_flags=0,
frequency_band=zdo_t.NodeDescriptor.FrequencyBand.Freq2400MHz,
mac_capability_flags=(
zdo_t.NodeDescriptor.MACCapabilityFlags.AllocateAddress
| zdo_t.NodeDescriptor.MACCapabilityFlags.RxOnWhenIdle
| zdo_t.NodeDescriptor.MACCapabilityFlags.MainsPowered
| zdo_t.NodeDescriptor.MACCapabilityFlags.FullFunctionDevice
),
manufacturer_code=4126,
maximum_buffer_size=82,
maximum_incoming_transfer_size=255,
server_mask=11264,
maximum_outgoing_transfer_size=255,
descriptor_capability_field=zdo_t.NodeDescriptor.DescriptorCapability.NONE,
)

replacement = {