Skip to content
This repository has been archived by the owner on Oct 1, 2021. It is now read-only.

Added support for HF-LPB100 chip based device discovery #226

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Current methods of scanning:
- Plex Media Server using Good Day Mate protocol
- Logitech Media Server discovery protocol
- Daikin discovery protocol
- HF-LPB100 discovery protocol
- Web OS discovery protocol

It is the library that powers the device discovery within [Home Assistant](https://home-assistant.io/).
Expand Down
14 changes: 14 additions & 0 deletions netdisco/discoverables/hf_lpb100.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""Discover HF-LPB100 chip based devices."""
from netdisco.discoverables import BaseDiscoverable


class Discoverable(BaseDiscoverable):
"""HF-LPB100 chip based discoverable."""

def __init__(self, netdis):
"""Initialize the HF-LPB100 chip based discovery."""
self._netdis = netdis

def get_entries(self):
"""Get all the Sunix Controller device details."""
return self._netdis.hf_lpb100.entries
6 changes: 6 additions & 0 deletions netdisco/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .lms import LMS
from .tellstick import Tellstick
from .daikin import Daikin
from .hf_lpb100 import HF_LPB100
from .smartglass import XboxSmartGlass

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -40,6 +41,7 @@ def __init__(self):
self.lms = None
self.tellstick = None
self.daikin = None
self.hf_lpb100 = None
self.xbox_smartglass = None

self.is_discovering = False
Expand Down Expand Up @@ -71,6 +73,9 @@ def scan(self):
self.daikin = Daikin()
self.daikin.scan()

self.hf_lpb100 = HF_LPB100()
self.hf_lpb100.scan()

self.xbox_smartglass = XboxSmartGlass()
self.xbox_smartglass.scan()

Expand All @@ -87,6 +92,7 @@ def stop(self):
self.lms = None
self.tellstick = None
self.daikin = None
self.hf_lpb100 = None
self.xbox_smartglass = None
self.discoverables = None
self.is_discovering = False
Expand Down
95 changes: 95 additions & 0 deletions netdisco/hf_lpb100.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""HF-LPB100 chip based device discovery."""
import socket
from datetime import timedelta

from netdisco.const import ATTR_HOST, ATTR_SERIAL, ATTR_MODEL_NAME

DISCOVERY_MSG = b'HF-A11ASSISTHREAD'

DISCOVERY_ADDRESS = '<broadcast>'
DISCOVERY_PORT = 48899
DISCOVERY_TIMEOUT = timedelta(seconds=2)


class HF_LPB100:
"""Base class to discover HF-LPB100 chip based devices."""

def __init__(self):
"""Initialize the HF-LPB100 chip based discovery."""
self.entries = []

def scan(self):
"""Scan the network."""
self.update()

def all(self):
"""Scan and return all found entries."""
self.scan()
return self.entries

def update(self):
"""Scan network for HF-LPB100 chip based devices."""
entries = []

with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as cs:
cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
cs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
cs.settimeout(DISCOVERY_TIMEOUT.seconds)

received_messages = []

try:
# send a local broadcast via udp with a "magic packet"
cs.sendto(DISCOVERY_MSG, (DISCOVERY_ADDRESS, DISCOVERY_PORT))
cs.settimeout(DISCOVERY_TIMEOUT.seconds)

while True:
data, address = cs.recvfrom(4096)
received_messages.append(data.decode("UTF-8"))

except socket.timeout:
if len(received_messages) <= 0:
return []

for message in received_messages:
try:
controller = self._parse_discovery_response(message)
if controller is not None:
entries.append(controller)
# pylint: disable=W0702
except: # noqa: E722
print("Error parsing discovery message: %s" % message)
return None

self.entries = entries

@staticmethod
def _parse_discovery_response(message: str) -> {}:
# parse received message
data = str.split(message, ",")

# check validity
if len(data) == 3:
# extract data
ip = data[0]
hw_id = data[1]
model = data[2]

return {
ATTR_HOST: ip,
ATTR_SERIAL: hw_id,
ATTR_MODEL_NAME: model
}


def main():
"""Test HF-LPB100 chip based discovery."""
from pprint import pprint
hf_lpb100 = HF_LPB100()
pprint("Scanning for HF-LPB100 chip based devices..")
hf_lpb100.update()
pprint(hf_lpb100.entries)


if __name__ == "__main__":
main()