Skip to content

Commit

Permalink
Refine and unify subtask detail APIs (mars-project#2465)
Browse files Browse the repository at this point in the history
  • Loading branch information
RandomY-2 authored and wjsi committed Sep 19, 2021
1 parent cf6ed93 commit fc2271d
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 396 deletions.
8 changes: 2 additions & 6 deletions mars/services/task/api/oscar.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,12 @@ async def get_tileable_details(self, task_id: str):
async def get_tileable_subtasks(self,
task_id: str,
tileable_id: str,
with_input_output: bool,
with_info: bool,
with_dependency: bool):
with_input_output: bool):

return await self._task_manager_ref.get_tileable_subtasks(
task_id,
tileable_id,
with_input_output,
with_info,
with_dependency
with_input_output
)

async def wait_task(self, task_id: str, timeout: float = None):
Expand Down
19 changes: 4 additions & 15 deletions mars/services/task/api/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,9 @@ async def get_tileable_details(self, session_id: str, task_id: str):
@web_api('(?P<task_id>[^/]+)/(?P<tileable_id>[^/]+)/subtask', method='get')
async def get_tileable_subtasks(self, session_id: str, task_id: str, tileable_id: str):
with_input_output = (self.get_argument('with_input_output', 'false') == 'true')
with_info = (self.get_argument('with_info', 'false') == 'true')
with_dependency = (self.get_argument('with_dependency', 'false') == 'true')
oscar_api = await self._get_oscar_task_api(session_id)
res = await oscar_api.get_tileable_subtasks(task_id,
tileable_id,
with_input_output,
with_info,
with_dependency)
res = await oscar_api.get_tileable_subtasks(
task_id, tileable_id, with_input_output)
self.write(json.dumps(res))

@web_api('(?P<task_id>[^/]+)', method='get', arg_filter={'action': 'progress'})
Expand Down Expand Up @@ -245,19 +240,13 @@ async def get_tileable_details(self, task_id: str):
async def get_tileable_subtasks(self,
task_id: str,
tileable_id: str,
input_output_indicator: bool,
info_indicator: bool,
dependency_indicator: bool):
with_input_output: bool):

with_input_output = 'true' if input_output_indicator else 'false'
with_info = 'true' if info_indicator else 'false'
with_dependency = 'true' if dependency_indicator else 'false'
with_input_output = 'true' if with_input_output else 'false'
path = f'{self._address}/api/session/{self._session_id}/task/{task_id}/{tileable_id}/subtask'
params = {
'action': 'fetch_graph',
'with_input_output': with_input_output,
'with_info': with_info,
'with_dependency': with_dependency
}
res = await self._request_url(path=path, params=params, method='GET')
return json.loads(res.body.decode())
4 changes: 2 additions & 2 deletions mars/services/task/supervisor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,13 @@ async def get_tileable_details(self, task_id):

return await processor_ref.get_tileable_details()

async def get_tileable_subtasks(self, task_id, tileable_id, with_input_output, with_info, with_dependency):
async def get_tileable_subtasks(self, task_id, tileable_id, with_input_output):
try:
processor_ref = self._task_id_to_processor_ref[task_id]
except KeyError:
raise TaskNotExist(f'Task {task_id} does not exist')

return await processor_ref.get_tileable_subtasks(tileable_id, with_input_output, with_info, with_dependency)
return await processor_ref.get_tileable_subtasks(tileable_id, with_input_output)

async def _gen_tiled_context(self, graph: TileableGraph) -> \
Dict[TileableType, TileableType]:
Expand Down
141 changes: 44 additions & 97 deletions mars/services/task/supervisor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,117 +606,64 @@ def get_tileable_details(self):
}
return tileable_infos

def get_tileable_subtasks(self, tileable_id: str, with_input_output: bool, with_detail: bool, with_dependencies: bool):
requested_tileable = None
requested_subtasks = None

returned_subtasks = set()
input_subtask_ids = set()
output_subtask_ids = set()

subtask_list = []
dependency_list = []
def get_tileable_subtasks(self, tileable_id: str, with_input_output: bool):
returned_subtasks = dict()
subtask_id_to_types = dict()

subtask_details = dict()
default_result = SubtaskResult(progress=0.0, status=SubtaskStatus.pending)

subtask_graph = subtask_results = subtask_snapshots = None
for processor in self._task_id_to_processor.values():
for stage in processor.stage_processors:
if tileable_id in stage.tileable_id_to_tileable:
requested_tileable = stage.tileable_id_to_tileable.get(tileable_id)
requested_subtasks = stage.tileable_to_subtasks.get(requested_tileable, None).copy()
tileable = stage.tileable_id_to_tileable[tileable_id]
returned_subtasks = {
subtask.subtask_id: subtask
for subtask in stage.tileable_to_subtasks[tileable]
}
subtask_graph = stage.subtask_graph
subtask_results = stage.subtask_results
subtask_snapshots = stage.subtask_snapshots
break
if requested_subtasks is not None:
if returned_subtasks:
break

if requested_subtasks is None: # pragma: no cover
if not with_detail:
return {
'subtasks': [],
'dependencies': []
}
else:
return {}
if subtask_graph is None: # pragma: no cover
return {}

if with_input_output:
input_subtasks = []
output_subtasks = []

for subtask in requested_subtasks:
returned_subtasks.add(subtask.subtask_id)

for subtask in requested_subtasks:
for predecessor in stage.subtask_graph.iter_predecessors(subtask):
predecessor_id = predecessor.subtask_id

if predecessor_id not in returned_subtasks and predecessor_id not in input_subtask_ids:
input_subtask_ids.add(predecessor.subtask_id)
input_subtasks.append(predecessor)

for successor in stage.subtask_graph.iter_successors(subtask):
successor_id = successor.subtask_id

if successor_id not in returned_subtasks and successor_id not in output_subtask_ids:
output_subtask_ids.add(successor_id)
output_subtasks.append(successor)

requested_subtasks.extend(input_subtasks)
requested_subtasks.extend(output_subtasks)
returned_subtasks = set()

for subtask in requested_subtasks:
subtask_id = subtask.subtask_id

if with_detail and subtask_id not in returned_subtasks:
returned_subtasks.add(subtask_id)

subtask_result = stage.subtask_results.get(subtask, default_result)
progress = subtask_result.progress
status = subtask_result.status.value

if subtask_id not in input_subtask_ids and subtask_id not in output_subtask_ids:
subtask_details[subtask.subtask_id] = {
'status': status,
'progress': progress,
'name': subtask.subtask_name,
'nodeType': 'Calculation',
}
elif subtask_id in input_subtask_ids:
subtask_details[subtask.subtask_id] = {
'nodeType': 'Input',
}
else:
subtask_details[subtask.subtask_id] = {
'nodeType': 'Output',
}

elif subtask.subtask_id not in returned_subtasks: # pragma: no cover
returned_subtasks.add(subtask.subtask_id)

subtask_list.append({
'subtaskId': subtask.subtask_id,
'subtaskName': subtask.subtask_name,
})

if with_dependencies:
for subtask in requested_subtasks:
for predecessor in stage.subtask_graph.iter_predecessors(subtask):
predecessor_id = predecessor.subtask_id

if predecessor_id in returned_subtasks:
dependency_list.append({
'fromSubtaskId': predecessor_id,
'toSubtaskId': subtask.subtask_id,
})

if with_detail:
return subtask_details
else:
return {
'subtasks': subtask_list,
'dependencies': dependency_list
for subtask in list(returned_subtasks.values()):
for pred in subtask_graph.iter_predecessors(subtask):
if pred.subtask_id in returned_subtasks: # pragma: no cover
continue
returned_subtasks[pred.subtask_id] = pred
subtask_id_to_types[pred.subtask_id] = 'Input'
for succ in subtask_graph.iter_successors(subtask):
if succ.subtask_id in returned_subtasks: # pragma: no cover
continue
returned_subtasks[succ.subtask_id] = succ
subtask_id_to_types[succ.subtask_id] = 'Output'

for subtask in returned_subtasks.values():
subtask_result = subtask_results.get(
subtask, subtask_snapshots.get(subtask, default_result)
)
subtask_details[subtask.subtask_id] = {
'name': subtask.subtask_name,
'status': subtask_result.status.value,
'progress': subtask_result.progress,
'nodeType': subtask_id_to_types.get(subtask.subtask_id, 'Calculation'),
}

for subtask in returned_subtasks.values():
pred_ids = []
for pred in subtask_graph.iter_predecessors(subtask):
if pred.subtask_id in returned_subtasks:
pred_ids.append(pred.subtask_id)
subtask_details[subtask.subtask_id]['fromSubtaskIds'] = pred_ids
return subtask_details

def get_result_tileable(self, tileable_key: str):
processor = list(self._task_id_to_processor.values())[-1]
tileable_graph = processor.tileable_graph
Expand Down
Loading

0 comments on commit fc2271d

Please sign in to comment.