Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start porting mapped task to SDK #45627

Merged
merged 1 commit into from
Jan 21, 2025
Merged

Conversation

ashb
Copy link
Member

@ashb ashb commented Jan 13, 2025

This PR restructures the Mapped Operator and Mapped Task Group code to live in
the Task SDK at definition time.

The big thing this change does not do is make it possible to execute mapped
tasks via the Task Execution API server etc -- that is up next (#44360).

There were some un-avoidable changes to the scheduler/expansion part of mapped
tasks here. Of note:

BaseOperator.get_mapped_ti_count has moved from an instance method on
BaseOperator to be a class method. The reason for this was that with the move
of more and more of the "definition time" code into the TaskSDK BaseOperator
and AbstractOperator it is no longer possible to add DB-accessing code to a
base class and have it apply to the subclasses. (i.e.
airflow.models.abstractoperator.AbstractOperator is now not always in the
MRO for tasks. Eventually that class will be deleted, but not yet)

On a similar vein XComArg's get_task_map_length is also moved to a single
dispatch class method on the TaskMap model since now the definition time
objects live in the TaskSDK, and there is no realistic way to get a per-type
subclass with DB logic (i.e. it's very complex to end up with a
PlainDBXComArg, a MapDBXComArg, etc. that we can attach the method too)

For those who aren't aware, singledispatch (and singledispatchmethod) are a
part of the standard library when the type of the first argument is used to
determine which implementation to call. If you are familiar with C++ or Java
this is very similar to method overloading, the one caveat is that it only
examines the type of the first argument, not the full signature.

The long term goal here is to have a clean separation between "runtime/definition time" behaviour (i.e. creating mapped tasks, or running a mapped task) and expanding a mapped task (which is a scheduling-time operation only)


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@ashb ashb added the area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK label Jan 13, 2025
@ashb ashb requested a review from uranusjr as a code owner January 13, 2025 21:26
@ashb
Copy link
Member Author

ashb commented Jan 13, 2025

Mypy is seriously unhappy. Oh well

@ashb ashb force-pushed the mapped-tasks-to-task-sdk branch from 6afcde8 to 7178c24 Compare January 13, 2025 22:15
@ashb ashb force-pushed the mapped-tasks-to-task-sdk branch from 7178c24 to 29e8600 Compare January 13, 2025 22:48
@ashb
Copy link
Member Author

ashb commented Jan 13, 2025

Oh also singlediaptch and singledispathmethod don't play great with type hints in 3.9. Worked around that easily enough now though.

@ashb ashb requested review from kaxil and removed request for bolkedebruin and XD-DENG January 13, 2025 22:50
Copy link
Member

@kaxil kaxil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First pass, will do a more detailed look in an hour

Copy link
Member

@kaxil kaxil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Few comments but the code looks good, minor adjustments needed to get tests passing

@ashb ashb force-pushed the mapped-tasks-to-task-sdk branch 6 times, most recently from 5b2702f to 8967c4b Compare January 17, 2025 16:41
@ashb ashb force-pushed the mapped-tasks-to-task-sdk branch from 8967c4b to 6f57645 Compare January 17, 2025 17:17
@ashb ashb requested a review from mobuchowski as a code owner January 18, 2025 10:09
@ashb ashb force-pushed the mapped-tasks-to-task-sdk branch 4 times, most recently from 029181b to f239adf Compare January 20, 2025 17:11
@ashb ashb force-pushed the mapped-tasks-to-task-sdk branch 2 times, most recently from eb93960 to 387c125 Compare January 20, 2025 18:16
@ashb ashb added legacy ui Whether legacy UI change should be allowed in PR legacy api Whether legacy API changes should be allowed in PR labels Jan 20, 2025
@ashb ashb force-pushed the mapped-tasks-to-task-sdk branch from 387c125 to c6c52f0 Compare January 20, 2025 21:48
@ashb ashb force-pushed the mapped-tasks-to-task-sdk branch from c6c52f0 to 183bb00 Compare January 21, 2025 09:53
This PR restructures the Mapped Operator and Mapped Task Group code to live in
the Task SDK at definition time.

The big thing this change _does not do_ is make it possible to execute mapped
tasks via the Task Execution API server etc -- that is up next.

There were some un-avoidable changes to the scheduler/expansion part of mapped
tasks here. Of note:

`BaseOperator.get_mapped_ti_count` has moved from an instance method on
BaseOperator to be a class method. The reason for this was that with the move
of more and more of the "definition time" code into the TaskSDK BaseOperator
and AbstractOperator it is no longer possible to add DB-accessing code to a
base class and have it apply to the subclasses. (i.e.
`airflow.models.abstractoperator.AbstractOperator` is now _not always_ in the
MRO for tasks. Eventually that class will be deleted, but not yet)

On a similar vein XComArg's `get_task_map_length` is also moved to a single
dispatch class method on the TaskMap model since now the definition time
objects live in the TaskSDK, and there is no realistic way to get a per-type
subclass with DB logic (i.e. it's very complex to end up with a
PlainDBXComArg, a MapDBXComArg, etc. that we can attach the method too)

For those who aren't aware, singledispatch (and singledispatchmethod) are a
part of the standard library when the type of the first argument is used to
determine which implementation to call. If you are familiar with C++ or Java
this is very similar to method overloading, the one caveat is that it _only_
examines the type of the first argument, not the full signature.
@ashb ashb force-pushed the mapped-tasks-to-task-sdk branch from 183bb00 to 0b4b7a0 Compare January 21, 2025 10:14
@ashb ashb merged commit d1b2a44 into apache:main Jan 21, 2025
86 checks passed
@ashb ashb deleted the mapped-tasks-to-task-sdk branch January 21, 2025 11:05
@kaxil
Copy link
Member

kaxil commented Jan 21, 2025

🎉 🎉 🎉

@kaxil
Copy link
Member

kaxil commented Jan 21, 2025

#protm

dauinh pushed a commit to dauinh/airflow that referenced this pull request Jan 24, 2025
This PR restructures the Mapped Operator and Mapped Task Group code to live in
the Task SDK at definition time.

The big thing this change _does not do_ is make it possible to execute mapped
tasks via the Task Execution API server etc -- that is up next.

There were some un-avoidable changes to the scheduler/expansion part of mapped
tasks here. Of note:

`BaseOperator.get_mapped_ti_count` has moved from an instance method on
BaseOperator to be a class method. The reason for this was that with the move
of more and more of the "definition time" code into the TaskSDK BaseOperator
and AbstractOperator it is no longer possible to add DB-accessing code to a
base class and have it apply to the subclasses. (i.e.
`airflow.models.abstractoperator.AbstractOperator` is now _not always_ in the
MRO for tasks. Eventually that class will be deleted, but not yet)

On a similar vein XComArg's `get_task_map_length` is also moved to a single
dispatch class method on the TaskMap model since now the definition time
objects live in the TaskSDK, and there is no realistic way to get a per-type
subclass with DB logic (i.e. it's very complex to end up with a
PlainDBXComArg, a MapDBXComArg, etc. that we can attach the method too)

For those who aren't aware, singledispatch (and singledispatchmethod) are a
part of the standard library when the type of the first argument is used to
determine which implementation to call. If you are familiar with C++ or Java
this is very similar to method overloading, the one caveat is that it _only_
examines the type of the first argument, not the full signature.
gpathak128 pushed a commit to gpathak128/airflow that referenced this pull request Jan 29, 2025
This PR restructures the Mapped Operator and Mapped Task Group code to live in
the Task SDK at definition time.

The big thing this change _does not do_ is make it possible to execute mapped
tasks via the Task Execution API server etc -- that is up next.

There were some un-avoidable changes to the scheduler/expansion part of mapped
tasks here. Of note:

`BaseOperator.get_mapped_ti_count` has moved from an instance method on
BaseOperator to be a class method. The reason for this was that with the move
of more and more of the "definition time" code into the TaskSDK BaseOperator
and AbstractOperator it is no longer possible to add DB-accessing code to a
base class and have it apply to the subclasses. (i.e.
`airflow.models.abstractoperator.AbstractOperator` is now _not always_ in the
MRO for tasks. Eventually that class will be deleted, but not yet)

On a similar vein XComArg's `get_task_map_length` is also moved to a single
dispatch class method on the TaskMap model since now the definition time
objects live in the TaskSDK, and there is no realistic way to get a per-type
subclass with DB logic (i.e. it's very complex to end up with a
PlainDBXComArg, a MapDBXComArg, etc. that we can attach the method too)

For those who aren't aware, singledispatch (and singledispatchmethod) are a
part of the standard library when the type of the first argument is used to
determine which implementation to call. If you are familiar with C++ or Java
this is very similar to method overloading, the one caveat is that it _only_
examines the type of the first argument, not the full signature.
got686-yandex pushed a commit to got686-yandex/airflow that referenced this pull request Jan 30, 2025
This PR restructures the Mapped Operator and Mapped Task Group code to live in
the Task SDK at definition time.

The big thing this change _does not do_ is make it possible to execute mapped
tasks via the Task Execution API server etc -- that is up next.

There were some un-avoidable changes to the scheduler/expansion part of mapped
tasks here. Of note:

`BaseOperator.get_mapped_ti_count` has moved from an instance method on
BaseOperator to be a class method. The reason for this was that with the move
of more and more of the "definition time" code into the TaskSDK BaseOperator
and AbstractOperator it is no longer possible to add DB-accessing code to a
base class and have it apply to the subclasses. (i.e.
`airflow.models.abstractoperator.AbstractOperator` is now _not always_ in the
MRO for tasks. Eventually that class will be deleted, but not yet)

On a similar vein XComArg's `get_task_map_length` is also moved to a single
dispatch class method on the TaskMap model since now the definition time
objects live in the TaskSDK, and there is no realistic way to get a per-type
subclass with DB logic (i.e. it's very complex to end up with a
PlainDBXComArg, a MapDBXComArg, etc. that we can attach the method too)

For those who aren't aware, singledispatch (and singledispatchmethod) are a
part of the standard library when the type of the first argument is used to
determine which implementation to call. If you are familiar with C++ or Java
this is very similar to method overloading, the one caveat is that it _only_
examines the type of the first argument, not the full signature.
niklasr22 pushed a commit to niklasr22/airflow that referenced this pull request Feb 8, 2025
This PR restructures the Mapped Operator and Mapped Task Group code to live in
the Task SDK at definition time.

The big thing this change _does not do_ is make it possible to execute mapped
tasks via the Task Execution API server etc -- that is up next.

There were some un-avoidable changes to the scheduler/expansion part of mapped
tasks here. Of note:

`BaseOperator.get_mapped_ti_count` has moved from an instance method on
BaseOperator to be a class method. The reason for this was that with the move
of more and more of the "definition time" code into the TaskSDK BaseOperator
and AbstractOperator it is no longer possible to add DB-accessing code to a
base class and have it apply to the subclasses. (i.e.
`airflow.models.abstractoperator.AbstractOperator` is now _not always_ in the
MRO for tasks. Eventually that class will be deleted, but not yet)

On a similar vein XComArg's `get_task_map_length` is also moved to a single
dispatch class method on the TaskMap model since now the definition time
objects live in the TaskSDK, and there is no realistic way to get a per-type
subclass with DB logic (i.e. it's very complex to end up with a
PlainDBXComArg, a MapDBXComArg, etc. that we can attach the method too)

For those who aren't aware, singledispatch (and singledispatchmethod) are a
part of the standard library when the type of the first argument is used to
determine which implementation to call. If you are familiar with C++ or Java
this is very similar to method overloading, the one caveat is that it _only_
examines the type of the first argument, not the full signature.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:serialization area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK area:task-sdk legacy api Whether legacy API changes should be allowed in PR legacy ui Whether legacy UI change should be allowed in PR
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants