Write Tasks and SubWorkflows in Dynamic Job Spec #111

Merged: 5 commits, May 8, 2020
3 changes: 2 additions & 1 deletion flytekit/__init__.py
@@ -1,4 +1,5 @@
from __future__ import absolute_import

import flytekit.plugins

__version__ = '0.8.0b0'
__version__ = '0.8.0b1'
84 changes: 49 additions & 35 deletions flytekit/common/tasks/sdk_dynamic.py
@@ -10,13 +10,12 @@
launch_plan as _launch_plan, workflow as _workflow
from flytekit.common.core import identifier as _identifier
from flytekit.common.exceptions import scopes as _exception_scopes
from flytekit.common.tasks import output as _task_output, sdk_runnable as _sdk_runnable
from flytekit.common.mixins import registerable as _registerable
from flytekit.common.tasks import output as _task_output, sdk_runnable as _sdk_runnable, task as _task
from flytekit.common.types import helpers as _type_helpers
from flytekit.common.utils import _dnsify
from flytekit.configuration import internal as _internal_config
from flytekit.models import literals as _literal_models, dynamic_job as _dynamic_job, array_job as _array_job
from flytekit.models.core import identifier as _identifier_model
from flytekit.common.mixins import registerable as _registerable
from flytekit.configuration import platform as _platform_config, internal as _internal_config


class PromiseOutputReference(_task_output.OutputReference):
@@ -39,6 +38,19 @@ def set(self, value):
self._raw_value = value


def _append_node(generated_files, node, nodes, sub_task_node):
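"""Record a generated node in the dynamic job spec being built.

Appends the node to `nodes`, back-fills ids on any of the sub-task node's
outputs that do not have one yet, and stages the node's input literals in
`generated_files` under `<node id>/inputs.pb` for upload.
"""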
nodes.append(node)
for k, node_output in _six.iteritems(sub_task_node.outputs):
if not node_output.sdk_node.id:
node_output.sdk_node.assign_id_and_return(node.id)

# Upload inputs to working directory under /array_job.input_ref/inputs.pb
input_path = _os.path.join(node.id, _constants.INPUT_FILE_NAME)
generated_files[input_path] = _literal_models.LiteralMap(
literals={binding.var: binding.binding.to_literal_model() for binding in
sub_task_node.inputs})


class SdkDynamicTask(_six.with_metaclass(_sdk_bases.ExtendedSdkType, _sdk_runnable.SdkRunnableTask)):
"""
This class includes the additional logic for building a task that executes parent-child tasks in Python code. It
@@ -121,6 +133,20 @@ def _can_run_as_array(task_type):
"""
return task_type == _constants.SdkTaskType.PYTHON_TASK

@staticmethod
def _add_upstream_entities(executable_sdk_object, sub_workflows, tasks):
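"""Recursively collect statically declared upstream entities.

Adds any upstream SdkWorkflow to `sub_workflows` (recursing into its own
upstream entities) and any upstream SdkTask to `tasks`, so both can be
written into the dynamic job spec.
"""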
for upstream_entity in executable_sdk_object.upstream_entities:
# If the upstream entity is either a Workflow or a Task, yield them in the
# dynamic job spec. Otherwise (e.g. a LaunchPlan), we will assume it already
# is registered (can't be dynamically created). This will cause a runtime error
# if it's not already registered with the control plane.
if isinstance(upstream_entity, _workflow.SdkWorkflow):
sub_workflows.add(upstream_entity)
# Recursively discover all statically defined dependencies
SdkDynamicTask._add_upstream_entities(upstream_entity, sub_workflows, tasks)
elif isinstance(upstream_entity, _task.SdkTask):
tasks.add(upstream_entity)

def _produce_dynamic_job_spec(self, context, inputs):
"""
Runs user code and produces future task nodes to run sub-tasks.
@@ -152,8 +178,9 @@ def _produce_dynamic_job_spec(self, context, inputs):
# Keeping future-tasks in original order. We don't use upstream_nodes exclusively because the parent task can
# yield sub-tasks that it never uses to produce final outputs, but they still need to execute.
array_job_index = {}
tasks = []
tasks = set()
nodes = []
sub_workflows = set()
visited_nodes = set()
generated_ids = {}
effective_failure_ratio = self._allowed_failure_ratio or 0.0
@@ -190,20 +217,18 @@
new_count = generated_ids[safe_task_id] = 0
unique_node_id = _dnsify("{}-{}".format(safe_task_id, new_count))

# Handling cases where the yielded nodes are launch plan or subworkflow nodes
if isinstance(sub_task_node.executable_sdk_object, (_launch_plan.SdkLaunchPlan, _workflow.SdkWorkflow)):
# Handling the case where the yielded node is a launch plan
if isinstance(sub_task_node.executable_sdk_object, _launch_plan.SdkLaunchPlan):
node = sub_task_node.assign_id_and_return(unique_node_id)
nodes.append(node)
for k, node_output in _six.iteritems(sub_task_node.outputs):
if not node_output.sdk_node.id:
node_output.sdk_node.assign_id_and_return(node.id)

# Upload inputs to working directory under /array_job.input_ref/inputs.pb
input_path = _os.path.join(node.id, _constants.INPUT_FILE_NAME)
generated_files[input_path] = _literal_models.LiteralMap(
literals={binding.var: binding.binding.to_literal_model() for binding in
sub_task_node.inputs})

_append_node(generated_files, node, nodes, sub_task_node)
# Handling the case where the yielded node launches a sub-workflow
elif isinstance(sub_task_node.executable_sdk_object, _workflow.SdkWorkflow):
node = sub_task_node.assign_id_and_return(unique_node_id)
_append_node(generated_files, node, nodes, sub_task_node)
# Add the workflow itself to the yielded sub-workflows
sub_workflows.add(sub_task_node.executable_sdk_object)
# Recursively discover statically defined upstream entities (tasks, wfs)
SdkDynamicTask._add_upstream_entities(sub_task_node.executable_sdk_object, sub_workflows, tasks)
# Handling tasks
else:
# If the task can run as an array job, group its instances together. Otherwise, keep each
@@ -231,35 +256,24 @@
sub_task_node.inputs})
else:
node = sub_task_node.assign_id_and_return(unique_node_id)

tasks.append(sub_task_node.executable_sdk_object)
nodes.append(node)

for k, node_output in _six.iteritems(sub_task_node.outputs):
if not node_output.sdk_node.id:
node_output.sdk_node.assign_id_and_return(node.id)

# Upload inputs to working directory under /array_job.input_ref/inputs.pb
input_path = _os.path.join(node.id, _constants.INPUT_FILE_NAME)
generated_files[input_path] = _literal_models.LiteralMap(
literals={binding.var: binding.binding.to_literal_model() for binding in
sub_task_node.inputs})
tasks.add(sub_task_node.executable_sdk_object)
_append_node(generated_files, node, nodes, sub_task_node)

# Assign the computed ArrayJob properties to the task's custom field.
for task, (array_job, _) in _six.iteritems(array_job_index):
# TODO: Reconstruct task template object instead of modifying an existing one?
tasks.append(task.assign_custom_and_return(array_job.to_dict()).assign_type_and_return(
tasks.add(task.assign_custom_and_return(array_job.to_dict()).assign_type_and_return(
_constants.SdkTaskType.CONTAINER_ARRAY_TASK))

# min_successes is absolute, it's computed as the reverse of allowed_failure_ratio and multiplied by the
# total length of tasks to get an absolute count.
nodes.extend([array_job_node for (_, array_job_node) in array_job_index.values()])
dynamic_job_spec = _dynamic_job.DynamicJobSpec(
min_successes=len(nodes),
tasks=tasks,
tasks=list(tasks),
nodes=nodes,
outputs=output_bindings,
subworkflows=[])
subworkflows=list(sub_workflows))

return dynamic_job_spec, generated_files

@@ -280,7 +294,7 @@ def execute(self, context, inputs):
spec, generated_files = self._produce_dynamic_job_spec(context, inputs)

# If no sub-tasks are requested to run, just produce an outputs file like any other single-step task.
if len(generated_files) == 0:
if len(spec.nodes) == 0:
return {
_constants.OUTPUT_FILE_NAME: _literal_models.LiteralMap(
literals={binding.var: binding.binding.to_literal_model() for binding in spec.outputs})
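An aside on the allowed-failure arithmetic referenced in the diff above: min_successes for an array job is absolute, derived by inverting allowed_failure_ratio into a required-success fraction and scaling by the instance count. A minimal sketch of that computation (illustrative only, not the code in this PR):

import math

def compute_min_successes(num_instances, allowed_failure_ratio=0.0):
    # Invert the allowed failure ratio into a required-success fraction,
    # then scale by the absolute number of instances.
    return int(math.ceil(num_instances * (1.0 - allowed_failure_ratio)))

# With 10 instances and allowed_failure_ratio=0.2, at least 8 must succeed.
assert compute_min_successes(10, 0.2) == 8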
133 changes: 133 additions & 0 deletions tests/flytekit/unit/sdk/tasks/test_dynamic_tasks.py
@@ -6,6 +6,7 @@
from flytekit.common.tasks import sdk_runnable as _sdk_runnable, sdk_dynamic as _sdk_dynamic
from flytekit.sdk.tasks import inputs, outputs, dynamic_task, python_task
from flytekit.sdk.types import Types
from flytekit.sdk.workflow import Input, workflow, Output


@inputs(in1=Types.Integer)
@@ -109,6 +110,39 @@ def no_future_batch_task(wf_params, in1, out_str):
out_str.set(["res1", "res2"])


def manual_assign_name():
pass


@inputs(task_input_num=Types.Integer)
@outputs(out=Types.Integer)
@dynamic_task
def dynamic_wf_task(wf_params, task_input_num, out):
wf_params.logging.info("Running inner task... yielding a code generated sub workflow")

input_a = Input(Types.Integer, help="Tell me something")
node1 = sq_sub_task(in1=input_a)

MyUnregisteredWorkflow = workflow(
inputs={
'a': input_a,
},
outputs={
'ooo': Output(node1.outputs.out1, sdk_type=Types.Integer,
Contributor:
do things still work if this line isn't here? That is, if the sub-workflow runs a task for side effect, and has no outputs, and hence doesn't pass any outputs along through the workflow, will that task still be seen as an upstream entity?

Collaborator (Author):
Added a unit test below to test that... seems to work.

help='This is an integer output')
},
nodes={
'node_one': node1,
}
)

setattr(MyUnregisteredWorkflow, 'auto_assign_name', manual_assign_name)
MyUnregisteredWorkflow._platform_valid_name = 'unregistered'

unregistered_workflow_execution = MyUnregisteredWorkflow(a=task_input_num)
out.set(unregistered_workflow_execution.outputs.ooo)


def test_batch_task():
assert isinstance(sample_batch_task, _sdk_runnable.SdkRunnableTask)
assert isinstance(sample_batch_task, _sdk_dynamic.SdkDynamicTask)
@@ -131,3 +165,102 @@ def test_no_future_batch_task():

res = no_future_batch_task.unit_test(in1=3)
assert expected == res


def test_dynamic_workflow():
res = dynamic_wf_task.unit_test(task_input_num=2)
dynamic_spec = res["futures.pb"]
assert len(dynamic_spec.nodes) == 1
assert len(dynamic_spec.subworkflows) == 1
assert len(dynamic_spec.tasks) == 1


@inputs(task_input_num=Types.Integer)
@outputs(out=Types.Integer)
@dynamic_task
def nested_dynamic_wf_task(wf_params, task_input_num, out):
wf_params.logging.info("Running inner task... yielding a code generated sub workflow")

# Inner workflow
input_a = Input(Types.Integer, help="Tell me something")
node1 = sq_sub_task(in1=input_a)

MyUnregisteredWorkflowInner = workflow(
inputs={
'a': input_a,
},
outputs={
'ooo': Output(node1.outputs.out1, sdk_type=Types.Integer,
help='This is an integer output')
},
nodes={
'node_one': node1,
}
)

setattr(MyUnregisteredWorkflowInner, 'auto_assign_name', manual_assign_name)
MyUnregisteredWorkflowInner._platform_valid_name = 'unregistered'

# Output workflow
input_a = Input(Types.Integer, help="Tell me something")
node1 = MyUnregisteredWorkflowInner(a=task_input_num)

MyUnregisteredWorkflowOuter = workflow(
inputs={
'a': input_a,
},
outputs={
'ooo': Output(node1.outputs.ooo, sdk_type=Types.Integer,
help='This is an integer output')
},
nodes={
'node_one': node1,
}
)

setattr(MyUnregisteredWorkflowOuter, 'auto_assign_name', manual_assign_name)
MyUnregisteredWorkflowOuter._platform_valid_name = 'unregistered'

unregistered_workflow_execution = MyUnregisteredWorkflowOuter(a=task_input_num)
out.set(unregistered_workflow_execution.outputs.ooo)


def test_nested_dynamic_workflow():
res = nested_dynamic_wf_task.unit_test(task_input_num=2)
dynamic_spec = res["futures.pb"]
assert len(dynamic_spec.nodes) == 1
assert len(dynamic_spec.subworkflows) == 2
assert len(dynamic_spec.tasks) == 1


@inputs(task_input_num=Types.Integer)
@dynamic_task
def dynamic_wf_no_outputs_task(wf_params, task_input_num):
wf_params.logging.info("Running inner task... yielding a code generated sub workflow")

input_a = Input(Types.Integer, help="Tell me something")
node1 = sq_sub_task(in1=input_a)

MyUnregisteredWorkflow = workflow(
inputs={
'a': input_a,
},
outputs={},
nodes={
'node_one': node1,
}
)

setattr(MyUnregisteredWorkflow, 'auto_assign_name', manual_assign_name)
MyUnregisteredWorkflow._platform_valid_name = 'unregistered'

unregistered_workflow_execution = MyUnregisteredWorkflow(a=task_input_num)
yield unregistered_workflow_execution


def test_dynamic_workflow_no_outputs():
res = dynamic_wf_no_outputs_task.unit_test(task_input_num=2)
dynamic_spec = res["futures.pb"]
assert len(dynamic_spec.nodes) == 1
assert len(dynamic_spec.subworkflows) == 1
assert len(dynamic_spec.tasks) == 1
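
Because tasks and sub_workflows are now collected into sets keyed on the executable objects themselves, yielding the same workflow object more than once should be deduplicated in the emitted spec. A hypothetical test sketching that expectation (not part of this PR; the task and test names are illustrative, and the assertion assumes set semantics over identical objects):

@inputs(task_input_num=Types.Integer)
@dynamic_task
def dynamic_wf_yielded_twice_task(wf_params, task_input_num):
    input_a = Input(Types.Integer, help="Tell me something")
    node1 = sq_sub_task(in1=input_a)

    MyUnregisteredWorkflow = workflow(
        inputs={'a': input_a},
        outputs={},
        nodes={'node_one': node1}
    )

    setattr(MyUnregisteredWorkflow, 'auto_assign_name', manual_assign_name)
    MyUnregisteredWorkflow._platform_valid_name = 'unregistered'

    # Yield two executions of the same workflow object; sub_workflows and
    # tasks are sets, so each entity should appear only once in the spec.
    yield MyUnregisteredWorkflow(a=task_input_num)
    yield MyUnregisteredWorkflow(a=task_input_num)


def test_dynamic_workflow_yielded_twice():
    res = dynamic_wf_yielded_twice_task.unit_test(task_input_num=2)
    dynamic_spec = res["futures.pb"]
    assert len(dynamic_spec.nodes) == 2
    assert len(dynamic_spec.subworkflows) == 1
    assert len(dynamic_spec.tasks) == 1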