Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest) bigquery: Fix BigQuery Datetime/Timestamp type column partition table profile bug #4658

Merged
35 changes: 29 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# This import verifies that the dependencies are available.
import sqlalchemy_bigquery
from dateutil import parser
from dateutil.relativedelta import relativedelta
from google.cloud.bigquery import Client as BigQueryClient
from google.cloud.logging_v2.client import Client as GCPLoggingClient
from sqlalchemy import create_engine, inspect
Expand Down Expand Up @@ -621,14 +622,37 @@ def generate_partition_profiler_query(

partition = self.get_latest_partition(schema, table)
if partition:
partition_ts: Union[datetime.datetime, datetime.date]
partition_where_clause: str
if not partition_datetime:
partition_datetime = parser.parse(partition.partition_id)
logger.debug(f"{table} is partitioned and partition column is {partition}")
if partition.data_type in ("TIMESTAMP", "DATETIME"):
partition_ts = partition_datetime
duration: relativedelta
# if yearly partitioned,
if len(partition.partition_id) == 4:
Copy link
Collaborator

@anshbansal anshbansal Apr 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add unit tests for your changes. Maybe extracting the code inside the if block into a separate function? That will give examples to why we have this len checks in here and validate that this works. Currently after this change it is hard to understand what we are doing here.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. I will create unit tests for this.

duration = relativedelta(years=1)
partition_datetime = partition_datetime.replace(month=1, day=1)
# elif monthly partitioned,
elif len(partition.partition_id) == 6:
duration = relativedelta(months=1)
partition_datetime = partition_datetime.replace(day=1)
# elif daily partitioned,
elif len(partition.partition_id) == 8:
duration = relativedelta(days=1)
# elif hourly partitioned,
elif len(partition.partition_id) == 10:
duration = relativedelta(hours=1)
upper_bound_partition_datetime = partition_datetime + duration
partition_where_clause = "{column_name} BETWEEN '{partition_id}' AND '{upper_bound_partition_id}'".format(
column_name=partition.column_name,
partition_id=partition_datetime,
upper_bound_partition_id=upper_bound_partition_datetime,
)
elif partition.data_type == "DATE":
partition_ts = partition_datetime.date()
partition_where_clause = "{column_name} = '{partition_id}'".format(
column_name=partition.column_name,
partition_id=partition_datetime.date(),
)
else:
logger.warning(f"Not supported partition type {partition.data_type}")
return None, None
Expand All @@ -639,13 +663,12 @@ def generate_partition_profiler_query(
FROM
`{table_catalog}.{table_schema}.{table_name}`
WHERE
{column_name} = '{partition_id}'
{partition_where_clause}
""".format(
table_catalog=partition.table_catalog,
table_schema=partition.table_schema,
table_name=partition.table_name,
column_name=partition.column_name,
partition_id=partition_ts,
partition_where_clause=partition_where_clause,
)

return (partition.partition_id, custom_sql)
Expand Down