
Cache: Use explicit arguments for serialize_funct_h5() #448

Merged · 13 commits · Oct 27, 2024
16 changes: 10 additions & 6 deletions executorlib/base/executor.py
@@ -7,10 +7,7 @@
 )
 from typing import Optional
 
-from executorlib.standalone.inputcheck import (
-    check_resource_dict,
-    check_resource_dict_is_empty,
-)
+from executorlib.standalone.inputcheck import check_resource_dict
 from executorlib.standalone.queue import cancel_items_in_queue
 from executorlib.standalone.serialize import cloudpickle_register
 from executorlib.standalone.thread import RaisingThread
@@ -89,10 +86,17 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future:
         Returns:
             Future: A Future representing the given call.
         """
-        check_resource_dict_is_empty(resource_dict=resource_dict)
         check_resource_dict(function=fn)
         f = Future()
-        self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f})
+        self._future_queue.put(
+            {
+                "fn": fn,
+                "args": args,
+                "kwargs": kwargs,
+                "future": f,
+                "resource_dict": resource_dict,
+            }
+        )
         return f
 
     def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
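For orientation, a minimal standalone sketch of the new queue payload shape (plain Python, not executorlib code; the consumer is simplified to an inline call instead of a worker thread):

from concurrent.futures import Future
from queue import Queue

q = Queue()
f = Future()
# After this change every task dict carries its own resource_dict,
# so downstream consumers receive the resources alongside the task.
q.put(
    {
        "fn": sum,
        "args": ([1, 2],),
        "kwargs": {},
        "future": f,
        "resource_dict": {"cores": 1},
    }
)
task = q.get()
task["future"].set_result(task["fn"](*task["args"], **task["kwargs"]))
assert f.result() == 3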
5 changes: 4 additions & 1 deletion executorlib/cache/shared.py
@@ -94,7 +94,10 @@ def execute_tasks_h5(
                 file_name_dict=file_name_dict,
             )
             task_key, data_dict = serialize_funct_h5(
-                task_dict["fn"], *task_args, **task_kwargs
+                fn=task_dict["fn"],
+                fn_args=task_args,
+                fn_kwargs=task_kwargs,
+                resource_dict=task_dict["resource_dict"],
             )
             if task_key not in memory_dict.keys():
                 if task_key + ".h5out" not in os.listdir(cache_directory):
8 changes: 4 additions & 4 deletions executorlib/interactive/executor.py
@@ -1,8 +1,8 @@
 from concurrent.futures import Future
 from typing import Any, Callable, Dict, Optional
 
+from executorlib.base.executor import ExecutorBase
 from executorlib.interactive.shared import (
-    ExecutorSteps,
     InteractiveExecutor,
     InteractiveStepExecutor,
     execute_tasks_with_dependencies,
@@ -35,10 +35,10 @@
     pass
 
 
-class ExecutorWithDependencies(ExecutorSteps):
+class ExecutorWithDependencies(ExecutorBase):
     """
-    ExecutorWithDependencies is a class that extends ExecutorSteps and provides
-    functionality for executing tasks with dependencies.
+    ExecutorWithDependencies is a class that extends ExecutorBase and provides functionality for executing tasks with
+    dependencies.
 
     Args:
         refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01.
83 changes: 40 additions & 43 deletions executorlib/interactive/shared.py
@@ -8,7 +8,10 @@
 
 from executorlib.base.executor import ExecutorBase, cancel_items_in_queue
 from executorlib.standalone.command import get_command_path
-from executorlib.standalone.inputcheck import check_resource_dict
+from executorlib.standalone.inputcheck import (
+    check_resource_dict,
+    check_resource_dict_is_empty,
+)
 from executorlib.standalone.interactive.communication import (
     SocketInterface,
     interface_bootup,
@@ -19,6 +22,37 @@
 
 
 class ExecutorBroker(ExecutorBase):
+    def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future:
Reviewer comment (Contributor):

⚠️ Potential issue: Replace mutable default argument

Using a mutable default argument (empty dict) can lead to unexpected behavior when the default value is shared between function calls.

Apply this fix:

-    def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future:
+    def submit(self, fn: callable, *args, resource_dict: dict | None = None, **kwargs) -> Future:

And add this check at the beginning of the method:

if resource_dict is None:
    resource_dict = {}

🧰 Tools: 🪛 Ruff — 25-25: Do not use mutable data structures for argument defaults. Replace with None; initialize within function. (B006)
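For context, a minimal standalone sketch of the pitfall the reviewer describes (the submit_sketch function is hypothetical, not executorlib code):

def submit_sketch(name, resource_dict={}):  # same bug as flagged above
    resource_dict.setdefault("cores", 1)
    return resource_dict

a = submit_sketch("first")
a["cores"] = 8  # caller mutates what looks like a private dict
b = submit_sketch("second")
assert b["cores"] == 8  # surprise: both calls share one default dict

def submit_sketch_fixed(name, resource_dict=None):  # recommended pattern
    if resource_dict is None:
        resource_dict = {}  # fresh dict on every call
    resource_dict.setdefault("cores", 1)
    return resource_dict

c = submit_sketch_fixed("third")
c["cores"] = 8
assert submit_sketch_fixed("fourth")["cores"] == 1  # no sharing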

"""
Submits a callable to be executed with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.

Args:
fn (callable): function to submit for execution
args: arguments for the submitted function
kwargs: keyword arguments for the submitted function
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
function. Example resource dictionary: {
cores: 1,
threads_per_core: 1,
gpus_per_worker: 0,
oversubscribe: False,
cwd: None,
executor: None,
hostname_localhost: False,
}

Returns:
Future: A Future representing the given call.
"""
check_resource_dict_is_empty(resource_dict=resource_dict)
check_resource_dict(function=fn)
f = Future()
self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f})
return f

def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
"""Clean-up the resources associated with the Executor.

@@ -57,46 +91,6 @@ def _set_process(self, process: List[RaisingThread]):
         process.start()
 
 
-class ExecutorSteps(ExecutorBase):
-    def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs):
-        """
-        Submits a callable to be executed with the given arguments.
-
-        Schedules the callable to be executed as fn(*args, **kwargs) and returns
-        a Future instance representing the execution of the callable.
-
-        Args:
-            fn (callable): function to submit for execution
-            args: arguments for the submitted function
-            kwargs: keyword arguments for the submitted function
-            resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
-                function. Example resource dictionary: {
-                    cores: 1,
-                    threads_per_core: 1,
-                    gpus_per_worker: 0,
-                    oversubscribe: False,
-                    cwd: None,
-                    executor: None,
-                    hostname_localhost: False,
-                }
-
-        Returns:
-            A Future representing the given call.
-        """
-        check_resource_dict(function=fn)
-        f = Future()
-        self._future_queue.put(
-            {
-                "fn": fn,
-                "args": args,
-                "kwargs": kwargs,
-                "future": f,
-                "resource_dict": resource_dict,
-            }
-        )
-        return f
-
-
 class InteractiveExecutor(ExecutorBroker):
     """
     The executorlib.interactive.executor.InteractiveExecutor leverages the executorlib interfaces to distribute python
@@ -151,7 +145,7 @@ def __init__(
         )
 
 
-class InteractiveStepExecutor(ExecutorSteps):
+class InteractiveStepExecutor(ExecutorBase):
Reviewer comment (Contributor):

⚠️ Potential issue: Fix incorrect class name in docstring example

The example in the docstring uses PyFluxStepExecutor, but the actual class name is InteractiveStepExecutor.

Update the example in the docstring:

->>> with PyFluxStepExecutor(max_cores=2) as p:
+>>> with InteractiveStepExecutor(max_cores=2) as p:

Committable suggestion was skipped due to low confidence.

"""
The executorlib.interactive.executor.InteractiveStepExecutor leverages the executorlib interfaces to distribute python
tasks. In contrast to the mpi4py.futures.MPIPoolExecutor the executorlib.interactive.executor.InteractiveStepExecutor
@@ -596,7 +590,10 @@ def _execute_task_with_cache(
 
     future = task_dict["future"]
     task_key, data_dict = serialize_funct_h5(
-        task_dict["fn"], *task_dict["args"], **task_dict["kwargs"]
+        fn=task_dict["fn"],
+        fn_args=task_dict["args"],
+        fn_kwargs=task_dict["kwargs"],
+        resource_dict=task_dict["resource_dict"],
     )
     os.makedirs(cache_directory, exist_ok=True)
     file_name = os.path.join(cache_directory, task_key + ".h5out")
29 changes: 24 additions & 5 deletions executorlib/standalone/serialize.py
@@ -28,22 +28,41 @@ def cloudpickle_register(ind: int = 2):
         pass
 
 
-def serialize_funct_h5(fn: callable, *args: Any, **kwargs: Any) -> Tuple[str, dict]:
+def serialize_funct_h5(
+    fn: callable, fn_args: list = [], fn_kwargs: dict = {}, resource_dict: dict = {}
+) -> Tuple[str, dict]:
     """
     Serialize a function and its arguments and keyword arguments into an HDF5 file.
 
     Args:
         fn (callable): The function to be serialized.
-        *args (Any): The arguments of the function.
-        **kwargs (Any): The keyword arguments of the function.
+        fn_args (list): The arguments of the function.
+        fn_kwargs (dict): The keyword arguments of the function.
+        resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function.
+            Example resource dictionary: {
+                cores: 1,
+                threads_per_core: 1,
+                gpus_per_worker: 0,
+                oversubscribe: False,
+                cwd: None,
+                executor: None,
+                hostname_localhost: False,
+            }
 
     Returns:
         Tuple[str, dict]: A tuple containing the task key and the serialized data.
 
     """
-    binary_all = cloudpickle.dumps({"fn": fn, "args": args, "kwargs": kwargs})
+    binary_all = cloudpickle.dumps(
+        {"fn": fn, "args": fn_args, "kwargs": fn_kwargs, "resource_dict": resource_dict}
+    )
     task_key = fn.__name__ + _get_hash(binary=binary_all)
-    data = {"fn": fn, "args": args, "kwargs": kwargs}
+    data = {
+        "fn": fn,
+        "args": fn_args,
+        "kwargs": fn_kwargs,
+        "resource_dict": resource_dict,
+    }
     return task_key, data
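A hedged usage sketch of the new signature (standalone approximation, assuming cloudpickle is installed; the sha256 digest is an assumption, since executorlib's _get_hash helper may use a different hash). The point of the change: the resource dictionary is now part of the pickled payload, so the same function submitted with different resources yields different cache keys.

import hashlib

import cloudpickle


def task_key_sketch(fn, fn_args=(), fn_kwargs=None, resource_dict=None):
    # Approximation of serialize_funct_h5's key derivation: pickle the task
    # including its resource_dict, then hash the binary blob.
    binary = cloudpickle.dumps(
        {
            "fn": fn,
            "args": list(fn_args),
            "kwargs": fn_kwargs or {},
            "resource_dict": resource_dict or {},
        }
    )
    return fn.__name__ + hashlib.sha256(binary).hexdigest()  # digest is an assumption


def my_funct(a, b):
    return a + b


key_serial = task_key_sketch(my_funct, (1,), {"b": 2}, {"cores": 1})
key_parallel = task_key_sketch(my_funct, (1,), {"b": 2}, {"cores": 2})
assert key_serial != key_parallel  # different resources -> distinct cache entries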
50 changes: 45 additions & 5 deletions tests/test_cache_executor_serial.py
@@ -55,7 +55,15 @@ def test_executor_working_directory(self):
     def test_executor_function(self):
         fs1 = Future()
         q = Queue()
-        q.put({"fn": my_funct, "args": (), "kwargs": {"a": 1, "b": 2}, "future": fs1})
+        q.put(
+            {
+                "fn": my_funct,
+                "args": (),
+                "kwargs": {"a": 1, "b": 2},
+                "future": fs1,
+                "resource_dict": {},
+            }
+        )
         cache_dir = os.path.abspath("cache")
         os.makedirs(cache_dir, exist_ok=True)
         process = RaisingThread(
@@ -80,8 +88,24 @@ def test_executor_function_dependence_kwargs(self):
         fs1 = Future()
         fs2 = Future()
         q = Queue()
-        q.put({"fn": my_funct, "args": (), "kwargs": {"a": 1, "b": 2}, "future": fs1})
-        q.put({"fn": my_funct, "args": (), "kwargs": {"a": 1, "b": fs1}, "future": fs2})
+        q.put(
+            {
+                "fn": my_funct,
+                "args": (),
+                "kwargs": {"a": 1, "b": 2},
+                "future": fs1,
+                "resource_dict": {},
+            }
+        )
+        q.put(
+            {
+                "fn": my_funct,
+                "args": (),
+                "kwargs": {"a": 1, "b": fs1},
+                "future": fs2,
+                "resource_dict": {},
+            }
+        )
         cache_dir = os.path.abspath("cache")
         os.makedirs(cache_dir, exist_ok=True)
         process = RaisingThread(
@@ -106,8 +130,24 @@ def test_executor_function_dependence_args(self):
         fs1 = Future()
         fs2 = Future()
         q = Queue()
-        q.put({"fn": my_funct, "args": (), "kwargs": {"a": 1, "b": 2}, "future": fs1})
-        q.put({"fn": my_funct, "args": [fs1], "kwargs": {"b": 2}, "future": fs2})
+        q.put(
+            {
+                "fn": my_funct,
+                "args": (),
+                "kwargs": {"a": 1, "b": 2},
+                "future": fs1,
+                "resource_dict": {},
+            }
+        )
+        q.put(
+            {
+                "fn": my_funct,
+                "args": [fs1],
+                "kwargs": {"b": 2},
+                "future": fs2,
+                "resource_dict": {},
+            }
+        )
         cache_dir = os.path.abspath("cache")
         os.makedirs(cache_dir, exist_ok=True)
         process = RaisingThread(
18 changes: 9 additions & 9 deletions tests/test_cache_shared.py
@@ -27,9 +27,9 @@ def test_execute_function_mixed(self):
         cache_directory = os.path.abspath("cache")
         os.makedirs(cache_directory, exist_ok=True)
         task_key, data_dict = serialize_funct_h5(
-            my_funct,
-            1,
-            b=2,
+            fn=my_funct,
+            fn_args=[1],
+            fn_kwargs={"b": 2},
         )
         file_name = os.path.join(cache_directory, task_key + ".h5in")
         dump(file_name=file_name, data_dict=data_dict)
@@ -50,9 +50,9 @@ def test_execute_function_args(self):
         cache_directory = os.path.abspath("cache")
         os.makedirs(cache_directory, exist_ok=True)
         task_key, data_dict = serialize_funct_h5(
-            my_funct,
-            1,
-            2,
+            fn=my_funct,
+            fn_args=[1, 2],
+            fn_kwargs={},
         )
         file_name = os.path.join(cache_directory, task_key + ".h5in")
         dump(file_name=file_name, data_dict=data_dict)
@@ -73,9 +73,9 @@ def test_execute_function_kwargs(self):
         cache_directory = os.path.abspath("cache")
         os.makedirs(cache_directory, exist_ok=True)
         task_key, data_dict = serialize_funct_h5(
-            my_funct,
-            a=1,
-            b=2,
+            fn=my_funct,
+            fn_args=[],
+            fn_kwargs={"a": 1, "b": 2},
        )
         file_name = os.path.join(cache_directory, task_key + ".h5in")
         dump(file_name=file_name, data_dict=data_dict)