From 3b1a0c568e337785856de870bb99a75ed0de4446 Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Sat, 24 Sep 2022 00:48:15 +0200 Subject: [PATCH] fix(ingest): bigquery-beta - Getting datasets with bigquery client (#6039) * Getting datasets with bigquery client instead of information schema because it did not work everywhere Changing lists to LossyList in report --- .../ingestion/source/bigquery_v2/bigquery.py | 3 +-- .../source/bigquery_v2/bigquery_audit.py | 6 ++++-- .../source/bigquery_v2/bigquery_report.py | 8 ++++---- .../source/bigquery_v2/bigquery_schema.py | 16 ++++++++++++---- .../datahub/ingestion/source/sql/sql_common.py | 3 ++- 5 files changed, 23 insertions(+), 13 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 0288125c842220..73daa2b2d4cebb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -465,7 +465,6 @@ def _process_project( yield wu try: - bigquery_project.datasets = ( BigQueryDataDictionary.get_datasets_for_project_id(conn, project_id) ) @@ -722,7 +721,7 @@ def gen_dataset_workunits( self.report.report_workunit(wu) dataset_properties = DatasetProperties( - name=str(datahub_dataset_name), + name=datahub_dataset_name.get_table_display_name(), description=table.comment, qualifiedName=str(datahub_dataset_name), customProperties={**upstream_column_props}, diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py index 7e7c9526052326..72094371d1f15d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py @@ -108,7 +108,7 @@ def _remove_suffix(input_string: str, suffixes: List[str]) -> 
str: return input_string[: -len(suffix)] return input_string - def get_table_name(self) -> str: + def get_table_display_name(self) -> str: shortened_table_name = self.table # if table name ends in _* or * then we strip it as that represents a query on a sharded table shortened_table_name = self._remove_suffix(shortened_table_name, ["_*", "*"]) @@ -130,8 +130,10 @@ def get_table_name(self) -> str: raise ValueError( f"Cannot handle {self} - poorly formatted table name, contains {invalid_chars_in_table_name}" ) + return table_name - return f"{self.project_id}.{self.dataset}.{table_name}" + def get_table_name(self) -> str: + return f"{self.project_id}.{self.dataset}.{self.get_table_display_name()}" def __str__(self) -> str: return self.get_table_name() diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py index 9a3d900203994e..d0ca9162f231c8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py @@ -7,6 +7,7 @@ import pydantic from datahub.ingestion.source.sql.sql_common import SQLSourceReport +from datahub.utilities.lossy_collections import LossyDict, LossyList @dataclass @@ -21,12 +22,12 @@ class BigQueryV2Report(SQLSourceReport): num_total_audit_entries: Optional[int] = None num_parsed_audit_entires: Optional[int] = None bigquery_audit_metadata_datasets_missing: Optional[bool] = None - lineage_failed_extraction: List[str] = field(default_factory=list) + lineage_failed_extraction: LossyList[str] = field(default_factory=LossyList) lineage_metadata_entries: Optional[int] = None lineage_mem_size: Optional[str] = None lineage_extraction_sec: Dict[str, float] = field(default_factory=dict) usage_extraction_sec: Dict[str, float] = field(default_factory=dict) - usage_failed_extraction: List[str] = field(default_factory=list) + 
usage_failed_extraction: LossyList[str] = field(default_factory=LossyList) metadata_extraction_sec: Dict[str, float] = field(default_factory=dict) include_table_lineage: Optional[bool] = None use_date_sharded_audit_log_tables: Optional[bool] = None @@ -38,9 +39,8 @@ class BigQueryV2Report(SQLSourceReport): log_entry_end_time: Optional[str] = None audit_start_time: Optional[str] = None audit_end_time: Optional[str] = None - upstream_lineage: Dict = field(default_factory=dict) + upstream_lineage: LossyDict = field(default_factory=LossyDict) partition_info: Dict[str, str] = field(default_factory=dict) - table_metadata: Dict[str, List[str]] = field(default_factory=dict) profile_table_selection_criteria: Dict[str, str] = field(default_factory=dict) selected_profile_tables: Dict[str, List[str]] = field(default_factory=dict) invalid_partition_ids: Dict[str, str] = field(default_factory=dict) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index 21e4242c1fe8a4..58446ff9287912 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -56,10 +56,10 @@ class BigqueryView: @dataclass class BigqueryDataset: name: str - created: datetime - last_altered: Optional[datetime] - location: str - comment: str + created: Optional[datetime] = None + last_altered: Optional[datetime] = None + location: Optional[str] = None + comment: Optional[str] = None tables: List[BigqueryTable] = field(default_factory=list) views: List[BigqueryView] = field(default_factory=list) @@ -228,6 +228,14 @@ def get_projects(conn: bigquery.Client) -> List[BigqueryProject]: def get_datasets_for_project_id( conn: bigquery.Client, project_id: str ) -> List[BigqueryDataset]: + datasets = conn.list_datasets(project_id) + + return [BigqueryDataset(name=d.dataset_id) for d in 
datasets] + + @staticmethod + def get_datasets_for_project_id_with_information_schema( + conn: bigquery.Client, project_id: str + ) -> List[BigqueryDataset]: schemas = BigQueryDataDictionary.get_query_result( conn, diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 3a9d84a19043f7..6785487ab20575 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -93,6 +93,7 @@ ViewPropertiesClass, ) from datahub.telemetry import telemetry +from datahub.utilities.lossy_collections import LossyList from datahub.utilities.registries.domain_registry import DomainRegistry from datahub.utilities.sqlalchemy_query_combiner import SQLAlchemyQueryCombinerReport @@ -191,7 +192,7 @@ class SQLSourceReport(StaleEntityRemovalSourceReport): tables_scanned: int = 0 views_scanned: int = 0 entities_profiled: int = 0 - filtered: List[str] = field(default_factory=list) + filtered: LossyList[str] = field(default_factory=LossyList) query_combiner: Optional[SQLAlchemyQueryCombinerReport] = None