
[Core Feature] Allow Some Dynamic Sub-workflows to Fail #878

Open
michaels-lyft opened this issue Mar 30, 2021 · 21 comments
Labels
enhancement New feature or request stale

Comments

@michaels-lyft

Motivation: Why do you think this is important?
We have a lot of scenarios where a dynamic_task executes a list of sub-workflows or sub-dynamic_tasks. Today, when one of these sub-executions fails, it causes the parent dynamic_task to fail, even with FAIL_AFTER_EXECUTABLE_NODES_COMPLETE enabled on the parent workflow or allowed_failure_ratio set on the dynamic_task (which only applies to sub-python_tasks). This automatically aborts the dependent downstream tasks. But sometimes we would like to allow a certain % of the tasks to fail, since we do not always need 100% of the output.

Goal: What should the final outcome look like, ideally?
There should be an option to allow a % of the sub-workflows or sub-dynamic_tasks to fail.

Describe alternatives you've considered
Went through the discussion in #191. Tried FAIL_AFTER_EXECUTABLE_NODES_COMPLETE and allowed_failure_ratio, but neither works in this scenario.

[Optional] Propose: Link/Inline OR Additional context
I can think of two ways to do this:

  1. Expand allowed_failure_ratio to include sub-workflows and sub-dynamic_tasks (see the sketch after this list)
  2. Add something similar to a try-catch in the dynamic_task that we could use to catch any sub-execution failures and make the parent task succeed (as suggested here).
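
A hypothetical sketch of what option 1 could look like, purely to illustrate the desired semantics; this does not work today (allowed_failure_ratio only counts yielded python_tasks), and my_sub_wf stands in for any existing sub-workflow:

from flytekit.sdk.tasks import dynamic_task, inputs
from flytekit.sdk.types import Types

@inputs(dates=[Types.String])
@dynamic_task(allowed_failure_ratio=0.2)  # desired: also count yielded sub-workflows/sub-dynamic_tasks
def fan_out(wf_params, dates):
    for d in dates:
        # Desired behavior: a failed sub-workflow counts toward the ratio
        # instead of failing the whole dynamic_task and aborting downstream nodes.
        yield my_sub_wf(date=d)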
@michaels-lyft michaels-lyft added enhancement New feature or request untriaged This issues has not yet been looked at by the Maintainers labels Mar 30, 2021
@giorgiawillits

giorgiawillits commented May 19, 2021

+1 to this feature

We have many use cases for a dynamic_task that executes a list of subworkflows or subtasks that are all independent of each other, so there's no reason for all of them to get aborted when one has an error.

Example: workflow A kicks off a subworkflow for each region's computation.
Subworkflows: [SFO, LAX, BKN, CHI]
If the BKN and CHI subworkflows fail, we could kick off a new run of workflow A for only the [BKN, CHI] regions, because all the other regions' subworkflows ran to completion in the original run (rather than possibly being aborted).
This would make it easier and faster to fix things when something goes wrong.

@kumare3
Contributor

kumare3 commented May 20, 2021

@giorgiawillits / @michaels-lyft, I want to understand the use case a little better.

You are saying the top workflow should continue even if a child workflow fails, since the children are independent.
Do you think it is OK for the top workflow to just launch the child workflows and not wait for their completion, i.e. trigger them asynchronously and then continue or complete itself? The child workflows would then be like detached processes.

IMO this is a cleaner feature than adding odd behavior to the parent. The reason to wait today is that the parent can consume the outputs. We can probably model async workflow triggers, where outputs are not available, fairly easily.

@EngHabu / @kanterov what do you guys think

@michaels-lyft
Author

Thanks for following up on this, @kumare3!

For my use case, we would actually like a downstream workflow to consume the output of the parent workflow. The reason we are OK with some sub-workflows/sub-tasks failing is that we split work by date, and we can tolerate missing data from some dates as long as there's enough data at the end.

With the async solution, as long as there's an API to wait on and "join" all the detached processes, it should work for our use case.

@kumare3
Contributor

kumare3 commented May 20, 2021

@michaels-lyft, as a follow-up: would you be OK doing this yourself in a regular python task, or would you prefer an idiomatic solution in the programming model?

@michaels-lyft
Author

Unfortunately we can't migrate the sub-workflow to a regular python task since the workflow spins out several sub-tasks and sub-dynamic-tasks... Could this be implemented idiomatically so that allowed_failure_ratio would work for sub-workflows or sub-dynamic-tasks?

@kanterov
Contributor

kanterov commented May 22, 2021

We have a similar use case, but haven't implemented it yet. Our expectation was that with FAIL_AFTER_EXECUTABLE_NODES_COMPLETE set, workflow execution will try to make as much progress as it can: if any node fails, all independent nodes should still be attempted. In the case of a sub-workflow, all nodes that depend on the WorkflowNode should be skipped, but all independent nodes should be attempted. In our case, async execution is not an option because we care about the completion of each sub-workflow.

Example:
parent workflow is:

A --> B -> C
|
+---> D

B is sub-workflow consisting of:

B-a -> B-b -> B-c
|
+----> B-d

If B-b fails, B-d and D should be attempted because they don't depend on B-b, while B-c and C are skipped because their upstream node has failed.
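
A minimal sketch of that shape in the newer flytekit API, just to make the expectation concrete; the task bodies, names, and types are illustrative, and whether the engine actually behaves this way is exactly the question:

from flytekit import task, workflow, WorkflowFailurePolicy

@task
def step(x: int) -> int:
    return x + 1

@workflow
def sub_wf_b(x: int) -> int:
    a = step(x=x)
    b = step(x=a)   # if this node (B-b) fails...
    c = step(x=b)   # ...B-c should be skipped,
    d = step(x=a)   # ...but B-d is independent and should still be attempted
    return c

@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def parent_wf(x: int) -> int:
    a = step(x=x)
    b = sub_wf_b(x=a)   # the WorkflowNode B
    c = step(x=b)       # depends on B, skipped if B fails
    d = step(x=a)       # independent of B, should still be attempted
    return c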

@kumare3
Contributor

kumare3 commented May 23, 2021

@kanterov / @michaels-lyft, this is an interesting discussion. @michaels-lyft, FAIL_AFTER_EXECUTABLE_NODES_COMPLETE does not work for you because you actually want the downstream of a failed workflow node to keep working, is that right? The problem is: without the outputs, how can that happen?

Reading this closely, I think @kanterov's execution should work, but @michaels-lyft, I think a try-catch is the right behavior for you. Interestingly, the backend already supports it; we need to add it to flytekit.

What's your current state, can you use the new flytekit interface?

@wild-endeavor

@michaels-lyft
Author

michaels-lyft commented May 23, 2021

What's your current state, can you use the new flytekit interface?

Yes, a try-catch would work for our use case! We have fully migrated from modelbuilder to flytekit (currently on v0.15.2). We should be able to upgrade to the latest version once this feature is implemented.

The problem is without the outputs, how can this happen?

The workflow fans out jobs split by date and hour. We don't need data for every hour of every day to have enough data for the downstream job; as long as a certain % of the jobs succeed, we can start the downstream. Some jobs might fail either because of a lack of data for that particular hour or because of Flyte infra issues.

@kumare3

@kumare3 kumare3 added this to the 0.16.0 milestone Jun 7, 2021
@kumare3
Contributor

kumare3 commented Jun 7, 2021

@michaels-lyft we have started exploring this feature in the current milestone, but we will deliver it in the next milestone.

@kumare3 kumare3 removed the untriaged This issues has not yet been looked at by the Maintainers label Jun 7, 2021
@michaels-lyft
Author

@kumare3 Thanks for the update! How long is a milestone?

@kumare3
Contributor

kumare3 commented Jun 7, 2021

A milestone is one month long.

@EngHabu EngHabu modified the milestones: 0.16.0, 0.17.0 Aug 2, 2021
@michaels-lyft
Author

Hi @kumare3, just wondering if there's an update on this issue?

@EngHabu
Contributor

EngHabu commented Aug 18, 2021

Hey @michaels-lyft,

I wanted to clarify whether the proposed try-finally behavior will solve your problem. The behavior we are proposing is something like this:

@task
def my_error_handler(...):
   ...

@workflow(on_error=my_error_handler)
def my_wf():
   ...

If my_wf() fails at any point during execution, it'll call the my_error_handler() task and pass it some context (error info, etc.) to allow it to handle the error. The expectation is that my_error_handler() would do things like clean up resources and log/send customized notifications. The thing it will NOT let you do is recover from the failure: the execution of this workflow will still fail, be marked as a failure, and upstream callers will still be notified of its failure.

An example of sub-workflows:

@workflow(on_error=my_error_handler)
def my_sub_wf():
    ...

@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def my_parent_wf() -> str:
   n1 = my_sub_wf()
   n2 = my_sub_wf()
   n3 = my_sub_wf()

   return n3.out1

In this case, my_parent_wf will continue running even if any of the nodes fails. The overall status of the execution will again be marked as a failure, but it'll let as many nodes as possible execute. Whenever my_sub_wf fails, it'll invoke an instance of the my_error_handler task to clean up resources, etc.

Note that on_error is not implemented yet, while failure_policy is.

@michaels-lyft
Author

Hi @EngHabu, thanks for the proposal! I don't think it would work for our use case, since downstream workflows rely on this workflow being marked as successful in order to be executed. If this ends up being the implementation for try-catch, then I think we would need option 1 from the description for our use case to work.

I can think of a very hacky workaround using the proposed try-catch implementation, though: set the downstream workflow as the on_error task. But it would still be a bad user experience in the end, since in our use case the run should be considered successful rather than a failure.

@EngHabu
Contributor

EngHabu commented Aug 31, 2021

Hey @michaels-lyft, can you elaborate on how you would implement that function to handle the error and still produce a successful run? Especially if you have a workflow that returns an output, like this:

@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def my_parent_wf() -> str:
   n1 = my_sub_wf()
   n2 = my_sub_wf()
   n3 = my_sub_wf()

   return n3.out1

Let's say it fails in executing n2. How will you handle the error and populate the final output of the workflow?

@EngHabu EngHabu removed this from the 0.17.0 milestone Aug 31, 2021
@michaels-lyft
Author

Our workflow looks more like:

@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def my_parent_wf() -> str:
   ...
   my_sub_task()         # my_sub_task does not return results, it performs a side-effect (eg. dump data into a datastore)
   n = my_sub_wf()       # my_sub_wf does not require all data from my_sub_task (it only needs ~80%)
   return n.out

@dynamic_task(allowed_failure_ratio=0.8)
def my_sub_task():
  ...
  for i in inputs:
    yield my_sub_wf2(i)

@workflow
def my_sub_wf2():
  my_sub_task2()
  my_sub_task3()
  ...

Ideally, the allowed_failure_ratio setting should work here, but it doesn't because the task is yielding sub-workflows rather than sub-python-tasks. If it worked, our use case would be addressed.

If the above approach doesn't work (i.e. requires more work to fix), then I think the try-except scenario would work as long as there's a recover() function we can call so that my_sub_task is not marked as a failure. For example, it could look like:

from flytekit import recover

@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def my_parent_wf() -> str:
   ...
   my_sub_task()         # my_sub_task does not return results, it performs a side-effect (eg. dump data into a datastore)
   n = my_sub_wf()       # my_sub_wf does not require all data from my_sub_task (it only needs ~80%)
   return n.out

@dynamic_task(on_error=my_error_handler)
def my_sub_task():
  ...
  for i in inputs:
    yield my_sub_wf2(i)

@workflow
def my_sub_wf2():
  my_sub_task2()
  my_sub_task3()
  ...

@task
def my_error_handler(...):
  if condition_is_okay:
    recover()       # mark the task as not failed
  # otherwise, proceed to mark the task as failed

@kumare3
Contributor

kumare3 commented Sep 20, 2021

converting the error handler to a separate task

@kumare3
Contributor

kumare3 commented Oct 13, 2021

@michaels-lyft you can launch subworkflows and then decide to fail or succeed without needing any additional work from our team. It just needs to follow some rules:

  1. your logic for launching the workflows should be deterministic, i.e. you should be able to generate the same set of workflows in the same order
  2. you should be able to name the workflows deterministically.

If the above 2 conditions are met, you should be able to launch the workflows using a python task and FlyteRemote in the new, generally available flytekit, or using the legacy flytekit SDKWorkflow() API, just ensuring that you name the executions correctly. You should also set a large number of retries for this task. One problem is that the launched workflows won't be easily tied to the parent workflow, so I suggest naming them accordingly, within the naming bounds, something like execid-{i} for i=0,1,2,...,n.

An example of doing so (in the new API):

@task
def foo():
    # assumes flytekit's remote client: from flytekit.remote import FlyteRemote
    remote = FlyteRemote(...)
    lp = remote.fetch_launch_plan(...)
    e1 = remote.execute(lp, inputs, name)
    ...
    e1.wait()
    e2.wait()
    remote.execute(lp, inputs=e1.outputs...)

This is completely OK; sadly, the parallelizing and failure detection fall on you, but it is resilient.
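
As a rough sketch of what that failure detection could look like with FlyteRemote (the launch plan name, input name, success threshold, and phase check are all illustrative, and constructor/attribute details vary by flytekit version):

from typing import List

from flytekit import task
from flytekit.configuration import Config
from flytekit.models.core.execution import WorkflowExecutionPhase
from flytekit.remote import FlyteRemote

@task(retries=3)  # large retry count, as suggested above
def fan_out_and_join(dates: List[str], exec_prefix: str) -> float:
    remote = FlyteRemote(Config.auto())  # config/constructor details vary by version
    lp = remote.fetch_launch_plan(name="my_sub_wf")  # hypothetical launch plan name

    # Deterministic execution names (rule 2 above) so reruns map to the same children.
    executions = [
        remote.execute(lp, inputs={"date": d}, execution_name=f"{exec_prefix}-{i}")
        for i, d in enumerate(dates)
    ]

    # Block until every child execution reaches a terminal state.
    executions = [remote.wait(e) for e in executions]

    succeeded = sum(
        1 for e in executions if e.closure.phase == WorkflowExecutionPhase.SUCCEEDED
    )
    ratio = succeeded / len(executions)
    if ratio < 0.8:  # illustrative "enough children succeeded" threshold
        raise RuntimeError(f"only {ratio:.0%} of child workflows succeeded")
    return ratio

On a retry, because the execution names are deterministic, one would likely fetch the already-launched executions (e.g. via fetch_execution) instead of re-launching them; that bookkeeping is omitted above.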

In the future we will support a new task type that lets you run this pattern; we will automatically provide futures so that things run, waiting is handled correctly, and data dependencies are propagated automatically.

ping me if you have any questions.

cc @EngHabu

@github-actions

Hello 👋, This issue has been inactive for over 9 months. To help maintain a clean and focused backlog, we'll be marking this issue as stale and will close the issue if we detect no activity in the next 7 days. Thank you for your contribution and understanding! 🙏

@github-actions github-actions bot added the stale label Aug 26, 2023
@github-actions

github-actions bot commented Sep 2, 2023

Hello 👋, This issue has been inactive for over 9 months and hasn't received any updates since it was marked as stale. We'll be closing this issue for now, but if you believe this issue is still relevant, please feel free to reopen it. Thank you for your contribution and understanding! 🙏

@github-actions github-actions bot closed this as not planned Sep 2, 2023
@eapolinario eapolinario reopened this Nov 2, 2023

Hello 👋, this issue has been inactive for over 9 months. To help maintain a clean and focused backlog, we'll be marking this issue as stale and will engage on it to decide if it is still applicable.
Thank you for your contribution and understanding! 🙏

@github-actions github-actions bot added the stale label Jul 30, 2024