Executor user interface #458
@@ -7,6 +7,14 @@
    execute_in_subprocess,
    terminate_subprocess,
)
+from executorlib.standalone.inputcheck import (
+    check_command_line_argument_lst,
+    check_executor,
+    check_gpus_per_worker,
+    check_nested_flux_executor,
+    check_oversubscribe,
+    check_threads_per_core,
+)
from executorlib.standalone.thread import RaisingThread

try:
@@ -23,7 +31,7 @@ def __init__(
        resource_dict: Optional[dict] = None,
        execute_function: callable = execute_with_pysqa,
        terminate_function: Optional[callable] = None,
-       config_directory: Optional[str] = None,
+       pysqa_config_directory: Optional[str] = None,
        backend: Optional[str] = None,
    ):
        """
@@ -36,19 +44,20 @@ def __init__(
                - cwd (str/None): current working directory where the parallel python task is executed
            execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
            terminate_function (callable, optional): The function to terminate the tasks.
-           config_directory (str, optional): path to the config directory.
+           pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
            backend (str, optional): name of the backend used to spawn tasks.
        """
        super().__init__()
        default_resource_dict = {
            "cores": 1,
            "cwd": None,
        }
        if resource_dict is None:
            resource_dict = {}
        resource_dict.update(
            {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
        )
        check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
        check_command_line_argument_lst(
            command_line_argument_lst=resource_dict["slurm_cmd_args"]
        )
        check_threads_per_core(threads_per_core=resource_dict["threads_per_core"])
        check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
        del resource_dict["threads_per_core"]
        del resource_dict["gpus_per_core"]
        del resource_dict["openmpi_oversubscribe"]
        del resource_dict["slurm_cmd_args"]
        if execute_function == execute_in_subprocess and terminate_function is None:
            terminate_function = terminate_subprocess
        cache_directory_path = os.path.abspath(cache_directory)
@@ -62,8 +71,58 @@ def __init__(
                    "cache_directory": cache_directory_path,
                    "resource_dict": resource_dict,
                    "terminate_function": terminate_function,
-                   "config_directory": config_directory,
+                   "pysqa_config_directory": pysqa_config_directory,
                    "backend": backend,
                },
            )
        )

def create_file_executor(
    max_workers: int = 1,
    backend: str = "local",
    max_cores: int = 1,
    cache_directory: Optional[str] = None,
    resource_dict: Optional[dict] = None,
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    pysqa_config_directory: Optional[str] = None,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[callable] = None,
):
    if cache_directory is None:
        cache_directory = "executorlib_cache"
    if max_workers != 1:
        raise ValueError(
            "The number of workers cannot be controlled with the pysqa based backend."
        )
    if max_cores != 1:
        raise ValueError(
            "The number of cores cannot be controlled with the pysqa based backend."
        )
    if hostname_localhost is not None:
        raise ValueError(
            "The option to connect to hosts based on their hostname is not available with the pysqa based backend."
        )
    if block_allocation:
        raise ValueError(
            "The option block_allocation is not available with the pysqa based backend."
        )
    if init_function is not None:
        raise ValueError(
            "The option to specify an init_function is not available with the pysqa based backend."
        )
    if flux_executor_pmi_mode is not None:
        raise ValueError(
            "The option to specify the flux pmi mode is not available with the pysqa based backend."
        )
    check_executor(executor=flux_executor)
    check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
    return FileExecutor(
        cache_directory=cache_directory,
        resource_dict=resource_dict,
        pysqa_config_directory=pysqa_config_directory,
        backend=backend.split("pysqa_")[-1],
    )
Review comment on the backend argument above — Improve backend string handling. The current backend string splitting is fragile and could fail with malformed input.

Suggested change:
-        backend=backend.split("pysqa_")[-1],
+        backend=backend[6:] if backend.startswith("pysqa_") else backend,
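To make the "fragile" claim concrete, here is a small illustration with made-up backend strings (not taken from the PR). str.split removes every occurrence of the prefix and keeps only the last fragment, while the suggested startswith form (or str.removeprefix on Python 3.9+) only strips a genuine leading prefix:

```python
# Hypothetical backend strings, used only to compare the two stripping strategies.
for backend in ["pysqa_flux", "flux", "slurm_pysqa_test", "pysqa_"]:
    split_form = backend.split("pysqa_")[-1]
    prefix_form = backend[6:] if backend.startswith("pysqa_") else backend
    print(f"{backend!r}: split -> {split_form!r}, startswith -> {prefix_form!r}")
```

For "slurm_pysqa_test" the split form silently yields "test", while the prefix form leaves the string untouched, so a later backend check can reject it instead of running against the wrong queue name.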
Review comment on create_file_executor — Improve backend handling and error messages. The suggested rewrite below validates the backend prefix, adds a docstring, and collects the unsupported-option checks into a single table-driven loop:
def create_file_executor(
max_workers: int = 1,
backend: str = "local",
max_cores: int = 1,
cache_directory: Optional[str] = None,
resource_dict: Optional[dict] = None,
- flux_executor=None,
+ flux_executor: Optional[Any] = None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[callable] = None,
):
+ """
+ Create a FileExecutor instance with pysqa backend.
+
+ Args:
+ max_workers (int): Must be 1 for pysqa backend
+ backend (str): Must start with 'pysqa_'
+ max_cores (int): Must be 1 for pysqa backend
+ cache_directory (Optional[str]): Cache directory path
+ resource_dict (Optional[dict]): Resource configuration
+ flux_executor (Optional[Any]): Flux executor instance
+ flux_executor_pmi_mode (Optional[str]): Not supported with pysqa
+ flux_executor_nesting (bool): Whether to nest flux executors
+ pysqa_config_directory (Optional[str]): Pysqa config directory path
+ hostname_localhost (Optional[bool]): Not supported with pysqa
+ block_allocation (bool): Not supported with pysqa
+ init_function (Optional[callable]): Not supported with pysqa
+
+ Returns:
+ FileExecutor: Configured executor instance
+
+ Raises:
+ ValueError: If unsupported options are used with pysqa backend
+ """
+ if not backend.startswith("pysqa_"):
+ raise ValueError(
+ f"Invalid backend: {backend}. Must start with 'pysqa_'. "
+ "Available backends: pysqa_slurm, pysqa_lsf, etc."
+ )
if cache_directory is None:
cache_directory = "executorlib_cache"
+ unsupported_options = {
+ "max_workers": (max_workers != 1, "must be 1"),
+ "max_cores": (max_cores != 1, "must be 1"),
+ "hostname_localhost": (hostname_localhost is not None, "not supported"),
+ "block_allocation": (block_allocation, "not supported"),
+ "init_function": (init_function is not None, "not supported"),
+ "flux_executor_pmi_mode": (flux_executor_pmi_mode is not None, "not supported"),
+ }
+
+ for option, (is_invalid, message) in unsupported_options.items():
+ if is_invalid:
+ raise ValueError(
+ f"The option '{option}' {message} with the pysqa based backend."
+ )
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
return FileExecutor(
cache_directory=cache_directory,
resource_dict=resource_dict,
pysqa_config_directory=pysqa_config_directory,
- backend=backend.split("pysqa_")[-1],
+ backend=backend[6:] if backend.startswith("pysqa_") else backend,
    )

Don't forget to add the following import at the top of the file: from typing import Any
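For readers skimming the suggestion above, here is a minimal standalone sketch of the same table-driven validation pattern; the function and the two options shown are illustrative only and are not part of executorlib:

```python
def reject_unsupported_options(max_workers: int = 1, block_allocation: bool = False) -> None:
    # Map each option to (is_invalid, reason) and raise on the first offender,
    # mirroring the loop in the suggested create_file_executor body above.
    unsupported_options = {
        "max_workers": (max_workers != 1, "must be 1"),
        "block_allocation": (block_allocation, "is not supported"),
    }
    for option, (is_invalid, reason) in unsupported_options.items():
        if is_invalid:
            raise ValueError(
                f"The option '{option}' {reason} with the pysqa based backend."
            )


reject_unsupported_options(max_workers=1)  # passes silently
try:
    reject_unsupported_options(block_allocation=True)
except ValueError as err:
    print(err)  # The option 'block_allocation' is not supported with the pysqa based backend.
```

The advantage over a chain of if/raise statements is that adding a new restricted option only requires a new dictionary entry, and every error message keeps the same shape.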
@@ -32,7 +32,15 @@ def mpi_funct(i):
class TestCacheExecutorMPI(unittest.TestCase):
    def test_executor(self):
        with FileExecutor(
-           resource_dict={"cores": 2}, execute_function=execute_in_subprocess
+           resource_dict={
+               "cores": 2,
+               "threads_per_core": 1,
+               "gpus_per_core": 0,
+               "cwd": None,
+               "openmpi_oversubscribe": False,
+               "slurm_cmd_args": [],
+           },
Comment on lines +35 to +42:

Codebase verification — Additional test coverage needed for resource parameters. The search results confirm that while these parameters are used in various test files, there's insufficient testing of their actual behavior. Suggested test additions: see the sketch after this file's diff for one possibility.

Analysis chain — Verify test coverage for resource parameters. The test only verifies basic MPI functionality but doesn't validate the behavior of the new resource parameters. Consider adding test cases that exercise them. Let's check if there are other tests covering these scenarios.

Scripts executed for the analysis:

#!/bin/bash
# Search for tests that might cover these parameters
rg -l 'threads_per_core|openmpi_oversubscribe|slurm_cmd_args' tests/

Length of output: 211

#!/bin/bash
# Search for test files containing resource parameters with proper regex
rg -l "resource_dict.*threads_per_core|openmpi_oversubscribe|slurm_cmd_args" tests/
# Search for test files containing resource dictionary usage
rg "resource_dict\s*=\s*{" tests/
# Look for test files that might test MPI functionality
rg -l "mpi|MPI" tests/
# Check test file content for resource parameter testing
rg -A 5 "def test.*mpi" tests/

Length of output: 5467

#!/bin/bash
# Let's examine the content of test files that might contain resource parameter testing
rg -C 10 "threads_per_core|openmpi_oversubscribe|cwd.*=|slurm_cmd_args" tests/test_shared_communication.py tests/test_cache_executor_mpi.py tests/test_executor_backend_mpi.py
# Also check for any assertions related to these parameters
rg -B 5 "assert.*threads_per_core|assert.*openmpi_oversubscribe|assert.*cwd|assert.*slurm" tests/

Length of output: 6681
+           execute_function=execute_in_subprocess,
        ) as exe:
            fs1 = exe.submit(mpi_funct, 1)
            self.assertFalse(fs1.done())
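A hedged sketch of one such additional test (hypothetical, not part of the PR). The import path is assumed from the existing tests, and the KeyError expectation follows from the review comment at the end of this page about missing resource dictionary keys:

```python
import unittest

from executorlib import FileExecutor  # import path assumed from the existing tests


class TestResourceDictKeys(unittest.TestCase):
    def test_missing_resource_keys_raise(self):
        # Only "cores" is supplied; "threads_per_core", "gpus_per_core",
        # "openmpi_oversubscribe" and "slurm_cmd_args" are missing, so the new
        # check_* calls in FileExecutor.__init__ are expected to fail with KeyError.
        with self.assertRaises(KeyError):
            FileExecutor(resource_dict={"cores": 1})


if __name__ == "__main__":
    unittest.main()
```

If the defensive handling discussed at the bottom of this page is adopted instead, the same test can be flipped to assert that missing keys are filled with defaults rather than raising.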
@@ -5,7 +5,7 @@

try:
    import flux.job
-   from executorlib import FileExecutor
+   from executorlib import Executor
Review comment on the import change:

Codebase verification — FileExecutor still exists in other test files and needs to be updated. The import change in this file is fine, but the remaining test files that still import FileExecutor should also be updated to use the new Executor import.

Analysis chain — LGTM: Import statement updated correctly. The change from FileExecutor to Executor looks correct. Let's verify this change is consistent across the codebase.

Scripts executed for the analysis:

#!/bin/bash
# Description: Check for any remaining FileExecutor imports that might need updating
rg "from executorlib.*import.*FileExecutor"

Length of output: 107

#!/bin/bash
# Let's try a broader search to find any FileExecutor references
rg "FileExecutor"
# Also search for Executor class usage patterns
ast-grep --pattern 'Executor($$$)'
# And check all import statements related to executorlib
rg "from executorlib.*import"

Length of output: 20989
skip_flux_test = "FLUX_URI" not in os.environ
pmi = os.environ.get("PYMPIPOOL_PMIX", None)
@@ -30,15 +30,16 @@ def mpi_funct(i):
)
class TestCacheExecutorPysqa(unittest.TestCase):
    def test_executor(self):
-       with FileExecutor(
+       with Executor(
            resource_dict={"cores": 2},
-           backend="flux",
+           backend="pysqa_flux",
+           block_allocation=False,
        ) as exe:
            fs1 = exe.submit(mpi_funct, 1)
            self.assertFalse(fs1.done())
            self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
            self.assertTrue(fs1.done())

    def tearDown(self):
-       if os.path.exists("cache"):
-           shutil.rmtree("cache")
+       if os.path.exists("executorlib_cache"):
+           shutil.rmtree("executorlib_cache")
Review comment: Add error handling for missing resource dictionary keys.

The code assumes all keys exist in the resource dictionary and removes them after validation. This could lead to KeyError exceptions if any keys are missing. Consider adding defensive checks.
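As one possible shape for such defensive checks, here is a hedged sketch: the helper name is hypothetical, the default values are taken from the resource dictionary used in the updated MPI test above, and the check_* helpers are assumed to raise on unsupported values, as their use in the executor diff suggests.

```python
from typing import Optional

from executorlib.standalone.inputcheck import (
    check_command_line_argument_lst,
    check_gpus_per_worker,
    check_oversubscribe,
    check_threads_per_core,
)


def validate_resource_dict(resource_dict: Optional[dict] = None) -> dict:
    """Fill in defaults, validate, and strip the keys that are not forwarded further."""
    defaults = {
        "cores": 1,
        "threads_per_core": 1,
        "gpus_per_core": 0,
        "cwd": None,
        "openmpi_oversubscribe": False,
        "slurm_cmd_args": [],
    }
    resource_dict = {**defaults, **(resource_dict or {})}
    # With the defaults merged above, dict.pop cannot raise KeyError here,
    # so a partial resource_dict no longer breaks the constructor.
    check_oversubscribe(oversubscribe=resource_dict.pop("openmpi_oversubscribe"))
    check_command_line_argument_lst(
        command_line_argument_lst=resource_dict.pop("slurm_cmd_args")
    )
    check_threads_per_core(threads_per_core=resource_dict.pop("threads_per_core"))
    check_gpus_per_worker(gpus_per_worker=resource_dict.pop("gpus_per_core"))
    return resource_dict
```

Calling validate_resource_dict({"cores": 2}) would then return {"cores": 2, "cwd": None} instead of raising, which is the behaviour the updated MPI test currently has to work around by spelling out every key.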