Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use pathos for multiprocessing #7002

Closed
wants to merge 10 commits into from
Closed
16 changes: 4 additions & 12 deletions qiskit/tools/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
"""

import os
from concurrent.futures import ProcessPoolExecutor
import sys
from pathos.pools import ProcessPool

from qiskit.exceptions import QiskitError
from qiskit.utils.multiprocessing import local_hardware_info
Expand All @@ -68,12 +68,6 @@
# On python 3.9 default false to avoid deadlock issues
elif sys.version_info[0] == 3 and sys.version_info[1] == 9:
PARALLEL_DEFAULT = False
# On macOS default false on Python >=3.8
elif sys.platform == "darwin":
if sys.version_info[0] == 3 and sys.version_info[1] >= 8:
PARALLEL_DEFAULT = False
else:
PARALLEL_DEFAULT = True
# On linux (and other OSes) default to True
else:
PARALLEL_DEFAULT = True
Expand Down Expand Up @@ -146,12 +140,10 @@ def _callback(_):
):
os.environ["QISKIT_IN_PARALLEL"] = "TRUE"
try:
results = []
with ProcessPoolExecutor(max_workers=num_processes) as executor:
param = map(lambda value: (task, value, task_args, task_kwargs), values)
future = executor.map(_task_wrapper, param)
pool = ProcessPool(nodes=num_processes)
param = map(lambda value: (task, value, task_args, task_kwargs), values)
results = pool.map(_task_wrapper, param)

results = list(future)
Publisher().publish("terra.parallel.done", len(results))

except (KeyboardInterrupt, Exception) as error:
Expand Down
20 changes: 4 additions & 16 deletions qiskit/utils/multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
# that they have been altered from the originals.

"""Multiprocessing utilities"""

import os
import multiprocessing as mp
import platform
import sys

import psutil

MAIN_PROCESS = os.getpid()


def local_hardware_info():
"""Basic hardware information about the local machine.
Expand All @@ -38,17 +39,4 @@ def local_hardware_info():

def is_main_process():
"""Checks whether the current process is the main one"""
if platform.system() == "Windows":
return not isinstance(mp.current_process(), mp.context.SpawnProcess)
else:
return not (
isinstance(mp.current_process(), (mp.context.ForkProcess, mp.context.SpawnProcess))
# In python 3.5 and 3.6, processes created by "ProcessPoolExecutor" are not
# mp.context.ForkProcess or mp.context.SpawnProcess. As a workaround,
# "name" of the process is checked instead.
or (
sys.version_info[0] == 3
and (sys.version_info[1] == 5 or sys.version_info[1] == 6)
and mp.current_process().name != "MainProcess"
)
)
return mp.current_process().pid == MAIN_PROCESS
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ psutil>=5
scipy>=1.4
sympy>=1.3
dill>=0.3
pathos
python-constraint>=1.4
python-dateutil>=2.8.0
symengine>0.7 ; platform_machine == 'x86_64' or platform_machine == 'aarch64' or platform_machine == 'ppc64le' or platform_machine == 'amd64' or platform_machine == 'arm64'
Expand Down