Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue up non-rootish tasks if they break priority ordering #7526

Closed
wants to merge 7 commits into from

Conversation

fjetter
Copy link
Member

@fjetter fjetter commented Feb 8, 2023

First of all, I hate this but it's a rather pragmatic solution that should only have any real effect if something bad happens

Closes #7496

Queuing does not support any worker restrictions and can therefore break priority task ordering by assigning non-queued, lower prio tasks to workers before it gets a chance to de-queue the queued up tasks.

This is more or less a relict of how queuing is enabled since it prefers scheduling non-queued tasks first. Basically we're calling stimulus_queue_slots_maybe_opened after transitions which causes the non-queued tasks to be transitioned first before checking on queued tasks.

From this perspective, what I'm proposing here is a bit of an ugly work around but it's pretty straight forward. So far, I only verified it on the actual P2P problem presented in #7496 and it works as expected. Will need to look into a test now

Copy link
Collaborator

@gjoseph92 gjoseph92 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's possible here that tasks that are queued but have restrictions could block up the queue.

Say 1 task slot opens up on worker A, so in stimulus_queue_slots_maybe_opened we peek the first task on the queue and transition it to processing. That task is restricted to only run on worker B, so we don't schedule it. But next in the queue is a task without restrictions. We should have run that immediately, but instead, worker A's thread will just remain unused. The restricted task remains at the front of the queue.

The next time a task slot opens (say on worker C), we'll peek 2 tasks, so the second, unrestricted task on the queue does get scheduled. But the first, restricted task stays at the front of the queue.

In general, we'll only schedule task_slots_available - num_unrunnable_queued_tasks each time, when we should be scheduling task_slots_available tasks. The restricted tasks will slowly pile up at the front of the queue.

We'd have to switch from a peekn here to a linear search through the queue, like I mentioned in #7496 (comment). In that case, a HeapSet is probably the wrong data structure and we'd want a SortedSet instead. But also, when there are a lot of unrunnable tasks on the queue (which I think there would be in the shuffling case), an operation that's currently O(1) in the typical case would get more expensive. It could be worth implementing this and trying it out—maybe in reality the performance cost isn't so bad—but theoretically it does seem slower.

distributed/scheduler.py Outdated Show resolved Hide resolved
distributed/scheduler.py Show resolved Hide resolved
self.queued.discard(ts)
worker_msgs = self._add_to_processing(ts, ws)
# If no worker, task just stays `queued`
if self.is_rootish(ts):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if/else gives me flashbacks to #7280 / #7221 / #7262. Since we still have that inconsistency where is_rootish can change while the task is in the queue, I'd just want to think through the other possible reasons why ts might not be rootish here.

@fjetter
Copy link
Member Author

fjetter commented Feb 8, 2023

The next time a task slot opens (say on worker C), we'll peek 2 tasks, so the second, unrestricted task on the queue does get scheduled. But the first, restricted task stays at the front of the queue.

In general, we'll only schedule task_slots_available - num_unrunnable_queued_tasks each time, when we should be scheduling task_slots_available tasks. The restricted tasks will slowly pile up at the front of the queue.

This change does not enable queuing for restricted tasks directly but only for lower priority tasks, i.e. the peek still works.

I think this argument is still valid but I suspect you'd need to write a pretty custom graph to actually trigger something like this.

@github-actions
Copy link
Contributor

github-actions bot commented Feb 8, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

       24 files  ±  0         24 suites  ±0   10h 31m 58s ⏱️ + 5m 58s
  3 344 tests +  7    3 231 ✔️  -   2     105 💤 +1    8 +  8 
39 426 runs  +84  37 519 ✔️ +48  1 873 💤 +2  34 +34 

For more details on these failures, see this check.

Results for commit 7f6e80c. ± Comparison against base commit 63ae1db.

♻️ This comment has been updated with latest results.

@gjoseph92
Copy link
Collaborator

This change does not enable queuing for restricted tasks directly but only for lower priority tasks, i.e. the peek still works.

Yes, the change does guarantee that there are some tasks ahead of the restricted task when it's inserted. But at some point, all those will be completed and it'll end up at the front of the queue. At that point, the restricted task will take up space at the front of the queue for a while until the right worker opens.

Unless I'm missing some mechanism that would pull the restricted tasks out from the middle of the queue and schedule them?

I imagine this works okay in the context of P2P. I'm more worried about other cases where people are already using restrictions. For example, if you have some huge 30min tasks that use resource restrictions to only run on a couple workers, once those workers are full, other tasks with those restrictions could clog up the front of the queue for 30min until a viable slot opens again.

The problem I'm describing is almost a scheduler corollary of #6136, which is kind of interesting.

@fjetter
Copy link
Member Author

fjetter commented Feb 9, 2023

There is no explicit mechanism from prohibiting the queue to block up. However, if a task was popped that can not run, it should be moved back to waiting/constrained and would not block an actual slot. whenever the next task finishes, we'd then schedule two queued tasks instead of one. Considering how unlikely this is, I'm not worried about it, yet.

Before I'll engage on any further theoretical back and forth I'll check what kind of tests are failing

@fjetter
Copy link
Member Author

fjetter commented Feb 9, 2023

For some reason our CI runs froze during test_threadsafe_get. What's interesting is that even pytest-timeout could not kill the tests but we actually ran into GH actions timeout. I wouldn't be terribly surprised if any modifications to queuing would deadlock the scheduler but this is very strange

@gjoseph92 gjoseph92 mentioned this pull request Feb 9, 2023
2 tasks
@fjetter
Copy link
Member Author

fjetter commented Feb 13, 2023

Interestingly, I found a case where queuing breaks depth first even without any restrictions. Currently investigating

Copy link
Member Author

@fjetter fjetter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In one of our test cases (test_dataframe_set_index_sync) I encountered that tasks were queued up because they would break priority ordering but the queued up tasks didn't have any restrictions. I haven't entirely understood what's happening here, yet.

What's even more interesting is that the behavior is very easy to trigger with cient.compute but not with dask.compute

Code to reproduce (This is the inner workings of set_index to calc quantiles from the test case. I haven't tried reproducing this with a simpler example

import dask
from dask.utils import M
from distributed import Client

with Client() as client:
    df = dask.datasets.timeseries(
        start="2000",
        end="2001",
        dtypes={"value": float, "name": str, "id": int},
        seed=1,
    )
    partition_col = df["name"]
    divisions = partition_col._repartition_quantiles(df.npartitions)
    mins = partition_col.map_partitions(M.min)
    maxes = partition_col.map_partitions(M.max)

    # client.gather(client.compute([mins, maxes, divisions]))
    dask.compute([mins, maxes, divisions])

Rendering a smaller version of this graph shows one anomaly (left hand side, number 28)

quantiles

I can see how this node is identified as being of lower priority than other root tasks ( which in this specific case is not true but possibly for larger graphs) although I would argue this is wrong since its dependency has to be held in memory way longer. I'm not sure if dask.order considers fan-out dependents to be smaller/more important than their dependencies.

Indeed we can see that with these modifications, the quantile computation has a higher memory footprint because we'd push back 28 into the queue instead of running it eagerly once possible.

I have no idea why I don't see this problem with dask.compute

Comment on lines +2700 to 2702
if self._is_rootish_no_restrictions(ts):
assert not self.idle_task_count, (ts, self.idle_task_count)
self._validate_ready(ts)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In an earlier version I asserted that if a task is not rootish, it would have restrictions. This caused CI to fail very hard.

It turns out that running non-rootish tasks with lower priority than queued tasks is not impossible

@fjetter
Copy link
Member Author

fjetter commented Feb 15, 2023

I have a couple of very interesting results from benchmarking then comparing this branch to #7531 i.e. the benchmarking results include exclusively the changes corresponding to queuing up tasks that would otherwise execute too early given their priority

https://github.com/coiled/coiled-runtime/actions/runs/4172251035

There are a couple of minor/mediocre speedups in wall time for some tests but that's not very exciting. However, the memory footprint goes through the roof!

These are rather early results but this lets me think that a major contributing factor to the success of queuing is in fact that we are breaking priority ordering in a very specific way (This is not the only contributor).

Wall Clock

image

Avg memory

image

Peak memory

image

@gjoseph92
Copy link
Collaborator

a major contributing factor to the success of queuing is in fact that we are breaking priority ordering in a very specific way

This is interesting. I some research on queuing and priority ordering a couple months ago, when trying to add co-assignment.

This has motivated me to finally post that writeup (in unedited, too-long form). @fjetter you might find the videos and discussion in the middle interesting: #7555.

@fjetter
Copy link
Member Author

fjetter commented Feb 17, 2023

This has motivated me to finally post that writeup (in unedited, too-long form). @fjetter you might find the videos and discussion in the middle interesting: #7555.

Thanks. I'll have a look as soon as I find the time.


I'm closing this PR. Not following priorities strictly is apparently an unintended(?) perk of task queuing so this PR obviously goes in the wrong direction. However, I believe this motivates a couple of interesting experiments

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

P2P shuffling and queuing combined may cause high memory usage with dask.dataframe.merge
2 participants