From f09bd4ed1bdb09cac3070a7c26ea90b8209e026d Mon Sep 17 00:00:00 2001 From: Kacper Muda Date: Mon, 30 Dec 2024 15:43:25 +0100 Subject: [PATCH] feat: Add CLL to OpenLineage in BigQueryInsertJobOperator (#44872) Signed-off-by: Kacper Muda --- .../google/cloud/openlineage/mixins.py | 502 +++++++++++++----- .../google/cloud/openlineage/utils.py | 115 +++- .../google/cloud/operators/bigquery.py | 4 +- .../google/cloud/openlineage/test_mixins.py | 442 ++++++++++++++- .../google/cloud/openlineage/test_utils.py | 131 +++++ .../tests/google/cloud/utils/job_details.json | 17 +- 6 files changed, 1048 insertions(+), 163 deletions(-) diff --git a/providers/src/airflow/providers/google/cloud/openlineage/mixins.py b/providers/src/airflow/providers/google/cloud/openlineage/mixins.py index ce7a14e03ae32..8ed91424046f0 100644 --- a/providers/src/airflow/providers/google/cloud/openlineage/mixins.py +++ b/providers/src/airflow/providers/google/cloud/openlineage/mixins.py @@ -22,59 +22,65 @@ import traceback from typing import TYPE_CHECKING, cast -if TYPE_CHECKING: - from airflow.providers.common.compat.openlineage.facet import ( - Dataset, - InputDataset, - OutputDataset, - OutputStatisticsOutputDatasetFacet, - RunFacet, - SchemaDatasetFacet, - ) - from airflow.providers.google.cloud.openlineage.utils import BigQueryJobRunFacet +from airflow.providers.common.compat.openlineage.facet import ( + ColumnLineageDatasetFacet, + ErrorMessageRunFacet, + ExternalQueryRunFacet, + Fields, + InputDataset, + InputField, + OutputDataset, + OutputStatisticsOutputDatasetFacet, + SchemaDatasetFacet, + SchemaDatasetFacetFields, + SQLJobFacet, +) +from airflow.providers.google.cloud.openlineage.utils import ( + BIGQUERY_NAMESPACE, + BigQueryJobRunFacet, + get_from_nullable_chain, + merge_column_lineage_facets, +) +if TYPE_CHECKING: + from airflow.providers.common.compat.openlineage.facet import Dataset, RunFacet -BIGQUERY_NAMESPACE = "bigquery" +class _BigQueryInsertJobOperatorOpenLineageMixin: + """Mixin for BigQueryInsertJobOperator to extract OpenLineage metadata.""" -class _BigQueryOpenLineageMixin: def get_openlineage_facets_on_complete(self, _): """ - Retrieve OpenLineage data for a COMPLETE BigQuery job. - - This method retrieves statistics for the specified job_ids using the BigQueryDatasetsProvider. - It calls BigQuery API, retrieving input and output dataset info from it, as well as run-level - usage statistics. + Retrieve OpenLineage data for a completed BigQuery job. - Run facets should contain: - - ExternalQueryRunFacet - - BigQueryJobRunFacet + This method calls BigQuery API, retrieving input and output dataset info from it, + as well as run-level statistics. 
Run facets may contain: - - ErrorMessageRunFacet + - ExternalQueryRunFacet (for QUERY job type) + - BigQueryJobRunFacet + - ErrorMessageRunFacet (if an error occurred) Job facets should contain: - - SqlJobFacet if operator has self.sql + - SqlJobFacet (for QUERY job type) - Input datasets should contain facets: - - DataSourceDatasetFacet + Input datasets should contain: - SchemaDatasetFacet - Output datasets should contain facets: - - DataSourceDatasetFacet + Output datasets should contain: - SchemaDatasetFacet - - OutputStatisticsOutputDatasetFacet + - OutputStatisticsOutputDatasetFacet (for QUERY job type) + - ColumnLineageDatasetFacet (for QUERY job type) """ - from airflow.providers.common.compat.openlineage.facet import ExternalQueryRunFacet, SQLJobFacet from airflow.providers.openlineage.extractors import OperatorLineage from airflow.providers.openlineage.sqlparser import SQLParser if not self.job_id: - if hasattr(self, "log"): - self.log.warning("No BigQuery job_id was found by OpenLineage.") + self.log.warning("No BigQuery job_id was found by OpenLineage.") # type: ignore[attr-defined] return OperatorLineage() if not self.hook: + # This can occur when in deferrable mode from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook self.hook = BigQueryHook( @@ -82,64 +88,35 @@ def get_openlineage_facets_on_complete(self, _): impersonation_chain=self.impersonation_chain, ) + self.log.debug("Extracting data from bigquery job: `%s`", self.job_id) # type: ignore[attr-defined] + inputs, outputs = [], [] run_facets: dict[str, RunFacet] = { "externalQuery": ExternalQueryRunFacet(externalQueryId=self.job_id, source="bigquery") } - - job_facets = {"sql": SQLJobFacet(query=SQLParser.normalize_sql(self.sql))} - - self.client = self.hook.get_client(project_id=self.hook.project_id, location=self.location) - job_ids = self.job_id - if isinstance(self.job_id, str): - job_ids = [self.job_id] - inputs, outputs = [], [] - for job_id in job_ids: - inner_inputs, inner_outputs, inner_run_facets = self.get_facets(job_id=job_id) - inputs.extend(inner_inputs) - outputs.extend(inner_outputs) - run_facets.update(inner_run_facets) - - return OperatorLineage( - inputs=inputs, - outputs=outputs, - run_facets=run_facets, - job_facets=job_facets, - ) - - def get_facets(self, job_id: str): - from airflow.providers.common.compat.openlineage.facet import ErrorMessageRunFacet - from airflow.providers.google.cloud.openlineage.utils import get_from_nullable_chain - - inputs = [] - outputs = [] - run_facets: dict[str, RunFacet] = {} - if hasattr(self, "log"): - self.log.debug("Extracting data from bigquery job: `%s`", job_id) + self._client = self.hook.get_client(project_id=self.hook.project_id, location=self.location) try: - job = self.client.get_job(job_id=job_id) # type: ignore - props = job._properties + job_properties = self._client.get_job(job_id=self.job_id)._properties # type: ignore - if get_from_nullable_chain(props, ["status", "state"]) != "DONE": - raise ValueError(f"Trying to extract data from running bigquery job: `{job_id}`") + if get_from_nullable_chain(job_properties, ["status", "state"]) != "DONE": + raise ValueError(f"Trying to extract data from running bigquery job: `{self.job_id}`") - run_facets["bigQueryJob"] = self._get_bigquery_job_run_facet(props) + run_facets.update(self._get_run_facets(job_properties)) - if get_from_nullable_chain(props, ["statistics", "numChildJobs"]): - if hasattr(self, "log"): - self.log.debug("Found SCRIPT job. 
Extracting lineage from child jobs instead.") + if get_from_nullable_chain(job_properties, ["statistics", "numChildJobs"]): + self.log.debug("Found SCRIPT job. Extracting lineage from child jobs instead.") # type: ignore[attr-defined] # SCRIPT job type has no input / output information but spawns child jobs that have one # https://cloud.google.com/bigquery/docs/information-schema-jobs#multi-statement_query_job - for child_job_id in self.client.list_jobs(parent_job=job_id): - child_job = self.client.get_job(job_id=child_job_id) # type: ignore - child_inputs, child_output = self._get_inputs_outputs_from_job(child_job._properties) + for child_job_id in self._client.list_jobs(parent_job=self.job_id): + child_job_properties = self._client.get_job(job_id=child_job_id)._properties # type: ignore + child_inputs, child_output = self._get_inputs_and_output(child_job_properties) inputs.extend(child_inputs) outputs.append(child_output) else: - inputs, _output = self._get_inputs_outputs_from_job(props) + inputs, _output = self._get_inputs_and_output(job_properties) outputs.append(_output) + except Exception as e: - if hasattr(self, "log"): - self.log.warning("Cannot retrieve job details from BigQuery.Client. %s", e, exc_info=True) + self.log.warning("Cannot retrieve job details from BigQuery.Client. %s", e, exc_info=True) # type: ignore[attr-defined] exception_msg = traceback.format_exc() run_facets.update( { @@ -149,16 +126,40 @@ def get_facets(self, job_id: str): ) } ) - deduplicated_outputs = self._deduplicate_outputs(outputs) - return inputs, deduplicated_outputs, run_facets + + return OperatorLineage( + inputs=inputs, + outputs=self._deduplicate_outputs(outputs), + run_facets=run_facets, + job_facets={"sql": SQLJobFacet(query=SQLParser.normalize_sql(self.sql))} if self.sql else {}, + ) + + def _get_run_facets(self, properties: dict) -> dict[str, RunFacet]: + job_type = get_from_nullable_chain(properties, ["configuration", "jobType"]) + + run_facets: dict[str, RunFacet] = {} + if job_type == "QUERY": + run_facets["bigQueryJob"] = self._get_bigquery_job_run_facet(properties) + + return run_facets + + def _get_inputs_and_output(self, properties: dict) -> tuple[list[InputDataset], OutputDataset | None]: + job_type = get_from_nullable_chain(properties, ["configuration", "jobType"]) + + if job_type == "QUERY": + inputs, output = self._get_inputs_and_output_for_query_job(properties) + else: + self.log.debug("Unsupported job type for input/output extraction: `%s`.", job_type) # type: ignore[attr-defined] + inputs, output = [], None + + return inputs, output def _deduplicate_outputs(self, outputs: list[OutputDataset | None]) -> list[OutputDataset]: - # Sources are the same so we can compare only names final_outputs = {} for single_output in outputs: if not single_output: continue - key = single_output.name + key = f"{single_output.namespace}.{single_output.name}" if key not in final_outputs: final_outputs[key] = single_output continue @@ -167,77 +168,32 @@ def _deduplicate_outputs(self, outputs: list[OutputDataset | None]) -> list[Outp # if the rowCount or size can be summed together. 
if single_output.outputFacets: single_output.outputFacets.pop("outputStatistics", None) - final_outputs[key] = single_output - - return list(final_outputs.values()) - - def _get_inputs_outputs_from_job( - self, properties: dict - ) -> tuple[list[InputDataset], OutputDataset | None]: - from airflow.providers.google.cloud.openlineage.utils import get_from_nullable_chain - - input_tables = get_from_nullable_chain(properties, ["statistics", "query", "referencedTables"]) or [] - output_table = get_from_nullable_chain(properties, ["configuration", "query", "destinationTable"]) - inputs = [(self._get_input_dataset(input_table)) for input_table in input_tables] - if output_table: - output = self._get_output_dataset(output_table) - dataset_stat_facet = self._get_statistics_dataset_facet(properties) - output.outputFacets = output.outputFacets or {} - if dataset_stat_facet: - output.outputFacets["outputStatistics"] = dataset_stat_facet - return inputs, output - - @staticmethod - def _get_bigquery_job_run_facet(properties: dict) -> BigQueryJobRunFacet: - from airflow.providers.google.cloud.openlineage.utils import ( - BigQueryJobRunFacet, - get_from_nullable_chain, - ) - - if get_from_nullable_chain(properties, ["configuration", "query", "query"]): - # Exclude the query to avoid event size issues and duplicating SqlJobFacet information. - properties = copy.deepcopy(properties) - properties["configuration"]["query"].pop("query") - cache_hit = get_from_nullable_chain(properties, ["statistics", "query", "cacheHit"]) - billed_bytes = get_from_nullable_chain(properties, ["statistics", "query", "totalBytesBilled"]) - return BigQueryJobRunFacet( - cached=str(cache_hit).lower() == "true", - billedBytes=int(billed_bytes) if billed_bytes else None, - properties=json.dumps(properties), - ) - - @staticmethod - def _get_statistics_dataset_facet( - properties, - ) -> OutputStatisticsOutputDatasetFacet | None: - from airflow.providers.common.compat.openlineage.facet import OutputStatisticsOutputDatasetFacet - from airflow.providers.google.cloud.openlineage.utils import get_from_nullable_chain + # If multiple outputs contain Column Level Lineage Facet - merge the facets + if ( + single_output.facets + and final_outputs[key].facets + and "columnLineage" in single_output.facets # type: ignore + and "columnLineage" in final_outputs[key].facets # type: ignore + ): + single_output.facets["columnLineage"] = merge_column_lineage_facets( + [ + single_output.facets["columnLineage"], # type: ignore + final_outputs[key].facets["columnLineage"], # type: ignore + ] + ) - query_plan = get_from_nullable_chain(properties, chain=["statistics", "query", "queryPlan"]) - if not query_plan: - return None + final_outputs[key] = single_output - out_stage = query_plan[-1] - out_rows = out_stage.get("recordsWritten", None) - out_bytes = out_stage.get("shuffleOutputBytes", None) - if out_bytes and out_rows: - return OutputStatisticsOutputDatasetFacet(rowCount=int(out_rows), size=int(out_bytes)) - return None + return list(final_outputs.values()) def _get_input_dataset(self, table: dict) -> InputDataset: - from airflow.providers.common.compat.openlineage.facet import InputDataset - return cast(InputDataset, self._get_dataset(table, "input")) def _get_output_dataset(self, table: dict) -> OutputDataset: - from airflow.providers.common.compat.openlineage.facet import OutputDataset - return cast(OutputDataset, self._get_dataset(table, "output")) def _get_dataset(self, table: dict, dataset_type: str) -> Dataset: - from 
airflow.providers.common.compat.openlineage.facet import InputDataset, OutputDataset - project = table.get("projectId") dataset = table.get("datasetId") table_name = table.get("tableId") @@ -273,18 +229,11 @@ def _get_table_schema_safely(self, table_name: str) -> SchemaDatasetFacet | None try: return self._get_table_schema(table_name) except Exception as e: - if hasattr(self, "log"): - self.log.warning("Could not extract output schema from bigquery. %s", e) + self.log.warning("Could not extract output schema from bigquery. %s", e) # type: ignore[attr-defined] return None def _get_table_schema(self, table: str) -> SchemaDatasetFacet | None: - from airflow.providers.common.compat.openlineage.facet import ( - SchemaDatasetFacet, - SchemaDatasetFacetFields, - ) - from airflow.providers.google.cloud.openlineage.utils import get_from_nullable_chain - - bq_table = self.client.get_table(table) + bq_table = self._client.get_table(table) if not bq_table._properties: return None @@ -303,3 +252,262 @@ def _get_table_schema(self, table: str) -> SchemaDatasetFacet | None: for field in fields ] ) + + def _get_inputs_and_output_for_query_job( + self, properties: dict + ) -> tuple[list[InputDataset], OutputDataset | None]: + input_tables = get_from_nullable_chain(properties, ["statistics", "query", "referencedTables"]) or [] + output_table = get_from_nullable_chain(properties, ["configuration", "query", "destinationTable"]) + + inputs = [ + (self._get_input_dataset(input_table)) + for input_table in input_tables + if input_table != output_table # Output table is in `referencedTables` and needs to be removed + ] + + if not output_table: + return inputs, None + + output = self._get_output_dataset(output_table) + if dataset_stat_facet := self._get_statistics_dataset_facet(properties): + output.outputFacets = output.outputFacets or {} + output.outputFacets["outputStatistics"] = dataset_stat_facet + if cll_facet := self._get_column_level_lineage_facet(properties, output, inputs): + output.facets = output.facets or {} + output.facets["columnLineage"] = cll_facet + return inputs, output + + @staticmethod + def _get_bigquery_job_run_facet(properties: dict) -> BigQueryJobRunFacet: + if get_from_nullable_chain(properties, ["configuration", "query", "query"]): + # Exclude the query to avoid event size issues and duplicating SqlJobFacet information. 
+ properties = copy.deepcopy(properties) + properties["configuration"]["query"].pop("query") + cache_hit = get_from_nullable_chain(properties, ["statistics", "query", "cacheHit"]) + billed_bytes = get_from_nullable_chain(properties, ["statistics", "query", "totalBytesBilled"]) + return BigQueryJobRunFacet( + cached=str(cache_hit).lower() == "true", + billedBytes=int(billed_bytes) if billed_bytes else None, + properties=json.dumps(properties), + ) + + @staticmethod + def _get_statistics_dataset_facet( + properties, + ) -> OutputStatisticsOutputDatasetFacet | None: + query_plan = get_from_nullable_chain(properties, chain=["statistics", "query", "queryPlan"]) + if not query_plan: + return None + + out_stage = query_plan[-1] + out_rows = out_stage.get("recordsWritten", None) + out_bytes = out_stage.get("shuffleOutputBytes", None) + if out_bytes and out_rows: + return OutputStatisticsOutputDatasetFacet(rowCount=int(out_rows), size=int(out_bytes)) + return None + + def _get_column_level_lineage_facet( + self, properties: dict, output: OutputDataset, inputs: list[InputDataset] + ) -> ColumnLineageDatasetFacet | None: + """ + Extract column-level lineage information from a BigQuery job and return it as a facet. + + The Column Level Lineage Facet will NOT be returned if any of the following condition is met: + - The parsed result does not contain column lineage information. + - The parsed result does not contain exactly one output table. + - The parsed result has a different output table than the output table from the BQ job. + - The parsed result has at least one input table not present in the input tables from the BQ job. + - The parsed result has a column not present in the schema of given dataset from the BQ job. + + Args: + properties: The properties of the BigQuery job. + output: The output dataset for which the column lineage is being extracted. + + Returns: + The extracted Column Lineage Dataset Facet, or None if conditions are not met. + """ + from airflow.providers.openlineage.sqlparser import SQLParser + + # Extract SQL query and parse it + self.log.debug("Extracting column-level lineage facet from BigQuery query.") # type: ignore[attr-defined] + query = get_from_nullable_chain(properties, ["configuration", "query", "query"]) or "" + parse_result = SQLParser("bigquery").parse(SQLParser.split_sql_string(SQLParser.normalize_sql(query))) + + if parse_result is None or parse_result.column_lineage == []: + self.log.debug("No column-level lineage found in the SQL query. 
Facet generation skipped.") # type: ignore[attr-defined] + return None + + default_dataset, default_project = self._extract_default_dataset_and_project( + properties, + self.project_id, # type: ignore[attr-defined] + ) + + # Verify if the output table id from the parse result matches the BQ job output table + if not self._validate_output_table_id( + parse_result, + output, + default_project, + default_dataset, + ): + return None + + # Verify if all columns from parse results are present in the output dataset schema + if not self._validate_output_columns(parse_result, output): + return None + + input_tables_from_parse_result = self._extract_parsed_input_tables( + parse_result, default_project, default_dataset + ) + input_tables_from_bq = {input_ds.name: self._extract_column_names(input_ds) for input_ds in inputs} + + # Verify if all datasets from parse results are present in bq job input datasets + if not self._validate_input_tables(input_tables_from_parse_result, input_tables_from_bq): + return None + + # Verify if all columns from parse results are present in their respective bq job input datasets + if not self._validate_input_columns(input_tables_from_parse_result, input_tables_from_bq): + return None + + return self._generate_column_lineage_facet(parse_result, default_project, default_dataset) + + @staticmethod + def _get_qualified_name_from_parse_result(table, default_project: str, default_dataset: str) -> str: + """Get the qualified name of a table from the parse result.""" + return ".".join( + ( + table.database or default_project, + table.schema or default_dataset, + table.name, + ) + ) + + @staticmethod + def _extract_default_dataset_and_project(properties: dict, default_project: str) -> tuple[str, str]: + """Extract the default dataset and project from the BigQuery job properties.""" + default_dataset_obj = get_from_nullable_chain( + properties, ["configuration", "query", "defaultDataset"] + ) + default_dataset = default_dataset_obj.get("datasetId", "") if default_dataset_obj else "" + default_project = ( + default_dataset_obj.get("projectId", default_project) if default_dataset_obj else default_project + ) + return default_dataset, default_project + + def _validate_output_table_id( + self, parse_result, output: OutputDataset, default_project: str, default_dataset: str + ) -> bool: + """Check if the output table id from the parse result matches the BQ job output table.""" + if len(parse_result.out_tables) != 1: + self.log.debug( # type: ignore[attr-defined] + "Invalid output tables in the parse result: `%s`. 
Expected exactly one output table.", + parse_result.out_tables, + ) + return False + + parsed_output_table = self._get_qualified_name_from_parse_result( + parse_result.out_tables[0], default_project, default_dataset + ) + if parsed_output_table != output.name: + self.log.debug( # type: ignore[attr-defined] + "Mismatch between parsed output table `%s` and BQ job output table `%s`.", + parsed_output_table, + output.name, + ) + return False + return True + + @staticmethod + def _extract_column_names(dataset: Dataset) -> list[str]: + """Extract column names from a dataset's schema.""" + return [ + f.name + for f in dataset.facets.get("schema", SchemaDatasetFacet(fields=[])).fields # type: ignore[union-attr] + if dataset.facets + ] + + def _validate_output_columns(self, parse_result, output: OutputDataset) -> bool: + """Validate if all descendant columns in parse result exist in output dataset schema.""" + output_column_names = self._extract_column_names(output) + missing_columns = [ + lineage.descendant.name + for lineage in parse_result.column_lineage + if lineage.descendant.name not in output_column_names + ] + if missing_columns: + self.log.debug( # type: ignore[attr-defined] + "Output dataset schema is missing columns from the parse result: `%s`.", missing_columns + ) + return False + return True + + def _extract_parsed_input_tables( + self, parse_result, default_project: str, default_dataset: str + ) -> dict[str, list[str]]: + """Extract input tables and their columns from the parse result.""" + input_tables: dict[str, list[str]] = {} + for lineage in parse_result.column_lineage: + for column_meta in lineage.lineage: + if not column_meta.origin: + self.log.debug( # type: ignore[attr-defined] + "Column `%s` lacks origin information. Skipping facet generation.", column_meta.name + ) + return {} + + input_table_id = self._get_qualified_name_from_parse_result( + column_meta.origin, default_project, default_dataset + ) + input_tables.setdefault(input_table_id, []).append(column_meta.name) + return input_tables + + def _validate_input_tables( + self, parsed_input_tables: dict[str, list[str]], input_tables_from_bq: dict[str, list[str]] + ) -> bool: + """Validate if all parsed input tables exist in the BQ job's input datasets.""" + if not parsed_input_tables: + self.log.debug("No input tables found in the parse result. Facet generation skipped.") # type: ignore[attr-defined] + return False + if missing_tables := set(parsed_input_tables) - set(input_tables_from_bq): + self.log.debug( # type: ignore[attr-defined] + "Parsed input tables not found in the BQ job's input datasets: `%s`.", missing_tables + ) + return False + return True + + def _validate_input_columns( + self, parsed_input_tables: dict[str, list[str]], input_tables_from_bq: dict[str, list[str]] + ) -> bool: + """Validate if all parsed input columns exist in their respective BQ job input table schemas.""" + if not parsed_input_tables: + self.log.debug("No input tables found in the parse result. 
Facet generation skipped.") # type: ignore[attr-defined] + return False + for table, columns in parsed_input_tables.items(): + if missing_columns := set(columns) - set(input_tables_from_bq.get(table, [])): + self.log.debug( # type: ignore[attr-defined] + "Input table `%s` is missing columns from the parse result: `%s`.", table, missing_columns + ) + return False + return True + + def _generate_column_lineage_facet( + self, parse_result, default_project: str, default_dataset: str + ) -> ColumnLineageDatasetFacet: + """Generate the ColumnLineageDatasetFacet based on the parsed result.""" + return ColumnLineageDatasetFacet( + fields={ + lineage.descendant.name: Fields( + inputFields=[ + InputField( + namespace=BIGQUERY_NAMESPACE, + name=self._get_qualified_name_from_parse_result( + column_meta.origin, default_project, default_dataset + ), + field=column_meta.name, + ) + for column_meta in lineage.lineage + ], + transformationType="", + transformationDescription="", + ) + for lineage in parse_result.column_lineage + } + ) diff --git a/providers/src/airflow/providers/google/cloud/openlineage/utils.py b/providers/src/airflow/providers/google/cloud/openlineage/utils.py index 1700ff29619e3..0f3dcb5d4be92 100644 --- a/providers/src/airflow/providers/google/cloud/openlineage/utils.py +++ b/providers/src/airflow/providers/google/cloud/openlineage/utils.py @@ -21,16 +21,10 @@ import logging import os import pathlib +from collections import defaultdict from typing import TYPE_CHECKING, Any from attr import define, field - -if TYPE_CHECKING: - from google.cloud.bigquery.table import Table - - from airflow.providers.common.compat.openlineage.facet import Dataset - from airflow.utils.context import Context - from google.cloud.dataproc_v1 import Batch, RuntimeConfig from airflow.providers.common.compat.openlineage.facet import ( @@ -51,6 +45,13 @@ from airflow.providers.google import __version__ as provider_version from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url +if TYPE_CHECKING: + from google.cloud.bigquery.table import Table + + from airflow.providers.common.compat.openlineage.facet import Dataset + from airflow.utils.context import Context + + log = logging.getLogger(__name__) BIGQUERY_NAMESPACE = "bigquery" @@ -58,6 +59,106 @@ WILDCARD = "*" +def merge_column_lineage_facets(facets: list[ColumnLineageDatasetFacet]) -> ColumnLineageDatasetFacet: + """ + Merge multiple column lineage facets into a single consolidated facet. + + Specifically, it aggregates input fields and transformations for each field across all provided facets. + + Args: + facets: Column Lineage Facets to be merged. + + Returns: + A new Column Lineage Facet containing all fields, their respective input fields and transformations. + + Notes: + - Input fields are uniquely identified by their `(namespace, name, field)` tuple. + - If multiple facets contain the same field with the same input field, those input + fields are merged without duplication. + - Transformations associated with input fields are also merged. If transformations + are not supported by the version of the `InputField` class, they will be omitted. + - Transformation merging relies on a composite key of the field name and input field + tuple to track and consolidate transformations. + + Examples: + Case 1: Two facets with the same input field + ``` + >>> facet1 = ColumnLineageDatasetFacet( + ... fields={"columnA": Fields(inputFields=[InputField("namespace1", "dataset1", "field1")])} + ... ) + >>> facet2 = ColumnLineageDatasetFacet( + ... 
fields={"columnA": Fields(inputFields=[InputField("namespace1", "dataset1", "field1")])} + ... ) + >>> merged = merge_column_lineage_facets([facet1, facet2]) + >>> merged.fields["columnA"].inputFields + [InputField("namespace1", "dataset1", "field1")] + ``` + + Case 2: Two facets with different transformations for the same input field + ``` + >>> facet1 = ColumnLineageDatasetFacet( + ... fields={ + ... "columnA": Fields( + ... inputFields=[InputField("namespace1", "dataset1", "field1", transformations=["t1"])] + ... ) + ... } + ... ) + >>> facet2 = ColumnLineageDatasetFacet( + ... fields={ + ... "columnA": Fields( + ... inputFields=[InputField("namespace1", "dataset1", "field1", transformations=["t2"])] + ... ) + ... } + ... ) + >>> merged = merge_column_lineage_facets([facet1, facet2]) + >>> merged.fields["columnA"].inputFields[0].transformations + ["t1", "t2"] + ``` + """ + # Dictionary to collect all unique input fields for each field name + fields_sources: dict[str, set[tuple[str, str, str]]] = defaultdict(set) + # Dictionary to aggregate transformations for each input field + transformations: dict[str, list] = defaultdict(list) + + for facet in facets: + for field_name, single_field in facet.fields.items(): + for input_field in single_field.inputFields: + input_key_fields = (input_field.namespace, input_field.name, input_field.field) + fields_sources[field_name].add(input_key_fields) + + if single_transformations := getattr(input_field, "transformations", []): + transformation_key = "".join((field_name, *input_key_fields)) + transformations[transformation_key].extend(single_transformations) + + # Check if the `InputField` class supports the `transformations` attribute (since OL client 1.17.1) + input_field_allows_transformation_info = True + try: + InputField(namespace="a", name="b", field="c", transformations=[]) + except TypeError: + input_field_allows_transformation_info = False + + return ColumnLineageDatasetFacet( + fields={ + field_name: Fields( + inputFields=[ + InputField( + namespace, + name, + column, + transformations.get("".join((field_name, namespace, name, column)), []), + ) + if input_field_allows_transformation_info + else InputField(namespace, name, column) + for namespace, name, column in sorted(input_fields) + ], + transformationType="", # Legacy transformation information + transformationDescription="", # Legacy transformation information + ) + for field_name, input_fields in fields_sources.items() + } + ) + + def extract_ds_name_from_gcs_path(path: str) -> str: """ Extract and process the dataset name from a given path. 
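
Illustrative sketch (editorial example, not part of the patch; table and column names are made up). The merge_column_lineage_facets helper added above is what the mixin's _deduplicate_outputs relies on when several child QUERY jobs write to the same destination table: for each output column, input fields are unioned by their (namespace, name, field) tuple. A minimal usage sketch, assuming the patched provider is installed:

from airflow.providers.common.compat.openlineage.facet import (
    ColumnLineageDatasetFacet,
    Fields,
    InputField,
)
from airflow.providers.google.cloud.openlineage.utils import merge_column_lineage_facets

# Column lineage reported by two child jobs for the same destination table.
facet_job_1 = ColumnLineageDatasetFacet(
    fields={
        "total": Fields(
            inputFields=[InputField("bigquery", "proj.ds.orders", "amount")],
            transformationType="",
            transformationDescription="",
        )
    }
)
facet_job_2 = ColumnLineageDatasetFacet(
    fields={
        "total": Fields(
            inputFields=[InputField("bigquery", "proj.ds.refunds", "amount")],
            transformationType="",
            transformationDescription="",
        )
    }
)

merged = merge_column_lineage_facets([facet_job_1, facet_job_2])
# merged.fields["total"].inputFields now lists both source columns exactly once,
# deduplicated by their (namespace, name, field) tuple.
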
diff --git a/providers/src/airflow/providers/google/cloud/operators/bigquery.py b/providers/src/airflow/providers/google/cloud/operators/bigquery.py index 43cf1a8f771ad..bc950ec9f3f0c 100644 --- a/providers/src/airflow/providers/google/cloud/operators/bigquery.py +++ b/providers/src/airflow/providers/google/cloud/operators/bigquery.py @@ -48,7 +48,7 @@ BigQueryJobDetailLink, BigQueryTableLink, ) -from airflow.providers.google.cloud.openlineage.mixins import _BigQueryOpenLineageMixin +from airflow.providers.google.cloud.openlineage.mixins import _BigQueryInsertJobOperatorOpenLineageMixin from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator from airflow.providers.google.cloud.triggers.bigquery import ( BigQueryCheckTrigger, @@ -2491,7 +2491,7 @@ def execute(self, context: Context): return table -class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMixin): +class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryInsertJobOperatorOpenLineageMixin): """ Execute a BigQuery job. diff --git a/providers/tests/google/cloud/openlineage/test_mixins.py b/providers/tests/google/cloud/openlineage/test_mixins.py index fb047ddc2d1c8..824da4b8447e8 100644 --- a/providers/tests/google/cloud/openlineage/test_mixins.py +++ b/providers/tests/google/cloud/openlineage/test_mixins.py @@ -16,24 +16,93 @@ # under the License. from __future__ import annotations +import copy import json +import logging import os from unittest.mock import MagicMock import pytest from airflow.providers.common.compat.openlineage.facet import ( + ColumnLineageDatasetFacet, + Dataset, ExternalQueryRunFacet, + Fields, InputDataset, + InputField, OutputDataset, OutputStatisticsOutputDatasetFacet, SchemaDatasetFacet, SchemaDatasetFacetFields, ) -from airflow.providers.google.cloud.openlineage.mixins import _BigQueryOpenLineageMixin +from airflow.providers.google.cloud.openlineage.mixins import _BigQueryInsertJobOperatorOpenLineageMixin from airflow.providers.google.cloud.openlineage.utils import ( BigQueryJobRunFacet, ) +from airflow.providers.openlineage.sqlparser import SQLParser + +QUERY_JOB_PROPERTIES = { + "configuration": { + "query": { + "query": """ + INSERT INTO dest_project.dest_dataset.dest_table + SELECT a, b, c FROM source_project.source_dataset.source_table + UNION ALL + SELECT a, b, c FROM source_table2 + """, + "defaultDataset": {"datasetId": "default_dataset", "projectId": "default_project"}, + } + } +} +OUTPUT_DATASET = OutputDataset( + namespace="bigquery", + name="dest_project.dest_dataset.dest_table", + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields("a", "STRING"), + SchemaDatasetFacetFields("b", "STRING"), + SchemaDatasetFacetFields("c", "STRING"), + SchemaDatasetFacetFields("d", "STRING"), + SchemaDatasetFacetFields("e", "STRING"), + SchemaDatasetFacetFields("f", "STRING"), + SchemaDatasetFacetFields("g", "STRING"), + ] + ) + }, +) +INPUT_DATASETS = [ + InputDataset( + namespace="bigquery", + name="source_project.source_dataset.source_table", + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields("a", "STRING"), + SchemaDatasetFacetFields("b", "STRING"), + SchemaDatasetFacetFields("c", "STRING"), + SchemaDatasetFacetFields("x", "STRING"), + ] + ) + }, + ), + InputDataset( + namespace="bigquery", + name="default_project.default_dataset.source_table2", + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields("a", "STRING"), + SchemaDatasetFacetFields("b", "STRING"), + 
SchemaDatasetFacetFields("c", "STRING"), + SchemaDatasetFacetFields("y", "STRING"), + ] + ) + }, + ), + InputDataset("bigquery", "some.random.tb"), +] def read_common_json_file(rel: str): @@ -61,10 +130,12 @@ def setup_method(self): hook = MagicMock() self.client = MagicMock() - class BQOperator(_BigQueryOpenLineageMixin): + class BQOperator(_BigQueryInsertJobOperatorOpenLineageMixin): sql = "" job_id = "job_id" + project_id = "project_id" location = None + log = logging.getLogger("BQOperator") @property def hook(self): @@ -202,6 +273,100 @@ def test_deduplicate_outputs(self): assert second_result.name == "d2" assert second_result.facets == {"t20": "t20"} + def test_deduplicate_outputs_with_cll(self): + outputs = [ + None, + OutputDataset( + name="a.b.c", + namespace="bigquery", + facets={ + "columnLineage": ColumnLineageDatasetFacet( + fields={ + "c": Fields( + inputFields=[InputField("bigquery", "a.b.1", "c")], + transformationType="", + transformationDescription="", + ), + "d": Fields( + inputFields=[InputField("bigquery", "a.b.2", "d")], + transformationType="", + transformationDescription="", + ), + } + ) + }, + ), + OutputDataset( + name="a.b.c", + namespace="bigquery", + facets={ + "columnLineage": ColumnLineageDatasetFacet( + fields={ + "c": Fields( + inputFields=[InputField("bigquery", "a.b.3", "x")], + transformationType="", + transformationDescription="", + ), + "e": Fields( + inputFields=[InputField("bigquery", "a.b.1", "e")], + transformationType="", + transformationDescription="", + ), + } + ) + }, + ), + OutputDataset( + name="x.y.z", + namespace="bigquery", + facets={ + "columnLineage": ColumnLineageDatasetFacet( + fields={ + "c": Fields( + inputFields=[InputField("bigquery", "a.b.3", "x")], + transformationType="", + transformationDescription="", + ) + } + ) + }, + ), + ] + result = self.operator._deduplicate_outputs(outputs) + assert len(result) == 2 + first_result = result[0] + assert first_result.name == "a.b.c" + assert first_result.facets["columnLineage"] == ColumnLineageDatasetFacet( + fields={ + "c": Fields( + inputFields=[InputField("bigquery", "a.b.1", "c"), InputField("bigquery", "a.b.3", "x")], + transformationType="", + transformationDescription="", + ), + "d": Fields( + inputFields=[InputField("bigquery", "a.b.2", "d")], + transformationType="", + transformationDescription="", + ), + "e": Fields( + inputFields=[InputField("bigquery", "a.b.1", "e")], + transformationType="", + transformationDescription="", + ), + } + ) + second_result = result[1] + assert second_result.name == "x.y.z" + assert second_result.facets["columnLineage"] == ColumnLineageDatasetFacet( + fields={ + "c": Fields( + inputFields=[InputField("bigquery", "a.b.3", "x")], + transformationType="", + transformationDescription="", + ) + } + ) + @pytest.mark.parametrize("cache", (None, "false", False, 0)) def test_get_job_run_facet_no_cache_and_with_bytes(self, cache): properties = { @@ -259,3 +424,276 @@ def test_get_statistics_dataset_facet_with_stats(self): result = self.operator._get_statistics_dataset_facet(properties) assert result.rowCount == 123 assert result.size == 321 + + def test_get_column_level_lineage_facet(self): + result = self.operator._get_column_level_lineage_facet( + QUERY_JOB_PROPERTIES, OUTPUT_DATASET, INPUT_DATASETS + ) + assert result == ColumnLineageDatasetFacet( + fields={ + col: Fields( + inputFields=[ + InputField("bigquery", "default_project.default_dataset.source_table2", col), + InputField("bigquery", "source_project.source_dataset.source_table", col), + ], + 
transformationType="", + transformationDescription="", + ) + for col in ("a", "b", "c") + } + ) + + def test_get_column_level_lineage_facet_early_exit_empty_cll_from_parser(self): + properties = {"configuration": {"query": {"query": "SELECT 1"}}} + assert ( + self.operator._get_column_level_lineage_facet(properties, OUTPUT_DATASET, INPUT_DATASETS) is None + ) + assert self.operator._get_column_level_lineage_facet({}, OUTPUT_DATASET, INPUT_DATASETS) is None + + def test_get_column_level_lineage_facet_early_exit_output_table_id_mismatch(self): + output = copy.deepcopy(OUTPUT_DATASET) + output.name = "different.name.table" + assert ( + self.operator._get_column_level_lineage_facet(QUERY_JOB_PROPERTIES, output, INPUT_DATASETS) + is None + ) + + def test_get_column_level_lineage_facet_early_exit_output_columns_mismatch(self): + output = copy.deepcopy(OUTPUT_DATASET) + output.facets["schema"].fields = [ + SchemaDatasetFacetFields("different_col", "STRING"), + ] + assert ( + self.operator._get_column_level_lineage_facet(QUERY_JOB_PROPERTIES, output, INPUT_DATASETS) + is None + ) + + def test_get_column_level_lineage_facet_early_exit_wrong_parsed_input_tables(self): + properties = { + "configuration": { + "query": { + "query": """ + INSERT INTO dest_project.dest_dataset.dest_table + SELECT a, b, c FROM some.wrong.source_table + """, + } + } + } + assert ( + self.operator._get_column_level_lineage_facet(properties, OUTPUT_DATASET, INPUT_DATASETS) is None + ) + + def test_get_column_level_lineage_facet_early_exit_wrong_parsed_input_columns(self): + properties = { + "configuration": { + "query": { + "query": """ + INSERT INTO dest_project.dest_dataset.dest_table + SELECT wrong_col, wrong2, wrong3 FROM source_project.source_dataset.source_table + """, + } + } + } + assert ( + self.operator._get_column_level_lineage_facet(properties, OUTPUT_DATASET, INPUT_DATASETS) is None + ) + + def test_get_qualified_name_from_parse_result(self): + class _Table: # Replacement for SQL parser TableMeta + database = "project" + schema = "dataset" + name = "table" + + class _TableNoSchema: # Replacement for SQL parser TableMeta + database = None + schema = "dataset" + name = "table" + + class _TableNoSchemaNoDb: # Replacement for SQL parser TableMeta + database = None + schema = None + name = "table" + + result = self.operator._get_qualified_name_from_parse_result( + table=_Table(), + default_project="default_project", + default_dataset="default_dataset", + ) + assert result == "project.dataset.table" + + result = self.operator._get_qualified_name_from_parse_result( + table=_TableNoSchema(), + default_project="default_project", + default_dataset="default_dataset", + ) + assert result == "default_project.dataset.table" + + result = self.operator._get_qualified_name_from_parse_result( + table=_TableNoSchemaNoDb(), + default_project="default_project", + default_dataset="default_dataset", + ) + assert result == "default_project.default_dataset.table" + + def test_extract_default_dataset_and_project(self): + properties = {"configuration": {"query": {"defaultDataset": {"datasetId": "default_dataset"}}}} + result = self.operator._extract_default_dataset_and_project(properties, "default_project") + assert result == ("default_dataset", "default_project") + + properties = { + "configuration": { + "query": {"defaultDataset": {"datasetId": "default_dataset", "projectId": "default_project"}} + } + } + result = self.operator._extract_default_dataset_and_project(properties, "another_project") + assert result == ("default_dataset", 
"default_project") + + result = self.operator._extract_default_dataset_and_project({}, "default_project") + assert result == ("", "default_project") + + def test_validate_output_table_id_no_table(self): + parse_result = SQLParser("bigquery").parse(SQLParser.split_sql_string("SELECT 1")) + assert parse_result.out_tables == [] + assert self.operator._validate_output_table_id(parse_result, None, None, None) is False + + def test_validate_output_table_id_multiple_tables(self): + query = "INSERT INTO a.b.c VALUES (1); INSERT INTO d.e.f VALUES (2);" + parse_result = SQLParser("bigquery").parse(SQLParser.split_sql_string(query)) + assert len(parse_result.out_tables) == 2 + assert self.operator._validate_output_table_id(parse_result, None, None, None) is False + + def test_validate_output_table_id_mismatch(self): + parse_result = SQLParser("bigquery").parse(SQLParser.split_sql_string("INSERT INTO a.b.c VALUES (1)")) + assert len(parse_result.out_tables) == 1 + assert parse_result.out_tables[0].qualified_name == "a.b.c" + assert ( + self.operator._validate_output_table_id(parse_result, OutputDataset("", "d.e.f"), None, None) + is False + ) + + def test_validate_output_table_id(self): + parse_result = SQLParser("bigquery").parse(SQLParser.split_sql_string("INSERT INTO a.b.c VALUES (1)")) + assert len(parse_result.out_tables) == 1 + assert parse_result.out_tables[0].qualified_name == "a.b.c" + assert ( + self.operator._validate_output_table_id(parse_result, OutputDataset("", "a.b.c"), None, None) + is True + ) + + def test_validate_output_table_id_query_with_table_name_only(self): + parse_result = SQLParser("bigquery").parse(SQLParser.split_sql_string("INSERT INTO c VALUES (1)")) + assert len(parse_result.out_tables) == 1 + assert parse_result.out_tables[0].qualified_name == "c" + assert ( + self.operator._validate_output_table_id(parse_result, OutputDataset("", "a.b.c"), "a", "b") + is True + ) + + def test_extract_column_names_dataset_without_schema(self): + assert self.operator._extract_column_names(Dataset("a", "b")) == [] + + def test_extract_column_names_dataset_(self): + ds = Dataset( + "a", + "b", + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields("col1", "STRING"), + SchemaDatasetFacetFields("col2", "STRING"), + ] + ) + }, + ) + assert self.operator._extract_column_names(ds) == ["col1", "col2"] + + def test_validate_output_columns_mismatch(self): + ds = OutputDataset( + "a", + "b", + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields("col1", "STRING"), + SchemaDatasetFacetFields("col2", "STRING"), + ] + ) + }, + ) + parse_result = SQLParser("bigquery").parse(SQLParser.split_sql_string("SELECT a , b FROM c")) + assert self.operator._validate_output_columns(parse_result, ds) is False + + def test_validate_output_columns(self): + ds = OutputDataset( + "a", + "b", + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaDatasetFacetFields("a", "STRING"), + SchemaDatasetFacetFields("b", "STRING"), + ] + ) + }, + ) + parse_result = SQLParser("bigquery").parse(SQLParser.split_sql_string("SELECT a , b FROM c")) + assert self.operator._validate_output_columns(parse_result, ds) is True + + def test_extract_parsed_input_tables(self): + query = "INSERT INTO x SELECT a, b from project1.ds1.tb1; INSERT INTO y SELECT c, d from tb2;" + parse_result = SQLParser("bigquery").parse(SQLParser.split_sql_string(query)) + assert self.operator._extract_parsed_input_tables(parse_result, "default_project", "default_ds") == { + "project1.ds1.tb1": ["a", 
"b"], + "default_project.default_ds.tb2": ["c", "d"], + } + + def test_extract_parsed_input_tables_no_cll(self): + parse_result = SQLParser("bigquery").parse(SQLParser.split_sql_string("SELECT 1")) + assert self.operator._extract_parsed_input_tables(parse_result, "p", "d") == {} + + def test_validate_input_tables_mismatch(self): + result = self.operator._validate_input_tables({"a": None, "b": None}, {"a": None, "c": None}) + assert result is False + + def test_validate_input_tables_bq_has_more_tables(self): + result = self.operator._validate_input_tables({"a": None}, {"a": None, "c": None}) + assert result is True + + def test_validate_input_tables_empty(self): + result = self.operator._validate_input_tables({}, {"a": None, "c": None}) + assert result is False + + def test_validate_input_columns_mismatch(self): + result = self.operator._validate_input_columns( + {"a": ["1", "2"], "b": ["3", "4"]}, {"a": ["1", "2", "3"], "c": ["4", "5"]} + ) + assert result is False + + def test_validate_input_columns_bq_has_more_cols(self): + result = self.operator._validate_input_columns( + {"a": ["1", "2"]}, {"a": ["1", "2", "3"], "c": ["4", "5"]} + ) + assert result is True + + def test_validate_input_columns_empty(self): + result = self.operator._validate_input_columns({}, {"a": ["1", "2", "3"], "c": ["4", "5"]}) + assert result is False + + def test_generate_column_lineage_facet(self): + query = "INSERT INTO b.c SELECT c, d from tb2;" + parse_result = SQLParser("bigquery").parse(SQLParser.split_sql_string(query)) + result = self.operator._generate_column_lineage_facet(parse_result, "default_project", "default_ds") + assert result == ColumnLineageDatasetFacet( + fields={ + "c": Fields( + inputFields=[InputField("bigquery", "default_project.default_ds.tb2", "c")], + transformationType="", + transformationDescription="", + ), + "d": Fields( + inputFields=[InputField("bigquery", "default_project.default_ds.tb2", "d")], + transformationType="", + transformationDescription="", + ), + } + ) diff --git a/providers/tests/google/cloud/openlineage/test_utils.py b/providers/tests/google/cloud/openlineage/test_utils.py index 86f87531d2400..58949125f8433 100644 --- a/providers/tests/google/cloud/openlineage/test_utils.py +++ b/providers/tests/google/cloud/openlineage/test_utils.py @@ -23,6 +23,7 @@ import pytest from google.cloud.bigquery.table import Table from google.cloud.dataproc_v1 import Batch, RuntimeConfig +from openlineage.client.facet_v2 import column_lineage_dataset from airflow.providers.common.compat.openlineage.facet import ( ColumnLineageDatasetFacet, @@ -47,6 +48,7 @@ get_identity_column_lineage_facet, inject_openlineage_properties_into_dataproc_batch, inject_openlineage_properties_into_dataproc_job, + merge_column_lineage_facets, ) TEST_DATASET = "test-dataset" @@ -91,6 +93,135 @@ def _properties(self): return self.inputs.pop() +def test_merge_column_lineage_facets(): + result = merge_column_lineage_facets( + [ + ColumnLineageDatasetFacet( + fields={ + "c": Fields( + inputFields=[ + InputField( + "bigquery", + "a.b.1", + "c", + [ + column_lineage_dataset.Transformation( + "type", "some_subtype", "desc", False + ) + ], + ) + ], + transformationType="IDENTITY", + transformationDescription="IDENTICAL", + ), + "d": Fields( + inputFields=[ + InputField( + "bigquery", + "a.b.2", + "d", + [column_lineage_dataset.Transformation("t", "s", "d", False)], + ) + ], + transformationType="", + transformationDescription="", + ), + } + ), + ColumnLineageDatasetFacet( + fields={ + "c": Fields( + inputFields=[ + 
InputField( + "bigquery", + "a.b.3", + "x", + [ + column_lineage_dataset.Transformation( + "another_type", "different_subtype", "example", True + ) + ], + ), + InputField( + "bigquery", + "a.b.1", + "c", + [ + column_lineage_dataset.Transformation( + "diff_type", "diff_subtype", "diff", True + ) + ], + ), + ], + transformationType="", + transformationDescription="", + ), + "e": Fields( + inputFields=[InputField("bigquery", "a.b.1", "e")], + transformationType="IDENTITY", + transformationDescription="IDENTICAL", + ), + } + ), + ColumnLineageDatasetFacet( + fields={ + "c": Fields( + inputFields=[InputField("bigquery", "a.b.3", "x")], + transformationType="", + transformationDescription="", + ) + } + ), + ] + ) + assert result == ColumnLineageDatasetFacet( + fields={ + "c": Fields( + inputFields=[ + InputField( + "bigquery", + "a.b.1", + "c", + [ + column_lineage_dataset.Transformation("type", "some_subtype", "desc", False), + column_lineage_dataset.Transformation("diff_type", "diff_subtype", "diff", True), + ], + ), + InputField( + "bigquery", + "a.b.3", + "x", + [ + column_lineage_dataset.Transformation( + "another_type", "different_subtype", "example", True + ) + ], + ), + ], + transformationType="", + transformationDescription="", + ), + "d": Fields( + inputFields=[ + InputField( + "bigquery", + "a.b.2", + "d", + [column_lineage_dataset.Transformation("t", "s", "d", False)], + ) + ], + transformationType="", + transformationDescription="", + ), + "e": Fields( + inputFields=[InputField("bigquery", "a.b.1", "e")], + transformationType="", + transformationDescription="", + ), + } + ) + + def test_get_facets_from_bq_table(): expected_facets = { "schema": SchemaDatasetFacet( diff --git a/providers/tests/google/cloud/utils/job_details.json b/providers/tests/google/cloud/utils/job_details.json index f12ec1321d57f..b533e0faa6704 100644 --- a/providers/tests/google/cloud/utils/job_details.json +++ b/providers/tests/google/cloud/utils/job_details.json @@ -225,11 +225,18 @@ "billingTier": 1, "totalSlotMs": "825", "cacheHit": false, - "referencedTables": [{ - "projectId": "airflow-openlineage", - "datasetId": "new_dataset", - "tableId": "test_table" - }], + "referencedTables": [ + { + "projectId": "airflow-openlineage", + "datasetId": "new_dataset", + "tableId": "test_table" + }, + { + "projectId": "airflow-openlineage", + "datasetId": "new_dataset", + "tableId": "output_table" + } + ], "statementType": "SELECT" }, "totalSlotMs": "825"
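
Illustrative usage sketch (editorial example, not part of the patch; task, dataset and table names are hypothetical). With this change, a plain QUERY job submitted through BigQueryInsertJobOperator is expected to emit column-level lineage on its output dataset:

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

insert_job = BigQueryInsertJobOperator(
    task_id="insert_job",
    configuration={
        "query": {
            "query": "INSERT INTO dest_ds.dest_tbl SELECT a, b FROM src_ds.src_tbl",
            "useLegacySql": False,
        }
    },
)
# After the job completes, get_openlineage_facets_on_complete() should return an
# OperatorLineage whose output dataset carries a ColumnLineageDatasetFacet mapping
# dest_tbl.a and dest_tbl.b back to src_ds.src_tbl.a and src_ds.src_tbl.b,
# provided the parsed lineage passes the validation checks implemented in the mixin.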