diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f567c13d06e7f..858fa90665a60 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -574,7 +574,8 @@ repos: language: pygrep name: No providers imports in core example DAGs description: The core example DAGs have no dependencies other than standard provider or core Airflow - entry: "^\\s*from airflow\\.providers.(?!standard.)" + # TODO: rewrite airflow/example_dags/example_dag_decorator.py and remove `smtp` provider exception + entry: "^\\s*from airflow\\.providers\\.(?!standard|smtp)" pass_filenames: true files: ^airflow/example_dags/.*\.py$ - id: check-no-airflow-deprecation-in-providers diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 486c371682fdb..4fff7f906afe6 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -2680,13 +2680,6 @@ paths: base_log_folder = /opt/airflow/logs - - [smtp] - - smtp_host = localhost - - smtp_mail_from = airflow@example.com - ' '401': content: diff --git a/airflow/api_fastapi/core_api/routes/public/config.py b/airflow/api_fastapi/core_api/routes/public/config.py index 13edf5f821b4c..70110fc6462e1 100644 --- a/airflow/api_fastapi/core_api/routes/public/config.py +++ b/airflow/api_fastapi/core_api/routes/public/config.py @@ -56,10 +56,6 @@ [core] dags_folder = /opt/airflow/dags base_log_folder = /opt/airflow/logs - - [smtp] - smtp_host = localhost - smtp_mail_from = airflow@example.com """ ), }, diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 201d4e3e6bc83..542f76ea58167 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2031,135 +2031,6 @@ webserver: type: integer example: ~ default: "1000" -email: - description: | - Configuration email backend and whether to - send email alerts on retry or failure - options: - email_backend: - description: Email backend to use - version_added: ~ - type: string - example: ~ - default: "airflow.utils.email.send_email_smtp" - email_conn_id: - description: Email connection to use - version_added: 2.1.0 - type: string - example: ~ - default: "smtp_default" - default_email_on_retry: - description: | - Whether email alerts should be sent when a task is retried - version_added: 2.0.0 - type: boolean - example: ~ - default: "True" - default_email_on_failure: - description: | - Whether email alerts should be sent when a task failed - version_added: 2.0.0 - type: boolean - example: ~ - default: "True" - subject_template: - description: | - File that will be used as the template for Email subject (which will be rendered using Jinja2). - If not set, Airflow uses a base template. - version_added: 2.0.1 - type: string - example: "/path/to/my_subject_template_file" - default: ~ - see_also: ":doc:`Email Configuration `" - html_content_template: - description: | - File that will be used as the template for Email content (which will be rendered using Jinja2). - If not set, Airflow uses a base template. - version_added: 2.0.1 - type: string - example: "/path/to/my_html_content_template_file" - default: ~ - see_also: ":doc:`Email Configuration `" - from_email: - description: | - Email address that will be used as sender address. - It can either be raw email or the complete address in a format ``Sender Name `` - version_added: 2.2.4 - type: string - example: "Airflow " - default: ~ - ssl_context: - description: | - ssl context to use when using SMTP and IMAP SSL connections. By default, the context is "default" - which sets it to ``ssl.create_default_context()`` which provides the right balance between - compatibility and security, it however requires that certificates in your operating system are - updated and that SMTP/IMAP servers of yours have valid certificates that have corresponding public - keys installed on your machines. You can switch it to "none" if you want to disable checking - of the certificates, but it is not recommended as it allows MITM (man-in-the-middle) attacks - if your infrastructure is not sufficiently secured. It should only be set temporarily while you - are fixing your certificate configuration. This can be typically done by upgrading to newer - version of the operating system you run Airflow components on,by upgrading/refreshing proper - certificates in the OS or by updating certificates for your mail servers. - type: string - version_added: 2.7.0 - example: "default" - default: "default" -smtp: - description: | - If you want airflow to send emails on retries, failure, and you want to use - the airflow.utils.email.send_email_smtp function, you have to configure an - smtp server here - options: - smtp_host: - description: | - Specifies the host server address used by Airflow when sending out email notifications via SMTP. - version_added: ~ - type: string - example: ~ - default: "localhost" - smtp_starttls: - description: | - Determines whether to use the STARTTLS command when connecting to the SMTP server. - version_added: ~ - type: string - example: ~ - default: "True" - smtp_ssl: - description: | - Determines whether to use an SSL connection when talking to the SMTP server. - version_added: ~ - type: string - example: ~ - default: "False" - smtp_port: - description: | - Defines the port number on which Airflow connects to the SMTP server to send email notifications. - version_added: ~ - type: string - example: ~ - default: "25" - smtp_mail_from: - description: | - Specifies the default **from** email address used when Airflow sends email notifications. - version_added: ~ - type: string - example: ~ - default: "airflow@example.com" - smtp_timeout: - description: | - Determines the maximum time (in seconds) the Apache Airflow system will wait for a - connection to the SMTP server to be established. - version_added: 2.0.0 - type: integer - example: ~ - default: "30" - smtp_retry_limit: - description: | - Defines the maximum number of times Airflow will attempt to connect to the SMTP server. - version_added: 2.0.0 - type: integer - example: ~ - default: "5" sentry: description: | `Sentry `__ integration. Here you can supply diff --git a/airflow/config_templates/unit_tests.cfg b/airflow/config_templates/unit_tests.cfg index 8f1a5ed676a16..a04cc2078ab47 100644 --- a/airflow/config_templates/unit_tests.cfg +++ b/airflow/config_templates/unit_tests.cfg @@ -66,10 +66,6 @@ allowed_deserialization_classes = airflow.* tests.* # celery tests rely on it being set celery_logging_level = INFO -[smtp] -# Used as default values for SMTP unit tests -smtp_mail_from = airflow@example.com - [api] auth_backends = airflow.providers.fab.auth_manager.api.auth.backend.session diff --git a/airflow/configuration.py b/airflow/configuration.py index 6ba3fe6006c66..8b7357455457f 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -353,13 +353,6 @@ def inversed_deprecated_sections(self): "navbar_color": (re2.compile(r"(?i)\A#007A87\z"), "#fff", "2.1"), "dag_default_view": (re2.compile(r"^tree$"), "grid", "3.0"), }, - "email": { - "email_backend": ( - re2.compile(r"^airflow\.contrib\.utils\.sendgrid\.send_email$"), - r"airflow.providers.sendgrid.utils.emailer.send_email", - "2.1", - ), - }, "logging": { "log_filename_template": ( re2.compile(re2.escape("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")), diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py index 4e55ab7ff15a0..8f371ee24278c 100644 --- a/airflow/example_dags/tutorial.py +++ b/airflow/example_dags/tutorial.py @@ -45,9 +45,6 @@ # You can override them on a per-task basis during operator initialization default_args={ "depends_on_past": False, - "email": ["airflow@example.com"], - "email_on_failure": False, - "email_on_retry": False, "retries": 1, "retry_delay": timedelta(minutes=5), # 'queue': 'bash_queue', diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 3361fe33df6f7..1a1467f7f01b5 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -212,13 +212,6 @@ class derived from this one results in the creation of a task object, :param task_id: a unique, meaningful id for the task :param owner: the owner of the task. Using a meaningful description (e.g. user/person/team/role name) to clarify ownership is recommended. - :param email: the 'to' email address(es) used in email alerts. This can be a - single email or multiple ones. Multiple addresses can be specified as a - comma or semicolon separated string or by passing a list of strings. - :param email_on_retry: Indicates whether email alerts should be sent when a - task is retried - :param email_on_failure: Indicates whether email alerts should be sent when - a task failed :param retries: the number of retries that should be performed before failing the task :param retry_delay: delay between retries, can be set as ``timedelta`` or diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index ceb0acfcf9946..8c6f16a761d5a 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -22,7 +22,6 @@ import itertools import logging import math -import operator import os import signal import traceback @@ -105,7 +104,6 @@ from airflow.models.xcom import LazyXComSelectSequence, XCom from airflow.plugins_manager import integrate_macros_plugins from airflow.sdk.api.datamodels._generated import AssetProfile -from airflow.sdk.definitions._internal.templater import SandboxedEnvironment from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetNameRef, AssetUniqueKey, AssetUriRef from airflow.sdk.definitions.param import process_params from airflow.sdk.definitions.taskgroup import MappedTaskGroup @@ -123,10 +121,8 @@ OutletEventAccessors, VariableAccessor, context_get_outlet_events, - context_merge, ) -from airflow.utils.email import send_email -from airflow.utils.helpers import prune_dict, render_template_to_string +from airflow.utils.helpers import prune_dict from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.net import get_hostname from airflow.utils.operator_helpers import ExecutionCallableRunner, context_to_airflow_vars @@ -1077,15 +1073,6 @@ def _handle_failure( ) _log_state(task_instance=task_instance, lead_msg="Immediate failure requested. " if force_fail else "") - if ( - failure_context["task"] - and failure_context["email_for_state"](failure_context["task"]) - and failure_context["task"].email - ): - try: - task_instance.email_alert(error, failure_context["task"]) - except Exception: - log.exception("Failed to send email to: %s", failure_context["task"].email) if failure_context["callbacks"] and failure_context["context"]: _run_finished_callback( @@ -1277,116 +1264,6 @@ def _get_previous_start_date( return pendulum.instance(prev_ti.start_date) if prev_ti and prev_ti.start_date else None -def _email_alert(*, task_instance: TaskInstance, exception, task: BaseOperator) -> None: - """ - Send alert email with exception information. - - :param task_instance: the task instance - :param exception: the exception - :param task: task related to the exception - - :meta private: - """ - subject, html_content, html_content_err = task_instance.get_email_subject_content(exception, task=task) - if TYPE_CHECKING: - assert task.email - try: - send_email(task.email, subject, html_content) - except Exception: - send_email(task.email, subject, html_content_err) - - -def _get_email_subject_content( - *, - task_instance: TaskInstance, - exception: BaseException, - task: BaseOperator | None = None, -) -> tuple[str, str, str]: - """ - Get the email subject content for exceptions. - - :param task_instance: the task instance - :param exception: the exception sent in the email - :param task: - - :meta private: - """ - # For a ti from DB (without ti.task), return the default value - if task is None: - task = getattr(task_instance, "task") - use_default = task is None - exception_html = str(exception).replace("\n", "
") - - default_subject = "Airflow alert: {{ti}}" - # For reporting purposes, we report based on 1-indexed, - # not 0-indexed lists (i.e. Try 1 instead of - # Try 0 for the first attempt). - default_html_content = ( - "Try {{try_number}} out of {{max_tries + 1}}
" - "Exception:
{{exception_html}}
" - 'Log: Link
' - "Host: {{ti.hostname}}
" - 'Mark success: Link
' - ) - - default_html_content_err = ( - "Try {{try_number}} out of {{max_tries + 1}}
" - "Exception:
Failed attempt to attach error logs
" - 'Log: Link
' - "Host: {{ti.hostname}}
" - 'Mark success: Link
' - ) - - additional_context: dict[str, Any] = { - "exception": exception, - "exception_html": exception_html, - "try_number": task_instance.try_number, - "max_tries": task_instance.max_tries, - } - - if use_default: - default_context = {"ti": task_instance, **additional_context} - jinja_env = jinja2.Environment( - loader=jinja2.FileSystemLoader(os.path.dirname(__file__)), autoescape=True - ) - subject = jinja_env.from_string(default_subject).render(**default_context) - html_content = jinja_env.from_string(default_html_content).render(**default_context) - html_content_err = jinja_env.from_string(default_html_content_err).render(**default_context) - - else: - if TYPE_CHECKING: - assert task_instance.task - - # Use the DAG's get_template_env() to set force_sandboxed. Don't add - # the flag to the function on task object -- that function can be - # overridden, and adding a flag breaks backward compatibility. - dag = task_instance.task.get_dag() - if dag: - jinja_env = dag.get_template_env(force_sandboxed=True) - else: - jinja_env = SandboxedEnvironment(cache_size=0) - jinja_context = task_instance.get_template_context() - context_merge(jinja_context, additional_context) - - def render(key: str, content: str) -> str: - if conf.has_option("email", key): - path = conf.get_mandatory_value("email", key) - try: - with open(path) as f: - content = f.read() - except FileNotFoundError: - log.warning("Could not find email template file '%s'. Using defaults...", path) - except OSError: - log.exception("Error while using email template %s. Using defaults...", path) - return render_template_to_string(jinja_env.from_string(content), jinja_context) - - subject = render("subject_template", default_subject) - html_content = render("html_content_template", default_html_content) - html_content_err = render("html_content_template", default_html_content_err) - - return subject, html_content, html_content_err - - def _run_finished_callback( *, callbacks: None | TaskStateChangeCallback | list[TaskStateChangeCallback], @@ -3095,8 +2972,7 @@ def fetch_handle_failure_context( if context is not None: context["exception"] = error - # Set state correctly and figure out how to log it and decide whether - # to email + # Set state correctly and figure out how to log it # Note, callback invocation needs to be handled by caller of # _run_raw_task to avoid race conditions which could lead to duplicate @@ -3114,11 +2990,10 @@ def fetch_handle_failure_context( assert isinstance(ti.task, BaseOperator) task = ti.task.unmap((context, session)) except Exception: - cls.logger().error("Unable to unmap task to determine if we need to send an alert email") + cls.logger().error("Unable to unmap task to determine what callback to use") if force_fail or not ti.is_eligible_to_retry(): ti.state = TaskInstanceState.FAILED - email_for_state = operator.attrgetter("email_on_failure") callbacks = task.on_failure_callback if task else None if task and fail_fast: @@ -3133,7 +3008,6 @@ def fetch_handle_failure_context( TaskInstanceHistory.record_ti(ti, session=session) ti.state = State.UP_FOR_RETRY - email_for_state = operator.attrgetter("email_on_retry") callbacks = task.on_retry_callback if task else None get_listener_manager().hook.on_task_instance_failed( @@ -3142,7 +3016,6 @@ def fetch_handle_failure_context( return { "ti": ti, - "email_for_state": email_for_state, "task": task, "callbacks": callbacks, "context": context, @@ -3290,26 +3163,6 @@ def render_templates( return original_task - def get_email_subject_content( - self, exception: BaseException, task: BaseOperator | None = None - ) -> tuple[str, str, str]: - """ - Get the email subject content for exceptions. - - :param exception: the exception sent in the email - :param task: - """ - return _get_email_subject_content(task_instance=self, exception=exception, task=task) - - def email_alert(self, exception, task: BaseOperator) -> None: - """ - Send alert email with exception information. - - :param exception: the exception - :param task: task related to the exception - """ - _email_alert(task_instance=self, exception=exception, task=task) - def set_duration(self) -> None: """Set task instance duration.""" _set_duration(task_instance=self) diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py index ce139c3134135..cc582992c4bc6 100644 --- a/airflow/models/trigger.py +++ b/airflow/models/trigger.py @@ -261,7 +261,7 @@ def submit_failure(cls, trigger_id, exc=None, session: Session = NEW_SESSION) -> the runtime code understands as immediate-fail, and pack the error into next_kwargs. - TODO: Once we have shifted callback (and email) handling to run on + TODO: Once we have shifted callback handling to run on workers as first-class concepts, we can run the failure code here in-process, but we can't do that right now. """ diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 08d032e873c6a..aecba0fc9882b 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -1252,8 +1252,6 @@ def _serialize_node(cls, op: BaseOperator | MappedOperator) -> dict[str, Any]: # If not, store them as strings # And raise an exception if the field is not templateable forbidden_fields = set(SerializedBaseOperator._CONSTRUCTOR_PARAMS.keys()) - # Though allow some of the BaseOperator fields to be templated anyway - forbidden_fields.difference_update({"email"}) if op.template_fields: for template_field in op.template_fields: if template_field in forbidden_fields: diff --git a/airflow/utils/email.py b/airflow/utils/email.py deleted file mode 100644 index 455b135c23e6c..0000000000000 --- a/airflow/utils/email.py +++ /dev/null @@ -1,334 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import collections.abc -import logging -import os -import smtplib -import ssl -from collections.abc import Iterable -from email.mime.application import MIMEApplication -from email.mime.multipart import MIMEMultipart -from email.mime.text import MIMEText -from email.utils import formatdate -from typing import Any - -import re2 - -from airflow.configuration import conf -from airflow.exceptions import AirflowException - -log = logging.getLogger(__name__) - - -def send_email( - to: list[str] | Iterable[str], - subject: str, - html_content: str, - files: list[str] | None = None, - dryrun: bool = False, - cc: str | Iterable[str] | None = None, - bcc: str | Iterable[str] | None = None, - mime_subtype: str = "mixed", - mime_charset: str = "utf-8", - conn_id: str | None = None, - custom_headers: dict[str, Any] | None = None, - **kwargs, -) -> None: - """ - Send an email using the backend specified in the *EMAIL_BACKEND* configuration option. - - :param to: A list or iterable of email addresses to send the email to. - :param subject: The subject of the email. - :param html_content: The content of the email in HTML format. - :param files: A list of paths to files to attach to the email. - :param dryrun: If *True*, the email will not actually be sent. Default: *False*. - :param cc: A string or iterable of strings containing email addresses to send a copy of the email to. - :param bcc: A string or iterable of strings containing email addresses to send a - blind carbon copy of the email to. - :param mime_subtype: The subtype of the MIME message. Default: "mixed". - :param mime_charset: The charset of the email. Default: "utf-8". - :param conn_id: The connection ID to use for the backend. If not provided, the default connection - specified in the *EMAIL_CONN_ID* configuration option will be used. - :param custom_headers: A dictionary of additional headers to add to the MIME message. - No validations are run on these values, and they should be able to be encoded. - :param kwargs: Additional keyword arguments to pass to the backend. - """ - backend = conf.getimport("email", "EMAIL_BACKEND") - backend_conn_id = conn_id or conf.get("email", "EMAIL_CONN_ID") - from_email = conf.get("email", "from_email", fallback=None) - - to_list = get_email_address_list(to) - to_comma_separated = ", ".join(to_list) - - return backend( - to_comma_separated, - subject, - html_content, - files=files, - dryrun=dryrun, - cc=cc, - bcc=bcc, - mime_subtype=mime_subtype, - mime_charset=mime_charset, - conn_id=backend_conn_id, - from_email=from_email, - custom_headers=custom_headers, - **kwargs, - ) - - -def send_email_smtp( - to: str | Iterable[str], - subject: str, - html_content: str, - files: list[str] | None = None, - dryrun: bool = False, - cc: str | Iterable[str] | None = None, - bcc: str | Iterable[str] | None = None, - mime_subtype: str = "mixed", - mime_charset: str = "utf-8", - conn_id: str = "smtp_default", - from_email: str | None = None, - custom_headers: dict[str, Any] | None = None, - **kwargs, -) -> None: - """ - Send an email with html content. - - :param to: Recipient email address or list of addresses. - :param subject: Email subject. - :param html_content: Email body in HTML format. - :param files: List of file paths to attach to the email. - :param dryrun: If True, the email will not be sent, but all other actions will be performed. - :param cc: Carbon copy recipient email address or list of addresses. - :param bcc: Blind carbon copy recipient email address or list of addresses. - :param mime_subtype: MIME subtype of the email. - :param mime_charset: MIME charset of the email. - :param conn_id: Connection ID of the SMTP server. - :param from_email: Sender email address. - :param custom_headers: Dictionary of custom headers to include in the email. - :param kwargs: Additional keyword arguments. - - >>> send_email("test@example.com", "foo", "Foo bar", ["/dev/null"], dryrun=True) - """ - smtp_mail_from = conf.get("smtp", "SMTP_MAIL_FROM") - - if smtp_mail_from is not None: - mail_from = smtp_mail_from - else: - if from_email is None: - raise ValueError( - "You should set from email - either by smtp/smtp_mail_from config or `from_email` parameter" - ) - mail_from = from_email - - msg, recipients = build_mime_message( - mail_from=mail_from, - to=to, - subject=subject, - html_content=html_content, - files=files, - cc=cc, - bcc=bcc, - mime_subtype=mime_subtype, - mime_charset=mime_charset, - custom_headers=custom_headers, - ) - - send_mime_email(e_from=mail_from, e_to=recipients, mime_msg=msg, conn_id=conn_id, dryrun=dryrun) - - -def build_mime_message( - mail_from: str | None, - to: str | Iterable[str], - subject: str, - html_content: str, - files: list[str] | None = None, - cc: str | Iterable[str] | None = None, - bcc: str | Iterable[str] | None = None, - mime_subtype: str = "mixed", - mime_charset: str = "utf-8", - custom_headers: dict[str, Any] | None = None, -) -> tuple[MIMEMultipart, list[str]]: - """ - Build a MIME message that can be used to send an email and returns a full list of recipients. - - :param mail_from: Email address to set as the email's "From" field. - :param to: A string or iterable of strings containing email addresses to set as the email's "To" field. - :param subject: The subject of the email. - :param html_content: The content of the email in HTML format. - :param files: A list of paths to files to be attached to the email. - :param cc: A string or iterable of strings containing email addresses to set as the email's "CC" field. - :param bcc: A string or iterable of strings containing email addresses to set as the email's "BCC" field. - :param mime_subtype: The subtype of the MIME message. Default: "mixed". - :param mime_charset: The charset of the email. Default: "utf-8". - :param custom_headers: Additional headers to add to the MIME message. No validations are run on these - values, and they should be able to be encoded. - :return: A tuple containing the email as a MIMEMultipart object and a list of recipient email addresses. - """ - to = get_email_address_list(to) - - msg = MIMEMultipart(mime_subtype) - msg["Subject"] = subject - if mail_from: - msg["From"] = mail_from - msg["To"] = ", ".join(to) - recipients = to - if cc: - cc = get_email_address_list(cc) - msg["CC"] = ", ".join(cc) - recipients += cc - - if bcc: - # don't add bcc in header - bcc = get_email_address_list(bcc) - recipients += bcc - - msg["Date"] = formatdate(localtime=True) - mime_text = MIMEText(html_content, "html", mime_charset) - msg.attach(mime_text) - - for fname in files or []: - basename = os.path.basename(fname) - with open(fname, "rb") as file: - part = MIMEApplication(file.read(), Name=basename) - part["Content-Disposition"] = f'attachment; filename="{basename}"' - part["Content-ID"] = f"<{basename}>" - msg.attach(part) - - if custom_headers: - for header_key, header_value in custom_headers.items(): - msg[header_key] = header_value - - return msg, recipients - - -def send_mime_email( - e_from: str, - e_to: str | list[str], - mime_msg: MIMEMultipart, - conn_id: str = "smtp_default", - dryrun: bool = False, -) -> None: - """ - Send a MIME email. - - :param e_from: The email address of the sender. - :param e_to: The email address or a list of email addresses of the recipient(s). - :param mime_msg: The MIME message to send. - :param conn_id: The ID of the SMTP connection to use. - :param dryrun: If True, the email will not be sent, but a log message will be generated. - """ - smtp_host = conf.get_mandatory_value("smtp", "SMTP_HOST") - smtp_port = conf.getint("smtp", "SMTP_PORT") - smtp_starttls = conf.getboolean("smtp", "SMTP_STARTTLS") - smtp_ssl = conf.getboolean("smtp", "SMTP_SSL") - smtp_retry_limit = conf.getint("smtp", "SMTP_RETRY_LIMIT") - smtp_timeout = conf.getint("smtp", "SMTP_TIMEOUT") - smtp_user = None - smtp_password = None - - if conn_id is not None: - try: - from airflow.hooks.base import BaseHook - - airflow_conn = BaseHook.get_connection(conn_id) - smtp_user = airflow_conn.login - smtp_password = airflow_conn.password - except AirflowException: - pass - if smtp_user is None or smtp_password is None: - log.debug("No user/password found for SMTP, so logging in with no authentication.") - - if not dryrun: - for attempt in range(1, smtp_retry_limit + 1): - log.info("Email alerting: attempt %s", str(attempt)) - try: - smtp_conn = _get_smtp_connection(smtp_host, smtp_port, smtp_timeout, smtp_ssl) - except smtplib.SMTPServerDisconnected: - if attempt == smtp_retry_limit: - raise - else: - if smtp_starttls: - smtp_conn.starttls() - if smtp_user and smtp_password: - smtp_conn.login(smtp_user, smtp_password) - log.info("Sent an alert email to %s", e_to) - smtp_conn.sendmail(e_from, e_to, mime_msg.as_string()) - smtp_conn.quit() - break - - -def get_email_address_list(addresses: str | Iterable[str]) -> list[str]: - """ - Return a list of email addresses from the provided input. - - :param addresses: A string or iterable of strings containing email addresses. - :return: A list of email addresses. - :raises TypeError: If the input is not a string or iterable of strings. - """ - if isinstance(addresses, str): - return _get_email_list_from_str(addresses) - elif isinstance(addresses, collections.abc.Iterable): - if not all(isinstance(item, str) for item in addresses): - raise TypeError("The items in your iterable must be strings.") - return list(addresses) - else: - raise TypeError(f"Unexpected argument type: Received '{type(addresses).__name__}'.") - - -def _get_smtp_connection(host: str, port: int, timeout: int, with_ssl: bool) -> smtplib.SMTP: - """ - Return an SMTP connection to the specified host and port, with optional SSL encryption. - - :param host: The hostname or IP address of the SMTP server. - :param port: The port number to connect to on the SMTP server. - :param timeout: The timeout in seconds for the connection. - :param with_ssl: Whether to use SSL encryption for the connection. - :return: An SMTP connection to the specified host and port. - """ - if not with_ssl: - return smtplib.SMTP(host=host, port=port, timeout=timeout) - else: - ssl_context_string = conf.get("email", "SSL_CONTEXT") - if ssl_context_string == "default": - ssl_context = ssl.create_default_context() - elif ssl_context_string == "none": - ssl_context = None - else: - raise RuntimeError( - f"The email.ssl_context configuration variable must " - f"be set to 'default' or 'none' and is '{ssl_context_string}." - ) - return smtplib.SMTP_SSL(host=host, port=port, timeout=timeout, context=ssl_context) - - -def _get_email_list_from_str(addresses: str) -> list[str]: - """ - Extract a list of email addresses from a string. - - The string can contain multiple email addresses separated - by any of the following delimiters: ',' or ';'. - - :param addresses: A string containing one or more email addresses. - :return: A list of email addresses. - """ - pattern = r"\s*[,;]\s*" - return re2.split(pattern, addresses) diff --git a/airflow/utils/operator_helpers.py b/airflow/utils/operator_helpers.py index ab3c8c89e5e0f..96db19cb1bcaf 100644 --- a/airflow/utils/operator_helpers.py +++ b/airflow/utils/operator_helpers.py @@ -62,10 +62,6 @@ "default": f"{DEFAULT_FORMAT_PREFIX}dag_owner", "env_var_format": f"{ENV_VAR_FORMAT_PREFIX}DAG_OWNER", }, - "AIRFLOW_CONTEXT_DAG_EMAIL": { - "default": f"{DEFAULT_FORMAT_PREFIX}dag_email", - "env_var_format": f"{ENV_VAR_FORMAT_PREFIX}DAG_EMAIL", - }, } @@ -93,7 +89,6 @@ def context_to_airflow_vars(context: Mapping[str, Any], in_env_var_format: bool dag_run = context.get("dag_run") ops = [ - (task, "email", "AIRFLOW_CONTEXT_DAG_EMAIL"), (task, "owner", "AIRFLOW_CONTEXT_DAG_OWNER"), (task_instance, "dag_id", "AIRFLOW_CONTEXT_DAG_ID"), (task_instance, "task_id", "AIRFLOW_CONTEXT_TASK_ID"), diff --git a/docs/apache-airflow/cli-and-env-variables-ref.rst b/docs/apache-airflow/cli-and-env-variables-ref.rst index 5de76b8d02969..a03694b495df9 100644 --- a/docs/apache-airflow/cli-and-env-variables-ref.rst +++ b/docs/apache-airflow/cli-and-env-variables-ref.rst @@ -76,7 +76,6 @@ Environment Variables * ``flower_basic_auth`` in ``[celery]`` section * ``result_backend`` in ``[celery]`` section * ``password`` in ``[atlas]`` section -* ``smtp_password`` in ``[smtp]`` section * ``secret_key`` in ``[webserver]`` section .. envvar:: AIRFLOW__{SECTION}__{KEY}_SECRET diff --git a/docs/apache-airflow/howto/email-config.rst b/docs/apache-airflow/howto/email-config.rst deleted file mode 100644 index bc7ea9a1f9d04..0000000000000 --- a/docs/apache-airflow/howto/email-config.rst +++ /dev/null @@ -1,190 +0,0 @@ - .. Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - .. http://www.apache.org/licenses/LICENSE-2.0 - - .. Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - -Email Configuration -=================== - -You can configure the email that is being sent in your ``airflow.cfg`` -by setting a ``subject_template`` and/or a ``html_content_template`` -in the ``[email]`` section. - -.. code-block:: ini - - [email] - email_backend = airflow.utils.email.send_email_smtp - subject_template = /path/to/my_subject_template_file - html_content_template = /path/to/my_html_content_template_file - -Equivalent environment variables look like: - -.. code-block:: sh - - AIRFLOW__EMAIL__EMAIL_BACKEND=airflow.utils.email.send_email_smtp - AIRFLOW__EMAIL__SUBJECT_TEMPLATE=/path/to/my_subject_template_file - AIRFLOW__EMAIL__HTML_CONTENT_TEMPLATE=/path/to/my_html_content_template_file - -You can configure a sender's email address by setting ``from_email`` in the ``[email]`` section like: - -.. code-block:: ini - - [email] - from_email = "John Doe " - -Equivalent environment variables look like: - -.. code-block:: sh - - AIRFLOW__EMAIL__FROM_EMAIL="John Doe " - - -To configure SMTP settings, checkout the :ref:`SMTP ` section in the standard configuration. -If you do not want to store the SMTP credentials in the config or in the environment variables, you can create a -connection called ``smtp_default`` of ``Email`` type, or choose a custom connection name and set the ``email_conn_id`` with its name in -the configuration & store SMTP username-password in it. Other SMTP settings like host, port etc always gets picked up -from the configuration only. The connection can be of any type (for example 'HTTP connection'). - -If you want to check which email backend is currently set, you can use ``airflow config get-value email email_backend`` command as in -the example below. - -.. code-block:: bash - - $ airflow config get-value email email_backend - airflow.utils.email.send_email_smtp - -To access the task's information you use `Jinja Templating `_ in your template files. - -For example a ``html_content_template`` file could look like this: - -.. code-block:: - - Try {{try_number}} out of {{max_tries + 1}}
- Exception:
{{exception_html}}
- Log: Link
- Host: {{ti.hostname}}
- Mark success: Link
- -.. note:: - For more information on setting the configuration, see :doc:`set-config` - -.. _email-configuration-sendgrid: - -Send email using SendGrid -------------------------- - -Using Default SMTP -^^^^^^^^^^^^^^^^^^ - -You can use the default airflow SMTP backend to send email with SendGrid - - .. code-block:: ini - - [smtp] - smtp_host=smtp.sendgrid.net - smtp_starttls=False - smtp_ssl=False - smtp_port=587 - smtp_mail_from= - -Equivalent environment variables looks like - - .. code-block:: - - AIRFLOW__SMTP__SMTP_HOST=smtp.sendgrid.net - AIRFLOW__SMTP__SMTP_STARTTLS=False - AIRFLOW__SMTP__SMTP_SSL=False - AIRFLOW__SMTP__SMTP_PORT=587 - AIRFLOW__SMTP__SMTP_MAIL_FROM= - - -Using SendGrid Provider -^^^^^^^^^^^^^^^^^^^^^^^ - -Airflow can be configured to send e-mail using `SendGrid `__. - -Follow the steps below to enable it: - -1. Setup your SendGrid account, The SMTP and copy username and API Key. - -2. Include ``sendgrid`` provider as part of your Airflow installation, e.g., - - .. code-block:: bash - - pip install 'apache-airflow[sendgrid]' --constraint ... - -or - .. code-block:: bash - - pip install 'apache-airflow-providers-sendgrid' --constraint ... - - -3. Update ``email_backend`` property in ``[email]`` section in ``airflow.cfg``, i.e. - - .. code-block:: ini - - [email] - email_backend = airflow.providers.sendgrid.utils.emailer.send_email - email_conn_id = sendgrid_default - from_email = "hello@eg.com" - - Equivalent environment variables looks like - - .. code-block:: - - AIRFLOW__EMAIL__EMAIL_BACKEND=airflow.providers.sendgrid.utils.emailer.send_email - AIRFLOW__EMAIL__EMAIL_CONN_ID=sendgrid_default - SENDGRID_MAIL_FROM=hello@thelearning.dev - -4. Create a connection called ``sendgrid_default``, or choose a custom connection - name and set it in ``email_conn_id`` of 'Email' type. Only login and password - are used from the connection. - - -.. image:: ../img/email_connection.png - :align: center - :alt: create email connection - -.. note:: The callbacks for success, failure and retry will use the same configuration to send the email - - -.. _email-configuration-ses: - -Send email using AWS SES ------------------------- - -Airflow can be configured to send e-mail using `AWS SES `__. - -Follow the steps below to enable it: - -1. Include ``amazon`` subpackage as part of your Airflow installation: - - .. code-block:: ini - - pip install 'apache-airflow[amazon]' - -2. Update ``email_backend`` property in ``[email]`` section in ``airflow.cfg``: - - .. code-block:: ini - - [email] - email_backend = airflow.providers.amazon.aws.utils.emailer.send_email - email_conn_id = aws_default - from_email = From email - -Note that for SES, you must configure from_email to the valid email that can send messages from SES. - -3. Create a connection called ``aws_default``, or choose a custom connection - name and set it in ``email_conn_id``. The type of connection should be ``Amazon Web Services``. diff --git a/docs/apache-airflow/howto/export-more-env-vars.rst b/docs/apache-airflow/howto/export-more-env-vars.rst index e393f479302d1..6c94c7c08853d 100644 --- a/docs/apache-airflow/howto/export-more-env-vars.rst +++ b/docs/apache-airflow/howto/export-more-env-vars.rst @@ -28,7 +28,7 @@ which are available as environment variables when running tasks. Note, both key value are must be string. ``dag_id``, ``task_id``, ``execution_date``, ``dag_run_id``, -``dag_owner``, ``dag_email`` are reserved keys. +``dag_owner`` are reserved keys. For example, in your ``airflow_local_settings.py`` file: diff --git a/docs/apache-airflow/howto/index.rst b/docs/apache-airflow/howto/index.rst index 03c34820c4cc2..3271cbf8109e7 100644 --- a/docs/apache-airflow/howto/index.rst +++ b/docs/apache-airflow/howto/index.rst @@ -49,7 +49,6 @@ configuring an Airflow environment. run-behind-proxy run-with-systemd define-extra-link - email-config dynamic-dag-generation docker-compose/index upgrading-from-1-10/index diff --git a/docs/apache-airflow/howto/set-config.rst b/docs/apache-airflow/howto/set-config.rst index 2a03b2bbf5ee4..365ef6ca4023a 100644 --- a/docs/apache-airflow/howto/set-config.rst +++ b/docs/apache-airflow/howto/set-config.rst @@ -104,7 +104,6 @@ The following config options support this ``_cmd`` and ``_secret`` version: * ``flower_basic_auth`` in ``[celery]`` section * ``result_backend`` in ``[celery]`` section * ``password`` in ``[atlas]`` section -* ``smtp_password`` in ``[smtp]`` section * ``secret_key`` in ``[webserver]`` section The ``_cmd`` config options can also be set using a corresponding environment variable diff --git a/docs/apache-airflow/integration.rst b/docs/apache-airflow/integration.rst index 2e9c450f1fa8a..a54862f1dbf52 100644 --- a/docs/apache-airflow/integration.rst +++ b/docs/apache-airflow/integration.rst @@ -21,7 +21,6 @@ Integration Airflow has a mechanism that allows you to expand its functionality and integrate with other systems. * :doc:`API Authentication backends ` -* :doc:`Email backends ` * :doc:`Executor ` * :doc:`Kerberos ` * :doc:`Logging ` diff --git a/docs/apache-airflow/public-airflow-interface.rst b/docs/apache-airflow/public-airflow-interface.rst index d1ac63eb5b3a9..24d3dafe3b970 100644 --- a/docs/apache-airflow/public-airflow-interface.rst +++ b/docs/apache-airflow/public-airflow-interface.rst @@ -386,11 +386,6 @@ by extending them: You can read more about creating custom Decorators in :doc:`howto/create-custom-decorator`. -Email notifications -------------------- - -Airflow has a built-in way of sending email notifications and it allows to extend it by adding custom -email notification classes. You can read more about email notifications in :doc:`howto/email-config`. Notifications ------------- diff --git a/newsfragments/46041.significant.rst b/newsfragments/46041.significant.rst new file mode 100644 index 0000000000000..5f0d52fcfe2ea --- /dev/null +++ b/newsfragments/46041.significant.rst @@ -0,0 +1,42 @@ +.. Write a short and imperative summary of this changes + +.. Provide additional contextual information + +.. Check the type of change that applies to this change +.. Dag changes: requires users to change their dag code +.. Config changes: requires users to change their airflow config +.. API changes: requires users to change their Airflow REST API calls +.. CLI changes: requires users to change their Airflow CLI usage +.. Behaviour changes: the existing code won't break, but the behavior is different +.. Plugin changes: requires users to change their Airflow plugin implementation +.. Dependency changes: requires users to change their dependencies (e.g., Postgres 12) +.. Code interface changes: requires users to change other implementations (e.g., auth manager) +In Airflow 3.0, the ``timer_unit_consistency`` setting in the ``metrics`` section is removed as it is now the default behaviour. +This is done to standardize all timer and timing metrics to milliseconds across all metric loggers. + +Airflow 2.11 introduced the ``timer_unit_consistency`` setting in the ``metrics`` section of the configuration file. The +default value was ``False`` which meant that the timer and timing metrics were logged in seconds. This was done to maintain +backwards compatibility with the previous versions of Airflow. + +In Airflow 3.0, the ``smtp`` setting section, the base operator parameters ``email``, ``email_on_retry`` and ``email_on_failure``, and the ``airflow.operators.email.EmailOperator`` are removed. + +Instead of using the operator ``EmailOperator`` from the ``airflow.operators.email`` module, users should use the operator ``EmailOperator`` from the ``smtp`` provider in the module ``airflow.providers.smtp.operators.smtp``. + +And to send emails on failure or retry, users should use the ```airflow.providers.smtp.notifications.smtp.SmtpNotifier`` with the ``on_failure_callback`` and ``on_retry_callback`` parameters. For more details you can check `this guide `_. + +* Types of change + + * [ ] Dag changes + * [x] Config changes + * [ ] API changes + * [ ] CLI changes + * [x] Behaviour changes + * [ ] Plugin changes + * [ ] Dependency changes + * [x] Code interface changes + +* Migration rules needed + +.. TODO: create the migration rule that: +.. * Import ``EmailOperator`` from ``airflow.providers.smtp.operators.smtp`` instead of ``airflow.operators.email`` +.. * Detect and remove the ``email``, ``email_on_retry`` and ``email_on_failure`` parameters from the operator parameters (and replace them by an ``SmtpNotifier`` with the ``on_failure_callback`` and ``on_retry_callback`` parameters)? diff --git a/providers/sendgrid/src/airflow/providers/sendgrid/utils/emailer.py b/providers/sendgrid/src/airflow/providers/sendgrid/utils/emailer.py index 7a637e868ed51..4d7ad89266497 100644 --- a/providers/sendgrid/src/airflow/providers/sendgrid/utils/emailer.py +++ b/providers/sendgrid/src/airflow/providers/sendgrid/utils/emailer.py @@ -17,6 +17,8 @@ # under the License. """Airflow module for email backend using sendgrid.""" +# TODO: remove this module once minimum Airflow version is >=3 + from __future__ import annotations import base64 @@ -26,9 +28,10 @@ from collections.abc import Iterable from typing import Union +import re2 + import sendgrid from airflow.hooks.base import BaseHook -from airflow.utils.email import get_email_address_list from sendgrid.helpers.mail import ( Attachment, Category, @@ -78,15 +81,15 @@ def send_email( # Add the recipient list of to emails. personalization = Personalization() - to = get_email_address_list(to) + to = _get_email_address_list(to) for to_address in to: personalization.add_to(Email(to_address)) if cc: - cc = get_email_address_list(cc) + cc = _get_email_address_list(cc) for cc_address in cc: personalization.add_cc(Email(cc_address)) if bcc: - bcc = get_email_address_list(bcc) + bcc = _get_email_address_list(bcc) for bcc_address in bcc: personalization.add_bcc(Email(bcc_address)) @@ -140,3 +143,19 @@ def _post_sendgrid_mail(mail_data: dict, conn_id: str = "sendgrid_default") -> N mail_data["subject"], response.status_code, ) + + +def _get_email_list_from_str(addresses: str) -> list[str]: + pattern = r"\s*[,;]\s*" + return re2.split(pattern, addresses) + + +def _get_email_address_list(addresses: str | Iterable[str]) -> list[str]: + if isinstance(addresses, str): + return _get_email_list_from_str(addresses) + elif isinstance(addresses, Iterable): + if not all(isinstance(item, str) for item in addresses): + raise TypeError("The items in your iterable must be strings.") + return list(addresses) + else: + raise TypeError(f"Unexpected argument type: Received '{type(addresses).__name__}'.") diff --git a/providers/src/airflow/providers/amazon/aws/hooks/ses.py b/providers/src/airflow/providers/amazon/aws/hooks/ses.py index 7c0568c4d03c8..8dd7cdfd19058 100644 --- a/providers/src/airflow/providers/amazon/aws/hooks/ses.py +++ b/providers/src/airflow/providers/amazon/aws/hooks/ses.py @@ -16,13 +16,21 @@ # under the License. """This module contains AWS SES Hook.""" +# TODO: remove this module once minimum Airflow version is >=3 + from __future__ import annotations +import os from collections.abc import Iterable +from email.mime.application import MIMEApplication +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from email.utils import formatdate from typing import Any +import re2 + from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook -from airflow.utils.email import build_mime_message class SesHook(AwsBaseHook): @@ -88,7 +96,7 @@ def send_email( if return_path: custom_headers["Return-Path"] = return_path - message, recipients = build_mime_message( + message, recipients = _build_mime_message( mail_from=mail_from, to=to, subject=subject, @@ -104,3 +112,68 @@ def send_email( return ses_client.send_raw_email( Source=mail_from, Destinations=recipients, RawMessage={"Data": message.as_string()} ) + + +def _get_email_list_from_str(addresses: str) -> list[str]: + pattern = r"\s*[,;]\s*" + return re2.split(pattern, addresses) + + +def _get_email_address_list(addresses: str | Iterable[str]) -> list[str]: + if isinstance(addresses, str): + return _get_email_list_from_str(addresses) + elif isinstance(addresses, Iterable): + if not all(isinstance(item, str) for item in addresses): + raise TypeError("The items in your iterable must be strings.") + return list(addresses) + else: + raise TypeError(f"Unexpected argument type: Received '{type(addresses).__name__}'.") + + +def _build_mime_message( + mail_from: str | None, + to: str | Iterable[str], + subject: str, + html_content: str, + files: list[str] | None = None, + cc: str | Iterable[str] | None = None, + bcc: str | Iterable[str] | None = None, + mime_subtype: str = "mixed", + mime_charset: str = "utf-8", + custom_headers: dict[str, Any] | None = None, +) -> tuple[MIMEMultipart, list[str]]: + to = _get_email_address_list(to) + + msg = MIMEMultipart(mime_subtype) + msg["Subject"] = subject + if mail_from: + msg["From"] = mail_from + msg["To"] = ", ".join(to) + recipients = to + if cc: + cc = _get_email_address_list(cc) + msg["CC"] = ", ".join(cc) + recipients += cc + + if bcc: + # don't add bcc in header + bcc = _get_email_address_list(bcc) + recipients += bcc + + msg["Date"] = formatdate(localtime=True) + mime_text = MIMEText(html_content, "html", mime_charset) + msg.attach(mime_text) + + for fname in files or []: + basename = os.path.basename(fname) + with open(fname, "rb") as file: + part = MIMEApplication(file.read(), Name=basename) + part["Content-Disposition"] = f'attachment; filename="{basename}"' + part["Content-ID"] = f"<{basename}>" + msg.attach(part) + + if custom_headers: + for header_key, header_value in custom_headers.items(): + msg[header_key] = header_value + + return msg, recipients diff --git a/scripts/ci/pre_commit/base_operator_partial_arguments.py b/scripts/ci/pre_commit/base_operator_partial_arguments.py index 070ffe767cccc..e0160cf995c35 100755 --- a/scripts/ci/pre_commit/base_operator_partial_arguments.py +++ b/scripts/ci/pre_commit/base_operator_partial_arguments.py @@ -35,8 +35,6 @@ IGNORED = { # These are only used in the worker and thus mappable. "do_xcom_push", - "email_on_failure", - "email_on_retry", "post_execute", "pre_execute", "multiple_outputs", diff --git a/task_sdk/src/airflow/sdk/definitions/baseoperator.py b/task_sdk/src/airflow/sdk/definitions/baseoperator.py index 14d67656008e5..421fd52a92717 100644 --- a/task_sdk/src/airflow/sdk/definitions/baseoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/baseoperator.py @@ -169,7 +169,6 @@ def partial( start_date: datetime | ArgNotSet = NOTSET, end_date: datetime | ArgNotSet = NOTSET, owner: str | ArgNotSet = NOTSET, - email: None | str | Iterable[str] | ArgNotSet = NOTSET, params: collections.abc.MutableMapping | None = None, resources: dict[str, Any] | None | ArgNotSet = NOTSET, trigger_rule: str | ArgNotSet = NOTSET, @@ -431,9 +430,6 @@ def __new__(cls, name, bases, namespace, **kwargs): # manual type-checking method. BASEOPERATOR_ARGS_EXPECTED_TYPES = { "task_id": str, - "email": (str, Sequence), - "email_on_retry": bool, - "email_on_failure": bool, "retries": int, "retry_exponential_backoff": bool, "depends_on_past": bool, @@ -502,13 +498,6 @@ class derived from this one results in the creation of a task object, :param task_id: a unique, meaningful id for the task :param owner: the owner of the task. Using a meaningful description (e.g. user/person/team/role name) to clarify ownership is recommended. - :param email: the 'to' email address(es) used in email alerts. This can be a - single email or multiple ones. Multiple addresses can be specified as a - comma or semicolon separated string or by passing a list of strings. - :param email_on_retry: Indicates whether email alerts should be sent when a - task is retried - :param email_on_failure: Indicates whether email alerts should be sent when - a task failed :param retries: the number of retries that should be performed before failing the task :param retry_delay: delay between retries, can be set as ``timedelta`` or @@ -697,9 +686,6 @@ def say_hello_world(**context): task_id: str owner: str = DEFAULT_OWNER - email: str | Sequence[str] | None = None - email_on_retry: bool = True - email_on_failure: bool = True retries: int | None = DEFAULT_RETRIES retry_delay: timedelta = DEFAULT_RETRY_DELAY retry_exponential_backoff: bool = False @@ -780,8 +766,6 @@ def say_hello_world(**context): "task_id", "dag_id", "owner", - "email", - "email_on_retry", "retry_delay", "retry_exponential_backoff", "max_retry_delay", @@ -853,9 +837,6 @@ def __init__( *, task_id: str, owner: str = DEFAULT_OWNER, - email: str | Sequence[str] | None = None, - email_on_retry: bool = True, - email_on_failure: bool = True, retries: int | None = DEFAULT_RETRIES, retry_delay: timedelta | float = DEFAULT_RETRY_DELAY, retry_exponential_backoff: bool = False, @@ -924,9 +905,6 @@ def __init__( validate_key(self.task_id) self.owner = owner - self.email = email - self.email_on_retry = email_on_retry - self.email_on_failure = email_on_failure if execution_timeout is not None and not isinstance(execution_timeout, timedelta): raise ValueError( diff --git a/task_sdk/src/airflow/sdk/definitions/mappedoperator.py b/task_sdk/src/airflow/sdk/definitions/mappedoperator.py index 92f83792f3c79..9235f0d92254f 100644 --- a/task_sdk/src/airflow/sdk/definitions/mappedoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/mappedoperator.py @@ -400,10 +400,6 @@ def task_display_name(self) -> str: def owner(self) -> str: # type: ignore[override] return self.partial_kwargs.get("owner", DEFAULT_OWNER) - @property - def email(self) -> None | str | Iterable[str]: - return self.partial_kwargs.get("email") - @property def map_index_template(self) -> None | str: return self.partial_kwargs.get("map_index_template") diff --git a/task_sdk/tests/definitions/test_baseoperator.py b/task_sdk/tests/definitions/test_baseoperator.py index 35f33818dc198..16233626fc00f 100644 --- a/task_sdk/tests/definitions/test_baseoperator.py +++ b/task_sdk/tests/definitions/test_baseoperator.py @@ -164,18 +164,6 @@ def test_custom_resources(self): assert task.resources.cpus.qty == 1 assert task.resources.ram.qty == 1024 - def test_default_email_on_actions(self): - test_task = BaseOperator(task_id="test_default_email_on_actions") - assert test_task.email_on_retry is True - assert test_task.email_on_failure is True - - def test_email_on_actions(self): - test_task = BaseOperator( - task_id="test_default_email_on_actions", email_on_retry=False, email_on_failure=True - ) - assert test_task.email_on_retry is False - assert test_task.email_on_failure is True - def test_incorrect_default_args(self): default_args = {"test_param": True, "extra_param": True} op = FakeOperator(default_args=default_args) diff --git a/tests/api_connexion/endpoints/test_config_endpoint.py b/tests/api_connexion/endpoints/test_config_endpoint.py index 8d3785f08811d..28b827a80a137 100644 --- a/tests/api_connexion/endpoints/test_config_endpoint.py +++ b/tests/api_connexion/endpoints/test_config_endpoint.py @@ -31,18 +31,18 @@ "core": { "parallelism": "1024", }, - "smtp": { - "smtp_host": "localhost", - "smtp_mail_from": "airflow@example.com", + "operators": { + "default_deferrable": "true", + "default_queue": "queue", }, } +MOCK_CONF_TUPLES = { + (section, option): value for section, options in MOCK_CONF.items() for option, value in options.items() +} + MOCK_CONF_WITH_SENSITIVE_VALUE = { - "core": {"parallelism": "1024"}, - "smtp": { - "smtp_host": "localhost", - "smtp_mail_from": "airflow@example.com", - }, + **MOCK_CONF, "database": { "sql_alchemy_conn": "mock_conn", }, @@ -73,6 +73,7 @@ def setup_attrs(self, configured_app) -> None: self.client = self.app.test_client() # type:ignore @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) + @conf_vars(MOCK_CONF_TUPLES) def test_should_respond_200_text_plain(self, mock_as_dict): response = self.client.get( "/api/v1/config", headers={"Accept": "text/plain"}, environ_overrides={"REMOTE_USER": "test"} @@ -84,15 +85,15 @@ def test_should_respond_200_text_plain(self, mock_as_dict): [core] parallelism = 1024 - [smtp] - smtp_host = localhost - smtp_mail_from = airflow@example.com + [operators] + default_deferrable = true + default_queue = queue """ ) assert expected == response.data.decode() @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) - @conf_vars({("webserver", "expose_config"): "non-sensitive-only"}) + @conf_vars({**MOCK_CONF_TUPLES, ("webserver", "expose_config"): "non-sensitive-only"}) def test_should_respond_200_text_plain_with_non_sensitive_only(self, mock_as_dict): response = self.client.get( "/api/v1/config", headers={"Accept": "text/plain"}, environ_overrides={"REMOTE_USER": "test"} @@ -104,14 +105,15 @@ def test_should_respond_200_text_plain_with_non_sensitive_only(self, mock_as_dic [core] parallelism = 1024 - [smtp] - smtp_host = localhost - smtp_mail_from = airflow@example.com + [operators] + default_deferrable = true + default_queue = queue """ ) assert expected == response.data.decode() @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) + @conf_vars(MOCK_CONF_TUPLES) def test_should_respond_200_application_json(self, mock_as_dict): response = self.client.get( "/api/v1/config", @@ -129,10 +131,10 @@ def test_should_respond_200_application_json(self, mock_as_dict): ], }, { - "name": "smtp", + "name": "operators", "options": [ - {"key": "smtp_host", "value": "localhost"}, - {"key": "smtp_mail_from", "value": "airflow@example.com"}, + {"key": "default_deferrable", "value": "true"}, + {"key": "default_queue", "value": "queue"}, ], }, ] @@ -140,9 +142,10 @@ def test_should_respond_200_application_json(self, mock_as_dict): assert expected == response.json @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) + @conf_vars(MOCK_CONF_TUPLES) def test_should_respond_200_single_section_as_text_plain(self, mock_as_dict): response = self.client.get( - "/api/v1/config?section=smtp", + "/api/v1/config?section=operators", headers={"Accept": "text/plain"}, environ_overrides={"REMOTE_USER": "test"}, ) @@ -150,29 +153,30 @@ def test_should_respond_200_single_section_as_text_plain(self, mock_as_dict): assert response.status_code == 200 expected = textwrap.dedent( """\ - [smtp] - smtp_host = localhost - smtp_mail_from = airflow@example.com + [operators] + default_deferrable = true + default_queue = queue """ ) assert expected == response.data.decode() @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) + @conf_vars(MOCK_CONF_TUPLES) def test_should_respond_200_single_section_as_json(self, mock_as_dict): response = self.client.get( - "/api/v1/config?section=smtp", + "/api/v1/config?section=operators", headers={"Accept": "application/json"}, environ_overrides={"REMOTE_USER": "test"}, ) - mock_as_dict.assert_called_with(display_source=False, display_sensitive=True) + # mock_as_dict.assert_called_with(display_source=False, display_sensitive=True) assert response.status_code == 200 expected = { "sections": [ { - "name": "smtp", + "name": "operators", "options": [ - {"key": "smtp_host", "value": "localhost"}, - {"key": "smtp_mail_from", "value": "airflow@example.com"}, + {"key": "default_deferrable", "value": "true"}, + {"key": "default_queue", "value": "queue"}, ], }, ] @@ -180,17 +184,19 @@ def test_should_respond_200_single_section_as_json(self, mock_as_dict): assert expected == response.json @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) + @conf_vars(MOCK_CONF_TUPLES) def test_should_respond_404_when_section_not_exist(self, mock_as_dict): response = self.client.get( - "/api/v1/config?section=smtp1", + "/api/v1/config?section=operators1", headers={"Accept": "application/json"}, environ_overrides={"REMOTE_USER": "test"}, ) assert response.status_code == 404 - assert "section=smtp1 not found." in response.json["detail"] + assert "section=operators1 not found." in response.json["detail"] @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) + @conf_vars(MOCK_CONF_TUPLES) def test_should_respond_406(self, mock_as_dict): response = self.client.get( "/api/v1/config", @@ -231,17 +237,18 @@ def setup_attrs(self, configured_app) -> None: self.client = self.app.test_client() # type:ignore @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) + @conf_vars(MOCK_CONF_TUPLES) def test_should_respond_200_text_plain(self, mock_as_dict): response = self.client.get( - "/api/v1/config/section/smtp/option/smtp_mail_from", + "/api/v1/config/section/operators/option/default_queue", headers={"Accept": "text/plain"}, environ_overrides={"REMOTE_USER": "test"}, ) assert response.status_code == 200 expected = textwrap.dedent( """\ - [smtp] - smtp_mail_from = airflow@example.com + [operators] + default_queue = queue """ ) assert expected == response.data.decode() @@ -276,9 +283,10 @@ def test_should_respond_200_text_plain_with_non_sensitive_only(self, mock_as_dic assert expected == response.data.decode() @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) + @conf_vars(MOCK_CONF_TUPLES) def test_should_respond_200_application_json(self, mock_as_dict): response = self.client.get( - "/api/v1/config/section/smtp/option/smtp_mail_from", + "/api/v1/config/section/operators/option/default_queue", headers={"Accept": "application/json"}, environ_overrides={"REMOTE_USER": "test"}, ) @@ -286,9 +294,9 @@ def test_should_respond_200_application_json(self, mock_as_dict): expected = { "sections": [ { - "name": "smtp", + "name": "operators", "options": [ - {"key": "smtp_mail_from", "value": "airflow@example.com"}, + {"key": "default_queue", "value": "queue"}, ], }, ] @@ -296,20 +304,22 @@ def test_should_respond_200_application_json(self, mock_as_dict): assert expected == response.json @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) + @conf_vars(MOCK_CONF_TUPLES) def test_should_respond_404_when_option_not_exist(self, mock_as_dict): response = self.client.get( - "/api/v1/config/section/smtp/option/smtp_mail_from1", + "/api/v1/config/section/operators/option/default_queue1", headers={"Accept": "application/json"}, environ_overrides={"REMOTE_USER": "test"}, ) assert response.status_code == 404 - assert "The option [smtp/smtp_mail_from1] is not found in config." in response.json["detail"] + assert "The option [operators/default_queue1] is not found in config." in response.json["detail"] @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) + @conf_vars(MOCK_CONF_TUPLES) def test_should_respond_406(self, mock_as_dict): response = self.client.get( - "/api/v1/config/section/smtp/option/smtp_mail_from", + "/api/v1/config/section/operators/option/default_queue", headers={"Accept": "application/octet-stream"}, environ_overrides={"REMOTE_USER": "test"}, ) @@ -317,14 +327,14 @@ def test_should_respond_406(self, mock_as_dict): def test_should_raises_401_unauthenticated(self): response = self.client.get( - "/api/v1/config/section/smtp/option/smtp_mail_from", headers={"Accept": "application/json"} + "/api/v1/config/section/operators/option/default_queue", headers={"Accept": "application/json"} ) assert_401(response) def test_should_raises_403_unauthorized(self): response = self.client.get( - "/api/v1/config/section/smtp/option/smtp_mail_from", + "/api/v1/config/section/operators/option/default_queue", headers={"Accept": "application/json"}, environ_overrides={"REMOTE_USER": "test_no_permissions"}, ) @@ -334,7 +344,7 @@ def test_should_raises_403_unauthorized(self): @conf_vars({("webserver", "expose_config"): "False"}) def test_should_respond_403_when_expose_config_off(self): response = self.client.get( - "/api/v1/config/section/smtp/option/smtp_mail_from", + "/api/v1/config/section/operators/option/default_queue", headers={"Accept": "application/json"}, environ_overrides={"REMOTE_USER": "test"}, ) diff --git a/tests/api_fastapi/core_api/routes/public/test_config.py b/tests/api_fastapi/core_api/routes/public/test_config.py index 90009d1b54152..705432b845b74 100644 --- a/tests/api_fastapi/core_api/routes/public/test_config.py +++ b/tests/api_fastapi/core_api/routes/public/test_config.py @@ -33,16 +33,11 @@ HEADERS_INVALID = {"Accept": "invalid"} HEADERS_JSON_UTF8 = {"Accept": "application/json; charset=utf-8"} SECTION_CORE = "core" -SECTION_SMTP = "smtp" SECTION_DATABASE = "database" SECTION_NOT_EXIST = "not_exist_section" OPTION_KEY_PARALLELISM = "parallelism" -OPTION_KEY_SMTP_HOST = "smtp_host" -OPTION_KEY_SMTP_MAIL_FROM = "smtp_mail_from" OPTION_KEY_SQL_ALCHEMY_CONN = "sql_alchemy_conn" OPTION_VALUE_PARALLELISM = "1024" -OPTION_VALUE_SMTP_HOST = "smtp.example.com" -OPTION_VALUE_SMTP_MAIL_FROM = "airflow@example.com" OPTION_VALUE_SQL_ALCHEMY_CONN = "sqlite:///example.db" OPTION_NOT_EXIST = "not_exist_option" OPTION_VALUE_SENSITIVE_HIDDEN = "< hidden >" @@ -51,10 +46,6 @@ SECTION_CORE: { OPTION_KEY_PARALLELISM: OPTION_VALUE_PARALLELISM, }, - SECTION_SMTP: { - OPTION_KEY_SMTP_HOST: OPTION_VALUE_SMTP_HOST, - OPTION_KEY_SMTP_MAIL_FROM: OPTION_VALUE_SMTP_MAIL_FROM, - }, SECTION_DATABASE: { OPTION_KEY_SQL_ALCHEMY_CONN: OPTION_VALUE_SQL_ALCHEMY_CONN, }, @@ -63,18 +54,12 @@ SECTION_CORE: { OPTION_KEY_PARALLELISM: OPTION_VALUE_PARALLELISM, }, - SECTION_SMTP: { - OPTION_KEY_SMTP_HOST: OPTION_VALUE_SMTP_HOST, - OPTION_KEY_SMTP_MAIL_FROM: OPTION_VALUE_SMTP_MAIL_FROM, - }, SECTION_DATABASE: { OPTION_KEY_SQL_ALCHEMY_CONN: OPTION_VALUE_SENSITIVE_HIDDEN, }, } MOCK_CONFIG_OVERRIDE = { (SECTION_CORE, OPTION_KEY_PARALLELISM): OPTION_VALUE_PARALLELISM, - (SECTION_SMTP, OPTION_KEY_SMTP_HOST): OPTION_VALUE_SMTP_HOST, - (SECTION_SMTP, OPTION_KEY_SMTP_MAIL_FROM): OPTION_VALUE_SMTP_MAIL_FROM, } AIRFLOW_CONFIG_ENABLE_EXPOSE_CONFIG = {("webserver", "expose_config"): "True"} @@ -92,13 +77,6 @@ {"key": OPTION_KEY_PARALLELISM, "value": OPTION_VALUE_PARALLELISM}, ], }, - { - "name": SECTION_SMTP, - "options": [ - {"key": OPTION_KEY_SMTP_HOST, "value": OPTION_VALUE_SMTP_HOST}, - {"key": OPTION_KEY_SMTP_MAIL_FROM, "value": OPTION_VALUE_SMTP_MAIL_FROM}, - ], - }, { "name": SECTION_DATABASE, "options": [ @@ -115,13 +93,6 @@ {"key": OPTION_KEY_PARALLELISM, "value": OPTION_VALUE_PARALLELISM}, ], }, - { - "name": SECTION_SMTP, - "options": [ - {"key": OPTION_KEY_SMTP_HOST, "value": OPTION_VALUE_SMTP_HOST}, - {"key": OPTION_KEY_SMTP_MAIL_FROM, "value": OPTION_VALUE_SMTP_MAIL_FROM}, - ], - }, { "name": SECTION_DATABASE, "options": [ @@ -201,10 +172,6 @@ class TestGetConfig(TestConfigEndpoint): [{SECTION_CORE}] {OPTION_KEY_PARALLELISM} = {OPTION_VALUE_PARALLELISM} - [{SECTION_SMTP}] - {OPTION_KEY_SMTP_HOST} = {OPTION_VALUE_SMTP_HOST} - {OPTION_KEY_SMTP_MAIL_FROM} = {OPTION_VALUE_SMTP_MAIL_FROM} - [{SECTION_DATABASE}] {OPTION_KEY_SQL_ALCHEMY_CONN} = {OPTION_VALUE_SQL_ALCHEMY_CONN} """ @@ -231,18 +198,6 @@ class TestGetConfig(TestConfigEndpoint): ], }, ), - ( - SECTION_SMTP, - HEADERS_TEXT, - 200, - textwrap.dedent( - f"""\ - [{SECTION_SMTP}] - {OPTION_KEY_SMTP_HOST} = {OPTION_VALUE_SMTP_HOST} - {OPTION_KEY_SMTP_MAIL_FROM} = {OPTION_VALUE_SMTP_MAIL_FROM} - """ - ), - ), ( SECTION_DATABASE, HEADERS_JSON, @@ -287,10 +242,6 @@ def test_get_config(self, test_client, section, headers, expected_status_code, e [{SECTION_CORE}] {OPTION_KEY_PARALLELISM} = {OPTION_VALUE_PARALLELISM} - [{SECTION_SMTP}] - {OPTION_KEY_SMTP_HOST} = {OPTION_VALUE_SMTP_HOST} - {OPTION_KEY_SMTP_MAIL_FROM} = {OPTION_VALUE_SMTP_MAIL_FROM} - [{SECTION_DATABASE}] {OPTION_KEY_SQL_ALCHEMY_CONN} = {OPTION_VALUE_SENSITIVE_HIDDEN} """ @@ -338,34 +289,6 @@ class TestGetConfigValue(TestConfigEndpoint): 200, GET_CONFIG_VALUE_CORE_PARALLELISM_JSON_RESPONSE, ), - ( - SECTION_SMTP, - OPTION_KEY_SMTP_HOST, - HEADERS_TEXT, - 200, - textwrap.dedent( - f"""\ - [{SECTION_SMTP}] - {OPTION_KEY_SMTP_HOST} = {OPTION_VALUE_SMTP_HOST} - """ - ), - ), - ( - SECTION_SMTP, - OPTION_KEY_SMTP_MAIL_FROM, - HEADERS_JSON, - 200, - { - "sections": [ - { - "name": SECTION_SMTP, - "options": [ - {"key": OPTION_KEY_SMTP_MAIL_FROM, "value": OPTION_VALUE_SMTP_MAIL_FROM}, - ], - }, - ], - }, - ), ( SECTION_DATABASE, OPTION_KEY_SQL_ALCHEMY_CONN, diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 3d985394646d0..af6f24596721d 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -28,7 +28,7 @@ from traceback import format_exception from typing import cast from unittest import mock -from unittest.mock import call, mock_open, patch +from unittest.mock import call, patch from uuid import uuid4 import pendulum @@ -1961,109 +1961,8 @@ def test_overwrite_params_with_dag_run_conf_none(self, create_task_instance): params = process_params(ti.task.dag, ti.task, dag_run.conf, suppress_exception=False) assert params["override"] is False - @pytest.mark.parametrize("use_native_obj", [True, False]) - @patch("airflow.models.taskinstance.send_email") - def test_email_alert(self, mock_send_email, dag_maker, use_native_obj): - with dag_maker(dag_id="test_failure_email", render_template_as_native_obj=use_native_obj): - task = BashOperator(task_id="test_email_alert", bash_command="exit 1", email="to") - ti = dag_maker.create_dagrun(logical_date=timezone.utcnow()).task_instances[0] - ti.task = task - - with contextlib.suppress(AirflowException): - ti.run() - - (email, title, body), _ = mock_send_email.call_args - assert email == "to" - assert "test_email_alert" in title - assert "test_email_alert" in body - assert "Try 0" in body - - @conf_vars( - { - ("email", "subject_template"): "/subject/path", - ("email", "html_content_template"): "/html_content/path", - } - ) - @patch("airflow.models.taskinstance.send_email") - def test_email_alert_with_config(self, mock_send_email, dag_maker): - with dag_maker(dag_id="test_failure_email"): - task = BashOperator( - task_id="test_email_alert_with_config", - bash_command="exit 1", - email="to", - ) - ti = dag_maker.create_dagrun(logical_date=timezone.utcnow()).task_instances[0] - ti.task = task - - opener = mock_open(read_data="template: {{ti.task_id}}") - with patch("airflow.models.taskinstance.open", opener, create=True): - with contextlib.suppress(AirflowException): - ti.run() - - (email, title, body), _ = mock_send_email.call_args - assert email == "to" - assert title == "template: test_email_alert_with_config" - assert body == "template: test_email_alert_with_config" - - @patch("airflow.models.taskinstance.send_email") - def test_email_alert_with_filenotfound_config(self, mock_send_email, dag_maker): - with dag_maker(dag_id="test_failure_email"): - task = BashOperator( - task_id="test_email_alert_with_config", - bash_command="exit 1", - email="to", - ) - ti = dag_maker.create_dagrun(logical_date=timezone.utcnow()).task_instances[0] - ti.task = task - - # Run test when the template file is not found - opener = mock_open(read_data="template: {{ti.task_id}}") - opener.side_effect = FileNotFoundError - with patch("airflow.models.taskinstance.open", opener, create=True): - with contextlib.suppress(AirflowException): - ti.run() - - (email_error, title_error, body_error), _ = mock_send_email.call_args - - # Rerun task without any error and no template file - with contextlib.suppress(AirflowException): - ti.run() - - (email_default, title_default, body_default), _ = mock_send_email.call_args - - assert email_error == email_default == "to" - assert title_default == title_error - assert body_default == body_error - - @pytest.mark.parametrize("task_id", ["test_email_alert", "test_email_alert__1"]) - @patch("airflow.models.taskinstance.send_email") - def test_failure_mapped_taskflow(self, mock_send_email, dag_maker, session, task_id): - with dag_maker(session=session) as dag: - - @dag.task(email="to") - def test_email_alert(x): - raise RuntimeError("Fail please") - - test_email_alert.expand(x=["a", "b"]) # This is 'test_email_alert'. - test_email_alert.expand(x=[1, 2, 3]) # This is 'test_email_alert__1'. - - dr: DagRun = dag_maker.create_dagrun(logical_date=timezone.utcnow()) - - ti = dr.get_task_instance(task_id, map_index=0, session=session) - assert ti is not None - - # The task will fail and trigger email reporting. - with pytest.raises(RuntimeError, match=r"^Fail please$"): - ti.run(session=session) - - (email, title, body), _ = mock_send_email.call_args - assert email == "to" - assert title == f"Airflow alert: " - assert body.startswith("Try 0") # try number only incremented by the scheduler - assert "test_email_alert" in body - def test_set_duration(self): - task = EmptyOperator(task_id="op", email="test@test.test") + task = EmptyOperator(task_id="op") ti = TI(task=task) ti.start_date = datetime.datetime(2018, 10, 1, 1) ti.end_date = datetime.datetime(2018, 10, 1, 2) @@ -2071,7 +1970,7 @@ def test_set_duration(self): assert ti.duration == 3600 def test_set_duration_empty_dates(self): - task = EmptyOperator(task_id="op", email="test@test.test") + task = EmptyOperator(task_id="op") ti = TI(task=task) ti.set_duration() assert ti.duration is None diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 281bcc73c346b..f839546734a83 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -1377,9 +1377,6 @@ def test_no_new_fields_added_to_base_operator(self): "doc_yaml": None, "downstream_task_ids": set(), "end_date": None, - "email": None, - "email_on_failure": True, - "email_on_retry": True, "execution_timeout": None, "executor": None, "executor_config": {}, @@ -2258,10 +2255,14 @@ def test_not_templateable_fields_in_serialized_dag(self): class TestOperator(BaseOperator): template_fields = ( - "email", # templateable + "any_field", # templateable "execution_timeout", # not templateable ) + def __init__(self, *args, any_field=None, **kwargs): + super().__init__(*args, **kwargs) + self.any_field = any_field + def execute(self, context: Context): pass @@ -2270,18 +2271,18 @@ def execute(self, context: Context): with dag: task = TestOperator( task_id="test_task", - email="{{ ','.join(test_email_list) }}", + any_field="{{ ','.join(test_any_field_list) }}", execution_timeout=timedelta(seconds=10), ) - task.render_template_fields(context={"test_email_list": ["foo@test.com", "bar@test.com"]}) - assert task.email == "foo@test.com,bar@test.com" + task.render_template_fields(context={"test_any_field_list": ["foo", "boo"]}) + assert task.any_field == "foo,boo" with pytest.raises( AirflowException, match=re.escape( dedent( """Failed to serialize DAG 'test_dag': Cannot template BaseOperator field: - 'execution_timeout' op.__class__.__name__='TestOperator' op.template_fields=('email', 'execution_timeout')""" + 'execution_timeout' op.__class__.__name__='TestOperator' op.template_fields=('any_field', 'execution_timeout')""" ) ), ): diff --git a/tests/utils/test_email.py b/tests/utils/test_email.py deleted file mode 100644 index dfcae90215c56..0000000000000 --- a/tests/utils/test_email.py +++ /dev/null @@ -1,401 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import json -import os -from email.mime.application import MIMEApplication -from email.mime.multipart import MIMEMultipart -from email.mime.text import MIMEText -from smtplib import SMTPServerDisconnected -from unittest import mock - -import pytest - -from airflow.configuration import conf -from airflow.utils import email - -from tests_common.test_utils.config import conf_vars - -EMAILS = ["test1@example.com", "test2@example.com"] - -send_email_test = mock.MagicMock() - - -class TestEmail: - def setup_method(self): - conf.remove_option("email", "EMAIL_BACKEND") - conf.remove_option("email", "EMAIL_CONN_ID") - - def test_get_email_address_single_email(self): - emails_string = "test1@example.com" - - assert email.get_email_address_list(emails_string) == [emails_string] - - def test_get_email_address_comma_sep_string(self): - emails_string = "test1@example.com, test2@example.com" - - assert email.get_email_address_list(emails_string) == EMAILS - - def test_get_email_address_colon_sep_string(self): - emails_string = "test1@example.com; test2@example.com" - - assert email.get_email_address_list(emails_string) == EMAILS - - def test_get_email_address_list(self): - emails_list = ["test1@example.com", "test2@example.com"] - - assert email.get_email_address_list(emails_list) == EMAILS - - def test_get_email_address_tuple(self): - emails_tuple = ("test1@example.com", "test2@example.com") - - assert email.get_email_address_list(emails_tuple) == EMAILS - - def test_get_email_address_invalid_type(self): - emails_string = 1 - - with pytest.raises(TypeError): - email.get_email_address_list(emails_string) - - def test_get_email_address_invalid_type_in_iterable(self): - emails_list = ["test1@example.com", 2] - - with pytest.raises(TypeError): - email.get_email_address_list(emails_list) - - @mock.patch("airflow.utils.email.send_email") - def test_default_backend(self, mock_send_email): - res = email.send_email("to", "subject", "content") - mock_send_email.assert_called_once_with("to", "subject", "content") - assert mock_send_email.return_value == res - - @mock.patch("airflow.utils.email.send_email_smtp") - def test_custom_backend(self, mock_send_email): - with conf_vars( - { - ("email", "email_backend"): "tests.utils.test_email.send_email_test", - ("email", "email_conn_id"): "smtp_default", - } - ): - email.send_email("to", "subject", "content") - send_email_test.assert_called_once_with( - "to", - "subject", - "content", - files=None, - dryrun=False, - cc=None, - bcc=None, - mime_charset="utf-8", - mime_subtype="mixed", - conn_id="smtp_default", - from_email=None, - custom_headers=None, - ) - assert not mock_send_email.called - - @mock.patch("airflow.utils.email.send_email_smtp") - @conf_vars( - { - ("email", "email_backend"): "tests.utils.test_email.send_email_test", - ("email", "email_conn_id"): "smtp_default", - ("email", "from_email"): "from@test.com", - } - ) - def test_custom_backend_sender(self, mock_send_email_smtp): - email.send_email("to", "subject", "content") - _, call_kwargs = send_email_test.call_args - assert call_kwargs["from_email"] == "from@test.com" - assert not mock_send_email_smtp.called - - def test_build_mime_message(self): - mail_from = "from@example.com" - mail_to = "to@example.com" - subject = "test subject" - html_content = "Test" - custom_headers = {"Reply-To": "reply_to@example.com"} - - msg, recipients = email.build_mime_message( - mail_from=mail_from, - to=mail_to, - subject=subject, - html_content=html_content, - custom_headers=custom_headers, - ) - - assert "From" in msg - assert "To" in msg - assert "Subject" in msg - assert "Reply-To" in msg - assert [mail_to] == recipients - assert msg["To"] == ",".join(recipients) - - -@pytest.mark.db_test -class TestEmailSmtp: - @pytest.fixture(autouse=True) - def setup_test_cases(self, monkeypatch): - monkeypatch.setenv( # Set the default smtp connection for all test cases - "AIRFLOW_CONN_SMTP_DEFAULT", - json.dumps({"conn_type": "smtp", "login": "user", "password": "p@$$word"}), - ) - - @mock.patch("airflow.utils.email.send_mime_email") - def test_send_smtp(self, mock_send_mime, tmp_path): - path = tmp_path / "testfile" - path.write_text("attachment") - email.send_email_smtp("to", "subject", "content", files=[os.fspath(path)]) - assert mock_send_mime.called - _, call_args = mock_send_mime.call_args - assert conf.get("smtp", "SMTP_MAIL_FROM") == call_args["e_from"] - assert call_args["e_to"] == ["to"] - msg = call_args["mime_msg"] - assert msg["Subject"] == "subject" - assert conf.get("smtp", "SMTP_MAIL_FROM") == msg["From"] - assert len(msg.get_payload()) == 2 - filename = f'attachment; filename="{path.name}"' - assert filename == msg.get_payload()[-1].get("Content-Disposition") - mimeapp = MIMEApplication("attachment") - assert mimeapp.get_payload() == msg.get_payload()[-1].get_payload() - - @mock.patch("airflow.utils.email.send_mime_email") - def test_send_smtp_with_multibyte_content(self, mock_send_mime): - email.send_email_smtp("to", "subject", "🔥", mime_charset="utf-8") - assert mock_send_mime.called - _, call_args = mock_send_mime.call_args - msg = call_args["mime_msg"] - mimetext = MIMEText("🔥", "mixed", "utf-8") - assert mimetext.get_payload() == msg.get_payload()[0].get_payload() - - @mock.patch("airflow.utils.email.send_mime_email") - def test_send_bcc_smtp(self, mock_send_mime, tmp_path): - path = tmp_path / "testfile" - path.write_text("attachment") - email.send_email_smtp( - "to", - "subject", - "content", - files=[os.fspath(path)], - cc="cc", - bcc="bcc", - custom_headers={"Reply-To": "reply_to@example.com"}, - ) - assert mock_send_mime.called - _, call_args = mock_send_mime.call_args - assert conf.get("smtp", "SMTP_MAIL_FROM") == call_args["e_from"] - assert call_args["e_to"] == ["to", "cc", "bcc"] - msg = call_args["mime_msg"] - assert msg["Subject"] == "subject" - assert conf.get("smtp", "SMTP_MAIL_FROM") == msg["From"] - assert len(msg.get_payload()) == 2 - assert f'attachment; filename="{path.name}"' == msg.get_payload()[-1].get("Content-Disposition") - mimeapp = MIMEApplication("attachment") - assert mimeapp.get_payload() == msg.get_payload()[-1].get_payload() - assert msg["Reply-To"] == "reply_to@example.com" - - @mock.patch("smtplib.SMTP_SSL") - @mock.patch("smtplib.SMTP") - def test_send_mime_airflow_config(self, mock_smtp, mock_smtp_ssl, monkeypatch): - monkeypatch.delenv("AIRFLOW_CONN_SMTP_DEFAULT", raising=False) - mock_smtp.return_value = mock.Mock() - msg = MIMEMultipart() - email.send_mime_email("from", "to", msg, dryrun=False) - mock_smtp.assert_called_once_with( - host=conf.get("smtp", "SMTP_HOST"), - port=conf.getint("smtp", "SMTP_PORT"), - timeout=conf.getint("smtp", "SMTP_TIMEOUT"), - ) - assert not mock_smtp_ssl.called - assert mock_smtp.return_value.starttls.called - mock_smtp.return_value.sendmail.assert_called_once_with("from", "to", msg.as_string()) - assert mock_smtp.return_value.quit.called - - @mock.patch("smtplib.SMTP") - def test_send_mime_conn_id(self, mock_smtp, monkeypatch): - monkeypatch.setenv( - "AIRFLOW_CONN_SMTP_TEST_CONN", - json.dumps({"conn_type": "smtp", "login": "test-user", "password": "test-p@$$word"}), - ) - msg = MIMEMultipart() - email.send_mime_email("from", "to", msg, dryrun=False, conn_id="smtp_test_conn") - mock_smtp.return_value.login.assert_called_once_with("test-user", "test-p@$$word") - mock_smtp.return_value.sendmail.assert_called_once_with("from", "to", msg.as_string()) - assert mock_smtp.return_value.quit.called - - @mock.patch("smtplib.SMTP_SSL") - @mock.patch("smtplib.SMTP") - def test_send_mime_ssl_none_context(self, mock_smtp, mock_smtp_ssl): - mock_smtp_ssl.return_value = mock.Mock() - with conf_vars({("smtp", "smtp_ssl"): "True", ("email", "ssl_context"): "none"}): - email.send_mime_email("from", "to", MIMEMultipart(), dryrun=False) - assert not mock_smtp.called - mock_smtp_ssl.assert_called_once_with( - host=conf.get("smtp", "SMTP_HOST"), - port=conf.getint("smtp", "SMTP_PORT"), - timeout=conf.getint("smtp", "SMTP_TIMEOUT"), - context=None, - ) - - @mock.patch("smtplib.SMTP_SSL") - @mock.patch("smtplib.SMTP") - @mock.patch("ssl.create_default_context") - def test_send_mime_ssl_default_context_if_not_set(self, create_default_context, mock_smtp, mock_smtp_ssl): - mock_smtp_ssl.return_value = mock.Mock() - with conf_vars({("smtp", "smtp_ssl"): "True"}): - email.send_mime_email("from", "to", MIMEMultipart(), dryrun=False) - assert not mock_smtp.called - assert create_default_context.called - mock_smtp_ssl.assert_called_once_with( - host=conf.get("smtp", "SMTP_HOST"), - port=conf.getint("smtp", "SMTP_PORT"), - timeout=conf.getint("smtp", "SMTP_TIMEOUT"), - context=create_default_context.return_value, - ) - - @mock.patch("smtplib.SMTP_SSL") - @mock.patch("smtplib.SMTP") - @mock.patch("ssl.create_default_context") - def test_send_mime_ssl_default_context_with_value_set_to_default( - self, create_default_context, mock_smtp, mock_smtp_ssl - ): - mock_smtp_ssl.return_value = mock.Mock() - with conf_vars({("smtp", "smtp_ssl"): "True", ("email", "ssl_context"): "default"}): - email.send_mime_email("from", "to", MIMEMultipart(), dryrun=False) - assert not mock_smtp.called - assert create_default_context.called - mock_smtp_ssl.assert_called_once_with( - host=conf.get("smtp", "SMTP_HOST"), - port=conf.getint("smtp", "SMTP_PORT"), - timeout=conf.getint("smtp", "SMTP_TIMEOUT"), - context=create_default_context.return_value, - ) - - @mock.patch("smtplib.SMTP_SSL") - @mock.patch("smtplib.SMTP") - def test_send_mime_noauth(self, mock_smtp, mock_smtp_ssl): - mock_smtp.return_value = mock.Mock() - with conf_vars( - { - ("smtp", "smtp_user"): None, - ("smtp", "smtp_password"): None, - } - ): - email.send_mime_email("from", "to", MIMEMultipart(), dryrun=False) - assert not mock_smtp_ssl.called - mock_smtp.assert_called_once_with( - host=conf.get("smtp", "SMTP_HOST"), - port=conf.getint("smtp", "SMTP_PORT"), - timeout=conf.getint("smtp", "SMTP_TIMEOUT"), - ) - assert not mock_smtp.login.called - - @mock.patch("smtplib.SMTP_SSL") - @mock.patch("smtplib.SMTP") - def test_send_mime_dryrun(self, mock_smtp, mock_smtp_ssl): - email.send_mime_email("from", "to", MIMEMultipart(), dryrun=True) - assert not mock_smtp.called - assert not mock_smtp_ssl.called - - @mock.patch("smtplib.SMTP_SSL") - @mock.patch("smtplib.SMTP") - def test_send_mime_complete_failure(self, mock_smtp: mock.Mock, mock_smtp_ssl: mock.Mock): - mock_smtp.side_effect = SMTPServerDisconnected() - msg = MIMEMultipart() - with pytest.raises(SMTPServerDisconnected): - email.send_mime_email("from", "to", msg, dryrun=False) - mock_smtp.assert_any_call( - host=conf.get("smtp", "SMTP_HOST"), - port=conf.getint("smtp", "SMTP_PORT"), - timeout=conf.getint("smtp", "SMTP_TIMEOUT"), - ) - assert mock_smtp.call_count == conf.getint("smtp", "SMTP_RETRY_LIMIT") - assert not mock_smtp_ssl.called - assert not mock_smtp.return_value.starttls.called - assert not mock_smtp.return_value.login.called - assert not mock_smtp.return_value.sendmail.called - assert not mock_smtp.return_value.quit.called - - @mock.patch("smtplib.SMTP_SSL") - @mock.patch("smtplib.SMTP") - @mock.patch("ssl.create_default_context") - def test_send_mime_ssl_complete_failure(self, create_default_context, mock_smtp, mock_smtp_ssl): - mock_smtp_ssl.side_effect = SMTPServerDisconnected() - msg = MIMEMultipart() - with conf_vars({("smtp", "smtp_ssl"): "True"}): - with pytest.raises(SMTPServerDisconnected): - email.send_mime_email("from", "to", msg, dryrun=False) - - mock_smtp_ssl.assert_any_call( - host=conf.get("smtp", "SMTP_HOST"), - port=conf.getint("smtp", "SMTP_PORT"), - timeout=conf.getint("smtp", "SMTP_TIMEOUT"), - context=create_default_context.return_value, - ) - assert create_default_context.called - assert mock_smtp_ssl.call_count == conf.getint("smtp", "SMTP_RETRY_LIMIT") - assert not mock_smtp.called - assert not mock_smtp_ssl.return_value.starttls.called - assert not mock_smtp_ssl.return_value.login.called - assert not mock_smtp_ssl.return_value.sendmail.called - assert not mock_smtp_ssl.return_value.quit.called - - @mock.patch("smtplib.SMTP_SSL") - @mock.patch("smtplib.SMTP") - def test_send_mime_custom_timeout_retrylimit(self, mock_smtp, mock_smtp_ssl): - mock_smtp.side_effect = SMTPServerDisconnected() - msg = MIMEMultipart() - - custom_retry_limit = 10 - custom_timeout = 60 - - with conf_vars( - { - ("smtp", "smtp_retry_limit"): str(custom_retry_limit), - ("smtp", "smtp_timeout"): str(custom_timeout), - } - ): - with pytest.raises(SMTPServerDisconnected): - email.send_mime_email("from", "to", msg, dryrun=False) - - mock_smtp.assert_any_call( - host=conf.get("smtp", "SMTP_HOST"), port=conf.getint("smtp", "SMTP_PORT"), timeout=custom_timeout - ) - assert not mock_smtp_ssl.called - assert mock_smtp.call_count == 10 - - @mock.patch("smtplib.SMTP_SSL") - @mock.patch("smtplib.SMTP") - def test_send_mime_partial_failure(self, mock_smtp, mock_smtp_ssl): - final_mock = mock.Mock() - side_effects = [SMTPServerDisconnected(), SMTPServerDisconnected(), final_mock] - mock_smtp.side_effect = side_effects - msg = MIMEMultipart() - - email.send_mime_email("from", "to", msg, dryrun=False) - - mock_smtp.assert_any_call( - host=conf.get("smtp", "SMTP_HOST"), - port=conf.getint("smtp", "SMTP_PORT"), - timeout=conf.getint("smtp", "SMTP_TIMEOUT"), - ) - assert mock_smtp.call_count == side_effects.index(final_mock) + 1 - assert not mock_smtp_ssl.called - assert final_mock.starttls.called - final_mock.sendmail.assert_called_once_with("from", "to", msg.as_string()) - assert final_mock.quit.called diff --git a/tests/utils/test_operator_helpers.py b/tests/utils/test_operator_helpers.py index 1f6b6df4c6198..d015f04902a38 100644 --- a/tests/utils/test_operator_helpers.py +++ b/tests/utils/test_operator_helpers.py @@ -33,7 +33,6 @@ def setup_method(self): self.logical_date = "2017-05-21T00:00:00" self.dag_run_id = "dag_run_id" self.owner = ["owner1", "owner2"] - self.email = ["email1@test.com"] self.context = { "dag_run": mock.MagicMock( name="dag_run", @@ -47,7 +46,7 @@ def setup_method(self): try_number=self.try_number, logical_date=datetime.strptime(self.logical_date, "%Y-%m-%dT%H:%M:%S"), ), - "task": mock.MagicMock(name="task", owner=self.owner, email=self.email), + "task": mock.MagicMock(name="task", owner=self.owner), } def test_context_to_airflow_vars_empty_context(self): @@ -61,7 +60,6 @@ def test_context_to_airflow_vars_all_context(self): "airflow.ctx.dag_run_id": self.dag_run_id, "airflow.ctx.try_number": str(self.try_number), "airflow.ctx.dag_owner": "owner1,owner2", - "airflow.ctx.dag_email": "email1@test.com", } assert operator_helpers.context_to_airflow_vars(self.context, in_env_var_format=True) == { @@ -71,7 +69,6 @@ def test_context_to_airflow_vars_all_context(self): "AIRFLOW_CTX_TRY_NUMBER": str(self.try_number), "AIRFLOW_CTX_DAG_RUN_ID": self.dag_run_id, "AIRFLOW_CTX_DAG_OWNER": "owner1,owner2", - "AIRFLOW_CTX_DAG_EMAIL": "email1@test.com", } def test_context_to_airflow_vars_with_default_context_vars(self): diff --git a/tests_common/pytest_plugin.py b/tests_common/pytest_plugin.py index 8b2ee9e6646a3..e9544b0484065 100644 --- a/tests_common/pytest_plugin.py +++ b/tests_common/pytest_plugin.py @@ -1113,7 +1113,6 @@ def __call__( on_execute_callback: Callable = ..., on_failure_callback: Callable = ..., on_retry_callback: Callable = ..., - email: str = ..., with_dagrun_type="scheduled", **kwargs, ) -> tuple[DAG, EmptyOperator]: ... @@ -1153,7 +1152,6 @@ def create_dag( on_execute_callback=None, on_failure_callback=None, on_retry_callback=None, - email=None, with_dagrun_type=DagRunType.SCHEDULED, **kwargs, ): @@ -1169,7 +1167,6 @@ def create_dag( on_execute_callback=on_execute_callback, on_failure_callback=on_failure_callback, on_retry_callback=on_retry_callback, - email=email, pool=pool, trigger_rule=trigger_rule, **op_kwargs, @@ -1206,7 +1203,6 @@ def __call__( on_execute_callback: Callable = ..., on_failure_callback: Callable = ..., on_retry_callback: Callable = ..., - email: str = ..., map_index: int = -1, **kwargs, ) -> TaskInstance: ... @@ -1241,7 +1237,6 @@ def maker( on_execute_callback=None, on_failure_callback=None, on_retry_callback=None, - email=None, map_index=-1, hostname=None, pid=None, @@ -1264,7 +1259,6 @@ def maker( on_execute_callback=on_execute_callback, on_failure_callback=on_failure_callback, on_retry_callback=on_retry_callback, - email=email, pool=pool, trigger_rule=trigger_rule, **op_kwargs,