Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest) bigquery: Fix BigQuery Datetime/Timestamp type column partition table profile bug #4658

Merged
62 changes: 53 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

# This import verifies that the dependencies are available.
import sqlalchemy_bigquery
from dateutil import parser
from dateutil.relativedelta import relativedelta
from google.cloud.bigquery import Client as BigQueryClient
from google.cloud.logging_v2.client import Client as GCPLoggingClient
from sqlalchemy import create_engine, inspect
Expand Down Expand Up @@ -221,6 +221,40 @@ def bigquery_audit_metadata_query_template(
return textwrap.dedent(query)


def get_partition_range_from_partition_id(
    partition_id: str, partition_datetime: Optional[datetime.datetime]
) -> Tuple[datetime.datetime, datetime.datetime]:
    """Return the ``(start, end)`` datetimes of a time-based partition.

    The partition granularity is inferred from the length of
    ``partition_id``: 4 characters (``%Y``) means yearly, 6 (``%Y%m``)
    monthly, 8 (``%Y%m%d``) daily and 10 (``%Y%m%d%H``) hourly.

    Args:
        partition_id: Partition identifier. Its length selects the
            granularity; its content is parsed only when
            ``partition_datetime`` is ``None``.
        partition_datetime: Optional datetime that selects the partition
            instead of the one encoded in ``partition_id``. It is truncated
            to the start of the partition period it falls in, so the
            returned range always sits on a partition boundary. Its
            ``tzinfo`` is left as given.

    Returns:
        A tuple ``(start, start + one partition period)``.

    Raises:
        ValueError: If the length of ``partition_id`` matches none of the
            supported granularities, or if ``partition_id`` has to be
            parsed and does not match the expected format.
    """
    duration: relativedelta
    id_length = len(partition_id)

    if id_length == 4:
        # Yearly partition: range starts at Jan 1st, 00:00:00.
        duration = relativedelta(years=1)
        if partition_datetime is None:
            partition_datetime = datetime.datetime.strptime(partition_id, "%Y")
        partition_datetime = partition_datetime.replace(
            month=1, day=1, hour=0, minute=0, second=0, microsecond=0
        )
    elif id_length == 6:
        # Monthly partition: range starts on the 1st of the month, 00:00:00.
        duration = relativedelta(months=1)
        if partition_datetime is None:
            partition_datetime = datetime.datetime.strptime(partition_id, "%Y%m")
        partition_datetime = partition_datetime.replace(
            day=1, hour=0, minute=0, second=0, microsecond=0
        )
    elif id_length == 8:
        # Daily partition: range starts at midnight of that day.
        duration = relativedelta(days=1)
        if partition_datetime is None:
            partition_datetime = datetime.datetime.strptime(partition_id, "%Y%m%d")
        partition_datetime = partition_datetime.replace(
            hour=0, minute=0, second=0, microsecond=0
        )
    elif id_length == 10:
        # Hourly partition: range starts at the top of the hour.
        duration = relativedelta(hours=1)
        if partition_datetime is None:
            partition_datetime = datetime.datetime.strptime(partition_id, "%Y%m%d%H")
        partition_datetime = partition_datetime.replace(
            minute=0, second=0, microsecond=0
        )
    else:
        raise ValueError(
            f"check your partition_id {partition_id}. It must be yearly/monthly/daily/hourly."
        )

    # The start is aligned to a period boundary above, so adding exactly one
    # period gives the start of the next partition.
    upper_bound_partition_datetime = partition_datetime + duration
    return partition_datetime, upper_bound_partition_datetime


# Handle the GEOGRAPHY type: GEOGRAPHY is a module-level type object built
# with make_sqlalchemy_type. We will temporarily patch the _type_map
# in the get_workunits method of the source (that method is not shown here).
GEOGRAPHY = make_sqlalchemy_type("GEOGRAPHY")
Expand Down Expand Up @@ -632,14 +666,25 @@ def generate_partition_profiler_query(

partition = self.get_latest_partition(schema, table)
if partition:
partition_ts: Union[datetime.datetime, datetime.date]
if not partition_datetime:
partition_datetime = parser.parse(partition.partition_id)
partition_where_clause: str
logger.debug(f"{table} is partitioned and partition column is {partition}")
(
partition_datetime,
upper_bound_partition_datetime,
) = get_partition_range_from_partition_id(
partition.partition_id, partition_datetime
)
if partition.data_type in ("TIMESTAMP", "DATETIME"):
partition_ts = partition_datetime
partition_where_clause = "{column_name} BETWEEN '{partition_id}' AND '{upper_bound_partition_id}'".format(
column_name=partition.column_name,
partition_id=partition_datetime,
upper_bound_partition_id=upper_bound_partition_datetime,
)
elif partition.data_type == "DATE":
partition_ts = partition_datetime.date()
partition_where_clause = "{column_name} = '{partition_id}'".format(
column_name=partition.column_name,
partition_id=partition_datetime.date(),
)
else:
logger.warning(f"Not supported partition type {partition.data_type}")
return None, None
Expand All @@ -650,13 +695,12 @@ def generate_partition_profiler_query(
FROM
`{table_catalog}.{table_schema}.{table_name}`
WHERE
{column_name} = '{partition_id}'
{partition_where_clause}
""".format(
table_catalog=partition.table_catalog,
table_schema=partition.table_schema,
table_name=partition.table_name,
column_name=partition.column_name,
partition_id=partition_ts,
partition_where_clause=partition_where_clause,
)

return (partition.partition_id, custom_sql)
Expand Down
56 changes: 56 additions & 0 deletions metadata-ingestion/tests/unit/test_bq_get_partition_range.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import datetime

from datahub.ingestion.source.sql.bigquery import get_partition_range_from_partition_id


def test_get_partition_range_from_partition_id():
    """Check the (start, end) range returned for each partition granularity.

    Every row is (partition_id, partition_datetime, expected_start,
    expected_end). A ``None`` partition_datetime means the range has to be
    derived from the partition id itself.
    """
    dt = datetime.datetime
    scenarios = [
        # yearly partition check
        ("2022", dt(2022, 1, 1), dt(2022, 1, 1), dt(2023, 1, 1)),
        ("2022", dt(2022, 3, 12), dt(2022, 1, 1), dt(2023, 1, 1)),
        ("2022", dt(2021, 5, 2), dt(2021, 1, 1), dt(2022, 1, 1)),
        ("2022", None, dt(2022, 1, 1), dt(2023, 1, 1)),
        # monthly partition check
        ("202202", dt(2022, 2, 1), dt(2022, 2, 1), dt(2022, 3, 1)),
        ("202202", dt(2022, 2, 3), dt(2022, 2, 1), dt(2022, 3, 1)),
        ("202202", dt(2021, 12, 13), dt(2021, 12, 1), dt(2022, 1, 1)),
        ("202202", None, dt(2022, 2, 1), dt(2022, 3, 1)),
        # daily partition check
        ("20220205", dt(2022, 2, 5), dt(2022, 2, 5), dt(2022, 2, 6)),
        ("20220205", dt(2022, 2, 3), dt(2022, 2, 3), dt(2022, 2, 4)),
        ("20220205", None, dt(2022, 2, 5), dt(2022, 2, 6)),
        # hourly partition check
        ("2022020509", dt(2022, 2, 5, 9), dt(2022, 2, 5, 9), dt(2022, 2, 5, 10)),
        ("2022020509", dt(2022, 2, 3, 1), dt(2022, 2, 3, 1), dt(2022, 2, 3, 2)),
        ("2022020509", None, dt(2022, 2, 5, 9), dt(2022, 2, 5, 10)),
    ]

    for part_id, part_dt, expected_start, expected_end in scenarios:
        actual_range = get_partition_range_from_partition_id(part_id, part_dt)
        assert actual_range == (expected_start, expected_end)