diff --git a/adaptive_scheduler/__init__.py b/adaptive_scheduler/__init__.py
index 50b7b0ab..4ab0cac0 100644
--- a/adaptive_scheduler/__init__.py
+++ b/adaptive_scheduler/__init__.py
@@ -1,7 +1,7 @@
 from adaptive_scheduler import client_support, scheduler, server_support, utils
 from adaptive_scheduler._version import __version__
 from adaptive_scheduler.scheduler import PBS, SLURM
-from adaptive_scheduler.server_support import RunManager
+from adaptive_scheduler.server_support import RunManager, slurm_run
 
 __all__ = [
     "client_support",
@@ -11,5 +11,6 @@
     "server_support",
     "SLURM",
     "utils",
+    "slurm_run",
     "__version__",
 ]
diff --git a/adaptive_scheduler/scheduler.py b/adaptive_scheduler/scheduler.py
index d153e533..495d3280 100644
--- a/adaptive_scheduler/scheduler.py
+++ b/adaptive_scheduler/scheduler.py
@@ -13,7 +13,8 @@
 import time
 import warnings
 from distutils.spawn import find_executable
-from functools import cached_property
+from functools import cached_property, lru_cache
+from typing import Literal
 
 import adaptive_scheduler._mock_scheduler
 from adaptive_scheduler.utils import _progress, _RequireAttrsABCMeta
@@ -81,7 +82,7 @@
         python_executable,
         log_folder,
         mpiexec_executable,
-        executor_type,
+        executor_type: Literal["ipyparallel", "dask-mpi", "mpi4py", "process-pool"],
         num_threads,
         extra_scheduler,
         extra_env_vars,
@@ -304,7 +305,9 @@
         python_executable=None,
         log_folder="",
         mpiexec_executable=None,
-        executor_type="mpi4py",
+        executor_type: Literal[
+            "ipyparallel", "dask-mpi", "mpi4py", "process-pool"
+        ] = "mpi4py",
         num_threads=1,
         extra_scheduler=None,
         extra_env_vars=None,
@@ -580,7 +583,9 @@
         python_executable=None,
         log_folder="",
         mpiexec_executable=None,
-        executor_type="mpi4py",
+        executor_type: Literal[
+            "ipyparallel", "dask-mpi", "mpi4py", "process-pool"
+        ] = "mpi4py",
         num_threads=1,
         extra_scheduler=None,
         extra_env_vars=None,
@@ -940,6 +945,7 @@
         return int(numbers[0])
 
 
+@lru_cache(maxsize=1)
 def slurm_partitions(
     timeout: int = 5, with_ncores: bool = True
 ) -> list[str] | dict[str, int]:
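The `@lru_cache(maxsize=1)` added to `slurm_partitions` above memoizes the partition lookup, so the (potentially slow, `timeout`-bounded) query to SLURM runs once per process; with `maxsize=1` only the most recent argument combination is kept. A minimal sketch of the resulting behavior, using the standard `functools.lru_cache` API (the partition name and core count are made-up examples):

```python
from adaptive_scheduler.scheduler import slurm_partitions

# First call queries SLURM; repeated calls with the same arguments
# return the memoized object without touching the scheduler again.
partitions = slurm_partitions(with_ncores=True)  # e.g. {"hb120rsv2-low": 120}
assert slurm_partitions(with_ncores=True) is partitions  # served from the cache

# If the cluster's partition layout changes in a long-lived session,
# the cache can be invalidated explicitly:
slurm_partitions.cache_clear()
```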
diff --git a/adaptive_scheduler/server_support.py b/adaptive_scheduler/server_support.py
index c36c123b..3027d907 100644
--- a/adaptive_scheduler/server_support.py
+++ b/adaptive_scheduler/server_support.py
@@ -28,10 +28,11 @@
 import zmq.ssh
 from tinydb import Query, TinyDB
 
-from adaptive_scheduler.scheduler import BaseScheduler
+from adaptive_scheduler.scheduler import SLURM, BaseScheduler, slurm_partitions
 from adaptive_scheduler.utils import (
     _DATAFRAME_FORMATS,
     _deserialize,
+    _get_default_args,
     _progress,
     _remove_or_move_files,
     _require_adaptive,
@@ -873,7 +874,7 @@
             "loky", "loky_int_main", "spawn", "fork", "forkserver"
         ] = "loky",
         cleanup_first: bool = False,
-        save_dataframe: bool = True,
+        save_dataframe: bool = False,
         # TODO: use _DATAFRAME_FORMATS instead of literal in ≥Python 3.10
         dataframe_format: Literal[
             "parquet", "csv", "hdf", "pickle", "feather", "excel", "json"
@@ -1163,3 +1164,72 @@
     ioloop = asyncio.get_event_loop()
     coro = clean(interval)
     return ioloop.create_task(coro)
+
+
+def slurm_run(
+    learners: list[adaptive.BaseLearner],
+    fnames: list[str],
+    partition: str,
+    nodes: int = 1,
+    cores_per_node: int | None = None,
+    goal: Callable[[adaptive.BaseLearner], bool]
+    | int
+    | float
+    | datetime.timedelta
+    | datetime.datetime
+    | None = None,
+    folder: str | Path = "",
+    name: str = "adaptive",
+    num_threads: int = 1,
+    save_interval: int = 300,
+    log_interval: int = 300,
+    cleanup_first: bool = True,
+    save_dataframe: bool = True,
+    dataframe_format: Literal[
+        "parquet", "csv", "hdf", "pickle", "feather", "excel", "json"
+    ] = "parquet",
+    max_fails_per_job: int = 50,
+    max_simultaneous_jobs: int = 500,
+    executor_type: Literal[
+        "ipyparallel", "dask-mpi", "mpi4py", "process-pool"
+    ] = "process-pool",
+    extra_run_manager_kwargs: dict[str, Any] | None = None,
+    extra_scheduler_kwargs: dict[str, Any] | None = None,
+):
+    folder = Path(folder)
+    folder.mkdir(parents=True, exist_ok=True)
+    if cores_per_node is None:
+        cores_per_node = slurm_partitions()[partition]
+    kw = dict(
+        _get_default_args(SLURM),
+        nodes=nodes,
+        cores_per_node=cores_per_node,
+        partition=partition,
+        run_script=folder / f"{name}_adaptive_scheduler.py",
+        log_folder=folder / "logs",
+        executor_type=executor_type,
+        num_threads=num_threads,
+    )
+    if extra_scheduler_kwargs is None:
+        extra_scheduler_kwargs = {}
+    scheduler = SLURM(**dict(kw, **extra_scheduler_kwargs))
+    kw = dict(
+        _get_default_args(RunManager),
+        scheduler=scheduler,
+        learners=learners,
+        fnames=fnames,
+        goal=goal,
+        save_interval=save_interval,
+        log_interval=log_interval,
+        move_old_logs_to=folder / "old_logs",
+        db_fname=f"{name}.db.json",
+        job_name=name,
+        cleanup_first=cleanup_first,
+        save_dataframe=save_dataframe,
+        dataframe_format=dataframe_format,
+        max_fails_per_job=max_fails_per_job,
+        max_simultaneous_jobs=max_simultaneous_jobs,
+    )
+    if extra_run_manager_kwargs is None:
+        extra_run_manager_kwargs = {}
+    return RunManager(**dict(kw, **extra_run_manager_kwargs))
diff --git a/adaptive_scheduler/utils.py b/adaptive_scheduler/utils.py
index df716ae3..00b6dbf7 100644
--- a/adaptive_scheduler/utils.py
+++ b/adaptive_scheduler/utils.py
@@ -888,6 +888,7 @@
             raise TypeError("Multiple learner types found.")
         if isinstance(learners[0], adaptive.SequenceLearner):
             return adaptive.SequenceLearner.done
+        warnings.warn("Goal is None which means the learners continue forever!")
         return lambda _: False
     else:
         raise ValueError("goal must be `callable | int | float | None`")
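The new `slurm_run` helper composes a `SLURM` scheduler and a `RunManager` from their default arguments (gathered via `_get_default_args`) and merges in overrides; because the extras are applied last in `dict(kw, **extra_scheduler_kwargs)`, anything passed through `extra_scheduler_kwargs` or `extra_run_manager_kwargs` wins over the computed defaults. A hedged usage sketch (the partition name, file layout, and `extra_scheduler` flag are examples only):

```python
import adaptive
import adaptive_scheduler

# Toy learners and matching filenames; any adaptive learners work here.
learners = [
    adaptive.Learner1D(lambda x, a=a: a * x**2, bounds=(-1, 1)) for a in range(3)
]
fnames = [f"data/learner_{a}.pickle" for a in range(3)]

run_manager = adaptive_scheduler.slurm_run(
    learners,
    fnames,
    partition="hb120rsv2-low",  # example partition, as in the notebook
    goal=0.01,  # as in the updated example notebook
    # cores_per_node is omitted, so it is looked up via slurm_partitions()
    extra_scheduler_kwargs={"extra_scheduler": ["--exclusive"]},  # overrides defaults
)
run_manager.start()
```

Note that passing `goal=None` now triggers the `warnings.warn` added in `utils.py`, since the learners would otherwise run forever.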
diff --git a/example.ipynb b/example.ipynb
index 01bbe9b4..d29a7f43 100644
--- a/example.ipynb
+++ b/example.ipynb
@@ -84,18 +84,36 @@
    "source": [
     "import adaptive_scheduler\n",
     "\n",
-    "\n",
-    "def goal(learner):\n",
-    "    return learner.npoints > 200\n",
-    "\n",
-    "\n",
+    "run_manager = adaptive_scheduler.slurm_run(learners, fnames, partition=\"hb120rsv2-low\", goal=0.01)\n",
+    "run_manager.start()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Explicit use"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Or be explicit and use:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
     "name = \"example\"\n",
     "scheduler = adaptive_scheduler.scheduler.SLURM(\n",
-    "    num_threads=1,\n",
-    "    cores_per_node=20,\n",
+    "    cores_per_node=2,\n",
     "    nodes=1,\n",
     "    partition=\"hb120rsv2-low\",\n",
-    "    executor_type=\"process-pool\",\n",
+    "    executor_type=\"ipyparallel\",\n",
     "    run_script=f\"{name}-run.py\",\n",
     "    log_folder=\"logs\",\n",
     "    )\n",
@@ -103,7 +121,7 @@
     "    learners=learners,\n",
     "    fnames=fnames,\n",
     "    scheduler=scheduler,\n",
-    "    goal=goal,\n",
+    "    goal=0.01,\n",
     "    job_name=f\"{name}\",\n",
     "    max_fails_per_job=5,\n",
     "    max_simultaneous_jobs=50,\n",
@@ -111,9 +129,15 @@
     "    log_interval=30,\n",
     "    save_interval=30,\n",
     "    save_dataframe=True,\n",
-    ")\n",
-    "\n",
-    "run_manager.start()"
+    "    cleanup_first=False,\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Queue"
    ]
   },
  {
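The reworked "Explicit use" cell constructs the `RunManager` but no longer calls `run_manager.start()` in the same cell; as in the quick-start cell above it, starting the jobs remains a separate step:

```python
run_manager.start()
```

The new "## Queue" markdown cell only introduces the queue-inspection section; the cells under that heading are unchanged and therefore not shown in this hunk.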