Executor user interface #458
Conversation
Walkthrough

The pull request introduces modifications across several files in the `executorlib` package.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (3)
executorlib/cache/executor.py (1)
`45-45`: **Consider enhancing the `pysqa_config_directory` documentation.**

While the documentation is clear, it would be helpful to specify (a sketch follows the list):
- The expected structure or format of the config directory
- Any required files within this directory
- Example path or reference to pysqa documentation
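A sketch of what the expanded docstring entry could look like; the `queue.yaml` file name follows pysqa's usual configuration layout, and the example path is illustrative rather than taken from the repository:

```python
pysqa_config_directory (str, optional): Path to a directory containing the
    pysqa configuration, typically a queue.yaml file plus the submission
    script templates it references, e.g. "~/.pysqa". Only evaluated by the
    pysqa-based backends; see the pysqa documentation for the exact layout.
```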
tests/test_cache_executor_serial.py (1)
`77-84`: **LGTM! Consider extracting the default resource configuration.**

The resource dictionary structure is correct and includes all necessary parameters with appropriate default values.

To reduce code duplication, consider extracting this default configuration to a class-level constant or helper method:

```python
class TestCacheExecutorSerial(unittest.TestCase):
    DEFAULT_RESOURCE_DICT = {
        "cores": 1,
        "threads_per_core": 1,
        "gpus_per_core": 0,
        "cwd": None,
        "openmpi_oversubscribe": False,
        "slurm_cmd_args": [],
    }
```
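Each test could then take a per-test copy of the shared constant, which matters because the review notes above point out that validated keys are deleted from the dictionary the executor receives. A sketch with an illustrative body:

```python
def test_executor_function(self):
    # dict() takes a shallow copy, so in-place deletions by the executor
    # cannot corrupt the class-level default shared by all tests
    resource_dict = dict(self.DEFAULT_RESOURCE_DICT)
    ...
```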
executorlib/cache/shared.py (1)

`55-56`: **Add docstring for the `backend` parameter.**

The docstring for `pysqa_config_directory` is clear, but the `backend` parameter lacks documentation. Please add a description to clarify its purpose and possible values.

```diff
     pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
-    backend (str, optional): name of the backend used to spawn tasks.
+    backend (str, optional): name of the backend used to spawn tasks (e.g., "pysqa_flux", "serial").
```

Also applies to: 69-69
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (5)
- executorlib/__init__.py (6 hunks)
- executorlib/cache/executor.py (4 hunks)
- executorlib/cache/shared.py (3 hunks)
- tests/test_cache_executor_pysqa_flux.py (2 hunks)
- tests/test_cache_executor_serial.py (3 hunks)
🔇 Additional comments (15)
tests/test_cache_executor_pysqa_flux.py (1)
`33-35`: **Verify if the `pysqa_config_directory` parameter is needed.**

While the backend name has been correctly updated to "pysqa_flux", the AI summary mentions a new optional parameter `pysqa_config_directory`. Please verify whether this parameter should be provided here.

Let's check how this parameter is used in other test files:

```shell
#!/bin/bash
# Description: Check how pysqa_config_directory is used in other test files
rg -A 5 "pysqa_config_directory" "tests/"

# Description: Check other Executor instantiations with pysqa backends
ast-grep --pattern 'Executor(
  $$$
  backend="pysqa_$_"
  $$$
)'
```

executorlib/cache/executor.py (5)
`10-15`: **LGTM: Well-organized imports for input validation.**

The new imports are properly organized and specifically target the required validation functions.

`32-32`: **LGTM: Clean addition of pysqa configuration parameter.**

The new optional parameter is well-placed and properly typed.
`62-71`: **Verify that removing validated parameters doesn't break downstream usage.**

While the validation checks are good, removing these parameters from the resource dictionary might affect any code that expects them later in the execution pipeline.

```shell
#!/bin/bash
# Search for any code that might expect these parameters
rg -A 5 'resource_dict\[["'\''](?:threads_per_core|gpus_per_core|openmpi_oversubscribe|slurm_cmd_args)["'\'']\]' --type py
```
`85-85`: **Verify consistent parameter naming across the codebase.**

The parameter rename from `config_directory` to `pysqa_config_directory` needs to be consistently applied.

```shell
#!/bin/bash
# Search for any remaining uses of the old parameter name
# (-P enables PCRE2, which the negative lookahead requires)
rg -P 'config_directory(?!.*pysqa)' --type py

# Search for the new parameter name to ensure consistent usage
rg 'pysqa_config_directory' --type py
```
`51-55`: **Verify the default resource values align with system requirements.**

The new default values need verification:

- `threads_per_core: 1` - Confirm this is suitable for your target systems
- `gpus_per_core: 0` - Verify this default won't cause issues on GPU-enabled systems
- `openmpi_oversubscribe: False` - Ensure this aligns with your MPI configuration

✅ Verification successful

Let me try another search strategy to find relevant resource configurations.

Based on the comprehensive search results, I can now generate the final response:

**Default resource values are correctly aligned with system requirements.**

The default values are verified to be appropriate based on the codebase evidence:

- `threads_per_core: 1` is the correct default, as the codebase explicitly validates this value through `check_threads_per_core()`, which enforces single-threaded operation for MPI compatibility
- `gpus_per_core: 0` is appropriate, as GPU allocation is optional and handled specifically by SLURM/Flux when needed
- `openmpi_oversubscribe: False` is the safe default, with explicit support in both MPI and SLURM spawners
- `slurm_cmd_args: []` provides flexibility for SLURM-specific customization when needed

The values are consistently used across the test suite and are properly validated by the input checking system.
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

```shell
#!/bin/bash
# Search for any existing resource configurations that might conflict
rg -l 'threads_per_core|gpus_per_core|openmpi_oversubscribe' --type py
```

Length of output: 1495

Script:

```shell
#!/bin/bash
# Search for resource-related configurations with context
rg -A 5 -B 5 'threads_per_core|gpus_per_core|openmpi_oversubscribe|slurm_cmd_args' --type py

# Also check for any test files that might validate these defaults
fd test_.*\.py --exec grep -l 'threads_per_core\|gpus_per_core\|openmpi_oversubscribe\|slurm_cmd_args' {} \;
```

Length of output: 60217
tests/test_cache_executor_serial.py (2)
`125-132`: **Duplicate resource configuration detected.**

This resource dictionary is identical to the one in `test_executor_function`. Using the suggested `DEFAULT_RESOURCE_DICT` would eliminate this duplication.

`173-180`: **Duplicate resource configuration detected.**

This resource dictionary is identical to the previous instances. Using the suggested `DEFAULT_RESOURCE_DICT` would eliminate this duplication.

Let's verify that all test methods using `execute_tasks_h5` have been updated:

```shell
#!/bin/bash
# Search for all test methods using execute_tasks_h5 to ensure consistent resource_dict usage
rg -l 'execute_tasks_h5' tests/ | xargs rg 'def test_.*'
```

executorlib/cache/shared.py (1)
`123-124`: ⚠️ **Potential issue: Fix parameter name mismatch in the execute_function call.**

There's an inconsistency between the parameter names. The function signature uses `pysqa_config_directory`, but it's passed as `config_directory` to `execute_function`. This should be updated to maintain consistency.

```diff
     resource_dict=task_resource_dict,
-    config_directory=pysqa_config_directory,
+    pysqa_config_directory=pysqa_config_directory,
     backend=backend,
```

Let's verify that this parameter name is consistent across the codebase:
executorlib/__init__.py (6)
`5-7`: **Addition of `_check_executor` import is appropriate.**

The import of `_check_executor` from `executorlib.standalone.inputcheck` is necessary for the validation logic added later in the code.

`8-10`: **Addition of `_check_nested_flux_executor` import is appropriate.**

The import of `_check_nested_flux_executor` from `executorlib.standalone.inputcheck` is necessary for validating nested Flux executors in the code.

`56-56`: **Documentation of the `pysqa_config_directory` parameter.**

The addition of the `pysqa_config_directory` parameter in the docstring of the `Executor` class is clear and correctly describes its purpose.

`105-105`: **Addition of `pysqa_config_directory` to the `__init__` method.**

The parameter `pysqa_config_directory` is appropriately added to the `__init__` method signature, allowing users to specify the path to the pysqa configuration directory when initializing the `Executor`.

`126-126`: **Addition of `pysqa_config_directory` to the `__new__` method.**

Including `pysqa_config_directory` in the `__new__` method ensures that it is available during the creation of a new `Executor` instance. This is appropriate and consistent with its usage.

`174-174`: **Documentation of `pysqa_config_directory` in the `__new__` docstring.**

The docstring for the `__new__` method correctly includes the `pysqa_config_directory` parameter, providing clarity on its purpose and usage.
```diff
@@ -5,7 +5,7 @@
 try:
     import flux.job
-    from executorlib import FileExecutor
+    from executorlib import Executor
```
💡 Codebase verification

**FileExecutor still exists in other test files and needs to be updated.**

The import change in `tests/test_cache_executor_pysqa_flux.py` is correct, but there are other test files that still use `FileExecutor`:

- `tests/test_cache_executor_serial.py`
- `tests/test_cache_executor_mpi.py`

These files should also be updated to use the new `Executor` class for consistency with the module's evolution.
🔗 Analysis chain

**LGTM: Import statement updated correctly.**

The change from `FileExecutor` to `Executor` aligns with the module's evolution.

Let's verify this change is consistent across the codebase:
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

```shell
#!/bin/bash
# Description: Check for any remaining FileExecutor imports that might need updating
rg "from executorlib.*import.*FileExecutor"
```

Length of output: 107

Script:

```shell
#!/bin/bash
# Let's try a broader search to find any FileExecutor references
rg "FileExecutor"

# Also search for Executor class usage patterns
ast-grep --pattern 'Executor($$$)'

# And check all import statements related to executorlib
rg "from executorlib.*import"
```

Length of output: 20989
executorlib/__init__.py
Outdated
elif "pysqa_" in backend and not plot_dependency_graph: | ||
if cache_directory is None: | ||
cache_directory = "executorlib_cache" | ||
if max_workers != 1: | ||
raise ValueError( | ||
"The number of workers cannot be controlled with the pysqa based backend." | ||
) | ||
if max_cores != 1: | ||
raise ValueError( | ||
"The number of cores cannot be controlled with the pysqa based backend." | ||
) | ||
if hostname_localhost is not None: | ||
raise ValueError( | ||
"The option to connect to hosts based on their hostname is not available with the pysqa based backend." | ||
) | ||
if block_allocation: | ||
raise ValueError( | ||
"The option block_allocation is not available with the pysqa based backend." | ||
) | ||
if init_function is not None: | ||
raise ValueError( | ||
"The option to specify an init_function is not available with the pysqa based backend." | ||
) | ||
_check_executor(executor=flux_executor) | ||
_check_nested_flux_executor(nested_flux_executor=flux_executor_nesting) | ||
return FileExecutor( | ||
cache_directory=cache_directory, | ||
resource_dict=resource_dict, | ||
pysqa_config_directory=pysqa_config_directory, | ||
backend=backend.split("pysqa_")[-1], | ||
) |
🛠️ Refactor suggestion

**Consolidate repetitive error checks.**

In the conditional block for the `'pysqa_'` backend, multiple `if` statements individually raise `ValueError` exceptions for unsupported options. Consider consolidating these checks to enhance code readability and maintainability.
Apply the following refactoring:

```diff
 elif "pysqa_" in backend and not plot_dependency_graph:
     if cache_directory is None:
         cache_directory = "executorlib_cache"
-    if max_workers != 1:
-        raise ValueError(
-            "The number of workers cannot be controlled with the pysqa based backend."
-        )
-    if max_cores != 1:
-        raise ValueError(
-            "The number of cores cannot be controlled with the pysqa based backend."
-        )
-    if hostname_localhost is not None:
-        raise ValueError(
-            "The option to connect to hosts based on their hostname is not available with the pysqa based backend."
-        )
-    if block_allocation:
-        raise ValueError(
-            "The option block_allocation is not available with the pysqa based backend."
-        )
-    if init_function is not None:
-        raise ValueError(
-            "The option to specify an init_function is not available with the pysqa based backend."
-        )
+    unsupported_options = []
+    if max_workers != 1:
+        unsupported_options.append("max_workers")
+    if max_cores != 1:
+        unsupported_options.append("max_cores")
+    if hostname_localhost is not None:
+        unsupported_options.append("hostname_localhost")
+    if block_allocation:
+        unsupported_options.append("block_allocation")
+    if init_function is not None:
+        unsupported_options.append("init_function")
+    if unsupported_options:
+        raise ValueError(
+            f"The following options are not available with the pysqa-based backend: {', '.join(unsupported_options)}."
+        )
     _check_executor(executor=flux_executor)
     _check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
     return FileExecutor(
         cache_directory=cache_directory,
         resource_dict=resource_dict,
         pysqa_config_directory=pysqa_config_directory,
         backend=backend.split("pysqa_")[-1],
     )
```

This refactoring gathers unsupported options into a list and raises a single `ValueError`, improving clarity and reducing repetitive code.
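With the consolidated check, a call that sets several unsupported options fails once with a single aggregated message; a hypothetical invocation (the backend name is illustrative):

```python
# Both max_workers and block_allocation are unsupported here, so the
# consolidated check reports them together instead of one at a time:
Executor(backend="pysqa_slurm", max_workers=4, block_allocation=True)
# ValueError: The following options are not available with the pysqa-based
# backend: max_workers, block_allocation.
```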
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
executorlib/cache/executor.py (1)
Line range hint `36-46`: **Update resource_dict documentation.**

The docstring for `resource_dict` should be updated to include the new parameters:

- `threads_per_core`
- `gpus_per_core`
- `openmpi_oversubscribe`
- `slurm_cmd_args`

```diff
     resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                           - cores (int): number of MPI cores to be used for each function call
+                          - threads_per_core (int): number of threads per core
+                          - gpus_per_core (int): number of GPUs per core
+                          - openmpi_oversubscribe (bool): whether to allow OpenMPI oversubscription
+                          - slurm_cmd_args (list): additional SLURM command line arguments
                           - cwd (str/None): current working directory where the parallel python task is executed
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
- executorlib/cache/executor.py (4 hunks)
🔇 Additional comments (5)
executorlib/cache/executor.py (5)
`10-15`: **LGTM: Well-organized imports.**

The new validation function imports are properly organized and follow Python's import style guidelines.

`31-32`: **LGTM: Improved parameter naming.**

The rename from `config_directory` to `pysqa_config_directory` improves clarity by explicitly indicating its purpose.

`50-55`: **LGTM: Sensible default resource values.**

The new default values are conservative and safe:

- `threads_per_core: 1` - Standard single-threading
- `gpus_per_core: 0` - No GPU by default
- `openmpi_oversubscribe: False` - Safe default for MPI
- `slurm_cmd_args: []` - No additional args by default

`84-87`: **LGTM: Consistent parameter handling.**

The `pysqa_config_directory` is correctly passed through to the execution function.

`62-71`: 🛠️ **Refactor suggestion: Consider preserving validated values.**

The validation functions might normalize or modify the input values, but these potentially modified values are lost since we delete the keys immediately after validation.

Consider storing the validated values if they're needed later:

```diff
-    check_threads_per_core(threads_per_core=resource_dict["threads_per_core"])
-    del resource_dict["threads_per_core"]
+    validated_threads = check_threads_per_core(threads_per_core=resource_dict.pop("threads_per_core"))
+    # Store in class if needed: self._threads_per_core = validated_threads
```
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (2)
tests/test_cache_executor_mpi.py (1)
`35-42`: **Consider documenting the resource configuration parameters.**

While the expanded resource dictionary provides more control over execution parameters, it would be helpful to add a comment explaining the purpose and impact of each parameter, especially for parameters like `openmpi_oversubscribe` and `slurm_cmd_args`, which might not be immediately obvious to users.

```diff
     with FileExecutor(
         resource_dict={
+            # Number of MPI processes to spawn
             "cores": 2,
+            # Number of OpenMP threads per MPI process
             "threads_per_core": 1,
+            # Number of GPUs allocated per core (0 for CPU-only execution)
             "gpus_per_core": 0,
+            # Working directory for execution (None uses default)
             "cwd": None,
+            # Controls OpenMPI process placement behavior
             "openmpi_oversubscribe": False,
+            # Additional arguments passed to SLURM scheduler
             "slurm_cmd_args": [],
         },
```

executorlib/cache/executor.py (1)
Line range hint `37-48`: **Enhance docstring with resource dictionary key descriptions.**

The docstring should document all expected keys in the resource dictionary and their purposes.

Consider updating the docstring:

```diff
     resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                           - cores (int): number of MPI cores to be used for each function call
                           - cwd (str/None): current working directory where the parallel python task is executed
+                          - threads_per_core (int, optional): number of threads per core
+                          - gpus_per_core (int, optional): number of GPUs per core
+                          - openmpi_oversubscribe (bool, optional): whether to allow oversubscription
+                          - slurm_cmd_args (list, optional): additional SLURM command arguments
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (5)
- executorlib/__init__.py (5 hunks)
- executorlib/cache/executor.py (4 hunks)
- tests/test_cache_executor_mpi.py (1 hunks)
- tests/test_cache_executor_pysqa_flux.py (2 hunks)
- tests/test_cache_executor_serial.py (6 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- executorlib/__init__.py
- tests/test_cache_executor_pysqa_flux.py
- tests/test_cache_executor_serial.py
🔇 Additional comments (2)
tests/test_cache_executor_mpi.py (1)
`35-42`: **Validate resource dictionary structure.**

The resource dictionary structure should be verified against the executor's requirements. Let's ensure this configuration is consistent with the executor's implementation.

```shell
#!/bin/bash
# Search for resource dictionary validation in the executor implementation
ast-grep --pattern 'def validate_resource_dict($params) { $$$ }'

# Look for other resource dictionary usage patterns
# (brace escaped for the default regex engine; "py" is ripgrep's type name)
rg 'resource_dict.*=.*\{' --type py
```

executorlib/cache/executor.py (1)
`10-17`: **LGTM: Well-organized imports for input validation.**

The new imports are properly organized and specifically target the required validation functions.
```python
resource_dict={
    "cores": 2,
    "threads_per_core": 1,
    "gpus_per_core": 0,
    "cwd": None,
    "openmpi_oversubscribe": False,
    "slurm_cmd_args": [],
},
```
💡 Codebase verification

**Additional test coverage needed for resource parameters.**

The search results confirm that while these parameters are used in various test files, there's insufficient testing of their actual behavior:

- `threads_per_core` is only tested for invalid input in `test_executor_backend_mpi.py`
- `openmpi_oversubscribe` is only used with the default `False` value in test setups
- `cwd` is only set to `None` in tests, with no validation of custom working directories
- `slurm_cmd_args` appears in test configurations but has no tests verifying its effect

Suggested test additions (a sketch of one such case follows this list):

- Test MPI behavior with different `threads_per_core` values
- Verify process behavior when `openmpi_oversubscribe=True`
- Validate execution in custom working directories
- Test the impact of custom SLURM command arguments
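A minimal sketch of the working-directory case, assuming `FileExecutor` honors the documented `cores` and `cwd` resource keys; the test itself is illustrative and not part of the existing suite:

```python
import os
import unittest

from executorlib.cache.executor import FileExecutor


def get_cwd():
    # Runs inside the spawned task, so it reports the task's working directory
    return os.getcwd()


class TestCustomWorkingDirectory(unittest.TestCase):
    def test_cwd_is_respected(self):
        target = os.path.abspath("executorlib_test_workdir")
        os.makedirs(target, exist_ok=True)
        with FileExecutor(resource_dict={"cores": 1, "cwd": target}) as exe:
            fs = exe.submit(get_cwd)
            self.assertEqual(fs.result(), target)
```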
🔗 Analysis chain

**Verify test coverage for resource parameters.**

The test only verifies basic MPI functionality but doesn't validate the behavior of the new resource parameters. Consider adding test cases that verify:

- Impact of different `threads_per_core` values
- Behavior with `openmpi_oversubscribe=True`
- Custom working directory via `cwd`
- Custom SLURM arguments

Let's check if there are other tests covering these scenarios:
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

```shell
#!/bin/bash
# Search for tests that might cover these parameters
rg -l 'threads_per_core|openmpi_oversubscribe|slurm_cmd_args' tests/
```

Length of output: 211

Script:

```shell
#!/bin/bash
# Search for test files containing resource parameters with proper regex
rg -l "resource_dict.*threads_per_core|openmpi_oversubscribe|slurm_cmd_args" tests/

# Search for test files containing resource dictionary usage
rg "resource_dict\s*=\s*\{" tests/

# Look for test files that might test MPI functionality
rg -l "mpi|MPI" tests/

# Check test file content for resource parameter testing
rg -A 5 "def test.*mpi" tests/
```

Length of output: 5467

Script:

```shell
#!/bin/bash
# Let's examine the content of test files that might contain resource parameter testing
rg -C 10 "threads_per_core|openmpi_oversubscribe|cwd.*=|slurm_cmd_args" tests/test_shared_communication.py tests/test_cache_executor_mpi.py tests/test_executor_backend_mpi.py

# Also check for any assertions related to these parameters
rg -B 5 "assert.*threads_per_core|assert.*openmpi_oversubscribe|assert.*cwd|assert.*slurm" tests/
```

Length of output: 6681
executorlib/cache/executor.py
Outdated
```python
check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
check_command_line_argument_lst(
    command_line_argument_lst=resource_dict["slurm_cmd_args"]
)
check_threads_per_core(threads_per_core=resource_dict["threads_per_core"])
check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
del resource_dict["threads_per_core"]
del resource_dict["gpus_per_core"]
del resource_dict["openmpi_oversubscribe"]
del resource_dict["slurm_cmd_args"]
```
**Add error handling for missing resource dictionary keys.**
The code assumes all keys exist in the resource dictionary and removes them after validation. This could lead to KeyError exceptions if any keys are missing.
Consider adding defensive checks:
```diff
-        check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
-        check_command_line_argument_lst(
-            command_line_argument_lst=resource_dict["slurm_cmd_args"]
-        )
-        check_threads_per_core(threads_per_core=resource_dict["threads_per_core"])
-        check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
-        del resource_dict["threads_per_core"]
-        del resource_dict["gpus_per_core"]
-        del resource_dict["openmpi_oversubscribe"]
-        del resource_dict["slurm_cmd_args"]
+        if resource_dict is not None:
+            oversubscribe = resource_dict.get("openmpi_oversubscribe")
+            if oversubscribe is not None:
+                check_oversubscribe(oversubscribe=oversubscribe)
+                del resource_dict["openmpi_oversubscribe"]
+
+            cmd_args = resource_dict.get("slurm_cmd_args")
+            if cmd_args is not None:
+                check_command_line_argument_lst(command_line_argument_lst=cmd_args)
+                del resource_dict["slurm_cmd_args"]
+
+            threads = resource_dict.get("threads_per_core")
+            if threads is not None:
+                check_threads_per_core(threads_per_core=threads)
+                del resource_dict["threads_per_core"]
+
+            gpus = resource_dict.get("gpus_per_core")
+            if gpus is not None:
+                check_gpus_per_worker(gpus_per_worker=gpus)
+                del resource_dict["gpus_per_core"]
```
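An equivalent and slightly tighter variant uses `dict.pop`, which combines the lookup and the deletion in a single call; a sketch under the same assumptions about the check functions:

```python
oversubscribe = resource_dict.pop("openmpi_oversubscribe", None)
if oversubscribe is not None:
    check_oversubscribe(oversubscribe=oversubscribe)
```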
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (1)
executorlib/cache/executor.py (1)
Line range hint `38-49`: **Update docstring to reflect parameter changes.**

The docstring needs updating to:

- Add type hints in the parameter descriptions
- Update the resource_dict documentation with all possible keys
- Fix the indentation of the parameter descriptions

```diff
     """
     Initialize the FileExecutor.

     Args:
-        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
-        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
-                              - cores (int): number of MPI cores to be used for each function call
-                              - cwd (str/None): current working directory where the parallel python task is executed
-        execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
-        terminate_function (callable, optional): The function to terminate the tasks.
-        pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
-        backend (str, optional): name of the backend used to spawn tasks.
+        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
+        resource_dict (Optional[dict]): A dictionary of resources required by the task. With the following keys:
+                                        - cores (int): Number of MPI cores to be used for each function call
+                                        - cwd (Optional[str]): Current working directory where the parallel python task is executed
+                                        - openmpi_oversubscribe (Optional[bool]): Whether to allow oversubscription in OpenMPI
+                                        - slurm_cmd_args (Optional[List[str]]): Additional command line arguments for SLURM
+                                        - threads_per_core (Optional[int]): Number of threads per core
+                                        - gpus_per_core (Optional[int]): Number of GPUs per core
+        execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
+        terminate_function (Optional[callable]): The function to terminate the tasks.
+        pysqa_config_directory (Optional[str]): Path to the pysqa config directory (only for pysqa based backend).
+        backend (Optional[str]): Name of the backend used to spawn tasks.
     """
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (3)
- executorlib/__init__.py (5 hunks)
- executorlib/cache/executor.py (4 hunks)
- tests/test_cache_executor_serial.py (6 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- executorlib/__init__.py
- tests/test_cache_executor_serial.py
🧰 Additional context used

🪛 Ruff

executorlib/cache/executor.py

31-31: Do not use mutable data structures for argument defaults. Replace with `None`; initialize within the function. (B006)
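The fix Ruff recommends is the standard `None`-sentinel idiom; a minimal sketch of the pattern (an abbreviated stand-in, not the real `FileExecutor.__init__`):

```python
from typing import Optional


class FileExecutorSketch:
    """Illustrates the B006 fix; the real class takes more parameters."""

    def __init__(self, cache_directory: str = "cache",
                 resource_dict: Optional[dict] = None):
        if resource_dict is None:
            resource_dict = {}  # a fresh dict per call, never shared between instances
        self._cache_directory = cache_directory
        self._resource_dict = resource_dict
```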
executorlib/cache/executor.py
Outdated
```python
def create_file_executor(
    max_workers: int = 1,
    backend: str = "local",
    max_cores: int = 1,
    cache_directory: Optional[str] = None,
    resource_dict: Optional[dict] = None,
    flux_executor=None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    pysqa_config_directory: Optional[str] = None,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[callable] = None,
):
    if cache_directory is None:
        cache_directory = "executorlib_cache"
    if max_workers != 1:
        raise ValueError(
            "The number of workers cannot be controlled with the pysqa based backend."
        )
    if max_cores != 1:
        raise ValueError(
            "The number of cores cannot be controlled with the pysqa based backend."
        )
    if hostname_localhost is not None:
        raise ValueError(
            "The option to connect to hosts based on their hostname is not available with the pysqa based backend."
        )
    if block_allocation:
        raise ValueError(
            "The option block_allocation is not available with the pysqa based backend."
        )
    if init_function is not None:
        raise ValueError(
            "The option to specify an init_function is not available with the pysqa based backend."
        )
    if flux_executor_pmi_mode is not None:
        raise ValueError(
            "The option to specify the flux pmi mode is not available with the pysqa based backend."
        )
    check_executor(executor=flux_executor)
    check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
    return FileExecutor(
        cache_directory=cache_directory,
        resource_dict=resource_dict,
        pysqa_config_directory=pysqa_config_directory,
        backend=backend.split("pysqa_")[-1],
    )
```
**Improve backend handling and error messages.**

- The backend string splitting is fragile and could fail with malformed input
- Error messages could be more informative by suggesting valid alternatives
- The `flux_executor` parameter lacks type hints
```diff
 def create_file_executor(
     max_workers: int = 1,
     backend: str = "local",
     max_cores: int = 1,
     cache_directory: Optional[str] = None,
     resource_dict: Optional[dict] = None,
-    flux_executor=None,
+    flux_executor: Optional[Any] = None,
     flux_executor_pmi_mode: Optional[str] = None,
     flux_executor_nesting: bool = False,
     pysqa_config_directory: Optional[str] = None,
     hostname_localhost: Optional[bool] = None,
     block_allocation: bool = False,
     init_function: Optional[callable] = None,
 ):
+    """
+    Create a FileExecutor instance with pysqa backend.
+
+    Args:
+        max_workers (int): Must be 1 for pysqa backend
+        backend (str): Must start with 'pysqa_'
+        max_cores (int): Must be 1 for pysqa backend
+        cache_directory (Optional[str]): Cache directory path
+        resource_dict (Optional[dict]): Resource configuration
+        flux_executor (Optional[Any]): Flux executor instance
+        flux_executor_pmi_mode (Optional[str]): Not supported with pysqa
+        flux_executor_nesting (bool): Whether to nest flux executors
+        pysqa_config_directory (Optional[str]): Pysqa config directory path
+        hostname_localhost (Optional[bool]): Not supported with pysqa
+        block_allocation (bool): Not supported with pysqa
+        init_function (Optional[callable]): Not supported with pysqa
+
+    Returns:
+        FileExecutor: Configured executor instance
+
+    Raises:
+        ValueError: If unsupported options are used with pysqa backend
+    """
+    if not backend.startswith("pysqa_"):
+        raise ValueError(
+            f"Invalid backend: {backend}. Must start with 'pysqa_'. "
+            "Available backends: pysqa_slurm, pysqa_lsf, etc."
+        )
     if cache_directory is None:
         cache_directory = "executorlib_cache"
+    unsupported_options = {
+        "max_workers": (max_workers != 1, "must be 1"),
+        "max_cores": (max_cores != 1, "must be 1"),
+        "hostname_localhost": (hostname_localhost is not None, "not supported"),
+        "block_allocation": (block_allocation, "not supported"),
+        "init_function": (init_function is not None, "not supported"),
+        "flux_executor_pmi_mode": (flux_executor_pmi_mode is not None, "not supported"),
+    }
+
+    for option, (is_invalid, message) in unsupported_options.items():
+        if is_invalid:
+            raise ValueError(
+                f"The option '{option}' {message} with the pysqa based backend."
+            )
     check_executor(executor=flux_executor)
     check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
     return FileExecutor(
         cache_directory=cache_directory,
         resource_dict=resource_dict,
         pysqa_config_directory=pysqa_config_directory,
-        backend=backend.split("pysqa_")[-1],
+        backend=backend[6:] if backend.startswith("pysqa_") else backend,
     )
```
Don't forget to add the following import at the top of the file:
```python
from typing import Any
```
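For context, a hypothetical call under the refactored validation; the configuration path is illustrative, not taken from the repository:

```python
exe = create_file_executor(
    backend="pysqa_flux",
    pysqa_config_directory="~/.pysqa",  # illustrative path
)
```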
if "openmpi_oversubscribe" in resource_dict: | ||
check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"]) | ||
del resource_dict["openmpi_oversubscribe"] | ||
if "slurm_cmd_args" in resource_dict: | ||
check_command_line_argument_lst( | ||
command_line_argument_lst=resource_dict["slurm_cmd_args"] | ||
) | ||
del resource_dict["slurm_cmd_args"] | ||
if "threads_per_core" in resource_dict: | ||
check_threads_per_core(threads_per_core=resource_dict["threads_per_core"]) | ||
del resource_dict["threads_per_core"] | ||
if "gpus_per_core" in resource_dict: | ||
check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"]) | ||
del resource_dict["gpus_per_core"] |
🛠️ Refactor suggestion

**Refactor resource validation for better maintainability.**

The current validation logic has repetitive patterns. Consider using a dictionary that maps keys to their validation functions.
- if "openmpi_oversubscribe" in resource_dict:
- check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
- del resource_dict["openmpi_oversubscribe"]
- if "slurm_cmd_args" in resource_dict:
- check_command_line_argument_lst(
- command_line_argument_lst=resource_dict["slurm_cmd_args"]
- )
- del resource_dict["slurm_cmd_args"]
- if "threads_per_core" in resource_dict:
- check_threads_per_core(threads_per_core=resource_dict["threads_per_core"])
- del resource_dict["threads_per_core"]
- if "gpus_per_core" in resource_dict:
- check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
- del resource_dict["gpus_per_core"]
+ if resource_dict is None:
+ resource_dict = {}
+
+ validation_map = {
+ "openmpi_oversubscribe": (check_oversubscribe, "oversubscribe"),
+ "slurm_cmd_args": (check_command_line_argument_lst, "command_line_argument_lst"),
+ "threads_per_core": (check_threads_per_core, "threads_per_core"),
+ "gpus_per_core": (check_gpus_per_worker, "gpus_per_worker"),
+ }
+
+ for key, (validator, param_name) in validation_map.items():
+ if key in resource_dict:
+ validator(**{param_name: resource_dict[key]})
+ del resource_dict[key]
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (1)
executorlib/cache/executor.py (1)
`85-98`: **Add a docstring to document parameters and return type.**

The function lacks a docstring explaining its purpose, parameters, return type, and possible exceptions.

Add a comprehensive docstring:

```python
def create_file_executor(
    max_workers: int = 1,
    backend: str = "pysqa_flux",
    max_cores: int = 1,
    cache_directory: Optional[str] = None,
    resource_dict: Optional[dict] = None,
    flux_executor: Optional[Any] = None,
    flux_executor_pmi_mode: Optional[str] = None,
    flux_executor_nesting: bool = False,
    pysqa_config_directory: Optional[str] = None,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[callable] = None,
) -> FileExecutor:
    """
    Create a FileExecutor instance configured for pysqa-based backends.

    Args:
        max_workers (int): Number of workers (must be 1 for pysqa backend)
        backend (str): Backend name (must start with 'pysqa_')
        max_cores (int): Number of cores (must be 1 for pysqa backend)
        cache_directory (Optional[str]): Directory for cache files
        resource_dict (Optional[dict]): Resource configuration dictionary
        flux_executor (Optional[Any]): Flux executor instance
        flux_executor_pmi_mode (Optional[str]): PMI mode for flux executor
        flux_executor_nesting (bool): Enable flux executor nesting
        pysqa_config_directory (Optional[str]): Path to pysqa config directory
        hostname_localhost (Optional[bool]): Hostname-based connection option
        block_allocation (bool): Block allocation option
        init_function (Optional[callable]): Initialization function

    Returns:
        FileExecutor: Configured executor instance

    Raises:
        ValueError: If unsupported options are used with pysqa backend
    """
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
- executorlib/__init__.py (6 hunks)
- executorlib/cache/executor.py (4 hunks)
🧰 Additional context used
🪛 Ruff
executorlib/cache/executor.py

31-31: Do not use mutable data structures for argument defaults. Replace with `None`; initialize within the function. (B006)
🔇 Additional comments (4)
executorlib/__init__.py (4)
`42-42`: **LGTM: Well-documented parameter addition.**

The new `pysqa_config_directory` parameter is clearly documented with its purpose and scope.

`91-91`: **LGTM: Properly typed parameter addition.**

The `pysqa_config_directory` parameter is correctly typed as `Optional[str]`.

`112-112`: **LGTM: Consistent parameter implementation.**

The `pysqa_config_directory` parameter is consistently implemented in `__new__` with proper typing and documentation. Also applies to: 160-160

`179-195`: **LGTM: Clean pysqa backend implementation.**

The pysqa backend handling is implemented correctly with proper parameter forwarding to `create_file_executor`. Note that there's an existing suggestion to consolidate error checks in the pysqa backend section.
```python
cache_directory=cache_directory,
resource_dict=resource_dict,
pysqa_config_directory=pysqa_config_directory,
backend=backend.split("pysqa_")[-1],
```
**Improve backend string handling.**

The current backend string splitting is fragile and could fail with malformed input.

```diff
-    backend=backend.split("pysqa_")[-1],
+    backend=backend[6:] if backend.startswith("pysqa_") else backend,
```
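The fragility is easy to demonstrate: `str.split` silently accepts strings that carry the prefix in the wrong place, or not at all (illustrative values):

```python
>>> "pysqa_flux".split("pysqa_")[-1]
'flux'
>>> "slurm".split("pysqa_")[-1]          # missing prefix, but no error is raised
'slurm'
>>> "my_pysqa_flux".split("pysqa_")[-1]  # malformed name is silently accepted
'flux'
```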
```diff
@@ -20,10 +28,10 @@
 def __init__(
     self,
     cache_directory: str = "cache",
-    resource_dict: Optional[dict] = None,
+    resource_dict: dict = {},
```
**Fix mutable default argument.**

Using a mutable default argument (an empty dict) can lead to unexpected behavior, because the same default instance is shared across function calls.

```diff
-    resource_dict: dict = {},
+    resource_dict: Optional[dict] = None,
```
🧰 Tools

🪛 Ruff

31-31: Do not use mutable data structures for argument defaults. Replace with `None`; initialize within the function. (B006)
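The hazard Ruff flags here is the classic shared-default pitfall; a self-contained demonstration, unrelated to executorlib's actual classes:

```python
def add_task(task, queue={}):  # BUG: the default dict is created only once
    queue[task] = True
    return queue


print(add_task("a"))  # {'a': True}
print(add_task("b"))  # {'a': True, 'b': True} -- state leaked across calls
```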
```python
elif not disable_dependencies:
    if pysqa_config_directory is not None:
        raise ValueError(
            "The pysqa_config_directory is only required for the pysqa backend."
        )
```
🛠️ Refactor suggestion

**Consolidate duplicate validation logic.**

The validation for `pysqa_config_directory` is duplicated in both non-pysqa branches. Consider consolidating this check to improve maintainability.
```diff
 def __new__(
     cls,
     max_workers: int = 1,
     backend: str = "local",
     ...
 ):
+    # Validate pysqa_config_directory early
+    if pysqa_config_directory is not None and not backend.startswith("pysqa_"):
+        raise ValueError(
+            "The pysqa_config_directory is only required for the pysqa backend."
+        )
+
     if "pysqa_" in backend and not plot_dependency_graph:
         from executorlib.cache.executor import create_file_executor
         return create_file_executor(...)
     elif not disable_dependencies:
-        if pysqa_config_directory is not None:
-            raise ValueError(
-                "The pysqa_config_directory is only required for the pysqa backend."
-            )
         return ExecutorWithDependencies(...)
     else:
         _check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph)
         _check_refresh_rate(refresh_rate=refresh_rate)
-        if pysqa_config_directory is not None:
-            raise ValueError(
-                "The pysqa_config_directory is only required for the pysqa backend."
-            )
         return create_executor(...)
```
Also applies to: 219-222
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
executorlib/__init__.py (1)

Based on the gathered information, I can now provide the final response:

**Additional test coverage needed for pysqa backend configuration.**

The current implementation lacks sufficient test coverage for pysqa configuration handling:

- Only a basic functionality test exists in `test_cache_executor_pysqa_flux.py`
- No tests for `pysqa_config_directory` validation
- Missing error cases for invalid configuration paths
- No tests for the interaction between pysqa and other backend settings

Required test cases:

- Validation of None/empty/invalid config directory paths
- Error handling when using a pysqa config with non-pysqa backends
- Configuration directory path validation
- Integration with other backend settings

🔗 Analysis chain
Line range hint `179-222`: **Verify pysqa backend integration.**

Please ensure that the pysqa backend integration is properly tested, particularly:

- The handling of None/empty/invalid config directory paths
- The interaction between pysqa configuration and other backend settings

Let me analyze the output and generate additional verification scripts to check the pysqa integration more thoroughly.

Let me check the content of the pysqa test file to understand the test coverage.
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

```shell
#!/bin/bash
# Description: Verify pysqa integration testing coverage

# Test 1: Check for pysqa-related test files
echo "Looking for pysqa test files..."
fd -t f "test.*pysqa.*\.py$"

# Test 2: Check for pysqa configuration handling tests
echo "Looking for config directory handling tests..."
rg -A 5 "pysqa_config_directory.*None|pysqa_config_directory.*=.*['\"]"
```

Length of output: 15620

Script:

```shell
#!/bin/bash
# Check for pysqa-specific test files and test coverage
echo "Looking for pysqa test files..."
fd "test.*pysqa.*\.py$"

# Check for pysqa-specific test cases and assertions
echo "Looking for pysqa test assertions..."
rg -A 5 "def test.*pysqa|assert.*pysqa"

# Check for pysqa backend implementation
echo "Looking for pysqa backend implementation..."
rg -A 10 "pysqa_.*backend" tests/
```

Length of output: 396

Script:

```shell
#!/bin/bash
# Check content of pysqa test file
echo "Content of pysqa test file:"
cat tests/test_cache_executor_pysqa_flux.py

# Look for pysqa-related error handling
echo -e "\nError handling for pysqa:"
rg -A 3 "raise.*pysqa|ValueError.*pysqa"
```

Length of output: 1396
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
- executorlib/__init__.py (6 hunks)
🔇 Additional comments (3)
executorlib/__init__.py (3)
`42-42`: **LGTM: Clear and accurate parameter documentation.**

The documentation for `pysqa_config_directory` is well-written, clearly indicating its optionality and specific use case.

`91-91`: **LGTM: Consistent signature updates.**

The `pysqa_config_directory` parameter is properly added to both methods with correct type hints. Also applies to: 112-112

`179-195`: **Verify the backend string handling.**

The backend string manipulation could be made more robust. Consider validating the backend string format and handling edge cases.

```shell
#!/bin/bash
# Description: Check for potential edge cases in backend string handling
# Test: Search for all backend string usages to ensure consistent handling
rg -A 2 'backend.*=|backend.*split'
```
Summary by CodeRabbit

Release Notes

New Features

- Enhanced the `Executor` and `FileExecutor` classes with improved configuration options for the pysqa backend.

Bug Fixes

- Renamed the configuration directory parameter to `pysqa_config_directory` for clarity in configuration handling.

Tests

- Updated tests to cover the new configuration options and expanded resource dictionary defaults.

Documentation

- Updated docstrings to document the new parameters.