fix(ingest): bigquery-beta - Additional fixes for Bigquery beta (#6051)
treff7es authored Sep 26, 2022
1 parent 6092875 commit 32b8bef
Showing 11 changed files with 130 additions and 72 deletions.
16 changes: 8 additions & 8 deletions metadata-ingestion/setup.py
@@ -150,7 +150,7 @@ def get_long_description():
"sqlalchemy-redshift",
"psycopg2-binary",
"GeoAlchemy2",
"sqllineage==1.3.5",
"sqllineage==1.3.6",
*path_spec_common,
}

@@ -216,18 +216,18 @@ def get_long_description():
"gql>=3.3.0",
"gql[requests]>=3.3.0",
},
"great-expectations": sql_common | {"sqllineage==1.3.5"},
"great-expectations": sql_common | {"sqllineage==1.3.6"},
# Source plugins
# PyAthena is pinned with exact version because we use private method in PyAthena
"athena": sql_common | {"PyAthena[SQLAlchemy]==2.4.1"},
"azure-ad": set(),
"bigquery": sql_common
| bigquery_common
| {"sqlalchemy-bigquery>=1.4.1", "sqllineage==1.3.5", "sqlparse"},
| {"sqlalchemy-bigquery>=1.4.1", "sqllineage==1.3.6", "sqlparse"},
"bigquery-usage": bigquery_common | usage_common | {"cachetools"},
"bigquery-beta": sql_common
| bigquery_common
| {"sqllineage==1.3.5", "sql_metadata"},
| {"sqllineage==1.3.6", "sql_metadata"},
"clickhouse": sql_common | {"clickhouse-sqlalchemy==0.1.8"},
"clickhouse-usage": sql_common
| usage_common
@@ -269,9 +269,9 @@ def get_long_description():
"looker": looker_common,
# lkml>=1.1.2 is required to support the sql_preamble expression in LookML
"lookml": looker_common
| {"lkml>=1.1.2", "sql-metadata==2.2.2", "sqllineage==1.3.5", "GitPython>2"},
"metabase": {"requests", "sqllineage==1.3.5"},
"mode": {"requests", "sqllineage==1.3.5", "tenacity>=8.0.1"},
| {"lkml>=1.1.2", "sql-metadata==2.2.2", "sqllineage==1.3.6", "GitPython>2"},
"metabase": {"requests", "sqllineage==1.3.6"},
"mode": {"requests", "sqllineage==1.3.6", "tenacity>=8.0.1"},
"mongodb": {"pymongo[srv]>=3.11", "packaging"},
"mssql": sql_common | {"sqlalchemy-pytds>=0.3"},
"mssql-odbc": sql_common | {"pyodbc"},
@@ -284,7 +284,7 @@ def get_long_description():
"presto-on-hive": sql_common
| {"psycopg2-binary", "acryl-pyhive[hive]>=0.6.12", "pymysql>=1.0.2"},
"pulsar": {"requests"},
"redash": {"redash-toolbelt", "sql-metadata", "sqllineage==1.3.5"},
"redash": {"redash-toolbelt", "sql-metadata", "sqllineage==1.3.6"},
"redshift": sql_common | redshift_common,
"redshift-usage": sql_common | usage_common | redshift_common,
"s3": {*s3_base, *data_lake_profiling},
@@ -571,9 +571,10 @@ def _process_view(
view.columns = self.get_columns_for_table(conn, table_identifier)

lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None
- lineage_info = self.lineage_extractor.get_upstream_lineage_info(
-     table_identifier, self.platform
- )
+ if self.config.include_table_lineage:
+     lineage_info = self.lineage_extractor.get_upstream_lineage_info(
+         table_identifier, self.platform
+     )

view_workunits = self.gen_view_dataset_workunits(
view, project_id, dataset_name, lineage_info
@@ -15,7 +15,7 @@ class BigQueryV2Report(SQLSourceReport):
num_total_lineage_entries: Optional[int] = None
num_skipped_lineage_entries_missing_data: Optional[int] = None
num_skipped_lineage_entries_not_allowed: Optional[int] = None
- num_skipped_lineage_entries_sql_parser_failure: Optional[int] = None
+ num_lineage_entries_sql_parser_failure: Optional[int] = None
num_skipped_lineage_entries_other: Optional[int] = None
num_total_log_entries: Optional[int] = None
num_parsed_log_entires: Optional[int] = None
@@ -362,7 +362,7 @@ def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[st
self.report.num_skipped_lineage_entries_missing_data = 0
self.report.num_skipped_lineage_entries_not_allowed = 0
self.report.num_skipped_lineage_entries_other = 0
- self.report.num_skipped_lineage_entries_sql_parser_failure = 0
+ self.report.num_lineage_entries_sql_parser_failure = 0
for e in entries:
self.report.num_total_lineage_entries += 1
if e.destinationTable is None or not (
@@ -400,10 +400,10 @@ def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[st
map(lambda x: x.split(".")[-1], parser.get_tables())
)
except Exception as ex:
- logger.warning(
-     f"Sql Parser failed on query: {e.query}. It will be skipped from lineage. The error was {ex}"
+ logger.debug(
+     f"Sql Parser failed on query: {e.query}. It won't cause any issue except table/view lineage can't be detected reliably. The error was {ex}."
)
- self.report.num_skipped_lineage_entries_sql_parser_failure += 1
+ self.report.num_lineage_entries_sql_parser_failure += 1
continue
curr_lineage_str = lineage_map[destination_table_str]
new_lineage_str = set()
@@ -13,7 +13,10 @@
from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
- from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigqueryTable
+ from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
+     BigqueryColumn,
+     BigqueryTable,
+ )
from datahub.ingestion.source.ge_data_profiler import (
DatahubGEProfiler,
GEProfilerRequest,
@@ -78,39 +81,57 @@ def generate_partition_profiler_query(
partition = table.max_partition_id
if partition:
partition_where_clause: str
logger.debug(f"{table} is partitioned and partition column is {partition}")
try:
(
partition_datetime,
upper_bound_partition_datetime,
) = self.get_partition_range_from_partition_id(
partition, partition_datetime
)
except ValueError as e:
logger.error(
f"Unable to get partition range for partition id: {partition} it failed with exception {e}"
)
self.report.invalid_partition_ids[f"{schema}.{table}"] = partition
return None, None

if table.time_partitioning.type_ in ("DAY", "MONTH", "YEAR"):
partition_where_clause = "{column_name} BETWEEN DATE('{partition_id}') AND DATE('{upper_bound_partition_id}')".format(
column_name=table.time_partitioning.field,
partition_id=partition_datetime,
upper_bound_partition_id=upper_bound_partition_datetime,
)
elif table.time_partitioning.type_ in ("HOUR"):
partition_where_clause = "{column_name} BETWEEN '{partition_id}' AND '{upper_bound_partition_id}'".format(
column_name=table.time_partitioning.field,
partition_id=partition_datetime,
upper_bound_partition_id=upper_bound_partition_datetime,
)

+ if not table.time_partitioning:
+     partition_column: Optional[BigqueryColumn] = None
+     for column in table.columns:
+         if column.is_partition_column:
+             partition_column = column
+             break
+     if partition_column:
+         partition_where_clause = f"{partition_column.name} >= {partition}"
+     else:
+         logger.warning(
+             f"Partitioned table {table.name} without partiton column"
+         )
+         return None, None
else:
- logger.warning(
-     f"Not supported partition type {table.time_partitioning.type_}"
+ logger.debug(
+     f"{table.name} is partitioned and partition column is {partition}"
)
- return None, None

+ try:
+     (
+         partition_datetime,
+         upper_bound_partition_datetime,
+     ) = self.get_partition_range_from_partition_id(
+         partition, partition_datetime
+     )
+ except ValueError as e:
+     logger.error(
+         f"Unable to get partition range for partition id: {partition} it failed with exception {e}"
+     )
+     self.report.invalid_partition_ids[
+         f"{schema}.{table.name}"
+     ] = partition
+     return None, None

+ if table.time_partitioning.type_ in ("DAY", "MONTH", "YEAR"):
+     partition_where_clause = "{column_name} BETWEEN DATE('{partition_id}') AND DATE('{upper_bound_partition_id}')".format(
+         column_name=table.time_partitioning.field,
+         partition_id=partition_datetime,
+         upper_bound_partition_id=upper_bound_partition_datetime,
+     )
+ elif table.time_partitioning.type_ in ("HOUR"):
+     partition_where_clause = "{column_name} BETWEEN '{partition_id}' AND '{upper_bound_partition_id}'".format(
+         column_name=table.time_partitioning.field,
+         partition_id=partition_datetime,
+         upper_bound_partition_id=upper_bound_partition_datetime,
+     )
+ else:
+     logger.warning(
+         f"Not supported partition type {table.time_partitioning.type_}"
+     )
+     return None, None
custom_sql = """
SELECT
*
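As a rough illustration of the filters the branch logic above produces (a sketch only; the column names, partition id, and datetimes below are invented, not taken from the commit):

```python
# Hedged illustration of the WHERE clauses generate_partition_profiler_query builds.
# All identifiers and values here are made up for the example.

# Time-partitioned table (time_partitioning.type_ == "DAY") on column `created_date`,
# with a partition range of 2022-09-26 .. 2022-09-27:
time_partition_filter = (
    "created_date BETWEEN DATE('2022-09-26 00:00:00') AND DATE('2022-09-27 00:00:00')"
)

# Table with no time_partitioning metadata but with a partition column `part_id`
# and max_partition_id 20220926:
column_partition_filter = "part_id >= 20220926"
```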
@@ -546,10 +546,25 @@ def _import_sql_parser_cls(cls, sql_parser_path: str) -> Type[SQLParser]:
def _get_sql_info(cls, sql: str, sql_parser_path: str) -> SQLInfo:
parser_cls = cls._import_sql_parser_cls(sql_parser_path)

- parser_instance: SQLParser = parser_cls(sql)
+ try:
+     parser_instance: SQLParser = parser_cls(sql)
+ except Exception as e:
+     logger.warning(f"Sql parser failed on {sql} with {e}")
+     return SQLInfo(table_names=[], column_names=[])

+ sql_table_names: List[str]
+ try:
+     sql_table_names = parser_instance.get_tables()
+ except Exception as e:
+     logger.warning(f"Sql parser failed on {sql} with {e}")
+     sql_table_names = []
+
+ try:
+     column_names: List[str] = parser_instance.get_columns()
+ except Exception as e:
+     logger.warning(f"Sql parser failed on {sql} with {e}")
+     column_names = []

- sql_table_names: List[str] = parser_instance.get_tables()
- column_names: List[str] = parser_instance.get_columns()
logger.debug(f"Column names parsed = {column_names}")
# Drop table names with # in them
sql_table_names = [t for t in sql_table_names if "#" not in t]
6 changes: 5 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/redash.py
@@ -390,7 +390,11 @@ def _import_sql_parser_cls(cls, sql_parser_path: str) -> Type[SQLParser]:
def _get_sql_table_names(cls, sql: str, sql_parser_path: str) -> List[str]:
parser_cls = cls._import_sql_parser_cls(sql_parser_path)

- sql_table_names: List[str] = parser_cls(sql).get_tables()
+ try:
+     sql_table_names: List[str] = parser_cls(sql).get_tables()
+ except Exception as e:
+     logger.warning(f"Sql parser failed on {sql} with {e}")
+     return []

# Remove quotes from table names
sql_table_names = [t.replace('"', "") for t in sql_table_names]
@@ -642,7 +642,12 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
query=query,
customProperties=batchSpecProperties,
)
- tables = DefaultSQLParser(query).get_tables()
+ try:
+     tables = DefaultSQLParser(query).get_tables()
+ except Exception as e:
+     logger.warning(f"Sql parser failed on {query} with {e}")
+     tables = []

if len(set(tables)) != 1:
warn(
"DataHubValidationAction does not support cross dataset assertions."
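The `_get_sql_info` hunk, the Redash source, and the Great Expectations action above all gain the same guard; a minimal sketch of the shared pattern follows (the import path and query string are assumptions for illustration, not taken from the diff):

```python
import logging

# Assumed import path; DefaultSQLParser is referenced, but not imported, in the hunks above.
from datahub.utilities.sql_parser import DefaultSQLParser

logger = logging.getLogger(__name__)

query = "SELECT a, b FROM some_project.some_dataset.some_table"  # invented example query
try:
    tables = DefaultSQLParser(query).get_tables()
except Exception as e:
    # A failed parse now degrades to "no lineage" instead of aborting ingestion.
    logger.warning(f"Sql parser failed on {query} with {e}")
    tables = []
```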
@@ -8,6 +8,8 @@
from sqllineage.core.holders import Column, SQLLineageHolder
from sqllineage.exceptions import SQLLineageException

+ from datahub.utilities.sql_parser_base import SQLParser, SqlParserException

with contextlib.suppress(ImportError):
import sqlparse
from networkx import DiGraph
@@ -17,7 +19,7 @@
logger = logging.getLogger(__name__)


- class SqlLineageSQLParserImpl:
+ class SqlLineageSQLParserImpl(SQLParser):
_DATE_SWAP_TOKEN = "__d_a_t_e"
_HOUR_SWAP_TOKEN = "__h_o_u_r"
_TIMESTAMP_SWAP_TOKEN = "__t_i_m_e_s_t_a_m_p"
@@ -27,6 +29,7 @@ class SqlLineageSQLParserImpl:
_MYVIEW_LOOKER_TOKEN = "my_view.SQL_TABLE_NAME"

def __init__(self, sql_query: str) -> None:
+ super().__init__(sql_query)
original_sql_query = sql_query

# SqlLineageParser makes mistakes on lateral flatten queries, use the prefix
@@ -97,7 +100,9 @@ def __init__(self, sql_query: str) -> None:
]
self._sql_holder = SQLLineageHolder.of(*self._stmt_holders)
except SQLLineageException as e:
logger.error(f"SQL lineage analyzer error '{e}' for query: '{self._sql}")
raise SqlParserException(
f"SQL lineage analyzer error '{e}' for query: '{self._sql}"
) from e

def get_tables(self) -> List[str]:
result: List[str] = []
@@ -123,8 +128,7 @@ def get_tables(self) -> List[str]:

def get_columns(self) -> List[str]:
if self._sql_holder is None:
logger.error("sql holder not present so cannot get columns")
return []
raise SqlParserException("sql holder not present so cannot get columns")
graph: DiGraph = self._sql_holder.graph # For mypy attribute checking
column_nodes = [n for n in graph.nodes if isinstance(n, Column)]
column_graph = graph.subgraph(column_nodes)
17 changes: 2 additions & 15 deletions metadata-ingestion/src/datahub/utilities/sql_parser.py
@@ -4,30 +4,17 @@
import re
import sys
import traceback
- from abc import ABCMeta, abstractmethod
from multiprocessing import Process, Queue
from typing import List, Optional, Tuple, Type

from datahub.utilities.sql_lineage_parser_impl import SqlLineageSQLParserImpl
+ from datahub.utilities.sql_parser_base import SQLParser

with contextlib.suppress(ImportError):
from sql_metadata import Parser as MetadataSQLParser
logger = logging.getLogger(__name__)


- class SQLParser(metaclass=ABCMeta):
-     def __init__(self, sql_query: str) -> None:
-         self._sql_query = sql_query
-
-     @abstractmethod
-     def get_tables(self) -> List[str]:
-         pass
-
-     @abstractmethod
-     def get_columns(self) -> List[str]:
-         pass


class MetadataSQLSQLParser(SQLParser):
_DATE_SWAP_TOKEN = "__d_a_t_e"

@@ -104,7 +91,7 @@ def sql_lineage_parser_impl_func_wrapper(
exc_info = sys.exc_info()
exc_msg: str = str(exc_info[1]) + "".join(traceback.format_tb(exc_info[2]))
exception_details = (exc_info[0], exc_msg)
- logger.error(exc_msg)
+ logger.debug(exc_msg)
finally:
queue.put((tables, columns, exception_details))

21 changes: 21 additions & 0 deletions metadata-ingestion/src/datahub/utilities/sql_parser_base.py
@@ -0,0 +1,21 @@
+ from abc import ABCMeta, abstractmethod
+ from typing import List
+
+
+ class SqlParserException(Exception):
+     """Raised when sql parser fails"""
+
+     pass
+
+
+ class SQLParser(metaclass=ABCMeta):
+     def __init__(self, sql_query: str) -> None:
+         self._sql_query = sql_query
+
+     @abstractmethod
+     def get_tables(self) -> List[str]:
+         pass
+
+     @abstractmethod
+     def get_columns(self) -> List[str]:
+         pass
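
To show how the new base module is meant to be consumed, here is a minimal, hypothetical sketch (the toy parser below is not part of the commit; the real implementations are SqlLineageSQLParserImpl and MetadataSQLSQLParser):

```python
from typing import List

from datahub.utilities.sql_parser_base import SQLParser, SqlParserException


class NaiveSQLParser(SQLParser):
    """Toy illustration of a SQLParser subclass; not part of this commit."""

    def get_tables(self) -> List[str]:
        # Extremely naive tokenization, for illustration only.
        tokens = self._sql_query.lower().split()
        if "from" not in tokens:
            raise SqlParserException(f"No FROM clause in: {self._sql_query}")
        return [tokens[tokens.index("from") + 1]]

    def get_columns(self) -> List[str]:
        return []


# Callers follow the defensive pattern used throughout this commit: a parse
# failure is caught and treated as "no lineage" rather than failing ingestion.
try:
    tables = NaiveSQLParser("SELECT 1").get_tables()
except SqlParserException:
    tables = []
```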
