Resilience doesn't work if `workers` for `client.submit` is specified #8320
Comments
@trivialfis: Thanks for reporting this issue! Please note that Dask generally does not reuse worker addresses (what you'd get from …). As a side note, if you're trying to improve resilience for XGBoost on Dask, I'd be interested to hear about your requirements and approach. From a brief look I took at resilience for XGBoost on Dask in the past, the current implementation did not seem to match Dask's task-based model very well. Maybe sparring a bit can help us discover a better approach for you to implement on the XGBoost side. I'd suggest creating an issue on XGBoost and @-ing me there if you're interested.
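For illustration, a minimal sketch of the address behavior described above (assumes a local cluster with nanny-managed worker processes; restarted workers typically come back on fresh ports):

```python
from dask.distributed import Client, LocalCluster

# Two worker processes; each listens on a randomly assigned port.
cluster = LocalCluster(n_workers=2, processes=True)
client = Client(cluster)

before = set(client.scheduler_info()["workers"])
client.restart()  # kill and restart all workers
after = set(client.scheduler_info()["workers"])

# Restarted workers typically come back under new addresses (new ports),
# so a task restricted to an old address can never be scheduled again.
print(before, after)
```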
@hendrikmakait Thank you for the reply. Quoting from @crusaderky in dask/dask#10239 (comment):
As a result, the resource approach might not work, since collective communication requires all workers to be available simultaneously. With resources, it's reasonable for the scheduler to finish the tasks sequentially.
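To illustrate the concern, a minimal sketch of the resource-based pinning idea (the resource name, scheduler address, and task are placeholders):

```python
from dask.distributed import Client

client = Client("tcp://scheduler:8786")  # placeholder address

def train_task(rank):
    ...  # stand-in for one collective step that blocks until all peers join

# Assumes each worker was started with a single slot, e.g.:
#   dask worker tcp://scheduler:8786 --resources "SLOT=1"
futures = [
    client.submit(train_task, rank, resources={"SLOT": 1}, pure=False)
    for rank in range(4)
]

# Resources only cap concurrency per worker; they don't force the four
# tasks to run at the same time. The scheduler may run them one by one,
# and since each task blocks waiting for its peers, training can deadlock.
results = client.gather(futures)
```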
I'm currently working on different things but will come back to it. The plan for now is to handle exceptions inside XGBoost (like network errors that we can catch). But to handle SIGKILL, this issue needs to be resolved, since we need Dask to restore itself before XGBoost can do anything. From my perspective, the best scenario for XGBoost so far would be collective-style task support in Dask.
That makes sense; one alternative to explore would be whether XGBoost functionality could also be written in a more task-based approach (I suspect the answer might be no). Your best bet might be doing something akin to Dask's P2P system, which tracks the involved workers and triggers reconciliation logic if a worker dies. Another related issue is #7346, which would eagerly fail restricted tasks on worker removal.
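For illustration only, a minimal sketch of that tracking pattern using a scheduler plugin (this is not Dask's actual P2P code; the class and its reconciliation logic are hypothetical):

```python
from distributed.diagnostics.plugin import SchedulerPlugin

class CollectiveWatch(SchedulerPlugin):
    """Watch the workers participating in one collective run."""

    def __init__(self, participants):
        self.participants = set(participants)  # worker addresses in the run

    def remove_worker(self, scheduler=None, worker=None, **kwargs):
        if worker in self.participants:
            # A participant died, so the whole collective round is invalid.
            # Real reconciliation would cancel the peers and restart the
            # round on the surviving/replacement workers.
            print(f"lost participant {worker}; collective run must restart")

# Hypothetical usage:
# client.register_plugin(CollectiveWatch(list(client.scheduler_info()["workers"])))
```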
Indeed. There are a couple of patterns for training ML models, like collective communication and parameter servers, all of which require all the workers to be available for synchronization. It would be great if there were some references on how deep learning libraries are integrated with Dask; I assume they are more advanced than we are, considering their popularity and scale (LLM training uses a huge number of workers).
Thank you for the suggestion. I'm not sure if I can (or should) look into these features though; after all, XGBoost is an ML library.
You'll likely want to have a chat with one or more of the Dask core contributors, and then we'll figure out a way forward.
Yes, I shared it with @pentschev when this issue was raised. It seems non-trivial, considering the collective-style task feature request has been open for a while now.
I'd suggest dropping by one of the next Dask monthly meetings and raising your work/problems there (https://docs.dask.org/en/stable/support.html). There is one tomorrow, but I suspect many people will still be out of office.
Thank you for sharing! I will try to join one after the holiday season. |
Sounds great. In the meantime, I've given #7346 a mild makeover, so I think we can close this issue as a duplicate of it. Sounds good?
Yup, let me close it.
So, we discussed this issue a little in the maintainers call. Right now, everything appears to be very hard-coded towards shuffles, but generally speaking I think the P2PExtension would lend itself to being expanded into a more or less generic interface for this. We're already implementing two different "applications" which are similar in that they send a lot of data but are very different in their implementation details. The two applications, DataFrame shuffle and Array rechunking, essentially implement this interface (distributed/distributed/shuffle/_core.py, line 62 and line 421 in b1597b6).
The worker and scheduler extensions essentially take care of most or all of the synchronization and integration with the scheduler but don't implement application-specific logic. These …, etc. The concrete implementations can be found here for DataFrame and here for Array. Those shuffle runs also own disk and comm buffers, but the more general version is just a subset of the current classes' interface. I think this is not well formalized and definitely not documented very well, but it could be. I wonder if an application like xgboost couldn't just implement this interface such that we'd throw it into the extension and treat it as a "shuffle".
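To make the idea concrete, here is a rough sketch of what such a generic "shuffle-like" interface might look like; the names are illustrative and are not the actual `distributed/shuffle/_core.py` API:

```python
from abc import ABC, abstractmethod

class CollectiveRun(ABC):
    """One run of a collective application, managed by the P2P extensions.

    The worker/scheduler extensions would handle synchronization, worker
    tracking, and failure propagation; subclasses supply application logic.
    """

    @abstractmethod
    async def start(self, participants: list[str]) -> None:
        """Called on every worker once the set of participants is fixed."""

    @abstractmethod
    async def receive(self, data: bytes) -> None:
        """Handle a payload sent by a peer worker."""

    @abstractmethod
    async def close(self) -> None:
        """Release buffers and connections, on success or failure."""

# Under this scheme, an XGBoost training round would subclass CollectiveRun
# and be registered with the extension, which treats it like a "shuffle".
```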
When the `workers` parameter for `client.submit` is specified, the restarted worker hangs. XGBoost uses this parameter to guarantee all workers are unique for a collective-style task. It's especially important for GPU-based tasks, since NCCL doesn't support running multiple processes on the same device.
Environment: