Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Executor user interface #458

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions executorlib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,6 @@
__all__ = []


try:
from executorlib.cache.executor import FileExecutor

__all__ += [FileExecutor]
except ImportError:
pass


class Executor:
"""
The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or
Expand All @@ -47,6 +39,7 @@ class Executor:
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                        context of an HPC cluster this is essential to be able to communicate with an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -95,6 +88,7 @@ def __init__(
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = True,
init_function: Optional[callable] = None,
Expand All @@ -115,6 +109,7 @@ def __new__(
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = True,
init_function: Optional[callable] = None,
Expand Down Expand Up @@ -162,6 +157,7 @@ def __new__(
of the individual function.
init_function (None): optional function to preset arguments for functions which are submitted later
disable_dependencies (boolean): Disable resolving future objects during the submission.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
debugging purposes and to get an overview of the specified dependencies.
Expand All @@ -180,7 +176,24 @@ def __new__(
resource_dict.update(
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
)
if not disable_dependencies:
if "pysqa_" in backend and not plot_dependency_graph:
from executorlib.cache.executor import create_file_executor

return create_file_executor(
max_workers=max_workers,
backend=backend,
max_cores=max_cores,
cache_directory=cache_directory,
resource_dict=resource_dict,
flux_executor=None,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
pysqa_config_directory=pysqa_config_directory,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
)
elif not disable_dependencies:
return ExecutorWithDependencies(
max_workers=max_workers,
backend=backend,
Expand Down
89 changes: 76 additions & 13 deletions executorlib/cache/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@
execute_in_subprocess,
terminate_subprocess,
)
from executorlib.standalone.inputcheck import (
check_command_line_argument_lst,
check_executor,
check_gpus_per_worker,
check_nested_flux_executor,
check_oversubscribe,
check_threads_per_core,
)
from executorlib.standalone.thread import RaisingThread

try:
Expand All @@ -20,10 +28,10 @@ class FileExecutor(ExecutorBase):
def __init__(
self,
cache_directory: str = "cache",
resource_dict: Optional[dict] = None,
resource_dict: dict = {},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix mutable default argument.

Using a mutable default argument (empty dict) can lead to unexpected behavior when the same default instance is shared across function calls.

-        resource_dict: dict = {},
+        resource_dict: Optional[dict] = None
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
resource_dict: dict = {},
resource_dict: Optional[dict] = None,
🧰 Tools
🪛 Ruff

31-31: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)

execute_function: callable = execute_with_pysqa,
terminate_function: Optional[callable] = None,
config_directory: Optional[str] = None,
pysqa_config_directory: Optional[str] = None,
backend: Optional[str] = None,
):
"""
Expand All @@ -36,19 +44,24 @@ def __init__(
- cwd (str/None): current working directory where the parallel python task is executed
execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
terminate_function (callable, optional): The function to terminate the tasks.
config_directory (str, optional): path to the config directory.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
backend (str, optional): name of the backend used to spawn tasks.
"""
super().__init__()
default_resource_dict = {
"cores": 1,
"cwd": None,
}
if resource_dict is None:
resource_dict = {}
resource_dict.update(
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
)
if "openmpi_oversubscribe" in resource_dict:
check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
del resource_dict["openmpi_oversubscribe"]
if "slurm_cmd_args" in resource_dict:
check_command_line_argument_lst(
command_line_argument_lst=resource_dict["slurm_cmd_args"]
)
del resource_dict["slurm_cmd_args"]
if "threads_per_core" in resource_dict:
check_threads_per_core(threads_per_core=resource_dict["threads_per_core"])
del resource_dict["threads_per_core"]
if "gpus_per_core" in resource_dict:
check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
del resource_dict["gpus_per_core"]
Comment on lines +51 to +64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Refactor resource validation for better maintainability.

The current validation logic has repetitive patterns. Consider using a dictionary to map keys to their validation functions.

-        if "openmpi_oversubscribe" in resource_dict:
-            check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
-            del resource_dict["openmpi_oversubscribe"]
-        if "slurm_cmd_args" in resource_dict:
-            check_command_line_argument_lst(
-                command_line_argument_lst=resource_dict["slurm_cmd_args"]
-            )
-            del resource_dict["slurm_cmd_args"]
-        if "threads_per_core" in resource_dict:
-            check_threads_per_core(threads_per_core=resource_dict["threads_per_core"])
-            del resource_dict["threads_per_core"]
-        if "gpus_per_core" in resource_dict:
-            check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
-            del resource_dict["gpus_per_core"]
+        if resource_dict is None:
+            resource_dict = {}
+        
+        validation_map = {
+            "openmpi_oversubscribe": (check_oversubscribe, "oversubscribe"),
+            "slurm_cmd_args": (check_command_line_argument_lst, "command_line_argument_lst"),
+            "threads_per_core": (check_threads_per_core, "threads_per_core"),
+            "gpus_per_core": (check_gpus_per_worker, "gpus_per_worker"),
+        }
+        
+        for key, (validator, param_name) in validation_map.items():
+            if key in resource_dict:
+                validator(**{param_name: resource_dict[key]})
+                del resource_dict[key]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if "openmpi_oversubscribe" in resource_dict:
check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
del resource_dict["openmpi_oversubscribe"]
if "slurm_cmd_args" in resource_dict:
check_command_line_argument_lst(
command_line_argument_lst=resource_dict["slurm_cmd_args"]
)
del resource_dict["slurm_cmd_args"]
if "threads_per_core" in resource_dict:
check_threads_per_core(threads_per_core=resource_dict["threads_per_core"])
del resource_dict["threads_per_core"]
if "gpus_per_core" in resource_dict:
check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
del resource_dict["gpus_per_core"]
if resource_dict is None:
resource_dict = {}
validation_map = {
"openmpi_oversubscribe": (check_oversubscribe, "oversubscribe"),
"slurm_cmd_args": (check_command_line_argument_lst, "command_line_argument_lst"),
"threads_per_core": (check_threads_per_core, "threads_per_core"),
"gpus_per_core": (check_gpus_per_worker, "gpus_per_worker"),
}
for key, (validator, param_name) in validation_map.items():
if key in resource_dict:
validator(**{param_name: resource_dict[key]})
del resource_dict[key]

if execute_function == execute_in_subprocess and terminate_function is None:
terminate_function = terminate_subprocess
cache_directory_path = os.path.abspath(cache_directory)
Expand All @@ -62,8 +75,58 @@ def __init__(
"cache_directory": cache_directory_path,
"resource_dict": resource_dict,
"terminate_function": terminate_function,
"config_directory": config_directory,
"pysqa_config_directory": pysqa_config_directory,
"backend": backend,
},
)
)


def create_file_executor(
max_workers: int = 1,
backend: str = "local",
max_cores: int = 1,
cache_directory: Optional[str] = None,
resource_dict: Optional[dict] = None,
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[callable] = None,
):
if cache_directory is None:
cache_directory = "executorlib_cache"
if max_workers != 1:
raise ValueError(
"The number of workers cannot be controlled with the pysqa based backend."
)
if max_cores != 1:
raise ValueError(
"The number of cores cannot be controlled with the pysqa based backend."
)
if hostname_localhost is not None:
raise ValueError(
"The option to connect to hosts based on their hostname is not available with the pysqa based backend."
)
if block_allocation:
raise ValueError(
"The option block_allocation is not available with the pysqa based backend."
)
if init_function is not None:
raise ValueError(
"The option to specify an init_function is not available with the pysqa based backend."
)
if flux_executor_pmi_mode is not None:
raise ValueError(
"The option to specify the flux pmi mode is not available with the pysqa based backend."
)
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
return FileExecutor(
cache_directory=cache_directory,
resource_dict=resource_dict,
pysqa_config_directory=pysqa_config_directory,
backend=backend.split("pysqa_")[-1],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve backend string handling.

The current backend string splitting is fragile and could fail with malformed input.

-        backend=backend.split("pysqa_")[-1],
+        backend=backend[6:] if backend.startswith("pysqa_") else backend,
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
backend=backend.split("pysqa_")[-1],
backend=backend[6:] if backend.startswith("pysqa_") else backend,

)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve backend handling and error messages.

  1. The backend string splitting is fragile and could fail with malformed input
  2. Error messages could be more informative by suggesting valid alternatives
  3. The flux_executor parameter lacks type hints
 def create_file_executor(
     max_workers: int = 1,
     backend: str = "local",
     max_cores: int = 1,
     cache_directory: Optional[str] = None,
     resource_dict: Optional[dict] = None,
-    flux_executor=None,
+    flux_executor: Optional[Any] = None,
     flux_executor_pmi_mode: Optional[str] = None,
     flux_executor_nesting: bool = False,
     pysqa_config_directory: Optional[str] = None,
     hostname_localhost: Optional[bool] = None,
     block_allocation: bool = False,
     init_function: Optional[callable] = None,
 ):
+    """
+    Create a FileExecutor instance with pysqa backend.
+    
+    Args:
+        max_workers (int): Must be 1 for pysqa backend
+        backend (str): Must start with 'pysqa_'
+        max_cores (int): Must be 1 for pysqa backend
+        cache_directory (Optional[str]): Cache directory path
+        resource_dict (Optional[dict]): Resource configuration
+        flux_executor (Optional[Any]): Flux executor instance
+        flux_executor_pmi_mode (Optional[str]): Not supported with pysqa
+        flux_executor_nesting (bool): Whether to nest flux executors
+        pysqa_config_directory (Optional[str]): Pysqa config directory path
+        hostname_localhost (Optional[bool]): Not supported with pysqa
+        block_allocation (bool): Not supported with pysqa
+        init_function (Optional[callable]): Not supported with pysqa
+    
+    Returns:
+        FileExecutor: Configured executor instance
+    
+    Raises:
+        ValueError: If unsupported options are used with pysqa backend
+    """
+    if not backend.startswith("pysqa_"):
+        raise ValueError(
+            f"Invalid backend: {backend}. Must start with 'pysqa_'. "
+            "Available backends: pysqa_slurm, pysqa_lsf, etc."
+        )

     if cache_directory is None:
         cache_directory = "executorlib_cache"
     
+    unsupported_options = {
+        "max_workers": (max_workers != 1, "must be 1"),
+        "max_cores": (max_cores != 1, "must be 1"),
+        "hostname_localhost": (hostname_localhost is not None, "not supported"),
+        "block_allocation": (block_allocation, "not supported"),
+        "init_function": (init_function is not None, "not supported"),
+        "flux_executor_pmi_mode": (flux_executor_pmi_mode is not None, "not supported"),
+    }
+    
+    for option, (is_invalid, message) in unsupported_options.items():
+        if is_invalid:
+            raise ValueError(
+                f"The option '{option}' {message} with the pysqa based backend."
+            )

     check_executor(executor=flux_executor)
     check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
     
     return FileExecutor(
         cache_directory=cache_directory,
         resource_dict=resource_dict,
         pysqa_config_directory=pysqa_config_directory,
-        backend=backend.split("pysqa_")[-1],
+        backend=backend[6:] if backend.startswith("pysqa_") else backend,
     )

Don't forget to add the following import at the top of the file:

from typing import Any

6 changes: 3 additions & 3 deletions executorlib/cache/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def execute_tasks_h5(
execute_function: callable,
resource_dict: dict,
terminate_function: Optional[callable] = None,
config_directory: Optional[str] = None,
pysqa_config_directory: Optional[str] = None,
backend: Optional[str] = None,
) -> None:
"""
Expand All @@ -66,7 +66,7 @@ def execute_tasks_h5(
- cwd (str/None): current working directory where the parallel python task is executed
execute_function (callable): The function to execute the tasks.
terminate_function (callable): The function to terminate the tasks.
config_directory (str, optional): path to the config directory.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
backend (str, optional): name of the backend used to spawn tasks.

Returns:
Expand Down Expand Up @@ -120,7 +120,7 @@ def execute_tasks_h5(
process_dict[k] for k in future_wait_key_lst
],
resource_dict=task_resource_dict,
config_directory=config_directory,
config_directory=pysqa_config_directory,
backend=backend,
)
file_name_dict[task_key] = os.path.join(
Expand Down
10 changes: 9 additions & 1 deletion tests/test_cache_executor_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,15 @@ def mpi_funct(i):
class TestCacheExecutorMPI(unittest.TestCase):
def test_executor(self):
with FileExecutor(
resource_dict={"cores": 2}, execute_function=execute_in_subprocess
resource_dict={
"cores": 2,
"threads_per_core": 1,
"gpus_per_core": 0,
"cwd": None,
"openmpi_oversubscribe": False,
"slurm_cmd_args": [],
},
Comment on lines +35 to +42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Additional test coverage needed for resource parameters

The search results confirm that while these parameters are used in various test files, there's insufficient testing of their actual behavior:

  1. threads_per_core is only tested for invalid input in test_executor_backend_mpi.py
  2. openmpi_oversubscribe is only used with default False value in test setups
  3. cwd is only set to None in tests, no validation of custom working directories
  4. slurm_cmd_args appears in test configurations but has no tests verifying its effect

Suggested test additions:

  • Test MPI behavior with different threads_per_core values
  • Verify process behavior when openmpi_oversubscribe=True
  • Validate execution in custom working directories
  • Test the impact of custom SLURM command arguments
🔗 Analysis chain

Verify test coverage for resource parameters.

The test only verifies basic MPI functionality but doesn't validate the behavior of the new resource parameters. Consider adding test cases that verify:

  1. Impact of different threads_per_core values
  2. Behavior with openmpi_oversubscribe=True
  3. Custom working directory via cwd
  4. Custom SLURM arguments

Let's check if there are other tests covering these scenarios:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for tests that might cover these parameters
rg -l 'threads_per_core|openmpi_oversubscribe|slurm_cmd_args' tests/

Length of output: 211


Script:

#!/bin/bash
# Search for test files containing resource parameters with proper regex
rg -l "resource_dict.*threads_per_core|openmpi_oversubscribe|slurm_cmd_args" tests/

# Search for test files containing resource dictionary usage
rg "resource_dict\s*=\s*{" tests/

# Look for test files that might test MPI functionality
rg -l "mpi|MPI" tests/

# Check test file content for resource parameter testing
rg -A 5 "def test.*mpi" tests/

Length of output: 5467


Script:

#!/bin/bash
# Let's examine the content of test files that might contain resource parameter testing
rg -C 10 "threads_per_core|openmpi_oversubscribe|cwd.*=|slurm_cmd_args" tests/test_shared_communication.py tests/test_cache_executor_mpi.py tests/test_executor_backend_mpi.py

# Also check for any assertions related to these parameters
rg -B 5 "assert.*threads_per_core|assert.*openmpi_oversubscribe|assert.*cwd|assert.*slurm" tests/

Length of output: 6681

execute_function=execute_in_subprocess,
) as exe:
fs1 = exe.submit(mpi_funct, 1)
self.assertFalse(fs1.done())
Expand Down
11 changes: 6 additions & 5 deletions tests/test_cache_executor_pysqa_flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

try:
import flux.job
from executorlib import FileExecutor
from executorlib import Executor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

FileExecutor still exists in other test files and needs to be updated

The import change in tests/test_cache_executor_pysqa_flux.py is correct, but there are other test files that still use FileExecutor:

  • tests/test_cache_executor_serial.py
  • tests/test_cache_executor_mpi.py

These files should also be updated to use the new Executor class for consistency with the module's evolution.

🔗 Analysis chain

LGTM: Import statement updated correctly.

The change from FileExecutor to Executor aligns with the module's evolution.

Let's verify this change is consistent across the codebase:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any remaining FileExecutor imports that might need updating
rg "from executorlib.*import.*FileExecutor"

Length of output: 107


Script:

#!/bin/bash
# Let's try a broader search to find any FileExecutor references
rg "FileExecutor"

# Also search for Executor class usage patterns
ast-grep --pattern 'Executor($$$)'

# And check all import statements related to executorlib
rg "from executorlib.*import"

Length of output: 20989


skip_flux_test = "FLUX_URI" not in os.environ
pmi = os.environ.get("PYMPIPOOL_PMIX", None)
Expand All @@ -30,15 +30,16 @@ def mpi_funct(i):
)
class TestCacheExecutorPysqa(unittest.TestCase):
def test_executor(self):
with FileExecutor(
with Executor(
resource_dict={"cores": 2},
backend="flux",
backend="pysqa_flux",
block_allocation=False,
) as exe:
fs1 = exe.submit(mpi_funct, 1)
self.assertFalse(fs1.done())
self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
self.assertTrue(fs1.done())

def tearDown(self):
if os.path.exists("cache"):
shutil.rmtree("cache")
if os.path.exists("executorlib_cache"):
shutil.rmtree("executorlib_cache")
39 changes: 32 additions & 7 deletions tests/test_cache_executor_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from executorlib.standalone.thread import RaisingThread

try:
from executorlib import FileExecutor
from executorlib.cache.executor import FileExecutor
from executorlib.cache.shared import execute_tasks_h5

skip_h5py_test = False
Expand All @@ -32,14 +32,26 @@ def list_files_in_working_directory():
)
class TestCacheExecutorSerial(unittest.TestCase):
def test_executor_mixed(self):
with FileExecutor(execute_function=execute_in_subprocess) as exe:
with FileExecutor(
execute_function=execute_in_subprocess,
resource_dict={
"cores": 1,
"cwd": None,
},
) as exe:
fs1 = exe.submit(my_funct, 1, b=2)
self.assertFalse(fs1.done())
self.assertEqual(fs1.result(), 3)
self.assertTrue(fs1.done())

def test_executor_dependence_mixed(self):
with FileExecutor(execute_function=execute_in_subprocess) as exe:
with FileExecutor(
execute_function=execute_in_subprocess,
resource_dict={
"cores": 1,
"cwd": None,
},
) as exe:
fs1 = exe.submit(my_funct, 1, b=2)
fs2 = exe.submit(my_funct, 1, b=fs1)
self.assertFalse(fs2.done())
Expand All @@ -49,7 +61,11 @@ def test_executor_dependence_mixed(self):
def test_executor_working_directory(self):
cwd = os.path.join(os.path.dirname(__file__), "executables")
with FileExecutor(
resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess
resource_dict={
"cores": 1,
"cwd": cwd,
},
execute_function=execute_in_subprocess,
) as exe:
fs1 = exe.submit(list_files_in_working_directory)
self.assertEqual(fs1.result(), os.listdir(cwd))
Expand All @@ -74,7 +90,10 @@ def test_executor_function(self):
"future_queue": q,
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"resource_dict": {"cores": 1, "cwd": None},
"resource_dict": {
"cores": 1,
"cwd": None,
},
"terminate_function": terminate_subprocess,
},
)
Expand Down Expand Up @@ -115,7 +134,10 @@ def test_executor_function_dependence_kwargs(self):
"future_queue": q,
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"resource_dict": {"cores": 1, "cwd": None},
"resource_dict": {
"cores": 1,
"cwd": None,
},
"terminate_function": terminate_subprocess,
},
)
Expand Down Expand Up @@ -156,7 +178,10 @@ def test_executor_function_dependence_args(self):
"future_queue": q,
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"resource_dict": {"cores": 1, "cwd": None},
"resource_dict": {
"cores": 1,
"cwd": None,
},
"terminate_function": terminate_subprocess,
},
)
Expand Down
Loading