diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 6075d0c886ef2..f2385278c2980 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -2672,13 +2672,6 @@ paths:
base_log_folder = /opt/airflow/logs
-
- [smtp]
-
- smtp_host = localhost
-
- smtp_mail_from = airflow@example.com
-
'
'401':
content:
diff --git a/airflow/api_fastapi/core_api/routes/public/config.py b/airflow/api_fastapi/core_api/routes/public/config.py
index 13edf5f821b4c..70110fc6462e1 100644
--- a/airflow/api_fastapi/core_api/routes/public/config.py
+++ b/airflow/api_fastapi/core_api/routes/public/config.py
@@ -56,10 +56,6 @@
[core]
dags_folder = /opt/airflow/dags
base_log_folder = /opt/airflow/logs
-
- [smtp]
- smtp_host = localhost
- smtp_mail_from = airflow@example.com
"""
),
},
diff --git a/airflow/config_templates/unit_tests.cfg b/airflow/config_templates/unit_tests.cfg
index 9e222ce550459..f4c09b3266a13 100644
--- a/airflow/config_templates/unit_tests.cfg
+++ b/airflow/config_templates/unit_tests.cfg
@@ -66,10 +66,6 @@ allowed_deserialization_classes = airflow.* tests.*
# celery tests rely on it being set
celery_logging_level = INFO
-[smtp]
-# Used as default values for SMTP unit tests
-smtp_mail_from = airflow@example.com
-
[api]
auth_backends = airflow.providers.fab.auth_manager.api.auth.backend.session
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 1b8b937906b9c..7439e918f6afd 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -353,13 +353,6 @@ def inversed_deprecated_sections(self):
"navbar_color": (re2.compile(r"(?i)\A#007A87\z"), "#fff", "2.1"),
"dag_default_view": (re2.compile(r"^tree$"), "grid", "3.0"),
},
- "email": {
- "email_backend": (
- re2.compile(r"^airflow\.contrib\.utils\.sendgrid\.send_email$"),
- r"airflow.providers.sendgrid.utils.emailer.send_email",
- "2.1",
- ),
- },
"logging": {
"log_filename_template": (
re2.compile(re2.escape("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")),
diff --git a/airflow/example_dags/example_dag_decorator.py b/airflow/example_dags/example_dag_decorator.py
index 0fed70fa2e9f1..20201a0456874 100644
--- a/airflow/example_dags/example_dag_decorator.py
+++ b/airflow/example_dags/example_dag_decorator.py
@@ -24,7 +24,7 @@
from airflow.decorators import dag, task
from airflow.models.baseoperator import BaseOperator
-from airflow.operators.email import EmailOperator
+from airflow.providers.smtp.operators.smtp import EmailOperator
if TYPE_CHECKING:
from airflow.sdk.definitions.context import Context
@@ -48,11 +48,12 @@ def execute(self, context: Context):
catchup=False,
tags=["example"],
)
-def example_dag_decorator(email: str = "example@example.com"):
+def example_dag_decorator(to_email: str = "example1@example.com", from_email: str = "example2@example.com"):
"""
DAG to send server IP to email.
- :param email: Email to send IP to. Defaults to example@example.com.
+    :param to_email: Email to send IP to. Defaults to example1@example.com.
+    :param from_email: Email to send IP from. Defaults to example2@example.com.
"""
get_ip = GetRequestOperator(task_id="get_ip", url="http://httpbin.org/get")
@@ -67,7 +68,11 @@ def prepare_email(raw_json: dict[str, Any]) -> dict[str, str]:
email_info = prepare_email(get_ip.output)
EmailOperator(
- task_id="send_email", to=email, subject=email_info["subject"], html_content=email_info["body"]
+ task_id="send_email",
+ to=to_email,
+ from_email=from_email,
+ subject=email_info["subject"],
+ html_content=email_info["body"],
)
diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py
index 4e55ab7ff15a0..8f371ee24278c 100644
--- a/airflow/example_dags/tutorial.py
+++ b/airflow/example_dags/tutorial.py
@@ -45,9 +45,6 @@
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
- "email": ["airflow@example.com"],
- "email_on_failure": False,
- "email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index e84994c6d04e7..65879e35be0f7 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -212,13 +212,6 @@ class derived from this one results in the creation of a task object,
:param task_id: a unique, meaningful id for the task
:param owner: the owner of the task. Using a meaningful description
(e.g. user/person/team/role name) to clarify ownership is recommended.
- :param email: the 'to' email address(es) used in email alerts. This can be a
- single email or multiple ones. Multiple addresses can be specified as a
- comma or semicolon separated string or by passing a list of strings.
- :param email_on_retry: Indicates whether email alerts should be sent when a
- task is retried
- :param email_on_failure: Indicates whether email alerts should be sent when
- a task failed
:param retries: the number of retries that should be performed before
failing the task
:param retry_delay: delay between retries, can be set as ``timedelta`` or
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index f1aa3a8236e9c..127f4f2d3016f 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -23,7 +23,6 @@
import itertools
import logging
import math
-import operator
import os
import signal
import traceback
@@ -106,7 +105,6 @@
from airflow.models.xcom import LazyXComSelectSequence, XCom
from airflow.plugins_manager import integrate_macros_plugins
from airflow.sdk.api.datamodels._generated import AssetProfile
-from airflow.sdk.definitions._internal.templater import SandboxedEnvironment
from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetNameRef, AssetUniqueKey, AssetUriRef
from airflow.sdk.definitions.taskgroup import MappedTaskGroup
from airflow.sentry import Sentry
@@ -123,10 +121,8 @@
OutletEventAccessors,
VariableAccessor,
context_get_outlet_events,
- context_merge,
)
-from airflow.utils.email import send_email
-from airflow.utils.helpers import prune_dict, render_template_to_string
+from airflow.utils.helpers import prune_dict
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.operator_helpers import ExecutionCallableRunner, context_to_airflow_vars
@@ -1117,15 +1113,6 @@ def _handle_failure(
)
_log_state(task_instance=task_instance, lead_msg="Immediate failure requested. " if force_fail else "")
- if (
- failure_context["task"]
- and failure_context["email_for_state"](failure_context["task"])
- and failure_context["task"].email
- ):
- try:
- task_instance.email_alert(error, failure_context["task"])
- except Exception:
- log.exception("Failed to send email to: %s", failure_context["task"].email)
if failure_context["callbacks"] and failure_context["context"]:
_run_finished_callback(
@@ -1317,116 +1304,6 @@ def _get_previous_start_date(
return pendulum.instance(prev_ti.start_date) if prev_ti and prev_ti.start_date else None
-def _email_alert(*, task_instance: TaskInstance, exception, task: BaseOperator) -> None:
- """
- Send alert email with exception information.
-
- :param task_instance: the task instance
- :param exception: the exception
- :param task: task related to the exception
-
- :meta private:
- """
- subject, html_content, html_content_err = task_instance.get_email_subject_content(exception, task=task)
- if TYPE_CHECKING:
- assert task.email
- try:
- send_email(task.email, subject, html_content)
- except Exception:
- send_email(task.email, subject, html_content_err)
-
-
-def _get_email_subject_content(
- *,
- task_instance: TaskInstance,
- exception: BaseException,
- task: BaseOperator | None = None,
-) -> tuple[str, str, str]:
- """
- Get the email subject content for exceptions.
-
- :param task_instance: the task instance
- :param exception: the exception sent in the email
- :param task:
-
- :meta private:
- """
- # For a ti from DB (without ti.task), return the default value
- if task is None:
- task = getattr(task_instance, "task")
- use_default = task is None
-    exception_html = str(exception).replace("\n", "<br>")
-
- default_subject = "Airflow alert: {{ti}}"
- # For reporting purposes, we report based on 1-indexed,
- # not 0-indexed lists (i.e. Try 1 instead of
- # Try 0 for the first attempt).
-    default_html_content = (
-        "Try {{try_number}} out of {{max_tries + 1}}<br>"
-        "Exception:<br>{{exception_html}}<br>"
-        'Log: <a href="{{ti.log_url}}">Link</a><br>'
-        "Host: {{ti.hostname}}<br>"
-        'Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>'
-    )
-
-    default_html_content_err = (
-        "Try {{try_number}} out of {{max_tries + 1}}<br>"
-        "Exception:<br>Failed attempt to attach error logs<br>"
-        'Log: <a href="{{ti.log_url}}">Link</a><br>'
-        "Host: {{ti.hostname}}<br>"
-        'Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>'
-    )
-
- additional_context: dict[str, Any] = {
- "exception": exception,
- "exception_html": exception_html,
- "try_number": task_instance.try_number,
- "max_tries": task_instance.max_tries,
- }
-
- if use_default:
- default_context = {"ti": task_instance, **additional_context}
- jinja_env = jinja2.Environment(
- loader=jinja2.FileSystemLoader(os.path.dirname(__file__)), autoescape=True
- )
- subject = jinja_env.from_string(default_subject).render(**default_context)
- html_content = jinja_env.from_string(default_html_content).render(**default_context)
- html_content_err = jinja_env.from_string(default_html_content_err).render(**default_context)
-
- else:
- if TYPE_CHECKING:
- assert task_instance.task
-
- # Use the DAG's get_template_env() to set force_sandboxed. Don't add
- # the flag to the function on task object -- that function can be
- # overridden, and adding a flag breaks backward compatibility.
- dag = task_instance.task.get_dag()
- if dag:
- jinja_env = dag.get_template_env(force_sandboxed=True)
- else:
- jinja_env = SandboxedEnvironment(cache_size=0)
- jinja_context = task_instance.get_template_context()
- context_merge(jinja_context, additional_context)
-
- def render(key: str, content: str) -> str:
- if conf.has_option("email", key):
- path = conf.get_mandatory_value("email", key)
- try:
- with open(path) as f:
- content = f.read()
- except FileNotFoundError:
- log.warning("Could not find email template file '%s'. Using defaults...", path)
- except OSError:
- log.exception("Error while using email template %s. Using defaults...", path)
- return render_template_to_string(jinja_env.from_string(content), jinja_context)
-
- subject = render("subject_template", default_subject)
- html_content = render("html_content_template", default_html_content)
- html_content_err = render("html_content_template", default_html_content_err)
-
- return subject, html_content, html_content_err
-
-
def _run_finished_callback(
*,
callbacks: None | TaskStateChangeCallback | list[TaskStateChangeCallback],
@@ -3131,8 +3008,7 @@ def fetch_handle_failure_context(
if context is not None:
context["exception"] = error
- # Set state correctly and figure out how to log it and decide whether
- # to email
+ # Set state correctly and figure out how to log it
# Note, callback invocation needs to be handled by caller of
# _run_raw_task to avoid race conditions which could lead to duplicate
@@ -3150,11 +3026,10 @@ def fetch_handle_failure_context(
assert isinstance(ti.task, BaseOperator)
task = ti.task.unmap((context, session))
except Exception:
- cls.logger().error("Unable to unmap task to determine if we need to send an alert email")
+ cls.logger().error("Unable to unmap task to determine what callback to use")
if force_fail or not ti.is_eligible_to_retry():
ti.state = TaskInstanceState.FAILED
- email_for_state = operator.attrgetter("email_on_failure")
callbacks = task.on_failure_callback if task else None
if task and fail_fast:
@@ -3169,7 +3044,6 @@ def fetch_handle_failure_context(
TaskInstanceHistory.record_ti(ti, session=session)
ti.state = State.UP_FOR_RETRY
- email_for_state = operator.attrgetter("email_on_retry")
callbacks = task.on_retry_callback if task else None
get_listener_manager().hook.on_task_instance_failed(
@@ -3178,7 +3052,6 @@ def fetch_handle_failure_context(
return {
"ti": ti,
- "email_for_state": email_for_state,
"task": task,
"callbacks": callbacks,
"context": context,
@@ -3326,26 +3199,6 @@ def render_templates(
return original_task
- def get_email_subject_content(
- self, exception: BaseException, task: BaseOperator | None = None
- ) -> tuple[str, str, str]:
- """
- Get the email subject content for exceptions.
-
- :param exception: the exception sent in the email
- :param task:
- """
- return _get_email_subject_content(task_instance=self, exception=exception, task=task)
-
- def email_alert(self, exception, task: BaseOperator) -> None:
- """
- Send alert email with exception information.
-
- :param exception: the exception
- :param task: task related to the exception
- """
- _email_alert(task_instance=self, exception=exception, task=task)
-
def set_duration(self) -> None:
"""Set task instance duration."""
_set_duration(task_instance=self)
diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index 2e0fe9f7f2bbe..c6cbcf346a1df 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -261,7 +261,7 @@ def submit_failure(cls, trigger_id, exc=None, session: Session = NEW_SESSION) ->
the runtime code understands as immediate-fail, and pack the error into
next_kwargs.
- TODO: Once we have shifted callback (and email) handling to run on
+ TODO: Once we have shifted callback handling to run on
workers as first-class concepts, we can run the failure code here
in-process, but we can't do that right now.
"""
diff --git a/airflow/operators/email.py b/airflow/operators/email.py
deleted file mode 100644
index 85ac709e2bd5e..0000000000000
--- a/airflow/operators/email.py
+++ /dev/null
@@ -1,91 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from collections.abc import Sequence
-from typing import TYPE_CHECKING, Any
-
-from airflow.models.baseoperator import BaseOperator
-from airflow.utils.email import send_email
-
-if TYPE_CHECKING:
- from airflow.sdk.definitions.context import Context
-
-
-class EmailOperator(BaseOperator):
- """
- Sends an email.
-
- :param to: list of emails to send the email to. (templated)
- :param subject: subject line for the email. (templated)
- :param html_content: content of the email, html markup
- is allowed. (templated)
- :param files: file names to attach in email (templated)
- :param cc: list of recipients to be added in CC field
- :param bcc: list of recipients to be added in BCC field
- :param mime_subtype: MIME sub content type
- :param mime_charset: character set parameter added to the Content-Type
- header.
- :param custom_headers: additional headers to add to the MIME message.
- """
-
- template_fields: Sequence[str] = ("to", "subject", "html_content", "files")
- template_fields_renderers = {"html_content": "html"}
- template_ext: Sequence[str] = (".html",)
- ui_color = "#e6faf9"
-
- def __init__(
- self,
- *,
- to: list[str] | str,
- subject: str,
- html_content: str,
- files: list | None = None,
- cc: list[str] | str | None = None,
- bcc: list[str] | str | None = None,
- mime_subtype: str = "mixed",
- mime_charset: str = "utf-8",
- conn_id: str | None = None,
- custom_headers: dict[str, Any] | None = None,
- **kwargs,
- ) -> None:
- super().__init__(**kwargs)
- self.to = to
- self.subject = subject
- self.html_content = html_content
- self.files = files or []
- self.cc = cc
- self.bcc = bcc
- self.mime_subtype = mime_subtype
- self.mime_charset = mime_charset
- self.conn_id = conn_id
- self.custom_headers = custom_headers
-
- def execute(self, context: Context):
- send_email(
- self.to,
- self.subject,
- self.html_content,
- files=self.files,
- cc=self.cc,
- bcc=self.bcc,
- mime_subtype=self.mime_subtype,
- mime_charset=self.mime_charset,
- conn_id=self.conn_id,
- custom_headers=self.custom_headers,
- )
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index b7e08a45aed74..0f3a1ba9ea0bf 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -1217,8 +1217,6 @@ def _serialize_node(cls, op: BaseOperator | MappedOperator) -> dict[str, Any]:
# If not, store them as strings
# And raise an exception if the field is not templateable
forbidden_fields = set(SerializedBaseOperator._CONSTRUCTOR_PARAMS.keys())
- # Though allow some of the BaseOperator fields to be templated anyway
- forbidden_fields.difference_update({"email"})
if op.template_fields:
for template_field in op.template_fields:
if template_field in forbidden_fields:
diff --git a/airflow/utils/email.py b/airflow/utils/email.py
deleted file mode 100644
index 455b135c23e6c..0000000000000
--- a/airflow/utils/email.py
+++ /dev/null
@@ -1,334 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import collections.abc
-import logging
-import os
-import smtplib
-import ssl
-from collections.abc import Iterable
-from email.mime.application import MIMEApplication
-from email.mime.multipart import MIMEMultipart
-from email.mime.text import MIMEText
-from email.utils import formatdate
-from typing import Any
-
-import re2
-
-from airflow.configuration import conf
-from airflow.exceptions import AirflowException
-
-log = logging.getLogger(__name__)
-
-
-def send_email(
- to: list[str] | Iterable[str],
- subject: str,
- html_content: str,
- files: list[str] | None = None,
- dryrun: bool = False,
- cc: str | Iterable[str] | None = None,
- bcc: str | Iterable[str] | None = None,
- mime_subtype: str = "mixed",
- mime_charset: str = "utf-8",
- conn_id: str | None = None,
- custom_headers: dict[str, Any] | None = None,
- **kwargs,
-) -> None:
- """
- Send an email using the backend specified in the *EMAIL_BACKEND* configuration option.
-
- :param to: A list or iterable of email addresses to send the email to.
- :param subject: The subject of the email.
- :param html_content: The content of the email in HTML format.
- :param files: A list of paths to files to attach to the email.
- :param dryrun: If *True*, the email will not actually be sent. Default: *False*.
- :param cc: A string or iterable of strings containing email addresses to send a copy of the email to.
- :param bcc: A string or iterable of strings containing email addresses to send a
- blind carbon copy of the email to.
- :param mime_subtype: The subtype of the MIME message. Default: "mixed".
- :param mime_charset: The charset of the email. Default: "utf-8".
- :param conn_id: The connection ID to use for the backend. If not provided, the default connection
- specified in the *EMAIL_CONN_ID* configuration option will be used.
- :param custom_headers: A dictionary of additional headers to add to the MIME message.
- No validations are run on these values, and they should be able to be encoded.
- :param kwargs: Additional keyword arguments to pass to the backend.
- """
- backend = conf.getimport("email", "EMAIL_BACKEND")
- backend_conn_id = conn_id or conf.get("email", "EMAIL_CONN_ID")
- from_email = conf.get("email", "from_email", fallback=None)
-
- to_list = get_email_address_list(to)
- to_comma_separated = ", ".join(to_list)
-
- return backend(
- to_comma_separated,
- subject,
- html_content,
- files=files,
- dryrun=dryrun,
- cc=cc,
- bcc=bcc,
- mime_subtype=mime_subtype,
- mime_charset=mime_charset,
- conn_id=backend_conn_id,
- from_email=from_email,
- custom_headers=custom_headers,
- **kwargs,
- )
-
-
-def send_email_smtp(
- to: str | Iterable[str],
- subject: str,
- html_content: str,
- files: list[str] | None = None,
- dryrun: bool = False,
- cc: str | Iterable[str] | None = None,
- bcc: str | Iterable[str] | None = None,
- mime_subtype: str = "mixed",
- mime_charset: str = "utf-8",
- conn_id: str = "smtp_default",
- from_email: str | None = None,
- custom_headers: dict[str, Any] | None = None,
- **kwargs,
-) -> None:
- """
- Send an email with html content.
-
- :param to: Recipient email address or list of addresses.
- :param subject: Email subject.
- :param html_content: Email body in HTML format.
- :param files: List of file paths to attach to the email.
- :param dryrun: If True, the email will not be sent, but all other actions will be performed.
- :param cc: Carbon copy recipient email address or list of addresses.
- :param bcc: Blind carbon copy recipient email address or list of addresses.
- :param mime_subtype: MIME subtype of the email.
- :param mime_charset: MIME charset of the email.
- :param conn_id: Connection ID of the SMTP server.
- :param from_email: Sender email address.
- :param custom_headers: Dictionary of custom headers to include in the email.
- :param kwargs: Additional keyword arguments.
-
- >>> send_email("test@example.com", "foo", "Foo bar", ["/dev/null"], dryrun=True)
- """
- smtp_mail_from = conf.get("smtp", "SMTP_MAIL_FROM")
-
- if smtp_mail_from is not None:
- mail_from = smtp_mail_from
- else:
- if from_email is None:
- raise ValueError(
- "You should set from email - either by smtp/smtp_mail_from config or `from_email` parameter"
- )
- mail_from = from_email
-
- msg, recipients = build_mime_message(
- mail_from=mail_from,
- to=to,
- subject=subject,
- html_content=html_content,
- files=files,
- cc=cc,
- bcc=bcc,
- mime_subtype=mime_subtype,
- mime_charset=mime_charset,
- custom_headers=custom_headers,
- )
-
- send_mime_email(e_from=mail_from, e_to=recipients, mime_msg=msg, conn_id=conn_id, dryrun=dryrun)
-
-
-def build_mime_message(
- mail_from: str | None,
- to: str | Iterable[str],
- subject: str,
- html_content: str,
- files: list[str] | None = None,
- cc: str | Iterable[str] | None = None,
- bcc: str | Iterable[str] | None = None,
- mime_subtype: str = "mixed",
- mime_charset: str = "utf-8",
- custom_headers: dict[str, Any] | None = None,
-) -> tuple[MIMEMultipart, list[str]]:
- """
- Build a MIME message that can be used to send an email and returns a full list of recipients.
-
- :param mail_from: Email address to set as the email's "From" field.
- :param to: A string or iterable of strings containing email addresses to set as the email's "To" field.
- :param subject: The subject of the email.
- :param html_content: The content of the email in HTML format.
- :param files: A list of paths to files to be attached to the email.
- :param cc: A string or iterable of strings containing email addresses to set as the email's "CC" field.
- :param bcc: A string or iterable of strings containing email addresses to set as the email's "BCC" field.
- :param mime_subtype: The subtype of the MIME message. Default: "mixed".
- :param mime_charset: The charset of the email. Default: "utf-8".
- :param custom_headers: Additional headers to add to the MIME message. No validations are run on these
- values, and they should be able to be encoded.
- :return: A tuple containing the email as a MIMEMultipart object and a list of recipient email addresses.
- """
- to = get_email_address_list(to)
-
- msg = MIMEMultipart(mime_subtype)
- msg["Subject"] = subject
- if mail_from:
- msg["From"] = mail_from
- msg["To"] = ", ".join(to)
- recipients = to
- if cc:
- cc = get_email_address_list(cc)
- msg["CC"] = ", ".join(cc)
- recipients += cc
-
- if bcc:
- # don't add bcc in header
- bcc = get_email_address_list(bcc)
- recipients += bcc
-
- msg["Date"] = formatdate(localtime=True)
- mime_text = MIMEText(html_content, "html", mime_charset)
- msg.attach(mime_text)
-
- for fname in files or []:
- basename = os.path.basename(fname)
- with open(fname, "rb") as file:
- part = MIMEApplication(file.read(), Name=basename)
- part["Content-Disposition"] = f'attachment; filename="{basename}"'
- part["Content-ID"] = f"<{basename}>"
- msg.attach(part)
-
- if custom_headers:
- for header_key, header_value in custom_headers.items():
- msg[header_key] = header_value
-
- return msg, recipients
-
-
-def send_mime_email(
- e_from: str,
- e_to: str | list[str],
- mime_msg: MIMEMultipart,
- conn_id: str = "smtp_default",
- dryrun: bool = False,
-) -> None:
- """
- Send a MIME email.
-
- :param e_from: The email address of the sender.
- :param e_to: The email address or a list of email addresses of the recipient(s).
- :param mime_msg: The MIME message to send.
- :param conn_id: The ID of the SMTP connection to use.
- :param dryrun: If True, the email will not be sent, but a log message will be generated.
- """
- smtp_host = conf.get_mandatory_value("smtp", "SMTP_HOST")
- smtp_port = conf.getint("smtp", "SMTP_PORT")
- smtp_starttls = conf.getboolean("smtp", "SMTP_STARTTLS")
- smtp_ssl = conf.getboolean("smtp", "SMTP_SSL")
- smtp_retry_limit = conf.getint("smtp", "SMTP_RETRY_LIMIT")
- smtp_timeout = conf.getint("smtp", "SMTP_TIMEOUT")
- smtp_user = None
- smtp_password = None
-
- if conn_id is not None:
- try:
- from airflow.hooks.base import BaseHook
-
- airflow_conn = BaseHook.get_connection(conn_id)
- smtp_user = airflow_conn.login
- smtp_password = airflow_conn.password
- except AirflowException:
- pass
- if smtp_user is None or smtp_password is None:
- log.debug("No user/password found for SMTP, so logging in with no authentication.")
-
- if not dryrun:
- for attempt in range(1, smtp_retry_limit + 1):
- log.info("Email alerting: attempt %s", str(attempt))
- try:
- smtp_conn = _get_smtp_connection(smtp_host, smtp_port, smtp_timeout, smtp_ssl)
- except smtplib.SMTPServerDisconnected:
- if attempt == smtp_retry_limit:
- raise
- else:
- if smtp_starttls:
- smtp_conn.starttls()
- if smtp_user and smtp_password:
- smtp_conn.login(smtp_user, smtp_password)
- log.info("Sent an alert email to %s", e_to)
- smtp_conn.sendmail(e_from, e_to, mime_msg.as_string())
- smtp_conn.quit()
- break
-
-
-def get_email_address_list(addresses: str | Iterable[str]) -> list[str]:
- """
- Return a list of email addresses from the provided input.
-
- :param addresses: A string or iterable of strings containing email addresses.
- :return: A list of email addresses.
- :raises TypeError: If the input is not a string or iterable of strings.
- """
- if isinstance(addresses, str):
- return _get_email_list_from_str(addresses)
- elif isinstance(addresses, collections.abc.Iterable):
- if not all(isinstance(item, str) for item in addresses):
- raise TypeError("The items in your iterable must be strings.")
- return list(addresses)
- else:
- raise TypeError(f"Unexpected argument type: Received '{type(addresses).__name__}'.")
-
-
-def _get_smtp_connection(host: str, port: int, timeout: int, with_ssl: bool) -> smtplib.SMTP:
- """
- Return an SMTP connection to the specified host and port, with optional SSL encryption.
-
- :param host: The hostname or IP address of the SMTP server.
- :param port: The port number to connect to on the SMTP server.
- :param timeout: The timeout in seconds for the connection.
- :param with_ssl: Whether to use SSL encryption for the connection.
- :return: An SMTP connection to the specified host and port.
- """
- if not with_ssl:
- return smtplib.SMTP(host=host, port=port, timeout=timeout)
- else:
- ssl_context_string = conf.get("email", "SSL_CONTEXT")
- if ssl_context_string == "default":
- ssl_context = ssl.create_default_context()
- elif ssl_context_string == "none":
- ssl_context = None
- else:
- raise RuntimeError(
- f"The email.ssl_context configuration variable must "
- f"be set to 'default' or 'none' and is '{ssl_context_string}."
- )
- return smtplib.SMTP_SSL(host=host, port=port, timeout=timeout, context=ssl_context)
-
-
-def _get_email_list_from_str(addresses: str) -> list[str]:
- """
- Extract a list of email addresses from a string.
-
- The string can contain multiple email addresses separated
- by any of the following delimiters: ',' or ';'.
-
- :param addresses: A string containing one or more email addresses.
- :return: A list of email addresses.
- """
- pattern = r"\s*[,;]\s*"
- return re2.split(pattern, addresses)
diff --git a/airflow/utils/operator_helpers.py b/airflow/utils/operator_helpers.py
index ab3c8c89e5e0f..96db19cb1bcaf 100644
--- a/airflow/utils/operator_helpers.py
+++ b/airflow/utils/operator_helpers.py
@@ -62,10 +62,6 @@
"default": f"{DEFAULT_FORMAT_PREFIX}dag_owner",
"env_var_format": f"{ENV_VAR_FORMAT_PREFIX}DAG_OWNER",
},
- "AIRFLOW_CONTEXT_DAG_EMAIL": {
- "default": f"{DEFAULT_FORMAT_PREFIX}dag_email",
- "env_var_format": f"{ENV_VAR_FORMAT_PREFIX}DAG_EMAIL",
- },
}
@@ -93,7 +89,6 @@ def context_to_airflow_vars(context: Mapping[str, Any], in_env_var_format: bool
dag_run = context.get("dag_run")
ops = [
- (task, "email", "AIRFLOW_CONTEXT_DAG_EMAIL"),
(task, "owner", "AIRFLOW_CONTEXT_DAG_OWNER"),
(task_instance, "dag_id", "AIRFLOW_CONTEXT_DAG_ID"),
(task_instance, "task_id", "AIRFLOW_CONTEXT_TASK_ID"),
diff --git a/docs/apache-airflow/cli-and-env-variables-ref.rst b/docs/apache-airflow/cli-and-env-variables-ref.rst
index 5de76b8d02969..a03694b495df9 100644
--- a/docs/apache-airflow/cli-and-env-variables-ref.rst
+++ b/docs/apache-airflow/cli-and-env-variables-ref.rst
@@ -76,7 +76,6 @@ Environment Variables
* ``flower_basic_auth`` in ``[celery]`` section
* ``result_backend`` in ``[celery]`` section
* ``password`` in ``[atlas]`` section
-* ``smtp_password`` in ``[smtp]`` section
* ``secret_key`` in ``[webserver]`` section
.. envvar:: AIRFLOW__{SECTION}__{KEY}_SECRET
diff --git a/docs/apache-airflow/howto/email-config.rst b/docs/apache-airflow/howto/email-config.rst
deleted file mode 100644
index bc7ea9a1f9d04..0000000000000
--- a/docs/apache-airflow/howto/email-config.rst
+++ /dev/null
@@ -1,190 +0,0 @@
- .. Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- .. http://www.apache.org/licenses/LICENSE-2.0
-
- .. Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
-Email Configuration
-===================
-
-You can configure the email that is being sent in your ``airflow.cfg``
-by setting a ``subject_template`` and/or a ``html_content_template``
-in the ``[email]`` section.
-
-.. code-block:: ini
-
- [email]
- email_backend = airflow.utils.email.send_email_smtp
- subject_template = /path/to/my_subject_template_file
- html_content_template = /path/to/my_html_content_template_file
-
-Equivalent environment variables look like:
-
-.. code-block:: sh
-
- AIRFLOW__EMAIL__EMAIL_BACKEND=airflow.utils.email.send_email_smtp
- AIRFLOW__EMAIL__SUBJECT_TEMPLATE=/path/to/my_subject_template_file
- AIRFLOW__EMAIL__HTML_CONTENT_TEMPLATE=/path/to/my_html_content_template_file
-
-You can configure a sender's email address by setting ``from_email`` in the ``[email]`` section like:
-
-.. code-block:: ini
-
- [email]
- from_email = "John Doe "
-
-Equivalent environment variables look like:
-
-.. code-block:: sh
-
-    AIRFLOW__EMAIL__FROM_EMAIL="John Doe <johndoe@example.com>"
-
-
-To configure SMTP settings, checkout the :ref:`SMTP <config:smtp>` section in the standard configuration.
-If you do not want to store the SMTP credentials in the config or in the environment variables, you can create a
-connection called ``smtp_default`` of ``Email`` type, or choose a custom connection name and set the ``email_conn_id`` with its name in
-the configuration & store SMTP username-password in it. Other SMTP settings like host, port etc always gets picked up
-from the configuration only. The connection can be of any type (for example 'HTTP connection').
-
-If you want to check which email backend is currently set, you can use ``airflow config get-value email email_backend`` command as in
-the example below.
-
-.. code-block:: bash
-
- $ airflow config get-value email email_backend
- airflow.utils.email.send_email_smtp
-
-To access the task's information you use `Jinja Templating <https://jinja.palletsprojects.com/>`_ in your template files.
-
-For example a ``html_content_template`` file could look like this:
-
-.. code-block::
-
-    Try {{try_number}} out of {{max_tries + 1}}<br>
-    Exception:<br>{{exception_html}}<br>
-    Log: <a href="{{ti.log_url}}">Link</a><br>
-    Host: {{ti.hostname}}<br>
-    Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>
-
-.. note::
- For more information on setting the configuration, see :doc:`set-config`
-
-.. _email-configuration-sendgrid:
-
-Send email using SendGrid
--------------------------
-
-Using Default SMTP
-^^^^^^^^^^^^^^^^^^
-
-You can use the default airflow SMTP backend to send email with SendGrid
-
- .. code-block:: ini
-
- [smtp]
- smtp_host=smtp.sendgrid.net
- smtp_starttls=False
- smtp_ssl=False
- smtp_port=587
-    smtp_mail_from=<your-from-email>
-
-Equivalent environment variables looks like
-
- .. code-block::
-
- AIRFLOW__SMTP__SMTP_HOST=smtp.sendgrid.net
- AIRFLOW__SMTP__SMTP_STARTTLS=False
- AIRFLOW__SMTP__SMTP_SSL=False
- AIRFLOW__SMTP__SMTP_PORT=587
-    AIRFLOW__SMTP__SMTP_MAIL_FROM=<your-from-email>
-
-
-Using SendGrid Provider
-^^^^^^^^^^^^^^^^^^^^^^^
-
-Airflow can be configured to send e-mail using `SendGrid `__.
-
-Follow the steps below to enable it:
-
-1. Setup your SendGrid account, The SMTP and copy username and API Key.
-
-2. Include ``sendgrid`` provider as part of your Airflow installation, e.g.,
-
- .. code-block:: bash
-
- pip install 'apache-airflow[sendgrid]' --constraint ...
-
-or
- .. code-block:: bash
-
- pip install 'apache-airflow-providers-sendgrid' --constraint ...
-
-
-3. Update ``email_backend`` property in ``[email]`` section in ``airflow.cfg``, i.e.
-
- .. code-block:: ini
-
- [email]
- email_backend = airflow.providers.sendgrid.utils.emailer.send_email
- email_conn_id = sendgrid_default
- from_email = "hello@eg.com"
-
- Equivalent environment variables looks like
-
- .. code-block::
-
- AIRFLOW__EMAIL__EMAIL_BACKEND=airflow.providers.sendgrid.utils.emailer.send_email
- AIRFLOW__EMAIL__EMAIL_CONN_ID=sendgrid_default
- SENDGRID_MAIL_FROM=hello@thelearning.dev
-
-4. Create a connection called ``sendgrid_default``, or choose a custom connection
- name and set it in ``email_conn_id`` of 'Email' type. Only login and password
- are used from the connection.
-
-
-.. image:: ../img/email_connection.png
- :align: center
- :alt: create email connection
-
-.. note:: The callbacks for success, failure and retry will use the same configuration to send the email
-
-
-.. _email-configuration-ses:
-
-Send email using AWS SES
-------------------------
-
-Airflow can be configured to send e-mail using `AWS SES `__.
-
-Follow the steps below to enable it:
-
-1. Include ``amazon`` subpackage as part of your Airflow installation:
-
- .. code-block:: ini
-
- pip install 'apache-airflow[amazon]'
-
-2. Update ``email_backend`` property in ``[email]`` section in ``airflow.cfg``:
-
- .. code-block:: ini
-
- [email]
- email_backend = airflow.providers.amazon.aws.utils.emailer.send_email
- email_conn_id = aws_default
- from_email = From email
-
-Note that for SES, you must configure from_email to the valid email that can send messages from SES.
-
-3. Create a connection called ``aws_default``, or choose a custom connection
- name and set it in ``email_conn_id``. The type of connection should be ``Amazon Web Services``.
diff --git a/docs/apache-airflow/howto/export-more-env-vars.rst b/docs/apache-airflow/howto/export-more-env-vars.rst
index e393f479302d1..6c94c7c08853d 100644
--- a/docs/apache-airflow/howto/export-more-env-vars.rst
+++ b/docs/apache-airflow/howto/export-more-env-vars.rst
@@ -28,7 +28,7 @@ which are available as environment variables when running tasks. Note, both key
value are must be string.
``dag_id``, ``task_id``, ``execution_date``, ``dag_run_id``,
-``dag_owner``, ``dag_email`` are reserved keys.
+``dag_owner`` are reserved keys.
For example, in your ``airflow_local_settings.py`` file:
diff --git a/docs/apache-airflow/howto/index.rst b/docs/apache-airflow/howto/index.rst
index 03c34820c4cc2..3271cbf8109e7 100644
--- a/docs/apache-airflow/howto/index.rst
+++ b/docs/apache-airflow/howto/index.rst
@@ -49,7 +49,6 @@ configuring an Airflow environment.
run-behind-proxy
run-with-systemd
define-extra-link
- email-config
dynamic-dag-generation
docker-compose/index
upgrading-from-1-10/index
diff --git a/docs/apache-airflow/howto/set-config.rst b/docs/apache-airflow/howto/set-config.rst
index 2a03b2bbf5ee4..365ef6ca4023a 100644
--- a/docs/apache-airflow/howto/set-config.rst
+++ b/docs/apache-airflow/howto/set-config.rst
@@ -104,7 +104,6 @@ The following config options support this ``_cmd`` and ``_secret`` version:
* ``flower_basic_auth`` in ``[celery]`` section
* ``result_backend`` in ``[celery]`` section
* ``password`` in ``[atlas]`` section
-* ``smtp_password`` in ``[smtp]`` section
* ``secret_key`` in ``[webserver]`` section
The ``_cmd`` config options can also be set using a corresponding environment variable
diff --git a/docs/apache-airflow/integration.rst b/docs/apache-airflow/integration.rst
index 2e9c450f1fa8a..a54862f1dbf52 100644
--- a/docs/apache-airflow/integration.rst
+++ b/docs/apache-airflow/integration.rst
@@ -21,7 +21,6 @@ Integration
Airflow has a mechanism that allows you to expand its functionality and integrate with other systems.
* :doc:`API Authentication backends `
-* :doc:`Email backends `
* :doc:`Executor `
* :doc:`Kerberos `
* :doc:`Logging `
diff --git a/docs/apache-airflow/operators-and-hooks-ref.rst b/docs/apache-airflow/operators-and-hooks-ref.rst
index 0a2abcc841057..64d0067fe796f 100644
--- a/docs/apache-airflow/operators-and-hooks-ref.rst
+++ b/docs/apache-airflow/operators-and-hooks-ref.rst
@@ -56,9 +56,6 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`.
* - :mod:`airflow.operators.empty`
-
- * - :mod:`airflow.operators.email`
- -
-
* - :mod:`airflow.operators.generic_transfer`
-
diff --git a/docs/apache-airflow/public-airflow-interface.rst b/docs/apache-airflow/public-airflow-interface.rst
index 2853c6fbe2e1b..e9ad7062ab8d6 100644
--- a/docs/apache-airflow/public-airflow-interface.rst
+++ b/docs/apache-airflow/public-airflow-interface.rst
@@ -386,11 +386,6 @@ by extending them:
You can read more about creating custom Decorators in :doc:`howto/create-custom-decorator`.
-Email notifications
--------------------
-
-Airflow has a built-in way of sending email notifications and it allows to extend it by adding custom
-email notification classes. You can read more about email notifications in :doc:`howto/email-config`.
Notifications
-------------
diff --git a/newsfragments/.significant.rst b/newsfragments/.significant.rst
new file mode 100644
index 0000000000000..5f0d52fcfe2ea
--- /dev/null
+++ b/newsfragments/.significant.rst
@@ -0,0 +1,42 @@
+.. Write a short and imperative summary of this change
+
+.. Provide additional contextual information
+
+.. Check the type of change that applies to this change
+.. Dag changes: requires users to change their dag code
+.. Config changes: requires users to change their airflow config
+.. API changes: requires users to change their Airflow REST API calls
+.. CLI changes: requires users to change their Airflow CLI usage
+.. Behaviour changes: the existing code won't break, but the behavior is different
+.. Plugin changes: requires users to change their Airflow plugin implementation
+.. Dependency changes: requires users to change their dependencies (e.g., Postgres 12)
+.. Code interface changes: requires users to change other implementations (e.g., auth manager)
+In Airflow 3.0, the ``timer_unit_consistency`` setting in the ``metrics`` section is removed as it is now the default behaviour.
+This is done to standardize all timer and timing metrics to milliseconds across all metric loggers.
+
+Airflow 2.11 introduced the ``timer_unit_consistency`` setting in the ``metrics`` section of the configuration file. The
+default value was ``False``, which meant that timer and timing metrics were logged in seconds. This was done to maintain
+backwards compatibility with previous versions of Airflow.
+
+In Airflow 3.0, the ``[smtp]`` configuration section, the base operator parameters ``email``, ``email_on_retry`` and ``email_on_failure``, and the ``airflow.operators.email.EmailOperator`` operator are removed.
+
+Instead of importing ``EmailOperator`` from the ``airflow.operators.email`` module, use the ``EmailOperator`` shipped with the ``smtp`` provider in the module ``airflow.providers.smtp.operators.smtp``.
+
+To send emails on failure or retry, use ``airflow.providers.smtp.notifications.smtp.SmtpNotifier`` with the ``on_failure_callback`` and ``on_retry_callback`` parameters. For more details, see `this guide `_.
+
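+For illustration, a rough migration sketch follows (inside a DAG definition). The addresses, the ``smtp_default`` connection, and the exact ``SmtpNotifier`` arguments shown here are illustrative assumptions and may vary with the SMTP provider version:
+
+.. code-block:: python
+
+    from airflow.providers.smtp.notifications.smtp import SmtpNotifier
+    from airflow.providers.smtp.operators.smtp import EmailOperator  # previously airflow.operators.email
+
+    # Explicit email task; SMTP host and credentials are assumed to come from the
+    # provider's default connection (``smtp_default``).
+    send_report = EmailOperator(
+        task_id="send_report",
+        to="team@example.com",
+        from_email="airflow@example.com",
+        subject="Nightly report",
+        html_content="<b>All done</b>",
+    )
+
+    # Alerts formerly driven by email / email_on_failure / email_on_retry are now
+    # plain callbacks; pass these to any operator or through ``default_args``.
+    alert_callbacks = {
+        "on_failure_callback": SmtpNotifier(
+            from_email="airflow@example.com",
+            to="team@example.com",
+            subject="[Airflow] {{ ti.task_id }} failed",
+            html_content="Task {{ ti.task_id }} failed on {{ ti.hostname }}",
+        ),
+        "on_retry_callback": SmtpNotifier(
+            from_email="airflow@example.com",
+            to="team@example.com",
+            subject="[Airflow] {{ ti.task_id }} up for retry",
+            html_content="Task {{ ti.task_id }} is up for retry",
+        ),
+    }
+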
+* Types of change
+
+ * [ ] Dag changes
+ * [x] Config changes
+ * [ ] API changes
+ * [ ] CLI changes
+ * [x] Behaviour changes
+ * [ ] Plugin changes
+ * [ ] Dependency changes
+ * [x] Code interface changes
+
+* Migration rules needed
+
+.. TODO: create the migration rule that:
+.. * Import ``EmailOperator`` from ``airflow.providers.smtp.operators.smtp`` instead of ``airflow.operators.email``
+.. * Detect and remove the ``email``, ``email_on_retry`` and ``email_on_failure`` parameters from the operator parameters (and replace them by an ``SmtpNotifier`` with the ``on_failure_callback`` and ``on_retry_callback`` parameters)?
diff --git a/scripts/ci/pre_commit/base_operator_partial_arguments.py b/scripts/ci/pre_commit/base_operator_partial_arguments.py
index 070ffe767cccc..e0160cf995c35 100755
--- a/scripts/ci/pre_commit/base_operator_partial_arguments.py
+++ b/scripts/ci/pre_commit/base_operator_partial_arguments.py
@@ -35,8 +35,6 @@
IGNORED = {
# These are only used in the worker and thus mappable.
"do_xcom_push",
- "email_on_failure",
- "email_on_retry",
"post_execute",
"pre_execute",
"multiple_outputs",
diff --git a/task_sdk/src/airflow/sdk/definitions/baseoperator.py b/task_sdk/src/airflow/sdk/definitions/baseoperator.py
index e7ecec69411ba..d40c441ef5391 100644
--- a/task_sdk/src/airflow/sdk/definitions/baseoperator.py
+++ b/task_sdk/src/airflow/sdk/definitions/baseoperator.py
@@ -169,7 +169,6 @@ def partial(
start_date: datetime | ArgNotSet = NOTSET,
end_date: datetime | ArgNotSet = NOTSET,
owner: str | ArgNotSet = NOTSET,
- email: None | str | Iterable[str] | ArgNotSet = NOTSET,
params: collections.abc.MutableMapping | None = None,
resources: dict[str, Any] | None | ArgNotSet = NOTSET,
trigger_rule: str | ArgNotSet = NOTSET,
@@ -431,9 +430,6 @@ def __new__(cls, name, bases, namespace, **kwargs):
# manual type-checking method.
BASEOPERATOR_ARGS_EXPECTED_TYPES = {
"task_id": str,
- "email": (str, Sequence),
- "email_on_retry": bool,
- "email_on_failure": bool,
"retries": int,
"retry_exponential_backoff": bool,
"depends_on_past": bool,
@@ -502,13 +498,6 @@ class derived from this one results in the creation of a task object,
:param task_id: a unique, meaningful id for the task
:param owner: the owner of the task. Using a meaningful description
(e.g. user/person/team/role name) to clarify ownership is recommended.
- :param email: the 'to' email address(es) used in email alerts. This can be a
- single email or multiple ones. Multiple addresses can be specified as a
- comma or semicolon separated string or by passing a list of strings.
- :param email_on_retry: Indicates whether email alerts should be sent when a
- task is retried
- :param email_on_failure: Indicates whether email alerts should be sent when
- a task failed
:param retries: the number of retries that should be performed before
failing the task
:param retry_delay: delay between retries, can be set as ``timedelta`` or
@@ -697,9 +686,6 @@ def say_hello_world(**context):
task_id: str
owner: str = DEFAULT_OWNER
- email: str | Sequence[str] | None = None
- email_on_retry: bool = True
- email_on_failure: bool = True
retries: int | None = DEFAULT_RETRIES
retry_delay: timedelta = DEFAULT_RETRY_DELAY
retry_exponential_backoff: bool = False
@@ -780,8 +766,6 @@ def say_hello_world(**context):
"task_id",
"dag_id",
"owner",
- "email",
- "email_on_retry",
"retry_delay",
"retry_exponential_backoff",
"max_retry_delay",
@@ -853,9 +837,6 @@ def __init__(
*,
task_id: str,
owner: str = DEFAULT_OWNER,
- email: str | Sequence[str] | None = None,
- email_on_retry: bool = True,
- email_on_failure: bool = True,
retries: int | None = DEFAULT_RETRIES,
retry_delay: timedelta | float = DEFAULT_RETRY_DELAY,
retry_exponential_backoff: bool = False,
@@ -924,9 +905,6 @@ def __init__(
validate_key(self.task_id)
self.owner = owner
- self.email = email
- self.email_on_retry = email_on_retry
- self.email_on_failure = email_on_failure
if execution_timeout is not None and not isinstance(execution_timeout, timedelta):
raise ValueError(
diff --git a/task_sdk/src/airflow/sdk/definitions/mappedoperator.py b/task_sdk/src/airflow/sdk/definitions/mappedoperator.py
index 0fc0a7fa1896a..bcff6838c5a0c 100644
--- a/task_sdk/src/airflow/sdk/definitions/mappedoperator.py
+++ b/task_sdk/src/airflow/sdk/definitions/mappedoperator.py
@@ -385,10 +385,6 @@ def task_display_name(self) -> str:
def owner(self) -> str: # type: ignore[override]
return self.partial_kwargs.get("owner", DEFAULT_OWNER)
- @property
- def email(self) -> None | str | Iterable[str]:
- return self.partial_kwargs.get("email")
-
@property
def map_index_template(self) -> None | str:
return self.partial_kwargs.get("map_index_template")
diff --git a/tests/api_connexion/endpoints/test_config_endpoint.py b/tests/api_connexion/endpoints/test_config_endpoint.py
index 8d3785f08811d..f4e447e00e3d3 100644
--- a/tests/api_connexion/endpoints/test_config_endpoint.py
+++ b/tests/api_connexion/endpoints/test_config_endpoint.py
@@ -31,17 +31,17 @@
"core": {
"parallelism": "1024",
},
- "smtp": {
- "smtp_host": "localhost",
- "smtp_mail_from": "airflow@example.com",
+ "operators": {
+ "default_deferrable": "true",
+ "default_queue": "queue",
},
}
MOCK_CONF_WITH_SENSITIVE_VALUE = {
"core": {"parallelism": "1024"},
- "smtp": {
- "smtp_host": "localhost",
- "smtp_mail_from": "airflow@example.com",
+ "operators": {
+ "default_deferrable": "true",
+ "default_queue": "queue",
},
"database": {
"sql_alchemy_conn": "mock_conn",
@@ -83,10 +83,6 @@ def test_should_respond_200_text_plain(self, mock_as_dict):
"""\
[core]
parallelism = 1024
-
- [smtp]
- smtp_host = localhost
- smtp_mail_from = airflow@example.com
"""
)
assert expected == response.data.decode()
@@ -103,10 +99,6 @@ def test_should_respond_200_text_plain_with_non_sensitive_only(self, mock_as_dic
"""\
[core]
parallelism = 1024
-
- [smtp]
- smtp_host = localhost
- smtp_mail_from = airflow@example.com
"""
)
assert expected == response.data.decode()
@@ -129,10 +121,9 @@ def test_should_respond_200_application_json(self, mock_as_dict):
],
},
{
- "name": "smtp",
+ "name": "sensors",
"options": [
- {"key": "smtp_host", "value": "localhost"},
- {"key": "smtp_mail_from", "value": "airflow@example.com"},
+ {"key": "default_timeout", "value": "86400"},
],
},
]
@@ -142,7 +133,7 @@ def test_should_respond_200_application_json(self, mock_as_dict):
@patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF)
def test_should_respond_200_single_section_as_text_plain(self, mock_as_dict):
response = self.client.get(
- "/api/v1/config?section=smtp",
+ "/api/v1/config?section=operators",
headers={"Accept": "text/plain"},
environ_overrides={"REMOTE_USER": "test"},
)
@@ -150,9 +141,9 @@ def test_should_respond_200_single_section_as_text_plain(self, mock_as_dict):
assert response.status_code == 200
expected = textwrap.dedent(
"""\
- [smtp]
- smtp_host = localhost
- smtp_mail_from = airflow@example.com
+ [operators]
+ default_deferrable = true
+ default_queue = queue
"""
)
assert expected == response.data.decode()
@@ -160,19 +151,19 @@ def test_should_respond_200_single_section_as_text_plain(self, mock_as_dict):
@patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF)
def test_should_respond_200_single_section_as_json(self, mock_as_dict):
response = self.client.get(
- "/api/v1/config?section=smtp",
+ "/api/v1/config?section=operators",
headers={"Accept": "application/json"},
environ_overrides={"REMOTE_USER": "test"},
)
- mock_as_dict.assert_called_with(display_source=False, display_sensitive=True)
+ # mock_as_dict.assert_called_with(display_source=False, display_sensitive=True)
assert response.status_code == 200
expected = {
"sections": [
{
- "name": "smtp",
+ "name": "operators",
"options": [
- {"key": "smtp_host", "value": "localhost"},
- {"key": "smtp_mail_from", "value": "airflow@example.com"},
+ {"key": "default_deferrable", "value": "true"},
+ {"key": "default_queue", "value": "queue"},
],
},
]
@@ -182,13 +173,13 @@ def test_should_respond_200_single_section_as_json(self, mock_as_dict):
@patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF)
def test_should_respond_404_when_section_not_exist(self, mock_as_dict):
response = self.client.get(
- "/api/v1/config?section=smtp1",
+ "/api/v1/config?section=operators1",
headers={"Accept": "application/json"},
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 404
- assert "section=smtp1 not found." in response.json["detail"]
+ assert "section=operators1 not found." in response.json["detail"]
@patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF)
def test_should_respond_406(self, mock_as_dict):
@@ -233,15 +224,15 @@ def setup_attrs(self, configured_app) -> None:
@patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF)
def test_should_respond_200_text_plain(self, mock_as_dict):
response = self.client.get(
- "/api/v1/config/section/smtp/option/smtp_mail_from",
+ "/api/v1/config/section/operators/option/default_queue",
headers={"Accept": "text/plain"},
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 200
expected = textwrap.dedent(
"""\
- [smtp]
- smtp_mail_from = airflow@example.com
+ [operators]
+ default_queue = queue
"""
)
assert expected == response.data.decode()
@@ -278,7 +269,7 @@ def test_should_respond_200_text_plain_with_non_sensitive_only(self, mock_as_dic
@patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF)
def test_should_respond_200_application_json(self, mock_as_dict):
response = self.client.get(
- "/api/v1/config/section/smtp/option/smtp_mail_from",
+ "/api/v1/config/section/operators/option/default_queue",
headers={"Accept": "application/json"},
environ_overrides={"REMOTE_USER": "test"},
)
@@ -286,9 +277,9 @@ def test_should_respond_200_application_json(self, mock_as_dict):
expected = {
"sections": [
{
- "name": "smtp",
+ "name": "operators",
"options": [
- {"key": "smtp_mail_from", "value": "airflow@example.com"},
+ {"key": "default_queue", "value": "queue"},
],
},
]
@@ -298,18 +289,18 @@ def test_should_respond_200_application_json(self, mock_as_dict):
@patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF)
def test_should_respond_404_when_option_not_exist(self, mock_as_dict):
response = self.client.get(
- "/api/v1/config/section/smtp/option/smtp_mail_from1",
+ "/api/v1/config/section/operators/option/default_queue1",
headers={"Accept": "application/json"},
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 404
- assert "The option [smtp/smtp_mail_from1] is not found in config." in response.json["detail"]
+ assert "The option [operators/default_queue1] is not found in config." in response.json["detail"]
@patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF)
def test_should_respond_406(self, mock_as_dict):
response = self.client.get(
- "/api/v1/config/section/smtp/option/smtp_mail_from",
+ "/api/v1/config/section/operators/option/default_queue",
headers={"Accept": "application/octet-stream"},
environ_overrides={"REMOTE_USER": "test"},
)
@@ -317,14 +308,14 @@ def test_should_respond_406(self, mock_as_dict):
def test_should_raises_401_unauthenticated(self):
response = self.client.get(
- "/api/v1/config/section/smtp/option/smtp_mail_from", headers={"Accept": "application/json"}
+ "/api/v1/config/section/operators/option/default_queue", headers={"Accept": "application/json"}
)
assert_401(response)
def test_should_raises_403_unauthorized(self):
response = self.client.get(
- "/api/v1/config/section/smtp/option/smtp_mail_from",
+ "/api/v1/config/section/operators/option/default_queue",
headers={"Accept": "application/json"},
environ_overrides={"REMOTE_USER": "test_no_permissions"},
)
@@ -334,7 +325,7 @@ def test_should_raises_403_unauthorized(self):
@conf_vars({("webserver", "expose_config"): "False"})
def test_should_respond_403_when_expose_config_off(self):
response = self.client.get(
- "/api/v1/config/section/smtp/option/smtp_mail_from",
+ "/api/v1/config/section/operators/option/default_queue",
headers={"Accept": "application/json"},
environ_overrides={"REMOTE_USER": "test"},
)
diff --git a/tests/api_fastapi/core_api/routes/public/test_config.py b/tests/api_fastapi/core_api/routes/public/test_config.py
index 90009d1b54152..705432b845b74 100644
--- a/tests/api_fastapi/core_api/routes/public/test_config.py
+++ b/tests/api_fastapi/core_api/routes/public/test_config.py
@@ -33,16 +33,11 @@
HEADERS_INVALID = {"Accept": "invalid"}
HEADERS_JSON_UTF8 = {"Accept": "application/json; charset=utf-8"}
SECTION_CORE = "core"
-SECTION_SMTP = "smtp"
SECTION_DATABASE = "database"
SECTION_NOT_EXIST = "not_exist_section"
OPTION_KEY_PARALLELISM = "parallelism"
-OPTION_KEY_SMTP_HOST = "smtp_host"
-OPTION_KEY_SMTP_MAIL_FROM = "smtp_mail_from"
OPTION_KEY_SQL_ALCHEMY_CONN = "sql_alchemy_conn"
OPTION_VALUE_PARALLELISM = "1024"
-OPTION_VALUE_SMTP_HOST = "smtp.example.com"
-OPTION_VALUE_SMTP_MAIL_FROM = "airflow@example.com"
OPTION_VALUE_SQL_ALCHEMY_CONN = "sqlite:///example.db"
OPTION_NOT_EXIST = "not_exist_option"
OPTION_VALUE_SENSITIVE_HIDDEN = "< hidden >"
@@ -51,10 +46,6 @@
SECTION_CORE: {
OPTION_KEY_PARALLELISM: OPTION_VALUE_PARALLELISM,
},
- SECTION_SMTP: {
- OPTION_KEY_SMTP_HOST: OPTION_VALUE_SMTP_HOST,
- OPTION_KEY_SMTP_MAIL_FROM: OPTION_VALUE_SMTP_MAIL_FROM,
- },
SECTION_DATABASE: {
OPTION_KEY_SQL_ALCHEMY_CONN: OPTION_VALUE_SQL_ALCHEMY_CONN,
},
@@ -63,18 +54,12 @@
SECTION_CORE: {
OPTION_KEY_PARALLELISM: OPTION_VALUE_PARALLELISM,
},
- SECTION_SMTP: {
- OPTION_KEY_SMTP_HOST: OPTION_VALUE_SMTP_HOST,
- OPTION_KEY_SMTP_MAIL_FROM: OPTION_VALUE_SMTP_MAIL_FROM,
- },
SECTION_DATABASE: {
OPTION_KEY_SQL_ALCHEMY_CONN: OPTION_VALUE_SENSITIVE_HIDDEN,
},
}
MOCK_CONFIG_OVERRIDE = {
(SECTION_CORE, OPTION_KEY_PARALLELISM): OPTION_VALUE_PARALLELISM,
- (SECTION_SMTP, OPTION_KEY_SMTP_HOST): OPTION_VALUE_SMTP_HOST,
- (SECTION_SMTP, OPTION_KEY_SMTP_MAIL_FROM): OPTION_VALUE_SMTP_MAIL_FROM,
}
AIRFLOW_CONFIG_ENABLE_EXPOSE_CONFIG = {("webserver", "expose_config"): "True"}
@@ -92,13 +77,6 @@
{"key": OPTION_KEY_PARALLELISM, "value": OPTION_VALUE_PARALLELISM},
],
},
- {
- "name": SECTION_SMTP,
- "options": [
- {"key": OPTION_KEY_SMTP_HOST, "value": OPTION_VALUE_SMTP_HOST},
- {"key": OPTION_KEY_SMTP_MAIL_FROM, "value": OPTION_VALUE_SMTP_MAIL_FROM},
- ],
- },
{
"name": SECTION_DATABASE,
"options": [
@@ -115,13 +93,6 @@
{"key": OPTION_KEY_PARALLELISM, "value": OPTION_VALUE_PARALLELISM},
],
},
- {
- "name": SECTION_SMTP,
- "options": [
- {"key": OPTION_KEY_SMTP_HOST, "value": OPTION_VALUE_SMTP_HOST},
- {"key": OPTION_KEY_SMTP_MAIL_FROM, "value": OPTION_VALUE_SMTP_MAIL_FROM},
- ],
- },
{
"name": SECTION_DATABASE,
"options": [
@@ -201,10 +172,6 @@ class TestGetConfig(TestConfigEndpoint):
[{SECTION_CORE}]
{OPTION_KEY_PARALLELISM} = {OPTION_VALUE_PARALLELISM}
- [{SECTION_SMTP}]
- {OPTION_KEY_SMTP_HOST} = {OPTION_VALUE_SMTP_HOST}
- {OPTION_KEY_SMTP_MAIL_FROM} = {OPTION_VALUE_SMTP_MAIL_FROM}
-
[{SECTION_DATABASE}]
{OPTION_KEY_SQL_ALCHEMY_CONN} = {OPTION_VALUE_SQL_ALCHEMY_CONN}
"""
@@ -231,18 +198,6 @@ class TestGetConfig(TestConfigEndpoint):
],
},
),
- (
- SECTION_SMTP,
- HEADERS_TEXT,
- 200,
- textwrap.dedent(
- f"""\
- [{SECTION_SMTP}]
- {OPTION_KEY_SMTP_HOST} = {OPTION_VALUE_SMTP_HOST}
- {OPTION_KEY_SMTP_MAIL_FROM} = {OPTION_VALUE_SMTP_MAIL_FROM}
- """
- ),
- ),
(
SECTION_DATABASE,
HEADERS_JSON,
@@ -287,10 +242,6 @@ def test_get_config(self, test_client, section, headers, expected_status_code, e
[{SECTION_CORE}]
{OPTION_KEY_PARALLELISM} = {OPTION_VALUE_PARALLELISM}
- [{SECTION_SMTP}]
- {OPTION_KEY_SMTP_HOST} = {OPTION_VALUE_SMTP_HOST}
- {OPTION_KEY_SMTP_MAIL_FROM} = {OPTION_VALUE_SMTP_MAIL_FROM}
-
[{SECTION_DATABASE}]
{OPTION_KEY_SQL_ALCHEMY_CONN} = {OPTION_VALUE_SENSITIVE_HIDDEN}
"""
@@ -338,34 +289,6 @@ class TestGetConfigValue(TestConfigEndpoint):
200,
GET_CONFIG_VALUE_CORE_PARALLELISM_JSON_RESPONSE,
),
- (
- SECTION_SMTP,
- OPTION_KEY_SMTP_HOST,
- HEADERS_TEXT,
- 200,
- textwrap.dedent(
- f"""\
- [{SECTION_SMTP}]
- {OPTION_KEY_SMTP_HOST} = {OPTION_VALUE_SMTP_HOST}
- """
- ),
- ),
- (
- SECTION_SMTP,
- OPTION_KEY_SMTP_MAIL_FROM,
- HEADERS_JSON,
- 200,
- {
- "sections": [
- {
- "name": SECTION_SMTP,
- "options": [
- {"key": OPTION_KEY_SMTP_MAIL_FROM, "value": OPTION_VALUE_SMTP_MAIL_FROM},
- ],
- },
- ],
- },
- ),
(
SECTION_DATABASE,
OPTION_KEY_SQL_ALCHEMY_CONN,
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 8ca00e2f8d650..3a782f84b9104 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -29,7 +29,7 @@
from traceback import format_exception
from typing import cast
from unittest import mock
-from unittest.mock import call, mock_open, patch
+from unittest.mock import call, patch
from uuid import uuid4
import pendulum
@@ -2148,109 +2148,8 @@ def test_overwrite_params_with_dag_run_conf_none(self, create_task_instance):
params = process_params(ti.task.dag, ti.task, dag_run.conf, suppress_exception=False)
assert params["override"] is False
- @pytest.mark.parametrize("use_native_obj", [True, False])
- @patch("airflow.models.taskinstance.send_email")
- def test_email_alert(self, mock_send_email, dag_maker, use_native_obj):
- with dag_maker(dag_id="test_failure_email", render_template_as_native_obj=use_native_obj):
- task = BashOperator(task_id="test_email_alert", bash_command="exit 1", email="to")
- ti = dag_maker.create_dagrun(logical_date=timezone.utcnow()).task_instances[0]
- ti.task = task
-
- with contextlib.suppress(AirflowException):
- ti.run()
-
- (email, title, body), _ = mock_send_email.call_args
- assert email == "to"
- assert "test_email_alert" in title
- assert "test_email_alert" in body
- assert "Try 0" in body
-
- @conf_vars(
- {
- ("email", "subject_template"): "/subject/path",
- ("email", "html_content_template"): "/html_content/path",
- }
- )
- @patch("airflow.models.taskinstance.send_email")
- def test_email_alert_with_config(self, mock_send_email, dag_maker):
- with dag_maker(dag_id="test_failure_email"):
- task = BashOperator(
- task_id="test_email_alert_with_config",
- bash_command="exit 1",
- email="to",
- )
- ti = dag_maker.create_dagrun(logical_date=timezone.utcnow()).task_instances[0]
- ti.task = task
-
- opener = mock_open(read_data="template: {{ti.task_id}}")
- with patch("airflow.models.taskinstance.open", opener, create=True):
- with contextlib.suppress(AirflowException):
- ti.run()
-
- (email, title, body), _ = mock_send_email.call_args
- assert email == "to"
- assert title == "template: test_email_alert_with_config"
- assert body == "template: test_email_alert_with_config"
-
- @patch("airflow.models.taskinstance.send_email")
- def test_email_alert_with_filenotfound_config(self, mock_send_email, dag_maker):
- with dag_maker(dag_id="test_failure_email"):
- task = BashOperator(
- task_id="test_email_alert_with_config",
- bash_command="exit 1",
- email="to",
- )
- ti = dag_maker.create_dagrun(logical_date=timezone.utcnow()).task_instances[0]
- ti.task = task
-
- # Run test when the template file is not found
- opener = mock_open(read_data="template: {{ti.task_id}}")
- opener.side_effect = FileNotFoundError
- with patch("airflow.models.taskinstance.open", opener, create=True):
- with contextlib.suppress(AirflowException):
- ti.run()
-
- (email_error, title_error, body_error), _ = mock_send_email.call_args
-
- # Rerun task without any error and no template file
- with contextlib.suppress(AirflowException):
- ti.run()
-
- (email_default, title_default, body_default), _ = mock_send_email.call_args
-
- assert email_error == email_default == "to"
- assert title_default == title_error
- assert body_default == body_error
-
- @pytest.mark.parametrize("task_id", ["test_email_alert", "test_email_alert__1"])
- @patch("airflow.models.taskinstance.send_email")
- def test_failure_mapped_taskflow(self, mock_send_email, dag_maker, session, task_id):
- with dag_maker(session=session) as dag:
-
- @dag.task(email="to")
- def test_email_alert(x):
- raise RuntimeError("Fail please")
-
- test_email_alert.expand(x=["a", "b"]) # This is 'test_email_alert'.
- test_email_alert.expand(x=[1, 2, 3]) # This is 'test_email_alert__1'.
-
- dr: DagRun = dag_maker.create_dagrun(logical_date=timezone.utcnow())
-
- ti = dr.get_task_instance(task_id, map_index=0, session=session)
- assert ti is not None
-
- # The task will fail and trigger email reporting.
- with pytest.raises(RuntimeError, match=r"^Fail please$"):
- ti.run(session=session)
-
- (email, title, body), _ = mock_send_email.call_args
- assert email == "to"
- assert title == f"Airflow alert: {ti}"
- assert body.startswith("Try 0") # try number only incremented by the scheduler
- assert "test_email_alert" in body
-
def test_set_duration(self):
- task = EmptyOperator(task_id="op", email="test@test.test")
+ task = EmptyOperator(task_id="op")
ti = TI(task=task)
ti.start_date = datetime.datetime(2018, 10, 1, 1)
ti.end_date = datetime.datetime(2018, 10, 1, 2)
@@ -2258,7 +2157,7 @@ def test_set_duration(self):
assert ti.duration == 3600
def test_set_duration_empty_dates(self):
- task = EmptyOperator(task_id="op", email="test@test.test")
+ task = EmptyOperator(task_id="op")
ti = TI(task=task)
ti.set_duration()
assert ti.duration is None
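Note: the deleted email-alert tests covered BaseOperator's `email`, `email_on_failure` and the `[email]` templates, all of which leave core in this change. The equivalent pattern now goes through the SMTP provider's notifier attached as a failure callback; a minimal sketch, assuming apache-airflow-providers-smtp is installed, an `smtp_default` connection exists, and the addresses are placeholders:

# Sketch: failure notification via the SMTP provider instead of the removed
# BaseOperator `email` argument. Provider install, the `smtp_default`
# connection and the addresses below are assumptions.
from airflow.decorators import dag, task
from airflow.providers.smtp.notifications.smtp import send_smtp_notification


@dag(
    schedule=None,
    catchup=False,
    default_args={
        "on_failure_callback": send_smtp_notification(
            from_email="airflow@example.com",
            to="alerts@example.com",
            subject="Airflow alert: {{ ti.task_id }}",
            html_content="{{ ti.task_id }} failed on try {{ ti.try_number }}.",
        )
    },
)
def smtp_alert_example():
    @task
    def always_fails():
        raise RuntimeError("Fail please")

    always_fails()


smtp_alert_example()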
diff --git a/tests/operators/test_email.py b/tests/operators/test_email.py
deleted file mode 100644
index df772beb95fdd..0000000000000
--- a/tests/operators/test_email.py
+++ /dev/null
@@ -1,62 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import datetime
-from unittest import mock
-
-import pytest
-
-from airflow.operators.email import EmailOperator
-from airflow.utils import timezone
-
-from tests_common.test_utils.config import conf_vars
-
-pytestmark = pytest.mark.db_test
-
-DEFAULT_DATE = timezone.datetime(2016, 1, 1)
-END_DATE = timezone.datetime(2016, 1, 2)
-INTERVAL = datetime.timedelta(hours=12)
-FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1)
-
-send_email_test = mock.Mock()
-
-
-class TestEmailOperator:
- def test_execute(self, dag_maker):
- with conf_vars({("email", "email_backend"): "tests.operators.test_email.send_email_test"}):
- with dag_maker(
- "test_dag",
- default_args={"owner": "airflow", "start_date": DEFAULT_DATE},
- schedule=INTERVAL,
- serialized=True,
- ):
- task = EmailOperator(
- to="airflow@example.com",
- subject="Test Run",
- html_content="The quick brown fox jumps over the lazy dog",
- task_id="task",
- files=["/tmp/Report-A-{{ ds }}.csv"],
- custom_headers={"Reply-To": "reply_to@example.com"},
- )
- dag_maker.create_dagrun()
- task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
- assert send_email_test.call_count == 1
- call_args = send_email_test.call_args.kwargs
- assert call_args["files"] == ["/tmp/Report-A-2016-01-01.csv"]
- assert call_args["custom_headers"] == {"Reply-To": "reply_to@example.com"}
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 84a63674e5119..e3152585e446f 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -1372,9 +1372,6 @@ def test_no_new_fields_added_to_base_operator(self):
"doc_yaml": None,
"downstream_task_ids": set(),
"end_date": None,
- "email": None,
- "email_on_failure": True,
- "email_on_retry": True,
"execution_timeout": None,
"executor": None,
"executor_config": {},
@@ -2251,10 +2248,14 @@ def test_not_templateable_fields_in_serialized_dag(self):
class TestOperator(BaseOperator):
template_fields = (
- "email", # templateable
+ "any_field", # templateable
"execution_timeout", # not templateable
)
+ def __init__(self, *args, any_field=None, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.any_field = any_field
+
def execute(self, context: Context):
pass
@@ -2263,18 +2264,18 @@ def execute(self, context: Context):
with dag:
task = TestOperator(
task_id="test_task",
- email="{{ ','.join(test_email_list) }}",
+ any_field="{{ ','.join(test_any_field_list) }}",
execution_timeout=timedelta(seconds=10),
)
- task.render_template_fields(context={"test_email_list": ["foo@test.com", "bar@test.com"]})
- assert task.email == "foo@test.com,bar@test.com"
+ task.render_template_fields(context={"test_any_field_list": ["foo", "boo"]})
+ assert task.any_field == "foo,boo"
with pytest.raises(
AirflowException,
match=re.escape(
dedent(
"""Failed to serialize DAG 'test_dag': Cannot template BaseOperator field:
- 'execution_timeout' op.__class__.__name__='TestOperator' op.template_fields=('email', 'execution_timeout')"""
+ 'execution_timeout' op.__class__.__name__='TestOperator' op.template_fields=('any_field', 'execution_timeout')"""
)
),
):
diff --git a/tests/utils/test_email.py b/tests/utils/test_email.py
deleted file mode 100644
index dfcae90215c56..0000000000000
--- a/tests/utils/test_email.py
+++ /dev/null
@@ -1,401 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import json
-import os
-from email.mime.application import MIMEApplication
-from email.mime.multipart import MIMEMultipart
-from email.mime.text import MIMEText
-from smtplib import SMTPServerDisconnected
-from unittest import mock
-
-import pytest
-
-from airflow.configuration import conf
-from airflow.utils import email
-
-from tests_common.test_utils.config import conf_vars
-
-EMAILS = ["test1@example.com", "test2@example.com"]
-
-send_email_test = mock.MagicMock()
-
-
-class TestEmail:
- def setup_method(self):
- conf.remove_option("email", "EMAIL_BACKEND")
- conf.remove_option("email", "EMAIL_CONN_ID")
-
- def test_get_email_address_single_email(self):
- emails_string = "test1@example.com"
-
- assert email.get_email_address_list(emails_string) == [emails_string]
-
- def test_get_email_address_comma_sep_string(self):
- emails_string = "test1@example.com, test2@example.com"
-
- assert email.get_email_address_list(emails_string) == EMAILS
-
- def test_get_email_address_colon_sep_string(self):
- emails_string = "test1@example.com; test2@example.com"
-
- assert email.get_email_address_list(emails_string) == EMAILS
-
- def test_get_email_address_list(self):
- emails_list = ["test1@example.com", "test2@example.com"]
-
- assert email.get_email_address_list(emails_list) == EMAILS
-
- def test_get_email_address_tuple(self):
- emails_tuple = ("test1@example.com", "test2@example.com")
-
- assert email.get_email_address_list(emails_tuple) == EMAILS
-
- def test_get_email_address_invalid_type(self):
- emails_string = 1
-
- with pytest.raises(TypeError):
- email.get_email_address_list(emails_string)
-
- def test_get_email_address_invalid_type_in_iterable(self):
- emails_list = ["test1@example.com", 2]
-
- with pytest.raises(TypeError):
- email.get_email_address_list(emails_list)
-
- @mock.patch("airflow.utils.email.send_email")
- def test_default_backend(self, mock_send_email):
- res = email.send_email("to", "subject", "content")
- mock_send_email.assert_called_once_with("to", "subject", "content")
- assert mock_send_email.return_value == res
-
- @mock.patch("airflow.utils.email.send_email_smtp")
- def test_custom_backend(self, mock_send_email):
- with conf_vars(
- {
- ("email", "email_backend"): "tests.utils.test_email.send_email_test",
- ("email", "email_conn_id"): "smtp_default",
- }
- ):
- email.send_email("to", "subject", "content")
- send_email_test.assert_called_once_with(
- "to",
- "subject",
- "content",
- files=None,
- dryrun=False,
- cc=None,
- bcc=None,
- mime_charset="utf-8",
- mime_subtype="mixed",
- conn_id="smtp_default",
- from_email=None,
- custom_headers=None,
- )
- assert not mock_send_email.called
-
- @mock.patch("airflow.utils.email.send_email_smtp")
- @conf_vars(
- {
- ("email", "email_backend"): "tests.utils.test_email.send_email_test",
- ("email", "email_conn_id"): "smtp_default",
- ("email", "from_email"): "from@test.com",
- }
- )
- def test_custom_backend_sender(self, mock_send_email_smtp):
- email.send_email("to", "subject", "content")
- _, call_kwargs = send_email_test.call_args
- assert call_kwargs["from_email"] == "from@test.com"
- assert not mock_send_email_smtp.called
-
- def test_build_mime_message(self):
- mail_from = "from@example.com"
- mail_to = "to@example.com"
- subject = "test subject"
- html_content = "Test"
- custom_headers = {"Reply-To": "reply_to@example.com"}
-
- msg, recipients = email.build_mime_message(
- mail_from=mail_from,
- to=mail_to,
- subject=subject,
- html_content=html_content,
- custom_headers=custom_headers,
- )
-
- assert "From" in msg
- assert "To" in msg
- assert "Subject" in msg
- assert "Reply-To" in msg
- assert [mail_to] == recipients
- assert msg["To"] == ",".join(recipients)
-
-
-@pytest.mark.db_test
-class TestEmailSmtp:
- @pytest.fixture(autouse=True)
- def setup_test_cases(self, monkeypatch):
- monkeypatch.setenv( # Set the default smtp connection for all test cases
- "AIRFLOW_CONN_SMTP_DEFAULT",
- json.dumps({"conn_type": "smtp", "login": "user", "password": "p@$$word"}),
- )
-
- @mock.patch("airflow.utils.email.send_mime_email")
- def test_send_smtp(self, mock_send_mime, tmp_path):
- path = tmp_path / "testfile"
- path.write_text("attachment")
- email.send_email_smtp("to", "subject", "content", files=[os.fspath(path)])
- assert mock_send_mime.called
- _, call_args = mock_send_mime.call_args
- assert conf.get("smtp", "SMTP_MAIL_FROM") == call_args["e_from"]
- assert call_args["e_to"] == ["to"]
- msg = call_args["mime_msg"]
- assert msg["Subject"] == "subject"
- assert conf.get("smtp", "SMTP_MAIL_FROM") == msg["From"]
- assert len(msg.get_payload()) == 2
- filename = f'attachment; filename="{path.name}"'
- assert filename == msg.get_payload()[-1].get("Content-Disposition")
- mimeapp = MIMEApplication("attachment")
- assert mimeapp.get_payload() == msg.get_payload()[-1].get_payload()
-
- @mock.patch("airflow.utils.email.send_mime_email")
- def test_send_smtp_with_multibyte_content(self, mock_send_mime):
- email.send_email_smtp("to", "subject", "🔥", mime_charset="utf-8")
- assert mock_send_mime.called
- _, call_args = mock_send_mime.call_args
- msg = call_args["mime_msg"]
- mimetext = MIMEText("🔥", "mixed", "utf-8")
- assert mimetext.get_payload() == msg.get_payload()[0].get_payload()
-
- @mock.patch("airflow.utils.email.send_mime_email")
- def test_send_bcc_smtp(self, mock_send_mime, tmp_path):
- path = tmp_path / "testfile"
- path.write_text("attachment")
- email.send_email_smtp(
- "to",
- "subject",
- "content",
- files=[os.fspath(path)],
- cc="cc",
- bcc="bcc",
- custom_headers={"Reply-To": "reply_to@example.com"},
- )
- assert mock_send_mime.called
- _, call_args = mock_send_mime.call_args
- assert conf.get("smtp", "SMTP_MAIL_FROM") == call_args["e_from"]
- assert call_args["e_to"] == ["to", "cc", "bcc"]
- msg = call_args["mime_msg"]
- assert msg["Subject"] == "subject"
- assert conf.get("smtp", "SMTP_MAIL_FROM") == msg["From"]
- assert len(msg.get_payload()) == 2
- assert f'attachment; filename="{path.name}"' == msg.get_payload()[-1].get("Content-Disposition")
- mimeapp = MIMEApplication("attachment")
- assert mimeapp.get_payload() == msg.get_payload()[-1].get_payload()
- assert msg["Reply-To"] == "reply_to@example.com"
-
- @mock.patch("smtplib.SMTP_SSL")
- @mock.patch("smtplib.SMTP")
- def test_send_mime_airflow_config(self, mock_smtp, mock_smtp_ssl, monkeypatch):
- monkeypatch.delenv("AIRFLOW_CONN_SMTP_DEFAULT", raising=False)
- mock_smtp.return_value = mock.Mock()
- msg = MIMEMultipart()
- email.send_mime_email("from", "to", msg, dryrun=False)
- mock_smtp.assert_called_once_with(
- host=conf.get("smtp", "SMTP_HOST"),
- port=conf.getint("smtp", "SMTP_PORT"),
- timeout=conf.getint("smtp", "SMTP_TIMEOUT"),
- )
- assert not mock_smtp_ssl.called
- assert mock_smtp.return_value.starttls.called
- mock_smtp.return_value.sendmail.assert_called_once_with("from", "to", msg.as_string())
- assert mock_smtp.return_value.quit.called
-
- @mock.patch("smtplib.SMTP")
- def test_send_mime_conn_id(self, mock_smtp, monkeypatch):
- monkeypatch.setenv(
- "AIRFLOW_CONN_SMTP_TEST_CONN",
- json.dumps({"conn_type": "smtp", "login": "test-user", "password": "test-p@$$word"}),
- )
- msg = MIMEMultipart()
- email.send_mime_email("from", "to", msg, dryrun=False, conn_id="smtp_test_conn")
- mock_smtp.return_value.login.assert_called_once_with("test-user", "test-p@$$word")
- mock_smtp.return_value.sendmail.assert_called_once_with("from", "to", msg.as_string())
- assert mock_smtp.return_value.quit.called
-
- @mock.patch("smtplib.SMTP_SSL")
- @mock.patch("smtplib.SMTP")
- def test_send_mime_ssl_none_context(self, mock_smtp, mock_smtp_ssl):
- mock_smtp_ssl.return_value = mock.Mock()
- with conf_vars({("smtp", "smtp_ssl"): "True", ("email", "ssl_context"): "none"}):
- email.send_mime_email("from", "to", MIMEMultipart(), dryrun=False)
- assert not mock_smtp.called
- mock_smtp_ssl.assert_called_once_with(
- host=conf.get("smtp", "SMTP_HOST"),
- port=conf.getint("smtp", "SMTP_PORT"),
- timeout=conf.getint("smtp", "SMTP_TIMEOUT"),
- context=None,
- )
-
- @mock.patch("smtplib.SMTP_SSL")
- @mock.patch("smtplib.SMTP")
- @mock.patch("ssl.create_default_context")
- def test_send_mime_ssl_default_context_if_not_set(self, create_default_context, mock_smtp, mock_smtp_ssl):
- mock_smtp_ssl.return_value = mock.Mock()
- with conf_vars({("smtp", "smtp_ssl"): "True"}):
- email.send_mime_email("from", "to", MIMEMultipart(), dryrun=False)
- assert not mock_smtp.called
- assert create_default_context.called
- mock_smtp_ssl.assert_called_once_with(
- host=conf.get("smtp", "SMTP_HOST"),
- port=conf.getint("smtp", "SMTP_PORT"),
- timeout=conf.getint("smtp", "SMTP_TIMEOUT"),
- context=create_default_context.return_value,
- )
-
- @mock.patch("smtplib.SMTP_SSL")
- @mock.patch("smtplib.SMTP")
- @mock.patch("ssl.create_default_context")
- def test_send_mime_ssl_default_context_with_value_set_to_default(
- self, create_default_context, mock_smtp, mock_smtp_ssl
- ):
- mock_smtp_ssl.return_value = mock.Mock()
- with conf_vars({("smtp", "smtp_ssl"): "True", ("email", "ssl_context"): "default"}):
- email.send_mime_email("from", "to", MIMEMultipart(), dryrun=False)
- assert not mock_smtp.called
- assert create_default_context.called
- mock_smtp_ssl.assert_called_once_with(
- host=conf.get("smtp", "SMTP_HOST"),
- port=conf.getint("smtp", "SMTP_PORT"),
- timeout=conf.getint("smtp", "SMTP_TIMEOUT"),
- context=create_default_context.return_value,
- )
-
- @mock.patch("smtplib.SMTP_SSL")
- @mock.patch("smtplib.SMTP")
- def test_send_mime_noauth(self, mock_smtp, mock_smtp_ssl):
- mock_smtp.return_value = mock.Mock()
- with conf_vars(
- {
- ("smtp", "smtp_user"): None,
- ("smtp", "smtp_password"): None,
- }
- ):
- email.send_mime_email("from", "to", MIMEMultipart(), dryrun=False)
- assert not mock_smtp_ssl.called
- mock_smtp.assert_called_once_with(
- host=conf.get("smtp", "SMTP_HOST"),
- port=conf.getint("smtp", "SMTP_PORT"),
- timeout=conf.getint("smtp", "SMTP_TIMEOUT"),
- )
- assert not mock_smtp.login.called
-
- @mock.patch("smtplib.SMTP_SSL")
- @mock.patch("smtplib.SMTP")
- def test_send_mime_dryrun(self, mock_smtp, mock_smtp_ssl):
- email.send_mime_email("from", "to", MIMEMultipart(), dryrun=True)
- assert not mock_smtp.called
- assert not mock_smtp_ssl.called
-
- @mock.patch("smtplib.SMTP_SSL")
- @mock.patch("smtplib.SMTP")
- def test_send_mime_complete_failure(self, mock_smtp: mock.Mock, mock_smtp_ssl: mock.Mock):
- mock_smtp.side_effect = SMTPServerDisconnected()
- msg = MIMEMultipart()
- with pytest.raises(SMTPServerDisconnected):
- email.send_mime_email("from", "to", msg, dryrun=False)
- mock_smtp.assert_any_call(
- host=conf.get("smtp", "SMTP_HOST"),
- port=conf.getint("smtp", "SMTP_PORT"),
- timeout=conf.getint("smtp", "SMTP_TIMEOUT"),
- )
- assert mock_smtp.call_count == conf.getint("smtp", "SMTP_RETRY_LIMIT")
- assert not mock_smtp_ssl.called
- assert not mock_smtp.return_value.starttls.called
- assert not mock_smtp.return_value.login.called
- assert not mock_smtp.return_value.sendmail.called
- assert not mock_smtp.return_value.quit.called
-
- @mock.patch("smtplib.SMTP_SSL")
- @mock.patch("smtplib.SMTP")
- @mock.patch("ssl.create_default_context")
- def test_send_mime_ssl_complete_failure(self, create_default_context, mock_smtp, mock_smtp_ssl):
- mock_smtp_ssl.side_effect = SMTPServerDisconnected()
- msg = MIMEMultipart()
- with conf_vars({("smtp", "smtp_ssl"): "True"}):
- with pytest.raises(SMTPServerDisconnected):
- email.send_mime_email("from", "to", msg, dryrun=False)
-
- mock_smtp_ssl.assert_any_call(
- host=conf.get("smtp", "SMTP_HOST"),
- port=conf.getint("smtp", "SMTP_PORT"),
- timeout=conf.getint("smtp", "SMTP_TIMEOUT"),
- context=create_default_context.return_value,
- )
- assert create_default_context.called
- assert mock_smtp_ssl.call_count == conf.getint("smtp", "SMTP_RETRY_LIMIT")
- assert not mock_smtp.called
- assert not mock_smtp_ssl.return_value.starttls.called
- assert not mock_smtp_ssl.return_value.login.called
- assert not mock_smtp_ssl.return_value.sendmail.called
- assert not mock_smtp_ssl.return_value.quit.called
-
- @mock.patch("smtplib.SMTP_SSL")
- @mock.patch("smtplib.SMTP")
- def test_send_mime_custom_timeout_retrylimit(self, mock_smtp, mock_smtp_ssl):
- mock_smtp.side_effect = SMTPServerDisconnected()
- msg = MIMEMultipart()
-
- custom_retry_limit = 10
- custom_timeout = 60
-
- with conf_vars(
- {
- ("smtp", "smtp_retry_limit"): str(custom_retry_limit),
- ("smtp", "smtp_timeout"): str(custom_timeout),
- }
- ):
- with pytest.raises(SMTPServerDisconnected):
- email.send_mime_email("from", "to", msg, dryrun=False)
-
- mock_smtp.assert_any_call(
- host=conf.get("smtp", "SMTP_HOST"), port=conf.getint("smtp", "SMTP_PORT"), timeout=custom_timeout
- )
- assert not mock_smtp_ssl.called
- assert mock_smtp.call_count == 10
-
- @mock.patch("smtplib.SMTP_SSL")
- @mock.patch("smtplib.SMTP")
- def test_send_mime_partial_failure(self, mock_smtp, mock_smtp_ssl):
- final_mock = mock.Mock()
- side_effects = [SMTPServerDisconnected(), SMTPServerDisconnected(), final_mock]
- mock_smtp.side_effect = side_effects
- msg = MIMEMultipart()
-
- email.send_mime_email("from", "to", msg, dryrun=False)
-
- mock_smtp.assert_any_call(
- host=conf.get("smtp", "SMTP_HOST"),
- port=conf.getint("smtp", "SMTP_PORT"),
- timeout=conf.getint("smtp", "SMTP_TIMEOUT"),
- )
- assert mock_smtp.call_count == side_effects.index(final_mock) + 1
- assert not mock_smtp_ssl.called
- assert final_mock.starttls.called
- final_mock.sendmail.assert_called_once_with("from", "to", msg.as_string())
- assert final_mock.quit.called
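Note: `airflow.utils.email` and its tests leave core entirely; equivalent sending functionality lives in the SMTP provider's hook, which reads host and credentials from an Airflow connection. A minimal sending sketch, assuming the provider is installed and an `smtp_default` connection is configured (addresses are placeholders):

# Sketch: sending mail through the SMTP provider hook instead of the removed
# airflow.utils.email helpers. The `smtp_default` connection and the
# addresses below are assumptions.
from airflow.providers.smtp.hooks.smtp import SmtpHook

with SmtpHook(smtp_conn_id="smtp_default") as smtp:
    smtp.send_email_smtp(
        to="test1@example.com",
        subject="subject",
        html_content="content",
    )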
diff --git a/tests_common/pytest_plugin.py b/tests_common/pytest_plugin.py
index 969d0b2a61c8c..809398d9e442f 100644
--- a/tests_common/pytest_plugin.py
+++ b/tests_common/pytest_plugin.py
@@ -1095,7 +1095,6 @@ def __call__(
on_execute_callback: Callable = ...,
on_failure_callback: Callable = ...,
on_retry_callback: Callable = ...,
- email: str = ...,
with_dagrun_type="scheduled",
**kwargs,
) -> tuple[DAG, EmptyOperator]: ...
@@ -1135,7 +1134,6 @@ def create_dag(
on_execute_callback=None,
on_failure_callback=None,
on_retry_callback=None,
- email=None,
with_dagrun_type=DagRunType.SCHEDULED,
**kwargs,
):
@@ -1151,7 +1149,6 @@ def create_dag(
on_execute_callback=on_execute_callback,
on_failure_callback=on_failure_callback,
on_retry_callback=on_retry_callback,
- email=email,
pool=pool,
trigger_rule=trigger_rule,
**op_kwargs,
@@ -1188,7 +1185,6 @@ def __call__(
on_execute_callback: Callable = ...,
on_failure_callback: Callable = ...,
on_retry_callback: Callable = ...,
- email: str = ...,
map_index: int = -1,
**kwargs,
) -> TaskInstance: ...
@@ -1223,7 +1219,6 @@ def maker(
on_execute_callback=None,
on_failure_callback=None,
on_retry_callback=None,
- email=None,
map_index=-1,
hostname=None,
pid=None,
@@ -1246,7 +1241,6 @@ def maker(
on_execute_callback=on_execute_callback,
on_failure_callback=on_failure_callback,
on_retry_callback=on_retry_callback,
- email=email,
pool=pool,
trigger_rule=trigger_rule,
**op_kwargs,