Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove deprecated parameters from airflow (core) Operators #41736

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions airflow/operators/datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
from __future__ import annotations

import datetime
import warnings
from typing import TYPE_CHECKING, Iterable

from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
from airflow.exceptions import AirflowException
from airflow.operators.branch import BaseBranchOperator
from airflow.utils import timezone

Expand Down Expand Up @@ -56,7 +55,6 @@ def __init__(
target_lower: datetime.datetime | datetime.time | None,
target_upper: datetime.datetime | datetime.time | None,
use_task_logical_date: bool = False,
use_task_execution_date: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -71,13 +69,6 @@ def __init__(
self.follow_task_ids_if_true = follow_task_ids_if_true
self.follow_task_ids_if_false = follow_task_ids_if_false
self.use_task_logical_date = use_task_logical_date
if use_task_execution_date:
self.use_task_logical_date = use_task_execution_date
warnings.warn(
"Parameter ``use_task_execution_date`` is deprecated. Use ``use_task_logical_date``.",
RemovedInAirflow3Warning,
stacklevel=2,
)

def choose_branch(self, context: Context) -> str | Iterable[str]:
if self.use_task_logical_date:
Expand Down
12 changes: 0 additions & 12 deletions airflow/operators/trigger_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import datetime
import json
import time
import warnings
from typing import TYPE_CHECKING, Any, Sequence, cast

from sqlalchemy import select
Expand All @@ -34,7 +33,6 @@
AirflowSkipException,
DagNotFound,
DagRunAlreadyExists,
RemovedInAirflow3Warning,
)
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
Expand Down Expand Up @@ -110,7 +108,6 @@ class TriggerDagRunOperator(BaseOperator):
DAG for the same logical date already exists.
:param deferrable: If waiting for completion, whether or not to defer the task until done,
default is ``False``.
:param execution_date: Deprecated parameter; same as ``logical_date``.
"""

template_fields: Sequence[str] = (
Expand Down Expand Up @@ -139,7 +136,6 @@ def __init__(
failed_states: list[str | DagRunState] | None = None,
skip_when_already_exists: bool = False,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
execution_date: str | datetime.datetime | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -160,14 +156,6 @@ def __init__(
self.skip_when_already_exists = skip_when_already_exists
self._defer = deferrable

if execution_date is not None:
warnings.warn(
"Parameter 'execution_date' is deprecated. Use 'logical_date' instead.",
RemovedInAirflow3Warning,
stacklevel=2,
)
logical_date = execution_date

if logical_date is not None and not isinstance(logical_date, (str, datetime.datetime)):
type_name = type(logical_date).__name__
raise TypeError(
Expand Down
11 changes: 0 additions & 11 deletions airflow/operators/weekday.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
# under the License.
from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, Iterable

from airflow.exceptions import RemovedInAirflow3Warning
from airflow.operators.branch import BaseBranchOperator
from airflow.utils import timezone
from airflow.utils.weekday import WeekDay
Expand Down Expand Up @@ -91,7 +89,6 @@ class BranchDayOfWeekOperator(BaseBranchOperator):
:param use_task_logical_date: If ``True``, uses task's logical date to compare
with is_today. Execution Date is Useful for backfilling.
If ``False``, uses system's day of the week.
:param use_task_execution_day: deprecated parameter, same effect as `use_task_logical_date`
"""

def __init__(
Expand All @@ -101,21 +98,13 @@ def __init__(
follow_task_ids_if_false: str | Iterable[str],
week_day: str | Iterable[str] | WeekDay | Iterable[WeekDay],
use_task_logical_date: bool = False,
use_task_execution_day: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.follow_task_ids_if_true = follow_task_ids_if_true
self.follow_task_ids_if_false = follow_task_ids_if_false
self.week_day = week_day
self.use_task_logical_date = use_task_logical_date
if use_task_execution_day:
self.use_task_logical_date = use_task_execution_day
warnings.warn(
"Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``.",
RemovedInAirflow3Warning,
stacklevel=2,
)
self._week_day_num = WeekDay.validate_week_day(week_day)

def choose_branch(self, context: Context) -> str | Iterable[str]:
Expand Down
7 changes: 7 additions & 0 deletions newsfragments/41736.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Removed deprecated parameters from core-operators.

Parameters removed:

- airflow.operators.datetime.BranchDateTimeOperator: use_task_execution_date
- airflow.operators.trigger_dagrun.TriggerDagRunOperator: execution_date
- airflow.operators.weekday.BranchDayOfWeekOperator: use_task_execution_day
16 changes: 0 additions & 16 deletions tests/operators/test_datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,19 +253,3 @@ def test_branch_datetime_operator_use_task_logical_date(self, dag_maker, target_
"branch_2": State.SKIPPED,
}
)

def test_deprecation_warning(self):
warning_message = (
"""Parameter ``use_task_execution_date`` is deprecated. Use ``use_task_logical_date``."""
)
with pytest.warns(DeprecationWarning) as warnings:
BranchDateTimeOperator(
task_id="warning",
follow_task_ids_if_true="branch_1",
follow_task_ids_if_false="branch_2",
target_upper=timezone.datetime(2020, 7, 7, 10, 30, 0),
target_lower=timezone.datetime(2020, 7, 7, 10, 30, 0),
use_task_execution_date=True,
dag=self.dag,
)
assert warning_message == str(warnings[0].message)
29 changes: 1 addition & 28 deletions tests/operators/test_trigger_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import pendulum
import pytest

from airflow.exceptions import AirflowException, DagRunAlreadyExists, RemovedInAirflow3Warning, TaskDeferred
from airflow.exceptions import AirflowException, DagRunAlreadyExists, TaskDeferred
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
Expand Down Expand Up @@ -383,7 +383,6 @@ def test_trigger_dagrun_with_skip_when_already_exists(self, dag_maker):
task_id="test_task",
trigger_dag_id=TRIGGERED_DAG_ID,
trigger_run_id="dummy_run_id",
execution_date=None,
reset_dag_run=False,
skip_when_already_exists=True,
)
Expand Down Expand Up @@ -643,32 +642,6 @@ def test_trigger_dagrun_with_wait_for_completion_true_defer_true_failure_2(self,
with pytest.raises(AirflowException, match="failed with failed state"):
task.execute_complete(context={}, event=trigger.serialize())

def test_trigger_dagrun_with_execution_date(self, dag_maker):
"""Test TriggerDagRunOperator with custom execution_date (deprecated parameter)"""
custom_execution_date = timezone.datetime(2021, 1, 2, 3, 4, 5)
with dag_maker(
TEST_DAG_ID, default_args={"owner": "airflow", "start_date": DEFAULT_DATE}, serialized=True
) as dag:
with pytest.warns(
RemovedInAirflow3Warning,
match="Parameter 'execution_date' is deprecated. Use 'logical_date' instead.",
):
task = TriggerDagRunOperator(
task_id="test_trigger_dagrun_with_execution_date",
trigger_dag_id=TRIGGERED_DAG_ID,
execution_date=custom_execution_date,
)
self.re_sync_triggered_dag_to_db(dag, dag_maker)
dag_maker.create_dagrun()
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

with create_session() as session:
dagrun = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).one()
assert dagrun.external_trigger
assert dagrun.logical_date == custom_execution_date
assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, custom_execution_date)
self.assert_extra_link(dagrun, task, session)

@pytest.mark.skip_if_database_isolation_mode # Known to be broken in db isolation mode
@pytest.mark.parametrize(
argnames=["trigger_logical_date"],
Expand Down
20 changes: 0 additions & 20 deletions tests/operators/test_weekday.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,23 +285,3 @@ def test_branch_xcom_push_true_branch(self, dag_maker):
for ti in tis:
if ti.task_id == "make_choice":
assert ti.xcom_pull(task_ids="make_choice") == "branch_1"

def test_deprecation_warning(self, dag_maker):
warning_message = (
"""Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``."""
)
with pytest.warns(DeprecationWarning) as warnings:
with dag_maker(
"branch_day_of_week_operator_test",
start_date=DEFAULT_DATE,
schedule=INTERVAL,
serialized=True,
):
BranchDayOfWeekOperator(
task_id="week_day_warn",
follow_task_ids_if_true="branch_1",
follow_task_ids_if_false="branch_2",
week_day="Monday",
use_task_execution_day=True,
)
assert warning_message == str(warnings[0].message)