From 4e7df6e4ef39072bd6c642637bed4f33f8fd8fce Mon Sep 17 00:00:00 2001 From: hussein-awala Date: Sat, 25 Jan 2025 18:04:56 +0100 Subject: [PATCH 1/9] Remove email conigs, operator and callbacks --- .../core_api/openapi/v1-generated.yaml | 7 - .../core_api/routes/public/config.py | 4 - airflow/config_templates/unit_tests.cfg | 4 - airflow/configuration.py | 7 - airflow/example_dags/example_dag_decorator.py | 13 +- airflow/example_dags/tutorial.py | 3 - airflow/models/baseoperator.py | 7 - airflow/models/taskinstance.py | 153 +------ airflow/models/trigger.py | 2 +- airflow/operators/email.py | 91 ---- airflow/serialization/serialized_objects.py | 2 - airflow/utils/email.py | 334 --------------- airflow/utils/operator_helpers.py | 5 - .../cli-and-env-variables-ref.rst | 1 - docs/apache-airflow/howto/email-config.rst | 190 --------- .../howto/export-more-env-vars.rst | 2 +- docs/apache-airflow/howto/index.rst | 1 - docs/apache-airflow/howto/set-config.rst | 1 - docs/apache-airflow/integration.rst | 1 - .../operators-and-hooks-ref.rst | 3 - .../public-airflow-interface.rst | 5 - newsfragments/.significant.rst | 42 ++ .../base_operator_partial_arguments.py | 2 - .../airflow/sdk/definitions/baseoperator.py | 22 - .../airflow/sdk/definitions/mappedoperator.py | 4 - .../endpoints/test_config_endpoint.py | 71 ++-- .../core_api/routes/public/test_config.py | 77 ---- tests/models/test_taskinstance.py | 107 +---- tests/operators/test_email.py | 62 --- tests/serialization/test_dag_serialization.py | 17 +- tests/utils/test_email.py | 401 ------------------ tests_common/pytest_plugin.py | 6 - 32 files changed, 99 insertions(+), 1548 deletions(-) delete mode 100644 airflow/operators/email.py delete mode 100644 airflow/utils/email.py delete mode 100644 docs/apache-airflow/howto/email-config.rst create mode 100644 newsfragments/.significant.rst delete mode 100644 tests/operators/test_email.py delete mode 100644 tests/utils/test_email.py diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 6075d0c886ef2..f2385278c2980 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -2672,13 +2672,6 @@ paths: base_log_folder = /opt/airflow/logs - - [smtp] - - smtp_host = localhost - - smtp_mail_from = airflow@example.com - ' '401': content: diff --git a/airflow/api_fastapi/core_api/routes/public/config.py b/airflow/api_fastapi/core_api/routes/public/config.py index 13edf5f821b4c..70110fc6462e1 100644 --- a/airflow/api_fastapi/core_api/routes/public/config.py +++ b/airflow/api_fastapi/core_api/routes/public/config.py @@ -56,10 +56,6 @@ [core] dags_folder = /opt/airflow/dags base_log_folder = /opt/airflow/logs - - [smtp] - smtp_host = localhost - smtp_mail_from = airflow@example.com """ ), }, diff --git a/airflow/config_templates/unit_tests.cfg b/airflow/config_templates/unit_tests.cfg index 9e222ce550459..f4c09b3266a13 100644 --- a/airflow/config_templates/unit_tests.cfg +++ b/airflow/config_templates/unit_tests.cfg @@ -66,10 +66,6 @@ allowed_deserialization_classes = airflow.* tests.* # celery tests rely on it being set celery_logging_level = INFO -[smtp] -# Used as default values for SMTP unit tests -smtp_mail_from = airflow@example.com - [api] auth_backends = airflow.providers.fab.auth_manager.api.auth.backend.session diff --git a/airflow/configuration.py b/airflow/configuration.py index 1b8b937906b9c..7439e918f6afd 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -353,13 +353,6 @@ def inversed_deprecated_sections(self): "navbar_color": (re2.compile(r"(?i)\A#007A87\z"), "#fff", "2.1"), "dag_default_view": (re2.compile(r"^tree$"), "grid", "3.0"), }, - "email": { - "email_backend": ( - re2.compile(r"^airflow\.contrib\.utils\.sendgrid\.send_email$"), - r"airflow.providers.sendgrid.utils.emailer.send_email", - "2.1", - ), - }, "logging": { "log_filename_template": ( re2.compile(re2.escape("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")), diff --git a/airflow/example_dags/example_dag_decorator.py b/airflow/example_dags/example_dag_decorator.py index 0fed70fa2e9f1..20201a0456874 100644 --- a/airflow/example_dags/example_dag_decorator.py +++ b/airflow/example_dags/example_dag_decorator.py @@ -24,7 +24,7 @@ from airflow.decorators import dag, task from airflow.models.baseoperator import BaseOperator -from airflow.operators.email import EmailOperator +from airflow.providers.smtp.operators.smtp import EmailOperator if TYPE_CHECKING: from airflow.sdk.definitions.context import Context @@ -48,11 +48,12 @@ def execute(self, context: Context): catchup=False, tags=["example"], ) -def example_dag_decorator(email: str = "example@example.com"): +def example_dag_decorator(to_email: str = "example1@example.com", from_email: str = "example2@example.com"): """ DAG to send server IP to email. - :param email: Email to send IP to. Defaults to example@example.com. + :param to_email: Email to send IP to. Defaults to exampl1e@example.com. + :param from_email: Email to send IP from. Defaults to example2@example.com """ get_ip = GetRequestOperator(task_id="get_ip", url="http://httpbin.org/get") @@ -67,7 +68,11 @@ def prepare_email(raw_json: dict[str, Any]) -> dict[str, str]: email_info = prepare_email(get_ip.output) EmailOperator( - task_id="send_email", to=email, subject=email_info["subject"], html_content=email_info["body"] + task_id="send_email", + to=to_email, + from_email=from_email, + subject=email_info["subject"], + html_content=email_info["body"], ) diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py index 4e55ab7ff15a0..8f371ee24278c 100644 --- a/airflow/example_dags/tutorial.py +++ b/airflow/example_dags/tutorial.py @@ -45,9 +45,6 @@ # You can override them on a per-task basis during operator initialization default_args={ "depends_on_past": False, - "email": ["airflow@example.com"], - "email_on_failure": False, - "email_on_retry": False, "retries": 1, "retry_delay": timedelta(minutes=5), # 'queue': 'bash_queue', diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index e84994c6d04e7..65879e35be0f7 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -212,13 +212,6 @@ class derived from this one results in the creation of a task object, :param task_id: a unique, meaningful id for the task :param owner: the owner of the task. Using a meaningful description (e.g. user/person/team/role name) to clarify ownership is recommended. - :param email: the 'to' email address(es) used in email alerts. This can be a - single email or multiple ones. Multiple addresses can be specified as a - comma or semicolon separated string or by passing a list of strings. - :param email_on_retry: Indicates whether email alerts should be sent when a - task is retried - :param email_on_failure: Indicates whether email alerts should be sent when - a task failed :param retries: the number of retries that should be performed before failing the task :param retry_delay: delay between retries, can be set as ``timedelta`` or diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index f1aa3a8236e9c..127f4f2d3016f 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -23,7 +23,6 @@ import itertools import logging import math -import operator import os import signal import traceback @@ -106,7 +105,6 @@ from airflow.models.xcom import LazyXComSelectSequence, XCom from airflow.plugins_manager import integrate_macros_plugins from airflow.sdk.api.datamodels._generated import AssetProfile -from airflow.sdk.definitions._internal.templater import SandboxedEnvironment from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetNameRef, AssetUniqueKey, AssetUriRef from airflow.sdk.definitions.taskgroup import MappedTaskGroup from airflow.sentry import Sentry @@ -123,10 +121,8 @@ OutletEventAccessors, VariableAccessor, context_get_outlet_events, - context_merge, ) -from airflow.utils.email import send_email -from airflow.utils.helpers import prune_dict, render_template_to_string +from airflow.utils.helpers import prune_dict from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.net import get_hostname from airflow.utils.operator_helpers import ExecutionCallableRunner, context_to_airflow_vars @@ -1117,15 +1113,6 @@ def _handle_failure( ) _log_state(task_instance=task_instance, lead_msg="Immediate failure requested. " if force_fail else "") - if ( - failure_context["task"] - and failure_context["email_for_state"](failure_context["task"]) - and failure_context["task"].email - ): - try: - task_instance.email_alert(error, failure_context["task"]) - except Exception: - log.exception("Failed to send email to: %s", failure_context["task"].email) if failure_context["callbacks"] and failure_context["context"]: _run_finished_callback( @@ -1317,116 +1304,6 @@ def _get_previous_start_date( return pendulum.instance(prev_ti.start_date) if prev_ti and prev_ti.start_date else None -def _email_alert(*, task_instance: TaskInstance, exception, task: BaseOperator) -> None: - """ - Send alert email with exception information. - - :param task_instance: the task instance - :param exception: the exception - :param task: task related to the exception - - :meta private: - """ - subject, html_content, html_content_err = task_instance.get_email_subject_content(exception, task=task) - if TYPE_CHECKING: - assert task.email - try: - send_email(task.email, subject, html_content) - except Exception: - send_email(task.email, subject, html_content_err) - - -def _get_email_subject_content( - *, - task_instance: TaskInstance, - exception: BaseException, - task: BaseOperator | None = None, -) -> tuple[str, str, str]: - """ - Get the email subject content for exceptions. - - :param task_instance: the task instance - :param exception: the exception sent in the email - :param task: - - :meta private: - """ - # For a ti from DB (without ti.task), return the default value - if task is None: - task = getattr(task_instance, "task") - use_default = task is None - exception_html = str(exception).replace("\n", "
") - - default_subject = "Airflow alert: {{ti}}" - # For reporting purposes, we report based on 1-indexed, - # not 0-indexed lists (i.e. Try 1 instead of - # Try 0 for the first attempt). - default_html_content = ( - "Try {{try_number}} out of {{max_tries + 1}}
" - "Exception:
{{exception_html}}
" - 'Log: Link
' - "Host: {{ti.hostname}}
" - 'Mark success: Link
' - ) - - default_html_content_err = ( - "Try {{try_number}} out of {{max_tries + 1}}
" - "Exception:
Failed attempt to attach error logs
" - 'Log: Link
' - "Host: {{ti.hostname}}
" - 'Mark success: Link
' - ) - - additional_context: dict[str, Any] = { - "exception": exception, - "exception_html": exception_html, - "try_number": task_instance.try_number, - "max_tries": task_instance.max_tries, - } - - if use_default: - default_context = {"ti": task_instance, **additional_context} - jinja_env = jinja2.Environment( - loader=jinja2.FileSystemLoader(os.path.dirname(__file__)), autoescape=True - ) - subject = jinja_env.from_string(default_subject).render(**default_context) - html_content = jinja_env.from_string(default_html_content).render(**default_context) - html_content_err = jinja_env.from_string(default_html_content_err).render(**default_context) - - else: - if TYPE_CHECKING: - assert task_instance.task - - # Use the DAG's get_template_env() to set force_sandboxed. Don't add - # the flag to the function on task object -- that function can be - # overridden, and adding a flag breaks backward compatibility. - dag = task_instance.task.get_dag() - if dag: - jinja_env = dag.get_template_env(force_sandboxed=True) - else: - jinja_env = SandboxedEnvironment(cache_size=0) - jinja_context = task_instance.get_template_context() - context_merge(jinja_context, additional_context) - - def render(key: str, content: str) -> str: - if conf.has_option("email", key): - path = conf.get_mandatory_value("email", key) - try: - with open(path) as f: - content = f.read() - except FileNotFoundError: - log.warning("Could not find email template file '%s'. Using defaults...", path) - except OSError: - log.exception("Error while using email template %s. Using defaults...", path) - return render_template_to_string(jinja_env.from_string(content), jinja_context) - - subject = render("subject_template", default_subject) - html_content = render("html_content_template", default_html_content) - html_content_err = render("html_content_template", default_html_content_err) - - return subject, html_content, html_content_err - - def _run_finished_callback( *, callbacks: None | TaskStateChangeCallback | list[TaskStateChangeCallback], @@ -3131,8 +3008,7 @@ def fetch_handle_failure_context( if context is not None: context["exception"] = error - # Set state correctly and figure out how to log it and decide whether - # to email + # Set state correctly and figure out how to log it # Note, callback invocation needs to be handled by caller of # _run_raw_task to avoid race conditions which could lead to duplicate @@ -3150,11 +3026,10 @@ def fetch_handle_failure_context( assert isinstance(ti.task, BaseOperator) task = ti.task.unmap((context, session)) except Exception: - cls.logger().error("Unable to unmap task to determine if we need to send an alert email") + cls.logger().error("Unable to unmap task to determine what callback to use") if force_fail or not ti.is_eligible_to_retry(): ti.state = TaskInstanceState.FAILED - email_for_state = operator.attrgetter("email_on_failure") callbacks = task.on_failure_callback if task else None if task and fail_fast: @@ -3169,7 +3044,6 @@ def fetch_handle_failure_context( TaskInstanceHistory.record_ti(ti, session=session) ti.state = State.UP_FOR_RETRY - email_for_state = operator.attrgetter("email_on_retry") callbacks = task.on_retry_callback if task else None get_listener_manager().hook.on_task_instance_failed( @@ -3178,7 +3052,6 @@ def fetch_handle_failure_context( return { "ti": ti, - "email_for_state": email_for_state, "task": task, "callbacks": callbacks, "context": context, @@ -3326,26 +3199,6 @@ def render_templates( return original_task - def get_email_subject_content( - self, exception: BaseException, task: BaseOperator | None = None - ) -> tuple[str, str, str]: - """ - Get the email subject content for exceptions. - - :param exception: the exception sent in the email - :param task: - """ - return _get_email_subject_content(task_instance=self, exception=exception, task=task) - - def email_alert(self, exception, task: BaseOperator) -> None: - """ - Send alert email with exception information. - - :param exception: the exception - :param task: task related to the exception - """ - _email_alert(task_instance=self, exception=exception, task=task) - def set_duration(self) -> None: """Set task instance duration.""" _set_duration(task_instance=self) diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py index 2e0fe9f7f2bbe..c6cbcf346a1df 100644 --- a/airflow/models/trigger.py +++ b/airflow/models/trigger.py @@ -261,7 +261,7 @@ def submit_failure(cls, trigger_id, exc=None, session: Session = NEW_SESSION) -> the runtime code understands as immediate-fail, and pack the error into next_kwargs. - TODO: Once we have shifted callback (and email) handling to run on + TODO: Once we have shifted callback handling to run on workers as first-class concepts, we can run the failure code here in-process, but we can't do that right now. """ diff --git a/airflow/operators/email.py b/airflow/operators/email.py deleted file mode 100644 index 85ac709e2bd5e..0000000000000 --- a/airflow/operators/email.py +++ /dev/null @@ -1,91 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -from collections.abc import Sequence -from typing import TYPE_CHECKING, Any - -from airflow.models.baseoperator import BaseOperator -from airflow.utils.email import send_email - -if TYPE_CHECKING: - from airflow.sdk.definitions.context import Context - - -class EmailOperator(BaseOperator): - """ - Sends an email. - - :param to: list of emails to send the email to. (templated) - :param subject: subject line for the email. (templated) - :param html_content: content of the email, html markup - is allowed. (templated) - :param files: file names to attach in email (templated) - :param cc: list of recipients to be added in CC field - :param bcc: list of recipients to be added in BCC field - :param mime_subtype: MIME sub content type - :param mime_charset: character set parameter added to the Content-Type - header. - :param custom_headers: additional headers to add to the MIME message. - """ - - template_fields: Sequence[str] = ("to", "subject", "html_content", "files") - template_fields_renderers = {"html_content": "html"} - template_ext: Sequence[str] = (".html",) - ui_color = "#e6faf9" - - def __init__( - self, - *, - to: list[str] | str, - subject: str, - html_content: str, - files: list | None = None, - cc: list[str] | str | None = None, - bcc: list[str] | str | None = None, - mime_subtype: str = "mixed", - mime_charset: str = "utf-8", - conn_id: str | None = None, - custom_headers: dict[str, Any] | None = None, - **kwargs, - ) -> None: - super().__init__(**kwargs) - self.to = to - self.subject = subject - self.html_content = html_content - self.files = files or [] - self.cc = cc - self.bcc = bcc - self.mime_subtype = mime_subtype - self.mime_charset = mime_charset - self.conn_id = conn_id - self.custom_headers = custom_headers - - def execute(self, context: Context): - send_email( - self.to, - self.subject, - self.html_content, - files=self.files, - cc=self.cc, - bcc=self.bcc, - mime_subtype=self.mime_subtype, - mime_charset=self.mime_charset, - conn_id=self.conn_id, - custom_headers=self.custom_headers, - ) diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index b7e08a45aed74..0f3a1ba9ea0bf 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -1217,8 +1217,6 @@ def _serialize_node(cls, op: BaseOperator | MappedOperator) -> dict[str, Any]: # If not, store them as strings # And raise an exception if the field is not templateable forbidden_fields = set(SerializedBaseOperator._CONSTRUCTOR_PARAMS.keys()) - # Though allow some of the BaseOperator fields to be templated anyway - forbidden_fields.difference_update({"email"}) if op.template_fields: for template_field in op.template_fields: if template_field in forbidden_fields: diff --git a/airflow/utils/email.py b/airflow/utils/email.py deleted file mode 100644 index 455b135c23e6c..0000000000000 --- a/airflow/utils/email.py +++ /dev/null @@ -1,334 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import collections.abc -import logging -import os -import smtplib -import ssl -from collections.abc import Iterable -from email.mime.application import MIMEApplication -from email.mime.multipart import MIMEMultipart -from email.mime.text import MIMEText -from email.utils import formatdate -from typing import Any - -import re2 - -from airflow.configuration import conf -from airflow.exceptions import AirflowException - -log = logging.getLogger(__name__) - - -def send_email( - to: list[str] | Iterable[str], - subject: str, - html_content: str, - files: list[str] | None = None, - dryrun: bool = False, - cc: str | Iterable[str] | None = None, - bcc: str | Iterable[str] | None = None, - mime_subtype: str = "mixed", - mime_charset: str = "utf-8", - conn_id: str | None = None, - custom_headers: dict[str, Any] | None = None, - **kwargs, -) -> None: - """ - Send an email using the backend specified in the *EMAIL_BACKEND* configuration option. - - :param to: A list or iterable of email addresses to send the email to. - :param subject: The subject of the email. - :param html_content: The content of the email in HTML format. - :param files: A list of paths to files to attach to the email. - :param dryrun: If *True*, the email will not actually be sent. Default: *False*. - :param cc: A string or iterable of strings containing email addresses to send a copy of the email to. - :param bcc: A string or iterable of strings containing email addresses to send a - blind carbon copy of the email to. - :param mime_subtype: The subtype of the MIME message. Default: "mixed". - :param mime_charset: The charset of the email. Default: "utf-8". - :param conn_id: The connection ID to use for the backend. If not provided, the default connection - specified in the *EMAIL_CONN_ID* configuration option will be used. - :param custom_headers: A dictionary of additional headers to add to the MIME message. - No validations are run on these values, and they should be able to be encoded. - :param kwargs: Additional keyword arguments to pass to the backend. - """ - backend = conf.getimport("email", "EMAIL_BACKEND") - backend_conn_id = conn_id or conf.get("email", "EMAIL_CONN_ID") - from_email = conf.get("email", "from_email", fallback=None) - - to_list = get_email_address_list(to) - to_comma_separated = ", ".join(to_list) - - return backend( - to_comma_separated, - subject, - html_content, - files=files, - dryrun=dryrun, - cc=cc, - bcc=bcc, - mime_subtype=mime_subtype, - mime_charset=mime_charset, - conn_id=backend_conn_id, - from_email=from_email, - custom_headers=custom_headers, - **kwargs, - ) - - -def send_email_smtp( - to: str | Iterable[str], - subject: str, - html_content: str, - files: list[str] | None = None, - dryrun: bool = False, - cc: str | Iterable[str] | None = None, - bcc: str | Iterable[str] | None = None, - mime_subtype: str = "mixed", - mime_charset: str = "utf-8", - conn_id: str = "smtp_default", - from_email: str | None = None, - custom_headers: dict[str, Any] | None = None, - **kwargs, -) -> None: - """ - Send an email with html content. - - :param to: Recipient email address or list of addresses. - :param subject: Email subject. - :param html_content: Email body in HTML format. - :param files: List of file paths to attach to the email. - :param dryrun: If True, the email will not be sent, but all other actions will be performed. - :param cc: Carbon copy recipient email address or list of addresses. - :param bcc: Blind carbon copy recipient email address or list of addresses. - :param mime_subtype: MIME subtype of the email. - :param mime_charset: MIME charset of the email. - :param conn_id: Connection ID of the SMTP server. - :param from_email: Sender email address. - :param custom_headers: Dictionary of custom headers to include in the email. - :param kwargs: Additional keyword arguments. - - >>> send_email("test@example.com", "foo", "Foo bar", ["/dev/null"], dryrun=True) - """ - smtp_mail_from = conf.get("smtp", "SMTP_MAIL_FROM") - - if smtp_mail_from is not None: - mail_from = smtp_mail_from - else: - if from_email is None: - raise ValueError( - "You should set from email - either by smtp/smtp_mail_from config or `from_email` parameter" - ) - mail_from = from_email - - msg, recipients = build_mime_message( - mail_from=mail_from, - to=to, - subject=subject, - html_content=html_content, - files=files, - cc=cc, - bcc=bcc, - mime_subtype=mime_subtype, - mime_charset=mime_charset, - custom_headers=custom_headers, - ) - - send_mime_email(e_from=mail_from, e_to=recipients, mime_msg=msg, conn_id=conn_id, dryrun=dryrun) - - -def build_mime_message( - mail_from: str | None, - to: str | Iterable[str], - subject: str, - html_content: str, - files: list[str] | None = None, - cc: str | Iterable[str] | None = None, - bcc: str | Iterable[str] | None = None, - mime_subtype: str = "mixed", - mime_charset: str = "utf-8", - custom_headers: dict[str, Any] | None = None, -) -> tuple[MIMEMultipart, list[str]]: - """ - Build a MIME message that can be used to send an email and returns a full list of recipients. - - :param mail_from: Email address to set as the email's "From" field. - :param to: A string or iterable of strings containing email addresses to set as the email's "To" field. - :param subject: The subject of the email. - :param html_content: The content of the email in HTML format. - :param files: A list of paths to files to be attached to the email. - :param cc: A string or iterable of strings containing email addresses to set as the email's "CC" field. - :param bcc: A string or iterable of strings containing email addresses to set as the email's "BCC" field. - :param mime_subtype: The subtype of the MIME message. Default: "mixed". - :param mime_charset: The charset of the email. Default: "utf-8". - :param custom_headers: Additional headers to add to the MIME message. No validations are run on these - values, and they should be able to be encoded. - :return: A tuple containing the email as a MIMEMultipart object and a list of recipient email addresses. - """ - to = get_email_address_list(to) - - msg = MIMEMultipart(mime_subtype) - msg["Subject"] = subject - if mail_from: - msg["From"] = mail_from - msg["To"] = ", ".join(to) - recipients = to - if cc: - cc = get_email_address_list(cc) - msg["CC"] = ", ".join(cc) - recipients += cc - - if bcc: - # don't add bcc in header - bcc = get_email_address_list(bcc) - recipients += bcc - - msg["Date"] = formatdate(localtime=True) - mime_text = MIMEText(html_content, "html", mime_charset) - msg.attach(mime_text) - - for fname in files or []: - basename = os.path.basename(fname) - with open(fname, "rb") as file: - part = MIMEApplication(file.read(), Name=basename) - part["Content-Disposition"] = f'attachment; filename="{basename}"' - part["Content-ID"] = f"<{basename}>" - msg.attach(part) - - if custom_headers: - for header_key, header_value in custom_headers.items(): - msg[header_key] = header_value - - return msg, recipients - - -def send_mime_email( - e_from: str, - e_to: str | list[str], - mime_msg: MIMEMultipart, - conn_id: str = "smtp_default", - dryrun: bool = False, -) -> None: - """ - Send a MIME email. - - :param e_from: The email address of the sender. - :param e_to: The email address or a list of email addresses of the recipient(s). - :param mime_msg: The MIME message to send. - :param conn_id: The ID of the SMTP connection to use. - :param dryrun: If True, the email will not be sent, but a log message will be generated. - """ - smtp_host = conf.get_mandatory_value("smtp", "SMTP_HOST") - smtp_port = conf.getint("smtp", "SMTP_PORT") - smtp_starttls = conf.getboolean("smtp", "SMTP_STARTTLS") - smtp_ssl = conf.getboolean("smtp", "SMTP_SSL") - smtp_retry_limit = conf.getint("smtp", "SMTP_RETRY_LIMIT") - smtp_timeout = conf.getint("smtp", "SMTP_TIMEOUT") - smtp_user = None - smtp_password = None - - if conn_id is not None: - try: - from airflow.hooks.base import BaseHook - - airflow_conn = BaseHook.get_connection(conn_id) - smtp_user = airflow_conn.login - smtp_password = airflow_conn.password - except AirflowException: - pass - if smtp_user is None or smtp_password is None: - log.debug("No user/password found for SMTP, so logging in with no authentication.") - - if not dryrun: - for attempt in range(1, smtp_retry_limit + 1): - log.info("Email alerting: attempt %s", str(attempt)) - try: - smtp_conn = _get_smtp_connection(smtp_host, smtp_port, smtp_timeout, smtp_ssl) - except smtplib.SMTPServerDisconnected: - if attempt == smtp_retry_limit: - raise - else: - if smtp_starttls: - smtp_conn.starttls() - if smtp_user and smtp_password: - smtp_conn.login(smtp_user, smtp_password) - log.info("Sent an alert email to %s", e_to) - smtp_conn.sendmail(e_from, e_to, mime_msg.as_string()) - smtp_conn.quit() - break - - -def get_email_address_list(addresses: str | Iterable[str]) -> list[str]: - """ - Return a list of email addresses from the provided input. - - :param addresses: A string or iterable of strings containing email addresses. - :return: A list of email addresses. - :raises TypeError: If the input is not a string or iterable of strings. - """ - if isinstance(addresses, str): - return _get_email_list_from_str(addresses) - elif isinstance(addresses, collections.abc.Iterable): - if not all(isinstance(item, str) for item in addresses): - raise TypeError("The items in your iterable must be strings.") - return list(addresses) - else: - raise TypeError(f"Unexpected argument type: Received '{type(addresses).__name__}'.") - - -def _get_smtp_connection(host: str, port: int, timeout: int, with_ssl: bool) -> smtplib.SMTP: - """ - Return an SMTP connection to the specified host and port, with optional SSL encryption. - - :param host: The hostname or IP address of the SMTP server. - :param port: The port number to connect to on the SMTP server. - :param timeout: The timeout in seconds for the connection. - :param with_ssl: Whether to use SSL encryption for the connection. - :return: An SMTP connection to the specified host and port. - """ - if not with_ssl: - return smtplib.SMTP(host=host, port=port, timeout=timeout) - else: - ssl_context_string = conf.get("email", "SSL_CONTEXT") - if ssl_context_string == "default": - ssl_context = ssl.create_default_context() - elif ssl_context_string == "none": - ssl_context = None - else: - raise RuntimeError( - f"The email.ssl_context configuration variable must " - f"be set to 'default' or 'none' and is '{ssl_context_string}." - ) - return smtplib.SMTP_SSL(host=host, port=port, timeout=timeout, context=ssl_context) - - -def _get_email_list_from_str(addresses: str) -> list[str]: - """ - Extract a list of email addresses from a string. - - The string can contain multiple email addresses separated - by any of the following delimiters: ',' or ';'. - - :param addresses: A string containing one or more email addresses. - :return: A list of email addresses. - """ - pattern = r"\s*[,;]\s*" - return re2.split(pattern, addresses) diff --git a/airflow/utils/operator_helpers.py b/airflow/utils/operator_helpers.py index ab3c8c89e5e0f..96db19cb1bcaf 100644 --- a/airflow/utils/operator_helpers.py +++ b/airflow/utils/operator_helpers.py @@ -62,10 +62,6 @@ "default": f"{DEFAULT_FORMAT_PREFIX}dag_owner", "env_var_format": f"{ENV_VAR_FORMAT_PREFIX}DAG_OWNER", }, - "AIRFLOW_CONTEXT_DAG_EMAIL": { - "default": f"{DEFAULT_FORMAT_PREFIX}dag_email", - "env_var_format": f"{ENV_VAR_FORMAT_PREFIX}DAG_EMAIL", - }, } @@ -93,7 +89,6 @@ def context_to_airflow_vars(context: Mapping[str, Any], in_env_var_format: bool dag_run = context.get("dag_run") ops = [ - (task, "email", "AIRFLOW_CONTEXT_DAG_EMAIL"), (task, "owner", "AIRFLOW_CONTEXT_DAG_OWNER"), (task_instance, "dag_id", "AIRFLOW_CONTEXT_DAG_ID"), (task_instance, "task_id", "AIRFLOW_CONTEXT_TASK_ID"), diff --git a/docs/apache-airflow/cli-and-env-variables-ref.rst b/docs/apache-airflow/cli-and-env-variables-ref.rst index 5de76b8d02969..a03694b495df9 100644 --- a/docs/apache-airflow/cli-and-env-variables-ref.rst +++ b/docs/apache-airflow/cli-and-env-variables-ref.rst @@ -76,7 +76,6 @@ Environment Variables * ``flower_basic_auth`` in ``[celery]`` section * ``result_backend`` in ``[celery]`` section * ``password`` in ``[atlas]`` section -* ``smtp_password`` in ``[smtp]`` section * ``secret_key`` in ``[webserver]`` section .. envvar:: AIRFLOW__{SECTION}__{KEY}_SECRET diff --git a/docs/apache-airflow/howto/email-config.rst b/docs/apache-airflow/howto/email-config.rst deleted file mode 100644 index bc7ea9a1f9d04..0000000000000 --- a/docs/apache-airflow/howto/email-config.rst +++ /dev/null @@ -1,190 +0,0 @@ - .. Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - .. http://www.apache.org/licenses/LICENSE-2.0 - - .. Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - -Email Configuration -=================== - -You can configure the email that is being sent in your ``airflow.cfg`` -by setting a ``subject_template`` and/or a ``html_content_template`` -in the ``[email]`` section. - -.. code-block:: ini - - [email] - email_backend = airflow.utils.email.send_email_smtp - subject_template = /path/to/my_subject_template_file - html_content_template = /path/to/my_html_content_template_file - -Equivalent environment variables look like: - -.. code-block:: sh - - AIRFLOW__EMAIL__EMAIL_BACKEND=airflow.utils.email.send_email_smtp - AIRFLOW__EMAIL__SUBJECT_TEMPLATE=/path/to/my_subject_template_file - AIRFLOW__EMAIL__HTML_CONTENT_TEMPLATE=/path/to/my_html_content_template_file - -You can configure a sender's email address by setting ``from_email`` in the ``[email]`` section like: - -.. code-block:: ini - - [email] - from_email = "John Doe " - -Equivalent environment variables look like: - -.. code-block:: sh - - AIRFLOW__EMAIL__FROM_EMAIL="John Doe " - - -To configure SMTP settings, checkout the :ref:`SMTP ` section in the standard configuration. -If you do not want to store the SMTP credentials in the config or in the environment variables, you can create a -connection called ``smtp_default`` of ``Email`` type, or choose a custom connection name and set the ``email_conn_id`` with its name in -the configuration & store SMTP username-password in it. Other SMTP settings like host, port etc always gets picked up -from the configuration only. The connection can be of any type (for example 'HTTP connection'). - -If you want to check which email backend is currently set, you can use ``airflow config get-value email email_backend`` command as in -the example below. - -.. code-block:: bash - - $ airflow config get-value email email_backend - airflow.utils.email.send_email_smtp - -To access the task's information you use `Jinja Templating `_ in your template files. - -For example a ``html_content_template`` file could look like this: - -.. code-block:: - - Try {{try_number}} out of {{max_tries + 1}}
- Exception:
{{exception_html}}
- Log: Link
- Host: {{ti.hostname}}
- Mark success: Link
- -.. note:: - For more information on setting the configuration, see :doc:`set-config` - -.. _email-configuration-sendgrid: - -Send email using SendGrid -------------------------- - -Using Default SMTP -^^^^^^^^^^^^^^^^^^ - -You can use the default airflow SMTP backend to send email with SendGrid - - .. code-block:: ini - - [smtp] - smtp_host=smtp.sendgrid.net - smtp_starttls=False - smtp_ssl=False - smtp_port=587 - smtp_mail_from= - -Equivalent environment variables looks like - - .. code-block:: - - AIRFLOW__SMTP__SMTP_HOST=smtp.sendgrid.net - AIRFLOW__SMTP__SMTP_STARTTLS=False - AIRFLOW__SMTP__SMTP_SSL=False - AIRFLOW__SMTP__SMTP_PORT=587 - AIRFLOW__SMTP__SMTP_MAIL_FROM= - - -Using SendGrid Provider -^^^^^^^^^^^^^^^^^^^^^^^ - -Airflow can be configured to send e-mail using `SendGrid `__. - -Follow the steps below to enable it: - -1. Setup your SendGrid account, The SMTP and copy username and API Key. - -2. Include ``sendgrid`` provider as part of your Airflow installation, e.g., - - .. code-block:: bash - - pip install 'apache-airflow[sendgrid]' --constraint ... - -or - .. code-block:: bash - - pip install 'apache-airflow-providers-sendgrid' --constraint ... - - -3. Update ``email_backend`` property in ``[email]`` section in ``airflow.cfg``, i.e. - - .. code-block:: ini - - [email] - email_backend = airflow.providers.sendgrid.utils.emailer.send_email - email_conn_id = sendgrid_default - from_email = "hello@eg.com" - - Equivalent environment variables looks like - - .. code-block:: - - AIRFLOW__EMAIL__EMAIL_BACKEND=airflow.providers.sendgrid.utils.emailer.send_email - AIRFLOW__EMAIL__EMAIL_CONN_ID=sendgrid_default - SENDGRID_MAIL_FROM=hello@thelearning.dev - -4. Create a connection called ``sendgrid_default``, or choose a custom connection - name and set it in ``email_conn_id`` of 'Email' type. Only login and password - are used from the connection. - - -.. image:: ../img/email_connection.png - :align: center - :alt: create email connection - -.. note:: The callbacks for success, failure and retry will use the same configuration to send the email - - -.. _email-configuration-ses: - -Send email using AWS SES ------------------------- - -Airflow can be configured to send e-mail using `AWS SES `__. - -Follow the steps below to enable it: - -1. Include ``amazon`` subpackage as part of your Airflow installation: - - .. code-block:: ini - - pip install 'apache-airflow[amazon]' - -2. Update ``email_backend`` property in ``[email]`` section in ``airflow.cfg``: - - .. code-block:: ini - - [email] - email_backend = airflow.providers.amazon.aws.utils.emailer.send_email - email_conn_id = aws_default - from_email = From email - -Note that for SES, you must configure from_email to the valid email that can send messages from SES. - -3. Create a connection called ``aws_default``, or choose a custom connection - name and set it in ``email_conn_id``. The type of connection should be ``Amazon Web Services``. diff --git a/docs/apache-airflow/howto/export-more-env-vars.rst b/docs/apache-airflow/howto/export-more-env-vars.rst index e393f479302d1..6c94c7c08853d 100644 --- a/docs/apache-airflow/howto/export-more-env-vars.rst +++ b/docs/apache-airflow/howto/export-more-env-vars.rst @@ -28,7 +28,7 @@ which are available as environment variables when running tasks. Note, both key value are must be string. ``dag_id``, ``task_id``, ``execution_date``, ``dag_run_id``, -``dag_owner``, ``dag_email`` are reserved keys. +``dag_owner`` are reserved keys. For example, in your ``airflow_local_settings.py`` file: diff --git a/docs/apache-airflow/howto/index.rst b/docs/apache-airflow/howto/index.rst index 03c34820c4cc2..3271cbf8109e7 100644 --- a/docs/apache-airflow/howto/index.rst +++ b/docs/apache-airflow/howto/index.rst @@ -49,7 +49,6 @@ configuring an Airflow environment. run-behind-proxy run-with-systemd define-extra-link - email-config dynamic-dag-generation docker-compose/index upgrading-from-1-10/index diff --git a/docs/apache-airflow/howto/set-config.rst b/docs/apache-airflow/howto/set-config.rst index 2a03b2bbf5ee4..365ef6ca4023a 100644 --- a/docs/apache-airflow/howto/set-config.rst +++ b/docs/apache-airflow/howto/set-config.rst @@ -104,7 +104,6 @@ The following config options support this ``_cmd`` and ``_secret`` version: * ``flower_basic_auth`` in ``[celery]`` section * ``result_backend`` in ``[celery]`` section * ``password`` in ``[atlas]`` section -* ``smtp_password`` in ``[smtp]`` section * ``secret_key`` in ``[webserver]`` section The ``_cmd`` config options can also be set using a corresponding environment variable diff --git a/docs/apache-airflow/integration.rst b/docs/apache-airflow/integration.rst index 2e9c450f1fa8a..a54862f1dbf52 100644 --- a/docs/apache-airflow/integration.rst +++ b/docs/apache-airflow/integration.rst @@ -21,7 +21,6 @@ Integration Airflow has a mechanism that allows you to expand its functionality and integrate with other systems. * :doc:`API Authentication backends ` -* :doc:`Email backends ` * :doc:`Executor ` * :doc:`Kerberos ` * :doc:`Logging ` diff --git a/docs/apache-airflow/operators-and-hooks-ref.rst b/docs/apache-airflow/operators-and-hooks-ref.rst index 0a2abcc841057..64d0067fe796f 100644 --- a/docs/apache-airflow/operators-and-hooks-ref.rst +++ b/docs/apache-airflow/operators-and-hooks-ref.rst @@ -56,9 +56,6 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`. * - :mod:`airflow.operators.empty` - - * - :mod:`airflow.operators.email` - - - * - :mod:`airflow.operators.generic_transfer` - diff --git a/docs/apache-airflow/public-airflow-interface.rst b/docs/apache-airflow/public-airflow-interface.rst index 2853c6fbe2e1b..e9ad7062ab8d6 100644 --- a/docs/apache-airflow/public-airflow-interface.rst +++ b/docs/apache-airflow/public-airflow-interface.rst @@ -386,11 +386,6 @@ by extending them: You can read more about creating custom Decorators in :doc:`howto/create-custom-decorator`. -Email notifications -------------------- - -Airflow has a built-in way of sending email notifications and it allows to extend it by adding custom -email notification classes. You can read more about email notifications in :doc:`howto/email-config`. Notifications ------------- diff --git a/newsfragments/.significant.rst b/newsfragments/.significant.rst new file mode 100644 index 0000000000000..5f0d52fcfe2ea --- /dev/null +++ b/newsfragments/.significant.rst @@ -0,0 +1,42 @@ +.. Write a short and imperative summary of this changes + +.. Provide additional contextual information + +.. Check the type of change that applies to this change +.. Dag changes: requires users to change their dag code +.. Config changes: requires users to change their airflow config +.. API changes: requires users to change their Airflow REST API calls +.. CLI changes: requires users to change their Airflow CLI usage +.. Behaviour changes: the existing code won't break, but the behavior is different +.. Plugin changes: requires users to change their Airflow plugin implementation +.. Dependency changes: requires users to change their dependencies (e.g., Postgres 12) +.. Code interface changes: requires users to change other implementations (e.g., auth manager) +In Airflow 3.0, the ``timer_unit_consistency`` setting in the ``metrics`` section is removed as it is now the default behaviour. +This is done to standardize all timer and timing metrics to milliseconds across all metric loggers. + +Airflow 2.11 introduced the ``timer_unit_consistency`` setting in the ``metrics`` section of the configuration file. The +default value was ``False`` which meant that the timer and timing metrics were logged in seconds. This was done to maintain +backwards compatibility with the previous versions of Airflow. + +In Airflow 3.0, the ``smtp`` setting section, the base operator parameters ``email``, ``email_on_retry`` and ``email_on_failure``, and the ``airflow.operators.email.EmailOperator`` are removed. + +Instead of using the operator ``EmailOperator`` from the ``airflow.operators.email`` module, users should use the operator ``EmailOperator`` from the ``smtp`` provider in the module ``airflow.providers.smtp.operators.smtp``. + +And to send emails on failure or retry, users should use the ```airflow.providers.smtp.notifications.smtp.SmtpNotifier`` with the ``on_failure_callback`` and ``on_retry_callback`` parameters. For more details you can check `this guide `_. + +* Types of change + + * [ ] Dag changes + * [x] Config changes + * [ ] API changes + * [ ] CLI changes + * [x] Behaviour changes + * [ ] Plugin changes + * [ ] Dependency changes + * [x] Code interface changes + +* Migration rules needed + +.. TODO: create the migration rule that: +.. * Import ``EmailOperator`` from ``airflow.providers.smtp.operators.smtp`` instead of ``airflow.operators.email`` +.. * Detect and remove the ``email``, ``email_on_retry`` and ``email_on_failure`` parameters from the operator parameters (and replace them by an ``SmtpNotifier`` with the ``on_failure_callback`` and ``on_retry_callback`` parameters)? diff --git a/scripts/ci/pre_commit/base_operator_partial_arguments.py b/scripts/ci/pre_commit/base_operator_partial_arguments.py index 070ffe767cccc..e0160cf995c35 100755 --- a/scripts/ci/pre_commit/base_operator_partial_arguments.py +++ b/scripts/ci/pre_commit/base_operator_partial_arguments.py @@ -35,8 +35,6 @@ IGNORED = { # These are only used in the worker and thus mappable. "do_xcom_push", - "email_on_failure", - "email_on_retry", "post_execute", "pre_execute", "multiple_outputs", diff --git a/task_sdk/src/airflow/sdk/definitions/baseoperator.py b/task_sdk/src/airflow/sdk/definitions/baseoperator.py index e7ecec69411ba..d40c441ef5391 100644 --- a/task_sdk/src/airflow/sdk/definitions/baseoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/baseoperator.py @@ -169,7 +169,6 @@ def partial( start_date: datetime | ArgNotSet = NOTSET, end_date: datetime | ArgNotSet = NOTSET, owner: str | ArgNotSet = NOTSET, - email: None | str | Iterable[str] | ArgNotSet = NOTSET, params: collections.abc.MutableMapping | None = None, resources: dict[str, Any] | None | ArgNotSet = NOTSET, trigger_rule: str | ArgNotSet = NOTSET, @@ -431,9 +430,6 @@ def __new__(cls, name, bases, namespace, **kwargs): # manual type-checking method. BASEOPERATOR_ARGS_EXPECTED_TYPES = { "task_id": str, - "email": (str, Sequence), - "email_on_retry": bool, - "email_on_failure": bool, "retries": int, "retry_exponential_backoff": bool, "depends_on_past": bool, @@ -502,13 +498,6 @@ class derived from this one results in the creation of a task object, :param task_id: a unique, meaningful id for the task :param owner: the owner of the task. Using a meaningful description (e.g. user/person/team/role name) to clarify ownership is recommended. - :param email: the 'to' email address(es) used in email alerts. This can be a - single email or multiple ones. Multiple addresses can be specified as a - comma or semicolon separated string or by passing a list of strings. - :param email_on_retry: Indicates whether email alerts should be sent when a - task is retried - :param email_on_failure: Indicates whether email alerts should be sent when - a task failed :param retries: the number of retries that should be performed before failing the task :param retry_delay: delay between retries, can be set as ``timedelta`` or @@ -697,9 +686,6 @@ def say_hello_world(**context): task_id: str owner: str = DEFAULT_OWNER - email: str | Sequence[str] | None = None - email_on_retry: bool = True - email_on_failure: bool = True retries: int | None = DEFAULT_RETRIES retry_delay: timedelta = DEFAULT_RETRY_DELAY retry_exponential_backoff: bool = False @@ -780,8 +766,6 @@ def say_hello_world(**context): "task_id", "dag_id", "owner", - "email", - "email_on_retry", "retry_delay", "retry_exponential_backoff", "max_retry_delay", @@ -853,9 +837,6 @@ def __init__( *, task_id: str, owner: str = DEFAULT_OWNER, - email: str | Sequence[str] | None = None, - email_on_retry: bool = True, - email_on_failure: bool = True, retries: int | None = DEFAULT_RETRIES, retry_delay: timedelta | float = DEFAULT_RETRY_DELAY, retry_exponential_backoff: bool = False, @@ -924,9 +905,6 @@ def __init__( validate_key(self.task_id) self.owner = owner - self.email = email - self.email_on_retry = email_on_retry - self.email_on_failure = email_on_failure if execution_timeout is not None and not isinstance(execution_timeout, timedelta): raise ValueError( diff --git a/task_sdk/src/airflow/sdk/definitions/mappedoperator.py b/task_sdk/src/airflow/sdk/definitions/mappedoperator.py index 0fc0a7fa1896a..bcff6838c5a0c 100644 --- a/task_sdk/src/airflow/sdk/definitions/mappedoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/mappedoperator.py @@ -385,10 +385,6 @@ def task_display_name(self) -> str: def owner(self) -> str: # type: ignore[override] return self.partial_kwargs.get("owner", DEFAULT_OWNER) - @property - def email(self) -> None | str | Iterable[str]: - return self.partial_kwargs.get("email") - @property def map_index_template(self) -> None | str: return self.partial_kwargs.get("map_index_template") diff --git a/tests/api_connexion/endpoints/test_config_endpoint.py b/tests/api_connexion/endpoints/test_config_endpoint.py index 8d3785f08811d..f4e447e00e3d3 100644 --- a/tests/api_connexion/endpoints/test_config_endpoint.py +++ b/tests/api_connexion/endpoints/test_config_endpoint.py @@ -31,17 +31,17 @@ "core": { "parallelism": "1024", }, - "smtp": { - "smtp_host": "localhost", - "smtp_mail_from": "airflow@example.com", + "operators": { + "default_deferrable": "true", + "default_queue": "queue", }, } MOCK_CONF_WITH_SENSITIVE_VALUE = { "core": {"parallelism": "1024"}, - "smtp": { - "smtp_host": "localhost", - "smtp_mail_from": "airflow@example.com", + "operators": { + "default_deferrable": "true", + "default_queue": "queue", }, "database": { "sql_alchemy_conn": "mock_conn", @@ -83,10 +83,6 @@ def test_should_respond_200_text_plain(self, mock_as_dict): """\ [core] parallelism = 1024 - - [smtp] - smtp_host = localhost - smtp_mail_from = airflow@example.com """ ) assert expected == response.data.decode() @@ -103,10 +99,6 @@ def test_should_respond_200_text_plain_with_non_sensitive_only(self, mock_as_dic """\ [core] parallelism = 1024 - - [smtp] - smtp_host = localhost - smtp_mail_from = airflow@example.com """ ) assert expected == response.data.decode() @@ -129,10 +121,9 @@ def test_should_respond_200_application_json(self, mock_as_dict): ], }, { - "name": "smtp", + "name": "sensors", "options": [ - {"key": "smtp_host", "value": "localhost"}, - {"key": "smtp_mail_from", "value": "airflow@example.com"}, + {"key": "default_timeout", "value": "86400"}, ], }, ] @@ -142,7 +133,7 @@ def test_should_respond_200_application_json(self, mock_as_dict): @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) def test_should_respond_200_single_section_as_text_plain(self, mock_as_dict): response = self.client.get( - "/api/v1/config?section=smtp", + "/api/v1/config?section=operators", headers={"Accept": "text/plain"}, environ_overrides={"REMOTE_USER": "test"}, ) @@ -150,9 +141,9 @@ def test_should_respond_200_single_section_as_text_plain(self, mock_as_dict): assert response.status_code == 200 expected = textwrap.dedent( """\ - [smtp] - smtp_host = localhost - smtp_mail_from = airflow@example.com + [operators] + default_deferrable = true + default_queue = queue """ ) assert expected == response.data.decode() @@ -160,19 +151,19 @@ def test_should_respond_200_single_section_as_text_plain(self, mock_as_dict): @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) def test_should_respond_200_single_section_as_json(self, mock_as_dict): response = self.client.get( - "/api/v1/config?section=smtp", + "/api/v1/config?section=operators", headers={"Accept": "application/json"}, environ_overrides={"REMOTE_USER": "test"}, ) - mock_as_dict.assert_called_with(display_source=False, display_sensitive=True) + # mock_as_dict.assert_called_with(display_source=False, display_sensitive=True) assert response.status_code == 200 expected = { "sections": [ { - "name": "smtp", + "name": "operators", "options": [ - {"key": "smtp_host", "value": "localhost"}, - {"key": "smtp_mail_from", "value": "airflow@example.com"}, + {"key": "default_deferrable", "value": "true"}, + {"key": "default_queue", "value": "queue"}, ], }, ] @@ -182,13 +173,13 @@ def test_should_respond_200_single_section_as_json(self, mock_as_dict): @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) def test_should_respond_404_when_section_not_exist(self, mock_as_dict): response = self.client.get( - "/api/v1/config?section=smtp1", + "/api/v1/config?section=operators1", headers={"Accept": "application/json"}, environ_overrides={"REMOTE_USER": "test"}, ) assert response.status_code == 404 - assert "section=smtp1 not found." in response.json["detail"] + assert "section=operators1 not found." in response.json["detail"] @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) def test_should_respond_406(self, mock_as_dict): @@ -233,15 +224,15 @@ def setup_attrs(self, configured_app) -> None: @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) def test_should_respond_200_text_plain(self, mock_as_dict): response = self.client.get( - "/api/v1/config/section/smtp/option/smtp_mail_from", + "/api/v1/config/section/operators/option/default_queue", headers={"Accept": "text/plain"}, environ_overrides={"REMOTE_USER": "test"}, ) assert response.status_code == 200 expected = textwrap.dedent( """\ - [smtp] - smtp_mail_from = airflow@example.com + [operators] + default_queue = queue """ ) assert expected == response.data.decode() @@ -278,7 +269,7 @@ def test_should_respond_200_text_plain_with_non_sensitive_only(self, mock_as_dic @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) def test_should_respond_200_application_json(self, mock_as_dict): response = self.client.get( - "/api/v1/config/section/smtp/option/smtp_mail_from", + "/api/v1/config/section/operators/option/default_queue", headers={"Accept": "application/json"}, environ_overrides={"REMOTE_USER": "test"}, ) @@ -286,9 +277,9 @@ def test_should_respond_200_application_json(self, mock_as_dict): expected = { "sections": [ { - "name": "smtp", + "name": "operators", "options": [ - {"key": "smtp_mail_from", "value": "airflow@example.com"}, + {"key": "default_queue", "value": "queue"}, ], }, ] @@ -298,18 +289,18 @@ def test_should_respond_200_application_json(self, mock_as_dict): @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) def test_should_respond_404_when_option_not_exist(self, mock_as_dict): response = self.client.get( - "/api/v1/config/section/smtp/option/smtp_mail_from1", + "/api/v1/config/section/operators/option/default_queue1", headers={"Accept": "application/json"}, environ_overrides={"REMOTE_USER": "test"}, ) assert response.status_code == 404 - assert "The option [smtp/smtp_mail_from1] is not found in config." in response.json["detail"] + assert "The option [operators/default_queue1] is not found in config." in response.json["detail"] @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) def test_should_respond_406(self, mock_as_dict): response = self.client.get( - "/api/v1/config/section/smtp/option/smtp_mail_from", + "/api/v1/config/section/operators/option/default_queue", headers={"Accept": "application/octet-stream"}, environ_overrides={"REMOTE_USER": "test"}, ) @@ -317,14 +308,14 @@ def test_should_respond_406(self, mock_as_dict): def test_should_raises_401_unauthenticated(self): response = self.client.get( - "/api/v1/config/section/smtp/option/smtp_mail_from", headers={"Accept": "application/json"} + "/api/v1/config/section/operators/option/default_queue", headers={"Accept": "application/json"} ) assert_401(response) def test_should_raises_403_unauthorized(self): response = self.client.get( - "/api/v1/config/section/smtp/option/smtp_mail_from", + "/api/v1/config/section/operators/option/default_queue", headers={"Accept": "application/json"}, environ_overrides={"REMOTE_USER": "test_no_permissions"}, ) @@ -334,7 +325,7 @@ def test_should_raises_403_unauthorized(self): @conf_vars({("webserver", "expose_config"): "False"}) def test_should_respond_403_when_expose_config_off(self): response = self.client.get( - "/api/v1/config/section/smtp/option/smtp_mail_from", + "/api/v1/config/section/operators/option/default_queue", headers={"Accept": "application/json"}, environ_overrides={"REMOTE_USER": "test"}, ) diff --git a/tests/api_fastapi/core_api/routes/public/test_config.py b/tests/api_fastapi/core_api/routes/public/test_config.py index 90009d1b54152..705432b845b74 100644 --- a/tests/api_fastapi/core_api/routes/public/test_config.py +++ b/tests/api_fastapi/core_api/routes/public/test_config.py @@ -33,16 +33,11 @@ HEADERS_INVALID = {"Accept": "invalid"} HEADERS_JSON_UTF8 = {"Accept": "application/json; charset=utf-8"} SECTION_CORE = "core" -SECTION_SMTP = "smtp" SECTION_DATABASE = "database" SECTION_NOT_EXIST = "not_exist_section" OPTION_KEY_PARALLELISM = "parallelism" -OPTION_KEY_SMTP_HOST = "smtp_host" -OPTION_KEY_SMTP_MAIL_FROM = "smtp_mail_from" OPTION_KEY_SQL_ALCHEMY_CONN = "sql_alchemy_conn" OPTION_VALUE_PARALLELISM = "1024" -OPTION_VALUE_SMTP_HOST = "smtp.example.com" -OPTION_VALUE_SMTP_MAIL_FROM = "airflow@example.com" OPTION_VALUE_SQL_ALCHEMY_CONN = "sqlite:///example.db" OPTION_NOT_EXIST = "not_exist_option" OPTION_VALUE_SENSITIVE_HIDDEN = "< hidden >" @@ -51,10 +46,6 @@ SECTION_CORE: { OPTION_KEY_PARALLELISM: OPTION_VALUE_PARALLELISM, }, - SECTION_SMTP: { - OPTION_KEY_SMTP_HOST: OPTION_VALUE_SMTP_HOST, - OPTION_KEY_SMTP_MAIL_FROM: OPTION_VALUE_SMTP_MAIL_FROM, - }, SECTION_DATABASE: { OPTION_KEY_SQL_ALCHEMY_CONN: OPTION_VALUE_SQL_ALCHEMY_CONN, }, @@ -63,18 +54,12 @@ SECTION_CORE: { OPTION_KEY_PARALLELISM: OPTION_VALUE_PARALLELISM, }, - SECTION_SMTP: { - OPTION_KEY_SMTP_HOST: OPTION_VALUE_SMTP_HOST, - OPTION_KEY_SMTP_MAIL_FROM: OPTION_VALUE_SMTP_MAIL_FROM, - }, SECTION_DATABASE: { OPTION_KEY_SQL_ALCHEMY_CONN: OPTION_VALUE_SENSITIVE_HIDDEN, }, } MOCK_CONFIG_OVERRIDE = { (SECTION_CORE, OPTION_KEY_PARALLELISM): OPTION_VALUE_PARALLELISM, - (SECTION_SMTP, OPTION_KEY_SMTP_HOST): OPTION_VALUE_SMTP_HOST, - (SECTION_SMTP, OPTION_KEY_SMTP_MAIL_FROM): OPTION_VALUE_SMTP_MAIL_FROM, } AIRFLOW_CONFIG_ENABLE_EXPOSE_CONFIG = {("webserver", "expose_config"): "True"} @@ -92,13 +77,6 @@ {"key": OPTION_KEY_PARALLELISM, "value": OPTION_VALUE_PARALLELISM}, ], }, - { - "name": SECTION_SMTP, - "options": [ - {"key": OPTION_KEY_SMTP_HOST, "value": OPTION_VALUE_SMTP_HOST}, - {"key": OPTION_KEY_SMTP_MAIL_FROM, "value": OPTION_VALUE_SMTP_MAIL_FROM}, - ], - }, { "name": SECTION_DATABASE, "options": [ @@ -115,13 +93,6 @@ {"key": OPTION_KEY_PARALLELISM, "value": OPTION_VALUE_PARALLELISM}, ], }, - { - "name": SECTION_SMTP, - "options": [ - {"key": OPTION_KEY_SMTP_HOST, "value": OPTION_VALUE_SMTP_HOST}, - {"key": OPTION_KEY_SMTP_MAIL_FROM, "value": OPTION_VALUE_SMTP_MAIL_FROM}, - ], - }, { "name": SECTION_DATABASE, "options": [ @@ -201,10 +172,6 @@ class TestGetConfig(TestConfigEndpoint): [{SECTION_CORE}] {OPTION_KEY_PARALLELISM} = {OPTION_VALUE_PARALLELISM} - [{SECTION_SMTP}] - {OPTION_KEY_SMTP_HOST} = {OPTION_VALUE_SMTP_HOST} - {OPTION_KEY_SMTP_MAIL_FROM} = {OPTION_VALUE_SMTP_MAIL_FROM} - [{SECTION_DATABASE}] {OPTION_KEY_SQL_ALCHEMY_CONN} = {OPTION_VALUE_SQL_ALCHEMY_CONN} """ @@ -231,18 +198,6 @@ class TestGetConfig(TestConfigEndpoint): ], }, ), - ( - SECTION_SMTP, - HEADERS_TEXT, - 200, - textwrap.dedent( - f"""\ - [{SECTION_SMTP}] - {OPTION_KEY_SMTP_HOST} = {OPTION_VALUE_SMTP_HOST} - {OPTION_KEY_SMTP_MAIL_FROM} = {OPTION_VALUE_SMTP_MAIL_FROM} - """ - ), - ), ( SECTION_DATABASE, HEADERS_JSON, @@ -287,10 +242,6 @@ def test_get_config(self, test_client, section, headers, expected_status_code, e [{SECTION_CORE}] {OPTION_KEY_PARALLELISM} = {OPTION_VALUE_PARALLELISM} - [{SECTION_SMTP}] - {OPTION_KEY_SMTP_HOST} = {OPTION_VALUE_SMTP_HOST} - {OPTION_KEY_SMTP_MAIL_FROM} = {OPTION_VALUE_SMTP_MAIL_FROM} - [{SECTION_DATABASE}] {OPTION_KEY_SQL_ALCHEMY_CONN} = {OPTION_VALUE_SENSITIVE_HIDDEN} """ @@ -338,34 +289,6 @@ class TestGetConfigValue(TestConfigEndpoint): 200, GET_CONFIG_VALUE_CORE_PARALLELISM_JSON_RESPONSE, ), - ( - SECTION_SMTP, - OPTION_KEY_SMTP_HOST, - HEADERS_TEXT, - 200, - textwrap.dedent( - f"""\ - [{SECTION_SMTP}] - {OPTION_KEY_SMTP_HOST} = {OPTION_VALUE_SMTP_HOST} - """ - ), - ), - ( - SECTION_SMTP, - OPTION_KEY_SMTP_MAIL_FROM, - HEADERS_JSON, - 200, - { - "sections": [ - { - "name": SECTION_SMTP, - "options": [ - {"key": OPTION_KEY_SMTP_MAIL_FROM, "value": OPTION_VALUE_SMTP_MAIL_FROM}, - ], - }, - ], - }, - ), ( SECTION_DATABASE, OPTION_KEY_SQL_ALCHEMY_CONN, diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 8ca00e2f8d650..3a782f84b9104 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -29,7 +29,7 @@ from traceback import format_exception from typing import cast from unittest import mock -from unittest.mock import call, mock_open, patch +from unittest.mock import call, patch from uuid import uuid4 import pendulum @@ -2148,109 +2148,8 @@ def test_overwrite_params_with_dag_run_conf_none(self, create_task_instance): params = process_params(ti.task.dag, ti.task, dag_run.conf, suppress_exception=False) assert params["override"] is False - @pytest.mark.parametrize("use_native_obj", [True, False]) - @patch("airflow.models.taskinstance.send_email") - def test_email_alert(self, mock_send_email, dag_maker, use_native_obj): - with dag_maker(dag_id="test_failure_email", render_template_as_native_obj=use_native_obj): - task = BashOperator(task_id="test_email_alert", bash_command="exit 1", email="to") - ti = dag_maker.create_dagrun(logical_date=timezone.utcnow()).task_instances[0] - ti.task = task - - with contextlib.suppress(AirflowException): - ti.run() - - (email, title, body), _ = mock_send_email.call_args - assert email == "to" - assert "test_email_alert" in title - assert "test_email_alert" in body - assert "Try 0" in body - - @conf_vars( - { - ("email", "subject_template"): "/subject/path", - ("email", "html_content_template"): "/html_content/path", - } - ) - @patch("airflow.models.taskinstance.send_email") - def test_email_alert_with_config(self, mock_send_email, dag_maker): - with dag_maker(dag_id="test_failure_email"): - task = BashOperator( - task_id="test_email_alert_with_config", - bash_command="exit 1", - email="to", - ) - ti = dag_maker.create_dagrun(logical_date=timezone.utcnow()).task_instances[0] - ti.task = task - - opener = mock_open(read_data="template: {{ti.task_id}}") - with patch("airflow.models.taskinstance.open", opener, create=True): - with contextlib.suppress(AirflowException): - ti.run() - - (email, title, body), _ = mock_send_email.call_args - assert email == "to" - assert title == "template: test_email_alert_with_config" - assert body == "template: test_email_alert_with_config" - - @patch("airflow.models.taskinstance.send_email") - def test_email_alert_with_filenotfound_config(self, mock_send_email, dag_maker): - with dag_maker(dag_id="test_failure_email"): - task = BashOperator( - task_id="test_email_alert_with_config", - bash_command="exit 1", - email="to", - ) - ti = dag_maker.create_dagrun(logical_date=timezone.utcnow()).task_instances[0] - ti.task = task - - # Run test when the template file is not found - opener = mock_open(read_data="template: {{ti.task_id}}") - opener.side_effect = FileNotFoundError - with patch("airflow.models.taskinstance.open", opener, create=True): - with contextlib.suppress(AirflowException): - ti.run() - - (email_error, title_error, body_error), _ = mock_send_email.call_args - - # Rerun task without any error and no template file - with contextlib.suppress(AirflowException): - ti.run() - - (email_default, title_default, body_default), _ = mock_send_email.call_args - - assert email_error == email_default == "to" - assert title_default == title_error - assert body_default == body_error - - @pytest.mark.parametrize("task_id", ["test_email_alert", "test_email_alert__1"]) - @patch("airflow.models.taskinstance.send_email") - def test_failure_mapped_taskflow(self, mock_send_email, dag_maker, session, task_id): - with dag_maker(session=session) as dag: - - @dag.task(email="to") - def test_email_alert(x): - raise RuntimeError("Fail please") - - test_email_alert.expand(x=["a", "b"]) # This is 'test_email_alert'. - test_email_alert.expand(x=[1, 2, 3]) # This is 'test_email_alert__1'. - - dr: DagRun = dag_maker.create_dagrun(logical_date=timezone.utcnow()) - - ti = dr.get_task_instance(task_id, map_index=0, session=session) - assert ti is not None - - # The task will fail and trigger email reporting. - with pytest.raises(RuntimeError, match=r"^Fail please$"): - ti.run(session=session) - - (email, title, body), _ = mock_send_email.call_args - assert email == "to" - assert title == f"Airflow alert: " - assert body.startswith("Try 0") # try number only incremented by the scheduler - assert "test_email_alert" in body - def test_set_duration(self): - task = EmptyOperator(task_id="op", email="test@test.test") + task = EmptyOperator(task_id="op") ti = TI(task=task) ti.start_date = datetime.datetime(2018, 10, 1, 1) ti.end_date = datetime.datetime(2018, 10, 1, 2) @@ -2258,7 +2157,7 @@ def test_set_duration(self): assert ti.duration == 3600 def test_set_duration_empty_dates(self): - task = EmptyOperator(task_id="op", email="test@test.test") + task = EmptyOperator(task_id="op") ti = TI(task=task) ti.set_duration() assert ti.duration is None diff --git a/tests/operators/test_email.py b/tests/operators/test_email.py deleted file mode 100644 index df772beb95fdd..0000000000000 --- a/tests/operators/test_email.py +++ /dev/null @@ -1,62 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import datetime -from unittest import mock - -import pytest - -from airflow.operators.email import EmailOperator -from airflow.utils import timezone - -from tests_common.test_utils.config import conf_vars - -pytestmark = pytest.mark.db_test - -DEFAULT_DATE = timezone.datetime(2016, 1, 1) -END_DATE = timezone.datetime(2016, 1, 2) -INTERVAL = datetime.timedelta(hours=12) -FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1) - -send_email_test = mock.Mock() - - -class TestEmailOperator: - def test_execute(self, dag_maker): - with conf_vars({("email", "email_backend"): "tests.operators.test_email.send_email_test"}): - with dag_maker( - "test_dag", - default_args={"owner": "airflow", "start_date": DEFAULT_DATE}, - schedule=INTERVAL, - serialized=True, - ): - task = EmailOperator( - to="airflow@example.com", - subject="Test Run", - html_content="The quick brown fox jumps over the lazy dog", - task_id="task", - files=["/tmp/Report-A-{{ ds }}.csv"], - custom_headers={"Reply-To": "reply_to@example.com"}, - ) - dag_maker.create_dagrun() - task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - assert send_email_test.call_count == 1 - call_args = send_email_test.call_args.kwargs - assert call_args["files"] == ["/tmp/Report-A-2016-01-01.csv"] - assert call_args["custom_headers"] == {"Reply-To": "reply_to@example.com"} diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 84a63674e5119..e3152585e446f 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -1372,9 +1372,6 @@ def test_no_new_fields_added_to_base_operator(self): "doc_yaml": None, "downstream_task_ids": set(), "end_date": None, - "email": None, - "email_on_failure": True, - "email_on_retry": True, "execution_timeout": None, "executor": None, "executor_config": {}, @@ -2251,10 +2248,14 @@ def test_not_templateable_fields_in_serialized_dag(self): class TestOperator(BaseOperator): template_fields = ( - "email", # templateable + "any_field", # templateable "execution_timeout", # not templateable ) + def __init__(self, *args, any_field=None, **kwargs): + super().__init__(*args, **kwargs) + self.any_field = any_field + def execute(self, context: Context): pass @@ -2263,18 +2264,18 @@ def execute(self, context: Context): with dag: task = TestOperator( task_id="test_task", - email="{{ ','.join(test_email_list) }}", + any_field="{{ ','.join(test_any_field_list) }}", execution_timeout=timedelta(seconds=10), ) - task.render_template_fields(context={"test_email_list": ["foo@test.com", "bar@test.com"]}) - assert task.email == "foo@test.com,bar@test.com" + task.render_template_fields(context={"test_any_field_list": ["foo", "boo"]}) + assert task.any_field == "foo,boo" with pytest.raises( AirflowException, match=re.escape( dedent( """Failed to serialize DAG 'test_dag': Cannot template BaseOperator field: - 'execution_timeout' op.__class__.__name__='TestOperator' op.template_fields=('email', 'execution_timeout')""" + 'execution_timeout' op.__class__.__name__='TestOperator' op.template_fields=('any_field', 'execution_timeout')""" ) ), ): diff --git a/tests/utils/test_email.py b/tests/utils/test_email.py deleted file mode 100644 index dfcae90215c56..0000000000000 --- a/tests/utils/test_email.py +++ /dev/null @@ -1,401 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import json -import os -from email.mime.application import MIMEApplication -from email.mime.multipart import MIMEMultipart -from email.mime.text import MIMEText -from smtplib import SMTPServerDisconnected -from unittest import mock - -import pytest - -from airflow.configuration import conf -from airflow.utils import email - -from tests_common.test_utils.config import conf_vars - -EMAILS = ["test1@example.com", "test2@example.com"] - -send_email_test = mock.MagicMock() - - -class TestEmail: - def setup_method(self): - conf.remove_option("email", "EMAIL_BACKEND") - conf.remove_option("email", "EMAIL_CONN_ID") - - def test_get_email_address_single_email(self): - emails_string = "test1@example.com" - - assert email.get_email_address_list(emails_string) == [emails_string] - - def test_get_email_address_comma_sep_string(self): - emails_string = "test1@example.com, test2@example.com" - - assert email.get_email_address_list(emails_string) == EMAILS - - def test_get_email_address_colon_sep_string(self): - emails_string = "test1@example.com; test2@example.com" - - assert email.get_email_address_list(emails_string) == EMAILS - - def test_get_email_address_list(self): - emails_list = ["test1@example.com", "test2@example.com"] - - assert email.get_email_address_list(emails_list) == EMAILS - - def test_get_email_address_tuple(self): - emails_tuple = ("test1@example.com", "test2@example.com") - - assert email.get_email_address_list(emails_tuple) == EMAILS - - def test_get_email_address_invalid_type(self): - emails_string = 1 - - with pytest.raises(TypeError): - email.get_email_address_list(emails_string) - - def test_get_email_address_invalid_type_in_iterable(self): - emails_list = ["test1@example.com", 2] - - with pytest.raises(TypeError): - email.get_email_address_list(emails_list) - - @mock.patch("airflow.utils.email.send_email") - def test_default_backend(self, mock_send_email): - res = email.send_email("to", "subject", "content") - mock_send_email.assert_called_once_with("to", "subject", "content") - assert mock_send_email.return_value == res - - @mock.patch("airflow.utils.email.send_email_smtp") - def test_custom_backend(self, mock_send_email): - with conf_vars( - { - ("email", "email_backend"): "tests.utils.test_email.send_email_test", - ("email", "email_conn_id"): "smtp_default", - } - ): - email.send_email("to", "subject", "content") - send_email_test.assert_called_once_with( - "to", - "subject", - "content", - files=None, - dryrun=False, - cc=None, - bcc=None, - mime_charset="utf-8", - mime_subtype="mixed", - conn_id="smtp_default", - from_email=None, - custom_headers=None, - ) - assert not mock_send_email.called - - @mock.patch("airflow.utils.email.send_email_smtp") - @conf_vars( - { - ("email", "email_backend"): "tests.utils.test_email.send_email_test", - ("email", "email_conn_id"): "smtp_default", - ("email", "from_email"): "from@test.com", - } - ) - def test_custom_backend_sender(self, mock_send_email_smtp): - email.send_email("to", "subject", "content") - _, call_kwargs = send_email_test.call_args - assert call_kwargs["from_email"] == "from@test.com" - assert not mock_send_email_smtp.called - - def test_build_mime_message(self): - mail_from = "from@example.com" - mail_to = "to@example.com" - subject = "test subject" - html_content = "Test" - custom_headers = {"Reply-To": "reply_to@example.com"} - - msg, recipients = email.build_mime_message( - mail_from=mail_from, - to=mail_to, - subject=subject, - html_content=html_content, - custom_headers=custom_headers, - ) - - assert "From" in msg - assert "To" in msg - assert "Subject" in msg - assert "Reply-To" in msg - assert [mail_to] == recipients - assert msg["To"] == ",".join(recipients) - - -@pytest.mark.db_test -class TestEmailSmtp: - @pytest.fixture(autouse=True) - def setup_test_cases(self, monkeypatch): - monkeypatch.setenv( # Set the default smtp connection for all test cases - "AIRFLOW_CONN_SMTP_DEFAULT", - json.dumps({"conn_type": "smtp", "login": "user", "password": "p@$$word"}), - ) - - @mock.patch("airflow.utils.email.send_mime_email") - def test_send_smtp(self, mock_send_mime, tmp_path): - path = tmp_path / "testfile" - path.write_text("attachment") - email.send_email_smtp("to", "subject", "content", files=[os.fspath(path)]) - assert mock_send_mime.called - _, call_args = mock_send_mime.call_args - assert conf.get("smtp", "SMTP_MAIL_FROM") == call_args["e_from"] - assert call_args["e_to"] == ["to"] - msg = call_args["mime_msg"] - assert msg["Subject"] == "subject" - assert conf.get("smtp", "SMTP_MAIL_FROM") == msg["From"] - assert len(msg.get_payload()) == 2 - filename = f'attachment; filename="{path.name}"' - assert filename == msg.get_payload()[-1].get("Content-Disposition") - mimeapp = MIMEApplication("attachment") - assert mimeapp.get_payload() == msg.get_payload()[-1].get_payload() - - @mock.patch("airflow.utils.email.send_mime_email") - def test_send_smtp_with_multibyte_content(self, mock_send_mime): - email.send_email_smtp("to", "subject", "🔥", mime_charset="utf-8") - assert mock_send_mime.called - _, call_args = mock_send_mime.call_args - msg = call_args["mime_msg"] - mimetext = MIMEText("🔥", "mixed", "utf-8") - assert mimetext.get_payload() == msg.get_payload()[0].get_payload() - - @mock.patch("airflow.utils.email.send_mime_email") - def test_send_bcc_smtp(self, mock_send_mime, tmp_path): - path = tmp_path / "testfile" - path.write_text("attachment") - email.send_email_smtp( - "to", - "subject", - "content", - files=[os.fspath(path)], - cc="cc", - bcc="bcc", - custom_headers={"Reply-To": "reply_to@example.com"}, - ) - assert mock_send_mime.called - _, call_args = mock_send_mime.call_args - assert conf.get("smtp", "SMTP_MAIL_FROM") == call_args["e_from"] - assert call_args["e_to"] == ["to", "cc", "bcc"] - msg = call_args["mime_msg"] - assert msg["Subject"] == "subject" - assert conf.get("smtp", "SMTP_MAIL_FROM") == msg["From"] - assert len(msg.get_payload()) == 2 - assert f'attachment; filename="{path.name}"' == msg.get_payload()[-1].get("Content-Disposition") - mimeapp = MIMEApplication("attachment") - assert mimeapp.get_payload() == msg.get_payload()[-1].get_payload() - assert msg["Reply-To"] == "reply_to@example.com" - - @mock.patch("smtplib.SMTP_SSL") - @mock.patch("smtplib.SMTP") - def test_send_mime_airflow_config(self, mock_smtp, mock_smtp_ssl, monkeypatch): - monkeypatch.delenv("AIRFLOW_CONN_SMTP_DEFAULT", raising=False) - mock_smtp.return_value = mock.Mock() - msg = MIMEMultipart() - email.send_mime_email("from", "to", msg, dryrun=False) - mock_smtp.assert_called_once_with( - host=conf.get("smtp", "SMTP_HOST"), - port=conf.getint("smtp", "SMTP_PORT"), - timeout=conf.getint("smtp", "SMTP_TIMEOUT"), - ) - assert not mock_smtp_ssl.called - assert mock_smtp.return_value.starttls.called - mock_smtp.return_value.sendmail.assert_called_once_with("from", "to", msg.as_string()) - assert mock_smtp.return_value.quit.called - - @mock.patch("smtplib.SMTP") - def test_send_mime_conn_id(self, mock_smtp, monkeypatch): - monkeypatch.setenv( - "AIRFLOW_CONN_SMTP_TEST_CONN", - json.dumps({"conn_type": "smtp", "login": "test-user", "password": "test-p@$$word"}), - ) - msg = MIMEMultipart() - email.send_mime_email("from", "to", msg, dryrun=False, conn_id="smtp_test_conn") - mock_smtp.return_value.login.assert_called_once_with("test-user", "test-p@$$word") - mock_smtp.return_value.sendmail.assert_called_once_with("from", "to", msg.as_string()) - assert mock_smtp.return_value.quit.called - - @mock.patch("smtplib.SMTP_SSL") - @mock.patch("smtplib.SMTP") - def test_send_mime_ssl_none_context(self, mock_smtp, mock_smtp_ssl): - mock_smtp_ssl.return_value = mock.Mock() - with conf_vars({("smtp", "smtp_ssl"): "True", ("email", "ssl_context"): "none"}): - email.send_mime_email("from", "to", MIMEMultipart(), dryrun=False) - assert not mock_smtp.called - mock_smtp_ssl.assert_called_once_with( - host=conf.get("smtp", "SMTP_HOST"), - port=conf.getint("smtp", "SMTP_PORT"), - timeout=conf.getint("smtp", "SMTP_TIMEOUT"), - context=None, - ) - - @mock.patch("smtplib.SMTP_SSL") - @mock.patch("smtplib.SMTP") - @mock.patch("ssl.create_default_context") - def test_send_mime_ssl_default_context_if_not_set(self, create_default_context, mock_smtp, mock_smtp_ssl): - mock_smtp_ssl.return_value = mock.Mock() - with conf_vars({("smtp", "smtp_ssl"): "True"}): - email.send_mime_email("from", "to", MIMEMultipart(), dryrun=False) - assert not mock_smtp.called - assert create_default_context.called - mock_smtp_ssl.assert_called_once_with( - host=conf.get("smtp", "SMTP_HOST"), - port=conf.getint("smtp", "SMTP_PORT"), - timeout=conf.getint("smtp", "SMTP_TIMEOUT"), - context=create_default_context.return_value, - ) - - @mock.patch("smtplib.SMTP_SSL") - @mock.patch("smtplib.SMTP") - @mock.patch("ssl.create_default_context") - def test_send_mime_ssl_default_context_with_value_set_to_default( - self, create_default_context, mock_smtp, mock_smtp_ssl - ): - mock_smtp_ssl.return_value = mock.Mock() - with conf_vars({("smtp", "smtp_ssl"): "True", ("email", "ssl_context"): "default"}): - email.send_mime_email("from", "to", MIMEMultipart(), dryrun=False) - assert not mock_smtp.called - assert create_default_context.called - mock_smtp_ssl.assert_called_once_with( - host=conf.get("smtp", "SMTP_HOST"), - port=conf.getint("smtp", "SMTP_PORT"), - timeout=conf.getint("smtp", "SMTP_TIMEOUT"), - context=create_default_context.return_value, - ) - - @mock.patch("smtplib.SMTP_SSL") - @mock.patch("smtplib.SMTP") - def test_send_mime_noauth(self, mock_smtp, mock_smtp_ssl): - mock_smtp.return_value = mock.Mock() - with conf_vars( - { - ("smtp", "smtp_user"): None, - ("smtp", "smtp_password"): None, - } - ): - email.send_mime_email("from", "to", MIMEMultipart(), dryrun=False) - assert not mock_smtp_ssl.called - mock_smtp.assert_called_once_with( - host=conf.get("smtp", "SMTP_HOST"), - port=conf.getint("smtp", "SMTP_PORT"), - timeout=conf.getint("smtp", "SMTP_TIMEOUT"), - ) - assert not mock_smtp.login.called - - @mock.patch("smtplib.SMTP_SSL") - @mock.patch("smtplib.SMTP") - def test_send_mime_dryrun(self, mock_smtp, mock_smtp_ssl): - email.send_mime_email("from", "to", MIMEMultipart(), dryrun=True) - assert not mock_smtp.called - assert not mock_smtp_ssl.called - - @mock.patch("smtplib.SMTP_SSL") - @mock.patch("smtplib.SMTP") - def test_send_mime_complete_failure(self, mock_smtp: mock.Mock, mock_smtp_ssl: mock.Mock): - mock_smtp.side_effect = SMTPServerDisconnected() - msg = MIMEMultipart() - with pytest.raises(SMTPServerDisconnected): - email.send_mime_email("from", "to", msg, dryrun=False) - mock_smtp.assert_any_call( - host=conf.get("smtp", "SMTP_HOST"), - port=conf.getint("smtp", "SMTP_PORT"), - timeout=conf.getint("smtp", "SMTP_TIMEOUT"), - ) - assert mock_smtp.call_count == conf.getint("smtp", "SMTP_RETRY_LIMIT") - assert not mock_smtp_ssl.called - assert not mock_smtp.return_value.starttls.called - assert not mock_smtp.return_value.login.called - assert not mock_smtp.return_value.sendmail.called - assert not mock_smtp.return_value.quit.called - - @mock.patch("smtplib.SMTP_SSL") - @mock.patch("smtplib.SMTP") - @mock.patch("ssl.create_default_context") - def test_send_mime_ssl_complete_failure(self, create_default_context, mock_smtp, mock_smtp_ssl): - mock_smtp_ssl.side_effect = SMTPServerDisconnected() - msg = MIMEMultipart() - with conf_vars({("smtp", "smtp_ssl"): "True"}): - with pytest.raises(SMTPServerDisconnected): - email.send_mime_email("from", "to", msg, dryrun=False) - - mock_smtp_ssl.assert_any_call( - host=conf.get("smtp", "SMTP_HOST"), - port=conf.getint("smtp", "SMTP_PORT"), - timeout=conf.getint("smtp", "SMTP_TIMEOUT"), - context=create_default_context.return_value, - ) - assert create_default_context.called - assert mock_smtp_ssl.call_count == conf.getint("smtp", "SMTP_RETRY_LIMIT") - assert not mock_smtp.called - assert not mock_smtp_ssl.return_value.starttls.called - assert not mock_smtp_ssl.return_value.login.called - assert not mock_smtp_ssl.return_value.sendmail.called - assert not mock_smtp_ssl.return_value.quit.called - - @mock.patch("smtplib.SMTP_SSL") - @mock.patch("smtplib.SMTP") - def test_send_mime_custom_timeout_retrylimit(self, mock_smtp, mock_smtp_ssl): - mock_smtp.side_effect = SMTPServerDisconnected() - msg = MIMEMultipart() - - custom_retry_limit = 10 - custom_timeout = 60 - - with conf_vars( - { - ("smtp", "smtp_retry_limit"): str(custom_retry_limit), - ("smtp", "smtp_timeout"): str(custom_timeout), - } - ): - with pytest.raises(SMTPServerDisconnected): - email.send_mime_email("from", "to", msg, dryrun=False) - - mock_smtp.assert_any_call( - host=conf.get("smtp", "SMTP_HOST"), port=conf.getint("smtp", "SMTP_PORT"), timeout=custom_timeout - ) - assert not mock_smtp_ssl.called - assert mock_smtp.call_count == 10 - - @mock.patch("smtplib.SMTP_SSL") - @mock.patch("smtplib.SMTP") - def test_send_mime_partial_failure(self, mock_smtp, mock_smtp_ssl): - final_mock = mock.Mock() - side_effects = [SMTPServerDisconnected(), SMTPServerDisconnected(), final_mock] - mock_smtp.side_effect = side_effects - msg = MIMEMultipart() - - email.send_mime_email("from", "to", msg, dryrun=False) - - mock_smtp.assert_any_call( - host=conf.get("smtp", "SMTP_HOST"), - port=conf.getint("smtp", "SMTP_PORT"), - timeout=conf.getint("smtp", "SMTP_TIMEOUT"), - ) - assert mock_smtp.call_count == side_effects.index(final_mock) + 1 - assert not mock_smtp_ssl.called - assert final_mock.starttls.called - final_mock.sendmail.assert_called_once_with("from", "to", msg.as_string()) - assert final_mock.quit.called diff --git a/tests_common/pytest_plugin.py b/tests_common/pytest_plugin.py index 969d0b2a61c8c..809398d9e442f 100644 --- a/tests_common/pytest_plugin.py +++ b/tests_common/pytest_plugin.py @@ -1095,7 +1095,6 @@ def __call__( on_execute_callback: Callable = ..., on_failure_callback: Callable = ..., on_retry_callback: Callable = ..., - email: str = ..., with_dagrun_type="scheduled", **kwargs, ) -> tuple[DAG, EmptyOperator]: ... @@ -1135,7 +1134,6 @@ def create_dag( on_execute_callback=None, on_failure_callback=None, on_retry_callback=None, - email=None, with_dagrun_type=DagRunType.SCHEDULED, **kwargs, ): @@ -1151,7 +1149,6 @@ def create_dag( on_execute_callback=on_execute_callback, on_failure_callback=on_failure_callback, on_retry_callback=on_retry_callback, - email=email, pool=pool, trigger_rule=trigger_rule, **op_kwargs, @@ -1188,7 +1185,6 @@ def __call__( on_execute_callback: Callable = ..., on_failure_callback: Callable = ..., on_retry_callback: Callable = ..., - email: str = ..., map_index: int = -1, **kwargs, ) -> TaskInstance: ... @@ -1223,7 +1219,6 @@ def maker( on_execute_callback=None, on_failure_callback=None, on_retry_callback=None, - email=None, map_index=-1, hostname=None, pid=None, @@ -1246,7 +1241,6 @@ def maker( on_execute_callback=on_execute_callback, on_failure_callback=on_failure_callback, on_retry_callback=on_retry_callback, - email=email, pool=pool, trigger_rule=trigger_rule, **op_kwargs, From 56ba5f4e1d55514c699149f797796ac422052534 Mon Sep 17 00:00:00 2001 From: hussein-awala Date: Sat, 25 Jan 2025 18:34:27 +0100 Subject: [PATCH 2/9] clean doc --- airflow/config_templates/config.yml | 129 ---------------------------- 1 file changed, 129 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 6b84cea704f84..5019b64541ae6 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2044,135 +2044,6 @@ webserver: type: boolean example: ~ default: "False" -email: - description: | - Configuration email backend and whether to - send email alerts on retry or failure - options: - email_backend: - description: Email backend to use - version_added: ~ - type: string - example: ~ - default: "airflow.utils.email.send_email_smtp" - email_conn_id: - description: Email connection to use - version_added: 2.1.0 - type: string - example: ~ - default: "smtp_default" - default_email_on_retry: - description: | - Whether email alerts should be sent when a task is retried - version_added: 2.0.0 - type: boolean - example: ~ - default: "True" - default_email_on_failure: - description: | - Whether email alerts should be sent when a task failed - version_added: 2.0.0 - type: boolean - example: ~ - default: "True" - subject_template: - description: | - File that will be used as the template for Email subject (which will be rendered using Jinja2). - If not set, Airflow uses a base template. - version_added: 2.0.1 - type: string - example: "/path/to/my_subject_template_file" - default: ~ - see_also: ":doc:`Email Configuration `" - html_content_template: - description: | - File that will be used as the template for Email content (which will be rendered using Jinja2). - If not set, Airflow uses a base template. - version_added: 2.0.1 - type: string - example: "/path/to/my_html_content_template_file" - default: ~ - see_also: ":doc:`Email Configuration `" - from_email: - description: | - Email address that will be used as sender address. - It can either be raw email or the complete address in a format ``Sender Name `` - version_added: 2.2.4 - type: string - example: "Airflow " - default: ~ - ssl_context: - description: | - ssl context to use when using SMTP and IMAP SSL connections. By default, the context is "default" - which sets it to ``ssl.create_default_context()`` which provides the right balance between - compatibility and security, it however requires that certificates in your operating system are - updated and that SMTP/IMAP servers of yours have valid certificates that have corresponding public - keys installed on your machines. You can switch it to "none" if you want to disable checking - of the certificates, but it is not recommended as it allows MITM (man-in-the-middle) attacks - if your infrastructure is not sufficiently secured. It should only be set temporarily while you - are fixing your certificate configuration. This can be typically done by upgrading to newer - version of the operating system you run Airflow components on,by upgrading/refreshing proper - certificates in the OS or by updating certificates for your mail servers. - type: string - version_added: 2.7.0 - example: "default" - default: "default" -smtp: - description: | - If you want airflow to send emails on retries, failure, and you want to use - the airflow.utils.email.send_email_smtp function, you have to configure an - smtp server here - options: - smtp_host: - description: | - Specifies the host server address used by Airflow when sending out email notifications via SMTP. - version_added: ~ - type: string - example: ~ - default: "localhost" - smtp_starttls: - description: | - Determines whether to use the STARTTLS command when connecting to the SMTP server. - version_added: ~ - type: string - example: ~ - default: "True" - smtp_ssl: - description: | - Determines whether to use an SSL connection when talking to the SMTP server. - version_added: ~ - type: string - example: ~ - default: "False" - smtp_port: - description: | - Defines the port number on which Airflow connects to the SMTP server to send email notifications. - version_added: ~ - type: string - example: ~ - default: "25" - smtp_mail_from: - description: | - Specifies the default **from** email address used when Airflow sends email notifications. - version_added: ~ - type: string - example: ~ - default: "airflow@example.com" - smtp_timeout: - description: | - Determines the maximum time (in seconds) the Apache Airflow system will wait for a - connection to the SMTP server to be established. - version_added: 2.0.0 - type: integer - example: ~ - default: "30" - smtp_retry_limit: - description: | - Defines the maximum number of times Airflow will attempt to connect to the SMTP server. - version_added: 2.0.0 - type: integer - example: ~ - default: "5" sentry: description: | `Sentry `__ integration. Here you can supply From 286c98e36c4e13789c18e2bdf93954e173beee64 Mon Sep 17 00:00:00 2001 From: hussein-awala Date: Sat, 25 Jan 2025 18:38:30 +0100 Subject: [PATCH 3/9] rename the news fragment file --- .../{.significant.rst => 46041.significant.rst} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename newsfragments/{.significant.rst => 46041.significant.rst} (100%) diff --git a/newsfragments/.significant.rst b/newsfragments/46041.significant.rst similarity index 100% rename from newsfragments/.significant.rst rename to newsfragments/46041.significant.rst From d6e77d7230a3056f113c5ea317eb457b0878ba6e Mon Sep 17 00:00:00 2001 From: hussein-awala Date: Sat, 25 Jan 2025 18:51:25 +0100 Subject: [PATCH 4/9] fix unit tests --- task_sdk/tests/definitions/test_baseoperator.py | 12 ------------ tests/utils/test_operator_helpers.py | 5 +---- 2 files changed, 1 insertion(+), 16 deletions(-) diff --git a/task_sdk/tests/definitions/test_baseoperator.py b/task_sdk/tests/definitions/test_baseoperator.py index af6bf592f5373..9acaebd428744 100644 --- a/task_sdk/tests/definitions/test_baseoperator.py +++ b/task_sdk/tests/definitions/test_baseoperator.py @@ -164,18 +164,6 @@ def test_custom_resources(self): assert task.resources.cpus.qty == 1 assert task.resources.ram.qty == 1024 - def test_default_email_on_actions(self): - test_task = BaseOperator(task_id="test_default_email_on_actions") - assert test_task.email_on_retry is True - assert test_task.email_on_failure is True - - def test_email_on_actions(self): - test_task = BaseOperator( - task_id="test_default_email_on_actions", email_on_retry=False, email_on_failure=True - ) - assert test_task.email_on_retry is False - assert test_task.email_on_failure is True - def test_incorrect_default_args(self): default_args = {"test_param": True, "extra_param": True} op = FakeOperator(default_args=default_args) diff --git a/tests/utils/test_operator_helpers.py b/tests/utils/test_operator_helpers.py index 1f6b6df4c6198..d015f04902a38 100644 --- a/tests/utils/test_operator_helpers.py +++ b/tests/utils/test_operator_helpers.py @@ -33,7 +33,6 @@ def setup_method(self): self.logical_date = "2017-05-21T00:00:00" self.dag_run_id = "dag_run_id" self.owner = ["owner1", "owner2"] - self.email = ["email1@test.com"] self.context = { "dag_run": mock.MagicMock( name="dag_run", @@ -47,7 +46,7 @@ def setup_method(self): try_number=self.try_number, logical_date=datetime.strptime(self.logical_date, "%Y-%m-%dT%H:%M:%S"), ), - "task": mock.MagicMock(name="task", owner=self.owner, email=self.email), + "task": mock.MagicMock(name="task", owner=self.owner), } def test_context_to_airflow_vars_empty_context(self): @@ -61,7 +60,6 @@ def test_context_to_airflow_vars_all_context(self): "airflow.ctx.dag_run_id": self.dag_run_id, "airflow.ctx.try_number": str(self.try_number), "airflow.ctx.dag_owner": "owner1,owner2", - "airflow.ctx.dag_email": "email1@test.com", } assert operator_helpers.context_to_airflow_vars(self.context, in_env_var_format=True) == { @@ -71,7 +69,6 @@ def test_context_to_airflow_vars_all_context(self): "AIRFLOW_CTX_TRY_NUMBER": str(self.try_number), "AIRFLOW_CTX_DAG_RUN_ID": self.dag_run_id, "AIRFLOW_CTX_DAG_OWNER": "owner1,owner2", - "AIRFLOW_CTX_DAG_EMAIL": "email1@test.com", } def test_context_to_airflow_vars_with_default_context_vars(self): From 47c5b5f765f604bacf3cabc5bb2d72d26d7f8467 Mon Sep 17 00:00:00 2001 From: hussein-awala Date: Sat, 25 Jan 2025 19:01:19 +0100 Subject: [PATCH 5/9] fix providers send_email --- .../airflow/providers/amazon/aws/hooks/ses.py | 77 ++++++++++++++++++- .../providers/sendgrid/utils/emailer.py | 26 ++++++- 2 files changed, 97 insertions(+), 6 deletions(-) diff --git a/providers/src/airflow/providers/amazon/aws/hooks/ses.py b/providers/src/airflow/providers/amazon/aws/hooks/ses.py index 7c0568c4d03c8..8dd7cdfd19058 100644 --- a/providers/src/airflow/providers/amazon/aws/hooks/ses.py +++ b/providers/src/airflow/providers/amazon/aws/hooks/ses.py @@ -16,13 +16,21 @@ # under the License. """This module contains AWS SES Hook.""" +# TODO: remove this module once minimum Airflow version is >=3 + from __future__ import annotations +import os from collections.abc import Iterable +from email.mime.application import MIMEApplication +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from email.utils import formatdate from typing import Any +import re2 + from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook -from airflow.utils.email import build_mime_message class SesHook(AwsBaseHook): @@ -88,7 +96,7 @@ def send_email( if return_path: custom_headers["Return-Path"] = return_path - message, recipients = build_mime_message( + message, recipients = _build_mime_message( mail_from=mail_from, to=to, subject=subject, @@ -104,3 +112,68 @@ def send_email( return ses_client.send_raw_email( Source=mail_from, Destinations=recipients, RawMessage={"Data": message.as_string()} ) + + +def _get_email_list_from_str(addresses: str) -> list[str]: + pattern = r"\s*[,;]\s*" + return re2.split(pattern, addresses) + + +def _get_email_address_list(addresses: str | Iterable[str]) -> list[str]: + if isinstance(addresses, str): + return _get_email_list_from_str(addresses) + elif isinstance(addresses, Iterable): + if not all(isinstance(item, str) for item in addresses): + raise TypeError("The items in your iterable must be strings.") + return list(addresses) + else: + raise TypeError(f"Unexpected argument type: Received '{type(addresses).__name__}'.") + + +def _build_mime_message( + mail_from: str | None, + to: str | Iterable[str], + subject: str, + html_content: str, + files: list[str] | None = None, + cc: str | Iterable[str] | None = None, + bcc: str | Iterable[str] | None = None, + mime_subtype: str = "mixed", + mime_charset: str = "utf-8", + custom_headers: dict[str, Any] | None = None, +) -> tuple[MIMEMultipart, list[str]]: + to = _get_email_address_list(to) + + msg = MIMEMultipart(mime_subtype) + msg["Subject"] = subject + if mail_from: + msg["From"] = mail_from + msg["To"] = ", ".join(to) + recipients = to + if cc: + cc = _get_email_address_list(cc) + msg["CC"] = ", ".join(cc) + recipients += cc + + if bcc: + # don't add bcc in header + bcc = _get_email_address_list(bcc) + recipients += bcc + + msg["Date"] = formatdate(localtime=True) + mime_text = MIMEText(html_content, "html", mime_charset) + msg.attach(mime_text) + + for fname in files or []: + basename = os.path.basename(fname) + with open(fname, "rb") as file: + part = MIMEApplication(file.read(), Name=basename) + part["Content-Disposition"] = f'attachment; filename="{basename}"' + part["Content-ID"] = f"<{basename}>" + msg.attach(part) + + if custom_headers: + for header_key, header_value in custom_headers.items(): + msg[header_key] = header_value + + return msg, recipients diff --git a/providers/src/airflow/providers/sendgrid/utils/emailer.py b/providers/src/airflow/providers/sendgrid/utils/emailer.py index f22a080deba78..99cdd5edf375e 100644 --- a/providers/src/airflow/providers/sendgrid/utils/emailer.py +++ b/providers/src/airflow/providers/sendgrid/utils/emailer.py @@ -17,6 +17,8 @@ # under the License. """Airflow module for email backend using sendgrid.""" +# TODO: remove this module once minimum Airflow version is >=3 + from __future__ import annotations import base64 @@ -26,6 +28,7 @@ from collections.abc import Iterable from typing import Union +import re2 import sendgrid from sendgrid.helpers.mail import ( Attachment, @@ -40,7 +43,6 @@ ) from airflow.hooks.base import BaseHook -from airflow.utils.email import get_email_address_list log = logging.getLogger(__name__) @@ -79,15 +81,15 @@ def send_email( # Add the recipient list of to emails. personalization = Personalization() - to = get_email_address_list(to) + to = _get_email_address_list(to) for to_address in to: personalization.add_to(Email(to_address)) if cc: - cc = get_email_address_list(cc) + cc = _get_email_address_list(cc) for cc_address in cc: personalization.add_cc(Email(cc_address)) if bcc: - bcc = get_email_address_list(bcc) + bcc = _get_email_address_list(bcc) for bcc_address in bcc: personalization.add_bcc(Email(bcc_address)) @@ -141,3 +143,19 @@ def _post_sendgrid_mail(mail_data: dict, conn_id: str = "sendgrid_default") -> N mail_data["subject"], response.status_code, ) + + +def _get_email_list_from_str(addresses: str) -> list[str]: + pattern = r"\s*[,;]\s*" + return re2.split(pattern, addresses) + + +def _get_email_address_list(addresses: str | Iterable[str]) -> list[str]: + if isinstance(addresses, str): + return _get_email_list_from_str(addresses) + elif isinstance(addresses, Iterable): + if not all(isinstance(item, str) for item in addresses): + raise TypeError("The items in your iterable must be strings.") + return list(addresses) + else: + raise TypeError(f"Unexpected argument type: Received '{type(addresses).__name__}'.") From d64787088c4ad42b322449f11702495af58afbe2 Mon Sep 17 00:00:00 2001 From: hussein-awala Date: Sat, 25 Jan 2025 20:09:37 +0100 Subject: [PATCH 6/9] fix config tests --- .../endpoints/test_config_endpoint.py | 35 ++++++++++++++----- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/tests/api_connexion/endpoints/test_config_endpoint.py b/tests/api_connexion/endpoints/test_config_endpoint.py index f4e447e00e3d3..28b827a80a137 100644 --- a/tests/api_connexion/endpoints/test_config_endpoint.py +++ b/tests/api_connexion/endpoints/test_config_endpoint.py @@ -37,12 +37,12 @@ }, } +MOCK_CONF_TUPLES = { + (section, option): value for section, options in MOCK_CONF.items() for option, value in options.items() +} + MOCK_CONF_WITH_SENSITIVE_VALUE = { - "core": {"parallelism": "1024"}, - "operators": { - "default_deferrable": "true", - "default_queue": "queue", - }, + **MOCK_CONF, "database": { "sql_alchemy_conn": "mock_conn", }, @@ -73,6 +73,7 @@ def setup_attrs(self, configured_app) -> None: self.client = self.app.test_client() # type:ignore @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) + @conf_vars(MOCK_CONF_TUPLES) def test_should_respond_200_text_plain(self, mock_as_dict): response = self.client.get( "/api/v1/config", headers={"Accept": "text/plain"}, environ_overrides={"REMOTE_USER": "test"} @@ -83,12 +84,16 @@ def test_should_respond_200_text_plain(self, mock_as_dict): """\ [core] parallelism = 1024 + + [operators] + default_deferrable = true + default_queue = queue """ ) assert expected == response.data.decode() @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) - @conf_vars({("webserver", "expose_config"): "non-sensitive-only"}) + @conf_vars({**MOCK_CONF_TUPLES, ("webserver", "expose_config"): "non-sensitive-only"}) def test_should_respond_200_text_plain_with_non_sensitive_only(self, mock_as_dict): response = self.client.get( "/api/v1/config", headers={"Accept": "text/plain"}, environ_overrides={"REMOTE_USER": "test"} @@ -99,11 +104,16 @@ def test_should_respond_200_text_plain_with_non_sensitive_only(self, mock_as_dic """\ [core] parallelism = 1024 + + [operators] + default_deferrable = true + default_queue = queue """ ) assert expected == response.data.decode() @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) + @conf_vars(MOCK_CONF_TUPLES) def test_should_respond_200_application_json(self, mock_as_dict): response = self.client.get( "/api/v1/config", @@ -121,9 +131,10 @@ def test_should_respond_200_application_json(self, mock_as_dict): ], }, { - "name": "sensors", + "name": "operators", "options": [ - {"key": "default_timeout", "value": "86400"}, + {"key": "default_deferrable", "value": "true"}, + {"key": "default_queue", "value": "queue"}, ], }, ] @@ -131,6 +142,7 @@ def test_should_respond_200_application_json(self, mock_as_dict): assert expected == response.json @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) + @conf_vars(MOCK_CONF_TUPLES) def test_should_respond_200_single_section_as_text_plain(self, mock_as_dict): response = self.client.get( "/api/v1/config?section=operators", @@ -149,6 +161,7 @@ def test_should_respond_200_single_section_as_text_plain(self, mock_as_dict): assert expected == response.data.decode() @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) + @conf_vars(MOCK_CONF_TUPLES) def test_should_respond_200_single_section_as_json(self, mock_as_dict): response = self.client.get( "/api/v1/config?section=operators", @@ -171,6 +184,7 @@ def test_should_respond_200_single_section_as_json(self, mock_as_dict): assert expected == response.json @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) + @conf_vars(MOCK_CONF_TUPLES) def test_should_respond_404_when_section_not_exist(self, mock_as_dict): response = self.client.get( "/api/v1/config?section=operators1", @@ -182,6 +196,7 @@ def test_should_respond_404_when_section_not_exist(self, mock_as_dict): assert "section=operators1 not found." in response.json["detail"] @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) + @conf_vars(MOCK_CONF_TUPLES) def test_should_respond_406(self, mock_as_dict): response = self.client.get( "/api/v1/config", @@ -222,6 +237,7 @@ def setup_attrs(self, configured_app) -> None: self.client = self.app.test_client() # type:ignore @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) + @conf_vars(MOCK_CONF_TUPLES) def test_should_respond_200_text_plain(self, mock_as_dict): response = self.client.get( "/api/v1/config/section/operators/option/default_queue", @@ -267,6 +283,7 @@ def test_should_respond_200_text_plain_with_non_sensitive_only(self, mock_as_dic assert expected == response.data.decode() @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) + @conf_vars(MOCK_CONF_TUPLES) def test_should_respond_200_application_json(self, mock_as_dict): response = self.client.get( "/api/v1/config/section/operators/option/default_queue", @@ -287,6 +304,7 @@ def test_should_respond_200_application_json(self, mock_as_dict): assert expected == response.json @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) + @conf_vars(MOCK_CONF_TUPLES) def test_should_respond_404_when_option_not_exist(self, mock_as_dict): response = self.client.get( "/api/v1/config/section/operators/option/default_queue1", @@ -298,6 +316,7 @@ def test_should_respond_404_when_option_not_exist(self, mock_as_dict): assert "The option [operators/default_queue1] is not found in config." in response.json["detail"] @patch("airflow.api_connexion.endpoints.config_endpoint.conf.as_dict", return_value=MOCK_CONF) + @conf_vars(MOCK_CONF_TUPLES) def test_should_respond_406(self, mock_as_dict): response = self.client.get( "/api/v1/config/section/operators/option/default_queue", From 538c9f2f76b016bdacdbdab7834bd9a73147d188 Mon Sep 17 00:00:00 2001 From: hussein-awala Date: Sun, 26 Jan 2025 00:02:51 +0100 Subject: [PATCH 7/9] provide an alternative solution for default config --- .../connections/smtp.rst | 2 + .../src/airflow/providers/smtp/CHANGELOG.rst | 7 ++ .../src/airflow/providers/smtp/hooks/smtp.py | 14 +++ .../providers/smtp/notifications/smtp.py | 97 +++++++++++++++---- .../airflow/providers/smtp/version_compat.py | 35 +++++++ .../tests/smtp/notifications/test_smtp.py | 42 +++++--- 6 files changed, 163 insertions(+), 34 deletions(-) create mode 100644 providers/src/airflow/providers/smtp/version_compat.py diff --git a/docs/apache-airflow-providers-smtp/connections/smtp.rst b/docs/apache-airflow-providers-smtp/connections/smtp.rst index 80016ec6c4d1e..62e8548d0af87 100644 --- a/docs/apache-airflow-providers-smtp/connections/smtp.rst +++ b/docs/apache-airflow-providers-smtp/connections/smtp.rst @@ -62,6 +62,8 @@ Extra (optional) * ``ssl_context``: Can be "default" or "none". Only valid when SSL is used. The "default" context provides a balance between security and compatibility, "none" is not recommended as it disables validation of certificates and allow MITM attacks, and is only needed in case your certificates are wrongly configured in your system. If not specified, defaults are taken from the "smtp_provider", "ssl_context" configuration with the fallback to "email". "ssl_context" configuration. If none of it is specified, "default" is used. + * ``subject_template``: A path to a file containing the email subject template. + * ``html_content_template``: A path to a file containing the email html content template. When specifying the connection in environment variable you should specify it using URI syntax. diff --git a/providers/src/airflow/providers/smtp/CHANGELOG.rst b/providers/src/airflow/providers/smtp/CHANGELOG.rst index 038fe52acaa0a..35934ba990ab3 100644 --- a/providers/src/airflow/providers/smtp/CHANGELOG.rst +++ b/providers/src/airflow/providers/smtp/CHANGELOG.rst @@ -27,6 +27,13 @@ Changelog --------- +2.0.0 +..... + +Breaking changes +~~~~~~~~~~~~~~~~ +The argument ``from_email`` is now an optional kwarg in ``SmtpNotifier``, and the ``to`` argument became the first positional argument. + 1.9.0 ..... diff --git a/providers/src/airflow/providers/smtp/hooks/smtp.py b/providers/src/airflow/providers/smtp/hooks/smtp.py index f6374e7fec91e..a42b23af38cda 100644 --- a/providers/src/airflow/providers/smtp/hooks/smtp.py +++ b/providers/src/airflow/providers/smtp/hooks/smtp.py @@ -160,6 +160,12 @@ def get_connection_form_widgets(cls) -> dict[str, Any]: ), "disable_tls": BooleanField(lazy_gettext("Disable TLS"), default=False), "disable_ssl": BooleanField(lazy_gettext("Disable SSL"), default=False), + "subject_template": StringField( + lazy_gettext("Path to the subject template"), widget=BS3TextFieldWidget() + ), + "html_content_template": StringField( + lazy_gettext("Path to the html content template"), widget=BS3TextFieldWidget() + ), } def test_connection(self) -> tuple[bool, str]: @@ -382,6 +388,14 @@ def timeout(self) -> int: def use_ssl(self) -> bool: return not bool(self.conn.extra_dejson.get("disable_ssl", False)) + @property + def subject_template(self) -> str | None: + return self.conn.extra_dejson.get("subject_template") + + @property + def html_content_template(self) -> str | None: + return self.conn.extra_dejson.get("html_content_template") + @classmethod def get_ui_field_behaviour(cls) -> dict[str, Any]: """Return custom field behaviour.""" diff --git a/providers/src/airflow/providers/smtp/notifications/smtp.py b/providers/src/airflow/providers/smtp/notifications/smtp.py index fb043e59c071a..873b84d576e21 100644 --- a/providers/src/airflow/providers/smtp/notifications/smtp.py +++ b/providers/src/airflow/providers/smtp/notifications/smtp.py @@ -20,11 +20,17 @@ from collections.abc import Iterable from functools import cached_property from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any from airflow.configuration import conf from airflow.notifications.basenotifier import BaseNotifier from airflow.providers.smtp.hooks.smtp import SmtpHook +from airflow.providers.smtp.version_compat import AIRFLOW_V_3_0_PLUS + +if TYPE_CHECKING: + import jinja2 + + from airflow.sdk.definitions.context import Context class SmtpNotifier(BaseNotifier): @@ -70,10 +76,8 @@ class SmtpNotifier(BaseNotifier): def __init__( self, - # TODO: Move from_email to keyword parameter in next major release so that users do not - # need to specify from_email. No argument here will lead to defaults from conf being used. - from_email: str | None, to: str | Iterable[str], + from_email: str | None = None, subject: str | None = None, html_content: str | None = None, files: list[str] | None = None, @@ -88,7 +92,7 @@ def __init__( ): super().__init__() self.smtp_conn_id = smtp_conn_id - self.from_email = from_email or conf.get("smtp", "smtp_mail_from") + self.from_email = from_email self.to = to self.files = files self.cc = cc @@ -96,37 +100,94 @@ def __init__( self.mime_subtype = mime_subtype self.mime_charset = mime_charset self.custom_headers = custom_headers - - smtp_default_templated_subject_path = conf.get( - "smtp", - "templated_email_subject_path", - fallback=(Path(__file__).parent / "templates" / "email_subject.jinja2").as_posix(), - ) - self.subject = ( - subject or Path(smtp_default_templated_subject_path).read_text().replace("\n", "").strip() - ) + self.subject = subject # If html_content is passed, prioritize it. Otherwise, if template is passed, use # it to populate html_content. Else, fall back to defaults defined in settings + self.html_content: str | None = None if html_content is not None: self.html_content = html_content elif template is not None: self.html_content = Path(template).read_text() - else: - smtp_default_templated_html_content_path = conf.get( + elif not AIRFLOW_V_3_0_PLUS: + self.html_content = conf.get( "smtp", "templated_html_content_path", - fallback=(Path(__file__).parent / "templates" / "email.html").as_posix(), + fallback=None, ) - self.html_content = Path(smtp_default_templated_html_content_path).read_text() + + if self.from_email is None and not AIRFLOW_V_3_0_PLUS: + self.from_email = conf.get("smtp", "smtp_mail_from", fallback=None) + + if self.subject is None and not AIRFLOW_V_3_0_PLUS: + self.subject = conf.get("smtp", "templated_email_subject_path", fallback=None) @cached_property def hook(self) -> SmtpHook: """Smtp Events Hook.""" return SmtpHook(smtp_conn_id=self.smtp_conn_id) + @cached_property + def _connection_from_email(self) -> str | None: + return self.hook.from_email + + @cached_property + def _connection_subject_template(self) -> str | None: + return self.hook.subject_template + + @cached_property + def _connection_html_content_template(self) -> str | None: + return self.hook.html_content_template + + def _render_fields( + self, + fields: Iterable[str], + context: Context, + jinja_env: jinja2.Environment | None = None, + ) -> None: + dag = context["dag"] + if not jinja_env: + jinja_env = self.get_template_env(dag=dag) + self._do_render_template_fields(self, fields, context, jinja_env, set()) + def notify(self, context): """Send a email via smtp server.""" with self.hook as smtp: + fields_to_re_render = [] + if self.from_email is None: + if self._connection_from_email: + self.from_email = self._connection_from_email + else: + if AIRFLOW_V_3_0_PLUS: + raise ValueError( + "you must provide from_email argument, or set a default one in the connection" + ) + else: + raise ValueError( + "you must provide from_email argument, or set a default one in the configuration or the connection" + ) + fields_to_re_render.append("from_email") + if self.subject is None: + if self._connection_subject_template: + self.subject = self._connection_subject_template + else: + smtp_default_templated_subject_path = ( + Path(__file__).parent / "templates" / "email_subject.jinja2" + ).as_posix() + self.subject = ( + Path(smtp_default_templated_subject_path).read_text().replace("\n", "").strip() + ) + fields_to_re_render.append("subject") + if self.html_content is None: + if self._connection_html_content_template: + self.html_content = self._connection_html_content_template + else: + smtp_default_templated_html_content_path = ( + Path(__file__).parent / "templates" / "email.html" + ).as_posix() + self.html_content = Path(smtp_default_templated_html_content_path).read_text() + fields_to_re_render.append("html_content") + if fields_to_re_render: + self._render_fields(fields_to_re_render, context) smtp.send_email_smtp( smtp_conn_id=self.smtp_conn_id, from_email=self.from_email, diff --git a/providers/src/airflow/providers/smtp/version_compat.py b/providers/src/airflow/providers/smtp/version_compat.py new file mode 100644 index 0000000000000..48d122b669696 --- /dev/null +++ b/providers/src/airflow/providers/smtp/version_compat.py @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY +# DEPENDENCIES BETWEEN PROVIDERS. IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS +# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT +# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE +# +from __future__ import annotations + + +def get_base_airflow_version_tuple() -> tuple[int, int, int]: + from packaging.version import Version + + from airflow import __version__ + + airflow_version = Version(__version__) + return airflow_version.major, airflow_version.minor, airflow_version.micro + + +AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/tests/smtp/notifications/test_smtp.py b/providers/tests/smtp/notifications/test_smtp.py index 8ce28637d8fb3..4ea2b8dbbe9e9 100644 --- a/providers/tests/smtp/notifications/test_smtp.py +++ b/providers/tests/smtp/notifications/test_smtp.py @@ -31,8 +31,7 @@ ) from airflow.utils import timezone -from tests_common.test_utils.config import conf_vars -from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS +from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS pytestmark = pytest.mark.db_test @@ -124,13 +123,19 @@ def test_notifier_templated(self, mock_smtphook_hook, dag_maker): def test_notifier_with_defaults(self, mock_smtphook_hook, create_task_instance): ti = create_task_instance(dag_id="dag", task_id="op", logical_date=timezone.datetime(2018, 1, 1)) context = {"dag": ti.dag_run.dag, "ti": ti} + mock_smtphook_hook.return_value.from_email = "default@email.com" + mock_smtphook_hook.return_value.subject_template = None + mock_smtphook_hook.return_value.html_content_template = None notifier = SmtpNotifier( - from_email=conf.get("smtp", "smtp_mail_from"), to="test_reciver@test.com", ) notifier(context) + if AIRFLOW_V_3_0_PLUS: + expected_from_email = "default@email.com" + else: + expected_from_email = conf.get("smtp", "smtp_mail_from") mock_smtphook_hook.return_value.__enter__().send_email_smtp.assert_called_once_with( - from_email=conf.get("smtp", "smtp_mail_from"), + from_email=expected_from_email, to="test_reciver@test.com", subject="DAG dag - Task op - Run ID test in State None", html_content=mock.ANY, @@ -146,7 +151,7 @@ def test_notifier_with_defaults(self, mock_smtphook_hook, create_task_instance): assert f"{NUM_TRY} of 1" in content @mock.patch("airflow.providers.smtp.notifications.smtp.SmtpHook") - def test_notifier_with_nondefault_conf_vars(self, mock_smtphook_hook, create_task_instance): + def test_notifier_with_nondefault_conf_vars(self, mock_smtphook_hook, create_task_instance, caplog): ti = create_task_instance(dag_id="dag", task_id="op", logical_date=timezone.datetime(2018, 1, 1)) context = {"dag": ti.dag_run.dag, "ti": ti} @@ -160,17 +165,16 @@ def test_notifier_with_nondefault_conf_vars(self, mock_smtphook_hook, create_tas f_content.write("Mock content goes here") f_content.flush() - with conf_vars( - { - ("smtp", "templated_html_content_path"): f_content.name, - ("smtp", "templated_email_subject_path"): f_subject.name, - } - ): - notifier = SmtpNotifier( - from_email=conf.get("smtp", "smtp_mail_from"), - to="test_reciver@test.com", - ) - notifier(context) + mock_smtphook_hook.return_value.from_email = None + mock_smtphook_hook.return_value.html_content_template = f_content.name + mock_smtphook_hook.return_value.subject_template = f_subject.name + + notifier = SmtpNotifier( + to="test_reciver@test.com", + ) + + notifier(context) + if not AIRFLOW_V_3_0_PLUS: mock_smtphook_hook.return_value.__enter__().send_email_smtp.assert_called_once_with( from_email=conf.get("smtp", "smtp_mail_from"), to="test_reciver@test.com", @@ -184,3 +188,9 @@ def test_notifier_with_nondefault_conf_vars(self, mock_smtphook_hook, create_tas mime_charset="utf-8", custom_headers=None, ) + else: + assert ( + "Failed to send notification: you must provide from_email argument, or set a default one in the connection" + in caplog.text + ) + mock_smtphook_hook.return_value.__enter__().send_email_smtp.assert_not_called() From 34022a29a7501c9f535600ec20bb551cc75a7893 Mon Sep 17 00:00:00 2001 From: hussein-awala Date: Sun, 26 Jan 2025 00:09:12 +0100 Subject: [PATCH 8/9] filter smtp from check-no-providers-in-core-examples rule --- .pre-commit-config.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 8431755955c12..866ffd543aa20 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -574,7 +574,8 @@ repos: language: pygrep name: No providers imports in core example DAGs description: The core example DAGs have no dependencies other than standard provider or core Airflow - entry: "^\\s*from airflow\\.providers.(?!standard.)" + # TODO: rewrite airflow/example_dags/example_dag_decorator.py and remove `smtp` provider exception + entry: "^\\s*from airflow\\.providers\\.(?!standard|smtp)" pass_filenames: true files: ^airflow/example_dags/.*\.py$ - id: check-no-airflow-deprecation-in-providers From adbba0b8bfb8406a762c2b4b7111a6b166b3c6d7 Mon Sep 17 00:00:00 2001 From: hussein-awala Date: Sun, 26 Jan 2025 17:18:15 +0100 Subject: [PATCH 9/9] fix loading templates --- .../providers/smtp/notifications/smtp.py | 16 ++--- .../tests/smtp/notifications/test_smtp.py | 59 ++++++++++--------- 2 files changed, 39 insertions(+), 36 deletions(-) diff --git a/providers/src/airflow/providers/smtp/notifications/smtp.py b/providers/src/airflow/providers/smtp/notifications/smtp.py index 873b84d576e21..673e8a04ce77d 100644 --- a/providers/src/airflow/providers/smtp/notifications/smtp.py +++ b/providers/src/airflow/providers/smtp/notifications/smtp.py @@ -168,23 +168,23 @@ def notify(self, context): fields_to_re_render.append("from_email") if self.subject is None: if self._connection_subject_template: - self.subject = self._connection_subject_template + subject_template_path = Path(self._connection_subject_template).as_posix() else: - smtp_default_templated_subject_path = ( + subject_template_path = ( Path(__file__).parent / "templates" / "email_subject.jinja2" ).as_posix() - self.subject = ( - Path(smtp_default_templated_subject_path).read_text().replace("\n", "").strip() - ) + self.subject = Path(subject_template_path).read_text().replace("\n", "").strip() fields_to_re_render.append("subject") if self.html_content is None: if self._connection_html_content_template: - self.html_content = self._connection_html_content_template + default_html_content_template_path = Path( + self._connection_html_content_template + ).as_posix() else: - smtp_default_templated_html_content_path = ( + default_html_content_template_path = ( Path(__file__).parent / "templates" / "email.html" ).as_posix() - self.html_content = Path(smtp_default_templated_html_content_path).read_text() + self.html_content = Path(default_html_content_template_path).read_text() fields_to_re_render.append("html_content") if fields_to_re_render: self._render_fields(fields_to_re_render, context) diff --git a/providers/tests/smtp/notifications/test_smtp.py b/providers/tests/smtp/notifications/test_smtp.py index 4ea2b8dbbe9e9..ef9f40cca4669 100644 --- a/providers/tests/smtp/notifications/test_smtp.py +++ b/providers/tests/smtp/notifications/test_smtp.py @@ -123,32 +123,35 @@ def test_notifier_templated(self, mock_smtphook_hook, dag_maker): def test_notifier_with_defaults(self, mock_smtphook_hook, create_task_instance): ti = create_task_instance(dag_id="dag", task_id="op", logical_date=timezone.datetime(2018, 1, 1)) context = {"dag": ti.dag_run.dag, "ti": ti} - mock_smtphook_hook.return_value.from_email = "default@email.com" - mock_smtphook_hook.return_value.subject_template = None - mock_smtphook_hook.return_value.html_content_template = None - notifier = SmtpNotifier( - to="test_reciver@test.com", - ) - notifier(context) - if AIRFLOW_V_3_0_PLUS: - expected_from_email = "default@email.com" - else: - expected_from_email = conf.get("smtp", "smtp_mail_from") - mock_smtphook_hook.return_value.__enter__().send_email_smtp.assert_called_once_with( - from_email=expected_from_email, - to="test_reciver@test.com", - subject="DAG dag - Task op - Run ID test in State None", - html_content=mock.ANY, - smtp_conn_id="smtp_default", - files=None, - cc=None, - bcc=None, - mime_subtype="mixed", - mime_charset="utf-8", - custom_headers=None, - ) - content = mock_smtphook_hook.return_value.__enter__().send_email_smtp.call_args.kwargs["html_content"] - assert f"{NUM_TRY} of 1" in content + with ( + tempfile.NamedTemporaryFile(mode="wt", suffix=".txt") as f_content, + ): + f_content.write("Mock content goes here") + f_content.flush() + mock_smtphook_hook.return_value.from_email = "default@email.com" + mock_smtphook_hook.return_value.subject_template = None + mock_smtphook_hook.return_value.html_content_template = f_content.name + notifier = SmtpNotifier( + to="test_reciver@test.com", + ) + notifier(context) + if AIRFLOW_V_3_0_PLUS: + expected_from_email = "default@email.com" + else: + expected_from_email = conf.get("smtp", "smtp_mail_from") + mock_smtphook_hook.return_value.__enter__().send_email_smtp.assert_called_once_with( + from_email=expected_from_email, + to="test_reciver@test.com", + subject="DAG dag - Task op - Run ID test in State None", + html_content="Mock content goes here", + smtp_conn_id="smtp_default", + files=None, + cc=None, + bcc=None, + mime_subtype="mixed", + mime_charset="utf-8", + custom_headers=None, + ) @mock.patch("airflow.providers.smtp.notifications.smtp.SmtpHook") def test_notifier_with_nondefault_conf_vars(self, mock_smtphook_hook, create_task_instance, caplog): @@ -190,7 +193,7 @@ def test_notifier_with_nondefault_conf_vars(self, mock_smtphook_hook, create_tas ) else: assert ( - "Failed to send notification: you must provide from_email argument, or set a default one in the connection" - in caplog.text + "Failed to send notification: you must provide from_email argument," + " or set a default one in the connection" in caplog.text ) mock_smtphook_hook.return_value.__enter__().send_email_smtp.assert_not_called()