diff --git a/airflow/sentry.py b/airflow/sentry.py index 340b660934b45..a6b2adf891fd5 100644 --- a/airflow/sentry.py +++ b/airflow/sentry.py @@ -144,11 +144,14 @@ def add_breadcrumbs(self, task_instance, session=None): sentry_sdk.add_breadcrumb(category="completed_tasks", data=data, level="info") def enrich_errors(self, func): - """Wrap TaskInstance._run_raw_task to support task specific tags and breadcrumbs.""" + """ + Wrap TaskInstance._run_raw_task and LocalTaskJob._run_mini_scheduler_on_child_tasks + to support task specific tags and breadcrumbs. + """ session_args_idx = find_session_idx(func) @wraps(func) - def wrapper(task_instance, *args, **kwargs): + def wrapper(_self, *args, **kwargs): # Wrapping the _run_raw_task function with push_scope to contain # tags and breadcrumbs to a specific Task Instance @@ -159,8 +162,14 @@ def wrapper(task_instance, *args, **kwargs): with sentry_sdk.push_scope(): try: - return func(task_instance, *args, **kwargs) + return func(_self, *args, **kwargs) except Exception as e: + # Is a LocalTaskJob get the task instance + if hasattr(_self, 'task_instance'): + task_instance = _self.task_instance + else: + task_instance = _self + self.add_tagging(task_instance) self.add_breadcrumbs(task_instance, session=session) sentry_sdk.capture_exception(e)