Skip to content

Commit

Permalink
Merge pull request #300 from python-adaptive/loky-non-default
Browse files Browse the repository at this point in the history
make loky a default on Windows and MacOS but not on Linux
  • Loading branch information
basnijholt authored Mar 5, 2021
2 parents 75a3274 + 9e0dad9 commit ae2ddf4
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 37 deletions.
33 changes: 20 additions & 13 deletions adaptive/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
import inspect
import itertools
import pickle
import platform
import time
import traceback
import warnings
from contextlib import suppress

import loky

from adaptive.notebook_integration import in_ipynb, live_info, live_plot

try:
Expand All @@ -33,22 +36,23 @@
except ModuleNotFoundError:
with_mpi4py = False

try:
import loky

with_loky = True
except ModuleNotFoundError:
with_loky = False

with suppress(ModuleNotFoundError):
import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


_default_executor = (
loky.get_reusable_executor if with_loky else concurrent.ProcessPoolExecutor
)
if platform.system() == "Linux":
_default_executor = concurrent.ProcessPoolExecutor
else:
# On Windows and MacOS functions, the __main__ module must be
# importable by worker subprocesses. This means that
# ProcessPoolExecutor will not work in the interactive interpreter.
# On Linux the whole process is forked, so the issue does not appear.
# See https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor
# and https://github.com/python-adaptive/adaptive/issues/301
_default_executor = loky.get_reusable_executor


class BaseRunner(metaclass=abc.ABCMeta):
Expand All @@ -65,7 +69,8 @@ class BaseRunner(metaclass=abc.ABCMeta):
`mpi4py.futures.MPIPoolExecutor`, `ipyparallel.Client` or\
`loky.get_reusable_executor`, optional
The executor in which to evaluate the function to be learned.
If not provided, a new `~concurrent.futures.ProcessPoolExecutor`.
If not provided, a new `~concurrent.futures.ProcessPoolExecutor` on
Linux, and a `loky.get_reusable_executor` on MacOS and Windows.
ntasks : int, optional
The number of concurrent function evaluations. Defaults to the number
of cores available in `executor`.
Expand Down Expand Up @@ -317,7 +322,8 @@ class BlockingRunner(BaseRunner):
`mpi4py.futures.MPIPoolExecutor`, `ipyparallel.Client` or\
`loky.get_reusable_executor`, optional
The executor in which to evaluate the function to be learned.
If not provided, a new `~concurrent.futures.ProcessPoolExecutor`.
If not provided, a new `~concurrent.futures.ProcessPoolExecutor` on
Linux, and a `loky.get_reusable_executor` on MacOS and Windows.
ntasks : int, optional
The number of concurrent function evaluations. Defaults to the number
of cores available in `executor`.
Expand Down Expand Up @@ -433,7 +439,8 @@ class AsyncRunner(BaseRunner):
`mpi4py.futures.MPIPoolExecutor`, `ipyparallel.Client` or\
`loky.get_reusable_executor`, optional
The executor in which to evaluate the function to be learned.
If not provided, a new `~concurrent.futures.ProcessPoolExecutor`.
If not provided, a new `~concurrent.futures.ProcessPoolExecutor` on
Linux, and a `loky.get_reusable_executor` on MacOS and Windows.
ntasks : int, optional
The number of concurrent function evaluations. Defaults to the number
of cores available in `executor`.
Expand Down Expand Up @@ -814,7 +821,7 @@ def _get_ncores(ex):
ex, (concurrent.ProcessPoolExecutor, concurrent.ThreadPoolExecutor)
):
return ex._max_workers # not public API!
elif with_loky and isinstance(ex, loky.reusable_executor._ReusablePoolExecutor):
elif isinstance(ex, loky.reusable_executor._ReusablePoolExecutor):
return ex._max_workers # not public API!
elif isinstance(ex, SequentialExecutor):
return 1
Expand Down
46 changes: 24 additions & 22 deletions adaptive/tests/test_runner.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
import os
import platform
import sys
import time

Expand All @@ -15,9 +15,10 @@
stop_after,
with_distributed,
with_ipyparallel,
with_loky,
)

OPERATING_SYSTEM = platform.system()


def blocking_runner(learner, goal):
BlockingRunner(learner, goal, executor=SequentialExecutor())
Expand Down Expand Up @@ -72,22 +73,6 @@ async def f(x):
# --- Test with different executors


@pytest.fixture(scope="session")
def ipyparallel_executor():
from ipyparallel import Client

if os.name == "nt":
import wexpect as expect
else:
import pexpect as expect

child = expect.spawn("ipcluster start -n 1")
child.expect("Engines appear to have started successfully", timeout=35)
yield Client()
if not child.terminate(force=True):
raise RuntimeError("Could not stop ipcluster")


@pytest.fixture(scope="session")
def loky_executor():
import loky
Expand Down Expand Up @@ -118,17 +103,35 @@ def test_stop_after_goal():


@pytest.mark.skipif(not with_ipyparallel, reason="IPyparallel is not installed")
def test_ipyparallel_executor(ipyparallel_executor):
@pytest.mark.skipif(
OPERATING_SYSTEM == "Windows" and sys.version_info >= (3, 7),
reason="Gets stuck in CI",
)
def test_ipyparallel_executor():
from ipyparallel import Client

if OPERATING_SYSTEM == "Windows":
import wexpect as expect
else:
import pexpect as expect

child = expect.spawn("ipcluster start -n 1")
child.expect("Engines appear to have started successfully", timeout=35)
ipyparallel_executor = Client()
learner = Learner1D(linear, (-1, 1))
BlockingRunner(learner, trivial_goal, executor=ipyparallel_executor)

assert learner.npoints > 0

if not child.terminate(force=True):
raise RuntimeError("Could not stop ipcluster")


@flaky.flaky(max_runs=5)
@pytest.mark.timeout(60)
@pytest.mark.skipif(not with_distributed, reason="dask.distributed is not installed")
@pytest.mark.skipif(os.name == "nt", reason="XXX: seems to always fail")
@pytest.mark.skipif(sys.platform == "darwin", reason="XXX: intermittently fails")
@pytest.mark.skipif(OPERATING_SYSTEM == "Windows", reason="XXX: seems to always fail")
@pytest.mark.skipif(OPERATING_SYSTEM == "Darwin", reason="XXX: intermittently fails")
def test_distributed_executor():
from distributed import Client

Expand All @@ -139,7 +142,6 @@ def test_distributed_executor():
assert learner.npoints > 0


@pytest.mark.skipif(not with_loky, reason="loky not installed")
def test_loky_executor(loky_executor):
learner = Learner1D(lambda x: x, (-1, 1))
BlockingRunner(
Expand Down
2 changes: 1 addition & 1 deletion docs/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ dependencies:
- atomicwrites=1.4.0
- sphinx_fontawesome=0.0.6
- sphinx=3.2.1
- m2r2=0.2.5
- m2r2=0.2.7
- pip:
- sphinx_rtd_theme
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def get_version_and_cmdclass(package_name):
"sortedcontainers >= 2.0",
"atomicwrites",
"cloudpickle",
"loky >= 2.9",
]

extras_require = {
Expand All @@ -55,7 +56,6 @@ def get_version_and_cmdclass(package_name):
"dill",
"distributed",
"ipyparallel>=6.2.5", # because of https://github.com/ipython/ipyparallel/issues/404
"loky",
"scikit-optimize>=0.8.1", # because of https://github.com/scikit-optimize/scikit-optimize/issues/931
"wexpect" if os.name == "nt" else "pexpect",
],
Expand Down

0 comments on commit ae2ddf4

Please sign in to comment.