[ray] Support scheduling ray tasks in Ray oscar deploy backend #3165

Merged · 16 commits · Sep 7, 2022
2 changes: 2 additions & 0 deletions .github/workflows/platform-ci.yml
@@ -179,6 +179,8 @@ jobs:
mv .coverage build/.coverage.test_ray_dag.file
pytest $PYTEST_CONFIG --durations=0 --timeout=200 -v -s mars/deploy/oscar/tests/test_ray_dag_failover.py
mv .coverage build/.coverage.test_ray_dag_failover.file
pytest $PYTEST_CONFIG --durations=0 --timeout=200 -v -s mars/deploy/oscar/tests/test_ray_dag_oscar.py -m ray
mv .coverage build/.coverage.test_ray_dag_oscar.file

coverage combine build/ && coverage report
fi
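
To mirror the added step locally, run `pytest --durations=0 --timeout=200 -v -s mars/deploy/oscar/tests/test_ray_dag_oscar.py -m ray` (the job's shared `$PYTEST_CONFIG` flags are omitted here).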
54 changes: 45 additions & 9 deletions mars/deploy/oscar/ray.py
@@ -37,6 +37,7 @@
AbstractClusterBackend,
)
from ...services import NodeRole
from ...services.task.execution.api import ExecutionConfig
from ...utils import lazy_import, retry_callable
from ..utils import (
load_config,
@@ -310,6 +311,7 @@ async def new_cluster(
worker_num: int = 1,
worker_cpu: int = 2,
worker_mem: int = 2 * 1024**3,
backend: str = None,
config: Union[str, Dict] = None,
**kwargs,
):
@@ -330,6 +332,7 @@
worker_num,
worker_cpu,
worker_mem,
backend,
config,
n_supervisor_process=n_supervisor_process,
)
@@ -402,6 +405,7 @@ def __init__(
worker_num: int = 1,
worker_cpu: int = 2,
worker_mem: int = 4 * 1024**3,
backend: str = None,
config: Union[str, Dict] = None,
n_supervisor_process: int = DEFAULT_SUPERVISOR_SUB_POOL_NUM,
):
@@ -413,6 +417,7 @@
self._worker_num = worker_num
self._worker_cpu = worker_cpu
self._worker_mem = worker_mem
self.backend = backend
# load config file to dict.
self._config = load_config(config, default_config_file=DEFAULT_CONFIG_FILE)
self.supervisor_address = None
@@ -434,6 +439,35 @@ async def start(self):
logging.basicConfig(
format=ray.ray_constants.LOGGER_FORMAT, level=logging.INFO
)
execution_config = ExecutionConfig.from_config(
self._config, backend=self.backend
)
self.backend = execution_config.backend
if self.backend == "mars":
await self.start_oscar(
self._n_supervisor_process,
self._supervisor_mem,
self._worker_num,
self._worker_cpu,
self._worker_mem,
)
else:
execution_config.merge_from(

[Inline review on the line above]
Contributor: Maybe it's better to specify the backend explicitly, like `ray` or others, instead of an `else` branch, and raise an exception if the user passes an unsupported backend.
Author: Good catch, we should validate the passed backend type; just fixed it.

ExecutionConfig.from_params(
backend=self.backend,
n_worker=self._worker_num,
n_cpu=self._worker_num * self._worker_cpu,
mem_bytes=self._worker_mem,
)
)
assert self._n_supervisor_process == 0, self._n_supervisor_process
await self.start_oscar(
self._n_supervisor_process, self._supervisor_mem, 0, 0, 0
)
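
Picking up the inline review above: a minimal sketch of the explicit validation it asks for (hypothetical helper and names; the PR's actual fix landed in a later commit):

```python
# Hypothetical sketch, not the PR's implementation: validate the backend
# up front and fail fast on anything unsupported.
SUPPORTED_BACKENDS = ("mars", "ray")

def validate_backend(backend: str) -> str:
    if backend not in SUPPORTED_BACKENDS:
        raise ValueError(
            f"Unsupported backend {backend!r}; expected one of {SUPPORTED_BACKENDS}"
        )
    return backend
```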

async def start_oscar(
self, n_supervisor_process, supervisor_mem, worker_num, worker_cpu, worker_mem
):
logger.info("Start cluster with config %s", self._config)
# init metrics to guarantee metrics use in driver
metric_configs = self._config.get("metrics", {})
@@ -450,7 +484,7 @@ async def start(self):
self._config.get("cluster", {})
.get("ray", {})
.get("supervisor", {})
.get("sub_pool_num", self._n_supervisor_process)
.get("sub_pool_num", n_supervisor_process)
)
from ...storage.ray import support_specify_owner

@@ -466,11 +500,11 @@
self._config["cluster"]["lookup_address"] = self.supervisor_address
address_to_resources[node_placement_to_address(self._cluster_name, 0)] = {
"CPU": 1,
# "memory": self._supervisor_mem,
# "memory": supervisor_mem,
}
worker_addresses = []
if supervisor_standalone:
- for worker_index in range(1, self._worker_num + 1):
+ for worker_index in range(1, worker_num + 1):
worker_address = process_placement_to_address(
self._cluster_name, worker_index, 0
)
@@ -479,11 +513,11 @@
self._cluster_name, worker_index
)
address_to_resources[worker_node_address] = {
"CPU": self._worker_cpu,
"CPU": worker_cpu,
# "memory": self._worker_mem,
}
else:
- for worker_index in range(self._worker_num):
+ for worker_index in range(worker_num):
worker_process_index = (
supervisor_sub_pool_num + 1 if worker_index == 0 else 0
)
@@ -495,7 +529,7 @@
self._cluster_name, worker_index
)
address_to_resources[worker_node_address] = {
"CPU": self._worker_cpu,
"CPU": worker_cpu,
# "memory": self._worker_mem,
}
mo.setup_cluster(address_to_resources)
@@ -525,7 +559,7 @@ async def start(self):
addr,
{
"numa-0": Resource(
- num_cpus=self._worker_cpu, mem_bytes=self._worker_mem
+ num_cpus=worker_cpu, mem_bytes=self._worker_mem
)
},
modules=get_third_party_modules_from_config(
@@ -543,7 +577,7 @@
)
cluster_state_ref = self._cluster_backend.get_cluster_state_ref()
await self._cluster_backend.get_cluster_state_ref().set_config(
- self._worker_cpu, self._worker_mem, self._config
+ worker_cpu, self._worker_mem, self._config
)
# start service
await start_supervisor(self.supervisor_address, config=self._config)
@@ -596,7 +630,9 @@ def __init__(self, cluster: RayCluster, session: AbstractSession):

@classmethod
async def create(cls, cluster: RayCluster) -> "RayClient":
- session = await _new_session(cluster.supervisor_address, default=True)
+ session = await _new_session(
+ cluster.supervisor_address, default=True, backend=cluster.backend
+ )
client = RayClient(cluster, session)
AbstractSession.default._ray_client = client
return client
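
Driver-side, the only API change is the new `backend` argument. A minimal usage sketch (the `execute`/`fetch` calls assume the async session API exercised by the tests below; they are not part of this diff):

```python
import mars.tensor as mt
from mars.deploy.oscar.ray import new_cluster

async def main():
    # backend="ray" routes task execution through the ray backend;
    # backend=None keeps whatever ExecutionConfig.from_config resolves.
    client = await new_cluster(
        worker_num=2,
        worker_cpu=2,
        worker_mem=1 * 1024**3,
        backend="ray",
    )
    async with client:
        t = mt.ones((4, 4)).sum()
        await client.session.execute(t)
        print(await client.session.fetch(t))
```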
54 changes: 54 additions & 0 deletions mars/deploy/oscar/tests/test_ray_dag_oscar.py
@@ -0,0 +1,54 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

import pytest

from ....tests.core import require_ray
from ....utils import lazy_import
from ..ray import new_cluster, _load_config
from ..tests import test_local

ray = lazy_import("ray")
CONFIG_FILE = os.path.join(os.path.dirname(__file__), "local_test_with_ray_config.yml")


@pytest.fixture
async def create_cluster(request):
param = getattr(request, "param", {})
ray_config = _load_config(CONFIG_FILE)
ray_config.update(param.get("config", {}))
client = await new_cluster(
supervisor_mem=1 * 1024**3,
worker_num=2,
worker_cpu=2,
worker_mem=1 * 1024**3,
backend="ray",
config=ray_config,
)
async with client:
yield client, param


@require_ray
@pytest.mark.asyncio
async def test_iterative_tiling(ray_start_regular_shared2, create_cluster):
await test_local.test_iterative_tiling(create_cluster)


@pytest.mark.asyncio
@require_ray
async def test_execute_describe(ray_start_regular_shared2, create_cluster):
await test_local.test_execute_describe(create_cluster)
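
Because the fixture reads `request.param`, individual tests can override the cluster config through indirect parametrization. A hypothetical example (the override key is illustrative, not from this diff):

```python
@require_ray
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "create_cluster",
    [{"config": {"third_party_modules": []}}],  # illustrative override
    indirect=True,
)
async def test_with_overridden_config(ray_start_regular_shared2, create_cluster):
    client, param = create_cluster
    assert client.session is not None
```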
2 changes: 1 addition & 1 deletion mars/deploy/oscar/tests/test_ray_scheduling.py
@@ -235,7 +235,7 @@ async def test_auto_scale_in(ray_large_cluster):
assert await autoscaler_ref.get_dynamic_worker_nums() == 2


- @pytest.mark.timeout(timeout=150)
+ @pytest.mark.timeout(timeout=500)
@pytest.mark.parametrize("ray_large_cluster", [{"num_nodes": 4}], indirect=True)
@require_ray
@pytest.mark.asyncio
4 changes: 2 additions & 2 deletions mars/services/task/execution/ray/executor.py
@@ -356,7 +356,6 @@ def get_execution_config(self):

# noinspection DuplicatedCode
def destroy(self):
- self._config = None
self._task = None
self._tile_context = None
self._task_context = {}
@@ -377,6 +376,7 @@ def destroy(self):
self._cur_stage_first_output_object_ref_to_subtask = dict()
self._execute_subtask_graph_aiotask = None
self._cancelled = None
+ self._config = None

@classmethod
@alru_cache(cache_exceptions=False)
@@ -518,7 +518,7 @@ def _on_execute_aiotask_done(_):
max_retries=subtask_max_retries,
).remote(
subtask.subtask_id,
- serialize(subtask_chunk_graph),
+ serialize(subtask_chunk_graph, context={"serializer": "ray"}),
subtask_output_meta_keys,
is_mapper,
*input_object_refs,
@@ -124,6 +124,7 @@ async def test_ray_executor_create(
assert mock_task_state_actor_create.call_count == 1


@require_ray
@pytest.mark.asyncio
async def test_ray_executor_destroy():
task = Task("mock_task", "mock_session")
@@ -150,6 +151,7 @@ async def test_ray_executor_destroy():
assert await executor.get_progress() == 1.0


@require_ray
def test_ray_execute_subtask_basic():
raw = np.ones((10, 10))
raw_expect = raw + 1
2 changes: 1 addition & 1 deletion mars/services/task/execution/utils.py
@@ -24,7 +24,7 @@ def get_band_resources_from_config(
n_worker: int = config["n_worker"]
n_cpu: int = config["n_cpu"]
mem_bytes: int = config["mem_bytes"]
cuda_devices: List[List[int]] = config["cuda_devices"]
cuda_devices: List[List[int]] = config.get("cuda_devices")

bands_to_resource = []
worker_cpus = n_cpu // n_worker
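
For context, `get_band_resources_from_config` turns these fields into per-worker band resources. A rough standalone sketch (reconstructed from the visible lines; the GPU branch is an assumption) shows why `.get()` matters: configs built through `ExecutionConfig.from_params`, as in the ray.py change above, may carry no `cuda_devices` entry at all:

```python
from typing import Dict, List, Optional

def band_resources_sketch(config: Dict) -> List[Dict[str, Dict[str, int]]]:
    n_worker: int = config["n_worker"]
    n_cpu: int = config["n_cpu"]
    mem_bytes: int = config["mem_bytes"]
    # .get() tolerates configs without GPU info, where config["cuda_devices"]
    # would raise KeyError.
    cuda_devices: Optional[List[List[int]]] = config.get("cuda_devices")

    bands_to_resource = []
    worker_cpus = n_cpu // n_worker
    for worker_index in range(n_worker):
        bands = {
            "numa-0": {"num_cpus": worker_cpus, "mem_bytes": mem_bytes // n_worker}
        }
        if cuda_devices:  # hypothetical GPU handling: one band per device
            for device_index in cuda_devices[worker_index]:
                bands[f"gpu-{device_index}"] = {"num_gpus": 1}
        bands_to_resource.append(bands)
    return bands_to_resource
```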