HDF5 error when working with compressed NetCDF files and the dask multiprocessing scheduler #1836
This may be a limitation of multiprocessing with netCDF4. Can you try using dask's distributed scheduler? That might work better, even on a single machine.
Thanks for the quick answer. The problem is that my actual use case also involves writing back a […]
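A round trip of that sort — reading a compressed file, computing, and writing the result back — might look like the sketch below. This assumes the truncated sentence refers to writing results back to a netCDF file; the file names and the `Client()` setup are illustrative, not from the original comment.

```python
import xarray as xr
from dask.distributed import Client

# Use the distributed scheduler, even on a single machine
client = Client()

# Open a compressed netCDF file lazily, chunked along the time dimension
ds = xr.open_dataset('dummy_data_3d_with_compression.nc', chunks={'time': 1})

# A lazy reduction over the spatial dimensions
foo_mean = ds.foo.mean(dim=['x', 'y'])

# Writing back triggers the computation
foo_mean.to_dataset(name='foo_mean').to_netcdf('foo_mean.nc')
```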
I tried the above example with the multiprocessing and distributed schedulers. With the multiprocessing scheduler, I can reproduce the error described above. With the distributed scheduler, no error is encountered.

```python
import xarray as xr
import numpy as np
import dask.multiprocessing

from dask.distributed import Client

client = Client()
print(client)

# Generate dummy data and build an xarray dataset
mat = np.random.rand(10, 90, 90)
ds = xr.Dataset(data_vars={'foo': (('time', 'x', 'y'), mat)})

# Write dataset to netCDF without compression
ds.to_netcdf('dummy_data_3d.nc')
# Write with zlib compression
ds.to_netcdf('dummy_data_3d_with_compression.nc',
             encoding={'foo': {'zlib': True}})
# Write data as int16 with a scale factor applied
ds.to_netcdf('dummy_data_3d_with_scale_factor.nc',
             encoding={'foo': {'dtype': 'int16',
                               'scale_factor': 0.01,
                               '_FillValue': -9999}})

# Load data from the netCDF files
ds_vanilla = xr.open_dataset('dummy_data_3d.nc', chunks={'time': 1})
ds_scaled = xr.open_dataset('dummy_data_3d_with_scale_factor.nc', chunks={'time': 1})
ds_compressed = xr.open_dataset('dummy_data_3d_with_compression.nc', chunks={'time': 1})

# Do the computation (with a Client active, this uses the distributed scheduler)
foo = ds_vanilla.foo.mean(dim=['x', 'y']).compute()
foo = ds_scaled.foo.mean(dim=['x', 'y']).compute()
foo = ds_compressed.foo.mean(dim=['x', 'y']).compute()
```

I personally don't have any use cases that would prefer the multiprocessing scheduler over the distributed scheduler, but I have been working on improving I/O performance and stability with xarray and dask lately. If anyone would like to work on this, I'd gladly help get it cleaned up, or put a more definitive "no" on whether this can/should work.
Thanks @jhamman for looking into this. Currently I am fine with using […]

In summary, I am fine with my current workaround. I do not think that solving this issue has high priority, in particular as the distributed scheduler is improved further. The main annoyance was tracking down the problem described in my first post. Hence, maybe the limitations of the schedulers could be described a bit better in the documentation. Would you want a PR on this?
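To make the scheduler distinction being discussed concrete, here is a minimal sketch. It uses `dask.config.set`, the configuration API introduced in dask 0.18; the issue itself predates that API and passed `dask.multiprocessing.get` directly. The file name is taken from the example above.

```python
import dask
import xarray as xr
from dask.distributed import Client

ds = xr.open_dataset('dummy_data_3d_with_compression.nc', chunks={'time': 1})

# Multiprocessing scheduler: raises the HDF5 errors described in this issue
with dask.config.set(scheduler='processes'):
    foo = ds.foo.mean(dim=['x', 'y']).compute()

# Distributed scheduler: works, even on a single machine
client = Client()
foo = ds.foo.mean(dim=['x', 'y']).compute()
```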
Any update on this? I got an HDF error with both the multiprocessing and the distributed scheduler.
Code Sample, a copy-pastable example if possible
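A minimal reconstruction of the example, based on the session quoted in the comments above. The file names and chunking follow that session, and the `get=dask.multiprocessing.get` keyword matches dask versions contemporary with this issue.

```python
import numpy as np
import xarray as xr
import dask.multiprocessing

# Build a dummy dataset and write it with zlib compression
mat = np.random.rand(10, 90, 90)
ds = xr.Dataset(data_vars={'foo': (('time', 'x', 'y'), mat)})
ds.to_netcdf('dummy_data_3d_with_compression.nc',
             encoding={'foo': {'zlib': True}})

# Reopen lazily, chunked along the time dimension
ds_compressed = xr.open_dataset('dummy_data_3d_with_compression.nc',
                                chunks={'time': 1})

# Computing with the multiprocessing scheduler raises a long HDF5 error
foo = ds_compressed.foo.mean(dim=['x', 'y']).compute(
    get=dask.multiprocessing.get)
```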
Problem description
If NetCDF files are compressed (which is often the case) and opened with chunking enabled so that they can be used with dask, computations using the multiprocessing scheduler fail. The code above shows this in a short example. The last line fails with a long HDF5 error log.
A possible workaround, if the dataset fits into memory, is to use `.load()` to read the data into memory before computing.
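Concretely, a sketch of that workaround, assuming the compressed file from the example above (the re-chunking after `.load()` is one way to keep using dask on the in-memory data):

```python
import xarray as xr
import dask.multiprocessing

# Load the compressed file into memory up front, then re-chunk,
# so no HDF5 reads happen inside the worker processes
ds = xr.open_dataset('dummy_data_3d_with_compression.nc').load().chunk({'time': 1})

# The multiprocessing scheduler now only sees in-memory numpy chunks
foo = ds.foo.mean(dim=['x', 'y']).compute(get=dask.multiprocessing.get)
```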
I could split up my dataset to accomplish this, but the beauty of xarray and dask gets lost a little when doing this...
Output of `xr.show_versions()`