Fix Google Provider Link Deprecations
The Dataproc links have been deprecated, but the deprecation was implemented
incorrectly: the deprecation warnings were raised whenever the dataproc
module was imported, even though only a few of the classes in it were
deprecated. This was because the warnings were emitted at class level
rather than in the constructors.

The deprecated classes were also still used in operators, so the warnings
would have kept being raised even after moving them from class level into
the constructors.
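
For illustration, a minimal sketch (stand-in names, not the actual Airflow classes) of why the placement matters: a class body executes at import time, while a constructor runs only on instantiation.

    import warnings

    class ProviderDeprecationWarning(DeprecationWarning):
        """Stand-in for AirflowProviderDeprecationWarning."""

    class WarnsAtImport:
        # Anti-pattern: this statement executes as soon as the defining module
        # is imported, so every importer sees the warning, used or not.
        warnings.warn("WarnsAtImport is deprecated.", ProviderDeprecationWarning)

    class WarnsAtUse:
        # Fix: only code that actually instantiates the class is warned.
        def __init__(self, *args, **kwargs):
            warnings.warn("WarnsAtUse is deprecated.", ProviderDeprecationWarning)
            super().__init__(*args, **kwargs)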

This PR fixes that:

* moves the deprecation warnings from class level into the constructors
* replaces usages of the deprecated links with the links that replaced them

Found while implementing apache#33640
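
One way to lock in the fixed behavior, sketched with pytest against the stand-in classes above (hypothetical tests, not part of this change):

    import warnings

    import pytest

    def test_instantiation_warns():
        # The warning now fires only on use.
        with pytest.warns(ProviderDeprecationWarning, match="deprecated"):
            WarnsAtUse()

    def test_definition_is_silent():
        # Escalating the warning to an error proves that merely defining
        # (or importing) a constructor-warning class emits nothing.
        with warnings.catch_warnings():
            warnings.simplefilter("error", ProviderDeprecationWarning)
            class AlsoWarnsAtUse:
                def __init__(self):
                    warnings.warn("AlsoWarnsAtUse is deprecated.", ProviderDeprecationWarning)
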
potiuk committed Aug 23, 2023
1 parent 911cf46 commit acf2921
Showing 4 changed files with 49 additions and 43 deletions.
20 changes: 15 additions & 5 deletions airflow/providers/google/cloud/links/dataproc.py
@@ -76,10 +76,14 @@ class DataprocLink(BaseOperatorLink):
         This link is deprecated.
     """

-    warnings.warn(
-        "This DataprocLink is deprecated.",
-        AirflowProviderDeprecationWarning,
-    )
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            "DataprocLink is deprecated. Please use Dataproc*Link classes",
+            AirflowProviderDeprecationWarning,
+        )
+        super().__init__(*args, **kwargs)
+
     name = "Dataproc resource"
     key = "conf"

@@ -125,7 +129,13 @@ class DataprocListLink(BaseOperatorLink):
         This link is deprecated.
     """

-    warnings.warn("This DataprocListLink is deprecated.", AirflowProviderDeprecationWarning)
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            "DataprocListLink is deprecated. Please use Dataproc*ListLink classes",
+            AirflowProviderDeprecationWarning,
+        )
+        super().__init__(*args, **kwargs)
+
     name = "Dataproc resources"
     key = "list_conf"

27 changes: 14 additions & 13 deletions airflow/providers/google/cloud/operators/dataproc.py
@@ -43,13 +43,10 @@
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
 from airflow.providers.google.cloud.links.dataproc import (
     DATAPROC_BATCH_LINK,
-    DATAPROC_CLUSTER_LINK_DEPRECATED,
-    DATAPROC_JOB_LINK_DEPRECATED,
     DataprocBatchesListLink,
     DataprocBatchLink,
     DataprocClusterLink,
     DataprocJobLink,
-    DataprocLink,
     DataprocWorkflowLink,
     DataprocWorkflowTemplateLink,
 )
@@ -742,7 +739,7 @@ class DataprocScaleClusterOperator(GoogleCloudBaseOperator):

     template_fields: Sequence[str] = ("cluster_name", "project_id", "region", "impersonation_chain")

-    operator_extra_links = (DataprocLink(),)
+    operator_extra_links = (DataprocClusterLink(),)

     def __init__(
         self,
@@ -821,12 +818,15 @@ def execute(self, context: Context) -> None:
         update_mask = ["config.worker_config.num_instances", "config.secondary_worker_config.num_instances"]

         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        # Hook always has a project_id as fallback so we can ignore assignment
+        project_id: str = self.project_id if self.project_id else hook.project_id  # type: ignore[assignment]
         # Save data required to display extra link no matter what the cluster status will be
-        DataprocLink.persist(
+        DataprocClusterLink.persist(
             context=context,
-            task_instance=self,
-            url=DATAPROC_CLUSTER_LINK_DEPRECATED,
-            resource=self.cluster_name,
+            operator=self,
+            cluster_id=self.cluster_name,
+            project_id=project_id,
+            region=self.region,
         )
         operation = hook.update_cluster(
             project_id=self.project_id,
@@ -1000,7 +1000,7 @@ class DataprocJobBaseOperator(GoogleCloudBaseOperator):

     job_type = ""

-    operator_extra_links = (DataprocLink(),)
+    operator_extra_links = (DataprocJobLink(),)

     def __init__(
         self,
@@ -1034,7 +1034,8 @@ def __init__(
         self.job_error_states = job_error_states if job_error_states is not None else {"ERROR"}
         self.impersonation_chain = impersonation_chain
         self.hook = DataprocHook(gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain)
-        self.project_id = self.hook.project_id if project_id is None else project_id
+        # Hook project id is used as fallback so we can ignore assignment
+        self.project_id: str = project_id if project_id else self.hook.project_id  # type: ignore[assignment]
         self.job_template: DataProcJobBuilder | None = None
         self.job: dict | None = None
         self.dataproc_job_id = None
@@ -1081,8 +1082,8 @@ def execute(self, context: Context):
             job_id = job_object.reference.job_id
             self.log.info("Job %s submitted successfully.", job_id)
             # Save data required for extra links no matter what the job status will be
-            DataprocLink.persist(
-                context=context, task_instance=self, url=DATAPROC_JOB_LINK_DEPRECATED, resource=job_id
+            DataprocJobLink.persist(
+                context=context, operator=self, job_id=job_id, project_id=self.project_id, region=self.region
             )

             if self.deferrable:
@@ -1184,7 +1185,7 @@ class DataprocSubmitPigJobOperator(DataprocJobBaseOperator):
     ui_color = "#0273d4"
     job_type = "pig_job"

-    operator_extra_links = (DataprocLink(),)
+    operator_extra_links = (DataprocJobLink(),)

     def __init__(
         self,
2 changes: 0 additions & 2 deletions airflow/providers/google/provider.yaml
@@ -1089,8 +1089,6 @@ extra-links:
   - airflow.providers.google.cloud.links.datacatalog.DataCatalogEntryGroupLink
   - airflow.providers.google.cloud.links.datacatalog.DataCatalogEntryLink
   - airflow.providers.google.cloud.links.datacatalog.DataCatalogTagTemplateLink
-  - airflow.providers.google.cloud.links.dataproc.DataprocLink
-  - airflow.providers.google.cloud.links.dataproc.DataprocListLink
   - airflow.providers.google.cloud.links.dataproc.DataprocClusterLink
   - airflow.providers.google.cloud.links.dataproc.DataprocJobLink
   - airflow.providers.google.cloud.links.dataproc.DataprocWorkflowLink
43 changes: 20 additions & 23 deletions tests/providers/google/cloud/operators/test_dataproc.py
@@ -33,8 +33,6 @@
 )
 from airflow.models import DAG, DagBag
 from airflow.providers.google.cloud.links.dataproc import (
-    DATAPROC_CLUSTER_LINK_DEPRECATED,
-    DATAPROC_JOB_LINK_DEPRECATED,
     DataprocClusterLink,
     DataprocJobLink,
     DataprocWorkflowLink,
@@ -49,7 +47,6 @@
     DataprocGetBatchOperator,
     DataprocInstantiateInlineWorkflowTemplateOperator,
     DataprocInstantiateWorkflowTemplateOperator,
-    DataprocLink,
     DataprocListBatchesOperator,
     DataprocScaleClusterOperator,
     DataprocSubmitHadoopJobOperator,
@@ -242,21 +239,19 @@
     f"project={GCP_PROJECT}"
 )
 DATAPROC_JOB_CONF_EXPECTED = {
-    "resource": TEST_JOB_ID,
+    "job_id": TEST_JOB_ID,
     "region": GCP_REGION,
     "project_id": GCP_PROJECT,
-    "url": DATAPROC_JOB_LINK_DEPRECATED,
 }
 DATAPROC_JOB_EXPECTED = {
     "job_id": TEST_JOB_ID,
     "region": GCP_REGION,
     "project_id": GCP_PROJECT,
 }
 DATAPROC_CLUSTER_CONF_EXPECTED = {
-    "resource": CLUSTER_NAME,
+    "cluster_id": CLUSTER_NAME,
     "region": GCP_REGION,
     "project_id": GCP_PROJECT,
-    "url": DATAPROC_CLUSTER_LINK_DEPRECATED,
 }
 DATAPROC_CLUSTER_EXPECTED = {
     "cluster_id": CLUSTER_NAME,
@@ -781,7 +776,9 @@ class TestDataprocClusterScaleOperator(DataprocClusterTestBase):
     def setup_class(cls):
         super().setup_class()
         cls.extra_links_expected_calls_base = [
-            call.ti.xcom_push(execution_date=None, key="conf", value=DATAPROC_CLUSTER_CONF_EXPECTED)
+            call.ti.xcom_push(
+                execution_date=None, key="dataproc_cluster", value=DATAPROC_CLUSTER_CONF_EXPECTED
+            )
         ]

     def test_deprecation_warning(self):
@@ -827,7 +824,7 @@ def test_execute(self, mock_hook):
         self.extra_links_manager_mock.assert_has_calls(expected_calls, any_order=False)

         self.mock_ti.xcom_push.assert_called_once_with(
-            key="conf",
+            key="dataproc_cluster",
             value=DATAPROC_CLUSTER_CONF_EXPECTED,
             execution_date=None,
         )
@@ -855,28 +852,28 @@ def test_scale_cluster_operator_extra_links(dag_maker, create_task_instance_of_o

     # Assert operator links for serialized DAG
     assert serialized_dag["dag"]["tasks"][0]["_operator_extra_links"] == [
-        {"airflow.providers.google.cloud.links.dataproc.DataprocLink": {}}
+        {"airflow.providers.google.cloud.links.dataproc.DataprocClusterLink": {}}
     ]

     # Assert operator link types are preserved during deserialization
-    assert isinstance(deserialized_task.operator_extra_links[0], DataprocLink)
+    assert isinstance(deserialized_task.operator_extra_links[0], DataprocClusterLink)

     # Assert operator link is empty when no XCom push occurred
-    assert ti.task.get_extra_links(ti, DataprocLink.name) == ""
+    assert ti.task.get_extra_links(ti, DataprocClusterLink.name) == ""

     # Assert operator link is empty for deserialized task when no XCom push occurred
-    assert deserialized_task.get_extra_links(ti, DataprocLink.name) == ""
+    assert deserialized_task.get_extra_links(ti, DataprocClusterLink.name) == ""

     ti.xcom_push(
         key="conf",
         value=DATAPROC_CLUSTER_CONF_EXPECTED,
     )

     # Assert operator links are preserved in deserialized tasks after execution
-    assert deserialized_task.get_extra_links(ti, DataprocLink.name) == DATAPROC_CLUSTER_LINK_EXPECTED
+    assert deserialized_task.get_extra_links(ti, DataprocClusterLink.name) == DATAPROC_CLUSTER_LINK_EXPECTED

     # Assert operator links after execution
-    assert ti.task.get_extra_links(ti, DataprocLink.name) == DATAPROC_CLUSTER_LINK_EXPECTED
+    assert ti.task.get_extra_links(ti, DataprocClusterLink.name) == DATAPROC_CLUSTER_LINK_EXPECTED


 class TestDataprocClusterDeleteOperator:
@@ -1817,7 +1814,7 @@ class TestDataProcSparkOperator(DataprocJobTestBase):
     @classmethod
     def setup_class(cls):
         cls.extra_links_expected_calls = [
-            call.ti.xcom_push(execution_date=None, key="conf", value=DATAPROC_JOB_CONF_EXPECTED),
+            call.ti.xcom_push(execution_date=None, key="dataproc_job", value=DATAPROC_JOB_CONF_EXPECTED),
             call.hook().wait_for_job(job_id=TEST_JOB_ID, region=GCP_REGION, project_id=GCP_PROJECT),
         ]

@@ -1864,7 +1861,7 @@ def test_execute(self, mock_hook, mock_uuid):

         op.execute(context=self.mock_context)
         self.mock_ti.xcom_push.assert_called_once_with(
-            key="conf", value=DATAPROC_JOB_CONF_EXPECTED, execution_date=None
+            key="dataproc_job", value=DATAPROC_JOB_CONF_EXPECTED, execution_date=None
         )

         # Test whether xcom push occurs before polling for job
@@ -1893,25 +1890,25 @@ def test_submit_spark_job_operator_extra_links(mock_hook, dag_maker, create_task

     # Assert operator links for serialized DAG
     assert serialized_dag["dag"]["tasks"][0]["_operator_extra_links"] == [
-        {"airflow.providers.google.cloud.links.dataproc.DataprocLink": {}}
+        {"airflow.providers.google.cloud.links.dataproc.DataprocJobLink": {}}
     ]

     # Assert operator link types are preserved during deserialization
-    assert isinstance(deserialized_task.operator_extra_links[0], DataprocLink)
+    assert isinstance(deserialized_task.operator_extra_links[0], DataprocJobLink)

     # Assert operator link is empty when no XCom push occurred
-    assert ti.task.get_extra_links(ti, DataprocLink.name) == ""
+    assert ti.task.get_extra_links(ti, DataprocJobLink.name) == ""

     # Assert operator link is empty for deserialized task when no XCom push occurred
-    assert deserialized_task.get_extra_links(ti, DataprocLink.name) == ""
+    assert deserialized_task.get_extra_links(ti, DataprocJobLink.name) == ""

     ti.xcom_push(key="conf", value=DATAPROC_JOB_CONF_EXPECTED)

     # Assert operator links after task execution
-    assert ti.task.get_extra_links(ti, DataprocLink.name) == DATAPROC_JOB_LINK_EXPECTED
+    assert ti.task.get_extra_links(ti, DataprocJobLink.name) == DATAPROC_JOB_LINK_EXPECTED

     # Assert operator links are preserved in deserialized tasks
-    link = deserialized_task.get_extra_links(ti, DataprocLink.name)
+    link = deserialized_task.get_extra_links(ti, DataprocJobLink.name)
     assert link == DATAPROC_JOB_LINK_EXPECTED
