From fec1460eb7896da6bfad69e95c92b8e531e35485 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Mon, 2 Jan 2023 03:53:28 +0530 Subject: [PATCH] Push job_id in xcom for dataproc submit job op (#28639) --- airflow/providers/google/cloud/operators/dataproc.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py index 24ca7de40100d..ed47578e2ef4f 100644 --- a/airflow/providers/google/cloud/operators/dataproc.py +++ b/airflow/providers/google/cloud/operators/dataproc.py @@ -1002,6 +1002,7 @@ def execute_complete(self, context, event=None) -> None: if job_state == JobStatus.State.CANCELLED: raise AirflowException(f"Job was cancelled:\n{job_id}") self.log.info("%s completed successfully.", self.task_id) + return job_id def on_kill(self) -> None: """ @@ -1899,7 +1900,6 @@ def execute(self, context: Context): job_id=new_job_id, region=self.region, project_id=self.project_id, timeout=self.wait_timeout ) self.log.info("Job %s completed successfully.", new_job_id) - return self.job_id def execute_complete(self, context, event=None) -> None: @@ -1915,6 +1915,7 @@ def execute_complete(self, context, event=None) -> None: if job_state == JobStatus.State.CANCELLED: raise AirflowException(f"Job was cancelled:\n{job_id}") self.log.info("%s completed successfully.", self.task_id) + return job_id def on_kill(self): if self.job_id and self.cancel_on_kill: