Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-72: Move DAG Params to Task SDK #46127

Closed
wants to merge 2 commits into from

Conversation

amoghrajesh
Copy link
Contributor

@amoghrajesh amoghrajesh commented Jan 27, 2025

Dependent on https://github.com/apache/airflow/pull/46032/files

closes: #44361

Testing

Basic Testing

DAG used for testing:

from airflow import DAG
from airflow.decorators import task
from airflow.sdk import Param

with DAG(
    "param_dag",
    params={
        "x": Param(5, type="integer", minimum=3),
        "my_int_param": 6
    },
) as dag:

    @task
    def example_task():
        # This will print the default value, 6:
        print(dag.params['my_int_param'])


    example_task()

image

Params Trigger UI from example dags

from __future__ import annotations

import datetime
from pathlib import Path

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.sdk.definitions.param import Param, ParamsDict
from airflow.utils.trigger_rule import TriggerRule

# [START params_trigger]
with DAG(
    dag_id=Path(__file__).stem,
    dag_display_name="Params Trigger UI",
    description=__doc__.partition(".")[0],
    doc_md=__doc__,
    schedule=None,
    start_date=datetime.datetime(2022, 3, 4),
    catchup=False,
    tags=["example", "params"],
    params={
        "names": Param(
            ["Linda", "Martha", "Thomas"],
            type="array",
            description="Define the list of names for which greetings should be generated in the logs."
            " Please have one name per line.",
            title="Names to greet",
        ),
        "english": Param(True, type="boolean", title="English"),
        "german": Param(True, type="boolean", title="German (Formal)"),
        "french": Param(True, type="boolean", title="French"),
    },
) as dag:

    @task(task_id="get_names", task_display_name="Get names")
    def get_names(**kwargs) -> list[str]:
        params: ParamsDict = kwargs["params"]
        if "names" not in params:
            print("Uuups, no names given, was no UI used to trigger?")
            return []
        return params["names"]

    @task.branch(task_id="select_languages", task_display_name="Select languages")
    def select_languages(**kwargs) -> list[str]:
        params: ParamsDict = kwargs["params"]
        selected_languages = []
        for lang in ["english", "german", "french"]:
            if params[lang]:
                selected_languages.append(f"generate_{lang}_greeting")
        return selected_languages

    @task(task_id="generate_english_greeting", task_display_name="Generate English greeting")
    def generate_english_greeting(name: str) -> str:
        return f"Hello {name}!"

    @task(task_id="generate_german_greeting", task_display_name="Erzeuge Deutsche Begrüßung")
    def generate_german_greeting(name: str) -> str:
        return f"Sehr geehrter Herr/Frau {name}."

    @task(task_id="generate_french_greeting", task_display_name="Produire un message d'accueil en français")
    def generate_french_greeting(name: str) -> str:
        return f"Bonjour {name}!"

    @task(task_id="print_greetings", task_display_name="Print greetings", trigger_rule=TriggerRule.ALL_DONE)
    def print_greetings(greetings1, greetings2, greetings3) -> None:
        for g in greetings1 or []:
            print(g)
        for g in greetings2 or []:
            print(g)
        for g in greetings3 or []:
            print(g)
        if not (greetings1 or greetings2 or greetings3):
            print("sad, nobody to greet :-(")

    lang_select = select_languages()
    names = get_names()
    english_greetings = generate_english_greeting.expand(name=names)
    german_greetings = generate_german_greeting.expand(name=names)
    french_greetings = generate_french_greeting.expand(name=names)
    lang_select >> [english_greetings, german_greetings, french_greetings]
    results_print = print_greetings(english_greetings, german_greetings, french_greetings)
# [END params_trigger]
image image image

The failures above are because mappedtasks dont work yet

Params UI tutorial DAG demonstrating various options for a trigger form generated by DAG params

Run config:

{
    "a_simple_list": [
        "one",
        "two",
        "three",
        "actually one value is made per line"
    ],
    "array_of_numbers": [
        1,
        2,
        3
    ],
    "array_of_objects": [
        {
            "country": "country_name",
            "name": "account_name"
        }
    ],
    "bool": true,
    "checked_number": 100,
    "checked_text": "length-checked-field",
    "date": "2025-01-27",
    "date_time": "2025-01-27T12:17:00+00:00",
    "flag": false,
    "hidden_secret_field": "constant value",
    "most_loved_number": 42,
    "multi_select": [
        "two",
        "three"
    ],
    "multi_select_with_label": [
        "2",
        "3"
    ],
    "multiline_text": "A multiline text Param\nthat will keep the newline\ncharacters in its value.",
    "object": {
        "key": "value"
    },
    "optional_field": "optional text, you can trigger also w/o text",
    "pick_one": "value 42",
    "pick_with_label": 3,
    "proposals": "Alpha",
    "required_field": "dummy dummy",
    "text": "Hello World!",
    "time": "12:13:14",
    "x": 3
}
image

Logs:

beef66d4770b
 ▶ Log message source details
{"logger":"airflow.dag_processing.bundles.manager.DagBundlesManager","timestamp":"2025-01-27T15:26:19.910440","event":"DAG bundles loaded: dags-folder, example_dags","level":"info"}
{"logger":"airflow.models.dagbag.DagBag","timestamp":"2025-01-27T15:26:19.910732","event":"Filling up the DagBag from /opt/airflow/airflow/example_dags/example_params_ui_tutorial.py","level":"info"}
{"logger":"airflow.models.dagbag.DagBag","timestamp":"2025-01-27T15:26:19.911277","event":"Importing /opt/airflow/airflow/example_dags/example_params_ui_tutorial.py","level":"debug"}
{"logger":"airflow.models.dagbag.DagBag","timestamp":"2025-01-27T15:26:19.946419","event":"Loaded DAG <DAG: example_params_ui_tutorial>","level":"debug"}
{"file":"/opt/airflow/airflow/example_dags/example_params_ui_tutorial.py","timestamp":"2025-01-27T15:26:19.946663","logger":"task","event":"DAG file parsed","level":"debug"}
{"json":"{\"rendered_fields\":{\"templates_dict\":null,\"op_args\":\"()\",\"op_kwargs\":{}},\"type\":\"SetRenderedFields\"}\n","timestamp":"2025-01-27T15:26:19.946922","logger":"task","event":"Sending request","level":"debug"}
{"logger":"airflow.sdk.definitions.param","timestamp":"2025-01-27T15:26:19.956861","event":"Updating task params ({'x': 3, 'text': 'Hello World!', 'flag': False, 'a_simple_list': ['one', 'two', 'three', 'actually one value is made per line'], 'most_loved_number': 42, 'pick_one': 'value 42', 'pick_with_label': 3, 'proposals': 'Alpha', 'multi_select': ['two', 'three'], 'multi_select_with_label': ['2', '3'], 'array_of_numbers': [1, 2, 3], 'bool': True, 'date_time': '2025-01-27T12:17:00+00:00', 'date': '2025-01-27', 'time': '12:13:14', 'multiline_text': 'A multiline text Param\\nthat will keep the newline\\ncharacters in its value.', 'required_field': None, 'optional_field': 'optional text, you can trigger also w/o text', 'checked_text': 'length-checked-field', 'checked_number': 100, 'object': {'key': 'value'}, 'array_of_objects': [{'name': 'account_name', 'country': 'country_name'}], 'hidden_secret_field': 'constant value'}) with DagRun.conf ({'x': 3, 'bool': True, 'date': '2025-01-27', 'flag': False, 'text': 'Hello World!', 'time': '12:13:14', 'object': {'key': 'value'}, 'pick_one': 'value 42', 'date_time': '2025-01-27T12:17:00+00:00', 'proposals': 'Alpha', 'checked_text': 'length-checked-field', 'multi_select': ['two', 'three'], 'a_simple_list': ['one', 'two', 'three', 'actually one value is made per line'], 'checked_number': 100, 'multiline_text': 'A multiline text Param\\nthat will keep the newline\\ncharacters in its value.', 'optional_field': 'optional text, you can trigger also w/o text', 'required_field': 'dummy dummy', 'pick_with_label': 3, 'array_of_numbers': [1, 2, 3], 'array_of_objects': [{'name': 'account_name', 'country': 'country_name'}], 'most_loved_number': 42, 'hidden_secret_field': 'constant value', 'multi_select_with_label': ['2', '3']})","level":"debug"}
{"chan":"stdout","event":"Validates params aere {'x': 3, 'text': 'Hello World!', 'flag': False, 'a_simple_list': ['one', 'two', 'three', 'actually one value is made per line'], 'most_loved_number': 42, 'pick_one': 'value 42', 'pick_with_label': 3, 'proposals': 'Alpha', 'multi_select': ['two', 'three'], 'multi_select_with_label': ['2', '3'], 'array_of_numbers': [1, 2, 3], 'bool': True, 'date_time': '2025-01-27T12:17:00+00:00', 'date': '2025-01-27', 'time': '12:13:14', 'multiline_text': 'A multiline text Param\\nthat will keep the newline\\ncharacters in its value.', 'required_field': 'dummy dummy', 'optional_field': 'optional text, you can trigger also w/o text', 'checked_text': 'length-checked-field', 'checked_number': 100, 'object': {'key': 'value'}, 'array_of_objects': [{'name': 'account_name', 'country': 'country_name'}], 'hidden_secret_field': 'constant value'}","timestamp":"2025-01-27T15:26:19.967251Z","level":"info","logger":"task"}
{"chan":"stdout","event":"This DAG was triggered with the following parameters:","timestamp":"2025-01-27T15:26:19.971408Z","level":"info","logger":"task"}
{"chan":"stdout","event":"","timestamp":"2025-01-27T15:26:19.971505Z","level":"info","logger":"task"}
{"chan":"stdout","event":"{","timestamp":"2025-01-27T15:26:19.971657Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"x\": 3,","timestamp":"2025-01-27T15:26:19.971733Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"text\": \"Hello World!\",","timestamp":"2025-01-27T15:26:19.971813Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"flag\": false,","timestamp":"2025-01-27T15:26:19.971885Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"a_simple_list\": [","timestamp":"2025-01-27T15:26:19.971956Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        \"one\",","timestamp":"2025-01-27T15:26:19.972028Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        \"two\",","timestamp":"2025-01-27T15:26:19.972097Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        \"three\",","timestamp":"2025-01-27T15:26:19.972166Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        \"actually one value is made per line\"","timestamp":"2025-01-27T15:26:19.972238Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    ],","timestamp":"2025-01-27T15:26:19.972313Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"most_loved_number\": 42,","timestamp":"2025-01-27T15:26:19.972383Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"pick_one\": \"value 42\",","timestamp":"2025-01-27T15:26:19.972453Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"pick_with_label\": 3,","timestamp":"2025-01-27T15:26:19.972524Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"proposals\": \"Alpha\",","timestamp":"2025-01-27T15:26:19.972593Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"multi_select\": [","timestamp":"2025-01-27T15:26:19.972663Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        \"two\",","timestamp":"2025-01-27T15:26:19.972734Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        \"three\"","timestamp":"2025-01-27T15:26:19.972806Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    ],","timestamp":"2025-01-27T15:26:19.972875Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"multi_select_with_label\": [","timestamp":"2025-01-27T15:26:19.972948Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        \"2\",","timestamp":"2025-01-27T15:26:19.973017Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        \"3\"","timestamp":"2025-01-27T15:26:19.973086Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    ],","timestamp":"2025-01-27T15:26:19.973165Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"array_of_numbers\": [","timestamp":"2025-01-27T15:26:19.973251Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        1,","timestamp":"2025-01-27T15:26:19.973324Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        2,","timestamp":"2025-01-27T15:26:19.973396Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        3","timestamp":"2025-01-27T15:26:19.973464Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    ],","timestamp":"2025-01-27T15:26:19.973534Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"bool\": true,","timestamp":"2025-01-27T15:26:19.973603Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"date_time\": \"2025-01-27T12:17:00+00:00\",","timestamp":"2025-01-27T15:26:19.973671Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"date\": \"2025-01-27\",","timestamp":"2025-01-27T15:26:19.973740Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"time\": \"12:13:14\",","timestamp":"2025-01-27T15:26:19.973807Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"multiline_text\": \"A multiline text Param\\nthat will keep the newline\\ncharacters in its value.\",","timestamp":"2025-01-27T15:26:19.973876Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"required_field\": \"dummy dummy\",","timestamp":"2025-01-27T15:26:19.973946Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"optional_field\": \"optional text, you can trigger also w/o text\",","timestamp":"2025-01-27T15:26:19.974077Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"checked_text\": \"length-checked-field\",","timestamp":"2025-01-27T15:26:19.974145Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"checked_number\": 100,","timestamp":"2025-01-27T15:26:19.974212Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"object\": {","timestamp":"2025-01-27T15:26:19.974278Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        \"key\": \"value\"","timestamp":"2025-01-27T15:26:19.974350Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    },","timestamp":"2025-01-27T15:26:19.974411Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"array_of_objects\": [","timestamp":"2025-01-27T15:26:19.974478Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        {","timestamp":"2025-01-27T15:26:19.974559Z","level":"info","logger":"task"}
{"chan":"stdout","event":"            \"name\": \"account_name\",","timestamp":"2025-01-27T15:26:19.974625Z","level":"info","logger":"task"}
{"chan":"stdout","event":"            \"country\": \"country_name\"","timestamp":"2025-01-27T15:26:19.974692Z","level":"info","logger":"task"}
{"chan":"stdout","event":"        }","timestamp":"2025-01-27T15:26:19.974777Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    ],","timestamp":"2025-01-27T15:26:19.974844Z","level":"info","logger":"task"}
{"chan":"stdout","event":"    \"hidden_secret_field\": \"constant value\"","timestamp":"2025-01-27T15:26:19.974911Z","level":"info","logger":"task"}
{"chan":"stdout","event":"}","timestamp":"2025-01-27T15:26:19.974980Z","level":"info","logger":"task"}
{"chan":"stdout","event":"","timestamp":"2025-01-27T15:26:19.975050Z","level":"info","logger":"task"}
{"logger":"airflow.task.operators.airflow.decorators.python._PythonDecoratedOperator","timestamp":"2025-01-27T15:26:19.971434","event":"Done. Returned value was: None","level":"info"}
{"json":"{\"state\":\"success\",\"end_date\":\"2025-01-27T15:26:19.971547Z\",\"task_outlets\":[],\"outlet_events\":[],\"type\":\"SucceedTask\"}\n","timestamp":"2025-01-27T15:26:19.971629","logger":"task","event":"Sending request","level":"debug"}

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@amoghrajesh
Copy link
Contributor Author

Trying out the example dags. Some failures, debugging it

@amoghrajesh amoghrajesh force-pushed the AIP72-move-params-sdk branch from 0adf9f5 to d17c54d Compare January 27, 2025 15:21
@amoghrajesh
Copy link
Contributor Author

The example dags are working now.

@amoghrajesh amoghrajesh self-assigned this Jan 28, 2025
@amoghrajesh
Copy link
Contributor Author

Closing in favour of #46176. Doesn't have to depend on mapped tasks PR #46032

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Move DAG Params to Task SDK
2 participants