Speculatively assign tasks to workers #3974
Comments
Some first steps:
I'm wondering what problem this is solving exactly, because I see a few potential gains/pitfalls here:

A) Ensure there is no network traffic of intermediate results between the execution of e.g. A1 and B1

If A) is the goal, I'm wondering whether this approach is actually worth its complexity. After all, if our task2worker decision is accurate, we should not pay any network cost since B1 should be assigned to the same worker, shouldn't it?

If B) is the goal, I'm wondering what the state machine on the scheduler actually looks like. Can the scheduler still distinguish between ...

And actually, if one of the indirect goals is to remove the necessity of fusing, I would argue that one of the convenient things about fusing is that it also helps to reduce the overall number of tasks, since the scheduler becomes pretty busy once we reach graphs of significant size (>1M tasks), and this approach would not help with that. I'm wondering here whether the cost of optimization outweighs the cost of task overhead.
The main objective is to avoid task fusion on the client side. This allows us to transmit high level graphs directly to the scheduler and avoid creating low-level graphs on the client entirely. I expect that this will significantly outweigh the costs of added tasks on the scheduler. Also, we're working to make the scheduler itself faster. Currently we process around 5000 tasks per second. I suspect/hope that we can improve on this significantly. It's a bit easier to optimize just the scheduler rather than both the scheduler and the client.
This problem exists today. The scheduler assigns all tasks the "processing" state after they have been assigned to a worker, but they may still be waiting in a queue, or waiting on data to transfer. The scheduler does not know when a task has actually started running. This does come up in work stealing, and there is a complex handshake to handle it. I believe that the current behavior is that the scheduler suggests to a worker "Hey Alice, maybe you should go steal task X from worker Bob". Alice then tries to do that, and Bob either agrees or says "nope, I'm working on X right now".
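A rough sketch of that handshake, just to make the flow concrete (the class and method names here are hypothetical, not the actual dask.distributed stealing implementation):

```python
# Hypothetical sketch of the stealing handshake described above.
class Worker:
    def __init__(self, name):
        self.name = name
        self.queued = set()      # tasks waiting in the local queue
        self.executing = set()   # tasks currently running

    def try_steal_from(self, victim, key):
        """Thief side (Alice): ask the victim to hand over `key`."""
        if victim.release_for_steal(key):
            self.queued.add(key)
            return "steal-confirmed"
        return "steal-refused"

    def release_for_steal(self, key):
        """Victim side (Bob): give up the task only if it hasn't started."""
        if key in self.executing:
            return False         # "nope, I'm working on X right now"
        if key in self.queued:
            self.queued.discard(key)
            return True          # agree to the steal
        return False             # task is no longer here at all


# Scheduler side: "Hey Alice, maybe you should go steal task X from Bob."
alice, bob = Worker("Alice"), Worker("Bob")
bob.queued.add("X")
alice.try_steal_from(bob, "X")   # "steal-confirmed", since X hadn't started yet
```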
For context, here is a link to the dataframe optimization code. The only thing below the ... We get a lot of speedup if we're able to return the HighLevelGraph.
Motivation
Currently, when a task becomes ready to run, the scheduler finds the best worker for that task and distributes the task to that worker to be enqueued. The worker then handles whatever communication is necessary to collect the dependencies for that task, and once it's ready it puts it in a queue to be run in a local ThreadPoolExecutor. When it finishes, the worker informs the scheduler and goes on to its next task. When the scheduler receives news that the task has finished, it updates all of its dependents, each of which goes through this process again.
This is occasionally sub-optimal. Consider the following graph:
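A minimal sketch of the kind of graph meant here, written as a low-level Dask graph (the functions and values are illustrative; only the dependency structure matters): B1 depends on A1, and B2 depends on A2.

```python
from operator import add

# Two independent chains; only the shape matters:
#   A1 -> B1    and    A2 -> B2
dsk = {
    "A1": (add, 1, 1),
    "A2": (add, 2, 2),
    "B1": (add, "A1", 10),   # consumes A1's result
    "B2": (add, "A2", 10),   # consumes A2's result
}
```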
If we have one worker then both of the A's will be sent to that worker. The worker will finish A1, send the report to the scheduler that it is finished, and begin work on A2. It will then get word from the scheduler that B1 is ready to compute and will work on that next. As a result, the order of execution looks like the following: A1, A2, B1, B2.
When really we would have preferred A1, B1, A2, B2.
Today we often avoid this situation by doing task fusion on the client side, so that we really only have two tasks, each containing an A and its dependent B.
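Continuing the illustrative graph above, low-level fusion inlines each A into its B so the worker necessarily runs them back to back (real fusion may also rename the fused keys; that detail is omitted here):

```python
from operator import add

# After fusion each chain is a single task; nested tuples are evaluated
# as sub-tasks, so A1 is computed immediately before B1 on the same worker.
dsk_fused = {
    "B1": (add, (add, 1, 1), 10),   # A1 inlined into B1
    "B2": (add, (add, 2, 2), 10),   # A2 inlined into B2
}
```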
If we remove low-level task fusion (which we may want to do for performance reasons) then it would be nice to capture this same A, B, A, B behavior some other way.
Send tasks to workers before dependencies are set
One way to capture this same behavior is to send more tasks down to the worker, even before they are ready to run. Today we only send a task to a worker once we have high confidence that that is where it should run, which typically we only know after we understand the data sizes of all of its inputs. We only know this once its dependencies are done, so we only send tasks to workers once all of their dependencies are complete.
However, we could send not-yet-ready-to-run tasks to a worker with high confidence if all of that task's dependencies are also present or running on that worker. This happens to fully consume the low-level task fusion case: if A1 is running on a worker then we believe with high probability that B1 will also run on that same worker.
Changes
This would be a significant change to the worker. We would need to add states to track dependencies, much like how we do in the dask.local scheduler, or a very stripped down version of the dask.distributed scheduler.
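A very stripped-down sketch of what that dependency tracking on the worker could look like (everything here is hypothetical, not existing distributed code): tasks may arrive before their dependencies have finished, and a task only moves to the ready queue once every dependency it was shipped with has completed locally.

```python
from collections import defaultdict

class SpeculativeTaskState:
    """Hypothetical worker-side bookkeeping for speculatively assigned tasks."""

    def __init__(self):
        self.waiting = {}                   # key -> set of unfinished dependencies
        self.waiters = defaultdict(set)     # dependency -> keys waiting on it
        self.ready = []                     # keys whose dependencies are all done

    def add_task(self, key, dependencies, finished):
        """Register `key`; `finished` is the set of keys already complete here."""
        missing = set(dependencies) - finished
        if not missing:
            self.ready.append(key)          # behaves like today's eager path
            return
        self.waiting[key] = missing
        for dep in missing:
            self.waiters[dep].add(key)

    def task_finished(self, key):
        """Mark `key` done and promote tasks that were only waiting on it."""
        for waiter in self.waiters.pop(key, set()):
            remaining = self.waiting[waiter]
            remaining.discard(key)
            if not remaining:
                del self.waiting[waiter]
                self.ready.append(waiter)
```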
This will also require managing interactions with other parts of Dask like ...
Learning
I think that this task is probably a good learning task for someone who has some moderate exposure to the distributed scheduler and wants to level up a bit. Adding dependencies to the worker will, I think, force someone to think a lot about the systems that help make Dask run while mostly implementing minor versions of them. cc @quasiben @jacobtomlinson @madsbk