-
Notifications
You must be signed in to change notification settings - Fork 174
/
Copy pathapplication.py
1377 lines (1153 loc) · 48.7 KB
/
application.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from __future__ import annotations
import abc
import asyncio
import collections
from collections.abc import AsyncGenerator, Coroutine
import contextlib
from datetime import datetime, timezone
import errno
import logging
import os
import random
import sys
import time
import typing
from typing import Any, TypeVar
import warnings
if sys.version_info[:2] < (3, 11):
from async_timeout import timeout as asyncio_timeout # pragma: no cover
else:
from asyncio import timeout as asyncio_timeout # pragma: no cover
import zigpy.appdb
import zigpy.backups
import zigpy.config as conf
from zigpy.const import INTERFERENCE_MESSAGE
from zigpy.datastructures import PriorityDynamicBoundedSemaphore
import zigpy.device
import zigpy.endpoint
import zigpy.exceptions
import zigpy.group
import zigpy.listeners
import zigpy.ota
import zigpy.profiles
import zigpy.quirks
import zigpy.state
import zigpy.topology
import zigpy.types as t
import zigpy.typing
import zigpy.util
import zigpy.zcl
import zigpy.zdo
import zigpy.zdo.types as zdo_types
# Default endpoint ID.
# NOTE(review): not referenced in the visible part of this file — confirm its users.
DEFAULT_ENDPOINT_ID = 1
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# `OSError.errno` values that `ControllerApplication.startup` re-raises as
# `zigpy.exceptions.TransientConnectionError`.
TRANSIENT_CONNECTION_ERRORS = {
    errno.ENETUNREACH,
}
# Presumably 75% of the largest possible energy reading (255) — TODO confirm.
ENERGY_SCAN_WARN_THRESHOLD = 0.75 * 255
# Result type of the coroutine passed to `ControllerApplication.create_task`.
_R = TypeVar("_R")
# Seconds slept after each channel-migration broadcast.
CHANNEL_CHANGE_BROADCAST_DELAY_S = 1.0
# Seconds slept between settings reloads while waiting for a channel change.
CHANNEL_CHANGE_SETTINGS_RELOAD_DELAY_S = 1.0
class ControllerApplication(zigpy.util.ListenableMixin, abc.ABC):
SCHEMA = conf.CONFIG_SCHEMA
_watchdog_period: int = 30
_probe_configs: list[dict[str, Any]] = []
def __init__(self, config: dict) -> None:
    """Validate `config` with `SCHEMA` and build the controller's in-memory state.

    :param config: raw configuration mapping; the validated result is kept on the
        instance as `_config`
    """
    self.devices: dict[t.EUI64, zigpy.device.Device] = {}
    self.state: zigpy.state.State = zigpy.state.State()
    self._listeners = {}

    validated_config = self.SCHEMA(config)
    self._config = validated_config

    self._dblistener = None
    self._groups = zigpy.group.Groups(self)
    # NOTE(review): `_listeners` was already initialised above; this second reset
    # looks redundant — confirm nothing registers a listener in between.
    self._listeners = {}
    self._send_sequence = 0

    # Strong references to tasks started through `create_task`
    self._tasks: set[asyncio.Future[Any]] = set()
    self._watchdog_task: asyncio.Task | None = None

    max_concurrent_requests = self._config[conf.CONF_MAX_CONCURRENT_REQUESTS]
    self._concurrent_requests_semaphore = PriorityDynamicBoundedSemaphore(
        max_concurrent_requests
    )

    ota_config = self._config[conf.CONF_OTA]
    self.ota = zigpy.ota.OTA(ota_config, self)
    self.backups: zigpy.backups.BackupManager = zigpy.backups.BackupManager(self)
    self.topology: zigpy.topology.Topology = zigpy.topology.Topology(self)

    # One queue of request listeners per device, created on first access
    self._req_listeners: collections.defaultdict[
        zigpy.device.Device,
        collections.deque[zigpy.listeners.BaseRequestListener],
    ] = collections.defaultdict(collections.deque)
def create_task(
    self, target: Coroutine[Any, Any, _R], name: str | None = None
) -> asyncio.Task[_R]:
    """Schedule `target` on the running loop and keep the task referenced.

    The task is held in `self._tasks` until it finishes, at which point its done
    callback removes it again.

    :param target: coroutine to run
    :param name: optional task name passed to the event loop
    :returns: the newly created task
    """
    running_loop = asyncio.get_running_loop()
    new_task = running_loop.create_task(target, name=name)

    self._tasks.add(new_task)
    new_task.add_done_callback(self._tasks.remove)

    return new_task
async def _load_db(self) -> None:
    """Open the configured database, load it and attach its listener.

    Does nothing when the `CONF_DATABASE` setting is falsy.
    """
    db_path = self.config[conf.CONF_DATABASE]

    if db_path:
        self._dblistener = await zigpy.appdb.PersistingListener.new(db_path, self)
        await self._dblistener.load()
        self._add_db_listeners()
def _add_db_listeners(self):
if self._dblistener is None:
return
self.add_listener(self._dblistener)
self.groups.add_listener(self._dblistener)
self.backups.add_listener(self._dblistener)
self.topology.add_listener(self._dblistener)
def _remove_db_listeners(self):
if self._dblistener is None:
return
self.topology.remove_listener(self._dblistener)
self.backups.remove_listener(self._dblistener)
self.groups.remove_listener(self._dblistener)
self.remove_listener(self._dblistener)
async def initialize(self, *, auto_form: bool = False) -> None:
    """Bring up the network on an already-connected radio.

    :param auto_form: when the radio reports no formed network, either restore the
        most recent backup or, if there is none, form a new network. When false,
        `NetworkNotFormed` propagates to the caller.
    :raises zigpy.exceptions.NetworkNotFormed: no network and `auto_form` is false
    :raises zigpy.exceptions.NetworkSettingsInconsistent: settings validation is
        enabled and the radio's settings are incompatible with the last backup
    """
    # Feeding the watchdog has to be the very first thing that happens
    if self.config[conf.CONF_WATCHDOG_ENABLED]:
        await self.watchdog_feed()
        self._watchdog_task = asyncio.create_task(self._watchdog_loop())

    last_backup = self.backups.most_recent_backup()

    try:
        await self.load_network_info(load_devices=False)
    except zigpy.exceptions.NetworkNotFormed:
        LOGGER.info("Network is not formed")

        if not auto_form:
            raise

        if last_backup is not None:
            # A backup exists: restore it instead of forming from scratch
            LOGGER.info("Restoring the most recent network backup")
            await self.backups.restore_backup(last_backup)
        else:
            # No backup to fall back on, so form a brand new network
            await self.form_network()

    LOGGER.debug("Network info: %s", self.state.network_info)
    LOGGER.debug("Node info: %s", self.state.node_info)

    new_state = self.backups.from_network_state()
    should_validate = (
        self.config[conf.CONF_NWK_VALIDATE_SETTINGS] and last_backup is not None
    )

    if should_validate and not new_state.is_compatible_with(last_backup):
        raise zigpy.exceptions.NetworkSettingsInconsistent(
            f"Radio network settings are not compatible with most recent backup!\n"
            f"Current settings: {new_state!r}\n"
            f"Last backup: {last_backup!r}",
            old_state=last_backup,
            new_state=new_state,
        )

    await self.start_network()
    self._persist_coordinator_model_strings_in_db()

    # Some radios erroneously permit joins on startup, so close joining explicitly
    try:
        await self.permit(0)
    except zigpy.exceptions.DeliveryError as exc:
        if exc.status != t.MACStatus.MAC_CHANNEL_ACCESS_FAILURE:
            raise

        # Some radios (like the Conbee) can fail to deliver the startup broadcast
        # due to interference
        LOGGER.warning("Failed to send startup broadcast: %s", exc)
        LOGGER.warning(INTERFERENCE_MESSAGE)

    # The periods below are configured in minutes but consumed in seconds
    if self.config[conf.CONF_NWK_BACKUP_ENABLED]:
        backup_period_s = 60 * self.config[conf.CONF_NWK_BACKUP_PERIOD]
        self.backups.start_periodic_backups(period=backup_period_s)

    if self.config[conf.CONF_TOPO_SCAN_ENABLED]:
        scan_period_s = 60 * self.config[conf.CONF_TOPO_SCAN_PERIOD]
        self.topology.start_periodic_scans(period=scan_period_s)

    ota_flags = self.config[conf.CONF_OTA]

    if (
        ota_flags[conf.CONF_OTA_ENABLED]
        and ota_flags[conf.CONF_OTA_BROADCAST_ENABLED]
    ):
        ota_settings = self._config[conf.CONF_OTA]
        self.ota.start_periodic_broadcasts(
            initial_delay=ota_settings[conf.CONF_OTA_BROADCAST_INITIAL_DELAY],
            interval=ota_settings[conf.CONF_OTA_BROADCAST_INTERVAL],
        )
async def startup(self, *, auto_form: bool = False) -> None:
    """Connect to the radio and initialize the network.

    On any failure the controller is shut down (leaving the database alone) before
    the error propagates.

    :param auto_form: forwarded to `initialize`
    :raises zigpy.exceptions.TransientConnectionError: the failure was a
        `ConnectionError`, or an `OSError` whose errno is listed in
        `TRANSIENT_CONNECTION_ERRORS`
    """
    try:
        await self.connect()
        await self.initialize(auto_form=auto_form)
    except Exception as exc:  # noqa: BLE001
        await self.shutdown(db=False)

        is_transient = isinstance(exc, ConnectionError) or (
            isinstance(exc, OSError) and exc.errno in TRANSIENT_CONNECTION_ERRORS
        )

        if is_transient:
            raise zigpy.exceptions.TransientConnectionError from exc

        raise
@classmethod
async def new(
    cls, config: dict, auto_form: bool = False, start_radio: bool = True
) -> ControllerApplication:
    """Build a controller, load its database and optionally start the radio.

    :param config: configuration passed to the constructor
    :param auto_form: forwarded to `startup`
    :param start_radio: when false, `startup` is not called
    :returns: the new controller instance
    """
    controller = cls(config)
    await controller._load_db()

    if start_radio:
        await controller.startup(auto_form=auto_form)

    return controller
async def energy_scan(
    self, channels: t.Channels, duration_exp: int, count: int
) -> dict[int, float]:
    """Run an energy detection scan and return the per-channel results.

    :param channels: channels to scan
    :param duration_exp: value sent as the request's `ScanDuration`
    :param count: value sent as the request's `ScanCount`
    :returns: mapping of scanned channel to its energy value. If the request times
        out or cannot be delivered, every requested channel maps to `0`.
    """
    try:
        rsp = await self._device.zdo.Mgmt_NWK_Update_req(
            zigpy.zdo.types.NwkUpdate(
                ScanChannels=channels,
                ScanDuration=duration_exp,
                ScanCount=count,
            )
        )
    except (asyncio.TimeoutError, zigpy.exceptions.DeliveryError):
        LOGGER.warning("Coordinator does not support energy scanning")

        # Build the all-zero result straight from the requested channels. The
        # previous `[0] * channels` multiplied a list by the integer value of the
        # channel mask, allocating a list as long as that number just to have it
        # truncated by `zip`.
        return dict.fromkeys(channels, 0)

    _, scanned_channels, _, _, energy_values = rsp

    return dict(zip(scanned_channels, energy_values))
async def _move_network_to_channel(
    self, new_channel: int, new_nwk_update_id: int
) -> None:
    """Send the channel-change request to the coordinator's own ZDO endpoint.

    Default implementation for radios that migrate via a loopback ZDO request.

    :param new_channel: channel to move to
    :param new_nwk_update_id: network update ID to announce with the change
    """
    channel_change = zigpy.zdo.types.NwkUpdate(
        ScanChannels=zigpy.types.Channels.from_channel_list([new_channel]),
        ScanDuration=zigpy.zdo.types.NwkUpdate.CHANNEL_CHANGE_REQ,
        nwkUpdateId=new_nwk_update_id,
    )

    await self._device.zdo.Mgmt_NWK_Update_req(channel_change)
async def move_network_to_channel(
    self, new_channel: int, *, num_broadcasts: int = 5
) -> None:
    """Migrate the whole network, coordinator included, to `new_channel`.

    Returns immediately when the network is already on that channel.

    :param new_channel: channel to migrate to
    :param num_broadcasts: how many times the migration request is broadcast
    """
    current = self.state.network_info
    if current.channel == new_channel:
        return

    # NOTE(review): the modulus is 0xFF, so the ID wraps to 0 after 254 and never
    # becomes 255 — confirm this is intended rather than `% 0x100`.
    new_nwk_update_id = (current.nwk_update_id + 1) % 0xFF

    for broadcast_number in range(1, num_broadcasts + 1):
        LOGGER.info(
            "Broadcasting migration to channel %s (%s of %s)",
            new_channel,
            broadcast_number,
            num_broadcasts,
        )

        channel_change = zigpy.zdo.types.NwkUpdate(
            ScanChannels=zigpy.types.Channels.from_channel_list([new_channel]),
            ScanDuration=zigpy.zdo.types.NwkUpdate.CHANNEL_CHANGE_REQ,
            nwkUpdateId=new_nwk_update_id,
        )

        await zigpy.zdo.broadcast(
            app=self,
            command=zigpy.zdo.types.ZDOCmd.Mgmt_NWK_Update_req,
            grpid=None,
            radius=30,  # Explicitly set the maximum radius
            broadcast_address=zigpy.types.BroadcastAddress.ALL_DEVICES,
            NwkUpdate=channel_change,
        )

        await asyncio.sleep(CHANNEL_CHANGE_BROADCAST_DELAY_S)

    # Move the coordinator itself, if supported
    await self._move_network_to_channel(
        new_channel=new_channel, new_nwk_update_id=new_nwk_update_id
    )

    # Keep reloading the settings until the new channel shows up
    while self.state.network_info.channel != new_channel:
        LOGGER.info("Waiting for channel change to take effect")
        await self.load_network_info(load_devices=False)
        await asyncio.sleep(CHANNEL_CHANGE_SETTINGS_RELOAD_DELAY_S)

    LOGGER.info("Successfully migrated to channel %d", new_channel)
async def form_network(self, *, fast: bool = False) -> None:
    """Write network settings to the coordinator, generating any missing values.

    Settings present in the `CONF_NWK` configuration section are used as-is. Missing
    ones (PAN ID, channel, extended PAN ID, network key, trust center address) are
    generated here.

    :param fast: form an ephemeral network: skip the energy scan, take the first
        configured channel and flag the request with `form_quickly`
    """
    # First, make the settings consistent and randomly generate missing values
    nwk_config = self.config[conf.CONF_NWK]
    channel = nwk_config[conf.CONF_NWK_CHANNEL]
    channels = nwk_config[conf.CONF_NWK_CHANNELS]
    pan_id = nwk_config[conf.CONF_NWK_PAN_ID]
    extended_pan_id = nwk_config[conf.CONF_NWK_EXTENDED_PAN_ID]
    network_key = nwk_config[conf.CONF_NWK_KEY]
    tc_address = nwk_config[conf.CONF_NWK_TC_ADDRESS]

    stack_specific = {}

    if fast:
        # Indicate to the radio library that the network is ephemeral
        stack_specific["form_quickly"] = True

    if pan_id is None:
        # `randint` includes both bounds. The previous upper bound of `0xFFFE + 1`
        # could therefore yield 0xFFFF, the value it was trying to exclude.
        pan_id = random.SystemRandom().randint(0x0001, 0xFFFE)

    if channel is None and fast:
        # Don't run an energy scan if this is an ephemeral network
        channel = next(iter(channels))
    elif channel is None and not fast:
        # We can't run an energy scan without a running network on most radios
        try:
            await self.start_network()
        except zigpy.exceptions.NetworkNotFormed:
            await self.form_network(fast=True)
            await self.start_network()

        channel_energy = await self.energy_scan(
            channels=t.Channels.ALL_CHANNELS, duration_exp=4, count=1
        )
        channel = zigpy.util.pick_optimal_channel(channel_energy, channels=channels)

    if extended_pan_id is None:
        # TODO: exclude `FF:FF:FF:FF:FF:FF:FF:FF` and possibly more reserved EPIDs
        extended_pan_id = t.ExtendedPanId(os.urandom(8))

    if network_key is None:
        network_key = t.KeyData(os.urandom(16))

    if tc_address is None:
        tc_address = t.EUI64.UNKNOWN

    network_info = zigpy.state.NetworkInfo(
        extended_pan_id=extended_pan_id,
        pan_id=pan_id,
        nwk_update_id=nwk_config[conf.CONF_NWK_UPDATE_ID],
        nwk_manager_id=0x0000,
        channel=channel,
        channel_mask=t.Channels.from_channel_list([channel]),
        security_level=5,
        network_key=zigpy.state.Key(
            key=network_key,
            tx_counter=0,
            rx_counter=0,
            seq=nwk_config[conf.CONF_NWK_KEY_SEQ],
        ),
        tc_link_key=zigpy.state.Key(
            key=nwk_config[conf.CONF_NWK_TC_LINK_KEY],
            tx_counter=0,
            rx_counter=0,
            seq=0,
            partner_ieee=tc_address,
        ),
        children=[],
        key_table=[],
        nwk_addresses={},
        stack_specific=stack_specific,
    )

    node_info = zigpy.state.NodeInfo(
        nwk=0x0000,
        ieee=t.EUI64.UNKNOWN,  # Use the device IEEE address
        logical_type=zdo_types.LogicalType.Coordinator,
    )

    LOGGER.debug("Forming a new network")

    # The generated settings are written through the backup-restore path
    await self.backups.restore_backup(
        backup=zigpy.backups.NetworkBackup(
            network_info=network_info,
            node_info=node_info,
        ),
        counter_increment=0,
        allow_incomplete=True,
        create_new=(not fast),
    )
async def shutdown(self, *, db: bool = True) -> None:
    """Stop background activity, disconnect the radio and optionally close the DB.

    Failures while disconnecting from the radio or the database are logged as
    warnings and not re-raised.

    :param db: when true and a database listener exists, detach and shut it down
    """
    watchdog = self._watchdog_task
    if watchdog is not None:
        watchdog.cancel()

    self.ota.stop_periodic_broadcasts()
    self.backups.stop_periodic_backups()
    self.topology.stop_periodic_scans()

    try:
        await self.disconnect()
    except Exception:  # noqa: BLE001
        LOGGER.warning("Failed to disconnect from radio", exc_info=True)

    if not (db and self._dblistener):
        return

    self._remove_db_listeners()

    try:
        await self._dblistener.shutdown()
    except Exception:  # noqa: BLE001
        LOGGER.warning("Failed to disconnect from database", exc_info=True)
def add_device(self, ieee: t.EUI64, nwk: t.NWK) -> zigpy.device.Device:
    """Create a `Device` for the given addresses and register it under `ieee`.

    Any device already stored under the same IEEE address is replaced.

    :param ieee: IEEE address of the device; must be a `t.EUI64`
    :param nwk: network address of the device
    :returns: the newly created device
    """
    assert isinstance(ieee, t.EUI64)

    # TODO: Shut down existing device
    new_device = zigpy.device.Device(self, ieee, nwk)
    self.devices[ieee] = new_device

    return new_device
def device_initialized(self, device: zigpy.device.Device) -> None:
    """Called by a device once it has finished initializing.

    Emits `raw_device_initialized` with the device as passed in, replaces the
    stored device with whatever `zigpy.quirks.get_device` returns, then emits
    `device_initialized` with that replacement.

    :param device: the device that finished initializing
    """
    LOGGER.debug("Device is initialized %s", device)
    self.listener_event("raw_device_initialized", device)

    quirked_device = zigpy.quirks.get_device(device)
    self.devices[quirked_device.ieee] = quirked_device

    if self._dblistener is not None:
        quirked_device.add_context_listener(self._dblistener)

    self.listener_event("device_initialized", quirked_device)
async def remove(
    self, ieee: t.EUI64, remove_children: bool = True, rejoin: bool = False
) -> None:
    """Try to remove a device from the network.

    The leave request to the device itself runs in a background task. For end
    devices, every known device that lists it as a neighbor is also sent a leave
    request for it.

    :param ieee: address of the device to be removed
    :param remove_children: forwarded to the device's own leave request
    :param rejoin: ask the device to rejoin after leaving
    """
    assert isinstance(ieee, t.EUI64)

    target = self.devices.get(ieee)

    if not target:
        LOGGER.debug("Device not found for removal: %s", ieee)
        return

    target.cancel_initialization()

    LOGGER.info("Removing device 0x%04x (%s)", target.nwk, ieee)

    self.create_task(
        self._remove_device(target, remove_children=remove_children, rejoin=rejoin),
        f"remove_device-nwk={target.nwk!r}-ieee={ieee!r}",
    )

    if target.node_desc is not None and target.node_desc.is_end_device:
        # Collect every device whose neighbor table contains the target
        parents = []

        for candidate in self.devices.values():
            for table_entry in self.topology.neighbors[candidate.ieee]:
                try:
                    neighbor = self.get_device(ieee=table_entry.ieee)
                except KeyError:
                    continue

                if neighbor is target:
                    parents.append(candidate)

        for parent in parents:
            LOGGER.debug(
                "Sending leave request for %s to %s parent", target.ieee, parent.ieee
            )

            leave_options = parent.zdo.LeaveOptions.RemoveChildren

            if rejoin:
                leave_options |= parent.zdo.LeaveOptions.Rejoin

            leave_request = parent.zdo.Mgmt_Leave_req(target.ieee, leave_options)
            parent.zdo.create_catching_task(leave_request)

    self.listener_event("device_removed", target)
async def _remove_device(
    self,
    device: zigpy.device.Device,
    remove_children: bool = True,
    rejoin: bool = False,
) -> None:
    """Ask the device to leave, then drop it from `self.devices`.

    The device is dropped even when the leave request times out or fails to deliver.

    :param device: device to remove
    :param remove_children: forwarded to the ZDO leave request
    :param rejoin: forwarded to the ZDO leave request
    """
    try:
        # End devices get a 30 second window, everything else 7 seconds
        if device.node_desc is not None and device.node_desc.is_end_device:
            leave_timeout = 30
        else:
            leave_timeout = 7

        async with asyncio_timeout(leave_timeout):
            await device.zdo.leave(remove_children=remove_children, rejoin=rejoin)
    except (zigpy.exceptions.DeliveryError, asyncio.TimeoutError) as exc:
        LOGGER.debug("Sending 'zdo_leave_req' failed: %s", exc)

    self.devices.pop(device.ieee, None)
def deserialize(
    self,
    sender: zigpy.device.Device,
    endpoint_id: t.uint8_t,
    cluster_id: t.uint16_t,
    data: bytes,
) -> tuple[Any, bytes]:
    """Delegate deserialization of `data` to the sending device.

    :param sender: device the payload came from
    :param endpoint_id: endpoint the payload is addressed to
    :param cluster_id: cluster the payload belongs to
    :param data: raw payload
    :returns: whatever `sender.deserialize` returns
    """
    parsed = sender.deserialize(endpoint_id, cluster_id, data)
    return parsed
def handle_join(
    self,
    nwk: t.NWK,
    ieee: t.EUI64,
    parent_nwk: t.NWK,
    *,
    handle_rejoin: bool = True,
) -> None:
    """Called when a device joins or announces itself on the network.

    :param nwk: network address the device is now using
    :param ieee: IEEE address of the device
    :param parent_nwk: accepted for interface compatibility; not used here
    :param handle_rejoin: when true, known devices are logged as joined and, if
        already initialized, get a group membership rescan
    """
    ieee = t.EUI64(ieee)

    try:
        device = self.get_device(ieee=ieee)
    except KeyError:
        device = self.add_device(ieee, nwk)
        LOGGER.info("New device 0x%04x (%s) joined the network", nwk, ieee)
        is_new_join = True
    else:
        is_new_join = False

        if handle_rejoin:
            LOGGER.info("Device 0x%04x (%s) joined the network", nwk, ieee)

    # A changed network address is treated like a fresh join
    if device.nwk != nwk:
        LOGGER.debug(
            "Device %s changed id (0x%04x => 0x%04x)", ieee, device.nwk, nwk
        )
        device.nwk = nwk
        is_new_join = True

    # Not all stacks send a ZDO command when a device joins so the last_seen should
    # be updated
    device.last_seen = datetime.now(timezone.utc)

    # Cancel all pending requests for the device
    device._concurrent_requests_semaphore.cancel_waiting(
        zigpy.exceptions.DeliveryError("Device has re-joined the network")
    )

    if is_new_join:
        self.listener_event("device_joined", device)
        device.schedule_initialize()
    elif not device.is_initialized:
        # Re-initialize partially-initialized devices but don't emit "device_joined"
        device.schedule_initialize()
    elif handle_rejoin:
        # Rescan groups for devices that are not newly joining and initialized
        device.schedule_group_membership_scan()
def handle_leave(self, nwk: t.NWK, ieee: t.EUI64):
    """Called when a device has left the network.

    For known devices, requests still waiting on the device's semaphore are
    cancelled and `device_left` is emitted. Unknown devices are only logged.

    :param nwk: network address of the departed device
    :param ieee: IEEE address of the departed device
    """
    LOGGER.info("Device 0x%04x (%s) left the network", nwk, ieee)

    try:
        departed = self.get_device(ieee=ieee)
    except KeyError:
        return

    leave_error = zigpy.exceptions.DeliveryError("Device has left the network")
    departed._concurrent_requests_semaphore.cancel_waiting(leave_error)

    self.listener_event("device_left", departed)
def handle_relays(self, nwk: t.NWK, relays: list[t.NWK]) -> None:
    """Called when a list of relaying devices is received.

    For a known device the filtered relay list is stored on it. For an unknown
    network address a background discovery task is started instead.

    :param nwk: network address the relay list belongs to
    :param relays: relays reported for that device
    """
    try:
        device = self.get_device(nwk=nwk)
    except KeyError:
        LOGGER.warning("Received relays from an unknown device: %s", nwk)
        discovery = self._discover_unknown_device(nwk)
        self.create_task(
            discovery, f"discover_unknown_device_from_relays-nwk={nwk!r}"
        )
        return

    device.relays = zigpy.util.filter_relays(relays)
@classmethod
async def probe(cls, device_config: dict[str, Any]) -> bool | dict[str, Any]:
    """Find device settings with which this radio library can connect.

    The given config is tried first, followed by one variant per entry of
    `_probe_configs` (duplicates are skipped). Each attempt is always followed by
    a disconnect.

    :param device_config: device settings to probe with
    :returns: the first validated config that connects, or `False` if none does
    """
    candidates = [conf.SCHEMA_DEVICE(device_config)]

    for overrides in cls._probe_configs:
        candidate = conf.SCHEMA_DEVICE({**device_config, **overrides})

        if candidate not in candidates:
            candidates.append(candidate)

    for candidate in candidates:
        app = cls({conf.CONF_DEVICE: candidate})

        try:
            await app.connect()
        except Exception:  # noqa: BLE001
            LOGGER.debug("Failed to probe with config %s", candidate, exc_info=True)
        else:
            return candidate
        finally:
            await app.disconnect()

    return False
@abc.abstractmethod
async def connect(self) -> None:
    """Connect to the radio hardware and check that the library supports it.

    Implemented by the radio library. A failed attempt should leave no state
    behind.
    """
    raise NotImplementedError()  # pragma: no cover
async def watchdog_feed(self) -> None:
    """Log the feed and delegate to the radio-specific `_watchdog_feed`."""
    LOGGER.debug("Feeding watchdog")
    feed = self._watchdog_feed()
    await feed
async def _watchdog_feed(self) -> None:
"""Reset the firmware watchdog timer. Implemented by the radio library."""
async def _watchdog_loop(self) -> None:
    """Feed the watchdog every `_watchdog_period` seconds until a feed fails.

    The first failing feed is reported through `connection_lost` and ends the loop.
    """
    LOGGER.debug("Starting watchdog loop")

    while True:
        await asyncio.sleep(self._watchdog_period)

        try:
            await self.watchdog_feed()
        except Exception as failure:  # noqa: BLE001
            LOGGER.warning("Watchdog failure", exc_info=failure)

            # Treat the watchdog failure as a disconnect
            self.connection_lost(failure)
            break

    LOGGER.debug("Stopping watchdog loop")
def connection_lost(self, exc: Exception) -> None:
    """Log a lost radio connection and emit the `connection_lost` event.

    :param exc: the error that caused the connection to be considered lost
    """
    LOGGER.debug("Connection to the radio has been lost: %r", exc)

    event_name = "connection_lost"
    self.listener_event(event_name, exc)
@abc.abstractmethod
async def disconnect(self):
    """Disconnect from the radio hardware and shut the network down.

    Implemented by the radio library.
    """
    raise NotImplementedError()  # pragma: no cover
@abc.abstractmethod
async def start_network(self):
    """Start a Zigbee network using the settings currently stored in the radio.

    Implemented by the radio library.
    """
    raise NotImplementedError()  # pragma: no cover
@abc.abstractmethod
async def force_remove(self, dev: zigpy.device.Device):
    """Tell the radio to remove a device using a lower-level leave command.

    Implemented by the radio library; not all radios implement this.

    :param dev: device to remove
    """
    raise NotImplementedError()  # pragma: no cover
@abc.abstractmethod
async def add_endpoint(self, descriptor: zdo_types.SimpleDescriptor):
    """Register a new endpoint on the controlled device.

    Implemented by the radio library; not all radios will implement this.

    :param descriptor: simple descriptor of the endpoint to register
    """
    raise NotImplementedError()  # pragma: no cover
async def register_endpoints(self) -> None:
    """Register all necessary endpoints.

    Endpoint 1 (ZHA profile) and endpoint 2 (ZLL profile) are registered first,
    followed by every descriptor in the `CONF_ADDITIONAL_ENDPOINTS` setting. The
    exact point at which this method is called depends on the radio module.
    """
    general = zigpy.zcl.clusters.general
    security = zigpy.zcl.clusters.security

    zha_endpoint = zdo_types.SimpleDescriptor(
        endpoint=1,
        profile=zigpy.profiles.zha.PROFILE_ID,
        device_type=zigpy.profiles.zha.DeviceType.IAS_CONTROL,
        device_version=0b0000,
        input_clusters=[
            general.Basic.cluster_id,
            general.OnOff.cluster_id,
            general.Time.cluster_id,
            general.Ota.cluster_id,
            security.IasAce.cluster_id,
        ],
        output_clusters=[
            general.PowerConfiguration.cluster_id,
            general.PollControl.cluster_id,
            security.IasZone.cluster_id,
            security.IasWd.cluster_id,
        ],
    )
    await self.add_endpoint(zha_endpoint)

    zll_endpoint = zdo_types.SimpleDescriptor(
        endpoint=2,
        profile=zigpy.profiles.zll.PROFILE_ID,
        device_type=zigpy.profiles.zll.DeviceType.CONTROLLER,
        device_version=0b0000,
        input_clusters=[general.Basic.cluster_id],
        output_clusters=[],
    )
    await self.add_endpoint(zll_endpoint)

    for extra_endpoint in self.config[conf.CONF_ADDITIONAL_ENDPOINTS]:
        await self.add_endpoint(extra_endpoint)
@contextlib.asynccontextmanager
async def _limit_concurrency(self, *, priority: int = t.PacketPriority.NORMAL):
    """Async context manager to limit global coordinator request concurrency.

    When the semaphore is already locked on entry, the delay is logged both when
    the request is queued and when it finally starts running.

    :param priority: priority passed to the semaphore when acquiring it
    """
    semaphore = self._concurrent_requests_semaphore

    queued_at = time.monotonic()
    had_to_wait = semaphore.locked()

    if had_to_wait:
        LOGGER.debug(
            "Max concurrency (%s) reached, delaying request (%s enqueued)",
            semaphore.max_value,
            semaphore.num_waiting,
        )

    async with semaphore(priority=priority):
        if had_to_wait:
            waited = time.monotonic() - queued_at
            LOGGER.debug(
                "Previously delayed request is now running, delayed by %0.2fs",
                waited,
            )

        yield
@abc.abstractmethod
async def send_packet(self, packet: t.ZigbeePacket) -> None:
    """Send a Zigbee packet using its addressing mode and the provided options.

    Implemented by the radio library.

    :param packet: the packet to transmit
    """
    raise NotImplementedError()  # pragma: no cover
def build_source_route_to(self, dest: zigpy.device.Device) -> list[t.NWK] | None:
    """Compute a source route to the destination device.

    :param dest: device to route to
    :returns: the device's stored relays in reverse order, or `None` when the
        device has no relay list
    """
    relays = dest.relays

    # TODO: utilize topology scanner information
    return None if relays is None else relays[::-1]
async def request(
    self,
    device: zigpy.device.Device,
    profile: t.uint16_t,
    cluster: t.uint16_t,
    src_ep: t.uint8_t,
    dst_ep: t.uint8_t,
    sequence: t.uint8_t,
    data: bytes,
    *,
    expect_reply: bool = True,
    use_ieee: bool = False,
    extended_timeout: bool = False,
    ask_for_ack: bool | None = None,
    priority: int = t.PacketPriority.NORMAL,
) -> tuple[zigpy.zcl.foundation.Status, str]:
    """Build a unicast packet for `device` and hand it to `send_packet`.

    :param device: destination device
    :param profile: Zigbee Profile ID to use for outgoing message
    :param cluster: cluster id where the message is being sent
    :param src_ep: source endpoint id
    :param dst_ep: destination endpoint id
    :param sequence: transaction sequence number of the message
    :param data: Zigbee message payload
    :param expect_reply: True if this is essentially a request; only consulted
        when `ask_for_ack` is `None`, in which case an ACK is requested when no
        reply is expected
    :param use_ieee: use EUI64 for source and destination addressing
    :param extended_timeout: instruct the radio to use slower APS retries
    :param ask_for_ack: explicitly request (or not) an ACK; takes precedence over
        `expect_reply` when not `None`
    :param priority: priority stored on the outgoing packet
    :returns: `(Status.SUCCESS, "")` once `send_packet` has returned
    """
    node_info = self.state.node_info

    if use_ieee:
        addr_mode = t.AddrMode.IEEE
        src_address, dst_address = node_info.ieee, device.ieee
    else:
        addr_mode = t.AddrMode.NWK
        src_address, dst_address = node_info.nwk, device.nwk

    src = t.AddrModeAddress(addr_mode=addr_mode, address=src_address)
    dst = t.AddrModeAddress(addr_mode=addr_mode, address=dst_address)

    source_route = (
        self.build_source_route_to(dest=device)
        if self.config[conf.CONF_SOURCE_ROUTING]
        else None
    )

    # Prefer `ask_for_ack` to `expect_reply`
    if ask_for_ack is None:
        wants_ack = not expect_reply
    else:
        wants_ack = ask_for_ack

    tx_options = t.TransmitOptions.NONE

    if wants_ack:
        tx_options |= t.TransmitOptions.ACK

    packet = t.ZigbeePacket(
        src=src,
        src_ep=src_ep,
        dst=dst,
        dst_ep=dst_ep,
        tsn=sequence,
        profile_id=profile,
        cluster_id=cluster,
        data=t.SerializableBytes(data),
        extended_timeout=extended_timeout,
        source_route=source_route,
        tx_options=tx_options,
        priority=priority,
    )
    await self.send_packet(packet)

    return (zigpy.zcl.foundation.Status.SUCCESS, "")
async def mrequest(
    self,
    group_id: t.uint16_t,
    profile: t.uint16_t,
    cluster: t.uint16_t,
    src_ep: t.uint8_t,
    sequence: t.uint8_t,
    data: bytes,
    *,
    hops: int = 0,
    non_member_radius: int = 3,
    priority: int = t.PacketPriority.NORMAL,
) -> tuple[zigpy.zcl.foundation.Status, str]:
    """Submit and send data out as a multicast transmission.

    :param group_id: destination multicast address
    :param profile: Zigbee Profile ID to use for outgoing message
    :param cluster: cluster id where the message is being sent
    :param src_ep: source endpoint id
    :param sequence: transaction sequence number of the message
    :param data: Zigbee message payload
    :param hops: the message will be delivered to all nodes within this number of
                 hops of the sender. A value of zero is converted to MAX_HOPS
    :param non_member_radius: the number of hops that the message will be forwarded
                              by devices that are not members of the group. A value
                              of 7 or greater is treated as infinite
    :param priority: priority stored on the outgoing packet
    :returns: `(Status.SUCCESS, "")` once `send_packet` has returned
    """
    # The coordinator's own network address is always the source
    src = t.AddrModeAddress(
        addr_mode=t.AddrMode.NWK, address=self.state.node_info.nwk
    )
    dst = t.AddrModeAddress(addr_mode=t.AddrMode.Group, address=group_id)

    await self.send_packet(
        t.ZigbeePacket(
            src=src,
            src_ep=src_ep,
            dst=dst,
            tsn=sequence,
            profile_id=profile,
            cluster_id=cluster,
            data=t.SerializableBytes(data),
            tx_options=t.TransmitOptions.NONE,
            radius=hops,
            non_member_radius=non_member_radius,
            priority=priority,
        )
    )

    return (zigpy.zcl.foundation.Status.SUCCESS, "")
async def broadcast(
    self,
    profile: t.uint16_t,
    cluster: t.uint16_t,
    src_ep: t.uint8_t,
    dst_ep: t.uint8_t,
    grpid: t.uint16_t,
    radius: int,
    sequence: t.uint8_t,
    data: bytes,
    broadcast_address: t.BroadcastAddress = t.BroadcastAddress.RX_ON_WHEN_IDLE,
    priority: int = t.PacketPriority.NORMAL,
) -> tuple[zigpy.zcl.foundation.Status, str]:
    """Submit and send data out as a broadcast transmission.

    :param profile: Zigbee Profile ID to use for outgoing message
    :param cluster: cluster id where the message is being sent
    :param src_ep: source endpoint id
    :param dst_ep: destination endpoint id
    :param grpid: group id to address the broadcast to; accepted for interface
        compatibility but not used when building the packet
    :param radius: max radius of the broadcast
    :param sequence: transaction sequence number of the message
    :param data: zigbee message payload
    :param broadcast_address: broadcast address.
    :param priority: priority stored on the outgoing packet
    :returns: `(Status.SUCCESS, "")` once `send_packet` has returned
    """
    await self.send_packet(
        t.ZigbeePacket(
            src=t.AddrModeAddress(
                addr_mode=t.AddrMode.NWK, address=self.state.node_info.nwk
            ),
            src_ep=src_ep,
            dst=t.AddrModeAddress(
                addr_mode=t.AddrMode.Broadcast, address=broadcast_address
            ),
            dst_ep=dst_ep,
            tsn=sequence,
            profile_id=profile,
            cluster_id=cluster,
            data=t.SerializableBytes(data),
            tx_options=t.TransmitOptions.NONE,
            radius=radius,
            priority=priority,
        )
    )

    return (zigpy.zcl.foundation.Status.SUCCESS, "")
async def _discover_unknown_device(self, nwk: t.NWK) -> None:
    """Broadcast an `IEEE_addr_req` asking for the IEEE address behind `nwk`.

    :param nwk: network address of the unknown device
    """
    request_args = {
        "NWKAddrOfInterest": nwk,
        "RequestType": zdo_types.AddrRequestType.Single,
        "StartIndex": 0,
    }

    return await zigpy.zdo.broadcast(
        app=self,
        command=zdo_types.ZDOCmd.IEEE_addr_req,
        grpid=None,
        radius=0,
        **request_args,
    )
def _maybe_parse_zdo(self, packet: t.ZigbeePacket) -> None:
    """Try to parse an incoming packet as ZDO and react to useful notifications.

    Device announcements are treated as joins. Successful NWK/IEEE address
    responses update the address mapping without rejoin handling. Packets that
    fail to parse are ignored.

    :param packet: the received packet
    """
    # The current zigpy device may not exist if we receive a packet early
    try:
        zdo = self._device.zdo
    except KeyError:
        zdo = zigpy.zdo.ZDO(None)

    try:
        zdo_hdr, zdo_args = zdo.deserialize(
            cluster_id=packet.cluster_id, data=packet.data.serialize()
        )
    except ValueError:
        LOGGER.debug("Could not parse ZDO message from packet")
        return

    address_responses = (
        zdo_types.ZDOCmd.NWK_addr_rsp,
        zdo_types.ZDOCmd.IEEE_addr_rsp,
    )

    # Interpret useful global ZDO responses and notifications
    if zdo_hdr.command_id == zdo_types.ZDOCmd.Device_annce:
        nwk, ieee, _ = zdo_args
        self.handle_join(nwk=nwk, ieee=ieee, parent_nwk=None)
        return

    if zdo_hdr.command_id not in address_responses:
        return

    status, ieee, nwk, _, _, _ = zdo_args

    if status == zdo_types.Status.SUCCESS:
        LOGGER.debug("Discovered IEEE address for NWK=%s: %s", nwk, ieee)
        self.handle_join(nwk=nwk, ieee=ieee, parent_nwk=None, handle_rejoin=False)
def packet_received(self, packet: t.ZigbeePacket) -> None:
"""Notify zigpy of a received Zigbee packet."""
LOGGER.debug("Received a packet: %r", packet)
assert packet.src is not None
assert packet.dst is not None