Skip to content

Commit

Permalink
Throw EventLoopBlocked instead of concurrent.futures.TimeoutError (#1032
Browse files Browse the repository at this point in the history
)
  • Loading branch information
bdraco authored Nov 18, 2021
1 parent 4241c76 commit 21bd107
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 24 deletions.
12 changes: 12 additions & 0 deletions tests/utils/test_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import pytest

from zeroconf import EventLoopBlocked
from zeroconf._core import _CLOSE_TIMEOUT
from zeroconf._utils import asyncio as aioutils
from zeroconf.const import _LOADED_SYSTEM_TIMEOUT
Expand Down Expand Up @@ -112,3 +113,14 @@ def test_cumulative_timeouts_less_than_close_plus_buffer():
assert (
aioutils._TASK_AWAIT_TIMEOUT + aioutils._GET_ALL_TASKS_TIMEOUT + aioutils._WAIT_FOR_LOOP_TASKS_TIMEOUT
) < 1 + _CLOSE_TIMEOUT + _LOADED_SYSTEM_TIMEOUT


async def test_run_coro_with_timeout() -> None:
"""Test running a coroutine with a timeout raises EventLoopBlocked."""
loop = asyncio.get_event_loop()

def _run_in_loop():
aioutils.run_coro_with_timeout(asyncio.sleep(0.3), loop, 0.1)

with pytest.raises(EventLoopBlocked), patch.object(aioutils, "_LOADED_SYSTEM_TIMEOUT", 0.0):
await loop.run_in_executor(None, _run_in_loop)
23 changes: 15 additions & 8 deletions zeroconf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import sys

from ._cache import DNSCache # noqa # import needed for backwards compat
from ._core import Zeroconf # noqa # import needed for backwards compat
from ._core import Zeroconf
from ._dns import ( # noqa # import needed for backwards compat
DNSAddress,
DNSEntry,
Expand All @@ -36,16 +36,17 @@
DNSText,
DNSQuestionType,
)
from ._logger import QuietLogger, log # noqa # import needed for backwards compat
from ._exceptions import ( # noqa # import needed for backwards compat
from ._exceptions import (
AbstractMethodException,
BadTypeInNameException,
Error,
EventLoopBlocked,
IncomingDecodeError,
NamePartTooLongException,
NonUniqueNameException,
ServiceNameAlreadyRegistered,
)
from ._logger import QuietLogger, log # noqa # import needed for backwards compat
from ._protocol.incoming import DNSIncoming # noqa # import needed for backwards compat
from ._protocol.outgoing import DNSOutgoing # noqa # import needed for backwards compat
from ._services import ( # noqa # import needed for backwards compat
Expand All @@ -54,9 +55,7 @@
ServiceListener,
ServiceStateChange,
)
from ._services.browser import ( # noqa # import needed for backwards compat
ServiceBrowser,
)
from ._services.browser import ServiceBrowser
from ._services.info import ( # noqa # import needed for backwards compat
instance_name_from_service_info,
ServiceInfo,
Expand Down Expand Up @@ -85,16 +84,24 @@

__all__ = [
"__version__",
"DNSQuestionType",
"Zeroconf",
"ServiceInfo",
"ServiceBrowser",
"ServiceListener",
"Error",
"DNSQuestionType",
"InterfaceChoice",
"ServiceStateChange",
"IPVersion",
"ZeroconfServiceTypes",
# Exceptions
"Error",
"AbstractMethodException",
"BadTypeInNameException",
"EventLoopBlocked",
"IncomingDecodeError",
"NamePartTooLongException",
"NonUniqueNameException",
"ServiceNameAlreadyRegistered",
]

if sys.version_info <= (3, 6): # pragma: no cover
Expand Down
35 changes: 30 additions & 5 deletions zeroconf/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,12 @@ def _async_shutdown(self) -> None:
transport.close()

def close(self) -> None:
"""Close from sync context."""
"""Close from sync context.
While it is not expected during normal operation,
this function may raise EventLoopBlocked if the underlying
call to `_async_close` cannot be completed.
"""
assert self.loop is not None
# Guard against Zeroconf.close() being called from the eventloop
if get_running_loop() == self.loop:
Expand Down Expand Up @@ -554,7 +559,12 @@ def register_service(
service. The name of the service may be changed if needed to make
it unique on the network. Additionally multiple cooperating responders
can register the same service on the network for resilience
(if you want this behavior set `cooperating_responders` to `True`)."""
(if you want this behavior set `cooperating_responders` to `True`).
While it is not expected during normal operation,
this function may raise EventLoopBlocked if the underlying
call to `register_service` cannot be completed.
"""
assert self.loop is not None
run_coro_with_timeout(
await_awaitable(
Expand Down Expand Up @@ -591,7 +601,12 @@ async def async_register_service(
def update_service(self, info: ServiceInfo) -> None:
"""Registers service information to the network with a default TTL.
Zeroconf will then respond to requests for information for that
service."""
service.
While it is not expected during normal operation,
this function may raise EventLoopBlocked if the underlying
call to `async_update_service` cannot be completed.
"""
assert self.loop is not None
run_coro_with_timeout(
await_awaitable(self.async_update_service(info)), self.loop, _REGISTER_TIME * _REGISTER_BROADCASTS
Expand Down Expand Up @@ -662,7 +677,12 @@ def _add_broadcast_answer( # pylint: disable=no-self-use
out.add_answer_at_time(dns_address, 0)

def unregister_service(self, info: ServiceInfo) -> None:
"""Unregister a service."""
"""Unregister a service.
While it is not expected during normal operation,
this function may raise EventLoopBlocked if the underlying
call to `async_unregister_service` cannot be completed.
"""
assert self.loop is not None
run_coro_with_timeout(
self.async_unregister_service(info), self.loop, _UNREGISTER_TIME * _REGISTER_BROADCASTS
Expand Down Expand Up @@ -708,7 +728,12 @@ async def async_unregister_all_services(self) -> None:
self.async_send(out)

def unregister_all_services(self) -> None:
"""Unregister all registered services."""
"""Unregister all registered services.
While it is not expected during normal operation,
this function may raise EventLoopBlocked if the underlying
call to `async_unregister_all_services` cannot be completed.
"""
assert self.loop is not None
run_coro_with_timeout(
self.async_unregister_all_services(), self.loop, _UNREGISTER_TIME * _REGISTER_BROADCASTS
Expand Down
24 changes: 17 additions & 7 deletions zeroconf/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,38 @@


class Error(Exception):
pass
"""Base class for all zeroconf exceptions."""


class IncomingDecodeError(Error):
pass
"""Exception when there is invalid data in an incoming packet."""


class NonUniqueNameException(Error):
pass
"""Exception when the name is already registered."""


class NamePartTooLongException(Error):
pass
"""Exception when the name is too long."""


class AbstractMethodException(Error):
pass
"""Exception when a required method is not implemented."""


class BadTypeInNameException(Error):
pass
"""Exception when the type in a name is invalid."""


class ServiceNameAlreadyRegistered(Error):
pass
"""Exception when a service name is already registered."""


class EventLoopBlocked(Error):
"""Exception when the event loop is blocked.
This exception is never expected to be thrown
during normal operation. It should only happen
when the cpu is maxed out or there is something blocking
the event loop.
"""
4 changes: 4 additions & 0 deletions zeroconf/_services/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,10 @@ def request(
) -> bool:
"""Returns true if the service could be discovered on the
network, and updates this object with details discovered.
While it is not expected during normal operation,
this function may raise EventLoopBlocked if the underlying
call to `async_request` cannot be completed.
"""
assert zc.loop is not None and zc.loop.is_running()
if zc.loop == get_running_loop():
Expand Down
22 changes: 18 additions & 4 deletions zeroconf/_utils/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
"""

import asyncio
import concurrent.futures
import contextlib
import queue
from typing import Any, Awaitable, Coroutine, List, Optional, Set, cast

from .time import millis_to_seconds
from .._exceptions import EventLoopBlocked
from ..const import _LOADED_SYSTEM_TIMEOUT

# The combined timeouts should be lower than _CLOSE_TIMEOUT + _WAIT_FOR_LOOP_TASKS_TIMEOUT
Expand Down Expand Up @@ -91,10 +93,22 @@ async def await_awaitable(aw: Awaitable) -> None:


def run_coro_with_timeout(aw: Coroutine, loop: asyncio.AbstractEventLoop, timeout: float) -> Any:
"""Run a coroutine with a timeout."""
return asyncio.run_coroutine_threadsafe(aw, loop).result(
millis_to_seconds(timeout) + _LOADED_SYSTEM_TIMEOUT
)
"""Run a coroutine with a timeout.
The timeout should only be used as a safeguard to prevent
the program from blocking forever. The timeout should
never be expected to be reached during normal operation.
While not expected during normal operations, the
function raises `EventLoopBlocked` if the coroutine takes
longer to complete than the timeout.
"""
try:
return asyncio.run_coroutine_threadsafe(aw, loop).result(
millis_to_seconds(timeout) + _LOADED_SYSTEM_TIMEOUT
)
except concurrent.futures.TimeoutError as ex:
raise EventLoopBlocked from ex


def shutdown_loop(loop: asyncio.AbstractEventLoop) -> None:
Expand Down

0 comments on commit 21bd107

Please sign in to comment.