From acf2921893084baa350fd010ad1a3610f0af6d7b Mon Sep 17 00:00:00 2001
From: Jarek Potiuk
Date: Wed, 23 Aug 2023 08:34:26 +0200
Subject: [PATCH] Fix Google Provider Link Deprecations

The Dataproc links have been deprecated, but the deprecation was done
incorrectly: the deprecation warnings were always raised when the
dataproc module was imported, even though only a few classes in it were
deprecated. This happened because the warnings were added at class level
rather than in the constructors.

Also, the deprecated classes were still used in operators, so the
warnings would keep being raised even after moving them from the class
body to its constructor.

This PR fixes that:

* moves the deprecation warnings from the class bodies to the constructors
* replaces usage of the deprecated links with the links that replaced them

Found while implementing #33640
---
 .../providers/google/cloud/links/dataproc.py  | 19 ++++++---
 .../google/cloud/operators/dataproc.py        | 27 ++++++------
 airflow/providers/google/provider.yaml        |  2 -
 .../google/cloud/operators/test_dataproc.py   | 47 +++++++++++---------
 4 files changed, 50 insertions(+), 45 deletions(-)

diff --git a/airflow/providers/google/cloud/links/dataproc.py b/airflow/providers/google/cloud/links/dataproc.py
index 917a1a7731cc7..2c54f96c39fa5 100644
--- a/airflow/providers/google/cloud/links/dataproc.py
+++ b/airflow/providers/google/cloud/links/dataproc.py
@@ -76,10 +76,13 @@ class DataprocLink(BaseOperatorLink):
     This link is deprecated.
     """

-    warnings.warn(
-        "This DataprocLink is deprecated.",
-        AirflowProviderDeprecationWarning,
-    )
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            "DataprocLink is deprecated. Please use Dataproc*Link classes",
+            AirflowProviderDeprecationWarning,
+        )
+        super().__init__(*args, **kwargs)
+
     name = "Dataproc resource"
     key = "conf"
@@ -125,7 +128,13 @@ class DataprocListLink(BaseOperatorLink):
     This link is deprecated.
     """

-    warnings.warn("This DataprocListLink is deprecated.", AirflowProviderDeprecationWarning)
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            "DataprocListLink is deprecated. Please use Dataproc*ListLink classes",
+            AirflowProviderDeprecationWarning,
+        )
+        super().__init__(*args, **kwargs)
+
     name = "Dataproc resources"
     key = "list_conf"

diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index 09e76ae2069fe..d842a4595862f 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -43,13 +43,10 @@
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
 from airflow.providers.google.cloud.links.dataproc import (
     DATAPROC_BATCH_LINK,
-    DATAPROC_CLUSTER_LINK_DEPRECATED,
-    DATAPROC_JOB_LINK_DEPRECATED,
     DataprocBatchesListLink,
     DataprocBatchLink,
     DataprocClusterLink,
     DataprocJobLink,
-    DataprocLink,
     DataprocWorkflowLink,
     DataprocWorkflowTemplateLink,
 )
@@ -742,7 +739,7 @@ class DataprocScaleClusterOperator(GoogleCloudBaseOperator):

     template_fields: Sequence[str] = ("cluster_name", "project_id", "region", "impersonation_chain")

-    operator_extra_links = (DataprocLink(),)
+    operator_extra_links = (DataprocClusterLink(),)

     def __init__(
         self,
@@ -821,12 +818,15 @@ def execute(self, context: Context) -> None:
         update_mask = ["config.worker_config.num_instances", "config.secondary_worker_config.num_instances"]

         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        # Hook always has a project_id as fallback so we can ignore assignment
+        project_id: str = self.project_id if self.project_id else hook.project_id  # type: ignore[assignment]
         # Save data required to display extra link no matter what the cluster status will be
-        DataprocLink.persist(
+        DataprocClusterLink.persist(
             context=context,
-            task_instance=self,
-            url=DATAPROC_CLUSTER_LINK_DEPRECATED,
-            resource=self.cluster_name,
+            operator=self,
+            cluster_id=self.cluster_name,
+            project_id=project_id,
+            region=self.region,
         )
         operation = hook.update_cluster(
             project_id=self.project_id,
@@ -1000,7 +1000,7 @@ class DataprocJobBaseOperator(GoogleCloudBaseOperator):

     job_type = ""

-    operator_extra_links = (DataprocLink(),)
+    operator_extra_links = (DataprocJobLink(),)

     def __init__(
         self,
@@ -1034,7 +1034,8 @@ def __init__(
         self.job_error_states = job_error_states if job_error_states is not None else {"ERROR"}
         self.impersonation_chain = impersonation_chain
         self.hook = DataprocHook(gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain)
-        self.project_id = self.hook.project_id if project_id is None else project_id
+        # Hook project id is used as fallback so we can ignore assignment
+        self.project_id: str = project_id if project_id else self.hook.project_id  # type: ignore[assignment]
         self.job_template: DataProcJobBuilder | None = None
         self.job: dict | None = None
         self.dataproc_job_id = None
@@ -1081,8 +1082,8 @@ def execute(self, context: Context):
             job_id = job_object.reference.job_id
             self.log.info("Job %s submitted successfully.", job_id)
             # Save data required for extra links no matter what the job status will be
-            DataprocLink.persist(
-                context=context, task_instance=self, url=DATAPROC_JOB_LINK_DEPRECATED, resource=job_id
+            DataprocJobLink.persist(
+                context=context, operator=self, job_id=job_id, project_id=self.project_id, region=self.region
             )

             if self.deferrable:
@@ -1184,7 +1185,7 @@ class DataprocSubmitPigJobOperator(DataprocJobBaseOperator):
     ui_color = "#0273d4"
     job_type = "pig_job"

-    operator_extra_links = (DataprocLink(),)
+    operator_extra_links = (DataprocJobLink(),)

     def __init__(
         self,

diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 29fd2c8073a76..730f0b2a5e861 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -1089,8 +1089,6 @@ extra-links:
   - airflow.providers.google.cloud.links.datacatalog.DataCatalogEntryGroupLink
   - airflow.providers.google.cloud.links.datacatalog.DataCatalogEntryLink
   - airflow.providers.google.cloud.links.datacatalog.DataCatalogTagTemplateLink
-  - airflow.providers.google.cloud.links.dataproc.DataprocLink
-  - airflow.providers.google.cloud.links.dataproc.DataprocListLink
   - airflow.providers.google.cloud.links.dataproc.DataprocClusterLink
   - airflow.providers.google.cloud.links.dataproc.DataprocJobLink
   - airflow.providers.google.cloud.links.dataproc.DataprocWorkflowLink

diff --git a/tests/providers/google/cloud/operators/test_dataproc.py b/tests/providers/google/cloud/operators/test_dataproc.py
index 6ddaec8be8928..8eafa6a00be42 100644
--- a/tests/providers/google/cloud/operators/test_dataproc.py
+++ b/tests/providers/google/cloud/operators/test_dataproc.py
@@ -33,8 +33,6 @@
 )
 from airflow.models import DAG, DagBag
 from airflow.providers.google.cloud.links.dataproc import (
-    DATAPROC_CLUSTER_LINK_DEPRECATED,
-    DATAPROC_JOB_LINK_DEPRECATED,
     DataprocClusterLink,
     DataprocJobLink,
     DataprocWorkflowLink,
@@ -49,7 +47,6 @@
     DataprocGetBatchOperator,
     DataprocInstantiateInlineWorkflowTemplateOperator,
     DataprocInstantiateWorkflowTemplateOperator,
-    DataprocLink,
     DataprocListBatchesOperator,
     DataprocScaleClusterOperator,
     DataprocSubmitHadoopJobOperator,
@@ -242,10 +239,9 @@
     f"project={GCP_PROJECT}"
 )
 DATAPROC_JOB_CONF_EXPECTED = {
-    "resource": TEST_JOB_ID,
+    "job_id": TEST_JOB_ID,
     "region": GCP_REGION,
     "project_id": GCP_PROJECT,
-    "url": DATAPROC_JOB_LINK_DEPRECATED,
 }
 DATAPROC_JOB_EXPECTED = {
     "job_id": TEST_JOB_ID,
@@ -253,10 +249,9 @@
     "project_id": GCP_PROJECT,
 }
 DATAPROC_CLUSTER_CONF_EXPECTED = {
-    "resource": CLUSTER_NAME,
+    "cluster_id": CLUSTER_NAME,
     "region": GCP_REGION,
     "project_id": GCP_PROJECT,
-    "url": DATAPROC_CLUSTER_LINK_DEPRECATED,
 }
 DATAPROC_CLUSTER_EXPECTED = {
     "cluster_id": CLUSTER_NAME,
@@ -781,7 +776,9 @@ class TestDataprocClusterScaleOperator(DataprocClusterTestBase):
     def setup_class(cls):
         super().setup_class()
         cls.extra_links_expected_calls_base = [
-            call.ti.xcom_push(execution_date=None, key="conf", value=DATAPROC_CLUSTER_CONF_EXPECTED)
+            call.ti.xcom_push(
+                execution_date=None, key="dataproc_cluster", value=DATAPROC_CLUSTER_CONF_EXPECTED
+            )
         ]

     def test_deprecation_warning(self):
@@ -827,7 +824,7 @@ def test_execute(self, mock_hook):
         self.extra_links_manager_mock.assert_has_calls(expected_calls, any_order=False)

         self.mock_ti.xcom_push.assert_called_once_with(
-            key="conf",
+            key="dataproc_cluster",
             value=DATAPROC_CLUSTER_CONF_EXPECTED,
             execution_date=None,
         )
@@ -855,17 +852,17 @@ def test_scale_cluster_operator_extra_links(dag_maker, create_task_instance_of_o

     # Assert operator links for serialized DAG
     assert serialized_dag["dag"]["tasks"][0]["_operator_extra_links"] == [
-        {"airflow.providers.google.cloud.links.dataproc.DataprocLink": {}}
+        {"airflow.providers.google.cloud.links.dataproc.DataprocClusterLink": {}}
     ]

     # Assert operator link types are preserved during deserialization
-    assert isinstance(deserialized_task.operator_extra_links[0], DataprocLink)
+    assert isinstance(deserialized_task.operator_extra_links[0], DataprocClusterLink)

     # Assert operator link is empty when no XCom push occurred
-    assert ti.task.get_extra_links(ti, DataprocLink.name) == ""
+    assert ti.task.get_extra_links(ti, DataprocClusterLink.name) == ""

     # Assert operator link is empty for deserialized task when no XCom push occurred
-    assert deserialized_task.get_extra_links(ti, DataprocLink.name) == ""
+    assert deserialized_task.get_extra_links(ti, DataprocClusterLink.name) == ""

     ti.xcom_push(
-        key="conf",
+        key="dataproc_cluster",
         value=DATAPROC_CLUSTER_CONF_EXPECTED,
     )

     # Assert operator links are preserved in deserialized tasks after execution
-    assert deserialized_task.get_extra_links(ti, DataprocLink.name) == DATAPROC_CLUSTER_LINK_EXPECTED
+    assert deserialized_task.get_extra_links(ti, DataprocClusterLink.name) == DATAPROC_CLUSTER_LINK_EXPECTED

     # Assert operator links after execution
-    assert ti.task.get_extra_links(ti, DataprocLink.name) == DATAPROC_CLUSTER_LINK_EXPECTED
+    assert ti.task.get_extra_links(ti, DataprocClusterLink.name) == DATAPROC_CLUSTER_LINK_EXPECTED


 class TestDataprocClusterDeleteOperator:
@@ -1817,7 +1814,7 @@ class TestDataProcSparkOperator(DataprocJobTestBase):
     @classmethod
     def setup_class(cls):
         cls.extra_links_expected_calls = [
-            call.ti.xcom_push(execution_date=None, key="conf", value=DATAPROC_JOB_CONF_EXPECTED),
+            call.ti.xcom_push(execution_date=None, key="dataproc_job", value=DATAPROC_JOB_CONF_EXPECTED),
             call.hook().wait_for_job(job_id=TEST_JOB_ID, region=GCP_REGION, project_id=GCP_PROJECT),
         ]

@@ -1864,7 +1861,7 @@ def test_execute(self, mock_hook, mock_uuid):
         op.execute(context=self.mock_context)

         self.mock_ti.xcom_push.assert_called_once_with(
-            key="conf", value=DATAPROC_JOB_CONF_EXPECTED, execution_date=None
+            key="dataproc_job", value=DATAPROC_JOB_CONF_EXPECTED, execution_date=None
         )

         # Test whether xcom push occurs before polling for job
@@ -1893,25 +1890,25 @@ def test_submit_spark_job_operator_extra_links(mock_hook, dag_maker, create_task

     # Assert operator links for serialized DAG
     assert serialized_dag["dag"]["tasks"][0]["_operator_extra_links"] == [
-        {"airflow.providers.google.cloud.links.dataproc.DataprocLink": {}}
+        {"airflow.providers.google.cloud.links.dataproc.DataprocJobLink": {}}
     ]

     # Assert operator link types are preserved during deserialization
-    assert isinstance(deserialized_task.operator_extra_links[0], DataprocLink)
+    assert isinstance(deserialized_task.operator_extra_links[0], DataprocJobLink)

     # Assert operator link is empty when no XCom push occurred
-    assert ti.task.get_extra_links(ti, DataprocLink.name) == ""
+    assert ti.task.get_extra_links(ti, DataprocJobLink.name) == ""

     # Assert operator link is empty for deserialized task when no XCom push occurred
-    assert deserialized_task.get_extra_links(ti, DataprocLink.name) == ""
+    assert deserialized_task.get_extra_links(ti, DataprocJobLink.name) == ""

-    ti.xcom_push(key="conf", value=DATAPROC_JOB_CONF_EXPECTED)
+    ti.xcom_push(key="dataproc_job", value=DATAPROC_JOB_CONF_EXPECTED)

     # Assert operator links after task execution
-    assert ti.task.get_extra_links(ti, DataprocLink.name) == DATAPROC_JOB_LINK_EXPECTED
+    assert ti.task.get_extra_links(ti, DataprocJobLink.name) == DATAPROC_JOB_LINK_EXPECTED

     # Assert operator links are preserved in deserialized tasks
-    link = deserialized_task.get_extra_links(ti, DataprocLink.name)
+    link = deserialized_task.get_extra_links(ti, DataprocJobLink.name)
     assert link == DATAPROC_JOB_LINK_EXPECTED
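
Note for reviewers: a minimal standalone sketch of the mechanism this patch relies
on. It is not part of the patch; the class names below are illustrative, and
AirflowProviderDeprecationWarning is stubbed so the sketch runs without Airflow. A
warnings.warn() call placed in a class body executes while the class is being
defined, i.e. when the defining module is imported, whereas the same call placed
in __init__ executes only when the class is instantiated.

import warnings


class AirflowProviderDeprecationWarning(DeprecationWarning):
    """Stand-in for airflow.exceptions.AirflowProviderDeprecationWarning."""


class ClassLevelDeprecated:
    # Runs while the class body executes, i.e. at import of the defining
    # module -- every importer sees the warning, whether or not it ever
    # uses this class.
    warnings.warn("ClassLevelDeprecated is deprecated.", AirflowProviderDeprecationWarning)


class ConstructorDeprecated:
    # Deferred to instantiation time: only code that actually creates an
    # instance triggers the warning.
    def __init__(self, *args, **kwargs):
        warnings.warn("ConstructorDeprecated is deprecated.", AirflowProviderDeprecationWarning)
        super().__init__(*args, **kwargs)


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    ConstructorDeprecated()  # the warning is emitted here, not at import
    assert any(issubclass(w.category, AirflowProviderDeprecationWarning) for w in caught)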