Skip to content

Commit

Permalink
Give on_task_instance_failed access to the error that caused the fa…
Browse files Browse the repository at this point in the history
…ilure (#38155)

* give on_task_instance_failed access to the error that caused the failure

* store error in thread-local storage && update sample dag

* add sample code to the listeners doc

* test that the error is accessible on callback

* fix stuff detected by static checks

* add _thread_local_data to TaskInstancePydantic as well

it's a possible type in handle_failure

* mention error recovery mechanism in doc

* replace previous solution with a new parameter
  • Loading branch information
vandonr authored Apr 12, 2024
1 parent 4a3caa2 commit 53dcbce
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 11 deletions.
7 changes: 6 additions & 1 deletion airflow/example_dags/plugins/event_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ def on_task_instance_success(previous_state: TaskInstanceState, task_instance: T

# [START howto_listen_ti_failure_task]
@hookimpl
def on_task_instance_failed(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
def on_task_instance_failed(
previous_state: TaskInstanceState, task_instance: TaskInstance, error: None | str | BaseException, session
):
"""
This method is called when task state changes to FAILED.
Through callback, parameters like previous_task_state, task_instance object can be accessed.
Expand All @@ -113,6 +115,8 @@ def on_task_instance_failed(previous_state: TaskInstanceState, task_instance: Ta

print(f"Task start:{start_date} end:{end_date} duration:{duration}")
print(f"Task:{task} dag:{dag} dagrun:{dagrun}")
if error:
print(f"Failure caused by {error}")


# [END howto_listen_ti_failure_task]
Expand Down Expand Up @@ -146,6 +150,7 @@ def on_dag_run_failed(dag_run: DagRun, msg: str):
external_trigger = dag_run.external_trigger

print(f"Dag information:{dag_id} Run id: {run_id} external trigger: {external_trigger}")
print(f"Failed with message: {msg}")


# [END howto_listen_dagrun_failure_task]
Expand Down
5 changes: 4 additions & 1 deletion airflow/listeners/spec/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ def on_task_instance_success(

@hookspec
def on_task_instance_failed(
previous_state: TaskInstanceState | None, task_instance: TaskInstance, session: Session | None
previous_state: TaskInstanceState | None,
task_instance: TaskInstance,
error: None | str | BaseException,
session: Session | None,
):
"""Execute when task state changes to FAIL. previous_state can be None."""
3 changes: 2 additions & 1 deletion airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1413,6 +1413,7 @@ class TaskInstance(Base, LoggingMixin):
cascade="all, delete, delete-orphan",
)
note = association_proxy("task_instance_note", "content", creator=_creator_note)

task: Operator | None = None
test_mode: bool = False
is_trigger_log_context: bool = False
Expand Down Expand Up @@ -2934,7 +2935,7 @@ def fetch_handle_failure_context(
):
"""Handle Failure for the TaskInstance."""
get_listener_manager().hook.on_task_instance_failed(
previous_state=TaskInstanceState.RUNNING, task_instance=ti, session=session
previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error, session=session
)

if error:
Expand Down
42 changes: 39 additions & 3 deletions docs/apache-airflow/administration-and-deployment/listeners.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,57 @@ Lifecycle events allow you to react to start and stop events for an Airflow ``Jo
DagRun State Change Events
--------------------------

DagRun state change events occur when a :class:`~airflow.models.dagrun.DagRun` changes state.

- ``on_dag_run_running``

.. exampleinclude:: /../../airflow/example_dags/plugins/event_listener.py
:language: python
:start-after: [START howto_listen_dagrun_running_task]
:end-before: [END howto_listen_dagrun_running_task]

- ``on_dag_run_success``

.. exampleinclude:: /../../airflow/example_dags/plugins/event_listener.py
:language: python
:start-after: [START howto_listen_dagrun_success_task]
:end-before: [END howto_listen_dagrun_success_task]

- ``on_dag_run_failed``

DagRun state change events occur when a ``DagRun`` changes state.
.. exampleinclude:: /../../airflow/example_dags/plugins/event_listener.py
:language: python
:start-after: [START howto_listen_dagrun_failure_task]
:end-before: [END howto_listen_dagrun_failure_task]


TaskInstance State Change Events
--------------------------------

TaskInstance state change events occur when a :class:`~airflow.models.taskinstance.TaskInstance` changes state.
You can use these events to react to ``LocalTaskJob`` state changes.

- ``on_task_instance_running``

.. exampleinclude:: /../../airflow/example_dags/plugins/event_listener.py
:language: python
:start-after: [START howto_listen_ti_running_task]
:end-before: [END howto_listen_ti_running_task]

- ``on_task_instance_success``

.. exampleinclude:: /../../airflow/example_dags/plugins/event_listener.py
:language: python
:start-after: [START howto_listen_ti_success_task]
:end-before: [END howto_listen_ti_success_task]

- ``on_task_instance_failed``

TaskInstance state change events occur when a ``TaskInstance`` changes state.
You can use these events to react to ``LocalTaskJob`` state changes.
.. exampleinclude:: /../../airflow/example_dags/plugins/event_listener.py
:language: python
:start-after: [START howto_listen_ti_failure_task]
:end-before: [END howto_listen_ti_failure_task]


Dataset Events
--------------
Expand Down
4 changes: 3 additions & 1 deletion tests/listeners/class_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ def on_task_instance_success(self, previous_state, task_instance, session):
self.state.append(TaskInstanceState.SUCCESS)

@hookimpl
def on_task_instance_failed(self, previous_state, task_instance, session):
def on_task_instance_failed(
self, previous_state, task_instance, error: None | str | BaseException, session
):
self.state.append(TaskInstanceState.FAILED)


Expand Down
4 changes: 3 additions & 1 deletion tests/listeners/file_write_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ def on_task_instance_success(self, previous_state, task_instance, session):
self.write("on_task_instance_success")

@hookimpl
def on_task_instance_failed(self, previous_state, task_instance, session):
def on_task_instance_failed(
self, previous_state, task_instance, error: None | str | BaseException, session
):
self.write("on_task_instance_failed")

@hookimpl
Expand Down
2 changes: 1 addition & 1 deletion tests/listeners/full_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def on_task_instance_success(previous_state, task_instance, session):


@hookimpl
def on_task_instance_failed(previous_state, task_instance, session):
def on_task_instance_failed(previous_state, task_instance, error: None | str | BaseException, session):
state.append(TaskInstanceState.FAILED)


Expand Down
16 changes: 14 additions & 2 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2882,6 +2882,11 @@ def test_handle_failure(self, create_dummy_dag, session=None):
start_date = timezone.datetime(2016, 6, 1)
clear_db_runs()

from airflow.listeners.listener import get_listener_manager

listener_callback_on_error = mock.MagicMock()
get_listener_manager().pm.hook.on_task_instance_failed = listener_callback_on_error

mock_on_failure_1 = mock.MagicMock()
mock_on_retry_1 = mock.MagicMock()
dag, task1 = create_dummy_dag(
Expand All @@ -2901,11 +2906,18 @@ def test_handle_failure(self, create_dummy_dag, session=None):
state=None,
session=session,
)

ti1 = dr.get_task_instance(task1.task_id, session=session)
ti1.task = task1

ti1.state = State.FAILED
ti1.handle_failure("test failure handling")
error_message = "test failure handling"
ti1.handle_failure(error_message)

# check that the listener callback was called, and that it can access the error
listener_callback_on_error.assert_called_once()
callback_args = listener_callback_on_error.call_args.kwargs
assert "error" in callback_args
assert callback_args["error"] == error_message

context_arg_1 = mock_on_failure_1.call_args.args[0]
assert context_arg_1
Expand Down

0 comments on commit 53dcbce

Please sign in to comment.