Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add deferrable mode to ExternalTaskSensor #29260

Merged
merged 12 commits into from
Jul 20, 2023
Merged

Conversation

bkossakowska
Copy link
Contributor


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@boring-cyborg boring-cyborg bot added area:core-operators Operators, Sensors and hooks within Core Airflow area:system-tests labels Jan 31, 2023
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added Triggers for external_task, can you please rebase on Main.

For sensors, I would be grateful if you can help the community by maintaining compatibility with astronomer-providers -- There are community members as well as our users using it https://pypistats.org/packages/astronomer-providers

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for great implementation os the trigger, i have updated the sensor to use it.
However, after some testing i have found some moments that could be improved in the trigger:

  1. There is no possibility to process several external tasks at once, using external_task_ids parameter. For that reason ExternalTaskSensor now supports only processing one task per run. Can you please take a look on it and add the possibility?;
  2. If you pass name of the Dag that does not exist at all, the trigger will run infinitely trying to query task from that Dag. I have added one additional parameter to set a deadline for this search to prevent infinitive loop. If it will not find specific Dag for a minute, it will terminate.

@VladaZakharova
Copy link
Contributor

Hi @kaxil !
Please, check the changes and let me know if there is something that i can improve. Thanks!

Copy link
Member

@kaxil kaxil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lgtm, some minor nits. Could you add unit tests to the Async Sensor please similar to the ones in https://github.com/astronomer/astronomer-providers/blob/main/tests/core/sensors/test_external_task.py

airflow/example_dags/example_external_task_marker_dag.py Outdated Show resolved Hide resolved
await asyncio.sleep(self.poll_interval)
else:
yield TriggerEvent({"status": "timeout"})
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think we need return here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no return statement then trigger logic won't work in python 3.7. Trigger will do it's logic of checking task status infinitely and won't stop even after yielding the event. Also source code of the base class says that it is required to return None after yield statement.
I couldn't find this in the documentation, but I found it by myself while was implementing trigger for python3.7. I think that return statement should be there so the interpreter would exit from the generator loop when specified event appears else we get an endless loop.

return
if await self.count_tasks() == len(self.execution_dates):
yield TriggerEvent({"status": "success"})
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

await asyncio.sleep(self.poll_interval)
except Exception:
yield TriggerEvent({"status": "failed"})
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

@VladaZakharova
Copy link
Contributor

Hi @kaxil !
Is there something else i can improve in the code?

@potiuk
Copy link
Member

potiuk commented Feb 24, 2023

LGTM @kaxil ?

@github-actions
Copy link

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Apr 11, 2023
@tanvn
Copy link
Contributor

tanvn commented Apr 11, 2023

Just a gentle reminder to @kaxil
As a heavy user of Airflow, I would love to try this feature soon.

@github-actions github-actions bot removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Apr 12, 2023
@codecae
Copy link
Contributor

codecae commented Apr 12, 2023

Just a gentle reminder to @kaxil As a heavy user of Airflow, I would love to try this feature soon.

Same! :)

@kaxil
Copy link
Member

kaxil commented May 11, 2023

lgtm. Any final thoughts @phanikumv @pankajastro ?

Copy link
Contributor

@phanikumv phanikumv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add documentation about the new functionality here in /docs/apache-airflow/howto/operator/external_task_sensor.rst

airflow/sensors/external_task.py Outdated Show resolved Hide resolved
Copy link
Member

@pankajastro pankajastro left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @bkossakowska, would it make sense to add deferrable in ExternalTaskSensor and not create a new sensor? I know at some places we have two sensors/operators one for sync and the other for async but at most of the places we have a deferrable param which controls this behaviour.

@bkossakowska bkossakowska force-pushed the sensor_def_mode branch 3 times, most recently from 0af46d1 to e88ff7e Compare May 18, 2023 10:18
@bkossakowska
Copy link
Contributor Author

Hi @pankajastro, @phanikumv,
thanks for your comments, changes have been added. Is there anything else I can improve?

@@ -38,25 +39,33 @@ class TaskStateTrigger(BaseTrigger):
:param task_id: The task_id that contains the task you want to
wait for. If ``None`` (default value) the sensor waits for the DAG
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The statement "If None the sensor waits for the DAG" is untrue, no such behavior is currently implemented in this Trigger

@@ -38,25 +39,33 @@ class TaskStateTrigger(BaseTrigger):
:param task_id: The task_id that contains the task you want to
wait for. If ``None`` (default value) the sensor waits for the DAG
:param states: allowed states, default is ``['success']``
Copy link

@OfSixes OfSixes Jul 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While having the default value be ["success"] would make a lot of sense, no default value is currently set for the states parameter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, for the states parameter, as it is in the description, we have set it to ["success"] by default.
list(allowed_states) if allowed_states else [TaskInstanceState.SUCCESS.value].

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This default is set in the sensor though, in the trigger itself this behavior is not implemented. If someone now wanted to use the trigger seperately, the documentation for it is incorrect.

@bkossakowska bkossakowska force-pushed the sensor_def_mode branch 2 times, most recently from 38a4158 to c6537a5 Compare July 12, 2023 09:39
@VladaZakharova
Copy link
Contributor

Hi @potiuk !
Could you please check the changes? thanks :)

@potiuk potiuk merged commit dda3dcd into apache:main Jul 20, 2023
@ephraimbuddy ephraimbuddy added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label Aug 2, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:core-operators Operators, Sensors and hooks within Core Airflow area:system-tests changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..)
Projects
None yet
Development

Successfully merging this pull request may close these issues.