[Ray] Reconstruct worker #2413

Merged: 14 commits, Sep 14, 2021
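At a high level, this PR adds a `reconstruct_worker(address)` entry point to the cluster backends so that a dynamically requested Ray worker whose pool actor has died can be restarted in place, and it coalesces duplicate `release_worker` / `reconstruct_worker` requests for the same address. A minimal usage sketch, assuming a Ray cluster is already initialized; it mirrors the `test_reconstruct_worker` test further down, and the cluster name and resource sizes are purely illustrative:

```python
import asyncio

from mars.deploy.oscar.ray import new_cluster
from mars.services.cluster import ClusterAPI


async def main():
    # Start a Mars-on-Ray cluster with no static workers, then request one dynamically.
    client = await new_cluster('demo_cluster', worker_num=0,
                               worker_cpu=1, worker_mem=100 * 1024 ** 2)
    async with client:
        cluster_api = await ClusterAPI.create(client._cluster.supervisor_address)
        worker = await cluster_api.request_worker(timeout=5)

        # ... later, if the worker's main pool actor dies, bring it back in place.
        # Concurrent calls for the same address share a single reconstruction task.
        await cluster_api.reconstruct_worker(worker)


asyncio.run(main())
```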
3 changes: 3 additions & 0 deletions mars/deploy/kubernetes/core.py
@@ -171,6 +171,9 @@ async def request_worker(
async def release_worker(self, address: str):
raise NotImplementedError

async def reconstruct_worker(self, address: str):
raise NotImplementedError


class K8SServiceMixin:
@staticmethod
80 changes: 69 additions & 11 deletions mars/deploy/oscar/ray.py
@@ -26,6 +26,8 @@
node_placement_to_address,
process_address_to_placement
)
from ...oscar.backends.ray.pool import RayPoolState
from ...oscar.errors import ReconstructWorkerError
from ...services.cluster.backends.base import register_cluster_backend, AbstractClusterBackend
from ...services import NodeRole
from ...utils import merge_dict, flatten_dict_to_nested_dict
@@ -98,6 +100,9 @@ async def request_worker(
async def release_worker(self, address: str):
return await self._cluster_state_ref.release_worker(address)

async def reconstruct_worker(self, address: str):
return await self._cluster_state_ref.reconstruct_worker(address)

def get_cluster_state_ref(self):
return self._cluster_state_ref

@@ -109,6 +114,8 @@ def __init__(self):
self._pg_counter = itertools.count()
self._worker_count = 0
self._dynamic_created_workers = {}
self._releasing_tasks = {}
self._reconstructing_tasks = {}

async def __post_create__(self):
self._pg_name, _, _ = process_address_to_placement(self.address)
@@ -162,22 +169,72 @@ async def new_worker(self, worker_address, band_to_slot=None):
worker_address, time.time() - start_time)
start_time = time.time()
await start_worker(worker_address, self.address, band_to_slot, config=self._config)
await worker_pool.mark_service_ready.remote()
logger.info('Start services on worker %s succeeds in %.4f seconds.',
worker_address, time.time() - start_time)
return worker_pool

async def release_worker(self, address: str):
await stop_worker(address, self._config)
pool, pg = self._dynamic_created_workers.pop(address)
await pool.actor_pool.remote('stop')
if 'COV_CORE_SOURCE' in os.environ: # pragma: no cover
try:
# must clean up first, or coverage info lost
await pool.cleanup.remote()
except: # noqa: E722 # nosec # pylint: disable=bare-except
pass
ray.kill(pool)
ray.util.remove_placement_group(pg)
task = self._reconstructing_tasks.get(address)
if task is not None:
task.cancel()

task = self._releasing_tasks.get(address)
if task is not None:
logger.info("Waiting for releasing worker %s", address)
return await task

async def _release_worker():
await stop_worker(address, self._config)
pool, pg = self._dynamic_created_workers.pop(address)
await pool.actor_pool.remote('stop')
if 'COV_CORE_SOURCE' in os.environ: # pragma: no cover
try:
# must clean up first, or coverage info lost
await pool.cleanup.remote()
except: # noqa: E722 # nosec # pylint: disable=bare-except
pass
ray.kill(pool)
ray.util.remove_placement_group(pg)

task = asyncio.create_task(_release_worker())
task.add_done_callback(lambda _: self._releasing_tasks.pop(address, None))
self._releasing_tasks[address] = task
return await task

async def reconstruct_worker(self, address: str):
task = self._releasing_tasks.get(address)
if task is not None:
raise ReconstructWorkerError(f"Can't reconstruct releasing worker {address}")

task = self._reconstructing_tasks.get(address)
if task is not None:
logger.info("Waiting for reconstruct worker %s", address)
return await task

async def _reconstruct_worker():
logger.info("Reconstruct worker %s", address)
actor = ray.get_actor(address)
state = await actor.state.remote()
if state == RayPoolState.SERVICE_READY:
logger.info("Worker %s is service ready.")
return

if state == RayPoolState.INIT:
await actor.start.remote()
else:
assert state == RayPoolState.POOL_READY

start_time = time.time()
await start_worker(address, self.address, self._band_to_slot, config=self._config)
await actor.mark_service_ready.remote()
logger.info('Start services on worker %s succeeds in %.4f seconds.',
address, time.time() - start_time)

task = asyncio.create_task(_reconstruct_worker())
task.add_done_callback(lambda _: self._reconstructing_tasks.pop(address, None))
self._reconstructing_tasks[address] = task
return await task


async def new_cluster(cluster_name: str,
@@ -294,6 +351,7 @@ async def start(self):
# start service
await start_supervisor(self.supervisor_address, config=self._config)
logger.info('Start services on supervisor %s succeeds.', self.supervisor_address)
await self._supervisor_pool.mark_service_ready.remote()

worker_pools = await asyncio.gather(
*[self._cluster_backend.new_worker(addr) for addr in worker_addresses])
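Both `release_worker` and `reconstruct_worker` above coalesce concurrent requests for the same worker address: the first caller creates an `asyncio` task and records it in a per-address dict, later callers await that same task, and a done-callback drops the entry when the task finishes. A stripped-down sketch of the pattern, with illustrative names that are not part of the PR:

```python
import asyncio


class RequestCoalescer:
    """Run at most one in-flight operation per key; concurrent callers share its result."""

    def __init__(self):
        self._tasks = {}

    async def run(self, key, coro_factory):
        task = self._tasks.get(key)
        if task is not None:
            # Someone is already doing this work; just wait for the same task.
            return await task

        task = asyncio.create_task(coro_factory())
        # Drop the bookkeeping entry once the task finishes, whether it succeeded or failed.
        task.add_done_callback(lambda _: self._tasks.pop(key, None))
        self._tasks[key] = task
        return await task


async def main():
    coalescer = RequestCoalescer()

    async def slow_release():
        await asyncio.sleep(0.1)
        return 'released'

    # Two concurrent "release" requests for the same worker share a single task.
    results = await asyncio.gather(
        coalescer.run('worker-1', slow_release),
        coalescer.run('worker-1', slow_release),
    )
    assert results == ['released', 'released']


asyncio.run(main())
```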
2 changes: 0 additions & 2 deletions mars/deploy/oscar/tests/fault_injection_config.yml
@@ -1,5 +1,3 @@
"@inherits": '@default'
third_party_modules:
- mars.services.tests.fault_injection_patch
subtask:
subtask_processor_cls: mars.services.subtask.worker.tests.FaultInjectionSubtaskProcessor
@@ -1,7 +1,8 @@
"@inherits": '@default'
third_party_modules:
- mars.services.tests.fault_injection_patch
subtask:
subtask_processor_cls: mars.services.subtask.worker.tests.FaultInjectionSubtaskProcessor
scheduling:
subtask_max_retries: 2
storage:
# shared-memory38 may lose object if the process crash after put success.
backends: [plasma]
53 changes: 51 additions & 2 deletions mars/deploy/oscar/tests/test_fault_injection.py
@@ -15,10 +15,12 @@
import os
import pytest
import numpy as np
import pandas as pd

from .... import dataframe as md
from .... import tensor as mt
from ....remote import spawn
from ....oscar.errors import ServerClosed
from ....remote import spawn
from ....services.tests.fault_injection_manager import (
AbstractFaultInjectionManager,
ExtraConfigKey,
@@ -27,6 +29,7 @@
FaultPosition,
FaultType,
)
from ....tensor.base.psrs import PSRSConcatPivot
from ..local import new_cluster
from ..session import get_default_async_session

@@ -48,15 +51,21 @@ async def fault_cluster(request):
yield client


async def create_fault_injection_manager(session_id, address, fault_count, fault_type):
async def create_fault_injection_manager(session_id, address, fault_count, fault_type, fault_op_types=None):
class FaultInjectionManager(AbstractFaultInjectionManager):
def __init__(self):
self._fault_count = fault_count

def set_fault_count(self, count):
self._fault_count = count

def get_fault_count(self):
return self._fault_count

def get_fault(self, pos: FaultPosition, ctx=None) -> FaultType:
# Check op types if fault_op_types provided.
if fault_op_types and type(ctx.get('operand')) not in fault_op_types:
return FaultType.NoFault
if self._fault_count.get(pos, 0) > 0:
self._fault_count[pos] -= 1
return fault_type
@@ -138,6 +147,46 @@ async def test_rerun_subtask(fault_cluster, fault_config):
await info


@pytest.mark.parametrize('fault_cluster',
[{'config': RERUN_SUBTASK_CONFIG_FILE}],
indirect=True)
@pytest.mark.parametrize('fault_config',
[[FaultType.Exception, {FaultPosition.ON_EXECUTE_OPERAND: 1},
[PSRSConcatPivot]],
[FaultType.ProcessExit, {FaultPosition.ON_EXECUTE_OPERAND: 1},
[PSRSConcatPivot]]])
@pytest.mark.asyncio
async def test_rerun_subtask_describe(fault_cluster, fault_config):
fault_type, fault_count, fault_op_types = fault_config
name = await create_fault_injection_manager(
session_id=fault_cluster.session.session_id,
address=fault_cluster.session.address,
fault_count=fault_count,
fault_type=fault_type,
fault_op_types=fault_op_types)
extra_config = {ExtraConfigKey.FAULT_INJECTION_MANAGER_NAME: name}
session = get_default_async_session()

s = np.random.RandomState(0)
raw = pd.DataFrame(s.rand(100, 4), columns=list('abcd'))
df = md.DataFrame(raw, chunk_size=30)

r = df.describe()
info = await session.execute(r, extra_config=extra_config)
await info
assert info.result() is None
assert info.exception() is None
assert info.progress() == 1
res = await session.fetch(r)
pd.testing.assert_frame_equal(res, raw.describe())

fault_injection_manager = await session.get_remote_object(
fault_cluster.session.session_id, name)
remain_fault_count = await fault_injection_manager.get_fault_count()
for key in fault_count:
assert remain_fault_count[key] == 0


@pytest.mark.parametrize('fault_cluster',
[{'config': RERUN_SUBTASK_CONFIG_FILE}],
indirect=True)
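The new `fault_op_types` filter lets `test_rerun_subtask_describe` inject its fault only while a `PSRSConcatPivot` operand is executing, so the subtask-rerun path triggered by `df.describe()` is hit deterministically; the test then reads the counters back through `get_fault_count` to assert that every configured fault was actually consumed.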
106 changes: 103 additions & 3 deletions mars/deploy/oscar/tests/test_ray.py
@@ -23,11 +23,14 @@
from .... import oscar as mo
from .... import tensor as mt
from .... import dataframe as md
from ....oscar.backends.ray.utils import process_address_to_placement, process_placement_to_address, kill_and_wait
from ....oscar.errors import ReconstructWorkerError
from ....serialization.ray import register_ray_serializers
from ....services.cluster import ClusterAPI
from ....services.scheduling.supervisor.autoscale import AutoscalerActor
from ....tests.core import require_ray
from ....tests.core import require_ray, mock
from ....utils import lazy_import
from ..ray import new_cluster, _load_config
from ..ray import new_cluster, _load_config, ClusterStateActor
from ..session import get_default_session, new_session
from ..tests import test_local
from .modules.utils import ( # noqa: F401; pylint: disable=unused-variable
@@ -246,8 +249,105 @@ async def test_request_worker(ray_large_cluster):
workers = await asyncio.gather(*[cluster_state_ref.request_worker(timeout=5) for _ in range(2)])
assert all(worker is not None for worker in workers)
assert not await cluster_state_ref.request_worker(timeout=5)
await asyncio.gather(*[cluster_state_ref.release_worker(worker) for worker in workers])
release_workers = [cluster_state_ref.release_worker(worker) for worker in workers]
# Duplicate release worker requests should be handled.
release_workers.extend([cluster_state_ref.release_worker(worker) for worker in workers])
await asyncio.gather(*release_workers)
assert await cluster_state_ref.request_worker(timeout=5)


@pytest.mark.parametrize('ray_large_cluster', [{'num_nodes': 3, 'num_cpus': 1}], indirect=True)
@require_ray
@pytest.mark.asyncio
async def test_reconstruct_worker(ray_large_cluster):
worker_cpu, worker_mem = 1, 100 * 1024 ** 2
client = await new_cluster('test_cluster', worker_num=0, worker_cpu=worker_cpu, worker_mem=worker_mem)
async with client:
cluster_api = await ClusterAPI.create(client._cluster.supervisor_address)
worker = await cluster_api.request_worker(timeout=5)
pg_name, bundle_index, process_index = process_address_to_placement(worker)
worker_sub_pool = process_placement_to_address(pg_name, bundle_index, process_index + 1)

worker_actor = ray.get_actor(worker)
worker_pid = await worker_actor.getpid.remote()
# the worker pool actors should be destroyed even if we hold references obtained via ray.get_actor.
worker_sub_pool_actor = ray.get_actor(worker_sub_pool)
worker_sub_pool_pid = await worker_sub_pool_actor.getpid.remote()

# kill worker main pool
await kill_and_wait(ray.get_actor(worker))

# duplicate reconstruct_worker requests should be handled.
await asyncio.gather(cluster_api.reconstruct_worker(worker),
cluster_api.reconstruct_worker(worker))
worker_actor = ray.get_actor(worker)
new_worker_pid = await worker_actor.getpid.remote()
worker_sub_pool_actor = ray.get_actor(worker_sub_pool)
new_worker_sub_pool_pid = await worker_sub_pool_actor.getpid.remote()
assert new_worker_pid != worker_pid
assert new_worker_sub_pool_pid != worker_sub_pool_pid

# computation should succeed after the worker is reconstructed.
raw = np.random.RandomState(0).rand(10, 5)
a = mt.tensor(raw, chunk_size=5).sum(axis=1)
b = a.execute(show_progress=False)
assert b is a
result = a.fetch()
np.testing.assert_array_equal(result, raw.sum(axis=1))


@require_ray
@pytest.mark.asyncio
@mock.patch('mars.deploy.oscar.ray.stop_worker')
async def test_reconstruct_worker_during_releasing_worker(fake_stop_worker):
stop_worker = asyncio.Event()
lock = asyncio.Event()

async def _stop_worker(*args):
stop_worker.set()
await lock.wait()

fake_stop_worker.side_effect = _stop_worker
cluster_state = ClusterStateActor()
release_task = asyncio.create_task(cluster_state.release_worker('abc'))
await stop_worker.wait()
with pytest.raises(ReconstructWorkerError, match='releasing'):
await cluster_state.reconstruct_worker('abc')
release_task.cancel()


@require_ray
@pytest.mark.asyncio
@mock.patch('mars.deploy.oscar.ray.stop_worker')
@mock.patch('ray.get_actor')
async def test_release_worker_during_reconstructing_worker(fake_get_actor, fake_stop_worker):
get_actor = asyncio.Event()
lock = asyncio.Event()

class FakeActorMethod:
async def remote(self):
get_actor.set()
await lock.wait()

class FakeActor:
state = FakeActorMethod()

def _get_actor(*args):
return FakeActor

async def _stop_worker(*args):
await lock.wait()

fake_get_actor.side_effect = _get_actor
fake_stop_worker.side_effect = _stop_worker
cluster_state = ClusterStateActor()
reconstruct_task = asyncio.create_task(cluster_state.reconstruct_worker('abc'))
await get_actor.wait()
release_task = asyncio.create_task(cluster_state.release_worker('abc'))
with pytest.raises(asyncio.CancelledError):
await reconstruct_task
release_task.cancel()


@pytest.mark.parametrize('ray_large_cluster', [{'num_nodes': 4, 'num_cpus': 2}], indirect=True)
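Taken together, the two mocked tests above pin down how the per-address task maps interact: reconstructing a worker that is still being released fails fast with `ReconstructWorkerError`, while releasing a worker cancels any in-flight reconstruction task for that address.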