From e36024009c5aaf2fa7d47ae08fd33b399129ecc9 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 17 Apr 2023 10:18:40 -1000 Subject: [PATCH] Add webhook support to onvif --- .../components/onvif/binary_sensor.py | 20 +- homeassistant/components/onvif/device.py | 25 +- homeassistant/components/onvif/diagnostics.py | 4 + homeassistant/components/onvif/event.py | 853 ++++++++++++++---- homeassistant/components/onvif/manifest.json | 2 +- homeassistant/components/onvif/models.py | 18 + homeassistant/components/onvif/sensor.py | 21 +- requirements_all.txt | 2 +- requirements_test_all.txt | 2 +- tests/components/onvif/__init__.py | 6 + tests/components/onvif/test_diagnostics.py | 10 + 11 files changed, 769 insertions(+), 194 deletions(-) diff --git a/homeassistant/components/onvif/binary_sensor.py b/homeassistant/components/onvif/binary_sensor.py index 8f79b43296f24..3676e3b6c2750 100644 --- a/homeassistant/components/onvif/binary_sensor.py +++ b/homeassistant/components/onvif/binary_sensor.py @@ -24,7 +24,7 @@ async def async_setup_entry( async_add_entities: AddEntitiesCallback, ) -> None: """Set up a ONVIF binary sensor.""" - device = hass.data[DOMAIN][config_entry.unique_id] + device: ONVIFDevice = hass.data[DOMAIN][config_entry.unique_id] entities = { event.uid: ONVIFBinarySensor(event.uid, device) @@ -39,16 +39,20 @@ async def async_setup_entry( ) async_add_entities(entities.values()) + uids_by_platform = device.events.get_uids_by_platform("binary_sensor") @callback - def async_check_entities(): + def async_check_entities() -> None: """Check if we have added an entity for the event.""" - new_entities = [] - for event in device.events.get_platform("binary_sensor"): - if event.uid not in entities: - entities[event.uid] = ONVIFBinarySensor(event.uid, device) - new_entities.append(entities[event.uid]) - async_add_entities(new_entities) + nonlocal uids_by_platform + if not (missing := uids_by_platform.difference(entities)): + return + new_entities: dict[str, ONVIFBinarySensor] = { + uid: ONVIFBinarySensor(uid, device) for uid in missing + } + if new_entities: + entities.update(new_entities) + async_add_entities(new_entities.values()) device.events.async_add_listener(async_check_entities) diff --git a/homeassistant/components/onvif/device.py b/homeassistant/components/onvif/device.py index a9f8625521e2a..9eddd66ecd9e6 100644 --- a/homeassistant/components/onvif/device.py +++ b/homeassistant/components/onvif/device.py @@ -101,7 +101,7 @@ async def async_setup(self) -> None: # Create event manager assert self.config_entry.unique_id - self.events = EventManager(self.hass, self.device, self.config_entry.unique_id) + self.events = EventManager(self.hass, self.device, self.config_entry, self.name) # Fetch basic device info and capabilities self.info = await self.async_get_device_info() @@ -159,10 +159,10 @@ async def async_manually_set_date_and_time(self) -> None: async def async_check_date_and_time(self) -> None: """Warns if device and system date not synced.""" - LOGGER.debug("Setting up the ONVIF device management service") + LOGGER.debug("%s: Setting up the ONVIF device management service", self.name) device_mgmt = self.device.create_devicemgmt_service() - LOGGER.debug("Retrieving current device date/time") + LOGGER.debug("%s: Retrieving current device date/time", self.name) try: system_date = dt_util.utcnow() device_time = await device_mgmt.GetSystemDateAndTime() @@ -174,7 +174,7 @@ async def async_check_date_and_time(self) -> None: ) return - LOGGER.debug("Device time: %s", device_time) + LOGGER.debug("%s: Device time: %s", self.name, device_time) tzone = dt_util.DEFAULT_TIME_ZONE cdate = device_time.LocalDateTime @@ -185,7 +185,9 @@ async def async_check_date_and_time(self) -> None: tzone = dt_util.get_time_zone(device_time.TimeZone.TZ) or tzone if cdate is None: - LOGGER.warning("Could not retrieve date/time on this camera") + LOGGER.warning( + "%s: Could not retrieve date/time on this camera", self.name + ) else: cam_date = dt.datetime( cdate.Date.Year, @@ -201,7 +203,8 @@ async def async_check_date_and_time(self) -> None: cam_date_utc = cam_date.astimezone(dt_util.UTC) LOGGER.debug( - "Device date/time: %s | System date/time: %s", + "%s: Device date/time: %s | System date/time: %s", + self.name, cam_date_utc, system_date, ) @@ -266,10 +269,6 @@ async def async_get_capabilities(self): media_capabilities = await media_service.GetServiceCapabilities() snapshot = media_capabilities and media_capabilities.SnapshotUri - pullpoint = False - with suppress(ONVIFError, Fault, RequestError, XMLParseError): - pullpoint = await self.events.async_start() - ptz = False with suppress(ONVIFError, Fault, RequestError): self.device.get_definition("ptz") @@ -280,7 +279,11 @@ async def async_get_capabilities(self): self.device.create_imaging_service() imaging = True - return Capabilities(snapshot, pullpoint, ptz, imaging) + events = False + with suppress(ONVIFError, Fault, RequestError, XMLParseError): + events = await self.events.async_start() + + return Capabilities(snapshot, events, ptz, imaging) async def async_get_profiles(self) -> list[Profile]: """Obtain media profiles for this device.""" diff --git a/homeassistant/components/onvif/diagnostics.py b/homeassistant/components/onvif/diagnostics.py index eb818f53a3a5e..d7f2c51530898 100644 --- a/homeassistant/components/onvif/diagnostics.py +++ b/homeassistant/components/onvif/diagnostics.py @@ -28,5 +28,9 @@ async def async_get_config_entry_diagnostics( "capabilities": asdict(device.capabilities), "profiles": [asdict(profile) for profile in device.profiles], } + data["events"] = { + "webhook_manager_state": device.events.webhook_manager.state, + "pullpoint_manager_state": device.events.pullpoint_manager.state, + } return data diff --git a/homeassistant/components/onvif/event.py b/homeassistant/components/onvif/event.py index 5bc2a8248fc94..f4c113dc398ed 100644 --- a/homeassistant/components/onvif/event.py +++ b/homeassistant/components/onvif/event.py @@ -5,24 +5,57 @@ from collections.abc import Callable from contextlib import suppress import datetime as dt -from logging import DEBUG, WARNING -from httpx import RemoteProtocolError, TransportError +from aiohttp.web import Request +from httpx import RemoteProtocolError, RequestError, TransportError from onvif import ONVIFCamera, ONVIFService +from onvif.client import NotificationManager +from onvif.exceptions import ONVIFError from zeep.exceptions import Fault, XMLParseError -from homeassistant.core import CALLBACK_TYPE, CoreState, HomeAssistant, callback +from homeassistant.components import webhook +from homeassistant.config_entries import ConfigEntry +from homeassistant.core import ( + CALLBACK_TYPE, + CoreState, + HassJob, + HomeAssistant, + callback, +) from homeassistant.helpers.event import async_call_later -from homeassistant.util import dt as dt_util +from homeassistant.helpers.network import NoURLAvailableError, get_url -from .const import LOGGER -from .models import Event +from .const import DOMAIN, LOGGER +from .models import Event, PullPointManagerState, WebHookManagerState from .parsers import PARSERS UNHANDLED_TOPICS: set[str] = set() SUBSCRIPTION_ERRORS = (Fault, asyncio.TimeoutError, TransportError) +CREATE_ERRORS = (ONVIFError, Fault, RequestError, XMLParseError) SET_SYNCHRONIZATION_POINT_ERRORS = (*SUBSCRIPTION_ERRORS, TypeError) +UNSUBSCRIBE_ERRORS = (XMLParseError, *SUBSCRIPTION_ERRORS) +RENEW_ERRORS = (ONVIFError, RequestError, XMLParseError, *SUBSCRIPTION_ERRORS) +# +# We only keep the subscription alive for 3 minutes, and will keep +# renewing it every 1.5 minutes. This is to avoid the camera +# accumulating subscriptions which will be impossible to clean up +# since ONVIF does not provide a way to list existing subscriptions. +# +# If we max out the number of subscriptions, the camera will stop +# sending events to us, and we will not be able to recover until +# the subscriptions expire or the camera is rebooted. +# +SUBSCRIPTION_TIME = dt.timedelta(minutes=3) +SUBSCRIPTION_RELATIVE_TIME = ( + "PT3M" # use relative time since the time on the camera is not reliable +) +SUBSCRIPTION_RENEW_INTERVAL = SUBSCRIPTION_TIME.total_seconds() / 2 +SUBSCRIPTION_RENEW_INTERVAL_ON_ERROR = 60.0 + +PULLPOINT_POLL_TIME = dt.timedelta(seconds=60) +PULLPOINT_MESSAGE_LIMIT = 100 +PULLPOINT_COOLDOWN_TIME = 0.75 def _stringify_onvif_error(error: Exception) -> str: @@ -32,45 +65,49 @@ def _stringify_onvif_error(error: Exception) -> str: return str(error) -def _get_next_termination_time() -> str: - """Get next termination time.""" - return ( - (dt_util.utcnow() + dt.timedelta(days=1)) - .isoformat(timespec="seconds") - .replace("+00:00", "Z") - ) - - class EventManager: """ONVIF Event Manager.""" def __init__( - self, hass: HomeAssistant, device: ONVIFCamera, unique_id: str + self, + hass: HomeAssistant, + device: ONVIFCamera, + config_entry: ConfigEntry, + name: str, ) -> None: """Initialize event manager.""" - self.hass: HomeAssistant = hass - self.device: ONVIFCamera = device - self.unique_id: str = unique_id - self.started: bool = False + self.hass = hass + self.device = device + self.config_entry = config_entry + self.unique_id = config_entry.unique_id + self.name = name - self._subscription: ONVIFService = None + self.webhook_manager = WebHookManager(self) + self.pullpoint_manager = PullPointManager(self) + + self._uid_by_platform: dict[str, set[str]] = {} self._events: dict[str, Event] = {} self._listeners: list[CALLBACK_TYPE] = [] - self._unsub_refresh: CALLBACK_TYPE | None = None - super().__init__() + @property + def started(self) -> bool: + """Return True if event manager is started.""" + return ( + self.webhook_manager.state == WebHookManagerState.STARTED + or self.pullpoint_manager.state == PullPointManagerState.STARTED + ) @property - def platforms(self) -> set[str]: - """Return platforms to setup.""" - return {event.platform for event in self._events.values()} + def has_listeners(self) -> bool: + """Return if there are listeners.""" + return bool(self._listeners) @callback def async_add_listener(self, update_callback: CALLBACK_TYPE) -> Callable[[], None]: """Listen for data updates.""" # This is the first listener, set up polling. if not self._listeners: - self.async_schedule_pull() + self.pullpoint_manager.async_schedule_pull_messages() self._listeners.append(update_callback) @@ -87,188 +124,676 @@ def async_remove_listener(self, update_callback: CALLBACK_TYPE) -> None: if update_callback in self._listeners: self._listeners.remove(update_callback) - if not self._listeners and self._unsub_refresh: - self._unsub_refresh() - self._unsub_refresh = None + if not self._listeners: + self.pullpoint_manager.async_cancel_pull_messages() async def async_start(self) -> bool: """Start polling events.""" - if not await self.device.create_pullpoint_subscription( - {"InitialTerminationTime": _get_next_termination_time()} + # Always start pull point first, since it will populate the event list + event_via_pull_point = await self.pullpoint_manager.async_start() + events_via_webhook = await self.webhook_manager.async_start() + return events_via_webhook or event_via_pull_point + + async def async_stop(self) -> None: + """Unsubscribe from events.""" + self._listeners = [] + await self.pullpoint_manager.async_stop() + await self.webhook_manager.async_stop() + + @callback + def async_callback_listeners(self) -> None: + """Update listeners.""" + for update_callback in self._listeners: + update_callback() + + # pylint: disable=protected-access + async def async_parse_messages(self, messages) -> None: + """Parse notification message.""" + unique_id = self.unique_id + assert unique_id is not None + for msg in messages: + # Guard against empty message + if not msg.Topic: + continue + + topic = msg.Topic._value_1 + if not (parser := PARSERS.get(topic)): + if topic not in UNHANDLED_TOPICS: + LOGGER.info( + "%s: No registered handler for event from %s: %s", + self.name, + unique_id, + msg, + ) + UNHANDLED_TOPICS.add(topic) + continue + + event = await parser(unique_id, msg) + + if not event: + LOGGER.info( + "%s: Unable to parse event from %s: %s", self.name, unique_id, msg + ) + return + + self.get_uids_by_platform(event.platform).add(event.uid) + self._events[event.uid] = event + + def get_uid(self, uid: str) -> Event | None: + """Retrieve event for given id.""" + return self._events.get(uid) + + def get_platform(self, platform) -> list[Event]: + """Retrieve events for given platform.""" + return [event for event in self._events.values() if event.platform == platform] + + def get_uids_by_platform(self, platform: str) -> set[str]: + """Retrieve uids for a given platform.""" + if (possible_uids := self._uid_by_platform.get(platform)) is None: + uids: set[str] = set() + self._uid_by_platform[platform] = uids + return uids + return possible_uids + + @callback + def async_webhook_failed(self) -> None: + """Mark webhook as failed.""" + if self.pullpoint_manager.state != PullPointManagerState.PAUSED: + return + LOGGER.debug("%s: Switching to PullPoint for events", self.name) + self.pullpoint_manager.async_resume() + + @callback + def async_webhook_working(self) -> None: + """Mark webhook as working.""" + if self.pullpoint_manager.state != PullPointManagerState.STARTED: + return + LOGGER.debug("%s: Switching to webhook for events", self.name) + self.pullpoint_manager.async_pause() + + +class PullPointManager: + """ONVIF PullPoint Manager. + + If the camera supports webhooks and the webhook is reachable, the pullpoint + manager will keep the pull point subscription alive, but will not poll for + messages unless the webhook fails. + """ + + def __init__(self, event_manager: EventManager) -> None: + """Initialize pullpoint manager.""" + self.state = PullPointManagerState.STOPPED + + self._event_manager = event_manager + self._device = event_manager.device + self._hass = event_manager.hass + self._name = event_manager.name + + self._pullpoint_subscription: ONVIFService = None + self._pullpoint_service: ONVIFService = None + self._pull_lock: asyncio.Lock = asyncio.Lock() + + self._cancel_pull_messages: CALLBACK_TYPE | None = None + self._cancel_pullpoint_renew: CALLBACK_TYPE | None = None + + self._renew_lock: asyncio.Lock = asyncio.Lock() + self._renew_or_restart_job = HassJob( + self._async_renew_or_restart_pullpoint, + f"{self._name}: renew or restart pullpoint", + ) + self._pull_messages_job = HassJob( + self._async_background_pull_messages, + f"{self._name}: pull messages", + ) + + async def async_start(self) -> bool: + """Start pullpoint subscription.""" + assert ( + self.state == PullPointManagerState.STOPPED + ), "PullPoint manager already started" + LOGGER.debug("%s: Starting PullPoint manager", self._name) + if not await self._async_start_pullpoint(): + self.state = PullPointManagerState.FAILED + return False + self.state = PullPointManagerState.STARTED + return True + + @callback + def async_pause(self) -> None: + """Pause pullpoint subscription.""" + LOGGER.debug("%s: Pausing PullPoint manager", self._name) + self.state = PullPointManagerState.PAUSED + self._hass.async_create_task(self._async_cancel_and_unsubscribe()) + + @callback + def async_resume(self) -> None: + """Resume pullpoint subscription.""" + LOGGER.debug("%s: Resuming PullPoint manager", self._name) + self.state = PullPointManagerState.STARTED + self.async_schedule_pullpoint_renew(0.0) + + @callback + def async_schedule_pullpoint_renew(self, delay: float) -> None: + """Schedule PullPoint subscription renewal.""" + self._async_cancel_pullpoint_renew() + self._cancel_pullpoint_renew = async_call_later( + self._hass, + delay, + self._renew_or_restart_job, + ) + + @callback + def async_cancel_pull_messages(self) -> None: + """Cancel the PullPoint task.""" + if self._cancel_pull_messages: + self._cancel_pull_messages() + self._cancel_pull_messages = None + + @callback + def async_schedule_pull_messages(self, delay: float | None = None) -> None: + """Schedule async_pull_messages to run. + + Used as fallback when webhook is not working. + + Must not check if the webhook is working. + """ + self.async_cancel_pull_messages() + if self.state != PullPointManagerState.STARTED: + return + if self._pullpoint_service: + when = delay if delay is not None else PULLPOINT_COOLDOWN_TIME + self._cancel_pull_messages = async_call_later( + self._hass, when, self._pull_messages_job + ) + + async def async_stop(self) -> None: + """Unsubscribe from PullPoint and cancel callbacks.""" + self.state = PullPointManagerState.STOPPED + await self._async_cancel_and_unsubscribe() + + async def _async_start_pullpoint(self) -> bool: + """Start pullpoint subscription.""" + try: + try: + started = await self._async_create_pullpoint_subscription() + except RemoteProtocolError: + # http://datatracker.ietf.org/doc/html/rfc2616#section-8.1.4 allows the server + # to close the connection at any time, we treat this as a normal and try again + # once since we do not want to declare the camera as not supporting PullPoint + # if it just happened to close the connection at the wrong time. + started = await self._async_create_pullpoint_subscription() + except CREATE_ERRORS as err: + LOGGER.debug( + "%s: Device does not support PullPoint service or has too many subscriptions: %s", + self._name, + _stringify_onvif_error(err), + ) + return False + + if started: + self.async_schedule_pullpoint_renew(SUBSCRIPTION_RENEW_INTERVAL) + + return started + + async def _async_cancel_and_unsubscribe(self) -> None: + """Cancel and unsubscribe from PullPoint.""" + self._async_cancel_pullpoint_renew() + self.async_cancel_pull_messages() + await self._async_unsubscribe_pullpoint() + + async def _async_renew_or_restart_pullpoint( + self, now: dt.datetime | None = None + ) -> None: + """Renew or start pullpoint subscription.""" + if self._hass.is_stopping or self.state != PullPointManagerState.STARTED: + return + if self._renew_lock.locked(): + LOGGER.debug("%s: PullPoint renew already in progress", self._name) + # Renew is already running, another one will be + # scheduled when the current one is done if needed. + return + async with self._renew_lock: + next_attempt = SUBSCRIPTION_RENEW_INTERVAL_ON_ERROR + try: + if ( + await self._async_renew_pullpoint() + or await self._async_restart_pullpoint() + ): + next_attempt = SUBSCRIPTION_RENEW_INTERVAL + finally: + self.async_schedule_pullpoint_renew(next_attempt) + + async def _async_create_pullpoint_subscription(self) -> bool: + """Create pullpoint subscription.""" + + if not await self._device.create_pullpoint_subscription( + {"InitialTerminationTime": SUBSCRIPTION_RELATIVE_TIME} ): + LOGGER.debug("%s: Failed to create PullPoint subscription", self._name) return False # Create subscription manager - self._subscription = self.device.create_subscription_service( + self._pullpoint_subscription = self._device.create_subscription_service( "PullPointSubscription" ) - # Renew immediately - await self.async_renew() + # Create the service that will be used to pull messages from the device. + self._pullpoint_service = self._device.create_pullpoint_service() # Initialize events - pullpoint = self.device.create_pullpoint_service() with suppress(*SET_SYNCHRONIZATION_POINT_ERRORS): - await pullpoint.SetSynchronizationPoint() - response = await pullpoint.PullMessages( - {"MessageLimit": 100, "Timeout": dt.timedelta(seconds=5)} - ) + sync_result = await self._pullpoint_service.SetSynchronizationPoint() + LOGGER.debug("%s: SetSynchronizationPoint: %s", self._name, sync_result) - # Parse event initialization - await self.async_parse_messages(response.NotificationMessage) + # Always schedule an initial pull messages + self.async_schedule_pull_messages(0.0) - self.started = True return True - async def async_stop(self) -> None: - """Unsubscribe from events.""" - self._listeners = [] - self.started = False - - if not self._subscription: - return - - with suppress(*SUBSCRIPTION_ERRORS): - await self._subscription.Unsubscribe() - self._subscription = None + @callback + def _async_cancel_pullpoint_renew(self) -> None: + """Cancel the pullpoint renew task.""" + if self._cancel_pullpoint_renew: + self._cancel_pullpoint_renew() + self._cancel_pullpoint_renew = None - async def async_restart(self, _now: dt.datetime | None = None) -> None: + async def _async_restart_pullpoint(self) -> bool: """Restart the subscription assuming the camera rebooted.""" - if not self.started: + self.async_cancel_pull_messages() + await self._async_unsubscribe_pullpoint() + restarted = await self._async_start_pullpoint() + if restarted and self._event_manager.has_listeners: + LOGGER.debug("%s: Restarted PullPoint subscription", self._name) + self.async_schedule_pull_messages(0.0) + return restarted + + async def _async_unsubscribe_pullpoint(self) -> None: + """Unsubscribe the pullpoint subscription.""" + if not self._pullpoint_subscription: return + LOGGER.debug("%s: Unsubscribing from PullPoint", self._name) + try: + await self._pullpoint_subscription.Unsubscribe() + except UNSUBSCRIBE_ERRORS as err: + LOGGER.debug( + ( + "%s: Failed to unsubscribe PullPoint subscription;" + " This is normal if the device restarted: %s" + ), + self._name, + _stringify_onvif_error(err), + ) + self._pullpoint_subscription = None - if self._subscription: - # Suppressed. The subscription may no longer exist. - try: - await self._subscription.Unsubscribe() - except (XMLParseError, *SUBSCRIPTION_ERRORS) as err: - LOGGER.debug( - ( - "Failed to unsubscribe ONVIF PullPoint subscription for '%s';" - " This is normal if the device restarted: %s" - ), - self.unique_id, - err, - ) - self._subscription = None - + async def _async_renew_pullpoint(self) -> bool: + """Renew the PullPoint subscription.""" + if not self._pullpoint_subscription: + return False try: - restarted = await self.async_start() + # The first time we renew, we may get a Fault error so we + # suppress it. The subscription will be restarted in + # async_restart later. + await self._pullpoint_subscription.Renew(SUBSCRIPTION_RELATIVE_TIME) + LOGGER.debug("%s: Renewed PullPoint subscription", self._name) + return True + except RENEW_ERRORS as err: + LOGGER.debug( + "%s: Failed to renew PullPoint subscription; %s", + self._name, + _stringify_onvif_error(err), + ) + return False + + async def _async_pull_messages_with_lock(self) -> bool: + """Pull messages from device while holding the lock. + + This function must not be called directly, it should only + be called from _async_pull_messages. + + Returns True if the subscription is working. + + Returns False if the subscription is not working and should be restarted. + """ + assert self._pull_lock.locked(), "Pull lock must be held" + assert self._pullpoint_service is not None, "PullPoint service does not exist" + event_manager = self._event_manager + LOGGER.debug( + "%s: Pulling PullPoint messages timeout=%s limit=%s", + self._name, + PULLPOINT_POLL_TIME, + PULLPOINT_MESSAGE_LIMIT, + ) + try: + response = await self._pullpoint_service.PullMessages( + { + "MessageLimit": PULLPOINT_MESSAGE_LIMIT, + "Timeout": PULLPOINT_POLL_TIME, + } + ) + except RemoteProtocolError as err: + # Either a shutdown event or the camera closed the connection. Because + # http://datatracker.ietf.org/doc/html/rfc2616#section-8.1.4 allows the server + # to close the connection at any time, we treat this as a normal. Some + # cameras may close the connection if there are no messages to pull. + LOGGER.debug( + "%s: PullPoint subscription encountered a remote protocol error " + "(this is normal for some cameras): %s", + self._name, + _stringify_onvif_error(err), + ) + return True except (XMLParseError, *SUBSCRIPTION_ERRORS) as err: - restarted = False # Device may not support subscriptions so log at debug level # when we get an XMLParseError - LOGGER.log( - DEBUG if isinstance(err, XMLParseError) else WARNING, - ( - "Failed to restart ONVIF PullPoint subscription for '%s'; " - "Retrying later: %s" - ), - self.unique_id, + LOGGER.debug( + "%s: Failed to fetch PullPoint subscription messages: %s", + self._name, _stringify_onvif_error(err), ) + # Treat errors as if the camera restarted. Assume that the pullpoint + # subscription is no longer valid. + return False - if not restarted: - # Try again in a minute - self._unsub_refresh = async_call_later(self.hass, 60, self.async_restart) - elif self._listeners: + if self.state != PullPointManagerState.STARTED: + # If the webhook became started working during the long poll, + # and we got paused, our data is stale and we should not process it. LOGGER.debug( - "Restarted ONVIF PullPoint subscription for '%s'", self.unique_id + "%s: PullPoint is paused (likely due to working webhook), skipping PullPoint messages", + self._name, ) - self.async_schedule_pull() + return True - async def async_renew(self) -> None: - """Renew subscription.""" - if not self._subscription: - return + # Parse response + if (notification_message := response.NotificationMessage) and ( + number_of_events := len(notification_message) + ): + LOGGER.debug( + "%s: continuous PullMessages: %s event(s)", + self._name, + number_of_events, + ) + await event_manager.async_parse_messages(notification_message) + event_manager.async_callback_listeners() + else: + LOGGER.debug("%s: continuous PullMessages: no events", self._name) - with suppress(*SUBSCRIPTION_ERRORS): - # The first time we renew, we may get a Fault error so we - # suppress it. The subscription will be restarted in - # async_restart later. - await self._subscription.Renew(_get_next_termination_time()) + return True - def async_schedule_pull(self) -> None: - """Schedule async_pull_messages to run.""" - self._unsub_refresh = async_call_later(self.hass, 1, self.async_pull_messages) + @callback + def _async_background_pull_messages(self, _now: dt.datetime | None = None) -> None: + """Pull messages from device in the background.""" + self._cancel_pull_messages = None + self._hass.async_create_background_task( + self._async_pull_messages(), + f"{self._name} background pull messages", + ) - async def async_pull_messages(self, _now: dt.datetime | None = None) -> None: + async def _async_pull_messages(self) -> None: """Pull messages from device.""" - if self.hass.state == CoreState.running: + event_manager = self._event_manager + + if self._pull_lock.locked(): + # Pull messages if the lock is not already locked + # any pull will do, so we don't need to wait for the lock + LOGGER.debug( + "%s: PullPoint subscription is already locked, skipping pull", + self._name, + ) + return + + async with self._pull_lock: + # Before we pop out of the lock we always need to schedule the next pull + # or call async_schedule_pullpoint_renew if the pull fails so the pull + # loop continues. try: - pullpoint = self.device.create_pullpoint_service() - response = await pullpoint.PullMessages( - {"MessageLimit": 100, "Timeout": dt.timedelta(seconds=60)} - ) + if self._hass.state == CoreState.running: + if not await self._async_pull_messages_with_lock(): + self.async_schedule_pullpoint_renew(0.0) + return + finally: + if event_manager.has_listeners: + self.async_schedule_pull_messages() + + +class WebHookManager: + """Manage ONVIF webhook subscriptions. + + If the camera supports webhooks, we will use that instead of + pullpoint subscriptions as soon as we detect that the camera + can reach our webhook. + """ + + def __init__(self, event_manager: EventManager) -> None: + """Initialize webhook manager.""" + self.state = WebHookManagerState.STOPPED + + self._event_manager = event_manager + self._device = event_manager.device + self._hass = event_manager.hass + self._webhook_unique_id = f"{DOMAIN}_{event_manager.config_entry.entry_id}" + self._name = event_manager.name + + self._webhook_url: str | None = None + + self._webhook_subscription: ONVIFService | None = None + self._notification_manager: NotificationManager | None = None + + self._cancel_webhook_renew: CALLBACK_TYPE | None = None + self._renew_lock = asyncio.Lock() + self._renew_or_restart_job = HassJob( + self._async_renew_or_restart_webhook, + f"{self._name}: renew or restart webhook", + ) - # Renew subscription if less than two hours is left - if ( - dt_util.as_utc(response.TerminationTime) - dt_util.utcnow() - ).total_seconds() < 7200: - await self.async_renew() - except RemoteProtocolError: - # Likely a shutdown event, nothing to see here - return - except (XMLParseError, *SUBSCRIPTION_ERRORS) as err: - # Device may not support subscriptions so log at debug level - # when we get an XMLParseError - LOGGER.log( - DEBUG if isinstance(err, XMLParseError) else WARNING, - ( - "Failed to fetch ONVIF PullPoint subscription messages for" - " '%s': %s" - ), - self.unique_id, - _stringify_onvif_error(err), - ) - # Treat errors as if the camera restarted. Assume that the pullpoint - # subscription is no longer valid. - self._unsub_refresh = None - await self.async_restart() - return + async def async_start(self) -> bool: + """Start polling events.""" + LOGGER.debug("%s: Starting webhook manager", self._name) + assert ( + self.state == WebHookManagerState.STOPPED + ), "Webhook manager already started" + assert self._webhook_url is None, "Webhook already registered" + self._async_register_webhook() + if not await self._async_start_webhook(): + self.state = WebHookManagerState.FAILED + return False + self.state = WebHookManagerState.STARTED + return True - # Parse response - await self.async_parse_messages(response.NotificationMessage) + async def async_stop(self) -> None: + """Unsubscribe from events.""" + self.state = WebHookManagerState.STOPPED + self._async_cancel_webhook_renew() + await self._async_unsubscribe_webhook() + self._async_unregister_webhook() - # Update entities - for update_callback in self._listeners: - update_callback() + @callback + def _async_schedule_webhook_renew(self, delay: float) -> None: + """Schedule webhook subscription renewal.""" + self._async_cancel_webhook_renew() + self._cancel_webhook_renew = async_call_later( + self._hass, + delay, + self._renew_or_restart_job, + ) - # Reschedule another pull - if self._listeners: - self.async_schedule_pull() + async def _async_create_webhook_subscription(self) -> None: + """Create webhook subscription.""" + LOGGER.debug("%s: Creating webhook subscription", self._name) + self._notification_manager = self._device.create_notification_manager( + { + "InitialTerminationTime": SUBSCRIPTION_RELATIVE_TIME, + "ConsumerReference": {"Address": self._webhook_url}, + } + ) + self._webhook_subscription = await self._notification_manager.setup() + await self._notification_manager.start() + LOGGER.debug("%s: Webhook subscription created", self._name) - # pylint: disable=protected-access - async def async_parse_messages(self, messages) -> None: - """Parse notification message.""" - for msg in messages: - # Guard against empty message - if not msg.Topic: - continue + async def _async_start_webhook(self) -> bool: + """Start webhook.""" + try: + try: + await self._async_create_webhook_subscription() + except RemoteProtocolError: + # http://datatracker.ietf.org/doc/html/rfc2616#section-8.1.4 allows the server + # to close the connection at any time, we treat this as a normal and try again + # once since we do not want to declare the camera as not supporting webhooks + # if it just happened to close the connection at the wrong time. + await self._async_create_webhook_subscription() + except CREATE_ERRORS as err: + self._event_manager.async_webhook_failed() + LOGGER.debug( + "%s: Device does not support notification service or too many subscriptions: %s", + self._name, + _stringify_onvif_error(err), + ) + return False - topic = msg.Topic._value_1 - if not (parser := PARSERS.get(topic)): - if topic not in UNHANDLED_TOPICS: - LOGGER.info( - "No registered handler for event from %s: %s", - self.unique_id, - msg, - ) - UNHANDLED_TOPICS.add(topic) - continue + self._async_schedule_webhook_renew(SUBSCRIPTION_RENEW_INTERVAL) + return True - event = await parser(self.unique_id, msg) + async def _async_restart_webhook(self) -> bool: + """Restart the webhook subscription assuming the camera rebooted.""" + await self._async_unsubscribe_webhook() + return await self._async_start_webhook() - if not event: - LOGGER.info("Unable to parse event from %s: %s", self.unique_id, msg) + async def _async_renew_webhook(self) -> bool: + """Renew webhook subscription.""" + if not self._webhook_subscription: + return False + try: + await self._webhook_subscription.Renew(SUBSCRIPTION_RELATIVE_TIME) + LOGGER.debug("%s: Renewed Webhook subscription", self._name) + return True + except RENEW_ERRORS as err: + LOGGER.debug( + "%s: Failed to renew webhook subscription %s", + self._name, + _stringify_onvif_error(err), + ) + return False + + async def _async_renew_or_restart_webhook( + self, now: dt.datetime | None = None + ) -> None: + """Renew or start webhook subscription.""" + if self._hass.is_stopping or self.state != WebHookManagerState.STARTED: + return + if self._renew_lock.locked(): + LOGGER.debug("%s: Webhook renew already in progress", self._name) + # Renew is already running, another one will be + # scheduled when the current one is done if needed. + return + async with self._renew_lock: + next_attempt = SUBSCRIPTION_RENEW_INTERVAL_ON_ERROR + try: + if ( + await self._async_renew_webhook() + or await self._async_restart_webhook() + ): + next_attempt = SUBSCRIPTION_RENEW_INTERVAL + finally: + self._async_schedule_webhook_renew(next_attempt) + + @callback + def _async_register_webhook(self) -> None: + """Register the webhook for motion events.""" + LOGGER.debug("%s: Registering webhook: %s", self._name, self._webhook_unique_id) + + try: + base_url = get_url(self._hass, prefer_external=False) + except NoURLAvailableError: + try: + base_url = get_url(self._hass, prefer_external=True) + except NoURLAvailableError: return - self._events[event.uid] = event + webhook_id = self._webhook_unique_id + webhook.async_register( + self._hass, DOMAIN, webhook_id, webhook_id, self._async_handle_webhook + ) + webhook_path = webhook.async_generate_path(webhook_id) + self._webhook_url = f"{base_url}{webhook_path}" + LOGGER.debug("%s: Registered webhook: %s", self._name, webhook_id) - def get_uid(self, uid) -> Event | None: - """Retrieve event for given id.""" - return self._events.get(uid) + @callback + def _async_unregister_webhook(self): + """Unregister the webhook for motion events.""" + LOGGER.debug( + "%s: Unregistering webhook %s", self._name, self._webhook_unique_id + ) + webhook.async_unregister(self._hass, self._webhook_unique_id) + self._webhook_url = None - def get_platform(self, platform) -> list[Event]: - """Retrieve events for given platform.""" - return [event for event in self._events.values() if event.platform == platform] + async def _async_handle_webhook( + self, hass: HomeAssistant, webhook_id: str, request: Request + ) -> None: + """Handle incoming webhook.""" + content: bytes | None = None + try: + content = await request.read() + except ConnectionResetError as ex: + LOGGER.error("Error reading webhook: %s", ex) + return + except asyncio.CancelledError as ex: + LOGGER.error("Error reading webhook: %s", ex) + raise + finally: + self._hass.async_create_background_task( + self._async_process_webhook(hass, webhook_id, content), + f"ONVIF event webhook for {self._name}", + ) + + async def _async_process_webhook( + self, hass: HomeAssistant, webhook_id: str, content: bytes | None + ) -> None: + """Process incoming webhook data in the background.""" + event_manager = self._event_manager + if content is None: + # webhook is marked as not working as something + # went wrong. We will mark it as working again + # when we receive a valid notification. + event_manager.async_webhook_failed() + return + if not self._notification_manager: + LOGGER.debug( + "%s: Received webhook before notification manager is setup", self._name + ) + return + if not (result := self._notification_manager.process(content)): + LOGGER.debug("%s: Failed to process webhook %s", self._name, webhook_id) + return + LOGGER.debug( + "%s: Processed webhook %s with %s event(s)", + self._name, + webhook_id, + len(result.NotificationMessage), + ) + event_manager.async_webhook_working() + await event_manager.async_parse_messages(result.NotificationMessage) + event_manager.async_callback_listeners() + + @callback + def _async_cancel_webhook_renew(self) -> None: + """Cancel the webhook renew task.""" + if self._cancel_webhook_renew: + self._cancel_webhook_renew() + self._cancel_webhook_renew = None + + async def _async_unsubscribe_webhook(self) -> None: + """Unsubscribe from the webhook.""" + if not self._webhook_subscription: + return + LOGGER.debug("%s: Unsubscribing from webhook", self._name) + try: + await self._webhook_subscription.Unsubscribe() + except UNSUBSCRIBE_ERRORS as err: + LOGGER.debug( + ( + "%s: Failed to unsubscribe webhook subscription;" + " This is normal if the device restarted: %s" + ), + self._name, + _stringify_onvif_error(err), + ) + self._webhook_subscription = None diff --git a/homeassistant/components/onvif/manifest.json b/homeassistant/components/onvif/manifest.json index 10b7a8451254b..41d5164452fa4 100644 --- a/homeassistant/components/onvif/manifest.json +++ b/homeassistant/components/onvif/manifest.json @@ -8,5 +8,5 @@ "documentation": "https://www.home-assistant.io/integrations/onvif", "iot_class": "local_push", "loggers": ["onvif", "wsdiscovery", "zeep"], - "requirements": ["onvif-zeep-async==1.2.11", "WSDiscovery==2.0.0"] + "requirements": ["onvif-zeep-async==1.3.0", "WSDiscovery==2.0.0"] } diff --git a/homeassistant/components/onvif/models.py b/homeassistant/components/onvif/models.py index 9f0ca2da66d87..64edc85f3d132 100644 --- a/homeassistant/components/onvif/models.py +++ b/homeassistant/components/onvif/models.py @@ -2,6 +2,7 @@ from __future__ import annotations from dataclasses import dataclass +from enum import Enum from typing import Any from homeassistant.const import EntityCategory @@ -78,3 +79,20 @@ class Event: value: Any = None entity_category: EntityCategory | None = None entity_enabled: bool = True + + +class PullPointManagerState(Enum): + """States for the pullpoint manager.""" + + STOPPED = 0 # Not running or not supported + STARTED = 1 # Running and renewing + PAUSED = 2 # Switched to webhook, but can resume + FAILED = 3 # Failed to do initial subscription + + +class WebHookManagerState(Enum): + """States for the webhook manager.""" + + STOPPED = 0 + STARTED = 1 + FAILED = 2 # Failed to do initial subscription diff --git a/homeassistant/components/onvif/sensor.py b/homeassistant/components/onvif/sensor.py index 2fb7402be2835..67da0ed979dcc 100644 --- a/homeassistant/components/onvif/sensor.py +++ b/homeassistant/components/onvif/sensor.py @@ -23,7 +23,7 @@ async def async_setup_entry( async_add_entities: AddEntitiesCallback, ) -> None: """Set up a ONVIF binary sensor.""" - device = hass.data[DOMAIN][config_entry.unique_id] + device: ONVIFDevice = hass.data[DOMAIN][config_entry.unique_id] entities = { event.uid: ONVIFSensor(event.uid, device) @@ -36,16 +36,20 @@ async def async_setup_entry( entities[entry.unique_id] = ONVIFSensor(entry.unique_id, device, entry) async_add_entities(entities.values()) + uids_by_platform = device.events.get_uids_by_platform("sensor") @callback - def async_check_entities(): + def async_check_entities() -> None: """Check if we have added an entity for the event.""" - new_entities = [] - for event in device.events.get_platform("sensor"): - if event.uid not in entities: - entities[event.uid] = ONVIFSensor(event.uid, device) - new_entities.append(entities[event.uid]) - async_add_entities(new_entities) + nonlocal uids_by_platform + if not (missing := uids_by_platform.difference(entities)): + return + new_entities: dict[str, ONVIFSensor] = { + uid: ONVIFSensor(uid, device) for uid in missing + } + if new_entities: + entities.update(new_entities) + async_add_entities(new_entities.values()) device.events.async_add_listener(async_check_entities) @@ -84,6 +88,7 @@ def __init__( @property def native_value(self) -> StateType | date | datetime | Decimal: """Return the value reported by the sensor.""" + assert self._attr_unique_id is not None if (event := self.device.events.get_uid(self._attr_unique_id)) is not None: return event.value return self._attr_native_value diff --git a/requirements_all.txt b/requirements_all.txt index 4060f09fcc4aa..5f6c6a01dbc48 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -1261,7 +1261,7 @@ ondilo==0.2.0 onkyo-eiscp==1.2.7 # homeassistant.components.onvif -onvif-zeep-async==1.2.11 +onvif-zeep-async==1.3.0 # homeassistant.components.opengarage open-garage==0.2.0 diff --git a/requirements_test_all.txt b/requirements_test_all.txt index db25fb90aaf8f..1f06d678d2d2b 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -942,7 +942,7 @@ omnilogic==0.4.5 ondilo==0.2.0 # homeassistant.components.onvif -onvif-zeep-async==1.2.11 +onvif-zeep-async==1.3.0 # homeassistant.components.opengarage open-garage==0.2.0 diff --git a/tests/components/onvif/__init__.py b/tests/components/onvif/__init__.py index ff4d88fb5b3c2..17ba41588ff37 100644 --- a/tests/components/onvif/__init__.py +++ b/tests/components/onvif/__init__.py @@ -10,8 +10,10 @@ Capabilities, DeviceInfo, Profile, + PullPointManagerState, Resolution, Video, + WebHookManagerState, ) from homeassistant.const import HTTP_DIGEST_AUTHENTICATION @@ -111,6 +113,10 @@ def setup_mock_device(mock_device): video_source_token=None, ) mock_device.profiles = [profile1] + mock_device.events = MagicMock( + webhook_manager=MagicMock(state=WebHookManagerState.STARTED), + pullpoint_manager=MagicMock(state=PullPointManagerState.PAUSED), + ) def mock_constructor(hass, config): """Fake the controller constructor.""" diff --git a/tests/components/onvif/test_diagnostics.py b/tests/components/onvif/test_diagnostics.py index 66404d60e1be7..0ef474e77908b 100644 --- a/tests/components/onvif/test_diagnostics.py +++ b/tests/components/onvif/test_diagnostics.py @@ -72,4 +72,14 @@ async def test_diagnostics( } ], }, + "events": { + "pullpoint_manager_state": { + "__type": "", + "repr": "", + }, + "webhook_manager_state": { + "__type": "", + "repr": "", + }, + }, }