[REVIEW] Fix device memory spilling with cuDF #65
Conversation
cc @VibhuJawa
dask_cuda/device_host_file.py
Outdated
except ImportError:
    _device_instances = []
return (hasattr(obj, "__cuda_array_interface__") or
        any([isinstance(obj, inst) for inst in _device_instances]))
Importing `cudf` is non-trivially expensive (a few seconds). Also, it would be nice not to have to special-case libraries like this. I wonder if there is another signal we can use in cases like these. cc @kkraus14
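One possible alternative signal, just as a sketch (this is not what the PR ends up doing): only consult cudf when the user has already imported it, by looking it up in `sys.modules`. The class names below are illustrative assumptions.

```python
import sys

def _cudf_device_types():
    # cudf is only consulted if it has already been imported elsewhere,
    # so this check never pays the multi-second import cost itself.
    cudf = sys.modules.get("cudf")
    if cudf is None:
        return ()
    return (cudf.DataFrame, cudf.Series)

def _is_device_object(obj):
    return (hasattr(obj, "__cuda_array_interface__")
            or isinstance(obj, _cudf_device_types()))
```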
I do think, though, that testing that cudf objects register as device objects would be very helpful. Same with lists of cudf objects (and other objects like CuPy arrays and Numba device arrays).
Ping
Thanks for the reminder. I agree that we would be better off not testing specifically for cudf objects, but currently there's no clean way of doing it. I've commented on this before as well in #65 (comment). For now, I'm checking those with `__module__` to avoid importing cudf.
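Roughly, the `__module__`-based check amounts to something like the following (a sketch, not necessarily the exact code in the commit):

```python
def _is_cudf_object(obj):
    # An object counts as a cudf object if its class is defined somewhere
    # under the cudf package; no import of cudf is required for this test.
    module = getattr(type(obj), "__module__", None) or ""
    return module == "cudf" or module.startswith("cudf.")
```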
Once we extend `__cuda_array_interface__` with the mask attribute we'll be able to handle `cudf.Series` nicely, but `cudf.DataFrame` will still need to be special-cased.
Given we'd need to special-case any non-array-like GPU object for this, e.g. an XGBoost DMatrix backed by device memory, it would be good if we could design a generalized approach.
A generalized approach would be great, but AFAIK, there's no error-free way to check whether an object is backed by device memory or not. Do you know of any such way @kkraus14?
He might be suggesting something like a type registry or dispatch function, maybe similar to how we handle the `sizeof` function today.
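For illustration, a registry in the spirit of dask's `sizeof` could be built with `dask.utils.Dispatch`; this is a sketch of the idea, not the code that ended up in the PR:

```python
from dask.utils import Dispatch

is_device_object = Dispatch(name="is_device_object")

@is_device_object.register(object)
def _default(obj):
    # Anything exposing __cuda_array_interface__ is treated as device-backed.
    return hasattr(obj, "__cuda_array_interface__")

@is_device_object.register((list, tuple))
def _sequence(seq):
    return any(is_device_object(o) for o in seq)

@is_device_object.register_lazy("cudf")
def _register_cudf():
    # Runs the first time a cudf object is dispatched, so importing cudf
    # is deferred until it is actually needed.
    import cudf

    @is_device_object.register((cudf.DataFrame, cudf.Series))
    def _cudf(obj):
        return True
```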
This is a good idea. I've added that in the latest commit.
dask_cuda/device_host_file.py
Outdated
if isinstance(obj, list) or isinstance(obj, tuple):
    return any([_is_device_object(o) for o in obj])
else:
    return _is_device_object(obj)
Maybe roll this logic into `_is_device_object` directly?
Ping
Since `_is_device_object` is not just a single condition now, I'd rather keep it separate than duplicate that code.
Perhaps some light recursion:
def is_device_object(obj):
    if isinstance(obj, (list, tuple)):
        return any(map(is_device_object, obj))
    elif ...
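Filling in the elided branch with the attribute check from the snippet under review, the whole helper might read roughly as follows (a sketch; `_device_instances` is assumed to be the list built in the `try`/`except ImportError` block above):

```python
def is_device_object(obj):
    if isinstance(obj, (list, tuple)):
        return any(map(is_device_object, obj))
    return (hasattr(obj, "__cuda_array_interface__")
            or any(isinstance(obj, inst) for inst in _device_instances))
```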
Yeah, I actually overlooked that, thanks for pointing it out. I checked on side channels whether we have a good way of doing this, but we currently don't. What we can check for is the existence of the `__cuda_array_interface__` attribute.
I agree. I'm working on a test; I forgot to mark this as [WIP], doing it now.
So, I tried this with a slightly bigger example and I am getting the error below in the worker logs. The computation seems to be paused.

distributed.worker - ERROR - [1] Call to cuMemcpyHtoD results in CUDA_ERROR_INVALID_VALUE
Traceback (most recent call last):
  File "/conda/envs/rapids/lib/python3.7/site-packages/distributed/worker.py", line 2037, in release_key
    if key in self.data and key not in self.dep_state:
  File "/conda/envs/rapids/lib/python3.7/_collections_abc.py", line 666, in __contains__
    self[key]
  File "/conda/envs/rapids/lib/python3.7/site-packages/dask_cuda-0.0.0.dev0-py3.7.egg/dask_cuda/device_host_file.py", line 111, in __getitem__
    self.device_buffer[key] = _deserialize_if_device(obj)
  File "/conda/envs/rapids/lib/python3.7/site-packages/dask_cuda-0.0.0.dev0-py3.7.egg/dask_cuda/device_host_file.py", line 47, in _deserialize_if_device
    return deserialize_bytes(obj)
  File "/conda/envs/rapids/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 392, in deserialize_bytes
    return deserialize(header, frames)
  File "/conda/envs/rapids/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 190, in deserialize
    return loads(header, frames)
  File "/conda/envs/rapids/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 64, in pickle_loads
    return pickle.loads(b"".join(frames))
  File "/conda/envs/rapids/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 61, in loads
    return pickle.loads(x)
  File "/conda/envs/rapids/lib/python3.7/site-packages/cudf-0.8.0a1+348.g2a7237c.dirty-py3.7-linux-x86_64.egg/cudf/dataframe/buffer.py", line 40, in __init__
    self.mem = cudautils.to_device(mem)
  File "/conda/envs/rapids/lib/python3.7/site-packages/cudf-0.8.0a1+348.g2a7237c.dirty-py3.7-linux-x86_64.egg/cudf/utils/cudautils.py", line 22, in to_device
    dary, _ = rmm.auto_device(ary)
  File "/conda/envs/rapids/lib/python3.7/site-packages/librmm_cffi-0.8.0-py3.7.egg/librmm_cffi/wrapper.py", line 268, in auto_device
    devobj.copy_to_device(obj, stream=stream)
  File "/conda/envs/rapids/lib/python3.7/site-packages/numba/cuda/cudadrv/devices.py", line 212, in _require_cuda_context
    return fn(*args, **kws)
  File "/conda/envs/rapids/lib/python3.7/site-packages/numba/cuda/cudadrv/devicearray.py", line 198, in copy_to_device
    _driver.host_to_device(self, ary_core, self.alloc_size, stream=stream)
  File "/conda/envs/rapids/lib/python3.7/site-packages/numba/cuda/cudadrv/driver.py", line 1838, in host_to_device
    fn(device_pointer(dst), host_pointer(src, readonly=True), size, *varargs)
  File "/conda/envs/rapids/lib/python3.7/site-packages/numba/cuda/cudadrv/driver.py", line 293, in safe_cuda_api_call
    self._check_error(fname, retcode)
  File "/conda/envs/rapids/lib/python3.7/site-packages/numba/cuda/cudadrv/driver.py", line 328, in _check_error
    raise CudaAPIError(retcode, msg)
numba.cuda.cudadrv.driver.CudaAPIError: [1] Call to cuMemcpyHtoD results in CUDA_ERROR_INVALID_VALUE

distributed.worker - ERROR - '_compare_frame-b40800fb-3765-4e87-a046-9f64170b5039'
Traceback (most recent call last):
  File "/conda/envs/rapids/lib/python3.7/site-packages/distributed/worker.py", line 2123, in release_dep
    if self.task_state[key] != "memory":
KeyError: '_compare_frame-b40800fb-3765-4e87-a046-9f64170b5039'

distributed.worker - ERROR - Key not ready to send to worker, _compare_frame-b40800fb-3765-4e87-a046-9f64170b5039: memory
distributed.worker - ERROR - Key not ready to send to worker, _compare_frame-da2444cf-fa27-45da-9aea-2f3851393073: memory

Can you confirm, @pentschev, that it worked with the example in #57?
@VibhuJawa yes, your example was my debugging and test case, and it works now with the changes in this PR. I've also tested with more devices, a few different
@pentschev, I can confirm the posted example works. Unrelated: I am getting the error below when I set the host memory_limit.

from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import cudf, dask_cudf
# Use dask-cuda to start one worker per GPU on a single-node system
# When you shutdown this notebook kernel, the Dask cluster also shuts down.
cluster = LocalCUDACluster(ip='0.0.0.0',n_workers=1, device_memory_limit='10000 MiB',memory_limit='16000 MiB')
client = Client(cluster)
# # print client info
print(client)
# Code to simulate_data
def generate_file(output_file,rows=100):
    with open(output_file, 'wb') as f:
        f.write(b'A,B,C,D,E,F,G,H,I,J,K\n')
        f.write(b'22,697,56,0.0,0.0,0.0,0.0,0.0,0.0,0,0\n23,697,56,0.0,0.0,0.0,0.0,0.0,0.0,0,0\n'*(rows//2))
        f.close()
# generate the test file
output_file='test.csv'
# reading it using dask_cudf
df = dask_cudf.read_csv(output_file,chunksize='100 MiB')
print(len(df))

---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-1-7c819d29785c> in <module>
23 # reading it using dask_cudf
24 df = dask_cudf.read_csv(output_file,chunksize='100 MiB')
---> 25 print(len(df))
/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in __len__(self)
455 def __len__(self):
456 return self.reduction(len, np.sum, token='len', meta=int,
--> 457 split_every=False).compute()
458
459 def __bool__(self):
/conda/envs/rapids/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
154 dask.base.compute
155 """
--> 156 (result,) = compute(self, traverse=False, **kwargs)
157 return result
158
/conda/envs/rapids/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
396 keys = [x.__dask_keys__() for x in collections]
397 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398 results = schedule(dsk, keys, **kwargs)
399 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
400
/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2566 should_rejoin = False
2567 try:
-> 2568 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2569 finally:
2570 for f in futures.values():
/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
1820 direct=direct,
1821 local_worker=local_worker,
-> 1822 asynchronous=asynchronous,
1823 )
1824
/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
751 return future
752 else:
--> 753 return sync(self.loop, func, *args, **kwargs)
754
755 def __repr__(self):
/conda/envs/rapids/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
329 e.wait(10)
330 if error[0]:
--> 331 six.reraise(*error[0])
332 else:
333 return result[0]
/conda/envs/rapids/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None
/conda/envs/rapids/lib/python3.7/site-packages/distributed/utils.py in f()
314 if timeout is not None:
315 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 316 result[0] = yield future
317 except Exception as exc:
318 error[0] = sys.exc_info()
/conda/envs/rapids/lib/python3.7/site-packages/tornado/gen.py in run(self)
727
728 try:
--> 729 value = future.result()
730 except Exception:
731 exc_info = sys.exc_info()
/conda/envs/rapids/lib/python3.7/site-packages/tornado/gen.py in run(self)
734 if exc_info is not None:
735 try:
--> 736 yielded = self.gen.throw(*exc_info) # type: ignore
737 finally:
738 # Break up a reference to itself
/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1651 six.reraise(CancelledError, CancelledError(key), None)
1652 else:
-> 1653 six.reraise(type(exception), exception, traceback)
1654 if errors == "skip":
1655 bad_keys.add(key)
/conda/envs/rapids/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
690 value = tp()
691 if value.__traceback__ is not tb:
--> 692 raise value.with_traceback(tb)
693 raise value
694 finally:
/conda/envs/rapids/lib/python3.7/site-packages/dask_cuda-0.0.0.dev0-py3.7.egg/dask_cuda/device_host_file.py in __setitem__()
/conda/envs/rapids/lib/python3.7/site-packages/zict/buffer.py in __setitem__()
75 weight = self.weight(key, value)
76 # Avoid useless movement for heavy values
---> 77 if self.weight(key, value) <= self.n:
78 if key in self.slow:
79 del self.slow[key]
TypeError: '<=' not supported between instances of 'int' and 'str'
Edit: Put long trace in details.
@VibhuJawa thanks for reporting that. Certainly something small with the parsing/storing of `memory_limit`.
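Presumably the string limit reaches zict's `Buffer` unparsed; the kind of normalization needed looks roughly like this (a hypothetical sketch using `dask.utils.parse_bytes`, not necessarily the actual fix):

```python
from dask.utils import parse_bytes

def normalize_memory_limit(limit):
    # Turn a human-readable limit such as "16000 MiB" into an integer number
    # of bytes, so that zict's Buffer can compare it against object sizes.
    if isinstance(limit, str):
        return parse_bytes(limit)
    return int(limit)

assert normalize_memory_limit("16000 MiB") == 16000 * 2**20
```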
Thanks for that. I think the previous error (#65 (comment)) was due to a large chunk size I was using. I will try to create a minimal example that I can post (I need larger chunk sizes to make sorting more efficient). The previous run was on private data, so I can't share that here.
cc @galipremsagar for visibility as well
Yes, I was trying to shorten test time by running on smaller data, but I think that's not really useful if it doesn't work due to such small limits. Maybe increasing that alone will fix the issues.
@mrocklin I managed to tackle the timeout/memory limit issues that were causing failures. This is now ready for review.
@gen_cluster(
    client=True,
    ncores=[("127.0.0.1", 1)],
    Worker=Worker,
    timeout=300,
How long does this test usually take? Five minutes seems like a very long time. Is this timeout still necessary?
I think most of the tests here run in under 60 seconds, but the default isn't enough for some of them; in particular, one of the CuPy tests I've seen failing non-deterministically. Since we're not concerned with the timeout itself for these tests, IMHO it's better to have longer timeouts than to have some tests failing now and then. In other words, the 300 seconds is mostly a safeguard to keep CI from hanging forever on an eventual failure, while also preventing it from failing when a test just takes a while longer due to some unexpected slowness of the CI system.
yield client.run(worker_assert, nbytes, 32, 2048 + part_index_nbytes)

host_chunks = yield client.run(lambda: len(get_worker().data.host))
disk_chunks = yield client.run(lambda: len(get_worker().data.disk))
It's not important, but because you're using normal `Worker`s here rather than `Nanny`s, you have direct access to the workers. You can look at `worker.data.host` and `worker.data.disk` directly, without using `client.run`.
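As a sketch of what that would look like (hypothetical test; the `DeviceHostFile` arguments and the test name are assumptions modeled on the tests in this PR):

```python
from distributed import Worker, get_worker
from distributed.utils_test import gen_cluster
from dask_cuda.device_host_file import DeviceHostFile

@gen_cluster(
    client=True,
    ncores=[("127.0.0.1", 1)],
    Worker=Worker,
    worker_kwargs={"data": DeviceHostFile(device_memory_limit=2**30, memory_limit=2**31)},
)
def test_direct_worker_access(client, scheduler, worker):
    # Via client.run, as the tests currently do:
    host_chunks = yield client.run(lambda: len(get_worker().data.host))
    # Directly, since plain Workers (no Nanny) live in the test process:
    assert len(worker.data.host) == list(host_chunks.values())[0]
```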
Yes, but I prefer to call `client.run` here to make sure that works and to keep all tests consistent; I've added some more `client.run` calls now. Perhaps we can generalize that in the future into a single assertion function.
dask_cuda/device_host_file.py
Outdated
if isinstance(obj, list) or isinstance(obj, tuple):
    return any([_is_device_object(o) for o in obj])
else:
    return _is_device_object(obj)
Ping
dask_cuda/device_host_file.py
Outdated
except ImportError:
    _device_instances = []
return (hasattr(obj, "__cuda_array_interface__") or
        any([isinstance(obj, inst) for inst in _device_instances]))
Ping
This PR does work for the snippet @VibhuJawa posted in #57 (comment), but the code below causes an out-of-memory exception and does not control spilling of memory.

from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import cudf, dask_cudf
cluster = LocalCUDACluster(ip='0.0.0.0',n_workers=1, device_memory_limit='10000 MiB')
client = Client(cluster)
print(client)
# Code to simulate_data
def generate_file(output_file,rows=100):
    with open(output_file, 'wb') as f:
        f.write(b'A,B,C,D,E,F,G,H,I,J,K\n')
        f.write(b'127.0.0.1,697,56,0.0,0.0,0.0,0.0,0.0,0.0,0,0\n121.1.2.4,697,56,0.0,0.0,0.0,0.0,0.0,0.0,0,0\n'*(rows//2))
        f.close()
# generate the test file
output_file='test.csv'
# Uncomment below
generate_file(output_file,rows=100_000_000_0)
# reading it using dask_cudf
df = dask_cudf.read_csv(output_file,chunksize='100 MiB')
print(df.head(10).to_pandas())
# converting all IPs to integer(long) values
def to_long(df):
    gpu_strings = df['A'].data
    df['int_ips'] = gpu_strings.ip2int()
    return df
long_df = df.map_partitions(to_long)
x = long_df.persist()
wait(x)
@galipremsagar thanks for reminding me of that case. This is a discussion @VibhuJawa and I had on side channels; I reposted it in #57 (comment) for visibility.
For the case @galipremsagar asked about, unfortunately, this can't be handled from the dask-cuda side alone, and will likely require changes to cuDF in the future. The only solution for the time being is to reduce the chunk size.
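As noted earlier in the thread, the chunk size passed to `dask_cudf.read_csv` is the main knob for now; a reduced value would look like this (the value is illustrative and depends on the GPU and workload):

```python
import dask_cudf

# Smaller partitions keep each chunk within what the device can hold once it
# is deserialized back from host memory; tune the value to the GPU in use.
df = dask_cudf.read_csv("test.csv", chunksize="32 MiB")
```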
Regardless of the issues with specific pipelines, this PR solves the general cuDF cases where the memory is exposed on the Python side. Therefore, @mrocklin, please review so we can get it merged before the code freeze tomorrow!
Co-Authored-By: Keith Kraus <[email protected]>
Thanks @kkraus14 for the review!
…to fix-cudf-device-memory-spill
This seems fine to me
Thanks for the review, @mrocklin. Merging.
Solves #57