Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add progress bar to ancillary.multicore #39

Merged
merged 5 commits into from
Apr 9, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 40 additions & 22 deletions spatialist/ancillary.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
##############################################################
# core routines for software spatialist
# John Truckenbrodt 2014-2020
# John Truckenbrodt 2014-2020,2024
##############################################################
"""
This script gathers central functions and classes for general applications
Expand All @@ -25,6 +25,7 @@
import zipfile as zf
from typing import Iterable, List
import numpy as np
import progressbar as pb

try:
import pathos.multiprocessing as mp
Expand Down Expand Up @@ -115,6 +116,7 @@ def dissolve(inlist):
out.extend(dissolve(i)) if isinstance(i, list) else out.append(i)
return out


def parent_dirs(path: str) -> Iterable[str]:
"""
generator that yields parent directories of a zipfile path
Expand Down Expand Up @@ -153,7 +155,7 @@ def namelist_with_implicit_dirs(root: zf.ZipFile) -> List[str]:
for file_name in root.namelist():
complete_namelist.update(set(parent_dirs(file_name)))
complete_namelist.add(file_name)

return list(complete_namelist)


Expand Down Expand Up @@ -257,7 +259,7 @@ def finder(target, matchlist, foldermode=0, regex=False, recursive=True):
raise TypeError("parameter 'target' must be of type str or list")


def multicore(function, cores, multiargs, **singleargs):
def multicore(function, cores, multiargs, pbar=False, **singleargs):
"""
wrapper for multicore process execution

Expand All @@ -271,6 +273,8 @@ def multicore(function, cores, multiargs, **singleargs):
multiargs: dict
a dictionary containing sub-function argument names as keys and lists of arguments to be
distributed among the processes as values
pbar: bool
add a progress bar? Does not yet work on Windows.
singleargs
all remaining arguments which are invariant among the subprocesses

Expand Down Expand Up @@ -304,12 +308,8 @@ def multicore(function, cores, multiargs, **singleargs):
tblib.pickling_support.install()

# compare the function arguments with the multi and single arguments and raise errors if mismatches occur
if sys.version_info >= (3, 0):
check = inspect.getfullargspec(function)
varkw = check.varkw
else:
check = inspect.getargspec(function)
varkw = check.keywords
check = inspect.getfullargspec(function)
varkw = check.varkw

if not check.varargs and not varkw:
multiargs_check = [x for x in multiargs if x not in check.args]
Expand Down Expand Up @@ -364,24 +364,42 @@ def multicore(function, cores, multiargs, **singleargs):
result = dill.load(tmp)
return result
else:
results = None

def wrapper(**kwargs):
try:
return function(**kwargs)
# hide print messages in the sub-processes
with HiddenPrints():
out = function(**kwargs)
return out
except Exception as e:
return ExceptionWrapper(e)

# block printing of the executed function
with HiddenPrints():
# start pool of processes and do the work
try:
pool = mp.Pool(processes=cores)
except NameError:
raise ImportError("package 'pathos' could not be imported")
results = pool.imap(lambda x: wrapper(**x), processlist)
pool.close()
pool.join()
jobs = len(processlist)
progress = None
chunksize, remainder = divmod(jobs, cores * 4)
if remainder:
chunksize += 1

if pbar:
widgets = [pb.Percentage(), pb.Bar(), pb.Timer(), ' ', pb.ETA()]
progress = pb.ProgressBar(max_value=jobs, widgets=widgets).start()

try:
pool = mp.ProcessPool(processes=cores)
except NameError:
raise ImportError("package 'pathos' could not be imported")
results = pool.amap(lambda x: wrapper(**x), processlist)
while not results.ready():
left = results._number_left * chunksize
done = jobs - left if left <= jobs else 0
if pbar:
progress.update(done)

results = results.get()
pool.close()
pool.join()

if progress is not None:
progress.finish()

i = 0
out = []
Expand Down