Commit: address PR comments

EngHabu committed May 8, 2020
1 parent 5fb85a7 commit 0780c0e
Showing 1 changed file with 9 additions and 9 deletions.
flytekit/common/tasks/sdk_dynamic.py (18 changes: 9 additions & 9 deletions)

@@ -141,11 +141,11 @@ def _add_upstream_entities(executable_sdk_object, sub_workflows, tasks):
             # is registered (can't be dynamically created). This will cause a runtime error
             # if it's not already registered with the control plane.
             if isinstance(upstream_entity, _workflow.SdkWorkflow):
-                sub_workflows.append(upstream_entity)
+                sub_workflows.add(upstream_entity)
                 # Recursively discover all statically defined dependencies
                 SdkDynamicTask._add_upstream_entities(upstream_entity, sub_workflows, tasks)
             elif isinstance(upstream_entity, _task.SdkTask):
-                tasks.append(upstream_entity)
+                tasks.add(upstream_entity)

     def _produce_dynamic_job_spec(self, context, inputs):
         """
@@ -178,9 +178,9 @@ def _produce_dynamic_job_spec(self, context, inputs):
         # Keeping future-tasks in original order. We don't use upstream_nodes exclusively because the parent task can
         # yield sub-tasks that it never uses to produce final outputs but they need to execute nevertheless.
         array_job_index = {}
-        tasks = []
+        tasks = set()
         nodes = []
-        sub_workflows = []
+        sub_workflows = set()
         visited_nodes = set()
         generated_ids = {}
         effective_failure_ratio = self._allowed_failure_ratio or 0.0
@@ -226,7 +226,7 @@ def _produce_dynamic_job_spec(self, context, inputs):
                 node = sub_task_node.assign_id_and_return(unique_node_id)
                 _append_node(generated_files, node, nodes, sub_task_node)
                 # Add the workflow itself to the yielded sub-workflows
-                sub_workflows.append(sub_task_node.executable_sdk_object)
+                sub_workflows.add(sub_task_node.executable_sdk_object)
                 # Recursively discover statically defined upstream entities (tasks, wfs)
                 SdkDynamicTask._add_upstream_entities(sub_task_node.executable_sdk_object, sub_workflows, tasks)
             # Handling tasks
@@ -256,24 +256,24 @@ def _produce_dynamic_job_spec(self, context, inputs):
                         sub_task_node.inputs})
                 else:
                     node = sub_task_node.assign_id_and_return(unique_node_id)
-                    tasks.append(sub_task_node.executable_sdk_object)
+                    tasks.add(sub_task_node.executable_sdk_object)
                     _append_node(generated_files, node, nodes, sub_task_node)

         # assign custom field to the ArrayJob properties computed.
         for task, (array_job, _) in _six.iteritems(array_job_index):
             # TODO: Reconstruct task template object instead of modifying an existing one?
-            tasks.append(task.assign_custom_and_return(array_job.to_dict()).assign_type_and_return(
+            tasks.add(task.assign_custom_and_return(array_job.to_dict()).assign_type_and_return(
                 _constants.SdkTaskType.CONTAINER_ARRAY_TASK))

         # min_successes is absolute, it's computed as the reverse of allowed_failure_ratio and multiplied by the
         # total length of tasks to get an absolute count.
         nodes.extend([array_job_node for (_, array_job_node) in array_job_index.values()])
         dynamic_job_spec = _dynamic_job.DynamicJobSpec(
             min_successes=len(nodes),
-            tasks=tasks,
+            tasks=list(tasks),
             nodes=nodes,
             outputs=output_bindings,
-            subworkflows=sub_workflows)
+            subworkflows=list(sub_workflows))

         return dynamic_job_spec, generated_files

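Note on the change: the commit switches the tasks and sub_workflows accumulators from lists to sets, so an upstream entity reachable through more than one path during the recursive discovery in _add_upstream_entities is recorded once instead of once per path, and the sets are converted back to lists at the DynamicJobSpec boundary. Below is a minimal, self-contained sketch of that pattern; the Entity class and the example names are hypothetical stand-ins for the SDK's SdkWorkflow/SdkTask objects, and the dedup assumes those objects hash and compare stably, which the real change relies on.

class Entity:
    """Hypothetical stand-in for an SdkWorkflow or SdkTask."""
    def __init__(self, name, is_workflow=False, upstream=()):
        self.name = name
        self.is_workflow = is_workflow
        self.upstream_entities = list(upstream)

def add_upstream_entities(entity, sub_workflows, tasks):
    # Recursively walk statically defined dependencies; a set records each
    # entity once even when it is reachable along several paths, where the
    # previous list version appended one duplicate per path.
    for upstream in entity.upstream_entities:
        if upstream.is_workflow:
            sub_workflows.add(upstream)
            add_upstream_entities(upstream, sub_workflows, tasks)
        else:
            tasks.add(upstream)

shared_task = Entity("shared_task")
wf_a = Entity("wf_a", is_workflow=True, upstream=[shared_task])
wf_b = Entity("wf_b", is_workflow=True, upstream=[shared_task, wf_a])

sub_workflows, tasks = set(), set()
add_upstream_entities(wf_b, sub_workflows, tasks)

# shared_task shows up once despite two paths to it; at the spec boundary
# the sets go back to lists, mirroring tasks=list(tasks) in the diff above.
print(sorted(t.name for t in tasks))            # ['shared_task']
print(sorted(w.name for w in sub_workflows))    # ['wf_a']

One side effect worth noting: sets do not preserve insertion order, while node ordering is unaffected because nodes stays a list in the diff above.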
