fix(ingest): bigquery-beta - Getting datasets with bigquery client (#6039)

* Getting datasets with the BigQuery client instead of an INFORMATION_SCHEMA query, because the latter did not work everywhere (see the sketch below the commit metadata)
* Changing plain lists to LossyList in the report (a minimal illustration of the lossy-collection idea follows the report diff)
treff7es authored Sep 23, 2022
1 parent af6a423 commit 3b1a0c5
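
As a rough sketch of the client-based listing mentioned in the commit message (an illustration only, not the DataHub code itself — the actual change is in the diff below; the project id and default-credentials setup are assumptions):

from typing import List

from google.cloud import bigquery


def list_dataset_ids(project_id: str) -> List[str]:
    # Enumerate datasets through the BigQuery API client rather than an
    # INFORMATION_SCHEMA query, which the commit message notes did not work everywhere.
    client = bigquery.Client(project=project_id)
    return [dataset.dataset_id for dataset in client.list_datasets(project_id)]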
Showing 5 changed files with 23 additions and 13 deletions.
@@ -465,7 +465,6 @@ def _process_project(
             yield wu
 
         try:
-
             bigquery_project.datasets = (
                 BigQueryDataDictionary.get_datasets_for_project_id(conn, project_id)
             )
@@ -722,7 +721,7 @@ def gen_dataset_workunits(
         self.report.report_workunit(wu)
 
         dataset_properties = DatasetProperties(
-            name=str(datahub_dataset_name),
+            name=datahub_dataset_name.get_table_display_name(),
             description=table.comment,
             qualifiedName=str(datahub_dataset_name),
             customProperties={**upstream_column_props},
@@ -108,7 +108,7 @@ def _remove_suffix(input_string: str, suffixes: List[str]) -> str:
                 return input_string[: -len(suffix)]
         return input_string
 
-    def get_table_name(self) -> str:
+    def get_table_display_name(self) -> str:
         shortened_table_name = self.table
         # if table name ends in _* or * then we strip it as that represents a query on a sharded table
         shortened_table_name = self._remove_suffix(shortened_table_name, ["_*", "*"])
@@ -130,8 +130,10 @@ def get_table_name(self) -> str:
             raise ValueError(
                 f"Cannot handle {self} - poorly formatted table name, contains {invalid_chars_in_table_name}"
             )
+        return table_name
 
-        return f"{self.project_id}.{self.dataset}.{table_name}"
+    def get_table_name(self) -> str:
+        return f"{self.project_id}.{self.dataset}.{self.get_table_display_name()}"
 
     def __str__(self) -> str:
         return self.get_table_name()
@@ -7,6 +7,7 @@
 import pydantic
 
 from datahub.ingestion.source.sql.sql_common import SQLSourceReport
+from datahub.utilities.lossy_collections import LossyDict, LossyList
 
 
 @dataclass
@@ -21,12 +22,12 @@ class BigQueryV2Report(SQLSourceReport):
     num_total_audit_entries: Optional[int] = None
     num_parsed_audit_entires: Optional[int] = None
     bigquery_audit_metadata_datasets_missing: Optional[bool] = None
-    lineage_failed_extraction: List[str] = field(default_factory=list)
+    lineage_failed_extraction: LossyList[str] = field(default_factory=LossyList)
     lineage_metadata_entries: Optional[int] = None
     lineage_mem_size: Optional[str] = None
     lineage_extraction_sec: Dict[str, float] = field(default_factory=dict)
     usage_extraction_sec: Dict[str, float] = field(default_factory=dict)
-    usage_failed_extraction: List[str] = field(default_factory=list)
+    usage_failed_extraction: LossyList[str] = field(default_factory=LossyList)
     metadata_extraction_sec: Dict[str, float] = field(default_factory=dict)
     include_table_lineage: Optional[bool] = None
     use_date_sharded_audit_log_tables: Optional[bool] = None
@@ -38,9 +39,8 @@
     log_entry_end_time: Optional[str] = None
     audit_start_time: Optional[str] = None
     audit_end_time: Optional[str] = None
-    upstream_lineage: Dict = field(default_factory=dict)
+    upstream_lineage: LossyDict = field(default_factory=LossyDict)
     partition_info: Dict[str, str] = field(default_factory=dict)
-    table_metadata: Dict[str, List[str]] = field(default_factory=dict)
     profile_table_selection_criteria: Dict[str, str] = field(default_factory=dict)
     selected_profile_tables: Dict[str, List[str]] = field(default_factory=dict)
     invalid_partition_ids: Dict[str, str] = field(default_factory=dict)
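
For context on the LossyList/LossyDict fields above: the point of a lossy collection in an ingestion report is to cap how many entries it retains so the report cannot grow without bound. A minimal sketch of that idea (an illustration only — DataHub's datahub.utilities.lossy_collections types are the real implementation and may truncate or sample differently):

from typing import Generic, List, TypeVar

T = TypeVar("T")


class BoundedList(Generic[T]):
    """Keeps only the first `max_items` elements and counts the rest."""

    def __init__(self, max_items: int = 10) -> None:
        self.max_items = max_items
        self.items: List[T] = []
        self.dropped = 0

    def append(self, item: T) -> None:
        # Store up to max_items entries; anything beyond that is only counted.
        if len(self.items) < self.max_items:
            self.items.append(item)
        else:
            self.dropped += 1

    def __repr__(self) -> str:
        suffix = f" (+{self.dropped} more)" if self.dropped else ""
        return f"{self.items!r}{suffix}"

With such a container, report fields like lineage_failed_extraction can record every event cheaply while surfacing only a bounded sample in the printed report.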
@@ -56,10 +56,10 @@ class BigqueryView:
 @dataclass
 class BigqueryDataset:
     name: str
-    created: datetime
-    last_altered: Optional[datetime]
-    location: str
-    comment: str
+    created: Optional[datetime] = None
+    last_altered: Optional[datetime] = None
+    location: Optional[str] = None
+    comment: Optional[str] = None
     tables: List[BigqueryTable] = field(default_factory=list)
     views: List[BigqueryView] = field(default_factory=list)
 
@@ -228,6 +228,14 @@ def get_projects(conn: bigquery.Client) -> List[BigqueryProject]:
     def get_datasets_for_project_id(
         conn: bigquery.Client, project_id: str
     ) -> List[BigqueryDataset]:
+        datasets = conn.list_datasets(project_id)
+
+        return [BigqueryDataset(name=d.dataset_id) for d in datasets]
+
+    @staticmethod
+    def get_datasets_for_project_id_with_information_schema(
+        conn: bigquery.Client, project_id: str
+    ) -> List[BigqueryDataset]:
 
         schemas = BigQueryDataDictionary.get_query_result(
             conn,
@@ -93,6 +93,7 @@
     ViewPropertiesClass,
 )
 from datahub.telemetry import telemetry
+from datahub.utilities.lossy_collections import LossyList
 from datahub.utilities.registries.domain_registry import DomainRegistry
 from datahub.utilities.sqlalchemy_query_combiner import SQLAlchemyQueryCombinerReport
 
@@ -191,7 +192,7 @@ class SQLSourceReport(StaleEntityRemovalSourceReport):
     tables_scanned: int = 0
     views_scanned: int = 0
     entities_profiled: int = 0
-    filtered: List[str] = field(default_factory=list)
+    filtered: LossyList[str] = field(default_factory=LossyList)
 
     query_combiner: Optional[SQLAlchemyQueryCombinerReport] = None
 
