Add deferrable mode to ExternalTaskSensor #29260
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
I have added Triggers for external_task, can you please rebase on main?
For sensors, I would be grateful if you can help the community by maintaining compatibility with astronomer-providers -- There are community members as well as our users using it https://pypistats.org/packages/astronomer-providers
Thank you for the great implementation of the trigger; I have updated the sensor to use it.
However, after some testing I found a few things in the trigger that could be improved:
- It is not possible to process several external tasks at once using the external_task_ids parameter, so ExternalTaskSensor currently supports only one task per run. Could you please take a look and add this?
- If you pass the name of a DAG that does not exist, the trigger runs indefinitely, trying to query the task from that DAG. I have added a parameter that sets a deadline for this search to prevent an infinite loop: if the DAG is not found within a minute, the trigger terminates.
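For readers following along, the deadline idea in the second point can be sketched roughly as below. This is only an illustration under assumptions: wait_for_dag, dag_exists, deadline_seconds and poll_interval are hypothetical names, not the parameter or helpers actually added in this PR, and the existence check is passed in as a plain coroutine instead of a database query.

```python
import asyncio
import time


async def wait_for_dag(dag_exists, deadline_seconds=60.0, poll_interval=1.0):
    """Poll until dag_exists() is truthy or the deadline passes.

    All names here are hypothetical; dag_exists stands in for whatever
    lookup the real trigger performs.
    """
    deadline = time.monotonic() + deadline_seconds
    while time.monotonic() < deadline:
        if await dag_exists():
            return True
        # Yield control to the event loop between checks.
        await asyncio.sleep(poll_interval)
    # Deadline reached without finding the DAG: give up instead of looping forever.
    return False
```

Using time.monotonic() keeps the deadline unaffected by wall-clock adjustments; a caller could turn a False result into a "timeout" event.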
363bd06 to 27d5a7e
Hi @kaxil!
This lgtm, some minor nits. Could you add unit tests to the Async Sensor please similar to the ones in https://github.com/astronomer/astronomer-providers/blob/main/tests/core/sensors/test_external_task.py
await asyncio.sleep(self.poll_interval)
else:
yield TriggerEvent({"status": "timeout"})
return
Don't think we need return here
If there is no return statement, the trigger logic won't work in Python 3.7: the trigger keeps checking the task status indefinitely and doesn't stop even after yielding the event. The source code of the base class also says that returning None after the yield statement is required.
I couldn't find this in the documentation; I discovered it myself while implementing the trigger for Python 3.7. I think the return statement should stay so that the interpreter exits the generator loop once the specified event occurs; otherwise we get an endless loop.
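To illustrate the point about return after yield, here is a minimal, Airflow-free sketch. TriggerEvent below is a stand-in dataclass, not the real airflow.triggers.base.TriggerEvent, and run_with_return, run_without_return, collect and always_done are hypothetical names. It only shows generic async-generator behaviour: if the consumer keeps iterating, a polling loop without return produces the event again and again. It does not model how the triggerer consumes events, and it does not reproduce anything specific to Python 3.7.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class TriggerEvent:
    """Stand-in for the real TriggerEvent; holds only a payload."""
    payload: dict


async def run_with_return(check, poll_interval=0.0):
    while True:
        if await check():
            yield TriggerEvent({"status": "success"})
            return  # ends the generator, so the polling loop stops here
        await asyncio.sleep(poll_interval)


async def run_without_return(check, poll_interval=0.0):
    while True:
        if await check():
            yield TriggerEvent({"status": "success"})
            # no return: when resumed, the loop polls and yields again
        await asyncio.sleep(poll_interval)


async def collect(agen, limit):
    """Consume up to `limit` events from an async generator."""
    events = []
    async for event in agen:
        events.append(event)
        if len(events) >= limit:
            break
    return events


async def always_done():
    return True


def demo():
    with_return = asyncio.run(collect(run_with_return(always_done), limit=3))
    without_return = asyncio.run(collect(run_without_return(always_done), limit=3))
    return len(with_return), len(without_return)
```

With a consumer that asks for up to three events, the first variant produces one event and finishes, while the second keeps producing events until the consumer stops asking.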
return
if await self.count_tasks() == len(self.execution_dates):
yield TriggerEvent({"status": "success"})
return
same as above
await asyncio.sleep(self.poll_interval)
except Exception:
yield TriggerEvent({"status": "failed"})
return
same as above
Hi @kaxil!
LGTM @kaxil?
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
Just a gentle reminder to @kaxil
Same! :)
lgtm. Any final thoughts @phanikumv @pankajastro?
Please add documentation about the new functionality here in /docs/apache-airflow/howto/operator/external_task_sensor.rst
Hi @bkossakowska, would it make sense to add deferrable in ExternalTaskSensor and not create a new sensor? I know in some places we have two sensors/operators, one for sync and the other for async, but in most places we have a deferrable param which controls this behaviour.
0af46d1 to e88ff7e
Hi @pankajastro, @phanikumv,
e88ff7e to 54d6c58
airflow/triggers/external_task.py
Outdated
@@ -38,25 +39,33 @@ class TaskStateTrigger(BaseTrigger):
:param task_id: The task_id that contains the task you want to
wait for. If ``None`` (default value) the sensor waits for the DAG
The statement "If None the sensor waits for the DAG" is untrue; no such behavior is currently implemented in this Trigger.
@@ -38,25 +39,33 @@ class TaskStateTrigger(BaseTrigger):
:param task_id: The task_id that contains the task you want to
wait for. If ``None`` (default value) the sensor waits for the DAG
:param states: allowed states, default is ``['success']``
While having the default value be ["success"] would make a lot of sense, no default value is currently set for the states parameter.
Currently, as stated in the description, the states parameter is set to ["success"] by default:
list(allowed_states) if allowed_states else [TaskInstanceState.SUCCESS.value].
This default is set in the sensor, though; the trigger itself does not implement this behavior. If someone wanted to use the trigger separately, its documentation would be incorrect.
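One possible way to make the docstring true would be for the trigger to apply the default itself. The sketch below assumes a simplified constructor: TaskStateTriggerSketch and DEFAULT_STATES are hypothetical names and this is not the real TaskStateTrigger signature. It only reuses the fallback expression quoted above, with the plain string "success" standing in for TaskInstanceState.SUCCESS.value.

```python
from __future__ import annotations

# Assumed default, mirroring the value the sensor falls back to.
DEFAULT_STATES = ["success"]


class TaskStateTriggerSketch:
    """Hypothetical, simplified stand-in for the trigger's constructor."""

    def __init__(self, dag_id: str, task_id: str, states: list[str] | None = None):
        self.dag_id = dag_id
        self.task_id = task_id
        # Apply the documented default here, so the trigger behaves the same
        # whether it is created by the sensor or used on its own.
        self.states = list(states) if states else list(DEFAULT_STATES)
```

The alternative raised in the review is to leave the code as is and correct the docstring so it no longer promises a default.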
38a4158 to c6537a5
c6537a5 to 9c314d1
Hi @potiuk!