feat: automatically inject OL info into spark properties in DataprocCreateBatchOperator #44612

Merged
11 changes: 9 additions & 2 deletions docs/apache-airflow-providers-openlineage/guides/user.rst
@@ -413,6 +413,7 @@ This is because each emitting task sends a `ParentRunFacet <https://openlineage.
which requires the DAG-level lineage to be enabled in some OpenLineage backend systems.
Disabling DAG-level lineage while enabling task-level lineage might cause errors or inconsistencies.

.. _options:spark_inject_parent_job_info:

Passing parent job information to Spark jobs
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@@ -425,9 +426,15 @@ It allows Spark integration to automatically include ``parentRunFacet`` in appli
creating a parent-child relationship between tasks from different integrations.
See `Scheduling from Airflow <https://openlineage.io/docs/integrations/spark/configuration/airflow>`_.

.. warning::

    This configuration serves as the default behavior for all Operators that support automatic Spark properties injection,
    unless it is explicitly overridden at the Operator level.
    To prevent a specific Operator from injecting the parent job information while
    allowing all other supported Operators to do so by default, explicitly pass
    ``openlineage_inject_parent_job_info=False`` to that specific Operator, as shown in the example below.
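
For example, to opt a single task out while the global default stays enabled (a minimal sketch; every argument value besides ``openlineage_inject_parent_job_info`` is illustrative):

.. code-block:: python

    DataprocCreateBatchOperator(
        task_id="create_batch_without_parent_info",
        project_id="my-project",
        region="us-central1",
        batch={"pyspark_batch": {"main_python_file_uri": "gs://my-bucket/app.py"}},
        batch_id="my-batch",
        # Opt this single task out of parent job info injection:
        openlineage_inject_parent_job_info=False,
    )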

.. note::

    If any of the above properties are manually specified in the Spark job configuration, the integration will refrain from injecting parent job properties to ensure that manually provided values are preserved.
    If any of the ``spark.openlineage.parent*`` properties are manually specified in the Spark job configuration, the integration will refrain from injecting parent job properties to ensure that manually provided values are preserved.

You can enable this automation by setting the ``spark_inject_parent_job_info`` option to ``true`` in the Airflow configuration.
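
As noted above, a batch whose Spark properties already contain any ``spark.openlineage.parent*`` key is left untouched. A sketch (the property name follows the OpenLineage Spark integration; values are illustrative):

.. code-block:: python

    batch = {
        "pyspark_batch": {"main_python_file_uri": "gs://my-bucket/app.py"},
        "runtime_config": {
            "properties": {
                # Manually provided parent info - injection is skipped for this batch:
                "spark.openlineage.parentJobNamespace": "my-namespace",
            }
        },
    }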

3 changes: 3 additions & 0 deletions docs/exts/templates/openlineage.rst.jinja2
@@ -29,12 +29,15 @@ Spark operators
===============
The OpenLineage integration can automatically inject information into Spark application properties when it's being submitted from Airflow.
The following is a list of supported operators along with the corresponding information that can be injected.
See :ref:`automatic injection of parent job information <options:spark_inject_parent_job_info>` for more details.

apache-airflow-providers-google
"""""""""""""""""""""""""""""""

- :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`
- Parent Job Information
- :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateBatchOperator`
- Parent Job Information


:class:`~airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator`
135 changes: 135 additions & 0 deletions providers/src/airflow/providers/google/cloud/openlineage/utils.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations

import copy
import logging
import os
import pathlib
@@ -30,6 +31,8 @@
    from airflow.providers.common.compat.openlineage.facet import Dataset
    from airflow.utils.context import Context

from google.cloud.dataproc_v1 import Batch, RuntimeConfig

from airflow.providers.common.compat.openlineage.facet import (
    BaseFacet,
    ColumnLineageDatasetFacet,
@@ -386,3 +389,135 @@ def inject_openlineage_properties_into_dataproc_job(
        job=job, job_type=job_type, new_properties=properties
    )
    return job_with_ol_config


def _is_dataproc_batch_of_supported_type(batch: dict | Batch) -> bool:
    """
    Check if a Dataproc batch is of a supported type for OpenLineage automatic injection.

    This function determines if the given batch is of a supported type
    by checking for specific job type attributes or keys in the batch.

    Args:
        batch: The Dataproc batch to check.

    Returns:
        True if the batch is of a supported type (`spark_batch` or
        `pyspark_batch`), otherwise False.
    """
    supported_job_types = ("spark_batch", "pyspark_batch")
    if isinstance(batch, Batch):
        return any(getattr(batch, job_type) for job_type in supported_job_types)

    # For dictionary-based batch
    return any(job_type in batch for job_type in supported_job_types)
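
A quick sketch of the check above (hypothetical dict shapes mirroring the Dataproc Batch API; spark_sql_batch is shown only as an unsupported counterexample):

    _is_dataproc_batch_of_supported_type({"pyspark_batch": {"main_python_file_uri": "gs://bucket/app.py"}})  # True
    _is_dataproc_batch_of_supported_type({"spark_sql_batch": {"query_file_uri": "gs://bucket/query.sql"}})  # False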


def _extract_dataproc_batch_properties(batch: dict | Batch) -> dict:
    """
    Extract Dataproc batch properties from a Batch object or dictionary.

    This function retrieves the `properties` from the `runtime_config` of a
    Dataproc `Batch` object or a dictionary representation of a batch.

    Args:
        batch: The Dataproc batch to extract properties from.

    Returns:
        Extracted `properties` if found, otherwise an empty dictionary.
    """
    if isinstance(batch, Batch):
        return dict(batch.runtime_config.properties)

    # For dictionary-based batch
    run_time_config = batch.get("runtime_config", {})
    if isinstance(run_time_config, RuntimeConfig):
        return dict(run_time_config.properties)
    return run_time_config.get("properties", {})
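
For illustration, a sketch assuming google-cloud-dataproc is installed (values are arbitrary):

    batch = Batch(runtime_config=RuntimeConfig(properties={"spark.executor.memory": "2g"}))
    _extract_dataproc_batch_properties(batch)  # {"spark.executor.memory": "2g"}
    _extract_dataproc_batch_properties({"runtime_config": {"properties": {"spark.executor.memory": "2g"}}})  # same result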


def _replace_dataproc_batch_properties(batch: dict | Batch, new_properties: dict) -> dict | Batch:
    """
    Replace the properties of a Dataproc batch.

    Args:
        batch: The original Dataproc batch definition.
        new_properties: The new properties to replace the existing ones.

    Returns:
        A modified copy of the Dataproc batch definition with updated properties.
    """
    batch = copy.deepcopy(batch)
    if isinstance(batch, Batch):
        if not batch.runtime_config:
            batch.runtime_config = RuntimeConfig(properties=new_properties)
        elif isinstance(batch.runtime_config, dict):
            batch.runtime_config["properties"] = new_properties
        else:
            batch.runtime_config.properties = new_properties
        return batch

    # For dictionary-based batch
    run_time_config = batch.get("runtime_config")
    if not run_time_config:
        batch["runtime_config"] = {"properties": new_properties}
    elif isinstance(run_time_config, dict):
        run_time_config["properties"] = new_properties
    else:
        run_time_config.properties = new_properties
    return batch
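
A brief sketch of the copy semantics above (illustrative values) - the caller's batch is never mutated:

    original = {"runtime_config": {"properties": {"spark.executor.memory": "2g"}}}
    updated = _replace_dataproc_batch_properties(original, {"spark.executor.memory": "4g"})
    assert original["runtime_config"]["properties"] == {"spark.executor.memory": "2g"}  # original untouched
    assert updated["runtime_config"]["properties"] == {"spark.executor.memory": "4g"}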


def inject_openlineage_properties_into_dataproc_batch(
    batch: dict | Batch, context: Context, inject_parent_job_info: bool
) -> dict | Batch:
    """
    Inject OpenLineage properties into a Dataproc batch definition.

    This function adds the desired OpenLineage properties to the Dataproc batch
    configuration. It does not remove any existing configuration or modify the batch in any other way.

    Note:
        Any modification to the batch will be skipped if:
        - The OpenLineage provider is not accessible.
        - The batch type is not supported.
        - Automatic parent job information injection is disabled.
        - Any OpenLineage properties with parent job information are already present
          in the Spark job configuration.

    Args:
        batch: The original Dataproc batch definition.
        context: The Airflow context in which the job is running.
        inject_parent_job_info: Flag indicating whether to inject parent job information.

    Returns:
        The modified batch definition with OpenLineage properties injected, if applicable.
    """
    if not inject_parent_job_info:
        log.debug("Automatic injection of OpenLineage information is disabled.")
        return batch

    if not _is_openlineage_provider_accessible():
        log.warning(
            "Could not access OpenLineage provider for automatic OpenLineage "
            "properties injection. No action will be performed."
        )
        return batch

    if not _is_dataproc_batch_of_supported_type(batch):
        log.warning(
            "Could not find a supported Dataproc batch type for automatic OpenLineage "
            "properties injection. No action will be performed.",
        )
        return batch

    properties = _extract_dataproc_batch_properties(batch)

    properties = inject_parent_job_information_into_spark_properties(properties=properties, context=context)

    batch_with_ol_config = _replace_dataproc_batch_properties(batch=batch, new_properties=properties)
    return batch_with_ol_config
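
End to end, the helper can be exercised like this (a sketch; context would normally be the Airflow task context, and the injected keys come from inject_parent_job_information_into_spark_properties):

    batch = {"pyspark_batch": {"main_python_file_uri": "gs://bucket/app.py"}}
    batch = inject_openlineage_properties_into_dataproc_batch(
        batch=batch, context=context, inject_parent_job_info=True
    )
    # batch["runtime_config"]["properties"] now carries the spark.openlineage.parent* entries.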
13 changes: 13 additions & 0 deletions providers/src/airflow/providers/google/cloud/operators/dataproc.py
@@ -55,6 +55,7 @@
    DataprocWorkflowTemplateLink,
)
from airflow.providers.google.cloud.openlineage.utils import (
    inject_openlineage_properties_into_dataproc_batch,
    inject_openlineage_properties_into_dataproc_job,
)
from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
@@ -2425,6 +2426,9 @@ def __init__(
        asynchronous: bool = False,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        polling_interval_seconds: int = 5,
        openlineage_inject_parent_job_info: bool = conf.getboolean(
            "openlineage", "spark_inject_parent_job_info", fallback=False
        ),
        **kwargs,
    ):
        super().__init__(**kwargs)
@@ -2446,6 +2450,7 @@ def __init__(
        self.asynchronous = asynchronous
        self.deferrable = deferrable
        self.polling_interval_seconds = polling_interval_seconds
        self.openlineage_inject_parent_job_info = openlineage_inject_parent_job_info

    def execute(self, context: Context):
        if self.asynchronous and self.deferrable:
@@ -2468,6 +2473,14 @@ def execute(self, context: Context):
        else:
            self.log.info("Starting batch. The batch ID will be generated since it was not provided.")

        if self.openlineage_inject_parent_job_info:
            self.log.info("Automatic injection of OpenLineage information into Spark properties is enabled.")
            self.batch = inject_openlineage_properties_into_dataproc_batch(
                batch=self.batch,
                context=context,
                inject_parent_job_info=self.openlineage_inject_parent_job_info,
            )

        try:
            self.operation = self.hook.create_batch(
                region=self.region,
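
Putting it together, a minimal DAG sketch using the new parameter (every value besides openlineage_inject_parent_job_info is illustrative):

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator

    with DAG(dag_id="dataproc_batch_lineage_example", start_date=datetime(2024, 1, 1), schedule=None):
        DataprocCreateBatchOperator(
            task_id="create_batch",
            project_id="my-project",  # illustrative
            region="us-central1",  # illustrative
            batch={"pyspark_batch": {"main_python_file_uri": "gs://my-bucket/app.py"}},
            batch_id="example-batch",
            # Enable parent job info injection for this task, overriding the
            # [openlineage] spark_inject_parent_job_info default if needed:
            openlineage_inject_parent_job_info=True,
        )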