From 8d9f8175211f67a237b009897100545905be4036 Mon Sep 17 00:00:00 2001 From: Kacper Muda Date: Fri, 7 Feb 2025 15:59:30 +0100 Subject: [PATCH] fix: Dataproc operators fail to import without OpenLineage Signed-off-by: Kacper Muda --- .../google/cloud/openlineage/facets.py | 62 ++++++++++++++ .../google/cloud/openlineage/mixins.py | 4 +- .../google/cloud/openlineage/utils.py | 29 ------- .../google/cloud/operators/dataproc.py | 84 ++++++++++++++----- .../google/cloud/openlineage/test_facets.py | 26 ++++++ .../google/cloud/openlineage/test_mixins.py | 4 +- 6 files changed, 153 insertions(+), 56 deletions(-) create mode 100644 providers/google/src/airflow/providers/google/cloud/openlineage/facets.py create mode 100644 providers/google/tests/provider_tests/google/cloud/openlineage/test_facets.py diff --git a/providers/google/src/airflow/providers/google/cloud/openlineage/facets.py b/providers/google/src/airflow/providers/google/cloud/openlineage/facets.py new file mode 100644 index 0000000000000..923c4ea1f9325 --- /dev/null +++ b/providers/google/src/airflow/providers/google/cloud/openlineage/facets.py @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from attr import define, field + +from airflow.providers.google import __version__ as provider_version + +try: + try: + from openlineage.client.generated.base import RunFacet + except ImportError: # Old OpenLineage client is used + from openlineage.client.facet import BaseFacet as RunFacet + + @define + class BigQueryJobRunFacet(RunFacet): + """ + Facet that represents relevant statistics of bigquery run. + + :param cached: BigQuery caches query results. Rest of the statistics will not be provided for cached queries. + :param billedBytes: How many bytes BigQuery bills for. + :param properties: Full property tree of BigQuery run. + """ + + cached: bool + billedBytes: int | None = field(default=None) + properties: str | None = field(default=None) + + @staticmethod + def _get_schema() -> str: + return ( + "https://raw.githubusercontent.com/apache/airflow/" + f"providers-google/{provider_version}/airflow/providers/google/" + "openlineage/BigQueryJobRunFacet.json" + ) +except ImportError: # OpenLineage is not available + + def create_no_op(*_, **__) -> None: + """ + Create a no-op placeholder. + + This function creates and returns a None value, used as a placeholder when the OpenLineage client + library is not available. It represents an action that has no effect. 
+ """ + return None + + BigQueryJobRunFacet = create_no_op diff --git a/providers/google/src/airflow/providers/google/cloud/openlineage/mixins.py b/providers/google/src/airflow/providers/google/cloud/openlineage/mixins.py index c9d1bb009fb4c..fd71655663772 100644 --- a/providers/google/src/airflow/providers/google/cloud/openlineage/mixins.py +++ b/providers/google/src/airflow/providers/google/cloud/openlineage/mixins.py @@ -38,7 +38,6 @@ ) from airflow.providers.google.cloud.openlineage.utils import ( BIGQUERY_NAMESPACE, - BigQueryJobRunFacet, get_facets_from_bq_table, get_from_nullable_chain, get_identity_column_lineage_facet, @@ -48,6 +47,7 @@ if TYPE_CHECKING: from airflow.providers.common.compat.openlineage.facet import Dataset, RunFacet + from airflow.providers.google.cloud.openlineage.facets import BigQueryJobRunFacet class _BigQueryInsertJobOperatorOpenLineageMixin: @@ -316,6 +316,8 @@ def _get_inputs_and_outputs_for_extract_job( @staticmethod def _get_bigquery_job_run_facet(properties: dict) -> BigQueryJobRunFacet: + from airflow.providers.google.cloud.openlineage.facets import BigQueryJobRunFacet + job_type = get_from_nullable_chain(properties, ["configuration", "jobType"]) cache_hit, billed_bytes = None, None if job_type == "QUERY": diff --git a/providers/google/src/airflow/providers/google/cloud/openlineage/utils.py b/providers/google/src/airflow/providers/google/cloud/openlineage/utils.py index 4c01ab476ed35..59f2b2cc8bce0 100644 --- a/providers/google/src/airflow/providers/google/cloud/openlineage/utils.py +++ b/providers/google/src/airflow/providers/google/cloud/openlineage/utils.py @@ -26,8 +26,6 @@ from collections.abc import Iterable from typing import TYPE_CHECKING, Any -from attr import define, field - from airflow.providers.common.compat.openlineage.facet import ( ColumnLineageDatasetFacet, DatasetFacet, @@ -35,7 +33,6 @@ Fields, Identifier, InputField, - RunFacet, SchemaDatasetFacet, SchemaDatasetFacetFields, SymlinksDatasetFacet, @@ -44,7 
+41,6 @@ inject_parent_job_information_into_spark_properties, inject_transport_information_into_spark_properties, ) -from airflow.providers.google import __version__ as provider_version from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url from google.cloud.dataproc_v1 import Batch, RuntimeConfig @@ -317,31 +313,6 @@ def get_identity_column_lineage_facet( return {"columnLineage": column_lineage_facet} -@define -class BigQueryJobRunFacet(RunFacet): - """ - Facet that represents relevant statistics of bigquery run. - - This facet is used to provide statistics about bigquery run. - - :param cached: BigQuery caches query results. Rest of the statistics will not be provided for cached queries. - :param billedBytes: How many bytes BigQuery bills for. - :param properties: Full property tree of BigQUery run. - """ - - cached: bool - billedBytes: int | None = field(default=None) - properties: str | None = field(default=None) - - @staticmethod - def _get_schema() -> str: - return ( - "https://raw.githubusercontent.com/apache/airflow/" - f"providers-google/{provider_version}/airflow/providers/google/" - "openlineage/BigQueryJobRunFacet.json" - ) - - def get_from_nullable_chain(source: Any, chain: list[str]) -> Any | None: """ Get object from nested structure of objects, where it's not guaranteed that all keys in the nested structure exist. 
diff --git a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py index f6f7deb31bd47..3ceb773fd64f4 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/dataproc.py @@ -49,11 +49,6 @@ DataprocWorkflowLink, DataprocWorkflowTemplateLink, ) -from airflow.providers.google.cloud.openlineage.utils import ( - inject_openlineage_properties_into_dataproc_batch, - inject_openlineage_properties_into_dataproc_job, - inject_openlineage_properties_into_dataproc_workflow_template, -) from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator from airflow.providers.google.cloud.triggers.dataproc import ( DataprocBatchTrigger, @@ -1858,12 +1853,7 @@ def execute(self, context: Context): project_id = self.project_id or hook.project_id if self.openlineage_inject_parent_job_info or self.openlineage_inject_transport_info: self.log.info("Automatic injection of OpenLineage information into Spark properties is enabled.") - self.template = inject_openlineage_properties_into_dataproc_workflow_template( - template=self.template, - context=context, - inject_parent_job_info=self.openlineage_inject_parent_job_info, - inject_transport_info=self.openlineage_inject_transport_info, - ) + self._inject_openlineage_properties_into_dataproc_workflow_template(context) operation = hook.instantiate_inline_workflow_template( template=self.template, @@ -1920,6 +1910,25 @@ def on_kill(self) -> None: hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) hook.get_operations_client(region=self.region).cancel_operation(name=self.operation_name) + def _inject_openlineage_properties_into_dataproc_workflow_template(self, context: Context) -> None: + try: + from airflow.providers.google.cloud.openlineage.utils import ( + 
inject_openlineage_properties_into_dataproc_workflow_template, + ) + + self.template = inject_openlineage_properties_into_dataproc_workflow_template( + template=self.template, + context=context, + inject_parent_job_info=self.openlineage_inject_parent_job_info, + inject_transport_info=self.openlineage_inject_transport_info, + ) + except Exception as e: + self.log.warning( + "An error occurred while trying to inject OpenLineage information. " + "Dataproc template has not been modified by OpenLineage.", + exc_info=e, + ) + class DataprocSubmitJobOperator(GoogleCloudBaseOperator): """ @@ -2017,12 +2026,8 @@ def execute(self, context: Context): self.hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) if self.openlineage_inject_parent_job_info or self.openlineage_inject_transport_info: self.log.info("Automatic injection of OpenLineage information into Spark properties is enabled.") - self.job = inject_openlineage_properties_into_dataproc_job( - job=self.job, - context=context, - inject_parent_job_info=self.openlineage_inject_parent_job_info, - inject_transport_info=self.openlineage_inject_transport_info, - ) + self._inject_openlineage_properties_into_dataproc_job(context) + job_object = self.hook.submit_job( project_id=self.project_id, region=self.region, @@ -2096,6 +2101,25 @@ def on_kill(self): if self.job_id and self.cancel_on_kill: self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id, region=self.region) + def _inject_openlineage_properties_into_dataproc_job(self, context: Context) -> None: + try: + from airflow.providers.google.cloud.openlineage.utils import ( + inject_openlineage_properties_into_dataproc_job, + ) + + self.job = inject_openlineage_properties_into_dataproc_job( + job=self.job, + context=context, + inject_parent_job_info=self.openlineage_inject_parent_job_info, + inject_transport_info=self.openlineage_inject_transport_info, + ) + except Exception as e: + self.log.warning( + "An error occurred 
while trying to inject OpenLineage information. " + "Dataproc job has not been modified by OpenLineage.", + exc_info=e, + ) + class DataprocUpdateClusterOperator(GoogleCloudBaseOperator): """ @@ -2502,12 +2526,7 @@ def execute(self, context: Context): if self.openlineage_inject_parent_job_info or self.openlineage_inject_transport_info: self.log.info("Automatic injection of OpenLineage information into Spark properties is enabled.") - self.batch = inject_openlineage_properties_into_dataproc_batch( - batch=self.batch, - context=context, - inject_parent_job_info=self.openlineage_inject_parent_job_info, - inject_transport_info=self.openlineage_inject_transport_info, - ) + self._inject_openlineage_properties_into_dataproc_batch(context) try: self.operation = self.hook.create_batch( @@ -2670,6 +2689,25 @@ def retry_batch_creation( ) return batch, batch_id + def _inject_openlineage_properties_into_dataproc_batch(self, context: Context) -> None: + try: + from airflow.providers.google.cloud.openlineage.utils import ( + inject_openlineage_properties_into_dataproc_batch, + ) + + self.batch = inject_openlineage_properties_into_dataproc_batch( + batch=self.batch, + context=context, + inject_parent_job_info=self.openlineage_inject_parent_job_info, + inject_transport_info=self.openlineage_inject_transport_info, + ) + except Exception as e: + self.log.warning( + "An error occurred while trying to inject OpenLineage information. 
" + "Dataproc batch has not been modified by OpenLineage.", + exc_info=e, + ) + class DataprocDeleteBatchOperator(GoogleCloudBaseOperator): """ diff --git a/providers/google/tests/provider_tests/google/cloud/openlineage/test_facets.py b/providers/google/tests/provider_tests/google/cloud/openlineage/test_facets.py new file mode 100644 index 0000000000000..4b45a9ee4b739 --- /dev/null +++ b/providers/google/tests/provider_tests/google/cloud/openlineage/test_facets.py @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from airflow.providers.google.cloud.openlineage.facets import BigQueryJobRunFacet + + +def test_bigquery_job_run_facet(): + facet = BigQueryJobRunFacet(cached=True, billedBytes=123, properties="some_properties") + assert facet.cached is True + assert facet.billedBytes == 123 + assert facet.properties == "some_properties" diff --git a/providers/google/tests/provider_tests/google/cloud/openlineage/test_mixins.py b/providers/google/tests/provider_tests/google/cloud/openlineage/test_mixins.py index 770a2bf61e6c4..51e0176f054c2 100644 --- a/providers/google/tests/provider_tests/google/cloud/openlineage/test_mixins.py +++ b/providers/google/tests/provider_tests/google/cloud/openlineage/test_mixins.py @@ -38,10 +38,8 @@ SchemaDatasetFacet, SchemaDatasetFacetFields, ) +from airflow.providers.google.cloud.openlineage.facets import BigQueryJobRunFacet from airflow.providers.google.cloud.openlineage.mixins import _BigQueryInsertJobOperatorOpenLineageMixin -from airflow.providers.google.cloud.openlineage.utils import ( - BigQueryJobRunFacet, -) from airflow.providers.openlineage.sqlparser import SQLParser QUERY_JOB_PROPERTIES = {