Skip to content

Commit

Permalink
fix(bigquery): reduce number of calls for details of partitioning (#5014
Browse files Browse the repository at this point in the history
)
  • Loading branch information
anshbansal authored May 27, 2022
1 parent 05310e4 commit 912ce11
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 5 deletions.
31 changes: 26 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ def __init__(self, config, ctx):
self.report: BigQueryReport = BigQueryReport()
self.lineage_metadata: Optional[Dict[str, Set[str]]] = None
self.maximum_shard_ids: Dict[str, str] = dict()
self.partition_info: Dict[str, str] = dict()
atexit.register(cleanup, config)

def get_db_name(self, inspector: Inspector = None) -> str:
Expand Down Expand Up @@ -703,15 +704,35 @@ def is_latest_shard(self, project_id: str, schema: str, table: str) -> bool:
else:
return True

def add_information_for_schema(self, inspector: Inspector, schema: str) -> None:
url = self.config.get_sql_alchemy_url()
engine = create_engine(url, **self.config.options)
project_id = self.get_db_name(inspector)
with engine.connect() as con:
inspector = inspect(con)
sql = f"""
select table_name, column_name
from `{project_id}.{schema}.INFORMATION_SCHEMA.COLUMNS`
where is_partitioning_column = 'YES';
"""
result = con.execute(sql)
for row in result.fetchall():
table = row[0]
partition_column = row[1]
self.partition_info[f"{project_id}.{schema}.{table}"] = partition_column
self.report.partition_info = self.partition_info

def get_extra_tags(
self, inspector: Inspector, schema: str, table: str
) -> Dict[str, List[str]]:
extra_tags: Dict[str, List[str]] = {}
partition: Optional[BigQueryPartitionColumn] = self.get_latest_partition(
schema, table
)
if partition:
extra_tags[partition.column_name] = [Constants.TAG_PARTITION_KEY]
project_id = self.get_db_name(inspector)

partition_lookup_key = f"{project_id}.{schema}.{table}"
if partition_lookup_key in self.partition_info:
extra_tags[self.partition_info[partition_lookup_key]] = [
Constants.TAG_PARTITION_KEY
]
return extra_tags

def generate_partition_profiler_query(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,7 @@ def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
if not sql_config.schema_pattern.allowed(schema):
self.report.report_dropped(f"{schema}.*")
continue
self.add_information_for_schema(inspector, schema)

yield from self.gen_schema_containers(schema, db_name)

Expand Down Expand Up @@ -871,6 +872,9 @@ def loop_tables( # noqa: C901
except Exception as e:
self.report.report_failure(f"{schema}", f"Tables error: {e}")

def add_information_for_schema(self, inspector: Inspector, schema: str) -> None:
pass

def get_extra_tags(
self, inspector: Inspector, schema: str, table: str
) -> Optional[Dict[str, List[str]]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ class BigQueryReport(SQLSourceReport):
audit_start_time: Optional[str] = None
audit_end_time: Optional[str] = None
upstream_lineage: Dict = field(default_factory=dict)
partition_info: Dict[str, str] = field(default_factory=dict)

0 comments on commit 912ce11

Please sign in to comment.