diff --git a/ImageD11/cImageD11.py b/ImageD11/cImageD11.py
index c8cc4572..54820b3b 100644
--- a/ImageD11/cImageD11.py
+++ b/ImageD11/cImageD11.py
@@ -21,57 +21,88 @@
 # Check for the use of openmp interactions with os.fork and multiprocessing
-def check_multiprocessing():
+
+def check_multiprocessing(patch=False):
     """
     I tried to suppress the "fork" method for multiprocessing, but I could not.
     You cannot safely use os.fork together with threads.
     But the cImageD11 code uses threads via openmp, and you are importing them.
     So please use forkserver or spawn for multiprocessing.
-
+
     https://discuss.python.org/t/concerns-regarding-deprecation-of-fork-with-alive-threads/33555
     https://github.com/FABLE-3DXRD/ImageD11/issues/177
-
+
     > Developers should respond by adjusting their use of multiprocessing or concurrent.futures
     > to explicitly specify either the "forkserver" or "spawn" start methods via a context.
     """
+    # We only do this if os.fork exists
+    if not hasattr(os, "fork"):
+        return
     import multiprocessing
-    # Problem cases are:
-    #    child processes -> we will set num threads to 1
-    parent = None
-    if hasattr(multiprocessing,"parent_process"):
-        parent = multiprocessing.parent_process()
-        # only for python 3.8 and up
-    if (parent is not None) and ('OMP_NUM_THREADS' not in os.environ):
-        cimaged11_omp_set_num_threads( 1 )
-        # people wanting Nprocs * Mthreads need to reset after import
-        # or use OMP_NUM_THREADS as usual
+
+    #
     # now check for the fork issue
-    if not hasattr(multiprocessing, 'get_start_method'):
-        # You are on python2.7. Give up.
+    if not hasattr(multiprocessing, "get_start_method"):
+        # You are on python2.7. Give up?
+        warnings.warn(
+            "python2.7 with ImageD11.cImageD11: for multiprocessing use spawn\n"
+        )
         return
-    # This has a side effect of fixing the start method if allow_none is not True
+    #
+    # This has a side effect of fixing the start method if allow_none was not True
     method = multiprocessing.get_start_method(allow_none=True)
-    if method is None:
-        # we can change it away from fork
+    # Problem cases are:
+    #    child processes -> oversubscribe -> we will set num threads to 1
+    #    your numpy (etc) are also going to be a problem if this was needed
+    parent = None
+    if hasattr(multiprocessing, "parent_process"):
+        parent = multiprocessing.parent_process()
+        # only for python 3.8 and up
+    if parent is not None:
+        if "OMP_NUM_THREADS" not in os.environ:
+            # people wanting Nprocs * Mthreads may need to reset after import
+            # or use OMP_NUM_THREADS as usual
+            cimaged11_omp_set_num_threads(1)
+    if method in ("fork", None):
+        # fork is unsafe with our openmp threads; None may still default to fork
+        warnings.warn(check_multiprocessing.__doc__)
+    if method is None and patch:
+        # we should change it away from fork
         # this is going to break code which relied on fork
-        # (... but that was already broken)
+        # needs one of those "warn people for a year or two and then break it"
+        # messages. Perhaps wait on python3.14 when multiprocessing will get
+        # fixed and break everyone's code at that time?
        poss = multiprocessing.get_all_start_methods()
-        if 'forkserver' in poss:
-            multiprocessing.set_start_method('forkserver')
-        elif 'spawn' in poss:
-            multiprocessing.set_start_method('spawn')
+        if "forkserver" in poss:
+            multiprocessing.set_start_method("forkserver")
+        elif "spawn" in poss:
+            multiprocessing.set_start_method("spawn")
         else:
-            raise Exception('Could not set to forkserver or spawn')
-    else:
-        # Tell them about the problem
-        if method == 'fork':
-            warnings.warn(check_multiprocessing.__doc__)
+            raise Exception("Could not set to forkserver or spawn")
+    # It is doubtful that this function can ever please everyone.
+
 
 def cores_available():
-    """ Return the number of CPU cores you can use """
-    return len( os.sched_getaffinity( os.getpid() ) )
+    """
+    Return the number of CPU cores you can use
+    First choice = SLURM_CPUS_PER_TASK (ESRF specific)
+    Second choice = os.sched_getaffinity
+    Third choice = os.cpu_count (may be high)
+    Guess at 1 when os.cpu_count fails.
+    """
+    if "SLURM_CPUS_PER_TASK" in os.environ:
+        return int(os.environ["SLURM_CPUS_PER_TASK"])
+    if hasattr(os, "sched_getaffinity"):
+        return len(os.sched_getaffinity(os.getpid()))
+    ncpu = os.cpu_count()
+    if ncpu is None:
+        return 1
+    else:
+        return ncpu
+
 
 if cimaged11_omp_get_max_threads() == 0:
     # The code was compiled without openmp
@@ -81,7 +112,7 @@ def cores_available():
     OPENMP = True
 
 check_multiprocessing()
-
+
 # For 32 or 64 bits
 nbyte = struct.calcsize("P")  # 4 or 8
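The docstring above asks callers to pick a start method explicitly, via a context. A minimal sketch of that consumer-side pattern; the worker `work` and the pool size are made up for illustration, and "forkserver" is POSIX-only, so Windows users would pass "spawn":

    import multiprocessing

    def work(n):
        # heavy imports happen inside the worker, so each process sets up
        # its own (OpenMP) thread state after it starts
        import numpy as np
        return float(np.ones(n).sum())

    if __name__ == "__main__":
        # an explicit context avoids relying on the platform default start method
        ctx = multiprocessing.get_context("forkserver")
        with ctx.Pool(processes=4) as pool:
            print(pool.map(work, [1000] * 8))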
diff --git a/ImageD11/sinograms/lima_segmenter.py b/ImageD11/sinograms/lima_segmenter.py
index 872765e8..0dc9e227 100755
--- a/ImageD11/sinograms/lima_segmenter.py
+++ b/ImageD11/sinograms/lima_segmenter.py
@@ -3,21 +3,36 @@
 """
 Do segmentation of lima/eiger files with no notion of metadata
 Blocking will come via lima saving, so about 1000 frames per file
-Make the code parallel over lima files ... no interprocess communication intended
+Make the code parallel over lima files ...
+   minimal interprocess communication intended
 - read an input file which has the source/destination file names for this job
 """
-import sys, time, os, math, logging
-
+import sys
+import time
+import os
+import math
+import logging
+import numpy as np
+import h5py
+import hdf5plugin
+import fabio
+import numba
+from ImageD11 import sparseframe
 from ImageD11.sinograms import dataset
 
+try:
+    from bslz4_to_sparse import chunk2sparse
+except ImportError:
+    chunk2sparse = None
+
 # Code to clean the 2D image and reduce it to a sparse array:
 # things we might edit
 class SegmenterOptions:
-
-    # These are the stuff that belong to us in the hdf5 file (in our group: lima_segmenter)
+    # These are the attributes that belong to us in the hdf5 file
+    # (in our group: lima_segmenter)
     jobnames = (
         "cut",
         "howmany",
@@ -65,11 +80,9 @@ def setup(self):
         # The mask must have:
         #   0 == active pixel
         #   1 == masked pixel
-        m = fabio.open(self.maskfile).data
         self.mask = 1 - fabio.open(self.maskfile).data.astype(np.uint8)
         assert self.mask.min() < 2
         assert self.mask.max() >= 0
-        avg = self.mask.mean()
         print(
             "# Opened mask",
             self.maskfile,
@@ -87,7 +100,8 @@ def load(self, h5name, h5group):
             if name in grp.attrs:
                 setattr(self, name, grp.attrs.get(name))
         for name in self.datasetnames:
-            # datasetnames = ( 'limapath', 'analysispath', 'datapath', 'imagefiles', 'sparsefiles' )
+            # datasetnames = ( 'limapath', 'analysispath', 'datapath',
+            #                  'imagefiles', 'sparsefiles' )
             if name in pgrp.attrs:
                 data = pgrp.attrs.get(name)
                 setattr(self, name, data)
@@ -118,24 +132,6 @@ def save(self, h5name, h5group):
                 grp.attrs[name] = value
 
 
-##########################  should not need to change much below here
-
-
-import numpy as np
-import hdf5plugin
-import h5py
-import fabio
-import numba
-
-# pip install ImageD11 --no-deps # if you do not have it yet:
-from ImageD11 import sparseframe
-
-try:
-    from bslz4_to_sparse import chunk2sparse
-except ImportError:
-    chunk2sparse = None
-
-
 @numba.njit
 def select(img, mask, row, col, val, cut):
     # TODO: This is now in cImageD11.tosparse_{u16|f32}
@@ -329,7 +325,7 @@ def segment_lima(args):
             npx += spf.nnz
         g.attrs["npx"] = npx
     end = time.time()
-    print("\n# Done", nframes, "frames", npx, "pixels", "fps", nframes / (end - start))
+    print("\n# Done", nframes, "frames", npx, "pixels fps", nframes / (end - start))
     return destname
     # the output file should be flushed and closed when this returns
 
@@ -338,9 +334,19 @@ def segment_lima(args):
 OPTIONS = None
 
 
-def main(options):
+def initOptions(h5name, jobid):
+    global OPTIONS
+    OPTIONS = SegmenterOptions()
+    OPTIONS.load(h5name, "lima_segmenter")
+    OPTIONS.jobid = jobid
+
+
+def main(h5name, jobid):
     global OPTIONS
-    OPTIONS = options
+    initOptions(h5name, jobid)
+    options = OPTIONS
+    assert options is not None
     args = []
     files_per_job = options.cores_per_job * options.files_per_core  # 64 files per job
     start = options.jobid * files_per_job
@@ -354,10 +360,12 @@ def main(options):
             )
         )
     if 1:
-        import concurrent.futures
+        import multiprocessing
 
-        with concurrent.futures.ProcessPoolExecutor(
-            max_workers=options.cores_per_job
+        with multiprocessing.Pool(
+            processes=options.cores_per_job,
+            initializer=initOptions,
+            initargs=(h5name, jobid),
         ) as mypool:
             donefile = sys.stdout
             for fname in mypool.map(segment_lima, args, chunksize=1):
@@ -442,12 +450,8 @@ def segment():
     # everything is passing via this file.
     #
     h5name = sys.argv[2]
-
-    # This assumes forking. To be investigated otherwise.
-    options = SegmenterOptions()
-    options.load(h5name, "lima_segmenter")
-    options.jobid = int(sys.argv[3])
-    main(options)
+    jobid = int(sys.argv[3])
+    main(h5name, jobid)
 
 
 if __name__ == "__main__":
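The initializer/initargs change in main() rebuilds OPTIONS inside each worker instead of inheriting it through fork. The same pattern in isolation: a sketch where `_STATE`, `init_state` and `job` are hypothetical stand-ins for OPTIONS, initOptions and segment_lima:

    import multiprocessing

    _STATE = None  # module-level state, filled in once per worker

    def init_state(config):
        # runs at startup in every worker process, so spawn/forkserver
        # workers rebuild their state rather than inheriting it via fork
        global _STATE
        _STATE = dict(config)

    def job(x):
        return _STATE["scale"] * x

    if __name__ == "__main__":
        ctx = multiprocessing.get_context("spawn")
        with ctx.Pool(2, initializer=init_state, initargs=({"scale": 10},)) as pool:
            print(pool.map(job, range(5)))  # [0, 10, 20, 30, 40]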
diff --git a/ImageD11/sinograms/point_by_point.py b/ImageD11/sinograms/point_by_point.py
index 61922e68..3c1478c7 100755
--- a/ImageD11/sinograms/point_by_point.py
+++ b/ImageD11/sinograms/point_by_point.py
@@ -4,11 +4,10 @@
 # Try to build the point-by-point mapping code ...
 from __future__ import print_function, division
 
-from ImageD11 import cImageD11  # patches to forkserver
 import multiprocessing
+multiprocessing.set_start_method("forkserver")
+from ImageD11 import cImageD11  # no longer patches the start method
 
-method = multiprocessing.get_start_method()
-assert method in ("spawn", "forkserver"), "mp:" + method
 import sys, os
 from multiprocessing.managers import SharedMemoryManager
 import time, random
diff --git a/ImageD11/sparseframe.py b/ImageD11/sparseframe.py
index bb4e4882..fb0e80e5 100644
--- a/ImageD11/sparseframe.py
+++ b/ImageD11/sparseframe.py
@@ -252,7 +252,7 @@ def __init__( self, hname, scan, start = 0, n=None,
                 break
         # read the pixels - all pointers
         nnz = grp['nnz'][:]
-        ipt = np.concatenate( ( (0,) , np.cumsum(nnz, dtype=int) ) )
+        ipt = nnz_to_pointer( nnz )
         s = ipt[start]
         e = ipt[end]
         for name in self.names:
@@ -260,7 +260,7 @@ def __init__( self, hname, scan, start = 0, n=None,
             setattr( self, name, grp[name][s:e] )
         # pointers into this scan
         self.nnz = nnz[start:end]
-        self.ipt = np.concatenate( ( (0,) , np.cumsum(self.nnz, dtype=int) ) )
+        self.ipt = nnz_to_pointer( self.nnz )
 
     def getframe(self, i, SAFE=SAFE):
         # (self, row, col, shape, itype=np.uint16, pixels=None):
@@ -617,7 +617,18 @@ def sparse_smooth( frame, data_name='intensity' ):
     return smoothed
 
 
-
+def nnz_to_pointer( nnz, out = None ):
+    """
+    nnz = number of pixels per frame
+    returns pointer = offsets of each frame in a single flat array
+    (length len(nnz)+1, so frame i owns pointer[i]:pointer[i+1])
+    """
+    if out is None:
+        out = np.empty( len(nnz)+1, int )
+    else:
+        assert len(out) == len(nnz)+1
+    out[0] = 0
+    np.cumsum( nnz, out=out[1:] )
+    return out
diff --git a/ImageD11/sym_u.py b/ImageD11/sym_u.py
index cdb22444..bfc0371d 100644
--- a/ImageD11/sym_u.py
+++ b/ImageD11/sym_u.py
@@ -364,18 +364,19 @@ def getgroup(s):
     convert a user supplied string to a group
     ... a little vague still
     """
-    if s in ['cubic', 'hexagonal','trigonal','tetragonal',
+    groups = ['cubic', 'hexagonal','trigonal','tetragonal',
              'orthorhombic','monoclinic_c','monoclinic_a',
-             'monoclinic_b','triclinic','rhombohedralP']:
-        import ImageD11.sym_u
-        return getattr(ImageD11.sym_u, s)
+             'monoclinic_b','triclinic','rhombohedralP']
+    if s in groups:
+        return globals()[s]
+    else:
+        raise Exception( "%s not in %s" % (s, groups) )
 
 
 if __name__=="__main__":
     test()
-
     u = np.array([[ 0.71850787 , 0.69517833,  0.02176059],
        [-0.62925889 , 0.66306714, -0.40543213],
        [-0.29627636 , 0.27761313 , 0.91386611]])
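nnz_to_pointer() gives CSR-style offsets: frame i owns the slice ipt[i]:ipt[i+1] of the flat pixel arrays. A small usage sketch with made-up numbers, assuming this patch is applied:

    import numpy as np
    from ImageD11.sparseframe import nnz_to_pointer

    nnz = np.array([3, 0, 2, 5])   # pixels found on each of 4 frames
    ipt = nnz_to_pointer(nnz)      # -> array([ 0,  3,  3,  5, 10])
    flat = np.arange(ipt[-1])      # stand-in for a flat row/col/intensity array
    for i in range(len(nnz)):
        print(i, flat[ipt[i]:ipt[i + 1]])   # the pixels belonging to frame i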
diff --git a/test/test_forking.py b/test/test_forking.py
new file mode 100644
index 00000000..010988ae
--- /dev/null
+++ b/test/test_forking.py
@@ -0,0 +1,127 @@
+# There is a problem with forking / multiprocessing.
+# There are no imports here!
+
+# Each function is intended to run in a subprocess
+
+
+def run_test_fun(function, env=None):
+    """Runs a function in a subprocess, returns the output"""
+    import sys
+    from subprocess import PIPE, Popen
+
+    args = [sys.executable, __file__, function]
+    proc = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env)
+    stdout, stderr = proc.communicate()
+    return stdout.decode(), stderr.decode()
+
+
+def omp_call(N=1024):
+    """returns the number of threads in use for an example call using openmp"""
+    import ImageD11.cImageD11
+    import numpy as np
+
+    dat = np.full((N, N), 101, dtype=np.uint16)
+    drk = np.full((N, N), 100, dtype=np.float32)
+    cor = np.empty((N, N), dtype=np.float32)
+    ImageD11.cImageD11.uint16_to_float_darksub(cor.ravel(), drk.ravel(), dat.ravel())
+    threads_avail = ImageD11.cImageD11.cores_available()
+    threads_used = ImageD11.cImageD11.cimaged11_omp_get_max_threads()
+    return threads_avail, threads_used
+
+
+def should_warn_on_fork_in_parent():
+    """This is the problem case, we want a warning"""
+    import multiprocessing
+
+    try:
+        if "fork" in multiprocessing.get_all_start_methods():
+            multiprocessing.set_start_method("fork")
+            print("Set to fork")
+    except AttributeError:
+        pass
+    threads_avail, threads_used = omp_call()
+    assert threads_avail == threads_used, "not threaded?"
+
+
+def test_should_warn_on_fork_in_parent():
+    """this is called by py.test to run the previous function"""
+    stdout, stderr = run_test_fun("should_warn_on_fork_in_parent")
+    import sys
+
+    if stdout.find("Set to fork") != -1:  # we set to fork
+        if sys.version_info[0] > 2:
+            assert stderr.find("please use forkserver or spawn") > 0
+        else:
+            assert stderr.find("cImageD11: for multiprocessing use spawn") != -1
+
+
+def should_warn_on_fork_in_child():
+    """This is a problem case, we want a warning"""
+    import multiprocessing
+
+    try:
+        if "fork" in multiprocessing.get_all_start_methods():
+            multiprocessing.set_start_method("fork")
+            print("Set to fork")
+    except AttributeError:
+        pass
+    p = multiprocessing.Pool(2)
+    for threads_avail, threads_used in p.map(omp_call, (1024, 1024)):
+        assert threads_used == 1, "failed to set omp threads == 1 for child"
+
+
+def test_should_warn_on_fork_in_child():
+    stdout, stderr = run_test_fun("should_warn_on_fork_in_child")
+    import sys
+
+    if stdout.find("Set to fork") != -1:  # we set to fork
+        if sys.version_info[0] > 2:
+            assert stderr.find("please use forkserver or spawn") > 0
+        else:
+            assert stderr.find("cImageD11: for multiprocessing use spawn") != -1
+
+
+def threads_in_child_no_warn():
+    """This is a working case, we do not want a warning"""
+    import multiprocessing
+
+    try:
+        if "forkserver" in multiprocessing.get_all_start_methods():
+            multiprocessing.set_start_method("forkserver")
+            print("Set to forkserver")
+        else:
+            multiprocessing.set_start_method("spawn")
+            print("Set to spawn")
+    except AttributeError:
+        pass
+    p = multiprocessing.Pool(2)
+    for threads_avail, threads_used in p.map(omp_call, (1024, 1024)):
+        print("threads available, threads used", threads_avail, threads_used)
+        if threads_used == 1 and threads_avail >= 2:
+            raise Exception("Bad thread setting, should have 2")
+        if threads_used == 2:  # from OMP_NUM_THREADS
+            pass
+
+
+def test_threads_in_child_no_warn():
+    import sys, os
+
+    env = os.environ.copy()
+    env["OMP_NUM_THREADS"] = "2"
+    stdout, stderr = run_test_fun("threads_in_child_no_warn", env=env)
+    if stdout.find("Set to ") != -1:  # we set to forkserver or spawn
+        if sys.version_info[0] > 2:
+            assert len(stderr) == 0
+        else:
+            # python2....
+            assert stderr.find("cImageD11: for multiprocessing use spawn") != -1
+
+
+if __name__ == "__main__":
+    import sys
+
+    try:
+        globals()[sys.argv[1]]()
+    except Exception as e:
+        print(e)
+        raise
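A quick manual version of what test_forking.py automates: force "fork" in a fresh interpreter and capture what importing cImageD11 emits. A sketch; the output depends on the platform and on OMP_NUM_THREADS:

    import multiprocessing
    import warnings

    if __name__ == "__main__":
        if "fork" in multiprocessing.get_all_start_methods():
            multiprocessing.set_start_method("fork")
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            import ImageD11.cImageD11  # import runs check_multiprocessing()
        for w in caught:
            print(w.message)  # expect "... please use forkserver or spawn ..."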