Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Future deserialization without available client #7580

Merged
merged 11 commits into from
Mar 21, 2023

Conversation

fjetter
Copy link
Member

@fjetter fjetter commented Feb 23, 2023

This is similar to

This grew a bit more complicated because I stumbled over #7498 again and had a deeper look and tried to preserve the "pass futures in collections" feature but it is fundamentally flawed. While I could bypass most of the "accidental client creations" in this PR (which is good), the fundamental flaw about it being possible to release a future before it is being deserialized is still there and hard to avoid without a more fundamental approach.

Two notable changes

  • The get_worker function now actually does what it is claiming. It returns the worker of the currently running task. Most notably this will raise if being executed outside of the threadpool. This is to a certain degree inconvenient since deserialization does typically not happen inside of our thread but I consider this important considering that our entire test suite is running asynchronously.
  • Futures, etc. are actually only initializing a client lazily if a client (or worker) isn't immediately available. This avoids problems about "when, where and how" do we deserialize a task, e.g. we can deserialize a Future or Queue object on the event loop but since we're only interacting with it in the threadpool, the initialization will be deferred. To a certain degree this is also nicer for diagnostics but that's not the motivating change for this.

This is again a supporting PR for #7564

thread_state.execution_state = execution_state
thread_state.key = key
thread_state.actor = True

result = function(*args, **kwargs)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

State was actually leaking all over the place making get_worker in our test suite extremely unreliable.

Comment on lines -2722 to -2710
try:
return first(w for w in Worker._instances if w.status in WORKER_ANY_RUNNING)
except StopIteration:
raise ValueError("No workers found")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get the appeal of this but this can throw off some of our test logic and tests suggest that certain functionality would work which in reality doesn't.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i.e. due to our async tests, we are likely to get a worker just because in the same process an instance already exists. However, in reality there is no valid worker in the context, i.e. if the test example would be executed on a "real" cluster, it would fail

Comment on lines +59 to +61
@gen_cluster(client=True)
async def test_timeout_wake_waiter(c, s, a, b):
Copy link
Member Author

@fjetter fjetter Feb 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change to get_worker causes these things to break. On main, this test should actually not allowed to pass since there is no worker and no client in the context of the multi lock. However, due to the "pick first worker in the process" logic, it would pick a random worker and create implicitly a worker client. The Worker doesn't even have to be alive for this as long as it isn't GCed, yet, it would work
In most cases this is not a big deal but implicit, surprising client creations can cause things like #7498

@github-actions
Copy link
Contributor

github-actions bot commented Feb 23, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

       26 files  ±  0         26 suites  ±0   11h 44m 40s ⏱️ - 41m 0s
  3 534 tests +  2    3 426 ✔️  -   1     104 💤 +  1  4 +2 
44 682 runs  +26  42 589 ✔️ +10  2 089 💤 +14  4 +2 

For more details on these failures, see this check.

Results for commit f7786ee. ± Comparison against base commit 89d5ad4.

This pull request removes 1 and adds 3 tests. Note that renamed tests count towards both.
distributed.tests.test_worker ‑ test_get_worker_name
distributed.protocol.tests.test_protocol ‑ test_deeply_nested_structures
distributed.tests.test_client ‑ test_serialize_future_without_client
distributed.tests.test_utils_test ‑ test_ensure_no_new_clients
This pull request skips 2 and un-skips 1 tests.
distributed.tests.test_worker ‑ test_get_client_coroutine_sync
distributed.tests.test_worker_client ‑ test_submit_different_names
distributed.shuffle.tests.test_shuffle ‑ test_minimal_version

♻️ This comment has been updated with latest results.

@fjetter
Copy link
Member Author

fjetter commented Feb 24, 2023

Interesting. I seem to hit a related recursion error in test_profile_server... 🤯

@fjetter fjetter force-pushed the future_serialization branch from b19041a to 7780ad7 Compare March 8, 2023 12:11
@fjetter
Copy link
Member Author

fjetter commented Mar 8, 2023

OK, so I'm not entirely sure if the profiling thing is indeed related but apparently with this change, we're more likely to generate a very deeply nested profile message. The profile collection works out nicely but as soon as one tries to submit this profile message, we're hitting a recursion error during serialization.

We can't truly protect ourselves from this during serialization since the serialization code does not know what is OK to cut out and what isn't. The only way I see we can deal with this is to be more conservative with the cutoff in profiling. Local testing found ~250 stacks to be the magical limit to get test_profile_server pass.

Comment on lines -159 to -160
if depth <= 0:
return None
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this check caused us to not collect any information in these cases. I think it's still valuable to get a snapshot through even if it's not the lowest frame. Moving the depth check further below achieves this (see test_profile.py)

@fjetter fjetter force-pushed the future_serialization branch from 68f48cf to f7786ee Compare March 15, 2023 12:32
@fjetter fjetter requested a review from jacobtomlinson as a code owner March 15, 2023 12:32
@hendrikmakait hendrikmakait self-requested a review March 15, 2023 13:40
Copy link
Member

@hendrikmakait hendrikmakait left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code generally looks good to me. I have a few minor nits, and a few edge cases are untested. I suggest marking them with pragma: nocover if we don't want to test these.

return first(w for w in Worker._instances if w.status in WORKER_ANY_RUNNING)
except StopIteration:
raise ValueError("No workers found")
raise ValueError("No workers found") from None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
raise ValueError("No workers found") from None
raise ValueError("No worker found") from None

Comment on lines 1095 to 1099
with pytest.raises(AssertionError):
with ensure_no_new_clients():
async with Client(s.address, asynchronous=True):
with ensure_no_new_clients():
pass
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand the intention of this correct, I'd split it up into two distinct checks

Suggested change
with pytest.raises(AssertionError):
with ensure_no_new_clients():
async with Client(s.address, asynchronous=True):
with ensure_no_new_clients():
pass
with pytest.raises(AssertionError):
with ensure_no_new_clients():
async with Client(s.address, asynchronous=True):
pass
async with Client(s.address, asynchronous=True):
with ensure_no_new_clients():
pass

Comment on lines 77 to 80
# @pytest.mark.skipif(
# pa is not None,
# reason="We don't have a CI job that is installing a very old pyarrow version",
# )
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# @pytest.mark.skipif(
# pa is not None,
# reason="We don't have a CI job that is installing a very old pyarrow version",
# )

Copy link
Member

@jrbourbeau jrbourbeau Mar 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear to me why this change was made to this test. My impression is that what's currently on main is still want we want. I've reverted back to what's on main (confirmed the test passes locally when pyarrow isn't installed). Happy to submit a follow-up PR if needed

@@ -86,8 +86,9 @@ async def test_minimal_version(c, s, a, b):
dtypes={"x": float, "y": float},
freq="10 s",
)
with pytest.raises(RuntimeError, match="requires pyarrow"):
await c.compute(dd.shuffle.shuffle(df, "x", shuffle="p2p"))
# with pytest.raises(RuntimeError, match="requires pyarrow"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# with pytest.raises(RuntimeError, match="requires pyarrow"):

@jrbourbeau
Copy link
Member

Noting test failures are unrelated to the changes in this PR (xref #7688)

@jrbourbeau jrbourbeau merged commit 1b34a5b into dask:main Mar 21, 2023
@fjetter fjetter deleted the future_serialization branch March 21, 2023 10:44
pentschev added a commit to pentschev/dask-cuda that referenced this pull request Mar 21, 2023
In dask/distributed#7580 `get_worker` was
modified to return the worker of a task, thus it cannot be used by
`client.run`, and we must now use `dask_worker` as the first argument to
`client.run` to obtain the worker.
rapids-bot bot pushed a commit to rapidsai/raft that referenced this pull request Mar 22, 2023
In dask/distributed#7580 get_worker was modified to return the worker of
a task, thus it cannot be used by client.run, and we must now use
dask_worker as the first argument to client.run to obtain the worker.
rapids-bot bot pushed a commit to rapidsai/dask-cuda that referenced this pull request Mar 22, 2023
In dask/distributed#7580 `get_worker` was modified to return the worker of a task, thus it cannot be used by `client.run`, and we must now use `dask_worker` as the first argument to `client.run` to obtain the worker.

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)
  - Lawrence Mitchell (https://github.com/wence-)

URL: #1141
rapids-bot bot pushed a commit to rapidsai/raft that referenced this pull request Mar 22, 2023
In dask/distributed#7580 get_worker was modified to return the worker of a task, thus it cannot be used by client.run, and we must now use dask_worker as the first argument to client.run to obtain the worker.

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Corey J. Nolet (https://github.com/cjnolet)
  - AJ Schmidt (https://github.com/ajschmidt8)

URL: #1365
@martindurant
Copy link
Member

Since this PR, the following code fails surprisingly:

>>> client = distributed.Client()
>>> client.run(distributed.worker.get_worker)
ValueError: No worker found

This got noticed because it was causing fsspec's dask implementation to hang in tests.

@martindurant
Copy link
Member

I did this to workaround, since my filesystem only needs to know whether this is a process housing a worker, not if it happens to be running in a task.

@jrbourbeau
Copy link
Member

Thanks @martindurant, this looks related to #7696. There's a good conversation in that issue, but #7696 (comment) is the specific comment that relates to the snippet you posted

@martindurant
Copy link
Member

Thanks for the pointer. From fsspec's point of view, the question is "am I on a worker machine", independent of serialisation or task state, so I'll use the simple approach.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants