Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix(ingest): looker - deps, column level lineage fixes #6271

Merged
merged 1 commit
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ def get_long_description():
looker_common = {
# Looker Python SDK
"looker-sdk==22.2.1",
# This version of lkml contains a fix for parsing lists in
# LookML files with spaces between an item and the following comma.
# See https://github.com/joshtemple/lkml/issues/73.
"lkml>=1.3.0b5",
"sql-metadata==2.2.2",
"sqllineage==1.3.6",
"GitPython>2",
}

bigquery_common = {
Expand Down Expand Up @@ -267,16 +274,7 @@ def get_long_description():
"kafka-connect": sql_common | {"requests", "JPype1"},
"ldap": {"python-ldap>=2.4"},
"looker": looker_common,
"lookml": looker_common
| {
# This version of lkml contains a fix for parsing lists in
# LookML files with spaces between an item and the following comma.
# See https://github.com/joshtemple/lkml/issues/73.
"lkml>=1.3.0b5",
"sql-metadata==2.2.2",
"sqllineage==1.3.6",
"GitPython>2",
},
"lookml": looker_common,
"metabase": {"requests", "sqllineage==1.3.6"},
"mode": {"requests", "sqllineage==1.3.6", "tenacity>=8.0.1"},
"mongodb": {"pymongo[srv]>=3.11", "packaging"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,11 @@ def _to_metadata_events( # noqa: C901
view_urn, field_path
)
],
downstreams=[
builder.make_schema_field_urn(
self.get_explore_urn(config), field.name
)
],
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
FineGrainedLineageUpstreamTypeClass,
SubTypesClass,
)
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.sql_parser import SQLParser

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -200,6 +201,10 @@ class LookMLSourceConfig(LookerCommonConfig):
False,
description="When enabled, field descriptions will include the sql logic for computed fields if descriptions are missing",
)
process_isolation_for_sql_parsing: bool = Field(
True,
description="When enabled, sql parsing will be executed in a separate process to prevent memory leaks.",
)

@validator("platform_instance")
def platform_instance_not_supported(cls, v: str) -> str:
Expand Down Expand Up @@ -270,9 +275,12 @@ def check_base_folder_if_not_provided(
class LookMLSourceReport(SourceReport):
git_clone_latency: Optional[timedelta] = None
models_discovered: int = 0
models_dropped: List[str] = dataclass_field(default_factory=list)
models_dropped: List[str] = dataclass_field(default_factory=LossyList)
views_discovered: int = 0
views_dropped: List[str] = dataclass_field(default_factory=list)
views_dropped: List[str] = dataclass_field(default_factory=LossyList)
query_parse_attempts: int = 0
query_parse_failures: int = 0
query_parse_failure_views: List[str] = dataclass_field(default_factory=LossyList)
_looker_api: Optional[LookerAPI] = None

def report_models_scanned(self) -> None:
Expand Down Expand Up @@ -609,6 +617,9 @@ def _find_view_from_resolved_includes(
return None


_SQL_FUNCTIONS = ["UNNEST"]


@dataclass
class LookerView:
id: LookerViewId
Expand All @@ -629,11 +640,15 @@ def _import_sql_parser_cls(cls, sql_parser_path: str) -> Type[SQLParser]:
return parser_cls

@classmethod
def _get_sql_info(cls, sql: str, sql_parser_path: str) -> SQLInfo:
def _get_sql_info(
cls, sql: str, sql_parser_path: str, use_external_process: bool = True
) -> SQLInfo:
parser_cls = cls._import_sql_parser_cls(sql_parser_path)

try:
parser_instance: SQLParser = parser_cls(sql)
parser_instance: SQLParser = parser_cls(
sql, use_external_process=use_external_process
)
except Exception as e:
logger.warning(f"Sql parser failed on {sql} with {e}")
return SQLInfo(table_names=[], column_names=[])
Expand All @@ -658,6 +673,10 @@ def _get_sql_info(cls, sql: str, sql_parser_path: str) -> SQLInfo:
# Remove quotes from table names
sql_table_names = [t.replace('"', "") for t in sql_table_names]
sql_table_names = [t.replace("`", "") for t in sql_table_names]
# Remove reserved words from table names
sql_table_names = [
t for t in sql_table_names if t.upper() not in _SQL_FUNCTIONS
]

return SQLInfo(table_names=sql_table_names, column_names=column_names)

Expand Down Expand Up @@ -722,6 +741,7 @@ def from_looker_dict(
sql_parser_path: str = "datahub.utilities.sql_parser.DefaultSQLParser",
extract_col_level_lineage: bool = False,
populate_sql_logic_in_descriptions: bool = False,
process_isolation_for_sql_parsing: bool = True,
) -> Optional["LookerView"]:
view_name = looker_view["name"]
logger.debug(f"Handling view {view_name} in model {model_name}")
Expand Down Expand Up @@ -786,6 +806,7 @@ def from_looker_dict(
sql_table_name,
derived_table,
fields,
use_external_process=process_isolation_for_sql_parsing,
)
if "sql" in derived_table:
view_logic = derived_table["sql"]
Expand Down Expand Up @@ -845,31 +866,41 @@ def from_looker_dict(
@classmethod
def _extract_metadata_from_sql_query(
cls: Type,
reporter: SourceReport,
reporter: LookMLSourceReport,
parse_table_names_from_sql: bool,
sql_parser_path: str,
view_name: str,
sql_table_name: Optional[str],
derived_table: dict,
fields: List[ViewField],
use_external_process: bool,
) -> Tuple[List[ViewField], List[str]]:
sql_table_names: List[str] = []
if parse_table_names_from_sql and "sql" in derived_table:
logger.debug(f"Parsing sql from derived table section of view: {view_name}")
sql_query = derived_table["sql"]
reporter.query_parse_attempts += 1

# Skip queries that contain liquid variables. We currently don't parse them correctly
if "{%" in sql_query:
logger.debug(
f"{view_name}: Skipping sql_query parsing since it contains liquid variables"
)
# A hail-mary simple parse.
for maybe_table_match in re.finditer(
r"FROM\s*([a-zA-Z0-9_.]+)", sql_query
):
if maybe_table_match.group(1) not in sql_table_names:
sql_table_names.append(maybe_table_match.group(1))
return fields, sql_table_names
try:
# test if parsing works
sql_info: SQLInfo = cls._get_sql_info(
sql_query, sql_parser_path, use_external_process
)
if not sql_info.table_names:
raise Exception("Failed to find any tables")
except Exception:
logger.debug(
f"{view_name}: SQL Parsing didn't return any tables, trying a hail-mary"
)
# A hail-mary simple parse.
for maybe_table_match in re.finditer(
r"FROM\s*([a-zA-Z0-9_.`]+)", sql_query
):
if maybe_table_match.group(1) not in sql_table_names:
sql_table_names.append(maybe_table_match.group(1))
return fields, sql_table_names
# Looker supports sql fragments that omit the SELECT and FROM parts of the query
# Add those in if we detect that it is missing
if not re.search(r"SELECT\s", sql_query, flags=re.I):
Expand All @@ -880,7 +911,9 @@ def _extract_metadata_from_sql_query(
sql_query = f"{sql_query} FROM {sql_table_name if sql_table_name is not None else view_name}"
# Get the list of tables in the query
try:
sql_info = cls._get_sql_info(sql_query, sql_parser_path)
sql_info = cls._get_sql_info(
sql_query, sql_parser_path, use_external_process
)
sql_table_names = sql_info.table_names
column_names = sql_info.column_names
if not fields:
Expand All @@ -890,12 +923,20 @@ def _extract_metadata_from_sql_query(
ViewField(c, "", "unknown", "", ViewFieldType.UNKNOWN)
for c in sorted(column_names)
]
if not sql_info.table_names:
reporter.query_parse_failures += 1
reporter.query_parse_failure_views.append(view_name)
except Exception as e:
reporter.query_parse_failures += 1
reporter.report_warning(
f"looker-view-{view_name}",
f"Failed to parse sql query, lineage will not be accurate. Exception: {e}",
)

# remove fields or sql tables that contain liquid variables
fields = [f for f in fields if "{%" not in f.name]
sql_table_names = [table for table in sql_table_names if "{%" not in table]

return fields, sql_table_names

@classmethod
Expand Down Expand Up @@ -1156,7 +1197,11 @@ def _get_upstream_lineage(
sql_table_name, looker_view
)
fine_grained_lineages: List[FineGrainedLineageClass] = []
if self.source_config.extract_column_level_lineage:
if self.source_config.extract_column_level_lineage and (
looker_view.view_details is not None
and looker_view.view_details.viewLanguage
!= VIEW_LANGUAGE_SQL # we currently only map col-level lineage for views without sql
):
for field in looker_view.fields:
if field.upstream_fields:
fine_grained_lineage = FineGrainedLineageClass(
Expand Down Expand Up @@ -1571,6 +1616,7 @@ def get_internal_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901
self.source_config.sql_parser,
self.source_config.extract_column_level_lineage,
self.source_config.populate_sql_logic_for_missing_descriptions,
process_isolation_for_sql_parsing=self.source_config.process_isolation_for_sql_parsing,
)
except Exception as e:
self.reporter.report_warning(
Expand Down
33 changes: 24 additions & 9 deletions metadata-ingestion/src/datahub/utilities/sql_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys
import traceback
from multiprocessing import Process, Queue
from typing import List, Optional, Tuple, Type
from typing import Any, List, Optional, Tuple, Type

from datahub.utilities.sql_lineage_parser_impl import SqlLineageSQLParserImpl
from datahub.utilities.sql_parser_base import SQLParser
Expand All @@ -18,8 +18,8 @@
class MetadataSQLSQLParser(SQLParser):
_DATE_SWAP_TOKEN = "__d_a_t_e"

def __init__(self, sql_query: str) -> None:
super().__init__(sql_query)
def __init__(self, sql_query: str, use_external_process: bool = True) -> None:
super().__init__(sql_query, use_external_process)

original_sql_query = sql_query

Expand Down Expand Up @@ -68,9 +68,9 @@ def get_columns(self) -> List[str]:


def sql_lineage_parser_impl_func_wrapper(
queue: multiprocessing.Queue,
queue: Optional[multiprocessing.Queue],
sql_query: str,
) -> None:
) -> Optional[Tuple[List[str], List[str], Any]]:
"""
The wrapper function that computes the tables and columns using the SqlLineageSQLParserImpl
and puts the results on the shared IPC queue. This is used to isolate SqlLineageSQLParserImpl
Expand All @@ -93,13 +93,28 @@ def sql_lineage_parser_impl_func_wrapper(
exception_details = (exc_info[0], exc_msg)
logger.debug(exc_msg)
finally:
queue.put((tables, columns, exception_details))
if queue is not None:
queue.put((tables, columns, exception_details))
return None
else:
return (tables, columns, exception_details)


class SqlLineageSQLParser(SQLParser):
def __init__(self, sql_query: str) -> None:
super().__init__(sql_query)
self.tables, self.columns = self._get_tables_columns_process_wrapped(sql_query)
def __init__(self, sql_query: str, use_external_process: bool = True) -> None:
    """Parse ``sql_query`` and capture the referenced tables and columns.

    :param sql_query: the SQL text to analyze.
    :param use_external_process: when True, run the underlying
        SqlLineageSQLParserImpl in a separate process (isolates the caller
        from parser memory leaks); when False, parse inline in this process.
    """
    super().__init__(sql_query, use_external_process)
    # Initialize unconditionally so these attributes always exist, even if
    # the inline wrapper ever returned None — previously that path left
    # `tables`/`columns` unset, deferring an AttributeError to the first
    # accessor call.
    self.tables: List[str] = []
    self.columns: List[str] = []
    if use_external_process:
        self.tables, self.columns = self._get_tables_columns_process_wrapped(
            sql_query
        )
    else:
        # Passing queue=None makes the wrapper return its results directly
        # instead of posting them to an IPC queue.
        return_tuple = sql_lineage_parser_impl_func_wrapper(None, sql_query)
        if return_tuple is not None:
            (
                self.tables,
                self.columns,
                # NOTE(review): the inline path currently discards any parse
                # exception details; presumably the process-wrapped path
                # surfaces them — confirm the intended error contract.
                _exception_details,
            ) = return_tuple

@staticmethod
def _get_tables_columns_process_wrapped(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class SqlParserException(Exception):


class SQLParser(metaclass=ABCMeta):
def __init__(self, sql_query: str) -> None:
def __init__(self, sql_query: str, use_external_process: bool = True) -> None:
    """Store the raw SQL text for subclasses to parse.

    ``use_external_process`` is accepted here so that all parser
    implementations share one constructor signature; the base class itself
    does not act on it — concrete subclasses use it to decide whether to
    run parsing in a separate process.
    """
    self._sql_query = sql_query

@abstractmethod
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,4 @@ def pytest_addoption(parser):
action="store_true",
default=False,
)
parser.addoption("--copy-output-files", action="store_true", default=False)
Loading