From 0adf9f599110874e7f0e142b49d4def6d5c818af Mon Sep 17 00:00:00 2001 From: Amogh Date: Mon, 27 Jan 2025 18:36:17 +0530 Subject: [PATCH] AIP-72: Move DAG Params to Task SDK --- airflow/cli/commands/remote_commands/task_command.py | 2 +- airflow/example_dags/example_params_trigger_ui.py | 2 +- airflow/example_dags/example_params_ui_tutorial.py | 2 +- airflow/models/__init__.py | 5 +++-- airflow/models/taskinstance.py | 2 +- airflow/serialization/serialized_objects.py | 4 ++-- docs/apache-airflow/core-concepts/params.rst | 2 +- .../providers/edge/example_dags/integration_test.py | 2 +- .../airflow/providers/edge/example_dags/win_notepad.py | 2 +- .../airflow/providers/edge/example_dags/win_test.py | 2 +- .../api_endpoints/test_dag_run_endpoint.py | 2 +- task_sdk/src/airflow/sdk/__init__.py | 2 ++ .../src/airflow/sdk/definitions/asset/decorators.py | 2 +- task_sdk/src/airflow/sdk/definitions/baseoperator.py | 2 +- task_sdk/src/airflow/sdk/definitions/dag.py | 2 +- task_sdk/src/airflow/sdk/definitions/mappedoperator.py | 2 +- .../src/airflow/sdk/definitions}/param.py | 0 task_sdk/src/airflow/sdk/execution_time/context.py | 2 +- task_sdk/tests/definitions/test_dag.py | 2 +- tests/api_connexion/endpoints/test_dag_endpoint.py | 8 ++++---- tests/api_connexion/endpoints/test_dag_run_endpoint.py | 2 +- tests/api_connexion/endpoints/test_task_endpoint.py | 10 +++++----- tests/api_connexion/schemas/test_dag_schema.py | 4 ++-- tests/api_connexion/schemas/test_task_schema.py | 2 +- .../api_fastapi/core_api/routes/public/test_dag_run.py | 2 +- tests/api_fastapi/core_api/routes/public/test_dags.py | 2 +- tests/api_fastapi/core_api/routes/public/test_tasks.py | 10 +++++----- tests/dags/test_invalid_param.py | 2 +- tests/dags/test_invalid_param2.py | 2 +- tests/dags/test_invalid_param3.py | 2 +- tests/dags/test_invalid_param4.py | 2 +- tests/dags/test_valid_param.py | 2 +- tests/dags/test_valid_param2.py | 2 +- tests/models/test_dag.py | 4 ++-- tests/models/test_param.py | 4 ++-- tests/models/test_taskinstance.py | 2 +- tests/serialization/serializers/test_serializers.py | 2 +- tests/serialization/test_dag_serialization.py | 8 ++++---- tests/serialization/test_serialized_objects.py | 2 +- tests/www/views/test_views_trigger_dag.py | 2 +- 40 files changed, 60 insertions(+), 57 deletions(-) rename {airflow/models => task_sdk/src/airflow/sdk/definitions}/param.py (100%) diff --git a/airflow/cli/commands/remote_commands/task_command.py b/airflow/cli/commands/remote_commands/task_command.py index b1f9182e4c9c36..8bba6ebd891738 100644 --- a/airflow/cli/commands/remote_commands/task_command.py +++ b/airflow/cli/commands/remote_commands/task_command.py @@ -45,8 +45,8 @@ from airflow.models import TaskInstance from airflow.models.dag import DAG, _run_inline_trigger from airflow.models.dagrun import DagRun -from airflow.models.param import ParamsDict from airflow.models.taskinstance import TaskReturnCode +from airflow.sdk.definitions.param import ParamsDict from airflow.settings import IS_EXECUTOR_CONTAINER, IS_K8S_EXECUTOR_POD from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS diff --git a/airflow/example_dags/example_params_trigger_ui.py b/airflow/example_dags/example_params_trigger_ui.py index e47ceae556501e..58c3ce25eea2f9 100644 --- a/airflow/example_dags/example_params_trigger_ui.py +++ b/airflow/example_dags/example_params_trigger_ui.py @@ -27,7 +27,7 @@ from airflow.decorators import task from airflow.models.dag import DAG -from airflow.models.param import Param, ParamsDict +from airflow.sdk.definitions.param import Param, ParamsDict from airflow.utils.trigger_rule import TriggerRule # [START params_trigger] diff --git a/airflow/example_dags/example_params_ui_tutorial.py b/airflow/example_dags/example_params_ui_tutorial.py index b64e777bed1449..59de869b6d34be 100644 --- a/airflow/example_dags/example_params_ui_tutorial.py +++ b/airflow/example_dags/example_params_ui_tutorial.py @@ -29,7 +29,7 @@ from airflow.decorators import task from airflow.models.dag import DAG -from airflow.models.param import Param, ParamsDict +from airflow.sdk.definitions.param import Param, ParamsDict with ( DAG( diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py index ae0fa3040e181c..cce5e548ccbc63 100644 --- a/airflow/models/__init__.py +++ b/airflow/models/__init__.py @@ -99,7 +99,8 @@ def __getattr__(name): "Log": "airflow.models.log", "MappedOperator": "airflow.models.mappedoperator", "Operator": "airflow.models.operator", - "Param": "airflow.models.param", + "Param": "airflow.sdk.definitions.param", + "ParamsDict": "airflow.sdk.definitions.param", "Pool": "airflow.models.pool", "RenderedTaskInstanceFields": "airflow.models.renderedtifields", "SkipMixin": "airflow.models.skipmixin", @@ -128,7 +129,6 @@ def __getattr__(name): from airflow.models.log import Log from airflow.models.mappedoperator import MappedOperator from airflow.models.operator import Operator - from airflow.models.param import Param from airflow.models.pool import Pool from airflow.models.renderedtifields import RenderedTaskInstanceFields from airflow.models.skipmixin import SkipMixin @@ -138,3 +138,4 @@ def __getattr__(name): from airflow.models.trigger import Trigger from airflow.models.variable import Variable from airflow.models.xcom import XCom + from airflow.sdk.definitions.param import Param diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 2a8d81d9d2e95f..da40b1ae8ab27d 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -98,7 +98,6 @@ from airflow.models.base import Base, StringID, TaskInstanceDependencies, _sentinel from airflow.models.dagbag import DagBag from airflow.models.log import Log -from airflow.models.param import process_params from airflow.models.renderedtifields import get_serialized_template_fields from airflow.models.taskinstancekey import TaskInstanceKey from airflow.models.taskmap import TaskMap @@ -108,6 +107,7 @@ from airflow.sdk.api.datamodels._generated import AssetProfile from airflow.sdk.definitions._internal.templater import SandboxedEnvironment from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetNameRef, AssetUniqueKey, AssetUriRef +from airflow.sdk.definitions.param import process_params from airflow.sdk.definitions.taskgroup import MappedTaskGroup from airflow.sentry import Sentry from airflow.settings import task_instance_mutation_hook diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 875a68c615924e..9ae20aa6c4d3a9 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -46,7 +46,6 @@ EXPAND_INPUT_EMPTY, create_expand_input, ) -from airflow.models.param import Param, ParamsDict from airflow.models.taskinstance import SimpleTaskInstance from airflow.models.taskinstancekey import TaskInstanceKey from airflow.models.xcom_arg import SchedulerXComArg, deserialize_xcom_arg @@ -64,6 +63,7 @@ ) from airflow.sdk.definitions.baseoperator import BaseOperator as TaskSDKBaseOperator from airflow.sdk.definitions.mappedoperator import MappedOperator +from airflow.sdk.definitions.param import Param, ParamsDict from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup from airflow.sdk.definitions.xcom_arg import XComArg, serialize_xcom_arg from airflow.sdk.execution_time.context import OutletEventAccessor, OutletEventAccessors @@ -985,7 +985,7 @@ def _serialize_params_dict(cls, params: ParamsDict | dict) -> list[tuple[str, di class_identity = f"{v.__module__}.{v.__class__.__name__}" except AttributeError: class_identity = "" - if class_identity == "airflow.models.param.Param": + if class_identity == "airflow.sdk.definitions.param.Param": serialized_params.append((k, cls._serialize_param(v))) else: # Auto-box other values into Params object like it is done by DAG parsing as well diff --git a/docs/apache-airflow/core-concepts/params.rst b/docs/apache-airflow/core-concepts/params.rst index b54026ccb22fc1..bdd8c4df5a83b8 100644 --- a/docs/apache-airflow/core-concepts/params.rst +++ b/docs/apache-airflow/core-concepts/params.rst @@ -40,7 +40,7 @@ Use a dictionary that maps Param names to either a :class:`~airflow.models.param from airflow import DAG from airflow.decorators import task - from airflow.models.param import Param + from airflow.sdk.definitions.param import Param with DAG( "the_dag", diff --git a/providers/edge/src/airflow/providers/edge/example_dags/integration_test.py b/providers/edge/src/airflow/providers/edge/example_dags/integration_test.py index 418164832576db..41f53085a1c181 100644 --- a/providers/edge/src/airflow/providers/edge/example_dags/integration_test.py +++ b/providers/edge/src/airflow/providers/edge/example_dags/integration_test.py @@ -30,10 +30,10 @@ from airflow.exceptions import AirflowNotFoundException from airflow.hooks.base import BaseHook from airflow.models.dag import DAG -from airflow.models.param import Param from airflow.models.variable import Variable from airflow.operators.empty import EmptyOperator from airflow.providers.common.compat.standard.operators import PythonOperator +from airflow.sdk.definitions.param import Param from airflow.utils.trigger_rule import TriggerRule try: diff --git a/providers/edge/src/airflow/providers/edge/example_dags/win_notepad.py b/providers/edge/src/airflow/providers/edge/example_dags/win_notepad.py index da50fff8b96ee4..9e28e0ea6fb3dc 100644 --- a/providers/edge/src/airflow/providers/edge/example_dags/win_notepad.py +++ b/providers/edge/src/airflow/providers/edge/example_dags/win_notepad.py @@ -34,7 +34,7 @@ from airflow.models import BaseOperator from airflow.models.dag import DAG -from airflow.models.param import Param +from airflow.sdk.definitions.param import Param if TYPE_CHECKING: from airflow.utils.context import Context diff --git a/providers/edge/src/airflow/providers/edge/example_dags/win_test.py b/providers/edge/src/airflow/providers/edge/example_dags/win_test.py index 3a730009d50c35..1446de0b23e28d 100644 --- a/providers/edge/src/airflow/providers/edge/example_dags/win_test.py +++ b/providers/edge/src/airflow/providers/edge/example_dags/win_test.py @@ -37,9 +37,9 @@ from airflow.hooks.base import BaseHook from airflow.models import BaseOperator from airflow.models.dag import DAG -from airflow.models.param import Param from airflow.models.variable import Variable from airflow.operators.empty import EmptyOperator +from airflow.sdk.definitions.param import Param from airflow.utils.operator_helpers import context_to_airflow_vars from airflow.utils.trigger_rule import TriggerRule from airflow.utils.types import ArgNotSet diff --git a/providers/tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py b/providers/tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py index e745d3d655bdce..d0aecf20f92a6e 100644 --- a/providers/tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py +++ b/providers/tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py @@ -22,8 +22,8 @@ from airflow.models.dag import DagModel from airflow.models.dagrun import DagRun -from airflow.models.param import Param from airflow.providers.fab.www.security import permissions +from airflow.sdk.definitions.param import Param from airflow.utils import timezone from airflow.utils.session import create_session from airflow.utils.state import DagRunState diff --git a/task_sdk/src/airflow/sdk/__init__.py b/task_sdk/src/airflow/sdk/__init__.py index b8d6b6609dba7d..55c43782f7d903 100644 --- a/task_sdk/src/airflow/sdk/__init__.py +++ b/task_sdk/src/airflow/sdk/__init__.py @@ -48,6 +48,8 @@ __lazy_imports: dict[str, str] = { "BaseOperator": ".definitions.baseoperator", "Connection": ".definitions.connection", + "Param": ".definitions.param", + "ParamD": ".definitions.param", "DAG": ".definitions.dag", "EdgeModifier": ".definitions.edges", "Label": ".definitions.edges", diff --git a/task_sdk/src/airflow/sdk/definitions/asset/decorators.py b/task_sdk/src/airflow/sdk/definitions/asset/decorators.py index 1f1d90883240bb..579cc94b3ce341 100644 --- a/task_sdk/src/airflow/sdk/definitions/asset/decorators.py +++ b/task_sdk/src/airflow/sdk/definitions/asset/decorators.py @@ -31,9 +31,9 @@ from sqlalchemy.orm import Session from airflow.io.path import ObjectStoragePath - from airflow.models.param import ParamsDict from airflow.sdk.definitions.asset import AssetAlias, AssetUniqueKey from airflow.sdk.definitions.dag import DAG, DagStateChangeCallback, ScheduleArg + from airflow.sdk.definitions.param import ParamsDict from airflow.serialization.dag_dependency import DagDependency from airflow.triggers.base import BaseTrigger from airflow.typing_compat import Self diff --git a/task_sdk/src/airflow/sdk/definitions/baseoperator.py b/task_sdk/src/airflow/sdk/definitions/baseoperator.py index e7ecec69411ba1..14d67656008e57 100644 --- a/task_sdk/src/airflow/sdk/definitions/baseoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/baseoperator.py @@ -33,7 +33,6 @@ import attrs -from airflow.models.param import ParamsDict from airflow.sdk.definitions._internal.abstractoperator import ( DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST, DEFAULT_OWNER, @@ -54,6 +53,7 @@ from airflow.sdk.definitions._internal.node import validate_key from airflow.sdk.definitions._internal.types import NOTSET, ArgNotSet, validate_instance_args from airflow.sdk.definitions.mappedoperator import OperatorPartial, validate_mapping_kwargs +from airflow.sdk.definitions.param import ParamsDict from airflow.task.priority_strategy import ( PriorityWeightStrategy, airflow_priority_weight_strategies, diff --git a/task_sdk/src/airflow/sdk/definitions/dag.py b/task_sdk/src/airflow/sdk/definitions/dag.py index cd5217c8111d06..7882222d5d72df 100644 --- a/task_sdk/src/airflow/sdk/definitions/dag.py +++ b/task_sdk/src/airflow/sdk/definitions/dag.py @@ -51,12 +51,12 @@ ParamValidationError, TaskNotFound, ) -from airflow.models.param import DagParam, ParamsDict from airflow.sdk.definitions._internal.abstractoperator import AbstractOperator from airflow.sdk.definitions._internal.types import NOTSET from airflow.sdk.definitions.asset import AssetAll, BaseAsset from airflow.sdk.definitions.baseoperator import BaseOperator from airflow.sdk.definitions.context import Context +from airflow.sdk.definitions.param import DagParam, ParamsDict from airflow.timetables.base import Timetable from airflow.timetables.simple import ( AssetTriggeredTimetable, diff --git a/task_sdk/src/airflow/sdk/definitions/mappedoperator.py b/task_sdk/src/airflow/sdk/definitions/mappedoperator.py index 0202cb2d4bdf37..b0357d00bb1911 100644 --- a/task_sdk/src/airflow/sdk/definitions/mappedoperator.py +++ b/task_sdk/src/airflow/sdk/definitions/mappedoperator.py @@ -69,10 +69,10 @@ OperatorExpandArgument, OperatorExpandKwargsArgument, ) - from airflow.models.param import ParamsDict from airflow.models.xcom_arg import XComArg from airflow.sdk.definitions.baseoperator import BaseOperator from airflow.sdk.definitions.dag import DAG + from airflow.sdk.definitions.param import ParamsDict from airflow.sdk.types import Operator from airflow.ti_deps.deps.base_ti_dep import BaseTIDep from airflow.triggers.base import StartTriggerArgs diff --git a/airflow/models/param.py b/task_sdk/src/airflow/sdk/definitions/param.py similarity index 100% rename from airflow/models/param.py rename to task_sdk/src/airflow/sdk/definitions/param.py diff --git a/task_sdk/src/airflow/sdk/execution_time/context.py b/task_sdk/src/airflow/sdk/execution_time/context.py index 1a4f902b3a69df..f1ca3a40dccd83 100644 --- a/task_sdk/src/airflow/sdk/execution_time/context.py +++ b/task_sdk/src/airflow/sdk/execution_time/context.py @@ -329,7 +329,7 @@ def context_update_for_unmapped(context: Context, task: BaseOperator) -> None: :meta private: """ # TODO: Task-SDK this need to live in sdk too - from airflow.models.param import process_params + from airflow.sdk.definitions.param import process_params context["task"] = context["ti"].task = task context["params"] = process_params( diff --git a/task_sdk/tests/definitions/test_dag.py b/task_sdk/tests/definitions/test_dag.py index f0e634f19b667a..e6baeabe98dee2 100644 --- a/task_sdk/tests/definitions/test_dag.py +++ b/task_sdk/tests/definitions/test_dag.py @@ -23,9 +23,9 @@ import pytest from airflow.exceptions import DuplicateTaskIdFound -from airflow.models.param import Param, ParamsDict from airflow.sdk.definitions.baseoperator import BaseOperator from airflow.sdk.definitions.dag import DAG, dag as dag_decorator +from airflow.sdk.definitions.param import Param, ParamsDict DEFAULT_DATE = datetime(2016, 1, 1, tzinfo=timezone.utc) diff --git a/tests/api_connexion/endpoints/test_dag_endpoint.py b/tests/api_connexion/endpoints/test_dag_endpoint.py index 1c35f467542992..a3f8672b97d29e 100644 --- a/tests/api_connexion/endpoints/test_dag_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_endpoint.py @@ -320,7 +320,7 @@ def test_should_respond_200(self, url_safe_serializer): "owners": [], "params": { "foo": { - "__class": "airflow.models.param.Param", + "__class": "airflow.sdk.definitions.param.Param", "description": None, "schema": {}, "value": 1, @@ -380,7 +380,7 @@ def test_should_respond_200_with_asset_expression(self, url_safe_serializer): "owners": [], "params": { "foo": { - "__class": "airflow.models.param.Param", + "__class": "airflow.sdk.definitions.param.Param", "description": None, "schema": {}, "value": 1, @@ -533,7 +533,7 @@ def test_should_respond_200_serialized(self, url_safe_serializer): "owners": [], "params": { "foo": { - "__class": "airflow.models.param.Param", + "__class": "airflow.sdk.definitions.param.Param", "description": None, "schema": {}, "value": 1, @@ -591,7 +591,7 @@ def test_should_respond_200_serialized(self, url_safe_serializer): "owners": [], "params": { "foo": { - "__class": "airflow.models.param.Param", + "__class": "airflow.sdk.definitions.param.Param", "description": None, "schema": {}, "value": 1, diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py index 9558dd4fd256a6..dc3073a475c3d4 100644 --- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py @@ -30,9 +30,9 @@ from airflow.models.asset import AssetEvent, AssetModel from airflow.models.dag import DAG, DagModel from airflow.models.dagrun import DagRun -from airflow.models.param import Param from airflow.operators.empty import EmptyOperator from airflow.sdk.definitions.asset import Asset +from airflow.sdk.definitions.param import Param from airflow.utils import timezone from airflow.utils.session import create_session, provide_session from airflow.utils.state import DagRunState, State diff --git a/tests/api_connexion/endpoints/test_task_endpoint.py b/tests/api_connexion/endpoints/test_task_endpoint.py index 7e593731d539d4..ac073fd7c60cc5 100644 --- a/tests/api_connexion/endpoints/test_task_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_endpoint.py @@ -124,7 +124,7 @@ def test_should_respond_200(self): "owner": "airflow", "params": { "foo": { - "__class": "airflow.models.param.Param", + "__class": "airflow.sdk.definitions.param.Param", "value": "bar", "description": None, "schema": {}, @@ -207,7 +207,7 @@ def test_unscheduled_task(self): "owner": "airflow", "params": { "is_unscheduled": { - "__class": "airflow.models.param.Param", + "__class": "airflow.sdk.definitions.param.Param", "value": True, "description": None, "schema": {}, @@ -271,7 +271,7 @@ def test_should_respond_200_serialized(self): "owner": "airflow", "params": { "foo": { - "__class": "airflow.models.param.Param", + "__class": "airflow.sdk.definitions.param.Param", "value": "bar", "description": None, "schema": {}, @@ -348,7 +348,7 @@ def test_should_respond_200(self): "owner": "airflow", "params": { "foo": { - "__class": "airflow.models.param.Param", + "__class": "airflow.sdk.definitions.param.Param", "value": "bar", "description": None, "schema": {}, @@ -508,7 +508,7 @@ def test_get_unscheduled_tasks(self): "owner": "airflow", "params": { "is_unscheduled": { - "__class": "airflow.models.param.Param", + "__class": "airflow.sdk.definitions.param.Param", "value": True, "description": None, "schema": {}, diff --git a/tests/api_connexion/schemas/test_dag_schema.py b/tests/api_connexion/schemas/test_dag_schema.py index d6438045249aa1..800b512f993bc2 100644 --- a/tests/api_connexion/schemas/test_dag_schema.py +++ b/tests/api_connexion/schemas/test_dag_schema.py @@ -167,7 +167,7 @@ def test_serialize_test_dag_detail_schema(url_safe_serializer): "owners": [], "params": { "foo": { - "__class": "airflow.models.param.Param", + "__class": "airflow.sdk.definitions.param.Param", "value": 1, "description": None, "schema": {}, @@ -229,7 +229,7 @@ def test_serialize_test_dag_with_asset_schedule_detail_schema(url_safe_serialize "owners": [], "params": { "foo": { - "__class": "airflow.models.param.Param", + "__class": "airflow.sdk.definitions.param.Param", "value": 1, "description": None, "schema": {}, diff --git a/tests/api_connexion/schemas/test_task_schema.py b/tests/api_connexion/schemas/test_task_schema.py index 5748529b864af6..eee51c3aac73ac 100644 --- a/tests/api_connexion/schemas/test_task_schema.py +++ b/tests/api_connexion/schemas/test_task_schema.py @@ -86,7 +86,7 @@ def test_serialize(self): "owner": "airflow", "params": { "foo": { - "__class": "airflow.models.param.Param", + "__class": "airflow.sdk.definitions.param.Param", "value": "bar", "description": None, "schema": {}, diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index dd32873b084c9a..a642e4df8bce73 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -27,9 +27,9 @@ from airflow.listeners.listener import get_listener_manager from airflow.models import DagModel, DagRun from airflow.models.asset import AssetEvent, AssetModel -from airflow.models.param import Param from airflow.operators.empty import EmptyOperator from airflow.sdk.definitions.asset import Asset +from airflow.sdk.definitions.param import Param from airflow.utils import timezone from airflow.utils.session import provide_session from airflow.utils.state import DagRunState, State diff --git a/tests/api_fastapi/core_api/routes/public/test_dags.py b/tests/api_fastapi/core_api/routes/public/test_dags.py index 748baae71a413a..8ef4a82613775d 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dags.py +++ b/tests/api_fastapi/core_api/routes/public/test_dags.py @@ -377,7 +377,7 @@ def test_dag_details( "owners": ["airflow"], "params": { "foo": { - "__class": "airflow.models.param.Param", + "__class": "airflow.sdk.definitions.param.Param", "description": None, "schema": {}, "value": 1, diff --git a/tests/api_fastapi/core_api/routes/public/test_tasks.py b/tests/api_fastapi/core_api/routes/public/test_tasks.py index a6bff8c9f356b5..0b03db83775ee4 100644 --- a/tests/api_fastapi/core_api/routes/public/test_tasks.py +++ b/tests/api_fastapi/core_api/routes/public/test_tasks.py @@ -103,7 +103,7 @@ def test_should_respond_200(self, test_client): "owner": "airflow", "params": { "foo": { - "__class": "airflow.models.param.Param", + "__class": "airflow.sdk.definitions.param.Param", "value": "bar", "description": None, "schema": {}, @@ -185,7 +185,7 @@ def test_unscheduled_task(self, test_client): "owner": "airflow", "params": { "is_unscheduled": { - "__class": "airflow.models.param.Param", + "__class": "airflow.sdk.definitions.param.Param", "value": True, "description": None, "schema": {}, @@ -248,7 +248,7 @@ def test_should_respond_200_serialized(self, test_client): "owner": "airflow", "params": { "foo": { - "__class": "airflow.models.param.Param", + "__class": "airflow.sdk.definitions.param.Param", "value": "bar", "description": None, "schema": {}, @@ -313,7 +313,7 @@ def test_should_respond_200(self, test_client): "owner": "airflow", "params": { "foo": { - "__class": "airflow.models.param.Param", + "__class": "airflow.sdk.definitions.param.Param", "value": "bar", "description": None, "schema": {}, @@ -469,7 +469,7 @@ def test_get_unscheduled_tasks(self, test_client): "owner": "airflow", "params": { "is_unscheduled": { - "__class": "airflow.models.param.Param", + "__class": "airflow.sdk.definitions.param.Param", "value": True, "description": None, "schema": {}, diff --git a/tests/dags/test_invalid_param.py b/tests/dags/test_invalid_param.py index fb0d3c854d12dc..547fc7c11253de 100644 --- a/tests/dags/test_invalid_param.py +++ b/tests/dags/test_invalid_param.py @@ -19,8 +19,8 @@ from datetime import datetime from airflow.models.dag import DAG -from airflow.models.param import Param from airflow.providers.standard.operators.python import PythonOperator +from airflow.sdk.definitions.param import Param with DAG( "test_invalid_param", diff --git a/tests/dags/test_invalid_param2.py b/tests/dags/test_invalid_param2.py index 69ffda442301d5..5678f46090c898 100644 --- a/tests/dags/test_invalid_param2.py +++ b/tests/dags/test_invalid_param2.py @@ -19,8 +19,8 @@ from datetime import datetime from airflow import DAG -from airflow.models.param import Param from airflow.providers.standard.operators.python import PythonOperator +from airflow.sdk.definitions.param import Param with DAG( "test_invalid_param2", diff --git a/tests/dags/test_invalid_param3.py b/tests/dags/test_invalid_param3.py index a8017a3402b664..ea3bfa202a3195 100644 --- a/tests/dags/test_invalid_param3.py +++ b/tests/dags/test_invalid_param3.py @@ -19,8 +19,8 @@ from datetime import datetime from airflow import DAG -from airflow.models.param import Param from airflow.providers.standard.operators.python import PythonOperator +from airflow.sdk.definitions.param import Param with DAG( "test_invalid_param3", diff --git a/tests/dags/test_invalid_param4.py b/tests/dags/test_invalid_param4.py index bbfc7e970c51ce..0156072ba11cfe 100644 --- a/tests/dags/test_invalid_param4.py +++ b/tests/dags/test_invalid_param4.py @@ -19,8 +19,8 @@ from datetime import datetime from airflow import DAG -from airflow.models.param import Param from airflow.providers.standard.operators.python import PythonOperator +from airflow.sdk.definitions.param import Param with DAG( "test_invalid_param4", diff --git a/tests/dags/test_valid_param.py b/tests/dags/test_valid_param.py index afa0f98ce21d58..ddb858a9acc1ee 100644 --- a/tests/dags/test_valid_param.py +++ b/tests/dags/test_valid_param.py @@ -19,8 +19,8 @@ from datetime import datetime from airflow import DAG -from airflow.models.param import Param from airflow.providers.standard.operators.python import PythonOperator +from airflow.sdk.definitions.param import Param with DAG( "test_valid_param", diff --git a/tests/dags/test_valid_param2.py b/tests/dags/test_valid_param2.py index d59d6278c3a71b..ee6920bd92ee73 100644 --- a/tests/dags/test_valid_param2.py +++ b/tests/dags/test_valid_param2.py @@ -19,8 +19,8 @@ from datetime import datetime from airflow import DAG -from airflow.models.param import Param from airflow.providers.standard.operators.python import PythonOperator +from airflow.sdk.definitions.param import Param with DAG( "test_valid_param2", diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 32f6aec54e2dfc..6c6f71bbbec7ac 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -63,7 +63,6 @@ ) from airflow.models.dag_version import DagVersion from airflow.models.dagrun import DagRun -from airflow.models.param import DagParam, Param from airflow.models.serialized_dag import SerializedDagModel from airflow.models.taskinstance import TaskInstance as TI from airflow.operators.empty import EmptyOperator @@ -73,6 +72,7 @@ from airflow.sdk.definitions._internal.contextmanager import TaskGroupContext from airflow.sdk.definitions._internal.templater import NativeEnvironment, SandboxedEnvironment from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAll, AssetAny +from airflow.sdk.definitions.param import DagParam, Param from airflow.security import permissions from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable from airflow.timetables.simple import ( @@ -150,7 +150,7 @@ def _create_dagrun( ) -> DagRun: run_id = dag.timetable.generate_run_id( run_type=run_type, - logical_date=logical_date, + logical_date=logical_date, # type: ignore data_interval=data_interval, ) return dag.create_dagrun( diff --git a/tests/models/test_param.py b/tests/models/test_param.py index 152419db2fa4dd..fc594ec0c8fdaf 100644 --- a/tests/models/test_param.py +++ b/tests/models/test_param.py @@ -22,7 +22,7 @@ from airflow.decorators import task from airflow.exceptions import ParamValidationError -from airflow.models.param import Param, ParamsDict +from airflow.sdk.definitions.param import Param, ParamsDict from airflow.serialization.serialized_objects import BaseSerialization from airflow.utils import timezone from airflow.utils.types import DagRunType @@ -211,7 +211,7 @@ def test_value_saved(self): def test_dump(self): p = Param("hello", description="world", type="string", minLength=2) dump = p.dump() - assert dump["__class"] == "airflow.models.param.Param" + assert dump["__class"] == "airflow.sdk.definitions.param.Param" assert dump["value"] == "hello" assert dump["description"] == "world" assert dump["schema"] == {"type": "string", "minLength": 2} diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 8ca00e2f8d650a..89f01bb9a58cab 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -60,7 +60,6 @@ from airflow.models.dagbag import DagBag from airflow.models.dagrun import DagRun from airflow.models.expandinput import EXPAND_INPUT_EMPTY, NotFullyPopulated -from airflow.models.param import process_params from airflow.models.pool import Pool from airflow.models.renderedtifields import RenderedTaskInstanceFields from airflow.models.serialized_dag import SerializedDagModel @@ -81,6 +80,7 @@ from airflow.providers.standard.operators.python import PythonOperator from airflow.providers.standard.sensors.python import PythonSensor from airflow.sdk.definitions.asset import Asset, AssetAlias +from airflow.sdk.definitions.param import process_params from airflow.sensors.base import BaseSensorOperator from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG from airflow.stats import Stats diff --git a/tests/serialization/serializers/test_serializers.py b/tests/serialization/serializers/test_serializers.py index 5936a95b23d6dd..f3afdbbf769cc0 100644 --- a/tests/serialization/serializers/test_serializers.py +++ b/tests/serialization/serializers/test_serializers.py @@ -31,7 +31,7 @@ from pendulum import DateTime from pendulum.tz.timezone import FixedTimezone, Timezone -from airflow.models.param import Param, ParamsDict +from airflow.sdk.definitions.param import Param, ParamsDict from airflow.serialization.serde import DATA, deserialize, serialize PENDULUM3 = version.parse(metadata.version("pendulum")).major == 3 diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 84a63674e5119e..ea7b33eaaaaa2e 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -63,13 +63,13 @@ from airflow.models.dagbag import DagBag from airflow.models.expandinput import EXPAND_INPUT_EMPTY from airflow.models.mappedoperator import MappedOperator -from airflow.models.param import Param, ParamsDict from airflow.models.xcom import XCom from airflow.operators.empty import EmptyOperator from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.sensors.bash import BashSensor from airflow.sdk.definitions.asset import Asset +from airflow.sdk.definitions.param import Param, ParamsDict from airflow.security import permissions from airflow.serialization.enums import Encoding from airflow.serialization.json_schema import load_dag_schema_dict @@ -2181,7 +2181,7 @@ def test_params_serialization_from_dict_upgrade(self): "timezone": "UTC", "params": { "my_param": { - "__class": "airflow.models.param.Param", + "__class": "airflow.sdk.definitions.param.Param", "default": "str", } }, @@ -2204,7 +2204,7 @@ def test_params_serialize_default_2_2_0(self): "fileloc": "/path/to/file.py", "tasks": [], "timezone": "UTC", - "params": [["str", {"__class": "airflow.models.param.Param", "default": "str"}]], + "params": [["str", {"__class": "airflow.sdk.definitions.param.Param", "default": "str"}]], }, } SerializedDAG.validate_schema(serialized) @@ -2228,7 +2228,7 @@ def test_params_serialize_default(self): "default": "a string value", "description": "hello", "schema": {"__var": {"type": "string"}, "__type": "dict"}, - "__class": "airflow.models.param.Param", + "__class": "airflow.sdk.definitions.param.Param", }, ] ], diff --git a/tests/serialization/test_serialized_objects.py b/tests/serialization/test_serialized_objects.py index 06bb477becdf4e..ca6cb78a62794e 100644 --- a/tests/serialization/test_serialized_objects.py +++ b/tests/serialization/test_serialized_objects.py @@ -37,12 +37,12 @@ from airflow.models.connection import Connection from airflow.models.dag import DAG from airflow.models.dagrun import DagRun -from airflow.models.param import Param from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance from airflow.models.xcom_arg import XComArg from airflow.operators.empty import EmptyOperator from airflow.providers.standard.operators.python import PythonOperator from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAliasEvent, AssetUniqueKey +from airflow.sdk.definitions.param import Param from airflow.sdk.execution_time.context import OutletEventAccessor, OutletEventAccessors from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding from airflow.serialization.serialized_objects import BaseSerialization diff --git a/tests/www/views/test_views_trigger_dag.py b/tests/www/views/test_views_trigger_dag.py index 17d0b687b85720..c4136520d7f9bb 100644 --- a/tests/www/views/test_views_trigger_dag.py +++ b/tests/www/views/test_views_trigger_dag.py @@ -25,8 +25,8 @@ import pytest from airflow.models import DagBag, DagRun -from airflow.models.param import Param from airflow.operators.empty import EmptyOperator +from airflow.sdk.definitions.param import Param from airflow.security import permissions from airflow.utils import timezone from airflow.utils.json import WebEncoder