Skip to content

Commit

Permalink
Switch periodic cleanup task to call_later (#913)
Browse files Browse the repository at this point in the history
- Simplifies AsyncEngine to avoid the long running
  task
  • Loading branch information
bdraco authored Jul 17, 2021
1 parent b2a7a00 commit 38eb271
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 16 deletions.
19 changes: 19 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ async def test_reaper():
entries_with_cache = list(itertools.chain(*(cache.entries_with_name(name) for name in cache.names())))
await asyncio.sleep(1.2)
entries = list(itertools.chain(*(cache.entries_with_name(name) for name in cache.names())))
assert zeroconf.cache.get(record_with_1s_ttl) is None
await aiozc.async_close()
assert not zeroconf.question_history.suppresses(question, now, other_known_answers)
assert entries != original_entries
Expand All @@ -82,6 +83,24 @@ async def test_reaper():
assert record_with_1s_ttl not in entries


@pytest.mark.asyncio
async def test_reaper_aborts_when_done():
"""Ensure cache cleanup stops when zeroconf is done."""
with patch.object(_core, "_CACHE_CLEANUP_INTERVAL", 10):
assert _core._CACHE_CLEANUP_INTERVAL == 10
aiozc = AsyncZeroconf(interfaces=['127.0.0.1'])
zeroconf = aiozc.zeroconf
record_with_10s_ttl = r.DNSAddress('a', const._TYPE_SOA, const._CLASS_IN, 10, b'a')
record_with_1s_ttl = r.DNSAddress('a', const._TYPE_SOA, const._CLASS_IN, 1, b'b')
zeroconf.cache.async_add_records([record_with_10s_ttl, record_with_1s_ttl])
assert zeroconf.cache.get(record_with_10s_ttl) is not None
assert zeroconf.cache.get(record_with_1s_ttl) is not None
await aiozc.async_close()
await asyncio.sleep(1.2)
assert zeroconf.cache.get(record_with_10s_ttl) is not None
assert zeroconf.cache.get(record_with_1s_ttl) is not None


class Framework(unittest.TestCase):
def test_launch_and_close(self):
rv = r.Zeroconf(interfaces=r.InterfaceChoice.All)
Expand Down
33 changes: 17 additions & 16 deletions zeroconf/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def __init__(
self.senders: List[asyncio.DatagramTransport] = []
self._listen_socket = listen_socket
self._respond_sockets = respond_sockets
self._cache_cleanup_task: Optional[asyncio.Task] = None
self._cleanup_timer: Optional[asyncio.TimerHandle] = None
self._running_event: Optional[asyncio.Event] = None

def setup(self, loop: asyncio.AbstractEventLoop, loop_thread_ready: Optional[threading.Event]) -> None:
Expand All @@ -110,8 +110,10 @@ def setup(self, loop: asyncio.AbstractEventLoop, loop_thread_ready: Optional[thr
async def _async_setup(self, loop_thread_ready: Optional[threading.Event]) -> None:
"""Set up the instance."""
assert self.loop is not None
self._cleanup_timer = self.loop.call_later(
millis_to_seconds(_CACHE_CLEANUP_INTERVAL), self._async_cache_cleanup
)
await self._async_create_endpoints()
self._cache_cleanup_task = self.loop.create_task(self._async_cache_cleanup())
assert self._running_event is not None
self._running_event.set()
if loop_thread_ready:
Expand Down Expand Up @@ -142,26 +144,25 @@ async def _async_create_endpoints(self) -> None:
if s in sender_sockets:
self.senders.append(cast(asyncio.DatagramTransport, transport))

async def _async_cache_cleanup(self) -> None:
def _async_cache_cleanup(self) -> None:
"""Periodic cache cleanup."""
while not self.zc.done:
now = current_time_millis()
self.zc.question_history.async_expire(now)
self.zc.record_manager.async_updates(
now, [RecordUpdate(record, None) for record in self.zc.cache.async_expire(now)]
)
self.zc.record_manager.async_updates_complete()
await asyncio.sleep(millis_to_seconds(_CACHE_CLEANUP_INTERVAL))
now = current_time_millis()
self.zc.question_history.async_expire(now)
self.zc.record_manager.async_updates(
now, [RecordUpdate(record, None) for record in self.zc.cache.async_expire(now)]
)
self.zc.record_manager.async_updates_complete()
assert self.loop is not None
self._cleanup_timer = self.loop.call_later(
millis_to_seconds(_CACHE_CLEANUP_INTERVAL), self._async_cache_cleanup
)

async def _async_close(self) -> None:
"""Cancel and wait for the cleanup task to finish."""
self._async_shutdown()
if self._cache_cleanup_task:
self._cache_cleanup_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._cache_cleanup_task
self._cache_cleanup_task = None
await asyncio.sleep(0) # flush out any call soons
assert self._cleanup_timer is not None
self._cleanup_timer.cancel()

def _async_shutdown(self) -> None:
"""Shutdown transports and sockets."""
Expand Down

0 comments on commit 38eb271

Please sign in to comment.