Add slurm_run function #79

Merged: 5 commits, Oct 12, 2022
3 changes: 2 additions & 1 deletion adaptive_scheduler/__init__.py
@@ -1,7 +1,7 @@
from adaptive_scheduler import client_support, scheduler, server_support, utils
from adaptive_scheduler._version import __version__
from adaptive_scheduler.scheduler import PBS, SLURM
from adaptive_scheduler.server_support import RunManager
from adaptive_scheduler.server_support import RunManager, slurm_run

__all__ = [
"client_support",
@@ -11,5 +11,6 @@
"server_support",
"SLURM",
"utils",
"slurm_run",
"__version__",
]
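
With this change `slurm_run` is re-exported at the package top level. A minimal sketch of the two equivalent import styles (assuming the package from this branch is installed):

import adaptive_scheduler
from adaptive_scheduler import slurm_run

# Both names point at the same function defined in server_support.py.
assert adaptive_scheduler.slurm_run is slurm_run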
14 changes: 10 additions & 4 deletions adaptive_scheduler/scheduler.py
@@ -13,7 +13,8 @@
import time
import warnings
from distutils.spawn import find_executable
from functools import cached_property
from functools import cached_property, lru_cache
from typing import Literal

import adaptive_scheduler._mock_scheduler
from adaptive_scheduler.utils import _progress, _RequireAttrsABCMeta
@@ -81,7 +82,7 @@ def __init__(
python_executable,
log_folder,
mpiexec_executable,
executor_type,
executor_type: Literal["ipyparallel", "dask-mpi", "mpi4py", "process-pool"],
num_threads,
extra_scheduler,
extra_env_vars,
@@ -304,7 +305,9 @@ def __init__(
python_executable=None,
log_folder="",
mpiexec_executable=None,
executor_type="mpi4py",
executor_type: Literal[
"ipyparallel", "dask-mpi", "mpi4py", "process-pool"
] = "mpi4py",
num_threads=1,
extra_scheduler=None,
extra_env_vars=None,
@@ -580,7 +583,9 @@ def __init__(
python_executable=None,
log_folder="",
mpiexec_executable=None,
executor_type="mpi4py",
executor_type: Literal[
"ipyparallel", "dask-mpi", "mpi4py", "process-pool"
] = "mpi4py",
num_threads=1,
extra_scheduler=None,
extra_env_vars=None,
@@ -940,6 +945,7 @@ def _get_ncores(partition):
return int(numbers[0])


@lru_cache(maxsize=1)
def slurm_partitions(
timeout: int = 5, with_ncores: bool = True
) -> list[str] | dict[str, int]:
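
The new `lru_cache(maxsize=1)` on `slurm_partitions` means the partition lookup only hits the scheduler once per argument combination; repeated calls return the cached result. A minimal sketch, assuming a machine with SLURM available (the example partition name and core count are illustrative), using the default `with_ncores=True` so a dict of partition name to core count is returned:

from adaptive_scheduler.scheduler import slurm_partitions

partitions = slurm_partitions()         # queries SLURM, e.g. {"hb120rsv2-low": 120}
partitions_cached = slurm_partitions()  # same arguments, so served from the cache
assert partitions is partitions_cached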
74 changes: 72 additions & 2 deletions adaptive_scheduler/server_support.py
@@ -28,10 +28,11 @@
import zmq.ssh
from tinydb import Query, TinyDB

from adaptive_scheduler.scheduler import BaseScheduler
from adaptive_scheduler.scheduler import SLURM, BaseScheduler, slurm_partitions
from adaptive_scheduler.utils import (
_DATAFRAME_FORMATS,
_deserialize,
_get_default_args,
_progress,
_remove_or_move_files,
_require_adaptive,
@@ -873,7 +874,7 @@ def __init__(
"loky", "loky_int_main", "spawn", "fork", "forkserver"
] = "loky",
cleanup_first: bool = False,
save_dataframe: bool = True,
save_dataframe: bool = False,
# TODO: use _DATAFRAME_FORMATS instead of literal in ≥Python 3.10
dataframe_format: Literal[
"parquet", "csv", "hdf", "pickle", "feather", "excel", "json"
@@ -1163,3 +1164,72 @@ async def clean(interval):
ioloop = asyncio.get_event_loop()
coro = clean(interval)
return ioloop.create_task(coro)


def slurm_run(
learners: list[adaptive.BaseLearner],
fnames: list[str],
partition: str,
nodes: int = 1,
cores_per_node: int | None = None,
goal: Callable[[adaptive.BaseLearner], bool]
| int
| float
| datetime.timedelta
| datetime.datetime
| None = None,
folder: str | Path = "",
name: str = "adaptive",
num_threads: int = 1,
save_interval: int = 300,
log_interval: int = 300,
cleanup_first: bool = True,
save_dataframe: bool = True,
dataframe_format: Literal[
"parquet", "csv", "hdf", "pickle", "feather", "excel", "json"
] = "parquet",
max_fails_per_job: int = 50,
max_simultaneous_jobs: int = 500,
executor_type: Literal[
"ipyparallel", "dask-mpi", "mpi4py", "process-pool"
] = "process-pool",
extra_run_manager_kwargs: dict[str, Any] | None = None,
extra_scheduler_kwargs: dict[str, Any] | None = None,
):
folder = Path(folder)
folder.mkdir(parents=True, exist_ok=True)
if cores_per_node is None:
cores_per_node = slurm_partitions()[partition]
kw = dict(
_get_default_args(SLURM),
nodes=nodes,
cores_per_node=cores_per_node,
partition=partition,
run_script=folder / f"{name}_adaptive_scheduler.py",
log_folder=folder / "logs",
executor_type=executor_type,
num_threads=num_threads,
)
if extra_scheduler_kwargs is None:
extra_scheduler_kwargs = {}
scheduler = SLURM(**dict(kw, **extra_scheduler_kwargs))
kw = dict(
_get_default_args(RunManager),
scheduler=scheduler,
learners=learners,
fnames=fnames,
goal=goal,
save_interval=save_interval,
log_interval=log_interval,
move_old_logs_to=folder / "old_logs",
db_fname=f"{name}.db.json",
job_name=name,
cleanup_first=cleanup_first,
save_dataframe=save_dataframe,
dataframe_format=dataframe_format,
max_fails_per_job=max_fails_per_job,
max_simultaneous_jobs=max_simultaneous_jobs,
)
if extra_run_manager_kwargs is None:
extra_run_manager_kwargs = {}
return RunManager(**dict(kw, **extra_run_manager_kwargs))
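
For context, a usage sketch of the new `slurm_run` helper, mirroring the notebook example further down; the learner setup, filenames, folder, and partition name are illustrative and cluster-specific:

import adaptive

import adaptive_scheduler


def quadratic(x):
    return x**2


learners = [adaptive.Learner1D(quadratic, bounds=(-1, 1)) for _ in range(3)]
fnames = [f"data/learner_{i}" for i in range(3)]

run_manager = adaptive_scheduler.slurm_run(
    learners,
    fnames,
    partition="hb120rsv2-low",  # cores_per_node is looked up via slurm_partitions() when omitted
    goal=0.01,                  # same goal value as in the notebook example
    folder="my_run",
    name="example",
)
run_manager.start()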
1 change: 1 addition & 0 deletions adaptive_scheduler/utils.py
@@ -888,6 +888,7 @@ def smart_goal(
raise TypeError("Multiple learner types found.")
if isinstance(learners[0], adaptive.SequenceLearner):
return adaptive.SequenceLearner.done
warnings.warn("Goal is None which means the learners continue forever!")
return lambda _: False
else:
raise ValueError("goal must be `callable | int | float | None`")
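
The added warning makes the run-forever fallback explicit when `goal` is None and the learners are not `SequenceLearner`s. A sketch of triggering it, assuming the `smart_goal(goal, learners)` argument order:

import warnings

import adaptive

from adaptive_scheduler.utils import smart_goal


def identity(x):
    return x


learner = adaptive.Learner1D(identity, bounds=(0, 1))

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    goal = smart_goal(None, [learner])  # no goal given and not a SequenceLearner

assert goal(learner) is False  # the fallback goal never stops the run
assert any("continue forever" in str(w.message) for w in caught)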
48 changes: 36 additions & 12 deletions example.ipynb
@@ -84,36 +84,60 @@
"source": [
"import adaptive_scheduler\n",
"\n",
"\n",
"def goal(learner):\n",
" return learner.npoints > 200\n",
"\n",
"\n",
"run_manager = adaptive_scheduler.slurm_run(learners, fnames, partition=\"hb120rsv2-low\", goal=0.01)\n",
"run_manager.start()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Explicit use"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Or be explicit and use:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"name = \"example\"\n",
"scheduler = adaptive_scheduler.scheduler.SLURM(\n",
" num_threads=1,\n",
" cores_per_node=20,\n",
" cores_per_node=2,\n",
" nodes=1,\n",
" partition=\"hb120rsv2-low\",\n",
" executor_type=\"process-pool\",\n",
" executor_type=\"ipyparallel\",\n",
" run_script=f\"{name}-run.py\",\n",
" log_folder=\"logs\",\n",
")\n",
"run_manager = adaptive_scheduler.RunManager(\n",
" learners=learners,\n",
" fnames=fnames,\n",
" scheduler=scheduler,\n",
" goal=goal,\n",
" goal=0.01,\n",
" job_name=f\"{name}\",\n",
" max_fails_per_job=5,\n",
" max_simultaneous_jobs=50,\n",
" db_fname=f\"{name}-database.json\",\n",
" log_interval=30,\n",
" save_interval=30,\n",
" save_dataframe=True,\n",
")\n",
"\n",
"run_manager.start()"
" cleanup_first=False,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Queue"
]
},
{