[feature] Add option to specify number of nodes (#565)
* [feature] Add option to specify number of nodes

* extend test

* Add DocStrings
jan-janssen authored Feb 11, 2025
1 parent ca707f9 commit 0ffd312
Showing 5 changed files with 54 additions and 17 deletions.
14 changes: 10 additions & 4 deletions executorlib/interactive/flux.py
@@ -29,6 +29,8 @@ class FluxPythonSpawner(BaseSpawner):
cores (int, optional): The number of cores. Defaults to 1.
threads_per_core (int, optional): The number of threads per core. Defaults to 1.
gpus_per_core (int, optional): The number of GPUs per core. Defaults to 0.
num_nodes (int, optional): The number of compute nodes to use for executing the task. Defaults to None.
exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute nodes. Defaults to False.
openmpi_oversubscribe (bool, optional): Whether to oversubscribe. Defaults to False.
flux_executor (flux.job.FluxExecutor, optional): The FluxExecutor instance. Defaults to None.
flux_executor_pmi_mode (str, optional): The PMI option. Defaults to None.
@@ -42,6 +44,8 @@ def __init__(
cores: int = 1,
threads_per_core: int = 1,
gpus_per_core: int = 0,
num_nodes: Optional[int] = None,
exclusive: bool = False,
openmpi_oversubscribe: bool = False,
flux_executor: Optional[flux.job.FluxExecutor] = None,
flux_executor_pmi_mode: Optional[str] = None,
@@ -55,6 +59,8 @@ def __init__(
)
self._threads_per_core = threads_per_core
self._gpus_per_core = gpus_per_core
self._num_nodes = num_nodes
self._exclusive = exclusive
self._flux_executor = flux_executor
self._flux_executor_pmi_mode = flux_executor_pmi_mode
self._flux_executor_nesting = flux_executor_nesting
@@ -85,17 +91,17 @@ def bootup(
num_tasks=self._cores,
cores_per_task=self._threads_per_core,
gpus_per_task=self._gpus_per_core,
num_nodes=None,
exclusive=False,
num_nodes=self._num_nodes,
exclusive=self._exclusive,
)
else:
jobspec = flux.job.JobspecV1.from_nest_command(
command=command_lst,
num_slots=self._cores,
cores_per_slot=self._threads_per_core,
gpus_per_slot=self._gpus_per_core,
num_nodes=None,
exclusive=False,
num_nodes=self._num_nodes,
exclusive=self._exclusive,
)
jobspec.environment = dict(os.environ)
if self._flux_executor_pmi_mode is not None:
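
For orientation, a minimal usage sketch of the extended spawner follows. The arguments not visible in this diff (for example cwd) and the surrounding Flux setup are assumptions, not part of the commit:

```python
import flux.job  # requires the Flux Python bindings and a running broker

from executorlib.interactive.flux import FluxPythonSpawner

# Assumed usage: four MPI ranks spread over two exclusively reserved nodes.
# num_nodes and exclusive are forwarded unchanged to
# flux.job.JobspecV1.from_command() inside bootup(), as shown above.
with flux.job.FluxExecutor() as flux_exe:
    spawner = FluxPythonSpawner(
        cores=4,
        threads_per_core=1,
        gpus_per_core=0,
        num_nodes=2,      # new in this commit
        exclusive=True,   # new in this commit
        flux_executor=flux_exe,
    )
```
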
16 changes: 16 additions & 0 deletions executorlib/interactive/slurm.py
@@ -27,6 +27,8 @@ def __init__(
cores: int = 1,
threads_per_core: int = 1,
gpus_per_core: int = 0,
num_nodes: Optional[int] = None,
exclusive: bool = False,
openmpi_oversubscribe: bool = False,
slurm_cmd_args: Optional[list[str]] = None,
):
@@ -38,6 +40,8 @@ def __init__(
cores (int, optional): The number of cores to use. Defaults to 1.
threads_per_core (int, optional): The number of threads per core. Defaults to 1.
gpus_per_core (int, optional): The number of GPUs per core. Defaults to 0.
num_nodes (int, optional): The number of compute nodes to use for executing the task. Defaults to None.
exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute nodes. Defaults to False.
openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to [].
"""
@@ -49,6 +53,8 @@ def __init__(
)
self._gpus_per_core = gpus_per_core
self._slurm_cmd_args = slurm_cmd_args
self._num_nodes = num_nodes
self._exclusive = exclusive

def generate_command(self, command_lst: list[str]) -> list[str]:
"""
@@ -65,6 +71,8 @@ def generate_command(self, command_lst: list[str]) -> list[str]:
cwd=self._cwd,
threads_per_core=self._threads_per_core,
gpus_per_core=self._gpus_per_core,
num_nodes=self._num_nodes,
exclusive=self._exclusive,
openmpi_oversubscribe=self._openmpi_oversubscribe,
slurm_cmd_args=self._slurm_cmd_args,
)
@@ -78,6 +86,8 @@ def generate_slurm_command(
cwd: Optional[str],
threads_per_core: int = 1,
gpus_per_core: int = 0,
num_nodes: Optional[int] = None,
exclusive: bool = False,
openmpi_oversubscribe: bool = False,
slurm_cmd_args: Optional[list[str]] = None,
) -> list[str]:
@@ -89,6 +99,8 @@ def generate_slurm_command(
cwd (str): The current working directory.
threads_per_core (int, optional): The number of threads per core. Defaults to 1.
gpus_per_core (int, optional): The number of GPUs per core. Defaults to 0.
num_nodes (int, optional): The number of compute nodes to use for executing the task. Defaults to None.
exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute nodes. Defaults to False.
openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to [].
@@ -98,10 +110,14 @@ def generate_slurm_command(
command_prepend_lst = [SLURM_COMMAND, "-n", str(cores)]
if cwd is not None:
command_prepend_lst += ["-D", cwd]
if num_nodes is not None:
command_prepend_lst += ["-N", str(num_nodes)]
if threads_per_core > 1:
command_prepend_lst += ["--cpus-per-task=" + str(threads_per_core)]
if gpus_per_core > 0:
command_prepend_lst += ["--gpus-per-task=" + str(gpus_per_core)]
if exclusive:
command_prepend_lst += ["--exact"]
if openmpi_oversubscribe:
command_prepend_lst += ["--oversubscribe"]
if slurm_cmd_args is not None and len(slurm_cmd_args) > 0:
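
A quick sketch of what the updated helper builds with the new options; the argument values mirror the extended unit test in tests/test_pysqa_subprocess.py at the bottom of this commit:

```python
from executorlib.interactive.slurm import generate_slurm_command

# -N is emitted for num_nodes, --exact for exclusive=True; the remaining
# flags are unchanged behaviour.
command_lst = generate_slurm_command(
    cores=1,
    cwd="/tmp/test",
    threads_per_core=2,
    gpus_per_core=1,
    num_nodes=1,
    exclusive=True,
    openmpi_oversubscribe=True,
    slurm_cmd_args=["--help"],
)
print(" ".join(command_lst))
# srun -n 1 -D /tmp/test -N 1 --cpus-per-task=2 --gpus-per-task=1 --exact --oversubscribe --help
```
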
23 changes: 12 additions & 11 deletions executorlib/interfaces/flux.py
@@ -43,9 +43,10 @@ class FluxJobExecutor:
- threads_per_core (int): number of OpenMP threads to be used for each function call
- gpus_per_core (int): number of GPUs per worker - defaults to 0
- cwd (str/None): current working directory where the parallel python task is executed
- openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
SLURM only) - default False
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
- num_nodes (int, optional): The number of compute nodes to use for executing the task.
Defaults to None.
- exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing
compute nodes. Defaults to False.
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
@@ -147,10 +148,10 @@ def __new__(
- threads_per_core (int): number of OpenMP threads to be used for each function call
- gpus_per_core (int): number of GPUs per worker - defaults to 0
- cwd (str/None): current working directory where the parallel python task is executed
- openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
and SLURM only) - default False
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
only)
- num_nodes (int, optional): The number of compute nodes to use for executing the task.
Defaults to None.
- exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing
compute nodes. Defaults to False.
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
@@ -444,10 +445,10 @@ def create_flux_executor(
- threads_per_core (int): number of OpenMP threads to be used for each function call
- gpus_per_core (int): number of GPUs per worker - defaults to 0
- cwd (str/None): current working directory where the parallel python task is executed
- openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
and SLURM only) - default False
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
only)
- num_nodes (int, optional): The number of compute nodes to use for executing the task.
Defaults to None.
- exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing
compute nodes. Defaults to False.
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
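
The bullet entries above appear to be keys of the resource_dict argument. A hedged end-user sketch follows; the top-level import and the exact constructor signature are assumed from executorlib's general API, not shown in this diff:

```python
from executorlib import FluxJobExecutor  # assumed top-level export


def add(x, y):
    return x + y


# Assumed usage: each worker gets two cores on one exclusively reserved node.
with FluxJobExecutor(
    resource_dict={
        "cores": 2,
        "num_nodes": 1,     # new key added by this commit
        "exclusive": True,  # new key added by this commit
    },
) as exe:
    future = exe.submit(add, 1, 2)
    print(future.result())  # 3
```
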
12 changes: 12 additions & 0 deletions executorlib/interfaces/slurm.py
@@ -227,6 +227,10 @@ class SlurmJobExecutor:
- openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
SLURM only) - default False
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
- num_nodes (int, optional): The number of compute nodes to use for executing the task.
Defaults to None.
- exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing
compute nodes. Defaults to False.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
@@ -320,6 +324,10 @@ def __new__(
and SLURM only) - default False
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
only)
- num_nodes (int, optional): The number of compute nodes to use for executing the task.
Defaults to None.
- exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing
compute nodes. Defaults to False.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
@@ -409,6 +417,10 @@ def create_slurm_executor(
and SLURM only) - default False
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
only)
- num_nodes (int, optional): The number of compute nodes to use for executing the task.
Defaults to None.
- exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing
compute nodes. Defaults to False.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
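
Analogously for SLURM, a hedged sketch that passes the new keys per submission; whether resource_dict is accepted per submit() call in addition to the executor constructor is assumed from executorlib's general API rather than shown in this diff:

```python
from executorlib import SlurmJobExecutor  # assumed top-level export


def double(x):
    return x * 2


with SlurmJobExecutor() as exe:
    future = exe.submit(
        double,
        21,
        # Per generate_slurm_command() above, this resolves to
        # "srun -n 8 -N 2 --exact ...".
        resource_dict={"cores": 8, "num_nodes": 2, "exclusive": True},
    )
    print(future.result())  # 42
```
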
6 changes: 4 additions & 2 deletions tests/test_pysqa_subprocess.py
@@ -51,9 +51,11 @@ def test_generate_slurm_command(self):
cwd="/tmp/test",
threads_per_core=2,
gpus_per_core=1,
num_nodes=1,
exclusive=True,
openmpi_oversubscribe=True,
slurm_cmd_args=["--help"],
)
self.assertEqual(len(command_lst), 9)
reply_lst = ['srun', '-n', '1', '-D', '/tmp/test', '--cpus-per-task=2', '--gpus-per-task=1', '--oversubscribe', '--help']
self.assertEqual(len(command_lst), 12)
reply_lst = ['srun', '-n', '1', '-D', '/tmp/test', '-N', '1', '--cpus-per-task=2', '--gpus-per-task=1', '--exact', '--oversubscribe', '--help']
self.assertEqual(command_lst, reply_lst)
