From c1e54dd3b1d60797cc6ddefc8992a19ff0398f7e Mon Sep 17 00:00:00 2001 From: Elad Kalif <45845474+eladkal@users.noreply.github.com> Date: Tue, 27 Aug 2024 19:28:44 +0300 Subject: [PATCH] Revert "Remove deprecated parameters from airflow (core) Operators (#41736)" This reverts commit 27fe45bdfe25eabba24c9d4de0b2e1807ea36840. --- airflow/operators/datetime.py | 11 +++++++++- airflow/operators/trigger_dagrun.py | 12 +++++++++++ airflow/operators/weekday.py | 11 ++++++++++ newsfragments/41736.significant.rst | 7 ------- tests/operators/test_datetime.py | 16 ++++++++++++++ tests/operators/test_trigger_dagrun.py | 29 +++++++++++++++++++++++++- tests/operators/test_weekday.py | 20 ++++++++++++++++++ 7 files changed, 97 insertions(+), 9 deletions(-) delete mode 100644 newsfragments/41736.significant.rst diff --git a/airflow/operators/datetime.py b/airflow/operators/datetime.py index 4455b84dd3bb9..732b380077907 100644 --- a/airflow/operators/datetime.py +++ b/airflow/operators/datetime.py @@ -17,9 +17,10 @@ from __future__ import annotations import datetime +import warnings from typing import TYPE_CHECKING, Iterable -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, RemovedInAirflow3Warning from airflow.operators.branch import BaseBranchOperator from airflow.utils import timezone @@ -55,6 +56,7 @@ def __init__( target_lower: datetime.datetime | datetime.time | None, target_upper: datetime.datetime | datetime.time | None, use_task_logical_date: bool = False, + use_task_execution_date: bool = False, **kwargs, ) -> None: super().__init__(**kwargs) @@ -69,6 +71,13 @@ def __init__( self.follow_task_ids_if_true = follow_task_ids_if_true self.follow_task_ids_if_false = follow_task_ids_if_false self.use_task_logical_date = use_task_logical_date + if use_task_execution_date: + self.use_task_logical_date = use_task_execution_date + warnings.warn( + "Parameter ``use_task_execution_date`` is deprecated. Use ``use_task_logical_date``.", + RemovedInAirflow3Warning, + stacklevel=2, + ) def choose_branch(self, context: Context) -> str | Iterable[str]: if self.use_task_logical_date: diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py index 539506ff9b0b1..2521297dcf936 100644 --- a/airflow/operators/trigger_dagrun.py +++ b/airflow/operators/trigger_dagrun.py @@ -20,6 +20,7 @@ import datetime import json import time +import warnings from typing import TYPE_CHECKING, Any, Sequence, cast from sqlalchemy import select @@ -33,6 +34,7 @@ AirflowSkipException, DagNotFound, DagRunAlreadyExists, + RemovedInAirflow3Warning, ) from airflow.models.baseoperator import BaseOperator from airflow.models.baseoperatorlink import BaseOperatorLink @@ -108,6 +110,7 @@ class TriggerDagRunOperator(BaseOperator): DAG for the same logical date already exists. :param deferrable: If waiting for completion, whether or not to defer the task until done, default is ``False``. + :param execution_date: Deprecated parameter; same as ``logical_date``. """ template_fields: Sequence[str] = ( @@ -136,6 +139,7 @@ def __init__( failed_states: list[str | DagRunState] | None = None, skip_when_already_exists: bool = False, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), + execution_date: str | datetime.datetime | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -156,6 +160,14 @@ def __init__( self.skip_when_already_exists = skip_when_already_exists self._defer = deferrable + if execution_date is not None: + warnings.warn( + "Parameter 'execution_date' is deprecated. Use 'logical_date' instead.", + RemovedInAirflow3Warning, + stacklevel=2, + ) + logical_date = execution_date + if logical_date is not None and not isinstance(logical_date, (str, datetime.datetime)): type_name = type(logical_date).__name__ raise TypeError( diff --git a/airflow/operators/weekday.py b/airflow/operators/weekday.py index f59a1da134888..af3e332899bf9 100644 --- a/airflow/operators/weekday.py +++ b/airflow/operators/weekday.py @@ -17,8 +17,10 @@ # under the License. from __future__ import annotations +import warnings from typing import TYPE_CHECKING, Iterable +from airflow.exceptions import RemovedInAirflow3Warning from airflow.operators.branch import BaseBranchOperator from airflow.utils import timezone from airflow.utils.weekday import WeekDay @@ -89,6 +91,7 @@ class BranchDayOfWeekOperator(BaseBranchOperator): :param use_task_logical_date: If ``True``, uses task's logical date to compare with is_today. Execution Date is Useful for backfilling. If ``False``, uses system's day of the week. + :param use_task_execution_day: deprecated parameter, same effect as `use_task_logical_date` """ def __init__( @@ -98,6 +101,7 @@ def __init__( follow_task_ids_if_false: str | Iterable[str], week_day: str | Iterable[str] | WeekDay | Iterable[WeekDay], use_task_logical_date: bool = False, + use_task_execution_day: bool = False, **kwargs, ) -> None: super().__init__(**kwargs) @@ -105,6 +109,13 @@ def __init__( self.follow_task_ids_if_false = follow_task_ids_if_false self.week_day = week_day self.use_task_logical_date = use_task_logical_date + if use_task_execution_day: + self.use_task_logical_date = use_task_execution_day + warnings.warn( + "Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``.", + RemovedInAirflow3Warning, + stacklevel=2, + ) self._week_day_num = WeekDay.validate_week_day(week_day) def choose_branch(self, context: Context) -> str | Iterable[str]: diff --git a/newsfragments/41736.significant.rst b/newsfragments/41736.significant.rst deleted file mode 100644 index 2c90979d80f06..0000000000000 --- a/newsfragments/41736.significant.rst +++ /dev/null @@ -1,7 +0,0 @@ -Removed deprecated parameters from core-operators. - -Parameters removed: - -- airflow.operators.datetime.BranchDateTimeOperator: use_task_execution_date -- airflow.operators.trigger_dagrun.TriggerDagRunOperator: execution_date -- airflow.operators.weekday.BranchDayOfWeekOperator: use_task_execution_day diff --git a/tests/operators/test_datetime.py b/tests/operators/test_datetime.py index e090d11ebeda1..e4294a885679e 100644 --- a/tests/operators/test_datetime.py +++ b/tests/operators/test_datetime.py @@ -253,3 +253,19 @@ def test_branch_datetime_operator_use_task_logical_date(self, dag_maker, target_ "branch_2": State.SKIPPED, } ) + + def test_deprecation_warning(self): + warning_message = ( + """Parameter ``use_task_execution_date`` is deprecated. Use ``use_task_logical_date``.""" + ) + with pytest.warns(DeprecationWarning) as warnings: + BranchDateTimeOperator( + task_id="warning", + follow_task_ids_if_true="branch_1", + follow_task_ids_if_false="branch_2", + target_upper=timezone.datetime(2020, 7, 7, 10, 30, 0), + target_lower=timezone.datetime(2020, 7, 7, 10, 30, 0), + use_task_execution_date=True, + dag=self.dag, + ) + assert warning_message == str(warnings[0].message) diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py index 52a11d10e5e33..9ec22e7e7a3de 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -24,7 +24,7 @@ import pendulum import pytest -from airflow.exceptions import AirflowException, DagRunAlreadyExists, TaskDeferred +from airflow.exceptions import AirflowException, DagRunAlreadyExists, RemovedInAirflow3Warning, TaskDeferred from airflow.models.dag import DagModel from airflow.models.dagbag import DagBag from airflow.models.dagrun import DagRun @@ -383,6 +383,7 @@ def test_trigger_dagrun_with_skip_when_already_exists(self, dag_maker): task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, trigger_run_id="dummy_run_id", + execution_date=None, reset_dag_run=False, skip_when_already_exists=True, ) @@ -642,6 +643,32 @@ def test_trigger_dagrun_with_wait_for_completion_true_defer_true_failure_2(self, with pytest.raises(AirflowException, match="failed with failed state"): task.execute_complete(context={}, event=trigger.serialize()) + def test_trigger_dagrun_with_execution_date(self, dag_maker): + """Test TriggerDagRunOperator with custom execution_date (deprecated parameter)""" + custom_execution_date = timezone.datetime(2021, 1, 2, 3, 4, 5) + with dag_maker( + TEST_DAG_ID, default_args={"owner": "airflow", "start_date": DEFAULT_DATE}, serialized=True + ) as dag: + with pytest.warns( + RemovedInAirflow3Warning, + match="Parameter 'execution_date' is deprecated. Use 'logical_date' instead.", + ): + task = TriggerDagRunOperator( + task_id="test_trigger_dagrun_with_execution_date", + trigger_dag_id=TRIGGERED_DAG_ID, + execution_date=custom_execution_date, + ) + self.re_sync_triggered_dag_to_db(dag, dag_maker) + dag_maker.create_dagrun() + task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + with create_session() as session: + dagrun = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).one() + assert dagrun.external_trigger + assert dagrun.logical_date == custom_execution_date + assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, custom_execution_date) + self.assert_extra_link(dagrun, task, session) + @pytest.mark.skip_if_database_isolation_mode # Known to be broken in db isolation mode @pytest.mark.parametrize( argnames=["trigger_logical_date"], diff --git a/tests/operators/test_weekday.py b/tests/operators/test_weekday.py index 176246cf7bf10..9030942c6576e 100644 --- a/tests/operators/test_weekday.py +++ b/tests/operators/test_weekday.py @@ -285,3 +285,23 @@ def test_branch_xcom_push_true_branch(self, dag_maker): for ti in tis: if ti.task_id == "make_choice": assert ti.xcom_pull(task_ids="make_choice") == "branch_1" + + def test_deprecation_warning(self, dag_maker): + warning_message = ( + """Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``.""" + ) + with pytest.warns(DeprecationWarning) as warnings: + with dag_maker( + "branch_day_of_week_operator_test", + start_date=DEFAULT_DATE, + schedule=INTERVAL, + serialized=True, + ): + BranchDayOfWeekOperator( + task_id="week_day_warn", + follow_task_ids_if_true="branch_1", + follow_task_ids_if_false="branch_2", + week_day="Monday", + use_task_execution_day=True, + ) + assert warning_message == str(warnings[0].message)