Add Support for NOAA GFS AWS/NCAR Model Support #78

Merged · 22 commits · Dec 19, 2023
Changes shown from 15 commits
2 changes: 2 additions & 0 deletions README.md
@@ -30,6 +30,8 @@ currently packaged with adapters for pulling and converting `.grib` data from:
- [ECMWF MARS API](https://apps.ecmwf.int/mars-catalogue)
- [DWD's ICON Model from the Opendata API](https://opendata.dwd.de)
- [CMC's GDPS Model from the Opendata API](https://dd.weather.gc.ca/)
- [NOAA's GFS Model from AWS Open Data](https://noaa-gfs-bdp-pds.s3.amazonaws.com)
- [NOAA's GFS Model from NCAR's Archive](https://rda.ucar.edu/datasets/ds084.1/)

Similarly, the service can write to multiple sinks:

4 changes: 4 additions & 0 deletions src/nwp_consumer/internal/inputs/noaa/__init__.py
@@ -0,0 +1,4 @@
__all__ = ["AWSClient", "NCARClient"]

from .aws import Client as AWSClient
from .ncar import Client as NCARClient
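
For orientation, a minimal usage sketch of the new adapter (hypothetical driver code, not part of this diff; it uses only the constructor and FetcherInterface methods defined in aws.py below):

```python
import datetime as dt

from nwp_consumer.internal.inputs.noaa import AWSClient

# Create a client for the global GFS model, fetching the default
# parameter group out to 48 forecast hours.
client = AWSClient(model="global", hours=48, param_group="default")

# List the raw files for one init time, download each to a temporary
# path, and map it to an xarray dataset.
for f in client.listRawFilesForInitTime(it=dt.datetime(2023, 12, 6, 0)):
    f, path = client.downloadToTemp(fi=f)
    ds = client.mapTemp(p=path)
```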
4 changes: 4 additions & 0 deletions src/nwp_consumer/internal/inputs/noaa/_consts.py
@@ -0,0 +1,4 @@
"""Defines all parameters available from icon."""
jacobbieker marked this conversation as resolved.
Show resolved Hide resolved


GFS_VARIABLES = [
    'siconc_surface_instant', 'slt_surface_instant', 'cape_surface_instant',
    't_surface_instant', 'sp_surface_instant', 'lsm_surface_instant',
    'sr_surface_instant', 'vis_surface_instant', 'prate_surface_instant',
    'acpcp_surface_accum', 'sde_surface_instant', 'cin_surface_instant',
    'orog_surface_instant', 'tp_surface_accum', 'lhtfl_surface_avg',
    'shtfl_surface_avg', 'crain_surface_instant', 'cfrzr_surface_instant',
    'cicep_surface_instant', 'csnow_surface_instant', 'cprat_surface_instant',
    'cpofp_surface_instant', 'pevpr_surface_instant', 'sdwe_surface_instant',
    'uflx_surface_avg', 'vflx_surface_avg', 'gust_surface_instant',
    'fricv_surface_instant', 'u-gwd_surface_avg', 'v-gwd_surface_avg',
    'hpbl_surface_instant', 'dswrf_surface_avg', 'uswrf_surface_avg',
    'dlwrf_surface_avg', 'ulwrf_surface_avg', 'lftx_surface_instant',
    '4lftx_surface_instant', 'veg_surface_instant', 'watr_surface_accum',
    'gflux_surface_avg', 'fco2rec_surface_instant', 'hindex_surface_instant',
    'wilt_surface_instant', 'fldcp_surface_instant', 'al_surface_avg',
    'SUNSD_surface_instant', 'prate_surface_avg', 'crain_surface_avg',
    'cfrzr_surface_avg', 'cicep_surface_avg', 'csnow_surface_avg',
    'cprat_surface_avg', 'pres_instant', 'q_instant', 't_instant',
    'u_instant', 'v_instant', 'u10_instant', 'v10_instant', 't2m_instant',
    'd2m_instant', 'tmax_max', 'tmin_min', 'sh2_instant', 'r2_instant',
    'aptmp_instant', 'u100_instant', 'v100_instant', 'refd_instant',
    't', 'u', 'v', 'q', 'w', 'gh', 'r', 'absv', 'o3mr', 'wz', 'tcc',
    'clwmr', 'icmr', 'rwmr', 'snmr', 'grle',
]
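
The names encode level and GRIB step type: surface fields follow `<shortName>_surface_<stepType>`, height-above-ground fields follow `<shortName>_<stepType>`, and isobaric-level fields keep the bare short name, matching the renaming logic in mapTemp in aws.py below. A small illustration (not part of the diff):

```python
# Illustrative only: decompose a surface entry from GFS_VARIABLES.
name = "dswrf_surface_avg"
short_name, level, step_type = name.split("_")
assert (short_name, level, step_type) == ("dswrf", "surface", "avg")
```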
26 changes: 26 additions & 0 deletions src/nwp_consumer/internal/inputs/noaa/_models.py
@@ -0,0 +1,26 @@
import datetime as dt

from nwp_consumer import internal


class NOAAFileInfo(internal.FileInfoModel):
    def __init__(
        self, it: dt.datetime, filename: str, currentURL: str, step: int,
    ) -> None:
self._it = it
        # The name of the file as it will be stored by the storer
self._filename = filename
self._url = currentURL
self.step = step

def filename(self) -> str:
return self._filename

def filepath(self) -> str:
        # The full path to the file is the current URL joined with the filename
return self._url + "/" + self._filename

def it(self) -> dt.datetime:
return self._it
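
A hypothetical example of the values this model holds for one GFS file, following the AWS URL layout shown in aws.py below:

```python
import datetime as dt

fi = NOAAFileInfo(
    it=dt.datetime(2023, 12, 6, 6),
    filename="gfs.t06z.pgrb2.0p25.f002",
    currentURL="https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs.20231206/06/atmos",
    step=2,
)
assert fi.filepath().endswith("/gfs.t06z.pgrb2.0p25.f002")
```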
312 changes: 312 additions & 0 deletions src/nwp_consumer/internal/inputs/noaa/aws.py
@@ -0,0 +1,312 @@
"""Implements a client to fetch ICON data from DWD."""
import bz2
import datetime as dt
import pathlib
import re
import typing
import urllib.request

import requests
import structlog
import xarray as xr
import cfgrib

from nwp_consumer import internal

from ._consts import GFS_VARIABLES
from ._models import NOAAFileInfo

log = structlog.getLogger()

# Maps GFS parameter names to their OCF short names
PARAMETER_RENAME_MAP: dict[str, str] = {
"t2m_instant": internal.OCFShortName.TemperatureAGL.value,
"tcc": internal.OCFShortName.HighCloudCover.value,
"dswrf_surface_avg": internal.OCFShortName.DownwardShortWaveRadiationFlux.value,
"dlwrf_surface_avg": internal.OCFShortName.DownwardLongWaveRadiationFlux.value,
"sdwe_surface_instant": internal.OCFShortName.SnowDepthWaterEquivalent.value,
"r": internal.OCFShortName.RelativeHumidityAGL.value,
"u10_instant": internal.OCFShortName.WindUComponentAGL.value,
"v10_instant": internal.OCFShortName.WindVComponentAGL.value,
}

COORDINATE_ALLOW_LIST: typing.Sequence[str] = ("time", "step", "latitude", "longitude")


class Client(internal.FetcherInterface):
"""Implements a client to fetch ICON data from DWD."""

    baseurl: str  # The base URL for the GFS model data on AWS
model: str # The model to fetch data for
parameters: list[str] # The parameters to fetch
conform: bool # Whether to rename parameters to OCF names and clear unwanted coordinates

def __init__(self, model: str, hours: int = 48, param_group: str = "default") -> None:
"""Create a new Icon Client.
jacobbieker marked this conversation as resolved.
Show resolved Hide resolved

Exposes a client for ICON data from DWD that conforms to the FetcherInterface.

Args:
model: The model to fetch data for. Valid models are "europe" and "global".
param_group: The set of parameters to fetch.
Valid groups are "default", "full", and "basic".
"""
self.baseurl = "https://noaa-gfs-bdp-pds.s3.amazonaws.com"

match (param_group, model):
case ("default", _):
self.parameters = list(PARAMETER_RENAME_MAP.keys())
self.conform = True
case ("basic", "global"):
self.parameters = ["t2m_instant", "tcc",]
self.conform = True
case ("full", "global"):
self.parameters = GFS_VARIABLES
self.conform = False
case (_, _):
raise ValueError(
f"unknown parameter group {param_group}."
"Valid groups are 'default', 'full', 'basic'",
)

self.model = model
self.hours = hours

def listRawFilesForInitTime(self, *, it: dt.datetime) -> list[internal.FileInfoModel]: # noqa: D102

if it.hour not in [0, 6, 12, 18]:
return []

files: list[internal.FileInfoModel] = []

# Files are split per timestep
# And the url includes the time and init time
# https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs.20201206/00/atmos/gfs.t00z.pgrb2.0p25.f000

        # Fetch the AWS directory listing of files for the given init time
response = requests.get(f"{self.baseurl}/gfs.{it.strftime('%Y%m%d')}/{it.strftime('%H')}/", timeout=3)

if response.status_code != 200:
log.warn(
event="error fetching filelisting webpage for parameter",
status=response.status_code,
url=response.url,
inittime=it.strftime("%Y-%m-%d %H:%M"),
)
return []

# The webpage's HTML <body> contains a list of <a> tags
        # * Each <a> tag has a href, most of which point to a file
for line in response.text.splitlines():
# Check if the line contains a href, if not, skip it
refmatch = re.search(pattern=r'href="(.+)">', string=line)
if refmatch is None:
continue

# The href contains the name of a file - parse this into a FileInfo object
            # The baseurl has to have the init date and hour added to it for GFS
fi = _parseAWSFilename(
name=refmatch.groups()[0],
baseurl=f"{self.baseurl}/gfs.{it.strftime('%Y%m%d')}/{it.strftime('%H')}",
match_aux=not self.conform,
it=it,
)
            # Ignore the file if it could not be parsed, or if its step exceeds
            # the configured number of hours when conforming
if fi is None or (fi.step > self.hours and self.conform):
continue

# Add the file to the list
files.append(fi)

log.debug(
event="listed files for init time",
inittime=it.strftime("%Y-%m-%d %H:%M"),
url=response.url,
numfiles=len(files),
)

return files

def mapTemp(self, *, p: pathlib.Path) -> xr.Dataset: # noqa: D102
log.debug(event="mapping raw file to xarray dataset", filepath=p.as_posix())

# Load the raw file as a dataset
try:
ds = cfgrib.open_datasets(
p.as_posix(),
)
except Exception as e:
log.warn(
event="error converting raw file as dataset",
error=e,
filepath=p.as_posix(),
)
return xr.Dataset()

# Process all the parameters into a single file
ds = [
d for d in ds if any(x in d.coords for x in ["surface", "heightAboveGround", "isobaricInhPa"])
]

# Split into surface, heightAboveGround, and isobaricInhPa lists
surface = [d for d in ds if "surface" in d.coords]
heightAboveGround = [d for d in ds if "heightAboveGround" in d.coords]
isobaricInhPa = [d for d in ds if "isobaricInhPa" in d.coords]

# Update name of each data variable based off the attribute GRIB_stepType
for i, d in enumerate(surface):
for variable in d.data_vars.keys():
d = d.rename({variable: f"{variable}_surface_{d[f'{variable}'].attrs['GRIB_stepType']}"})
surface[i] = d
for i, d in enumerate(heightAboveGround):
for variable in d.data_vars.keys():
d = d.rename({variable: f"{variable}_{d[f'{variable}'].attrs['GRIB_stepType']}"})
heightAboveGround[i] = d

surface = xr.merge(surface)
# Drop unknown data variable
surface = surface.drop_vars("unknown_surface_instant", errors="ignore")
heightAboveGround = xr.merge(heightAboveGround)
isobaricInhPa = xr.merge(isobaricInhPa)

ds = xr.merge([surface, heightAboveGround, isobaricInhPa])

# Only conform the dataset if requested (defaults to True)
if self.conform:
# Rename the parameters to the OCF names
# Drop variables that are not in the OCF list first
ds = ds.drop_vars(
names=[v for v in ds.data_vars if v not in PARAMETER_RENAME_MAP.keys()],
errors="ignore",
)
# * Only do so if they exist in the dataset
for oldParamName, newParamName in PARAMETER_RENAME_MAP.items():
if oldParamName in ds:
ds = ds.rename({oldParamName: newParamName})

# Delete unwanted coordinates
ds = ds.drop_vars(
names=[c for c in ds.coords if c not in COORDINATE_ALLOW_LIST],
errors="ignore",
)

        # * Each chunk is a single time step
        # The non-conformed dataset does not use the "variable" dimension, as that
        # would make an 86GiB dataset for a single timestamp
        # Keeping variables separate keeps the dataset small enough to fit in memory
if self.conform:
ds = (
ds.rename({"time": "init_time"})
.expand_dims("init_time")
.expand_dims("step")
.to_array(dim="variable", name=f"NOAA_{self.model}".upper())
.to_dataset()
.transpose("variable", "init_time", "step", ...)
.sortby("step")
.sortby("variable")
.chunk(
{
"init_time": 1,
"step": -1,
"variable": -1,
},
)
)
else:
ds = (
ds.rename({"time": "init_time"})
.expand_dims("init_time")
.expand_dims("step")
.transpose("init_time", "step", ...)
.sortby("step")
.chunk(
{
"init_time": 1,
"step": -1,
},
)
)
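        # Either way the dataset is chunked with one chunk per init time: the
        # conformed branch stacks parameters along a "variable" dimension, while
        # the non-conformed branch keeps one data variable per GFS parameter.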

return ds

def downloadToTemp( # noqa: D102
self,
*,
fi: internal.FileInfoModel,
) -> tuple[internal.FileInfoModel, pathlib.Path]:
log.debug(event="requesting download of file", file=fi.filename(), path=fi.filepath())
try:
response = urllib.request.urlopen(fi.filepath())
except Exception as e:
log.warn(
event="error calling url for file",
url=fi.filepath(),
filename=fi.filename(),
error=e,
)
return fi, pathlib.Path()

if response.status != 200:
log.warn(
event="error downloading file",
status=response.status,
url=fi.filepath(),
filename=fi.filename(),
)
return fi, pathlib.Path()

        # Write the raw file to the temporary directory
tfp: pathlib.Path = internal.TMP_DIR / fi.filename()
with open(tfp, "wb") as f:
f.write(response.read())

log.debug(
event="fetched all data from file",
filename=fi.filename(),
url=fi.filepath(),
filepath=tfp.as_posix(),
nbytes=tfp.stat().st_size,
)

return fi, tfp


def _parseAWSFilename(
name: str,
baseurl: str,
match_aux: bool = True,
match_main: bool = True,
it: dt.datetime | None = None,
) -> NOAAFileInfo | None:
"""Parse a string of HTML into an IconFileInfo object, if it contains one.

Args:
name: The name of the file to parse
baseurl: The base URL for the ICON model
jacobbieker marked this conversation as resolved.
Show resolved Hide resolved
"""
    # Only 2 types of file; each contains all variables for the timestep
# "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfs.20231206/06/atmos/gfs.t06z.pgrb2.0p25.f002"
# "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfs.20231206/06/atmos/gfs.t06z.pgrb2b.0p25.f002"
    # Define the regex patterns to match the two types of file
mainRegex = r"gfs.t(\d{2})z.pgrb2.0p25.f(\d{3})"
# Auxiliary files have b appended to them
auxRegex = r"gfs.t(\d{2})z.pgrb2b.0p25.f(\d{3})"
    itstring = ""
    stepstring = "000"
# Try to match the href to one of the regex patterns
mainmatch = re.search(pattern=mainRegex, string=name)
auxmatch = re.search(pattern=auxRegex, string=name)

if mainmatch and match_main:
itstring, stepstring = mainmatch.groups()
elif auxmatch and match_aux:
itstring, stepstring = auxmatch.groups()
else:
return None

return NOAAFileInfo(
it=it,
filename=name,
currentURL=f"{baseurl}/gfs.t{itstring}z.pgrb2.0p25.f{stepstring}",
step=int(stepstring),
)
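
A quick sanity check of the two patterns (hypothetical test snippet, not part of this diff; note the unescaped dots in the patterns match any single character, which is harmless for these fixed names):

```python
import re

# Main and auxiliary filenames, per the examples above.
main = re.search(r"gfs.t(\d{2})z.pgrb2.0p25.f(\d{3})", "gfs.t06z.pgrb2.0p25.f002")
assert main is not None and main.groups() == ("06", "002")

aux = re.search(r"gfs.t(\d{2})z.pgrb2b.0p25.f(\d{3})", "gfs.t06z.pgrb2b.0p25.f002")
assert aux is not None and aux.groups() == ("06", "002")
```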