fix(ingest): bigquery-beta - Additional fixes for Bigquery beta #6051

Merged · 4 commits · Sep 26, 2022
16 changes: 8 additions & 8 deletions metadata-ingestion/setup.py
@@ -150,7 +150,7 @@ def get_long_description():
"sqlalchemy-redshift",
"psycopg2-binary",
"GeoAlchemy2",
"sqllineage==1.3.5",
"sqllineage==1.3.6",
*path_spec_common,
}

@@ -216,18 +216,18 @@ def get_long_description():
"gql>=3.3.0",
"gql[requests]>=3.3.0",
},
"great-expectations": sql_common | {"sqllineage==1.3.5"},
"great-expectations": sql_common | {"sqllineage==1.3.6"},
# Source plugins
# PyAthena is pinned with exact version because we use private method in PyAthena
"athena": sql_common | {"PyAthena[SQLAlchemy]==2.4.1"},
"azure-ad": set(),
"bigquery": sql_common
| bigquery_common
| {"sqlalchemy-bigquery>=1.4.1", "sqllineage==1.3.5", "sqlparse"},
| {"sqlalchemy-bigquery>=1.4.1", "sqllineage==1.3.6", "sqlparse"},
"bigquery-usage": bigquery_common | usage_common | {"cachetools"},
"bigquery-beta": sql_common
| bigquery_common
| {"sqllineage==1.3.5", "sql_metadata"},
| {"sqllineage==1.3.6", "sql_metadata"},
"clickhouse": sql_common | {"clickhouse-sqlalchemy==0.1.8"},
"clickhouse-usage": sql_common
| usage_common
@@ -269,9 +269,9 @@ def get_long_description():
"looker": looker_common,
# lkml>=1.1.2 is required to support the sql_preamble expression in LookML
"lookml": looker_common
| {"lkml>=1.1.2", "sql-metadata==2.2.2", "sqllineage==1.3.5", "GitPython>2"},
"metabase": {"requests", "sqllineage==1.3.5"},
"mode": {"requests", "sqllineage==1.3.5", "tenacity>=8.0.1"},
| {"lkml>=1.1.2", "sql-metadata==2.2.2", "sqllineage==1.3.6", "GitPython>2"},
"metabase": {"requests", "sqllineage==1.3.6"},
"mode": {"requests", "sqllineage==1.3.6", "tenacity>=8.0.1"},
"mongodb": {"pymongo[srv]>=3.11", "packaging"},
"mssql": sql_common | {"sqlalchemy-pytds>=0.3"},
"mssql-odbc": sql_common | {"pyodbc"},
@@ -284,7 +284,7 @@ def get_long_description():
"presto-on-hive": sql_common
| {"psycopg2-binary", "acryl-pyhive[hive]>=0.6.12", "pymysql>=1.0.2"},
"pulsar": {"requests"},
"redash": {"redash-toolbelt", "sql-metadata", "sqllineage==1.3.5"},
"redash": {"redash-toolbelt", "sql-metadata", "sqllineage==1.3.6"},
"redshift": sql_common | redshift_common,
"redshift-usage": sql_common | usage_common | redshift_common,
"s3": {*s3_base, *data_lake_profiling},
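All of the setup.py changes above are the same one-line bump: the sqllineage pin goes from 1.3.5 to 1.3.6 in every extra that uses it. For orientation, a minimal sketch of the library's table-level lineage API (illustrative only, not part of this diff; the SQL statement is a made-up example):

    from sqllineage.runner import LineageRunner

    # sqllineage resolves source and target tables from a single statement.
    runner = LineageRunner("INSERT INTO sales.daily SELECT * FROM sales.raw")
    print(runner.source_tables())  # e.g. [Table: sales.raw]
    print(runner.target_tables())  # e.g. [Table: sales.daily]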
@@ -571,9 +571,10 @@ def _process_view(
view.columns = self.get_columns_for_table(conn, table_identifier)

lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None
-lineage_info = self.lineage_extractor.get_upstream_lineage_info(
-    table_identifier, self.platform
-)
+if self.config.include_table_lineage:
+    lineage_info = self.lineage_extractor.get_upstream_lineage_info(
+        table_identifier, self.platform
+    )

view_workunits = self.gen_view_dataset_workunits(
view, project_id, dataset_name, lineage_info
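This gates view lineage behind the same include_table_lineage flag the source already exposes; when the flag is off, lineage_info stays None and the view is emitted without upstream lineage. A sketch of toggling the flag from Python (illustrative; the project_id value is a placeholder and parse_obj is the usual Pydantic entry point, field names assumed from the source's config class):

    from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config

    # With lineage disabled, _process_view now skips the lineage extractor entirely.
    config = BigQueryV2Config.parse_obj(
        {
            "project_id": "my-gcp-project",  # placeholder
            "include_table_lineage": False,
        }
    )
    assert config.include_table_lineage is False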
@@ -15,7 +15,7 @@ class BigQueryV2Report(SQLSourceReport):
num_total_lineage_entries: Optional[int] = None
num_skipped_lineage_entries_missing_data: Optional[int] = None
num_skipped_lineage_entries_not_allowed: Optional[int] = None
-num_skipped_lineage_entries_sql_parser_failure: Optional[int] = None
+num_lineage_entries_sql_parser_failure: Optional[int] = None
num_skipped_lineage_entries_other: Optional[int] = None
num_total_log_entries: Optional[int] = None
num_parsed_log_entires: Optional[int] = None
@@ -362,7 +362,7 @@ def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[st
self.report.num_skipped_lineage_entries_missing_data = 0
self.report.num_skipped_lineage_entries_not_allowed = 0
self.report.num_skipped_lineage_entries_other = 0
-self.report.num_skipped_lineage_entries_sql_parser_failure = 0
+self.report.num_lineage_entries_sql_parser_failure = 0
for e in entries:
self.report.num_total_lineage_entries += 1
if e.destinationTable is None or not (
@@ -400,10 +400,10 @@ def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[st
map(lambda x: x.split(".")[-1], parser.get_tables())
)
except Exception as ex:
-logger.warning(
-    f"Sql Parser failed on query: {e.query}. It will be skipped from lineage. The error was {ex}"
+logger.debug(
+    f"Sql Parser failed on query: {e.query}. It won't cause any issue except table/view lineage can't be detected reliably. The error was {ex}."
 )
-self.report.num_skipped_lineage_entries_sql_parser_failure += 1
+self.report.num_lineage_entries_sql_parser_failure += 1
continue
curr_lineage_str = lineage_map[destination_table_str]
new_lineage_str = set()
@@ -13,7 +13,10 @@
from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
-from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigqueryTable
+from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
+    BigqueryColumn,
+    BigqueryTable,
+)
from datahub.ingestion.source.ge_data_profiler import (
DatahubGEProfiler,
GEProfilerRequest,
@@ -78,39 +81,57 @@ def generate_partition_profiler_query(
partition = table.max_partition_id
if partition:
partition_where_clause: str
-logger.debug(f"{table} is partitioned and partition column is {partition}")
-try:
-    (
-        partition_datetime,
-        upper_bound_partition_datetime,
-    ) = self.get_partition_range_from_partition_id(
-        partition, partition_datetime
-    )
-except ValueError as e:
-    logger.error(
-        f"Unable to get partition range for partition id: {partition} it failed with exception {e}"
-    )
-    self.report.invalid_partition_ids[f"{schema}.{table}"] = partition
-    return None, None
-
-if table.time_partitioning.type_ in ("DAY", "MONTH", "YEAR"):
-    partition_where_clause = "{column_name} BETWEEN DATE('{partition_id}') AND DATE('{upper_bound_partition_id}')".format(
-        column_name=table.time_partitioning.field,
-        partition_id=partition_datetime,
-        upper_bound_partition_id=upper_bound_partition_datetime,
-    )
-elif table.time_partitioning.type_ in ("HOUR"):
-    partition_where_clause = "{column_name} BETWEEN '{partition_id}' AND '{upper_bound_partition_id}'".format(
-        column_name=table.time_partitioning.field,
-        partition_id=partition_datetime,
-        upper_bound_partition_id=upper_bound_partition_datetime,
-    )
-else:
-    logger.warning(
-        f"Not supported partition type {table.time_partitioning.type_}"
-    )
-    return None, None
+if not table.time_partitioning:
+    partition_column: Optional[BigqueryColumn] = None
+    for column in table.columns:
+        if column.is_partition_column:
+            partition_column = column

Review comment (Contributor): can there be multiple partition columns?
Reply (Contributor Author): BigQuery only supports partitioning on one column.

+            break
+    if partition_column:
+        partition_where_clause = f"{partition_column.name} >= {partition}"
+    else:
+        logger.warning(
+            f"Partitioned table {table.name} without partiton column"
+        )
+        return None, None
+else:
+    logger.debug(
+        f"{table.name} is partitioned and partition column is {partition}"
+    )
+    try:
+        (
+            partition_datetime,
+            upper_bound_partition_datetime,
+        ) = self.get_partition_range_from_partition_id(
+            partition, partition_datetime
+        )
+    except ValueError as e:
+        logger.error(
+            f"Unable to get partition range for partition id: {partition} it failed with exception {e}"
+        )
+        self.report.invalid_partition_ids[
+            f"{schema}.{table.name}"
+        ] = partition
+        return None, None
+
+    if table.time_partitioning.type_ in ("DAY", "MONTH", "YEAR"):
+        partition_where_clause = "{column_name} BETWEEN DATE('{partition_id}') AND DATE('{upper_bound_partition_id}')".format(
+            column_name=table.time_partitioning.field,
+            partition_id=partition_datetime,
+            upper_bound_partition_id=upper_bound_partition_datetime,
+        )
+    elif table.time_partitioning.type_ in ("HOUR"):
+        partition_where_clause = "{column_name} BETWEEN '{partition_id}' AND '{upper_bound_partition_id}'".format(
+            column_name=table.time_partitioning.field,
+            partition_id=partition_datetime,
+            upper_bound_partition_id=upper_bound_partition_datetime,
+        )
+    else:
+        logger.warning(
+            f"Not supported partition type {table.time_partitioning.type_}"
+        )
+        return None, None
custom_sql = """
SELECT
*
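The new branch covers partitioned tables that have no time_partitioning metadata by filtering directly on the detected partition column, while time-partitioned tables keep the BETWEEN logic. For context, a rough sketch of what get_partition_range_from_partition_id might produce for a DAY partition id (a simplified assumption; the real helper also handles YEAR, MONTH, and HOUR ids):

    from datetime import datetime, timedelta

    def day_partition_range(partition_id: str):
        # Hypothetical simplification: "20220926" covers one full day.
        lower = datetime.strptime(partition_id, "%Y%m%d")
        return lower, lower + timedelta(days=1)

    lower, upper = day_partition_range("20220926")
    # Feeds the DAY/MONTH/YEAR branch above; the column name is a placeholder:
    where = f"event_date BETWEEN DATE('{lower.date()}') AND DATE('{upper.date()}')"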
@@ -546,10 +546,25 @@ def _import_sql_parser_cls(cls, sql_parser_path: str) -> Type[SQLParser]:
def _get_sql_info(cls, sql: str, sql_parser_path: str) -> SQLInfo:
parser_cls = cls._import_sql_parser_cls(sql_parser_path)

-parser_instance: SQLParser = parser_cls(sql)
+try:
+    parser_instance: SQLParser = parser_cls(sql)
+except Exception as e:
+    logger.warning(f"Sql parser failed on {sql} with {e}")
+    return SQLInfo(table_names=[], column_names=[])
+
+sql_table_names: List[str]
+try:
+    sql_table_names = parser_instance.get_tables()
+except Exception as e:
+    logger.warning(f"Sql parser failed on {sql} with {e}")
+    sql_table_names = []
+
+try:
+    column_names: List[str] = parser_instance.get_columns()
+except Exception as e:
+    logger.warning(f"Sql parser failed on {sql} with {e}")
+    column_names = []

-sql_table_names: List[str] = parser_instance.get_tables()
-column_names: List[str] = parser_instance.get_columns()
logger.debug(f"Column names parsed = {column_names}")
# Drop table names with # in them
sql_table_names = [t for t in sql_table_names if "#" not in t]
6 changes: 5 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/redash.py
@@ -390,7 +390,7 @@ def _import_sql_parser_cls(cls, sql_parser_path: str) -> Type[SQLParser]:
def _get_sql_table_names(cls, sql: str, sql_parser_path: str) -> List[str]:
parser_cls = cls._import_sql_parser_cls(sql_parser_path)

-sql_table_names: List[str] = parser_cls(sql).get_tables()
+try:
+    sql_table_names: List[str] = parser_cls(sql).get_tables()
+except Exception as e:
+    logger.warning(f"Sql parser failed on {sql} with {e}")
+    return []

# Remove quotes from table names
sql_table_names = [t.replace('"', "") for t in sql_table_names]
@@ -642,7 +642,12 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
query=query,
customProperties=batchSpecProperties,
)
-tables = DefaultSQLParser(query).get_tables()
+try:
+    tables = DefaultSQLParser(query).get_tables()
+except Exception as e:
+    logger.warning(f"Sql parser failed on {query} with {e}")
+    tables = []

if len(set(tables)) != 1:
warn(
"DataHubValidationAction does not support cross dataset assertions."
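With the parser call wrapped, a malformed query no longer aborts the Great Expectations validation action; it simply yields no tables and falls through to the existing single-table check. A small sketch of the call path being hardened (the query text is a placeholder):

    from datahub.utilities.sql_parser import DefaultSQLParser

    query = "SELECT id, amount FROM analytics.orders"  # placeholder
    try:
        tables = DefaultSQLParser(query).get_tables()
    except Exception:
        tables = []  # mirrors the fail-soft behavior added here
    print(tables)  # e.g. ['analytics.orders'] when parsing succeeds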
@@ -8,6 +8,8 @@
from sqllineage.core.holders import Column, SQLLineageHolder
from sqllineage.exceptions import SQLLineageException

+from datahub.utilities.sql_parser_base import SQLParser, SqlParserException

with contextlib.suppress(ImportError):
import sqlparse
from networkx import DiGraph
@@ -17,7 +19,7 @@
logger = logging.getLogger(__name__)


-class SqlLineageSQLParserImpl:
+class SqlLineageSQLParserImpl(SQLParser):
_DATE_SWAP_TOKEN = "__d_a_t_e"
_HOUR_SWAP_TOKEN = "__h_o_u_r"
_TIMESTAMP_SWAP_TOKEN = "__t_i_m_e_s_t_a_m_p"
@@ -27,6 +29,7 @@ class SqlLineageSQLParserImpl:
_MYVIEW_LOOKER_TOKEN = "my_view.SQL_TABLE_NAME"

def __init__(self, sql_query: str) -> None:
+        super().__init__(sql_query)
original_sql_query = sql_query

# SqlLineageParser makes mistakes on lateral flatten queries, use the prefix
@@ -97,7 +100,9 @@ def __init__(self, sql_query: str) -> None:
]
self._sql_holder = SQLLineageHolder.of(*self._stmt_holders)
except SQLLineageException as e:
logger.error(f"SQL lineage analyzer error '{e}' for query: '{self._sql}")
raise SqlParserException(
f"SQL lineage analyzer error '{e}' for query: '{self._sql}"
) from e

def get_tables(self) -> List[str]:
result: List[str] = []
@@ -123,8 +128,7 @@ def get_tables(self) -> List[str]:

def get_columns(self) -> List[str]:
if self._sql_holder is None:
logger.error("sql holder not present so cannot get columns")
return []
raise SqlParserException("sql holder not present so cannot get columns")
graph: DiGraph = self._sql_holder.graph # For mypy attribute checking
column_nodes = [n for n in graph.nodes if isinstance(n, Column)]
column_graph = graph.subgraph(column_nodes)
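Because SqlLineageSQLParserImpl now subclasses SQLParser and raises the typed SqlParserException instead of logging and returning partial results, callers can handle failures explicitly. A sketch (the query is a placeholder):

    from datahub.utilities.sql_lineage_parser_impl import SqlLineageSQLParserImpl
    from datahub.utilities.sql_parser_base import SqlParserException

    try:
        tables = SqlLineageSQLParserImpl("SELECT name FROM db.users").get_tables()
    except SqlParserException:
        tables = []  # callers like the sources patched above degrade gracefully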
17 changes: 2 additions & 15 deletions metadata-ingestion/src/datahub/utilities/sql_parser.py
@@ -4,30 +4,17 @@
import re
import sys
import traceback
-from abc import ABCMeta, abstractmethod
from multiprocessing import Process, Queue
from typing import List, Optional, Tuple, Type

from datahub.utilities.sql_lineage_parser_impl import SqlLineageSQLParserImpl
+from datahub.utilities.sql_parser_base import SQLParser

with contextlib.suppress(ImportError):
from sql_metadata import Parser as MetadataSQLParser
logger = logging.getLogger(__name__)


-class SQLParser(metaclass=ABCMeta):
-    def __init__(self, sql_query: str) -> None:
-        self._sql_query = sql_query
-
-    @abstractmethod
-    def get_tables(self) -> List[str]:
-        pass
-
-    @abstractmethod
-    def get_columns(self) -> List[str]:
-        pass


class MetadataSQLSQLParser(SQLParser):
_DATE_SWAP_TOKEN = "__d_a_t_e"

@@ -104,7 +91,7 @@ def sql_lineage_parser_impl_func_wrapper(
exc_info = sys.exc_info()
exc_msg: str = str(exc_info[1]) + "".join(traceback.format_tb(exc_info[2]))
exception_details = (exc_info[0], exc_msg)
-        logger.error(exc_msg)
+        logger.debug(exc_msg)
finally:
queue.put((tables, columns, exception_details))

21 changes: 21 additions & 0 deletions metadata-ingestion/src/datahub/utilities/sql_parser_base.py
@@ -0,0 +1,21 @@
+from abc import ABCMeta, abstractmethod
+from typing import List
+
+
+class SqlParserException(Exception):
+    """Raised when sql parser fails"""
+
+    pass
+
+
+class SQLParser(metaclass=ABCMeta):
+    def __init__(self, sql_query: str) -> None:
+        self._sql_query = sql_query
+
+    @abstractmethod
+    def get_tables(self) -> List[str]:
+        pass
+
+    @abstractmethod
+    def get_columns(self) -> List[str]:
+        pass
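The new module simply extracts the abstract base that previously lived in sql_parser.py, so implementations like SqlLineageSQLParserImpl can subclass it without a circular import. A toy subclass, purely illustrative, showing the contract the base defines:

    from typing import List

    from datahub.utilities.sql_parser_base import SQLParser, SqlParserException

    class NaiveSQLParser(SQLParser):
        # Illustrative only: returns the token after the first FROM.
        def get_tables(self) -> List[str]:
            tokens = self._sql_query.split()
            uppers = [t.upper() for t in tokens]
            if "FROM" not in uppers:
                raise SqlParserException(f"No FROM clause in: {self._sql_query}")
            return [tokens[uppers.index("FROM") + 1]]

        def get_columns(self) -> List[str]:
            return []

    print(NaiveSQLParser("SELECT a FROM my_table").get_tables())  # ['my_table']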