Skip to content

Commit

Permalink
Add web API for scheduling (#2533)
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi authored Oct 20, 2021
1 parent 19dcd80 commit e2b3eab
Show file tree
Hide file tree
Showing 8 changed files with 264 additions and 19 deletions.
15 changes: 15 additions & 0 deletions mars/services/scheduling/api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .oscar import SchedulingAPI, MockSchedulingAPI
38 changes: 38 additions & 0 deletions mars/services/scheduling/api/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from typing import List, Optional

from ..core import SubtaskScheduleSummary


class AbstractSchedulingAPI(ABC):
    """
    Interface shared by the scheduling API implementations.

    The method is declared as a coroutine because the implementations
    of this interface define it with ``async def`` and callers await it.
    """

    @abstractmethod
    async def get_subtask_schedule_summaries(
            self,
            task_id: Optional[str] = None
    ) -> List[SubtaskScheduleSummary]:
        """
        Get details of scheduling for tasks

        Parameters
        ----------
        task_id
            When given, only summaries of subtasks belonging to this task
            are requested; when None, no task filter is applied.

        Returns
        -------
        details
            List of details for subtasks
        """
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC
from typing import List, Optional, Tuple, Type, TypeVar, Union

from ... import oscar as mo
from ...lib.aio import alru_cache
from ..subtask import Subtask
from .... import oscar as mo
from ....lib.aio import alru_cache
from ...subtask import Subtask
from ..core import SubtaskScheduleSummary
from .core import AbstractSchedulingAPI

APIType = TypeVar('APIType', bound='SchedulingAPI')


class SchedulingAPI(ABC):
class SchedulingAPI(AbstractSchedulingAPI):
def __init__(self, session_id: str, address: str,
manager_ref=None, queueing_ref=None):
self._session_id = session_id
Expand All @@ -36,11 +37,11 @@ def __init__(self, session_id: str, address: str,
async def create(cls: Type[APIType],
session_id: str,
address: str) -> APIType:
from .supervisor.manager import SubtaskManagerActor
from ..supervisor.manager import SubtaskManagerActor
manager_ref = await mo.actor_ref(
SubtaskManagerActor.gen_uid(session_id), address=address
)
from .supervisor.queueing import SubtaskQueueingActor
from ..supervisor.queueing import SubtaskQueueingActor
queueing_ref = await mo.actor_ref(
SubtaskQueueingActor.gen_uid(session_id), address=address
)
Expand All @@ -49,6 +50,12 @@ async def create(cls: Type[APIType],
session_id, address, manager_ref, queueing_ref)
return scheduling_api

async def get_subtask_schedule_summaries(
        self,
        task_id: Optional[str] = None
) -> List[SubtaskScheduleSummary]:
    """
    Fetch schedule summaries of subtasks from the subtask manager actor.

    Parameters
    ----------
    task_id
        Forwarded unchanged to the manager's ``get_schedule_summaries``.

    Returns
    -------
    summaries
        Whatever the manager actor returns for the given ``task_id``.
    """
    manager_ref = self._manager_ref
    summaries = await manager_ref.get_schedule_summaries(task_id)
    return summaries

async def add_subtasks(self,
subtasks: List[Subtask],
priorities: Optional[List[Tuple]] = None):
Expand Down Expand Up @@ -126,16 +133,16 @@ class MockSchedulingAPI(SchedulingAPI):
async def create(cls: Type[APIType],
session_id: str,
address: str) -> APIType:
from .supervisor import GlobalSlotManagerActor, AutoscalerActor
from ..supervisor import GlobalSlotManagerActor, AutoscalerActor
await mo.create_actor(GlobalSlotManagerActor,
uid=GlobalSlotManagerActor.default_uid(),
address=address)
await mo.create_actor(AutoscalerActor, {},
uid=AutoscalerActor.default_uid(),
address=address)

from ... import resource as mars_resource
from .worker import SubtaskExecutionActor, \
from .... import resource as mars_resource
from ..worker import SubtaskExecutionActor, \
WorkerSlotManagerActor, WorkerQuotaManagerActor
await mo.create_actor(SubtaskExecutionActor,
subtask_max_retries=0,
Expand All @@ -149,7 +156,7 @@ async def create(cls: Type[APIType],
uid=WorkerQuotaManagerActor.default_uid(),
address=address)

from .supervisor import SchedulingSupervisorService
from ..supervisor import SchedulingSupervisorService
service = SchedulingSupervisorService({}, address)
await service.create_session(session_id)
return await super().create(session_id, address)
100 changes: 100 additions & 0 deletions mars/services/scheduling/api/web.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from typing import List, Optional

from ....lib.aio import alru_cache
from ...web import web_api, MarsServiceWebAPIHandler, MarsWebAPIClientMixin
from ..core import SubtaskScheduleSummary
from .core import AbstractSchedulingAPI


class SchedulingWebAPIHandler(MarsServiceWebAPIHandler):
    """
    Web handler serving scheduling information of a session under
    ``/api/session/<session_id>/scheduling``.
    """
    _root_pattern = '/api/session/(?P<session_id>[^/]+)/scheduling'

    @alru_cache(cache_exceptions=False)
    async def _get_cluster_api(self):
        """Create (and cache) a cluster API bound to the supervisor address."""
        from ...cluster import ClusterAPI
        cluster_api = await ClusterAPI.create(self._supervisor_addr)
        return cluster_api

    @alru_cache(cache_exceptions=False)
    async def _get_oscar_scheduling_api(self, session_id: str):
        """Create (and cache) the oscar scheduling API for ``session_id``."""
        from ..api import SchedulingAPI
        cluster_api = await self._get_cluster_api()
        supervisors = await cluster_api.get_supervisors_by_keys([session_id])
        # exactly one supervisor address is expected for a single key
        [address] = supervisors
        return await SchedulingAPI.create(session_id, address)

    @web_api('subtasks', method='get')
    async def get_subtask_schedule_summaries(self, session_id: str):
        """
        Write a JSON object mapping subtask id to its schedule summary.

        The optional ``task_id`` request argument restricts the result;
        an absent or empty argument is passed on as None.
        """
        oscar_api = await self._get_oscar_scheduling_api(session_id)
        task_id = self.get_argument('task_id', None) or None
        summaries = await oscar_api.get_subtask_schedule_summaries(task_id)

        payload = {}
        for summary in summaries:
            band_entries = []
            for band in summary.bands:
                band_entries.append({
                    "endpoint": band[0],
                    "band_name": band[1],
                })
            payload[summary.subtask_id] = {
                "task_id": summary.task_id,
                "subtask_id": summary.subtask_id,
                "bands": band_entries,
                "num_reschedules": summary.num_reschedules,
                "is_finished": summary.is_finished,
                "is_cancelled": summary.is_cancelled,
            }
        self.write(json.dumps(payload))


# Maps the handler's root URL pattern to the handler class.
# NOTE(review): presumably collected by the web service to register
# routes -- confirm against the code that reads ``web_handlers``.
web_handlers = {
SchedulingWebAPIHandler.get_root_pattern(): SchedulingWebAPIHandler
}


class WebSchedulingAPI(AbstractSchedulingAPI, MarsWebAPIClientMixin):
    """
    Scheduling API client talking to the scheduling web endpoint over HTTP.
    """

    def __init__(self,
                 session_id: str,
                 address: str):
        """
        Parameters
        ----------
        session_id
            Session whose scheduling information is queried.
        address
            Base address of the web service; trailing slashes are removed
            so that request paths can be appended directly.
        """
        self._session_id = session_id
        self._address = address.rstrip('/')

    async def get_subtask_schedule_summaries(
            self,
            task_id: Optional[str] = None
    ) -> List[SubtaskScheduleSummary]:
        """
        Get details of scheduling for tasks via the web API.

        Parameters
        ----------
        task_id
            When given, only subtasks of this task are requested. None is
            sent as an empty ``task_id`` argument.

        Returns
        -------
        details
            List of summaries rebuilt from the JSON response.
        """
        from urllib.parse import urlencode

        # URL-encode the argument so that reserved characters in task_id
        # (e.g. '&', '#', spaces) cannot corrupt the query string
        query = urlencode({'task_id': task_id or ""})
        path = f'{self._address}/api/session/{self._session_id}/scheduling/subtasks' \
               f'?{query}'

        res = await self._request_url('GET', path)
        res_json = json.loads(res.body)

        return [
            SubtaskScheduleSummary(
                task_id=summary_json["task_id"],
                subtask_id=summary_json["subtask_id"],
                bands=[
                    (band_json["endpoint"], band_json["band_name"])
                    for band_json in summary_json["bands"]
                ],
                num_reschedules=summary_json["num_reschedules"],
                is_finished=summary_json["is_finished"],
                is_cancelled=summary_json["is_cancelled"],
            )
            for summary_json in res_json.values()
        ]
29 changes: 29 additions & 0 deletions mars/services/scheduling/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List

from ...serialization.serializables import Serializable, FieldTypes, \
StringField, ListField, BoolField, Int32Field
from ...typing import BandType


class SubtaskScheduleSummary(Serializable):
    """Serializable summary of the scheduling state of one subtask."""

    # id of the task the subtask belongs to
    task_id: str = StringField('task_id')
    # id of the subtask being summarized
    subtask_id: str = StringField('subtask_id')
    # bands recorded for the subtask, stored as tuples of strings
    bands: List[BandType] = ListField('bands', FieldTypes.tuple(FieldTypes.string))
    # flags default to False, the reschedule counter defaults to 0
    is_finished: bool = BoolField('is_finished', default=False)
    is_cancelled: bool = BoolField('is_cancelled', default=False)
    num_reschedules: int = Int32Field('num_reschedules', default=0)
44 changes: 38 additions & 6 deletions mars/services/scheduling/supervisor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from ....utils import dataslots
from ...subtask import Subtask, SubtaskResult, SubtaskStatus
from ...task import TaskAPI
from ..core import SubtaskScheduleSummary
from ..utils import redirect_subtask_errors

logger = logging.getLogger(__name__)
Expand All @@ -42,17 +43,29 @@ class SubtaskScheduleInfo:
max_reschedules: int = 0
num_reschedules: int = 0

def to_summary(self, **kwargs) -> SubtaskScheduleSummary:
    """
    Build a SubtaskScheduleSummary from this schedule info.

    Task id, subtask id, the bands keyed in ``band_futures`` and the
    reschedule count are taken from this object; any extra keyword
    arguments are passed through to the summary constructor.
    """
    subtask = self.subtask
    bands = list(self.band_futures.keys())
    return SubtaskScheduleSummary(
        task_id=subtask.task_id,
        subtask_id=subtask.subtask_id,
        bands=bands,
        num_reschedules=self.num_reschedules,
        **kwargs
    )


class SubtaskManagerActor(mo.Actor):
_subtask_infos: Dict[str, SubtaskScheduleInfo] # key is subtask id
_subtask_infos: Dict[str, SubtaskScheduleInfo] # subtask id -> schedule info
_subtask_summaries: Dict[str, SubtaskScheduleSummary] # subtask id -> summary

@classmethod
def gen_uid(cls, session_id: str):
    """Return the actor uid derived from ``session_id``."""
    uid = f'{session_id}_subtask_manager'
    return uid

def __init__(self, session_id: str, subtask_max_reschedules: int = DEFAULT_SUBTASK_MAX_RESCHEDULES):
def __init__(self, session_id: str,
subtask_max_reschedules: int = DEFAULT_SUBTASK_MAX_RESCHEDULES):
self._session_id = session_id
self._subtask_infos = dict()
self._subtask_summaries = dict()
self._subtask_max_reschedules = subtask_max_reschedules

self._queueing_ref = None
Expand Down Expand Up @@ -104,9 +117,11 @@ async def finish_subtasks(self, subtask_ids: List[str], schedule_next: bool = Tr
band_tasks = defaultdict(lambda: 0)
for subtask_id in subtask_ids:
subtask_info = self._subtask_infos.pop(subtask_id, None)
if schedule_next and subtask_info is not None:
for band in subtask_info.band_futures.keys():
band_tasks[band] += 1
if subtask_info is not None:
self._subtask_summaries[subtask_id] = subtask_info.to_summary(is_finished=True)
if schedule_next:
for band in subtask_info.band_futures.keys():
band_tasks[band] += 1

if band_tasks:
coros = []
Expand Down Expand Up @@ -214,4 +229,21 @@ async def cancel_single_task(subtask, raw_tasks, cancel_tasks):
yield asyncio.wait(single_cancel_tasks)

for subtask_id in subtask_ids:
self._subtask_infos.pop(subtask_id, None)
subtask_info = self._subtask_infos.pop(subtask_id, None)
if subtask_info is not None:
self._subtask_summaries[subtask_id] = subtask_info.to_summary(
is_finished=True, is_cancelled=True
)

def get_schedule_summaries(self, task_id: Optional[str] = None):
    """
    Collect schedule summaries of subtasks known to this manager.

    Stored summaries are gathered first; subtasks still present in
    ``_subtask_infos`` are then summarized on the fly and replace any
    stored entry with the same subtask id.

    Parameters
    ----------
    task_id
        When not None, only subtasks whose task id equals it are included.

    Returns
    -------
    summaries
        List of summaries, in insertion order of their subtask ids.
    """
    def _selected(candidate_task_id) -> bool:
        return task_id is None or candidate_task_id == task_id

    collected = {}
    for stored_id, stored_summary in self._subtask_summaries.items():
        if _selected(stored_summary.task_id):
            collected[stored_id] = stored_summary

    for live_info in self._subtask_infos.values():
        subtask = live_info.subtask
        if _selected(subtask.task_id):
            collected[subtask.subtask_id] = live_info.to_summary()

    return list(collected.values())
Loading

0 comments on commit e2b3eab

Please sign in to comment.