diff --git a/README.md b/README.md index 264c4408..40fe5b6f 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,7 @@ Current methods of scanning: - Plex Media Server using Good Day Mate protocol - Logitech Media Server discovery protocol - Daikin discovery protocol + - HF-LPB100 discovery protocol - Web OS discovery protocol It is the library that powers the device discovery within [Home Assistant](https://home-assistant.io/). diff --git a/netdisco/discoverables/hf_lpb100.py b/netdisco/discoverables/hf_lpb100.py new file mode 100644 index 00000000..98900d5e --- /dev/null +++ b/netdisco/discoverables/hf_lpb100.py @@ -0,0 +1,14 @@ +"""Discover HF-LPB100 chip based devices.""" +from netdisco.discoverables import BaseDiscoverable + + +class Discoverable(BaseDiscoverable): + """HF-LPB100 chip based discoverable.""" + + def __init__(self, netdis): + """Initialize the HF-LPB100 chip based discovery.""" + self._netdis = netdis + + def get_entries(self): + """Get all the Sunix Controller device details.""" + return self._netdis.hf_lpb100.entries diff --git a/netdisco/discovery.py b/netdisco/discovery.py index 27bce191..045f8444 100644 --- a/netdisco/discovery.py +++ b/netdisco/discovery.py @@ -9,6 +9,7 @@ from .lms import LMS from .tellstick import Tellstick from .daikin import Daikin +from .hf_lpb100 import HF_LPB100 from .smartglass import XboxSmartGlass _LOGGER = logging.getLogger(__name__) @@ -40,6 +41,7 @@ def __init__(self): self.lms = None self.tellstick = None self.daikin = None + self.hf_lpb100 = None self.xbox_smartglass = None self.is_discovering = False @@ -71,6 +73,9 @@ def scan(self): self.daikin = Daikin() self.daikin.scan() + self.hf_lpb100 = HF_LPB100() + self.hf_lpb100.scan() + self.xbox_smartglass = XboxSmartGlass() self.xbox_smartglass.scan() @@ -87,6 +92,7 @@ def stop(self): self.lms = None self.tellstick = None self.daikin = None + self.hf_lpb100 = None self.xbox_smartglass = None self.discoverables = None self.is_discovering = False diff --git a/netdisco/hf_lpb100.py b/netdisco/hf_lpb100.py new file mode 100644 index 00000000..0d7edca8 --- /dev/null +++ b/netdisco/hf_lpb100.py @@ -0,0 +1,95 @@ +"""HF-LPB100 chip based device discovery.""" +import socket +from datetime import timedelta + +from netdisco.const import ATTR_HOST, ATTR_SERIAL, ATTR_MODEL_NAME + +DISCOVERY_MSG = b'HF-A11ASSISTHREAD' + +DISCOVERY_ADDRESS = '' +DISCOVERY_PORT = 48899 +DISCOVERY_TIMEOUT = timedelta(seconds=2) + + +class HF_LPB100: + """Base class to discover HF-LPB100 chip based devices.""" + + def __init__(self): + """Initialize the HF-LPB100 chip based discovery.""" + self.entries = [] + + def scan(self): + """Scan the network.""" + self.update() + + def all(self): + """Scan and return all found entries.""" + self.scan() + return self.entries + + def update(self): + """Scan network for HF-LPB100 chip based devices.""" + entries = [] + + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as cs: + cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + cs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + cs.settimeout(DISCOVERY_TIMEOUT.seconds) + + received_messages = [] + + try: + # send a local broadcast via udp with a "magic packet" + cs.sendto(DISCOVERY_MSG, (DISCOVERY_ADDRESS, DISCOVERY_PORT)) + cs.settimeout(DISCOVERY_TIMEOUT.seconds) + + while True: + data, address = cs.recvfrom(4096) + received_messages.append(data.decode("UTF-8")) + + except socket.timeout: + if len(received_messages) <= 0: + return [] + + for message in received_messages: + try: + controller = self._parse_discovery_response(message) + if controller is not None: + entries.append(controller) + # pylint: disable=W0702 + except: # noqa: E722 + print("Error parsing discovery message: %s" % message) + return None + + self.entries = entries + + @staticmethod + def _parse_discovery_response(message: str) -> {}: + # parse received message + data = str.split(message, ",") + + # check validity + if len(data) == 3: + # extract data + ip = data[0] + hw_id = data[1] + model = data[2] + + return { + ATTR_HOST: ip, + ATTR_SERIAL: hw_id, + ATTR_MODEL_NAME: model + } + + +def main(): + """Test HF-LPB100 chip based discovery.""" + from pprint import pprint + hf_lpb100 = HF_LPB100() + pprint("Scanning for HF-LPB100 chip based devices..") + hf_lpb100.update() + pprint(hf_lpb100.entries) + + +if __name__ == "__main__": + main()