diff --git a/distributed/client.py b/distributed/client.py index 006beb41921..c384828f335 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -630,7 +630,7 @@ def __init__( self.futures = dict() self.refcount = defaultdict(lambda: 0) - self.coroutines = [] + self._handle_report_task = None if name is None: name = dask.config.get("client-name", None) self.id = ( @@ -1011,8 +1011,7 @@ async def _start(self, timeout=no_default, **kwargs): for topic, handler in Client._default_event_handlers.items(): self.subscribe_topic(topic, handler) - self._handle_scheduler_coroutine = asyncio.ensure_future(self._handle_report()) - self.coroutines.append(self._handle_scheduler_coroutine) + self._handle_report_task = asyncio.create_task(self._handle_report()) return self @@ -1304,12 +1303,16 @@ async def _close(self, fast=False): self._send_to_scheduler({"op": "close-client"}) self._send_to_scheduler({"op": "close-stream"}) + current_task = asyncio.current_task() + handle_report_task = self._handle_report_task # Give the scheduler 'stream-closed' message 100ms to come through # This makes the shutdown slightly smoother and quieter - with suppress(AttributeError, asyncio.CancelledError, TimeoutError): - await asyncio.wait_for( - asyncio.shield(self._handle_scheduler_coroutine), 0.1 - ) + if ( + handle_report_task is not None + and handle_report_task is not current_task + ): + with suppress(asyncio.CancelledError, TimeoutError): + await asyncio.wait_for(asyncio.shield(handle_report_task), 0.1) if ( self.scheduler_comm @@ -1332,19 +1335,13 @@ async def _close(self, fast=False): if _get_global_client() is self: _set_global_client(None) - coroutines = set(self.coroutines) - for f in self.coroutines: - # cancel() works on asyncio futures (Tornado 5) - # but is a no-op on Tornado futures - with suppress(RuntimeError): - f.cancel() - if f.cancelled(): - coroutines.remove(f) - del self.coroutines[:] - - if not fast: + if ( + not fast + and handle_report_task is not None + and handle_report_task is not current_task + ): with suppress(TimeoutError, asyncio.CancelledError): - await asyncio.wait_for(asyncio.gather(*coroutines), 2) + await asyncio.wait_for(handle_report_task, 2) with suppress(AttributeError): await self.scheduler.close_rpc()