Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

first step to fix dvd-dev/hilo#486 by making room to a second websock… #505

Merged
merged 45 commits into from
Feb 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
f4b24ef
first step to fix dvd-dev/hilo#486 by making room to a second websock…
Leicas Nov 26, 2024
b93d147
Initial linting
ic-dev21 Nov 26, 2024
f06d1c7
Update __init__.py
ic-dev21 Dec 6, 2024
4e2ac52
Update __init__.py
ic-dev21 Dec 6, 2024
db898b2
Some more progress
ic-dev21 Dec 26, 2024
77e2136
Ajout de stock dans le callback
ic-dev21 Dec 26, 2024
0a8cc12
Add logging and callback
ic-dev21 Jan 4, 2025
c6459d7
Update __init__.py
ic-dev21 Jan 4, 2025
8427c19
Linting
ic-dev21 Jan 4, 2025
8edec7e
Update __init__.py
ic-dev21 Jan 4, 2025
adb92d7
One more argument
ic-dev21 Jan 5, 2025
9e4ea39
Un autre
ic-dev21 Jan 5, 2025
061e9d3
This doesn't work causes error
ic-dev21 Jan 5, 2025
609e998
Update __init__.py
ic-dev21 Jan 5, 2025
6fbd36c
Update __init__.py
ic-dev21 Jan 6, 2025
bceefa5
Logging + linting/refactor
ic-dev21 Jan 8, 2025
0ed885e
Update __init__.py
ic-dev21 Jan 9, 2025
0887907
Update labeler.yml
ic-dev21 Jan 17, 2025
e04a94c
Websocket sensor en parallèle
ic-dev21 Jan 17, 2025
f655cde
Restore causait problème (à date)
ic-dev21 Jan 17, 2025
dc79f8a
Adding websocket listeners
ic-dev21 Jan 21, 2025
b972e5b
Debugging, logging
ic-dev21 Jan 22, 2025
3329d95
Exception handling
ic-dev21 Jan 24, 2025
cee6f14
Linting
ic-dev21 Jan 24, 2025
a2eade6
Attribute removal on completed cahllenge
ic-dev21 Jan 24, 2025
61c05a6
Remove dead code cleanup
ic-dev21 Jan 30, 2025
5b705ac
Linting.
ic-dev21 Jan 30, 2025
cf0fefc
Reduce the race condition probability
fersingb Jan 30, 2025
26d5129
Merge branch 'main' into fixissue#486
ic-dev21 Jan 31, 2025
081fac5
List is not dict
ic-dev21 Jan 31, 2025
14231bf
Update homeassistant requirement from ~=2025.1.0 to ~=2025.1.4
dependabot[bot] Jan 31, 2025
25f274e
Bump ruff from 0.9.1 to 0.9.4
dependabot[bot] Jan 31, 2025
1e9e9ac
Merge pull request #560 from dvd-dev/dependabot/pip/homeassistant-app…
ic-dev21 Jan 31, 2025
2289312
Merge pull request #561 from dvd-dev/dependabot/pip/ruff-0.9.4
ic-dev21 Jan 31, 2025
6b5eef8
Merge pull request #559 from fersingb/main
ic-dev21 Feb 1, 2025
73fc4c9
Remove duplicate
ic-dev21 Feb 1, 2025
8c4ccc1
Update sensor.py
ic-dev21 Feb 1, 2025
1c96e22
Update sensor.py
ic-dev21 Feb 1, 2025
cc7de89
Merge branch 'main' into fixissue#486
ic-dev21 Feb 1, 2025
2920bd7
Update sensor.py
ic-dev21 Feb 1, 2025
1e04ef0
Ménage de commentaires
ic-dev21 Feb 1, 2025
fe4958f
Update sensor.py
ic-dev21 Feb 1, 2025
0d4eb3f
Update sensor.py
ic-dev21 Feb 1, 2025
f3dcb5f
Clean up excess logging
ic-dev21 Feb 1, 2025
3fd6d67
Linting
ic-dev21 Feb 1, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ jobs:
steps:
- name: Check out the repository
uses: actions/[email protected]

- name: Run Labeler
uses: crazy-max/[email protected]
with:
configuration-path: .github/labels.yml
skip-delete: true
254 changes: 188 additions & 66 deletions custom_components/hilo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,16 +232,19 @@ def __init__(self, hass: HomeAssistant, entry: ConfigEntry, api: API) -> None:
self.find_meter(self._hass)
self.entry = entry
self.devices: Devices = Devices(api)
self._websocket_reconnect_task: asyncio.Task | None = None
self._update_task: asyncio.Task | None = None
self.invocations = {0: self.subscribe_to_location}
self.challenge_id = 0
self._websocket_reconnect_tasks: list[asyncio.Task | None] = [None, None]
self._update_task: list[asyncio.Task | None] = [None, None]
self.invocations = {
0: self.subscribe_to_location,
1: self.subscribe_to_challenge,
2: self.subscribe_to_challengelist,
}
self.hq_plan_name = entry.options.get(CONF_HQ_PLAN_NAME, DEFAULT_HQ_PLAN_NAME)
self.appreciation = entry.options.get(
CONF_APPRECIATION_PHASE, DEFAULT_APPRECIATION_PHASE
)
self.pre_cold = entry.options.get(
CONF_PRE_COLD_PHASE, DEFAULT_PRE_COLD_PHASE # this is new
)
self.pre_cold = entry.options.get(CONF_PRE_COLD_PHASE, DEFAULT_PRE_COLD_PHASE)
self.challenge_lock = entry.options.get(
CONF_CHALLENGE_LOCK, DEFAULT_CHALLENGE_LOCK
)
Expand All @@ -260,25 +263,93 @@ def __init__(self, hass: HomeAssistant, entry: ConfigEntry, api: API) -> None:
self._events: dict = {}
if self.track_unknown_sources:
self._api._get_device_callbacks = [self._get_unknown_source_tracker]
self._websocket_listeners = []

def validate_heartbeat(self, event: WebsocketEvent) -> None:
heartbeat_time = from_utc_timestamp(event.arguments[0]) # type: ignore
if self._api.log_traces:
LOG.debug(f"Heartbeat: {time_diff(heartbeat_time, event.timestamp)}")

@callback
async def on_websocket_event(self, event: WebsocketEvent) -> None:
"""Define a callback for receiving a websocket event."""
async_dispatcher_send(self._hass, DISPATCHER_TOPIC_WEBSOCKET_EVENT, event)
if event.event_type == "COMPLETE":
cb = self.invocations.get(event.invocation)
if cb:
async_call_later(self._hass, 3, cb(event.invocation))
elif event.target == "Heartbeat":
self.validate_heartbeat(event)
elif event.target == "DevicesValuesReceived":
# When receiving attribute values for unknown devices, assume
# we have refresh the device list.
def register_websocket_listener(self, listener):
"""Register a listener for websocket events."""
LOG.debug(f"Registering websocket listener: {listener.__class__.__name__}")
self._websocket_listeners.append(listener)

async def _handle_websocket_message(self, event):
"""Process websocket messages and notify listeners."""

LOG.debug(f"Received websocket message type: {event}")
target = event.target
LOG.debug(f"handle_websocket_message_target {target}")
msg_data = event
LOG.debug(f"handle_websocket_message_ msg_data {msg_data}")

if target == "ChallengeListInitialValuesReceived":
msg_type = "challenge_list_initial"
elif target == "ChallengeAdded":
msg_type = "challenge_added"
elif target == "ChallengeDetailsUpdated":
msg_type = "challenge_details_update"
elif target == "ChallengeConsumptionUpdatedValuesReceived":
msg_type = "challenge_details_update"
elif target == "ChallengeDetailsUpdatedValuesReceived":
msg_type = "challenge_details_update"
elif target == "ChallengeDetailsInitialValuesReceived":
msg_type = "challenge_details_update"
elif target == "ChallengeListUpdatedValuesReceived":
msg_type = "challenge_details_update"

# ic-dev21 Notify listeners
for listener in self._websocket_listeners:
handler_name = f"handle_{msg_type}"
if hasattr(listener, handler_name):
handler = getattr(listener, handler_name)
try:
# ic-dev21 Extract the arguments from the WebsocketEvent object
if isinstance(msg_data, WebsocketEvent):
arguments = msg_data.arguments
if arguments: # ic-dev21 check if there are arguments
await handler(arguments[0])
else:
LOG.warning(f"Received empty arguments for {msg_type}")
else:
await handler(msg_data)
except Exception as e:
LOG.error(f"Error in websocket handler {handler_name}: {e}")

async def _handle_challenge_events(self, event: WebsocketEvent) -> None:
"""Handle all challenge-related websocket events."""
if event.target == "ChallengeDetailsInitialValuesRecei0ved":
challenge = event.arguments[0]
LOG.debug(f"ChallengeDetailsInitialValuesReceived, challenge = {challenge}")
self.challenge_id = challenge.get("id")

elif event.target == "ChallengeDetailsUpdatedValuesReceived":
LOG.debug("ChallengeDetailsUpdatedValuesReceived")

elif event.target == "ChallengeListUpdatedValuesReceived":
LOG.debug("ChallengeListUpdatedValuesReceived")
self.challenge_phase = event.arguments[0][0]["currentPhase"]

elif event.target == "ChallengeAdded":
LOG.debug("ChallengeAdded")
challenge = event.arguments[0]
self.challenge_id = challenge.get("id")
await self.subscribe_to_challenge(1, self.challenge_id)

elif event.target == "ChallengeListInitialValuesReceived":
LOG.debug("ChallengeListInitialValuesReceived")
challenges = event.arguments[0]

for challenge in challenges:
challenge_id = challenge.get("id")
self.challenge_phase = challenge.get("currentPhase")
self.challenge_id = challenge.get("id")
await self.subscribe_to_challenge(1, challenge_id)

async def _handle_device_events(self, event: WebsocketEvent) -> None:
"""Handle all device-related websocket events."""
if event.target == "DevicesValuesReceived":
new_devices = any(
self.devices.find_device(item["deviceId"]) is None
for item in event.arguments[0]
Expand All @@ -290,67 +361,110 @@ async def on_websocket_event(self, event: WebsocketEvent) -> None:
await self.devices.update()

updated_devices = self.devices.parse_values_received(event.arguments[0])
# NOTE(dvd): If we don't do this, we need to wait until the coordinator
# runs (scan_interval) to have updated data in the dashboard.
for device in updated_devices:
async_dispatcher_send(
self._hass, SIGNAL_UPDATE_ENTITY.format(device.id)
)

elif event.target == "DeviceListInitialValuesReceived":
# This websocket event only happens after calling SubscribeToLocation.
# This triggers an update without throwing an exception
new_devices = await self.devices.update_devicelist_from_signalr(
event.arguments[0]
)
await self.devices.update_devicelist_from_signalr(event.arguments[0])

elif event.target == "DeviceListUpdatedValuesReceived":
# This message only contains display information, such as the Device's name (as set in the app), it's groupid, icon, etc.
# Updating the device name causes issues in the integration, it detects it as a new device and creates a new entity.
# Ignore this call, for now... (update_devicelist_from_signalr does work, but causes the issue above)
# await self.devices.update_devicelist_from_signalr(event.arguments[0])
LOG.debug(
"Received 'DeviceListUpdatedValuesReceived' message, not implemented yet."
)

elif event.target == "DevicesListChanged":
# This message only contains the location_id and is used to inform us that devices have been removed from the location.
# Device deletion is not implemented yet, so we just log the message for now.
LOG.debug("Received 'DevicesListChanged' message, not implemented yet.")

elif event.target == "DeviceAdded":
# Same structure as DeviceList* but only one device instead of a list
devices = []
devices.append(event.arguments[0])
new_devices = await self.devices.update_devicelist_from_signalr(devices)
devices = [event.arguments[0]]
await self.devices.update_devicelist_from_signalr(devices)

elif event.target == "DeviceDeleted":
# Device deletion is not implemented yet, so we just log the message for now.
LOG.debug("Received 'DeviceDeleted' message, not implemented yet.")

elif event.target == "GatewayValuesReceived":
# Gateway deviceId hardcoded to 1 as it is not returned by Gateways/Info.
# First time we encounter a GatewayValueReceived event, update device with proper deviceid.
gateway = self.devices.find_device(1)
if gateway:
gateway.id = event.arguments[0][0]["deviceId"]
LOG.debug(f"Updated Gateway's deviceId from default 1 to {gateway.id}")

updated_devices = self.devices.parse_values_received(event.arguments[0])
# NOTE(dvd): If we don't do this, we need to wait until the coordinator
# runs (scan_interval) to have updated data in the dashboard.
for device in updated_devices:
async_dispatcher_send(
self._hass, SIGNAL_UPDATE_ENTITY.format(device.id)
)

@callback
async def on_websocket_event(self, event: WebsocketEvent) -> None:
"""Define a callback for receiving a websocket event."""
async_dispatcher_send(self._hass, DISPATCHER_TOPIC_WEBSOCKET_EVENT, event)

if event.event_type == "COMPLETE":
cb = self.invocations.get(event.invocation)
if cb:
async_call_later(self._hass, 3, cb(event.invocation))

elif event.target == "Heartbeat":
self.validate_heartbeat(event)

elif "Challenge" in event.target:
await self._handle_challenge_events(event)
await self._handle_websocket_message(event)

elif "Device" in event.target or event.target == "GatewayValuesReceived":
await self._handle_device_events(event)

else:
LOG.warning(f"Unhandled websocket event: {event}")

@callback
async def subscribe_to_location(self, inv_id: int) -> None:
"""Sends the json payload to receive updates from the location."""
LOG.debug(f"Subscribing to location {self.devices.location_id}")
await self._api.websocket.async_invoke(
await self._api.websocket_devices.async_invoke(
[self.devices.location_id], "SubscribeToLocation", inv_id
)

@callback
async def subscribe_to_challenge(self, inv_id: int, event_id: int = 0) -> None:
"""Sends the json payload to receive updates from the challenge."""
# ic-dev21 : data structure of the message was incorrect, needed the "fixed" strings
LOG.debug(f"ic-dev21 subscribe to challenge :{event_id} or {self.challenge_id}")
event_id = event_id or self.challenge_id

LOG.debug(
f"Subscribing to challenge {event_id} at location {self.devices.location_id}"
)
await self._api.websocket_challenges.async_invoke(
[{"locationId": self.devices.location_id, "eventId": event_id}],
"SubscribeToChallenge",
inv_id,
)

@callback
async def subscribe_to_challengelist(self, inv_id: int) -> None:
"""Sends the json payload to receive updates from the challenge list."""
# ic-dev21 this will be necessary to get the challenge list
LOG.debug(
f"Subscribing to challenge list at location {self.devices.location_id}"
)
await self._api.websocket_challenges.async_invoke(
[{"locationId": self.devices.location_id}],
"SubscribeToChallengeList",
inv_id,
)

@callback
async def request_status_update(self) -> None:
await self._api.websocket.send_status()
await self._api.websocket_devices.send_status()
for inv_id, inv_cb in self.invocations.items():
await inv_cb(inv_id)

@callback
async def request_status_update_challenge(self) -> None:
await self._api.websocket_challenges.send_status()
for inv_id, inv_cb in self.invocations.items():
await inv_cb(inv_id)

Expand Down Expand Up @@ -424,20 +538,28 @@ async def async_init(self, scan_interval: int) -> None:
self._hass, self.entry, self.unknown_tracker_device
)

self._api.websocket.add_connect_callback(self.request_status_update)
self._api.websocket.add_event_callback(self.on_websocket_event)
self._websocket_reconnect_task = asyncio.create_task(
self.start_websocket_loop()
self._api.websocket_devices.add_connect_callback(self.request_status_update)
self._api.websocket_devices.add_event_callback(self.on_websocket_event)
self._api.websocket_challenges.add_connect_callback(
self.request_status_update_challenge
)
self._api.websocket_challenges.add_event_callback(self.on_websocket_event)
self._websocket_reconnect_tasks[0] = asyncio.create_task(
self.start_websocket_loop(self._api.websocket_devices, 0)
)
self._websocket_reconnect_tasks[1] = asyncio.create_task(
self.start_websocket_loop(self._api.websocket_challenges, 1)
)
# asyncio.create_task(self._api.websocket.async_connect())

# asyncio.create_task(self._api.websocket_devices.async_connect())

async def websocket_disconnect_listener(_: Event) -> None:
"""Define an event handler to disconnect from the websocket."""
if TYPE_CHECKING:
assert self._api.websocket
assert self._api.websocket_devices

if self._api.websocket.connected:
await self._api.websocket.async_disconnect()
if self._api.websocket_devices.connected:
await self._api.websocket_devices.async_disconnect()

self.entry.async_on_unload(
self._hass.bus.async_listen_once(
Expand All @@ -452,37 +574,37 @@ async def websocket_disconnect_listener(_: Event) -> None:
update_method=self.async_update,
)

async def start_websocket_loop(self) -> None:
async def start_websocket_loop(self, websocket, id) -> None:
"""Start a websocket reconnection loop."""
if TYPE_CHECKING:
assert self._api.websocket
assert websocket

should_reconnect = True

try:
await self._api.websocket.async_connect()
await self._api.websocket.async_listen()
await websocket.async_connect()
await websocket.async_listen()
except asyncio.CancelledError:
LOG.debug("Request to cancel websocket loop received")
raise
except WebsocketError as err:
LOG.error(f"Failed to connect to websocket: {err}", exc_info=err)
await self.cancel_websocket_loop()
await self.cancel_websocket_loop(websocket, id)
except InvalidCredentialsError:
LOG.warning("Invalid credentials? Refreshing websocket infos")
await self.cancel_websocket_loop()
await self.cancel_websocket_loop(websocket, id)
await self._api.refresh_ws_token()
except Exception as err: # pylint: disable=broad-except
LOG.error(
f"Unknown exception while connecting to websocket: {err}", exc_info=err
)
await self.cancel_websocket_loop()
await self.cancel_websocket_loop(websocket, id)

if should_reconnect:
LOG.info("Disconnected from websocket; reconnecting in 5 seconds.")
await asyncio.sleep(5)
self._websocket_reconnect_task = self._hass.async_create_task(
self.start_websocket_loop()
self._websocket_reconnect_tasks[id] = self._hass.async_create_task(
self.start_websocket_loop(websocket, id)
)

async def cancel_task(self, task) -> None:
Expand All @@ -496,15 +618,15 @@ async def cancel_task(self, task) -> None:
task = None
return task

async def cancel_websocket_loop(self) -> None:
async def cancel_websocket_loop(self, websocket, id) -> None:
"""Stop any existing websocket reconnection loop."""
self._websocket_reconnect_task = await self.cancel_task(
self._websocket_reconnect_task
self._websocket_reconnect_tasks[id] = await self.cancel_task(
self._websocket_reconnect_tasks[id]
)
self._update_task = await self.cancel_task(self._update_task)
self._update_task[id] = await self.cancel_task(self._update_task[id])
if TYPE_CHECKING:
assert self._api.websocket
await self._api.websocket.async_disconnect()
assert websocket
await websocket.async_disconnect()

async def async_update(self) -> None:
"""Updates tarif periodically."""
Expand Down
Loading
Loading