diff --git a/.github/workflows/platform-ci.yml b/.github/workflows/platform-ci.yml index f8be8e9912..ea65785c64 100644 --- a/.github/workflows/platform-ci.yml +++ b/.github/workflows/platform-ci.yml @@ -179,6 +179,8 @@ jobs: mv .coverage build/.coverage.test_ray_dag.file pytest $PYTEST_CONFIG --durations=0 --timeout=200 -v -s mars/deploy/oscar/tests/test_ray_dag_failover.py mv .coverage build/.coverage.test_ray_dag_failover.file + pytest $PYTEST_CONFIG --durations=0 --timeout=200 -v -s mars/deploy/oscar/tests/test_ray_dag_oscar.py -m ray + mv .coverage build/.coverage.test_ray_dag_oscar.file coverage combine build/ && coverage report fi diff --git a/mars/deploy/oscar/ray.py b/mars/deploy/oscar/ray.py index e17ac0ffdc..1e3edb28c7 100644 --- a/mars/deploy/oscar/ray.py +++ b/mars/deploy/oscar/ray.py @@ -37,6 +37,7 @@ AbstractClusterBackend, ) from ...services import NodeRole +from ...services.task.execution.api import ExecutionConfig from ...utils import lazy_import, retry_callable from ..utils import ( load_config, @@ -310,6 +311,7 @@ async def new_cluster( worker_num: int = 1, worker_cpu: int = 2, worker_mem: int = 2 * 1024**3, + backend: str = None, config: Union[str, Dict] = None, **kwargs, ): @@ -330,6 +332,7 @@ async def new_cluster( worker_num, worker_cpu, worker_mem, + backend, config, n_supervisor_process=n_supervisor_process, ) @@ -402,6 +405,7 @@ def __init__( worker_num: int = 1, worker_cpu: int = 2, worker_mem: int = 4 * 1024**3, + backend: str = None, config: Union[str, Dict] = None, n_supervisor_process: int = DEFAULT_SUPERVISOR_SUB_POOL_NUM, ): @@ -413,6 +417,7 @@ def __init__( self._worker_num = worker_num self._worker_cpu = worker_cpu self._worker_mem = worker_mem + self.backend = backend # load config file to dict. self._config = load_config(config, default_config_file=DEFAULT_CONFIG_FILE) self.supervisor_address = None @@ -434,6 +439,37 @@ async def start(self): logging.basicConfig( format=ray.ray_constants.LOGGER_FORMAT, level=logging.INFO ) + execution_config = ExecutionConfig.from_config( + self._config, backend=self.backend + ) + self.backend = execution_config.backend + if self.backend == "mars": + await self.start_oscar( + self._n_supervisor_process, + self._supervisor_mem, + self._worker_num, + self._worker_cpu, + self._worker_mem, + ) + elif self.backend == "ray": + execution_config.merge_from( + ExecutionConfig.from_params( + backend=self.backend, + n_worker=self._worker_num, + n_cpu=self._worker_num * self._worker_cpu, + mem_bytes=self._worker_mem, + ) + ) + assert self._n_supervisor_process == 0, self._n_supervisor_process + await self.start_oscar( + self._n_supervisor_process, self._supervisor_mem, 0, 0, 0 + ) + else: + raise ValueError(f"Unsupported backend type: {self.backend}.") + + async def start_oscar( + self, n_supervisor_process, supervisor_mem, worker_num, worker_cpu, worker_mem + ): logger.info("Start cluster with config %s", self._config) # init metrics to guarantee metrics use in driver metric_configs = self._config.get("metrics", {}) @@ -450,7 +486,7 @@ async def start(self): self._config.get("cluster", {}) .get("ray", {}) .get("supervisor", {}) - .get("sub_pool_num", self._n_supervisor_process) + .get("sub_pool_num", n_supervisor_process) ) from ...storage.ray import support_specify_owner @@ -466,11 +502,11 @@ async def start(self): self._config["cluster"]["lookup_address"] = self.supervisor_address address_to_resources[node_placement_to_address(self._cluster_name, 0)] = { "CPU": 1, - # "memory": self._supervisor_mem, + # "memory": supervisor_mem, } worker_addresses = [] if supervisor_standalone: - for worker_index in range(1, self._worker_num + 1): + for worker_index in range(1, worker_num + 1): worker_address = process_placement_to_address( self._cluster_name, worker_index, 0 ) @@ -479,11 +515,11 @@ async def start(self): self._cluster_name, worker_index ) address_to_resources[worker_node_address] = { - "CPU": self._worker_cpu, + "CPU": worker_cpu, # "memory": self._worker_mem, } else: - for worker_index in range(self._worker_num): + for worker_index in range(worker_num): worker_process_index = ( supervisor_sub_pool_num + 1 if worker_index == 0 else 0 ) @@ -495,7 +531,7 @@ async def start(self): self._cluster_name, worker_index ) address_to_resources[worker_node_address] = { - "CPU": self._worker_cpu, + "CPU": worker_cpu, # "memory": self._worker_mem, } mo.setup_cluster(address_to_resources) @@ -525,7 +561,7 @@ async def start(self): addr, { "numa-0": Resource( - num_cpus=self._worker_cpu, mem_bytes=self._worker_mem + num_cpus=worker_cpu, mem_bytes=self._worker_mem ) }, modules=get_third_party_modules_from_config( @@ -543,7 +579,7 @@ async def start(self): ) cluster_state_ref = self._cluster_backend.get_cluster_state_ref() await self._cluster_backend.get_cluster_state_ref().set_config( - self._worker_cpu, self._worker_mem, self._config + worker_cpu, self._worker_mem, self._config ) # start service await start_supervisor(self.supervisor_address, config=self._config) @@ -596,7 +632,9 @@ def __init__(self, cluster: RayCluster, session: AbstractSession): @classmethod async def create(cls, cluster: RayCluster) -> "RayClient": - session = await _new_session(cluster.supervisor_address, default=True) + session = await _new_session( + cluster.supervisor_address, default=True, backend=cluster.backend + ) client = RayClient(cluster, session) AbstractSession.default._ray_client = client return client diff --git a/mars/deploy/oscar/tests/test_ray_dag_oscar.py b/mars/deploy/oscar/tests/test_ray_dag_oscar.py new file mode 100644 index 0000000000..41d28c8c6c --- /dev/null +++ b/mars/deploy/oscar/tests/test_ray_dag_oscar.py @@ -0,0 +1,54 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import pytest + +from ....tests.core import require_ray +from ....utils import lazy_import +from ..ray import new_cluster, _load_config +from ..tests import test_local + +ray = lazy_import("ray") +CONFIG_FILE = os.path.join(os.path.dirname(__file__), "local_test_with_ray_config.yml") + + +@pytest.fixture +async def create_cluster(request): + param = getattr(request, "param", {}) + ray_config = _load_config(CONFIG_FILE) + ray_config.update(param.get("config", {})) + client = await new_cluster( + supervisor_mem=1 * 1024**3, + worker_num=2, + worker_cpu=2, + worker_mem=1 * 1024**3, + backend="ray", + config=ray_config, + ) + async with client: + yield client, param + + +@require_ray +@pytest.mark.asyncio +async def test_iterative_tiling(ray_start_regular_shared2, create_cluster): + await test_local.test_iterative_tiling(create_cluster) + + +@pytest.mark.asyncio +@require_ray +async def test_execute_describe(ray_start_regular_shared2, create_cluster): + await test_local.test_execute_describe(create_cluster) diff --git a/mars/deploy/oscar/tests/test_ray_scheduling.py b/mars/deploy/oscar/tests/test_ray_scheduling.py index 47f42b3454..141ff99964 100644 --- a/mars/deploy/oscar/tests/test_ray_scheduling.py +++ b/mars/deploy/oscar/tests/test_ray_scheduling.py @@ -235,7 +235,7 @@ async def test_auto_scale_in(ray_large_cluster): assert await autoscaler_ref.get_dynamic_worker_nums() == 2 -@pytest.mark.timeout(timeout=150) +@pytest.mark.timeout(timeout=500) @pytest.mark.parametrize("ray_large_cluster", [{"num_nodes": 4}], indirect=True) @require_ray @pytest.mark.asyncio diff --git a/mars/services/task/execution/ray/executor.py b/mars/services/task/execution/ray/executor.py index f49371575e..fb651ca0f6 100644 --- a/mars/services/task/execution/ray/executor.py +++ b/mars/services/task/execution/ray/executor.py @@ -356,7 +356,6 @@ def get_execution_config(self): # noinspection DuplicatedCode def destroy(self): - self._config = None self._task = None self._tile_context = None self._task_context = {} @@ -377,6 +376,7 @@ def destroy(self): self._cur_stage_first_output_object_ref_to_subtask = dict() self._execute_subtask_graph_aiotask = None self._cancelled = None + self._config = None @classmethod @alru_cache(cache_exceptions=False) @@ -518,7 +518,7 @@ def _on_execute_aiotask_done(_): max_retries=subtask_max_retries, ).remote( subtask.subtask_id, - serialize(subtask_chunk_graph), + serialize(subtask_chunk_graph, context={"serializer": "ray"}), subtask_output_meta_keys, is_mapper, *input_object_refs, diff --git a/mars/services/task/execution/ray/tests/test_ray_execution_backend.py b/mars/services/task/execution/ray/tests/test_ray_execution_backend.py index a723da00c0..a4a39c9268 100644 --- a/mars/services/task/execution/ray/tests/test_ray_execution_backend.py +++ b/mars/services/task/execution/ray/tests/test_ray_execution_backend.py @@ -124,6 +124,7 @@ async def test_ray_executor_create( assert mock_task_state_actor_create.call_count == 1 +@require_ray @pytest.mark.asyncio async def test_ray_executor_destroy(): task = Task("mock_task", "mock_session") @@ -150,6 +151,7 @@ async def test_ray_executor_destroy(): assert await executor.get_progress() == 1.0 +@require_ray def test_ray_execute_subtask_basic(): raw = np.ones((10, 10)) raw_expect = raw + 1 diff --git a/mars/services/task/execution/utils.py b/mars/services/task/execution/utils.py index ade928df81..648c0953ae 100644 --- a/mars/services/task/execution/utils.py +++ b/mars/services/task/execution/utils.py @@ -24,7 +24,7 @@ def get_band_resources_from_config( n_worker: int = config["n_worker"] n_cpu: int = config["n_cpu"] mem_bytes: int = config["mem_bytes"] - cuda_devices: List[List[int]] = config["cuda_devices"] + cuda_devices: List[List[int]] = config.get("cuda_devices") bands_to_resource = [] worker_cpus = n_cpu // n_worker