diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
index 229b76007790f0..8530090f9f5039 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
@@ -632,7 +632,10 @@ def _process_table(
 
         if self.config.include_table_lineage:
             lineage_info = self.lineage_extractor.get_upstream_lineage_info(
-                table_identifier, self.platform
+                project_id=project_id,
+                dataset_name=schema_name,
+                table=table,
+                platform=self.platform,
             )
 
         table_workunits = self.gen_table_dataset_workunits(
@@ -665,7 +668,10 @@ def _process_view(
         lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None
         if self.config.include_table_lineage:
             lineage_info = self.lineage_extractor.get_upstream_lineage_info(
-                table_identifier, self.platform
+                project_id=project_id,
+                dataset_name=dataset_name,
+                table=view,
+                platform=self.platform,
             )
 
         view_workunits = self.gen_view_dataset_workunits(
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
index d9241772540590..da9a2a00bb99f9 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
@@ -54,12 +54,12 @@ class BigQueryV2Config(BigQueryConfig):
     )
 
     number_of_datasets_process_in_batch: int = Field(
-        default=50,
-        description="Number of table queried in batch when getting metadata. This is a low leve config propert which should be touched with care. This restriction needed because we query partitions system view which throws error if we try to touch too many tables.",
+        default=80,
+        description="Number of table queried in batch when getting metadata. This is a low level config property which should be touched with care. This restriction is needed because we query partitions system view which throws error if we try to touch too many tables.",
     )
     column_limit: int = Field(
-        default=1000,
-        description="Maximum number of columns to process in a table",
+        default=300,
+        description="Maximum number of columns to process in a table. This is a low level config property which should be touched with care. This restriction is needed because excessively wide tables can result in failure to ingest the schema.",
     )
     # The inheritance hierarchy is wonky here, but these options need modifications.
     project_id: Optional[str] = Field(
@@ -72,6 +72,10 @@ class BigQueryV2Config(BigQueryConfig):
         default=False,
         description="Experimental. Use sql parser to resolve view/table lineage. If there is a view being referenced then bigquery sends both the view as well as underlying tablein the references. There is no distinction between direct/base objects accessed. So doing sql parsing to ensure we only use direct objects accessed for lineage.",
     )
+    lineage_parse_view_ddl: bool = Field(
+        default=True,
+        description="Sql parse view ddl to get lineage.",
+    )
 
     @root_validator(pre=False)
     def profile_default_settings(cls, values: Dict) -> Dict:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
index 8d0e5b456a6929..3a287018fccdce 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
@@ -19,6 +19,10 @@
 )
 from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
 from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
+from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
+    BigqueryTable,
+    BigqueryView,
+)
 from datahub.ingestion.source.bigquery_v2.common import (
     BQ_DATE_SHARD_FORMAT,
     BQ_DATETIME_FORMAT,
@@ -396,6 +400,10 @@ def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[st
                 continue
 
             destination_table_str = destination_table.table_identifier.get_table_name()
+            destination_table_str = str(
+                BigQueryTableRef(table_identifier=destination_table.table_identifier)
+            )
+
             if not self.config.dataset_pattern.allowed(
                 destination_table.table_identifier.dataset
             ) or not self.config.table_pattern.allowed(
@@ -464,6 +472,54 @@ def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[st
         logger.info("Exiting create lineage map function")
         return lineage_map
 
+    def parse_view_lineage(
+        self, project: str, dataset: str, view: BigqueryView
+    ) -> List[BigqueryTableIdentifier]:
+        """Return the tables referenced by the view's DDL.
+
+        Table names reported by the SQL parser are split on "." and
+        qualified with the view's own project/dataset when those parts are
+        missing. Names with more than three parts are skipped. An empty list
+        is returned when the view has no DDL or the DDL cannot be parsed.
+        """
+        parsed_tables = set()
+        if view.ddl:
+            try:
+                parser = BigQuerySQLParser(view.ddl)
+                tables = parser.get_tables()
+            except Exception as ex:
+                logger.debug(
+                    f"View {view.name} definination sql parsing failed on query: {view.ddl}. Edge from physical table to view won't be added. The error was {ex}."
+                )
+                return []
+
+            for table in tables:
+                parts = table.split(".")
+                if len(parts) == 1:
+                    parsed_tables.add(
+                        BigqueryTableIdentifier(
+                            project_id=project, dataset=dataset, table=table
+                        )
+                    )
+                elif len(parts) == 2:
+                    parsed_tables.add(
+                        BigqueryTableIdentifier(
+                            project_id=project, dataset=parts[0], table=parts[1]
+                        )
+                    )
+                elif len(parts) == 3:
+                    parsed_tables.add(
+                        BigqueryTableIdentifier(
+                            project_id=parts[0], dataset=parts[1], table=parts[2]
+                        )
+                    )
+                else:
+                    continue
+
+            return list(parsed_tables)
+        else:
+            return []
+
     def _compute_bigquery_lineage(self, project_id: str) -> Dict[str, Set[str]]:
         lineage_extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(
             config=self.config, report=self.report
@@ -524,11 +580,18 @@ def get_upstream_tables(
                     )
                 else:
                     upstreams.add(upstream_table)
+
         return upstreams
 
     def get_upstream_lineage_info(
-        self, table_identifier: BigqueryTableIdentifier, platform: str
+        self,
+        project_id: str,
+        dataset_name: str,
+        table: Union[BigqueryTable, BigqueryView],
+        platform: str,
     ) -> Optional[Tuple[UpstreamLineageClass, Dict[str, str]]]:
+        table_identifier = BigqueryTableIdentifier(project_id, dataset_name, table.name)
+
         if table_identifier.project_id not in self.loaded_project_ids:
             with PerfTimer() as timer:
                 self.lineage_metadata.update(
@@ -539,6 +602,15 @@ def get_upstream_lineage_info(
                 )
                 self.loaded_project_ids.append(table_identifier.project_id)
 
+        if self.config.lineage_parse_view_ddl and isinstance(table, BigqueryView):
+            view_ref = str(BigQueryTableRef(table_identifier).get_sanitized_table_ref())
+            for table_id in self.parse_view_lineage(project_id, dataset_name, table):
+                # Look up and write under the same sanitized key; otherwise each
+                # parsed table replaces the set instead of being added to it.
+                self.lineage_metadata.setdefault(view_ref, set()).add(
+                    str(BigQueryTableRef(table_id).get_sanitized_table_ref())
+                )
+
         bq_table = BigQueryTableRef.from_bigquery_table(table_identifier)
         if str(bq_table) in self.lineage_metadata:
             upstream_list: List[UpstreamClass] = []
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
index 50acdb0554933e..5c0d0f4a6d18bf 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
@@ -76,7 +76,7 @@ def generate_partition_profiler_query(
         See more about partitioned tables at https://cloud.google.com/bigquery/docs/partitioned-tables
         """
         logger.debug(
-            f"generate partition profiler query for project: {project} schema: {schema} and table {table}, partition_datetime: {partition_datetime}"
+            f"generate partition profiler query for project: {project} schema: {schema} and table {table.name}, partition_datetime: {partition_datetime}"
         )
         partition = table.max_partition_id
         if partition:
diff --git a/metadata-ingestion/tests/unit/test_bigquery_lineage.py b/metadata-ingestion/tests/unit/test_bigquery_lineage.py
new file mode 100644
index 00000000000000..a27c61be7a4ce6
--- /dev/null
+++ b/metadata-ingestion/tests/unit/test_bigquery_lineage.py
@@ -0,0 +1,87 @@
+import datetime
+
+from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
+from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
+from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigqueryView
+from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor
+
+
+def test_parse_view_lineage():
+    config = BigQueryV2Config()
+    report = BigQueryV2Report()
+    extractor = BigqueryLineageExtractor(config, report)
+
+    # ddl = "select * from some_dataset.sometable as a"
+    ddl = """CREATE VIEW `my-project.my-dataset.test_table`
+AS SELECT
+  * REPLACE(
+    myrandom(something) AS something)
+FROM
+  `my-project2.my-dataset2.test_physical_table`;
+"""
+    view = BigqueryView(
+        name="test",
+        created=datetime.datetime.now(),
+        last_altered=datetime.datetime.now(),
+        comment="",
+        ddl=ddl,
+    )
+    tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
+    assert 1 == len(tables)
+    assert "my-project2.my-dataset2.test_physical_table" == tables[0].get_table_name()
+
+
+def test_parse_view_lineage_with_two_part_table_name():
+    config = BigQueryV2Config()
+    report = BigQueryV2Report()
+    extractor = BigqueryLineageExtractor(config, report)
+
+    ddl = "CREATE VIEW my_view as select * from some_dataset.sometable as a"
+    view = BigqueryView(
+        name="test",
+        created=datetime.datetime.now(),
+        last_altered=datetime.datetime.now(),
+        comment="",
+        ddl=ddl,
+    )
+    tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
+    assert 1 == len(tables)
+    assert "my_project.some_dataset.sometable" == tables[0].get_table_name()
+
+
+def test_one_part_table():
+    config = BigQueryV2Config()
+    report = BigQueryV2Report()
+    extractor = BigqueryLineageExtractor(config, report)
+
+    ddl = "CREATE VIEW my_view as select * from sometable as a"
+    view = BigqueryView(
+        name="test",
+        created=datetime.datetime.now(),
+        last_altered=datetime.datetime.now(),
+        comment="",
+        ddl=ddl,
+    )
+    tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
+    assert 1 == len(tables)
+    assert "my_project.my_dataset.sometable" == tables[0].get_table_name()
+
+
+def test_create_statement_with_multiple_table():
+    config = BigQueryV2Config()
+    report = BigQueryV2Report()
+    extractor = BigqueryLineageExtractor(config, report)
+
+    ddl = "CREATE VIEW my_view as select * from my_project_2.my_dataset_2.sometable union select * from my_project_2.my_dataset_2.sometable2 as a"
+    view = BigqueryView(
+        name="test",
+        created=datetime.datetime.now(),
+        last_altered=datetime.datetime.now(),
+        comment="",
+        ddl=ddl,
+    )
+    tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
+    tables.sort(key=lambda e: e.get_table_name())
+    assert 2 == len(tables)
+    assert "my_project_2.my_dataset_2.sometable" == tables[0].get_table_name()
+    assert "my_project_2.my_dataset_2.sometable2" == tables[1].get_table_name()