Rebalance with dask.array fails #3833

Closed
RAMitchell opened this issue May 27, 2020 · 5 comments

@RAMitchell

RAMitchell commented May 27, 2020

Hi, I am trying to rebalance a dask array evenly across workers with the following script:

import dask.array as da
from dask.distributed import Client, wait, LocalCluster
import dask
import numpy as np


def main():
    print(dask.__version__)
    cluster = LocalCluster(n_workers=8)
    client = Client(cluster)
    n = 5000
    m = 10000
    X_np = np.random.random((m, n))
    X = da.from_array(X_np, chunks=(100, n)).persist()
    wait(X)
    client.rebalance()
    cluster.close()
    client.close()


if __name__ == "__main__":
    main()

This results in:

2.16.0
distributed.worker - WARNING - Could not find data: {"('array-d6823717500dcebf6fea561299610b3b', 77, 0)": ['tcp://127.0.0.1:36715'], "('array-d6823717500dcebf6fea561299610b3b', 83, 0)": ['tcp://127.0.0.1:46453']} on workers: [] (who_has: {"('array-d6823717500dcebf6fea561299610b3b', 78, 0)": ['tcp://127.0.0.1:33229'], "('array-d6823717500dcebf6fea561299610b3b', 38, 0)": ['tcp://127.0.0.1:33229'], "('array-d6823717500dcebf6fea561299610b3b', 22, 0)": ['tcp://127.0.0.1:33229'], "('array-d6823717500dcebf6fea561299610b3b', 11, 0)": ['tcp://127.0.0.1:33229'], "('array-d6823717500dcebf6fea561299610b3b', 94, 0)": ['tcp://127.0.0.1:33229'], "('array-d6823717500dcebf6fea561299610b3b', 41, 0)": ['tcp://127.0.0.1:33229'], "('array-d6823717500dcebf6fea561299610b3b', 69, 0)": ['tcp://127.0.0.1:33229'], "('array-d6823717500dcebf6fea561299610b3b', 19, 0)": ['tcp://127.0.0.1:33229'], "('array-d6823717500dcebf6fea561299610b3b', 3, 0)": ['tcp://127.0.0.1:33229'], "('array-d6823717500dcebf6fea561299610b3b', 93, 0)": ['tcp://127.0.0.1:33229'], "('array-d6823717500dcebf6fea561299610b3b', 77, 0)": ['tcp://127.0.0.1:36715'], "('array-d6823717500dcebf6fea561299610b3b', 83, 0)": ['tcp://127.0.0.1:46453']})
distributed.utils - ERROR - 'keys'
Traceback (most recent call last):
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/distributed/utils.py", line 664, in log_errors
    yield
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/distributed/scheduler.py", line 3122, in rebalance
    "keys": tuple(concat(r["keys"].keys() for r in result)),
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/distributed/scheduler.py", line 3122, in <genexpr>
    "keys": tuple(concat(r["keys"].keys() for r in result)),
KeyError: 'keys'
distributed.core - ERROR - 'keys'
Traceback (most recent call last):
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/distributed/core.py", line 403, in handle_comm
    result = await result
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/distributed/scheduler.py", line 3122, in rebalance
    "keys": tuple(concat(r["keys"].keys() for r in result)),
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/distributed/scheduler.py", line 3122, in <genexpr>
    "keys": tuple(concat(r["keys"].keys() for r in result)),
KeyError: 'keys'
Traceback (most recent call last):
  File "mre_github.py", line 22, in <module>
    main()
  File "mre_github.py", line 16, in main
    client.rebalance()
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/distributed/client.py", line 3111, in rebalance
    return self.sync(self._rebalance, futures, workers, **kwargs)
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/distributed/client.py", line 816, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/distributed/utils.py", line 347, in sync
    raise exc.with_traceback(tb)
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/distributed/utils.py", line 331, in f
    result[0] = yield future
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/distributed/client.py", line 3086, in _rebalance
    result = await self.scheduler.rebalance(keys=keys, workers=workers)
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/distributed/core.py", line 750, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/distributed/core.py", line 549, in send_recv
    raise exc.with_traceback(tb)
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/distributed/core.py", line 403, in handle_comm
    result = await result
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/distributed/scheduler.py", line 3122, in rebalance
    "keys": tuple(concat(r["keys"].keys() for r in result)),
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/distributed/scheduler.py", line 3122, in <genexpr>
    "keys": tuple(concat(r["keys"].keys() for r in result)),
KeyError: 'keys'

I am aware that the way I am loading data is not necessarily optimal. I specifically want to address the case where the data arrives as a NumPy array, is converted to a dask.array, and needs to be rebalanced before significant work (xgboost training) is done on each worker.
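As a side note for this use case, a minimal sketch of one way to sidestep rebalance entirely (not part of the original report; it assumes the only goal is an even initial spread of the NumPy blocks across workers, and the names rows/blocks are illustrative) is to scatter the row blocks up front and assemble the dask array from the resulting futures:

import numpy as np
import dask.array as da
from dask.distributed import Client, LocalCluster, wait

# Illustrative sketch only: sizes mirror the reproducer above, and the
# scatter + from_delayed pattern replaces persist() + rebalance().
cluster = LocalCluster(n_workers=8)
client = Client(cluster)

n, m, rows = 5000, 10000, 100
X_np = np.random.random((m, n))

# Split into row blocks locally and scatter them; scatter spreads the
# blocks across the available workers as it sends them.
blocks = [X_np[i:i + rows] for i in range(0, m, rows)]
futures = client.scatter(blocks)
wait(futures)

# Reassemble a dask array on top of the scattered blocks.
X = da.concatenate(
    [da.from_delayed(f, shape=(rows, n), dtype=X_np.dtype) for f in futures],
    axis=0,
)

Since the blocks are already spread across workers when they are scattered, no rebalance call is needed before handing X to training.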

@jacobtomlinson
Member

Looks related to #3670

@jacobtomlinson
Member

jacobtomlinson commented May 27, 2020

I am able to reproduce this on master. I've opened #3834 because we should be getting a ValueError instead of a KeyError when keys are reported missing during a rebalance.

However, I'm not certain the keys are actually missing: running the rebalance again succeeds, and I am also able to call X.compute() without issue.

An unpleasant workaround for this example is:

try:
    client.rebalance()
except Exception:
    # the first call fails with the spurious missing-keys error; retry once
    client.rebalance()
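A slightly more defensive variant (a sketch only, relying on the observation above that a later call completes) retries a bounded number of times and catches only the errors seen in this issue:

# Retry the rebalance a few times; swallow only the KeyError/ValueError
# reported in this issue, and re-raise if it never succeeds.
for attempt in range(3):
    try:
        client.rebalance()
        break
    except (KeyError, ValueError):
        if attempt == 2:
            raise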

@RAMitchell
Author

I'm getting the updated error message, but the problem still persists in distributed 2.26.0:

2.26.0
distributed.worker - WARNING - Could not find data: {"('array-61ad310c38b65a47424c1bf0ef9ab594', 43, 0)": ['tcp://127.0.0.1:41573']} on workers: [] (who_has: {"('array-61ad310c38b65a47424c1bf0ef9ab594', 13, 0)": ['tcp://127.0.0.1:33749'], "('array-61ad310c38b65a47424c1bf0ef9ab594', 33, 0)": ['tcp://127.0.0.1:33749'], "('array-61ad310c38b65a47424c1bf0ef9ab594', 49, 0)": ['tcp://127.0.0.1:33749'], "('array-61ad310c38b65a47424c1bf0ef9ab594', 55, 0)": ['tcp://127.0.0.1:33749'], "('array-61ad310c38b65a47424c1bf0ef9ab594', 65, 0)": ['tcp://127.0.0.1:45625'], "('array-61ad310c38b65a47424c1bf0ef9ab594', 43, 0)": ['tcp://127.0.0.1:41573']})
Traceback (most recent call last):
  File "mre_github.py", line 23, in <module>
    main()
  File "mre_github.py", line 17, in main
    client.rebalance()
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/distributed/client.py", line 3159, in rebalance
    return self.sync(self._rebalance, futures, workers, **kwargs)
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/distributed/client.py", line 834, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
    raise exc.with_traceback(tb)
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
    result[0] = yield future
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/home/nfs/rorym/miniconda3/envs/cudf_test/lib/python3.7/site-packages/distributed/client.py", line 3137, in _rebalance
    f"During rebalance {len(result['keys'])} keys were found to be missing"
ValueError: During rebalance 1 keys were found to be missing

@jacobtomlinson
Member

Does a second call to rebalance still succeed?

@crusaderky
Collaborator

Closing as duplicate of #4906
