Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Executor user interface #458

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions executorlib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

from executorlib._version import get_versions as _get_versions
from executorlib.interactive.executor import ExecutorWithDependencies, create_executor
from executorlib.standalone.inputcheck import (
check_executor as _check_executor,
)
from executorlib.standalone.inputcheck import (
check_nested_flux_executor as _check_nested_flux_executor,
)
from executorlib.standalone.inputcheck import (
check_plot_dependency_graph as _check_plot_dependency_graph,
)
Expand Down Expand Up @@ -47,6 +53,7 @@ class Executor:
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -95,6 +102,7 @@ def __init__(
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = True,
init_function: Optional[callable] = None,
Expand All @@ -115,6 +123,7 @@ def __new__(
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = True,
init_function: Optional[callable] = None,
Expand Down Expand Up @@ -162,6 +171,7 @@ def __new__(
of the individual function.
init_function (None): optional function to preset arguments for functions which are submitted later
disable_dependencies (boolean): Disable resolving future objects during the submission.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
debugging purposes and to get an overview of the specified dependencies.
Expand Down Expand Up @@ -196,6 +206,37 @@ def __new__(
refresh_rate=refresh_rate,
plot_dependency_graph=plot_dependency_graph,
)
elif "pysqa_" in backend and not plot_dependency_graph:
if cache_directory is None:
cache_directory = "executorlib_cache"
if max_workers != 1:
raise ValueError(
"The number of workers cannot be controlled with the pysqa based backend."
)
if max_cores != 1:
raise ValueError(
"The number of cores cannot be controlled with the pysqa based backend."
)
if hostname_localhost is not None:
raise ValueError(
"The option to connect to hosts based on their hostname is not available with the pysqa based backend."
)
if block_allocation:
raise ValueError(
"The option block_allocation is not available with the pysqa based backend."
)
if init_function is not None:
raise ValueError(
"The option to specify an init_function is not available with the pysqa based backend."
)
_check_executor(executor=flux_executor)
_check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
return FileExecutor(
cache_directory=cache_directory,
resource_dict=resource_dict,
pysqa_config_directory=pysqa_config_directory,
backend=backend.split("pysqa_")[-1],
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Suggestion to consolidate repetitive error checks

In the conditional block for the 'pysqa_' backend, several separate if statements each raise a ValueError for an unsupported option. Consider consolidating these checks into a single validation step to enhance code readability and maintainability.

Apply the following refactoring:

            elif "pysqa_" in backend and not plot_dependency_graph:
                if cache_directory is None:
                    cache_directory = "executorlib_cache"
-               if max_workers != 1:
-                   raise ValueError(
-                       "The number of workers cannot be controlled with the pysqa based backend."
-                   )
-               if max_cores != 1:
-                   raise ValueError(
-                       "The number of cores cannot be controlled with the pysqa based backend."
-                   )
-               if hostname_localhost is not None:
-                   raise ValueError(
-                       "The option to connect to hosts based on their hostname is not available with the pysqa based backend."
-                   )
-               if block_allocation:
-                   raise ValueError(
-                       "The option block_allocation is not available with the pysqa based backend."
-                   )
-               if init_function is not None:
-                   raise ValueError(
-                       "The option to specify an init_function is not available with the pysqa based backend."
-                   )
+               unsupported_options = []
+               if max_workers != 1:
+                   unsupported_options.append("max_workers")
+               if max_cores != 1:
+                   unsupported_options.append("max_cores")
+               if hostname_localhost is not None:
+                   unsupported_options.append("hostname_localhost")
+               if block_allocation:
+                   unsupported_options.append("block_allocation")
+               if init_function is not None:
+                   unsupported_options.append("init_function")
+               if unsupported_options:
+                   raise ValueError(
+                       f"The following options are not available with the pysqa-based backend: {', '.join(unsupported_options)}."
+                   )
                _check_executor(executor=flux_executor)
                _check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
                return FileExecutor(
                    cache_directory=cache_directory,
                    resource_dict=resource_dict,
                    pysqa_config_directory=pysqa_config_directory,
                    backend=backend.split("pysqa_")[-1],
                )

This refactoring gathers unsupported options into a list and raises a single ValueError, improving clarity and reducing repetitive code.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
elif "pysqa_" in backend and not plot_dependency_graph:
if cache_directory is None:
cache_directory = "executorlib_cache"
if max_workers != 1:
raise ValueError(
"The number of workers cannot be controlled with the pysqa based backend."
)
if max_cores != 1:
raise ValueError(
"The number of cores cannot be controlled with the pysqa based backend."
)
if hostname_localhost is not None:
raise ValueError(
"The option to connect to hosts based on their hostname is not available with the pysqa based backend."
)
if block_allocation:
raise ValueError(
"The option block_allocation is not available with the pysqa based backend."
)
if init_function is not None:
raise ValueError(
"The option to specify an init_function is not available with the pysqa based backend."
)
_check_executor(executor=flux_executor)
_check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
return FileExecutor(
cache_directory=cache_directory,
resource_dict=resource_dict,
pysqa_config_directory=pysqa_config_directory,
backend=backend.split("pysqa_")[-1],
)
elif "pysqa_" in backend and not plot_dependency_graph:
if cache_directory is None:
cache_directory = "executorlib_cache"
unsupported_options = []
if max_workers != 1:
unsupported_options.append("max_workers")
if max_cores != 1:
unsupported_options.append("max_cores")
if hostname_localhost is not None:
unsupported_options.append("hostname_localhost")
if block_allocation:
unsupported_options.append("block_allocation")
if init_function is not None:
unsupported_options.append("init_function")
if unsupported_options:
raise ValueError(
f"The following options are not available with the pysqa-based backend: {', '.join(unsupported_options)}."
)
_check_executor(executor=flux_executor)
_check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
return FileExecutor(
cache_directory=cache_directory,
resource_dict=resource_dict,
pysqa_config_directory=pysqa_config_directory,
backend=backend.split("pysqa_")[-1],
)

else:
_check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph)
_check_refresh_rate(refresh_rate=refresh_rate)
Expand Down
26 changes: 23 additions & 3 deletions executorlib/cache/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
execute_in_subprocess,
terminate_subprocess,
)
from executorlib.standalone.inputcheck import (
check_command_line_argument_lst,
check_gpus_per_worker,
check_oversubscribe,
check_threads_per_core,
)
from executorlib.standalone.thread import RaisingThread

try:
Expand All @@ -23,7 +29,7 @@ def __init__(
resource_dict: Optional[dict] = None,
execute_function: callable = execute_with_pysqa,
terminate_function: Optional[callable] = None,
config_directory: Optional[str] = None,
pysqa_config_directory: Optional[str] = None,
backend: Optional[str] = None,
):
"""
Expand All @@ -36,19 +42,33 @@ def __init__(
- cwd (str/None): current working directory where the parallel python task is executed
execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
terminate_function (callable, optional): The function to terminate the tasks.
config_directory (str, optional): path to the config directory.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
backend (str, optional): name of the backend used to spawn tasks.
"""
super().__init__()
default_resource_dict = {
"cores": 1,
"threads_per_core": 1,
"gpus_per_core": 0,
"cwd": None,
"openmpi_oversubscribe": False,
"slurm_cmd_args": [],
}
if resource_dict is None:
resource_dict = {}
resource_dict.update(
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
)
check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
check_command_line_argument_lst(
command_line_argument_lst=resource_dict["slurm_cmd_args"]
)
check_threads_per_core(threads_per_core=resource_dict["threads_per_core"])
check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
del resource_dict["threads_per_core"]
del resource_dict["gpus_per_core"]
del resource_dict["openmpi_oversubscribe"]
del resource_dict["slurm_cmd_args"]
if execute_function == execute_in_subprocess and terminate_function is None:
terminate_function = terminate_subprocess
cache_directory_path = os.path.abspath(cache_directory)
Expand All @@ -62,7 +82,7 @@ def __init__(
"cache_directory": cache_directory_path,
"resource_dict": resource_dict,
"terminate_function": terminate_function,
"config_directory": config_directory,
"pysqa_config_directory": pysqa_config_directory,
"backend": backend,
},
)
Expand Down
6 changes: 3 additions & 3 deletions executorlib/cache/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def execute_tasks_h5(
execute_function: callable,
resource_dict: dict,
terminate_function: Optional[callable] = None,
config_directory: Optional[str] = None,
pysqa_config_directory: Optional[str] = None,
backend: Optional[str] = None,
) -> None:
"""
Expand All @@ -66,7 +66,7 @@ def execute_tasks_h5(
- cwd (str/None): current working directory where the parallel python task is executed
execute_function (callable): The function to execute the tasks.
terminate_function (callable): The function to terminate the tasks.
config_directory (str, optional): path to the config directory.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
backend (str, optional): name of the backend used to spawn tasks.

Returns:
Expand Down Expand Up @@ -120,7 +120,7 @@ def execute_tasks_h5(
process_dict[k] for k in future_wait_key_lst
],
resource_dict=task_resource_dict,
config_directory=config_directory,
config_directory=pysqa_config_directory,
backend=backend,
)
file_name_dict[task_key] = os.path.join(
Expand Down
6 changes: 3 additions & 3 deletions tests/test_cache_executor_pysqa_flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

try:
import flux.job
from executorlib import FileExecutor
from executorlib import Executor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

FileExecutor still exists in other test files and needs to be updated

The import change in tests/test_cache_executor_pysqa_flux.py is correct, but there are other test files that still use FileExecutor:

  • tests/test_cache_executor_serial.py
  • tests/test_cache_executor_mpi.py

These files should also be updated to use the new Executor class for consistency with the module's evolution.

🔗 Analysis chain

LGTM: Import statement updated correctly.

The change from FileExecutor to Executor aligns with the module's evolution.

Let's verify this change is consistent across the codebase:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any remaining FileExecutor imports that might need updating
rg "from executorlib.*import.*FileExecutor"

Length of output: 107


Script:

#!/bin/bash
# Let's try a broader search to find any FileExecutor references
rg "FileExecutor"

# Also search for Executor class usage patterns
ast-grep --pattern 'Executor($$$)'

# And check all import statements related to executorlib
rg "from executorlib.*import"

Length of output: 20989


skip_flux_test = "FLUX_URI" not in os.environ
pmi = os.environ.get("PYMPIPOOL_PMIX", None)
Expand All @@ -30,9 +30,9 @@ def mpi_funct(i):
)
class TestCacheExecutorPysqa(unittest.TestCase):
def test_executor(self):
with FileExecutor(
with Executor(
resource_dict={"cores": 2},
backend="flux",
backend="pysqa_flux",
) as exe:
fs1 = exe.submit(mpi_funct, 1)
self.assertFalse(fs1.done())
Expand Down
27 changes: 24 additions & 3 deletions tests/test_cache_executor_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,14 @@ def test_executor_function(self):
"future_queue": q,
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"resource_dict": {"cores": 1, "cwd": None},
"resource_dict": {
"cores": 1,
"threads_per_core": 1,
"gpus_per_core": 0,
"cwd": None,
"openmpi_oversubscribe": False,
"slurm_cmd_args": [],
},
"terminate_function": terminate_subprocess,
},
)
Expand Down Expand Up @@ -115,7 +122,14 @@ def test_executor_function_dependence_kwargs(self):
"future_queue": q,
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"resource_dict": {"cores": 1, "cwd": None},
"resource_dict": {
"cores": 1,
"threads_per_core": 1,
"gpus_per_core": 0,
"cwd": None,
"openmpi_oversubscribe": False,
"slurm_cmd_args": [],
},
"terminate_function": terminate_subprocess,
},
)
Expand Down Expand Up @@ -156,7 +170,14 @@ def test_executor_function_dependence_args(self):
"future_queue": q,
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"resource_dict": {"cores": 1, "cwd": None},
"resource_dict": {
"cores": 1,
"threads_per_core": 1,
"gpus_per_core": 0,
"cwd": None,
"openmpi_oversubscribe": False,
"slurm_cmd_args": [],
},
"terminate_function": terminate_subprocess,
},
)
Expand Down
Loading