From 3b2f3ce6ccf980d4fed0d69347c7b2abee8a3f13 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 28 Oct 2024 10:05:12 +0100 Subject: [PATCH 01/16] Unify user interface --- executorlib/__init__.py | 35 +++++++++++++++++++++++-- executorlib/cache/executor.py | 26 +++++++++++++++--- executorlib/cache/shared.py | 6 ++--- tests/test_cache_executor_pysqa_flux.py | 6 ++--- tests/test_cache_executor_serial.py | 27 ++++++++++++++++--- 5 files changed, 86 insertions(+), 14 deletions(-) diff --git a/executorlib/__init__.py b/executorlib/__init__.py index 3e95579f..7d3ee0a2 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -8,6 +8,12 @@ from executorlib.standalone.inputcheck import ( check_refresh_rate as _check_refresh_rate, ) +from executorlib.standalone.inputcheck import ( + check_executor as _check_executor, +) +from executorlib.standalone.inputcheck import ( + check_nested_flux_executor as _check_nested_flux_executor, +) __version__ = _get_versions()["version"] __all__ = [] @@ -47,6 +53,7 @@ class Executor: flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function. + pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And @@ -95,8 +102,9 @@ def __init__( flux_executor=None, flux_executor_pmi_mode: Optional[str] = None, flux_executor_nesting: bool = False, + pysqa_config_directory: Optional[str] = None, hostname_localhost: Optional[bool] = None, - block_allocation: bool = True, + block_allocation: bool = False, init_function: Optional[callable] = None, disable_dependencies: bool = False, refresh_rate: float = 0.01, @@ -115,8 +123,9 @@ def __new__( flux_executor=None, flux_executor_pmi_mode: Optional[str] = None, flux_executor_nesting: bool = False, + pysqa_config_directory: Optional[str] = None, hostname_localhost: Optional[bool] = None, - block_allocation: bool = True, + block_allocation: bool = False, init_function: Optional[callable] = None, disable_dependencies: bool = False, refresh_rate: float = 0.01, @@ -162,6 +171,7 @@ def __new__( of the individual function. init_function (None): optional function to preset arguments for functions which are submitted later disable_dependencies (boolean): Disable resolving future objects during the submission. + pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked. plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For debugging purposes and to get an overview of the specified dependencies. 
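The hunk that follows routes any "pysqa_"-prefixed backend to the file-based executor. As a usage sketch of the unified entry point this series introduces, the snippet below mirrors the updated tests/test_cache_executor_pysqa_flux.py from the later patches; the submitted function is a stand-in (the real test submits an MPI-parallel mpi_funct with two cores), and a working pysqa "flux" setup is assumed.

# Sketch only: assumes this patch series is applied and a pysqa "flux"
# backend (plus flux itself) is available on the machine.
from executorlib import Executor


def add(i, j):
    # stand-in for the MPI-parallel mpi_funct used in the actual test
    return i + j


with Executor(
    backend="pysqa_flux",                 # routed to the file/cache based executor
    cache_directory="executorlib_cache",  # serialized tasks and results are stored here
    resource_dict={"cores": 1, "cwd": "executorlib_cache"},
    block_allocation=False,               # block allocation is rejected by the pysqa backend
) as exe:
    fs = exe.submit(add, 1, j=2)
    print(fs.result())  # 3
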
@@ -196,6 +206,27 @@ def __new__( refresh_rate=refresh_rate, plot_dependency_graph=plot_dependency_graph, ) + elif "pysqa_" in backend and not plot_dependency_graph: + if cache_directory is None: + cache_directory = "executorlib_cache" + if max_workers != 1: + raise ValueError("The number of workers cannot be controlled with the pysqa based backend.") + if max_cores != 1: + raise ValueError("The number of cores cannot be controlled with the pysqa based backend.") + if hostname_localhost is not None: + raise ValueError("The option to connect to hosts based on their hostname is not available with the pysqa based backend.") + if block_allocation: + raise ValueError("The option block_allocation is not available with the pysqa based backend.") + if init_function is not None: + raise ValueError("The option to specify an init_function is not available with the pysqa based backend.") + _check_executor(executor=flux_executor) + _check_nested_flux_executor(nested_flux_executor=flux_executor_nesting) + return FileExecutor( + cache_directory=cache_directory, + resource_dict=resource_dict, + pysqa_config_directory=pysqa_config_directory, + backend=backend.split("pysqa_")[-1], + ) else: _check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph) _check_refresh_rate(refresh_rate=refresh_rate) diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index 0195a14e..81f543e7 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -3,6 +3,12 @@ from executorlib.base.executor import ExecutorBase from executorlib.cache.shared import execute_tasks_h5 +from executorlib.standalone.inputcheck import ( + check_command_line_argument_lst, + check_oversubscribe, + check_threads_per_core, + check_gpus_per_worker, +) from executorlib.standalone.cache.spawner import ( execute_in_subprocess, terminate_subprocess, @@ -23,7 +29,7 @@ def __init__( resource_dict: Optional[dict] = None, execute_function: callable = execute_with_pysqa, terminate_function: Optional[callable] = None, - config_directory: Optional[str] = None, + pysqa_config_directory: Optional[str] = None, backend: Optional[str] = None, ): """ @@ -36,19 +42,33 @@ def __init__( - cwd (str/None): current working directory where the parallel python task is executed execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess. terminate_function (callable, optional): The function to terminate the tasks. - config_directory (str, optional): path to the config directory. + pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). backend (str, optional): name of the backend used to spawn tasks. 
""" super().__init__() default_resource_dict = { "cores": 1, + "threads_per_core": 1, + "gpus_per_core": 0, "cwd": None, + "openmpi_oversubscribe": False, + "slurm_cmd_args": [], } if resource_dict is None: resource_dict = {} resource_dict.update( {k: v for k, v in default_resource_dict.items() if k not in resource_dict} ) + check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"]) + check_command_line_argument_lst( + command_line_argument_lst=resource_dict["slurm_cmd_args"] + ) + check_threads_per_core(threads_per_core=resource_dict["threads_per_core"]) + check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"]) + del resource_dict["threads_per_core"] + del resource_dict["gpus_per_core"] + del resource_dict["openmpi_oversubscribe"] + del resource_dict["slurm_cmd_args"] if execute_function == execute_in_subprocess and terminate_function is None: terminate_function = terminate_subprocess cache_directory_path = os.path.abspath(cache_directory) @@ -62,7 +82,7 @@ def __init__( "cache_directory": cache_directory_path, "resource_dict": resource_dict, "terminate_function": terminate_function, - "config_directory": config_directory, + "config_directory": pysqa_config_directory, "backend": backend, }, ) diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py index dd09542b..22177b32 100644 --- a/executorlib/cache/shared.py +++ b/executorlib/cache/shared.py @@ -52,7 +52,7 @@ def execute_tasks_h5( execute_function: callable, resource_dict: dict, terminate_function: Optional[callable] = None, - config_directory: Optional[str] = None, + pysqa_config_directory: Optional[str] = None, backend: Optional[str] = None, ) -> None: """ @@ -66,7 +66,7 @@ def execute_tasks_h5( - cwd (str/None): current working directory where the parallel python task is executed execute_function (callable): The function to execute the tasks. terminate_function (callable): The function to terminate the tasks. - config_directory (str, optional): path to the config directory. + pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). backend (str, optional): name of the backend used to spawn tasks. 
Returns: @@ -120,7 +120,7 @@ def execute_tasks_h5( process_dict[k] for k in future_wait_key_lst ], resource_dict=task_resource_dict, - config_directory=config_directory, + config_directory=pysqa_config_directory, backend=backend, ) file_name_dict[task_key] = os.path.join( diff --git a/tests/test_cache_executor_pysqa_flux.py b/tests/test_cache_executor_pysqa_flux.py index 545f9f25..36426114 100644 --- a/tests/test_cache_executor_pysqa_flux.py +++ b/tests/test_cache_executor_pysqa_flux.py @@ -5,7 +5,7 @@ try: import flux.job - from executorlib import FileExecutor + from executorlib import Executor skip_flux_test = "FLUX_URI" not in os.environ pmi = os.environ.get("PYMPIPOOL_PMIX", None) @@ -30,9 +30,9 @@ def mpi_funct(i): ) class TestCacheExecutorPysqa(unittest.TestCase): def test_executor(self): - with FileExecutor( + with Executor( resource_dict={"cores": 2}, - backend="flux", + backend="pysqa_flux", ) as exe: fs1 = exe.submit(mpi_funct, 1) self.assertFalse(fs1.done()) diff --git a/tests/test_cache_executor_serial.py b/tests/test_cache_executor_serial.py index 96aa2df0..2edadcf8 100644 --- a/tests/test_cache_executor_serial.py +++ b/tests/test_cache_executor_serial.py @@ -74,7 +74,14 @@ def test_executor_function(self): "future_queue": q, "cache_directory": cache_dir, "execute_function": execute_in_subprocess, - "resource_dict": {"cores": 1, "cwd": None}, + "resource_dict": { + "cores": 1, + "threads_per_core": 1, + "gpus_per_core": 0, + "cwd": None, + "openmpi_oversubscribe": False, + "slurm_cmd_args": [], + }, "terminate_function": terminate_subprocess, }, ) @@ -115,7 +122,14 @@ def test_executor_function_dependence_kwargs(self): "future_queue": q, "cache_directory": cache_dir, "execute_function": execute_in_subprocess, - "resource_dict": {"cores": 1, "cwd": None}, + "resource_dict": { + "cores": 1, + "threads_per_core": 1, + "gpus_per_core": 0, + "cwd": None, + "openmpi_oversubscribe": False, + "slurm_cmd_args": [], + }, "terminate_function": terminate_subprocess, }, ) @@ -156,7 +170,14 @@ def test_executor_function_dependence_args(self): "future_queue": q, "cache_directory": cache_dir, "execute_function": execute_in_subprocess, - "resource_dict": {"cores": 1, "cwd": None}, + "resource_dict": { + "cores": 1, + "threads_per_core": 1, + "gpus_per_core": 0, + "cwd": None, + "openmpi_oversubscribe": False, + "slurm_cmd_args": [], + }, "terminate_function": terminate_subprocess, }, ) From 8fd4d47a932e1b7c493b8e80d5690f7f24af6aa2 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 28 Oct 2024 09:07:13 +0000 Subject: [PATCH 02/16] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/__init__.py | 28 +++++++++++++++++++--------- executorlib/cache/executor.py | 10 +++++----- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/executorlib/__init__.py b/executorlib/__init__.py index 7d3ee0a2..5383fc6a 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -3,16 +3,16 @@ from executorlib._version import get_versions as _get_versions from executorlib.interactive.executor import ExecutorWithDependencies, create_executor from executorlib.standalone.inputcheck import ( - check_plot_dependency_graph as _check_plot_dependency_graph, + check_executor as _check_executor, ) from executorlib.standalone.inputcheck import ( - check_refresh_rate as _check_refresh_rate, + check_nested_flux_executor as _check_nested_flux_executor, ) from 
executorlib.standalone.inputcheck import ( - check_executor as _check_executor, + check_plot_dependency_graph as _check_plot_dependency_graph, ) from executorlib.standalone.inputcheck import ( - check_nested_flux_executor as _check_nested_flux_executor, + check_refresh_rate as _check_refresh_rate, ) __version__ = _get_versions()["version"] @@ -210,15 +210,25 @@ def __new__( if cache_directory is None: cache_directory = "executorlib_cache" if max_workers != 1: - raise ValueError("The number of workers cannot be controlled with the pysqa based backend.") + raise ValueError( + "The number of workers cannot be controlled with the pysqa based backend." + ) if max_cores != 1: - raise ValueError("The number of cores cannot be controlled with the pysqa based backend.") + raise ValueError( + "The number of cores cannot be controlled with the pysqa based backend." + ) if hostname_localhost is not None: - raise ValueError("The option to connect to hosts based on their hostname is not available with the pysqa based backend.") + raise ValueError( + "The option to connect to hosts based on their hostname is not available with the pysqa based backend." + ) if block_allocation: - raise ValueError("The option block_allocation is not available with the pysqa based backend.") + raise ValueError( + "The option block_allocation is not available with the pysqa based backend." + ) if init_function is not None: - raise ValueError("The option to specify an init_function is not available with the pysqa based backend.") + raise ValueError( + "The option to specify an init_function is not available with the pysqa based backend." + ) _check_executor(executor=flux_executor) _check_nested_flux_executor(nested_flux_executor=flux_executor_nesting) return FileExecutor( diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index 81f543e7..ce5e2f64 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -3,15 +3,15 @@ from executorlib.base.executor import ExecutorBase from executorlib.cache.shared import execute_tasks_h5 +from executorlib.standalone.cache.spawner import ( + execute_in_subprocess, + terminate_subprocess, +) from executorlib.standalone.inputcheck import ( check_command_line_argument_lst, + check_gpus_per_worker, check_oversubscribe, check_threads_per_core, - check_gpus_per_worker, -) -from executorlib.standalone.cache.spawner import ( - execute_in_subprocess, - terminate_subprocess, ) from executorlib.standalone.thread import RaisingThread From 55dd86ab91eb31a3625ce0990f883ad252bf0043 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 28 Oct 2024 10:25:13 +0100 Subject: [PATCH 03/16] Update __init__.py --- executorlib/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executorlib/__init__.py b/executorlib/__init__.py index 5383fc6a..8ffc1ff6 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -104,7 +104,7 @@ def __init__( flux_executor_nesting: bool = False, pysqa_config_directory: Optional[str] = None, hostname_localhost: Optional[bool] = None, - block_allocation: bool = False, + block_allocation: bool = True, init_function: Optional[callable] = None, disable_dependencies: bool = False, refresh_rate: float = 0.01, @@ -125,7 +125,7 @@ def __new__( flux_executor_nesting: bool = False, pysqa_config_directory: Optional[str] = None, hostname_localhost: Optional[bool] = None, - block_allocation: bool = False, + block_allocation: bool = True, init_function: Optional[callable] = None, disable_dependencies: bool = 
False, refresh_rate: float = 0.01, From 72303a4ede91e5261152afd0e746ccbbaeda776b Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 28 Oct 2024 10:59:21 +0100 Subject: [PATCH 04/16] bug fixes --- executorlib/cache/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index ce5e2f64..e4c89482 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -82,7 +82,7 @@ def __init__( "cache_directory": cache_directory_path, "resource_dict": resource_dict, "terminate_function": terminate_function, - "config_directory": pysqa_config_directory, + "pysqa_config_directory": pysqa_config_directory, "backend": backend, }, ) From fa26b93f35e79c7bbe5a899d54d95452dacd3b0e Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 28 Oct 2024 11:44:10 +0100 Subject: [PATCH 05/16] More fixes --- executorlib/__init__.py | 52 +++++--------------- executorlib/cache/executor.py | 65 ++++++++++++++++++++----- tests/test_cache_executor_mpi.py | 10 +++- tests/test_cache_executor_pysqa_flux.py | 5 +- tests/test_cache_executor_serial.py | 36 ++++++++++++-- 5 files changed, 108 insertions(+), 60 deletions(-) diff --git a/executorlib/__init__.py b/executorlib/__init__.py index 8ffc1ff6..d13684f0 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -2,12 +2,6 @@ from executorlib._version import get_versions as _get_versions from executorlib.interactive.executor import ExecutorWithDependencies, create_executor -from executorlib.standalone.inputcheck import ( - check_executor as _check_executor, -) -from executorlib.standalone.inputcheck import ( - check_nested_flux_executor as _check_nested_flux_executor, -) from executorlib.standalone.inputcheck import ( check_plot_dependency_graph as _check_plot_dependency_graph, ) @@ -19,14 +13,6 @@ __all__ = [] -try: - from executorlib.cache.executor import FileExecutor - - __all__ += [FileExecutor] -except ImportError: - pass - - class Executor: """ The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or @@ -207,35 +193,21 @@ def __new__( plot_dependency_graph=plot_dependency_graph, ) elif "pysqa_" in backend and not plot_dependency_graph: - if cache_directory is None: - cache_directory = "executorlib_cache" - if max_workers != 1: - raise ValueError( - "The number of workers cannot be controlled with the pysqa based backend." - ) - if max_cores != 1: - raise ValueError( - "The number of cores cannot be controlled with the pysqa based backend." - ) - if hostname_localhost is not None: - raise ValueError( - "The option to connect to hosts based on their hostname is not available with the pysqa based backend." - ) - if block_allocation: - raise ValueError( - "The option block_allocation is not available with the pysqa based backend." - ) - if init_function is not None: - raise ValueError( - "The option to specify an init_function is not available with the pysqa based backend." 
- ) - _check_executor(executor=flux_executor) - _check_nested_flux_executor(nested_flux_executor=flux_executor_nesting) - return FileExecutor( + from executorlib.cache.executor import create_file_executor + + return create_file_executor( + max_workers=max_workers, + backend=backend, + max_cores=max_cores, cache_directory=cache_directory, resource_dict=resource_dict, + flux_executor=None, + flux_executor_pmi_mode=flux_executor_pmi_mode, + flux_executor_nesting=flux_executor_nesting, pysqa_config_directory=pysqa_config_directory, - backend=backend.split("pysqa_")[-1], + hostname_localhost=hostname_localhost, + block_allocation=block_allocation, + init_function=init_function, ) else: _check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph) diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index e4c89482..3cc66246 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -12,6 +12,8 @@ check_gpus_per_worker, check_oversubscribe, check_threads_per_core, + check_executor, + check_nested_flux_executor, ) from executorlib.standalone.thread import RaisingThread @@ -46,19 +48,6 @@ def __init__( backend (str, optional): name of the backend used to spawn tasks. """ super().__init__() - default_resource_dict = { - "cores": 1, - "threads_per_core": 1, - "gpus_per_core": 0, - "cwd": None, - "openmpi_oversubscribe": False, - "slurm_cmd_args": [], - } - if resource_dict is None: - resource_dict = {} - resource_dict.update( - {k: v for k, v in default_resource_dict.items() if k not in resource_dict} - ) check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"]) check_command_line_argument_lst( command_line_argument_lst=resource_dict["slurm_cmd_args"] @@ -87,3 +76,53 @@ def __init__( }, ) ) + + +def create_file_executor( + max_workers: int = 1, + backend: str = "local", + max_cores: int = 1, + cache_directory: Optional[str] = None, + resource_dict: Optional[dict] = None, + flux_executor=None, + flux_executor_pmi_mode: Optional[str] = None, + flux_executor_nesting: bool = False, + pysqa_config_directory: Optional[str] = None, + hostname_localhost: Optional[bool] = None, + block_allocation: bool = False, + init_function: Optional[callable] = None, +): + if cache_directory is None: + cache_directory = "executorlib_cache" + if max_workers != 1: + raise ValueError( + "The number of workers cannot be controlled with the pysqa based backend." + ) + if max_cores != 1: + raise ValueError( + "The number of cores cannot be controlled with the pysqa based backend." + ) + if hostname_localhost is not None: + raise ValueError( + "The option to connect to hosts based on their hostname is not available with the pysqa based backend." + ) + if block_allocation: + raise ValueError( + "The option block_allocation is not available with the pysqa based backend." + ) + if init_function is not None: + raise ValueError( + "The option to specify an init_function is not available with the pysqa based backend." + ) + if flux_executor_pmi_mode is not None: + raise ValueError( + "The option to specify the flux pmi mode is not available with the pysqa based backend." 
+ ) + check_executor(executor=flux_executor) + check_nested_flux_executor(nested_flux_executor=flux_executor_nesting) + return FileExecutor( + cache_directory=cache_directory, + resource_dict=resource_dict, + pysqa_config_directory=pysqa_config_directory, + backend=backend.split("pysqa_")[-1], + ) diff --git a/tests/test_cache_executor_mpi.py b/tests/test_cache_executor_mpi.py index bc5e1226..d0923f95 100644 --- a/tests/test_cache_executor_mpi.py +++ b/tests/test_cache_executor_mpi.py @@ -32,7 +32,15 @@ def mpi_funct(i): class TestCacheExecutorMPI(unittest.TestCase): def test_executor(self): with FileExecutor( - resource_dict={"cores": 2}, execute_function=execute_in_subprocess + resource_dict={ + "cores": 2, + "threads_per_core": 1, + "gpus_per_core": 0, + "cwd": None, + "openmpi_oversubscribe": False, + "slurm_cmd_args": [], + }, + execute_function=execute_in_subprocess ) as exe: fs1 = exe.submit(mpi_funct, 1) self.assertFalse(fs1.done()) diff --git a/tests/test_cache_executor_pysqa_flux.py b/tests/test_cache_executor_pysqa_flux.py index 36426114..cab1293c 100644 --- a/tests/test_cache_executor_pysqa_flux.py +++ b/tests/test_cache_executor_pysqa_flux.py @@ -33,6 +33,7 @@ def test_executor(self): with Executor( resource_dict={"cores": 2}, backend="pysqa_flux", + block_allocation=False, ) as exe: fs1 = exe.submit(mpi_funct, 1) self.assertFalse(fs1.done()) @@ -40,5 +41,5 @@ def test_executor(self): self.assertTrue(fs1.done()) def tearDown(self): - if os.path.exists("cache"): - shutil.rmtree("cache") + if os.path.exists("executorlib_cache"): + shutil.rmtree("executorlib_cache") diff --git a/tests/test_cache_executor_serial.py b/tests/test_cache_executor_serial.py index 2edadcf8..9d2dc82f 100644 --- a/tests/test_cache_executor_serial.py +++ b/tests/test_cache_executor_serial.py @@ -11,7 +11,7 @@ from executorlib.standalone.thread import RaisingThread try: - from executorlib import FileExecutor + from executorlib.cache.executor import FileExecutor from executorlib.cache.shared import execute_tasks_h5 skip_h5py_test = False @@ -32,14 +32,34 @@ def list_files_in_working_directory(): ) class TestCacheExecutorSerial(unittest.TestCase): def test_executor_mixed(self): - with FileExecutor(execute_function=execute_in_subprocess) as exe: + with FileExecutor( + execute_function=execute_in_subprocess, + resource_dict={ + "cores": 1, + "threads_per_core": 1, + "gpus_per_core": 0, + "cwd": None, + "openmpi_oversubscribe": False, + "slurm_cmd_args": [], + }, + ) as exe: fs1 = exe.submit(my_funct, 1, b=2) self.assertFalse(fs1.done()) self.assertEqual(fs1.result(), 3) self.assertTrue(fs1.done()) def test_executor_dependence_mixed(self): - with FileExecutor(execute_function=execute_in_subprocess) as exe: + with FileExecutor( + execute_function=execute_in_subprocess, + resource_dict={ + "cores": 1, + "threads_per_core": 1, + "gpus_per_core": 0, + "cwd": None, + "openmpi_oversubscribe": False, + "slurm_cmd_args": [], + }, + ) as exe: fs1 = exe.submit(my_funct, 1, b=2) fs2 = exe.submit(my_funct, 1, b=fs1) self.assertFalse(fs2.done()) @@ -49,7 +69,15 @@ def test_executor_dependence_mixed(self): def test_executor_working_directory(self): cwd = os.path.join(os.path.dirname(__file__), "executables") with FileExecutor( - resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess + resource_dict={ + "cores": 1, + "threads_per_core": 1, + "gpus_per_core": 0, + "cwd": cwd, + "openmpi_oversubscribe": False, + "slurm_cmd_args": [], + }, + execute_function=execute_in_subprocess ) as exe: fs1 = 
exe.submit(list_files_in_working_directory) self.assertEqual(fs1.result(), os.listdir(cwd)) From 4624c0f9e446a1e6cfff57f668790d1b7ae73765 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 28 Oct 2024 10:44:18 +0000 Subject: [PATCH 06/16] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/cache/executor.py | 4 ++-- tests/test_cache_executor_mpi.py | 2 +- tests/test_cache_executor_serial.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index 3cc66246..14de880c 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -9,11 +9,11 @@ ) from executorlib.standalone.inputcheck import ( check_command_line_argument_lst, + check_executor, check_gpus_per_worker, + check_nested_flux_executor, check_oversubscribe, check_threads_per_core, - check_executor, - check_nested_flux_executor, ) from executorlib.standalone.thread import RaisingThread diff --git a/tests/test_cache_executor_mpi.py b/tests/test_cache_executor_mpi.py index d0923f95..becba0f2 100644 --- a/tests/test_cache_executor_mpi.py +++ b/tests/test_cache_executor_mpi.py @@ -40,7 +40,7 @@ def test_executor(self): "openmpi_oversubscribe": False, "slurm_cmd_args": [], }, - execute_function=execute_in_subprocess + execute_function=execute_in_subprocess, ) as exe: fs1 = exe.submit(mpi_funct, 1) self.assertFalse(fs1.done()) diff --git a/tests/test_cache_executor_serial.py b/tests/test_cache_executor_serial.py index 9d2dc82f..7c170328 100644 --- a/tests/test_cache_executor_serial.py +++ b/tests/test_cache_executor_serial.py @@ -77,7 +77,7 @@ def test_executor_working_directory(self): "openmpi_oversubscribe": False, "slurm_cmd_args": [], }, - execute_function=execute_in_subprocess + execute_function=execute_in_subprocess, ) as exe: fs1 = exe.submit(list_files_in_working_directory) self.assertEqual(fs1.result(), os.listdir(cwd)) From a458dbe5872cd7ffc5ba41fbb7d0b378ee632717 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 28 Oct 2024 11:51:30 +0100 Subject: [PATCH 07/16] revert to file executor --- tests/test_cache_executor_pysqa_flux.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/tests/test_cache_executor_pysqa_flux.py b/tests/test_cache_executor_pysqa_flux.py index cab1293c..c61d9815 100644 --- a/tests/test_cache_executor_pysqa_flux.py +++ b/tests/test_cache_executor_pysqa_flux.py @@ -5,7 +5,7 @@ try: import flux.job - from executorlib import Executor + from executorlib.cache.executor import FileExecutor skip_flux_test = "FLUX_URI" not in os.environ pmi = os.environ.get("PYMPIPOOL_PMIX", None) @@ -30,10 +30,16 @@ def mpi_funct(i): ) class TestCacheExecutorPysqa(unittest.TestCase): def test_executor(self): - with Executor( - resource_dict={"cores": 2}, - backend="pysqa_flux", - block_allocation=False, + with FileExecutor( + resource_dict={ + "cores": 2, + "threads_per_core": 1, + "gpus_per_core": 0, + "cwd": None, + "openmpi_oversubscribe": False, + "slurm_cmd_args": [], + }, + backend="flux", ) as exe: fs1 = exe.submit(mpi_funct, 1) self.assertFalse(fs1.done()) From 8bf1bacc150ff926f3088bd066b0969a4603d122 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 28 Oct 2024 11:55:26 +0100 Subject: [PATCH 08/16] delete keys only if they exist --- executorlib/cache/executor.py | 26 +++++++++++++++----------- tests/test_cache_executor_serial.py | 24 ------------------------ 
2 files changed, 15 insertions(+), 35 deletions(-) diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index 14de880c..fa48ca56 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -28,7 +28,7 @@ class FileExecutor(ExecutorBase): def __init__( self, cache_directory: str = "cache", - resource_dict: Optional[dict] = None, + resource_dict: dict = {}, execute_function: callable = execute_with_pysqa, terminate_function: Optional[callable] = None, pysqa_config_directory: Optional[str] = None, @@ -48,16 +48,20 @@ def __init__( backend (str, optional): name of the backend used to spawn tasks. """ super().__init__() - check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"]) - check_command_line_argument_lst( - command_line_argument_lst=resource_dict["slurm_cmd_args"] - ) - check_threads_per_core(threads_per_core=resource_dict["threads_per_core"]) - check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"]) - del resource_dict["threads_per_core"] - del resource_dict["gpus_per_core"] - del resource_dict["openmpi_oversubscribe"] - del resource_dict["slurm_cmd_args"] + if "openmpi_oversubscribe" in resource_dict: + check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"]) + del resource_dict["openmpi_oversubscribe"] + if "slurm_cmd_args" in resource_dict: + check_command_line_argument_lst( + command_line_argument_lst=resource_dict["slurm_cmd_args"] + ) + del resource_dict["slurm_cmd_args"] + if "threads_per_core" in resource_dict: + check_threads_per_core(threads_per_core=resource_dict["threads_per_core"]) + del resource_dict["threads_per_core"] + if "gpus_per_core" in resource_dict: + check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"]) + del resource_dict["gpus_per_core"] if execute_function == execute_in_subprocess and terminate_function is None: terminate_function = terminate_subprocess cache_directory_path = os.path.abspath(cache_directory) diff --git a/tests/test_cache_executor_serial.py b/tests/test_cache_executor_serial.py index 7c170328..24081bf7 100644 --- a/tests/test_cache_executor_serial.py +++ b/tests/test_cache_executor_serial.py @@ -36,11 +36,7 @@ def test_executor_mixed(self): execute_function=execute_in_subprocess, resource_dict={ "cores": 1, - "threads_per_core": 1, - "gpus_per_core": 0, "cwd": None, - "openmpi_oversubscribe": False, - "slurm_cmd_args": [], }, ) as exe: fs1 = exe.submit(my_funct, 1, b=2) @@ -53,11 +49,7 @@ def test_executor_dependence_mixed(self): execute_function=execute_in_subprocess, resource_dict={ "cores": 1, - "threads_per_core": 1, - "gpus_per_core": 0, "cwd": None, - "openmpi_oversubscribe": False, - "slurm_cmd_args": [], }, ) as exe: fs1 = exe.submit(my_funct, 1, b=2) @@ -71,11 +63,7 @@ def test_executor_working_directory(self): with FileExecutor( resource_dict={ "cores": 1, - "threads_per_core": 1, - "gpus_per_core": 0, "cwd": cwd, - "openmpi_oversubscribe": False, - "slurm_cmd_args": [], }, execute_function=execute_in_subprocess, ) as exe: @@ -104,11 +92,7 @@ def test_executor_function(self): "execute_function": execute_in_subprocess, "resource_dict": { "cores": 1, - "threads_per_core": 1, - "gpus_per_core": 0, "cwd": None, - "openmpi_oversubscribe": False, - "slurm_cmd_args": [], }, "terminate_function": terminate_subprocess, }, @@ -152,11 +136,7 @@ def test_executor_function_dependence_kwargs(self): "execute_function": execute_in_subprocess, "resource_dict": { "cores": 1, - "threads_per_core": 1, - "gpus_per_core": 0, "cwd": None, - 
"openmpi_oversubscribe": False, - "slurm_cmd_args": [], }, "terminate_function": terminate_subprocess, }, @@ -200,11 +180,7 @@ def test_executor_function_dependence_args(self): "execute_function": execute_in_subprocess, "resource_dict": { "cores": 1, - "threads_per_core": 1, - "gpus_per_core": 0, "cwd": None, - "openmpi_oversubscribe": False, - "slurm_cmd_args": [], }, "terminate_function": terminate_subprocess, }, From 930de31c2557291f060d1437fdd0657156cc6c10 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 28 Oct 2024 11:58:03 +0100 Subject: [PATCH 09/16] Fix initialization --- executorlib/__init__.py | 26 ++++++++++++------------- tests/test_cache_executor_pysqa_flux.py | 6 +++--- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/executorlib/__init__.py b/executorlib/__init__.py index d13684f0..c637fa8d 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -176,38 +176,38 @@ def __new__( resource_dict.update( {k: v for k, v in default_resource_dict.items() if k not in resource_dict} ) - if not disable_dependencies: - return ExecutorWithDependencies( + if "pysqa_" in backend and not plot_dependency_graph: + from executorlib.cache.executor import create_file_executor + + return create_file_executor( max_workers=max_workers, backend=backend, - cache_directory=cache_directory, max_cores=max_cores, + cache_directory=cache_directory, resource_dict=resource_dict, - flux_executor=flux_executor, + flux_executor=None, flux_executor_pmi_mode=flux_executor_pmi_mode, flux_executor_nesting=flux_executor_nesting, + pysqa_config_directory=pysqa_config_directory, hostname_localhost=hostname_localhost, block_allocation=block_allocation, init_function=init_function, - refresh_rate=refresh_rate, - plot_dependency_graph=plot_dependency_graph, ) - elif "pysqa_" in backend and not plot_dependency_graph: - from executorlib.cache.executor import create_file_executor - - return create_file_executor( + elif not disable_dependencies: + return ExecutorWithDependencies( max_workers=max_workers, backend=backend, - max_cores=max_cores, cache_directory=cache_directory, + max_cores=max_cores, resource_dict=resource_dict, - flux_executor=None, + flux_executor=flux_executor, flux_executor_pmi_mode=flux_executor_pmi_mode, flux_executor_nesting=flux_executor_nesting, - pysqa_config_directory=pysqa_config_directory, hostname_localhost=hostname_localhost, block_allocation=block_allocation, init_function=init_function, + refresh_rate=refresh_rate, + plot_dependency_graph=plot_dependency_graph, ) else: _check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph) diff --git a/tests/test_cache_executor_pysqa_flux.py b/tests/test_cache_executor_pysqa_flux.py index c61d9815..051e1bc5 100644 --- a/tests/test_cache_executor_pysqa_flux.py +++ b/tests/test_cache_executor_pysqa_flux.py @@ -5,7 +5,7 @@ try: import flux.job - from executorlib.cache.executor import FileExecutor + from executorlib import Executor skip_flux_test = "FLUX_URI" not in os.environ pmi = os.environ.get("PYMPIPOOL_PMIX", None) @@ -30,7 +30,7 @@ def mpi_funct(i): ) class TestCacheExecutorPysqa(unittest.TestCase): def test_executor(self): - with FileExecutor( + with Executor( resource_dict={ "cores": 2, "threads_per_core": 1, @@ -39,7 +39,7 @@ def test_executor(self): "openmpi_oversubscribe": False, "slurm_cmd_args": [], }, - backend="flux", + backend="pysqa_flux", ) as exe: fs1 = exe.submit(mpi_funct, 1) self.assertFalse(fs1.done()) From 4a0402ce7b1a39bc8d1c9eeb35d155951553770e Mon Sep 17 00:00:00 2001 From: Jan 
Janssen Date: Mon, 28 Oct 2024 11:58:34 +0100 Subject: [PATCH 10/16] reduce resource dict --- tests/test_cache_executor_pysqa_flux.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/tests/test_cache_executor_pysqa_flux.py b/tests/test_cache_executor_pysqa_flux.py index 051e1bc5..b280b2b8 100644 --- a/tests/test_cache_executor_pysqa_flux.py +++ b/tests/test_cache_executor_pysqa_flux.py @@ -31,14 +31,7 @@ def mpi_funct(i): class TestCacheExecutorPysqa(unittest.TestCase): def test_executor(self): with Executor( - resource_dict={ - "cores": 2, - "threads_per_core": 1, - "gpus_per_core": 0, - "cwd": None, - "openmpi_oversubscribe": False, - "slurm_cmd_args": [], - }, + resource_dict={"cores": 2}, backend="pysqa_flux", ) as exe: fs1 = exe.submit(mpi_funct, 1) From e6764f2d2ea07fad9e1742231fd62eb8440b8f3c Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 28 Oct 2024 12:02:50 +0100 Subject: [PATCH 11/16] Update test_cache_executor_pysqa_flux.py --- tests/test_cache_executor_pysqa_flux.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_cache_executor_pysqa_flux.py b/tests/test_cache_executor_pysqa_flux.py index b280b2b8..cab1293c 100644 --- a/tests/test_cache_executor_pysqa_flux.py +++ b/tests/test_cache_executor_pysqa_flux.py @@ -33,6 +33,7 @@ def test_executor(self): with Executor( resource_dict={"cores": 2}, backend="pysqa_flux", + block_allocation=False, ) as exe: fs1 = exe.submit(mpi_funct, 1) self.assertFalse(fs1.done()) From 5eabfba3d4731a6bce3fa6db0ef7e37248c4634a Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 28 Oct 2024 12:14:55 +0100 Subject: [PATCH 12/16] Update executor.py --- executorlib/cache/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index fa48ca56..c389c5be 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -84,7 +84,7 @@ def __init__( def create_file_executor( max_workers: int = 1, - backend: str = "local", + backend: str = "pysqa_flux", max_cores: int = 1, cache_directory: Optional[str] = None, resource_dict: Optional[dict] = None, From c34fc0b2ae1ffc26b3df3962a3dbffc3c133695c Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 28 Oct 2024 12:21:18 +0100 Subject: [PATCH 13/16] Update __init__.py --- executorlib/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/executorlib/__init__.py b/executorlib/__init__.py index c637fa8d..7e25c886 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -194,6 +194,8 @@ def __new__( init_function=init_function, ) elif not disable_dependencies: + if pysqa_config_directory is not None: + raise ValueError("The pysqa_config_directory is only required for the pysqa backend.") return ExecutorWithDependencies( max_workers=max_workers, backend=backend, @@ -212,6 +214,8 @@ def __new__( else: _check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph) _check_refresh_rate(refresh_rate=refresh_rate) + if pysqa_config_directory is not None: + raise ValueError("The pysqa_config_directory is only required for the pysqa backend.") return create_executor( max_workers=max_workers, backend=backend, From 6c3c54f571606410edca15fd7c5db899c216213f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 28 Oct 2024 11:21:33 +0000 Subject: [PATCH 14/16] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/__init__.py | 8 
++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/executorlib/__init__.py b/executorlib/__init__.py index 7e25c886..349e1074 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -195,7 +195,9 @@ def __new__( ) elif not disable_dependencies: if pysqa_config_directory is not None: - raise ValueError("The pysqa_config_directory is only required for the pysqa backend.") + raise ValueError( + "The pysqa_config_directory is only required for the pysqa backend." + ) return ExecutorWithDependencies( max_workers=max_workers, backend=backend, @@ -215,7 +217,9 @@ def __new__( _check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph) _check_refresh_rate(refresh_rate=refresh_rate) if pysqa_config_directory is not None: - raise ValueError("The pysqa_config_directory is only required for the pysqa backend.") + raise ValueError( + "The pysqa_config_directory is only required for the pysqa backend." + ) return create_executor( max_workers=max_workers, backend=backend, From 74c1b7f092067b15432f921c34588a110aa1b89b Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 28 Oct 2024 12:29:42 +0100 Subject: [PATCH 15/16] Update __init__.py --- executorlib/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/__init__.py b/executorlib/__init__.py index 349e1074..54b2f6c3 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -185,7 +185,7 @@ def __new__( max_cores=max_cores, cache_directory=cache_directory, resource_dict=resource_dict, - flux_executor=None, + flux_executor=flux_executor, flux_executor_pmi_mode=flux_executor_pmi_mode, flux_executor_nesting=flux_executor_nesting, pysqa_config_directory=pysqa_config_directory, From 434fe06cac0d9453bc9df6cb71f0d7bfe4b345fd Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 28 Oct 2024 12:50:25 +0100 Subject: [PATCH 16/16] Update test_cache_executor_pysqa_flux.py --- tests/test_cache_executor_pysqa_flux.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_cache_executor_pysqa_flux.py b/tests/test_cache_executor_pysqa_flux.py index cab1293c..cd4d6b70 100644 --- a/tests/test_cache_executor_pysqa_flux.py +++ b/tests/test_cache_executor_pysqa_flux.py @@ -31,8 +31,9 @@ def mpi_funct(i): class TestCacheExecutorPysqa(unittest.TestCase): def test_executor(self): with Executor( - resource_dict={"cores": 2}, + resource_dict={"cores": 2, "cwd": "executorlib_cache"}, backend="pysqa_flux", + cache_directory="executorlib_cache", block_allocation=False, ) as exe: fs1 = exe.submit(mpi_funct, 1)
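
The serial tests in this series also exercise the lower-level FileExecutor directly, running tasks through execute_in_subprocess instead of a queuing system. Below is a hedged sketch of that local path, following the import locations and arguments used at the end state of the series; my_funct is a stand-in for the helper defined in the test module, and the optional h5py/cache dependencies are assumed to be installed.

# Sketch based on tests/test_cache_executor_serial.py as of the end of this
# series; import paths reflect the repository layout shown in these patches.
from executorlib.cache.executor import FileExecutor
from executorlib.standalone.cache.spawner import execute_in_subprocess


def my_funct(a, b):
    # stand-in for the helper defined in the test module
    return a + b


with FileExecutor(
    cache_directory="cache",                 # default cache location of FileExecutor
    execute_function=execute_in_subprocess,  # run locally instead of submitting via pysqa
    resource_dict={"cores": 1, "cwd": None},
) as exe:
    fs1 = exe.submit(my_funct, 1, b=2)
    fs2 = exe.submit(my_funct, 1, b=fs1)     # futures are resolved before execution
    print(fs1.result(), fs2.result())        # 3 4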