
[REVIEW] Fix device memory spilling with cuDF #65

Merged
merged 23 commits into rapidsai:branch-0.8 on Jun 21, 2019

Conversation

pentschev
Member

Solves #57

@mrocklin
Contributor

cc @VibhuJawa

except ImportError:
    _device_instances = []
return (hasattr(obj, "__cuda_array_interface__") or
        any([isinstance(obj, inst) for inst in _device_instances]))
Contributor

Importing cudf is non-trivially expensive (a few seconds). Also, it would be nice not to have to special case libraries like this. I wonder if there is another signal we can use in cases like these. cc @kkraus14

Contributor

I do think, though, that testing that cudf objects register as device objects would be very helpful. Same with lists of cudf objects (and other objects like cupy arrays and numba device arrays).

Contributor

Ping

Member Author

Thanks for the reminder. I agree that we would be better off not testing specifically for cudf objects, but currently there's no clean way of doing it. I've commented on this before as well in #65 (comment). For now, I'm checking those with __module__ to avoid importing cudf.
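A minimal sketch of that __module__ check (hypothetical helper name, not the exact code in this PR):

def _is_cudf_object(obj):
    # Look at the type's module name instead of importing cudf, so the
    # check stays cheap even when cudf is not installed.
    module = getattr(type(obj), "__module__", None) or ""
    return module.split(".")[0] == "cudf"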

Contributor

Once we extend __cuda_array_interface__ with the mask attribute we'll be able to handle cudf.Series nicely, but cudf.DataFrame will still need to be special cased.

Given we'd need to special case any non-array-like GPU objects for this, i.e. an XGBoost DMatrix backed by device memory, it would be good if we could design a generalized approach.

Member Author

A generalized approach would be great, but AFAIK, there's no error-free way to check whether an object is backed by device memory or not. Do you know of any such way @kkraus14?

Contributor

He might be suggesting something like a type registry or dispatch function, maybe similar to how we handle the sizeof function today.
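For reference, a rough sketch of that registry idea using dask.utils.Dispatch, the same mechanism behind dask's sizeof dispatch (the names here are hypothetical, not the code added in this PR):

from dask.utils import Dispatch

is_device_object = Dispatch(name="is_device_object")

@is_device_object.register(object)
def _default(obj):
    # Anything exposing the CUDA array interface counts as a device object.
    return hasattr(obj, "__cuda_array_interface__")

@is_device_object.register_lazy("cudf")
def _register_cudf():
    # Runs only when a cudf object is first dispatched on, so importing
    # cudf is deferred until it is actually needed.
    import cudf

    @is_device_object.register((cudf.DataFrame, cudf.Series))
    def _is_cudf(obj):
        return True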

Member Author

This is a good idea. I've added that in the latest commit.

if isinstance(obj, list) or isinstance(obj, tuple):
    return any([_is_device_object(o) for o in obj])
else:
    return _is_device_object(obj)
Contributor

Maybe roll this logic into _is_device_object directly?

Contributor

Ping

Member Author

Since _is_device_object is not just a single condition now, I'd rather keep it separate than duplicate that code.

Contributor

Perhaps some light recursion:

def is_device_object(obj):
    if isinstance(obj, (list, tuple)):
        return any(map(is_device_object, obj))
    elif ...
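A possible completion of that sketch (illustrative only), keeping the per-object checks in _is_device_object as the PR currently does:

def is_device_object(obj):
    if isinstance(obj, (list, tuple)):
        return any(map(is_device_object, obj))
    # Single-object case: defer to the existing per-object check.
    return _is_device_object(obj)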

@pentschev
Member Author

Importing cudf is non-trivially expensive (a few seconds). Also, it would be nice not to have to special case libraries like this. I wonder if there is another signal we can use in cases like these.

Yeah, I actually overlooked that, thanks for pointing it out. I checked on side channels whether we have a good way of doing this, but we currently don't. What we can check for is the existence of the as_gpu_matrix function via hasattr, but that isn't guaranteed to be exclusive to cuDF, which is why I tried something more definite.
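The hasattr variant mentioned above would look roughly like this; as noted, nothing prevents an unrelated class from defining the same method:

def _looks_like_cudf(obj):
    # Duck-typing check: cheap and avoids the import, but not exclusive to cuDF.
    return hasattr(obj, "as_gpu_matrix")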

@pentschev
Member Author

I do think though, that testing that cudf objects register as device objects would be very helpful. Same with lists of cudf objects (and other objects like cupy arrays and numba device arrays).

I agree. I'm working on a test; I forgot to mark this as [WIP], doing it now.

@pentschev pentschev changed the title Fix device memory spilling with cuDF [WIP] Fix device memory spilling with cuDF May 31, 2019
@VibhuJawa
Member

VibhuJawa commented Jun 1, 2019

So, I tried this with a slightly bigger example and I am getting the below error in the worker logs. The computation seems to be paused.

distributed.worker - ERROR - [1] Call to cuMemcpyHtoD results in CUDA_ERROR_INVALID_VALUE
Traceback (most recent call last):
  File "/conda/envs/rapids/lib/python3.7/site-packages/distributed/worker.py", line 2037, in release_key
    if key in self.data and key not in self.dep_state:
  File "/conda/envs/rapids/lib/python3.7/_collections_abc.py", line 666, in __contains__
    self[key]
  File "/conda/envs/rapids/lib/python3.7/site-packages/dask_cuda-0.0.0.dev0-py3.7.egg/dask_cuda/device_host_file.py", line 111, in __getitem__
    self.device_buffer[key] = _deserialize_if_device(obj)
  File "/conda/envs/rapids/lib/python3.7/site-packages/dask_cuda-0.0.0.dev0-py3.7.egg/dask_cuda/device_host_file.py", line 47, in _deserialize_if_device
    return deserialize_bytes(obj)
  File "/conda/envs/rapids/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 392, in deserialize_bytes
    return deserialize(header, frames)
  File "/conda/envs/rapids/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 190, in deserialize
    return loads(header, frames)
  File "/conda/envs/rapids/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 64, in pickle_loads
    return pickle.loads(b"".join(frames))
  File "/conda/envs/rapids/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 61, in loads
    return pickle.loads(x)
  File "/conda/envs/rapids/lib/python3.7/site-packages/cudf-0.8.0a1+348.g2a7237c.dirty-py3.7-linux-x86_64.egg/cudf/dataframe/buffer.py", line 40, in __init__
    self.mem = cudautils.to_device(mem)
  File "/conda/envs/rapids/lib/python3.7/site-packages/cudf-0.8.0a1+348.g2a7237c.dirty-py3.7-linux-x86_64.egg/cudf/utils/cudautils.py", line 22, in to_device
    dary, _ = rmm.auto_device(ary)
  File "/conda/envs/rapids/lib/python3.7/site-packages/librmm_cffi-0.8.0-py3.7.egg/librmm_cffi/wrapper.py", line 268, in auto_device
    devobj.copy_to_device(obj, stream=stream)
  File "/conda/envs/rapids/lib/python3.7/site-packages/numba/cuda/cudadrv/devices.py", line 212, in _require_cuda_context
    return fn(*args, **kws)
  File "/conda/envs/rapids/lib/python3.7/site-packages/numba/cuda/cudadrv/devicearray.py", line 198, in copy_to_device
    _driver.host_to_device(self, ary_core, self.alloc_size, stream=stream)
  File "/conda/envs/rapids/lib/python3.7/site-packages/numba/cuda/cudadrv/driver.py", line 1838, in host_to_device
    fn(device_pointer(dst), host_pointer(src, readonly=True), size, *varargs)
  File "/conda/envs/rapids/lib/python3.7/site-packages/numba/cuda/cudadrv/driver.py", line 293, in safe_cuda_api_call
    self._check_error(fname, retcode)
  File "/conda/envs/rapids/lib/python3.7/site-packages/numba/cuda/cudadrv/driver.py", line 328, in _check_error
    raise CudaAPIError(retcode, msg)
numba.cuda.cudadrv.driver.CudaAPIError: [1] Call to cuMemcpyHtoD results in CUDA_ERROR_INVALID_VALUE


distributed.worker - ERROR - '_compare_frame-b40800fb-3765-4e87-a046-9f64170b5039'
Traceback (most recent call last):
  File "/conda/envs/rapids/lib/python3.7/site-packages/distributed/worker.py", line 2123, in release_dep
    if self.task_state[key] != "memory":
KeyError: '_compare_frame-b40800fb-3765-4e87-a046-9f64170b5039'

distributed.worker - ERROR - Key not ready to send to worker, _compare_frame-b40800fb-3765-4e87-a046-9f64170b5039: memory

distributed.worker - ERROR - Key not ready to send to worker, _compare_frame-da2444cf-fa27-45da-9aea-2f3851393073: memory

Can you confirm, @pentschev, that it worked with the example in #57?

@pentschev
Member Author

@VibhuJawa yes, your example was my debugging and test case, and it works now with the changes in this PR. I've also tested with more devices, a few different device_memory_limit values, and both RAPIDS 0.7 and nightly, and all of them worked for me. Could you try running that on your side as well and confirming if that works?

@VibhuJawa
Member

VibhuJawa commented Jun 3, 2019

@VibhuJawa yes, your example was my debugging and test case, and it works now with the changes in this PR. I've also tested with more devices, a few different device_memory_limit values, and both RAPIDS 0.7 and nightly, and all of them worked for me. Could you try running that on your side as well and confirming if that works?

@pentschev Can confirm, the posted example works.

Unrelated: I am getting the below error when I set the memory_limit (host).

from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import cudf, dask_cudf

# Use dask-cuda to start one worker per GPU on a single-node system
# When you shutdown this notebook kernel, the Dask cluster also shuts down.
cluster = LocalCUDACluster(ip='0.0.0.0',n_workers=1, device_memory_limit='10000 MiB',memory_limit='16000 MiB')
client = Client(cluster)
# print client info
print(client)

# Code to simulate_data

def generate_file(output_file,rows=100):
    with open(output_file, 'wb') as f:
        f.write(b'A,B,C,D,E,F,G,H,I,J,K\n')
        f.write(b'22,697,56,0.0,0.0,0.0,0.0,0.0,0.0,0,0\n23,697,56,0.0,0.0,0.0,0.0,0.0,0.0,0,0\n'*(rows//2))

# generate the test file 
output_file='test.csv'

# reading it using dask_cudf
df = dask_cudf.read_csv(output_file,chunksize='100 MiB')
print(len(df))
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-1-7c819d29785c> in <module>
     23 # reading it using dask_cudf
     24 df = dask_cudf.read_csv(output_file,chunksize='100 MiB')
---> 25 print(len(df))

/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in __len__(self)
    455     def __len__(self):
    456         return self.reduction(len, np.sum, token='len', meta=int,
--> 457                               split_every=False).compute()
    458 
    459     def __bool__(self):

/conda/envs/rapids/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

/conda/envs/rapids/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    396     keys = [x.__dask_keys__() for x in collections]
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398     results = schedule(dsk, keys, **kwargs)
    399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400 

/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2566                     should_rejoin = False
   2567             try:
-> 2568                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2569             finally:
   2570                 for f in futures.values():

/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
   1820                 direct=direct,
   1821                 local_worker=local_worker,
-> 1822                 asynchronous=asynchronous,
   1823             )
   1824 

/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    751             return future
    752         else:
--> 753             return sync(self.loop, func, *args, **kwargs)
    754 
    755     def __repr__(self):

/conda/envs/rapids/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    329             e.wait(10)
    330     if error[0]:
--> 331         six.reraise(*error[0])
    332     else:
    333         return result[0]

/conda/envs/rapids/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

/conda/envs/rapids/lib/python3.7/site-packages/distributed/utils.py in f()
    314             if timeout is not None:
    315                 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 316             result[0] = yield future
    317         except Exception as exc:
    318             error[0] = sys.exc_info()

/conda/envs/rapids/lib/python3.7/site-packages/tornado/gen.py in run(self)
    727 
    728                     try:
--> 729                         value = future.result()
    730                     except Exception:
    731                         exc_info = sys.exc_info()

/conda/envs/rapids/lib/python3.7/site-packages/tornado/gen.py in run(self)
    734                     if exc_info is not None:
    735                         try:
--> 736                             yielded = self.gen.throw(*exc_info)  # type: ignore
    737                         finally:
    738                             # Break up a reference to itself

/conda/envs/rapids/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1651                             six.reraise(CancelledError, CancelledError(key), None)
   1652                         else:
-> 1653                             six.reraise(type(exception), exception, traceback)
   1654                     if errors == "skip":
   1655                         bad_keys.add(key)

/conda/envs/rapids/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

/conda/envs/rapids/lib/python3.7/site-packages/dask_cuda-0.0.0.dev0-py3.7.egg/dask_cuda/device_host_file.py in __setitem__()

/conda/envs/rapids/lib/python3.7/site-packages/zict/buffer.py in __setitem__()
     75         weight = self.weight(key, value)
     76         # Avoid useless movement for heavy values
---> 77         if self.weight(key, value) <= self.n:
     78             if key in self.slow:
     79                 del self.slow[key]

TypeError: '<=' not supported between instances of 'int' and 'str'


Edit: Put long trace in details.

@pentschev
Member Author

@VibhuJawa thanks for reporting that. Certainly something small with how memory_limit is parsed/stored. In the meantime, if you need to continue with your tests, you can pass an integer, e.g. device_memory=int(16e9) (which you probably know already, my apologies if you do).
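One plausible fix on the dask-cuda side (a sketch, assuming dask.utils.parse_bytes is available) is to normalize memory_limit to an integer byte count before zict compares it against object weights:

from dask.utils import parse_bytes

def normalize_memory_limit(memory_limit):
    # "16000 MiB" -> 16777216000; integers pass through unchanged.
    if isinstance(memory_limit, str):
        return parse_bytes(memory_limit)
    return int(memory_limit)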

@VibhuJawa
Member

VibhuJawa commented Jun 3, 2019

Thanks for that.

I think the previous error (#65 (comment)) was due to a large chunk-size I was using. I'll try to create a minimal example that I can post. (I need larger chunksizes to make sorting more efficient.)

The previous run was on private data, so I can't share that here.

@beckernick
Member

cc @galipremsagar for visibility as well

@pentschev
Member Author

This number for host memory limit is quite low. Just importing a few libraries like Pandas and a few other libraries can get us close to this point.

Yes, I was trying to shorten test time by running on smaller data, but that's not really useful if it doesn't work due to such small limits. Maybe increasing the limit alone will fix the issues.

@pentschev
Member Author

@mrocklin I managed to tackle the timeout/memory limit issues that were causing failures. This is now good for a review.

@pentschev pentschev changed the title [WIP] Fix device memory spilling with cuDF [REVIEW] Fix device memory spilling with cuDF Jun 12, 2019
@gen_cluster(
    client=True,
    ncores=[("127.0.0.1", 1)],
    Worker=Worker,
    timeout=300,
Contributor

How long does this test usually take? Five minutes seems like a very long time. Is this timeout still necessary?

Member Author

I think most of the tests here run in under 60 seconds, but the default isn't enough for some of them; in particular, I've seen one of the CuPy tests fail non-deterministically. Since the timeout itself isn't what these tests are checking, IMHO it's better to have longer timeouts than to have some tests fail now and then. In other words, the 300 seconds is mostly a safeguard that keeps CI from hanging forever on an eventual failure, while preventing spurious failures when a test just takes longer due to unexpected slowness of the CI system.

yield client.run(worker_assert, nbytes, 32, 2048 + part_index_nbytes)

host_chunks = yield client.run(lambda: len(get_worker().data.host))
disk_chunks = yield client.run(lambda: len(get_worker().data.disk))
Contributor

It's not important, but because you're using normal Workers here rather than Nannies, you have direct access to the workers. You can look at worker.data.host and worker.data.disk directly, without using client.run.

Member Author

Yes, but I prefer to call client.run here to make sure that works and to keep all tests consistent; I've added some more client.run calls now. Perhaps we can generalize that in the future to a single assertion function.
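Such a single assertion function could look like this hypothetical helper (the host and disk attributes follow the worker.data.host / worker.data.disk usage above; the device attribute is an assumption):

from distributed import get_worker

def assert_chunk_counts(device_n, host_n, disk_n):
    # Runs on each worker via client.run and checks how many chunks
    # currently live in each tier of the spilling data store.
    data = get_worker().data
    assert len(data.device) == device_n  # assumed attribute name
    assert len(data.host) == host_n
    assert len(data.disk) == disk_n

# In a test coroutine:
#     yield client.run(assert_chunk_counts, 2, 3, 0)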

if isinstance(obj, list) or isinstance(obj, tuple):
    return any([_is_device_object(o) for o in obj])
else:
    return _is_device_object(obj)
Contributor

Ping

except ImportError:
    _device_instances = []
return (hasattr(obj, "__cuda_array_interface__") or
        any([isinstance(obj, inst) for inst in _device_instances]))
Contributor

Ping

@galipremsagar
Contributor

galipremsagar commented Jun 14, 2019

@pentschev

This PR does work for the snippet @VibhuJawa posted in #57 (comment).

But the code below causes an out-of-memory exception; spilling is not keeping device memory under control.

from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import cudf, dask_cudf

cluster = LocalCUDACluster(ip='0.0.0.0',n_workers=1, device_memory_limit='10000 MiB')
client = Client(cluster)

print(client)


# Code to simulate_data

def generate_file(output_file,rows=100):
    with open(output_file, 'wb') as f:
        f.write(b'A,B,C,D,E,F,G,H,I,J,K\n')
        f.write(b'127.0.0.1,697,56,0.0,0.0,0.0,0.0,0.0,0.0,0,0\n121.1.2.4,697,56,0.0,0.0,0.0,0.0,0.0,0.0,0,0\n'*(rows//2))

# generate the test file
output_file='test.csv'
generate_file(output_file,rows=100_000_000_0)

# reading it using dask_cudf


df = dask_cudf.read_csv(output_file,chunksize='100 MiB')
print(df.head(10).to_pandas())


# converting all IPs to integer(long) values
def to_long(df):
    gpu_strings = df['A'].data
    df['int_ips'] = gpu_strings.ip2int()
    return df


long_df = df.map_partitions(to_long)
x = long_df.persist()
wait(x)

@pentschev
Member Author

@galipremsagar thanks for reminding me of that case. This is a discussion @VibhuJawa and I had on side channels; I reposted it in #57 (comment) for visibility.

@pentschev
Member Author

For the case @galipremsagar asked about, unfortunately, this can't be handled from the dask-cuda side alone, and will likely require changes to cuDF in the future. The only solution for the time being is to reduce chunksize.
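For example (illustrative value only), reading with smaller partitions keeps each chunk small enough to be spilled on the Python side:

import dask_cudf

# A smaller chunksize than the 100 MiB used above; the right value depends
# on the data and on the available device memory.
df = dask_cudf.read_csv("test.csv", chunksize="32 MiB")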

@pentschev
Member Author

Regardless of the issues with specific pipelines, this PR solves the general cuDF cases where the memory is exposed on the Python side. Therefore, @mrocklin please review so we can get it merged before the code freeze tomorrow!

pentschev and others added 2 commits June 21, 2019 10:05
@pentschev
Member Author

Thanks @kkraus14 for the review!

@mrocklin
Contributor

This seems fine to me

@pentschev
Member Author

Thanks for the review @mrocklin, merging.

@pentschev pentschev merged commit 5cffd41 into rapidsai:branch-0.8 Jun 21, 2019
@pentschev pentschev deleted the fix-cudf-device-memory-spill branch September 9, 2019 08:48