Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Processing ERA5 (project native6) leads to huge dask graphs #2375

Closed
schlunma opened this issue Mar 19, 2024 · 19 comments
Closed

Processing ERA5 (project native6) leads to huge dask graphs #2375

schlunma opened this issue Mar 19, 2024 · 19 comments
Labels
dask related to improvements using Dask

Comments

@schlunma
Copy link
Contributor

The following very simple recipe leads to lots of dask tasks, which slows down the calculations a lot. This becomes especially bad if additional custom preprocessor steps are used.

documentation:
  description: Test
  authors:
    - schlund_manuel
  title: Test.

datasets:
  - {dataset: ERA5, project: native6, type: reanaly, version: v1, tier: 3}

diagnostics:
  test:
    scripts:
      null
    variables:
      ta:
        mip: Amon

These are the dask graph sizes after each preprocessing step:

load
after: 2107

fix_metadata
before: 2107
after: 10363

concatenate
before: 10363
after: 17587

cmor_check_metadata
before: 17587
after: 17587

fix_data
before: 17587
after: 17587

cmor_check_data
before: 17587
after: 17587

add_supplementary_variables
before: 17587
after: 17587

save
before: 17587

As you can see, fix_metadata and concatenate add ~7500 elements to the graph each. @ESMValGroup/technical-lead-development-team

@schlunma schlunma added the dask related to improvements using Dask label Mar 19, 2024
@schlunma schlunma changed the title Processing ERA5 (project native6) leads to lots of dask tasks Processing ERA5 (project native6) leads to hugh dask graphs Mar 19, 2024
@schlunma schlunma changed the title Processing ERA5 (project native6) leads to hugh dask graphs Processing ERA5 (project native6) leads to huge dask graphs Mar 19, 2024
@valeriupredoi
Copy link
Contributor

has the data changed? ie could it be that there are now a lot more storage (HDF5) chunks that Dask is struggling to distribute?

@valeriupredoi
Copy link
Contributor

or, conversely, iris IO has changed

@schlunma
Copy link
Contributor Author

No, I think nothing in particular has changed. I am pretty sure the problem exists since a long time, I only found some time now to investigate this further.

@valeriupredoi
Copy link
Contributor

aha - interesting, then maybe worth investigation ERA5's chunking, I have a hunch it's made up of really tiny HDF5 chunks 🍞

@schlunma
Copy link
Contributor Author

What's the straightforward way to check that? 😁

@valeriupredoi
Copy link
Contributor

if you install kerchunk from conda-forge, then you can use this utility to convert the ERA5 netCDF4 to a Zarr reference file, and you can then count the chunks - chunks are stored with indices like (x, y, z) depending on the number of dims:

import os
import ujson
import fsspec
from kerchunk.hdf import SingleHdf5ToZarr


def _correct_compressor_and_filename(content, varname, bryan_bucket=False):
    """
    Correct the compressor type as it comes out of Kerchunk (>=0.2.4; pinned).
    Also correct file name as Kerchnk now prefixes it with "s3://"
    and for special buckets like Bryan's bnl the correct file is bnl/file.nc
    not s3://bnl/file.nc
    """
    new_content = content.copy()

    # prelimniary assembly
    try:
        new_zarray =  ujson.loads(new_content['refs'][f"{varname}/.zarray"])
        group = False
    except KeyError:
        new_zarray =  ujson.loads(new_content['refs'][f"{varname} /{varname}/.zarray"])
        group = True

    # re-add the correct compressor if it's in the "filters" list
    if new_zarray["compressor"] is None and new_zarray["filters"]:
        for zfilter in new_zarray["filters"]:
            if zfilter["id"] == "zlib":
                new_zarray["compressor"] = zfilter
                new_zarray["filters"].remove(zfilter)

        if not group:
            new_content['refs'][f"{varname}/.zarray"] = ujson.dumps(new_zarray)
        else:
            new_content['refs'][f"{varname} /{varname}/.zarray"] = ujson.dumps(new_zarray)

    return new_content


def create_kerchunks(file_url, varname, outf):
    """Translate a netCDF4/HDF5 to a Zarr file object."""
    fs = fsspec.filesystem('')
    with fs.open(file_url, 'rb') as local_file:
        try:
            h5chunks = SingleHdf5ToZarr(local_file, file_url,
                                        inline_threshold=0)
        except OSError as exc:
            raiser_1 = f"Unable to open file {file_url}. "
            raiser_2 = "Check if file is netCDF3 or netCDF-classic"
            print(raiser_1 + raiser_2)
            raise exc

        with fs.open(outf, 'wb') as f:
            content = h5chunks.translate()
            content = _correct_compressor_and_filename(content,
                                                       varname)
            f.write(ujson.dumps(content).encode())

    zarray =  ujson.loads(content['refs'][f"{varname}/.zarray"])
    zattrs =  ujson.loads(content['refs'][f"{varname}/.zattrs"])

    return outf, zarray, zattrs

file_url is just the full path to the file (provided you are on a local POSIX FS), varname is the name of the variable that needs looking at (short name, like tas) and outf is a JSON file you are writing to - anything, as long as you are extending it with a .json extensuion; or, you can point me to an ERA5 file and I can run PyActive on it, and investigate the chunks 🍺

@valeriupredoi
Copy link
Contributor

you don't even need to correct the compressor - that's just so the metadatda is nice when you write to the json file 👍

@schlunma
Copy link
Contributor Author

An example would here (on Levante): /work/bd0854/DATA/ESMValTool2/RAWOBS/Tier3/ERA5/v1/mon/ta/era5_temperature_1979_monthly.nc. Cheers V 🍻

@valeriupredoi
Copy link
Contributor

I'll run PyActive on that file tomorrow, bud - am very curious to see it's B-tree and chunks etc, am quite confident it's the chunking that's affecting Dask performance

@schlunma
Copy link
Contributor Author

Something that might be related to that: I'd tried to use iris' new CHUNK_CONTROL feature on that data and run into an error, see SciTools/iris#5864.

@bouweandela
Copy link
Member

17 000 shouldn't be too dramatic, at 1ms per item that would be 17s, but I can imagine that if you have a considerable further increase in the number of chunks coming from temporal statistics that are implemented using iris.cube.Cube.aggregated_by SciTools/iris#5455 you run into trouble.

I looked into this and found the following:

  • the input data consists of about 40 files of ~1GB stored as 16-bit integers with a 64-bit floating point scale factor and offset. This results in an array that is ~ 4GB and has 50 chunks, so 40 files with 50 chunks each explains the graph size of about 2000 after load.

  • fixing ERA5 does various things, each of which act on all chunks in the array:

    so this would result in the original 50 + 4 operations on 50 chunks = 250 per file after fix metadata, so 10 000 items in the graph for all 40 files

  • there is a rechunk step after this, applied here:

    item.data = item.core_data().rechunk()
    that leads to a further increase in the number of operations.

  • concatenate appears to increase the task graph size with roughly one more task per chunk

@valeriupredoi
Copy link
Contributor

OK gents, so here's some low level info about this nasty file:

  • to start with, it's a in NETCDF3 format - who writes data in NETCDF3 these days? This is not good
  • since it's NETCDF3 h5py, h5netcdf and Kerchunk can't read it, but alas
  • netCDF4 is compatible, and a very quick loader:
def open_with_netCDF4():
    input_file = "/work/bd0854/DATA/ESMValTool2/RAWOBS/Tier3/ERA5/v1/mon/ta/era5_temperature_1979_monthly.nc"
    with Dataset(input_file, "r") as fid:
        print(fid)
        data = fid['t']
        print(data.chunking())

gets us the info:

<class 'netCDF4._netCDF4.Dataset'>
root group (NETCDF3_64BIT_OFFSET data model, file format NETCDF3):
    Conventions: CF-1.6
    history: 2020-04-23 08:58:03 GMT by grib_to_netcdf-2.16.0: /opt/ecmwf/eccodes/bin/grib_to_netcdf -S param -o /cache/data9/adaptor.mars.internal-1587632239.0829659-3867-37-6f7c8649-12e9-4051-9c55-d5fe2fdaba2f.nc /cache/tmp/6f7c8649-12e9-4051-9c55-d5fe2fdaba2f-adaptor.mars.internal-1587632239.083458-3867-14-tmp.grib
    dimensions(sizes): longitude(1440), latitude(721), level(37), time(12)
    variables(dimensions): float32 longitude(longitude), float32 latitude(latitude), int32 level(level), int32 time(time), int16 t(time, level, latitude, longitude)
    groups: 
None

so the file is not storage (HDF5)-chunked, so it comes a big beautiful (12, 37, 721, 1440) shape contiguous dataset, uncompressed by the looks of it too - so no wonder Dask is having a hard time dealing with it - probably, as @bouweandela mentions, it tries to rechunk it (chunk it, in actuality)

Looking at its time of creation

// global attributes:
		:Conventions = "CF-1.6" ;
		:history = "2020-04-23 08:58:03 GMT by grib_to_netcdf-2.16.0: /opt/ecmwf/eccodes/bin/grib_to_netcdf -S param -o /cache/data9/adaptor.mars.internal-1587632239.0829659-3867-37-6f7c8649-12e9-4051-9c55-d5fe2fdaba2f.nc /cache/tmp/6f7c8649-12e9-4051-9c55-d5fe2fdaba2f-adaptor.mars.internal-1587632239.083458-3867-14-tmp.grib"

this tells me this was nc-ed from a GRIB file, why they used NETCDF3 standards is beyond me!

@malininae
Copy link
Contributor

@schlunma by any chance, can you also look at hourly? @Karen-A-Garcia used the cmorizer on the hourly ERA data and it was flying, but we've never looked at the graphs. As a heavy user of hourly data, that would be interesting to see.

@schlunma
Copy link
Contributor Author

schlunma commented Apr 2, 2024

@malininae did you use 3D data or 2D data? For me, reading and processing 2D data was not an issue, only the much bigger 3D variables were a problem.

@malininae
Copy link
Contributor

@malininae did you use 3D data or 2D data? For me, reading and processing 2D data was not an issue, only the much bigger 3D variables were a problem.

Well, on a year or two year scale it is fine, but I'm using like 84 years of hourly ERA5 data, which gets trickier 😰

@schlunma
Copy link
Contributor Author

schlunma commented Apr 3, 2024

But you were saying that you didn't have any problems with hourly data? Also for the 84 years?

Also, Bouwe's comment suggests that these graph sizes are to be expected, so I think we can close this issue.

@bouweandela
Copy link
Member

bouweandela commented Apr 3, 2024

In my (limited) experience, you can expect an overloaded scheduler (I think this is called 'scheduler contention' in the dask dashboard) when the task graph has a size of 1 million or more because at 1 ms per task, that would be 1000 seconds just to keep track of all the tasks and then the optimization of the graph also takes time of course.

@schlunma
Copy link
Contributor Author

schlunma commented Apr 3, 2024

Closing this, feel free to reopen if necessary.

@schlunma schlunma closed this as completed Apr 3, 2024
@valeriupredoi
Copy link
Contributor

at least this helped identify the issue in that iris issue eh - and now they have a fix 😁

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
dask related to improvements using Dask
Projects
None yet
Development

No branches or pull requests

4 participants