Resilience doesn't work if workers for client.submit is specified. #8320

Closed
trivialfis opened this issue Nov 1, 2023 · 11 comments
Labels
needs info (Needs further information from the user)

Comments

@trivialfis

trivialfis commented Nov 1, 2023

When the workers parameter for client.submit is specified, the computation hangs after the affected worker is killed and restarted.

XGBoost uses this parameter to guarantee all workers are unique for a collective-style task. It's especially important for GPU-based tasks since NCCL doesn't support running multiple processes on the same device.

import os
import time

import distributed
from dask.distributed import Client, LocalCluster


def get_client_workers(client):
    workers = client.scheduler_info()["workers"]
    return list(workers.keys())


def fn(i: int):
    pid = os.getpid()
    worker = distributed.get_worker()
    # pick a pid and run `kill -15 pid` in another terminal
    print("i:", i, "pid: ", pid, " w:", worker.id)
    now = time.time()
    time.sleep(15)
    end = time.time()
    print("after sleep", end - now)
    return pid


async def _launch():
    client = distributed.get_client()
    workers = get_client_workers(client)
    print(workers)
    futs = []
    for i in range(len(workers)):
        # Pin each task to one specific worker address; this restriction is
        # what triggers the reported hang once that worker is killed.
        f = client.submit(fn, i, pure=False, workers=[workers[i]])
        futs.append(f)

    results = await client.gather(futs)
    return results


def launch(client):
    results = client.sync(_launch)
    print(results)


if __name__ == "__main__":
    with LocalCluster(n_workers=2, threads_per_worker=4) as cluster:
        with Client(cluster) as client:
            launch(client)

Environment:

  • Dask version: 2023.7.1
  • Python version: 3.10.12
  • Operating System: Ubuntu 22.04
  • Install method (conda, pip, source): conda
@hendrikmakait
Member

@trivialfis: Thanks for reporting this issue! Please note that Dask generally does not reuse worker addresses (what you'd get from get_client_workers) when restarting workers. IIUC, your aim is to guarantee that no worker runs multiple tasks at the same time. One way to achieve this would be using worker resources (https://distributed.dask.org/en/latest/resources.html). You could assign each worker a single resource and request that for your task. Does that help?
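
For reference, here is a minimal sketch of that suggestion. It assumes LocalCluster forwards the resources keyword to its workers, and the resource name "slot" is arbitrary:

from dask.distributed import Client, LocalCluster


def task(i: int) -> int:
    return i * 2


if __name__ == "__main__":
    # Give every worker exactly one unit of an artificial "slot" resource.
    with LocalCluster(
        n_workers=2, threads_per_worker=4, resources={"slot": 1}
    ) as cluster:
        with Client(cluster) as client:
            # Requesting one "slot" per task means no worker can run two of
            # these tasks concurrently, without pinning tasks to addresses.
            futs = [
                client.submit(task, i, pure=False, resources={"slot": 1})
                for i in range(2)
            ]
            print(client.gather(futs))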

As a side note, if you're trying to improve resilience for XGBoost on Dask, I'd be interested to hear about your requirements and approach. From a brief look at resilience for XGBoost on Dask in the past, it looked like the current implementation did not match Dask's task-based model very well. Maybe sparring a bit can help us discover a better approach for you to implement on the XGBoost side. I'd suggest creating an issue on XGBoost and @ me there if you're interested.

@hendrikmakait added the "needs info" label and removed the "needs triage" label on Jan 2, 2024
@trivialfis
Author

@hendrikmakait Thank you for the reply.

Quoting @crusaderky from dask/dask#10239 (comment):

This is because XGBoost is an MPI application shoehorned on top of dask.

As a result, the resource approach might not work, since collective communication requires all workers to be available simultaneously. With resources, the scheduler is still free to run the tasks sequentially.

I'd be interested to hear about your requirements and approach.

I'm currently working on different things but will come back to it. The plan is to just handle exceptions inside XGBoost at the moment (like network errors that we can catch). But to handle sigkill, this issue needs to be resolved since we need dask to restore itself before XGBoost can do anything.

From my perspective, the best scenario for XGBoost so far would be collective-style task support in dask.

@hendrikmakait
Member

As a result, the resource approach might not work, since collective communication requires all workers to be available simultaneously. With resources, the scheduler is still free to run the tasks sequentially.

That makes sense; one alternative to explore would be whether XGBoost functionality could also be written in a more task-based approach (I suspect the answer might be no).

Your best bet might be doing something akin to Dask's P2P system, which tracks the involved workers and triggers reconciliation logic if a worker dies.
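
As a rough illustration of that idea (this is not Dask's actual P2P code; the plugin below is hypothetical), a SchedulerPlugin can watch the set of participating workers and let the application react when one of them leaves:

from distributed.diagnostics.plugin import SchedulerPlugin


class CollectiveWatch(SchedulerPlugin):
    """Track a fixed set of participating workers and react when one leaves."""

    def __init__(self, participants):
        self.participants = set(participants)

    def remove_worker(self, scheduler, worker, **kwargs):
        if worker in self.participants:
            # An application such as XGBoost would hook its own reconciliation
            # logic here, e.g. abort the collective round and resubmit the
            # whole group of tasks.
            print(f"participant {worker} left; the collective round must restart")


# Registered e.g. via client.register_scheduler_plugin(CollectiveWatch(workers)).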

Another related issue is #7346, which would eagerly fail restricted tasks on worker removal.

@trivialfis
Author

trivialfis commented Jan 3, 2024

(I suspect the answer might be no).

Indeed. There are a couple of patterns for training ML models, like collective communication and parameter servers, all of which require all the workers to be available for synchronization. It would be great if there were some references on how deep learning libraries are integrated with dask; I assume they are further along than we are, considering their popularity and scale (LLM training uses a huge number of workers).

Your best bet might be doing something akin to Dask's P2P system, which tracks the involved workers and triggers reconciliation logic if a worker dies.

Thank you for the suggestion. I'm not sure whether I can (or should) look into these features though; after all, XGBoost is an ML library.

@hendrikmakait
Member

Thank you for the suggestion. I'm not sure whether I can (or should) look into these features though; after all, XGBoost is an ML library.

You'll likely want to have a chat with one or more of the Dask core contributors and then we'll figure out a way forward.

@trivialfis
Author

Yes, I shared it with @pentschev when this issue was raised; it seems to be non-trivial, considering the collective-style task feature request has been open for a while now.

@hendrikmakait
Member

I'd suggest dropping by one of the next Dask monthly meetings and raising your work/problems there (https://docs.dask.org/en/stable/support.html). There is one tomorrow, but I suspect many people will still be out of office.

@trivialfis
Author

Thank you for sharing! I will try to join one after the holiday season.

@hendrikmakait
Member

Thank you for sharing! I will try to join one after the holiday season.

Sounds great. In the meantime, I've given #7346 a mild makeover, so I think we can close this issue as a duplicate of it. Sounds good?

@trivialfis
Author

Yup, let me close it.

@fjetter
Member

fjetter commented Mar 7, 2024

So, we discussed this issue a little in the maintainers call. Right now, everything appears to be very hard-coded towards shuffles, but generally speaking I think the P2PExtension would lend itself to being expanded to support a more or less generic interface for this. We're already implementing two different "applications" which are similar in that they send a lot of data, but they are very different in their implementation details.

The two applications, DataFrame shuffle and Array rechunking, essentially implement this ShuffleRun interface and the ShuffleRunSpec (which is effectively a run factory):

class ShuffleRun(Generic[_T_partition_id, _T_partition_type]):

class ShuffleRunSpec(Generic[_T_partition_id]):

The worker and scheduler extensions essentially take care of most/all of the synchronization and integration with the scheduler but don't implement application-specific logic.

These Run classes both implement a common set of methods.

The concrete implementations can be found here for DataFrame and here for Array

Those shuffle runs also own disk and comm buffers, but the more general version is just a subset of the current classes' interface.

I think this is not well formalized and definitely not documented very well, but it could be. I wonder if an application like xgboost couldn't just implement this interface such that we could throw it into the extension and treat it as a "shuffle".
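
To make the shape of that idea concrete, here is a purely hypothetical sketch; the class and method names are illustrative and do not match the actual ShuffleRun API in distributed.shuffle:

from dataclasses import dataclass, field


@dataclass
class CollectiveRunSketch:
    """Hypothetical run object for one collective round, e.g. an XGBoost training session."""

    run_id: int
    participants: set = field(default_factory=set)

    def start(self) -> None:
        # Application-specific setup, e.g. bootstrapping the XGBoost/NCCL
        # communicator across all participating workers.
        ...

    def worker_left(self, address: str) -> None:
        # Called by the (hypothetical) extension when a participant dies;
        # the application decides how to reconcile, e.g. abort and retry.
        self.participants.discard(address)

    def finish(self) -> None:
        # Tear down communicators and report results.
        ...

The worker/scheduler extensions would then, presumably, take care of instantiating such a run on each participating worker and calling its hooks, analogous to what they already do for the shuffle runs.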
