From e0a74bd1df40499f2e719b286edb3a61d0106cce Mon Sep 17 00:00:00 2001
From: Franz Pöschel
Date: Tue, 20 Dec 2022 11:55:29 +0100
Subject: [PATCH] Add switches --mpi and --no-mpi to openpmd-pipe (#1336)

* Add switches --mpi and --no-mpi to openpmd-pipe

* Fix order of MPI checking
---
 .../python/openpmd_api/pipe/__main__.py      | 86 ++++++++++++-------
 1 file changed, 54 insertions(+), 32 deletions(-)

diff --git a/src/binding/python/openpmd_api/pipe/__main__.py b/src/binding/python/openpmd_api/pipe/__main__.py
index f517ae6991..d7f0590567 100755
--- a/src/binding/python/openpmd_api/pipe/__main__.py
+++ b/src/binding/python/openpmd_api/pipe/__main__.py
@@ -14,29 +14,6 @@
 from .. import openpmd_api_cxx as io
 
-# MPI is an optional dependency
-if io.variants['mpi']:
-    try:
-        from mpi4py import MPI
-        HAVE_MPI = True
-    except (ImportError, ModuleNotFoundError):
-        print("""
-openPMD-api was built with support for MPI,
-but mpi4py Python package was not found.
-Will continue in serial mode.""",
-              file=sys.stderr)
-        HAVE_MPI = False
-else:
-    HAVE_MPI = False
-
-debug = False
-
-
-class FallbackMPICommunicator:
-    def __init__(self):
-        self.size = 1
-        self.rank = 0
-
 
 def parse_args(program_name):
     parser = argparse.ArgumentParser(
@@ -51,8 +28,19 @@ def parse_args(program_name):
 or multiplexing the data path in streaming setups.
 Parallelization with MPI is optionally possible and is done automatically
 as soon as the mpi4py package is found and this tool is called in an MPI
-context. In that case, each dataset will be equally sliced along the dimension
-with the largest extent.
+context.
+Parallelization with MPI is optionally possible and can be switched on with
+the --mpi switch, or switched off with the --no-mpi switch.
+By default, openpmd-pipe will use MPI if all of the following conditions
+are fulfilled:
+1) The mpi4py package can be imported.
+2) The openPMD-api has been built with support for MPI.
+3) The MPI size is greater than 1.
+   By default, the openPMD-api will be initialized without an MPI communicator
+   if the MPI size is 1. This is to simplify the use of the JSON backend,
+   which is only available in serial openPMD.
+With parallelization enabled, each dataset will be equally sliced along
+the dimension with the largest extent.
 
 Examples:
     {0} --infile simData.h5 --outfile simData_%T.bp
@@ -72,10 +60,45 @@
                         type=str,
                         default='{}',
                         help='JSON config for the out file')
+    # MPI, default: Import mpi4py if available and openPMD is parallel,
+    # but don't use if MPI size is 1 (this makes it easier to interact with
+    # JSON, since that backend is unavailable in parallel)
+    if io.variants['mpi']:
+        parser.add_argument('--mpi', action='store_true')
+        parser.add_argument('--no-mpi', dest='mpi', action='store_false')
+        parser.set_defaults(mpi=None)
 
     return parser.parse_args()
 
 
+args = parse_args(sys.argv[0])
+# MPI is an optional dependency
+if io.variants['mpi'] and (args.mpi is None or args.mpi):
+    try:
+        from mpi4py import MPI
+        HAVE_MPI = True
+    except (ImportError, ModuleNotFoundError):
+        if args.mpi:
+            raise
+        else:
+            print("""
+    openPMD-api was built with support for MPI,
+    but mpi4py Python package was not found.
+    Will continue in serial mode.""",
+                  file=sys.stderr)
+            HAVE_MPI = False
+else:
+    HAVE_MPI = False
+
+debug = False
+
+
+class FallbackMPICommunicator:
+    def __init__(self):
+        self.size = 1
+        self.rank = 0
+
+
 class Chunk:
     """
     A Chunk is an n-dimensional hypercube, defined by an offset and an extent.
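
The --mpi/--no-mpi options added above make args.mpi a tri-state value: True
when --mpi is passed, False for --no-mpi, and None when neither is given, in
which case the module-level check decides automatically. A minimal standalone
sketch of that tri-state behaviour (illustrative only, not part of the patch):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--mpi', action='store_true')
    parser.add_argument('--no-mpi', dest='mpi', action='store_false')
    parser.set_defaults(mpi=None)

    print(parser.parse_args([]).mpi)            # None  -> auto-detect
    print(parser.parse_args(['--mpi']).mpi)     # True  -> force MPI; a missing mpi4py raises
    print(parser.parse_args(['--no-mpi']).mpi)  # False -> stay serial, never import mpi4py
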
@@ -178,7 +201,7 @@ def __init__(self, infile, outfile, inconfig, outconfig, comm):
         self.comm = comm
 
     def run(self):
-        if self.comm.size == 1:
+        if not HAVE_MPI or (args.mpi is None and self.comm.size == 1):
             print("Opening data source")
             sys.stdout.flush()
             inseries = io.Series(self.infile, io.Access.read_only,
@@ -320,16 +343,15 @@ def __copy(self, src, dest, current_path="/data/"):
 
 
 def main():
-    args = parse_args(sys.argv[0])
     if not args.infile or not args.outfile:
         print("Please specify parameters --infile and --outfile.")
         sys.exit(1)
-    if (HAVE_MPI):
-        run_pipe = pipe(args.infile, args.outfile, args.inconfig,
-                        args.outconfig, MPI.COMM_WORLD)
+    if HAVE_MPI:
+        communicator = MPI.COMM_WORLD
     else:
-        run_pipe = pipe(args.infile, args.outfile, args.inconfig,
-                        args.outconfig, FallbackMPICommunicator())
+        communicator = FallbackMPICommunicator()
+    run_pipe = pipe(args.infile, args.outfile, args.inconfig, args.outconfig,
+                    communicator)
     run_pipe.run()
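
The changed condition in run() above decides between a serial and a parallel
open of the input Series. Roughly, and assuming an MPI-enabled openPMD-api
build with mpi4py installed, the two branches correspond to the following
sketch (written against the public openpmd_api package rather than the
internal openpmd_api_cxx import used by the tool):

    import openpmd_api as io

    infile, inconfig = "simData_%T.bp", "{}"
    # e.g. HAVE_MPI and not (args.mpi is None and comm.size == 1)
    use_parallel = False

    if use_parallel:
        from mpi4py import MPI
        # Parallel open: every rank passes the communicator and later reads
        # its slice of each dataset (sliced along the largest dimension).
        series = io.Series(infile, io.Access.read_only, MPI.COMM_WORLD, inconfig)
    else:
        # Serial open: no communicator, which also keeps the JSON backend usable.
        series = io.Series(infile, io.Access.read_only, inconfig)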