Skip to content

Commit

Permalink
moved _run method to bias correction interface ``AbstractBiasCorr…
Browse files Browse the repository at this point in the history
…ection``
  • Loading branch information
bnb32 committed Jan 6, 2025
1 parent ee8e237 commit 2d3d4a9
Showing 1 changed file with 17 additions and 40 deletions.
57 changes: 17 additions & 40 deletions sup3r/bias/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,10 @@ class AbstractBiasCorrection(ABC):
"""Minimal interface for bias correction classes"""

@abstractmethod
def _get_run_kwargs(self, **kwargs_extras):
def _get_run_kwargs(self, **run_single_kwargs):
"""Get dictionary of kwarg dictionaries to use for calls to
``_run_single``. Each key-value pair is a bias_gid with the associated
``_run_single`` arguments for that gid"""

def _run_in_parallel(self, task_kwargs, max_workers=None):
"""
Execute a list of tasks in parallel using ``ProcessPoolExecutor``.
Parameters
----------
task_kwargs : dictionary
A dictionary of keyword argument dictionaries for a single call to
``task_function``.
max_workers : int, optional
The maximum number of workers to use. If None, it uses all
available.
Returns
-------
results : dictionary
A dictionary of results from the executed tasks with the same keys
as ``task_kwargs``.
"""

results = {}
with ProcessPoolExecutor(max_workers=max_workers) as exe:
futures = {
exe.submit(self._run_single, **kwargs): bias_gid
for bias_gid, kwargs in task_kwargs.items()
}
for future in as_completed(futures):
bias_gid = futures[future]
results[bias_gid] = future.result()
return results
``_run_single`` kwargs dict for that gid"""

def _run(
self,
Expand All @@ -58,7 +27,7 @@ def _run(
fill_extend=True,
smooth_extend=0,
smooth_interior=0,
**kwargs_extras,
**run_single_kwargs,
):
"""Run correction factor calculations for every site in the bias
dataset
Expand Down Expand Up @@ -87,7 +56,7 @@ def _run(
Option to smooth the scalar/adder data within the valid spatial
domain. This can reduce the affect of extreme values within
aggregations over large number of pixels.
kwargs_extras: dict
run_single_kwargs: dict
Additional kwargs that get sent to ``_run_single`` e.g.
daily_reduction='avg', zero_rate_threshold=1.157e-7
Expand All @@ -100,7 +69,8 @@ def _run(
"""
self.bad_bias_gids = []

task_kwargs = self._get_run_kwargs(**kwargs_extras)
task_kwargs = self._get_run_kwargs(**run_single_kwargs)

# sup3r DataHandler opening base files will load all data in parallel
# during the init and should not be passed in parallel to workers
if isinstance(self.base_dh, DataHandler):
Expand All @@ -116,9 +86,16 @@ def _run(
logger.info(
'Running parallel calculation with %s workers.', max_workers
)
results = self._run_in_parallel(
task_kwargs, max_workers=max_workers
)
results = {}
with ProcessPoolExecutor(max_workers=max_workers) as exe:
futures = {
exe.submit(self._run_single, **kwargs): bias_gid
for bias_gid, kwargs in task_kwargs.items()
}
for future in as_completed(futures):
bias_gid = futures[future]
results[bias_gid] = future.result()

for i, (bias_gid, single_out) in enumerate(results.items()):
raster_loc = np.where(self.bias_gid_raster == bias_gid)
for key, arr in single_out.items():
Expand Down Expand Up @@ -164,4 +141,4 @@ def _run_single(
base_dh_inst=None,
match_zero_rate=False,
):
"""Find the bias correction factors at a single site"""
"""Run correction factor calculations for a single site"""

0 comments on commit 2d3d4a9

Please sign in to comment.