HDF5 error when working with compressed NetCDF files and the dask multiprocessing scheduler #1836

Open
cchwala opened this issue Jan 17, 2018 · 5 comments

Comments

@cchwala
Contributor

cchwala commented Jan 17, 2018

Code Sample, a copy-pastable example if possible

import xarray as xr
import numpy as np
import dask.multiprocessing

# Generate dummy data and build xarray dataset
mat = np.random.rand(10, 90, 90)
ds = xr.Dataset(data_vars={'foo': (('time', 'x', 'y'), mat)})

# Write dataset to netcdf without compression
ds.to_netcdf('dummy_data_3d.nc')
# Write with zlib compression
ds.to_netcdf('dummy_data_3d_with_compression.nc', 
             encoding={'foo': {'zlib': True}})
# Write data as int16 with scale factor applied
ds.to_netcdf('dummy_data_3d_with_scale_factor.nc', 
             encoding={'foo': {'dtype': 'int16',
                               'scale_factor': 0.01,
                               '_FillValue': -9999}})

# Load data from netCDF files
ds_vanilla = xr.open_dataset('dummy_data_3d.nc', chunks={'time': 1})
ds_scaled = xr.open_dataset('dummy_data_3d_with_scale_factor.nc', chunks={'time': 1})
ds_compressed = xr.open_dataset('dummy_data_3d_with_compression.nc', chunks={'time': 1})

# Do computation using dask's multiprocessing scheduler
foo = ds_vanilla.foo.mean(dim=['x', 'y']).compute(get=dask.multiprocessing.get)
foo = ds_scaled.foo.mean(dim=['x', 'y']).compute(get=dask.multiprocessing.get)
foo = ds_compressed.foo.mean(dim=['x', 'y']).compute(get=dask.multiprocessing.get)
# The last line fails

Problem description

If NetCDF files are compressed (which is often the case) and opened with chunking enabled to use them with dask, computations using the multiprocessing scheduler fail. The above code shows this in a short example. The last line fails with a long HDF5 error log:

HDF5-DIAG: Error detected in HDF5 (1.10.1) thread 140736213758912:
  #000: H5Dio.c line 171 in H5Dread(): can't read data
    major: Dataset
    minor: Read failed
  #001: H5Dio.c line 544 in H5D__read(): can't read data
    major: Dataset
    minor: Read failed
  #002: H5Dchunk.c line 2022 in H5D__chunk_read(): error looking up chunk address
    major: Dataset
    minor: Can't get value
  #003: H5Dchunk.c line 2768 in H5D__chunk_lookup(): can't query chunk address
    major: Dataset
    minor: Can't get value
  #004: H5Dbtree.c line 1047 in H5D__btree_idx_get_addr(): can't get chunk info
    major: Dataset
    minor: Can't get value
  #005: H5B.c line 341 in H5B_find(): unable to load B-tree node
    major: B-Tree node
    minor: Unable to protect metadata
  #006: H5AC.c line 1763 in H5AC_protect(): H5C_protect() failed
    major: Object cache
    minor: Unable to protect metadata
  #007: H5C.c line 2561 in H5C_protect(): can't load entry
    major: Object cache
    minor: Unable to load metadata into cache
  #008: H5C.c line 6877 in H5C_load_entry(): Can't deserialize image
    major: Object cache
    minor: Unable to load metadata into cache
  #009: H5Bcache.c line 181 in H5B__cache_deserialize(): wrong B-tree signature
    major: B-Tree node
    minor: Bad value
Traceback (most recent call last):
  File "hdf5_bug_minimal_working_example.py", line 27, in <module>
    foo = ds_compressed.foo.mean(dim=['x', 'y']).compute(get=dask.multiprocessing.get)
  File "/Users/chwala-c/miniconda2/lib/python2.7/site-packages/xarray/core/dataarray.py", line 658, in compute
    return new.load(**kwargs)
  File "/Users/chwala-c/miniconda2/lib/python2.7/site-packages/xarray/core/dataarray.py", line 632, in load
    ds = self._to_temp_dataset().load(**kwargs)
  File "/Users/chwala-c/miniconda2/lib/python2.7/site-packages/xarray/core/dataset.py", line 491, in load
    evaluated_data = da.compute(*lazy_data.values(), **kwargs)
  File "/Users/chwala-c/miniconda2/lib/python2.7/site-packages/dask/base.py", line 333, in compute
    results = get(dsk, keys, **kwargs)
  File "/Users/chwala-c/miniconda2/lib/python2.7/site-packages/dask/multiprocessing.py", line 177, in get
    raise_exception=reraise, **kwargs)
  File "/Users/chwala-c/miniconda2/lib/python2.7/site-packages/dask/local.py", line 521, in get_async
    raise_exception(exc, tb)
  File "/Users/chwala-c/miniconda2/lib/python2.7/site-packages/dask/local.py", line 290, in execute_task
    result = _execute_task(task, data)
  File "/Users/chwala-c/miniconda2/lib/python2.7/site-packages/dask/local.py", line 270, in _execute_task
    args2 = [_execute_task(a, cache) for a in args]
  File "/Users/chwala-c/miniconda2/lib/python2.7/site-packages/dask/local.py", line 270, in _execute_task
    args2 = [_execute_task(a, cache) for a in args]
  File "/Users/chwala-c/miniconda2/lib/python2.7/site-packages/dask/local.py", line 267, in _execute_task
    return [_execute_task(a, cache) for a in arg]
  File "/Users/chwala-c/miniconda2/lib/python2.7/site-packages/dask/local.py", line 271, in _execute_task
    return func(*args2)
  File "/Users/chwala-c/miniconda2/lib/python2.7/site-packages/dask/array/core.py", line 72, in getter
    c = np.asarray(c)
  File "/Users/chwala-c/miniconda2/lib/python2.7/site-packages/numpy/core/numeric.py", line 531, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/Users/chwala-c/miniconda2/lib/python2.7/site-packages/xarray/core/indexing.py", line 538, in __array__
    return np.asarray(self.array, dtype=dtype)
  File "/Users/chwala-c/miniconda2/lib/python2.7/site-packages/numpy/core/numeric.py", line 531, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/Users/chwala-c/miniconda2/lib/python2.7/site-packages/xarray/core/indexing.py", line 505, in __array__
    return np.asarray(array[self.key], dtype=None)
  File "/Users/chwala-c/miniconda2/lib/python2.7/site-packages/xarray/backends/netCDF4_.py", line 61, in __getitem__
    data = getitem(self.get_array(), key)
  File "netCDF4/_netCDF4.pyx", line 3961, in netCDF4._netCDF4.Variable.__getitem__
  File "netCDF4/_netCDF4.pyx", line 4798, in netCDF4._netCDF4.Variable._get
  File "netCDF4/_netCDF4.pyx", line 1638, in netCDF4._netCDF4._ensure_nc_success
RuntimeError: NetCDF: HDF error
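
A note for readers on newer dask releases: the `get=` keyword used in the example above was later replaced by `scheduler=`. A minimal sketch of the same reduction written with that keyword could look as follows. The helper name `mean_over_xy` is hypothetical and not part of xarray or dask, and the sketch only changes how the scheduler is selected; it is not claimed to avoid the HDF5 error.

```python
import numpy as np
import xarray as xr


def mean_over_xy(ds, scheduler="processes"):
    """Reduce `foo` over x and y with the named dask scheduler.

    Hypothetical helper for illustration. `scheduler="processes"` selects
    the multiprocessing scheduler; "threads" and "synchronous" select the
    threaded and the single-threaded scheduler.
    """
    # Extra keyword arguments of compute() are forwarded to dask.
    return ds.foo.mean(dim=["x", "y"]).compute(scheduler=scheduler)
```

Called as `mean_over_xy(ds_compressed)`, this should correspond to the failing line above. When run as a script with the multiprocessing scheduler, the call may need to sit under an `if __name__ == "__main__":` guard, depending on how worker processes are started on the platform.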

A possible workaround, if the dataset fits into memory, is to use

ds = ds.persist()

I could split up my dataset to accomplish this, but the beauty of xarray and dask gets lost a little when doing this...
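
The workaround can be sketched roughly as below, assuming the data fits into memory and no distributed client is active, so that `persist()` runs on dask's default local scheduler and returns only once the chunks are loaded. The function name `mean_after_persist` is hypothetical, and `scheduler=` is the keyword newer dask releases use in place of `get=`.

```python
import xarray as xr


def mean_after_persist(path, scheduler="processes"):
    """Load all chunks into memory first, then reduce.

    Hypothetical helper for illustration; assumes the variable is named
    `foo` with dimensions (time, x, y), as in the example above.
    """
    with xr.open_dataset(path, chunks={"time": 1}) as ds:
        # persist() reads the chunks from the file and keeps them in RAM,
        # so the later computation no longer touches the netCDF file.
        in_memory = ds.persist()
    # Worker processes now receive in-memory arrays instead of file handles.
    return in_memory.foo.mean(dim=["x", "y"]).compute(scheduler=scheduler)
```

The design choice here is to do all file reads in the parent process and hand only in-memory data to the worker processes, which is why this approach is limited to datasets (or time slices) that fit into RAM.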

Output of xr.show_versions()

INSTALLED VERSIONS
------------------
commit: None
python: 2.7.14.final.0
python-bits: 64
OS: Darwin
OS-release: 16.7.0
machine: x86_64
processor: i386
byteorder: little
LC_ALL: None
LANG: de_DE.UTF-8
LOCALE: None.None

xarray: 0.10.0
pandas: 0.21.0
numpy: 1.13.3
scipy: 1.0.0
netCDF4: 1.3.1
h5netcdf: 0.5.0
Nio: None
bottleneck: 1.2.1
cyordereddict: 1.0.0
dask: 0.16.0
matplotlib: 2.1.0
cartopy: None
seaborn: 0.8.1
setuptools: 36.7.2
pip: 9.0.1
conda: 4.3.29
pytest: 3.2.5
IPython: 5.5.0
sphinx: None
@shoyer
Member

shoyer commented Jan 17, 2018

This may be a limitation of multiprocessing with netCDF4. Can you try using dask's distributed scheduler? That might work better, even on a single machine.

@cchwala
Contributor Author

cchwala commented Jan 17, 2018

Thanks for the quick answer.

The problem is that my actual use case also involves writing an xarray.Dataset back to disk via to_netcdf(). I left this out of the example above to isolate the problem. With the distributed scheduler and to_netcdf(), I ran into issue #1464. As far as I can see, this might be fixed "soon" (#1793).

@jhamman
Copy link
Member

jhamman commented Jan 30, 2018

I tried the above example with the multiprocessing and distributed schedulers. With the multiprocessing scheduler, I can reproduce the error described above. With the distributed scheduler, no error is encountered.

In [4]: import xarray as xr
   ...: import numpy as np
   ...: import dask.multiprocessing
   ...:
   ...: from dask.distributed import Client
   ...:
   ...: client = Client()
   ...: print(client)
   ...:
   ...: # Generate dummy data and build xarray dataset
   ...: mat = np.random.rand(10, 90, 90)
   ...: ds = xr.Dataset(data_vars={'foo': (('time', 'x', 'y'), mat)})
   ...:
   ...: # Write dataset to netcdf without compression
   ...: ds.to_netcdf('dummy_data_3d.nc')
   ...: # Write with zlib compression
   ...: ds.to_netcdf('dummy_data_3d_with_compression.nc',
   ...:              encoding={'foo': {'zlib': True}})
   ...: # Write data as int16 with scale factor applied
   ...: ds.to_netcdf('dummy_data_3d_with_scale_factor.nc',
   ...:              encoding={'foo': {'dtype': 'int16',
   ...:                                'scale_factor': 0.01,
   ...:                                '_FillValue': -9999}})
   ...:
   ...: # Load data from netCDF files
   ...: ds_vanilla = xr.open_dataset('dummy_data_3d.nc', chunks={'time': 1})
   ...: ds_scaled = xr.open_dataset('dummy_data_3d_with_scale_factor.nc', chunks={'time': 1})
   ...: ds_compressed = xr.open_dataset('dummy_data_3d_with_compression.nc', chunks={'time': 1})
   ...:
   ...: # Do computation using the distributed scheduler (the Client created above becomes the default)
   ...: foo = ds_vanilla.foo.mean(dim=['x', 'y']).compute()
   ...: foo = ds_scaled.foo.mean(dim=['x', 'y']).compute()
   ...: foo = ds_compressed.foo.mean(dim=['x', 'y']).compute()

I personally don't have any use cases where I would prefer the multiprocessing scheduler over the distributed scheduler, but I have been working on improving I/O performance and stability with xarray and dask lately. If anyone would like to work on this, I'd gladly help get it cleaned up, or give a more definitive answer on whether this can or should work.

@cchwala
Contributor Author

cchwala commented Jan 30, 2018

Thanks @jhamman for looking into this.

Currently I am fine with using persist(), since I can break my analysis workflow down into time periods for which the data fits into RAM on a large machine. As I wrote above, the distributed scheduler failed for me because of #1464, but I would like to use it in the future. From other discussions on the dask schedulers (here or on SO), using the distributed scheduler seems to be the general recommendation anyway.

In summary, I am fine with my current workaround. I do not think that solving this issue has high priority, in particular if the distributed scheduler is improved further. The main annoyance was tracking down the problem described in my first post. Hence, maybe the limitations of the schedulers could be described a bit better in the documentation. Would you want a PR on this?

@smartlixx
Contributor

Any update on this? I get an HDF error with both the multiprocessing and the distributed scheduler.
