From 37f84d230996299f9c962b5e6d72435c1c960b5d Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Fri, 9 Dec 2022 12:17:51 +0100 Subject: [PATCH 01/42] Add SequentialParallelBackend to run tasks without parallelism --- src/pydvl/utils/config.py | 4 +- src/pydvl/utils/parallel/backend.py | 112 +++++++++++++++++++++---- src/pydvl/utils/parallel/map_reduce.py | 5 +- tests/utils/test_parallel.py | 22 +++-- 4 files changed, 118 insertions(+), 25 deletions(-) diff --git a/src/pydvl/utils/config.py b/src/pydvl/utils/config.py index 0f488aed3..885b7fe18 100644 --- a/src/pydvl/utils/config.py +++ b/src/pydvl/utils/config.py @@ -1,5 +1,5 @@ from dataclasses import dataclass, field -from typing import Iterable, Optional, Tuple, Union +from typing import Iterable, Literal, Optional, Tuple, Union from pymemcache.serde import PickleSerde @@ -19,7 +19,7 @@ class ParallelConfig: :param num_workers: Number of workers (CPUs) to use. """ - backend: str = "ray" + backend: Literal["sequential", "ray"] = "ray" address: Optional[Union[str, Tuple[str, int]]] = None num_workers: Optional[int] = None diff --git a/src/pydvl/utils/parallel/backend.py b/src/pydvl/utils/parallel/backend.py index 7ae04a390..a19532498 100644 --- a/src/pydvl/utils/parallel/backend.py +++ b/src/pydvl/utils/parallel/backend.py @@ -1,6 +1,8 @@ +import functools import os +from abc import ABC, abstractmethod from dataclasses import asdict -from typing import Any, Iterable, List, Optional, Tuple, TypeVar, Union +from typing import Any, Dict, Iterable, List, Optional, Tuple, TypeVar, Union import ray from ray import ObjectRef @@ -15,10 +17,89 @@ T = TypeVar("T") -_PARALLEL_BACKEND: Optional["RayParallelBackend"] = None +_PARALLEL_BACKENDS: Dict[str, "BaseParallelBackend"] = {} -class RayParallelBackend: +class BaseParallelBackend(ABC): + """Abstract base class for all parallel backends""" + + config: Dict[str, Any] = {} + + @abstractmethod + def get(self, v: Any, *args, **kwargs): + ... + + @abstractmethod + def put(self, v: Any, *args, **kwargs) -> Any: + ... + + @abstractmethod + def wrap(self, *args, **kwargs) -> Any: + ... + + @abstractmethod + def wait(self, v: Any, *args, **kwargs) -> Any: + ... + + @abstractmethod + def effective_n_jobs(self, n_jobs: Optional[int]) -> int: + ... + + def __repr__(self) -> str: + return f"<{self.__class__.__name__}: {self.config}>" + + +class SequentialParallelBackend(BaseParallelBackend): + """Class used to run jobs sequentiall and locally. It shouldn't + be initialized directly. You should instead call `init_parallel_backend`. 
+
+    :param config: instance of :class:`~pydvl.utils.config.ParallelConfig` with number of cpus
+
+    :Example:
+
+    >>> from pydvl.utils.parallel.backend import SequentialParallelBackend
+    >>> from pydvl.utils.config import ParallelConfig
+    >>> config = ParallelConfig(backend="sequential")
+    >>> parallel_backend = SequentialParallelBackend(config)
+    >>> parallel_backend
+    <SequentialParallelBackend: {'num_cpus': None}>
+
+    """
+
+    def __init__(self, config: ParallelConfig):
+        config_dict = asdict(config)
+        config_dict.pop("backend")
+        config_dict.pop("address")
+        config_dict["num_cpus"] = config_dict.pop("num_workers")
+        self.config = config_dict
+
+    def get(self, v: Any, *args, **kwargs):
+        return v
+
+    def put(self, v: Any, *args, **kwargs) -> Any:
+        pass
+
+    def wrap(self, *args, **kwargs) -> Any:
+        assert len(args) == 1
+        return functools.partial(args[0], **kwargs)
+
+    def wait(self, v: Any, *args, **kwargs) -> Tuple[list, list]:
+        return v, []
+
+    def effective_n_jobs(self, n_jobs: Optional[int]) -> int:
+        if n_jobs == 0:
+            raise ValueError("n_jobs == 0 in Parallel has no meaning")
+        elif n_jobs is None or n_jobs < 0:
+            if self.config["num_cpus"]:
+                eff_n_jobs = self.config["num_cpus"]
+            else:
+                eff_n_jobs = available_cpus()
+        else:
+            eff_n_jobs = n_jobs
+        return eff_n_jobs
+
+
+class RayParallelBackend(BaseParallelBackend):
     """Class used to wrap ray to make it transparent to algorithms.

     It shouldn't be initialized directly. You should instead call
     `init_parallel_backend`.
@@ -56,21 +137,21 @@ def get(
         else:
             return v

-    def put(self, x: Any, **kwargs) -> ObjectRef:
-        return ray.put(x, **kwargs)  # type: ignore
+    def put(self, v: Any, **kwargs) -> ObjectRef:
+        return ray.put(v, **kwargs)  # type: ignore

     def wrap(self, *args, **kwargs) -> RemoteFunction:
         return ray.remote(*args, **kwargs)  # type: ignore

     def wait(
         self,
-        object_refs: List["ray.ObjectRef"],
+        v: List["ray.ObjectRef"],
         *,
         num_returns: int = 1,
         timeout: Optional[float] = None,
     ) -> Tuple[List[ObjectRef], List[ObjectRef]]:
         return ray.wait(  # type: ignore
-            object_refs,
+            v,
             num_returns=num_returns,
             timeout=timeout,
         )
@@ -85,11 +166,8 @@ def effective_n_jobs(self, n_jobs: Optional[int]) -> int:
         eff_n_jobs = n_jobs
         return eff_n_jobs

-    def __repr__(self) -> str:
-        return f"<RayParallelBackend: {self.config}>"
-

-def init_parallel_backend(config: ParallelConfig) -> "RayParallelBackend":
+def init_parallel_backend(config: ParallelConfig) -> "BaseParallelBackend":
     """Initializes the parallel backend and returns an instance of it.

     :param config: instance of :class:`~pydvl.utils.config.ParallelConfig`
         with cluster address, number of cpus, etc.
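
The sequential backend added in PATCH 01 runs everything in the calling thread: `get` and `put` are identities, `wait` reports all work as finished, and `wrap` returns a plain `functools.partial`. A minimal usage sketch, assuming only the API visible in this patch:

    from pydvl.utils.config import ParallelConfig
    from pydvl.utils.parallel.backend import init_parallel_backend

    config = ParallelConfig(backend="sequential")
    parallel_backend = init_parallel_backend(config)
    job = parallel_backend.wrap(sum)  # a functools.partial, not a ray task
    result = parallel_backend.get(job(range(5)))  # get() returns its input as-is
    assert result == 10
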
@@ -104,13 +182,15 @@ def init_parallel_backend(config: ParallelConfig) -> "RayParallelBackend": """ - global _PARALLEL_BACKEND - if _PARALLEL_BACKEND is None: + global _PARALLEL_BACKENDS + if config.backend not in ["sequential", "ray"]: + raise NotImplementedError(f"Unexpected parallel type {config.backend}") + if config.backend not in _PARALLEL_BACKENDS: if config.backend == "ray": - _PARALLEL_BACKEND = RayParallelBackend(config) + _PARALLEL_BACKENDS["ray"] = RayParallelBackend(config) else: - raise NotImplementedError(f"Unexpected parallel type {config.backend}") - return _PARALLEL_BACKEND + _PARALLEL_BACKENDS["sequential"] = SequentialParallelBackend(config) + return _PARALLEL_BACKENDS[config.backend] def available_cpus() -> int: diff --git a/src/pydvl/utils/parallel/map_reduce.py b/src/pydvl/utils/parallel/map_reduce.py index 373d4059f..080c487c6 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -244,7 +244,10 @@ def _wrap_function(self, func): remote_func = self.parallel_backend.wrap( wrap_func_with_remote_args(func, timeout=self.timeout) ) - return remote_func.remote + if hasattr(remote_func, "remote"): + return remote_func.remote + else: + return remote_func def _backpressure( self, jobs: List[ObjectRef], n_dispatched: int, n_finished: int diff --git a/tests/utils/test_parallel.py b/tests/utils/test_parallel.py index 92f3a9c47..c6e566148 100644 --- a/tests/utils/test_parallel.py +++ b/tests/utils/test_parallel.py @@ -5,34 +5,44 @@ import numpy as np import pytest +from pydvl.utils.config import ParallelConfig from pydvl.utils.parallel import MapReduceJob +@pytest.fixture(scope="session", params=["sequential", "ray"]) +def parallel_config(request): + return ParallelConfig(backend=request.param) + + @pytest.fixture() -def map_reduce_job(request): +def map_reduce_job(parallel_config, request): try: kind, map_func, reduce_func = request.param assert kind == "custom" except ValueError: kind = request.param if kind == "numpy": - return MapReduceJob(map_func=np.sum, reduce_func=np.sum) + return MapReduceJob(map_func=np.sum, reduce_func=np.sum, config=parallel_config) elif kind == "list": return MapReduceJob( - map_func=lambda x: x, reduce_func=lambda r: reduce(operator.add, r, []) + map_func=lambda x: x, + reduce_func=lambda r: reduce(operator.add, r, []), + config=parallel_config, ) elif kind == "range": return MapReduceJob( map_func=lambda x: list(x), reduce_func=lambda r: reduce(operator.add, list(r), []), + config=parallel_config, ) elif kind == "custom": return MapReduceJob( - map_func=map_func, - reduce_func=reduce_func, + map_func=map_func, reduce_func=reduce_func, config=parallel_config ) else: - return MapReduceJob(map_func=lambda x: x * x, reduce_func=lambda r: r) + return MapReduceJob( + map_func=lambda x: x * x, reduce_func=lambda r: r, config=parallel_config + ) @pytest.mark.parametrize( From 949a3bfd3ce3710b52939b0aa178dee7a3768126 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Sun, 11 Dec 2022 14:24:12 +0100 Subject: [PATCH 02/42] Fix map_reduce test for non chunkified inputs --- src/pydvl/utils/parallel/map_reduce.py | 1 + tests/utils/test_parallel.py | 7 +++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/pydvl/utils/parallel/map_reduce.py b/src/pydvl/utils/parallel/map_reduce.py index 080c487c6..a386a9549 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -83,6 +83,7 @@ class MapReduceJob(Generic[T, R]): in each job. 
Alternatively, one can use `itertools.partial`. :param config: Instance of :class:`~pydvl.utils.config.ParallelConfig` with cluster address, number of cpus, etc. + :param chunkify_inputs: If True, the input is split across jobs, otherwise it is repeated. :param n_jobs: Number of parallel jobs to run. Does not accept 0 :param n_runs: Number of times to run `map_func` and `reduce_func` on the whole data. diff --git a/tests/utils/test_parallel.py b/tests/utils/test_parallel.py index c6e566148..72fb5f2c2 100644 --- a/tests/utils/test_parallel.py +++ b/tests/utils/test_parallel.py @@ -56,15 +56,18 @@ def map_reduce_job(parallel_config, request): ], indirect=["map_reduce_job"], ) -@pytest.mark.parametrize("n_jobs", [1]) +@pytest.mark.parametrize("n_jobs", [1, 2]) @pytest.mark.parametrize("n_runs", [1, 2]) def test_map_reduce_job(map_reduce_job, indices, n_jobs, n_runs, expected): result = map_reduce_job( indices, n_jobs=n_jobs, n_runs=n_runs, + chunkify_inputs=False, ) + assert len(result) == n_runs for exp, ret in zip_longest(expected * n_runs, result, fillvalue=None): + exp = exp * n_jobs if not isinstance(ret, np.ndarray): assert ret == exp else: @@ -87,7 +90,7 @@ def test_map_reduce_job(map_reduce_job, indices, n_jobs, n_runs, expected): def test_map_reduce_job_chunkified_inputs( map_reduce_job, indices, n_jobs, n_runs, expected ): - result = map_reduce_job(indices, n_jobs=n_jobs, n_runs=n_runs, chunkify_inputs=True) + result = map_reduce_job(indices, n_jobs=n_jobs, n_runs=n_runs) assert len(result) == n_runs for exp, ret in zip_longest(expected * n_runs, result, fillvalue=None): if not isinstance(ret, np.ndarray): From 74c4cdd565567f45d3eecddbc134b0a533b8223a Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Sun, 11 Dec 2022 15:25:40 +0100 Subject: [PATCH 03/42] Use expit from scipy to avoid overflow warnings --- src/pydvl/utils/numeric.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/pydvl/utils/numeric.py b/src/pydvl/utils/numeric.py index 0a60763c6..120e149a3 100644 --- a/src/pydvl/utils/numeric.py +++ b/src/pydvl/utils/numeric.py @@ -17,6 +17,7 @@ ) import numpy as np +from scipy.special import expit from pydvl.utils.types import compose_score @@ -252,7 +253,7 @@ def top_k_value_accuracy(y_true: "NDArray", y_pred: "NDArray", k: int = 3) -> fl def sigmoid(x: float) -> float: - return float(1 / (1 + np.exp(-x))) + return expit(x).item() squashed_r2 = compose_score("r2", sigmoid, "squashed r2") From 58d2149e15d24854b6e41313e4332739505e085f Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Sun, 11 Dec 2022 15:26:39 +0100 Subject: [PATCH 04/42] Remove the chunkify_inputs argument of MapReduceJob It was only used in one place and did not make sense in the context of parallelization --- src/pydvl/utils/parallel/map_reduce.py | 62 +++++++++----------------- src/pydvl/value/shapley/montecarlo.py | 10 +++-- src/pydvl/value/shapley/naive.py | 1 - tests/utils/test_parallel.py | 33 +------------- 4 files changed, 28 insertions(+), 78 deletions(-) diff --git a/src/pydvl/utils/parallel/map_reduce.py b/src/pydvl/utils/parallel/map_reduce.py index a386a9549..e3bef6799 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -83,7 +83,6 @@ class MapReduceJob(Generic[T, R]): in each job. Alternatively, one can use `itertools.partial`. :param config: Instance of :class:`~pydvl.utils.config.ParallelConfig` with cluster address, number of cpus, etc. 
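
For reference, the overflow that motivates PATCH 03 above: evaluating `1 / (1 + np.exp(-x))` for large negative `x` overflows inside `np.exp` and emits a `RuntimeWarning`, while `scipy.special.expit` computes the same logistic function in a numerically stable way. A small sketch, not part of the series:

    import numpy as np
    from scipy.special import expit

    x = -1000.0
    # float(1 / (1 + np.exp(-x))) triggers "overflow encountered in exp"
    assert expit(x) == 0.0  # stable and silent
    assert expit(-x) == 1.0
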
- :param chunkify_inputs: If True, the input is split across jobs, otherwise it is repeated. :param n_jobs: Number of parallel jobs to run. Does not accept 0 :param n_runs: Number of times to run `map_func` and `reduce_func` on the whole data. @@ -111,21 +110,6 @@ class MapReduceJob(Generic[T, R]): >>> map_reduce_job(np.arange(5)) [10, 10, 10] - If we set `chunkify_inputs` to `False` the input is not split across jobs - but instead repeated: - - >>> from pydvl.utils.parallel import MapReduceJob - >>> import numpy as np - >>> map_reduce_job: MapReduceJob[np.ndarray, np.ndarray] = MapReduceJob( - ... map_func=np.sum, - ... reduce_func=np.sum, - ... chunkify_inputs=False, - ... n_jobs=2, - ... n_runs=3, - ... ) - >>> map_reduce_job(np.arange(5)) - [20, 20, 20] - """ def __init__( @@ -136,7 +120,6 @@ def __init__( reduce_kwargs: Optional[Dict] = None, config: ParallelConfig = ParallelConfig(), *, - chunkify_inputs: bool = True, n_jobs: int = 1, n_runs: int = 1, timeout: Optional[float] = None, @@ -147,7 +130,6 @@ def __init__( self._parallel_backend_ref = weakref.ref(parallel_backend) self.timeout = timeout - self.chunkify_inputs = chunkify_inputs self.n_runs = n_runs self._n_jobs = 1 @@ -180,14 +162,11 @@ def __call__( *, n_jobs: Optional[int] = None, n_runs: Optional[int] = None, - chunkify_inputs: Optional[bool] = None, ) -> List[R]: if n_jobs is not None: self.n_jobs = n_jobs if n_runs is not None: self.n_runs = n_runs - if chunkify_inputs is not None: - self.chunkify_inputs = chunkify_inputs map_results = self.map(inputs) reduce_results = self.reduce(map_results) @@ -202,10 +181,7 @@ def map(self, inputs: Union[Sequence[T], Any]) -> List[List["ObjectRef[R]"]]: total_n_finished = 0 for _ in range(self.n_runs): - if self.chunkify_inputs and self.n_jobs > 1: - chunks = self._chunkify(inputs, num_chunks=self.n_jobs) - else: - chunks = repeat(inputs, times=self.n_jobs) + chunks = self._chunkify(inputs, num_chunks=self.n_jobs) map_result = [] for j, next_chunk in enumerate(chunks): @@ -274,23 +250,27 @@ def _chunkify(data: Sequence[T], num_chunks: int) -> Iterator[Sequence[T]]: if num_chunks == 0: raise ValueError("Number of chunks should be greater than 0") - n = len(data) - - # This is very much inspired by numpy's array_split function - # The difference is that it only uses built-in functions - # and does not convert the input data to an array - chunk_size, remainder = divmod(n, num_chunks) - chunk_indices = tuple( - accumulate( - [0] - + remainder * [chunk_size + 1] - + (num_chunks - remainder) * [chunk_size] + elif num_chunks == 1: + yield data + + else: + n = len(data) + + # This is very much inspired by numpy's array_split function + # The difference is that it only uses built-in functions + # and does not convert the input data to an array + chunk_size, remainder = divmod(n, num_chunks) + chunk_indices = tuple( + accumulate( + [0] + + remainder * [chunk_size + 1] + + (num_chunks - remainder) * [chunk_size] + ) ) - ) - for start_index, end_index in zip(chunk_indices[:-1], chunk_indices[1:]): - if start_index >= end_index: - return - yield data[start_index:end_index] + for start_index, end_index in zip(chunk_indices[:-1], chunk_indices[1:]): + if start_index >= end_index: + return + yield data[start_index:end_index] @property def parallel_backend(self): diff --git a/src/pydvl/value/shapley/montecarlo.py b/src/pydvl/value/shapley/montecarlo.py index f55225b33..fd3507779 100644 --- a/src/pydvl/value/shapley/montecarlo.py +++ b/src/pydvl/value/shapley/montecarlo.py @@ -34,6 +34,7 @@ import 
logging import math from enum import Enum +from itertools import repeat from time import sleep from typing import TYPE_CHECKING, Iterable, NamedTuple, Optional, Sequence from warnings import warn @@ -234,10 +235,13 @@ def permutation_montecarlo_shapley( map_kwargs=dict(max_permutations=iterations_per_job, progress=progress), reduce_kwargs=dict(axis=0), config=config, - chunkify_inputs=False, n_jobs=n_jobs, ) - full_results = map_reduce_job(u_id)[0] + if n_jobs == 1: + input_ = u_id + else: + input_ = repeat(u_id, times=n_jobs) + full_results = map_reduce_job(input_)[0] values = np.mean(full_results, axis=0) stderr = np.std(full_results, axis=0) / np.sqrt(full_results.shape[0]) @@ -373,7 +377,6 @@ def combinatorial_montecarlo_shapley( map_func=_combinatorial_montecarlo_shapley, reduce_func=disjoint_reducer, map_kwargs=dict(u=u_id, max_iterations=max_iterations, progress=progress), - chunkify_inputs=True, n_jobs=n_jobs, config=config, ) @@ -529,7 +532,6 @@ def owen_sampling_shapley( progress=progress, ), reduce_func=disjoint_reducer, - chunkify_inputs=True, n_jobs=n_jobs, config=config, ) diff --git a/src/pydvl/value/shapley/naive.py b/src/pydvl/value/shapley/naive.py index 6973a4966..40d18fc0e 100644 --- a/src/pydvl/value/shapley/naive.py +++ b/src/pydvl/value/shapley/naive.py @@ -128,7 +128,6 @@ def reduce_fun(results): map_func=_combinatorial_exact_shapley, map_kwargs=dict(u=u_id, progress=progress), reduce_func=reduce_fun, - chunkify_inputs=True, n_jobs=n_jobs, ) values = map_reduce_job(u.data.indices)[0] diff --git a/tests/utils/test_parallel.py b/tests/utils/test_parallel.py index 72fb5f2c2..61b7f26e6 100644 --- a/tests/utils/test_parallel.py +++ b/tests/utils/test_parallel.py @@ -45,35 +45,6 @@ def map_reduce_job(parallel_config, request): ) -@pytest.mark.parametrize( - "map_reduce_job, indices, expected", - [ - ("list", [], [[]]), - ("list", [1, 2], [[1, 2]]), - ("list", [1, 2, 3], [[1, 2, 3]]), - ("range", range(10), [list(range(10))]), - ("numpy", list(range(10)), [45]), - ], - indirect=["map_reduce_job"], -) -@pytest.mark.parametrize("n_jobs", [1, 2]) -@pytest.mark.parametrize("n_runs", [1, 2]) -def test_map_reduce_job(map_reduce_job, indices, n_jobs, n_runs, expected): - result = map_reduce_job( - indices, - n_jobs=n_jobs, - n_runs=n_runs, - chunkify_inputs=False, - ) - assert len(result) == n_runs - for exp, ret in zip_longest(expected * n_runs, result, fillvalue=None): - exp = exp * n_jobs - if not isinstance(ret, np.ndarray): - assert ret == exp - else: - assert (ret == exp).all() - - @pytest.mark.parametrize( "map_reduce_job, indices, expected", [ @@ -87,9 +58,7 @@ def test_map_reduce_job(map_reduce_job, indices, n_jobs, n_runs, expected): ) @pytest.mark.parametrize("n_jobs", [2, 4]) @pytest.mark.parametrize("n_runs", [1, 2]) -def test_map_reduce_job_chunkified_inputs( - map_reduce_job, indices, n_jobs, n_runs, expected -): +def test_map_reduce_job(map_reduce_job, indices, n_jobs, n_runs, expected): result = map_reduce_job(indices, n_jobs=n_jobs, n_runs=n_runs) assert len(result) == n_runs for exp, ret in zip_longest(expected * n_runs, result, fillvalue=None): From 9a1ad1aa7f31e58eac23ba2e477dec83249f428e Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Sun, 11 Dec 2022 15:30:08 +0100 Subject: [PATCH 05/42] Rename num_chunks to n_chunks for consistency --- src/pydvl/utils/parallel/map_reduce.py | 12 ++++++------ tests/utils/test_parallel.py | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/pydvl/utils/parallel/map_reduce.py 
b/src/pydvl/utils/parallel/map_reduce.py index e3bef6799..b59ddb1bc 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -181,7 +181,7 @@ def map(self, inputs: Union[Sequence[T], Any]) -> List[List["ObjectRef[R]"]]: total_n_finished = 0 for _ in range(self.n_runs): - chunks = self._chunkify(inputs, num_chunks=self.n_jobs) + chunks = self._chunkify(inputs, n_chunks=self.n_jobs) map_result = [] for j, next_chunk in enumerate(chunks): @@ -245,12 +245,12 @@ def _backpressure( return n_finished @staticmethod - def _chunkify(data: Sequence[T], num_chunks: int) -> Iterator[Sequence[T]]: + def _chunkify(data: Sequence[T], n_chunks: int) -> Iterator[Sequence[T]]: # Splits a list of values into chunks for each job - if num_chunks == 0: + if n_chunks == 0: raise ValueError("Number of chunks should be greater than 0") - elif num_chunks == 1: + elif n_chunks == 1: yield data else: @@ -259,12 +259,12 @@ def _chunkify(data: Sequence[T], num_chunks: int) -> Iterator[Sequence[T]]: # This is very much inspired by numpy's array_split function # The difference is that it only uses built-in functions # and does not convert the input data to an array - chunk_size, remainder = divmod(n, num_chunks) + chunk_size, remainder = divmod(n, n_chunks) chunk_indices = tuple( accumulate( [0] + remainder * [chunk_size + 1] - + (num_chunks - remainder) * [chunk_size] + + (n_chunks - remainder) * [chunk_size] ) ) for start_index, end_index in zip(chunk_indices[:-1], chunk_indices[1:]): diff --git a/tests/utils/test_parallel.py b/tests/utils/test_parallel.py index 61b7f26e6..78831886c 100644 --- a/tests/utils/test_parallel.py +++ b/tests/utils/test_parallel.py @@ -56,8 +56,8 @@ def map_reduce_job(parallel_config, request): ], indirect=["map_reduce_job"], ) -@pytest.mark.parametrize("n_jobs", [2, 4]) -@pytest.mark.parametrize("n_runs", [1, 2]) +@pytest.mark.parametrize("n_jobs", [1, 2, 4]) +@pytest.mark.parametrize("n_runs", [1, 2, 4]) def test_map_reduce_job(map_reduce_job, indices, n_jobs, n_runs, expected): result = map_reduce_job(indices, n_jobs=n_jobs, n_runs=n_runs) assert len(result) == n_runs From 80dec5317056aa4a38d76cb022626b6c528474ea Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Sun, 11 Dec 2022 15:50:46 +0100 Subject: [PATCH 06/42] Remove all optional arguments from MapReduceJob's __call__ method --- src/pydvl/utils/parallel/map_reduce.py | 11 +----- src/pydvl/value/shapley/montecarlo.py | 2 +- tests/utils/test_parallel.py | 53 ++++++++++++++++++-------- 3 files changed, 39 insertions(+), 27 deletions(-) diff --git a/src/pydvl/utils/parallel/map_reduce.py b/src/pydvl/utils/parallel/map_reduce.py index b59ddb1bc..e5f43197c 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -1,6 +1,5 @@ -import warnings import weakref -from itertools import accumulate, chain, repeat +from itertools import accumulate, chain from typing import ( Any, Callable, @@ -159,15 +158,7 @@ def __init__( def __call__( self, inputs: Union[Collection[T], Any], - *, - n_jobs: Optional[int] = None, - n_runs: Optional[int] = None, ) -> List[R]: - if n_jobs is not None: - self.n_jobs = n_jobs - if n_runs is not None: - self.n_runs = n_runs - map_results = self.map(inputs) reduce_results = self.reduce(map_results) return reduce_results diff --git a/src/pydvl/value/shapley/montecarlo.py b/src/pydvl/value/shapley/montecarlo.py index fd3507779..957b2d805 100644 --- a/src/pydvl/value/shapley/montecarlo.py +++ b/src/pydvl/value/shapley/montecarlo.py @@ 
-240,7 +240,7 @@ def permutation_montecarlo_shapley( if n_jobs == 1: input_ = u_id else: - input_ = repeat(u_id, times=n_jobs) + input_ = tuple(repeat(u_id, times=n_jobs)) full_results = map_reduce_job(input_)[0] values = np.mean(full_results, axis=0) diff --git a/tests/utils/test_parallel.py b/tests/utils/test_parallel.py index 78831886c..cbc4b7d76 100644 --- a/tests/utils/test_parallel.py +++ b/tests/utils/test_parallel.py @@ -15,38 +15,57 @@ def parallel_config(request): @pytest.fixture() -def map_reduce_job(parallel_config, request): +def map_reduce_job_and_parameters(parallel_config, n_jobs, n_runs, request): try: kind, map_func, reduce_func = request.param assert kind == "custom" except ValueError: kind = request.param if kind == "numpy": - return MapReduceJob(map_func=np.sum, reduce_func=np.sum, config=parallel_config) + map_reduce_job = MapReduceJob( + map_func=np.sum, + reduce_func=np.sum, + config=parallel_config, + n_jobs=n_jobs, + n_runs=n_runs, + ) elif kind == "list": - return MapReduceJob( + map_reduce_job = MapReduceJob( map_func=lambda x: x, reduce_func=lambda r: reduce(operator.add, r, []), config=parallel_config, + n_jobs=n_jobs, + n_runs=n_runs, ) elif kind == "range": - return MapReduceJob( + map_reduce_job = MapReduceJob( map_func=lambda x: list(x), reduce_func=lambda r: reduce(operator.add, list(r), []), config=parallel_config, + n_jobs=n_jobs, + n_runs=n_runs, ) elif kind == "custom": - return MapReduceJob( - map_func=map_func, reduce_func=reduce_func, config=parallel_config + map_reduce_job = MapReduceJob( + map_func=map_func, + reduce_func=reduce_func, + config=parallel_config, + n_jobs=n_jobs, + n_runs=n_runs, ) else: - return MapReduceJob( - map_func=lambda x: x * x, reduce_func=lambda r: r, config=parallel_config + map_reduce_job = MapReduceJob( + map_func=lambda x: x * x, + reduce_func=lambda r: r, + config=parallel_config, + n_jobs=n_jobs, + n_runs=n_runs, ) + return map_reduce_job, n_jobs, n_runs @pytest.mark.parametrize( - "map_reduce_job, indices, expected", + "map_reduce_job_and_parameters, indices, expected", [ ("list", [], [[]]), ("list", [1, 2], [[1, 2]]), @@ -54,12 +73,13 @@ def map_reduce_job(parallel_config, request): ("range", range(10), [list(range(10))]), ("numpy", list(range(10)), [45]), ], - indirect=["map_reduce_job"], + indirect=["map_reduce_job_and_parameters"], ) @pytest.mark.parametrize("n_jobs", [1, 2, 4]) @pytest.mark.parametrize("n_runs", [1, 2, 4]) -def test_map_reduce_job(map_reduce_job, indices, n_jobs, n_runs, expected): - result = map_reduce_job(indices, n_jobs=n_jobs, n_runs=n_runs) +def test_map_reduce_job(map_reduce_job_and_parameters, indices, expected): + map_reduce_job, n_jobs, n_runs = map_reduce_job_and_parameters + result = map_reduce_job(indices) assert len(result) == n_runs for exp, ret in zip_longest(expected * n_runs, result, fillvalue=None): if not isinstance(ret, np.ndarray): @@ -119,14 +139,15 @@ def map_func(x): # TODO: figure out test cases for this test @pytest.mark.skip @pytest.mark.parametrize( - "map_reduce_job, indices, n_jobs, n_runs, expected", + "map_reduce_job_and_parameters, indices, n_jobs, n_runs, expected", [ ("other", [], 1, 1, [[]]), ], - indirect=["map_reduce_job"], + indirect=["map_reduce_job_and_parameters"], ) def test_map_reduce_job_expected_failures( - map_reduce_job, indices, n_jobs, n_runs, expected + map_reduce_job_and_parameters, indices, expected ): + map_reduce_job, *_ = map_reduce_job_and_parameters with pytest.raises(expected): - map_reduce_job(indices, n_jobs=n_jobs, n_runs=n_runs) + 
map_reduce_job(indices) From 31be0b61ce990adc3e20a569901516882ae01b41 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Sun, 11 Dec 2022 15:56:36 +0100 Subject: [PATCH 07/42] Call parallel backend inside MapReduceJob when a Utility or sequence thereof is passed --- src/pydvl/utils/parallel/map_reduce.py | 16 ++++++++++++---- src/pydvl/value/shapley/montecarlo.py | 8 ++------ 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/pydvl/utils/parallel/map_reduce.py b/src/pydvl/utils/parallel/map_reduce.py index e5f43197c..d4770d7e2 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -3,7 +3,6 @@ from typing import ( Any, Callable, - Collection, Dict, Generic, Iterable, @@ -19,6 +18,7 @@ from ray import ObjectRef from ..config import ParallelConfig +from ..utility import Utility from .backend import init_parallel_backend __all__ = ["MapReduceJob"] @@ -157,13 +157,21 @@ def __init__( def __call__( self, - inputs: Union[Collection[T], Any], + inputs: Union[Sequence[T], Sequence[Utility], Utility], ) -> List[R]: - map_results = self.map(inputs) + if isinstance(inputs, Utility): + inputs_ = self.parallel_backend.put(inputs) + elif isinstance(inputs[0], Utility): + inputs_ = [self.parallel_backend.put(x) for x in inputs] + else: + inputs_ = inputs + map_results = self.map(inputs_) reduce_results = self.reduce(map_results) return reduce_results - def map(self, inputs: Union[Sequence[T], Any]) -> List[List["ObjectRef[R]"]]: + def map( + self, inputs: Union[Sequence[T], Sequence["ObjectRef"], "ObjectRef"] + ) -> List[List["ObjectRef[R]"]]: map_results: List[List["ObjectRef[R]"]] = [] map_func = self._wrap_function(self._map_func) diff --git a/src/pydvl/value/shapley/montecarlo.py b/src/pydvl/value/shapley/montecarlo.py index 957b2d805..00d878bc9 100644 --- a/src/pydvl/value/shapley/montecarlo.py +++ b/src/pydvl/value/shapley/montecarlo.py @@ -223,10 +223,6 @@ def permutation_montecarlo_shapley( :param progress: Whether to display progress bars for each job. :return: Object with the data values. """ - parallel_backend = init_parallel_backend(config) - - u_id = parallel_backend.put(u) - iterations_per_job = max(1, max_iterations // n_jobs) map_reduce_job: MapReduceJob["NDArray", "NDArray"] = MapReduceJob( @@ -238,9 +234,9 @@ def permutation_montecarlo_shapley( n_jobs=n_jobs, ) if n_jobs == 1: - input_ = u_id + input_ = u else: - input_ = tuple(repeat(u_id, times=n_jobs)) + input_ = tuple(repeat(u, times=n_jobs)) full_results = map_reduce_job(input_)[0] values = np.mean(full_results, axis=0) From 7c72d2a7b8be6cf2a00ec4a8bf6a48899434069c Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Sun, 11 Dec 2022 15:57:42 +0100 Subject: [PATCH 08/42] Rename num_workers in ParallelConfig to n_local_workers --- src/pydvl/utils/config.py | 4 ++-- src/pydvl/utils/parallel/backend.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/pydvl/utils/config.py b/src/pydvl/utils/config.py index 885b7fe18..a6900d650 100644 --- a/src/pydvl/utils/config.py +++ b/src/pydvl/utils/config.py @@ -16,12 +16,12 @@ class ParallelConfig: :param backend: Type of backend to use. For now only 'ray' is supported. :param address: Address of existing remote or local cluster to use. - :param num_workers: Number of workers (CPUs) to use. 
+ :param n_local_workers: Number of workers (CPUs) to use when using a local ray cluster """ backend: Literal["sequential", "ray"] = "ray" address: Optional[Union[str, Tuple[str, int]]] = None - num_workers: Optional[int] = None + n_local_workers: Optional[int] = None @unpackable diff --git a/src/pydvl/utils/parallel/backend.py b/src/pydvl/utils/parallel/backend.py index a19532498..5d807d10f 100644 --- a/src/pydvl/utils/parallel/backend.py +++ b/src/pydvl/utils/parallel/backend.py @@ -70,7 +70,7 @@ def __init__(self, config: ParallelConfig): config_dict = asdict(config) config_dict.pop("backend") config_dict.pop("address") - config_dict["num_cpus"] = config_dict.pop("num_workers") + config_dict["num_cpus"] = config_dict.pop("n_local_workers") self.config = config_dict def get(self, v: Any, *args, **kwargs): @@ -120,7 +120,7 @@ class RayParallelBackend(BaseParallelBackend): def __init__(self, config: ParallelConfig): config_dict = asdict(config) config_dict.pop("backend") - config_dict["num_cpus"] = config_dict.pop("num_workers") + config_dict["num_cpus"] = config_dict.pop("n_local_workers") self.config = config_dict ray.init(**self.config) From 219403f446274157fc001d6d9c0e0bead3794d9b Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Sun, 11 Dec 2022 19:51:06 +0100 Subject: [PATCH 09/42] Add a __repr__ to ValuationResults --- src/pydvl/value/results.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/pydvl/value/results.py b/src/pydvl/value/results.py index 5015392c9..dea9a31c7 100644 --- a/src/pydvl/value/results.py +++ b/src/pydvl/value/results.py @@ -242,6 +242,9 @@ def __eq__(self, other: object) -> bool: # and np.all(self.indices == other.indices) # Redundant ) + def __repr__(self) -> str: + return f"{self.__class__.__name__}(algorithm='{self._algorithm}', status='{self._status.value}', values={self.values})" + def to_dataframe( self, column: Optional[str] = None, use_names: bool = False ) -> "pandas.DataFrame": From 9bc96d18ec2443a7edf79dfbe564014323c40f4a Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Sun, 11 Dec 2022 19:51:26 +0100 Subject: [PATCH 10/42] If n_runs >= n_jobs, do not chunkify inputs --- src/pydvl/utils/parallel/map_reduce.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/pydvl/utils/parallel/map_reduce.py b/src/pydvl/utils/parallel/map_reduce.py index d4770d7e2..e90e1bb7c 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -57,13 +57,13 @@ def get_value( class MapReduceJob(Generic[T, R]): - """Takes an embarrassingly parallel fun and runs it in n_jobs parallel + """Takes an embarrassingly parallel fun and runs it in `n_jobs` parallel jobs, splitting the data into the same number of chunks, one for each job. - It repeats the process num_runs times, allocating jobs across runs. E.g. - if n_jobs = 90 and n_runs=10, each whole execution of fun uses 9 jobs, - with the data split evenly among them. If n_jobs=2 and n_runs=10, two - cores are used, five times in succession, and each job receives all data. + It repeats the process `n_runs` times, allocating jobs across runs. E.g. + if `n_jobs=90` and `n_runs=10`, each whole execution of fun uses 9 jobs, + with the data split evenly among them. If `n_jobs=2` and `n_runs=10`, two + jobs are used, five times in succession, and each job receives all data. Results are aggregated per run using `reduce_func`, but **not across runs**. A list of length `n_runs` is always returned. 
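
A worked sketch of the splitting rule behind the docstring above, calling the private `_chunkify` helper as it stands at this point in the series (for illustration only):

    from pydvl.utils.parallel import MapReduceJob

    # divmod(5, 2) == (2, 1): the remainder is spread over the first chunks,
    # mirroring numpy's array_split
    chunks = list(MapReduceJob._chunkify([0, 1, 2, 3, 4], n_chunks=2))
    assert chunks == [[0, 1, 2], [3, 4]]
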
@@ -132,6 +132,7 @@ def __init__( self.n_runs = n_runs self._n_jobs = 1 + # This uses the setter defined below self.n_jobs = n_jobs if max_parallel_tasks is None: @@ -161,7 +162,7 @@ def __call__( ) -> List[R]: if isinstance(inputs, Utility): inputs_ = self.parallel_backend.put(inputs) - elif isinstance(inputs[0], Utility): + elif len(inputs) > 0 and isinstance(inputs[0], Utility): inputs_ = [self.parallel_backend.put(x) for x in inputs] else: inputs_ = inputs @@ -180,7 +181,11 @@ def map( total_n_finished = 0 for _ in range(self.n_runs): - chunks = self._chunkify(inputs, n_chunks=self.n_jobs) + # In this first case we don't use chunking at all + if self.n_runs >= self.n_jobs: + chunks = [inputs] + else: + chunks = self._chunkify(inputs, n_chunks=self.n_jobs) map_result = [] for j, next_chunk in enumerate(chunks): From 6fcdc5f2b7cc11a7390d2f8a891b4a1e5eb87e7c Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Mon, 19 Dec 2022 09:04:23 +0100 Subject: [PATCH 11/42] Use older version of tox --- requirements-dev.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements-dev.txt b/requirements-dev.txt index 5b9d0448d..5d6004423 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -11,7 +11,7 @@ pytest-cov pytest-mock pytest-timeout ray[default] >= 0.8 -tox +tox<4.0.0 tox-wheel types-tqdm twine From 0c2454d10aa0d2062cc602f92bef214cc5d65456 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Mon, 19 Dec 2022 09:08:04 +0100 Subject: [PATCH 12/42] Update changelog --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d99cd6df2..5bba85e43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,11 @@ - **Breaking change:** Introduces a class ValuationResult to gather and inspect results from all valuation algorithms [PR #214](https://github.com/appliedAI-Initiative/pyDVL/pull/214) +- **Breaking change**: Removes `chunkify_inputs` argument from `MapReduceJob`, + renames ParallelConfig's `num_workers` attribute to `n_local_workers`, + fixes a bug in `MapReduceJob`'s chunkification when `n_runs` >= `n_jobs`, + and defines a sequential parallel backend to run all jobs in the current thread + [PR #232](https://github.com/appliedAI-Initiative/pyDVL/pull/232) ## 0.3.0 - 💥 Breaking changes From c77a4744615a531545eab3f954577616093f411f Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Mon, 19 Dec 2022 16:44:35 +0100 Subject: [PATCH 13/42] Apply suggestions from code review Co-authored-by: Miguel de Benito Delgado --- src/pydvl/utils/parallel/backend.py | 2 +- src/pydvl/utils/parallel/map_reduce.py | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/src/pydvl/utils/parallel/backend.py b/src/pydvl/utils/parallel/backend.py index 5d807d10f..a07c7f827 100644 --- a/src/pydvl/utils/parallel/backend.py +++ b/src/pydvl/utils/parallel/backend.py @@ -50,7 +50,7 @@ def __repr__(self) -> str: class SequentialParallelBackend(BaseParallelBackend): - """Class used to run jobs sequentiall and locally. It shouldn't + """Class used to run jobs sequentially and locally. It shouldn't be initialized directly. You should instead call `init_parallel_backend`. 
:param config: instance of :class:`~pydvl.utils.config.ParallelConfig` with number of cpus diff --git a/src/pydvl/utils/parallel/map_reduce.py b/src/pydvl/utils/parallel/map_reduce.py index e90e1bb7c..462a25f8e 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -225,10 +225,7 @@ def _wrap_function(self, func): remote_func = self.parallel_backend.wrap( wrap_func_with_remote_args(func, timeout=self.timeout) ) - if hasattr(remote_func, "remote"): - return remote_func.remote - else: - return remote_func + return gettatr(remote_func, "remote", remote_func) def _backpressure( self, jobs: List[ObjectRef], n_dispatched: int, n_finished: int From 4aa64509723dca03e8b5a01736dd3798f1957055 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Mon, 19 Dec 2022 17:02:43 +0100 Subject: [PATCH 14/42] Set explicit type for result of expit --- src/pydvl/utils/numeric.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/pydvl/utils/numeric.py b/src/pydvl/utils/numeric.py index 120e149a3..7f8087cf0 100644 --- a/src/pydvl/utils/numeric.py +++ b/src/pydvl/utils/numeric.py @@ -253,7 +253,8 @@ def top_k_value_accuracy(y_true: "NDArray", y_pred: "NDArray", k: int = 3) -> fl def sigmoid(x: float) -> float: - return expit(x).item() + result: float = expit(x).item() + return result squashed_r2 = compose_score("r2", sigmoid, "squashed r2") From 8a15206784347268f6db52d514e598862165b574 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Mon, 19 Dec 2022 17:03:32 +0100 Subject: [PATCH 15/42] Explicitly use RayParallelBackend in RayActorWrapper's docstring example --- src/pydvl/utils/parallel/actor.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/pydvl/utils/parallel/actor.py b/src/pydvl/utils/parallel/actor.py index b5905efbf..3a8703d81 100644 --- a/src/pydvl/utils/parallel/actor.py +++ b/src/pydvl/utils/parallel/actor.py @@ -1,16 +1,12 @@ import abc import inspect import logging -from typing import TYPE_CHECKING, Any, Dict, Optional, Union +from typing import Any, Dict, Optional, Union from ray import ObjectRef from .backend import RayParallelBackend -if TYPE_CHECKING: - from numpy.typing import NDArray - - __all__ = ["RayActorWrapper", "Coordinator", "Worker"] @@ -25,7 +21,7 @@ class RayActorWrapper: :Example: - >>> from pydvl.utils.parallel import init_parallel_backend + >>> from pydvl.utils.parallel.backend import RayParallelBackend >>> from pydvl.utils.config import ParallelConfig >>> from pydvl.utils.parallel.actor import RayActorWrapper >>> class Actor: @@ -35,8 +31,8 @@ class RayActorWrapper: ... def get(self): ... return self.x ... 
-    >>> config = ParallelConfig()
-    >>> parallel_backend = init_parallel_backend(config)
+    >>> config = ParallelConfig(backend="ray")
+    >>> parallel_backend = RayParallelBackend(config)
     >>> actor_handle = parallel_backend.wrap(Actor).remote(5)
     >>> parallel_backend.get(actor_handle.get.remote())
     5

From 2f4be674c0e08d4daced9bb5a974c0d8f9beee56 Mon Sep 17 00:00:00 2001
From: Anes Benmerzoug
Date: Mon, 19 Dec 2022 17:04:44 +0100
Subject: [PATCH 16/42] Add support for sequential parallel backend in
 get_shapley_coordinator and get_shapley_worker

---
 src/pydvl/value/shapley/actor.py | 22 +++++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git a/src/pydvl/value/shapley/actor.py b/src/pydvl/value/shapley/actor.py
index da524ebcf..180e6b096 100644
--- a/src/pydvl/value/shapley/actor.py
+++ b/src/pydvl/value/shapley/actor.py
@@ -8,7 +8,7 @@
 import logging
 import warnings
 from time import time
-from typing import TYPE_CHECKING, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Optional, Tuple, Union, cast

 import numpy as np

@@ -21,6 +21,8 @@
 if TYPE_CHECKING:
     from numpy.typing import NDArray

+    from ...utils.parallel.backend import RayParallelBackend
+

 __all__ = ["get_shapley_coordinator", "get_shapley_worker"]

@@ -31,27 +33,33 @@
 def get_shapley_coordinator(
     *args, config: ParallelConfig = ParallelConfig(), **kwargs
 ) -> "ShapleyCoordinator":
-    parallel_backend = init_parallel_backend(config)
     if config.backend == "ray":
+        parallel_backend = cast(RayParallelBackend, init_parallel_backend(config))
         remote_cls = parallel_backend.wrap(ShapleyCoordinator)
         handle = remote_cls.remote(*args, **kwargs)
-        coordinator = RayActorWrapper(handle, parallel_backend)
+        coordinator = cast(
+            ShapleyCoordinator, RayActorWrapper(handle, parallel_backend)
+        )
+    elif config.backend == "sequential":
+        coordinator = ShapleyCoordinator(*args, **kwargs)
     else:
         raise NotImplementedError(f"Unexpected parallel type {config.backend}")
-    return coordinator  # type: ignore
+    return coordinator


 def get_shapley_worker(
     *args, config: ParallelConfig = ParallelConfig(), **kwargs
 ) -> "ShapleyWorker":
-    parallel_backend = init_parallel_backend(config)
     if config.backend == "ray":
+        parallel_backend = cast(RayParallelBackend, init_parallel_backend(config))
         remote_cls = parallel_backend.wrap(ShapleyWorker)
         handle = remote_cls.remote(*args, **kwargs)
-        worker = RayActorWrapper(handle, parallel_backend)
+        worker = cast(ShapleyWorker, RayActorWrapper(handle, parallel_backend))
+    elif config.backend == "sequential":
+        worker = ShapleyWorker(*args, **kwargs)
     else:
         raise NotImplementedError(f"Unexpected parallel type {config.backend}")
-    return worker  # type: ignore
+    return worker


 class ShapleyCoordinator(Coordinator):

From a75a4c9dc78c3936c8c7426287739f2ccacd95c6 Mon Sep 17 00:00:00 2001
From: Anes Benmerzoug
Date: Mon, 19 Dec 2022 17:05:19 +0100
Subject: [PATCH 17/42] Use singledispatchmethod and singledispatch decorators
 in map_reduce

---
 src/pydvl/utils/parallel/backend.py    | 28 +++++---
 src/pydvl/utils/parallel/map_reduce.py | 99 ++++++++++++++++++--------
 src/pydvl/value/shapley/montecarlo.py  | 10 +--
 3 files changed, 89 insertions(+), 48 deletions(-)

diff --git a/src/pydvl/utils/parallel/backend.py b/src/pydvl/utils/parallel/backend.py
index a07c7f827..08f97ee86 100644
--- a/src/pydvl/utils/parallel/backend.py
+++ b/src/pydvl/utils/parallel/backend.py
@@ -17,7 +17,9 @@
 T = TypeVar("T")

-_PARALLEL_BACKENDS: Dict[str, "BaseParallelBackend"] = {}
+_PARALLEL_BACKENDS: Dict[
+    str, Union["RayParallelBackend",
"SequentialParallelBackend"] +] = {} class BaseParallelBackend(ABC): @@ -77,7 +79,7 @@ def get(self, v: Any, *args, **kwargs): return v def put(self, v: Any, *args, **kwargs) -> Any: - pass + return v def wrap(self, *args, **kwargs) -> Any: assert len(args) == 1 @@ -91,7 +93,7 @@ def effective_n_jobs(self, n_jobs: Optional[int]) -> int: raise ValueError("n_jobs == 0 in Parallel has no meaning") elif n_jobs is None or n_jobs < 0: if self.config["num_cpus"]: - eff_n_jobs = self.config["num_cpus"] + eff_n_jobs: int = self.config["num_cpus"] else: eff_n_jobs = available_cpus() else: @@ -127,9 +129,10 @@ def __init__(self, config: ParallelConfig): def get( self, v: Union[ObjectRef, Iterable[ObjectRef], T], - *, - timeout: Optional[float] = None, + *args, + **kwargs, ) -> Union[T, Any]: + timeout: Optional[float] = kwargs.get("timeout", None) if isinstance(v, ObjectRef): return ray.get(v, timeout=timeout) elif isinstance(v, Iterable): @@ -137,7 +140,7 @@ def get( else: return v - def put(self, v: Any, **kwargs) -> ObjectRef: + def put(self, v: T, *args, **kwargs) -> "ObjectRef[T]": return ray.put(v, **kwargs) # type: ignore def wrap(self, *args, **kwargs) -> RemoteFunction: @@ -145,11 +148,12 @@ def wrap(self, *args, **kwargs) -> RemoteFunction: def wait( self, - v: List["ray.ObjectRef"], - *, - num_returns: int = 1, - timeout: Optional[float] = None, + v: List["ObjectRef"], + *args, + **kwargs, ) -> Tuple[List[ObjectRef], List[ObjectRef]]: + num_returns: int = kwargs.get("num_returns", 1) + timeout: Optional[float] = kwargs.get("timeout", None) return ray.wait( # type: ignore v, num_returns=num_returns, @@ -167,7 +171,9 @@ def effective_n_jobs(self, n_jobs: Optional[int]) -> int: return eff_n_jobs -def init_parallel_backend(config: ParallelConfig) -> "BaseParallelBackend": +def init_parallel_backend( + config: ParallelConfig, +) -> Union[RayParallelBackend, SequentialParallelBackend]: """Initializes the parallel backend and returns an instance of it. :param config: instance of :class:`~pydvl.utils.config.ParallelConfig` with cluster address, number of cpus, etc. 
diff --git a/src/pydvl/utils/parallel/map_reduce.py b/src/pydvl/utils/parallel/map_reduce.py index 462a25f8e..fe636deeb 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -1,15 +1,16 @@ import weakref +from collections.abc import Iterable, Sequence +from functools import singledispatch, singledispatchmethod from itertools import accumulate, chain from typing import ( + TYPE_CHECKING, Any, Callable, Dict, Generic, - Iterable, Iterator, List, Optional, - Sequence, TypeVar, Union, ) @@ -18,13 +19,11 @@ from ray import ObjectRef from ..config import ParallelConfig -from ..utility import Utility +from ..types import maybe_add_argument from .backend import init_parallel_backend __all__ = ["MapReduceJob"] -from ..types import maybe_add_argument - T = TypeVar("T") R = TypeVar("R") Identity = lambda x, *args, **kwargs: x @@ -32,28 +31,48 @@ MapFunction = Callable[..., R] ReduceFunction = Callable[[Iterable[R]], R] +if not TYPE_CHECKING: + # HACK to make singledispatchmethod work with staticmethod + def _register(self, cls, method=None): + if hasattr(cls, "__func__"): + setattr(cls, "__annotations__", cls.__func__.__annotations__) + return self.dispatcher.register(cls, func=method) + + singledispatchmethod.register = _register -def wrap_func_with_remote_args(func, *, timeout: Optional[float] = None): + +def _wrap_func_with_remote_args(func, *, timeout: Optional[float] = None): def wrapper(*args, **kwargs): args = list(args) for i, v in enumerate(args[:]): - args[i] = get_value(v, timeout=timeout) + args[i] = _get_value(v, timeout=timeout) for k, v in kwargs.items(): - kwargs[k] = get_value(v, timeout=timeout) + kwargs[k] = _get_value(v, timeout=timeout) return func(*args, **kwargs) + # Doing it manually here because using wraps or update_wrapper + # from functools doesn't work with ray for some unknown reason + wrapper.__module__ = func.__module__ + wrapper.__name__ = func.__name__ + wrapper.__annotations__ = func.__annotations__ + wrapper.__qualname__ = func.__qualname__ + wrapper.__doc__ = func.__doc__ return wrapper -def get_value( - v: Union[ObjectRef, Iterable[ObjectRef], Any], *, timeout: Optional[float] = None -): - if isinstance(v, ObjectRef): - return ray.get(v, timeout=timeout) - elif isinstance(v, Iterable): - return [get_value(x, timeout=timeout) for x in v] - else: - return v +@singledispatch +def _get_value(v: T, *, timeout: Optional[float] = None) -> T: + return v + + +@_get_value.register +def _(v: ObjectRef, *, timeout: Optional[float] = None) -> Any: + return ray.get(v, timeout=timeout) + + +@_get_value.register +def _(v: Iterable, *, timeout: Optional[float] = None) -> List[Any]: + return [_get_value(x, timeout=timeout) for x in v] class MapReduceJob(Generic[T, R]): @@ -158,20 +177,19 @@ def __init__( def __call__( self, - inputs: Union[Sequence[T], Sequence[Utility], Utility], + inputs: Union[Sequence[T], T], ) -> List[R]: - if isinstance(inputs, Utility): - inputs_ = self.parallel_backend.put(inputs) - elif len(inputs) > 0 and isinstance(inputs[0], Utility): - inputs_ = [self.parallel_backend.put(x) for x in inputs] - else: + inputs_: Union[Sequence[T], "ObjectRef[T]"] + if isinstance(inputs, Sequence): inputs_ = inputs + else: + inputs_ = self.parallel_backend.put(inputs) map_results = self.map(inputs_) reduce_results = self.reduce(map_results) return reduce_results def map( - self, inputs: Union[Sequence[T], Sequence["ObjectRef"], "ObjectRef"] + self, inputs: Union[Sequence[T], "ObjectRef[T]"] ) -> List[List["ObjectRef[R]"]]: 
map_results: List[List["ObjectRef[R]"]] = [] @@ -183,7 +201,7 @@ def map( for _ in range(self.n_runs): # In this first case we don't use chunking at all if self.n_runs >= self.n_jobs: - chunks = [inputs] + chunks = iter([inputs]) else: chunks = self._chunkify(inputs, n_chunks=self.n_jobs) @@ -217,15 +235,14 @@ def reduce(self, chunks: List[List["ObjectRef[R]"]]) -> List[R]: total_n_finished = self._backpressure( reduce_results, n_dispatched=total_n_jobs, n_finished=total_n_finished ) - results = self.parallel_backend.get(reduce_results, timeout=self.timeout) return results # type: ignore def _wrap_function(self, func): remote_func = self.parallel_backend.wrap( - wrap_func_with_remote_args(func, timeout=self.timeout) + _wrap_func_with_remote_args(func, timeout=self.timeout) ) - return gettatr(remote_func, "remote", remote_func) + return getattr(remote_func, "remote", remote_func) def _backpressure( self, jobs: List[ObjectRef], n_dispatched: int, n_finished: int @@ -245,9 +262,17 @@ def _backpressure( n_finished += len(finished_jobs) return n_finished + @singledispatchmethod + @staticmethod + def _chunkify(data: Any, n_chunks: int): + raise NotImplementedError( + f"_chunkify does not support data of type {type(data)}" + ) + + @_chunkify.register @staticmethod - def _chunkify(data: Sequence[T], n_chunks: int) -> Iterator[Sequence[T]]: - # Splits a list of values into chunks for each job + def _(data: Sequence, n_chunks: int) -> Iterator[Sequence[T]]: + """Splits a sequence of values into `n_chunks` chunks for each job""" if n_chunks == 0: raise ValueError("Number of chunks should be greater than 0") @@ -273,6 +298,20 @@ def _chunkify(data: Sequence[T], n_chunks: int) -> Iterator[Sequence[T]]: return yield data[start_index:end_index] + @_chunkify.register + @staticmethod + def _(data: ObjectRef, n_chunks: int) -> Iterator[ObjectRef]: + """Repeatedly yields the passed data object `n_chunks` number of times""" + if n_chunks == 0: + raise ValueError("Number of chunks should be greater than 0") + + elif n_chunks == 1: + yield data + + else: + for _ in range(n_chunks): + yield data + @property def parallel_backend(self): parallel_backend = self._parallel_backend_ref() diff --git a/src/pydvl/value/shapley/montecarlo.py b/src/pydvl/value/shapley/montecarlo.py index 00d878bc9..a67ed3820 100644 --- a/src/pydvl/value/shapley/montecarlo.py +++ b/src/pydvl/value/shapley/montecarlo.py @@ -36,7 +36,7 @@ from enum import Enum from itertools import repeat from time import sleep -from typing import TYPE_CHECKING, Iterable, NamedTuple, Optional, Sequence +from typing import TYPE_CHECKING, Iterable, NamedTuple, Optional, Sequence, Tuple, Union from warnings import warn import numpy as np @@ -225,7 +225,7 @@ def permutation_montecarlo_shapley( """ iterations_per_job = max(1, max_iterations // n_jobs) - map_reduce_job: MapReduceJob["NDArray", "NDArray"] = MapReduceJob( + map_reduce_job: MapReduceJob[Utility, "NDArray"] = MapReduceJob( map_func=_permutation_montecarlo_marginals, reduce_func=np.concatenate, # type: ignore map_kwargs=dict(max_permutations=iterations_per_job, progress=progress), @@ -233,11 +233,7 @@ def permutation_montecarlo_shapley( config=config, n_jobs=n_jobs, ) - if n_jobs == 1: - input_ = u - else: - input_ = tuple(repeat(u, times=n_jobs)) - full_results = map_reduce_job(input_)[0] + full_results = map_reduce_job(u)[0] values = np.mean(full_results, axis=0) stderr = np.std(full_results, axis=0) / np.sqrt(full_results.shape[0]) From 9e68790dbda6b481e4db26d3f12bd8e8ad77361c Mon Sep 17 00:00:00 
2001 From: Anes Benmerzoug Date: Tue, 20 Dec 2022 09:51:07 +0100 Subject: [PATCH 18/42] Use Any as type of default _get_value function --- src/pydvl/utils/parallel/map_reduce.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pydvl/utils/parallel/map_reduce.py b/src/pydvl/utils/parallel/map_reduce.py index fe636deeb..a68623994 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -61,7 +61,7 @@ def wrapper(*args, **kwargs): @singledispatch -def _get_value(v: T, *, timeout: Optional[float] = None) -> T: +def _get_value(v: Any, *, timeout: Optional[float] = None) -> Any: return v From 89b67b3a80195c935f6adebf8ac29ba2a6cda597 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Tue, 20 Dec 2022 10:41:42 +0100 Subject: [PATCH 19/42] Use Iterable from typing for ReduceFunction type --- src/pydvl/utils/parallel/map_reduce.py | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/src/pydvl/utils/parallel/map_reduce.py b/src/pydvl/utils/parallel/map_reduce.py index a68623994..7b6606c66 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -2,18 +2,9 @@ from collections.abc import Iterable, Sequence from functools import singledispatch, singledispatchmethod from itertools import accumulate, chain -from typing import ( - TYPE_CHECKING, - Any, - Callable, - Dict, - Generic, - Iterator, - List, - Optional, - TypeVar, - Union, -) +from typing import TYPE_CHECKING, Any, Callable, Dict, Generic +from typing import Iterable as IterableType +from typing import Iterator, List, Optional, TypeVar, Union import ray from ray import ObjectRef @@ -29,7 +20,7 @@ Identity = lambda x, *args, **kwargs: x MapFunction = Callable[..., R] -ReduceFunction = Callable[[Iterable[R]], R] +ReduceFunction = Callable[[IterableType[R]], R] if not TYPE_CHECKING: # HACK to make singledispatchmethod work with staticmethod From cd1ced5be1bc3431d6c440bcfa634eae1f7b6450 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Tue, 20 Dec 2022 11:06:58 +0100 Subject: [PATCH 20/42] Fixes --- src/pydvl/utils/parallel/map_reduce.py | 10 ++++++---- src/pydvl/value/shapley/actor.py | 4 +--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/pydvl/utils/parallel/map_reduce.py b/src/pydvl/utils/parallel/map_reduce.py index 7b6606c66..27e8acef9 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -4,7 +4,9 @@ from itertools import accumulate, chain from typing import TYPE_CHECKING, Any, Callable, Dict, Generic from typing import Iterable as IterableType -from typing import Iterator, List, Optional, TypeVar, Union +from typing import Iterator, List, Optional +from typing import Sequence as SequenceType +from typing import TypeVar, Union import ray from ray import ObjectRef @@ -168,9 +170,9 @@ def __init__( def __call__( self, - inputs: Union[Sequence[T], T], + inputs: Union[SequenceType[T], T], ) -> List[R]: - inputs_: Union[Sequence[T], "ObjectRef[T]"] + inputs_: Union[SequenceType[T], "ObjectRef[T]"] if isinstance(inputs, Sequence): inputs_ = inputs else: @@ -180,7 +182,7 @@ def __call__( return reduce_results def map( - self, inputs: Union[Sequence[T], "ObjectRef[T]"] + self, inputs: Union[SequenceType[T], "ObjectRef[T]"] ) -> List[List["ObjectRef[R]"]]: map_results: List[List["ObjectRef[R]"]] = [] diff --git a/src/pydvl/value/shapley/actor.py b/src/pydvl/value/shapley/actor.py index 180e6b096..6febcbde1 100644 --- 
a/src/pydvl/value/shapley/actor.py +++ b/src/pydvl/value/shapley/actor.py @@ -15,14 +15,12 @@ from pydvl.utils import Utility, get_running_avg_variance, maybe_progress from pydvl.utils.config import ParallelConfig from pydvl.utils.parallel.actor import Coordinator, RayActorWrapper, Worker -from pydvl.utils.parallel.backend import init_parallel_backend +from pydvl.utils.parallel.backend import RayParallelBackend, init_parallel_backend from pydvl.value.results import ValuationStatus if TYPE_CHECKING: from numpy.typing import NDArray - from ...utils.parallel.backend import RayParallelBackend - __all__ = ["get_shapley_coordinator", "get_shapley_worker"] From dc491ec8dbbbada909ef32144feba4c00abc5c96 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Tue, 20 Dec 2022 11:36:59 +0100 Subject: [PATCH 21/42] Another fix --- src/pydvl/utils/parallel/map_reduce.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pydvl/utils/parallel/map_reduce.py b/src/pydvl/utils/parallel/map_reduce.py index 27e8acef9..096b17082 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -264,7 +264,7 @@ def _chunkify(data: Any, n_chunks: int): @_chunkify.register @staticmethod - def _(data: Sequence, n_chunks: int) -> Iterator[Sequence[T]]: + def _(data: Sequence, n_chunks: int) -> Iterator[SequenceType[T]]: """Splits a sequence of values into `n_chunks` chunks for each job""" if n_chunks == 0: raise ValueError("Number of chunks should be greater than 0") From d06389fee0b8b6b56c28444c0dbb700407748b03 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Wed, 21 Dec 2022 09:43:39 +0100 Subject: [PATCH 22/42] Set ignore_reinit_error to True when using local ray cluster --- src/pydvl/utils/parallel/backend.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/pydvl/utils/parallel/backend.py b/src/pydvl/utils/parallel/backend.py index 08f97ee86..9325a4adb 100644 --- a/src/pydvl/utils/parallel/backend.py +++ b/src/pydvl/utils/parallel/backend.py @@ -124,6 +124,8 @@ def __init__(self, config: ParallelConfig): config_dict.pop("backend") config_dict["num_cpus"] = config_dict.pop("n_local_workers") self.config = config_dict + if not self.config["address"] is None: + self.config["ignore_reinit_error"] = True ray.init(**self.config) def get( From f35de7a6fcdc0236ae80b261f4f500a3379fc4b1 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Wed, 21 Dec 2022 11:09:35 +0100 Subject: [PATCH 23/42] Fix check in RayParallelBackend's init --- src/pydvl/utils/parallel/backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pydvl/utils/parallel/backend.py b/src/pydvl/utils/parallel/backend.py index 9325a4adb..031cd900b 100644 --- a/src/pydvl/utils/parallel/backend.py +++ b/src/pydvl/utils/parallel/backend.py @@ -124,7 +124,7 @@ def __init__(self, config: ParallelConfig): config_dict.pop("backend") config_dict["num_cpus"] = config_dict.pop("n_local_workers") self.config = config_dict - if not self.config["address"] is None: + if self.config["address"] is None: self.config["ignore_reinit_error"] = True ray.init(**self.config) From 7d25f14be612578c5391a53cfa9e6378de305b13 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Wed, 28 Dec 2022 10:11:08 +0100 Subject: [PATCH 24/42] Do not expose available_cpus at the package level --- src/pydvl/utils/parallel/backend.py | 1 - tests/conftest.py | 3 +-- tests/value/shapley/test_knn.py | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/pydvl/utils/parallel/backend.py 
b/src/pydvl/utils/parallel/backend.py index 031cd900b..2fdd928cb 100644 --- a/src/pydvl/utils/parallel/backend.py +++ b/src/pydvl/utils/parallel/backend.py @@ -12,7 +12,6 @@ __all__ = [ "init_parallel_backend", - "available_cpus", ] T = TypeVar("T") diff --git a/tests/conftest.py b/tests/conftest.py index 7c8ae945e..74a18e668 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,8 +8,7 @@ from pymemcache.client import Client from pydvl.utils import Dataset, MemcachedClientConfig -from pydvl.utils.parallel import available_cpus -from pydvl.value import ValuationResult, ValuationStatus +from pydvl.utils.parallel.backend import available_cpus if TYPE_CHECKING: from _pytest.config import Config diff --git a/tests/value/shapley/test_knn.py b/tests/value/shapley/test_knn.py index 8ade09c37..f149fb3e3 100644 --- a/tests/value/shapley/test_knn.py +++ b/tests/value/shapley/test_knn.py @@ -5,8 +5,8 @@ from sklearn.metrics import make_scorer from sklearn.neighbors import KNeighborsClassifier -from pydvl.utils import available_cpus from pydvl.utils.dataset import Dataset +from pydvl.utils.parallel.backend import available_cpus from pydvl.utils.utility import Utility from pydvl.value.shapley.knn import knn_shapley from pydvl.value.shapley.naive import combinatorial_exact_shapley From 8ed28063396d3a4feea3b7688d6977fedf61ff51 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Wed, 28 Dec 2022 10:11:56 +0100 Subject: [PATCH 25/42] Disallow instantiating parallel backend classes directly --- src/pydvl/utils/parallel/backend.py | 101 +++++++++++++++---------- src/pydvl/utils/parallel/map_reduce.py | 10 +-- 2 files changed, 60 insertions(+), 51 deletions(-) diff --git a/src/pydvl/utils/parallel/backend.py b/src/pydvl/utils/parallel/backend.py index 2fdd928cb..1dd252a24 100644 --- a/src/pydvl/utils/parallel/backend.py +++ b/src/pydvl/utils/parallel/backend.py @@ -1,8 +1,19 @@ import functools import os -from abc import ABC, abstractmethod +from abc import ABCMeta, abstractmethod from dataclasses import asdict -from typing import Any, Dict, Iterable, List, Optional, Tuple, TypeVar, Union +from typing import ( + Any, + Dict, + Iterable, + List, + Literal, + Optional, + Tuple, + Type, + TypeVar, + Union, +) import ray from ray import ObjectRef @@ -17,15 +28,46 @@ T = TypeVar("T") _PARALLEL_BACKENDS: Dict[ - str, Union["RayParallelBackend", "SequentialParallelBackend"] + str, + Union["Type[RayParallelBackend]", "Type[SequentialParallelBackend]"], ] = {} -class BaseParallelBackend(ABC): +class AbstractNoPublicConstructor(ABCMeta): + """Metaclass that ensures a private constructor + + If a class uses this metaclass like this: + + class SomeClass(metaclass=NoPublicConstructor): + pass + + If you try to instantiate your class (`SomeClass()`), + a `TypeError` will be thrown. + + Taken almost verbatim from: + https://stackoverflow.com/a/64682734 + """ + + def __call__(cls, *args, **kwargs): + raise TypeError( + f"{cls.__module__}.{cls.__qualname__} cannot be initialized directly. " + "Use init_parallel_backend() instead." + ) + + def _create(cls, *args: Any, **kwargs: Any): + return super().__call__(*args, **kwargs) + + +class BaseParallelBackend(metaclass=AbstractNoPublicConstructor): """Abstract base class for all parallel backends""" config: Dict[str, Any] = {} + def __init_subclass__(cls, *, backend_name: str, **kwargs): + global _PARALLEL_BACKENDS + _PARALLEL_BACKENDS[backend_name] = cls + super().__init_subclass__(**kwargs) + @abstractmethod def get(self, v: Any, *args, **kwargs): ... 
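The hunk above combines two mechanisms that the rest of this patch relies on: the metaclass rejects direct calls to the constructor while exposing a private `_create` factory, and `__init_subclass__` records every concrete backend in `_PARALLEL_BACKENDS` under its `backend_name`. The following is a minimal, self-contained sketch of that pattern; the names `Base`, `Foo` and `_REGISTRY` are illustrative only and are not part of the patch:

    from abc import ABCMeta

    class NoPublicConstructor(ABCMeta):
        # Calling a class with this metaclass raises; _create is the private factory.
        def __call__(cls, *args, **kwargs):
            raise TypeError(f"{cls.__qualname__} cannot be initialized directly.")

        def _create(cls, *args, **kwargs):
            return super().__call__(*args, **kwargs)

    _REGISTRY = {}

    class Base(metaclass=NoPublicConstructor):
        # Subclasses register themselves under the class keyword `name`.
        def __init_subclass__(cls, *, name, **kwargs):
            _REGISTRY[name] = cls
            super().__init_subclass__(**kwargs)

    class Foo(Base, name="foo"):
        pass

    instance = _REGISTRY["foo"]._create()  # the factory path works
    # Foo() would raise TypeError

This is what lets `init_parallel_backend` further down stay a plain dictionary lookup followed by `_create`, with new backends registering themselves simply by subclassing.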
@@ -42,29 +84,20 @@ def wrap(self, *args, **kwargs) -> Any: def wait(self, v: Any, *args, **kwargs) -> Any: ... - @abstractmethod def effective_n_jobs(self, n_jobs: Optional[int]) -> int: - ... + if n_jobs == 0: + raise ValueError("n_jobs == 0 in Parallel has no meaning") + return n_jobs def __repr__(self) -> str: return f"<{self.__class__.__name__}: {self.config}>" -class SequentialParallelBackend(BaseParallelBackend): +class SequentialParallelBackend(BaseParallelBackend, backend_name="sequential"): """Class used to run jobs sequentially and locally. It shouldn't be initialized directly. You should instead call `init_parallel_backend`. :param config: instance of :class:`~pydvl.utils.config.ParallelConfig` with number of cpus - - :Example: - - >>> from pydvl.utils.parallel.backend import SequentialParallelBackend - >>> from pydvl.utils.config import ParallelConfig - >>> config = ParallelConfig(backend="sequential") - >>> parallel_backend = SequentialParallelBackend(config) - >>> parallel_backend - - """ def __init__(self, config: ParallelConfig): @@ -88,9 +121,8 @@ def wait(self, v: Any, *args, **kwargs) -> Tuple[list, list]: return v, [] def effective_n_jobs(self, n_jobs: Optional[int]) -> int: - if n_jobs == 0: - raise ValueError("n_jobs == 0 in Parallel has no meaning") - elif n_jobs is None or n_jobs < 0: + n_jobs = super().effective_n_jobs(n_jobs) + if n_jobs is None or n_jobs < 0: if self.config["num_cpus"]: eff_n_jobs: int = self.config["num_cpus"] else: @@ -100,22 +132,12 @@ def effective_n_jobs(self, n_jobs: Optional[int]) -> int: return eff_n_jobs -class RayParallelBackend(BaseParallelBackend): +class RayParallelBackend(BaseParallelBackend, backend_name="ray"): """Class used to wrap ray to make it transparent to algorithms. It shouldn't be initialized directly. You should instead call `init_parallel_backend`. :param config: instance of :class:`~pydvl.utils.config.ParallelConfig` with cluster address, number of cpus, etc. 
- - :Example: - - >>> from pydvl.utils.parallel.backend import RayParallelBackend - >>> from pydvl.utils.config import ParallelConfig - >>> config = ParallelConfig(backend="ray") - >>> parallel_backend = RayParallelBackend(config) - >>> parallel_backend - - """ def __init__(self, config: ParallelConfig): @@ -162,9 +184,8 @@ def wait( ) def effective_n_jobs(self, n_jobs: Optional[int]) -> int: - if n_jobs == 0: - raise ValueError("n_jobs == 0 in Parallel has no meaning") - elif n_jobs is None or n_jobs < 0: + n_jobs = super().effective_n_jobs(n_jobs) + if n_jobs is None or n_jobs < 0: ray_cpus = int(ray._private.state.cluster_resources()["CPU"]) # type: ignore eff_n_jobs = ray_cpus else: @@ -186,18 +207,14 @@ def init_parallel_backend( >>> config = ParallelConfig(backend="ray") >>> parallel_backend = init_parallel_backend(config) >>> parallel_backend - + """ - global _PARALLEL_BACKENDS if config.backend not in ["sequential", "ray"]: raise NotImplementedError(f"Unexpected parallel type {config.backend}") - if config.backend not in _PARALLEL_BACKENDS: - if config.backend == "ray": - _PARALLEL_BACKENDS["ray"] = RayParallelBackend(config) - else: - _PARALLEL_BACKENDS["sequential"] = SequentialParallelBackend(config) - return _PARALLEL_BACKENDS[config.backend] + parallel_backend_cls = _PARALLEL_BACKENDS[config.backend] + parallel_backend = parallel_backend_cls._create(config) + return parallel_backend def available_cpus() -> int: diff --git a/src/pydvl/utils/parallel/map_reduce.py b/src/pydvl/utils/parallel/map_reduce.py index 096b17082..afcccdbce 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -1,4 +1,3 @@ -import weakref from collections.abc import Iterable, Sequence from functools import singledispatch, singledispatchmethod from itertools import accumulate, chain @@ -138,7 +137,7 @@ def __init__( ): self.config = config parallel_backend = init_parallel_backend(self.config) - self._parallel_backend_ref = weakref.ref(parallel_backend) + self.parallel_backend = parallel_backend self.timeout = timeout self.n_runs = n_runs @@ -305,13 +304,6 @@ def _(data: ObjectRef, n_chunks: int) -> Iterator[ObjectRef]: for _ in range(n_chunks): yield data - @property - def parallel_backend(self): - parallel_backend = self._parallel_backend_ref() - if parallel_backend is None: - raise RuntimeError(f"Could not get reference to parallel backend instance") - return parallel_backend - @property def n_jobs(self) -> int: return self._n_jobs From cbe1ca927b0ac716b58e8902bb7ac0de0233c572 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Wed, 28 Dec 2022 10:15:41 +0100 Subject: [PATCH 26/42] Remove unused import --- src/pydvl/utils/parallel/backend.py | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/src/pydvl/utils/parallel/backend.py b/src/pydvl/utils/parallel/backend.py index 1dd252a24..81b0426ff 100644 --- a/src/pydvl/utils/parallel/backend.py +++ b/src/pydvl/utils/parallel/backend.py @@ -2,18 +2,7 @@ import os from abc import ABCMeta, abstractmethod from dataclasses import asdict -from typing import ( - Any, - Dict, - Iterable, - List, - Literal, - Optional, - Tuple, - Type, - TypeVar, - Union, -) +from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, TypeVar, Union import ray from ray import ObjectRef From 9cdcde8a5058b6360a6fda565e29b0be5c90f7ed Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Wed, 28 Dec 2022 10:18:01 +0100 Subject: [PATCH 27/42] Fix RayActorWrapper's docstring --- 
src/pydvl/utils/parallel/actor.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/pydvl/utils/parallel/actor.py b/src/pydvl/utils/parallel/actor.py index 3a8703d81..33002c3f9 100644 --- a/src/pydvl/utils/parallel/actor.py +++ b/src/pydvl/utils/parallel/actor.py @@ -21,7 +21,7 @@ class RayActorWrapper: :Example: - >>> from pydvl.utils.parallel.backend import RayParallelBackend + >>> from pydvl.utils.parallel.backend import RayParallelBackend, init_parallel_backend >>> from pydvl.utils.config import ParallelConfig >>> from pydvl.utils.parallel.actor import RayActorWrapper >>> class Actor: @@ -32,7 +32,8 @@ class RayActorWrapper: ... return self.x ... >>> config = ParallelConfig(backend="ray") - >>> parallel_backend = RayParallelBackend(config) + >>> parallel_backend = init_parallel_backend(config) + >>> assert isinstance(parallel_backend, RayParallelBackend) >>> actor_handle = parallel_backend.wrap(Actor).remote(5) >>> parallel_backend.get(actor_handle.get.remote()) 5 From fd8f6bd69bc95fbe4f0bd439ae00449fd5810f44 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Wed, 28 Dec 2022 10:48:34 +0100 Subject: [PATCH 28/42] Pass inputs to MapReduceJob at initialization --- CHANGELOG.md | 3 ++- src/pydvl/utils/parallel/backend.py | 9 +++----- src/pydvl/utils/parallel/map_reduce.py | 31 +++++++++++++++++++------- src/pydvl/value/shapley/montecarlo.py | 10 +++++---- src/pydvl/value/shapley/naive.py | 3 ++- tests/utils/test_caching.py | 10 +++++---- tests/utils/test_parallel.py | 25 ++++++++++++++------- 7 files changed, 59 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5bba85e43..d5f373821 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,8 @@ - **Breaking change:** Introduces a class ValuationResult to gather and inspect results from all valuation algorithms [PR #214](https://github.com/appliedAI-Initiative/pyDVL/pull/214) -- **Breaking change**: Removes `chunkify_inputs` argument from `MapReduceJob`, +- **Breaking change**: Passes the input to `MapReduceJob` at initialization, + removes `chunkify_inputs` argument from `MapReduceJob`, renames ParallelConfig's `num_workers` attribute to `n_local_workers`, fixes a bug in `MapReduceJob`'s chunkification when `n_runs` >= `n_jobs`, and defines a sequential parallel backend to run all jobs in the current thread diff --git a/src/pydvl/utils/parallel/backend.py b/src/pydvl/utils/parallel/backend.py index 81b0426ff..2145533f2 100644 --- a/src/pydvl/utils/parallel/backend.py +++ b/src/pydvl/utils/parallel/backend.py @@ -16,10 +16,7 @@ T = TypeVar("T") -_PARALLEL_BACKENDS: Dict[ - str, - Union["Type[RayParallelBackend]", "Type[SequentialParallelBackend]"], -] = {} +_PARALLEL_BACKENDS: Dict[str, "Type[BaseParallelBackend]"] = {} class AbstractNoPublicConstructor(ABCMeta): @@ -73,7 +70,7 @@ def wrap(self, *args, **kwargs) -> Any: def wait(self, v: Any, *args, **kwargs) -> Any: ... 
- def effective_n_jobs(self, n_jobs: Optional[int]) -> int: + def effective_n_jobs(self, n_jobs: Optional[int]) -> Optional[int]: if n_jobs == 0: raise ValueError("n_jobs == 0 in Parallel has no meaning") return n_jobs @@ -203,7 +200,7 @@ def init_parallel_backend( raise NotImplementedError(f"Unexpected parallel type {config.backend}") parallel_backend_cls = _PARALLEL_BACKENDS[config.backend] parallel_backend = parallel_backend_cls._create(config) - return parallel_backend + return parallel_backend # type: ignore def available_cpus() -> int: diff --git a/src/pydvl/utils/parallel/map_reduce.py b/src/pydvl/utils/parallel/map_reduce.py index afcccdbce..1fb188ccb 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -82,6 +82,7 @@ class MapReduceJob(Generic[T, R]): Typing information for objects of this class requires the type of the inputs that are split for `map_func` and the type of its output. + :param inputs: :param map_func: Function that will be applied to the input chunks in each job. :param reduce_func: Function that will be applied to the results of @@ -112,18 +113,33 @@ class MapReduceJob(Generic[T, R]): >>> from pydvl.utils.parallel import MapReduceJob >>> import numpy as np >>> map_reduce_job: MapReduceJob[np.ndarray, np.ndarray] = MapReduceJob( + ... np.arange(5), ... map_func=np.sum, ... reduce_func=np.sum, ... n_jobs=2, ... n_runs=3, ... ) - >>> map_reduce_job(np.arange(5)) + >>> map_reduce_job() [10, 10, 10] + When passed a single object as input, it will be repeated for each job, if n_jobs > n_runs: + + >>> from pydvl.utils.parallel import MapReduceJob + >>> import numpy as np + >>> map_reduce_job: MapReduceJob[int, np.ndarray] = MapReduceJob( + ... 5, + ... map_func=np.sum, + ... reduce_func=np.sum, + ... n_jobs=4, + ... n_runs=2, + ... 
) + >>> map_reduce_job() + [20, 20] """ def __init__( self, + inputs: Union[SequenceType[T], T], map_func: MapFunction[R], reduce_func: Optional[ReduceFunction[R]] = None, map_kwargs: Optional[Dict] = None, @@ -152,6 +168,11 @@ def __init__( else: self.max_parallel_tasks = max_parallel_tasks + if isinstance(inputs, Sequence): + self.inputs_: Union[SequenceType[T], "ObjectRef[T]"] = inputs + else: + self.inputs_ = self.parallel_backend.put(inputs) + if reduce_func is None: reduce_func = Identity @@ -169,14 +190,8 @@ def __init__( def __call__( self, - inputs: Union[SequenceType[T], T], ) -> List[R]: - inputs_: Union[SequenceType[T], "ObjectRef[T]"] - if isinstance(inputs, Sequence): - inputs_ = inputs - else: - inputs_ = self.parallel_backend.put(inputs) - map_results = self.map(inputs_) + map_results = self.map(self.inputs_) reduce_results = self.reduce(map_results) return reduce_results diff --git a/src/pydvl/value/shapley/montecarlo.py b/src/pydvl/value/shapley/montecarlo.py index a67ed3820..08fdc9510 100644 --- a/src/pydvl/value/shapley/montecarlo.py +++ b/src/pydvl/value/shapley/montecarlo.py @@ -34,7 +34,6 @@ import logging import math from enum import Enum -from itertools import repeat from time import sleep from typing import TYPE_CHECKING, Iterable, NamedTuple, Optional, Sequence, Tuple, Union from warnings import warn @@ -226,6 +225,7 @@ def permutation_montecarlo_shapley( iterations_per_job = max(1, max_iterations // n_jobs) map_reduce_job: MapReduceJob[Utility, "NDArray"] = MapReduceJob( + u, map_func=_permutation_montecarlo_marginals, reduce_func=np.concatenate, # type: ignore map_kwargs=dict(max_permutations=iterations_per_job, progress=progress), @@ -233,7 +233,7 @@ def permutation_montecarlo_shapley( config=config, n_jobs=n_jobs, ) - full_results = map_reduce_job(u)[0] + full_results = map_reduce_job()[0] values = np.mean(full_results, axis=0) stderr = np.std(full_results, axis=0) / np.sqrt(full_results.shape[0]) @@ -366,13 +366,14 @@ def combinatorial_montecarlo_shapley( # FIXME? 
max_iterations has different semantics in permutation-based methods map_reduce_job: MapReduceJob["NDArray", MonteCarloResults] = MapReduceJob( + u.data.indices, map_func=_combinatorial_montecarlo_shapley, reduce_func=disjoint_reducer, map_kwargs=dict(u=u_id, max_iterations=max_iterations, progress=progress), n_jobs=n_jobs, config=config, ) - results = map_reduce_job(u.data.indices)[0] + results = map_reduce_job()[0] return ValuationResult( algorithm="combinatorial_montecarlo_shapley", @@ -515,6 +516,7 @@ def owen_sampling_shapley( u_id = parallel_backend.put(u) map_reduce_job: MapReduceJob["NDArray", MonteCarloResults] = MapReduceJob( + u.data.indices, map_func=_owen_sampling_shapley, map_kwargs=dict( u=u_id, @@ -528,7 +530,7 @@ def owen_sampling_shapley( config=config, ) - results = map_reduce_job(u.data.indices)[0] + results = map_reduce_job()[0] return ValuationResult( algorithm="owen_sampling_shapley", diff --git a/src/pydvl/value/shapley/naive.py b/src/pydvl/value/shapley/naive.py index 40d18fc0e..71495d0c7 100644 --- a/src/pydvl/value/shapley/naive.py +++ b/src/pydvl/value/shapley/naive.py @@ -125,12 +125,13 @@ def reduce_fun(results): return np.array(results).sum(axis=0) map_reduce_job: MapReduceJob[np.ndarray, np.ndarray] = MapReduceJob( + u.data.indices, map_func=_combinatorial_exact_shapley, map_kwargs=dict(u=u_id, progress=progress), reduce_func=reduce_fun, n_jobs=n_jobs, ) - values = map_reduce_job(u.data.indices)[0] + values = map_reduce_job()[0] return ValuationResult( algorithm="combinatorial_exact_shapley", status=ValuationStatus.Converged, diff --git a/tests/utils/test_caching.py b/tests/utils/test_caching.py index 1be31f6f4..ea1b43d20 100644 --- a/tests/utils/test_caching.py +++ b/tests/utils/test_caching.py @@ -68,8 +68,8 @@ def foo(indices: "NDArray[int]", *args, **kwargs) -> float: n = 1234 n_runs = 10 hits_before = client.stats()[b"get_hits"] - map_reduce_job = MapReduceJob(foo, np.sum, n_jobs=4, n_runs=n_runs) - result = map_reduce_job(np.arange(n)) + map_reduce_job = MapReduceJob(np.arange(n), foo, np.sum, n_jobs=4, n_runs=n_runs) + result = map_reduce_job() hits_after = client.stats()[b"get_hits"] assert result[0] == n * (n - 1) / 2 # Sanity check @@ -174,8 +174,10 @@ def map_func(indices: "NDArray[int]") -> float: def reduce_func(chunks: "NDArray[float]") -> float: return np.sum(chunks).item() - map_reduce_job = MapReduceJob(map_func, reduce_func, n_jobs=n_jobs, n_runs=n_runs) - result = map_reduce_job(np.arange(n)) + map_reduce_job = MapReduceJob( + np.arange(n), map_func, reduce_func, n_jobs=n_jobs, n_runs=n_runs + ) + result = map_reduce_job() exact_value = np.sum(np.arange(n)).item() diff --git a/tests/utils/test_parallel.py b/tests/utils/test_parallel.py index cbc4b7d76..c3e3f7721 100644 --- a/tests/utils/test_parallel.py +++ b/tests/utils/test_parallel.py @@ -1,5 +1,5 @@ import operator -from functools import reduce +from functools import partial, reduce from itertools import zip_longest import numpy as np @@ -22,7 +22,8 @@ def map_reduce_job_and_parameters(parallel_config, n_jobs, n_runs, request): except ValueError: kind = request.param if kind == "numpy": - map_reduce_job = MapReduceJob( + map_reduce_job = partial( + MapReduceJob, map_func=np.sum, reduce_func=np.sum, config=parallel_config, @@ -30,7 +31,8 @@ def map_reduce_job_and_parameters(parallel_config, n_jobs, n_runs, request): n_runs=n_runs, ) elif kind == "list": - map_reduce_job = MapReduceJob( + map_reduce_job = partial( + MapReduceJob, map_func=lambda x: x, reduce_func=lambda r: 
reduce(operator.add, r, []), config=parallel_config, @@ -38,7 +40,8 @@ def map_reduce_job_and_parameters(parallel_config, n_jobs, n_runs, request): n_runs=n_runs, ) elif kind == "range": - map_reduce_job = MapReduceJob( + map_reduce_job = partial( + MapReduceJob, map_func=lambda x: list(x), reduce_func=lambda r: reduce(operator.add, list(r), []), config=parallel_config, @@ -46,7 +49,8 @@ def map_reduce_job_and_parameters(parallel_config, n_jobs, n_runs, request): n_runs=n_runs, ) elif kind == "custom": - map_reduce_job = MapReduceJob( + map_reduce_job = partial( + MapReduceJob, map_func=map_func, reduce_func=reduce_func, config=parallel_config, @@ -54,7 +58,8 @@ def map_reduce_job_and_parameters(parallel_config, n_jobs, n_runs, request): n_runs=n_runs, ) else: - map_reduce_job = MapReduceJob( + map_reduce_job = partial( + MapReduceJob, map_func=lambda x: x * x, reduce_func=lambda r: r, config=parallel_config, @@ -79,7 +84,7 @@ def map_reduce_job_and_parameters(parallel_config, n_jobs, n_runs, request): @pytest.mark.parametrize("n_runs", [1, 2, 4]) def test_map_reduce_job(map_reduce_job_and_parameters, indices, expected): map_reduce_job, n_jobs, n_runs = map_reduce_job_and_parameters - result = map_reduce_job(indices) + result = map_reduce_job(indices)() assert len(result) == n_runs for exp, ret in zip_longest(expected * n_runs, result, fillvalue=None): if not isinstance(ret, np.ndarray): @@ -123,13 +128,17 @@ def map_func(x): time.sleep(1) return x + inputs_ = list(range(n_dispatched)) + map_reduce_job = MapReduceJob( + inputs_, map_func=map_func, max_parallel_tasks=max_parallel_tasks, timeout=10, ) + map_func = map_reduce_job._wrap_function(map_func) - jobs = [map_func(x) for x in range(n_dispatched)] + jobs = [map_func(x) for x in inputs_] n_finished = map_reduce_job._backpressure( jobs, n_finished=n_finished, n_dispatched=n_dispatched ) From 0ce2d59b46b3b02e44459ad9631ab40f318e406a Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Thu, 29 Dec 2022 09:15:34 +0100 Subject: [PATCH 29/42] Apply suggestions from code review Co-authored-by: Miguel de Benito Delgado --- src/pydvl/utils/parallel/backend.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/pydvl/utils/parallel/backend.py b/src/pydvl/utils/parallel/backend.py index 2145533f2..307350ae1 100644 --- a/src/pydvl/utils/parallel/backend.py +++ b/src/pydvl/utils/parallel/backend.py @@ -19,7 +19,7 @@ _PARALLEL_BACKENDS: Dict[str, "Type[BaseParallelBackend]"] = {} -class AbstractNoPublicConstructor(ABCMeta): +class NoPublicConstructor(ABCMeta): """Metaclass that ensures a private constructor If a class uses this metaclass like this: @@ -44,7 +44,7 @@ def _create(cls, *args: Any, **kwargs: Any): return super().__call__(*args, **kwargs) -class BaseParallelBackend(metaclass=AbstractNoPublicConstructor): +class BaseParallelBackend(metaclass=NoPublicConstructor): """Abstract base class for all parallel backends""" config: Dict[str, Any] = {} @@ -196,9 +196,10 @@ def init_parallel_backend( """ - if config.backend not in ["sequential", "ray"]: - raise NotImplementedError(f"Unexpected parallel type {config.backend}") - parallel_backend_cls = _PARALLEL_BACKENDS[config.backend] + try: + parallel_backend_cls = _PARALLEL_BACKENDS[config.backend] + except KeyError: + raise NotImplementedError(f"Unexpected parallel backend {config.backend}") parallel_backend = parallel_backend_cls._create(config) return parallel_backend # type: ignore From 59534d6d587bf4d58a5395c7015cee916237719a Mon Sep 17 00:00:00 2001 From: Anes 
Benmerzoug Date: Thu, 29 Dec 2022 09:24:17 +0100 Subject: [PATCH 30/42] Fix indentation error --- src/pydvl/utils/parallel/backend.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pydvl/utils/parallel/backend.py b/src/pydvl/utils/parallel/backend.py index 307350ae1..8fa0e97c3 100644 --- a/src/pydvl/utils/parallel/backend.py +++ b/src/pydvl/utils/parallel/backend.py @@ -196,9 +196,9 @@ def init_parallel_backend( """ - try: + try: parallel_backend_cls = _PARALLEL_BACKENDS[config.backend] - except KeyError: + except KeyError: raise NotImplementedError(f"Unexpected parallel backend {config.backend}") parallel_backend = parallel_backend_cls._create(config) return parallel_backend # type: ignore From ff247dfbb454daa747f4c488d9df3434bfd77bba Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Thu, 29 Dec 2022 09:24:45 +0100 Subject: [PATCH 31/42] Improve effective_n_jobs interface --- src/pydvl/utils/parallel/backend.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/pydvl/utils/parallel/backend.py b/src/pydvl/utils/parallel/backend.py index 8fa0e97c3..d5d5b5b90 100644 --- a/src/pydvl/utils/parallel/backend.py +++ b/src/pydvl/utils/parallel/backend.py @@ -70,9 +70,14 @@ def wrap(self, *args, **kwargs) -> Any: def wait(self, v: Any, *args, **kwargs) -> Any: ... - def effective_n_jobs(self, n_jobs: Optional[int]) -> Optional[int]: + @abstractmethod + def _effective_n_jobs(self, n_jobs: int) -> int: + ... + + def effective_n_jobs(self, n_jobs: int = -1) -> int: if n_jobs == 0: raise ValueError("n_jobs == 0 in Parallel has no meaning") + n_jobs = self._effective_n_jobs(n_jobs) return n_jobs def __repr__(self) -> str: @@ -106,9 +111,8 @@ def wrap(self, *args, **kwargs) -> Any: def wait(self, v: Any, *args, **kwargs) -> Tuple[list, list]: return v, [] - def effective_n_jobs(self, n_jobs: Optional[int]) -> int: - n_jobs = super().effective_n_jobs(n_jobs) - if n_jobs is None or n_jobs < 0: + def _effective_n_jobs(self, n_jobs: int) -> int: + if n_jobs < 0: if self.config["num_cpus"]: eff_n_jobs: int = self.config["num_cpus"] else: @@ -169,9 +173,8 @@ def wait( timeout=timeout, ) - def effective_n_jobs(self, n_jobs: Optional[int]) -> int: - n_jobs = super().effective_n_jobs(n_jobs) - if n_jobs is None or n_jobs < 0: + def _effective_n_jobs(self, n_jobs: int) -> int: + if n_jobs < 0: ray_cpus = int(ray._private.state.cluster_resources()["CPU"]) # type: ignore eff_n_jobs = ray_cpus else: From bbaa07bcb76d9cfc57931218c36732cd0169203a Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Thu, 29 Dec 2022 09:41:26 +0100 Subject: [PATCH 32/42] Set default n_jobs in shapley methods to 1 --- src/pydvl/value/shapley/montecarlo.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pydvl/value/shapley/montecarlo.py b/src/pydvl/value/shapley/montecarlo.py index 08fdc9510..1df29f575 100644 --- a/src/pydvl/value/shapley/montecarlo.py +++ b/src/pydvl/value/shapley/montecarlo.py @@ -77,7 +77,7 @@ def truncated_montecarlo_shapley( value_tolerance: Optional[float] = None, max_iterations: Optional[int] = None, *, - n_jobs: Optional[int] = None, + n_jobs: int = 1, config: ParallelConfig = ParallelConfig(), progress: bool = False, coordinator_update_period: int = 10, @@ -201,7 +201,7 @@ def permutation_montecarlo_shapley( u: Utility, max_iterations: int, *, - n_jobs: int, + n_jobs: int = 1, config: ParallelConfig = ParallelConfig(), progress: bool = False, ) -> ValuationResult: From 4dffe037b10d3af39128b941c7eb28bf91bc3d7e Mon Sep 17 
00:00:00 2001 From: Anes Benmerzoug Date: Thu, 29 Dec 2022 09:42:28 +0100 Subject: [PATCH 33/42] Call put on map_kwargs' and reduce_kwargs' items inside MapReduceJob's __init__ --- src/pydvl/utils/parallel/map_reduce.py | 25 +++++++++++++++---------- src/pydvl/value/shapley/montecarlo.py | 9 ++------- src/pydvl/value/shapley/naive.py | 15 +++------------ 3 files changed, 20 insertions(+), 29 deletions(-) diff --git a/src/pydvl/utils/parallel/map_reduce.py b/src/pydvl/utils/parallel/map_reduce.py index 1fb188ccb..b561939f5 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -162,11 +162,9 @@ def __init__( # This uses the setter defined below self.n_jobs = n_jobs - if max_parallel_tasks is None: - # TODO: Find a better default value? - self.max_parallel_tasks = 2 * (self.n_jobs + self.n_runs) - else: - self.max_parallel_tasks = max_parallel_tasks + # TODO: Find a better default value? + default_max_parallel_tasks = 2 * (self.n_jobs + self.n_runs) + self.max_parallel_tasks = max_parallel_tasks or default_max_parallel_tasks if isinstance(inputs, Sequence): self.inputs_: Union[SequenceType[T], "ObjectRef[T]"] = inputs @@ -176,14 +174,21 @@ def __init__( if reduce_func is None: reduce_func = Identity - self.map_kwargs = map_kwargs - self.reduce_kwargs = reduce_kwargs - - if self.map_kwargs is None: + if map_kwargs is None: self.map_kwargs = dict() + else: + self.map_kwargs = { + k: self.parallel_backend.put(v) if not isinstance(v, ObjectRef) else v + for k, v in map_kwargs.items() + } - if self.reduce_kwargs is None: + if reduce_kwargs is None: self.reduce_kwargs = dict() + else: + self.reduce_kwargs = { + k: self.parallel_backend.put(v) if not isinstance(v, ObjectRef) else v + for k, v in reduce_kwargs.items() + } self._map_func = maybe_add_argument(map_func, "job_id") self._reduce_func = reduce_func diff --git a/src/pydvl/value/shapley/montecarlo.py b/src/pydvl/value/shapley/montecarlo.py index 1df29f575..d30bf764a 100644 --- a/src/pydvl/value/shapley/montecarlo.py +++ b/src/pydvl/value/shapley/montecarlo.py @@ -361,15 +361,13 @@ def combinatorial_montecarlo_shapley( :param progress: Whether to display progress bars for each job. :return: Object with the data values. """ - parallel_backend = init_parallel_backend(config) - u_id = parallel_backend.put(u) # FIXME? 
max_iterations has different semantics in permutation-based methods map_reduce_job: MapReduceJob["NDArray", MonteCarloResults] = MapReduceJob( u.data.indices, map_func=_combinatorial_montecarlo_shapley, reduce_func=disjoint_reducer, - map_kwargs=dict(u=u_id, max_iterations=max_iterations, progress=progress), + map_kwargs=dict(u=u, max_iterations=max_iterations, progress=progress), n_jobs=n_jobs, config=config, ) @@ -512,14 +510,11 @@ def owen_sampling_shapley( if OwenAlgorithm(method) == OwenAlgorithm.Antithetic: warn("Owen antithetic sampling not tested and probably bogus") - parallel_backend = init_parallel_backend(config) - u_id = parallel_backend.put(u) - map_reduce_job: MapReduceJob["NDArray", MonteCarloResults] = MapReduceJob( u.data.indices, map_func=_owen_sampling_shapley, map_kwargs=dict( - u=u_id, + u=u, method=OwenAlgorithm(method), max_iterations=max_iterations, max_q=max_q, diff --git a/src/pydvl/value/shapley/naive.py b/src/pydvl/value/shapley/naive.py index 71495d0c7..08145c9aa 100644 --- a/src/pydvl/value/shapley/naive.py +++ b/src/pydvl/value/shapley/naive.py @@ -4,14 +4,7 @@ import numpy as np -from pydvl.utils import ( - MapReduceJob, - ParallelConfig, - Utility, - init_parallel_backend, - maybe_progress, - powerset, -) +from pydvl.utils import MapReduceJob, ParallelConfig, Utility, maybe_progress, powerset from pydvl.value.results import ValuationResult, ValuationStatus __all__ = ["permutation_exact_shapley", "combinatorial_exact_shapley"] @@ -118,18 +111,16 @@ def combinatorial_exact_shapley( f"Large dataset! Computation requires 2^{len(u.data)} calls to model.fit()" ) - parallel_backend = init_parallel_backend(config) - u_id = parallel_backend.put(u) - def reduce_fun(results): return np.array(results).sum(axis=0) map_reduce_job: MapReduceJob[np.ndarray, np.ndarray] = MapReduceJob( u.data.indices, map_func=_combinatorial_exact_shapley, - map_kwargs=dict(u=u_id, progress=progress), + map_kwargs=dict(u=u, progress=progress), reduce_func=reduce_fun, n_jobs=n_jobs, + config=config, ) values = map_reduce_job()[0] return ValuationResult( From e9ccbc2fad162d6cdf62ca95e356fd89ec54aad3 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Thu, 29 Dec 2022 09:42:58 +0100 Subject: [PATCH 34/42] Update src/pydvl/utils/parallel/backend.py Co-authored-by: Miguel de Benito Delgado --- src/pydvl/utils/parallel/backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pydvl/utils/parallel/backend.py b/src/pydvl/utils/parallel/backend.py index d5d5b5b90..c16405b1a 100644 --- a/src/pydvl/utils/parallel/backend.py +++ b/src/pydvl/utils/parallel/backend.py @@ -184,7 +184,7 @@ def _effective_n_jobs(self, n_jobs: int) -> int: def init_parallel_backend( config: ParallelConfig, -) -> Union[RayParallelBackend, SequentialParallelBackend]: +) -> BaseParallelBackend: """Initializes the parallel backend and returns an instance of it. :param config: instance of :class:`~pydvl.utils.config.ParallelConfig` with cluster address, number of cpus, etc. 
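The patches that follow rework `_chunkify`, whose chunk boundaries deliberately mimic `numpy.array_split` while using only built-ins. As a reference for reading those diffs, here is a standalone sketch of just the boundary arithmetic; the helper name `chunk_indices` is illustrative and does not appear in the patches:

    from itertools import accumulate

    def chunk_indices(n: int, n_chunks: int) -> list:
        # As in numpy.array_split: the first `remainder` chunks get one extra element.
        chunk_size, remainder = divmod(n, n_chunks)
        sizes = remainder * [chunk_size + 1] + (n_chunks - remainder) * [chunk_size]
        return list(accumulate([0] + sizes))

    # 5 elements over 2 jobs: boundaries [0, 3, 5], i.e. chunks data[0:3] and data[3:5]
    assert chunk_indices(5, 2) == [0, 3, 5]
    # 3 elements over 5 jobs: trailing empty chunks, which the caller skips
    assert chunk_indices(3, 5) == [0, 1, 2, 3, 3, 3]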
From fe2bab641d4fbe344985ae108ace44da3d78b263 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Thu, 29 Dec 2022 10:27:25 +0100 Subject: [PATCH 35/42] Remove usage of singledispatchmethod in MapReduceJob --- src/pydvl/utils/parallel/map_reduce.py | 59 +++++++------------------- 1 file changed, 16 insertions(+), 43 deletions(-) diff --git a/src/pydvl/utils/parallel/map_reduce.py b/src/pydvl/utils/parallel/map_reduce.py index b561939f5..4a08a5b6f 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -1,11 +1,9 @@ -from collections.abc import Iterable, Sequence -from functools import singledispatch, singledispatchmethod +from collections.abc import Iterable +from functools import singledispatch from itertools import accumulate, chain -from typing import TYPE_CHECKING, Any, Callable, Dict, Generic +from typing import Any, Callable, Dict, Generic from typing import Iterable as IterableType -from typing import Iterator, List, Optional -from typing import Sequence as SequenceType -from typing import TypeVar, Union +from typing import Iterator, List, Optional, Sequence, TypeVar, Union import ray from ray import ObjectRef @@ -23,15 +21,6 @@ MapFunction = Callable[..., R] ReduceFunction = Callable[[IterableType[R]], R] -if not TYPE_CHECKING: - # HACK to make singledispatchmethod work with staticmethod - def _register(self, cls, method=None): - if hasattr(cls, "__func__"): - setattr(cls, "__annotations__", cls.__func__.__annotations__) - return self.dispatcher.register(cls, func=method) - - singledispatchmethod.register = _register - def _wrap_func_with_remote_args(func, *, timeout: Optional[float] = None): def wrapper(*args, **kwargs): @@ -139,7 +128,7 @@ class MapReduceJob(Generic[T, R]): def __init__( self, - inputs: Union[SequenceType[T], T], + inputs: Union[Sequence[T], T], map_func: MapFunction[R], reduce_func: Optional[ReduceFunction[R]] = None, map_kwargs: Optional[Dict] = None, @@ -167,7 +156,7 @@ def __init__( self.max_parallel_tasks = max_parallel_tasks or default_max_parallel_tasks if isinstance(inputs, Sequence): - self.inputs_: Union[SequenceType[T], "ObjectRef[T]"] = inputs + self.inputs_: Union[Sequence[T], "ObjectRef[T]"] = inputs else: self.inputs_ = self.parallel_backend.put(inputs) @@ -201,7 +190,7 @@ def __call__( return reduce_results def map( - self, inputs: Union[SequenceType[T], "ObjectRef[T]"] + self, inputs: Union[Sequence[T], "ObjectRef[T]"] ) -> List[List["ObjectRef[R]"]]: map_results: List[List["ObjectRef[R]"]] = [] @@ -274,26 +263,24 @@ def _backpressure( n_finished += len(finished_jobs) return n_finished - @singledispatchmethod - @staticmethod - def _chunkify(data: Any, n_chunks: int): - raise NotImplementedError( - f"_chunkify does not support data of type {type(data)}" - ) - - @_chunkify.register @staticmethod - def _(data: Sequence, n_chunks: int) -> Iterator[SequenceType[T]]: - """Splits a sequence of values into `n_chunks` chunks for each job""" + def _chunkify( + data: Union[Sequence[T], "ObjectRef[T]"], n_chunks: int + ) -> Union[Sequence[T], "ObjectRef[T]"]: + """If data is a sequence, it splits it into sequences of size `n_chunks` for each job that we call chunks. + If instead data is an `ObjectRef` instance, then it yields it repeatedly `n_chunks` number of times. 
+ """ if n_chunks == 0: raise ValueError("Number of chunks should be greater than 0") elif n_chunks == 1: yield data + if isinstance(data, ObjectRef): + for _ in range(n_chunks): + yield data else: n = len(data) - # This is very much inspired by numpy's array_split function # The difference is that it only uses built-in functions # and does not convert the input data to an array @@ -310,20 +297,6 @@ def _(data: Sequence, n_chunks: int) -> Iterator[SequenceType[T]]: return yield data[start_index:end_index] - @_chunkify.register - @staticmethod - def _(data: ObjectRef, n_chunks: int) -> Iterator[ObjectRef]: - """Repeatedly yields the passed data object `n_chunks` number of times""" - if n_chunks == 0: - raise ValueError("Number of chunks should be greater than 0") - - elif n_chunks == 1: - yield data - - else: - for _ in range(n_chunks): - yield data - @property def n_jobs(self) -> int: return self._n_jobs From f8e3abae035ffa4a26538d67e3960a27cdf999e8 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Thu, 29 Dec 2022 10:34:18 +0100 Subject: [PATCH 36/42] Fix type hint --- src/pydvl/utils/parallel/map_reduce.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pydvl/utils/parallel/map_reduce.py b/src/pydvl/utils/parallel/map_reduce.py index 4a08a5b6f..d201001e6 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -266,7 +266,7 @@ def _backpressure( @staticmethod def _chunkify( data: Union[Sequence[T], "ObjectRef[T]"], n_chunks: int - ) -> Union[Sequence[T], "ObjectRef[T]"]: + ) -> Iterator[Union[Sequence[T], "ObjectRef[T]"]]: """If data is a sequence, it splits it into sequences of size `n_chunks` for each job that we call chunks. If instead data is an `ObjectRef` instance, then it yields it repeatedly `n_chunks` number of times. 
""" From 0cd99455b2ca0be9df2f71dabf7b6c903f5c33a3 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Fri, 30 Dec 2022 16:01:59 +0100 Subject: [PATCH 37/42] Call put() on chunks inside _chunkify --- src/pydvl/utils/parallel/backend.py | 7 ++- src/pydvl/utils/parallel/map_reduce.py | 71 +++++++++++++------------- tests/utils/test_parallel.py | 4 +- tests/value/shapley/test_montecarlo.py | 2 +- 4 files changed, 45 insertions(+), 39 deletions(-) diff --git a/src/pydvl/utils/parallel/backend.py b/src/pydvl/utils/parallel/backend.py index c16405b1a..6b22cc3bf 100644 --- a/src/pydvl/utils/parallel/backend.py +++ b/src/pydvl/utils/parallel/backend.py @@ -153,8 +153,11 @@ def get( else: return v - def put(self, v: T, *args, **kwargs) -> "ObjectRef[T]": - return ray.put(v, **kwargs) # type: ignore + def put(self, v: T, *args, **kwargs) -> Union["ObjectRef[T]", T]: + try: + return ray.put(v, **kwargs) # type: ignore + except TypeError: + return v # type: ignore def wrap(self, *args, **kwargs) -> RemoteFunction: return ray.remote(*args, **kwargs) # type: ignore diff --git a/src/pydvl/utils/parallel/map_reduce.py b/src/pydvl/utils/parallel/map_reduce.py index d201001e6..767c5c571 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -1,9 +1,11 @@ -from collections.abc import Iterable +from collections.abc import Iterable, Sequence from functools import singledispatch -from itertools import accumulate, chain +from itertools import accumulate, chain, repeat from typing import Any, Callable, Dict, Generic from typing import Iterable as IterableType -from typing import Iterator, List, Optional, Sequence, TypeVar, Union +from typing import List, Optional +from typing import Sequence as SequenceType +from typing import TypeVar, Union import ray from ray import ObjectRef @@ -22,7 +24,7 @@ ReduceFunction = Callable[[IterableType[R]], R] -def _wrap_func_with_remote_args(func, *, timeout: Optional[float] = None): +def _wrap_func_with_remote_args(func: Callable, *, timeout: Optional[float] = None): def wrapper(*args, **kwargs): args = list(args) for i, v in enumerate(args[:]): @@ -33,9 +35,7 @@ def wrapper(*args, **kwargs): # Doing it manually here because using wraps or update_wrapper # from functools doesn't work with ray for some unknown reason - wrapper.__module__ = func.__module__ wrapper.__name__ = func.__name__ - wrapper.__annotations__ = func.__annotations__ wrapper.__qualname__ = func.__qualname__ wrapper.__doc__ = func.__doc__ return wrapper @@ -128,7 +128,7 @@ class MapReduceJob(Generic[T, R]): def __init__( self, - inputs: Union[Sequence[T], T], + inputs: Union[SequenceType[T], T], map_func: MapFunction[R], reduce_func: Optional[ReduceFunction[R]] = None, map_kwargs: Optional[Dict] = None, @@ -155,10 +155,7 @@ def __init__( default_max_parallel_tasks = 2 * (self.n_jobs + self.n_runs) self.max_parallel_tasks = max_parallel_tasks or default_max_parallel_tasks - if isinstance(inputs, Sequence): - self.inputs_: Union[Sequence[T], "ObjectRef[T]"] = inputs - else: - self.inputs_ = self.parallel_backend.put(inputs) + self.inputs_ = inputs if reduce_func is None: reduce_func = Identity @@ -167,16 +164,14 @@ def __init__( self.map_kwargs = dict() else: self.map_kwargs = { - k: self.parallel_backend.put(v) if not isinstance(v, ObjectRef) else v - for k, v in map_kwargs.items() + k: self.parallel_backend.put(v) for k, v in map_kwargs.items() } if reduce_kwargs is None: self.reduce_kwargs = dict() else: self.reduce_kwargs = { - k: self.parallel_backend.put(v) if not 
isinstance(v, ObjectRef) else v - for k, v in reduce_kwargs.items() + k: self.parallel_backend.put(v) for k, v in reduce_kwargs.items() } self._map_func = maybe_add_argument(map_func, "job_id") @@ -190,7 +185,7 @@ def __call__( return reduce_results def map( - self, inputs: Union[Sequence[T], "ObjectRef[T]"] + self, inputs: Union[SequenceType[T], "ObjectRef[T]"] ) -> List[List["ObjectRef[R]"]]: map_results: List[List["ObjectRef[R]"]] = [] @@ -199,13 +194,13 @@ def map( total_n_jobs = 0 total_n_finished = 0 - for _ in range(self.n_runs): - # In this first case we don't use chunking at all - if self.n_runs >= self.n_jobs: - chunks = iter([inputs]) - else: - chunks = self._chunkify(inputs, n_chunks=self.n_jobs) + # In the first case we don't use chunking at all + if self.n_runs >= self.n_jobs: + chunks = self._chunkify(inputs, n_chunks=1) + else: + chunks = self._chunkify(inputs, n_chunks=self.n_jobs) + for _ in range(self.n_runs): map_result = [] for j, next_chunk in enumerate(chunks): result = map_func(next_chunk, job_id=j, **self.map_kwargs) @@ -239,11 +234,11 @@ def reduce(self, chunks: List[List["ObjectRef[R]"]]) -> List[R]: results = self.parallel_backend.get(reduce_results, timeout=self.timeout) return results # type: ignore - def _wrap_function(self, func): + def _wrap_function(self, func: Callable, **kwargs) -> Callable: remote_func = self.parallel_backend.wrap( - _wrap_func_with_remote_args(func, timeout=self.timeout) + _wrap_func_with_remote_args(func, timeout=self.timeout), **kwargs ) - return getattr(remote_func, "remote", remote_func) + return getattr(remote_func, "remote", remote_func) # type: ignore def _backpressure( self, jobs: List[ObjectRef], n_dispatched: int, n_finished: int @@ -263,22 +258,22 @@ def _backpressure( n_finished += len(finished_jobs) return n_finished - @staticmethod def _chunkify( - data: Union[Sequence[T], "ObjectRef[T]"], n_chunks: int - ) -> Iterator[Union[Sequence[T], "ObjectRef[T]"]]: - """If data is a sequence, it splits it into sequences of size `n_chunks` for each job that we call chunks. + self, data: Union[SequenceType[T], T], n_chunks: int + ) -> List["ObjectRef[T]"]: + """If data is a SequenceType, it splits it into SequenceTypes of size `n_chunks` for each job that we call chunks. If instead data is an `ObjectRef` instance, then it yields it repeatedly `n_chunks` number of times. 
""" if n_chunks == 0: raise ValueError("Number of chunks should be greater than 0") elif n_chunks == 1: - yield data + data_id = self.parallel_backend.put(data) + return [data_id] - if isinstance(data, ObjectRef): - for _ in range(n_chunks): - yield data + elif not isinstance(data, Sequence): + data_id = self.parallel_backend.put(data) + return list(repeat(data_id, times=n_chunks)) else: n = len(data) # This is very much inspired by numpy's array_split function @@ -292,10 +287,16 @@ def _chunkify( + (n_chunks - remainder) * [chunk_size] ) ) + + chunks = [] + for start_index, end_index in zip(chunk_indices[:-1], chunk_indices[1:]): if start_index >= end_index: - return - yield data[start_index:end_index] + break + chunk_id = self.parallel_backend.put(data[start_index:end_index]) + chunks.append(chunk_id) + + return chunks @property def n_jobs(self) -> int: diff --git a/tests/utils/test_parallel.py b/tests/utils/test_parallel.py index c3e3f7721..a0c364cb2 100644 --- a/tests/utils/test_parallel.py +++ b/tests/utils/test_parallel.py @@ -106,7 +106,9 @@ def test_map_reduce_job(map_reduce_job_and_parameters, indices, expected): ], ) def test_chunkification(data, n_chunks, expected_chunks): - chunks = list(MapReduceJob._chunkify(data, n_chunks)) + map_reduce_job = MapReduceJob([], map_func=lambda x: x) + chunks = list(map_reduce_job._chunkify(data, n_chunks)) + chunks = map_reduce_job.parallel_backend.get(chunks) assert chunks == expected_chunks diff --git a/tests/value/shapley/test_montecarlo.py b/tests/value/shapley/test_montecarlo.py index 8ae0ffeb0..d88274a50 100644 --- a/tests/value/shapley/test_montecarlo.py +++ b/tests/value/shapley/test_montecarlo.py @@ -186,7 +186,7 @@ def test_linear_montecarlo_with_outlier( @pytest.mark.parametrize( "fun, max_iterations, kwargs", [ - (permutation_montecarlo_shapley, 600, {}), + (permutation_montecarlo_shapley, 700, {}), (truncated_montecarlo_shapley, 500, {"coordinator_update_period": 1}), (owen_sampling_shapley, 4, {"max_q": 300, "method": "standard"}), # FIXME: antithetic breaks for non-deterministic u From 12cfb0c46fc3a3d660aea1a356addd58c2e5edf8 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Fri, 30 Dec 2022 16:52:14 +0100 Subject: [PATCH 38/42] Remove n_runs from MapReduceJobs, fix bug in _chunkify when passing numpy arrays --- src/pydvl/utils/parallel/map_reduce.py | 202 ++++++++++++------------- src/pydvl/value/shapley/montecarlo.py | 6 +- src/pydvl/value/shapley/naive.py | 12 +- tests/utils/conftest.py | 8 + tests/utils/test_caching.py | 33 ++-- tests/utils/test_parallel.py | 52 +++---- 6 files changed, 154 insertions(+), 159 deletions(-) create mode 100644 tests/utils/conftest.py diff --git a/src/pydvl/utils/parallel/map_reduce.py b/src/pydvl/utils/parallel/map_reduce.py index 767c5c571..3a36ca6c5 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -1,13 +1,20 @@ -from collections.abc import Iterable, Sequence +from collections.abc import Iterable from functools import singledispatch -from itertools import accumulate, chain, repeat -from typing import Any, Callable, Dict, Generic -from typing import Iterable as IterableType -from typing import List, Optional -from typing import Sequence as SequenceType -from typing import TypeVar, Union +from itertools import accumulate, repeat +from typing import ( + Any, + Callable, + Dict, + Generic, + List, + Optional, + Sequence, + TypeVar, + Union, +) import ray +from numpy.typing import NDArray from ray import ObjectRef from ..config import 
ParallelConfig @@ -21,7 +28,8 @@ Identity = lambda x, *args, **kwargs: x MapFunction = Callable[..., R] -ReduceFunction = Callable[[IterableType[R]], R] +ReduceFunction = Callable[[List[R]], R] +ChunkifyInputType = Union[NDArray[T], Sequence[T], T] def _wrap_func_with_remote_args(func: Callable, *, timeout: Optional[float] = None): @@ -58,25 +66,16 @@ def _(v: Iterable, *, timeout: Optional[float] = None) -> List[Any]: class MapReduceJob(Generic[T, R]): """Takes an embarrassingly parallel function and runs it in `n_jobs` parallel - jobs, splitting the data into the same number of chunks, one for each job. + jobs, splitting the data evenly into a number of chunks equal to the number of jobs. Typing information for objects of this class requires the type of the inputs that are split for `map_func` and the type of its output. - :param inputs: + :param inputs: The input that will be split and passed to `map_func`; + if it is not a sequence, it will be repeated `n_jobs` times. - :param map_func: Function that will be applied to the input chunks in each - job. + :param map_func: Function that will be applied to the input chunks in each job. :param reduce_func: Function that will be applied to the results of - `map_func` to reduce them. This will be done independently for each run, - i.e. the reducer need and must not account for data of multiple runs. + `map_func` to reduce them. :param map_kwargs: Keyword arguments that will be passed to `map_func` in each job. Alternatively, one can use `itertools.partial`. :param reduce_kwargs: Keyword arguments that will be passed to `reduce_func` @@ -84,20 +83,18 @@ class MapReduceJob(Generic[T, R]): :param config: Instance of :class:`~pydvl.utils.config.ParallelConfig` with cluster address, number of cpus, etc. :param n_jobs: Number of parallel jobs to run. Does not accept 0 - :param n_runs: Number of times to run `map_func` and `reduce_func` on the - whole data. :param timeout: Amount of time in seconds to wait for remote results before ... TODO :param max_parallel_tasks: Maximum number of jobs to start in parallel. Any tasks above this number won't be submitted to the backend before some are done. This is to avoid swamping the work queue. Note that tasks have - a low memory footprint, so this is probably not a big concernt, except + a low memory footprint, so this is probably not a big concern, except in the case of an infinite stream (not the case for MapReduceJob). See https://docs.ray.io/en/latest/ray-core/patterns/limit-pending-tasks.html :Examples: - A simple usage example with 2 jobs and 3 runs: + A simple usage example with 2 jobs: >>> from pydvl.utils.parallel import MapReduceJob >>> import numpy as np >>> map_reduce_job: MapReduceJob[np.ndarray, np.ndarray] = MapReduceJob( ... np.arange(5), ... map_func=np.sum, ... reduce_func=np.sum, ... n_jobs=2, ... 
) >>> map_reduce_job() - [10, 10, 10] + [10] - When passed a single object as input, it will be repeated for each job, if n_jobs > n_runs: + When passed a single object as input, it will be repeated for each job: >>> from pydvl.utils.parallel import MapReduceJob >>> import numpy as np >>> map_reduce_job: MapReduceJob[int, np.ndarray] = MapReduceJob( ... 5, - ... map_func=np.sum, + ... map_func=lambda x: np.array([x]), ... reduce_func=np.sum, ... n_jobs=4, - ... n_runs=2, ... ) >>> map_reduce_job() - [20, 20] + [20] """ def __init__( self, - inputs: Union[SequenceType[T], T], + inputs: Union[Sequence[T], T], map_func: MapFunction[R], reduce_func: Optional[ReduceFunction[R]] = None, map_kwargs: Optional[Dict] = None, reduce_kwargs: Optional[Dict] = None, config: ParallelConfig = ParallelConfig(), *, - n_jobs: int = 1, - n_runs: int = 1, + n_jobs: int = -1, timeout: Optional[float] = None, max_parallel_tasks: Optional[int] = None, ): @@ -145,14 +139,13 @@ def __init__( self.parallel_backend = parallel_backend self.timeout = timeout - self.n_runs = n_runs self._n_jobs = 1 # This uses the setter defined below self.n_jobs = n_jobs # TODO: Find a better default value? - default_max_parallel_tasks = 2 * (self.n_jobs + self.n_runs) + default_max_parallel_tasks = 2 * self.n_jobs self.max_parallel_tasks = max_parallel_tasks or default_max_parallel_tasks self.inputs_ = inputs @@ -179,60 +172,40 @@ def __init__( def __call__( self, - ) -> List[R]: + ) -> R: map_results = self.map(self.inputs_) reduce_results = self.reduce(map_results) return reduce_results - def map( - self, inputs: Union[SequenceType[T], "ObjectRef[T]"] - ) -> List[List["ObjectRef[R]"]]: - map_results: List[List["ObjectRef[R]"]] = [] + def map(self, inputs: Union[Sequence[T], T]) -> List["ObjectRef[R]"]: + map_results: List["ObjectRef[R]"] = [] map_func = self._wrap_function(self._map_func) total_n_jobs = 0 total_n_finished = 0 - # In the first case we don't use chunking at all - if self.n_runs >= self.n_jobs: - chunks = self._chunkify(inputs, n_chunks=1) - else: - chunks = self._chunkify(inputs, n_chunks=self.n_jobs) - - for _ in range(self.n_runs): - map_result = [] - for j, next_chunk in enumerate(chunks): - result = map_func(next_chunk, job_id=j, **self.map_kwargs) - map_result.append(result) - total_n_jobs += 1 - - total_n_finished = self._backpressure( - list(chain.from_iterable([*map_results, map_result])), - n_dispatched=total_n_jobs, - n_finished=total_n_finished, - ) + chunks = self._chunkify(inputs, n_chunks=self.n_jobs) + + for j, next_chunk in enumerate(chunks): + result = map_func(next_chunk, job_id=j, **self.map_kwargs) + map_results.append(result) + total_n_jobs += 1 - map_results.append(map_result) + total_n_finished = self._backpressure( + map_results, + n_dispatched=total_n_jobs, + n_finished=total_n_finished, + ) return map_results - def reduce(self, chunks: List[List["ObjectRef[R]"]]) -> List[R]: + def reduce(self, chunks: List["ObjectRef[R]"]) -> R: reduce_func = self._wrap_function(self._reduce_func) - total_n_jobs = 0 - total_n_finished = 0 - reduce_results = [] - - for i in range(self.n_runs): - result = reduce_func(chunks[i], **self.reduce_kwargs) - reduce_results.append(result) - total_n_jobs += 1 - total_n_finished = self._backpressure( - reduce_results, n_dispatched=total_n_jobs, n_finished=total_n_finished - ) - results = self.parallel_backend.get(reduce_results, timeout=self.timeout) - return results # type: ignore + reduce_result = reduce_func(chunks, **self.reduce_kwargs) + result = 
self.parallel_backend.get(reduce_result, timeout=self.timeout) + return result # type: ignore def _wrap_function(self, func: Callable, **kwargs) -> Callable: remote_func = self.parallel_backend.wrap( @@ -243,60 +216,71 @@ def _wrap_function(self, func: Callable, **kwargs) -> Callable: def _backpressure( self, jobs: List[ObjectRef], n_dispatched: int, n_finished: int ) -> int: - """ + """This is used to limit the number of concurrent tasks.
+ If :attr:`~pydvl.utils.parallel.map_reduce.MapReduceJob.max_parallel_tasks` is None then this function
+ is a no-op that simply returns 0.
+ See https://docs.ray.io/en/latest/ray-core/patterns/limit-pending-tasks.html :param jobs: Tasks currently in flight. :param n_dispatched: Total number of tasks dispatched so far. :param n_finished: Number of tasks that have already finished. :return: The updated number of finished tasks. """ + if self.max_parallel_tasks is None: + return 0 + else: + while (n_in_flight := n_dispatched - n_finished) > self.max_parallel_tasks: + wait_for_num_jobs = n_in_flight - self.max_parallel_tasks + finished_jobs, _ = self.parallel_backend.wait( + jobs, + num_returns=wait_for_num_jobs, + timeout=10, # FIXME make parameter? + ) + n_finished += len(finished_jobs) return n_finished - def _chunkify( - self, data: Union[SequenceType[T], T], n_chunks: int - ) -> List["ObjectRef[T]"]: - """If data is a SequenceType, it splits it into SequenceTypes of size `n_chunks` for each job that we call chunks. + def _chunkify(self, data: ChunkifyInputType, n_chunks: int) -> List["ObjectRef[T]"]: + """If `data` is a sequence, it is split into `n_chunks` chunks, one for each job. If instead `data` is not a sequence, it is put into the object store once and the resulting reference is repeated `n_chunks` times. 
 """ - if n_chunks == 0: + if n_chunks <= 0: raise ValueError("Number of chunks should be greater than 0") elif n_chunks == 1: data_id = self.parallel_backend.put(data) return [data_id] else: + try: + # Check whether the data is iterable: + # if it is, its length determines the chunk indices. 
+ n = len(data) + except TypeError: + data_id = self.parallel_backend.put(data) + return list(repeat(data_id, times=n_chunks)) + else: + # This is very much inspired by numpy's array_split function + # The difference is that it only uses built-in functions + # and does not convert the input data to an array + chunk_size, remainder = divmod(n, n_chunks) + chunk_indices = tuple( + accumulate( + [0] + + remainder * [chunk_size + 1] + + (n_chunks - remainder) * [chunk_size] + ) ) - ) - chunks = [] + chunks = [] - for start_index, end_index in zip(chunk_indices[:-1], chunk_indices[1:]): - if start_index >= end_index: - break - chunk_id = self.parallel_backend.put(data[start_index:end_index]) - chunks.append(chunk_id) + for start_index, end_index in zip( + chunk_indices[:-1], chunk_indices[1:] + ): + if start_index >= end_index: + break + chunk_id = self.parallel_backend.put(data[start_index:end_index]) + chunks.append(chunk_id) - return chunks + return chunks @property def n_jobs(self) -> int: diff --git a/src/pydvl/value/shapley/montecarlo.py b/src/pydvl/value/shapley/montecarlo.py index d30bf764a..757929f1e 100644 --- a/src/pydvl/value/shapley/montecarlo.py +++ b/src/pydvl/value/shapley/montecarlo.py @@ -233,7 +233,7 @@ def permutation_montecarlo_shapley( config=config, n_jobs=n_jobs, ) - full_results = map_reduce_job()[0] + full_results = map_reduce_job() values = np.mean(full_results, axis=0) stderr = np.std(full_results, axis=0) / np.sqrt(full_results.shape[0]) @@ -371,7 +371,7 @@ def combinatorial_montecarlo_shapley( n_jobs=n_jobs, config=config, ) - results = map_reduce_job()[0] + results = map_reduce_job() return ValuationResult( algorithm="combinatorial_montecarlo_shapley", @@ -525,7 +525,7 @@ def owen_sampling_shapley( config=config, ) - results = map_reduce_job()[0] + results = map_reduce_job() return ValuationResult( algorithm="owen_sampling_shapley", diff --git a/src/pydvl/value/shapley/naive.py b/src/pydvl/value/shapley/naive.py index 08145c9aa..2c0fdc95d 100644 --- a/src/pydvl/value/shapley/naive.py +++ b/src/pydvl/value/shapley/naive.py @@ -1,8 +1,10 @@ import math import warnings from itertools import permutations +from typing import List import numpy as np +from numpy.typing import NDArray from pydvl.utils import MapReduceJob, ParallelConfig, Utility, maybe_progress, powerset from pydvl.value.results import ValuationResult, ValuationStatus @@ -58,7 +60,7 @@ def permutation_exact_shapley(u: Utility, *, progress: bool = True) -> Valuation def _combinatorial_exact_shapley( indices: np.ndarray, u: Utility, progress: bool -) -> np.ndarray: +) -> NDArray: """Helper function for :func:`combinatorial_exact_shapley`. Computes the marginal utilities for the set of indices passed and returns @@ -111,10 +113,10 @@ def combinatorial_exact_shapley( f"Large dataset! 
Computation requires 2^{len(u.data)} calls to model.fit()" ) - def reduce_fun(results): - return np.array(results).sum(axis=0) + def reduce_fun(results: List[NDArray]) -> NDArray: + return np.array(results).sum(axis=0) # type: ignore - map_reduce_job: MapReduceJob[np.ndarray, np.ndarray] = MapReduceJob( + map_reduce_job: MapReduceJob[NDArray, NDArray] = MapReduceJob( u.data.indices, map_func=_combinatorial_exact_shapley, map_kwargs=dict(u=u, progress=progress), @@ -122,7 +124,7 @@ def reduce_fun(results): n_jobs=n_jobs, config=config, ) - values = map_reduce_job()[0] + values = map_reduce_job() return ValuationResult( algorithm="combinatorial_exact_shapley", status=ValuationStatus.Converged, diff --git a/tests/utils/conftest.py b/tests/utils/conftest.py new file mode 100644 index 000000000..8d4a822fe --- /dev/null +++ b/tests/utils/conftest.py @@ -0,0 +1,8 @@ +import pytest + +from pydvl.utils.config import ParallelConfig + + +@pytest.fixture(scope="session", params=["sequential", "ray"]) +def parallel_config(request): + return ParallelConfig(backend=request.param) diff --git a/tests/utils/test_caching.py b/tests/utils/test_caching.py index ea1b43d20..6437feb5d 100644 --- a/tests/utils/test_caching.py +++ b/tests/utils/test_caching.py @@ -1,6 +1,5 @@ import logging from time import sleep, time -from typing import Iterable import numpy as np import pytest @@ -52,7 +51,7 @@ def foo(indices: "NDArray[int]") -> float: assert hits_after > hits_before -def test_memcached_parallel_jobs(memcached_client): +def test_memcached_parallel_jobs(memcached_client, parallel_config): client, config = memcached_client @memcached( @@ -68,11 +67,18 @@ def foo(indices: "NDArray[int]", *args, **kwargs) -> float: n = 1234 n_runs = 10 hits_before = client.stats()[b"get_hits"] - map_reduce_job = MapReduceJob(np.arange(n), foo, np.sum, n_jobs=4, n_runs=n_runs) - result = map_reduce_job() + + map_reduce_job = MapReduceJob( + np.arange(n), foo, np.sum, n_jobs=4, config=parallel_config + ) + results = [] + + for _ in range(n_runs): + result = map_reduce_job() + results.append(result) hits_after = client.stats()[b"get_hits"] - assert result[0] == n * (n - 1) / 2 # Sanity check + assert results[0] == n * (n - 1) / 2 # Sanity check # FIXME! This is non-deterministic: if packets are delayed for longer than # the timeout configured then we won't have num_runs hits. So we add this # good old hard-coded magic number here. 
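The pattern exercised by these tests, as a minimal standalone sketch (the `memcached` decoration is elided here and the figures are illustrative only): with `n_runs` gone from `MapReduceJob`, repeated runs are written as plain loops, and each call returns the reduced value directly instead of a list.

    import numpy as np
    from pydvl.utils.config import ParallelConfig
    from pydvl.utils.parallel import MapReduceJob

    # One MapReduceJob, called once per run.
    config = ParallelConfig(backend="sequential")  # the fixture also covers "ray"
    job = MapReduceJob(np.arange(1234), np.sum, np.sum, n_jobs=4, config=config)
    results = [job() for _ in range(10)]
    assert results[0] == 1234 * 1233 / 2  # sum of 0..1233
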
@@ -153,7 +159,7 @@ def foo_no_cache(indices: "NDArray[int]") -> float: @pytest.mark.parametrize("n_jobs", [1, 2]) @pytest.mark.parametrize("n_runs", [100]) def test_memcached_parallel_repeated_training( - memcached_client, n, atol, n_jobs, n_runs, seed=42 + memcached_client, n, atol, n_jobs, n_runs, parallel_config, seed=42 ): _, config = memcached_client np.random.seed(seed) @@ -166,20 +172,23 @@ def test_memcached_parallel_repeated_training( # Note that we typically do NOT want to ignore run_id ignore_args=["job_id", "run_id"], ) - def map_func(indices: "NDArray[int]") -> float: + def map_func(indices: "NDArray[np.int_]") -> float: # from pydvl.utils.logging import logger # logger.info(f"run_id: {run_id}, running...") return np.sum(indices).item() + np.random.normal(scale=5) - def reduce_func(chunks: "NDArray[float]") -> float: + def reduce_func(chunks: "NDArray[np.float_]") -> float: return np.sum(chunks).item() map_reduce_job = MapReduceJob( - np.arange(n), map_func, reduce_func, n_jobs=n_jobs, n_runs=n_runs + np.arange(n), map_func, reduce_func, n_jobs=n_jobs, config=parallel_config ) - result = map_reduce_job() + results = [] + for _ in range(n_runs): + result = map_reduce_job() + results.append(result) exact_value = np.sum(np.arange(n)).item() - assert np.isclose(result[-1], result[-2], atol=atol) - assert np.isclose(result[-1], exact_value, atol=atol) + assert np.isclose(results[-1], results[-2], atol=atol) + assert np.isclose(results[-1], exact_value, atol=atol) diff --git a/tests/utils/test_parallel.py b/tests/utils/test_parallel.py index a0c364cb2..b979fb681 100644 --- a/tests/utils/test_parallel.py +++ b/tests/utils/test_parallel.py @@ -1,6 +1,5 @@ import operator from functools import partial, reduce -from itertools import zip_longest import numpy as np import pytest @@ -9,13 +8,8 @@ from pydvl.utils.parallel import MapReduceJob -@pytest.fixture(scope="session", params=["sequential", "ray"]) -def parallel_config(request): - return ParallelConfig(backend=request.param) - - @pytest.fixture() -def map_reduce_job_and_parameters(parallel_config, n_jobs, n_runs, request): +def map_reduce_job_and_parameters(parallel_config, n_jobs, request): try: kind, map_func, reduce_func = request.param assert kind == "custom" @@ -28,7 +22,6 @@ def map_reduce_job_and_parameters(parallel_config, n_jobs, n_runs, request): reduce_func=np.sum, config=parallel_config, n_jobs=n_jobs, - n_runs=n_runs, ) elif kind == "list": map_reduce_job = partial( @@ -37,7 +30,6 @@ def map_reduce_job_and_parameters(parallel_config, n_jobs, n_runs, request): reduce_func=lambda r: reduce(operator.add, r, []), config=parallel_config, n_jobs=n_jobs, - n_runs=n_runs, ) elif kind == "range": map_reduce_job = partial( @@ -46,7 +38,6 @@ def map_reduce_job_and_parameters(parallel_config, n_jobs, n_runs, request): reduce_func=lambda r: reduce(operator.add, list(r), []), config=parallel_config, n_jobs=n_jobs, - n_runs=n_runs, ) elif kind == "custom": map_reduce_job = partial( @@ -55,7 +46,6 @@ def map_reduce_job_and_parameters(parallel_config, n_jobs, n_runs, request): reduce_func=reduce_func, config=parallel_config, n_jobs=n_jobs, - n_runs=n_runs, ) else: map_reduce_job = partial( @@ -64,33 +54,29 @@ def map_reduce_job_and_parameters(parallel_config, n_jobs, n_runs, request): reduce_func=lambda r: r, config=parallel_config, n_jobs=n_jobs, - n_runs=n_runs, ) - return map_reduce_job, n_jobs, n_runs + return map_reduce_job, n_jobs @pytest.mark.parametrize( "map_reduce_job_and_parameters, indices, expected", [ - ("list", [], [[]]), 
- ("list", [1, 2], [[1, 2]]), - ("list", [1, 2, 3, 4], [[1, 2, 3, 4]]), - ("range", range(10), [list(range(10))]), - ("numpy", list(range(10)), [45]), + ("list", [], []), + ("list", [1, 2], [1, 2]), + ("list", [1, 2, 3, 4], [1, 2, 3, 4]), + ("range", range(10), list(range(10))), + ("numpy", np.arange(10), 45), ], indirect=["map_reduce_job_and_parameters"], ) @pytest.mark.parametrize("n_jobs", [1, 2, 4]) -@pytest.mark.parametrize("n_runs", [1, 2, 4]) def test_map_reduce_job(map_reduce_job_and_parameters, indices, expected): - map_reduce_job, n_jobs, n_runs = map_reduce_job_and_parameters + map_reduce_job, n_jobs = map_reduce_job_and_parameters result = map_reduce_job(indices)() - assert len(result) == n_runs - for exp, ret in zip_longest(expected * n_runs, result, fillvalue=None): - if not isinstance(ret, np.ndarray): - assert ret == exp - else: - assert (ret == exp).all() + if not isinstance(result, np.ndarray): + assert result == expected + else: + assert (result == expected).all() @pytest.mark.parametrize( @@ -101,15 +87,21 @@ def test_map_reduce_job(map_reduce_job_and_parameters, indices, expected): ([1, 2, 3, 4], 2, [[1, 2], [3, 4]]), ([1, 2, 3, 4], 3, [[1, 2], [3], [4]]), ([1, 2, 3, 4], 5, [[1], [2], [3], [4]]), - (range(10), 4, [range(0, 3), range(3, 6), range(6, 8), range(8, 10)]), (list(range(5)), 42, [[i] for i in range(5)]), + (np.arange(5), 42, [[i] for i in range(5)]), + (range(10), 4, [range(0, 3), range(3, 6), range(6, 8), range(8, 10)]), + (np.arange(10), 4, np.array_split(np.arange(10), 4)), ], ) def test_chunkification(data, n_chunks, expected_chunks): map_reduce_job = MapReduceJob([], map_func=lambda x: x) chunks = list(map_reduce_job._chunkify(data, n_chunks)) chunks = map_reduce_job.parallel_backend.get(chunks) - assert chunks == expected_chunks + for x, y in zip(chunks, expected_chunks): + if not isinstance(x, np.ndarray): + assert x == y + else: + assert (x == y).all() @pytest.mark.parametrize( @@ -150,9 +142,9 @@ def map_func(x): # TODO: figure out test cases for this test @pytest.mark.skip @pytest.mark.parametrize( - "map_reduce_job_and_parameters, indices, n_jobs, n_runs, expected", + "map_reduce_job_and_parameters, indices, n_jobs, expected", [ - ("other", [], 1, 1, [[]]), + ("other", [], 1, [[]]), ], indirect=["map_reduce_job_and_parameters"], ) From 03c690f4ad0dcf48e076373af82f1784934d30d0 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Fri, 30 Dec 2022 18:26:42 +0100 Subject: [PATCH 39/42] Set default value of max_parallel_tasks to None, add tests for _get_value and using partial instead of map_kwargs and reduce_kwargs --- src/pydvl/utils/parallel/map_reduce.py | 46 ++++++++++++++++---------- tests/utils/test_parallel.py | 44 ++++++++++++++++-------- 2 files changed, 60 insertions(+), 30 deletions(-) diff --git a/src/pydvl/utils/parallel/map_reduce.py b/src/pydvl/utils/parallel/map_reduce.py index 3a36ca6c5..edc58ad1d 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -1,5 +1,6 @@ +import inspect from collections.abc import Iterable -from functools import singledispatch +from functools import singledispatch, update_wrapper from itertools import accumulate, repeat from typing import ( Any, @@ -13,6 +14,7 @@ Union, ) +import numpy as np import ray from numpy.typing import NDArray from ray import ObjectRef @@ -41,11 +43,15 @@ def wrapper(*args, **kwargs): kwargs[k] = _get_value(v, timeout=timeout) return func(*args, **kwargs) - # Doing it manually here because using wraps or update_wrapper - # from functools 
doesn't work with ray for some unknown reason
-    wrapper.__name__ = func.__name__
-    wrapper.__qualname__ = func.__qualname__
-    wrapper.__doc__ = func.__doc__
+    try:
+        inspect.signature(func)
+        wrapper = update_wrapper(wrapper, func)
+    except ValueError:
+        # Doing it manually here because using update_wrapper from functools
+        # on numpy functions doesn't work with ray for some unknown reason.
+        wrapper.__name__ = func.__name__
+        wrapper.__qualname__ = func.__qualname__
+        wrapper.__doc__ = func.__doc__
     return wrapper
 
@@ -59,6 +65,11 @@ def _(v: ObjectRef, *, timeout: Optional[float] = None) -> Any:
     return ray.get(v, timeout=timeout)
 
 
+@_get_value.register
+def _(v: np.ndarray, *, timeout: Optional[float] = None) -> NDArray:
+    return v
+
+
 @_get_value.register
 def _(v: Iterable, *, timeout: Optional[float] = None) -> List[Any]:
     return [_get_value(x, timeout=timeout) for x in v]
@@ -72,14 +83,14 @@ class MapReduceJob(Generic[T, R]):
         that are split for `map_func` and the type of its output.
 
     :param inputs: The input that will be split and passed to `map_func`.
-        if it's not a sequence object. It will be repeat `n_jobs` number of times.
+        If it is not a sequence, it will be repeated ``n_jobs`` times.
     :param map_func: Function that will be applied to the input chunks in each job.
     :param reduce_func: Function that will be applied to the results of
-        `map_func` to reduce them.
-    :param map_kwargs: Keyword arguments that will be passed to `map_func` in
-        each job. Alternatively, one can use `itertools.partial`.
-    :param reduce_kwargs: Keyword arguments that will be passed to `reduce_func`
-        in each job. Alternatively, one can use `itertools.partial`.
+        ``map_func`` to reduce them.
+    :param map_kwargs: Keyword arguments that will be passed to ``map_func`` in
+        each job. Alternatively, one can use :func:`functools.partial`.
+    :param reduce_kwargs: Keyword arguments that will be passed to ``reduce_func``.
+        Alternatively, one can use :func:`functools.partial`.
     :param config: Instance of :class:`~pydvl.utils.config.ParallelConfig`
         with cluster address, number of cpus, etc.
     :param n_jobs: Number of parallel jobs to run. Does not accept 0
@@ -105,7 +116,7 @@ class MapReduceJob(Generic[T, R]):
     ...     n_jobs=2,
     ... )
     >>> map_reduce_job()
-    [10]
+    10
 
     When passed a single object as input, it will be repeated for each job:
 
@@ -118,7 +129,7 @@ class MapReduceJob(Generic[T, R]):
     ...     n_jobs=4,
     ... )
     >>> map_reduce_job()
-    [20]
+    20
     """
 
     def __init__(
@@ -144,9 +155,7 @@ def __init__(
         # This uses the setter defined below
         self.n_jobs = n_jobs
 
-        # TODO: Find a better default value?
- default_max_parallel_tasks = 2 * self.n_jobs - self.max_parallel_tasks = max_parallel_tasks or default_max_parallel_tasks + self.max_parallel_tasks = max_parallel_tasks self.inputs_ = inputs @@ -178,6 +187,7 @@ def __call__( return reduce_results def map(self, inputs: Union[Sequence[T], T]) -> List["ObjectRef[R]"]: + """Splits the input data into chunks and calls a wrapped :func:`map_func` on them.""" map_results: List["ObjectRef[R]"] = [] map_func = self._wrap_function(self._map_func) @@ -197,10 +207,11 @@ def map(self, inputs: Union[Sequence[T], T]) -> List["ObjectRef[R]"]: n_dispatched=total_n_jobs, n_finished=total_n_finished, ) - return map_results def reduce(self, chunks: List["ObjectRef[R]"]) -> R: + """Reduces the resulting chunks from a call to :meth:`~pydvl.utils.parallel.map_reduce.MapReduceJob.map` + by passing them to a wrapped :func:`reduce_func`.""" reduce_func = self._wrap_function(self._reduce_func) reduce_result = reduce_func(chunks, **self.reduce_kwargs) @@ -284,6 +295,7 @@ def _chunkify(self, data: ChunkifyInputType, n_chunks: int) -> List["ObjectRef[T @property def n_jobs(self) -> int: + """Effective number of jobs according to the used ParallelBackend instance.""" return self._n_jobs @n_jobs.setter diff --git a/tests/utils/test_parallel.py b/tests/utils/test_parallel.py index b979fb681..fdac67753 100644 --- a/tests/utils/test_parallel.py +++ b/tests/utils/test_parallel.py @@ -4,8 +4,8 @@ import numpy as np import pytest -from pydvl.utils.config import ParallelConfig -from pydvl.utils.parallel import MapReduceJob +from pydvl.utils.parallel import MapReduceJob, init_parallel_backend +from pydvl.utils.parallel.map_reduce import _get_value @pytest.fixture() @@ -139,18 +139,36 @@ def map_func(x): assert n_finished == expected_n_finished -# TODO: figure out test cases for this test -@pytest.mark.skip +def test_map_reduce_job_partial_map_and_reduce_func(parallel_config): + def map_func(x, y): + return x + y + + def reduce_func(x, y): + return np.sum(np.concatenate(x)) + y + + map_func = partial(map_func, y=10) + reduce_func = partial(reduce_func, y=5) + + map_reduce_job = MapReduceJob( + np.arange(10), + map_func=map_func, + reduce_func=reduce_func, + config=parallel_config, + ) + result = map_reduce_job() + assert result == 150 + + @pytest.mark.parametrize( - "map_reduce_job_and_parameters, indices, n_jobs, expected", + "x, expected_x", [ - ("other", [], 1, [[]]), + (None, None), + ([0, 1], [0, 1]), + (np.arange(3), np.arange(3)), ], - indirect=["map_reduce_job_and_parameters"], ) -def test_map_reduce_job_expected_failures( - map_reduce_job_and_parameters, indices, expected -): - map_reduce_job, *_ = map_reduce_job_and_parameters - with pytest.raises(expected): - map_reduce_job(indices) +def test_map_reduce_get_value(x, expected_x, parallel_config): + assert np.all(_get_value(x) == expected_x) + parallel_backend = init_parallel_backend(parallel_config) + x_id = parallel_backend.put(x) + assert np.all(_get_value(x_id) == expected_x) From faa582ecfa38c98952611e2f5486a8ef6594fd87 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Fri, 30 Dec 2022 18:42:44 +0100 Subject: [PATCH 40/42] Update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5e5189983..196e49c3e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ [PR #195](https://github.com/appliedAI-Initiative/pyDVL/pull/195) - **Breaking change**: Passes the input to `MapReduceJob` at initialization, removes `chunkify_inputs` argument from 
`MapReduceJob`, + removes `n_runs` argument from `MapReduceJob`, + calls the parallel backend's `put()` method for each generated chunk in `_chunkify()`, renames ParallelConfig's `num_workers` attribute to `n_local_workers`, fixes a bug in `MapReduceJob`'s chunkification when `n_runs` >= `n_jobs`, and defines a sequential parallel backend to run all jobs in the current thread From cdabd2f3c7e393c9388e6e989c7f3bff7528116b Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Fri, 30 Dec 2022 19:39:14 +0100 Subject: [PATCH 41/42] Update import path for available_cpus helper function --- notebooks/shapley_basic_spotify.ipynb | 2 +- notebooks/shapley_utility_learning.ipynb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/notebooks/shapley_basic_spotify.ipynb b/notebooks/shapley_basic_spotify.ipynb index 25b6a6563..7744f886f 100644 --- a/notebooks/shapley_basic_spotify.ipynb +++ b/notebooks/shapley_basic_spotify.ipynb @@ -95,12 +95,12 @@ "source": [ "from pydvl.reporting.plots import plot_shapley\n", "from pydvl.utils import (\n", - " available_cpus,\n", " load_spotify_dataset,\n", " Dataset,\n", " GroupedDataset,\n", " Utility,\n", ")\n", + "from pydvl.utils.parallel.backend import available_cpus\n", "from pydvl.value.shapley import compute_shapley_values" ] }, diff --git a/notebooks/shapley_utility_learning.ipynb b/notebooks/shapley_utility_learning.ipynb index 1b522a929..65824e1ed 100644 --- a/notebooks/shapley_utility_learning.ipynb +++ b/notebooks/shapley_utility_learning.ipynb @@ -149,8 +149,8 @@ " Utility,\n", " DataUtilityLearning,\n", " top_k_value_accuracy,\n", - " available_cpus,\n", ")\n", + "from pydvl.utils.parallel.backend import available_cpus\n", "from pydvl.reporting.plots import shaded_mean_std\n", "from pydvl.value.shapley import compute_shapley_values" ] From cdfe9ebb0ae3afe40918ff1fd5108955558ca252 Mon Sep 17 00:00:00 2001 From: Anes Benmerzoug Date: Sun, 1 Jan 2023 16:18:50 +0100 Subject: [PATCH 42/42] Apply suggestions from code review Co-authored-by: Miguel de Benito Delgado --- src/pydvl/utils/parallel/map_reduce.py | 71 ++++++++++++-------------- 1 file changed, 34 insertions(+), 37 deletions(-) diff --git a/src/pydvl/utils/parallel/map_reduce.py b/src/pydvl/utils/parallel/map_reduce.py index edc58ad1d..d60fa7d95 100644 --- a/src/pydvl/utils/parallel/map_reduce.py +++ b/src/pydvl/utils/parallel/map_reduce.py @@ -239,15 +239,14 @@ def _backpressure( """ if self.max_parallel_tasks is None: return 0 - else: - while (n_in_flight := n_dispatched - n_finished) > self.max_parallel_tasks: - wait_for_num_jobs = n_in_flight - self.max_parallel_tasks - finished_jobs, _ = self.parallel_backend.wait( - jobs, - num_returns=wait_for_num_jobs, - timeout=10, # FIXME make parameter? - ) - n_finished += len(finished_jobs) + while (n_in_flight := n_dispatched - n_finished) > self.max_parallel_tasks: + wait_for_num_jobs = n_in_flight - self.max_parallel_tasks + finished_jobs, _ = self.parallel_backend.wait( + jobs, + num_returns=wait_for_num_jobs, + timeout=10, # FIXME make parameter? 
+            )
+            n_finished += len(finished_jobs)
         return n_finished
 
     def _chunkify(self, data: ChunkifyInputType, n_chunks: int) -> List["ObjectRef[T
@@ -257,41 +256,39 @@ def _chunkify(self, data: ChunkifyInputType, n_chunks: int) -> List["ObjectRef[T
         if n_chunks <= 0:
             raise ValueError("Number of chunks should be greater than 0")
 
-        elif n_chunks == 1:
+        if n_chunks == 1:
             data_id = self.parallel_backend.put(data)
             return [data_id]
+
+        try:
+            # Check whether data defines a length, i.e. whether it is sequence-like:
+            # if it does, its length is used to compute the chunk indices below.
+            # Plain sequences and numpy arrays define __len__, while a single
+            # scalar or an ObjectRef does not, hence the TypeError fallback.
+            n = len(data)
+        except TypeError:
+            data_id = self.parallel_backend.put(data)
+            return list(repeat(data_id, times=n_chunks))
         else:
-            try:
-                # Check whether data defines a length, i.e. whether it is sequence-like:
-                # if it does, its length is used to compute the chunk indices below.
-                # Plain sequences and numpy arrays define __len__, while a single
-                # scalar or an ObjectRef does not, hence the TypeError fallback.
-                n = len(data)
-            except TypeError:
-                data_id = self.parallel_backend.put(data)
-                return list(repeat(data_id, times=n_chunks))
-            else:
-                # This is very much inspired by numpy's array_split function
-                # The difference is that it only uses built-in functions
-                # and does not convert the input data to an array
-                chunk_size, remainder = divmod(n, n_chunks)
-                chunk_indices = tuple(
-                    accumulate(
-                        [0]
-                        + remainder * [chunk_size + 1]
-                        + (n_chunks - remainder) * [chunk_size]
+            # This is very much inspired by numpy's array_split function
+            # The difference is that it only uses built-in functions
+            # and does not convert the input data to an array
+            chunk_size, remainder = divmod(n, n_chunks)
+            chunk_indices = tuple(
+                accumulate(
+                    [0]
+                    + remainder * [chunk_size + 1]
+                    + (n_chunks - remainder) * [chunk_size]
                 )
             )
 
-                chunks = []
+            chunks = []
 
-                for start_index, end_index in zip(
-                    chunk_indices[:-1], chunk_indices[1:]
-                ):
+            for start_index, end_index in zip(chunk_indices[:-1], chunk_indices[1:]):
                 if start_index >= end_index:
                     break
                 chunk_id = self.parallel_backend.put(data[start_index:end_index])
                 chunks.append(chunk_id)
 
-                return chunks
+            return chunks
 
     @property
     def n_jobs(self) -> int:
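
For reference, the chunking arithmetic above can be checked in isolation. This is a standalone sketch mirroring the index computation in `_chunkify`, not part of the patch itself:

    from itertools import accumulate

    def chunk_indices(n: int, n_chunks: int) -> tuple:
        # The first `remainder` chunks receive one extra element each,
        # matching the behavior of numpy's array_split.
        chunk_size, remainder = divmod(n, n_chunks)
        return tuple(
            accumulate(
                [0]
                + remainder * [chunk_size + 1]
                + (n_chunks - remainder) * [chunk_size]
            )
        )

    assert chunk_indices(10, 4) == (0, 3, 6, 8, 10)  # chunk sizes 3, 3, 2, 2
    assert chunk_indices(5, 42)[:6] == (0, 1, 2, 3, 4, 5)  # empty tail chunks

The caller drops the empty trailing chunks via the `start_index >= end_index` check, which is why requesting 42 chunks of a 5-element input yields only 5 single-element chunks, as the tests above expect.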