Skip to content

Commit

Permalink
Merge pull request #108 from richpsharp/feature/multiprocess_raster_c…
Browse files Browse the repository at this point in the history
…alculator

Feature/multiprocess raster calculator
  • Loading branch information
phargogh authored Aug 24, 2020
2 parents ec9a1ad + 944e830 commit 3087162
Show file tree
Hide file tree
Showing 8 changed files with 713 additions and 43 deletions.
1 change: 0 additions & 1 deletion .github/workflows/build-py-dists.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ jobs:
include:
- python-version: 3.7
numpy: "numpy~=1.15"

- python-version: 3.8
numpy: "numpy~=1.17"

Expand Down
6 changes: 6 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ Release History

Unreleased Changes
------------------
* Modified ``pygeoprocessing.numpy_array_to_raster`` to take boolean arrays.
* Modified ``pygeoprocessing.convolve_2d`` to guard against nonsensical queries
to both ``ignore_nodata_and_edges=True`` but also ``mask_nodata=False``.
A query of this combination now raises a ``ValueError`` to guard against
Expand All @@ -21,6 +22,11 @@ Unreleased Changes
parameter ``write_diagnostic_vector``. When ``True``, this parameter will
cause a new vector per outflow feature to be created in the ``working_dir``.
This parameter defaults to ``False``. This is a change from prior behavior,
when the diagnostic vectors were always created, which could occupy a lot of
computational time under large outflow geometries.
* Added a ``pygeoprocessing.multiprocessing.raster_calculator`` function which
matches the API and results of ``pygeoprocessing.raster_calculator`` but uses
multiple processing cores to compute raster calculation blocks.
when the diagnostic vectors were always created, which could occupy
significant computational time under large outflow geometries.
* Minor performance improvement to ``pygeoprocessing.convolve_2d`` by
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
packages=[
'pygeoprocessing',
'pygeoprocessing.routing',
'pygeoprocessing.multiprocessing',
],
package_dir={
'pygeoprocessing': 'src/pygeoprocessing'
Expand All @@ -54,7 +55,6 @@
'Operating System :: MacOS :: MacOS X',
'Operating System :: Microsoft',
'Operating System :: POSIX',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Topic :: Scientific/Engineering :: GIS',
Expand Down
66 changes: 47 additions & 19 deletions src/pygeoprocessing/geoprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pprint
import queue
import shutil
import sys
import tempfile
import threading
import time
Expand All @@ -29,6 +30,11 @@
import shapely.prepared
import shapely.wkb

# This is used to efficiently pass data to the raster stats worker if available
if sys.version_info >= (3, 8):
import multiprocessing.shared_memory


class ReclassificationMissingValuesError(Exception):
"""Raised when a raster value is not a valid key to a dictionary.
Expand All @@ -43,6 +49,7 @@ def __init__(self, msg, missing_values):
self.missing_values = missing_values
super().__init__(msg, missing_values)


LOGGER = logging.getLogger(__name__)

# Used in joining finished TaskGraph Tasks.
Expand Down Expand Up @@ -137,13 +144,10 @@ def raster_calculator(
name string as the first element and a GDAL creation options
tuple/list as the second. Defaults to
geoprocessing.DEFAULT_GTIFF_CREATION_TUPLE_OPTIONS.
Returns:
None
Raises:
ValueError: invalid input provided
"""
if not base_raster_path_band_const_list:
raise ValueError(
Expand Down Expand Up @@ -345,14 +349,30 @@ def raster_calculator(
try:
last_time = time.time()

block_offset_list = list(iterblocks(
(target_raster_path, 1), offset_only=True,
largest_block=largest_block))

if calc_raster_stats:
# if this queue is used to send computed valid blocks of
# the raster to an incremental statistics calculator worker
stats_worker_queue = queue.Queue()
exception_queue = queue.Queue()

if sys.version_info >= (3, 8):
# The stats worker keeps running variables as a float64, so
# all input rasters are dtype float64 -- make the shared memory
# size equivalent.
block_size_bytes = (
numpy.dtype(numpy.float64).itemsize *
block_offset_list[0]['win_xsize'] *
block_offset_list[0]['win_ysize'])

shared_memory = multiprocessing.shared_memory.SharedMemory(
create=True, size=block_size_bytes)

else:
stats_worker_queue = None
exception_queue = None

if calc_raster_stats:
# To avoid doing two passes on the raster to calculate standard
Expand All @@ -366,7 +386,7 @@ def raster_calculator(
LOGGER.info('starting stats_worker')
stats_worker_thread = threading.Thread(
target=geoprocessing_core.stats_worker,
args=(stats_worker_queue, exception_queue))
args=(stats_worker_queue, len(block_offset_list)))
stats_worker_thread.daemon = True
stats_worker_thread.start()
LOGGER.info('started stats_worker %s', stats_worker_thread)
Expand All @@ -375,9 +395,7 @@ def raster_calculator(
n_pixels = n_cols * n_rows

# iterate over each block and calculate local_op
for block_offset in iterblocks(
(target_raster_path, 1), offset_only=True,
largest_block=largest_block):
for block_offset in block_offset_list:
# read input blocks
offset_list = (block_offset['yoff'], block_offset['xoff'])
blocksize = (block_offset['win_ysize'], block_offset['win_xsize'])
Expand Down Expand Up @@ -421,19 +439,28 @@ def raster_calculator(
"shape %s but got this instead: %s" % (
blocksize, target_block))

target_band.WriteArray(
target_block, yoff=block_offset['yoff'],
xoff=block_offset['xoff'])

# send result to stats calculator
if stats_worker_queue:
# guard against an undefined nodata target
if nodata_target is not None:
valid_block = target_block[target_block != nodata_target]
if valid_block.size > 0:
stats_worker_queue.put(valid_block)
target_block = target_block[target_block != nodata_target]
target_block = target_block.astype(numpy.float64).flatten()

if sys.version_info >= (3, 8):
shared_memory_array = numpy.ndarray(
target_block.shape, dtype=target_block.dtype,
buffer=shared_memory.buf)
shared_memory_array[:] = target_block[:]

stats_worker_queue.put((
shared_memory_array.shape, shared_memory_array.dtype,
shared_memory))
else:
stats_worker_queue.put(target_block.flatten())

target_band.WriteArray(
target_block, yoff=block_offset['yoff'],
xoff=block_offset['xoff'])
stats_worker_queue.put(target_block)

pixels_processed += blocksize[0] * blocksize[1]
last_time = _invoke_timed_callback(
Expand All @@ -445,8 +472,6 @@ def raster_calculator(
LOGGER.info('100.0% complete')

if calc_raster_stats:
LOGGER.info("signaling stats worker to terminate")
stats_worker_queue.put(None)
LOGGER.info("Waiting for raster stats worker result.")
stats_worker_thread.join(_MAX_TIMEOUT)
if stats_worker_thread.is_alive():
Expand Down Expand Up @@ -478,6 +503,9 @@ def raster_calculator(
stats_worker_thread.join(_MAX_TIMEOUT)
if stats_worker_thread.is_alive():
raise RuntimeError("stats_worker_thread.join() timed out")
if sys.version_info >= (3, 8):
shared_memory.close()
shared_memory.unlink()

# check for an exception in the workers, otherwise get result
# and pass to writer
Expand Down Expand Up @@ -2410,7 +2438,6 @@ def convolve_2d(
name string as the first element and a GDAL creation options
tuple/list as the second. Defaults to a GTiff driver tuple
defined at geoprocessing.DEFAULT_GTIFF_CREATION_TUPLE_OPTIONS.
Returns:
None
Expand Down Expand Up @@ -3634,6 +3661,7 @@ def numpy_array_to_raster(
None
"""
numpy_to_gdal_type = {
numpy.dtype(numpy.bool): gdal.GDT_Byte,
numpy.dtype(numpy.int8): gdal.GDT_Byte,
numpy.dtype(numpy.uint8): gdal.GDT_Byte,
numpy.dtype(numpy.int16): gdal.GDT_Int16,
Expand Down
50 changes: 31 additions & 19 deletions src/pygeoprocessing/geoprocessing_core.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
# distutils: language=c++
# cython: language_level=3
import logging
import multiprocessing
import os
import pickle
import shutil
import sys
import tempfile
import time
import traceback
import zlib

cimport cython
cimport libcpp.algorithm
Expand Down Expand Up @@ -529,18 +532,21 @@ def calculate_slope(

@cython.boundscheck(False)
@cython.cdivision(True)
def stats_worker(stats_work_queue, exception_queue):
def stats_worker(stats_work_queue, expected_blocks):
"""Worker to calculate continuous min, max, mean and standard deviation.
Parameters:
stats_work_queue (Queue): a queue of 1D numpy arrays or None. If
None, function puts a (min, max, mean, stddev) tuple to the
queue and quits.
expected_blocks (int): number of expected payloads through
``stats_work_queue``. Will terminate after this many.
Returns:
None
"""
LOGGER.debug(f'stats worker PID: {os.getpid()}')
cdef numpy.ndarray[numpy.float64_t, ndim=1] block
cdef double M_local = 0.0
cdef double S_local = 0.0
Expand All @@ -551,13 +557,20 @@ def stats_worker(stats_work_queue, exception_queue):
cdef long long n = 0L
payload = None

try:
while True:
for index in range(expected_blocks):
try:
existing_shm = None
payload = stats_work_queue.get()
if payload is None:
LOGGER.debug('payload is None, terminating')
break
block = payload.astype(numpy.float64)
if sys.version_info >= (3, 8):
shape, dtype, existing_shm = payload
block = numpy.ndarray(
shape, dtype=dtype, buffer=existing_shm.buf)
else:
block = payload
if block.size == 0:
continue
n_elements = block.size
with nogil:
for i in range(n_elements):
Expand All @@ -579,21 +592,20 @@ def stats_worker(stats_work_queue, exception_queue):
min_value = x
elif x > max_value:
max_value = x
except Exception as e:
LOGGER.exception(
"exception %s %s %s %s %s", x, M_local, S_local, n, payload)
raise

if n > 0:
stats_work_queue.put(
(min_value, max_value, M_local,
(S_local / <double>n) ** 0.5))
else:
LOGGER.warning(
"No valid pixels were received, sending None.")
stats_work_queue.put(None)

if n > 0:
stats_work_queue.put(
(min_value, max_value, M_local, (S_local / <double>n) ** 0.5))
else:
LOGGER.warning(
"No valid pixels were received, sending None.")
stats_work_queue.put(None)
except Exception as e:
LOGGER.exception(
"exception %s %s %s %s %s", x, M_local, S_local, n, payload)
exception_queue.put(e)
while not stats_work_queue.empty():
stats_work_queue.get()
raise

ctypedef long long int64t
ctypedef FastFileIterator[long long]* FastFileIteratorLongLongIntPtr
Expand Down
1 change: 1 addition & 0 deletions src/pygeoprocessing/multiprocessing/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .raster_calculator import raster_calculator
Loading

0 comments on commit 3087162

Please sign in to comment.