Skip to content

Commit

Permalink
Revert "Remove deprecated parameters from airflow (core) Operators (#…
Browse files Browse the repository at this point in the history
…41736)"

This reverts commit 27fe45b.
  • Loading branch information
eladkal authored Aug 27, 2024
1 parent 615cddf commit c1e54dd
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 9 deletions.
11 changes: 10 additions & 1 deletion airflow/operators/datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
from __future__ import annotations

import datetime
import warnings
from typing import TYPE_CHECKING, Iterable

from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
from airflow.operators.branch import BaseBranchOperator
from airflow.utils import timezone

Expand Down Expand Up @@ -55,6 +56,7 @@ def __init__(
target_lower: datetime.datetime | datetime.time | None,
target_upper: datetime.datetime | datetime.time | None,
use_task_logical_date: bool = False,
use_task_execution_date: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -69,6 +71,13 @@ def __init__(
self.follow_task_ids_if_true = follow_task_ids_if_true
self.follow_task_ids_if_false = follow_task_ids_if_false
self.use_task_logical_date = use_task_logical_date
if use_task_execution_date:
self.use_task_logical_date = use_task_execution_date
warnings.warn(
"Parameter ``use_task_execution_date`` is deprecated. Use ``use_task_logical_date``.",
RemovedInAirflow3Warning,
stacklevel=2,
)

def choose_branch(self, context: Context) -> str | Iterable[str]:
if self.use_task_logical_date:
Expand Down
12 changes: 12 additions & 0 deletions airflow/operators/trigger_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import datetime
import json
import time
import warnings
from typing import TYPE_CHECKING, Any, Sequence, cast

from sqlalchemy import select
Expand All @@ -33,6 +34,7 @@
AirflowSkipException,
DagNotFound,
DagRunAlreadyExists,
RemovedInAirflow3Warning,
)
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
Expand Down Expand Up @@ -108,6 +110,7 @@ class TriggerDagRunOperator(BaseOperator):
DAG for the same logical date already exists.
:param deferrable: If waiting for completion, whether or not to defer the task until done,
default is ``False``.
:param execution_date: Deprecated parameter; same as ``logical_date``.
"""

template_fields: Sequence[str] = (
Expand Down Expand Up @@ -136,6 +139,7 @@ def __init__(
failed_states: list[str | DagRunState] | None = None,
skip_when_already_exists: bool = False,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
execution_date: str | datetime.datetime | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -156,6 +160,14 @@ def __init__(
self.skip_when_already_exists = skip_when_already_exists
self._defer = deferrable

if execution_date is not None:
warnings.warn(
"Parameter 'execution_date' is deprecated. Use 'logical_date' instead.",
RemovedInAirflow3Warning,
stacklevel=2,
)
logical_date = execution_date

if logical_date is not None and not isinstance(logical_date, (str, datetime.datetime)):
type_name = type(logical_date).__name__
raise TypeError(
Expand Down
11 changes: 11 additions & 0 deletions airflow/operators/weekday.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
# under the License.
from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, Iterable

from airflow.exceptions import RemovedInAirflow3Warning
from airflow.operators.branch import BaseBranchOperator
from airflow.utils import timezone
from airflow.utils.weekday import WeekDay
Expand Down Expand Up @@ -89,6 +91,7 @@ class BranchDayOfWeekOperator(BaseBranchOperator):
:param use_task_logical_date: If ``True``, uses task's logical date to compare
with is_today. Execution Date is Useful for backfilling.
If ``False``, uses system's day of the week.
:param use_task_execution_day: deprecated parameter, same effect as `use_task_logical_date`
"""

def __init__(
Expand All @@ -98,13 +101,21 @@ def __init__(
follow_task_ids_if_false: str | Iterable[str],
week_day: str | Iterable[str] | WeekDay | Iterable[WeekDay],
use_task_logical_date: bool = False,
use_task_execution_day: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.follow_task_ids_if_true = follow_task_ids_if_true
self.follow_task_ids_if_false = follow_task_ids_if_false
self.week_day = week_day
self.use_task_logical_date = use_task_logical_date
if use_task_execution_day:
self.use_task_logical_date = use_task_execution_day
warnings.warn(
"Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``.",
RemovedInAirflow3Warning,
stacklevel=2,
)
self._week_day_num = WeekDay.validate_week_day(week_day)

def choose_branch(self, context: Context) -> str | Iterable[str]:
Expand Down
7 changes: 0 additions & 7 deletions newsfragments/41736.significant.rst

This file was deleted.

16 changes: 16 additions & 0 deletions tests/operators/test_datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,19 @@ def test_branch_datetime_operator_use_task_logical_date(self, dag_maker, target_
"branch_2": State.SKIPPED,
}
)

def test_deprecation_warning(self):
warning_message = (
"""Parameter ``use_task_execution_date`` is deprecated. Use ``use_task_logical_date``."""
)
with pytest.warns(DeprecationWarning) as warnings:
BranchDateTimeOperator(
task_id="warning",
follow_task_ids_if_true="branch_1",
follow_task_ids_if_false="branch_2",
target_upper=timezone.datetime(2020, 7, 7, 10, 30, 0),
target_lower=timezone.datetime(2020, 7, 7, 10, 30, 0),
use_task_execution_date=True,
dag=self.dag,
)
assert warning_message == str(warnings[0].message)
29 changes: 28 additions & 1 deletion tests/operators/test_trigger_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import pendulum
import pytest

from airflow.exceptions import AirflowException, DagRunAlreadyExists, TaskDeferred
from airflow.exceptions import AirflowException, DagRunAlreadyExists, RemovedInAirflow3Warning, TaskDeferred
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
Expand Down Expand Up @@ -383,6 +383,7 @@ def test_trigger_dagrun_with_skip_when_already_exists(self, dag_maker):
task_id="test_task",
trigger_dag_id=TRIGGERED_DAG_ID,
trigger_run_id="dummy_run_id",
execution_date=None,
reset_dag_run=False,
skip_when_already_exists=True,
)
Expand Down Expand Up @@ -642,6 +643,32 @@ def test_trigger_dagrun_with_wait_for_completion_true_defer_true_failure_2(self,
with pytest.raises(AirflowException, match="failed with failed state"):
task.execute_complete(context={}, event=trigger.serialize())

def test_trigger_dagrun_with_execution_date(self, dag_maker):
"""Test TriggerDagRunOperator with custom execution_date (deprecated parameter)"""
custom_execution_date = timezone.datetime(2021, 1, 2, 3, 4, 5)
with dag_maker(
TEST_DAG_ID, default_args={"owner": "airflow", "start_date": DEFAULT_DATE}, serialized=True
) as dag:
with pytest.warns(
RemovedInAirflow3Warning,
match="Parameter 'execution_date' is deprecated. Use 'logical_date' instead.",
):
task = TriggerDagRunOperator(
task_id="test_trigger_dagrun_with_execution_date",
trigger_dag_id=TRIGGERED_DAG_ID,
execution_date=custom_execution_date,
)
self.re_sync_triggered_dag_to_db(dag, dag_maker)
dag_maker.create_dagrun()
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

with create_session() as session:
dagrun = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).one()
assert dagrun.external_trigger
assert dagrun.logical_date == custom_execution_date
assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, custom_execution_date)
self.assert_extra_link(dagrun, task, session)

@pytest.mark.skip_if_database_isolation_mode # Known to be broken in db isolation mode
@pytest.mark.parametrize(
argnames=["trigger_logical_date"],
Expand Down
20 changes: 20 additions & 0 deletions tests/operators/test_weekday.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,3 +285,23 @@ def test_branch_xcom_push_true_branch(self, dag_maker):
for ti in tis:
if ti.task_id == "make_choice":
assert ti.xcom_pull(task_ids="make_choice") == "branch_1"

def test_deprecation_warning(self, dag_maker):
warning_message = (
"""Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``."""
)
with pytest.warns(DeprecationWarning) as warnings:
with dag_maker(
"branch_day_of_week_operator_test",
start_date=DEFAULT_DATE,
schedule=INTERVAL,
serialized=True,
):
BranchDayOfWeekOperator(
task_id="week_day_warn",
follow_task_ids_if_true="branch_1",
follow_task_ids_if_false="branch_2",
week_day="Monday",
use_task_execution_day=True,
)
assert warning_message == str(warnings[0].message)

0 comments on commit c1e54dd

Please sign in to comment.