diff --git a/README.md b/README.md
index 72cfa844..ea6677c9 100644
--- a/README.md
+++ b/README.md
@@ -30,6 +30,8 @@ currently packaged with adapters for pulling and converting `.grib` data from:
 - [ECMWF MARS API](https://apps.ecmwf.int/mars-catalogue)
 - [DWD's ICON Model from the Opendata API](https://opendata.dwd.de)
 - [CMC's GDPS Model from the Opendata API](https://dd.weather.gc.ca/)
+- [NOAA's GFS Model from AWS Open Data](https://noaa-gfs-bdp-pds.s3.amazonaws.com)
+- [NOAA's GFS Model from NCAR's Archive](https://rda.ucar.edu/datasets/ds084.1/)
 
 Similarly, the service can write to multiple sinks:
diff --git a/src/nwp_consumer/internal/inputs/noaa/__init__.py b/src/nwp_consumer/internal/inputs/noaa/__init__.py
new file mode 100644
index 00000000..c0ab0b44
--- /dev/null
+++ b/src/nwp_consumer/internal/inputs/noaa/__init__.py
@@ -0,0 +1,4 @@
+__all__ = ["AWSClient", "NCARClient"]
+
+from .aws import Client as AWSClient
+from .ncar import Client as NCARClient
\ No newline at end of file
diff --git a/src/nwp_consumer/internal/inputs/noaa/_consts.py b/src/nwp_consumer/internal/inputs/noaa/_consts.py
new file mode 100644
index 00000000..f03bfdf2
--- /dev/null
+++ b/src/nwp_consumer/internal/inputs/noaa/_consts.py
@@ -0,0 +1,8 @@
+"""Defines all parameters available from NOAA."""
+
+
+GFS_VARIABLES = ['siconc_surface_instant','slt_surface_instant','cape_surface_instant','t_surface_instant','sp_surface_instant','lsm_surface_instant','sr_surface_instant','vis_surface_instant','prate_surface_instant','acpcp_surface_accum','sde_surface_instant','cin_surface_instant','orog_surface_instant','tp_surface_accum','lhtfl_surface_avg','shtfl_surface_avg','crain_surface_instant','cfrzr_surface_instant','cicep_surface_instant','csnow_surface_instant','cprat_surface_instant','cpofp_surface_instant','pevpr_surface_instant','sdwe_surface_instant','uflx_surface_avg','vflx_surface_avg','gust_surface_instant','fricv_surface_instant','u-gwd_surface_avg','v-gwd_surface_avg','hpbl_surface_instant','dswrf_surface_avg','uswrf_surface_avg','dlwrf_surface_avg','ulwrf_surface_avg','lftx_surface_instant','4lftx_surface_instant','veg_surface_instant','watr_surface_accum','gflux_surface_avg','fco2rec_surface_instant','hindex_surface_instant','wilt_surface_instant','fldcp_surface_instant','al_surface_avg','SUNSD_surface_instant','prate_surface_avg','crain_surface_avg','cfrzr_surface_avg','cicep_surface_avg','csnow_surface_avg','cprat_surface_avg','pres_instant','q_instant','t_instant','u_instant','v_instant','u10_instant','v10_instant','t2m_instant','d2m_instant','tmax_max','tmin_min','sh2_instant','r2_instant','aptmp_instant','u100_instant','v100_instant','refd_instant','t','u','v','q','w','gh','r','absv','o3mr','wz','tcc','clwmr','icmr','rwmr','snmr','grle',]
+
+MISSING_STEP_0_VARIABLES = ['slt_surface_instant','sr_surface_instant','acpcp_surface_accum','tp_surface_accum','lhtfl_surface_avg','shtfl_surface_avg','cprat_surface_instant','pevpr_surface_instant','uflx_surface_avg','vflx_surface_avg','fricv_surface_instant','u-gwd_surface_avg','v-gwd_surface_avg','dswrf_surface_avg','uswrf_surface_avg','dlwrf_surface_avg','ulwrf_surface_avg','veg_surface_instant','watr_surface_accum','gflux_surface_avg','fco2rec_surface_instant','al_surface_avg','prate_surface_avg','crain_surface_avg','cfrzr_surface_avg','cicep_surface_avg','csnow_surface_avg','cprat_surface_avg','tmax_max','tmin_min','refd_instant','q',]
+
+EXTRA_STEP_0_VARIABLES = ["landn_surface_instant","5wavh"]
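A note on the constants: the surface- and height-level names in `GFS_VARIABLES` follow the `{shortName}_{levelType}_{stepType}` convention that `mapTemp` (below) constructs when renaming GRIB variables, while bare names such as `t` or `u` are pressure-level variables. A minimal sketch of taking a composite name apart, assuming only that convention (`split_gfs_name` is a hypothetical helper, not part of the module):

```python
def split_gfs_name(name: str) -> tuple[str, str | None, str | None]:
    """Split a GFS_VARIABLES entry into (shortName, levelType, stepType)."""
    parts = name.split("_")
    if len(parts) == 3:  # e.g. "dswrf_surface_avg"
        return parts[0], parts[1], parts[2]
    if len(parts) == 2:  # e.g. "t2m_instant" (heightAboveGround variables)
        return parts[0], None, parts[1]
    return parts[0], None, None  # e.g. "t" (isobaricInhPa variables)


assert split_gfs_name("dswrf_surface_avg") == ("dswrf", "surface", "avg")
assert split_gfs_name("t2m_instant") == ("t2m", None, "instant")
```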
diff --git a/src/nwp_consumer/internal/inputs/noaa/_models.py b/src/nwp_consumer/internal/inputs/noaa/_models.py
new file mode 100644
index 00000000..997aa75f
--- /dev/null
+++ b/src/nwp_consumer/internal/inputs/noaa/_models.py
@@ -0,0 +1,22 @@
+import datetime as dt
+
+from nwp_consumer import internal
+
+
+class NOAAFileInfo(internal.FileInfoModel):
+    def __init__(
+        self, it: dt.datetime, filename: str, currentURL: str, step: int,
+    ) -> None:
+        self._it = it
+        self._filename = filename
+        self._url = currentURL
+        self.step = step
+
+    def filename(self) -> str:
+        return self._filename
+
+    def filepath(self) -> str:
+        return self._url + "/" + self._filename
+
+    def it(self) -> dt.datetime:
+        return self._it
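Since `filepath()` simply joins `currentURL` and `filename`, `currentURL` should always be the directory that contains the file, not the file's own path. A quick sketch of the intended usage, with values taken from the integration tests:

```python
import datetime as dt

from nwp_consumer.internal.inputs.noaa._models import NOAAFileInfo

fi = NOAAFileInfo(
    it=dt.datetime(2016, 1, 3, 0, tzinfo=dt.timezone.utc),
    filename="gfs.0p25.2016010300.f003.grib2",
    currentURL="https://data.rda.ucar.edu/ds084.1/2016/20160103",
    step=3,
)
assert fi.filepath() == (
    "https://data.rda.ucar.edu/ds084.1/2016/20160103/gfs.0p25.2016010300.f003.grib2"
)
```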
+ "Valid groups are 'default', 'full', 'basic'", + ) + + self.model = model + self.hours = hours + + def listRawFilesForInitTime(self, *, it: dt.datetime) -> list[internal.FileInfoModel]: # noqa: D102 + + if it.hour not in [0, 6, 12, 18]: + return [] + + files: list[internal.FileInfoModel] = [] + + # Files are split per timestep + # And the url includes the time and init time + # https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs.20201206/00/atmos/gfs.t00z.pgrb2.0p25.f000 + + # Fetch AWS webpage detailing the available files for the parameter + response = requests.get(f"{self.baseurl}/gfs.{it.strftime('%Y%m%d')}/{it.strftime('%H')}/", timeout=3) + + if response.status_code != 200: + log.warn( + event="error fetching filelisting webpage for parameter", + status=response.status_code, + url=response.url, + inittime=it.strftime("%Y-%m-%d %H:%M"), + ) + return [] + + # The webpage's HTML
diff --git a/src/nwp_consumer/internal/inputs/noaa/ncar.py b/src/nwp_consumer/internal/inputs/noaa/ncar.py
new file mode 100644
index 00000000..b78e972c
--- /dev/null
+++ b/src/nwp_consumer/internal/inputs/noaa/ncar.py
@@ -0,0 +1,306 @@
+"""Implements a client to fetch NOAA data from NCAR."""
+import datetime as dt
+import pathlib
+import re
+import typing
+import urllib.request
+
+import cfgrib
+import requests
+import structlog
+import xarray as xr
+
+from nwp_consumer import internal
+
+from ._consts import GFS_VARIABLES
+from ._models import NOAAFileInfo
+
+log = structlog.getLogger()
+
+# See https://www.nco.ncep.noaa.gov/pmb/products/gfs/gfs.t00z.pgrb2.0p25.f003.shtml
+# for a list of NOAA parameters
+PARAMETER_RENAME_MAP: dict[str, str] = {
+    "t2m_instant": internal.OCFShortName.TemperatureAGL.value,
+    "tcc": internal.OCFShortName.HighCloudCover.value,
+    "dswrf_surface_avg": internal.OCFShortName.DownwardShortWaveRadiationFlux.value,
+    "dlwrf_surface_avg": internal.OCFShortName.DownwardLongWaveRadiationFlux.value,
+    "sdwe_surface_instant": internal.OCFShortName.SnowDepthWaterEquivalent.value,
+    "r": internal.OCFShortName.RelativeHumidityAGL.value,
+    "u10_instant": internal.OCFShortName.WindUComponentAGL.value,
+    "v10_instant": internal.OCFShortName.WindVComponentAGL.value,
+}
+
+COORDINATE_ALLOW_LIST: typing.Sequence[str] = ("time", "step", "latitude", "longitude")
+
+
+class Client(internal.FetcherInterface):
+    """Implements a client to fetch NOAA data from NCAR."""
+
+    baseurl: str  # The base URL for the NOAA model
+    model: str  # The model to fetch data for
+    parameters: list[str]  # The parameters to fetch
+    conform: bool  # Whether to rename parameters to OCF names and clear unwanted coordinates
+
+    def __init__(self, model: str, hours: int = 48, param_group: str = "default") -> None:
+        """Create a new NOAA Client.
+
+        Exposes a client for NOAA data from NCAR that conforms to the FetcherInterface.
+
+        Args:
+            model: The model to fetch data for. The only valid model is "global".
+            hours: The number of forecast hours to fetch per init time.
+            param_group: The set of parameters to fetch.
+                Valid groups are "default", "full", and "basic".
+        """
+        self.baseurl = "https://data.rda.ucar.edu/ds084.1"
+
+        match (param_group, model):
+            case ("default", _):
+                self.parameters = list(PARAMETER_RENAME_MAP.keys())
+                self.conform = True
+            case ("basic", "global"):
+                self.parameters = ["t2m_instant", "tcc"]
+                self.conform = True
+            case ("full", "global"):
+                self.parameters = GFS_VARIABLES
+                self.conform = False
+            case (_, _):
+                raise ValueError(
+                    f"unknown parameter group {param_group}. "
+                    "Valid groups are 'default', 'full', 'basic'",
+                )
+
+        self.model = model
+        self.hours = hours
+
+    def listRawFilesForInitTime(self, *, it: dt.datetime) -> list[internal.FileInfoModel]:  # noqa: D102
+        if it.hour not in [0, 6, 12, 18]:
+            return []
+
+        files: list[internal.FileInfoModel] = []
+
+        # Fetch the NCAR webpage detailing the available files for the init time
+        response = requests.get(f"{self.baseurl}/{it.strftime('%Y')}/{it.strftime('%Y%m%d')}/", timeout=3)
+
+        if response.status_code != 200:
+            log.warn(
+                event="error fetching filelisting webpage for parameter",
+                status=response.status_code,
+                url=response.url,
+                inittime=it.strftime("%Y-%m-%d %H:%M"),
+            )
+            return []
+
+        # The webpage's HTML contains a list of <a> tags
+        # * Each <a> tag has an href, most of which point to a file
+        for line in response.text.splitlines():
+            # Check if the line contains a href; if not, skip it
+            refmatch = re.search(pattern=r'href="(.+)">', string=line)
+            if refmatch is None:
+                continue
+
+            # The href contains the name of a file - parse this into a FileInfo object
+            # The baseurl has to have the year and date added to it for NCAR
+            fi: NOAAFileInfo | None = _parseNCARFilename(
+                name=refmatch.groups()[0],
+                baseurl=f"{self.baseurl}/{it.strftime('%Y')}/{it.strftime('%Y%m%d')}",
+            )
+            # Ignore the file if it doesn't match, or if its step exceeds the
+            # requested number of hours (when conforming)
+            if fi is None or (fi.step > self.hours and self.conform):
+                continue
+
+            # Add the file to the list
+            files.append(fi)
+
+        log.debug(
+            event="listed files for init time",
+            inittime=it.strftime("%Y-%m-%d %H:%M"),
+            url=response.url,
+            numfiles=len(files),
+        )
+
+        return files
+
+    def mapTemp(self, *, p: pathlib.Path) -> xr.Dataset:  # noqa: D102
+        if p.suffix != ".grib2":
+            log.warn(
+                event="cannot map non-grib file to dataset",
+                filepath=p.as_posix(),
+            )
+            return xr.Dataset()
+
+        log.debug(event="mapping raw file to xarray dataset", filepath=p.as_posix())
+
+        # Load the raw file as a list of datasets, one per GRIB message group
+        try:
+            ds = cfgrib.open_datasets(
+                p.as_posix(),
+            )
+        except Exception as e:
+            log.warn(
+                event="error converting raw file as dataset",
+                error=e,
+                filepath=p.as_posix(),
+            )
+            return xr.Dataset()
+
+        # Keep only the datasets on the levels of interest
+        ds = [
+            d for d in ds
+            if any(x in d.coords for x in ["surface", "heightAboveGround", "isobaricInhPa"])
+        ]
+
+        # Split into surface, heightAboveGround, and isobaricInhPa lists
+        surface = [d for d in ds if "surface" in d.coords]
+        heightAboveGround = [d for d in ds if "heightAboveGround" in d.coords]
+        isobaricInhPa = [d for d in ds if "isobaricInhPa" in d.coords]
+
+        # Update the name of each data variable based on the GRIB_stepType attribute
+        for i, d in enumerate(surface):
+            for variable in d.data_vars.keys():
+                d = d.rename({variable: f"{variable}_surface_{d[variable].attrs['GRIB_stepType']}"})
+            surface[i] = d
+        for i, d in enumerate(heightAboveGround):
+            for variable in d.data_vars.keys():
+                d = d.rename({variable: f"{variable}_{d[variable].attrs['GRIB_stepType']}"})
+            heightAboveGround[i] = d
+
+        surface = xr.merge(surface)
+        # Drop the unknown data variable
+        surface = surface.drop_vars("unknown_surface_instant", errors="ignore")
+        heightAboveGround = xr.merge(heightAboveGround)
+        isobaricInhPa = xr.merge(isobaricInhPa)
+
+        ds = xr.merge([surface, heightAboveGround, isobaricInhPa])
+
+        # Only conform the dataset if requested (defaults to True)
+        if self.conform:
+            # Rename the parameters to the OCF names
+            # Drop variables that are not in the OCF list first
+            ds = ds.drop_vars(
+                names=[v for v in ds.data_vars if v not in PARAMETER_RENAME_MAP.keys()],
+                errors="ignore",
+            )
+            # * Only rename them if they exist in the dataset
+            for oldParamName, newParamName in PARAMETER_RENAME_MAP.items():
+                if oldParamName in ds:
+                    ds = ds.rename({oldParamName: newParamName})
+
+            # Delete unwanted coordinates
+            ds = ds.drop_vars(
+                names=[c for c in ds.coords if c not in COORDINATE_ALLOW_LIST],
+                errors="ignore",
+            )
+
+        # * Each chunk is a single time step
+        # The non-conformed dataset does not use the "variable" dimension, as this makes
+        # an 86GiB dataset for a single timestamp; keeping variables separate keeps the
+        # dataset small enough to fit in memory
+        if self.conform:
+            ds = (
+                ds.rename({"time": "init_time"})
+                .expand_dims("init_time")
+                .expand_dims("step")
+                .to_array(dim="variable", name=f"NOAA_{self.model}".upper())
+                .to_dataset()
+                .transpose("variable", "init_time", "step", ...)
+                .sortby("step")
+                .sortby("variable")
+                .chunk(
+                    {
+                        "init_time": 1,
+                        "step": -1,
+                        "variable": -1,
+                    },
+                )
+            )
+        else:
+            ds = (
+                ds.rename({"time": "init_time"})
+                .expand_dims("init_time")
+                .expand_dims("step")
+                .transpose("init_time", "step", ...)
+                .sortby("step")
+                .chunk(
+                    {
+                        "init_time": 1,
+                        "step": -1,
+                    },
+                )
+            )
+
+        return ds
+
+    def downloadToTemp(  # noqa: D102
+        self,
+        *,
+        fi: internal.FileInfoModel,
+    ) -> tuple[internal.FileInfoModel, pathlib.Path]:
+        log.debug(event="requesting download of file", file=fi.filename(), path=fi.filepath())
+        try:
+            response = urllib.request.urlopen(fi.filepath())
+        except Exception as e:
+            log.warn(
+                event="error calling url for file",
+                url=fi.filepath(),
+                filename=fi.filename(),
+                error=e,
+            )
+            return fi, pathlib.Path()
+
+        if response.status != 200:
+            log.warn(
+                event="error downloading file",
+                status=response.status,
+                url=fi.filepath(),
+                filename=fi.filename(),
+            )
+            return fi, pathlib.Path()
+
+        # Write the file to the temporary directory
+        tfp: pathlib.Path = internal.TMP_DIR / fi.filename()
+        with open(tfp, "wb") as f:
+            f.write(response.read())
+
+        log.debug(
+            event="fetched all data from file",
+            filename=fi.filename(),
+            url=fi.filepath(),
+            filepath=tfp.as_posix(),
+            nbytes=tfp.stat().st_size,
+        )
+
+        return fi, tfp
+
+
+def _parseNCARFilename(
+    name: str,
+    baseurl: str,
+    match_main: bool = True,
+) -> NOAAFileInfo | None:
+    """Parse a filename from the NCAR archive into a NOAAFileInfo object, if it matches.
+
+    Args:
+        name: The name of the file to parse
+        baseurl: The base URL for the NOAA NCAR model
+        match_main: Whether to match main (pgrb2) files
+    """
+    # Define the regex pattern to match the file
+    mainRegex = r"gfs\.0p25\.(\d{10})\.f(\d{3})\.grib2"
+    itstring = ""
+    stepstring = "000"
+    # Try to match the href to the regex pattern
+    mainmatch = re.search(pattern=mainRegex, string=name)
+
+    if mainmatch and match_main:
+        itstring, stepstring = mainmatch.groups()
+    else:
+        return None
+
+    it = dt.datetime.strptime(itstring, "%Y%m%d%H").replace(tzinfo=dt.timezone.utc)
+
+    return NOAAFileInfo(
+        it=it,
+        filename=name,
+        # filepath() joins this URL and the filename, so pass the directory only
+        currentURL=baseurl,
+        step=int(stepstring),
+    )
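The NCAR archive encodes the init time and step directly in the filename, so `_parseNCARFilename` can recover both without extra context. A quick check, reusing the filename from the unit tests below:

```python
from nwp_consumer.internal.inputs.noaa.ncar import _parseNCARFilename

fi = _parseNCARFilename(
    name="gfs.0p25.2016010300.f003.grib2",
    baseurl="https://data.rda.ucar.edu/ds084.1/2016/20160103",
)
assert fi is not None
assert fi.step == 3
assert fi.it().isoformat() == "2016-01-03T00:00:00+00:00"
```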
diff --git a/src/nwp_consumer/internal/inputs/noaa/test_ncar.py b/src/nwp_consumer/internal/inputs/noaa/test_ncar.py
new file mode 100644
index 00000000..8c17ec07
--- /dev/null
+++ b/src/nwp_consumer/internal/inputs/noaa/test_ncar.py
@@ -0,0 +1,47 @@
+import datetime as dt
+import pathlib
+import unittest
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from ._models import NOAAFileInfo
+
+from .ncar import Client, _parseNCARFilename
+
+testClient = Client(model="global", param_group="full")
+
+
+class TestClient(unittest.TestCase):
+    def test_mapTemp(self) -> None:
+        # Test with the checked-in global file
+        testFilePath: pathlib.Path = (
+            pathlib.Path(__file__).parent / "gfs.0p25.2023121906.f001.grib2"
+        )
+        out = testClient.mapTemp(p=testFilePath)
+        # Check that latitude and longitude are present
+        self.assertTrue("latitude" in out.coords)
+        self.assertTrue("longitude" in out.coords)
+        # Check that the dimensions are correctly ordered and renamed
+        self.assertEqual(
+            out[next(iter(out.data_vars.keys()))].dims,
+            ("init_time", "step", "latitude", "longitude"),
+        )
+        self.assertEqual(len(out["latitude"].values), 721)
+        self.assertEqual(len(out["longitude"].values), 1440)
+        self.assertEqual(len(out["init_time"].values), 1)
+        self.assertEqual(len(out["step"].values), 1)
+
+
+class TestParseNCARFilename(unittest.TestCase):
+    baseurl = "https://data.rda.ucar.edu/ds084.1"
+
+    def test_parsesSingleLevel(self) -> None:
+        filename: str = "gfs.0p25.2016010300.f003.grib2"
+        it = dt.datetime(2016, 1, 3, 0, tzinfo=dt.timezone.utc)
+        out: NOAAFileInfo | None = _parseNCARFilename(
+            name=filename,
+            baseurl=f"{self.baseurl}/{it.strftime('%Y')}/{it.strftime('%Y%m%d')}",
+        )
+        self.assertIsNotNone(out)
+        self.assertEqual(out.filename(), filename)
+        self.assertEqual(out.it(), dt.datetime(2016, 1, 3, 0, tzinfo=dt.timezone.utc))
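Both unit tests use `param_group="full"`, so `conform=False` and the mapped dataset keeps one data variable per parameter. With the default group the output is instead stacked into a single array with a leading "variable" dimension. A sketch of the difference, reusing the checked-in NCAR fixture:

```python
import pathlib

from nwp_consumer.internal.inputs.noaa.ncar import Client

p = pathlib.Path("src/nwp_consumer/internal/inputs/noaa/gfs.0p25.2023121906.f001.grib2")

raw = Client(model="global", param_group="full").mapTemp(p=p)
# -> dims (init_time, step, latitude, longitude), one data variable per parameter

conformed = Client(model="global", param_group="default").mapTemp(p=p)
# -> dims (variable, init_time, step, latitude, longitude), one stacked array
```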
+ "Valid groups are 'default', 'full', 'basic'", + ) + + self.model = model + self.hours = hours + + def listRawFilesForInitTime(self, *, it: dt.datetime) -> list[internal.FileInfoModel]: # noqa: D102 + + if it.hour not in [0, 6, 12, 18]: + return [] + + files: list[internal.FileInfoModel] = [] + + # Fetch NCAR webpage detailing the available files for the parameter + response = requests.get(f"{self.baseurl}/{it.strftime('%Y')}/{it.strftime('%Y%m%d')}/", timeout=3) + + if response.status_code != 200: + log.warn( + event="error fetching filelisting webpage for parameter", + status=response.status_code, + url=response.url, + inittime=it.strftime("%Y-%m-%d %H:%M"), + ) + return [] + + # The webpage's HTML contains a list of tags + # * Each tag has a href, most of which point to a file) + for line in response.text.splitlines(): + # Check if the line contains a href, if not, skip it + refmatch = re.search(pattern=r'href="(.+)">', string=line) + if refmatch is None: + continue + + # The href contains the name of a file - parse this into a FileInfo object + fi: NOAAFileInfo | None = None + # The baseurl has to have the time and init time added to it for GFS + fi = _parseNCARFilename( + name=refmatch.groups()[0], + baseurl=f"{self.baseurl}/{it.strftime('%Y')}/{it.strftime('%Y%m%d')}", + ) + # Ignore the file if it is not for today's date or has a step > 48 (when conforming) + if fi is None or (fi.step > self.hours and self.conform): + continue + + # Add the file to the list + files.append(fi) + + log.debug( + event="listed files for init time", + inittime=it.strftime("%Y-%m-%d %H:%M"), + url=response.url, + numfiles=len(files), + ) + + return files + + def mapTemp(self, *, p: pathlib.Path) -> xr.Dataset: # noqa: D102 + if p.suffix != ".grib2": + log.warn( + event="cannot map non-grib file to dataset", + filepath=p.as_posix(), + ) + return xr.Dataset() + + log.debug(event="mapping raw file to xarray dataset", filepath=p.as_posix()) + + # Load the raw file as a dataset + try: + ds = cfgrib.open_datasets( + p.as_posix(), + ) + except Exception as e: + log.warn( + event="error converting raw file as dataset", + error=e, + filepath=p.as_posix(), + ) + return xr.Dataset() + + # Process all the parameters into a single file + ds = [ + d for d in ds if any(x in d.coords for x in ["surface", "heightAboveGround", "isobaricInhPa"]) + ] + + # Split into surface, heightAboveGround, and isobaricInhPa lists + surface = [d for d in ds if "surface" in d.coords] + heightAboveGround = [d for d in ds if "heightAboveGround" in d.coords] + isobaricInhPa = [d for d in ds if "isobaricInhPa" in d.coords] + + # Update name of each data variable based off the attribute GRIB_stepType + for i, d in enumerate(surface): + for variable in d.data_vars.keys(): + d = d.rename({variable: f"{variable}_surface_{d[f'{variable}'].attrs['GRIB_stepType']}"}) + surface[i] = d + for i, d in enumerate(heightAboveGround): + for variable in d.data_vars.keys(): + d = d.rename({variable: f"{variable}_{d[f'{variable}'].attrs['GRIB_stepType']}"}) + heightAboveGround[i] = d + + surface = xr.merge(surface) + # Drop unknown data variable + surface = surface.drop_vars("unknown_surface_instant", errors="ignore") + heightAboveGround = xr.merge(heightAboveGround) + isobaricInhPa = xr.merge(isobaricInhPa) + + ds = xr.merge([surface, heightAboveGround, isobaricInhPa]) + + # Only conform the dataset if requested (defaults to True) + if self.conform: + # Rename the parameters to the OCF names + # Drop variables that are not in the OCF list first + ds = 
diff --git a/src/test_integration/test_inputs_integration.py b/src/test_integration/test_inputs_integration.py
index 589ec1aa..7da05674 100644
--- a/src/test_integration/test_inputs_integration.py
+++ b/src/test_integration/test_inputs_integration.py
@@ -13,6 +13,7 @@
 from nwp_consumer.internal.inputs.icon._models import IconFileInfo
 from nwp_consumer.internal.inputs.metoffice._models import MetOfficeFileInfo
 from nwp_consumer.internal.inputs.cmc._models import CMCFileInfo
+from nwp_consumer.internal.inputs.noaa._models import NOAAFileInfo
 
 storageClient = outputs.localfs.Client()
 
@@ -111,6 +112,41 @@ def test_downloadsRawGribFileFromCMC(self) -> None:
         self.assertTrue(tmpPath.name.endswith(".grib2"))
         self.assertGreater(tmpPath.stat().st_size, 40000)
 
+    def test_downloadsRawGribFileFromAWSNOAA(self) -> None:
+        noaaInitTime: dt.datetime = dt.datetime.now(tz=dt.UTC).replace(
+            hour=0, minute=0, second=0, microsecond=0,
+        )
+
+        noaaClient = inputs.noaa.aws.Client(
+            model="global",
+            param_group="basic",
+        )
+        fileInfo = NOAAFileInfo(
+            it=noaaInitTime,
+            filename="gfs.t00z.pgrb2.0p25.f001",
+            currentURL=f"https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs.{noaaInitTime.strftime('%Y%m%d')}/{noaaInitTime.strftime('%H')}/atmos",
+            step=1,
+        )
+        _, tmpPath = noaaClient.downloadToTemp(fi=fileInfo)
+        self.assertGreater(tmpPath.stat().st_size, 40000)
+
+    def test_downloadsRawGribFileFromNCARNOAA(self) -> None:
+        # The NCAR archive lags realtime, so fetch a fixed file from 2016
+        noaaInitTime: dt.datetime = dt.datetime(2016, 1, 3, 0, tzinfo=dt.UTC)
+
+        noaaClient = inputs.noaa.ncar.Client(
+            model="global",
+            param_group="basic",
+        )
+        fileInfo = NOAAFileInfo(
+            it=noaaInitTime,
+            filename="gfs.0p25.2016010300.f003.grib2",
+            currentURL="https://data.rda.ucar.edu/ds084.1/2016/20160103",
+            step=3,
+        )
+        _, tmpPath = noaaClient.downloadToTemp(fi=fileInfo)
+        self.assertGreater(tmpPath.stat().st_size, 40000)
+
 
 class TestListRawFilesForInitTime(unittest.TestCase):
     def test_getsFileInfosFromCEDA(self) -> None:
@@ -192,5 +228,32 @@ def test_getsFileInfosFromCMC(self) -> None:
         self.assertTrue(len(gepsFileInfos) > 0)
         self.assertNotEqual(fileInfos, gepsFileInfos)
 
+    def test_getsFileInfosFromAWSNOAA(self) -> None:
+        noaaInitTime: dt.datetime = dt.datetime.now(tz=dt.UTC).replace(
+            hour=0, minute=0, second=0, microsecond=0,
+        )
+        noaaClient = inputs.noaa.aws.Client(
+            model="global",
+            hours=4,
+            param_group="basic",
+        )
+        fileInfos = noaaClient.listRawFilesForInitTime(it=noaaInitTime)
+        self.assertTrue(len(fileInfos) > 0)
+
+    def test_getsFileInfosFromNCARNOAA(self) -> None:
+        noaaInitTime: dt.datetime = dt.datetime.now(tz=dt.UTC).replace(
+            hour=0, minute=0, second=0, microsecond=0,
+        )
+        # The NCAR archive lags realtime, so look at a past year
+        noaaInitTime = noaaInitTime.replace(year=2017)
+        noaaClient = inputs.noaa.ncar.Client(
+            model="global",
+            hours=4,
+            param_group="basic",
+        )
+        fileInfos = noaaClient.listRawFilesForInitTime(it=noaaInitTime)
+        self.assertTrue(len(fileInfos) > 0)
+
 
 if __name__ == "__main__":
     unittest.main()