Skip to content

Commit

Permalink
Disable running the SQL parser in a separate process by default
Browse files Browse the repository at this point in the history
Fix adding views to the global view list
  • Loading branch information
treff7es committed Dec 6, 2022
1 parent 815c00f commit ec243d1
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 278 deletions.
5 changes: 5 additions & 0 deletions metadata-ingestion/src/datahub/configuration/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,8 @@ class LineageConfig(ConfigModel):
default=True,
description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run.",
)

sql_parser_use_external_process: bool = Field(
default=False,
description="When enabled, the SQL parser will run isolated in a separate process. This can increase processing time but protects against memory leaks in the SQL parser.",
)
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,11 @@ def _process_view(
conn, table_identifier, column_limit=self.config.column_limit
)

if dataset_name not in self.db_views[project_id]:
self.db_views[project_id][dataset_name] = []

self.db_views[project_id][dataset_name].append(view)

view_workunits = self.gen_view_dataset_workunits(view, project_id, dataset_name)
for wu in view_workunits:
self.report.report_workunit(wu)
Expand Down Expand Up @@ -1142,8 +1147,6 @@ def get_views_for_dataset(

views = self.db_views.get(project_id)

# get all views for database failed,
# falling back to get views for schema
if not views:
return BigQueryDataDictionary.get_views_for_dataset(
conn, project_id, dataset_name, self.config.profiling.enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,9 @@ def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[st
# in the references. There is no distinction between direct/base objects accessed. So doing sql parsing
# to ensure we only use direct objects accessed for lineage
try:
parser = BigQuerySQLParser(e.query)
parser = BigQuerySQLParser(
e.query, self.config.sql_parser_use_external_process
)
referenced_objs = set(
map(lambda x: x.split(".")[-1], parser.get_tables())
)
Expand Down Expand Up @@ -468,7 +470,9 @@ def parse_view_lineage(
parsed_tables = set()
if view.ddl:
try:
parser = BigQuerySQLParser(view.ddl)
parser = BigQuerySQLParser(
view.ddl, self.config.sql_parser_use_external_process
)
tables = parser.get_tables()
except Exception as ex:
logger.debug(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
class BigQuerySQLParser(SQLParser):
parser: SQLParser

def __init__(self, sql_query: str) -> None:
def __init__(self, sql_query: str, use_external_process: bool = False) -> None:
super().__init__(sql_query)

self._parsed_sql_query = self.parse_sql_query(sql_query)
self.parser = SqlLineageSQLParser(self._parsed_sql_query)
self.parser = SqlLineageSQLParser(self._parsed_sql_query, use_external_process)

def parse_sql_query(self, sql_query: str) -> str:
sql_query = BigQuerySQLParser._parse_bigquery_comment_sign(sql_query)
Expand Down

0 comments on commit ec243d1

Please sign in to comment.