Skip to content

Commit

Permalink
AIP-72: Move DAG Params to Task SDK
Browse files Browse the repository at this point in the history
  • Loading branch information
amoghrajesh committed Jan 27, 2025
1 parent 15b51fe commit 0adf9f5
Show file tree
Hide file tree
Showing 40 changed files with 60 additions and 57 deletions.
2 changes: 1 addition & 1 deletion airflow/cli/commands/remote_commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
from airflow.models import TaskInstance
from airflow.models.dag import DAG, _run_inline_trigger
from airflow.models.dagrun import DagRun
from airflow.models.param import ParamsDict
from airflow.models.taskinstance import TaskReturnCode
from airflow.sdk.definitions.param import ParamsDict
from airflow.settings import IS_EXECUTOR_CONTAINER, IS_K8S_EXECUTOR_POD
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_params_trigger_ui.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.models.param import Param, ParamsDict
from airflow.sdk.definitions.param import Param, ParamsDict
from airflow.utils.trigger_rule import TriggerRule

# [START params_trigger]
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_params_ui_tutorial.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.models.param import Param, ParamsDict
from airflow.sdk.definitions.param import Param, ParamsDict

with (
DAG(
Expand Down
5 changes: 3 additions & 2 deletions airflow/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ def __getattr__(name):
"Log": "airflow.models.log",
"MappedOperator": "airflow.models.mappedoperator",
"Operator": "airflow.models.operator",
"Param": "airflow.models.param",
"Param": "airflow.sdk.definitions.param",
"ParamsDict": "airflow.sdk.definitions.param",
"Pool": "airflow.models.pool",
"RenderedTaskInstanceFields": "airflow.models.renderedtifields",
"SkipMixin": "airflow.models.skipmixin",
Expand Down Expand Up @@ -128,7 +129,6 @@ def __getattr__(name):
from airflow.models.log import Log
from airflow.models.mappedoperator import MappedOperator
from airflow.models.operator import Operator
from airflow.models.param import Param
from airflow.models.pool import Pool
from airflow.models.renderedtifields import RenderedTaskInstanceFields
from airflow.models.skipmixin import SkipMixin
Expand All @@ -138,3 +138,4 @@ def __getattr__(name):
from airflow.models.trigger import Trigger
from airflow.models.variable import Variable
from airflow.models.xcom import XCom
from airflow.sdk.definitions.param import Param
2 changes: 1 addition & 1 deletion airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@
from airflow.models.base import Base, StringID, TaskInstanceDependencies, _sentinel
from airflow.models.dagbag import DagBag
from airflow.models.log import Log
from airflow.models.param import process_params
from airflow.models.renderedtifields import get_serialized_template_fields
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.models.taskmap import TaskMap
Expand All @@ -108,6 +107,7 @@
from airflow.sdk.api.datamodels._generated import AssetProfile
from airflow.sdk.definitions._internal.templater import SandboxedEnvironment
from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetNameRef, AssetUniqueKey, AssetUriRef
from airflow.sdk.definitions.param import process_params
from airflow.sdk.definitions.taskgroup import MappedTaskGroup
from airflow.sentry import Sentry
from airflow.settings import task_instance_mutation_hook
Expand Down
4 changes: 2 additions & 2 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
EXPAND_INPUT_EMPTY,
create_expand_input,
)
from airflow.models.param import Param, ParamsDict
from airflow.models.taskinstance import SimpleTaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.models.xcom_arg import SchedulerXComArg, deserialize_xcom_arg
Expand All @@ -64,6 +63,7 @@
)
from airflow.sdk.definitions.baseoperator import BaseOperator as TaskSDKBaseOperator
from airflow.sdk.definitions.mappedoperator import MappedOperator
from airflow.sdk.definitions.param import Param, ParamsDict
from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup
from airflow.sdk.definitions.xcom_arg import XComArg, serialize_xcom_arg
from airflow.sdk.execution_time.context import OutletEventAccessor, OutletEventAccessors
Expand Down Expand Up @@ -985,7 +985,7 @@ def _serialize_params_dict(cls, params: ParamsDict | dict) -> list[tuple[str, di
class_identity = f"{v.__module__}.{v.__class__.__name__}"
except AttributeError:
class_identity = ""
if class_identity == "airflow.models.param.Param":
if class_identity == "airflow.sdk.definitions.param.Param":
serialized_params.append((k, cls._serialize_param(v)))
else:
# Auto-box other values into Params object like it is done by DAG parsing as well
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/core-concepts/params.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ Use a dictionary that maps Param names to either a :class:`~airflow.models.param
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.sdk.definitions.param import Param
with DAG(
"the_dag",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
from airflow.exceptions import AirflowNotFoundException
from airflow.hooks.base import BaseHook
from airflow.models.dag import DAG
from airflow.models.param import Param
from airflow.models.variable import Variable
from airflow.operators.empty import EmptyOperator
from airflow.providers.common.compat.standard.operators import PythonOperator
from airflow.sdk.definitions.param import Param
from airflow.utils.trigger_rule import TriggerRule

try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

from airflow.models import BaseOperator
from airflow.models.dag import DAG
from airflow.models.param import Param
from airflow.sdk.definitions.param import Param

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator
from airflow.models.dag import DAG
from airflow.models.param import Param
from airflow.models.variable import Variable
from airflow.operators.empty import EmptyOperator
from airflow.sdk.definitions.param import Param
from airflow.utils.operator_helpers import context_to_airflow_vars
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.types import ArgNotSet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun
from airflow.models.param import Param
from airflow.providers.fab.www.security import permissions
from airflow.sdk.definitions.param import Param
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState
Expand Down
2 changes: 2 additions & 0 deletions task_sdk/src/airflow/sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
__lazy_imports: dict[str, str] = {
"BaseOperator": ".definitions.baseoperator",
"Connection": ".definitions.connection",
"Param": ".definitions.param",
"ParamD": ".definitions.param",
"DAG": ".definitions.dag",
"EdgeModifier": ".definitions.edges",
"Label": ".definitions.edges",
Expand Down
2 changes: 1 addition & 1 deletion task_sdk/src/airflow/sdk/definitions/asset/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
from sqlalchemy.orm import Session

from airflow.io.path import ObjectStoragePath
from airflow.models.param import ParamsDict
from airflow.sdk.definitions.asset import AssetAlias, AssetUniqueKey
from airflow.sdk.definitions.dag import DAG, DagStateChangeCallback, ScheduleArg
from airflow.sdk.definitions.param import ParamsDict
from airflow.serialization.dag_dependency import DagDependency
from airflow.triggers.base import BaseTrigger
from airflow.typing_compat import Self
Expand Down
2 changes: 1 addition & 1 deletion task_sdk/src/airflow/sdk/definitions/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

import attrs

from airflow.models.param import ParamsDict
from airflow.sdk.definitions._internal.abstractoperator import (
DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
DEFAULT_OWNER,
Expand All @@ -54,6 +53,7 @@
from airflow.sdk.definitions._internal.node import validate_key
from airflow.sdk.definitions._internal.types import NOTSET, ArgNotSet, validate_instance_args
from airflow.sdk.definitions.mappedoperator import OperatorPartial, validate_mapping_kwargs
from airflow.sdk.definitions.param import ParamsDict
from airflow.task.priority_strategy import (
PriorityWeightStrategy,
airflow_priority_weight_strategies,
Expand Down
2 changes: 1 addition & 1 deletion task_sdk/src/airflow/sdk/definitions/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@
ParamValidationError,
TaskNotFound,
)
from airflow.models.param import DagParam, ParamsDict
from airflow.sdk.definitions._internal.abstractoperator import AbstractOperator
from airflow.sdk.definitions._internal.types import NOTSET
from airflow.sdk.definitions.asset import AssetAll, BaseAsset
from airflow.sdk.definitions.baseoperator import BaseOperator
from airflow.sdk.definitions.context import Context
from airflow.sdk.definitions.param import DagParam, ParamsDict
from airflow.timetables.base import Timetable
from airflow.timetables.simple import (
AssetTriggeredTimetable,
Expand Down
2 changes: 1 addition & 1 deletion task_sdk/src/airflow/sdk/definitions/mappedoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@
OperatorExpandArgument,
OperatorExpandKwargsArgument,
)
from airflow.models.param import ParamsDict
from airflow.models.xcom_arg import XComArg
from airflow.sdk.definitions.baseoperator import BaseOperator
from airflow.sdk.definitions.dag import DAG
from airflow.sdk.definitions.param import ParamsDict
from airflow.sdk.types import Operator
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.triggers.base import StartTriggerArgs
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion task_sdk/src/airflow/sdk/execution_time/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ def context_update_for_unmapped(context: Context, task: BaseOperator) -> None:
:meta private:
"""
# TODO: Task-SDK this need to live in sdk too
from airflow.models.param import process_params
from airflow.sdk.definitions.param import process_params

context["task"] = context["ti"].task = task
context["params"] = process_params(
Expand Down
2 changes: 1 addition & 1 deletion task_sdk/tests/definitions/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import pytest

from airflow.exceptions import DuplicateTaskIdFound
from airflow.models.param import Param, ParamsDict
from airflow.sdk.definitions.baseoperator import BaseOperator
from airflow.sdk.definitions.dag import DAG, dag as dag_decorator
from airflow.sdk.definitions.param import Param, ParamsDict

DEFAULT_DATE = datetime(2016, 1, 1, tzinfo=timezone.utc)

Expand Down
8 changes: 4 additions & 4 deletions tests/api_connexion/endpoints/test_dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ def test_should_respond_200(self, url_safe_serializer):
"owners": [],
"params": {
"foo": {
"__class": "airflow.models.param.Param",
"__class": "airflow.sdk.definitions.param.Param",
"description": None,
"schema": {},
"value": 1,
Expand Down Expand Up @@ -380,7 +380,7 @@ def test_should_respond_200_with_asset_expression(self, url_safe_serializer):
"owners": [],
"params": {
"foo": {
"__class": "airflow.models.param.Param",
"__class": "airflow.sdk.definitions.param.Param",
"description": None,
"schema": {},
"value": 1,
Expand Down Expand Up @@ -533,7 +533,7 @@ def test_should_respond_200_serialized(self, url_safe_serializer):
"owners": [],
"params": {
"foo": {
"__class": "airflow.models.param.Param",
"__class": "airflow.sdk.definitions.param.Param",
"description": None,
"schema": {},
"value": 1,
Expand Down Expand Up @@ -591,7 +591,7 @@ def test_should_respond_200_serialized(self, url_safe_serializer):
"owners": [],
"params": {
"foo": {
"__class": "airflow.models.param.Param",
"__class": "airflow.sdk.definitions.param.Param",
"description": None,
"schema": {},
"value": 1,
Expand Down
2 changes: 1 addition & 1 deletion tests/api_connexion/endpoints/test_dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
from airflow.models.asset import AssetEvent, AssetModel
from airflow.models.dag import DAG, DagModel
from airflow.models.dagrun import DagRun
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from airflow.sdk.definitions.asset import Asset
from airflow.sdk.definitions.param import Param
from airflow.utils import timezone
from airflow.utils.session import create_session, provide_session
from airflow.utils.state import DagRunState, State
Expand Down
10 changes: 5 additions & 5 deletions tests/api_connexion/endpoints/test_task_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def test_should_respond_200(self):
"owner": "airflow",
"params": {
"foo": {
"__class": "airflow.models.param.Param",
"__class": "airflow.sdk.definitions.param.Param",
"value": "bar",
"description": None,
"schema": {},
Expand Down Expand Up @@ -207,7 +207,7 @@ def test_unscheduled_task(self):
"owner": "airflow",
"params": {
"is_unscheduled": {
"__class": "airflow.models.param.Param",
"__class": "airflow.sdk.definitions.param.Param",
"value": True,
"description": None,
"schema": {},
Expand Down Expand Up @@ -271,7 +271,7 @@ def test_should_respond_200_serialized(self):
"owner": "airflow",
"params": {
"foo": {
"__class": "airflow.models.param.Param",
"__class": "airflow.sdk.definitions.param.Param",
"value": "bar",
"description": None,
"schema": {},
Expand Down Expand Up @@ -348,7 +348,7 @@ def test_should_respond_200(self):
"owner": "airflow",
"params": {
"foo": {
"__class": "airflow.models.param.Param",
"__class": "airflow.sdk.definitions.param.Param",
"value": "bar",
"description": None,
"schema": {},
Expand Down Expand Up @@ -508,7 +508,7 @@ def test_get_unscheduled_tasks(self):
"owner": "airflow",
"params": {
"is_unscheduled": {
"__class": "airflow.models.param.Param",
"__class": "airflow.sdk.definitions.param.Param",
"value": True,
"description": None,
"schema": {},
Expand Down
4 changes: 2 additions & 2 deletions tests/api_connexion/schemas/test_dag_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def test_serialize_test_dag_detail_schema(url_safe_serializer):
"owners": [],
"params": {
"foo": {
"__class": "airflow.models.param.Param",
"__class": "airflow.sdk.definitions.param.Param",
"value": 1,
"description": None,
"schema": {},
Expand Down Expand Up @@ -229,7 +229,7 @@ def test_serialize_test_dag_with_asset_schedule_detail_schema(url_safe_serialize
"owners": [],
"params": {
"foo": {
"__class": "airflow.models.param.Param",
"__class": "airflow.sdk.definitions.param.Param",
"value": 1,
"description": None,
"schema": {},
Expand Down
2 changes: 1 addition & 1 deletion tests/api_connexion/schemas/test_task_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def test_serialize(self):
"owner": "airflow",
"params": {
"foo": {
"__class": "airflow.models.param.Param",
"__class": "airflow.sdk.definitions.param.Param",
"value": "bar",
"description": None,
"schema": {},
Expand Down
2 changes: 1 addition & 1 deletion tests/api_fastapi/core_api/routes/public/test_dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
from airflow.listeners.listener import get_listener_manager
from airflow.models import DagModel, DagRun
from airflow.models.asset import AssetEvent, AssetModel
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from airflow.sdk.definitions.asset import Asset
from airflow.sdk.definitions.param import Param
from airflow.utils import timezone
from airflow.utils.session import provide_session
from airflow.utils.state import DagRunState, State
Expand Down
2 changes: 1 addition & 1 deletion tests/api_fastapi/core_api/routes/public/test_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ def test_dag_details(
"owners": ["airflow"],
"params": {
"foo": {
"__class": "airflow.models.param.Param",
"__class": "airflow.sdk.definitions.param.Param",
"description": None,
"schema": {},
"value": 1,
Expand Down
Loading

0 comments on commit 0adf9f5

Please sign in to comment.