Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(providers/azure): cancel pipeline if unexpected exception caught #32238

Conversation

Lee-W
Copy link
Member

@Lee-W Lee-W commented Jun 28, 2023

Issue to resolve

When retrying after the 1st failure, the AzureDataFactoryRunPipelineOperator creates a new pipeline run without canceling the original one. This happens when the triggerer fails unexpectedly and gets caught here.

What's changed

Clean up the previous pipeline wrong if unexpected error caught


I tried to check whether the previous run was still running. But we'll need to pass the job_id from the previous run to the current run. According to #18917, it seems we can not use xcom to achieve it, and I doubt Varaible is a good solution.


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

parameters=self.parameters,
)
self.run_id = vars(response)["run_id"]
context["ti"].xcom_push(key="run_id", value=self.run_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a few execute methods of Operators, I see we return to push to xcom. Was wondering if is there any advantage of one over the other.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you return it from execute method it will get stored with the "return_value" key but with context["ti"].xcom_push(...) you can have a custom key name for example run_id in this case

@Lee-W Lee-W force-pushed the check-existing-pipeline-run-for-AzureDataFactoryRunPipelineOperator branch from 6d536fa to 20491f7 Compare June 29, 2023 09:40
@Lee-W Lee-W changed the title feat(providers/azure): check existing pipeline run for AzureDataFactoryRunPipelineOperator when retry feat(providers/azure): cancel pipeline if unexpected exception caught Jun 29, 2023
@Lee-W Lee-W marked this pull request as ready for review June 29, 2023 10:56
Lee-W added 4 commits June 29, 2023 19:05
…ataFactoryRunPipelineOperator when retry"

This reverts commit 20491f76812c222d9b444c6d18b749fedc84f145.
…tion failure

and check whether cancel_pipline_run is called if unexpected exception occur
@Lee-W Lee-W force-pushed the check-existing-pipeline-run-for-AzureDataFactoryRunPipelineOperator branch from 20491f7 to 1d89611 Compare June 29, 2023 11:06
@potiuk potiuk merged commit 2ce51ac into apache:main Jun 29, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants