-
Notifications
You must be signed in to change notification settings - Fork 14.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a "force" option to emr serverless stop/delete operator #30757
Changes from 3 commits
7fbb07c
a7f25a1
8370c81
ede8b85
d132b70
0232d2e
4b44efe
9f88cc1
af8eb19
76bd915
9ddaafe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1025,6 +1025,9 @@ class EmrServerlessStopApplicationOperator(BaseOperator): | |
the application be stopped. Defaults to 5 minutes. | ||
:param waiter_check_interval_seconds: Number of seconds between polling the state of the application. | ||
Defaults to 30 seconds. | ||
:param force_stop: If set to True, any job for that app that is not in a terminal state will be cancelled. | ||
Otherwise, trying to stop an app with running jobs will return an error. | ||
If you want to wait for the jobs to finish gracefully, use :ref:`howto/sensor:EmrServerlessJobSensor` | ||
""" | ||
|
||
template_fields: Sequence[str] = ("application_id",) | ||
|
@@ -1036,13 +1039,15 @@ def __init__( | |
aws_conn_id: str = "aws_default", | ||
waiter_countdown: int = 5 * 60, | ||
waiter_check_interval_seconds: int = 30, | ||
force_stop: bool = False, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. WDYT about updating the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added it to the system test (https://github.com/apache/airflow/pull/30757/files/8370c8172d8c28ffeec3ca460621b9d0447bfba6#diff-69be45953c5be696ca3159bb385a89c90930fc06e68357e8bdb33a1b31694f88R120) inside the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks good |
||
**kwargs, | ||
): | ||
self.aws_conn_id = aws_conn_id | ||
self.application_id = application_id | ||
self.wait_for_completion = wait_for_completion | ||
self.waiter_countdown = waiter_countdown | ||
self.waiter_check_interval_seconds = waiter_check_interval_seconds | ||
self.force_stop = force_stop | ||
super().__init__(**kwargs) | ||
|
||
@cached_property | ||
|
@@ -1052,6 +1057,16 @@ def hook(self) -> EmrServerlessHook: | |
|
||
def execute(self, context: Context) -> None: | ||
self.log.info("Stopping application: %s", self.application_id) | ||
|
||
if self.force_stop: | ||
self.hook.cancel_running_jobs( | ||
self.application_id, | ||
waiter_config={ | ||
"Delay": self.waiter_check_interval_seconds, | ||
"MaxAttempts": self.waiter_countdown / self.waiter_check_interval_seconds, | ||
}, | ||
) | ||
|
||
self.hook.conn.stop_application(applicationId=self.application_id) | ||
|
||
if self.wait_for_completion: | ||
|
@@ -1088,6 +1103,9 @@ class EmrServerlessDeleteApplicationOperator(EmrServerlessStopApplicationOperato | |
the application to be stopped, and then deleted. Defaults to 25 minutes. | ||
:param waiter_check_interval_seconds: Number of seconds between polling the state of the application. | ||
Defaults to 60 seconds. | ||
:param force_stop: If set to True, any job for that app that is not in a terminal state will be cancelled. | ||
Otherwise, trying to delete an app with running jobs will return an error. | ||
If you want to wait for the jobs to finish gracefully, use :ref:`howto/sensor:EmrServerlessJobSensor` | ||
""" | ||
|
||
template_fields: Sequence[str] = ("application_id",) | ||
|
@@ -1099,6 +1117,7 @@ def __init__( | |
aws_conn_id: str = "aws_default", | ||
waiter_countdown: int = 25 * 60, | ||
waiter_check_interval_seconds: int = 60, | ||
force_stop: bool = False, | ||
**kwargs, | ||
): | ||
self.wait_for_delete_completion = wait_for_completion | ||
|
@@ -1110,6 +1129,7 @@ def __init__( | |
aws_conn_id=aws_conn_id, | ||
waiter_countdown=waiter_countdown, | ||
waiter_check_interval_seconds=waiter_check_interval_seconds, | ||
force_stop=force_stop, | ||
**kwargs, | ||
) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
{ | ||
"version": 2, | ||
"waiters": { | ||
"no_job_running": { | ||
"operation": "ListJobRuns", | ||
"delay": 10, | ||
"maxAttempts": 60, | ||
"acceptors": [ | ||
{ | ||
"matcher": "path", | ||
"argument": "length(jobRuns) == `0`", | ||
"expected": true, | ||
"state": "success" | ||
} | ||
] | ||
Comment on lines
+8
to
+15
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't much like the fact that this waiter has no failure case, but I don't think there is anything we can do about it... |
||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ | |
|
||
from airflow.models.baseoperator import chain | ||
from airflow.models.dag import DAG | ||
from airflow.providers.amazon.aws.hooks.emr import EmrServerlessHook | ||
from airflow.providers.amazon.aws.operators.emr import ( | ||
EmrServerlessCreateApplicationOperator, | ||
EmrServerlessDeleteApplicationOperator, | ||
|
@@ -100,13 +101,14 @@ | |
configuration_overrides=SPARK_CONFIGURATION_OVERRIDES, | ||
) | ||
# [END howto_operator_emr_serverless_start_job] | ||
start_job.waiter_check_interval_seconds = 10 | ||
start_job.wait_for_completion = False | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Instead of waiting for the job to finish, we just trigger it and continue... |
||
|
||
# [START howto_sensor_emr_serverless_job] | ||
wait_for_job = EmrServerlessJobSensor( | ||
task_id="wait_for_job", | ||
application_id=emr_serverless_app_id, | ||
job_run_id=start_job.output, | ||
target_states=EmrServerlessHook.JOB_INTERMEDIATE_STATES, | ||
vandonr-amz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) | ||
# [END howto_sensor_emr_serverless_job] | ||
wait_for_job.poke_interval = 10 | ||
|
@@ -115,6 +117,7 @@ | |
stop_app = EmrServerlessStopApplicationOperator( | ||
task_id="stop_application", | ||
application_id=emr_serverless_app_id, | ||
force_stop=True, | ||
) | ||
# [END howto_operator_emr_serverless_stop_application] | ||
stop_app.waiter_check_interval_seconds = 1 | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has nothing to do with the main change, but I took the opportunity to clean this up while I was here.