
Commit

Merge pull request #220 from jonwright/master
repair the multiprocessing thing?
jonwright authored Feb 13, 2024
2 parents 2a1be85 + b093d2d commit e1956e4
Showing 6 changed files with 253 additions and 80 deletions.
93 changes: 62 additions & 31 deletions ImageD11/cImageD11.py
@@ -21,57 +21,88 @@

# Check for the use of openmp interactions with os.fork and multiprocessing

def check_multiprocessing():

def check_multiprocessing(patch=False):
"""
I tried to suppress the "fork" method for multiprocessing, but I could not.
You cannot safely use os.fork together with threads.
But the cImageD11 code uses threads via openmp, and you are importing them.
So please use forkserver or spawn for multiprocessing.
https://discuss.python.org/t/concerns-regarding-deprecation-of-fork-with-alive-threads/33555
https://github.com/FABLE-3DXRD/ImageD11/issues/177
> Developers should respond by adjusting their use of multiprocessing or concurrent.futures
> to explicitly specify either the "forkserver" or "spawn" start methods via a context.
"""
# We only do this if os.fork exists
if not hasattr(os, "fork"):
return
import multiprocessing
# Problem cases are:
# child processes -> we will set num threads to 1
parent = None
if hasattr(multiprocessing,"parent_process"):
parent = multiprocessing.parent_process()
# only for python 3.8 and up
if (parent is not None) and ('OMP_NUM_THREADS' not in os.environ):
cimaged11_omp_set_num_threads( 1 )
# people wanting Nprocs * Mthreads need to reset after import
# or use OMP_NUM_THREADS as usual

#
# now check for the fork issue
if not hasattr(multiprocessing, 'get_start_method'):
# You are on python2.7. Give up.
if not hasattr(multiprocessing, "get_start_method"):
# You are on python2.7. Give up?
warnings.warn(
"python2.7 with ImageD11.cImageD11: for multiprocessing use spawn\n"
)
return
# This has a side effect of fixing the start method if allow_none is not True
#
# This has a side effect of fixing the start method if allow_none was not True
method = multiprocessing.get_start_method(allow_none=True)
if method is None:
# we can change it away from fork
if method == "fork":
warnings.warn(check_multiprocessing.__doc__)
# Problem cases are:
# child processes -> oversubscribe -> we will set num threads to 1
# your numpy (etc) are also going to be a problem if this was needed
parent = None
if hasattr(multiprocessing, "parent_process"):
parent = multiprocessing.parent_process()
# only for python 3.8 and up
if parent is not None:
if "OMP_NUM_THREADS" not in os.environ:
# people wanting Nprocs * Mthreads may need to reset after import
# or use OMP_NUM_THREADS as usual
cimaged11_omp_set_num_threads(1)
if method in ("fork", None):
# warn if we are a child process
warnings.warn(check_multiprocessing.__doc__)
if method is None and patch:
# we should change it away from fork
# this is going to break code which relied on fork
# (... but that was already broken)
# needs one of those "warn people for a year or two and then break it"
# messages. Perhaps wait on python3.14 when multiprocessing will get
# fixed and break everyones code at that time?
poss = multiprocessing.get_all_start_methods()
if 'forkserver' in poss:
multiprocessing.set_start_method('forkserver')
elif 'spawn' in poss:
multiprocessing.set_start_method('spawn')
if "forkserver" in poss:
multiprocessing.set_start_method("forkserver")
elif "spawn" in poss:
multiprocessing.set_start_method("spawn")
else:
raise Exception('Could not set to forkserver or spawn')
else:
# Tell them about the problem
if method == 'fork':
warnings.warn(check_multiprocessing.__doc__)
raise Exception("Could not set to forkserver or spawn")
# It is doubtful that this function can ever please everyone.
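
The docstring above boils down to: ask for "forkserver" or "spawn" explicitly rather than inheriting the platform default of "fork" once OpenMP threads exist. A minimal sketch of that pattern in caller code (the worker function and pool size are illustrative, not part of ImageD11):

    import multiprocessing

    def work(x):
        # safe to import OpenMP-threaded extensions such as ImageD11.cImageD11 here:
        # the worker process does not inherit the parent's OpenMP threads
        return x * x

    if __name__ == "__main__":
        ctx = multiprocessing.get_context("forkserver")   # or "spawn"
        with ctx.Pool(processes=4) as pool:
            print(pool.map(work, range(8)))
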


def cores_available():
""" Return the number of CPU cores you can use """
return len( os.sched_getaffinity( os.getpid() ) )
"""
Return the number of CPU cores you can use
First choice = SLURM_CPUS_PER_TASK (ESRF specific)
Second choice = os.sched_getaffinity
Third choice = os.cpu_count (may be high)
Guess at 1 when os.cpu_count fails.
"""
if "SLURM_CPUS_PER_TASK" in os.environ:
return int(os.environ["SLURM_CPUS_PER_TASK"])
if hasattr(os, "sched_getaffinity"):
return len(os.sched_getaffinity(os.getpid()))
ncpu = os.cpu_count()
if ncpu is None:
return 1
else:
return ncpu
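
For the "Nprocs * Mthreads" case mentioned in the comments above, one hedged approach is to divide whatever cores_available() reports between worker processes and OpenMP threads by exporting OMP_NUM_THREADS before the workers import the compiled extension (the process count here is just an example):

    import os
    from ImageD11.cImageD11 import cores_available

    ncores = cores_available()      # SLURM allocation, affinity mask, or cpu_count
    nproc = 4                       # worker processes you intend to start
    # children that later import cImageD11 will pick this up as their OpenMP thread count
    os.environ["OMP_NUM_THREADS"] = str(max(1, ncores // nproc))
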


if cimaged11_omp_get_max_threads() == 0:
# The code was compiled without openmp
@@ -81,7 +112,7 @@ def cores_available():
OPENMP = True
check_multiprocessing()


# For 32 or 64 bits
nbyte = struct.calcsize("P") # 4 or 8

80 changes: 42 additions & 38 deletions ImageD11/sinograms/lima_segmenter.py
@@ -3,21 +3,36 @@
""" Do segmentation of lima/eiger files with no notion of metadata
Blocking will come via lima saving, so about 1000 frames per file
Make the code parallel over lima files ... no interprocess communication intended
Make the code parallel over lima files ...
minimal interprocess communication intended
- read an input file which has the source/destination file names for this job
"""


import sys, time, os, math, logging

import sys
import time
import os
import math
import logging
import numpy as np
import h5py
import hdf5plugin
import fabio
import numba

from ImageD11 import sparseframe
from ImageD11.sinograms import dataset

try:
from bslz4_to_sparse import chunk2sparse
except ImportError:
chunk2sparse = None

# Code to clean the 2D image and reduce it to a sparse array:
# things we might edit
class SegmenterOptions:

# These are the stuff that belong to us in the hdf5 file (in our group: lima_segmenter)
# These are the stuff that belong to us in the hdf5 file
# (in our group: lima_segmenter)
jobnames = (
"cut",
"howmany",
@@ -65,11 +80,9 @@ def setup(self):
# The mask must have:
# 0 == active pixel
# 1 == masked pixel
m = fabio.open(self.maskfile).data
self.mask = 1 - fabio.open(self.maskfile).data.astype(np.uint8)
assert self.mask.min() < 2
assert self.mask.max() >= 0
avg = self.mask.mean()
print(
"# Opened mask",
self.maskfile,
@@ -87,7 +100,8 @@ def load(self, h5name, h5group):
if name in grp.attrs:
setattr(self, name, grp.attrs.get(name))
for name in self.datasetnames:
# datasetnames = ( 'limapath', 'analysispath', 'datapath', 'imagefiles', 'sparsefiles' )
# datasetnames = ( 'limapath', 'analysispath', 'datapath',
# 'imagefiles', 'sparsefiles' )
if name in pgrp.attrs:
data = pgrp.attrs.get(name)
setattr(self, name, data)
@@ -118,24 +132,6 @@ def save(self, h5name, h5group):
grp.attrs[name] = value
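
Because the segmenter options live as attributes on the lima_segmenter group of the HDF5 file (exactly what load and save above read and write), they can be inspected with plain h5py; a small sketch, with the file name as a placeholder:

    import h5py

    with h5py.File("my_dataset.h5", "r") as hin:
        grp = hin["lima_segmenter"]
        for key, value in grp.attrs.items():   # e.g. cut, howmany, ...
            print(key, "=", value)
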


########################## should not need to change much below here


import numpy as np
import hdf5plugin
import h5py
import fabio
import numba

# pip install ImageD11 --no-deps # if you do not have it yet:
from ImageD11 import sparseframe

try:
from bslz4_to_sparse import chunk2sparse
except ImportError:
chunk2sparse = None


@numba.njit
def select(img, mask, row, col, val, cut):
# TODO: This is in now cImageD11.tosparse_{u16|f32}
@@ -329,7 +325,7 @@ def segment_lima(args):
npx += spf.nnz
g.attrs["npx"] = npx
end = time.time()
print("\n# Done", nframes, "frames", npx, "pixels", "fps", nframes / (end - start))
print("\n# Done", nframes, "frames", npx, "pixels fps", nframes / (end - start))
return destname

# the output file should be flushed and closed when this returns
Expand All @@ -338,9 +334,19 @@ def segment_lima(args):
OPTIONS = None


def main(options):
def initOptions(h5name, jobid):
global OPTIONS
OPTIONS = SegmenterOptions()
OPTIONS.load(h5name, "lima_segmenter")
OPTIONS.jobid = jobid


def main(h5name, jobid):
global OPTIONS
OPTIONS = options
initOptions(h5name, jobid)
options = OPTIONS
assert options is not None
assert OPTIONS is not None
args = []
files_per_job = options.cores_per_job * options.files_per_core # 64 files per job
start = options.jobid * files_per_job
@@ -354,10 +360,12 @@ def main(options):
)
)
if 1:
import concurrent.futures
import multiprocessing

with concurrent.futures.ProcessPoolExecutor(
max_workers=options.cores_per_job
with multiprocessing.Pool(
max_workers=options.cores_per_job,
initializer=initOptions,
initargs=(h5name, jobid),
) as mypool:
donefile = sys.stdout
for fname in mypool.map(segment_lima, args, chunksize=1):
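
The initializer/initargs pair introduced above is the standard way to hand per-job state to workers when the start method is not fork, because spawned or forkserver workers do not inherit globals assigned in the parent after import. A generic sketch of the pattern (names are illustrative, not the ImageD11 API):

    import multiprocessing

    CONFIG = None

    def init_worker(h5name, jobid):
        # runs once in every worker process, right after it starts
        global CONFIG
        CONFIG = {"h5name": h5name, "jobid": jobid}

    def run(task):
        return CONFIG["jobid"], task

    if __name__ == "__main__":
        ctx = multiprocessing.get_context("spawn")
        with ctx.Pool(processes=2, initializer=init_worker,
                      initargs=("scan.h5", 0)) as pool:
            print(pool.map(run, range(4)))
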
@@ -442,12 +450,8 @@ def segment():
# everything is passing via this file.
#
h5name = sys.argv[2]

# This assumes forking. To be investigated otherwise.
options = SegmenterOptions()
options.load(h5name, "lima_segmenter")
options.jobid = int(sys.argv[3])
main(options)
jobid = int(sys.argv[3])
main(h5name, jobid)


if __name__ == "__main__":
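
With this change segment() only parses the file name and job id from the command line and hands them to main(), so the same entry point can be driven from Python directly; a hedged sketch (the file name and job id are placeholders):

    from ImageD11.sinograms import lima_segmenter

    # equivalent to what segment() does after reading sys.argv
    lima_segmenter.main("scan_dataset.h5", 0)   # h5name, jobid
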
5 changes: 2 additions & 3 deletions ImageD11/sinograms/point_by_point.py
@@ -4,11 +4,10 @@
# Try to build the point-by-point mapping code ...
from __future__ import print_function, division

from ImageD11 import cImageD11 # patches to forkserver
import multiprocessing
method = multiprocessing.set_start_method("forkserver")
from ImageD11 import cImageD11 # NO!!! patches to forkserver

method = multiprocessing.get_start_method()
assert method in ("spawn", "forkserver"), "mp:" + method
import sys, os
from multiprocessing.managers import SharedMemoryManager
import time, random
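
The edit above removes the import-time set_start_method("forkserver") call, leaving the assertion that the method is already "spawn" or "forkserver" to catch misconfigured callers. A sketch of the corresponding pattern in a driving script, under the usual assumption that the start method is selected once at the program entry point:

    import multiprocessing

    if __name__ == "__main__":
        multiprocessing.set_start_method("forkserver")   # or "spawn"
        from ImageD11.sinograms import point_by_point    # import passes its assert now
        # ... set up and run the point-by-point fitting from here
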
17 changes: 14 additions & 3 deletions ImageD11/sparseframe.py
@@ -252,15 +252,15 @@ def __init__( self, hname, scan, start = 0, n=None,
break
# read the pixels - all pointers
nnz = grp['nnz'][:]
ipt = np.concatenate( ( (0,) , np.cumsum(nnz, dtype=int) ) )
ipt = nnz_to_pointer( nnz )
s = ipt[start]
e = ipt[end]
for name in self.names:
if name in grp:
setattr( self, name, grp[name][s:e] )
# pointers into this scan
self.nnz = nnz[start:end]
self.ipt = np.concatenate( ( (0,) , np.cumsum(self.nnz, dtype=int) ) )
self.ipt = nnz_to_pointer( self.nnz )

def getframe(self, i, SAFE=SAFE):
# (self, row, col, shape, itype=np.uint16, pixels=None):
@@ -617,7 +617,18 @@ def sparse_smooth( frame, data_name='intensity' ):
return smoothed



def nnz_to_pointer( nnz, out = None ):
"""
nnz = number of pixels per frame
pointer = position in a single flat array
"""
if out is None:
out = np.empty( len(nnz)+1, int )
else:
assert len(out) == len(nnz)+1
out[0] = 0
np.cumsum( nnz, out=out[1:] )
return out
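
nnz_to_pointer builds CSR-style offsets: a cumulative sum with a leading zero, so the pixels of frame i sit at flat[ipt[i]:ipt[i+1]] in the concatenated arrays. A quick worked example (values invented for illustration):

    import numpy as np

    nnz = np.array([3, 0, 2])       # pixels found on each of three frames
    ipt = nnz_to_pointer(nnz)       # -> array([0, 3, 3, 5])
    flat = np.arange(5)             # stand-in for a concatenated pixel array
    frame1 = flat[ipt[1]:ipt[2]]    # empty slice, frame 1 had no pixels
    frame2 = flat[ipt[2]:ipt[3]]    # array([3, 4])
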



11 changes: 6 additions & 5 deletions ImageD11/sym_u.py
@@ -364,18 +364,19 @@ def getgroup(s):
convert a user supplied string to a group
... a little vague still
"""
if s in ['cubic', 'hexagonal','trigonal','tetragonal',
groups =['cubic', 'hexagonal','trigonal','tetragonal',
'orthorhombic','monoclinic_c','monoclinic_a',
'monoclinic_b','triclinic','rhombohedralP']:
import ImageD11.sym_u
return getattr(ImageD11.sym_u, s)
'monoclinic_b','triclinic','rhombohedralP']
if s in groups:
return globals()[s]
else:
raise Exception( s,"not in", groups )
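
getgroup now resolves the name in the module namespace, so each string in groups maps onto the corresponding object defined in sym_u itself; a short usage sketch:

    from ImageD11.sym_u import getgroup

    cubic_group = getgroup("cubic")   # same object as ImageD11.sym_u.cubic
    try:
        getgroup("nonsense")          # unknown names raise, listing the valid choices
    except Exception as err:
        print(err)
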



if __name__=="__main__":
test()


u = np.array([[ 0.71850787 , 0.69517833, 0.02176059],
[-0.62925889 , 0.66306714, -0.40543213],
[-0.29627636 , 0.27761313 , 0.91386611]])
