From db02d651f602393d8dae415424c7a6f35bf29584 Mon Sep 17 00:00:00 2001
From: treff7es
Date: Mon, 26 Sep 2022 15:00:56 +0200
Subject: [PATCH 1/4] Additional fixes for bigquery v2:

- Apply the lineage extraction enable parameter to views as well
- Profile range-partitioned tables on their last partition
- Make lineage parser exceptions less verbose so they don't alarm users
- Fix invalid partition ids in the report

---
 .../src/datahub/utilities/sql_parser_base.py | 21 +++++++++++++++++++
 1 file changed, 21 insertions(+)
 create mode 100644 metadata-ingestion/src/datahub/utilities/sql_parser_base.py

diff --git a/metadata-ingestion/src/datahub/utilities/sql_parser_base.py b/metadata-ingestion/src/datahub/utilities/sql_parser_base.py
new file mode 100644
index 00000000000000..c98ffe98a4efdd
--- /dev/null
+++ b/metadata-ingestion/src/datahub/utilities/sql_parser_base.py
@@ -0,0 +1,21 @@
+from abc import ABCMeta, abstractmethod
+from typing import List
+
+
+class SqlParserException(Exception):
+    """Raised when the SQL parser fails"""
+
+    pass
+
+
+class SQLParser(metaclass=ABCMeta):
+    def __init__(self, sql_query: str) -> None:
+        self._sql_query = sql_query
+
+    @abstractmethod
+    def get_tables(self) -> List[str]:
+        pass
+
+    @abstractmethod
+    def get_columns(self) -> List[str]:
+        pass

From 621b428250cc52c7ada75c26a4e170b4059bc37f Mon Sep 17 00:00:00 2001
From: treff7es
Date: Mon, 26 Sep 2022 15:04:39 +0200
Subject: [PATCH 2/4] Adding missing files

---
 metadata-ingestion/setup.py                    |  2 +-
 .../ingestion/source/bigquery_v2/bigquery.py   |  7 +-
 .../source/bigquery_v2/bigquery_report.py      |  2 +-
 .../ingestion/source/bigquery_v2/lineage.py    |  8 +-
 .../ingestion/source/bigquery_v2/profiler.py   | 85 ++++++++++++-------
 .../ingestion/source/looker/lookml_source.py   | 21 ++++-
 .../src/datahub/ingestion/source/redash.py     |  6 +-
 .../integrations/great_expectations/action.py  |  7 +-
 .../utilities/sql_lineage_parser_impl.py       | 12 ++-
 .../src/datahub/utilities/sql_parser.py        | 16 +---
 10 files changed, 102 insertions(+), 64 deletions(-)

diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 5b3a93b788254a..4c384a81ea584e 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -216,7 +216,7 @@ def get_long_description():
         "gql>=3.3.0",
         "gql[requests]>=3.3.0",
     },
-    "great-expectations": sql_common | {"sqllineage==1.3.5"},
+    "great-expectations": sql_common | {"sqllineage==1.3.6"},
     # Source plugins
     # PyAthena is pinned with exact version because we use private method in PyAthena
     "athena": sql_common | {"PyAthena[SQLAlchemy]==2.4.1"},

diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
index 73daa2b2d4cebb..93f6db2c5feb53 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
@@ -571,9 +571,10 @@ def _process_view(
         view.columns = self.get_columns_for_table(conn, table_identifier)
 
         lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None
-        lineage_info = self.lineage_extractor.get_upstream_lineage_info(
-            table_identifier, self.platform
-        )
+        if self.config.include_table_lineage:
+            lineage_info = self.lineage_extractor.get_upstream_lineage_info(
+                table_identifier, self.platform
+            )
 
         view_workunits = self.gen_view_dataset_workunits(
             view, project_id, dataset_name, lineage_info
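
Any concrete parser now just subclasses the new base and signals failure through SqlParserException. A minimal sketch of an implementation against this interface (NaiveSQLParser and its regex are hypothetical, for illustration only; the real implementations are the sqllineage/sql_metadata based parsers touched below):

    import re
    from typing import List

    from datahub.utilities.sql_parser_base import SQLParser, SqlParserException


    class NaiveSQLParser(SQLParser):
        """Hypothetical parser, used only to illustrate the base class contract."""

        def get_tables(self) -> List[str]:
            # Purely illustrative table extraction; not how DataHub parses SQL.
            matches = re.findall(r"(?:from|join)\s+([\w.]+)", self._sql_query, re.I)
            if not matches:
                raise SqlParserException(f"No tables found in: {self._sql_query}")
            return matches

        def get_columns(self) -> List[str]:
            raise SqlParserException("column extraction not implemented")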
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py
index d0ca9162f231c8..016236121235fd 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py
@@ -15,7 +15,7 @@ class BigQueryV2Report(SQLSourceReport):
     num_total_lineage_entries: Optional[int] = None
     num_skipped_lineage_entries_missing_data: Optional[int] = None
     num_skipped_lineage_entries_not_allowed: Optional[int] = None
-    num_skipped_lineage_entries_sql_parser_failure: Optional[int] = None
+    num_lineage_entries_sql_parser_failure: Optional[int] = None
     num_skipped_lineage_entries_other: Optional[int] = None
     num_total_log_entries: Optional[int] = None
     num_parsed_log_entires: Optional[int] = None

diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
index dfecc7c2cbf10f..3458a64a6f2f0c 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
@@ -362,7 +362,7 @@ def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[st
         self.report.num_skipped_lineage_entries_missing_data = 0
         self.report.num_skipped_lineage_entries_not_allowed = 0
         self.report.num_skipped_lineage_entries_other = 0
-        self.report.num_skipped_lineage_entries_sql_parser_failure = 0
+        self.report.num_lineage_entries_sql_parser_failure = 0
         for e in entries:
             self.report.num_total_lineage_entries += 1
             if e.destinationTable is None or not (
@@ -400,10 +400,10 @@ def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[st
                         map(lambda x: x.split(".")[-1], parser.get_tables())
                     )
                 except Exception as ex:
-                    logger.warning(
-                        f"Sql Parser failed on query: {e.query}. It will be skipped from lineage. The error was {ex}"
+                    logger.debug(
+                        f"Sql parser failed on query: {e.query}. It won't cause any issues, except that table/view lineage can't be detected reliably. The error was: {ex}."
                     )
-                    self.report.num_skipped_lineage_entries_sql_parser_failure += 1
+                    self.report.num_lineage_entries_sql_parser_failure += 1
                     continue
                 curr_lineage_str = lineage_map[destination_table_str]
                 new_lineage_str = set()
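
Parser failures on audit-log queries are now logged at debug level and counted instead of emitted as per-query warnings. A minimal sketch of the calling pattern (upstream_table_names is a hypothetical helper; report stands in for a BigQueryV2Report whose counter has already been initialized to 0):

    from typing import List

    from datahub.utilities.sql_lineage_parser_impl import SqlLineageSQLParserImpl


    def upstream_table_names(query: str, report) -> List[str]:
        # Both construction and get_tables() can raise, so the whole parse
        # stays inside the try block and a failure only bumps the counter.
        try:
            parser = SqlLineageSQLParserImpl(query)
            return [t.split(".")[-1] for t in parser.get_tables()]
        except Exception:
            report.num_lineage_entries_sql_parser_failure += 1
            return []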
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
index 7e5ea81ba95c0b..ab88dfcd41ff09 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
@@ -13,7 +13,10 @@
 from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier
 from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
 from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
-from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigqueryTable
+from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
+    BigqueryColumn,
+    BigqueryTable,
+)
 from datahub.ingestion.source.ge_data_profiler import (
     DatahubGEProfiler,
     GEProfilerRequest,
@@ -78,39 +81,57 @@ def generate_partition_profiler_query(
             partition = table.max_partition_id
         if partition:
             partition_where_clause: str
-            logger.debug(f"{table} is partitioned and partition column is {partition}")
-            try:
-                (
-                    partition_datetime,
-                    upper_bound_partition_datetime,
-                ) = self.get_partition_range_from_partition_id(
-                    partition, partition_datetime
-                )
-            except ValueError as e:
-                logger.error(
-                    f"Unable to get partition range for partition id: {partition} it failed with exception {e}"
-                )
-                self.report.invalid_partition_ids[f"{schema}.{table}"] = partition
-                return None, None
-
-            if table.time_partitioning.type_ in ("DAY", "MONTH", "YEAR"):
-                partition_where_clause = "{column_name} BETWEEN DATE('{partition_id}') AND DATE('{upper_bound_partition_id}')".format(
-                    column_name=table.time_partitioning.field,
-                    partition_id=partition_datetime,
-                    upper_bound_partition_id=upper_bound_partition_datetime,
-                )
-            elif table.time_partitioning.type_ in ("HOUR"):
-                partition_where_clause = "{column_name} BETWEEN '{partition_id}' AND '{upper_bound_partition_id}'".format(
-                    column_name=table.time_partitioning.field,
-                    partition_id=partition_datetime,
-                    upper_bound_partition_id=upper_bound_partition_datetime,
-                )
+
+            if not table.time_partitioning:
+                partition_column: Optional[BigqueryColumn] = None
+                for column in table.columns:
+                    if column.is_partition_column:
+                        partition_column = column
+                        break
+                if partition_column:
+                    partition_where_clause = f"{partition_column.name} >= {partition}"
+                else:
+                    logger.warning(
+                        f"Partitioned table {table.name} without partition column"
+                    )
+                    return None, None
             else:
-                logger.warning(
-                    f"Not supported partition type {table.time_partitioning.type_}"
+                logger.debug(
+                    f"{table.name} is partitioned and partition column is {partition}"
                 )
-                return None, None
-
+                try:
+                    (
+                        partition_datetime,
+                        upper_bound_partition_datetime,
+                    ) = self.get_partition_range_from_partition_id(
+                        partition, partition_datetime
+                    )
+                except ValueError as e:
+                    logger.error(
+                        f"Unable to get partition range for partition id: {partition} it failed with exception {e}"
+                    )
+                    self.report.invalid_partition_ids[
+                        f"{schema}.{table.name}"
+                    ] = partition
+                    return None, None
+
+                if table.time_partitioning.type_ in ("DAY", "MONTH", "YEAR"):
+                    partition_where_clause = "{column_name} BETWEEN DATE('{partition_id}') AND DATE('{upper_bound_partition_id}')".format(
+                        column_name=table.time_partitioning.field,
+                        partition_id=partition_datetime,
+                        upper_bound_partition_id=upper_bound_partition_datetime,
+                    )
+                elif table.time_partitioning.type_ in ("HOUR"):
+                    partition_where_clause = "{column_name} BETWEEN '{partition_id}' AND '{upper_bound_partition_id}'".format(
+                        column_name=table.time_partitioning.field,
+                        partition_id=partition_datetime,
+                        upper_bound_partition_id=upper_bound_partition_datetime,
+                    )
+                else:
+                    logger.warning(
+                        f"Not supported partition type {table.time_partitioning.type_}"
+                    )
+                    return None, None
 
             custom_sql = """
 SELECT
     *
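
For time partitioning, the profiler turns the latest partition id into a lower/upper datetime pair before building the BETWEEN clause. BigQuery partition ids encode their grain in their length (YYYY, YYYYMM, YYYYMMDD, YYYYMMDDHH); a sketch of that mapping, assuming python-dateutil is available — illustrative only, not the actual get_partition_range_from_partition_id implementation:

    from datetime import datetime
    from typing import Tuple

    from dateutil.relativedelta import relativedelta


    def partition_range(partition_id: str) -> Tuple[datetime, datetime]:
        # Grain is implied by the id length: 4=YEAR, 6=MONTH, 8=DAY, 10=HOUR.
        formats = {
            4: ("%Y", relativedelta(years=1)),
            6: ("%Y%m", relativedelta(months=1)),
            8: ("%Y%m%d", relativedelta(days=1)),
            10: ("%Y%m%d%H", relativedelta(hours=1)),
        }
        if len(partition_id) not in formats:
            raise ValueError(f"Invalid partition id: {partition_id}")
        fmt, delta = formats[len(partition_id)]
        lower = datetime.strptime(partition_id, fmt)
        return lower, lower + delta

    # partition_range("20220926") -> (2022-09-26 00:00, 2022-09-27 00:00)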
diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py
index 5a1ada41810ee0..b175703890f439 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py
@@ -546,10 +546,25 @@ def _import_sql_parser_cls(cls, sql_parser_path: str) -> Type[SQLParser]:
     def _get_sql_info(cls, sql: str, sql_parser_path: str) -> SQLInfo:
         parser_cls = cls._import_sql_parser_cls(sql_parser_path)
 
-        parser_instance: SQLParser = parser_cls(sql)
+        try:
+            parser_instance: SQLParser = parser_cls(sql)
+        except Exception as e:
+            logger.warning(f"Sql parser failed on {sql} with {e}")
+            return SQLInfo(table_names=[], column_names=[])
+
+        sql_table_names: List[str]
+        try:
+            sql_table_names = parser_instance.get_tables()
+        except Exception as e:
+            logger.warning(f"Sql parser failed on {sql} with {e}")
+            sql_table_names = []
+
+        try:
+            column_names: List[str] = parser_instance.get_columns()
+        except Exception as e:
+            logger.warning(f"Sql parser failed on {sql} with {e}")
+            column_names = []
 
-        sql_table_names: List[str] = parser_instance.get_tables()
-        column_names: List[str] = parser_instance.get_columns()
         logger.debug(f"Column names parsed = {column_names}")
         # Drop table names with # in them
         sql_table_names = [t for t in sql_table_names if "#" not in t]

diff --git a/metadata-ingestion/src/datahub/ingestion/source/redash.py b/metadata-ingestion/src/datahub/ingestion/source/redash.py
index 9d33f57bb09348..0678058e8dc626 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/redash.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/redash.py
@@ -390,7 +390,11 @@ def _import_sql_parser_cls(cls, sql_parser_path: str) -> Type[SQLParser]:
     def _get_sql_table_names(cls, sql: str, sql_parser_path: str) -> List[str]:
         parser_cls = cls._import_sql_parser_cls(sql_parser_path)
 
-        sql_table_names: List[str] = parser_cls(sql).get_tables()
+        try:
+            sql_table_names: List[str] = parser_cls(sql).get_tables()
+        except Exception as e:
+            logger.warning(f"Sql parser failed on {sql} with {e}")
+            return []
 
         # Remove quotes from table names
         sql_table_names = [t.replace('"', "") for t in sql_table_names]

diff --git a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py
index 85dbf1b54b742e..d35064fb3a70d8 100644
--- a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py
+++ b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py
@@ -642,7 +642,12 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
                     query=query,
                     customProperties=batchSpecProperties,
                 )
-                tables = DefaultSQLParser(query).get_tables()
+                try:
+                    tables = DefaultSQLParser(query).get_tables()
+                except Exception as e:
+                    logger.warning(f"Sql parser failed on {query} with {e}")
+                    tables = []
+
                 if len(set(tables)) != 1:
                     warn(
                         "DataHubValidationAction does not support cross dataset assertions."
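
The lookml, redash, and great_expectations call sites all apply the same defensive shape: keep parser construction and extraction inside a try block and degrade to empty results on failure. A condensed sketch of that shared pattern (safe_get_tables is a hypothetical helper, not part of this change):

    import logging
    from typing import List, Type

    from datahub.utilities.sql_parser_base import SQLParser

    logger = logging.getLogger(__name__)


    def safe_get_tables(parser_cls: Type[SQLParser], sql: str) -> List[str]:
        # Construction and get_tables() can both raise, so both are guarded;
        # a parse failure costs one warning and yields no tables.
        try:
            return parser_cls(sql).get_tables()
        except Exception as e:
            logger.warning(f"Sql parser failed on {sql} with {e}")
            return []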
with {e}") + tables = [] + if len(set(tables)) != 1: warn( "DataHubValidationAction does not support cross dataset assertions." diff --git a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py index 06caf726be2528..a386a000c50cc8 100644 --- a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py +++ b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py @@ -8,6 +8,8 @@ from sqllineage.core.holders import Column, SQLLineageHolder from sqllineage.exceptions import SQLLineageException +from datahub.utilities.sql_parser_base import SQLParser, SqlParserException + with contextlib.suppress(ImportError): import sqlparse from networkx import DiGraph @@ -17,7 +19,7 @@ logger = logging.getLogger(__name__) -class SqlLineageSQLParserImpl: +class SqlLineageSQLParserImpl(SQLParser): _DATE_SWAP_TOKEN = "__d_a_t_e" _HOUR_SWAP_TOKEN = "__h_o_u_r" _TIMESTAMP_SWAP_TOKEN = "__t_i_m_e_s_t_a_m_p" @@ -27,6 +29,7 @@ class SqlLineageSQLParserImpl: _MYVIEW_LOOKER_TOKEN = "my_view.SQL_TABLE_NAME" def __init__(self, sql_query: str) -> None: + super().__init__(sql_query) original_sql_query = sql_query # SqlLineageParser makes mistakes on lateral flatten queries, use the prefix @@ -97,7 +100,9 @@ def __init__(self, sql_query: str) -> None: ] self._sql_holder = SQLLineageHolder.of(*self._stmt_holders) except SQLLineageException as e: - logger.error(f"SQL lineage analyzer error '{e}' for query: '{self._sql}") + raise SqlParserException( + f"SQL lineage analyzer error '{e}' for query: '{self._sql}" + ) from e def get_tables(self) -> List[str]: result: List[str] = [] @@ -123,8 +128,7 @@ def get_tables(self) -> List[str]: def get_columns(self) -> List[str]: if self._sql_holder is None: - logger.error("sql holder not present so cannot get columns") - return [] + raise SqlParserException("sql holder not present so cannot get columns") graph: DiGraph = self._sql_holder.graph # For mypy attribute checking column_nodes = [n for n in graph.nodes if isinstance(n, Column)] column_graph = graph.subgraph(column_nodes) diff --git a/metadata-ingestion/src/datahub/utilities/sql_parser.py b/metadata-ingestion/src/datahub/utilities/sql_parser.py index 12bd7e5f40bfe5..d090e1d8df3a78 100644 --- a/metadata-ingestion/src/datahub/utilities/sql_parser.py +++ b/metadata-ingestion/src/datahub/utilities/sql_parser.py @@ -9,25 +9,13 @@ from typing import List, Optional, Tuple, Type from datahub.utilities.sql_lineage_parser_impl import SqlLineageSQLParserImpl +from datahub.utilities.sql_parser_base import SQLParser, SqlParserException with contextlib.suppress(ImportError): from sql_metadata import Parser as MetadataSQLParser logger = logging.getLogger(__name__) -class SQLParser(metaclass=ABCMeta): - def __init__(self, sql_query: str) -> None: - self._sql_query = sql_query - - @abstractmethod - def get_tables(self) -> List[str]: - pass - - @abstractmethod - def get_columns(self) -> List[str]: - pass - - class MetadataSQLSQLParser(SQLParser): _DATE_SWAP_TOKEN = "__d_a_t_e" @@ -104,7 +92,7 @@ def sql_lineage_parser_impl_func_wrapper( exc_info = sys.exc_info() exc_msg: str = str(exc_info[1]) + "".join(traceback.format_tb(exc_info[2])) exception_details = (exc_info[0], exc_msg) - logger.error(exc_msg) + logger.debug(exc_msg) finally: queue.put((tables, columns, exception_details)) From 00ee4bdfb0226881868ccf3dc96bc954335d3f59 Mon Sep 17 00:00:00 2001 From: treff7es Date: Mon, 26 Sep 2022 15:18:19 +0200 Subject: [PATCH 3/4] Removing unused 
From 00ee4bdfb0226881868ccf3dc96bc954335d3f59 Mon Sep 17 00:00:00 2001
From: treff7es
Date: Mon, 26 Sep 2022 15:18:19 +0200
Subject: [PATCH 3/4] Removing unused imports

---
 metadata-ingestion/src/datahub/utilities/sql_parser.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/metadata-ingestion/src/datahub/utilities/sql_parser.py b/metadata-ingestion/src/datahub/utilities/sql_parser.py
index d090e1d8df3a78..39792c60ffc006 100644
--- a/metadata-ingestion/src/datahub/utilities/sql_parser.py
+++ b/metadata-ingestion/src/datahub/utilities/sql_parser.py
@@ -4,12 +4,11 @@
 import re
 import sys
 import traceback
-from abc import ABCMeta, abstractmethod
 from multiprocessing import Process, Queue
 from typing import List, Optional, Tuple, Type
 
 from datahub.utilities.sql_lineage_parser_impl import SqlLineageSQLParserImpl
-from datahub.utilities.sql_parser_base import SQLParser, SqlParserException
+from datahub.utilities.sql_parser_base import SQLParser
 
 with contextlib.suppress(ImportError):
     from sql_metadata import Parser as MetadataSQLParser

From a028d58b8e92897531cea316252ee32fa068902a Mon Sep 17 00:00:00 2001
From: treff7es
Date: Mon, 26 Sep 2022 15:31:21 +0200
Subject: [PATCH 4/4] Bumping sqllineage everywhere

---
 metadata-ingestion/setup.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 4c384a81ea584e..56ce98c0723942 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -150,7 +150,7 @@ def get_long_description():
     "sqlalchemy-redshift",
     "psycopg2-binary",
     "GeoAlchemy2",
-    "sqllineage==1.3.5",
+    "sqllineage==1.3.6",
     *path_spec_common,
 }
 
@@ -223,11 +223,11 @@
     "azure-ad": set(),
     "bigquery": sql_common
     | bigquery_common
-    | {"sqlalchemy-bigquery>=1.4.1", "sqllineage==1.3.5", "sqlparse"},
+    | {"sqlalchemy-bigquery>=1.4.1", "sqllineage==1.3.6", "sqlparse"},
     "bigquery-usage": bigquery_common | usage_common | {"cachetools"},
     "bigquery-beta": sql_common
     | bigquery_common
-    | {"sqllineage==1.3.5", "sql_metadata"},
+    | {"sqllineage==1.3.6", "sql_metadata"},
     "clickhouse": sql_common | {"clickhouse-sqlalchemy==0.1.8"},
     "clickhouse-usage": sql_common
     | usage_common
@@ -269,9 +269,9 @@
     "looker": looker_common,
     # lkml>=1.1.2 is required to support the sql_preamble expression in LookML
     "lookml": looker_common
-    | {"lkml>=1.1.2", "sql-metadata==2.2.2", "sqllineage==1.3.5", "GitPython>2"},
+    | {"lkml>=1.1.2", "sql-metadata==2.2.2", "sqllineage==1.3.6", "GitPython>2"},
-    "metabase": {"requests", "sqllineage==1.3.5"},
+    "metabase": {"requests", "sqllineage==1.3.6"},
-    "mode": {"requests", "sqllineage==1.3.5", "tenacity>=8.0.1"},
+    "mode": {"requests", "sqllineage==1.3.6", "tenacity>=8.0.1"},
     "mongodb": {"pymongo[srv]>=3.11", "packaging"},
     "mssql": sql_common | {"sqlalchemy-pytds>=0.3"},
     "mssql-odbc": sql_common | {"pyodbc"},
@@ -284,7 +284,7 @@
     "presto-on-hive": sql_common
     | {"psycopg2-binary", "acryl-pyhive[hive]>=0.6.12", "pymysql>=1.0.2"},
     "pulsar": {"requests"},
-    "redash": {"redash-toolbelt", "sql-metadata", "sqllineage==1.3.5"},
+    "redash": {"redash-toolbelt", "sql-metadata", "sqllineage==1.3.6"},
     "redshift": sql_common | redshift_common,
     "redshift-usage": sql_common | usage_common | redshift_common,
     "s3": {*s3_base, *data_lake_profiling},