From 97f9d60fd3889248069507c52f7deb51ddfab4f4 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 18 Jun 2021 21:27:00 -0600 Subject: [PATCH 01/11] Actor: don't hold key references on workers Fixes #4936 I don't think this is quite the right implementation. 1) Why does the `worker=` kwarg exist? It doesn't seem to be used. But it should be. Taking the `if worker` codepath would bypass this whole issue. 2) What if a user is using an Actor within a task? In that case, `get_worker` would return a Worker, but we _would_ want to hold a reference to the Actor key (as long as that task was running). I think a better implementation might be to include in `__reduce__` whether the Actor handle should be a weakref. And in `Worker.get_data`, construct it such that it is a weakref. --- distributed/actor.py | 8 ++--- distributed/tests/test_actor.py | 57 ++++++++++++++++++++++++++++++++- distributed/worker.py | 2 +- 3 files changed, 61 insertions(+), 6 deletions(-) diff --git a/distributed/actor.py b/distributed/actor.py index 77b2cda67de..231cc8b3a2d 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -3,11 +3,11 @@ import threading from queue import Queue -from .client import Future, default_client +from .client import Future from .protocol import to_serialize from .utils import iscoroutinefunction, sync, thread_state from .utils_comm import WrappedKey -from .worker import get_worker +from .worker import get_client, get_worker class Actor(WrappedKey): @@ -63,8 +63,8 @@ def __init__(self, cls, address, key, worker=None): except ValueError: self._worker = None try: - self._client = default_client() - self._future = Future(key) + self._client = get_client() + self._future = Future(key, inform=self._worker is None) except ValueError: self._client = None diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index 851ee7e8b2a..87c126bf2f9 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py 
@@ -515,7 +515,7 @@ def check(dask_worker): start = time() while any(client.run(check).values()): sleep(0.01) - assert time() < start + 30 + assert time() < start + 10 @gen_cluster( @@ -566,6 +566,61 @@ async def wait(self): await c.gather(futures) +@gen_cluster(client=True, client_kwargs=dict(set_as_default=False)) +# ^ NOTE: without `set_as_default=False`, `get_client()` within worker would return +# the same client instance the test is using (because it's all one process). +# Even with this, both workers will share the same client instance. +async def test_worker_actor_handle_is_weakref(c, s, a, b): + counter = c.submit(Counter, actor=True, workers=[a.address]) + + await c.submit(lambda _: None, counter, workers=[b.address]) + + del counter + + start = time() + while a.actors or b.data: + await asyncio.sleep(0.1) + assert time() < start + 10 + + +def test_worker_actor_handle_is_weakref_sync(client): + workers = list(client.run(lambda: None)) + counter = client.submit(Counter, actor=True, workers=[workers[0]]) + + client.submit(lambda _: None, counter, workers=[workers[1]]).result() + + del counter + + def check(dask_worker): + return len(dask_worker.data) + len(dask_worker.actors) + + start = time() + while any(client.run(check).values()): + sleep(0.01) + assert time() < start + 10 + + +def test_worker_actor_handle_is_weakref_from_compute_sync(client): + workers = list(client.run(lambda: None)) + + with dask.annotate(workers=workers[0]): + counter = dask.delayed(Counter)() + with dask.annotate(workers=workers[1]): + intermediate = dask.delayed(lambda c: None)(counter) + with dask.annotate(workers=workers[0]): + final = dask.delayed(lambda x, c: x)(intermediate, counter) + + final.compute(actors=counter, optimize_graph=False) + + def worker_tasks_running(dask_worker): + return len(dask_worker.data) + len(dask_worker.actors) + + start = time() + while any(client.run(worker_tasks_running).values()): + sleep(0.01) + assert time() < start + 10 + + def 
test_one_thread_deadlock(): with cluster(nworkers=2) as (cl, w): client = Client(cl["address"]) diff --git a/distributed/worker.py b/distributed/worker.py index bd1bf6f0d75..44e05f05024 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1417,7 +1417,7 @@ async def get_data( if k in self.actors: from .actor import Actor - data[k] = Actor(type(self.actors[k]), self.address, k) + data[k] = Actor(type(self.actors[k]), self.address, k, worker=self) msg = {"status": "OK", "data": {k: to_serialize(v) for k, v in data.items()}} nbytes = {k: self.tasks[k].nbytes for k in data if k in self.tasks} From c3539c23bebb4bc57996b5dc9f07c4dab8a05226 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 21 Jun 2021 12:05:34 -0600 Subject: [PATCH 02/11] use gen_cluster timeout --- distributed/tests/test_actor.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index 87c126bf2f9..0fa31b45dfb 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -466,7 +466,7 @@ def f(block, ps=None): @pytest.mark.flaky(reruns=10, reruns_delay=5) -@gen_cluster(client=True) +@gen_cluster(client=True, timeout=10) async def test_compute(c, s, a, b): @dask.delayed def f(n, counter): @@ -485,10 +485,8 @@ def check(counter, blanks): result = await c.compute(final, actors=counter) assert result == 0 + 1 + 2 + 3 + 4 - start = time() while a.data or b.data: await asyncio.sleep(0.01) - assert time() < start + 30 def test_compute_sync(client): From e786f39cc2654d61bfbeaa52f3e2fff9b02151c1 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 21 Jun 2021 18:41:34 -0600 Subject: [PATCH 03/11] Debug flaky test_robust_to_bad_sizeof_estimates --- distributed/tests/test_worker.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index ba4de5199d2..1dad7ebbbd5 100644 --- a/distributed/tests/test_worker.py +++ 
b/distributed/tests/test_worker.py @@ -1111,6 +1111,7 @@ async def test_robust_to_bad_sizeof_estimates(c, s, a): np = pytest.importorskip("numpy") memory = psutil.Process().memory_info().rss a.memory_limit = memory / 0.7 + 400e6 + print("memory limit:", format_bytes(a.memory_limit)) class BadAccounting: def __init__(self, data): @@ -1128,6 +1129,12 @@ def f(n): start = time() while not a.data.disk: + print( + "RSS:", + format_bytes(psutil.Process().memory_info().rss), + "disk:", + list(a.data.disk), + ) await asyncio.sleep(0.1) assert time() < start + 5 From 7d780cdc3c1cb2bbf2b56c80c56ec19f607f39dc Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 22 Jun 2021 11:18:01 -0600 Subject: [PATCH 04/11] Remove worker= kwarg Make client/worker mode mutually exclusive --- distributed/actor.py | 20 +++++++++----------- distributed/worker.py | 2 +- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/distributed/actor.py b/distributed/actor.py index 231cc8b3a2d..79e96f5b28c 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -49,24 +49,22 @@ class Actor(WrappedKey): 2 """ - def __init__(self, cls, address, key, worker=None): + def __init__(self, cls, address, key): self._cls = cls self._address = address self.key = key self._future = None - if worker: - self._worker = worker - self._client = None - else: - try: - self._worker = get_worker() - except ValueError: - self._worker = None + self._client = None + self._worker = None + + try: + self._worker = get_worker() + except ValueError: try: self._client = get_client() - self._future = Future(key, inform=self._worker is None) + self._future = Future(key) except ValueError: - self._client = None + pass def __repr__(self): return "<Actor: %s, key=%s>" % (self._cls.__name__, self.key) diff --git a/distributed/worker.py b/distributed/worker.py index 44e05f05024..bd1bf6f0d75 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1417,7 +1417,7 @@ async def get_data( if k in self.actors: from .actor import Actor - 
data[k] = Actor(type(self.actors[k]), self.address, k, worker=self) + data[k] = Actor(type(self.actors[k]), self.address, k) msg = {"status": "OK", "data": {k: to_serialize(v) for k, v in data.items()}} nbytes = {k: self.tasks[k].nbytes for k in data if k in self.tasks} From 0edc1a55011c0697b4e3aedbc6c6b6b797ead345 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 22 Jun 2021 12:44:17 -0600 Subject: [PATCH 05/11] Roll back to first commit 97f9d60fd --- distributed/actor.py | 21 ++++++++++++--------- distributed/tests/test_actor.py | 4 +++- distributed/worker.py | 2 +- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/distributed/actor.py b/distributed/actor.py index 79e96f5b28c..82e78afd716 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -49,22 +49,25 @@ class Actor(WrappedKey): 2 """ - def __init__(self, cls, address, key): + def __init__(self, cls, address, key, worker=None): self._cls = cls self._address = address self.key = key self._future = None - self._client = None - self._worker = None - - try: - self._worker = get_worker() - except ValueError: + if worker: + self._worker = worker + self._client = None + else: + try: + self._worker = get_worker() + except ValueError: + self._worker = None try: self._client = get_client() - self._future = Future(key) + self._future = Future(key, inform=self._worker is None) + # ^ When running on a worker, only hold a weak reference to the key, otherwise the key could become unreleasable. 
except ValueError: - pass + self._client = None def __repr__(self): return "<Actor: %s, key=%s>" % (self._cls.__name__, self.key) diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index 0fa31b45dfb..87c126bf2f9 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -466,7 +466,7 @@ def f(block, ps=None): @pytest.mark.flaky(reruns=10, reruns_delay=5) -@gen_cluster(client=True, timeout=10) +@gen_cluster(client=True) async def test_compute(c, s, a, b): @dask.delayed def f(n, counter): @@ -485,8 +485,10 @@ def check(counter, blanks): result = await c.compute(final, actors=counter) assert result == 0 + 1 + 2 + 3 + 4 + start = time() while a.data or b.data: await asyncio.sleep(0.01) + assert time() < start + 30 def test_compute_sync(client): diff --git a/distributed/worker.py b/distributed/worker.py index bd1bf6f0d75..44e05f05024 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1417,7 +1417,7 @@ async def get_data( if k in self.actors: from .actor import Actor - data[k] = Actor(type(self.actors[k]), self.address, k) + data[k] = Actor(type(self.actors[k]), self.address, k, worker=self) msg = {"status": "OK", "data": {k: to_serialize(v) for k, v in data.items()}} nbytes = {k: self.tasks[k].nbytes for k in data if k in self.tasks} From 904f0f6cf2e3a5373d901566b40de88d3058075e Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 22 Jun 2021 12:44:33 -0600 Subject: [PATCH 06/11] Revert "Debug flaky test_robust_to_bad_sizeof_estimates" This reverts commit e786f39cc2654d61bfbeaa52f3e2fff9b02151c1. 
--- distributed/tests/test_worker.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 1dad7ebbbd5..ba4de5199d2 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1111,7 +1111,6 @@ async def test_robust_to_bad_sizeof_estimates(c, s, a): np = pytest.importorskip("numpy") memory = psutil.Process().memory_info().rss a.memory_limit = memory / 0.7 + 400e6 - print("memory limit:", format_bytes(a.memory_limit)) class BadAccounting: def __init__(self, data): @@ -1129,12 +1128,6 @@ def f(n): start = time() while not a.data.disk: - print( - "RSS:", - format_bytes(psutil.Process().memory_info().rss), - "disk:", - list(a.data.disk), - ) await asyncio.sleep(0.1) assert time() < start + 5 From aa87db8a4a47119d159a81b8db5e9dbd91ae7379 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 22 Jun 2021 12:52:43 -0600 Subject: [PATCH 07/11] driveby: fix Actor._sync for async workers I'm assuming that TODO comment had a typo and meant to say "support async operation" --- distributed/actor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/actor.py b/distributed/actor.py index 82e78afd716..5bdfbb2fb69 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -110,7 +110,8 @@ def _sync(self, func, *args, **kwargs): if self._client: return self._client.sync(func, *args, **kwargs) else: - # TODO support sync operation by checking against thread ident of loop + if self._asynchronous: + return func(*args, **kwargs) return sync(self._worker.loop, func, *args, **kwargs) def __dir__(self): From 417629417ca18c07a96dabcbdf9bea9b80b010c5 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 22 Jun 2021 12:53:03 -0600 Subject: [PATCH 08/11] Note that `get_worker` may behave badly in tests --- distributed/actor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/actor.py b/distributed/actor.py index 5bdfbb2fb69..7b9c80bb22a 100644 
--- a/distributed/actor.py +++ b/distributed/actor.py @@ -59,6 +59,7 @@ def __init__(self, cls, address, key, worker=None): self._client = None else: try: + # TODO: `get_worker` may return the wrong worker instance for async local clusters (most tests) self._worker = get_worker() except ValueError: self._worker = None From 4dac6424c2b3e88b7958e24762a9ebd9eec9d28e Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 22 Jun 2021 14:54:46 -0600 Subject: [PATCH 09/11] rerun tests From cb8ca761cb9f0269d41b61c758566bbd8a6a0c79 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 22 Jun 2021 22:17:27 -0600 Subject: [PATCH 10/11] Set timeouts back to 30 just in case --- distributed/tests/test_actor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index 87c126bf2f9..ebd02b95df3 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -515,7 +515,7 @@ def check(dask_worker): start = time() while any(client.run(check).values()): sleep(0.01) - assert time() < start + 10 + assert time() < start + 30 @gen_cluster( @@ -580,7 +580,7 @@ async def test_worker_actor_handle_is_weakref(c, s, a, b): start = time() while a.actors or b.data: await asyncio.sleep(0.1) - assert time() < start + 10 + assert time() < start + 30 def test_worker_actor_handle_is_weakref_sync(client): @@ -597,7 +597,7 @@ def check(dask_worker): start = time() while any(client.run(check).values()): sleep(0.01) - assert time() < start + 10 + assert time() < start + 30 def test_worker_actor_handle_is_weakref_from_compute_sync(client): @@ -618,7 +618,7 @@ def worker_tasks_running(dask_worker): start = time() while any(client.run(worker_tasks_running).values()): sleep(0.01) - assert time() < start + 10 + assert time() < start + 30 def test_one_thread_deadlock(): From e2cee3f1999c49f44a524d1653811013fb9f88a2 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 19 Jul 2021 17:51:29 -0800 Subject: 
[PATCH 11/11] clarify comment --- distributed/actor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/actor.py b/distributed/actor.py index b490f011a8c..19828281dc3 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -60,6 +60,7 @@ def __init__(self, cls, address, key, worker=None): else: try: # TODO: `get_worker` may return the wrong worker instance for async local clusters (most tests) + # when run outside of a task (when deserializing a key pointing to an Actor, etc.) self._worker = get_worker() except ValueError: self._worker = None