Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve performance on large SQL/graphs #685

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def run(self) -> None:
install_requires=[
"sqlparse==0.5.3",
"networkx>=2.4",
"rustworkx>=0.16.0",
"sqlfluff==3.3.1",
"sqlalchemy>=2.0.0",
],
Expand Down
29 changes: 25 additions & 4 deletions sqllineage/core/holders.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import itertools
from typing import Dict, List, Optional, Set, Tuple, Union
from typing import Dict, List, Optional, Set, Tuple, Union, cast

import networkx as nx
import rustworkx as rx
from networkx import DiGraph

from sqllineage.core.metadata_provider import MetaDataProvider
Expand All @@ -13,7 +14,9 @@

class ColumnLineageMixin:
def get_column_lineage(
self, exclude_path_ending_in_subquery=True, exclude_subquery_columns=False
self,
exclude_path_ending_in_subquery: bool = True,
exclude_subquery_columns: bool = False,
) -> Set[Tuple[Column, ...]]:
"""
:param exclude_path_ending_in_subquery: exclude_subquery rename to exclude_path_ending_in_subquery
Expand All @@ -38,9 +41,27 @@ def get_column_lineage(
node for node in target_columns if isinstance(node.parent, Table)
}
columns = set()

# Computing on networkx graph can be slow for very large graphs.
# We convert the graph to rustworkx which is significantly faster.
rx_graph = cast(rx.PyDiGraph, rx.networkx_converter(self.graph))

# rustworkx is based on indices only, meaning we have to keep a mapping
# between indices and actual nodes for both ways.
rx_mapping: Dict[Column, int] = {
node: index for index, node in enumerate(rx_graph.nodes())
}
rx_mapping_inv: Dict[int, Column] = {v: k for k, v in rx_mapping.items()}

for source, target in itertools.product(source_columns, target_columns):
simple_paths = list(nx.all_simple_paths(self.graph, source, target))
for path in simple_paths:
simple_paths = list(
rx.digraph_all_simple_paths(
rx_graph, rx_mapping[source], rx_mapping[target]
)
)
for rx_path in simple_paths:
path = [rx_mapping_inv[p] for p in rx_path]

if exclude_subquery_columns:
path = [
node for node in path if not isinstance(node.parent, SubQuery)
Expand Down
9 changes: 9 additions & 0 deletions sqllineage/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from collections import OrderedDict
from typing import Any, Dict, List, Optional, Tuple

import networkx as nx

from sqllineage import DEFAULT_DIALECT, SQLPARSE_DIALECT
from sqllineage.config import SQLLineageConfig
from sqllineage.core.holders import SQLLineageHolder
Expand Down Expand Up @@ -154,6 +156,13 @@ def intermediate_tables(self) -> List[Table]:
"""
return sorted(self._sql_holder.intermediate_tables, key=lambda x: str(x))

@lazy_property
def graph(self) -> nx.DiGraph:
"""
the fully parsed lineage graph :class:`networkx.DiGraph`
"""
return self._sql_holder.graph.copy()

@lazy_method
def get_column_lineage(
self, exclude_path_ending_in_subquery=True, exclude_subquery_columns=False
Expand Down
1 change: 1 addition & 0 deletions tests/core/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def test_runner_dummy():
assert str(runner)
assert runner.to_cytoscape() is not None
assert runner.to_cytoscape(level=LineageLevel.COLUMN) is not None
assert runner.graph is not None


def test_statements_trim_comment():
Expand Down