Skip to content

Commit

Permalink
Add support for webhooks/notifications (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Apr 17, 2023
1 parent 282bce7 commit 626b4c9
Showing 1 changed file with 74 additions and 6 deletions.
80 changes: 74 additions & 6 deletions onvif/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@
from typing import Any, Dict, Optional, Tuple

import httpx
from httpx import AsyncClient, BasicAuth, DigestAuth
from httpx import AsyncClient, BasicAuth, DigestAuth, TransportError
from zeep.cache import SqliteCache
from zeep.client import AsyncClient as BaseZeepAsyncClient, Settings
from zeep.exceptions import Fault
from zeep.exceptions import Fault, XMLSyntaxError
import zeep.helpers
from zeep.loader import parse_xml
from zeep.proxy import AsyncServiceProxy
from zeep.transports import AsyncTransport, Transport
from zeep.wsa import WsAddressingPlugin
from zeep.wsdl import Document
from zeep.wsdl.bindings.soap import SoapOperation
from zeep.wsse.username import UsernameToken

from onvif.definition import SERVICES
Expand Down Expand Up @@ -206,9 +208,10 @@ def __init__(
)
)
settings = _DEFAULT_SETTINGS
document = _cached_document(url)
self.document = _cached_document(url)
self.binding_name = binding_name
self.zeep_client_authless = ZeepAsyncClient(
wsdl=document,
wsdl=self.document,
transport=self.transport,
settings=settings,
plugins=[WsAddressingPlugin()],
Expand All @@ -217,7 +220,7 @@ def __init__(
binding_name, self.xaddr
)
self.zeep_client = ZeepAsyncClient(
wsdl=document,
wsdl=self.document,
wsse=wsse,
transport=self.transport,
settings=settings,
Expand Down Expand Up @@ -287,9 +290,68 @@ def call(params=None):
return service_wrapper(getattr(self.ws_client, name))


class NotificationManager:
"""Manager to process notifications."""

def __init__(self, device: "ONVIFCamera", config: Dict[str, Any]) -> None:
"""Initialize the notification processor."""
self._service: Optional[ONVIFService] = None
self._operation: Optional[SoapOperation] = None
self._device = device
self._config = config

async def setup(self) -> ONVIFService:
"""Setup the notification processor."""
notify_service = self._device.create_notification_service()
notify_subscribe = await notify_service.Subscribe(self._config)
# pylint: disable=protected-access
self._device.xaddrs[
"http://www.onvif.org/ver10/events/wsdl/NotificationConsumer"
] = notify_subscribe.SubscriptionReference.Address._value_1
# Create subscription manager
# 5.2.3 BASIC NOTIFICATION INTERFACE - NOTIFY
# Call SetSynchronizationPoint to generate a notification message
# to ensure the webhooks are working.
#
# If this fails this is OK as it just means we will switch
# to webhook later when the first notification is received.
service = self._device.create_onvif_service(
"pullpoint", port_type="NotificationConsumer"
)
self._operation = service.document.bindings[service.binding_name].get(
"PullMessages"
)
self._service = service
return self._device.create_subscription_service("NotificationConsumer")

async def start(self) -> None:
"""Start the notification processor."""
assert self._service, "Call setup first"
try:
await self._service.SetSynchronizationPoint()
except (Fault, asyncio.TimeoutError, TransportError, TypeError):
logger.debug("%s: SetSynchronizationPoint failed", self._service.url)

def process(self, content: bytes) -> Optional[Any]:
"""Process a notification message."""
if not self._operation:
logger.debug("%s: Notifications not setup", self._device.host)
return
try:
envelope = parse_xml(
content, # type: ignore[arg-type]
_ASYNC_TRANSPORT,
settings=_DEFAULT_SETTINGS,
)
except XMLSyntaxError as exc:
logger.error("Received invalid XML: %s", exc)
return None
return self._operation.process_reply(envelope)


class ONVIFCamera:
"""
Python Implemention ONVIF compliant device
Python Implementation ONVIF compliant device
This class integrates onvif services
adjust_time parameter allows authentication on cameras without being time synchronized.
Expand Down Expand Up @@ -392,6 +454,12 @@ async def create_pullpoint_subscription(
return False
return True

def create_notification_manager(
self, config: Optional[Dict[str, Any]] = None
) -> NotificationManager:
"""Create a notification manager."""
return NotificationManager(self, config)

async def close(self):
"""Close all transports."""
await self._snapshot_client.aclose()
Expand Down

0 comments on commit 626b4c9

Please sign in to comment.