Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine and unify subtask detail APIs #2465

Merged
merged 4 commits into from
Sep 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions mars/services/task/api/oscar.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,12 @@ async def get_tileable_details(self, task_id: str):
async def get_tileable_subtasks(self,
task_id: str,
tileable_id: str,
with_input_output: bool,
with_info: bool,
with_dependency: bool):
with_input_output: bool):

return await self._task_manager_ref.get_tileable_subtasks(
task_id,
tileable_id,
with_input_output,
with_info,
with_dependency
with_input_output
)

async def wait_task(self, task_id: str, timeout: float = None):
Expand Down
19 changes: 4 additions & 15 deletions mars/services/task/api/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,9 @@ async def get_tileable_details(self, session_id: str, task_id: str):
@web_api('(?P<task_id>[^/]+)/(?P<tileable_id>[^/]+)/subtask', method='get')
async def get_tileable_subtasks(self, session_id: str, task_id: str, tileable_id: str):
with_input_output = (self.get_argument('with_input_output', 'false') == 'true')
with_info = (self.get_argument('with_info', 'false') == 'true')
with_dependency = (self.get_argument('with_dependency', 'false') == 'true')
oscar_api = await self._get_oscar_task_api(session_id)
res = await oscar_api.get_tileable_subtasks(task_id,
tileable_id,
with_input_output,
with_info,
with_dependency)
res = await oscar_api.get_tileable_subtasks(
task_id, tileable_id, with_input_output)
self.write(json.dumps(res))

@web_api('(?P<task_id>[^/]+)', method='get', arg_filter={'action': 'progress'})
Expand Down Expand Up @@ -245,19 +240,13 @@ async def get_tileable_details(self, task_id: str):
async def get_tileable_subtasks(self,
task_id: str,
tileable_id: str,
input_output_indicator: bool,
info_indicator: bool,
dependency_indicator: bool):
with_input_output: bool):

with_input_output = 'true' if input_output_indicator else 'false'
with_info = 'true' if info_indicator else 'false'
with_dependency = 'true' if dependency_indicator else 'false'
with_input_output = 'true' if with_input_output else 'false'
path = f'{self._address}/api/session/{self._session_id}/task/{task_id}/{tileable_id}/subtask'
params = {
'action': 'fetch_graph',
'with_input_output': with_input_output,
'with_info': with_info,
'with_dependency': with_dependency
}
res = await self._request_url(path=path, params=params, method='GET')
return json.loads(res.body.decode())
4 changes: 2 additions & 2 deletions mars/services/task/supervisor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,13 @@ async def get_tileable_details(self, task_id):

return await processor_ref.get_tileable_details()

async def get_tileable_subtasks(self, task_id, tileable_id, with_input_output, with_info, with_dependency):
async def get_tileable_subtasks(self, task_id, tileable_id, with_input_output):
try:
processor_ref = self._task_id_to_processor_ref[task_id]
except KeyError:
raise TaskNotExist(f'Task {task_id} does not exist')

return await processor_ref.get_tileable_subtasks(tileable_id, with_input_output, with_info, with_dependency)
return await processor_ref.get_tileable_subtasks(tileable_id, with_input_output)

async def _gen_tiled_context(self, graph: TileableGraph) -> \
Dict[TileableType, TileableType]:
Expand Down
141 changes: 44 additions & 97 deletions mars/services/task/supervisor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,117 +606,64 @@ def get_tileable_details(self):
}
return tileable_infos

def get_tileable_subtasks(self, tileable_id: str, with_input_output: bool, with_detail: bool, with_dependencies: bool):
requested_tileable = None
requested_subtasks = None

returned_subtasks = set()
input_subtask_ids = set()
output_subtask_ids = set()

subtask_list = []
dependency_list = []
def get_tileable_subtasks(self, tileable_id: str, with_input_output: bool):
returned_subtasks = dict()
subtask_id_to_types = dict()

subtask_details = dict()
default_result = SubtaskResult(progress=0.0, status=SubtaskStatus.pending)

subtask_graph = subtask_results = subtask_snapshots = None
for processor in self._task_id_to_processor.values():
for stage in processor.stage_processors:
if tileable_id in stage.tileable_id_to_tileable:
requested_tileable = stage.tileable_id_to_tileable.get(tileable_id)
requested_subtasks = stage.tileable_to_subtasks.get(requested_tileable, None).copy()
tileable = stage.tileable_id_to_tileable[tileable_id]
returned_subtasks = {
subtask.subtask_id: subtask
for subtask in stage.tileable_to_subtasks[tileable]
}
subtask_graph = stage.subtask_graph
subtask_results = stage.subtask_results
subtask_snapshots = stage.subtask_snapshots
break
if requested_subtasks is not None:
if returned_subtasks:
break

if requested_subtasks is None: # pragma: no cover
if not with_detail:
return {
'subtasks': [],
'dependencies': []
}
else:
return {}
if subtask_graph is None: # pragma: no cover
return {}

if with_input_output:
input_subtasks = []
output_subtasks = []

for subtask in requested_subtasks:
returned_subtasks.add(subtask.subtask_id)

for subtask in requested_subtasks:
for predecessor in stage.subtask_graph.iter_predecessors(subtask):
predecessor_id = predecessor.subtask_id

if predecessor_id not in returned_subtasks and predecessor_id not in input_subtask_ids:
input_subtask_ids.add(predecessor.subtask_id)
input_subtasks.append(predecessor)

for successor in stage.subtask_graph.iter_successors(subtask):
successor_id = successor.subtask_id

if successor_id not in returned_subtasks and successor_id not in output_subtask_ids:
output_subtask_ids.add(successor_id)
output_subtasks.append(successor)

requested_subtasks.extend(input_subtasks)
requested_subtasks.extend(output_subtasks)
returned_subtasks = set()

for subtask in requested_subtasks:
subtask_id = subtask.subtask_id

if with_detail and subtask_id not in returned_subtasks:
returned_subtasks.add(subtask_id)

subtask_result = stage.subtask_results.get(subtask, default_result)
progress = subtask_result.progress
status = subtask_result.status.value

if subtask_id not in input_subtask_ids and subtask_id not in output_subtask_ids:
subtask_details[subtask.subtask_id] = {
'status': status,
'progress': progress,
'name': subtask.subtask_name,
'nodeType': 'Calculation',
}
elif subtask_id in input_subtask_ids:
subtask_details[subtask.subtask_id] = {
'nodeType': 'Input',
}
else:
subtask_details[subtask.subtask_id] = {
'nodeType': 'Output',
}

elif subtask.subtask_id not in returned_subtasks: # pragma: no cover
returned_subtasks.add(subtask.subtask_id)

subtask_list.append({
'subtaskId': subtask.subtask_id,
'subtaskName': subtask.subtask_name,
})

if with_dependencies:
for subtask in requested_subtasks:
for predecessor in stage.subtask_graph.iter_predecessors(subtask):
predecessor_id = predecessor.subtask_id

if predecessor_id in returned_subtasks:
dependency_list.append({
'fromSubtaskId': predecessor_id,
'toSubtaskId': subtask.subtask_id,
})

if with_detail:
return subtask_details
else:
return {
'subtasks': subtask_list,
'dependencies': dependency_list
for subtask in list(returned_subtasks.values()):
for pred in subtask_graph.iter_predecessors(subtask):
if pred.subtask_id in returned_subtasks: # pragma: no cover
continue
returned_subtasks[pred.subtask_id] = pred
subtask_id_to_types[pred.subtask_id] = 'Input'
for succ in subtask_graph.iter_successors(subtask):
if succ.subtask_id in returned_subtasks: # pragma: no cover
continue
returned_subtasks[succ.subtask_id] = succ
subtask_id_to_types[succ.subtask_id] = 'Output'

for subtask in returned_subtasks.values():
subtask_result = subtask_results.get(
subtask, subtask_snapshots.get(subtask, default_result)
)
subtask_details[subtask.subtask_id] = {
'name': subtask.subtask_name,
'status': subtask_result.status.value,
'progress': subtask_result.progress,
'nodeType': subtask_id_to_types.get(subtask.subtask_id, 'Calculation'),
}

for subtask in returned_subtasks.values():
pred_ids = []
for pred in subtask_graph.iter_predecessors(subtask):
if pred.subtask_id in returned_subtasks:
pred_ids.append(pred.subtask_id)
subtask_details[subtask.subtask_id]['fromSubtaskIds'] = pred_ids
return subtask_details

def get_result_tileable(self, tileable_key: str):
processor = list(self._task_id_to_processor.values())[-1]
tileable_graph = processor.tileable_graph
Expand Down
Loading