Skip to content

Commit

Permalink
fix(ingest): bigquery - Fixing querying non-date partition columns in…
Browse files Browse the repository at this point in the history
… profiling (#6554)
  • Loading branch information
treff7es authored Nov 26, 2022
1 parent d424edd commit 278c38c
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 20 deletions.
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ def get_long_description():
"types-ujson>=5.2.0",
"types-termcolor>=1.0.0",
"types-Deprecated",
# Mypy complains with 4.21.0.0 => error: Library stubs not installed for "google.protobuf.descriptor"
# Mypy complains with 4.21.0.0 => error: Library stubs not installed for "google.protobuf.descriptor"
"types-protobuf<4.21.0.0",
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class BigQueryV2Report(SQLSourceReport):
upstream_lineage: LossyDict = field(default_factory=LossyDict)
partition_info: Dict[str, str] = field(default_factory=TopKDict)
profile_table_selection_criteria: Dict[str, str] = field(default_factory=TopKDict)
num_tables_not_eligible_profiling: Dict[str, int] = field(default_factory=TopKDict)
selected_profile_tables: Dict[str, List[str]] = field(default_factory=TopKDict)
invalid_partition_ids: Dict[str, str] = field(default_factory=TopKDict)
allow_pattern: Optional[str] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,15 @@ def generate_partition_profiler_query(
] = partition
return None, None

partition_column_type: str = "DATE"
for c in table.columns:
if c.is_partition_column:
partition_column_type = c.data_type

if table.time_partitioning.type_ in ("DAY", "MONTH", "YEAR"):
partition_where_clause = "{column_name} BETWEEN DATE('{partition_id}') AND DATE('{upper_bound_partition_id}')".format(
column_name=table.time_partitioning.field,
partition_id=partition_datetime,
upper_bound_partition_id=upper_bound_partition_datetime,
)
partition_where_clause = f"`{table.time_partitioning.field}` BETWEEN {partition_column_type}('{partition_datetime}') AND {partition_column_type}('{upper_bound_partition_datetime}')"
elif table.time_partitioning.type_ in ("HOUR"):
partition_where_clause = "{column_name} BETWEEN '{partition_id}' AND '{upper_bound_partition_id}'".format(
column_name=table.time_partitioning.field,
partition_id=partition_datetime,
upper_bound_partition_id=upper_bound_partition_datetime,
)
partition_where_clause = f"`{table.time_partitioning.field}` BETWEEN '{partition_datetime}' AND '{upper_bound_partition_datetime}'"
else:
logger.warning(
f"Not supported partition type {table.time_partitioning.type_}"
Expand Down Expand Up @@ -216,14 +213,10 @@ def get_bigquery_profile_request(
if not self.is_dataset_eligible_for_profiling(
dataset_name, table.last_altered, table.size_in_bytes, table.rows_count
):
# Profile only table level if dataset is filtered from profiling
# due to size limits alone
if self.is_dataset_eligible_for_profiling(
dataset_name, table.last_altered, 0, 0
):
profile_table_level_only = True
else:
skip_profiling = True
profile_table_level_only = True
self.report.num_tables_not_eligible_profiling[dataset] = (
self.report.num_tables_not_eligible_profiling.get(dataset, 0) + 1
)

if not table.columns:
skip_profiling = True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class OperationalDataMeta:
def bigquery_audit_metadata_query_template(
dataset: str,
use_date_sharded_tables: bool,
table_allow_filter: str = None,
table_allow_filter: Optional[str] = None,
limit: Optional[int] = None,
) -> str:
"""
Expand Down

0 comments on commit 278c38c

Please sign in to comment.