Skip to content

Commit

Permalink
Remove email conigs, operator and callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
hussein-awala committed Jan 25, 2025
1 parent dd2252d commit 4e7df6e
Show file tree
Hide file tree
Showing 32 changed files with 99 additions and 1,548 deletions.
7 changes: 0 additions & 7 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2672,13 +2672,6 @@ paths:
base_log_folder = /opt/airflow/logs
[smtp]
smtp_host = localhost
smtp_mail_from = [email protected]
'
'401':
content:
Expand Down
4 changes: 0 additions & 4 deletions airflow/api_fastapi/core_api/routes/public/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@
[core]
dags_folder = /opt/airflow/dags
base_log_folder = /opt/airflow/logs
[smtp]
smtp_host = localhost
smtp_mail_from = [email protected]
"""
),
},
Expand Down
4 changes: 0 additions & 4 deletions airflow/config_templates/unit_tests.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,6 @@ allowed_deserialization_classes = airflow.* tests.*
# celery tests rely on it being set
celery_logging_level = INFO

[smtp]
# Used as default values for SMTP unit tests
smtp_mail_from = [email protected]

[api]
auth_backends = airflow.providers.fab.auth_manager.api.auth.backend.session

Expand Down
7 changes: 0 additions & 7 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,13 +353,6 @@ def inversed_deprecated_sections(self):
"navbar_color": (re2.compile(r"(?i)\A#007A87\z"), "#fff", "2.1"),
"dag_default_view": (re2.compile(r"^tree$"), "grid", "3.0"),
},
"email": {
"email_backend": (
re2.compile(r"^airflow\.contrib\.utils\.sendgrid\.send_email$"),
r"airflow.providers.sendgrid.utils.emailer.send_email",
"2.1",
),
},
"logging": {
"log_filename_template": (
re2.compile(re2.escape("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")),
Expand Down
13 changes: 9 additions & 4 deletions airflow/example_dags/example_dag_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from airflow.decorators import dag, task
from airflow.models.baseoperator import BaseOperator
from airflow.operators.email import EmailOperator
from airflow.providers.smtp.operators.smtp import EmailOperator

if TYPE_CHECKING:
from airflow.sdk.definitions.context import Context
Expand All @@ -48,11 +48,12 @@ def execute(self, context: Context):
catchup=False,
tags=["example"],
)
def example_dag_decorator(email: str = "[email protected]"):
def example_dag_decorator(to_email: str = "example1@example.com", from_email: str = "example2@example.com"):
"""
DAG to send server IP to email.
:param email: Email to send IP to. Defaults to [email protected].
:param to_email: Email to send IP to. Defaults to [email protected].
:param from_email: Email to send IP from. Defaults to [email protected]
"""
get_ip = GetRequestOperator(task_id="get_ip", url="http://httpbin.org/get")

Expand All @@ -67,7 +68,11 @@ def prepare_email(raw_json: dict[str, Any]) -> dict[str, str]:
email_info = prepare_email(get_ip.output)

EmailOperator(
task_id="send_email", to=email, subject=email_info["subject"], html_content=email_info["body"]
task_id="send_email",
to=to_email,
from_email=from_email,
subject=email_info["subject"],
html_content=email_info["body"],
)


Expand Down
3 changes: 0 additions & 3 deletions airflow/example_dags/tutorial.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
Expand Down
7 changes: 0 additions & 7 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,6 @@ class derived from this one results in the creation of a task object,
:param task_id: a unique, meaningful id for the task
:param owner: the owner of the task. Using a meaningful description
(e.g. user/person/team/role name) to clarify ownership is recommended.
:param email: the 'to' email address(es) used in email alerts. This can be a
single email or multiple ones. Multiple addresses can be specified as a
comma or semicolon separated string or by passing a list of strings.
:param email_on_retry: Indicates whether email alerts should be sent when a
task is retried
:param email_on_failure: Indicates whether email alerts should be sent when
a task failed
:param retries: the number of retries that should be performed before
failing the task
:param retry_delay: delay between retries, can be set as ``timedelta`` or
Expand Down
153 changes: 3 additions & 150 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import itertools
import logging
import math
import operator
import os
import signal
import traceback
Expand Down Expand Up @@ -106,7 +105,6 @@
from airflow.models.xcom import LazyXComSelectSequence, XCom
from airflow.plugins_manager import integrate_macros_plugins
from airflow.sdk.api.datamodels._generated import AssetProfile
from airflow.sdk.definitions._internal.templater import SandboxedEnvironment
from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetNameRef, AssetUniqueKey, AssetUriRef
from airflow.sdk.definitions.taskgroup import MappedTaskGroup
from airflow.sentry import Sentry
Expand All @@ -123,10 +121,8 @@
OutletEventAccessors,
VariableAccessor,
context_get_outlet_events,
context_merge,
)
from airflow.utils.email import send_email
from airflow.utils.helpers import prune_dict, render_template_to_string
from airflow.utils.helpers import prune_dict
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.operator_helpers import ExecutionCallableRunner, context_to_airflow_vars
Expand Down Expand Up @@ -1117,15 +1113,6 @@ def _handle_failure(
)

_log_state(task_instance=task_instance, lead_msg="Immediate failure requested. " if force_fail else "")
if (
failure_context["task"]
and failure_context["email_for_state"](failure_context["task"])
and failure_context["task"].email
):
try:
task_instance.email_alert(error, failure_context["task"])
except Exception:
log.exception("Failed to send email to: %s", failure_context["task"].email)

if failure_context["callbacks"] and failure_context["context"]:
_run_finished_callback(
Expand Down Expand Up @@ -1317,116 +1304,6 @@ def _get_previous_start_date(
return pendulum.instance(prev_ti.start_date) if prev_ti and prev_ti.start_date else None


def _email_alert(*, task_instance: TaskInstance, exception, task: BaseOperator) -> None:
"""
Send alert email with exception information.
:param task_instance: the task instance
:param exception: the exception
:param task: task related to the exception
:meta private:
"""
subject, html_content, html_content_err = task_instance.get_email_subject_content(exception, task=task)
if TYPE_CHECKING:
assert task.email
try:
send_email(task.email, subject, html_content)
except Exception:
send_email(task.email, subject, html_content_err)


def _get_email_subject_content(
*,
task_instance: TaskInstance,
exception: BaseException,
task: BaseOperator | None = None,
) -> tuple[str, str, str]:
"""
Get the email subject content for exceptions.
:param task_instance: the task instance
:param exception: the exception sent in the email
:param task:
:meta private:
"""
# For a ti from DB (without ti.task), return the default value
if task is None:
task = getattr(task_instance, "task")
use_default = task is None
exception_html = str(exception).replace("\n", "<br>")

default_subject = "Airflow alert: {{ti}}"
# For reporting purposes, we report based on 1-indexed,
# not 0-indexed lists (i.e. Try 1 instead of
# Try 0 for the first attempt).
default_html_content = (
"Try {{try_number}} out of {{max_tries + 1}}<br>"
"Exception:<br>{{exception_html}}<br>"
'Log: <a href="{{ti.log_url}}">Link</a><br>'
"Host: {{ti.hostname}}<br>"
'Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>'
)

default_html_content_err = (
"Try {{try_number}} out of {{max_tries + 1}}<br>"
"Exception:<br>Failed attempt to attach error logs<br>"
'Log: <a href="{{ti.log_url}}">Link</a><br>'
"Host: {{ti.hostname}}<br>"
'Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>'
)

additional_context: dict[str, Any] = {
"exception": exception,
"exception_html": exception_html,
"try_number": task_instance.try_number,
"max_tries": task_instance.max_tries,
}

if use_default:
default_context = {"ti": task_instance, **additional_context}
jinja_env = jinja2.Environment(
loader=jinja2.FileSystemLoader(os.path.dirname(__file__)), autoescape=True
)
subject = jinja_env.from_string(default_subject).render(**default_context)
html_content = jinja_env.from_string(default_html_content).render(**default_context)
html_content_err = jinja_env.from_string(default_html_content_err).render(**default_context)

else:
if TYPE_CHECKING:
assert task_instance.task

# Use the DAG's get_template_env() to set force_sandboxed. Don't add
# the flag to the function on task object -- that function can be
# overridden, and adding a flag breaks backward compatibility.
dag = task_instance.task.get_dag()
if dag:
jinja_env = dag.get_template_env(force_sandboxed=True)
else:
jinja_env = SandboxedEnvironment(cache_size=0)
jinja_context = task_instance.get_template_context()
context_merge(jinja_context, additional_context)

def render(key: str, content: str) -> str:
if conf.has_option("email", key):
path = conf.get_mandatory_value("email", key)
try:
with open(path) as f:
content = f.read()
except FileNotFoundError:
log.warning("Could not find email template file '%s'. Using defaults...", path)
except OSError:
log.exception("Error while using email template %s. Using defaults...", path)
return render_template_to_string(jinja_env.from_string(content), jinja_context)

subject = render("subject_template", default_subject)
html_content = render("html_content_template", default_html_content)
html_content_err = render("html_content_template", default_html_content_err)

return subject, html_content, html_content_err


def _run_finished_callback(
*,
callbacks: None | TaskStateChangeCallback | list[TaskStateChangeCallback],
Expand Down Expand Up @@ -3131,8 +3008,7 @@ def fetch_handle_failure_context(
if context is not None:
context["exception"] = error

# Set state correctly and figure out how to log it and decide whether
# to email
# Set state correctly and figure out how to log it

# Note, callback invocation needs to be handled by caller of
# _run_raw_task to avoid race conditions which could lead to duplicate
Expand All @@ -3150,11 +3026,10 @@ def fetch_handle_failure_context(
assert isinstance(ti.task, BaseOperator)
task = ti.task.unmap((context, session))
except Exception:
cls.logger().error("Unable to unmap task to determine if we need to send an alert email")
cls.logger().error("Unable to unmap task to determine what callback to use")

if force_fail or not ti.is_eligible_to_retry():
ti.state = TaskInstanceState.FAILED
email_for_state = operator.attrgetter("email_on_failure")
callbacks = task.on_failure_callback if task else None

if task and fail_fast:
Expand All @@ -3169,7 +3044,6 @@ def fetch_handle_failure_context(
TaskInstanceHistory.record_ti(ti, session=session)

ti.state = State.UP_FOR_RETRY
email_for_state = operator.attrgetter("email_on_retry")
callbacks = task.on_retry_callback if task else None

get_listener_manager().hook.on_task_instance_failed(
Expand All @@ -3178,7 +3052,6 @@ def fetch_handle_failure_context(

return {
"ti": ti,
"email_for_state": email_for_state,
"task": task,
"callbacks": callbacks,
"context": context,
Expand Down Expand Up @@ -3326,26 +3199,6 @@ def render_templates(

return original_task

def get_email_subject_content(
self, exception: BaseException, task: BaseOperator | None = None
) -> tuple[str, str, str]:
"""
Get the email subject content for exceptions.
:param exception: the exception sent in the email
:param task:
"""
return _get_email_subject_content(task_instance=self, exception=exception, task=task)

def email_alert(self, exception, task: BaseOperator) -> None:
"""
Send alert email with exception information.
:param exception: the exception
:param task: task related to the exception
"""
_email_alert(task_instance=self, exception=exception, task=task)

def set_duration(self) -> None:
"""Set task instance duration."""
_set_duration(task_instance=self)
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def submit_failure(cls, trigger_id, exc=None, session: Session = NEW_SESSION) ->
the runtime code understands as immediate-fail, and pack the error into
next_kwargs.
TODO: Once we have shifted callback (and email) handling to run on
TODO: Once we have shifted callback handling to run on
workers as first-class concepts, we can run the failure code here
in-process, but we can't do that right now.
"""
Expand Down
Loading

0 comments on commit 4e7df6e

Please sign in to comment.