Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Airflow 3: python operators deprecations removal #41493

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 2 additions & 44 deletions airflow/operators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,38 +77,6 @@ def is_venv_installed() -> bool:
return False


def task(python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs):
"""
Use :func:`airflow.decorators.task` instead, this is deprecated.

Calls ``@task.python`` and allows users to turn a Python function into
an Airflow task.

:param python_callable: A reference to an object that is callable
:param op_kwargs: a dictionary of keyword arguments that will get unpacked
in your function (templated)
:param op_args: a list of positional arguments that will get unpacked when
calling your callable (templated)
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
Defaults to False.
"""
# To maintain backwards compatibility, we import the task object into this file
# This prevents breakages in dags that use `from airflow.operators.python import task`
from airflow.decorators.python import python_task

warnings.warn(
"""airflow.operators.python.task is deprecated. Please use the following instead

from airflow.decorators import task
@task
def my_task()""",
RemovedInAirflow3Warning,
stacklevel=2,
)
return python_task(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs)


@cache
def _parse_version_info(text: str) -> tuple[int, int, int, str, int]:
"""Parse python version info from a text."""
Expand Down Expand Up @@ -213,13 +181,6 @@ def __init__(
show_return_value_in_logs: bool = True,
**kwargs,
) -> None:
if kwargs.get("provide_context"):
warnings.warn(
"provide_context is deprecated as of 2.0 and is no longer required",
RemovedInAirflow3Warning,
stacklevel=2,
)
kwargs.pop("provide_context", None)
super().__init__(**kwargs)
if not callable(python_callable):
raise AirflowException("`python_callable` param must be callable")
Expand Down Expand Up @@ -732,11 +693,8 @@ def __init__(
f"Sys version: {sys.version_info}. Virtual environment version: {python_version}"
)
if python_version is not None and not isinstance(python_version, str):
warnings.warn(
"Passing non-string types (e.g. int or float) as python_version "
"is deprecated. Please use string value instead.",
RemovedInAirflow3Warning,
stacklevel=2,
raise AirflowException(
"Passing non-string types (e.g. int or float) as python_version not supported"
)
if not is_venv_installed():
raise AirflowException("PythonVirtualenvOperator requires virtualenv, please install it.")
Expand Down
1 change: 1 addition & 0 deletions newsfragments/41493.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Removed deprecated method ``task`` from ``airflow.operators.python`` operator. Please use ``python_task`` from ``airflow.decorators.python`` instead.
Copy link
Contributor

@eladkal eladkal Aug 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The newsfragment will be about removing all the classses in the file.
This specific removal you are referring to will be in provider breaking change

Copy link
Member

@potiuk potiuk Aug 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we will have to keep it in newsfragments, so far those were "core" operators - and we have not moved them to "provider.standard" yet - and as I understand that, the standard provider will be part of the Airflow 3 migration (we will not remove old Python operators from Airflow 2 - we will add deprecation there) so the newsfragment should be still in Airflow - I think.

8 changes: 3 additions & 5 deletions tests/operators/test_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,8 @@ def func(custom, dag):
assert 1 == custom, "custom should be 1"
assert dag is not None, "dag should be set"

with pytest.warns(RemovedInAirflow3Warning):
error_message = "Invalid arguments were passed to PythonOperator \\(task_id: task_test-provide-context-does-not-fail\\). Invalid arguments were:\n\\*\\*kwargs: {'provide_context': True}"
with pytest.raises(AirflowException, match=error_message):
self.run_as_task(func, op_kwargs={"custom": 1}, provide_context=True)

def test_context_with_conflicting_op_args(self):
Expand Down Expand Up @@ -1334,10 +1335,7 @@ def f():
return
raise RuntimeError

with pytest.warns(
RemovedInAirflow3Warning, match="Passing non-string types.*python_version is deprecated"
):
self.run_as_task(f, python_version=3, serializer=serializer, requirements=extra_requirements)
self.run_as_task(f, python_version="3", serializer=serializer, requirements=extra_requirements)

def test_with_default(self):
def f(a):
Expand Down