Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write Tasks and SubWorkflows in Dynamic Job Spec #111

Merged
merged 5 commits into from
May 8, 2020
Merged

Conversation

EngHabu
Copy link
Collaborator

@EngHabu EngHabu commented May 8, 2020

TL;DR

Enable dynamic_tasks to yield dynamically generated workflows.

@inputs(task_input_num=Types.Integer)
@inputs(decider=Types.Boolean)
@outputs(out=Types.Integer)
@dynamic_task
def workflow_builder(wf_params, task_input_num, decider, out):
    wf_params.logging.info("Running inner task... yielding a code generated sub workflow")

    input_a = Input(Types.Integer, help="Tell me something")
    if decider:
        node1 = inverse_inner_task(num=input_a)
    else:
        node1 = inner_task(num=input_a)

    MyUnregisteredWorkflow = workflow(
        inputs={
            'a': input_a,
        },
        outputs={
            'ooo': Output(node1.outputs.out, sdk_type=Types.Integer, help='This is an integer output')
        },
        nodes={
            'node_one': node1,
        }
    )

    setattr(MyUnregisteredWorkflow, 'auto_assign_name', manual_assign_name)
    MyUnregisteredWorkflow._platform_valid_name = 'unregistered'

    unregistered_workflow_execution = MyUnregisteredWorkflow(a=task_input_num)

    yield unregistered_workflow_execution
    out.set(unregistered_workflow_execution.outputs.ooo)

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Tracking Issue

https://github.com/lyft/flyte/issues/

'a': input_a,
},
outputs={
'ooo': Output(node1.outputs.out1, sdk_type=Types.Integer,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do things still work if this line isn't here? that is, if the sub workflow runs a task for side-effect, and has no outputs, and hence doesn't pass any outputs along through the workflow, will that task still be seen as an upstream entity?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a unit test below to test that... seem to work..

@codecov-io
Copy link

Codecov Report

Merging #111 into master will increase coverage by 0.09%.
The diff coverage is 96.42%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #111      +/-   ##
==========================================
+ Coverage   81.68%   81.78%   +0.09%     
==========================================
  Files         209      209              
  Lines       13462    13529      +67     
  Branches     1120     1121       +1     
==========================================
+ Hits        10997    11065      +68     
+ Misses       2200     2199       -1     
  Partials      265      265              
Impacted Files Coverage Δ
flytekit/common/tasks/sdk_dynamic.py 96.29% <88.46%> (+1.34%) ⬆️
...ests/flytekit/unit/sdk/tasks/test_dynamic_tasks.py 86.11% <100.00%> (+9.36%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update c125cd7...08df1cc. Read the comment docs.

@EngHabu EngHabu requested a review from wild-endeavor May 8, 2020 16:59
wild-endeavor
wild-endeavor previously approved these changes May 8, 2020
wild-endeavor
wild-endeavor previously approved these changes May 8, 2020
@wild-endeavor
Copy link
Contributor

This PR is tangentially related to flyteorg/flyte#246 (imo). However this PR here will not be deprecated by that work in order to allow the usage of unregistered tasks (and less ideally, the yielding of tasks in subworkflows which are registered, but which differ in structure. Haytham - please correct me if i'm wrong.

(related, can this be used as a hack to override resource constraints in tasks?)

@EngHabu EngHabu merged commit 6a322e7 into master May 8, 2020
max-hoffman pushed a commit to dolthub/flytekit that referenced this pull request May 11, 2021
* Write Tasks and SubWorkflows in Dynamic Job Spec

* Add a unit test with no outputs

* Merge master

* Correctly merge

* address PR comments
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants