Skip to content

Commit

Permalink
Making a bunch of signed integers in routing unsigned longs.
Browse files Browse the repository at this point in the history
This will avoid issues with extremely large rasters at the cost of
potentially double the memory usage.

RE:natcap#350
RE:natcap#246
  • Loading branch information
phargogh committed Sep 25, 2023
1 parent 81d27b8 commit 2e705ec
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 43 deletions.
10 changes: 10 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
Release History
===============

Unreleased Changes
------------------
* Fixed an issue where MFD flow direction was producing many nodata holes given
a large-enough DEM. These nodata holes would then propagate to flow
accumulation and stream extraction, producing very disjointed stream
networks. https://github.com/natcap/pygeoprocessing/issues/350
* Improved progress logging in MFD flow direction, MFD flow accumulation, MFD
stream extraction to prevent integer overflows in percentages and improve
the readability of log messages. https://github.com/natcap/pygeoprocessing/issues/246

2.4.1 (2023-09-05)
------------------
* The ``pygeoprocessing`` package metadata has been updated to use
Expand Down
95 changes: 52 additions & 43 deletions src/pygeoprocessing/routing/routing.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -126,23 +126,23 @@ cdef extern from "LRUCache.h" nogil:
# this is the class type that'll get stored in the priority queue
cdef struct PixelType:
double value # pixel value
int xi # pixel x coordinate in the raster
int yi # pixel y coordinate in the raster
unsigned int xi # pixel x coordinate in the raster
unsigned int yi # pixel y coordinate in the raster
int priority # for breaking ties if two `value`s are equal.

# this struct is used to record an intermediate flow pixel's last calculated
# direction and the flow accumulation value so far
cdef struct FlowPixelType:
int xi
int yi
unsigned int xi
unsigned int yi
int last_flow_dir
double value

# this struct is used in distance_to_channel_mfd to add up each pixel's
# weighted distances and flow weights
cdef struct MFDFlowPixelType:
int xi
int yi
unsigned int xi
unsigned int yi
int last_flow_dir
double sum_of_weighted_distances
double sum_of_weights
Expand All @@ -152,21 +152,21 @@ cdef struct MFDFlowPixelType:
# d8 flow direction to walk and the source_id indicates the source stream it
# spawned from
cdef struct StreamConnectivityPoint:
int xi
int yi
unsigned int xi
unsigned int yi
int upstream_d8_dir
int source_id
unsigned int source_id

# used to record x/y locations as needed
cdef struct CoordinateType:
int xi
int yi
unsigned int xi
unsigned int yi


cdef struct FinishType:
int xi
int yi
int n_pushed
unsigned int xi
unsigned int yi
unsigned int n_pushed

# this ctype is used to store the block ID and the block buffer as one object
# inside Managed Raster
Expand Down Expand Up @@ -210,8 +210,8 @@ cdef class _ManagedRaster:
cdef int block_ymod
cdef int block_xbits
cdef int block_ybits
cdef int raster_x_size
cdef int raster_y_size
cdef unsigned int raster_x_size
cdef unsigned int raster_y_size
cdef int block_nx
cdef int block_ny
cdef int write_mode
Expand Down Expand Up @@ -714,7 +714,7 @@ def fill_pits(
cdef PitPriorityQueueType pit_queue

# properties of the parallel rasters
cdef int raster_x_size, raster_y_size, n_x_blocks
cdef unsigned int raster_x_size, raster_y_size, n_x_blocks

# variables to remember heights of DEM
cdef double center_val, dem_nodata, fill_height
Expand All @@ -724,6 +724,8 @@ def fill_pits(
# have already been processed
cdef int feature_id

cdef unsigned long current_pixel

# used to handle the case for single outlet mode
cdef int single_outlet=0, outlet_x=-1, outlet_y=-1
if single_outlet_tuple is not None:
Expand Down Expand Up @@ -1478,11 +1480,11 @@ def flow_accumulation_d8(
# data loss for any lower type that might be used in
# `dem_raster_path_band[0]`.
cdef numpy.ndarray[numpy.uint8_t, ndim=2] flow_dir_buffer_array
cdef int win_ysize, win_xsize, xoff, yoff
cdef unsigned int win_ysize, win_xsize, xoff, yoff

# the _root variables remembers the pixel index where the plateau/pit
# region was first detected when iterating over the DEM.
cdef int xi_root, yi_root
cdef unsigned int xi_root, yi_root

# these variables are used as pixel or neighbor indexes.
# _n is related to a neighbor pixel
Expand Down Expand Up @@ -1568,7 +1570,7 @@ def flow_accumulation_d8(
if ctime(NULL) - last_log_time > _LOGGING_PERIOD:
last_log_time = ctime(NULL)
current_pixel = xoff + yoff * raster_x_size
LOGGER.info('%.1f%% complete', 100.0 * current_pixel / <float>(
LOGGER.info('Flow accumulation D8 %.1f%% complete', 100.0 * current_pixel / <float>(
raster_x_size * raster_y_size))

# make a buffer big enough to capture block and boundaries around it
Expand Down Expand Up @@ -1704,11 +1706,11 @@ def flow_dir_mfd(

# the _root variables remembers the pixel index where the plateau/pit
# region was first detected when iterating over the DEM.
cdef int xi_root, yi_root
cdef unsigned int xi_root, yi_root

# these variables are used as pixel or neighbor indexes. where _q
# represents a value out of a queue, and _n is related to a neighbor pixel
cdef int i_n, xi, yi, xi_q, yi_q, xi_n, yi_n
cdef unsigned int i_n, xi, yi, xi_q, yi_q, xi_n, yi_n

# these are used to recall the local and neighbor heights of pixels
cdef double root_height, n_height, dem_nodata, n_slope
Expand Down Expand Up @@ -1753,12 +1755,16 @@ def flow_dir_mfd(
cdef queue[int] nodata_flow_dir_queue

# properties of the parallel rasters
cdef int raster_x_size, raster_y_size
cdef unsigned long raster_x_size, raster_y_size

# used for time-delayed logging
cdef time_t last_log_time
last_log_time = ctime(NULL)

# This is used in progress logging to represent how many pixels have been
# visited so far.
cdef unsigned long current_pixel

# determine dem nodata in the working type, or set an improbable value
# if one can't be determined
dem_raster_info = pygeoprocessing.get_raster_info(dem_raster_path_band[0])
Expand Down Expand Up @@ -1821,11 +1827,11 @@ def flow_dir_mfd(
# longest plateau drain distance possible for this raster.
plateau_distance_path = os.path.join(
working_dir_path, 'plateau_distance.tif')
plateau_distance_nodata = raster_x_size * raster_y_size
cdef unsigned long plateau_distance_nodata = raster_x_size * raster_y_size
pygeoprocessing.new_raster_from_base(
dem_raster_path_band[0], plateau_distance_path, gdal.GDT_Float64,
[plateau_distance_nodata], fill_value_list=[
raster_x_size * raster_y_size],
[plateau_distance_nodata],
fill_value_list=[plateau_distance_nodata],
raster_driver_creation_tuple=raster_driver_creation_tuple)
plateau_distance_managed_raster = _ManagedRaster(
plateau_distance_path, 1, 1)
Expand Down Expand Up @@ -2147,7 +2153,7 @@ def flow_dir_mfd(
dem_managed_raster.close()
plateau_distance_managed_raster.close()
shutil.rmtree(working_dir_path)
LOGGER.info('%.1f%% complete', 100.0)
LOGGER.info('Flow dir MFD %.1f%% complete', 100.0)


def flow_accumulation_mfd(
Expand Down Expand Up @@ -2193,14 +2199,14 @@ def flow_accumulation_mfd(
# data loss for any lower type that might be used in
# `dem_raster_path_band[0]`.
cdef numpy.ndarray[numpy.int32_t, ndim=2] flow_dir_mfd_buffer_array
cdef int win_ysize, win_xsize, xoff, yoff
cdef unsigned long win_ysize, win_xsize, xoff, yoff

# These are used to estimate % complete
cdef long long visit_count, pixel_count
cdef unsigned long long visit_count, pixel_count

# the _root variables remembers the pixel index where the plateau/pit
# region was first detected when iterating over the DEM.
cdef int xi_root, yi_root
cdef unsigned long xi_root, yi_root

# these variables are used as pixel or neighbor indexes.
# _n is related to a neighbor pixel
Expand Down Expand Up @@ -2230,12 +2236,14 @@ def flow_accumulation_mfd(
cdef FlowPixelType flow_pixel

# properties of the parallel rasters
cdef int raster_x_size, raster_y_size
cdef unsigned long raster_x_size, raster_y_size

# used for time-delayed logging
cdef time_t last_log_time
last_log_time = ctime(NULL)

cdef unsigned long current_pixel

if not _is_raster_path_band_formatted(flow_dir_mfd_raster_path_band):
raise ValueError(
"%s is supposed to be a raster band tuple but it's not." % (
Expand Down Expand Up @@ -2305,7 +2313,7 @@ def flow_accumulation_mfd(
if ctime(NULL) - last_log_time > _LOGGING_PERIOD:
last_log_time = ctime(NULL)
current_pixel = xoff + yoff * raster_x_size
LOGGER.info('%.1f%% complete', 100.0 * current_pixel / <float>(
LOGGER.info('Flow accum MFD %.1f%% complete', 100.0 * current_pixel / <float>(
raster_x_size * raster_y_size))

# make a buffer big enough to capture block and boundaries around it
Expand Down Expand Up @@ -2366,7 +2374,7 @@ def flow_accumulation_mfd(
if ctime(NULL) - last_log_time > _LOGGING_PERIOD:
last_log_time = ctime(NULL)
LOGGER.info(
'mfd flow accum %.1f%% complete',
'Flow accum MFD %.1f%% complete',
100.0 * visit_count / float(pixel_count))

preempted = 0
Expand Down Expand Up @@ -2428,7 +2436,7 @@ def flow_accumulation_mfd(
shutil.rmtree(tmp_dir)
except OSError:
LOGGER.exception("couldn't remove temp dir")
LOGGER.info('%.1f%% complete', 100.0)
LOGGER.info('Flow accum MFD %.1f%% complete', 100.0)


def distance_to_channel_d8(
Expand Down Expand Up @@ -2544,7 +2552,7 @@ def distance_to_channel_d8(
if ctime(NULL) - last_log_time > _LOGGING_PERIOD:
last_log_time = ctime(NULL)
current_pixel = xoff + yoff * raster_x_size
LOGGER.info('%.1f%% complete', 100.0 * current_pixel / <float>(
LOGGER.info('Dist to channel D8 %.1f%% complete', 100.0 * current_pixel / <float>(
raster_x_size * raster_y_size))

# make a buffer big enough to capture block and boundaries around it
Expand Down Expand Up @@ -2786,7 +2794,7 @@ def distance_to_channel_mfd(
if ctime(NULL) - last_log_time > _LOGGING_PERIOD:
last_log_time = ctime(NULL)
current_pixel = xoff + yoff * raster_x_size
LOGGER.info('%.1f%% complete', 100.0 * current_pixel / <float>(
LOGGER.info('Dist to channel MFD %.1f%% complete', 100.0 * current_pixel / <float>(
raster_x_size * raster_y_size))

# make a buffer big enough to capture block and boundaries around it
Expand Down Expand Up @@ -2941,7 +2949,7 @@ def distance_to_channel_mfd(
weight_raster.close()
visited_managed_raster.close()
shutil.rmtree(tmp_work_dir)
LOGGER.info('%.1f%% complete', 100.0)
LOGGER.info('Dist to channel MFD %.1f%% complete', 100.0)


def extract_streams_mfd(
Expand Down Expand Up @@ -3011,14 +3019,15 @@ def extract_streams_mfd(
cdef _ManagedRaster flow_dir_mfd_mr = _ManagedRaster(
flow_dir_mfd_path_band[0], flow_dir_mfd_path_band[1], 0)

cdef int xoff, yoff, win_xsize, win_ysize
cdef int xi, yi, xi_root, yi_root, i_n, xi_n, yi_n, i_sn, xi_sn, yi_sn
cdef unsigned int xoff, yoff, win_xsize, win_ysize
cdef unsigned int xi, yi, xi_root, yi_root, i_n, xi_n, yi_n, i_sn, xi_sn, yi_sn
cdef int flow_dir_mfd
cdef double flow_accum
cdef double trace_flow_threshold = (
trace_threshold_proportion * flow_threshold)
cdef int n_iterations = 0
cdef unsigned int n_iterations = 0
cdef int is_outlet, stream_val
cdef unsigned long current_pixel

cdef int flow_dir_nodata = pygeoprocessing.get_raster_info(
flow_dir_mfd_path_band[0])['nodata'][flow_dir_mfd_path_band[1]-1]
Expand All @@ -3041,7 +3050,7 @@ def extract_streams_mfd(
if ctime(NULL) - last_log_time > _LOGGING_PERIOD:
last_log_time = ctime(NULL)
current_pixel = xoff + yoff * raster_x_size
LOGGER.info('%.1f%% complete', 100.0 * current_pixel / <float>(
LOGGER.info('Extract streams MFD %.1f%% complete', 100.0 * current_pixel / <float>(
raster_x_size * raster_y_size))
for xi in range(win_xsize):
xi_root = xi+xoff
Expand Down Expand Up @@ -3127,7 +3136,7 @@ def extract_streams_mfd(
CoordinateType(xi_sn, yi_sn))

stream_mr.close()
LOGGER.info('filter out incomplete divergent streams')
LOGGER.info('Extract streams MFD: filter out incomplete divergent streams')
block_offsets_list = list(pygeoprocessing.iterblocks(
(target_stream_raster_path, 1), offset_only=True))
stream_raster = gdal.OpenEx(
Expand All @@ -3141,7 +3150,7 @@ def extract_streams_mfd(
yoff=block_offsets['yoff'])
stream_band = None
stream_raster = None
LOGGER.info('100.0% complete')
LOGGER.info('Extract streams MFD: 100.0% complete')


def _is_raster_path_band_formatted(raster_path_band):
Expand Down

0 comments on commit 2e705ec

Please sign in to comment.