Skip to content

Commit

Permalink
Merge pull request #386 from dbekaert/feat/cube
Browse files Browse the repository at this point in the history
Support for HRRR
  • Loading branch information
jlmaurer authored Nov 11, 2022
2 parents 1585569 + 60ee31b commit ce420af
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 111 deletions.
2 changes: 1 addition & 1 deletion tools/RAiDER/checkArgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def checkArgs(args):
wetFilename, hydroFilename = makeDelayFileNames(
d,
args.los,
'nc',
args.raster_format,
args.weather_model._dataset.upper(),
args.output_directory,
)
Expand Down
133 changes: 91 additions & 42 deletions tools/RAiDER/delay.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from RAiDER.processWM import prepareWeatherModel
from RAiDER.utilFcns import (
writeDelays, projectDelays, writePnts2HDF5,
lla2ecef, transform_bbox, clip_bbox,
lla2ecef, transform_bbox, clip_bbox, rio_profile,
)


Expand Down Expand Up @@ -109,13 +109,12 @@ def tropo_delay(dt, wetFilename, hydroFilename, args):

###########################################################
# Load the downloaded model file for CRS information
ds = xarray.load_dataset(weather_model_file)
try:
wm_proj = ds['CRS']
except:
wm_proj = rio_profile(f"netcdf:{weather_model_file}:t")["crs"]
if wm_proj is None:
print("WARNING: I can't find a CRS in the weather model file, so I will assume you are using WGS84")
wm_proj = 4326
ds.close()
wm_proj = CRS.from_epsg(4326)
else:
wm_proj = CRS.from_wkt(wm_proj.to_wkt())
####################################################################

####################################################################
Expand All @@ -140,7 +139,6 @@ def tropo_delay(dt, wetFilename, hydroFilename, args):
hgts,
pnt_proj,
wm_proj,
flip_xy=pnt_proj.axis_info[0].abbrev in ["X","E","Lon"]
)
else:
# interpolators require y, x, z
Expand All @@ -164,8 +162,6 @@ def tropo_delay(dt, wetFilename, hydroFilename, args):
else:
raise ValueError("Unknown operation type")

del ds # cleanup

###########################################################
# Write the delays to file
# Different options depending on the inputs
Expand Down Expand Up @@ -219,6 +215,9 @@ def tropo_delay_cube(dt, wf, args, model_file=None):
except CRSError:
raise ValueError('output_projection argument is not a valid CRS specifier')

# For testing multiprocessing
# TODO - move this to configuration
nproc = 1

# logging
logger.debug('Starting to run the weather model cube calculation')
Expand Down Expand Up @@ -273,12 +272,13 @@ def tropo_delay_cube(dt, wf, args, model_file=None):
ds.close()
logger.debug(f'Output height range is {min(heights)} to {max(heights)}')


try:
wm_proj = ds["CRS"]
except:
print("WARNING: I can't find a CRS in the weather model file, so I will assume you are using WGS84")
wm_proj = CRS.from_epsg(4326)
# Load CRS from weather model file
wm_proj = rio_profile(f"netcdf:{weather_model_file}:t")["crs"]
if wm_proj is None:
print("WARNING: I can't find a CRS in the weather model file, so I will assume you are using WGS84")
wm_proj = CRS.from_epsg(4326)
else:
wm_proj = CRS.from_wkt(wm_proj.to_wkt())

# Build the output grid
zpts = np.array(heights)
Expand All @@ -299,7 +299,6 @@ def tropo_delay_cube(dt, wf, args, model_file=None):
xpts, ypts, zpts,
wm_proj, crs,
[ifWet, ifHydro])

else:
out_type = "slant range"
out_filename = wf.replace("_ztd", "_std").replace("wet", "tropo")
Expand All @@ -310,14 +309,26 @@ def tropo_delay_cube(dt, wf, args, model_file=None):
)

# Get pointwise interpolators
ifWet, ifHydro = getInterpolators(weather_model_file, "pointwise")
ifWet, ifHydro = getInterpolators(weather_model_file,
kind="pointwise",
shared=(nproc > 1))

# Build cube
wetDelay, hydroDelay = build_cube_ray(
xpts, ypts, zpts,
dt, args["los"]._file, args["look_dir"],
wm_proj, crs,
[ifWet, ifHydro])
if nproc == 1:
wetDelay, hydroDelay = build_cube_ray(
xpts, ypts, zpts,
dt, args["los"]._file, args["look_dir"],
wm_proj, crs,
[ifWet, ifHydro])

### Use multi-processing here
else:
# Pre-build output arrays

# Create worker pool

# Loop over heights
raise NotImplementedError

# Write output file
# Modify this as needed for NISAR / other projects
Expand Down Expand Up @@ -403,7 +414,7 @@ def checkQueryPntsFile(pnts_file, query_shape):
return write_flag


def transformPoints(lats, lons, hgts, old_proj, new_proj,flip_xy=False):
def transformPoints(lats, lons, hgts, old_proj, new_proj):
'''
Transform lat/lon/hgt data to an array of points in a new
projection
Expand All @@ -415,17 +426,26 @@ def transformPoints(lats, lons, hgts, old_proj, new_proj,flip_xy=False):
hgts - Ellipsoidal height in meters
old_proj - the original projection of the points
new_proj - the new projection in which to return the points
flip_xy - Boolean flag to specify XY order of old_proj
Returns
-------
the array of query points in the weather model coordinate system
the array of query points in the weather model coordinate system (YX)
'''
t = Transformer.from_crs(old_proj, new_proj)
if flip_xy:
return np.stack(t.transform(lons, lats, hgts), axis=-1).T

# Flags for flipping inputs or outputs
in_flip = old_proj.axis_info[0].direction == "east"
out_flip = new_proj.axis_info[0].direction == "east"

if in_flip:
res = t.transform(lons, lats, hgts)
else:
res = t.transform(lats, lons, hgts)

if out_flip:
return np.stack((res[1], res[0], res[2]), axis=-1).T
else:
return np.stack(t.transform(lats, lons, hgts), axis=-1).T
return np.stack(res, axis=-1).T


def build_cube(xpts, ypts, zpts, model_crs, pts_crs, interpolators):
Expand All @@ -443,6 +463,17 @@ def build_cube(xpts, ypts, zpts, model_crs, pts_crs, interpolators):
zmin = min(interpolators[0].grid[2])
zmax = max(interpolators[0].grid[2])

# print("Output grid: ")
# print("crs: ", pts_crs)
# print("X: ", xpts[0], xpts[-1])
# print("Y: ", ypts[0], ypts[-1])

# ii = interpolators[0]
# print("Model grid: ")
# print("crs: ", model_crs)
# print("X: ", ii.grid[1][0], ii.grid[1][-1])
# print("Y: ", ii.grid[0][0], ii.grid[0][-1])

# Loop over heights and compute delays
for ii, ht in enumerate(zpts):

Expand All @@ -455,7 +486,6 @@ def build_cube(xpts, ypts, zpts, model_crs, pts_crs, interpolators):
transformPoints(
yy, xx, np.full(yy.shape, ht),
pts_crs, model_crs,
flip_xy=pts_crs.axis_info[0].abbrev in ["X", "E", "Lon"]
), (2, 1, 0))
else:
pts = np.stack([yy, xx, np.full(yy.shape, ht)], axis=-1)
Expand All @@ -466,11 +496,14 @@ def build_cube(xpts, ypts, zpts, model_crs, pts_crs, interpolators):
return outputArrs


def build_cube_ray(xpts, ypts, zpts, ref_time, orbit_file, look_dir, model_crs, pts_crs, interpolators):
def build_cube_ray(xpts, ypts, zpts, ref_time, orbit_file, look_dir, model_crs,
pts_crs, interpolators, outputArrs=None):
"""
Iterate over interpolators and build a cube
"""

# Some constants for this module
# TODO - Read this from constants or configuration
MAX_SEGMENT_LENGTH = 1000.
MAX_TROPO_HEIGHT = 50000.

Expand Down Expand Up @@ -504,14 +537,19 @@ def build_cube_ray(xpts, ypts, zpts, ref_time, orbit_file, look_dir, model_crs,
xx, yy = np.meshgrid(xpts, ypts)

# Output arrays
outputArrs = [np.zeros((zpts.size, ypts.size, xpts.size))
for mm in range(len(interpolators))]
output_created_here = False
if outputArrs is None:
output_created_here = True
outputArrs = [np.zeros((zpts.size, ypts.size, xpts.size))
for mm in range(len(interpolators))]

# Various transformers needed here
epsg4326 = CRS.from_epsg(4326)
cube_to_llh = Transformer.from_crs(pts_crs, epsg4326,
always_xy=True)
ecef_to_model = Transformer.from_crs(CRS.from_epsg(4978), model_crs)
# For calling the interpolators
flip_xy = model_crs.axis_info[0].direction == "east"

# Loop over heights of output cube and compute delays
for hh, ht in enumerate(zpts):
Expand Down Expand Up @@ -611,7 +649,13 @@ def build_cube_ray(xpts, ypts, zpts, ref_time, orbit_file, look_dir, model_crs,
cos_factor = (high_ht - low_ht) / ray_length

# Determine number of parts to break ray into
nParts = int(np.ceil(ray_length.max() / MAX_SEGMENT_LENGTH)) + 1
try:
nParts = int(np.ceil(ray_length.max() / MAX_SEGMENT_LENGTH)) + 1
except ValueError:
raise ValueError(
"geo2rdr did not converge. Check orbit coverage"
)

if (nParts == 1):
raise RuntimeError(
"Ray with one segment encountered"
Expand All @@ -626,12 +670,17 @@ def build_cube_ray(xpts, ypts, zpts, ref_time, orbit_file, look_dir, model_crs,
pts_xyz = low_xyz + ff * (high_xyz - low_xyz)

# Ray point in model coordinates
pts = np.stack(
ecef_to_model.transform(
pts_xyz[..., 0],
pts_xyz[..., 1],
pts_xyz[..., 2]
), axis=-1)
pts = ecef_to_model.transform(
pts_xyz[..., 0],
pts_xyz[..., 1],
pts_xyz[..., 2]
)

# Order for the interpolator
if flip_xy:
pts = np.stack((pts[1], pts[0], pts[2]), axis=-1)
else:
pts = np.stack(pts, axis=-1)

# Trapezoidal integration with scaling
wt = 0.5 if findex in [0, fracs.size-1] else 1.0
Expand All @@ -645,5 +694,5 @@ def build_cube_ray(xpts, ypts, zpts, ref_time, orbit_file, look_dir, model_crs,
val[np.isnan(val)] = 0.0
out += wt * val


return outputArrs
if output_created_here:
return outputArrs
29 changes: 27 additions & 2 deletions tools/RAiDER/delayFcns.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ def get_delays(
return wet_delay, hydro_delay


def getInterpolators(wm_file, kind='pointwise'):
def getInterpolators(wm_file, kind='pointwise', shared=False):
'''
Read 3D gridded data from a processed weather model file and wrap it with
Read 3D gridded data from a processed weather model file and wrap it with
an interpolator
'''
# Get the weather model data
Expand All @@ -105,12 +105,37 @@ def getInterpolators(wm_file, kind='pointwise'):
wet = np.array(f.variables['wet_total'][:]).transpose(1, 2, 0)
hydro = np.array(f.variables['hydro_total'][:]).transpose(1, 2, 0)

# If shared interpolators are requested
# The arrays are not modified - so turning off lock for performance
if shared:
xs_wm = make_shared_raw(xs_wm)
ys_wm = make_shared_raw(ys_wm)
zs_wm = make_shared_raw(zs_wm)
wet = make_shared_raw(wet)
hydro = make_shared_raw(hydro)


ifWet = Interpolator((ys_wm, xs_wm, zs_wm), wet, fill_value=np.nan)
ifHydro = Interpolator((ys_wm, xs_wm, zs_wm), hydro, fill_value=np.nan)

return ifWet, ifHydro


def make_shared_raw(inarr):
"""
Make numpy view array of mp.Array
"""
# Create flat shared array
shared_arr = mp.RawArray('d', inarr.size)
# Create a numpy view of it
shared_arr_np = np.ndarray(inarr.shape, dtype=np.float64,
buffer=shared_arr)
# Copy data to shared array
np.copyto(shared_arr_np, inarr)

return shared_arr_np


def chunk(chunkSize, in_shape):
'''
Create a set of indices to use as chunks
Expand Down
Loading

0 comments on commit ce420af

Please sign in to comment.