trouble scaling cudf merge benchmark with ucx ~64 nodes on NERSC perlmutter #930

Closed
lastephey opened this issue Jun 9, 2022 · 48 comments · Fixed by #933

@lastephey (Author)

Dear dask-cuda devs,

We've been working with @quasiben to do some benchmarking with Dask UCX on NERSC's Perlmutter system. (You can see some background in @rcthomas's issue here.) So far I've been able to get the TCP-based cudf merge benchmark to scale to about 256 nodes (it hit timeout, but I think it would have finished), but I cannot get the UCX-based benchmark to run successfully at or above 64 nodes. The good news is that when it runs, UCX is quite a bit faster than TCP, but of course it would be nice to get it to scale up. You can view a summary of the benchmarking data I've gathered so far.

More details:

A few months ago @quasiben kindly provided his benchmark script which I have lightly modified for my recent benchmarking efforts. You can view my modified version here.

I am using UCX 1.12.1 and the 22.06 rapids-nightly build. You can view my buildscript here. Note this is all being done outside a container using the NERSC shared filesystems.

I have set export UCXPY_LOG_LEVEL=DEBUG and included the logs of all of my trials here. Probably the most useful log is the UCX 64 node run. It's really huge and hard to navigate (sorry), but around line 4103066 you start to see errors like

[1654722743.050706] [nid001897:44756:0]         ib_mlx5.h:672  UCX  ERROR   madvise(DONTFORK, buf=0x55790ec0a000, len=131072) failed: Cannot allocate memory
[1654722743.050979] [nid001897:44756] UCXPY  DEBUG Error callback for endpoint 0x7f31609bad40 called with status -3: Input/output error

Seeing these, I shrank the problem a bit compared to what @quasiben had done in his benchmark (40k vs 50k), and I reduced --rmm-pool-size a bit, to 35GB. I also bumped up the ulimit as far as it can go on Perlmutter, although I don't know if this matters.

Do you think this is really a memory issue? Based on this it sounds like it could be an RDMA config issue, but I am way out of my depth here so I'd appreciate any advice you have.

Thank you very much,
Laurie

@pentschev (Member)

Thanks @lastephey for the very detailed report.

Seeing these, I shrank the problem a bit compared to what @quasiben had done in his benchmark (40k vs 50k), and I reduced --rmm-pool-size a bit, to 35GB.

The error seems to be coming from the mlx5 interface; therefore, I think the RMM pool size should be OK (assuming all GPUs in the cluster have more than 35GB).

I also bumped up the ulimit as far as it can go on Perlmutter, although I don't know if this matters.

Do you think this is really a memory issue? Based on this it sounds like it could be an RDMA config issue, but I am way out of my depth here so I'd appreciate any advice you have.

Based on the link you posted and on our experience, memlock has always been problematic, and on every system where we use UCX we had to ask IT to set it to unlimited. Would you be able to confirm the state of memlock on Perlmutter and, if you have permission, set it to unlimited? If not, it may be worth checking with IT whether that would be possible. I have also asked internally whether people have additional ideas about what this could be, but NVIDIA folks are OOO today and tomorrow, so I may not have further details until Monday.
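
For reference, a quick way to check the effective memlock limit from within a Python process on a compute node would be something like this (a sketch using only the standard library; RLIM_INFINITY means unlimited):

import resource

# Query the soft and hard RLIMIT_MEMLOCK limits for the current process.
soft, hard = resource.getrlimit(resource.RLIMIT_MEMLOCK)
unlimited = resource.RLIM_INFINITY
print("memlock soft:", "unlimited" if soft == unlimited else soft)
print("memlock hard:", "unlimited" if hard == unlimited else hard)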

@lastephey (Author)

Hi @pentschev,

Thanks for your quick reply. We don't have a /etc/security/limits.d/rdma.conf, but we do have a /etc/security/limits.d/99-slingshot-network.conf.

stephey@perlmutter:login33:/etc/security/limits.d> cat 99-slingshot-network.conf 
# Slingshot Network limits configuration

*       hard    memlock     -1
*       soft    memlock     -1

I'm not sure if the -1 means it's already set to unlimited. I'll see if I can find out more.

Yes, all of our GPUs are A100s with 40 GB each, so it sounds like we should be fine in terms of GPU memory. Thanks for the information.

@lastephey (Author)

I clarified with our systems team that this means memlock is already set to unlimited.

@pentschev (Member)

I checked internally, and it seems the usual UCX workaround for this is to set UCX_IB_FORK_INIT=n, which disables marking pages as available to child processes, i.e., we wouldn't be allowed to fork. However, the problem with Dask-CUDA is that we need to spawn new processes within Python to correctly set up new workers (i.e., set the correct environment variables); we do use spawn in Dask, but Python implements spawn as fork+exec: https://bugs.python.org/issue46367 .
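
For completeness, the workaround only takes effect if the variable is set before UCX itself is initialized; a minimal sketch (assuming UCX-Py is the entry point to UCX in the process):

import os

# UCX reads UCX_* variables when the UCX context is created, so this must
# happen before UCX-Py initializes UCX (i.e., before ucp.init() or the
# first UCX operation in this process).
os.environ["UCX_IB_FORK_INIT"] = "n"

import ucp  # noqa: E402

ucp.init()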

It seems another alternative would be increasing vm.max_map_count (this will most likely require root privileges) from the 65535 default:

sysctl -w vm.max_map_count=655350

Could you check whether it is possible to increase that?
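
For reference, the current value can be read without elevated privileges (a sketch; only changing it via sysctl -w requires root):

# Read the current vm.max_map_count for this node.
with open("/proc/sys/vm/max_map_count") as f:
    print("vm.max_map_count =", f.read().strip())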

@quasiben (Member)

I was working on #834 so that we can get around the fork/spawn issue. I can probably pick this back up next week.

@wence- (Contributor) commented Jun 14, 2022

I think that multiprocessing's forkserver context is a possible way to handle this. The idea is that it creates a child process which will be responsible for subsequent forks. If this is set up before dask/UCX-Py import and initialize the UCX backend, then I think things might work. I've used (morally) this approach in the past when seeing similar problems running with OpenMPI, which in a number of configurations forbids fork after initialization.

A small testcase suggests this may work:

Consider:

import sys
from multiprocessing import forkserver

from distributed.utils import mp_context

if __name__ == "__main__":
    if len(sys.argv) > 1:
        forkserver.ensure_running()
    print(mp_context)
    from dask_cuda import LocalCUDACluster
    cluster = LocalCUDACluster(protocol="ucx", CUDA_VISIBLE_DEVICES=[0])

If I do:

$ export UCX_IB_FORK_INIT=n
$ export DASK_DISTRIBUTED__WORKER__MULTIPROCESSING_METHOD=forkserver
$ python test.py # don't ensure the forkserver before UCX-py is loaded
...
Aborted (core dumped)
$ python test.py 1 # ensure forkserver is launched
... all fine

Which is at least minimal progress.

It looks like there are a few other places where a multiprocessing context is created:

  • in dask/multiprocessing.py, which can be controlled by setting DASK_MULTIPROCESSING__CONTEXT=forkserver;
  • and ucp/utils.py which is hard-coded so probably needs a configuration.
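
For reference, all of these settings ultimately select a standard-library multiprocessing context; a minimal sketch of the forkserver mechanism itself:

import multiprocessing as mp

if __name__ == "__main__":
    # The forkserver context starts a dedicated, clean server process early;
    # later child processes are forked from that server rather than from the
    # main (potentially UCX-initialised) process.
    ctx = mp.get_context("forkserver")
    p = ctx.Process(target=print, args=("hello from a forkserver child",))
    p.start()
    p.join()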

@pentschev (Member)

@wence- would you be willing to open a PR in Distributed to add a similar config for the hardcoded value in ucp/utils.py? If you need any help, I can guide you through what is probably necessary tomorrow.

@wence- (Contributor) commented Jun 14, 2022

Presume s/Distributed/UCX-Py/, but yes, can do.

@pentschev (Member)

Sorry, for a moment I confused ucp/utils.py with distributed/comm/ucx.py, which would be a bit more work. But in UCX-Py we can simply add another environment variable to control that.

@wence- (Contributor) commented Jun 14, 2022

If that ends up working for @lastephey we should figure out a more principled way of doing this. The import-order dance seems sufficiently delicate that I couldn't figure out how to set the dask.distributed config via code (rather than environment variables) in a way that worked.

@lastephey (Author)

I can give @wence-'s example a shot. Since we are not using LocalCUDACluster, does it make sense to initialize via multiprocessing before we start the Dask scheduler?

@wence- (Contributor) commented Jun 14, 2022

I can give @wence-'s example a shot. Since we are not using LocalCUDACluster, does it make sense to initialize via multiprocessing before we start the Dask scheduler?

Yes, the LocalCUDACluster part was just me trying to find a minimal failing test. I think the critical thing is that the forkserver.ensure_running() call occurs before any clusters are launched anywhere.

@lastephey (Author)

We've seen issues with forking pop up in an MPI context too (more info here). Our suggestion has been to set

export IBV_FORK_SAFE=1
export RDMAV_HUGEPAGES_SAFE=1

Do you think that would be helpful here as well?

Re: @wence- 's solution, it seems like we'll have to launch the cluster with the API rather than the CLI. Is that a fair assessment?

@lastephey (Author)

Sorry, maybe I spoke too soon. Looks like I can go in and modify $CONDA_PREFIX/lib/python3.9/site-packages/distributed/cli/dask_scheduler.py and dask_worker.py, and $CONDA_PREFIX/lib/python3.9/site-packages/dask_cuda/benchmarks/local_cudf_merge.py to all include the multiprocessing wrapper. They will each have their own forkserver. I'll give that a try and report back.
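
For what it's worth, a sketch of the kind of wrapper described above (illustrative only; the real CLI entry points and their main() functions are not reproduced here):

from multiprocessing import forkserver, set_start_method

if __name__ == "__main__":
    # Select the forkserver method and make sure the fork server is already
    # running before anything imports or initialises UCX.
    set_start_method("forkserver", force=True)
    forkserver.ensure_running()
    # ... then call the original entry point (dask_scheduler,
    # dask_cuda_worker, or the benchmark's main function) as before.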

@pentschev (Member)

I am not totally sure those variables will be necessary. The link mentions problems with Python subprocessing in mpi4py; since subprocessing also applies to Dask, it is possible they will help. However, there aren't many more details on what the problems are exactly, so it is difficult to say for sure. Perhaps, if resources and time permit, try both and see what happens?

There's also the following comment on that link:

However it's also possible these variables will not fix the error. The most robust and future-proof strategy is to remove instances of spawning forks/subprocesses within mpi4py code.

Judging by that, it seems we will need a more robust solution anyway, one that would not lead to Dask processes forking/spawning. @quasiben's solution from #834 may come in handy there, but that may only be an initial step, and more work may still be needed.

@pentschev (Member)

Sorry, maybe I spoke too soon. Looks like I can go in and modify $CONDA_PREFIX/lib/python3.9/site-packages/distributed/cli/dask_scheduler.py and dask_worker.py, and $CONDA_PREFIX/lib/python3.9/site-packages/dask_cuda/benchmarks/local_cudf_merge.py to all include the multiprocessing wrapper. They will each have their own forkserver. I'll give that a try and report back.

I am not sure if the scheduler forks/spawns, so maybe it isn't needed there. Also note that the Dask-CUDA benchmarks use dask_cuda_worker.py instead of dask_worker.py, so you probably want to modify that instead. But yes, that would be the idea for the CLI tools as well.

@wence- (Contributor) commented Jun 14, 2022

Judging by that, it seems we will need a more robust solution anyway, one that would not lead to Dask processes forking/spawning.

FWIW, the forkserver approach is robust (it isolates the forker from the IB stack, and I have existence proofs running on big machines); one just has to track down all the sequencing to ensure that all relevant processes set things up correctly.

@lastephey (Author)

My small 4 node tests with the forkserver completed, so no problems there. Just have to wait in the queue for my larger jobs to complete.

@pentschev (Member)

FWIW, the forkserver approach is robust (it isolates the forker from the IB stack, and I have existence proofs running on big machines); one just has to track down all the sequencing to ensure that all relevant processes set things up correctly.

I didn't mean to say that the forkserver is not robust enough for this; I only meant that we may be able to come up with a solution where we don't need to spawn new processes at all, which would hopefully make things clearer.

@lastephey (Author)

My 32 node test completed successfully, but my 64 node test failed (job log here).

I think the important part of the log is around line 4083490:

[1655256353.861590] [nid002725:54152] UCXPY  DEBUG [Send #428] ep: 0x7f1192a00080, tag: 0x8f54b17c3a88c16c, nbytes: 16, type: <class 'bytes'>
[1655256354.133411] [nid001561:96989:0]         ib_mlx5.h:672  UCX  ERROR   madvise(DONTFORK, buf=0x556be15ac000, len=131072) failed: Cannot allocate memory
[1655256354.134188] [nid003097:28544] UCXPY  DEBUG Error callback for endpoint 0x7efbe938fbc0 called with status -23: Operation rejected by remote peer
Task exception was never retrieved
future: <Task finished name='Task-53155' coro=<_listener_handler_coroutine() done, defined at /global/common/software/das/stephey/dask-ucx/lib/python3.9/site-packages/ucx_py-0.27.0a0+9.ge8ebe72-py3.9-linux-x86_64.egg/ucp/core.py:128> exception=UCXError('Input/output error')>
Traceback (most recent call last):
  File "/global/common/software/das/stephey/dask-ucx/lib/python3.9/site-packages/ucx_py-0.27.0a0+9.ge8ebe72-py3.9-linux-x86_64.egg/ucp/core.py", line 135, in _listener_handler_coroutine
    endpoint = ucx_api.UCXEndpoint.create_from_conn_request(
  File "ucp/_libs/ucx_endpoint.pyx", line 315, in ucp._libs.ucx_api.UCXEndpoint.create_from_conn_request
  File "ucp/_libs/ucx_endpoint.pyx", line 231, in ucp._libs.ucx_api.UCXEndpoint.__init__
  File "ucp/_libs/utils.pyx", line 113, in ucp._libs.ucx_api.assert_ucs_status
ucp._libs.exceptions.UCXError: Input/output error

I did add the multiprocessing forkserver to dask_scheduler.py, dask_cuda_worker.py, and local_cudf_merge.py. You can see these modified files here. Do you see any issues with how I've added the forkserver modifications?

I'll try testing with those two additional environment variables just in case they are helpful since the UCX error is still related to forking.

@lastephey (Author)

In the case with

export IBV_FORK_SAFE=1
export RDMAV_HUGEPAGES_SAFE=1

and the previous forkserver configuration, the 64 node job still failed, but with different errors than with the forkserver alone (job log here). I couldn't find any instance of fork in the job log.

Some of the more serious looking errors are:

[1655272629.231772] [nid002825:33030] UCXPY  DEBUG Endpoint.abort(): 0x7f709329d840

on line 3250600 of the job log

2022-06-14 22:57:06,242 - distributed.worker - ERROR - Worker stream died during communication: ucx://10.249.24.250:60822

on line 3250647 of the job log

and

2022-06-14 22:57:06,258 - distributed.worker - ERROR - failed during get data with ucx://10.249.23.85:50355 -> ucx://10.249.24.250:60822

on line 3252406 of the job log.

@wence- (Contributor) commented Jun 15, 2022

Hmm, without the IBV_FORK_SAFE=1 and RDMAV_HUGEPAGES_SAFE=1 settings, can you try running with export UCX_IB_FORK_INIT=n? This is the workaround mentioned by @pentschev, which the use of the forkserver should allow us to try (previously we couldn't use it because the processes wanted to fork after UCX was initialised).

@wence- (Contributor) commented Jun 15, 2022

Actually, scratch that. In the logs I notice that we have <multiprocessing.context.SpawnContext object at 0x7f06cfa89970> printed (this comes from your modified benchmark script that prints the dask/distributed mp_context variable). So it looks like we didn't catch all the places that a multiprocessing context is created. Let me have a go at replicating this exact run so I can provide some patches that set the forkserver context everywhere...

@wence- (Contributor) commented Jun 15, 2022

I did add the multiprocessing forkserver to dask_scheduler.py, dask_cuda_worker.py, and local_cudf_merge.py. You can see these modified files here. Do you see any issues with how I've added the forkserver modifications?

That looked right; unfortunately, we also need to ensure that dask/distributed uses the forkserver multiprocessing context. Can you add DASK_DISTRIBUTED__WORKER__MULTIPROCESSING_METHOD=forkserver to the set of environment variables you provide to the different runners in your slurm script as well? Something like this:

diff -u nersc-ben-benchmark.sh.orig nersc-ben-benchmark.sh
--- nersc-ben-benchmark.sh.orig	2022-06-15 12:58:51.000000000 +0100
+++ nersc-ben-benchmark.sh	2022-06-15 12:59:28.000000000 +0100
@@ -37,6 +37,7 @@
 UCX_TCP_MAX_CONN_RETRIES=255 \
 DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=3600s \
 DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP=3600s \
+DASK_DISTRIBUTED__WORKER__MULTIPROCESSING_METHOD=forkserver \
 python -m distributed.cli.dask_scheduler \
 --protocol $protocol \
 --interface hsn0 \
@@ -66,6 +67,7 @@
     UCX_TCP_MAX_CONN_RETRIES=255 \
     DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=3600s \
     DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP=3600s \
+    DASK_DISTRIBUTED__WORKER__MULTIPROCESSING_METHOD=forkserver \
     UCX_MAX_RNDV_RAILS=1 \
     UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES=cuda \
     UCX_MEMTYPE_CACHE=n \
@@ -79,6 +81,7 @@
     UCX_TCP_MAX_CONN_RETRIES=255 \
     DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=3600s \
     DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP=3600s \
+    DASK_DISTRIBUTED__WORKER__MULTIPROCESSING_METHOD=forkserver \
     UCX_MAX_RNDV_RAILS=1 \
     UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES=cuda \
     UCX_MEMTYPE_CACHE=n \
@@ -100,6 +103,7 @@
 echo "Client start: $(date +%s)"
    DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=3600s \
    DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP=3600s \
+   DASK_DISTRIBUTED__WORKER__MULTIPROCESSING_METHOD=forkserver \
    UCX_MAX_RNDV_RAILS=1 \
    UCX_TCP_MAX_CONN_RETRIES=255 \
    UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES=cuda \

Diff finished.  Wed Jun 15 13:02:52 2022

@wence- (Contributor) commented Jun 15, 2022

Actually, scratch that. In the logs I notice that we have <multiprocessing.context.SpawnContext object at 0x7f06cfa89970> printed (this comes from your modified benchmark script that prints the dask/distributed mp_context variable). So it looks like we didn't catch all the places that a multiprocessing context is created. Let me have a go at replicating this exact run so I can provide some patches that set the forkserver context everywhere...

With dask/distributed#6580 and #933 you should be able to run any of the dask-cuda benchmarks with --multiprocessing-method forkserver and get the forkserver approach set up and working properly, fingers crossed :).

@lastephey (Author)

Ok, I reran with the DASK_DISTRIBUTED__WORKER__MULTIPROCESSING_METHOD=forkserver environment setting in the appropriate places-- you can see the updated jobscript here.

The 64 node job failed. The output looks similar to the failure I saw last night (line 4171401):

[1655317269.470212] [nid001521:124553:0]         ib_mlx5.h:672  UCX  ERROR   madvise(DONTFORK, buf=0x55f8a24d5000, len=131072) failed: Cannot allocate memory
[1655317269.470694] [nid001833:125521] UCXPY  DEBUG Error callback for endpoint 0x7f6287229600 called with status -23: Operation rejected by remote peer
Task exception was never retrieved
future: <Task finished name='Task-53247' coro=<_listener_handler_coroutine() done, defined at /global/common/software/das/stephey/dask-ucx/lib/python3.9/site-packages/ucx_py-0.27.0a0+9.ge8ebe72-py3.9-linux-x86_64.egg/ucp/core.py:128> exception=UCXError('Input/output error')>
Traceback (most recent call last):
  File "/global/common/software/das/stephey/dask-ucx/lib/python3.9/site-packages/ucx_py-0.27.0a0+9.ge8ebe72-py3.9-linux-x86_64.egg/ucp/core.py", line 135, in _listener_handler_coroutine
    endpoint = ucx_api.UCXEndpoint.create_from_conn_request(
  File "ucp/_libs/ucx_endpoint.pyx", line 315, in ucp._libs.ucx_api.UCXEndpoint.create_from_conn_request
  File "ucp/_libs/ucx_endpoint.pyx", line 231, in ucp._libs.ucx_api.UCXEndpoint.__init__
  File "ucp/_libs/utils.pyx", line 113, in ucp._libs.ucx_api.assert_ucs_status
ucp._libs.exceptions.UCXError: Input/output error
[1655317269.470490] [nid001521:124553:0]       wireup_cm.c:1193 UCX  WARN  failed to create server ep and connect to worker address on device mlx5_1:1, tl_bitmap 0xf8000 0x0, status Input/output error

Maybe UCX/UCX-py needs to be instructed not to fork as well?

@pentschev (Member)

It seems that in your jobscript you're not setting UCX_IB_FORK_INIT=n. If that is indeed the case, could you try again with it set?

@lastephey (Author)

That's correct @pentschev. I misunderstood your earlier comment. I'll re-run with that addition.

@pentschev (Member)

Thanks for the patience and persistence @lastephey , and apologies if my comment was unclear.

@lastephey (Author)

No problem-- I appreciate all of your help @pentschev @wence- and @quasiben.

So far it seems like UCX_IB_FORK_INIT=n has caused my 4 node test to run quite a bit slower (~5x slower). Is that expected?

@lastephey (Author)

My 64 node test hit timeout at 90 minutes. I think maybe UCX_IB_FORK_INIT=n avoids the errors we discussed above but the performance penalty makes it almost unusable. A 64 node TCP test ran successfully in about 8 minutes, for comparison.

I do see things like this appearing in the job log around line 2123218:

[1655393096.086607] [nid002164:76951:0]           ib_md.c:1234 UCX  WARN  IB: ibv_fork_init() was disabled or failed, yet a fork() has been issued.
[1655393196.566848] [nid002352:71691] UCXPY  DEBUG [Send #010] ep: 0x7f0e13646240, tag: 0xcc2fd44f02e62096, nbytes: 16, type: <class 'bytes'>
[1655393096.086615] [nid002164:76951:0]           ib_md.c:1235 UCX  WARN  IB: data corruption might occur when using registered memory.
[1655393196.911670] [nid002697:123119] UCXPY  DEBUG [Send #010] ep: 0x7fb6dd3dd6c0, tag: 0x39dbe0c920ae905c, nbytes: 16, type: <class 'bytes'>
[1655393195.800750] [nid002164:76951:0]     ib_mlx5_log.c:162  UCX  ERROR Local QP operation on mlx5_0:1/RoCE (synd 0x2 vend 0x68 hw_synd 0/31)
[1655393196.171781] [nid002121:27959] UCXPY  DEBUG [Send #006] ep: 0x7f7761e1d840, tag: 0xd91a5a3f942036a8, nbytes: 16, type: <class 'bytes'>
[1655393195.800750] [nid002164:76951:0]     ib_mlx5_log.c:162  UCX  ERROR RC QP 0x64ed wqe[31]: SEND s-e [inl len 26] [rqpn 0x1678b dlid=0 sl=0 port=1 src_path_bits=0 dgid=::ffff:10.249.7.176 sgid_index=3 traffic_class=0]

@wence- (Contributor) commented Jun 16, 2022

I found one further place where a fork happens. As a temporary fix, can you pip uninstall ptxcompiler in your environment? (See rapidsai/ptxcompiler#20 for a slightly more principled approach.)

But yes, that env var may help, though that slowdown is unacceptable.

@lastephey (Author)

Thanks @wence-. I'm still a bit unclear on what exactly needs to be done, though: do we need both UCX_IB_FORK_INIT=n and to manually remove all instances of forking? Or, if we found and removed all instances of forking, would UCX_IB_FORK_INIT=n no longer be necessary?

@wence- (Contributor) commented Jun 16, 2022

Ah sorry, my hope is that if we track down all the instances of forking, we will not need UCX_IB_FORK_INIT=n.

But I am not fully sure that this will be enough.
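
One way to help track the remaining instances down would be a debugging hook like the following (a sketch; it assumes the snippet can be injected early into each process, e.g. via sitecustomize, and it only catches forks made through os.fork/multiprocessing):

import os
import traceback

def _report_fork():
    # Runs in the parent just before each fork(); the stack trace shows
    # which code path is still forking after UCX initialisation.
    print(f"fork() about to happen in pid {os.getpid()}:", flush=True)
    traceback.print_stack()

os.register_at_fork(before=_report_fork)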

@lastephey (Author)

Thanks for clarifying. Ok, I'll stop setting UCX_IB_FORK_INIT=n for now, remove ptxcompiler, and test.

@lastephey (Author)

My latest 64 node test failed (job log here), unfortunately still with the same types of error messages:

[1655432572.415553] [nid001552:113666:0]       wireup_cm.c:1193 UCX  WARN  failed to create server ep and connect to worker address on device mlx5_1:1, tl_bitmap 0xf8000 0x0, status Input/output error
[1655432572.415745] [nid001552:113666:0]         ib_mlx5.h:672  UCX  ERROR   madvise(DONTFORK, buf=0x559fa766f000, len=131072) failed: Cannot allocate memory
[1655432572.415765] [nid001804:84478] UCXPY  DEBUG Error callback for endpoint 0x7fed82495000 called with status -23: Operation rejected by remote peer
Task exception was never retrieved
future: <Task finished name='Task-58726' coro=<_listener_handler_coroutine() done, defined at /global/common/software/das/stephey/dask-ucx/lib/python3.9/site-packages/ucx_py-0.27.0a0+9.ge8ebe72-py3.9-linux-x86_64.egg/ucp/core.py:128> exception=UCXError('Input/output error')>
Traceback (most recent call last):
  File "/global/common/software/das/stephey/dask-ucx/lib/python3.9/site-packages/ucx_py-0.27.0a0+9.ge8ebe72-py3.9-linux-x86_64.egg/ucp/core.py", line 135, in _listener_handler_coroutine
    endpoint = ucx_api.UCXEndpoint.create_from_conn_request(
  File "ucp/_libs/ucx_endpoint.pyx", line 315, in ucp._libs.ucx_api.UCXEndpoint.create_from_conn_request
  File "ucp/_libs/ucx_endpoint.pyx", line 231, in ucp._libs.ucx_api.UCXEndpoint.__init__
  File "ucp/_libs/utils.pyx", line 113, in ucp._libs.ucx_api.assert_ucs_status
ucp._libs.exceptions.UCXError: Input/output error

Confirming that I did remove ptxcompiler:

stephey@perlmutter:login26:~> module load python
stephey@perlmutter:login26:~> conda activate dask-ucx
(/global/common/software/das/stephey/dask-ucx) stephey@perlmutter:login26:~> conda list | grep "dask"
# packages in environment at /global/common/software/das/stephey/dask-ucx:
dask                      2022.5.2           pyhd8ed1ab_0    conda-forge
dask-core                 2022.5.2           pyhd8ed1ab_0    conda-forge
dask-cuda                 22.08.00a220605         py39_13    rapidsai-nightly
dask-cudf                 22.06.00a220605 cuda_11_py39_g82c062ae26_318    rapidsai-nightly
(/global/common/software/das/stephey/dask-ucx) stephey@perlmutter:login26:~> conda list | grep "ptx"
(/global/common/software/das/stephey/dask-ucx) stephey@perlmutter:login26:~> 

My assessment is that we must still be missing some instances of forking. What do you all think?

@pentschev (Member)

It is possible; I think we may have to take a second look at all the involved bits. @wence- should be out for some time on PTO, but I will pick this up with @quasiben next week; I think he also has access to Perlmutter, which should give us another point of support for debugging. This is going to be exciting when it is resolved, anyway!

@lastephey (Author)

Ok sounds good. FYI I'll also be out on vacation all next week. Please feel free to keep posting updates. If you get stuck with Slurm/queues/anything NERSC-related, hopefully @rcthomas can help you out. I'll be in the wilderness for a few days, but I'll try to respond when I can.

Again, I really appreciate all of your help troubleshooting this.

rapids-bot closed this as completed in #933 (Jun 27, 2022).
rapids-bot pushed a commit that referenced this issue (Jun 27, 2022):
Allows selection of the method multiprocessing uses to start child
processes. Additionally, in the forkserver case, ensure the fork
server is up and running before any computation happens.

Potentially fixes #930. Needs dask/distributed#6580.

cc: @pentschev, @quasiben

Authors:
  - Lawrence Mitchell (https://github.com/wence-)

Approvers:
  - Peter Andreas Entschev (https://github.com/pentschev)

URL: #933
@wence- reopened this (Jun 27, 2022)
@wence- (Contributor) commented Jun 27, 2022

I'm pretty sure that merge does let things progress further, but I still have some tests outstanding

@wence- (Contributor) commented Jun 28, 2022

To update, I think that we managed to chase down all the places where the dask-cuda toolchain calls fork(), and we have workarounds in place. We need to use nightly (rather than released) versions of distributed, dask-cuda, and ptxcompiler.

In particular, to run we must set PTXCOMPILER_CHECK_NUMBA_CODEGEN_PATCH_NEEDED=0 in the environment (this need will eventually disappear as part of work in numba).

Run scripts for a SLURM-based submission system can be seen at https://github.com/wence-/dask-cuda-benchmarks/tree/main/perlmutter, and here are some unprocessed results.

Some aspects of the dask-cuda benchmark data aggregation are not currently scalable: I will address this in #940.

@lastephey (Author)

Thanks for all your work on this.

I've been adapting your test scripts to run with a container (which at NERSC is Shifter) to try to get better and more uniform performance at scale (and also avoid filesystem quota issues). You can see my work here.

Shifter images have been squashed and, as a result, are read-only. I think the Dask nannies are attempting to create directories in places like /tmp-2530763-1, which results in a permission-denied error. In Shifter, the only place that is writable on-node is /tmp.

Reading the nanny source code, it looks like we should be able to control this location via the distributed.nanny.environ.local_directory setting, although my attempts to do this so far have failed. I have tried setting

TEMPDIR=$SCRATCH/python-benchmark/dask-tcp-vs-ucx/temp
export DASK_DISTRIBUTED__NANNY__ENVIRON__LOCAL_DIRECTORY=${TEMPDIR}

and also

export DASK_LOCAL_DIRECTORY=${TEMPDIR}

but neither seems to have any effect.

Is there a way to do this, preferably via an environment variable setting?

For reference, here is a more complete traceback:

Traceback (most recent call last):
  File "/opt/conda/envs/ucx/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/opt/conda/envs/ucx/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/opt/conda/envs/ucx/lib/python3.9/site-packages/dask_cuda/cli/dask_cuda_worker.py", line 411, in <module>
    go()
  File "/opt/conda/envs/ucx/lib/python3.9/site-packages/dask_cuda/cli/dask_cuda_worker.py", line 407, in go
    main()
  File "/opt/conda/envs/ucx/lib/python3.9/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/opt/conda/envs/ucx/lib/python3.9/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/opt/conda/envs/ucx/lib/python3.9/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/opt/conda/envs/ucx/lib/python3.9/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/opt/conda/envs/ucx/lib/python3.9/site-packages/dask_cuda/cli/dask_cuda_worker.py", line 352, in main
    worker = CUDAWorker(
  File "/opt/conda/envs/ucx/lib/python3.9/site-packages/dask_cuda/cuda_worker.py", line 200, in __init__
    self.nannies = [
  File "/opt/conda/envs/ucx/lib/python3.9/site-packages/dask_cuda/cuda_worker.py", line 201, in <listcomp>
    Nanny(
  File "/opt/conda/envs/ucx/lib/python3.9/site-packages/distributed/nanny.py", line 167, in __init__
    os.makedirs(self.local_directory, exist_ok=True)
  File "/opt/conda/envs/ucx/lib/python3.9/os.py", line 225, in makedirs
    mkdir(name, mode)
PermissionError: [Errno 13] Permission denied: '/tmp-2531002-0'

@wence- (Contributor) commented Jul 4, 2022

I think you can control this via the --local-directory argument to dask-cuda-worker. The relevant config slot seems to be dask.config["temporary-directory"], so I think export DASK_TEMPORARY_DIRECTORY=$TEMPDIR should also work.

Note that if you're pointing at a shared filesystem, you should also pass --shared-filesystem to dask-cuda-worker.
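
For reference, the same config slot can also be inspected or set programmatically (a sketch; the path below is just a placeholder):

import dask

# "temporary-directory" is the config slot mentioned above; the
# DASK_TEMPORARY_DIRECTORY environment variable populates it.
print(dask.config.get("temporary-directory"))  # None unless configured
dask.config.set({"temporary-directory": "/tmp/dask-scratch"})  # placeholder path
print(dask.config.get("temporary-directory"))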

@lastephey (Author)

Thank you! Actually the problem was that we had set --local-directory in job.sh, which I had forgotten about. (Accidental confirmation that the CLI setting overrides the environment variable setting.) Sorry about that.

These edits to job.sh:

export TEMPDIR=/tmp/tmp-${SLURM_JOBID}-${SLURM_PROCID}
mkdir -p ${TEMPDIR}
export COMMON_ARGS="--protocol ${PROTOCOL} \
       --interface hsn0 \
       --scheduler-file ${SCHED_FILE}"
export PROTOCOL_ARGS=""
export WORKER_ARGS="--local-directory ${TEMPDIR} \
       --multiprocessing-method forkserver"

seem to do what I need.

If I understand the docs, --shared-filesystem only applies when the local directory can be shared by all workers, and since I'm pointing at the per-node /tmp (which is in memory), I'm going to leave it off.

I think this fixes my issue. I have some minor environment issues to fix (still need dask-cudf in the image), but once that's done, I'll kick off some benchmarking runs.

@wence- (Contributor) commented Jul 5, 2022

Hi @lastephey, great. Just to note that we've merged the benchmark updates in #940, so no need to be on any strange branches. I've updated the job scripts to match. We now have to pass --shutdown-external-cluster-on-exit to close the dask cluster when the benchmark script finishes, and can produce JSON-based output for analysis with --output-basename ....

Let us know if the runs work for you, and if we can go ahead and close this particular issue.

@lastephey (Author)

Great, thank you. Let me rebuild the image with your updates in #940 and verify that it scales. Then yes I am happy to close.

@lastephey (Author)

Ok, I think everything seems to be running up to at least 64 nodes now! You can find some early results here. I'll keep adding as I go. If you ever want to test yourself, running in the container should require only a handful of path changes.

I see some errors in the slurm-<jobid>.out files along the lines of:

2022-07-05 13:43:47,958 - distributed.batched - INFO - Batched Comm Closed <UCX (closed) Worker->Scheduler local= remote=ucx://10.249.17.13:8786>
Traceback (most recent call last):
  File "/opt/conda/envs/ucx/lib/python3.9/site-packages/distributed/batched.py", line 94, in _background_send
    nbytes = yield self.comm.write(
  File "/opt/conda/envs/ucx/lib/python3.9/site-packages/tornado/gen.py", line 769, in run
    value = future.result()
  File "/opt/conda/envs/ucx/lib/python3.9/site-packages/distributed/utils.py", line 777, in wrapper
    return await func(*args, **kwargs)
  File "/opt/conda/envs/ucx/lib/python3.9/site-packages/distributed/comm/ucx.py", line 250, in write
    raise CommClosedError("Endpoint is closed -- unable to send message")
distributed.comm.core.CommClosedError: Endpoint is closed -- unable to send message

but I think it's just the cluster complaining a bit while it shuts down. If those messages look normal/expected to you, please feel free to close this.

Thank you very much for all your help and work on this @wence- and @quasiben!

@wence- (Contributor) commented Jul 6, 2022

but I think it's just the cluster complaining a bit while it shuts down. If those messages look normal/expected to you, please feel free to close this.

Yes, these are expected (although it would be nice if they didn't appear at all)

@wence- (Contributor) commented Jul 6, 2022

Closing, thanks for your patience and help @lastephey

@wence- closed this as completed (Jul 6, 2022)
@pentschev mentioned this issue (Jul 25, 2022)