[tune] Add TuneController [no_early_kickoff] #33499

Merged 67 commits on Mar 25, 2023

Changes from all commits (67 commits)
c942c1d
Initial refactoring
Mar 11, 2023
1381d47
Merge remote-tracking branch 'upstream/master' into tune/trial-runner…
Mar 20, 2023
a7d154a
Pending trial list
Mar 20, 2023
3230fbd
Reuse
Mar 20, 2023
5ffbf1e
Split TuneController
Mar 20, 2023
bd07f50
whitelist
Mar 20, 2023
27e4cd9
Undo changes to example
Mar 20, 2023
4eb0311
Set trial executor early
Mar 20, 2023
0cda6a0
lru cache
Mar 20, 2023
0b433db
Merge remote-tracking branch 'upstream/master' into tune/trial-runner…
Mar 20, 2023
e88f06d
Rebase
Mar 20, 2023
91a79fe
revert example changes
Mar 20, 2023
f25c089
Merge remote-tracking branch 'upstream/master' into tune/tune-controller
Mar 21, 2023
734d1fe
Enable per default to run examples
krfricke Mar 21, 2023
628bad3
Add initial test
krfricke Mar 21, 2023
65dcff5
Fix checkpointing
krfricke Mar 21, 2023
d39600c
error handling
Mar 21, 2023
1d7df82
Fix reuse
Mar 21, 2023
1a64fb3
fix set
Mar 21, 2023
0822463
Fix controller init
Mar 21, 2023
914d74c
Merge remote-tracking branch 'upstream/master' into tune/tune-controller
Mar 21, 2023
6a93ab2
Merge remote-tracking branch 'upstream/master' into tune/tune-controller
Mar 21, 2023
d07f7b7
[no_early_kickoff] Add comments
Mar 21, 2023
58f5224
Merge remote-tracking branch 'upstream/master' into tune/tune-controller
Mar 22, 2023
a70c48c
Fix cleanup
Mar 22, 2023
199898d
Do not schedule new actor if already exists
Mar 22, 2023
8eab356
Safeguard
Mar 22, 2023
c11145f
chdir per default
Mar 22, 2023
6cfc830
Trigger on trial start callback
Mar 22, 2023
499df31
Merge remote-tracking branch 'upstream/master' into tune/tune-controller
Mar 22, 2023
00d1dcf
Fix dynamic resource update, hyperband
Mar 22, 2023
20bfa63
Catch actor creation failures
Mar 22, 2023
448dee9
Actor removal test
Mar 22, 2023
10ec386
No logging
Mar 22, 2023
24ef8f1
Only remove trials that are not stopping
Mar 22, 2023
241f0ca
failed actor IDs
Mar 22, 2023
6315ea3
Disable callbacks on actor cache clear
Mar 23, 2023
8a144d5
graceful trainable shutdown
Mar 23, 2023
b103d72
cleanup
krfricke Mar 23, 2023
59ec736
Cleanup on exception
krfricke Mar 23, 2023
8bf3712
Clear futures on actor stop
krfricke Mar 23, 2023
50e68f3
Fix tests
Mar 23, 2023
8f40fae
failed actor is not started
Mar 23, 2023
ce5a8a8
Merge remote-tracking branch 'upstream/master' into tune/tune-controller
Mar 23, 2023
a49c114
Merge remote-tracking branch 'upstream/master' into tune/tune-controller
Mar 23, 2023
3d7f6c7
[no_early_kickoff] Fix fixed resource manager cleanup
Mar 23, 2023
c86d670
[no_early_kickoff] fix export
Mar 23, 2023
5716d4e
[no_early_kickoff] Graceful trial stop
Mar 23, 2023
cdf0233
[no_early_kickoff] Add separate test jobs for new execution path
Mar 23, 2023
dd8e2c8
[no_early_kickoff] Progress reporter fixes
Mar 23, 2023
e65ac6c
[no_early_kickoff] Skip cluster searcher test on new execution backend
Mar 24, 2023
0bd1ae0
Custom stop future, removing actors
Mar 24, 2023
9bfe24e
Increase PBT perturbation interval
Mar 24, 2023
71da9a0
Only kill if still running
Mar 24, 2023
40f7574
Skip bohb warm start tests
Mar 24, 2023
4ad8bbb
Disable BOHB test, cleanup stopping actors after graceful stop
Mar 24, 2023
52d1fc4
[no_early_kickoff] python/ray/air/execution/_internal/actor_manager.py
krfricke Mar 24, 2023
16e839c
Do not eagerly run PAUSED trials
Mar 24, 2023
547c4b8
Do not eagerly run PAUSED trials
Mar 24, 2023
c9827ef
Remove PAUSED status update exception
Mar 24, 2023
83419db
Exclude bohb_example from testing
Mar 24, 2023
8b3ee45
Merge remote-tracking branch 'upstream/master' into tune/tune-controller
Mar 24, 2023
06b18f0
[no_early_kickoff] kick off tests again
Mar 24, 2023
7431ad1
[no_early_kickoff] kick off tests again
Mar 24, 2023
1e00cd7
Merge remote-tracking branch 'upstream/master' into tune/tune-controller
Mar 24, 2023
41dc7e6
Merge [no_early_kickoff]
Mar 24, 2023
8a45f72
used_resources_str [no_early_kickoff]
Mar 24, 2023
63 changes: 63 additions & 0 deletions .buildkite/pipeline.ml.yml
@@ -326,6 +326,69 @@
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only python/ray/tests/horovod/...
- bazel test --config=ci $(./ci/run/bazel_export_options) python/ray/tests/ray_lightning/...

### NEW EXECUTION PATH


- label: ":octopus: :sunny: New execution path: Tune tests and examples (small)"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TUNE_AFFECTED"]
instance_size: small
parallelism: 3
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- ./ci/run/run_bazel_test_with_sharding.sh
--config=ci $(./ci/run/bazel_export_options) --build_tests_only
--test_env=TUNE_NEW_EXECUTION=1
--test_tag_filters=-medium_instance,-py37,-soft_imports,-gpu_only,-rllib,-multinode,-exclude_new_execution
python/ray/tune/...

- label: ":octopus: :sunny: New execution path:Tune tests and examples (medium)"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TUNE_AFFECTED"]
instance_size: medium
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only
--test_env=TUNE_NEW_EXECUTION=1
--test_tag_filters=medium_instance,-py37,-soft_imports,-gpu_only,-rllib,-multinode,-exclude_new_execution
python/ray/tune/...

- label: ":octopus: :brain: :sunny: New execution path: Tune tests and examples {using RLlib}"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TUNE_AFFECTED", "RAY_CI_RLLIB_AFFECTED"]
instance_size: large
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only
--test_env=TUNE_NEW_EXECUTION=1
--test_tag_filters=-gpu_only,rllib,-exclude_new_execution python/ray/tune/...

- label: ":octopus: :sunny: New execution path: Tune tests and examples. Python 3.7"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TUNE_AFFECTED"]
instance_size: small
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 INSTALL_HOROVOD=1 ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only
--test_env=TUNE_NEW_EXECUTION=1
--test_tag_filters=py37,-client python/ray/tune/...

- label: ":octopus: :sunny: New execution path: ML library integrations tests and examples. Python 3.7"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TUNE_AFFECTED"]
instance_size: small
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 INSTALL_HOROVOD=1 ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_env=TUNE_NEW_EXECUTION=1 python/ray/tests/xgboost/...
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_env=TUNE_NEW_EXECUTION=1 python/ray/tests/horovod/...
- bazel test --config=ci $(./ci/run/bazel_export_options) --test_env=TUNE_NEW_EXECUTION=1 python/ray/tests/ray_lightning/...


# TODO(amogkam): Re-enable Ludwig tests after Ludwig supports Ray 2.0
#- label: ":octopus: Ludwig tests and examples. Python 3.7"
# conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TUNE_AFFECTED"]
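The jobs added above mirror the existing Tune CI jobs but set --test_env=TUNE_NEW_EXECUTION=1 and exclude tests tagged exclude_new_execution. A minimal sketch of how such a flag can be read on the Python side; the helper name is hypothetical and not part of this diff:

import os

def _use_new_execution_backend() -> bool:
    # The CI jobs above set TUNE_NEW_EXECUTION=1 to select the new TuneController path.
    return os.environ.get("TUNE_NEW_EXECUTION", "0") == "1"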
131 changes: 107 additions & 24 deletions python/ray/air/execution/_internal/actor_manager.py
@@ -3,7 +3,6 @@
import time
import uuid
from collections import defaultdict, Counter
from functools import lru_cache
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type, Union

import ray
@@ -18,7 +17,6 @@
from ray.air.execution._internal.tracked_actor_task import TrackedActorTask
from ray.exceptions import RayTaskError, RayActorError


logger = logging.getLogger(__name__)


@@ -150,12 +148,16 @@ def __init__(self, resource_manager: ResourceManager):
self._live_actors_to_ray_actors_resources: Dict[
TrackedActor, Tuple[ray.actor.ActorHandle, AcquiredResources]
] = {}
self._live_resource_cache: Optional[Dict[str, Any]] = None

# This dict contains all actors that should be killed (after calling
# `remove_actor()`). Kill requests will be handled in wait().
self._live_actors_to_kill: Set[TrackedActor] = set()

def next(self, timeout: Optional[Union[int, float]] = None) -> None:
# Track failed actors
self._failed_actor_ids: Set[int] = set()

def next(self, timeout: Optional[Union[int, float]] = None) -> bool:
"""Yield control to event manager to await the next event and invoke callbacks.

Calling this method will wait for up to ``timeout`` seconds for the next
@@ -178,6 +180,9 @@ def next(self, timeout: Optional[Union[int, float]] = None) -> None:
Args:
timeout: Timeout in seconds to wait for next event.

Returns:
True if at least one event was processed.

"""
# First issue any pending forceful actor kills
actor_killed = self._try_kill_actor()
@@ -187,7 +192,7 @@

# If an actor was killed, this was our event, and we return.
if actor_killed:
return
return True

# Otherwise, collect all futures and await the next.
resource_futures = self._resource_manager.get_resource_futures()
@@ -209,7 +214,7 @@ def next(self, timeout: Optional[Union[int, float]] = None) -> None:
ready, _ = ray.wait(all_futures, num_returns=1, timeout=timeout)

if not ready:
return
return False

[future] = ready

@@ -228,6 +233,7 @@ def next(self, timeout: Optional[Union[int, float]] = None) -> None:
)

self._try_start_actors()
return True
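With the boolean return value, callers can distinguish a processed event from a timeout. A minimal driver sketch, assuming the RayActorManager class defined in this file and the FixedResourceManager import path (both assumptions, not shown in this diff); actor setup is elided:

from ray.air.execution import FixedResourceManager
from ray.air.execution._internal.actor_manager import RayActorManager

manager = RayActorManager(resource_manager=FixedResourceManager())
# ... add actors and schedule tasks via add_actor() / schedule_actor_task() ...

for _ in range(100):
    if not manager.next(timeout=0.1):
        break  # timed out without processing an event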

def _actor_start_resolved(self, tracked_actor: TrackedActor, future: ray.ObjectRef):
"""Callback to be invoked when actor started"""
@@ -245,6 +251,8 @@ def _actor_stop_resolved(self, tracked_actor: TrackedActor):

def _actor_start_failed(self, tracked_actor: TrackedActor, exception: Exception):
"""Callback to be invoked when actor start/stop failed"""
self._failed_actor_ids.add(tracked_actor.actor_id)

self._cleanup_actor(tracked_actor=tracked_actor)

if tracked_actor._on_error:
@@ -262,16 +270,19 @@ def _actor_task_failed(
tracked_actor = tracked_actor_task._tracked_actor

if isinstance(exception, RayActorError):
# Here the actual actor process died.
# First, clean up any references to the actor and its futures
self._failed_actor_ids.add(tracked_actor.actor_id)

# Clean up any references to the actor and its futures
self._cleanup_actor(tracked_actor=tracked_actor)

# Handle actor state callbacks
if tracked_actor._on_error:
tracked_actor._on_error(tracked_actor, exception)

# Then trigger actor task error callback
if tracked_actor_task._on_error:
tracked_actor_task._on_error(tracked_actor, exception)

elif isinstance(exception, RayTaskError):
# Otherwise only the task failed. Invoke callback
if tracked_actor_task._on_error:
@@ -385,7 +396,7 @@ def on_error(exception: Exception):
actor,
acquired_resources,
)
self.get_live_actors_resources.cache_clear()
self._live_resource_cache = None

self._enqueue_cached_actor_tasks(tracked_actor=tracked_actor)

Expand Down Expand Up @@ -422,27 +433,21 @@ def _try_kill_actor(self) -> bool:
# Hard kill if requested
ray.kill(ray_actor)

self._cleanup_actor_futures(tracked_actor)

self._actor_stop_resolved(tracked_actor)

return True

def _cleanup_actor(self, tracked_actor: TrackedActor):
# Remove all actor task futures
futures = self._tracked_actors_to_task_futures.pop(tracked_actor, [])
for future in futures:
self._actor_task_events.discard_future(future)

# Remove all actor state futures
futures = self._tracked_actors_to_state_futures.pop(tracked_actor, [])
for future in futures:
self._actor_state_events.discard_future(future)
self._cleanup_actor_futures(tracked_actor)

# Remove from tracked actors
(
ray_actor,
acquired_resources,
) = self._live_actors_to_ray_actors_resources.pop(tracked_actor)
self.get_live_actors_resources.cache_clear()
self._live_resource_cache = None

# Return resources
self._resource_manager.free_resources(acquired_resource=acquired_resources)
@@ -482,13 +487,16 @@ def num_actor_tasks(self):
"""Return number of pending tasks"""
return self._actor_task_events.num_futures

@lru_cache()
def get_live_actors_resources(self):
if self._live_resource_cache:
return self._live_resource_cache

counter = Counter()
for _, acq in self._live_actors_to_ray_actors_resources.values():
for bdl in acq.resource_request.bundles:
counter.update(bdl)
return dict(counter)
self._live_resource_cache = dict(counter)
return self._live_resource_cache
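The @lru_cache decorator is replaced by an explicit _live_resource_cache field, presumably because an lru_cache on a bound method is shared across instances, keeps self alive, and can only be cleared wholesale via cache_clear(); the explicit field is invalidated per instance by setting it to None whenever the live-actor mapping changes. A generic, self-contained sketch of that pattern (names hypothetical):

from collections import Counter

class _CachedTotals:
    def __init__(self):
        self._bundles: list = []
        self._cache = None

    def add_bundle(self, bundle: dict):
        self._bundles.append(bundle)
        self._cache = None  # invalidate on every mutation

    def totals(self) -> dict:
        if self._cache is None:
            counter = Counter()
            for bundle in self._bundles:
                counter.update(bundle)
            self._cache = dict(counter)
        return self._cache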

def add_actor(
self,
@@ -535,6 +543,7 @@ def remove_actor(
self,
tracked_actor: TrackedActor,
kill: bool = False,
stop_future: Optional[ray.ObjectRef] = None,
) -> None:
"""Remove a tracked actor.

@@ -546,7 +555,6 @@ def remove_actor(
If the actor has only been requested, but not started, yet, this will cancel
the actor request. This will not trigger any callback.


If ``kill=True``, this will use ``ray.kill()`` to forcefully terminate the
actor. Otherwise, graceful actor deconstruction will be scheduled after
all currently tracked futures are resolved.
@@ -555,23 +563,51 @@ def remove_actor(
tracked_actor: Tracked actor to be removed.
kill: If set, will forcefully terminate the actor instead of gracefully
scheduling termination.
stop_future: If set, use this future to track actor termination.
Otherwise, schedule a ``__ray_terminate__`` future.
"""
if tracked_actor in self._live_actors_to_ray_actors_resources:
if tracked_actor.actor_id in self._failed_actor_ids:
logger.debug(
f"Tracked actor already failed, no need to remove: {tracked_actor}"
)
elif tracked_actor in self._live_actors_to_ray_actors_resources:
# Ray actor is running.

if not kill:
# Schedule __ray_terminate__ future
ray_actor, _ = self._live_actors_to_ray_actors_resources[tracked_actor]

# Clear state futures here to avoid resolving __ray_ready__ futures
for future in list(
self._tracked_actors_to_state_futures[tracked_actor]
):
self._actor_state_events.discard_future(future)
self._tracked_actors_to_state_futures[tracked_actor].remove(future)

# If the __ray_ready__ future hasn't resolved yet, but we already
# scheduled the actor via Actor.remote(), we just want to stop
# it but not trigger any callbacks. This is in accordance with
# the contract defined in the docstring.
tracked_actor._on_start = None
tracked_actor._on_stop = None
tracked_actor._on_error = None

def on_actor_stop(*args, **kwargs):
self._actor_stop_resolved(tracked_actor=tracked_actor)

stop_future = ray_actor.__ray_terminate__.remote()
if stop_future:
# If the stop future was scheduled via the actor manager,
# discard it here and track it as a state future instead.
self._actor_task_events.discard_future(stop_future)
else:
stop_future = ray_actor.__ray_terminate__.remote()

self._actor_state_events.track_future(
future=stop_future,
on_result=on_actor_stop,
on_error=on_actor_stop,
)

self._tracked_actors_to_state_futures[tracked_actor].add(stop_future)

else:
@@ -581,6 +617,9 @@ def on_actor_stop(*args, **kwargs):
elif tracked_actor in self._pending_actors_to_attrs:
# Actor is pending, stop
_, _, resource_request = self._pending_actors_to_attrs.pop(tracked_actor)
self._resource_request_to_pending_actors[resource_request].remove(
tracked_actor
)
self._resource_manager.cancel_resource_request(
resource_request=resource_request
)
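The new stop_future argument lets the caller drive graceful shutdown with its own remote call instead of the default __ray_terminate__ task. A sketch of the intended call pattern; the actor handle and its stop method are hypothetical, only the remove_actor signature comes from this diff:

def stop_actor_gracefully(manager, tracked_actor, actor_handle):
    # Schedule the actor's own stop method and hand the future to the manager,
    # which re-tracks it as a state future and resolves the stop callbacks on it.
    stop_future = actor_handle.stop.remote()
    manager.remove_actor(tracked_actor, kill=False, stop_future=stop_future)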
@@ -593,7 +632,13 @@ def is_actor_started(self, tracked_actor: TrackedActor) -> bool:
Args:
tracked_actor: Tracked actor object.
"""
return tracked_actor in self._live_actors_to_ray_actors_resources
return (
tracked_actor in self._live_actors_to_ray_actors_resources
and tracked_actor.actor_id not in self._failed_actor_ids
)

def is_actor_failed(self, tracked_actor: TrackedActor) -> bool:
return tracked_actor.actor_id in self._failed_actor_ids

def get_actor_resources(
self, tracked_actor: TrackedActor
@@ -675,6 +720,7 @@ def schedule_actor_task(
method_name=method_name,
args=args,
kwargs=kwargs,
_return_future=_return_future,
)
if _return_future:
return res[1]
@@ -794,3 +840,40 @@ def schedule_actor_tasks(
on_result=on_result,
on_error=on_error,
)

def clear_actor_task_futures(self, tracked_actor: TrackedActor):
"""Discard all actor task futures from a tracked actor."""
futures = self._tracked_actors_to_task_futures.pop(tracked_actor, [])
for future in futures:
self._actor_task_events.discard_future(future)

def _cleanup_actor_futures(self, tracked_actor: TrackedActor):
# Remove all actor task futures
self.clear_actor_task_futures(tracked_actor=tracked_actor)

# Remove all actor state futures
futures = self._tracked_actors_to_state_futures.pop(tracked_actor, [])
for future in futures:
self._actor_state_events.discard_future(future)

def cleanup(self):
for (
actor,
acquired_resources,
) in self._live_actors_to_ray_actors_resources.values():
ray.kill(actor)
self._resource_manager.free_resources(acquired_resources)

for (
resource_request,
pending_actors,
) in self._resource_request_to_pending_actors.items():
for i in range(len(pending_actors)):
self._resource_manager.cancel_resource_request(resource_request)

self._resource_manager.clear()

self.__init__(resource_manager=self._resource_manager)

def __del__(self):
self.cleanup()
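cleanup() force-kills any remaining live actors, cancels outstanding resource requests, and re-initializes the manager; __del__ calls it as a last resort. A usage sketch under the same import assumptions as above:

from ray.air.execution import FixedResourceManager
from ray.air.execution._internal.actor_manager import RayActorManager

manager = RayActorManager(resource_manager=FixedResourceManager())
try:
    # Drive the event loop; a real caller would add actors and tasks first.
    while manager.next(timeout=1.0):
        pass
finally:
    manager.cleanup()  # release actors and resources even on error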
8 changes: 6 additions & 2 deletions python/ray/air/execution/_internal/event_manager.py
@@ -109,9 +109,13 @@ def resolve_future(self, future: ray.ObjectRef):
try:
result = ray.get(future)
except Exception as e:
on_error(e)
if on_error:
on_error(e)
else:
raise e
else:
on_result(result)
if on_result:
on_result(result)
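With these guards, futures can be tracked without callbacks: errors re-raise when no on_error is given, and results are dropped when no on_result is given. This matters for callers such as actor task scheduling without result handlers. A small sketch, assuming schedule_actor_task accepts the method name positionally and that omitting the callbacks is allowed:

def fire_and_forget(manager, tracked_actor):
    # Hypothetical method name "commit"; no on_result/on_error is passed, which
    # resolve_future() now tolerates instead of calling a None callback.
    manager.schedule_actor_task(tracked_actor, "commit")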

def wait(
self,