Skip to content

Commit

Permalink
Add switches --mpi and --no-mpi to openpmd-pipe (#1336)
Browse files Browse the repository at this point in the history
* Add switches --mpi and --no-mpi to openpmd-pipe

* Fix order of MPI checking
  • Loading branch information
franzpoeschel authored Dec 20, 2022
1 parent a167195 commit e0a74bd
Showing 1 changed file with 54 additions and 32 deletions.
86 changes: 54 additions & 32 deletions src/binding/python/openpmd_api/pipe/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,6 @@

from .. import openpmd_api_cxx as io

# MPI is an optional dependency
if io.variants['mpi']:
try:
from mpi4py import MPI
HAVE_MPI = True
except (ImportError, ModuleNotFoundError):
print("""
openPMD-api was built with support for MPI,
but mpi4py Python package was not found.
Will continue in serial mode.""",
file=sys.stderr)
HAVE_MPI = False
else:
HAVE_MPI = False

debug = False


class FallbackMPICommunicator:
def __init__(self):
self.size = 1
self.rank = 0


def parse_args(program_name):
parser = argparse.ArgumentParser(
Expand All @@ -51,8 +28,19 @@ def parse_args(program_name):
or multiplexing the data path in streaming setups.
Parallelization with MPI is optionally possible and is done automatically
as soon as the mpi4py package is found and this tool is called in an MPI
context. In that case, each dataset will be equally sliced along the dimension
with the largest extent.
context.
Parallelization with MPI is optionally possible and can be switched on with
the --mpi switch, resp. switched off with the --no-mpi switch.
By default, openpmd-pipe will use MPI if all of the following conditions
are fulfilled:
1) The mpi4py package can be imported.
2) The openPMD-api has been built with support for MPI.
3) The MPI size is greater than 1.
By default, the openPMD-api will be initialized without an MPI communicator
if the MPI size is 1. This is to simplify the use of the JSON backend
which is only available in serial openPMD.
With parallelization enabled, each dataset will be equally sliced along
the dimension with the largest extent.
Examples:
{0} --infile simData.h5 --outfile simData_%T.bp
Expand All @@ -72,10 +60,45 @@ def parse_args(program_name):
type=str,
default='{}',
help='JSON config for the out file')
# MPI, default: Import mpi4py if available and openPMD is parallel,
# but don't use if MPI size is 1 (this makes it easier to interact with
# JSON, since that backend is unavailable in parallel)
if io.variants['mpi']:
parser.add_argument('--mpi', action='store_true')
parser.add_argument('--no-mpi', dest='mpi', action='store_false')
parser.set_defaults(mpi=None)

return parser.parse_args()


# The command line is parsed at module level because the --mpi / --no-mpi
# switches decide whether mpi4py gets imported at all.
args = parse_args(sys.argv[0])
# MPI is an optional dependency.
# args.mpi only exists if openPMD-api was built with MPI support (the switches
# are registered under the same condition), hence the variant check must come
# first. Its value is True for --mpi, False for --no-mpi and None if neither
# switch was given.
if io.variants['mpi'] and (args.mpi is None or args.mpi):
    try:
        from mpi4py import MPI
        HAVE_MPI = True
    except ImportError:
        # ModuleNotFoundError is a subclass of ImportError, so catching
        # ImportError covers both.
        if args.mpi:
            # MPI was requested explicitly via --mpi: do not silently
            # fall back to serial mode, propagate the import failure.
            raise
        else:
            print("""
openPMD-api was built with support for MPI,
but mpi4py Python package was not found.
Will continue in serial mode.""",
                  file=sys.stderr)
            HAVE_MPI = False
else:
    HAVE_MPI = False

debug = False


class FallbackMPICommunicator:
    """
    Serial stand-in for an MPI communicator.

    Exposes the attributes ``size`` and ``rank`` with the values of a
    single-process run, so that code querying the communicator keeps working
    when mpi4py is not in use.
    """

    def __init__(self):
        self.size = 1  # exactly one participating process ...
        self.rank = 0  # ... which consequently is rank 0


class Chunk:
"""
A Chunk is an n-dimensional hypercube, defined by an offset and an extent.
Expand Down Expand Up @@ -178,7 +201,7 @@ def __init__(self, infile, outfile, inconfig, outconfig, comm):
self.comm = comm

def run(self):
if self.comm.size == 1:
if not HAVE_MPI or (args.mpi is None and self.comm.size == 1):
print("Opening data source")
sys.stdout.flush()
inseries = io.Series(self.infile, io.Access.read_only,
Expand Down Expand Up @@ -320,16 +343,15 @@ def __copy(self, src, dest, current_path="/data/"):


def main():
    """
    Entry point of openpmd-pipe.

    Relies on the module-level ``args`` (the command line is parsed once at
    import time) instead of parsing it a second time into a local variable.
    Exits with status 1 if --infile or --outfile is missing. Otherwise a
    single pipe is constructed with MPI.COMM_WORLD if mpi4py is in use, or
    with a FallbackMPICommunicator if not, and then run.
    """
    if not args.infile or not args.outfile:
        print("Please specify parameters --infile and --outfile.")
        sys.exit(1)
    # Select the communicator first so that the pipe is constructed exactly
    # once, regardless of whether MPI is available.
    if HAVE_MPI:
        communicator = MPI.COMM_WORLD
    else:
        communicator = FallbackMPICommunicator()
    run_pipe = pipe(args.infile, args.outfile, args.inconfig, args.outconfig,
                    communicator)

    run_pipe.run()

Expand Down

0 comments on commit e0a74bd

Please sign in to comment.