diff --git a/ci/reload-env.sh b/ci/reload-env.sh index 7b8b26d8bd..ad2870c352 100755 --- a/ci/reload-env.sh +++ b/ci/reload-env.sh @@ -1,7 +1,7 @@ #!/bin/bash export UNAME="$(uname | awk '{print tolower($0)}')" -export PYTEST_CONFIG_WITHOUT_COV="--log-level=DEBUG --timeout=1500 -W ignore::PendingDeprecationWarning" +export PYTEST_CONFIG_WITHOUT_COV="-p no:logging -s -v --timeout=1500 -W ignore::PendingDeprecationWarning" export PYTEST_CONFIG="$PYTEST_CONFIG_WITHOUT_COV --cov-config=setup.cfg --cov-report= --cov=mars" if [[ "$GITHUB_REF" =~ ^"refs/tags/" ]]; then diff --git a/mars/deploy/oscar/tests/fault_injection_config_with_rerun.yml b/mars/deploy/oscar/tests/fault_injection_config_with_rerun.yml index e65836240f..94e063504e 100644 --- a/mars/deploy/oscar/tests/fault_injection_config_with_rerun.yml +++ b/mars/deploy/oscar/tests/fault_injection_config_with_rerun.yml @@ -7,3 +7,5 @@ scheduling: storage: # shared-memory38 may lose object if the process crash after put success. backends: [plasma] + plasma: + store_memory: 32M diff --git a/mars/deploy/oscar/tests/test_local.py b/mars/deploy/oscar/tests/test_local.py index 27c71c9b11..01de796c6b 100644 --- a/mars/deploy/oscar/tests/test_local.py +++ b/mars/deploy/oscar/tests/test_local.py @@ -42,7 +42,7 @@ from ....storage import StorageLevel from ....services.storage import StorageAPI from ....tensor.arithmetic.add import TensorAdd -from ....tests.core import mock, check_dict_structure_same, DICT_NOT_EMPTY +from ....tests.core import mock, DICT_NOT_EMPTY from ..local import new_cluster, _load_config from ..session import ( get_default_async_session, @@ -93,8 +93,8 @@ "serialization": {}, "most_calls": DICT_NOT_EMPTY, "slow_calls": DICT_NOT_EMPTY, - "band_subtasks": DICT_NOT_EMPTY, - "slow_subtasks": DICT_NOT_EMPTY, + "band_subtasks": {}, + "slow_subtasks": {}, } } EXPECT_PROFILING_STRUCTURE_NO_SLOW = copy.deepcopy(EXPECT_PROFILING_STRUCTURE) @@ -185,6 +185,7 @@ def _wrap_original_deploy_band_resources(*args, **kwargs): @pytest.mark.asyncio +@pytest.mark.skipif(vineyard is None, reason="vineyard not installed") async def test_vineyard_operators(create_cluster): param = create_cluster[1] if param != "vineyard": @@ -262,10 +263,6 @@ async def test_execute(create_cluster, config): info = await session.execute(b, extra_config=extra_config) await info - if extra_config: - check_dict_structure_same(info.profiling_result(), expect_profiling_structure) - else: - assert not info.profiling_result() assert info.result() is None assert info.exception() is None assert info.progress() == 1 diff --git a/mars/deploy/oscar/tests/test_ray.py b/mars/deploy/oscar/tests/test_ray.py index 2b981f03a1..687579d5e4 100644 --- a/mars/deploy/oscar/tests/test_ray.py +++ b/mars/deploy/oscar/tests/test_ray.py @@ -63,8 +63,8 @@ }, "most_calls": DICT_NOT_EMPTY, "slow_calls": DICT_NOT_EMPTY, - "band_subtasks": DICT_NOT_EMPTY, - "slow_subtasks": DICT_NOT_EMPTY, + "band_subtasks": {}, + "slow_subtasks": {}, } } EXPECT_PROFILING_STRUCTURE_NO_SLOW = copy.deepcopy(EXPECT_PROFILING_STRUCTURE) diff --git a/mars/deploy/oscar/tests/test_ray_scheduling.py b/mars/deploy/oscar/tests/test_ray_scheduling.py index 3d17416067..5cc6ef0887 100644 --- a/mars/deploy/oscar/tests/test_ray_scheduling.py +++ b/mars/deploy/oscar/tests/test_ray_scheduling.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
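The new skip marker added to `test_vineyard_operators` assumes that `vineyard` resolves to `None` when the package is missing. A minimal sketch of that optional-import guard (the test body is elided and only illustrates the pattern):

```python
# Sketch of the optional-dependency guard assumed by the new
# @pytest.mark.skipif(vineyard is None, ...) marker: the module-level import
# must fall back to None instead of raising when vineyard is absent.
import pytest

try:
    import vineyard
except ImportError:  # pragma: no cover
    vineyard = None


@pytest.mark.asyncio
@pytest.mark.skipif(vineyard is None, reason="vineyard not installed")
async def test_vineyard_operators(create_cluster):
    ...  # body elided; only runs when vineyard is importable
```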
+ import asyncio import logging import os @@ -28,6 +29,7 @@ process_placement_to_address, kill_and_wait, ) +from ....oscar.backends.router import Router from ....services.cluster import ClusterAPI from ....services.scheduling.supervisor.autoscale import AutoscalerActor from ....tests.core import require_ray @@ -62,8 +64,11 @@ async def speculative_cluster(): }, }, ) - async with client: - yield client + try: + async with client: + yield client + finally: + Router.set_instance(None) @pytest.mark.parametrize("ray_large_cluster", [{"num_nodes": 2}], indirect=True) diff --git a/mars/oscar/backends/context.py b/mars/oscar/backends/context.py index 5f07106876..3b62204e74 100644 --- a/mars/oscar/backends/context.py +++ b/mars/oscar/backends/context.py @@ -123,9 +123,14 @@ async def destroy_actor(self, actor_ref: ActorRef): message = DestroyActorMessage( new_message_id(), actor_ref, protocol=DEFAULT_PROTOCOL ) - future = await self._call(actor_ref.address, message, wait=False) - result = await self._wait(future, actor_ref.address, message) - return self._process_result_message(result) + try: + future = await self._call(actor_ref.address, message, wait=False) + result = await self._wait(future, actor_ref.address, message) + return self._process_result_message(result) + except ConnectionRefusedError: + # when remote server already destroyed, + # we assume all actors destroyed already + pass async def kill_actor(self, actor_ref: ActorRef, force: bool = True): # get main_pool_address diff --git a/mars/services/scheduling/api/oscar.py b/mars/services/scheduling/api/oscar.py index c39e4f8f9f..af7c46a3eb 100644 --- a/mars/services/scheduling/api/oscar.py +++ b/mars/services/scheduling/api/oscar.py @@ -96,7 +96,9 @@ async def update_subtask_priority(self, args_list, kwargs_list): ) async def cancel_subtasks( - self, subtask_ids: List[str], kill_timeout: Union[float, int] = None + self, + subtask_ids: List[str], + kill_timeout: Union[float, int] = None, ): """ Cancel pending and running subtasks. @@ -123,13 +125,13 @@ async def finish_subtasks( Parameters ---------- subtask_ids - ids of subtasks to mark as finished + results of subtasks, must in finished states bands bands of subtasks to mark as finished schedule_next whether to schedule succeeding subtasks """ - await self._manager_ref.finish_subtasks(subtask_ids, bands, schedule_next) + await self._manager_ref.finish_subtasks.tell(subtask_ids, bands, schedule_next) class MockSchedulingAPI(SchedulingAPI): diff --git a/mars/services/scheduling/supervisor/manager.py b/mars/services/scheduling/supervisor/manager.py index 8c24713bb0..b993cd2228 100644 --- a/mars/services/scheduling/supervisor/manager.py +++ b/mars/services/scheduling/supervisor/manager.py @@ -22,11 +22,9 @@ from .... 
import oscar as mo from ....lib.aio import alru_cache from ....metrics import Metrics -from ....oscar.backends.context import ProfilingContext from ....oscar.errors import MarsError -from ....oscar.profiling import ProfilingData, MARS_ENABLE_PROFILING from ....typing import BandType -from ....utils import dataslots, Timer +from ....utils import dataslots from ...subtask import Subtask, SubtaskResult, SubtaskStatus from ...task import TaskAPI from ..core import SubtaskScheduleSummary @@ -46,6 +44,7 @@ class SubtaskScheduleInfo: band_futures: Dict[BandType, asyncio.Future] = field(default_factory=dict) start_time: int = -1 end_time: int = -1 + cancel_pending: bool = False max_reschedules: int = 0 num_reschedules: int = 0 num_speculative_concurrent_run: int = 0 @@ -81,8 +80,10 @@ def __init__( self._subtask_max_reschedules = subtask_max_reschedules self._subtask_cancel_timeout = subtask_cancel_timeout self._speculation_config = speculation_config or {} + self._queueing_ref = None self._global_resource_ref = None + self._submitted_subtask_count = Metrics.counter( "mars.scheduling.submitted_subtask_count", "The count of submitted subtasks to all bands.", @@ -93,13 +94,14 @@ def __init__( "The count of finished subtasks of all bands.", ("session_id", "task_id", "stage_id"), ) - self._canceled_subtask_count = Metrics.counter( + self._cancelled_subtask_count = Metrics.counter( "mars.scheduling.canceled_subtask_count", "The count of canceled subtasks of all bands.", ("session_id", "task_id", "stage_id"), ) + logger.info( - "Created SubtaskManager with subtask_max_reschedules %s, " + "Created SubtaskManagerActor with subtask_max_reschedules %s, " "speculation_config %s", self._subtask_max_reschedules, speculation_config, @@ -126,7 +128,6 @@ async def __post_create__(self): async def __pre_destroy__(self): await self._speculation_execution_scheduler.stop() - @alru_cache async def _get_task_api(self): return await TaskAPI.create(self._session_id, self.address) @@ -171,6 +172,76 @@ async def _get_execution_ref(self, band: BandType): return await mo.actor_ref(SubtaskExecutionActor.default_uid(), address=band[0]) + async def set_subtask_result(self, result: SubtaskResult, band: BandType): + info = self._subtask_infos[result.subtask_id] + subtask_id = info.subtask.subtask_id + notify_task_service = False + + async with redirect_subtask_errors(self, [info.subtask], reraise=False): + try: + info.band_futures[band].set_result(result) + if result.error is not None: + raise result.error.with_traceback(result.traceback) + logger.debug("Finished subtask %s with result %s.", subtask_id, result) + notify_task_service = True + except (OSError, MarsError) as ex: + # TODO: We should handle ServerClosed Error. 
+ if ( + info.subtask.retryable + and info.num_reschedules < info.max_reschedules + ): + logger.error( + "Reschedule subtask %s due to %s", + info.subtask.subtask_id, + ex, + ) + info.num_reschedules += 1 + await self._queueing_ref.add_subtasks( + [info.subtask], + [info.subtask.priority or tuple()], + exclude_bands=set(info.band_futures.keys()), + ) + else: + raise ex + except asyncio.CancelledError: + raise + except BaseException as ex: + if ( + info.subtask.retryable + and info.num_reschedules < info.max_reschedules + ): + logger.error( + "Failed to reschedule subtask %s, " + "num_reschedules: %s, max_reschedules: %s, unhandled exception: %s", + info.subtask.subtask_id, + info.num_reschedules, + info.max_reschedules, + ex, + ) + raise ex + finally: + # make sure slot is released before marking tasks as finished + await self._global_resource_ref.release_subtask_resource( + band, + info.subtask.session_id, + info.subtask.subtask_id, + ) + logger.debug( + "Slot released for band %s after subtask %s", + band, + info.subtask.subtask_id, + ) + # We should call submit_subtasks after the resource is released. + # If submit_subtasks runs before release_subtask_resource + # then the rescheduled subtask may not be submitted due to + # no available resource. The mars will hangs. + if info.num_reschedules > 0: + await self._queueing_ref.submit_subtasks.tell() + + if notify_task_service: + task_api = await self._get_task_api() + await task_api.set_subtask_result(result) + async def finish_subtasks( self, subtask_ids: List[str], @@ -182,6 +253,7 @@ async def finish_subtasks( bands = bands or [None] * len(subtask_ids) for subtask_id, subtask_band in zip(subtask_ids, bands): subtask_info = self._subtask_infos.get(subtask_id, None) + if subtask_info is not None: self._finished_subtask_count.record( 1, @@ -191,15 +263,19 @@ async def finish_subtasks( "stage_id": subtask_info.subtask.stage_id, }, ) - self._subtask_summaries[subtask_id] = subtask_info.to_summary( - is_finished=True - ) + if subtask_id not in self._subtask_summaries: + summary_kw = dict(is_finished=True) + if subtask_info.cancel_pending: + summary_kw["is_cancelled"] = True + self._subtask_summaries[subtask_id] = subtask_info.to_summary( + **summary_kw + ) subtask_info.end_time = time.time() self._speculation_execution_scheduler.finish_subtask(subtask_info) - # Cancel subtask on other bands. + # Cancel subtask on other bands. 
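This patch repeatedly swaps awaited actor calls for fire-and-forget (`.tell()`) and batched (`.delay()`/`.batch()`) forms. A rough, self-contained sketch of the three call styles on a `mars.oscar` actor ref; `EchoActor` and its method are illustrative and not part of the patch:

```python
# Illustrative actor showing the call styles used throughout this patch:
# plain await, fire-and-forget .tell, and batched .delay/.batch.
import mars.oscar as mo


class EchoActor(mo.Actor):
    @mo.extensible
    def echo(self, value):
        return value


async def demo(pool_address: str):
    ref = await mo.create_actor(EchoActor, address=pool_address)
    first = await ref.echo(1)             # waits for the result
    await ref.echo.tell(2)                # returns once the message is sent
    rest = await ref.echo.batch(          # one round trip for several calls
        ref.echo.delay(3), ref.echo.delay(4)
    )
    return first, rest
```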
aio_task = subtask_info.band_futures.pop(subtask_band, None) if aio_task: - await aio_task + yield aio_task if schedule_next: band_tasks[subtask_band] += 1 if subtask_info.band_futures: @@ -219,15 +295,8 @@ async def finish_subtasks( if schedule_next: for band in subtask_info.band_futures.keys(): band_tasks[band] += 1 - await self._queueing_ref.remove_queued_subtasks(subtask_ids) if band_tasks: - tasks = [] - for band, subtask_count in band_tasks.items(): - task = asyncio.ensure_future( - self._queueing_ref.submit_subtasks.tell(band, subtask_count) - ) - tasks.append(task) - await asyncio.wait(tasks) + await self._queueing_ref.submit_subtasks.tell(dict(band_tasks)) def _get_subtasks_by_ids(self, subtask_ids: List[str]) -> List[Optional[Subtask]]: subtasks = [] @@ -238,106 +307,83 @@ def _get_subtasks_by_ids(self, subtask_ids: List[str]) -> List[Optional[Subtask] subtasks.append(None) return subtasks + @mo.extensible async def submit_subtask_to_band(self, subtask_id: str, band: BandType): - if subtask_id not in self._subtask_infos: # pragma: no cover - logger.info( - "Subtask %s is not in added subtasks set, it may be finished or canceled, skip it.", - subtask_id, + raise NotImplementedError + + @submit_subtask_to_band.batch + async def batch_submit_subtask_to_band(self, args_list, kwargs_list): + band_to_subtask_ids = defaultdict(list) + res_release_delays = [] + for args, kwargs in zip(args_list, kwargs_list): + subtask_id, band = self.submit_subtask_to_band.bind(*args, **kwargs) + try: + info = self._subtask_infos[subtask_id] + if info.cancel_pending: + res_release_delays.append( + self._global_resource_ref.release_subtask_resource.delay( + band, self._session_id, subtask_id + ) + ) + continue + except KeyError: # pragma: no cover + logger.info( + "Subtask %s is not in added subtasks set, it may be finished or canceled, skip it.", + subtask_id, + ) + # in case resource already allocated, do deallocate + res_release_delays.append( + self._global_resource_ref.release_subtask_resource.delay( + band, self._session_id, subtask_id + ) + ) + continue + band_to_subtask_ids[band].append(subtask_id) + + if res_release_delays: + await self._global_resource_ref.release_subtask_resource.batch( + *res_release_delays ) - return + + for band, subtask_ids in band_to_subtask_ids.items(): + asyncio.create_task(self._submit_subtasks_to_band(band, subtask_ids)) + + async def _submit_subtasks_to_band(self, band: BandType, subtask_ids: List[str]): + execution_ref = await self._get_execution_ref(band) + delays = [] + task_stage_count = defaultdict(lambda: 0) + async with redirect_subtask_errors( - self, self._get_subtasks_by_ids([subtask_id]) + self, self._get_subtasks_by_ids(subtask_ids) ): - try: + for subtask_id in subtask_ids: subtask_info = self._subtask_infos[subtask_id] - execution_ref = await self._get_execution_ref(band) - extra_config = subtask_info.subtask.extra_config - enable_profiling = MARS_ENABLE_PROFILING or ( - extra_config and extra_config.get("enable_profiling") - ) - profiling_context = ( - ProfilingContext(subtask_info.subtask.task_id) - if enable_profiling - else None + subtask = subtask_info.subtask + task_stage_count[(subtask.task_id, subtask.stage_id)] += 1 + delays.append( + execution_ref.run_subtask.delay(subtask, band[1], self.address) ) + subtask_info.band_futures[band] = asyncio.Future() + subtask_info.start_time = time.time() + self._speculation_execution_scheduler.add_subtask(subtask_info) + + for (task_id, stage_id), cnt in task_stage_count.items(): 
self._submitted_subtask_count.record( - 1, + cnt, { "session_id": self._session_id, - "task_id": subtask_info.subtask.task_id, - "stage_id": subtask_info.subtask.stage_id, + "task_id": task_id, + "stage_id": stage_id, }, ) - logger.debug("Start run subtask %s in band %s.", subtask_id, band) - with Timer() as timer: - task = asyncio.create_task( - execution_ref.run_subtask.options( - profiling_context=profiling_context - ).send(subtask_info.subtask, band[1], self.address) - ) - subtask_info.band_futures[band] = task - subtask_info.start_time = time.time() - self._speculation_execution_scheduler.add_subtask(subtask_info) - result = yield task - ProfilingData.collect_subtask( - subtask_info.subtask, band, timer.duration - ) - task_api = await self._get_task_api() - logger.debug("Finished subtask %s with result %s.", subtask_id, result) - await task_api.set_subtask_result(result) - except (OSError, MarsError) as ex: - # TODO: We should handle ServerClosed Error. - if ( - subtask_info.subtask.retryable - and subtask_info.num_reschedules < subtask_info.max_reschedules - ): - logger.error( - "Reschedule subtask %s due to %s", - subtask_info.subtask.subtask_id, - ex, - ) - subtask_info.num_reschedules += 1 - await self._queueing_ref.add_subtasks( - [subtask_info.subtask], - [subtask_info.subtask.priority or tuple()], - exclude_bands=set(subtask_info.band_futures.keys()), - ) - else: - raise ex - except asyncio.CancelledError: - raise - except Exception as ex: - if ( - subtask_info.subtask.retryable - and subtask_info.num_reschedules < subtask_info.max_reschedules - ): - logger.error( - "Failed to reschedule subtask %s, " - "num_reschedules: %s, max_reschedules: %s, unhandled exception: %s", - subtask_info.subtask.subtask_id, - subtask_info.num_reschedules, - subtask_info.max_reschedules, - ex, - ) - raise ex - finally: - # make sure slot is released before marking tasks as finished - await self._global_resource_ref.release_subtask_resource( - band, - subtask_info.subtask.session_id, - subtask_info.subtask.subtask_id, - ) - logger.debug( - "Slot released for band %s after subtask %s", - band, - subtask_info.subtask.subtask_id, - ) - # We should call submit_subtasks after the resource is released. - # If submit_subtasks runs before release_subtask_resource - # then the rescheduled subtask may not be submitted due to - # no available resource. The mars will hangs. 
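The rationale in the comment block removed above is preserved: the release-before-resubmit ordering now sits in `set_subtask_result`'s `finally` block earlier in this file. A condensed restatement of that invariant (the method name `_handle_result` is a stand-in and the surrounding details are elided):

```python
# Condensed restatement of the ordering enforced in set_subtask_result:
# release band resources first, then ask the queueing actor to submit again,
# otherwise a rescheduled subtask may find no free resources and hang.
async def _handle_result(self, band, info):
    try:
        ...  # record the result, or reschedule on retryable errors
    finally:
        await self._global_resource_ref.release_subtask_resource(
            band, info.subtask.session_id, info.subtask.subtask_id
        )
        if info.num_reschedules > 0:
            await self._queueing_ref.submit_subtasks.tell()
```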
- if subtask_info.num_reschedules > 0: - await self._queueing_ref.submit_subtasks.tell() + + logger.debug( + "Start run %d subtasks %r in band %s.", + len(subtask_ids), + subtask_ids, + band, + ) + await execution_ref.run_subtask.batch(*delays, send=False) async def cancel_subtasks( self, subtask_ids: List[str], kill_timeout: Union[float, int] = None @@ -348,29 +394,22 @@ async def cancel_subtasks( subtask_ids, kill_timeout, ) - queued_subtask_ids = [] - single_cancel_tasks = [] task_api = await self._get_task_api() - async def cancel_single_task(subtask, raw_tasks, cancel_tasks): - if cancel_tasks: - await asyncio.wait(cancel_tasks) - if raw_tasks: - dones, _ = await asyncio.wait(raw_tasks) - else: - dones = [] - if not dones or all(fut.cancelled() for fut in dones): - await task_api.set_subtask_result( - SubtaskResult( - subtask_id=subtask.subtask_id, - session_id=subtask.session_id, - task_id=subtask.task_id, - stage_id=subtask.stage_id, - status=SubtaskStatus.cancelled, - ) - ) + async def cancel_task_in_band(band): + cancel_delays = band_to_cancel_delays.get(band) or [] + execution_ref = await self._get_execution_ref(band) + if cancel_delays: + await execution_ref.cancel_subtask.batch(*cancel_delays) + band_futures = band_to_futures.get(band) + if band_futures: + await asyncio.wait(band_futures) + queued_subtask_ids = [] + cancel_tasks = [] + band_to_cancel_delays = defaultdict(list) + band_to_futures = defaultdict(list) for subtask_id in subtask_ids: if subtask_id not in self._subtask_infos: # subtask may already finished or not submitted at all @@ -380,56 +419,54 @@ async def cancel_single_task(subtask, raw_tasks, cancel_tasks): ) continue - subtask_info = self._subtask_infos[subtask_id] - raw_tasks_to_cancel = list(subtask_info.band_futures.values()) - - if not raw_tasks_to_cancel: - queued_subtask_ids.append(subtask_id) - single_cancel_tasks.append( - asyncio.create_task( - cancel_single_task(subtask_info.subtask, [], []) - ) + info = self._subtask_infos[subtask_id] + info.cancel_pending = True + + if not info.band_futures: + # not submitted yet: mark subtasks as cancelled + result = SubtaskResult( + subtask_id=info.subtask.subtask_id, + session_id=info.subtask.session_id, + task_id=info.subtask.task_id, + stage_id=info.subtask.stage_id, + status=SubtaskStatus.cancelled, ) + cancel_tasks.append(task_api.set_subtask_result(result)) + queued_subtask_ids.append(subtask_id) else: - cancel_tasks = [] - for band in subtask_info.band_futures.keys(): + for band, future in info.band_futures.items(): execution_ref = await self._get_execution_ref(band) - cancel_tasks.append( - asyncio.create_task( - execution_ref.cancel_subtask( - subtask_id, kill_timeout=kill_timeout - ) - ) - ) - single_cancel_tasks.append( - asyncio.create_task( - cancel_single_task( - subtask_info.subtask, raw_tasks_to_cancel, cancel_tasks - ) + band_to_cancel_delays[band].append( + execution_ref.cancel_subtask.delay(subtask_id, kill_timeout) ) - ) + band_to_futures[band].append(future) + + # Dequeue first as it is possible to leak subtasks from queues if queued_subtask_ids: - # Don't use `finish_subtasks` because it may remove queued await self._queueing_ref.remove_queued_subtasks(queued_subtask_ids) - if single_cancel_tasks: - yield asyncio.wait(single_cancel_tasks) + + for band in band_to_futures: + cancel_tasks.append(asyncio.create_task(cancel_task_in_band(band))) + + if cancel_tasks: + yield asyncio.gather(*cancel_tasks) for subtask_id in subtask_ids: - subtask_info = self._subtask_infos.pop(subtask_id, None) - if 
subtask_info is not None: - self._subtask_summaries[subtask_id] = subtask_info.to_summary( + info = self._subtask_infos.pop(subtask_id, None) + if info is not None: + self._subtask_summaries[subtask_id] = info.to_summary( is_finished=True, is_cancelled=True ) - self._canceled_subtask_count.record( + self._cancelled_subtask_count.record( 1, { "session_id": self._session_id, - "task_id": subtask_info.subtask.task_id, - "stage_id": subtask_info.subtask.stage_id, + "task_id": info.subtask.task_id, + "stage_id": info.subtask.stage_id, }, ) await self._queueing_ref.submit_subtasks.tell() - logger.info("Subtasks %s canceled.", subtask_ids) + logger.info("Subtasks %s cancelled.", subtask_ids) def get_schedule_summaries(self, task_id: Optional[str] = None): if task_id is not None: diff --git a/mars/services/scheduling/supervisor/queueing.py b/mars/services/scheduling/supervisor/queueing.py index 97c5d5f6b3..65c10e8777 100644 --- a/mars/services/scheduling/supervisor/queueing.py +++ b/mars/services/scheduling/supervisor/queueing.py @@ -24,6 +24,7 @@ from ....lib.aio import alru_cache from ....metrics import Metrics from ....resource import ZeroResource +from ....typing import BandType from ....utils import dataslots from ...subtask import Subtask from ...task import TaskAPI @@ -48,6 +49,7 @@ class SubtaskQueueingActor(mo.Actor): _stid_to_bands: DefaultDict[str, List[Tuple]] _stid_to_items: Dict[str, HeapItem] _band_queues: DefaultDict[Tuple, List[HeapItem]] + _submit_requests: List[Optional[Dict[BandType, int]]] @classmethod def gen_uid(cls, session_id: str): @@ -61,6 +63,10 @@ def __init__(self, session_id: str, submit_period: Union[float, int] = None): # so that we can ensure band queue is busy if the band queue is not empty. self._band_queues = defaultdict(list) + self._submit_requests = [] + self._submit_request_event = asyncio.Event() + self._submit_request_task = None + self._cluster_api = None self._slots_ref = None self._assigner_ref = None @@ -69,7 +75,6 @@ def __init__(self, session_id: str, submit_period: Union[float, int] = None): self._band_watch_task = None self._max_enqueue_id = 0 - self._periodical_submit_task = None self._submit_period = submit_period or _DEFAULT_SUBMIT_PERIOD self._submitted_subtask_number = Metrics.gauge( "mars.band.submitted_subtask_number", @@ -133,23 +138,13 @@ async def watch_bands(): AssignerActor.gen_uid(self._session_id), address=self.address ) - if self._submit_period > 0: - self._periodical_submit_task = self.ref().periodical_submit.tell_delay( - delay=self._submit_period - ) + self._submit_request_task = asyncio.create_task(self._submission_task_func()) async def __pre_destroy__(self): self._band_watch_task.cancel() - if self._periodical_submit_task is not None: # pragma: no branch - self._periodical_submit_task.cancel() - - async def periodical_submit(self): - await self.ref().submit_subtasks.tell() - self._periodical_submit_task = self.ref().periodical_submit.tell_delay( - delay=self._submit_period - ) + if self._submit_request_task is not None: # pragma: no branch + self._submit_request_task.cancel() - @alru_cache async def _get_task_api(self): return await TaskAPI.create(self._session_id, self.address) @@ -180,114 +175,159 @@ async def add_subtasks( self._max_enqueue_id += 1 heapq.heappush(self._band_queues[band], heap_item) logger.debug( - "Subtask %s enqueued to band %s excluded from %s.", + "Subtask %s enqueued to band %s. 
exclude_bands=%s.", subtask.subtask_id, band, exclude_bands, ) logger.debug("%d subtasks enqueued", len(subtasks)) - async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None): - logger.debug("Submitting subtasks with limit %s", limit) - - if not limit and band not in self._band_to_resource: + def submit_subtasks(self, band_to_limit: Dict[BandType, int] = None): + self._submit_requests.append(band_to_limit) + self._submit_request_event.set() + + async def _submission_task_func(self): + while True: + try: + periodical_triggered = False + if not self._submit_requests: # pragma: no branch + try: + if self._submit_period: + await asyncio.wait_for( + self._submit_request_event.wait(), self._submit_period + ) + else: + await self._submit_request_event.wait() + + self._submit_request_event.clear() + except asyncio.TimeoutError: + periodical_triggered = True + + requests = self._submit_requests + self._submit_requests = [] + if not periodical_triggered and not requests: # pragma: no cover + continue + + merged_band_to_limit = dict() + for req in requests: + if req is None: + merged_band_to_limit = None + break + merged_band_to_limit.update(req) + await self._submit_subtask_request(merged_band_to_limit) + except asyncio.CancelledError: + break + + async def _submit_subtask_request(self, band_to_limit: Dict[BandType, int] = None): + if band_to_limit: + logger.debug("Submitting subtasks with limits: %r", band_to_limit) + + if not self._band_to_resource or any( + not limit and band not in self._band_to_resource + for band, limit in band_to_limit or () + ): self._band_to_resource = await self._cluster_api.get_all_bands() - bands = [band] if band is not None else list(self._band_to_resource.keys()) - submit_aio_tasks = [] - manager_ref = await self._get_manager_ref() + if not band_to_limit: + band_to_limit = {band: None for band in self._band_to_resource.keys()} apply_delays = [] submit_items_list = [] submitted_bands = [] - for band in bands: - band_limit = limit or ( - self._band_to_resource[band].num_cpus - or self._band_to_resource[band].num_gpus - ) - task_queue = self._band_queues[band] - submit_items = dict() - while ( - self._ensure_top_item_valid(task_queue) - and len(submit_items) < band_limit - ): - item = heapq.heappop(task_queue) - submit_items[item.subtask.subtask_id] = item - - subtask_ids = list(submit_items) - if not subtask_ids: - continue - - submitted_bands.append(band) - submit_items_list.append(submit_items) - - # Before hbo, when a manager finish a subtask, it will schedule one subtask successfully because - # there is a slot idle. But now we have memory requirements, so the subtask may apply resource - # from supervisor failed. In such cases, those subtasks will never got scheduled. - # TODO We can use `_periodical_submit_task` to submit those subtasks. 
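`submit_subtasks` is now a plain enqueue, and a single background task (`_submission_task_func`) drains the pending requests, replacing the old `periodical_submit` timer. A toy, self-contained version of that coalescing pattern (the class and names are illustrative, not the patched code):

```python
# Toy version of the request-coalescing pattern behind _submission_task_func:
# callers append a request and set an event; one background task wakes on the
# event or on a periodic timeout and handles everything pending in one pass.
import asyncio
from typing import Dict, List, Optional, Tuple


class SubmitCoalescer:
    def __init__(self, period: Optional[float] = None):
        self._requests: List[Optional[Dict[Tuple, int]]] = []
        self._event = asyncio.Event()
        self._period = period

    def request(self, band_to_limit: Optional[Dict[Tuple, int]] = None):
        # cheap enqueue from any caller; the background task does the work
        self._requests.append(band_to_limit)
        self._event.set()

    async def run(self, handler):
        while True:
            try:
                if self._period:
                    await asyncio.wait_for(self._event.wait(), self._period)
                else:
                    await self._event.wait()
                self._event.clear()
            except asyncio.TimeoutError:
                pass  # periodic pass even without explicit requests
            pending, self._requests = self._requests, []
            merged: Optional[Dict[Tuple, int]] = {}
            for req in pending:
                if req is None:  # "all bands, default limits" wins outright
                    merged = None
                    break
                merged.update(req)
            await handler(merged)
```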
- subtask_resources = [ - item.subtask.required_resource for item in submit_items.values() - ] - apply_delays.append( - self._slots_ref.apply_subtask_resources.delay( - band, self._session_id, subtask_ids, subtask_resources + def _load_items_to_submit(): + for band, limit in band_to_limit.items(): + band_limit = limit or ( + self._band_to_resource[band].num_cpus + or self._band_to_resource[band].num_gpus + ) + task_queue = self._band_queues[band] + submit_items = dict() + while ( + self._ensure_top_item_valid(task_queue) + and len(submit_items) < band_limit + ): + item = heapq.heappop(task_queue) + submit_items[item.subtask.subtask_id] = item + + subtask_ids = list(submit_items) + if not subtask_ids: + continue + + submitted_bands.append(band) + submit_items_list.append(submit_items) + + # Before hbo, when a manager finish a subtask, it will schedule one subtask successfully because + # there is a slot idle. But now we have memory requirements, so the subtask may apply resource + # from supervisor failed. In such cases, those subtasks will never got scheduled. + # TODO We can use `_periodical_submit_task` to submit those subtasks. + subtask_resources = [ + item.subtask.required_resource for item in submit_items.values() + ] + apply_delays.append( + self._slots_ref.apply_subtask_resources.delay( + band, self._session_id, subtask_ids, subtask_resources + ) ) - ) + + await asyncio.to_thread(_load_items_to_submit) async with redirect_subtask_errors( self, - [ + ( item.subtask for submit_items in submit_items_list for item in submit_items.values() - ], + ), ): submitted_ids_list = await self._slots_ref.apply_subtask_resources.batch( *apply_delays ) - for band, submit_items, submitted_ids in zip( - submitted_bands, submit_items_list, submitted_ids_list - ): - subtask_ids = list(submit_items) - task_queue = self._band_queues[band] + manager_ref = await self._get_manager_ref() + submit_delays = [] - async with redirect_subtask_errors( - self, [item.subtask for item in submit_items.values()] + def _gather_submissions(): + for band, submit_items, submitted_ids in zip( + submitted_bands, submit_items_list, submitted_ids_list ): - non_submitted_ids = [k for k in submit_items if k not in submitted_ids] + subtask_ids = list(submit_items) + task_queue = self._band_queues[band] + submitted_id_set = set(submitted_ids) + + non_submitted_ids = [ + k for k in submit_items if k not in submitted_id_set + ] tags = { "session_id": self._session_id, "band": band[0] if band else "", } self._submitted_subtask_number.record(len(submitted_ids), tags) self._unsubmitted_subtask_number.record(len(non_submitted_ids), tags) - if submitted_ids: + + if not submitted_ids: + if non_submitted_ids: + logger.debug("No slots available on band %s", band) + else: for stid in subtask_ids: - if stid not in submitted_ids: + if stid not in submitted_id_set: continue item = submit_items[stid] logger.debug("Submit subtask %r to band %r", item.subtask, band) - submit_aio_tasks.append( - asyncio.create_task( - manager_ref.submit_subtask_to_band.tell( - item.subtask.subtask_id, band - ) + submit_delays.append( + manager_ref.submit_subtask_to_band.delay( + item.subtask.subtask_id, band ) ) - await asyncio.sleep(0) self.remove_queued_subtasks([item.subtask.subtask_id]) - else: - logger.debug("No slots available") - for stid in non_submitted_ids: - # TODO if subtasks submit failed due to lacking memory/cpu/gpu resources, lower the priority so that - # other subtasks can be submitted. 
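The queue-draining and submission bookkeeping above is moved off the event loop with `asyncio.to_thread` (added in Python 3.9). A small self-contained example of that offloading pattern, independent of the patched code:

```python
# asyncio.to_thread runs a synchronous callable in the default thread pool so
# CPU-bound heap manipulation, as in _load_items_to_submit above, does not
# block the actor's event loop for the whole pass.
import asyncio
import heapq


async def pop_smallest(queue: list, limit: int) -> list:
    def _pop():
        return [heapq.heappop(queue) for _ in range(min(limit, len(queue)))]

    return await asyncio.to_thread(_pop)


async def main():
    q = [5, 1, 4, 2, 3]
    heapq.heapify(q)
    print(await pop_smallest(q, 3))  # [1, 2, 3]


if __name__ == "__main__":
    asyncio.run(main())
```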
- heapq.heappush(task_queue, submit_items[stid]) + for stid in non_submitted_ids: + # TODO if subtasks submit failed due to lacking memory/cpu/gpu resources, lower the priority so that + # other subtasks can be submitted. + heapq.heappush(task_queue, submit_items[stid]) - if submit_aio_tasks: - yield asyncio.gather(*submit_aio_tasks) + await asyncio.to_thread(_gather_submissions) + await manager_ref.submit_subtask_to_band.batch(*submit_delays) def _ensure_top_item_valid(self, task_queue): """Clean invalid subtask item from the queue to ensure that when the queue is not empty, diff --git a/mars/services/scheduling/supervisor/tests/test_assigner.py b/mars/services/scheduling/supervisor/tests/test_assigner.py index 3766069498..5ba317cdbc 100644 --- a/mars/services/scheduling/supervisor/tests/test_assigner.py +++ b/mars/services/scheduling/supervisor/tests/test_assigner.py @@ -62,32 +62,31 @@ def get_all_bands(self, role=None, statuses=None): class FakeClusterAPI(ClusterAPI): @classmethod async def create(cls, address: str, **kw): - dones, _ = await asyncio.wait( - [ - mo.create_actor( - SupervisorPeerLocatorActor, - "fixed", - address, - uid=SupervisorPeerLocatorActor.default_uid(), - address=address, - ), - mo.create_actor( - MockNodeInfoCollectorActor, - with_gpu=kw.get("with_gpu", False), - uid=NodeInfoCollectorActor.default_uid(), - address=address, - ), - mo.create_actor( - NodeInfoUploaderActor, - NodeRole.WORKER, - interval=kw.get("upload_interval"), - band_to_resource=kw.get("band_to_resource"), - use_gpu=kw.get("use_gpu", False), - uid=NodeInfoUploaderActor.default_uid(), - address=address, - ), - ] - ) + coros = [ + mo.create_actor( + SupervisorPeerLocatorActor, + "fixed", + address, + uid=SupervisorPeerLocatorActor.default_uid(), + address=address, + ), + mo.create_actor( + MockNodeInfoCollectorActor, + with_gpu=kw.get("with_gpu", False), + uid=NodeInfoCollectorActor.default_uid(), + address=address, + ), + mo.create_actor( + NodeInfoUploaderActor, + NodeRole.WORKER, + interval=kw.get("upload_interval"), + band_to_resource=kw.get("band_to_resource"), + use_gpu=kw.get("use_gpu", False), + uid=NodeInfoUploaderActor.default_uid(), + address=address, + ), + ] + dones, _ = await asyncio.wait([asyncio.create_task(coro) for coro in coros]) for task in dones: try: diff --git a/mars/services/scheduling/supervisor/tests/test_globalresource.py b/mars/services/scheduling/supervisor/tests/test_globalresource.py index 6d938ba3e4..bb42b01f3a 100644 --- a/mars/services/scheduling/supervisor/tests/test_globalresource.py +++ b/mars/services/scheduling/supervisor/tests/test_globalresource.py @@ -67,11 +67,12 @@ async def test_global_resource(actor_pool): band, session_id, ["subtask1"], [Resource(num_cpus=1)] ) - wait_coro = global_resource_ref.wait_band_idle(band) - (done, pending) = await asyncio.wait([wait_coro], timeout=0.5) + wait_task = asyncio.create_task(global_resource_ref.wait_band_idle(band)) + (done, pending) = await asyncio.wait([wait_task], timeout=0.5) assert not done await global_resource_ref.release_subtask_resource(band, session_id, "subtask0") - (done, pending) = await asyncio.wait([wait_coro], timeout=0.5) + wait_task = asyncio.create_task(global_resource_ref.wait_band_idle(band)) + (done, pending) = await asyncio.wait([wait_task], timeout=0.5) assert done assert band in await global_resource_ref.get_idle_bands(0) assert ["subtask1"] == await global_resource_ref.apply_subtask_resources( diff --git a/mars/services/scheduling/supervisor/tests/test_manager.py 
b/mars/services/scheduling/supervisor/tests/test_manager.py index c4ad5c895c..465859f7af 100644 --- a/mars/services/scheduling/supervisor/tests/test_manager.py +++ b/mars/services/scheduling/supervisor/tests/test_manager.py @@ -13,8 +13,9 @@ # limitations under the License. import asyncio +import time from collections import defaultdict -from typing import List, Tuple, Set +from typing import List, Dict, Tuple, Set import pytest @@ -35,8 +36,12 @@ class MockTaskManagerActor(mo.Actor): def __init__(self): self._results = dict() - def set_subtask_result(self, result: SubtaskResult): + async def set_subtask_result(self, result: SubtaskResult): self._results[result.subtask_id] = result + manager_ref = await mo.actor_ref( + uid=SubtaskManagerActor.gen_uid(result.session_id), address=self.address + ) + await manager_ref.finish_subtasks([result], result.bands) def get_result(self, subtask_id: str) -> SubtaskResult: return self._results[subtask_id] @@ -59,7 +64,7 @@ def add_subtasks( for subtask, priority in zip(subtasks, priorities): self._subtasks[subtask.subtask_id] = (subtask, priority) - def submit_subtasks(self, band: BandType, limit: int): + def submit_subtasks(self, band_to_limit: Dict[BandType, int] = None): pass def remove_queued_subtasks(self, subtask_ids: List[str]): @@ -78,22 +83,55 @@ def __init__(self): async def set_run_subtask_event(self, subtask_id, event): self._run_subtask_events[subtask_id] = event + @mo.extensible async def run_subtask( self, subtask: Subtask, band_name: str, supervisor_address: str ): self._run_subtask_events[subtask.subtask_id].set() - task = self._subtask_aiotasks[subtask.subtask_id][ - band_name - ] = asyncio.create_task(asyncio.sleep(20)) - return await task + async def task_fun(): + manager_ref = await mo.actor_ref( + uid=SubtaskManagerActor.gen_uid(subtask.session_id), + address=supervisor_address, + ) + result = SubtaskResult( + subtask_id=subtask.subtask_id, + session_id=subtask.session_id, + task_id=subtask.task_id, + stage_id=subtask.stage_id, + bands=[(self.address, band_name)], + progress=1.0, + execution_start_time=time.time(), + ) + try: + await asyncio.sleep(20) + except asyncio.CancelledError as ex: + result.status = SubtaskStatus.cancelled + result.error = ex + result.traceback = ex.__traceback__ + await manager_ref.set_subtask_result.tell( + result, (self.address, band_name) + ) + raise + else: + result.status = SubtaskStatus.succeeded + result.execution_end_time = time.time() + await manager_ref.set_subtask_result.tell( + result, (self.address, band_name) + ) + + self._subtask_aiotasks[subtask.subtask_id][band_name] = asyncio.create_task( + task_fun() + ) + + @mo.extensible def cancel_subtask(self, subtask_id: str, kill_timeout: int = 5): for task in self._subtask_aiotasks[subtask_id].values(): task.cancel() async def wait_subtask(self, subtask_id: str, band_name: str): try: - yield self._subtask_aiotasks[subtask_id][band_name] + await self._subtask_aiotasks[subtask_id][band_name] except asyncio.CancelledError: pass @@ -158,12 +196,12 @@ async def test_subtask_manager(actor_pool): await execution_ref.set_run_subtask_event(subtask1.subtask_id, run_subtask1_event) await execution_ref.set_run_subtask_event(subtask2.subtask_id, run_subtask2_event) - submit1 = asyncio.create_task( + asyncio.create_task( manager_ref.submit_subtask_to_band( subtask1.subtask_id, (pool.external_address, "gpu-0") ) ) - submit2 = asyncio.create_task( + asyncio.create_task( manager_ref.submit_subtask_to_band( subtask2.subtask_id, (pool.external_address, "gpu-1") ) @@ 
-179,10 +217,6 @@ async def test_subtask_manager(actor_pool): ), timeout=10, ) - with pytest.raises(asyncio.CancelledError): - await submit1 - with pytest.raises(asyncio.CancelledError): - await submit2 assert ( await task_manager_ref.get_result(subtask1.subtask_id) ).status == SubtaskStatus.cancelled diff --git a/mars/services/scheduling/supervisor/tests/test_queue_balance.py b/mars/services/scheduling/supervisor/tests/test_queue_balance.py index f2760b5dac..0a08634ca9 100644 --- a/mars/services/scheduling/supervisor/tests/test_queue_balance.py +++ b/mars/services/scheduling/supervisor/tests/test_queue_balance.py @@ -62,31 +62,30 @@ def get_all_bands(self, role=None, statuses=None): class FakeClusterAPI(ClusterAPI): @classmethod async def create(cls, address: str, **kw): - dones, _ = await asyncio.wait( - [ - mo.create_actor( - SupervisorPeerLocatorActor, - "fixed", - address, - uid=SupervisorPeerLocatorActor.default_uid(), - address=address, - ), - mo.create_actor( - MockNodeInfoCollectorActor, - uid=NodeInfoCollectorActor.default_uid(), - address=address, - ), - mo.create_actor( - NodeInfoUploaderActor, - NodeRole.WORKER, - interval=kw.get("upload_interval"), - band_to_resource=kw.get("band_to_resource"), - use_gpu=kw.get("use_gpu", False), - uid=NodeInfoUploaderActor.default_uid(), - address=address, - ), - ] - ) + coros = [ + mo.create_actor( + SupervisorPeerLocatorActor, + "fixed", + address, + uid=SupervisorPeerLocatorActor.default_uid(), + address=address, + ), + mo.create_actor( + MockNodeInfoCollectorActor, + uid=NodeInfoCollectorActor.default_uid(), + address=address, + ), + mo.create_actor( + NodeInfoUploaderActor, + NodeRole.WORKER, + interval=kw.get("upload_interval"), + band_to_resource=kw.get("band_to_resource"), + use_gpu=kw.get("use_gpu", False), + uid=NodeInfoUploaderActor.default_uid(), + address=address, + ), + ] + dones, _ = await asyncio.wait([asyncio.create_task(coro) for coro in coros]) for task in dones: try: @@ -217,20 +216,23 @@ async def test_subtask_queueing(actor_pool): ) # 9 subtasks on ('address0', 'numa-0') - await queueing_ref.submit_subtasks(band=("address0", "numa-0"), limit=10) + await queueing_ref.submit_subtasks({("address0", "numa-0"): 10}) + await asyncio.sleep(0.2) commited_subtask_ids = (await manager_ref.dump_data())[("address0", "numa-0")] assert ( len(commited_subtask_ids) == 9 - ), f"commited_subtask_ids {commited_subtask_ids}" + ), f"committed_subtask_ids {commited_subtask_ids}" # 0 subtasks on ('address1', 'numa-0') - await queueing_ref.submit_subtasks(band=("address1", "numa-0"), limit=10) + await queueing_ref.submit_subtasks({("address1", "numa-0"): 10}) + await asyncio.sleep(0.2) commited_subtask_ids = (await manager_ref.dump_data())[("address0", "numa-0")] assert ( len(commited_subtask_ids) == 9 - ), f"commited_subtask_ids {commited_subtask_ids}" + ), f"committed_subtask_ids {commited_subtask_ids}" # 9 subtasks on ('address2', 'numa-0') - await queueing_ref.submit_subtasks(band=("address2", "numa-0"), limit=10) + await queueing_ref.submit_subtasks({("address2", "numa-0"): 10}) + await asyncio.sleep(0.2) submitted_subtask_ids = await manager_ref.dump_data() assert sum(len(v) for v in submitted_subtask_ids.values()) == 18 diff --git a/mars/services/scheduling/supervisor/tests/test_queueing.py b/mars/services/scheduling/supervisor/tests/test_queueing.py index 21bcd2d1ac..38d6fb7921 100644 --- a/mars/services/scheduling/supervisor/tests/test_queueing.py +++ b/mars/services/scheduling/supervisor/tests/test_queueing.py @@ -11,6 +11,7 @@ # 
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import pytest from typing import Tuple, List @@ -57,16 +58,20 @@ def assign_subtasks( return [(self.address, "numa-0")] * len(subtasks) -class MockSubtaskManagerActor(mo.Actor): +class MockSubtaskManagerActor(mo.StatelessActor): def __init__(self): self._subtask_ids, self._bands = [], [] + self._event = asyncio.Event() @mo.extensible def submit_subtask_to_band(self, subtask_id: str, band: Tuple): self._subtask_ids.append(subtask_id) self._bands.append(band) + self._event.set() - def dump_data(self): + async def dump_data(self): + await asyncio.wait_for(self._event.wait(), timeout=10) + self._event.clear() return self._subtask_ids, self._bands @@ -123,7 +128,7 @@ async def test_subtask_queueing(actor_pool): assert await queueing_ref.all_bands_busy() await queueing_ref.submit_subtasks() # queue: [2 1 0] - commited_subtask_ids, _commited_bands = await manager_ref.dump_data() + commited_subtask_ids, _committed_bands = await manager_ref.dump_data() assert commited_subtask_ids == ["4", "3"] await queueing_ref.remove_queued_subtasks(["1"]) @@ -135,6 +140,6 @@ async def test_subtask_queueing(actor_pool): # queue: [0(3) 2] await queueing_ref.submit_subtasks() # queue: [] - commited_subtasks, _commited_bands = await manager_ref.dump_data() + commited_subtasks, _committed_bands = await manager_ref.dump_data() assert commited_subtasks == ["4", "3", "0", "2"] assert not await queueing_ref.all_bands_busy() diff --git a/mars/services/scheduling/tests/test_service.py b/mars/services/scheduling/tests/test_service.py index 985a1ea821..60f586d29b 100644 --- a/mars/services/scheduling/tests/test_service.py +++ b/mars/services/scheduling/tests/test_service.py @@ -19,7 +19,6 @@ import numpy as np import pytest -from ..api.web import WebSchedulingAPI from .... import oscar as mo from .... import remote as mr from .... import tensor as mt @@ -33,6 +32,7 @@ from ...task.supervisor.manager import TaskManagerActor from ...web import WebActor from .. 
import SchedulingAPI +from ..api.web import WebSchedulingAPI from ..supervisor import GlobalResourceManagerActor @@ -42,11 +42,17 @@ def __init__(self, *args, **kwargs): self._events = defaultdict(list) self._results = dict() - def set_subtask_result(self, subtask_result: SubtaskResult): + async def set_subtask_result(self, subtask_result: SubtaskResult): + scheduling_api = await SchedulingAPI.create( + subtask_result.session_id, self.address + ) self._results[subtask_result.subtask_id] = subtask_result for event in self._events[subtask_result.subtask_id]: event.set() self._events.pop(subtask_result.subtask_id, None) + await scheduling_api.finish_subtasks( + [subtask_result.subtask_id], subtask_result.bands + ) def _return_result(self, subtask_id: str): result = self._results[subtask_id] @@ -184,7 +190,6 @@ async def test_schedule_success(actor_pools): subtask.expect_bands = [(worker_pool.external_address, "numa-0")] await scheduling_api.add_subtasks([subtask], [(0,)]) await task_manager_ref.wait_subtask_result(subtask.subtask_id) - await scheduling_api.finish_subtasks([subtask.subtask_id]) result_key = next(subtask.chunk_graph.iter_indep(reverse=True)).key result = await storage_api.get(result_key) @@ -220,17 +225,17 @@ def _remote_fun(secs): async def _waiter_fun(subtask_id): await task_manager_ref.wait_subtask_result(subtask_id) - await scheduling_api.finish_subtasks([subtask_id]) finish_ids.append(subtask_id) finish_time.append(time.time()) subtasks = [] wait_tasks = [] + band = (worker_pool.external_address, "numa-0") for task_id in range(6): a = mr.spawn(_remote_fun, args=(0.5 + 0.01 * task_id,)) subtask = _gen_subtask(a, session_id) subtask.subtask_id = f"test_schedule_queue_subtask_{task_id}" - subtask.expect_bands = [(worker_pool.external_address, "numa-0")] + subtask.expect_bands = [band] subtask.priority = (4 - task_id,) wait_tasks.append(asyncio.create_task(_waiter_fun(subtask.subtask_id))) subtasks.append(subtask) @@ -291,15 +296,15 @@ def _remote_fun(secs): async def _waiter_fun(subtask_id): await task_manager_ref.wait_subtask_result(subtask_id) - await scheduling_api.finish_subtasks([subtask_id]) subtasks = [] wait_tasks = [] + band = (worker_pool.external_address, "numa-0") for task_id in range(6): a = mr.spawn(_remote_fun, args=(1 - 0.01 * task_id,)) subtask = _gen_subtask(a, session_id) subtask.subtask_id = f"test_schedule_queue_subtask_{task_id}" - subtask.expect_bands = [(worker_pool.external_address, "numa-0")] + subtask.expect_bands = [band] subtask.priority = (4 - task_id,) wait_tasks.append(asyncio.create_task(_waiter_fun(subtask.subtask_id))) subtasks.append(subtask) diff --git a/mars/services/scheduling/utils.py b/mars/services/scheduling/utils.py index f8325697ec..57b7ee2349 100644 --- a/mars/services/scheduling/utils.py +++ b/mars/services/scheduling/utils.py @@ -15,20 +15,17 @@ import asyncio import contextlib import sys +from typing import Iterable from ... 
import oscar as mo -from ...lib.aio import alru_cache -from ..subtask import SubtaskResult, SubtaskStatus +from ..subtask import Subtask, SubtaskResult, SubtaskStatus from ..task import TaskAPI -@alru_cache -async def _get_task_api(actor: mo.Actor): - return await TaskAPI.create(getattr(actor, "_session_id"), actor.address) - - @contextlib.asynccontextmanager -async def redirect_subtask_errors(actor: mo.Actor, subtasks): +async def redirect_subtask_errors( + actor: mo.Actor, subtasks: Iterable[Subtask], reraise: bool = True +): try: yield except: # noqa: E722 # pylint: disable=bare-except @@ -38,7 +35,7 @@ async def redirect_subtask_errors(actor: mo.Actor, subtasks): if isinstance(error, asyncio.CancelledError) else SubtaskStatus.errored ) - task_api = await _get_task_api(actor) + task_api = await TaskAPI.create(getattr(actor, "_session_id"), actor.address) coros = [] for subtask in subtasks: if subtask is None: # pragma: no cover @@ -59,4 +56,5 @@ async def redirect_subtask_errors(actor: mo.Actor, subtasks): ) tasks = [asyncio.ensure_future(coro) for coro in coros] await asyncio.wait(tasks) - raise + if reraise: + raise diff --git a/mars/services/scheduling/worker/execution.py b/mars/services/scheduling/worker/execution.py index f07edc44b5..23f7e9b159 100644 --- a/mars/services/scheduling/worker/execution.py +++ b/mars/services/scheduling/worker/execution.py @@ -18,6 +18,7 @@ import operator import pprint import sys +import time from collections import defaultdict from dataclasses import dataclass, field from typing import Dict, List, Optional @@ -35,8 +36,8 @@ from ...meta import MetaAPI from ...storage import StorageAPI from ...subtask import Subtask, SubtaskAPI, SubtaskResult, SubtaskStatus -from .workerslot import BandSlotManagerActor from .quota import QuotaActor +from .workerslot import BandSlotManagerActor logger = logging.getLogger(__name__) @@ -108,7 +109,7 @@ async def _retry_run( def _fill_subtask_result_with_exception( - subtask: Subtask, subtask_info: SubtaskExecutionInfo + subtask: Subtask, band_name: str, result: SubtaskResult ): _, exc, tb = sys.exc_info() if isinstance(exc, ExecutionError): @@ -119,9 +120,9 @@ def _fill_subtask_result_with_exception( if isinstance(exc, asyncio.CancelledError): status = SubtaskStatus.cancelled logger.exception( - "Cancel run subtask %s on band %s", + "Cancel subtask %s on band %s", subtask.subtask_id, - subtask_info.band_name, + band_name, exc_info=exc_info, ) else: @@ -129,13 +130,13 @@ def _fill_subtask_result_with_exception( logger.exception( "Failed to run subtask %s on band %s", subtask.subtask_id, - subtask_info.band_name, + band_name, exc_info=exc_info, ) - subtask_info.result.status = status - subtask_info.result.progress = 1.0 - subtask_info.result.error = exc - subtask_info.result.traceback = tb + result.status = status + result.progress = 1.0 + result.error = exc + result.traceback = tb class SubtaskExecutionActor(mo.StatelessActor): @@ -176,6 +177,17 @@ async def _get_slot_manager_ref( BandSlotManagerActor.gen_uid(band), address=self.address ) + @classmethod + @alru_cache(cache_exceptions=False) + async def _get_manager_ref( + cls, session_id: str, supervisor_address: str + ) -> mo.ActorRefType[BandSlotManagerActor]: + from ..supervisor import SubtaskManagerActor + + return await mo.actor_ref( + SubtaskManagerActor.gen_uid(session_id), address=supervisor_address + ) + @alru_cache(cache_exceptions=False) async def _get_band_quota_ref(self, band: str) -> mo.ActorRefType[QuotaActor]: return await 
mo.actor_ref(QuotaActor.gen_uid(band), address=self.address) @@ -367,6 +379,8 @@ async def internal_run_subtask(self, subtask: Subtask, band_name: str): task_id=subtask.task_id, stage_id=subtask.stage_id, status=SubtaskStatus.pending, + execution_start_time=time.time(), + bands=[(self.address, band_name)], ) try: logger.debug("Preparing data for subtask %s", subtask.subtask_id) @@ -397,17 +411,26 @@ async def internal_run_subtask(self, subtask: Subtask, band_name: str): subtask.session_id, band_name, remote_mapper_keys ) except: # noqa: E722 # pylint: disable=bare-except - _fill_subtask_result_with_exception(subtask, subtask_info) + _fill_subtask_result_with_exception(subtask, band_name, subtask_info.result) finally: # make sure new slot usages are uploaded in time try: slot_manager_ref = await self._get_slot_manager_ref(band_name) await slot_manager_ref.upload_slot_usages(periodical=False) except: # noqa: E722 # pylint: disable=bare-except - _fill_subtask_result_with_exception(subtask, subtask_info) + _fill_subtask_result_with_exception( + subtask, band_name, subtask_info.result + ) finally: # pop the subtask info at the end is to cancel the job. self._subtask_info.pop(subtask.subtask_id, None) + + manager_ref = await self._get_manager_ref( + subtask.session_id, subtask_info.supervisor_address + ) + await manager_ref.set_subtask_result.tell( + subtask_info.result, (self.address, subtask_info.band_name) + ) return subtask_info.result async def _retry_run_subtask( @@ -515,21 +538,50 @@ async def _run_subtask_once(): e, wrap_name="_UnretryableException", message=message ) + @mo.extensible async def run_subtask( self, subtask: Subtask, band_name: str, supervisor_address: str ): - if subtask.subtask_id in self._subtask_info: # pragma: no cover - raise Exception( - f"Subtask {subtask.subtask_id} is already running on this band[{self.address}]." - ) - logger.debug( - "Start to schedule subtask %s on %s.", subtask.subtask_id, self.address - ) + subtask_id = subtask.subtask_id + assert ( + subtask_id not in self._subtask_info + ), f"Subtask {subtask_id} is already running on this band[{self.address}]." + + logger.debug("Start to schedule subtask %s on %s.", subtask_id, self.address) self._submitted_subtask_count.record(1, {"band": self.address}) + + async def subtask_caller(): + try: + res = await self.ref().internal_run_subtask(subtask, band_name) + except: # noqa: E722 # pylint: disable=bare-except + logger.error( + "Unexpected error occurred when running subtask %s", + subtask.subtask_id, + ) + res = SubtaskResult( + subtask_id=subtask.subtask_id, + session_id=subtask.session_id, + task_id=subtask.task_id, + stage_id=subtask.stage_id, + status=SubtaskStatus.pending, + execution_start_time=time.time(), + bands=[(self.address, band_name)], + ) + _fill_subtask_result_with_exception(subtask, band_name, res) + + manager_ref = await self._get_manager_ref( + subtask.session_id, supervisor_address + ) + await manager_ref.set_subtask_result.tell( + res, (self.address, band_name) + ) + finally: + self._subtask_info.pop(subtask_id, None) + self._finished_subtask_count.record(1, {"band": self.address}) + logger.debug("Subtask %s finished with result %s", subtask_id, res) + with mo.debug.no_message_trace(): - task = asyncio.create_task( - self.ref().internal_run_subtask(subtask, band_name) - ) + task = asyncio.create_task(subtask_caller()) logger.debug("Subtask %r accepted in worker %s", subtask, self.address) # the extra_config may be None. the extra config overwrites the default value. 
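With this change `run_subtask` no longer returns the subtask result to its caller; a background task reports the outcome to the supervisor via `set_subtask_result.tell`. A stripped-down sketch of that report-back shape (`run_and_report` and `report` are illustrative stand-ins, not the patched API):

```python
# Stripped-down shape of the fire-and-forget wrapper used by run_subtask: the
# actor call returns as soon as the task is scheduled, and the background task
# reports success, failure or cancellation instead of the caller awaiting it.
# `report` stands in for something like manager_ref.set_subtask_result.tell.
import asyncio


def run_and_report(coro_factory, report) -> asyncio.Task:
    async def _runner():
        try:
            outcome = await coro_factory()
        except asyncio.CancelledError:
            await report("cancelled", None)
            raise
        except Exception as exc:  # unexpected failures still get reported
            await report("errored", exc)
        else:
            await report("succeeded", outcome)

    return asyncio.create_task(_runner())
```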
@@ -541,33 +593,53 @@ async def run_subtask( if subtask_max_retries is None: subtask_max_retries = self._subtask_max_retries - self._subtask_info[subtask.subtask_id] = SubtaskExecutionInfo( + self._subtask_info[subtask_id] = SubtaskExecutionInfo( task, band_name, supervisor_address, max_retries=subtask_max_retries ) - result = await task - self._subtask_info.pop(subtask.subtask_id, None) - self._finished_subtask_count.record(1, {"band": self.address}) - logger.debug("Subtask %s finished with result %s", subtask.subtask_id, result) - return result + @mo.extensible async def cancel_subtask(self, subtask_id: str, kill_timeout: Optional[int] = 5): - try: - subtask_info = self._subtask_info[subtask_id] - except KeyError: - logger.info("Subtask %s not exists, skip cancel.", subtask_id) - return - logger.info( - "Start to cancel subtask %s in slot %s, kill_timeout is %s", - subtask_id, - subtask_info.slot_id, - kill_timeout, - ) + raise NotImplementedError - kill_timeout = kill_timeout if self._enable_kill_slot else None - if not subtask_info.cancelling: - subtask_info.kill_timeout = kill_timeout - subtask_info.cancelling = True - subtask_info.aio_task.cancel() + @cancel_subtask.batch + async def batch_cancel_subtask(self, args_list, kwargs_list): + subtask_ids = [] + tasks = [] - await subtask_info.aio_task - self._subtask_info.pop(subtask_id, None) + for args, kwargs in zip(args_list, kwargs_list): + subtask_id, kill_timeout = self.cancel_subtask.bind(*args, **kwargs) + + try: + subtask_info = self._subtask_info[subtask_id] + except KeyError: + logger.info("Subtask %s not exists, skip cancel.", subtask_id) + continue + + subtask_ids.append(subtask_id) + logger.info( + "Start to cancel subtask %s in slot %s, kill_timeout is %s", + subtask_id, + subtask_info.slot_id, + kill_timeout, + ) + + kill_timeout = kill_timeout if self._enable_kill_slot else None + if not subtask_info.cancelling: + subtask_info.kill_timeout = kill_timeout + subtask_info.cancelling = True + subtask_info.aio_task.cancel() + tasks.append(subtask_info.aio_task) + + if tasks: + await asyncio.wait(tasks) + + for subtask_id in subtask_ids: + try: + subtask_info = self._subtask_info[subtask_id] + except KeyError: + continue + + try: + self._subtask_info.pop(subtask_info.aio_task.result().subtask_id, None) + except BaseException: # pragma: no cover + logger.error("Failed to cancel subtask %s", subtask_id) diff --git a/mars/services/scheduling/worker/tests/test_execution.py b/mars/services/scheduling/worker/tests/test_execution.py index abeb50ba7f..6940ee45d1 100644 --- a/mars/services/scheduling/worker/tests/test_execution.py +++ b/mars/services/scheduling/worker/tests/test_execution.py @@ -37,6 +37,7 @@ from .....resource import Resource from .....tensor.fetch import TensorFetch from .....tensor.arithmetic import TensorTreeAdd +from .....typing import BandType from .....utils import Timer from ....cluster import MockClusterAPI from ....lifecycle import MockLifecycleAPI @@ -44,10 +45,10 @@ from ....session import MockSessionAPI from ....storage import MockStorageAPI from ....storage.handler import StorageHandlerActor -from ....subtask import MockSubtaskAPI, Subtask, SubtaskStatus +from ....subtask import MockSubtaskAPI, Subtask, SubtaskStatus, SubtaskResult from ....task.supervisor.manager import TaskManagerActor from ....mutable import MockMutableAPI -from ...supervisor import GlobalResourceManagerActor +from ...supervisor import GlobalResourceManagerActor, SubtaskManagerActor from ...worker import SubtaskExecutionActor, 
QuotaActor, BandSlotManagerActor @@ -136,13 +137,36 @@ async def update_subtask_resources( class MockTaskManager(mo.Actor): def __init__(self): - self._results = [] + self._results = dict() + self._subtask_futures = dict() - def set_subtask_result(self, result): - self._results.append(result) + def set_subtask_result(self, result: SubtaskResult): + self._results[result.subtask_id] = result + if result.subtask_id in self._subtask_futures: + self._subtask_futures[result.subtask_id].set_result(result) + + async def wait_subtask(self, subtask_id: str): + if subtask_id in self._results: + return self._results[subtask_id] + if subtask_id not in self._subtask_futures: + self._subtask_futures[subtask_id] = asyncio.Future() + return self._subtask_futures[subtask_id] def get_results(self): - return self._results + return list(self._results.values()) + + +class MockSubtaskManagerActor(mo.Actor): + def __init__(self, session_id: str): + self._session_id = session_id + + async def __post_create__(self): + self._task_manager_ref = await mo.actor_ref( + uid=TaskManagerActor.gen_uid(self._session_id), address=self.address + ) + + async def set_subtask_result(self, result: SubtaskResult, band: BandType): + await self._task_manager_ref.set_subtask_result.tell(result) @pytest.fixture @@ -211,9 +235,17 @@ async def actor_pool(request): address=pool.external_address, ) + subtask_manager_ref = await mo.create_actor( + MockSubtaskManagerActor, + session_id, + uid=SubtaskManagerActor.gen_uid(session_id), + address=pool.external_address, + ) + try: yield pool, session_id, meta_api, worker_meta_api, storage_api, execution_ref finally: + await mo.destroy_actor(subtask_manager_ref) await mo.destroy_actor(task_manager_ref) await mo.destroy_actor(band_slot_ref) await mo.destroy_actor(global_resource_ref) @@ -229,6 +261,9 @@ async def actor_pool(request): @pytest.mark.parametrize("actor_pool", [(1, True)], indirect=True) async def test_execute_tensor(actor_pool): pool, session_id, meta_api, worker_meta_api, storage_api, execution_ref = actor_pool + task_manager_ref = await mo.actor_ref( + TaskManagerActor.gen_uid(session_id), address=pool.external_address + ) data1 = np.random.rand(10, 10) data2 = np.random.rand(10, 10) @@ -268,6 +303,7 @@ async def test_execute_tensor(actor_pool): subtask = Subtask("test_subtask", session_id=session_id, chunk_graph=chunk_graph) await execution_ref.run_subtask(subtask, "numa-0", pool.external_address) + await task_manager_ref.wait_subtask(subtask.subtask_id) # check if results are correct result = await storage_api.get(result_chunk.key) @@ -306,6 +342,9 @@ async def test_execute_with_cancel(actor_pool, cancel_phase): pool, session_id, meta_api, worker_meta_api, storage_api, execution_ref = actor_pool delay_fetch_event = asyncio.Event() delay_wait_event = asyncio.Event() + task_manager_ref = await mo.actor_ref( + TaskManagerActor.gen_uid(session_id), address=pool.external_address + ) # config for different phases ref_to_delay = None @@ -361,7 +400,7 @@ def delay_fun(delay, _inp1): subtask = Subtask( f"test_subtask_{uuid.uuid4()}", session_id=session_id, chunk_graph=chunk_graph ) - aiotask = asyncio.create_task( + asyncio.create_task( execution_ref.run_subtask(subtask, "numa-0", pool.external_address) ) if ref_to_delay: @@ -375,7 +414,9 @@ def delay_fun(delay, _inp1): execution_ref.cancel_subtask(subtask.subtask_id, kill_timeout=1), timeout=30, ) - r = await asyncio.wait_for(aiotask, timeout=30) + r = await asyncio.wait_for( + task_manager_ref.wait_subtask(subtask.subtask_id), timeout=30 
+ ) assert r.status == SubtaskStatus.cancelled assert timer.duration < 15 @@ -403,6 +444,9 @@ def delay_fun(delay, _inp1): @pytest.mark.parametrize("actor_pool", [(1, True)], indirect=True) async def test_execute_with_pure_deps(actor_pool): pool, session_id, meta_api, worker_meta_api, storage_api, execution_ref = actor_pool + task_manager_ref = await mo.actor_ref( + TaskManagerActor.gen_uid(session_id), address=pool.external_address + ) dep = TensorFetch(key="input1", dtype=np.dtype(int)).new_chunk([]) @@ -424,6 +468,9 @@ def main_fun(): ) # subtask shall run well without data of `dep` available await execution_ref.run_subtask(subtask, "numa-0", pool.external_address) + await asyncio.wait_for( + task_manager_ref.wait_subtask(subtask.subtask_id), timeout=30 + ) res = await storage_api.get(remote_result.key) assert res == session_id @@ -476,6 +523,9 @@ async def test_cancel_without_kill(actor_pool): executed_file = os.path.join( tempfile.gettempdir(), f"mars_test_cancel_without_kill_{os.getpid()}.tmp" ) + task_manager_ref = await mo.actor_ref( + TaskManagerActor.gen_uid(session_id), address=pool.external_address + ) def delay_fun(delay): import mars @@ -499,7 +549,7 @@ def check_fun(): subtask = Subtask( f"test_subtask_{uuid.uuid4()}", session_id=session_id, chunk_graph=chunk_graph ) - aiotask = asyncio.create_task( + asyncio.create_task( execution_ref.run_subtask(subtask, "numa-0", pool.external_address) ) await asyncio.sleep(0.5) @@ -508,7 +558,9 @@ def check_fun(): execution_ref.cancel_subtask(subtask.subtask_id, kill_timeout=1), timeout=30, ) - r = await asyncio.wait_for(aiotask, timeout=30) + r = await asyncio.wait_for( + task_manager_ref.wait_subtask(subtask.subtask_id), timeout=30 + ) assert r.status == SubtaskStatus.cancelled remote_result = RemoteFunction( @@ -523,6 +575,9 @@ def check_fun(): await asyncio.wait_for( execution_ref.run_subtask(subtask, "numa-0", pool.external_address), timeout=30 ) + await asyncio.wait_for( + task_manager_ref.wait_subtask(subtask.subtask_id), timeout=30 + ) # check if slots not killed (or slot assignment may be cancelled) if os.path.exists(executed_file): diff --git a/mars/services/subtask/core.py b/mars/services/subtask/core.py index 2f1512819f..bdb9d43650 100644 --- a/mars/services/subtask/core.py +++ b/mars/services/subtask/core.py @@ -181,7 +181,8 @@ def update(self, result: Optional["SubtaskResult"]): if result and result.bands: bands = self.bands or [] self.bands = sorted(set(bands + result.bands)) - self.execution_start_time = result.execution_start_time + if hasattr(result, "execution_start_time"): + self.execution_start_time = result.execution_start_time if hasattr(result, "execution_end_time"): self.execution_end_time = result.execution_end_time return self diff --git a/mars/services/task/api/web.py b/mars/services/task/api/web.py index 8d498ca4c5..0afb5edf04 100644 --- a/mars/services/task/api/web.py +++ b/mars/services/task/api/web.py @@ -15,6 +15,7 @@ import asyncio import base64 import json +import logging from typing import Callable, List, Optional, Union from ....core import TileableGraph, Tileable @@ -253,7 +254,11 @@ async def get_task_progress(self, task_id: str) -> float: path = f"{self._address}/api/session/{self._session_id}/task/{task_id}" params = dict(action="progress") res = await self._request_url("GET", path, params=params) - return float(res.body.decode()) + try: + return float(res.body.decode()) + except ValueError: + logging.exception("Failed to handle content %r", res.body) + raise async def get_last_idle_time(self) -> 
Union[float, None]: path = f"{self._address}/api/session/{self._session_id}/task" diff --git a/mars/services/task/tests/test_service.py b/mars/services/task/tests/test_service.py index 5eed76977f..a0ae843d9e 100644 --- a/mars/services/task/tests/test_service.py +++ b/mars/services/task/tests/test_service.py @@ -264,6 +264,7 @@ def f1(): await asyncio.sleep(0.5) with Timer() as timer: await task_api.cancel_task(task_id) + await asyncio.sleep(0.5) result = await task_api.get_task_result(task_id) assert result.status == TaskStatus.terminated assert timer.duration < 20 diff --git a/mars/tests/test_cluster.py b/mars/tests/test_cluster.py index 7a352c9f35..ce21332da8 100644 --- a/mars/tests/test_cluster.py +++ b/mars/tests/test_cluster.py @@ -47,9 +47,19 @@ def _terminate(pid: int): continue +@pytest.fixture +def config_log(): + import logging + logging.basicConfig(level=logging.WARNING) + try: + yield + finally: + logging.basicConfig(level=logging.DEBUG) + + @flaky(max_runs=3) @pytest.mark.asyncio -async def test_cluster(): +async def test_cluster(config_log): port = get_next_port() web_port = get_next_port() supervisor_addr = f"127.0.0.1:{port}" diff --git a/mars/tests/test_utils.py b/mars/tests/test_utils.py index 4ed1dbce1f..3bdd4fdeb8 100644 --- a/mars/tests/test_utils.py +++ b/mars/tests/test_utils.py @@ -613,7 +613,7 @@ def __call__(self, *args, **kwargs): assert get_func_token_values(func) == [func] -@pytest.mark.parametrize("id_length", [0, 5, 32, 63]) +@pytest.mark.parametrize("id_length", [0, 5, 32, 63, 254]) def test_gen_random_id(id_length): rnd_id = utils.new_random_id(id_length) assert len(rnd_id) == id_length