diff --git a/spatialist/ancillary.py b/spatialist/ancillary.py index 3f1a3e4..8aa39a4 100644 --- a/spatialist/ancillary.py +++ b/spatialist/ancillary.py @@ -1,6 +1,6 @@ ############################################################## # core routines for software spatialist -# John Truckenbrodt 2014-2020 +# John Truckenbrodt 2014-2020,2024 ############################################################## """ This script gathers central functions and classes for general applications @@ -25,6 +25,7 @@ import zipfile as zf from typing import Iterable, List import numpy as np +import progressbar as pb try: import pathos.multiprocessing as mp @@ -115,6 +116,7 @@ def dissolve(inlist): out.extend(dissolve(i)) if isinstance(i, list) else out.append(i) return out + def parent_dirs(path: str) -> Iterable[str]: """ generator that yields parent directories of a zipfile path @@ -153,7 +155,7 @@ def namelist_with_implicit_dirs(root: zf.ZipFile) -> List[str]: for file_name in root.namelist(): complete_namelist.update(set(parent_dirs(file_name))) complete_namelist.add(file_name) - + return list(complete_namelist) @@ -257,7 +259,7 @@ def finder(target, matchlist, foldermode=0, regex=False, recursive=True): raise TypeError("parameter 'target' must be of type str or list") -def multicore(function, cores, multiargs, **singleargs): +def multicore(function, cores, multiargs, pbar=False, **singleargs): """ wrapper for multicore process execution @@ -271,6 +273,8 @@ def multicore(function, cores, multiargs, **singleargs): multiargs: dict a dictionary containing sub-function argument names as keys and lists of arguments to be distributed among the processes as values + pbar: bool + add a progress bar? Does not yet work on Windows. singleargs all remaining arguments which are invariant among the subprocesses @@ -304,12 +308,8 @@ def multicore(function, cores, multiargs, **singleargs): tblib.pickling_support.install() # compare the function arguments with the multi and single arguments and raise errors if mismatches occur - if sys.version_info >= (3, 0): - check = inspect.getfullargspec(function) - varkw = check.varkw - else: - check = inspect.getargspec(function) - varkw = check.keywords + check = inspect.getfullargspec(function) + varkw = check.varkw if not check.varargs and not varkw: multiargs_check = [x for x in multiargs if x not in check.args] @@ -364,24 +364,42 @@ def multicore(function, cores, multiargs, **singleargs): result = dill.load(tmp) return result else: - results = None - def wrapper(**kwargs): try: - return function(**kwargs) + # hide print messages in the sub-processes + with HiddenPrints(): + out = function(**kwargs) + return out except Exception as e: return ExceptionWrapper(e) - # block printing of the executed function - with HiddenPrints(): - # start pool of processes and do the work - try: - pool = mp.Pool(processes=cores) - except NameError: - raise ImportError("package 'pathos' could not be imported") - results = pool.imap(lambda x: wrapper(**x), processlist) - pool.close() - pool.join() + jobs = len(processlist) + progress = None + chunksize, remainder = divmod(jobs, cores * 4) + if remainder: + chunksize += 1 + + if pbar: + widgets = [pb.Percentage(), pb.Bar(), pb.Timer(), ' ', pb.ETA()] + progress = pb.ProgressBar(max_value=jobs, widgets=widgets).start() + + try: + pool = mp.ProcessPool(processes=cores) + except NameError: + raise ImportError("package 'pathos' could not be imported") + results = pool.amap(lambda x: wrapper(**x), processlist) + while not results.ready(): + left = results._number_left * chunksize + done = jobs - left if left <= jobs else 0 + if pbar: + progress.update(done) + + results = results.get() + pool.close() + pool.join() + + if progress is not None: + progress.finish() i = 0 out = []