diff --git a/executorlib/__init__.py b/executorlib/__init__.py
index b4ae786e..3337653a 100644
--- a/executorlib/__init__.py
+++ b/executorlib/__init__.py
@@ -83,10 +83,10 @@ class Executor:

     def __init__(
         self,
-        max_workers: int = 1,
+        max_workers: Optional[int] = None,
         backend: str = "local",
         cache_directory: Optional[str] = None,
-        max_cores: int = 1,
+        max_cores: Optional[int] = None,
         resource_dict: Optional[dict] = None,
         flux_executor=None,
         flux_executor_pmi_mode: Optional[str] = None,
@@ -104,10 +104,10 @@ def __init__(

     def __new__(
         cls,
-        max_workers: int = 1,
+        max_workers: Optional[int] = None,
         backend: str = "local",
         cache_directory: Optional[str] = None,
-        max_cores: int = 1,
+        max_cores: Optional[int] = None,
         resource_dict: Optional[dict] = None,
         flux_executor=None,
         flux_executor_pmi_mode: Optional[str] = None,
diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py
index 52a9fb80..cd5ce42c 100644
--- a/executorlib/interactive/executor.py
+++ b/executorlib/interactive/executor.py
@@ -147,9 +147,9 @@ def __exit__(


 def create_executor(
-    max_workers: int = 1,
+    max_workers: Optional[int] = None,
     backend: str = "local",
-    max_cores: int = 1,
+    max_cores: Optional[int] = None,
     cache_directory: Optional[str] = None,
     resource_dict: Optional[dict] = None,
     flux_executor=None,
diff --git a/executorlib/standalone/inputcheck.py b/executorlib/standalone/inputcheck.py
index 9a466265..76f6c823 100644
--- a/executorlib/standalone/inputcheck.py
+++ b/executorlib/standalone/inputcheck.py
@@ -1,4 +1,5 @@
 import inspect
+import multiprocessing
 from concurrent.futures import Executor
 from typing import Callable, List, Optional

@@ -131,12 +132,14 @@ def check_init_function(block_allocation: bool, init_function: Callable) -> None
         raise ValueError("")


-def check_max_workers_and_cores(max_workers: int, max_cores: int) -> None:
-    if max_workers != 1:
+def check_max_workers_and_cores(
+    max_workers: Optional[int], max_cores: Optional[int]
+) -> None:
+    if max_workers is not None:
         raise ValueError(
             "The number of workers cannot be controlled with the pysqa based backend."
         )
-    if max_cores != 1:
+    if max_cores is not None:
         raise ValueError(
             "The number of cores cannot be controlled with the pysqa based backend."
         )
@@ -166,10 +169,15 @@ def check_pysqa_config_directory(pysqa_config_directory: Optional[str]) -> None:
         )


-def validate_number_of_cores(max_cores: int, max_workers: int) -> int:
+def validate_number_of_cores(
+    max_cores: Optional[int], max_workers: Optional[int]
+) -> int:
     """
     Validate the number of cores and return the appropriate value.
     """
-    if max_workers != 1 and max_cores == 1:
+    if max_workers is None and max_cores is None:
+        return multiprocessing.cpu_count()
+    elif max_workers is not None and max_cores is None:
         return max_workers
-    return max_cores
+    else:
+        return max_cores
diff --git a/tests/test_integration_pyiron_workflow.py b/tests/test_integration_pyiron_workflow.py
index 37272703..64f0eed6 100644
--- a/tests/test_integration_pyiron_workflow.py
+++ b/tests/test_integration_pyiron_workflow.py
@@ -74,7 +74,7 @@ def slowly_returns_dynamic(dynamic_arg):
             return dynamic_arg

         dynamic_dynamic = slowly_returns_dynamic()
-        executor = Executor(block_allocation=True)
+        executor = Executor(block_allocation=True, max_workers=1)
         cloudpickle_register(ind=1)
         dynamic_object = does_nothing()
         fs = executor.submit(dynamic_dynamic.run, dynamic_object)
@@ -104,7 +104,7 @@ def slowly_returns_42():
         self.assertIsNone(
             dynamic_42.result, msg="Just a sanity check that the test is set up right"
         )
-        executor = Executor(block_allocation=True)
+        executor = Executor(block_allocation=True, max_workers=1)
         cloudpickle_register(ind=1)
         fs = executor.submit(dynamic_42.run)
         fs.add_done_callback(dynamic_42.process_result)
@@ -135,7 +135,7 @@ def returns_42():
             dynamic_42.running,
             msg="Sanity check that the test starts in the expected condition",
         )
-        executor = Executor(block_allocation=True)
+        executor = Executor(block_allocation=True, max_workers=1)
         cloudpickle_register(ind=1)
         fs = executor.submit(dynamic_42.run)
         fs.add_done_callback(dynamic_42.process_result)
@@ -159,7 +159,7 @@ def raise_error():
             raise RuntimeError

         re = raise_error()
-        executor = Executor(block_allocation=True)
+        executor = Executor(block_allocation=True, max_workers=1)
         cloudpickle_register(ind=1)
         fs = executor.submit(re.run)
         with self.assertRaises(
@@ -189,7 +189,7 @@ def slowly_returns_dynamic():
             return inside_variable

         dynamic_dynamic = slowly_returns_dynamic()
-        executor = Executor(block_allocation=True)
+        executor = Executor(block_allocation=True, max_workers=1)
         cloudpickle_register(ind=1)
         fs = executor.submit(dynamic_dynamic.run)
         self.assertIsInstance(
@@ -218,7 +218,7 @@ def slow():
             return fortytwo

         f = slow()
-        executor = Executor(block_allocation=True)
+        executor = Executor(block_allocation=True, max_workers=1)
         cloudpickle_register(ind=1)
         fs = executor.submit(f.run)
         self.assertEqual(
diff --git a/tests/test_local_executor_future.py b/tests/test_local_executor_future.py
index 13691579..7ab3f1cd 100644
--- a/tests/test_local_executor_future.py
+++ b/tests/test_local_executor_future.py
@@ -68,6 +68,7 @@ def submit():
             # Executor only exists in this scope and can get garbage collected after
             # this function is exits
             future = InteractiveExecutor(
+                max_workers=1,
                 executor_kwargs={},
                 spawner=MpiExecSpawner,
             ).submit(slow_callable)
@@ -108,6 +109,7 @@ def run(self):
                 self.running = True

                 future = InteractiveExecutor(
+                    max_workers=1,
                     executor_kwargs={},
                     spawner=MpiExecSpawner,
                 ).submit(self.return_42)
diff --git a/tests/test_shared_input_check.py b/tests/test_shared_input_check.py
index 8899a75e..44f5e599 100644
--- a/tests/test_shared_input_check.py
+++ b/tests/test_shared_input_check.py
@@ -17,6 +17,7 @@
     check_max_workers_and_cores,
     check_hostname_localhost,
     check_pysqa_config_directory,
+    validate_number_of_cores,
 )


@@ -80,9 +81,9 @@ def test_check_flux_executor_pmi_mode(self):

     def test_check_max_workers_and_cores(self):
         with self.assertRaises(ValueError):
-            check_max_workers_and_cores(max_workers=2, max_cores=1)
+            check_max_workers_and_cores(max_workers=2, max_cores=None)
         with self.assertRaises(ValueError):
-            check_max_workers_and_cores(max_workers=1, max_cores=2)
+            check_max_workers_and_cores(max_workers=None, max_cores=2)
         with self.assertRaises(ValueError):
             check_max_workers_and_cores(max_workers=2, max_cores=2)

@@ -95,3 +96,14 @@ def test_check_hostname_localhost(self):
     def test_check_pysqa_config_directory(self):
         with self.assertRaises(ValueError):
             check_pysqa_config_directory(pysqa_config_directory="path/to/config")
+
+    def test_validate_number_of_cores(self):
+        self.assertIsInstance(
+            validate_number_of_cores(max_cores=None, max_workers=None), int
+        )
+        self.assertIsInstance(
+            validate_number_of_cores(max_cores=1, max_workers=None), int
+        )
+        self.assertIsInstance(
+            validate_number_of_cores(max_cores=None, max_workers=1), int
+        )
diff --git a/tests/test_shell_interactive.py b/tests/test_shell_interactive.py
index c304e65c..e211b559 100644
--- a/tests/test_shell_interactive.py
+++ b/tests/test_shell_interactive.py
@@ -105,6 +105,7 @@ def test_execute_single_task(self):
     def test_shell_interactive_executor(self):
         cloudpickle_register(ind=1)
         with Executor(
+            max_workers=1,
             init_function=init_process,
             block_allocation=True,
         ) as exe:
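A minimal sketch, not part of the patch, of the resolution order that the changed `validate_number_of_cores` encodes: with neither `max_workers` nor `max_cores` given, the core count falls back to `multiprocessing.cpu_count()`; an explicit `max_workers` alone is used as the core count; otherwise `max_cores` wins. The standalone function name `resolve_cores` below is hypothetical, chosen only for illustration.

```python
# Sketch mirroring the logic added to
# executorlib.standalone.inputcheck.validate_number_of_cores in this patch.
import multiprocessing
from typing import Optional


def resolve_cores(max_cores: Optional[int], max_workers: Optional[int]) -> int:
    # Neither limit given: use every core the machine reports.
    if max_workers is None and max_cores is None:
        return multiprocessing.cpu_count()
    # Only max_workers given: treat one worker as one core.
    elif max_workers is not None and max_cores is None:
        return max_workers
    # Otherwise an explicit max_cores takes precedence.
    else:
        return max_cores


if __name__ == "__main__":
    print(resolve_cores(None, None))  # e.g. 8 on an eight-core machine
    print(resolve_cores(None, 2))     # 2 (max_workers only)
    print(resolve_cores(4, None))     # 4 (max_cores only)
```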