feat: era5land extract soil{t,m}4 #321
akrherz committed Jan 24, 2024
1 parent 2bcabac commit 757dbdb
Showing 2 changed files with 52 additions and 19 deletions.
2 changes: 1 addition & 1 deletion scripts/RUN_0Z.sh
@@ -46,7 +46,7 @@ cd ../era5
python fetch_era5.py $(date -u --date '6 days ago' +'%Y %m %d')

cd ../climodat
python era5land_solarrad.py $(date -u --date '7 days ago' +'%Y %m %d')
python era5land_extract.py --valid=$(date -u --date '7 days ago' +'%Y-%m-%d')

cd ../iemre
# We have hopefully gotten a refreshed 12z stage4 file, so we chunk it again
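The RUN_0Z.sh change above swaps three positional arguments ('%Y %m %d') for a single ISO-formatted --valid option, which the renamed era5land_extract.py parses with click instead of sys.argv. A minimal sketch of that parsing, assuming click's default DateTime formats; the demo command name here is illustrative only, the real entry point appears at the end of the diff below:

# Sketch only: click.DateTime() accepts "%Y-%m-%d" out of the box, which is
# why the shell date format changed from '+%Y %m %d' to '+%Y-%m-%d'.
import click


@click.command()
@click.option("--valid", type=click.DateTime(), help="UTC date to process")
def demo(valid):
    """Echo the parsed date; stand-in for the real do(valid.date()) call."""
    click.echo(valid.date().isoformat())


if __name__ == "__main__":
    demo()  # e.g. python demo.py --valid=2024-01-17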
69 changes: 51 additions & 18 deletions scripts/climodat/era5land_extract.py
@@ -4,20 +4,27 @@
Run from RUN_0Z.sh for seven UTC days ago.
"""
import datetime
import sys

import click
import geopandas as gpd
import numpy as np
import pandas as pd
from affine import Affine
from pyiem.grid.zs import CachingZonalStats
from pyiem.iemre import NORTH, WEST, hourly_offset
from pyiem.util import get_dbconn, get_sqlalchemy_conn, logger, ncopen, utc
from pyiem.util import (
convert_value,
get_dbconn,
get_sqlalchemy_conn,
logger,
ncopen,
utc,
)

LOG = logger()


def compute_regions(rsds, df):
def compute_regions(data, varname, df):
"""Do the spatial averaging work."""
with get_sqlalchemy_conn("coop") as conn:
gdf = gpd.read_postgis(
@@ -31,9 +38,9 @@ def compute_regions(rsds, df):
)
affine = Affine(0.1, 0, WEST, 0, -0.1, NORTH)
czs = CachingZonalStats(affine)
data = czs.gen_stats(np.flipud(rsds), gdf["geom"])
data = czs.gen_stats(np.flipud(data), gdf["geom"])
for i, sid in enumerate(gdf.index.values):
df.at[sid, "era5land_srad"] = data[i]
df.at[sid, varname] = data[i]
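
compute_regions() is generalized here to take the grid and the target column name, so the same zonal-averaging pass can fill era5land_srad, era5land_soilt4_avg, and era5land_soilm4_avg. A condensed sketch of the pattern, assuming the same pyiem helpers used in the diff and with the region geometry query elided:

# Sketch: spatially average one ERA5-Land grid over region polygons and write
# the means into the named DataFrame column.  The ERA5-Land grid is 0.1 degree
# and anchored at the northwest corner, hence the negative dy in the Affine
# and the flipud() so that row 0 is the northern edge.
import numpy as np
from affine import Affine
from pyiem.grid.zs import CachingZonalStats
from pyiem.iemre import NORTH, WEST


def spatial_average(grid: np.ndarray, varname: str, gdf, df) -> None:
    """gdf holds region geometries in a 'geom' column, indexed by station id."""
    czs = CachingZonalStats(Affine(0.1, 0, WEST, 0, -0.1, NORTH))
    means = czs.gen_stats(np.flipud(grid), gdf["geom"])
    for i, sid in enumerate(gdf.index.values):
        df.at[sid, varname] = means[i]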


def build_stations(dt) -> pd.DataFrame:
@@ -52,7 +59,8 @@ def build_stations(dt) -> pd.DataFrame:
params=(dt,),
index_col="station",
)
df["era5land_srad"] = np.nan
for col in ["era5land_srad", "era5land_soilt4_avg", "era5land_soilm4_avg"]:
df[col] = np.nan
df["i"] = np.nan
df["j"] = np.nan
LOG.info("Found %s database entries", len(df.index))
@@ -75,52 +83,77 @@ def compute(df, sids, dt, do_regions=False):
lats = nc.variables["lat"][:]
if f"{dt:%m%d}" == "1231":
rsds = np.sum(nc.variables["rsds"][idx0:], 0) * factor
# Close enough
soilm = np.mean(nc.variables["soilm"][idx0:, 0], 0)
soilt = np.mean(nc.variables["soilt"][idx0:, 0], 0)
ncfn2 = f"/mesonet/data/era5/{ets.year}_era5land_hourly.nc"
with ncopen(ncfn2) as nc2:
rsds += np.sum(nc2.variables["rsds"][:idx1], 0) * factor
else:
rsds = np.sum(nc.variables["rsds"][idx0:idx1], 0) * factor
soilm = np.mean(nc.variables["soilm"][idx0:idx1, 0], 0)
soilt = np.mean(nc.variables["soilt"][idx0:idx1, 0], 0)

df["i"] = np.digitize(df["lon"].values, lons)
df["j"] = np.digitize(df["lat"].values, lats)
rsds = rsds.filled(np.nan)
soilm = soilm.filled(np.nan)
soilt = soilt.filled(np.nan)

for sid, row in df.loc[sids].iterrows():
df.at[sid, "era5land_srad"] = rsds[int(row["j"]), int(row["i"])]
df.at[sid, "era5land_soilt4_avg"] = soilt[int(row["j"]), int(row["i"])]
df.at[sid, "era5land_soilm4_avg"] = soilm[int(row["j"]), int(row["i"])]

if do_regions:
compute_regions(rsds, df)
compute_regions(rsds, "era5land_srad", df)
compute_regions(soilt, "era5land_soilt4_avg", df)
compute_regions(soilm, "era5land_soilm4_avg", df)

LOG.info("IA0200 %s", df.at["IA0200", "era5land_srad"])
LOG.info("IA0200 %s", df.loc["IA0200"])


def do(dt):
"""Process for a given date."""
LOG.info("do(%s)", dt)
df = build_stations(dt)
df["day"] = dt
# We currently do two options
# 1. For morning sites 1-11 AM, they get yesterday's radiation
# 1. For morning sites 1-11 AM, they get yesterday's values
sids = df[(df["temp_hour"] > 0) & (df["temp_hour"] < 12)].index.values
compute(df, sids, dt - datetime.timedelta(days=1), True)
# 2. All other sites get today
sids = df[df["era5land_srad"].isna()].index.values
compute(df, sids, dt)

df["station"] = df.index.values
df["era5land_soilt4_avg"] = convert_value(
df["era5land_soilt4_avg"].values, "degK", "degF"
)

# prevent NaN from being inserted
df = df.replace({np.nan: None})
pgconn = get_dbconn("coop")
cursor = pgconn.cursor()
for sid, row in df[df["era5land_srad"].notna()].iterrows():
cursor.execute(
"UPDATE alldata set era5land_srad = %s where station = %s and "
"day = %s",
(row["era5land_srad"], sid, dt),
)
cursor.executemany(
"""
UPDATE alldata set era5land_srad = %(era5land_srad)s,
era5land_soilt4_avg = %(era5land_soilt4_avg)s,
era5land_soilm4_avg = %(era5land_soilm4_avg)s
where station = %(station)s and day = %(day)s
""",
df.to_dict("records"),
)
cursor.close()
pgconn.commit()
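
do() now writes all three columns in one pass: soil temperature is converted from Kelvin to Fahrenheit with pyiem's convert_value, NaN is swapped for None so NULLs reach the database, and the per-row execute() loop becomes a single executemany() whose named placeholders are keyed by the DataFrame column names via to_dict("records"). A condensed sketch of that tail end of do(), mirroring the diff:

# Sketch of the bulk-update tail of do(): unit conversion, NaN -> None, then
# one executemany() with named placeholders instead of a per-row execute().
import numpy as np
import pandas as pd
from pyiem.util import convert_value, get_dbconn


def bulk_update(df: pd.DataFrame, dt) -> None:
    df["station"] = df.index.values
    df["day"] = dt
    # ERA5-Land soil temperature is in Kelvin; alldata stores degrees F
    df["era5land_soilt4_avg"] = convert_value(
        df["era5land_soilt4_avg"].values, "degK", "degF"
    )
    df = df.replace({np.nan: None})  # otherwise NaN would be inserted verbatim
    pgconn = get_dbconn("coop")
    cursor = pgconn.cursor()
    cursor.executemany(
        "UPDATE alldata SET era5land_srad = %(era5land_srad)s, "
        "era5land_soilt4_avg = %(era5land_soilt4_avg)s, "
        "era5land_soilm4_avg = %(era5land_soilm4_avg)s "
        "WHERE station = %(station)s and day = %(day)s",
        df.to_dict("records"),
    )
    cursor.close()
    pgconn.commit()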


def main(argv):
@click.command()
@click.option("--valid", type=click.DateTime())
def main(valid):
"""Go Main Go"""
do(datetime.date(int(argv[1]), int(argv[2]), int(argv[3])))
do(valid.date())


if __name__ == "__main__":
main(sys.argv)
main()
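
With the click entry point in place, the script can also be exercised outside RUN_0Z.sh, for example from a test, via click's CliRunner. A usage sketch under the assumption that scripts/climodat is on sys.path so the module imports as era5land_extract; this is not part of the commit:

from click.testing import CliRunner

from era5land_extract import main  # assumed import path

result = CliRunner().invoke(main, ["--valid", "2024-01-17"])
print(result.exit_code, result.output)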
