Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Give on_task_instance_failed access to the error that caused the failure #38155

Merged
merged 10 commits into from
Apr 12, 2024

Conversation

vandonr
Copy link
Contributor

@vandonr vandonr commented Mar 14, 2024

I was going to ask this as a question, but decided to do it as a PR instead given how simple the change is.

I was writing a listener, and it seems extremely hard to get some information on the error in the on_task_instance_failed callback, because the error is not passed as a parameter to the callback itself 😞

It's written to the context a bit further down, but we don't have that yet when the callback is called.

We cannot add an extra parameter now because it'd be a breaking change, but what do you think about storing the error in the TaskInstance object before calling on_task_instance_failed ? It'd be a pretty cheap way to solve that issue (if it's one!). We don't need to persist that in DB or anything, it just needs to carry the value to the method call that just follows, seems simple enough ?

@vandonr vandonr requested review from kaxil, XD-DENG and ashb as code owners March 14, 2024 17:08
Copy link

boring-cyborg bot commented Mar 14, 2024

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: [email protected]
    Slack: https://s.apache.org/airflow-slack

@vandonr vandonr changed the title give on_task_instance_failed access to the error that caused the failure Give on_task_instance_failed access to the error that caused the failure Mar 14, 2024
@vandonr vandonr changed the title Give on_task_instance_failed access to the error that caused the failure Give on_task_instance_failed access to the error that caused the failure Mar 14, 2024
@potiuk
Copy link
Member

potiuk commented Mar 20, 2024

This would be rather hacky. Likely better to add a new method on_task_instanc_failed_with_error

@vandonr
Copy link
Contributor Author

vandonr commented Mar 20, 2024

This would be rather hacky. Likely better to add a new method on_task_instanc_failed_with_error

surely possible, but it doesn't look very clean... And even if we obsolete the other method, we'll be stuck with the weird name.

Would it make it less hacky if the field was added as a proper class field to TaskInstance ?

@potiuk
Copy link
Member

potiuk commented Mar 20, 2024

This would be rather hacky. Likely better to add a new method on_task_instanc_failed_with_error

surely possible, but it doesn't look very clean... And even if we obsolete the other method, we'll be stuck with the weird name.

Would it make it less hacky if the field was added as a proper class field to TaskInstance ?

TaskInstance is a Data Model. We should not add extra fields there which are not related to DataModel. We have done that in the past and it was not good.. That really is hacky.

@ephraimbuddy
Copy link
Contributor

This would be rather hacky. Likely better to add a new method on_task_instanc_failed_with_error

surely possible, but it doesn't look very clean... And even if we obsolete the other method, we'll be stuck with the weird name.
Would it make it less hacky if the field was added as a proper class field to TaskInstance ?

TaskInstance is a Data Model. We should not add extra fields there which are not related to DataModel. We have done that in the past and it was not good.. That really is hacky.

I had an idea for this type of stuff but it seemed disruptive - have a state data model that includes a message field in which we can store errors like this https://github.com/apache/airflow/pull/37896/files. I think, that's the best way to capture these kinds of errors seamlessly.

@vandonr
Copy link
Contributor Author

vandonr commented Mar 21, 2024

Even though this feature is experimental, the openlineage provider relies on it, so I don't think we can just go ahead and change it

def on_task_instance_failed(self, previous_state, task_instance: TaskInstance, session):

@potiuk
Copy link
Member

potiuk commented Mar 22, 2024

Even though this feature is experimental, the openlineage provider relies on it, so I don't think we can just go ahead and change it

Agreed. That's the hard nut to crack. But maybe other idea. How about you put the error message in "thread local" variable and allow to retrieve it from there using a new public API call (get_last_task_failure- and add documentation about it). Maybe not a nicest API, but it could be backwards compatible and will allow us to extend the callbacks in the future.

@vandonr
Copy link
Contributor Author

vandonr commented Mar 28, 2024

using a new public API call (get_last_task_failure- and add documentation about it).

by "new API" you meant just a method on the class, right ?
Is there more doc to update besides the sample dag and the docstring for the method itself ?

@potiuk
Copy link
Member

potiuk commented Apr 2, 2024

using a new public API call (get_last_task_failure- and add documentation about it).

by "new API" you meant just a method on the class, right ? Is there more doc to update besides the sample dag and the docstring for the method itself ?

I guess the documentation about listeners https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/listeners.html - it should have a section on "how to retrieve error" - and yes - ideally it should include the relevant excerpt from an example dag (which will serve both as a way to test it end-to-end and serve as a documentation 'extract'.

Generally everey "feature" in airflow has some documentation that aims (and sometimes fail) to explain the feature for the user in a more "howto" fashion. That's the goal we strive for

@vandonr
Copy link
Contributor Author

vandonr commented Apr 3, 2024

I guess the documentation about listeners https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/listeners.html - it should have a section on "how to retrieve error" - and yes - ideally it should include the relevant excerpt from an example dag

Given the fact that this documentation is very minimalistic for now, what do you think about handling this PR as just adding the capability to retrieve the error (which would already be somewhat visible to users through auto-completion), and I can write an other PR to add the howto_listen_ti_XXX to the doc you mentioned, which would self-document how to retrieve the error.

@potiuk
Copy link
Member

potiuk commented Apr 3, 2024

I am not a big fan of splitting PRs like that. If we decide to add something, then big part of the change is to tell the users other than author how to use it. That's a big difference between implementing somethign for "me" vs. for "all users".

And there is very practical reason to have it in one PR - if we decide to cherry-pick that one to 2.9.0 or 2.9.1 (for example) , we will have to remember about cherry-picking yet another PR with documentation. This is not something that makes release manager's life easy and, really not a release manager's job to track and remember about those related PRs that should also be cherry-picked. If you imagine that release manager has 60 or 70 PRs to cherry-pick, choosing and cherry-picking them is enough of a task to not add a burden to go through all of them look through all the discussions in PR and see "hey there is also that 2nd PR with documentation that I have to cherry-pick as well". This is largely impossible task to do if you put yourself in the release manager shoes. Comparing to that - adding a paragraph or so documentation to the PR while it is being worked on is far more focused work that just "makes sense" to be done together.

@potiuk
Copy link
Member

potiuk commented Apr 3, 2024

This is also why this one should also have a unit test added before we merge - because then when we cherry-pick that one, we should cherry-pick it with the unit test, which is is the only way we can see if there is also a conflict or another reason why the cherry-picked change does not work in the other branch.

@potiuk
Copy link
Member

potiuk commented Apr 3, 2024

Also one other thing - the smaller and the more isolated the change is, the more important it is to have both unit tests and documentation. When you are working on a bigger change, which is split across multiple PRs - those changes are targetting future airflow versions and there tests and documentation are even supposed to be split after the bigger change takes shape. Simply - those changes will never be cherry-picked.

This one, on the other hand - has quite a big chance to be cherry-picked to 2.9* - possibly even to 2.9.0 if we merge it before rc2 (there will be an RC) or even to 2.9.1 or later - while technically it adds, a feature, we can classify it as a bugfix that fixes a missing feature in listener. So that's why it's more important to have this one complete with tests and docs, because it's very likely we will cherry-pick it.

@vandonr vandonr requested a review from potiuk as a code owner April 3, 2024 14:34
@mobuchowski
Copy link
Contributor

mobuchowski commented Apr 9, 2024

@vandonr @potiuk listeners (outside of dataset listeners) are non-experimental as of 2.9: #36376 - however pluggy allows to add arguments without breaking impls: https://pluggy.readthedocs.io/en/latest/#opt-in-arguments - does this fit into non-experimental label?

My understanding is that with removing experimental label, the only moment when we can remove the particular listener interface is Airflow 3. There's nothing that prevents us from adding arguments to the listener APIs if it does not break listeners conforming to previous spec - and it is not the case here.

@potiuk
Copy link
Member

potiuk commented Apr 9, 2024

@vandonr @potiuk listeners (outside of dataset listeners) are non-experimental as of 2.9: #36376 - however pluggy allows to add arguments without breaking impls: https://pluggy.readthedocs.io/en/latest/#opt-in-arguments - does this fit into non-experimental label?

I thin that's the best solution if it can be done. Do I understand, that when you change the spec, and add a new field, the message/event can be send with a new argument and if your implementation does not accept it, it will not receive it? If so then I think that solves the problem in the best way.

@mobuchowski
Copy link
Contributor

mobuchowski commented Apr 9, 2024

@potiuk to make sure we fully understand: it means that when the plugin hook gets called and your listener do not implement the argument, it does get called, but just does not receive the particular argument

import sys
from pluggy import PluginManager, HookspecMarker, HookimplMarker


hookspec = HookspecMarker("myproject")
hookimpl = HookimplMarker("myproject")


@hookspec
def myhook(config, arg):
    pass

class Plugin1:
    @hookimpl
    def myhook(self, arg):
        print("hook without config", arg)

class Plugin2:
    @hookimpl
    def myhook(self, config, arg):
        print("hook with config", config, arg)


pm = PluginManager("myproject")

# load from the local module's namespace
pm.add_hookspecs(sys.modules[__name__])

pm.register(Plugin1())
pm.register(Plugin2())

pm.hook.myhook(config="cfg", arg="qwer")

prints both

hook with config cfg qwer
hook without config qwer

@vandonr
Copy link
Contributor Author

vandonr commented Apr 9, 2024

ha, that's very nice and solves most issues 😎
I didn't know it was handled so well by pluggy !
I'll make that change tomorrow :)

@vandonr
Copy link
Contributor Author

vandonr commented Apr 10, 2024

I didn't change the signature of the existing openlinage listener

def on_task_instance_failed(self, previous_state, task_instance: TaskInstance, session):

because pluggy supports having too many parameters, but it crashes when calling a method with too few params, so we have to keep the old signature as not to break compatibility of newer provider package versions with older airflow versions.

@vandonr
Copy link
Contributor Author

vandonr commented Apr 12, 2024

@mobuchowski do you want to review the change now ?
(also, I need someone to click the button to run the CI plz)

@mobuchowski
Copy link
Contributor

@raphaelauv sorry for leaving you like that. The solution is good, we can add some internal runtime checking of Airflow version to the OL provider later if we want to use that field.

@mobuchowski mobuchowski merged commit 53dcbce into apache:main Apr 12, 2024
41 checks passed
Copy link

boring-cyborg bot commented Apr 12, 2024

Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type:improvement Changelog: Improvements
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants