All tasks without dependencies are root-ish #7221

Open · gjoseph92 wants to merge 25 commits into base: main

Conversation

@gjoseph92 (Collaborator) commented Oct 28, 2022

If a task doesn't have dependencies, then it's obviously a root task. However, the current logic only considers it one if its TaskGroup is also 2x larger than the cluster (more tasks in the group than twice the total thread count).

That allowed for the awkward case where small groups of root tasks would go down a different code path. It's not clear how much this code path was even used or needed, xref #6974.

See some additional discussion in #7204 (comment).

Closes #7274.
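
For reference, a rough sketch of the shape of the is_rootish heuristic being changed (simplified and approximate, not the exact scheduler code):

def is_rootish(self, ts) -> bool:
    # Tasks with restrictions never count as root-ish (unchanged).
    if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions:
        return False
    # New in this PR: a task with no dependencies is trivially a root task,
    # regardless of the size of its TaskGroup.
    if not ts.dependencies:
        return True
    # Previous behavior: only large, mostly-independent TaskGroups qualified.
    tg = ts.group
    return (
        len(tg) > self.total_nthreads * 2   # group is 2x larger than the cluster
        and len(tg.dependencies) < 5
        and sum(map(len, tg.dependencies)) < 5
    )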

  • Tests added / passed
  • Passes pre-commit run --all-files

This isn't a very good test, since it's weirdly stateful and makes assumptions about which worker is selected in an empty cluster.
@github-actions bot (Contributor) commented Oct 28, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files ±0   15 suites ±0   8h 6m 39s ⏱️ +1h 37m 50s
3 170 tests +2:    3 035 passed ✔️ −48    84 skipped 💤 +1    51 failed +49
23 456 runs +16:   22 220 passed ✔️ −316   904 skipped 💤 +4   332 failed +328

For more details on these failures, see this check.

Results for commit 17f1ba4. ± Comparison against base commit 02b9430.

♻️ This comment has been updated with latest results.

@mrocklin (Member) commented:

> That allowed for the awkward case where small groups of root tasks would go down a different code path. It's not clear how much this code path was even used or needed, xref #6974.

I don't have a ton to add to this conversation, but some examples of important cases that look like this include:

  1. da.from_zarr where there is a single zarr array dataset in the graph
  2. client.submit (possibly in a for loop)

Would it make sense to avoid this logic in these cases?

@gjoseph92 (Collaborator, Author) commented:

First, let's clarify that this is_rootish check was originally written in #4967 just to decide which tasks should get co-assigned, versus which should follow "normal" scheduling logic. (Then, within the "normal" scheduling logic, there was another branch for the no-deps case we're talking about removing in #6974.)

We've now co-opted this "should this task be co-assigned" logic and are also using it to decide "should this task be queued?" We do this because "is this a root task" happens to be a good answer to both of those questions. But when thinking about possible impacts, it could be helpful to ask "what's the effect of co-assigning these tasks" separately from "what's the effect of queuing these tasks".

  1. from_zarr
    • co-assigning: doesn't matter. There's only one task anyway.
    • queuing: makes behavior more consistent. If there are other tasks already on the cluster, it makes more sense that this should wait in the queue until space opens up (though it has no effect on actual performance, since opening a zarr dataset doesn't take much memory).
  2. single client.submit
    • co-assigning: doesn't matter. There's only one task. However, you lose round-robin. (Though test_quiet_cluster_round_robin still passes on this branch even though we're bypassing the round-robin code path, which I can't explain).
    • queuing: makes behavior more consistent. These tasks now have to wait in the queue like everything else instead of skipping in line.
  3. client.submit in a for loop
    • co-assign: well, co-assignment has always been strange for this case. Before, we'd submit the first nthreads*2 tasks randomly/round-robin, then flip to co-assignment mode and submit them in batches to a worker. Now, we'll always submit in batches, but initially the batch size will be 1 until the TaskGroup grows large enough. So worker selection/load distribution will be very slightly different, but I think not in a way that matters or is noticeable.
    • queuing: makes behavior more consistent. All tasks are queued instead of the first nthreads*2 tasks getting a fast-pass.
  4. client.submit in a for loop with follow-up tasks
    • co-assign: same as above, no meaningful effect
    • queuing: could reduce overproduction (assuming tasks are slower than task submission). Right now you'll get root task overproduction with submit in a for loop, because the first nthreads*2 tasks are sent to workers immediately, before queuing flips on. If you client.submit all your root tasks, then client.submit downstream tasks before any root tasks have finished, overproduction is prevented with this change (the pattern is sketched just after this list).
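
For concreteness, the submission pattern in cases 3 and 4 looks roughly like this (a sketch only; load and process are placeholder functions, and the local Client is purely for illustration):

from distributed import Client

def load(i):            # placeholder root task: no dependencies
    return i

def process(x):         # placeholder follow-up task
    return x + 1

if __name__ == "__main__":
    client = Client()   # hypothetical local cluster

    roots = []
    for i in range(100):                     # case 3: client.submit in a for loop
        roots.append(client.submit(load, i))

    # case 4: follow-up tasks submitted before any root task has finished
    downstream = [client.submit(process, r) for r in roots]
    print(client.gather(downstream)[:5])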

tl;dr: I can't think of a meaningful way these cases would get worse, or a reason they should avoid the queuing/co-assignment logic.

Overall, I don't think this change should affect actual use much. Nearly all of my motivation for doing it is to reduce the number of branches to worry about, make things more consistent, simplify testing, and increase confidence that our tests are actually testing the queuing code path. For instance, because most tests just client.submit a couple tasks, a ton of tests were actually not using the queuing code path even with worker-saturation: 1.0. (This change is how I found #7223).

@mrocklin (Member) commented Oct 31, 2022 via email

@gjoseph92 (Collaborator, Author) commented:

The distributed/tests/test_scheduler.py::test_balance_many_workers failure (assert {0, 1, 2} == {0, 1}) could be real; looking into it: https://github.com/dask/distributed/actions/runs/3364213017/jobs/5578308124#step:18:1315

This makes me wonder if the result is very meaningful though:

2022-10-31 20:08:16,899 - distributed.utils_perf - WARNING - full garbage collections took 24% CPU time recently (threshold: 10%)
2022-10-31 20:08:17,803 - distributed.utils_perf - WARNING - full garbage collections took 24% CPU time recently (threshold: 10%)
2022-10-31 20:08:40,283 - distributed.core - INFO - Event loop was unresponsive in Scheduler for 24.96s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

We were ending up with `last_worker_tasks_left` initially as 0, which immediately decremented to -1. Then it was a truthy value, so we reused the worker for the entire task group!
seemed to be relying on round-robin, since it only ever looked at one worker
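
A minimal sketch of the counter bug described above (only last_worker_tasks_left comes from the comment; the print calls just stand in for the two branches):

last_worker_tasks_left = 0       # initial value on an empty cluster
last_worker_tasks_left -= 1      # immediately decremented to -1

if last_worker_tasks_left:       # -1 is truthy in Python...
    print("reuse the previous worker")   # ...so the same worker was reused for the whole task group
else:
    print("pick a new worker")
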
@gjoseph92 (Collaborator, Author) commented:

All green besides:

@gjoseph92 gjoseph92 changed the title [DNM] All tasks without dependencies are root-ish All tasks without dependencies are root-ish Nov 1, 2022
@crusaderky crusaderky self-requested a review November 2, 2022 15:34
distributed/scheduler.py (review comment thread, outdated, resolved)
@crusaderky (Collaborator) left a comment:

I think there's a test for an impossible condition that should be replaced with an assertion (read above). Everything else looks good.

@gjoseph92 (Collaborator, Author) commented:

> there's a test for an impossible condition

@crusaderky see #7221 (comment)

@gjoseph92 (Collaborator, Author) commented:

Again only failure is

@gjoseph92 gjoseph92 mentioned this pull request Nov 3, 2022
2 tasks
@gjoseph92 (Collaborator, Author) commented:

Another reason to merge this:

The current code path (when the taskgroup is smaller than the cluster) can have bad behavior for worker-saturation > 1.0. Because of the round-up, when using a value like 1.1, workers will be in the idle set even when all their threads are in use. In a non-homogeneous cluster, this can lead to picking a worker that's completely full even when there are workers with open threads. That's the original thing that happened in #7197.
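
A rough illustration of the round-up problem, assuming the per-worker task-slot limit is computed as ceil(nthreads * worker-saturation) (simplified; not the literal scheduler code):

import math

worker_saturation = 1.1
nthreads = 2
processing = 2                                    # every thread is already busy

limit = math.ceil(nthreads * worker_saturation)   # ceil(2.2) == 3
still_idle = processing < limit                   # True: the worker stays in the idle set
print(limit, still_idle)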

@gjoseph92 gjoseph92 mentioned this pull request Nov 3, 2022
2 tasks
@fjetter (Member) left a comment:

There is a lot of test refactoring going on. I would appreciate these changes being in a separate PR, to distinguish functional changes from refactoring much more clearly. This is a non-trivial change in behavior, and mixing it up with generic code refactoring makes it difficult to see what's really going on.


From what I can see you are introducing one new test and are skipping one.

You are already discussing four different edge cases (and I'm sure there are more). Can you please write them as tests? This way we know for a fact that the code behaves this way, instead of relying on a lengthy argument.

Comment on lines -216 to -227
-    with client.get_executor(retries=5, pure=False) as e:
+    with client.get_executor(retries=6, pure=False) as e:
         future = e.submit(varying(args))
         assert future.result() == 42

-    with client.get_executor(retries=4) as e:
+    with client.get_executor(retries=1) as e:
         future = e.submit(varying(args))
         result = future.result()
         assert result == 42

     with client.get_executor(retries=2) as e:
         future = e.submit(varying(args))
         with pytest.raises(ZeroDivisionError, match="two"):
Member:

Any reason to change these retries?

@gjoseph92 (Collaborator, Author):

This test is extremely sensitive to how workers are selected on an idle cluster. It uses this stateful varying utility, which changes its behavior depending on how many times it's been called on that worker.

It had to be changed when the idle-round-robin behavior was added: https://github.com/dask/distributed/pull/4638/files#diff-59af67191283f0c64a3be8ce1f344f49b9d025f8264b77fba5c8250865bde433

So I've had to change it again since it's being removed.
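
Roughly, varying behaves like the sketch below: a per-key call counter decides which item to return or raise next, so the outcome depends on how many times the callable has already run in that worker process (a simplification, not the actual distributed.utils_test implementation):

from collections import defaultdict
from itertools import count

_call_counts = defaultdict(int)   # lives in whichever process runs the callable, i.e. the worker
_keys = count()

def varying(items):
    key = next(_keys)

    def func():
        i = _call_counts[key]
        _call_counts[key] = i + 1
        x = items[i]
        if isinstance(x, Exception):
            raise x               # earlier calls fail...
        return x                  # ...later calls succeed

    return func

f = varying([ZeroDivisionError("one"), ZeroDivisionError("two"), 42])
# The first two calls raise, the third returns 42 -- so which worker runs each attempt,
# and how many attempts that worker has already seen, changes the result.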

Comment on lines -92 to -94
-        pytest.param(False, id="queue on worker"),
-        pytest.param(True, id="queue on scheduler"),
-    ],
Member:

Why remove the ids?

@gjoseph92 (Collaborator, Author):

They weren't accurate anymore. I could call them pause and clog or something.

@@ -2849,6 +2860,10 @@ async def test_get_worker_monitor_info(s, a, b):
     assert res[w.address]["last_time"] is not None


+@pytest.mark.skipif(
Member:

I think we need to preserve this kind of behavior. I don't care if it's actually round robin or not but if I submit three tasks and there are two workers, both workers should have work.

What's the plan to fix this?

@gjoseph92 (Collaborator, Author):

> if I submit three tasks and there are two workers, both workers should have work

That's not what this is testing. What you're describing above already will happen. This PR in fact improves that case. See #7197: if you set worker-saturation to 1.1 right now, test_wait_first_completed will fail because 2 tasks get assigned to the 1-threaded worker, and one task is assigned to the 2-threaded worker, because they're using the old code path.

This test is testing that if you submit a task to a completely empty cluster, wait for it to complete, release it, then submit another task, that you'll get different workers each time. That's a different and much more niche case.
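
The niche behavior in question looks roughly like this (a hypothetical sketch, not the actual test in the repo; gen_cluster and inc come from distributed.utils_test):

import asyncio
from distributed.utils_test import gen_cluster, inc

@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2)
async def test_alternates_workers_when_idle(c, s, a, b):
    workers_used = set()
    for i in range(4):
        fut = c.submit(inc, i)
        await fut
        workers_used.update(s.tasks[fut.key].who_has)   # worker(s) that ran it
        del fut                        # release it before submitting the next task
        while s.tasks:                 # wait until the scheduler forgets the key
            await asyncio.sleep(0.01)
    # Old idle round-robin: consecutive single tasks land on different workers.
    # This PR removes that code path, so this is the assertion under debate.
    assert len(workers_used) > 1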

Approaches for this:

Member:

Please delete the test

@gjoseph92 (Collaborator, Author) commented:

> You are already discussing four different edge cases (and I'm sure there are more). Can you please write them as tests? This way we know for a fact that the code behaves this way, instead of relying on a lengthy argument.

I'm not following what you'd want to assert about the cases mentioned here in #7221 (comment) (I'm assuming those are the four cases you're talking about).

There are already a number of tests that cover aspects of this behavior, for instance:

@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 4)
async def test_balanced_with_submit(c, s, *workers):
    L = [c.submit(slowinc, i) for i in range(4)]
    await wait(L)
    for w in workers:
        assert len(w.data) == 1


@gen_cluster(client=True, nthreads=[("127.0.0.1", 20)] * 2)
async def test_scheduler_saturates_cores(c, s, a, b):
    for delay in [0, 0.01, 0.1]:
        futures = c.map(slowinc, range(100), delay=delay)
        futures = c.map(slowinc, futures, delay=delay / 10)
        while not s.tasks:
            if s.tasks:
                assert all(
                    len(p) >= 20
                    for w in s.workers.values()
                    for p in w.processing.values()
                )
            await asyncio.sleep(0.01)


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 30)
async def test_balance_many_workers(c, s, *workers):
    futures = c.map(slowinc, range(20), delay=0.2)
    await wait(futures)
    assert {len(w.has_what) for w in s.workers.values()} == {0, 1}


# FIXME test is very timing-based; if some threads are consistently slower than others,
# they'll receive fewer tasks from the queue (a good thing).
@pytest.mark.skipif(
    MACOS and math.isfinite(dask.config.get("distributed.scheduler.worker-saturation")),
    reason="flaky on macOS with queuing active",
)
@nodebug
@gen_cluster(
    client=True,
    nthreads=[("127.0.0.1", 1)] * 30,
    config={"distributed.scheduler.work-stealing": False},
)
async def test_balance_many_workers_2(c, s, *workers):
    futures = c.map(slowinc, range(90), delay=0.2)
    await wait(futures)
    assert {len(w.has_what) for w in s.workers.values()} == {3}

I could see adding tests for:

  1. client.submit after the cluster is already saturated doesn't get to cut in line before tasks that were submitted earlier (would fail today)
  2. client.submit in a for loop with follow-up tasks doesn't have overproduction with queuing on. This would just be test_graph_execution_width using futures instead of delayed.

@fjetter (Member) commented Nov 3, 2022

I'm cool with merging iff:

  1. Find out where we hit the "dead code". If it's actually dead code, we need to remove it; if it is not, we should have a test that shows the difference in behavior.
  2. Have a guesstimate on how bad performance is.
  3. CI is green-ish.

@gjoseph92 (Collaborator, Author) commented Nov 4, 2022

I removed the dead code in c901823 and added an assertion. Turns out there was a place it was hit. I'm addressing that in #7259, which will need to be merged first.

CI will be highly red until that's merged into here, so I won't be doing further work here until then.

@gjoseph92 (Collaborator, Author) commented:

> Have a guesstimate on how bad performance is

Ran a benchmark on a larger cluster (not with this PR, but still relevant): #7246 (comment)

tl;dr performance impact is hardly measurable.

Alternative to dask#7259. I'm quite torn about which is cleaner. I'm leaning towards this because I think it's even weirder to call `decide_worker_rootish_queuing_disabled` on a root-ish task when queuing is enabled than to call `decide_worker_non_rootish` on a root-ish task.

This also feels more consistent with the philosophy of "stick with the original decision". And if root-ish were a static property, this is what would happen.
This reverts commit e9be596.
@gjoseph92 (Collaborator, Author) commented:

I'm wondering if a better way to solve the consistency issue is to just cache root-ish-ness so we don't have to worry about it changing: #7262

@@ -2014,6 +2014,8 @@ def transition_no_worker_processing(self, key, stimulus_id):
             assert ts in self.unrunnable

         if ws := self.decide_worker_non_rootish(ts):
+            # ^ NOTE: `ts` may actually be root-ish now, but it wasn't when it went
+            # into `no-worker`. `TaskGroup` or cluster size could have changed.
Collaborator:

As mentioned in https://github.com/dask/distributed/pull/7259/files#r1015290381, this should be impossible.


Successfully merging this pull request may close these issues: Tasks which are obviously root tasks not considered rootish
4 participants