Processing ERA5 (project native6) leads to huge dask graphs #2375
has the data changed? i.e. could it be that there are now a lot more storage (HDF5) chunks that Dask is struggling to distribute? |
or, conversely, has iris IO changed? |
No, I think nothing in particular has changed. I am pretty sure the problem has existed for a long time; I only found some time now to investigate it further. |
aha - interesting, then maybe worth investigating ERA5's chunking, I have a hunch it's made up of really tiny HDF5 chunks 🍞 |
What's the straightforward way to check that? 😁 |
if you install kerchunk, you can check it with something like this:

import os

import ujson
import fsspec
from kerchunk.hdf import SingleHdf5ToZarr


def _correct_compressor_and_filename(content, varname, bryan_bucket=False):
    """
    Correct the compressor type as it comes out of Kerchunk (>=0.2.4; pinned).

    Also correct the file name, as Kerchunk now prefixes it with "s3://",
    and for special buckets like Bryan's bnl the correct file is bnl/file.nc
    not s3://bnl/file.nc
    """
    new_content = content.copy()

    # preliminary assembly: the variable lives either at the root or in a group
    try:
        new_zarray = ujson.loads(new_content['refs'][f"{varname}/.zarray"])
        group = False
    except KeyError:
        new_zarray = ujson.loads(
            new_content['refs'][f"{varname}/{varname}/.zarray"])
        group = True

    # re-add the correct compressor if it's in the "filters" list;
    # iterate over a copy so removing the entry is safe
    if new_zarray["compressor"] is None and new_zarray["filters"]:
        for zfilter in list(new_zarray["filters"]):
            if zfilter["id"] == "zlib":
                new_zarray["compressor"] = zfilter
                new_zarray["filters"].remove(zfilter)
        if not group:
            new_content['refs'][f"{varname}/.zarray"] = ujson.dumps(new_zarray)
        else:
            new_content['refs'][f"{varname}/{varname}/.zarray"] = \
                ujson.dumps(new_zarray)

    return new_content


def create_kerchunks(file_url, varname, outf):
    """Translate a netCDF4/HDF5 file to a Zarr file object."""
    fs = fsspec.filesystem('')
    with fs.open(file_url, 'rb') as local_file:
        try:
            h5chunks = SingleHdf5ToZarr(local_file, file_url,
                                        inline_threshold=0)
        except OSError as exc:
            raiser_1 = f"Unable to open file {file_url}. "
            raiser_2 = "Check if file is netCDF3 or netCDF-classic"
            print(raiser_1 + raiser_2)
            raise exc
        with fs.open(outf, 'wb') as f:
            content = h5chunks.translate()
            content = _correct_compressor_and_filename(content,
                                                       varname)
            f.write(ujson.dumps(content).encode())

    zarray = ujson.loads(content['refs'][f"{varname}/.zarray"])
    zattrs = ujson.loads(content['refs'][f"{varname}/.zattrs"])
    return outf, zarray, zattrs
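A minimal sketch of how this helper could be called on a local netCDF4/HDF5 file; the file path and output name below are placeholders, not real paths:

# hypothetical paths, only for illustration
era5_file = "/path/to/era5_temperature_1979_monthly.nc"
out_json = "era5_ta_kerchunk.json"

outf, zarray, zattrs = create_kerchunks(era5_file, "t", out_json)
print(zarray["shape"])   # full shape of the dataset
print(zarray["chunks"])  # HDF5/Zarr chunk shape that Kerchunk detected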
|
you don't even need to correct the compressor - that's just so the metadata is nice when you write to the JSON file 👍 |
An example would be here (on Levante): |
I'll run PyActive on that file tomorrow, bud - am very curious to see its B-tree and chunks etc., am quite confident it's the chunking that's affecting Dask performance |
Something that might be related to that: I'd tried to use iris' new |
17 000 shouldn't be too dramatic, at 1 ms per item that would be 17 s, but I can imagine that if you have a considerable further increase in the number of chunks coming from temporal statistics that are implemented using |
I looked into this and found the following:
|
OK gents, so here's some low-level info about this nasty file:
from netCDF4 import Dataset


def open_with_netCDF4():
    input_file = "/work/bd0854/DATA/ESMValTool2/RAWOBS/Tier3/ERA5/v1/mon/ta/era5_temperature_1979_monthly.nc"
    with Dataset(input_file, "r") as fid:
        print(fid)
        data = fid['t']
        # returns 'contiguous' or a list of chunk sizes per dimension
        print(data.chunking())

gets us the info:
so the file is not storage (HDF5)-chunked: it comes as a big, beautiful, contiguous dataset of shape (12, 37, 721, 1440), uncompressed by the looks of it too. So no wonder Dask is having a hard time dealing with it; probably, as @bouweandela mentions, it tries to rechunk it (chunk it, in actuality). Looking at its time of creation, this tells me it was nc-ed from a GRIB file; why they used NETCDF3 standards is beyond me! |
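As an aside, a quick way to sanity-check how such a contiguous file behaves under explicit Dask chunking is to open it lazily with xarray and request chunks by hand. This is only an illustrative sketch, not what ESMValCore does internally; it assumes the time dimension is called "time" and reuses the Levante path from above:

import xarray as xr

# open lazily, one time step per Dask chunk (chunk sizes chosen arbitrarily)
ds = xr.open_dataset(
    "/work/bd0854/DATA/ESMValTool2/RAWOBS/Tier3/ERA5/v1/mon/ta/"
    "era5_temperature_1979_monthly.nc",
    chunks={"time": 1},
)
ta = ds["t"]
print(ta.chunks)                      # resulting Dask chunk layout
print(len(ta.data.__dask_graph__()))  # number of tasks just to read the data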
@schlunma by any chance, can you also look at hourly? @Karen-A-Garcia used the cmorizer on the hourly ERA data and it was flying, but we've never looked at the graphs. As a heavy user of hourly data, that would be interesting to see. |
@malininae did you use 3D data or 2D data? For me, reading and processing 2D data was not an issue, only the much bigger 3D variables were a problem. |
Well, on a year or two year scale it is fine, but I'm using like 84 years of hourly ERA5 data, which gets trickier 😰 |
But you were saying that you didn't have any problems with hourly data? Also for the 84 years? Also, Bouwe's comment suggests that these graph sizes are to be expected, so I think we can close this issue. |
In my (limited) experience, you can expect an overloaded scheduler (I think this is called 'scheduler contention' in the dask dashboard) when the task graph has a size of 1 million or more, because at 1 ms per task that would be 1000 seconds just to keep track of all the tasks, and then the optimization of the graph also takes time, of course. |
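For anyone who wants to check where their data stands, a minimal sketch of counting the tasks in a Dask graph; the array shape and chunking below are made up to roughly mirror the monthly ERA5 case:

import dask.array as da

# one year of monthly 3D data, one chunk per time step (illustrative numbers)
arr = da.random.random((12, 37, 721, 1440), chunks=(1, 37, 721, 1440))
print(len(arr.__dask_graph__()))   # tasks needed just to build the array

# a simple reduction adds more tasks on top of that
mean = arr.mean(axis=0)
print(len(mean.__dask_graph__()))

At roughly 1 ms of scheduler overhead per task, these counts translate directly into the waiting times mentioned above.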
Closing this, feel free to reopen if necessary. |
at least this helped identify the issue in that iris issue eh - and now they have a fix 😁 |
The following very simple recipe leads to lots of dask tasks, which slows down the calculations a lot. This becomes especially bad if additional custom preprocessor steps are used.
These are the dask graph sizes after each preprocessing step:
As you can see, fix_metadata and concatenate add ~7500 elements to the graph each. @ESMValGroup/technical-lead-development-team