get_worker can return the wrong worker with an async local cluster #4959

Open
gjoseph92 opened this issue Jun 23, 2021 · 2 comments
Comments

@gjoseph92 (Collaborator)

Split out from #4937 (comment).

What happened:

When using a local cluster in async mode, get_worker always returns the same Worker instance, no matter which worker it's being called within.

What you expected to happen:

get_worker to return the worker it's actually being called within.

Minimal Complete Verifiable Example:

from distributed import get_worker
from distributed.utils_test import gen_cluster


@gen_cluster(client=True)
async def test_get_worker_async_local(c, s, *workers):
    assert len(workers) > 1

    def whoami():
        return get_worker().address

    results = await c.run(whoami)
    assert len(set(results.values())) == len(workers), results
E       AssertionError: {'tcp://127.0.0.1:59774': 'tcp://127.0.0.1:59776', 'tcp://127.0.0.1:59776': 'tcp://127.0.0.1:59776'}
E       assert 1 == 2
E         +1
E         -2

Anything else we need to know?:

This probably almost never affects users directly. But since most tests use an async local cluster via @gen_cluster, I'm concerned about what edge-case behavior we might be testing incorrectly.

Also, note that the same issue affects get_client. This feels slightly less bad (at least it's always the right client, unlike get_worker), but it can still have some strange effects. In multiple places, worker code updates the default Client instance on the assumption that it's running in a separate process. With multiple workers trampling the default client, I wonder whether this affects tests around advanced secede/client-within-task workloads.

I feel like the proper solution here would be to set a contextvar for the current worker that's updated as we context-switch in and out of that worker. Identifying the points where those switches have to happen seems tricky though.
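
For concreteness, a minimal sketch of what that could look like (hypothetical names; not distributed's actual API):

import contextvars

# Hypothetical contextvar holding the worker whose code is currently running.
_current_worker = contextvars.ContextVar("current_worker")

def get_worker():
    try:
        return _current_worker.get()
    except LookupError:
        raise ValueError("No worker found in the current context")

# Each worker would set and reset the var at every point where control
# switches into its code (task execution, deserialization, comm handlers):
#
#     token = _current_worker.set(self)
#     try:
#         ...  # run in this worker's context
#     finally:
#         _current_worker.reset(token)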

I also think it would be reasonable for get_worker to error if len(Worker._instances) > 1.
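
A sketch of what that guard could look like on the fallback path (the function name here is made up for illustration):

from distributed.core import Status
from distributed.worker import Worker

def _fallback_worker():
    # Only fall back to the global instance list when it's unambiguous.
    running = [w for w in Worker._instances if w.status == Status.running]
    if not running:
        raise ValueError("No workers found")
    if len(running) > 1:
        raise ValueError(
            "Multiple workers are running in this process; "
            "get_worker() is ambiguous outside a task"
        )
    return running[0]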

Environment:

  • Dask version: ac35e0f
  • Python version: 3.9.5
  • Operating System: macOS
  • Install method (conda, pip, source): source
@jrbourbeau (Member)

Thanks for the detailed description and example test @gjoseph92!

As mentioned offline, when get_worker is called from inside a task, thread_state.execution_state["worker"] should point to the worker that is running the task:

try:
    return thread_state.execution_state["worker"]
except AttributeError:
    try:
        return first(w for w in Worker._instances if w.status == Status.running)
    except StopIteration:
        raise ValueError("No workers found")

In the case that get_worker is called outside of a task, what should the "correct" worker be?

@gjoseph92 (Collaborator, Author)

Just adding a note here since I didn't see it written down: setting thread_state.execution_state["worker"] while deserializing (and serializing) on a worker would probably alleviate most of the problems we see with this issue. It typically comes up with stateful things that interact with worker machinery (Actors, ShuffleService, etc.) and define a custom __setstate__ that tries to store the current get_worker() in an instance variable when unpickled.
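
To illustrate the failure mode, a sketch of that pattern (this class is invented for the example, not from the codebase):

from distributed import get_worker

class WorkerBound:
    """Illustrative only: the kind of stateful object described above."""

    def __getstate__(self):
        state = self.__dict__.copy()
        state.pop("worker", None)  # the worker reference isn't picklable
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        # On unpickle, capture the worker this object landed on. Without
        # thread_state.execution_state set during deserialization, the
        # Worker._instances fallback can return a different worker.
        self.worker = get_worker()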

gjoseph92 added a commit to gjoseph92/distributed that referenced this issue Oct 27, 2021
Fixes dask#4959

`get_client` was calling the private `Worker._get_client` method when it ran within a task. `_get_client` should really have been called `_make_client`, since it created a new client every time. The simplest correct thing to do instead would have been to use the `Worker.client` property, which caches this instance.

In order to pass the `timeout` parameter through though, I changed `Worker.get_client` to actually match its docstring and always return the same instance.
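
In rough outline, the caching shape described above (a simplified sketch, not the real Worker; the actual method also threads through the timeout handling mentioned in the commit message):

class Worker:
    def __init__(self):
        self._client = None

    def get_client(self, timeout=None):
        # Create the client once and reuse it on every call, rather than
        # constructing a new one per call as the old _get_client did.
        if self._client is None:
            self._client = self._make_client(timeout=timeout)
        return self._client

    def _make_client(self, timeout=None):
        ...  # stands in for the old client-construction logic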
gjoseph92 added a commit to gjoseph92/distributed that referenced this issue Oct 27, 2021
This is probably a good idea in general (xref dask#4959), but it particularly helps with performance deserializing Futures, which have a fastpath through `Client.current` that bypasses a number of unnecessarily slow things that `get_client` does before it checks `Client.current`.
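
A sketch of the kind of fastpath being described (hedged; the real deserialization code differs, and the helper name is made up):

from distributed import Client, get_client

def current_or_get_client():
    # Prefer the client already registered as current, which is cheap to
    # look up; only fall back to the slower get_client() machinery.
    try:
        return Client.current()
    except ValueError:
        return get_client()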