Fix and improve map reduce #232

Merged: 43 commits into develop from fix-and-improve-map-reduce on Jan 1, 2023

Commits (43)
37f84d2  Add SequentialParallelBackend to run tasks without parallelism (AnesBenmerzoug, Dec 9, 2022)
949a3bf  Fix map_reduce test for non chunkified inputs (AnesBenmerzoug, Dec 11, 2022)
74c4cdd  Use expit from scipy to avoid overflow warnings (AnesBenmerzoug, Dec 11, 2022)
58d2149  Remove the chunkify_inputs argument of MapReduceJob (AnesBenmerzoug, Dec 11, 2022)
9a1ad1a  Rename num_chunks to n_chunks for consistency (AnesBenmerzoug, Dec 11, 2022)
80dec53  Remove all optional arguments from MapReduceJob's __call__ method (AnesBenmerzoug, Dec 11, 2022)
31be0b6  Call parallel backend inside MapReduceJob when a Utility or sequence … (AnesBenmerzoug, Dec 11, 2022)
7c72d2a  Rename num_workers in ParallelConfig to n_local_workers (AnesBenmerzoug, Dec 11, 2022)
219403f  Add a __repr__ to ValuationResults (AnesBenmerzoug, Dec 11, 2022)
9bc96d1  If n_runs >= n_jobs, do not chunkify inputs (AnesBenmerzoug, Dec 11, 2022)
6fcdc5f  Use older version of tox (AnesBenmerzoug, Dec 19, 2022)
0c2454d  Update changelog (AnesBenmerzoug, Dec 19, 2022)
c77a474  Apply suggestions from code review (AnesBenmerzoug, Dec 19, 2022)
4aa6450  Set explicit type for result of expit (AnesBenmerzoug, Dec 19, 2022)
8a15206  Explicitly use RayParallelBackend in RayActorWrapper's docstring example (AnesBenmerzoug, Dec 19, 2022)
2f4be67  Add support for sequential parallel backend in get_shapley_coordinato… (AnesBenmerzoug, Dec 19, 2022)
a75a4c9  Use singledispatchmethod and singledispatch decorators in map_reduce (AnesBenmerzoug, Dec 19, 2022)
9e68790  Use Any as type of default _get_value function (AnesBenmerzoug, Dec 20, 2022)
89b67b3  Use Iterable from typing for ReduceFunction type (AnesBenmerzoug, Dec 20, 2022)
cd1ced5  Fixes (AnesBenmerzoug, Dec 20, 2022)
dc491ec  Another fix (AnesBenmerzoug, Dec 20, 2022)
d06389f  Set ignore_reinit_error to True when using local ray cluster (AnesBenmerzoug, Dec 21, 2022)
f35de7a  Fix check in RayParallelBackend's init (AnesBenmerzoug, Dec 21, 2022)
7d25f14  Do not expose available_cpus at the package level (AnesBenmerzoug, Dec 28, 2022)
8ed2806  Disallow instantiating parallel backend classes directly (AnesBenmerzoug, Dec 28, 2022)
cbe1ca9  Remove unused import (AnesBenmerzoug, Dec 28, 2022)
9cdcde8  Fix RayActorWrapper's docstring (AnesBenmerzoug, Dec 28, 2022)
fd8f6bd  Pass inputs to MapReduceJob at initialization (AnesBenmerzoug, Dec 28, 2022)
0ce2d59  Apply suggestions from code review (AnesBenmerzoug, Dec 29, 2022)
59534d6  Fix indentation error (AnesBenmerzoug, Dec 29, 2022)
ff247df  Improve effective_n_jobs interface (AnesBenmerzoug, Dec 29, 2022)
bbaa07b  Set default n_jobs in shapley methods to 1 (AnesBenmerzoug, Dec 29, 2022)
4dffe03  Call put on map_kwargs' and reduce_kwargs' items inside MapReduceJob'… (AnesBenmerzoug, Dec 29, 2022)
e9ccbc2  Update src/pydvl/utils/parallel/backend.py (AnesBenmerzoug, Dec 29, 2022)
fe2bab6  Remove usage of singledispatchmethod in MapReduceJob (AnesBenmerzoug, Dec 29, 2022)
f8e3aba  Fix type hint (AnesBenmerzoug, Dec 29, 2022)
0cd9945  Call put() on chunks inside _chunkify (AnesBenmerzoug, Dec 30, 2022)
12cfb0c  Remove n_runs from MapReduceJobs, fix bug in _chunkify when passing n… (AnesBenmerzoug, Dec 30, 2022)
03c690f  Set default value of max_parallel_tasks to None, add tests for _get_v… (AnesBenmerzoug, Dec 30, 2022)
f698124  Merge branch 'develop' into fix-and-improve-map-reduce (AnesBenmerzoug, Dec 30, 2022)
faa582e  Update changelog (AnesBenmerzoug, Dec 30, 2022)
cdabd2f  Update import path for available_cpus helper function (AnesBenmerzoug, Dec 30, 2022)
cdfe9eb  Apply suggestions from code review (AnesBenmerzoug, Jan 1, 2023)
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -14,6 +14,14 @@
- Fixes bug in Influence calculation with multi-dimensional input and adds
new example notebook
[PR #195](https://github.com/appliedAI-Initiative/pyDVL/pull/195)
- **Breaking change**: Passes the input to `MapReduceJob` at initialization,
removes `chunkify_inputs` argument from `MapReduceJob`,
removes `n_runs` argument from `MapReduceJob`,
calls the parallel backend's `put()` method for each generated chunk in `_chunkify()`,
renames ParallelConfig's `num_workers` attribute to `n_local_workers`,
fixes a bug in `MapReduceJob`'s chunkification when `n_runs` >= `n_jobs`,
and defines a sequential parallel backend to run all jobs in the current thread
[PR #232](https://github.com/appliedAI-Initiative/pyDVL/pull/232)

## 0.3.0 - 💥 Breaking changes

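To make this breaking change concrete, here is a minimal sketch of the new calling convention. It is not taken from the PR itself: the module path and keyword names (`map_func`, `reduce_func`, `n_jobs`) are assumptions, and the map/reduce functions are purely illustrative.

```python
from pydvl.utils.config import ParallelConfig
from pydvl.utils.parallel.map_reduce import MapReduceJob

# Hypothetical map/reduce functions for illustration only.
def double_chunk(chunk):
    return [2 * x for x in chunk]

def sum_chunks(results):
    return sum(sum(r) for r in results)

config = ParallelConfig(backend="sequential")

# Inputs now go to the constructor; before this PR they were passed to
# __call__, and MapReduceJob also accepted chunkify_inputs and n_runs.
job = MapReduceJob(
    list(range(10)),
    map_func=double_chunk,
    reduce_func=sum_chunks,
    config=config,
    n_jobs=2,
)
result = job()  # __call__ takes no optional arguments anymore
```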
2 changes: 1 addition & 1 deletion notebooks/shapley_basic_spotify.ipynb
@@ -95,12 +95,12 @@
"source": [
"from pydvl.reporting.plots import plot_shapley\n",
"from pydvl.utils import (\n",
" available_cpus,\n",
" load_spotify_dataset,\n",
" Dataset,\n",
" GroupedDataset,\n",
" Utility,\n",
")\n",
"from pydvl.utils.parallel.backend import available_cpus\n",
"from pydvl.value.shapley import compute_shapley_values"
]
},
2 changes: 1 addition & 1 deletion notebooks/shapley_utility_learning.ipynb
@@ -149,8 +149,8 @@
" Utility,\n",
" DataUtilityLearning,\n",
" top_k_value_accuracy,\n",
" available_cpus,\n",
")\n",
"from pydvl.utils.parallel.backend import available_cpus\n",
"from pydvl.reporting.plots import shaded_mean_std\n",
"from pydvl.value.shapley import compute_shapley_values"
]
8 changes: 4 additions & 4 deletions src/pydvl/utils/config.py
@@ -1,5 +1,5 @@
from dataclasses import dataclass, field
from typing import Iterable, Optional, Tuple, Union
from typing import Iterable, Literal, Optional, Tuple, Union

from pymemcache.serde import PickleSerde

@@ -16,12 +16,12 @@ class ParallelConfig:

:param backend: Type of backend to use. For now only 'ray' is supported.
:param address: Address of existing remote or local cluster to use.
:param num_workers: Number of workers (CPUs) to use.
:param n_local_workers: Number of workers (CPUs) to use when using a local ray cluster
"""

backend: str = "ray"
backend: Literal["sequential", "ray"] = "ray"
address: Optional[Union[str, Tuple[str, int]]] = None
num_workers: Optional[int] = None
n_local_workers: Optional[int] = None


@unpackable
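A short sketch of the renamed configuration in use; per the updated docstring, `n_local_workers` only applies when a local ray cluster is started.

```python
from pydvl.utils.config import ParallelConfig

# Old: ParallelConfig(num_workers=4). The attribute is now n_local_workers.
local_ray = ParallelConfig(backend="ray", n_local_workers=4)

# The new sequential backend runs all jobs in the current thread.
sequential = ParallelConfig(backend="sequential")
```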
4 changes: 3 additions & 1 deletion src/pydvl/utils/numeric.py
@@ -17,6 +17,7 @@
)

import numpy as np
from scipy.special import expit

from pydvl.utils.types import compose_score

@@ -252,7 +253,8 @@ def top_k_value_accuracy(y_true: "NDArray", y_pred: "NDArray", k: int = 3) -> fl


def sigmoid(x: float) -> float:
return float(1 / (1 + np.exp(-x)))
result: float = expit(x).item()
return result


squashed_r2 = compose_score("r2", sigmoid, "squashed r2")
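Why `expit`: the naive formula evaluates `np.exp(-x)`, which overflows for large negative `x` and emits a RuntimeWarning even though the final value is correct. A quick check, illustrative rather than taken from the PR:

```python
import numpy as np
from scipy.special import expit

x = -1000.0

# Old implementation: np.exp(1000.0) overflows to inf and numpy emits
# "RuntimeWarning: overflow encountered in exp"; the result is still 0.0.
naive = float(1 / (1 + np.exp(-x)))

# New implementation: no warning, and .item() yields a plain Python float.
stable: float = expit(x).item()

assert naive == stable == 0.0
```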
11 changes: 4 additions & 7 deletions src/pydvl/utils/parallel/actor.py
@@ -1,16 +1,12 @@
import abc
import inspect
import logging
from typing import TYPE_CHECKING, Any, Dict, Optional, Union
from typing import Any, Dict, Optional, Union

from ray import ObjectRef

from .backend import RayParallelBackend

if TYPE_CHECKING:
from numpy.typing import NDArray


__all__ = ["RayActorWrapper", "Coordinator", "Worker"]


@@ -25,7 +21,7 @@ class RayActorWrapper:

:Example:

>>> from pydvl.utils.parallel import init_parallel_backend
>>> from pydvl.utils.parallel.backend import RayParallelBackend, init_parallel_backend
>>> from pydvl.utils.config import ParallelConfig
>>> from pydvl.utils.parallel.actor import RayActorWrapper
>>> class Actor:
Expand All @@ -35,8 +31,9 @@ class RayActorWrapper:
... def get(self):
... return self.x
...
>>> config = ParallelConfig()
>>> config = ParallelConfig(backend="ray")
>>> parallel_backend = init_parallel_backend(config)
>>> assert isinstance(parallel_backend, RayParallelBackend)
>>> actor_handle = parallel_backend.wrap(Actor).remote(5)
>>> parallel_backend.get(actor_handle.get.remote())
5
177 changes: 137 additions & 40 deletions src/pydvl/utils/parallel/backend.py
@@ -1,6 +1,8 @@
import functools
import os
from abc import ABCMeta, abstractmethod
from dataclasses import asdict
from typing import Any, Iterable, List, Optional, Tuple, TypeVar, Union
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, TypeVar, Union

import ray
from ray import ObjectRef
@@ -10,86 +12,182 @@

__all__ = [
"init_parallel_backend",
"available_cpus",
]

T = TypeVar("T")

_PARALLEL_BACKEND: Optional["RayParallelBackend"] = None
_PARALLEL_BACKENDS: Dict[str, "Type[BaseParallelBackend]"] = {}


class RayParallelBackend:
"""Class used to wrap ray to make it transparent to algorithms. It shouldn't
class NoPublicConstructor(ABCMeta):
"""Metaclass that ensures a private constructor

If a class uses this metaclass like this:

class SomeClass(metaclass=NoPublicConstructor):
pass

If you try to instantiate your class (`SomeClass()`),
a `TypeError` will be thrown.

Taken almost verbatim from:
https://stackoverflow.com/a/64682734
"""

def __call__(cls, *args, **kwargs):
raise TypeError(
f"{cls.__module__}.{cls.__qualname__} cannot be initialized directly. "
"Use init_parallel_backend() instead."
)

def _create(cls, *args: Any, **kwargs: Any):
return super().__call__(*args, **kwargs)
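A minimal sketch of what the metaclass enforces, assuming `NoPublicConstructor` is in scope; the class name is hypothetical:

```python
class Example(metaclass=NoPublicConstructor):
    def __init__(self, x: int):
        self.x = x

try:
    Example(42)  # raises TypeError: ... cannot be initialized directly
except TypeError:
    pass

instance = Example._create(42)  # the private path used by the factory
assert instance.x == 42
```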


class BaseParallelBackend(metaclass=NoPublicConstructor):
"""Abstract base class for all parallel backends"""

config: Dict[str, Any] = {}

def __init_subclass__(cls, *, backend_name: str, **kwargs):
global _PARALLEL_BACKENDS
_PARALLEL_BACKENDS[backend_name] = cls
super().__init_subclass__(**kwargs)

@abstractmethod
def get(self, v: Any, *args, **kwargs):
...

@abstractmethod
def put(self, v: Any, *args, **kwargs) -> Any:
...

@abstractmethod
def wrap(self, *args, **kwargs) -> Any:
...

@abstractmethod
def wait(self, v: Any, *args, **kwargs) -> Any:
...

@abstractmethod
def _effective_n_jobs(self, n_jobs: int) -> int:
...

def effective_n_jobs(self, n_jobs: int = -1) -> int:
if n_jobs == 0:
raise ValueError("n_jobs == 0 in Parallel has no meaning")
n_jobs = self._effective_n_jobs(n_jobs)
return n_jobs

def __repr__(self) -> str:
return f"<{self.__class__.__name__}: {self.config}>"


class SequentialParallelBackend(BaseParallelBackend, backend_name="sequential"):
"""Class used to run jobs sequentially and locally. It shouldn't
be initialized directly. You should instead call `init_parallel_backend`.

:param config: instance of :class:`~pydvl.utils.config.ParallelConfig` with
cluster address, number of cpus, etc.
:param config: instance of :class:`~pydvl.utils.config.ParallelConfig` with number of cpus
"""

:Example:
def __init__(self, config: ParallelConfig):
config_dict = asdict(config)
config_dict.pop("backend")
config_dict.pop("address")
config_dict["num_cpus"] = config_dict.pop("n_local_workers")
self.config = config_dict

>>> from pydvl.utils.parallel.backend import RayParallelBackend
>>> from pydvl.utils.config import ParallelConfig
>>> config = ParallelConfig(backend="ray")
>>> parallel_backend = RayParallelBackend(config)
>>> parallel_backend
<RayParallelBackend: {'address': None, 'num_cpus': None}>
def get(self, v: Any, *args, **kwargs):
return v

def put(self, v: Any, *args, **kwargs) -> Any:
return v

def wrap(self, *args, **kwargs) -> Any:
assert len(args) == 1
return functools.partial(args[0], **kwargs)

def wait(self, v: Any, *args, **kwargs) -> Tuple[list, list]:
return v, []

def _effective_n_jobs(self, n_jobs: int) -> int:
if n_jobs < 0:
if self.config["num_cpus"]:
eff_n_jobs: int = self.config["num_cpus"]
else:
eff_n_jobs = available_cpus()
else:
eff_n_jobs = n_jobs
return eff_n_jobs
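Taken together, the sequential backend reduces to identity functions plus `functools.partial`. A usage sketch under the new public API:

```python
from pydvl.utils.config import ParallelConfig
from pydvl.utils.parallel.backend import init_parallel_backend

backend = init_parallel_backend(ParallelConfig(backend="sequential"))

# put() and get() are identity functions.
ref = backend.put([1, 2, 3])
assert backend.get(ref) == [1, 2, 3]

# wrap() just binds keyword arguments instead of creating a ray task.
scale = backend.wrap(lambda x, factor=1: x * factor, factor=3)
assert scale(2) == 6

# With no num_cpus configured, n_jobs=-1 falls back to available_cpus().
print(backend.effective_n_jobs(-1))
```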


class RayParallelBackend(BaseParallelBackend, backend_name="ray"):
"""Class used to wrap ray to make it transparent to algorithms. It shouldn't
be initialized directly. You should instead call `init_parallel_backend`.

:param config: instance of :class:`~pydvl.utils.config.ParallelConfig` with
cluster address, number of cpus, etc.
"""

def __init__(self, config: ParallelConfig):
config_dict = asdict(config)
config_dict.pop("backend")
config_dict["num_cpus"] = config_dict.pop("num_workers")
config_dict["num_cpus"] = config_dict.pop("n_local_workers")
self.config = config_dict
if self.config["address"] is None:
self.config["ignore_reinit_error"] = True
ray.init(**self.config)

def get(
self,
v: Union[ObjectRef, Iterable[ObjectRef], T],
*,
timeout: Optional[float] = None,
*args,
**kwargs,
) -> Union[T, Any]:
timeout: Optional[float] = kwargs.get("timeout", None)
if isinstance(v, ObjectRef):
return ray.get(v, timeout=timeout)
elif isinstance(v, Iterable):
return [self.get(x, timeout=timeout) for x in v]
else:
return v

def put(self, x: Any, **kwargs) -> ObjectRef:
return ray.put(x, **kwargs) # type: ignore
def put(self, v: T, *args, **kwargs) -> Union["ObjectRef[T]", T]:
try:
return ray.put(v, **kwargs) # type: ignore
except TypeError:
return v # type: ignore

def wrap(self, *args, **kwargs) -> RemoteFunction:
return ray.remote(*args, **kwargs) # type: ignore

def wait(
self,
object_refs: List["ray.ObjectRef"],
*,
num_returns: int = 1,
timeout: Optional[float] = None,
v: List["ObjectRef"],
*args,
**kwargs,
) -> Tuple[List[ObjectRef], List[ObjectRef]]:
num_returns: int = kwargs.get("num_returns", 1)
timeout: Optional[float] = kwargs.get("timeout", None)
return ray.wait( # type: ignore
object_refs,
v,
num_returns=num_returns,
timeout=timeout,
)

def effective_n_jobs(self, n_jobs: Optional[int]) -> int:
if n_jobs == 0:
raise ValueError("n_jobs == 0 in Parallel has no meaning")
elif n_jobs is None or n_jobs < 0:
def _effective_n_jobs(self, n_jobs: int) -> int:
if n_jobs < 0:
ray_cpus = int(ray._private.state.cluster_resources()["CPU"]) # type: ignore
eff_n_jobs = ray_cpus
else:
eff_n_jobs = n_jobs
return eff_n_jobs

def __repr__(self) -> str:
return f"<RayParallelBackend: {self.config}>"


def init_parallel_backend(config: ParallelConfig) -> "RayParallelBackend":
def init_parallel_backend(
config: ParallelConfig,
) -> BaseParallelBackend:
"""Initializes the parallel backend and returns an instance of it.

:param config: instance of :class:`~pydvl.utils.config.ParallelConfig` with cluster address, number of cpus, etc.
@@ -101,16 +199,15 @@ def init_parallel_backend(config: ParallelConfig) -> "RayParallelBackend":
>>> config = ParallelConfig(backend="ray")
>>> parallel_backend = init_parallel_backend(config)
>>> parallel_backend
<RayParallelBackend: {'address': None, 'num_cpus': None}>
<RayParallelBackend: {'address': None, 'num_cpus': None, 'ignore_reinit_error': True}>

"""
global _PARALLEL_BACKEND
if _PARALLEL_BACKEND is None:
if config.backend == "ray":
_PARALLEL_BACKEND = RayParallelBackend(config)
else:
raise NotImplementedError(f"Unexpected parallel type {config.backend}")
return _PARALLEL_BACKEND
try:
parallel_backend_cls = _PARALLEL_BACKENDS[config.backend]
except KeyError:
raise NotImplementedError(f"Unexpected parallel backend {config.backend}")
parallel_backend = parallel_backend_cls._create(config)
return parallel_backend # type: ignore


def available_cpus() -> int:
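This lookup replaces the old module-level singleton: each call now builds a fresh backend through the registry and the private `_create()`. A sketch of the dispatch, including the failure path; note that the `Literal` annotation on `ParallelConfig.backend` will make type checkers reject the bad value, but the dataclass does not enforce it at runtime:

```python
from pydvl.utils.config import ParallelConfig
from pydvl.utils.parallel.backend import init_parallel_backend

backend = init_parallel_backend(ParallelConfig(backend="sequential"))
print(backend)  # <SequentialParallelBackend: {'num_cpus': None}>

try:
    init_parallel_backend(ParallelConfig(backend="dask"))  # type: ignore
except NotImplementedError as e:
    print(e)  # Unexpected parallel backend dask
```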