Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
jeeftor committed Sep 20, 2024
2 parents 9d5faf5 + b7281d6 commit 503aad3
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 20 deletions.
25 changes: 20 additions & 5 deletions example_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
from collections import Counter
from dotenv import load_dotenv

from weatherflow4py.models.ws.websocket_request import (
ListenStartMessage,
RapidWindListenStartMessage,
Expand All @@ -14,7 +15,7 @@
import logging
from pprint import pprint

NUMBER_OF_MESSAGES = 1
NUMBER_OF_MESSAGES = 5
REQUIRED_MESSAGE_TYPES = {"obs_st", "rapid_wind"}

ws_logger = logging.getLogger("websockets.client")
Expand Down Expand Up @@ -65,8 +66,8 @@ async def main():
api.register_wind_callback(wind_cb)

await api.connect()
await api.send_message(ListenStartMessage(device_id=device))
await api.send_message(RapidWindListenStartMessage(device_id=device))
await api.send_message_and_wait(ListenStartMessage(device_id=device))
await api.send_message_and_wait(RapidWindListenStartMessage(device_id=device))

while sum(
message_counter.values()
Expand All @@ -75,13 +76,19 @@ async def main():
):
await asyncio.sleep(1)

print("-" * 80)
await api.stop_all_listeners()
print("-" * 80)
# await api.send_message_and_wait(ListenStopMessage(device_id=device))
# await api.send_message_and_wait(RapidWindListenStopMessage(device_id=device))

print("Received all required message types and reached message limit.")
print("Final message count:")
pprint(dict(message_counter))
print("DATA::")
pprint(api.messages)

await api.close()
# await api.close()
print("Connection closed.")


Expand All @@ -101,4 +108,12 @@ async def main():
)
except asyncio.TimeoutError:
logger.warning("Listen task cancellation timed out or was cancelled")
loop.close()

# Keep the loop running until Ctrl+C is hit
try:
while True:
loop.run_forever()
except KeyboardInterrupt:
logger.info("Received keyboard interrupt. Exiting...")
finally:
loop.close()
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "weatherflow4py"
version = "1.1.14"
version = "1.1.16"
description = "Python library used by Home Assistant to interact with the WeatherFlow REST API"
authors = ["Jeef <[email protected]>"]
readme = "README.md"
Expand Down
80 changes: 66 additions & 14 deletions weatherflow4py/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
WebsocketResponseBuilder,
ObservationTempestWS,
RapidWindWS,
AcknowledgementWS,
)

from .const import WS_LOGGER
Expand Down Expand Up @@ -159,6 +160,44 @@ async def send_message(self, message_type: WebsocketRequest):
WS_LOGGER.debug(f"Sending message: {message}")
await self._send(message)

async def send_message_and_wait(
self, message_type: WebsocketRequest, timeout: float = 5.0
) -> Optional[AcknowledgementWS]:
message = message_type.json
WS_LOGGER.debug(f"Sending message and waiting for ACK: {message}")

# Create a future to store the ACK response
ack_future = asyncio.Future()

# Register a temporary callback for ACK messages
def ack_callback(ack: AcknowledgementWS):
if not ack_future.done():
ack_future.set_result(ack)

# Store the original ACK callback if it exists
original_ack_callback = self.callbacks.get(EventType.ACKNOWLEDGEMENT.value)

# Set our temporary callback
self.callbacks[EventType.ACKNOWLEDGEMENT.value] = ack_callback

try:
# Send the message
await self._send(message)

# Wait for the ACK with a timeout
return await asyncio.wait_for(ack_future, timeout=timeout)

except asyncio.TimeoutError:
WS_LOGGER.warning(f"Timeout waiting for ACK after sending: {message}")
return None

finally:
# Restore the original callback or remove our temporary one
if original_ack_callback:
self.callbacks[EventType.ACKNOWLEDGEMENT.value] = original_ack_callback
else:
self.callbacks.pop(EventType.ACKNOWLEDGEMENT.value, None)

async def connect(self, ssl_context: Optional[SSLContext] = None):
"""Establishes a WebSocket connection and starts a background listening task.
Expand Down Expand Up @@ -225,29 +264,38 @@ def is_connected(self):
# Check if the websocket connection is open
return self.websocket and not self.websocket.closed

async def close(self, timeout: float = 5.0) -> None:
async def stop_all_listeners(self):
"""
Close the WebSocket connection and clean up resources.
Args:
timeout (float): Maximum time to wait for tasks to complete (default: 5.0 seconds)
Stop listening for all devices - waits for acknowledgement
"""
if not self.is_connected:
return

# Stop listening for all devices
stop_tasks = []
for device_id in self.device_ids:
stop_tasks.extend(
[
self.send_message(ListenStopMessage(device_id=device_id)),
self.send_message(RapidWindListenStopMessage(device_id=device_id)),
self.send_message_and_wait(ListenStopMessage(device_id=device_id)),
self.send_message_and_wait(
RapidWindListenStopMessage(device_id=device_id)
),
]
)

# Wait for all stop messages to be sent
if stop_tasks:
await asyncio.gather(*stop_tasks, return_exceptions=True)
for task in stop_tasks:
await task

WS_LOGGER.info("Stopped listening for all devices")

async def close(self, timeout: float = 5.0) -> None:
"""
Close the WebSocket connection and clean up resources.
Args:
timeout (float): Maximum time to wait for tasks to complete (default: 5.0 seconds)
"""
if not self.is_connected:
return

await self.stop_all_listeners()

# Cancel the listen task
if self.listen_task and not self.listen_task.done():
Expand All @@ -258,13 +306,17 @@ async def close(self, timeout: float = 5.0) -> None:
WS_LOGGER.warning("Listen task cancellation timed out")
except asyncio.CancelledError:
WS_LOGGER.info("Listen task was cancelled")
except Exception as e:
WS_LOGGER.error(f"Exception during listen task cancellation: {e}")

# Close the WebSocket connection
if self.websocket:
try:
await asyncio.wait_for(self.websocket.close(), timeout=timeout)
except TimeoutError:
except asyncio.TimeoutError:
WS_LOGGER.warning("WebSocket close operation timed out")
except Exception as e:
WS_LOGGER.error(f"Exception during WebSocket close operation: {e}")
finally:
if self.websocket.closed:
WS_LOGGER.info("WebSocket connection successfully closed")
Expand Down

0 comments on commit 503aad3

Please sign in to comment.