From b4f84c4c9ab969a0283d4b602ae13a12354b56d2 Mon Sep 17 00:00:00 2001 From: Tobias Ruby Date: Sat, 8 Feb 2025 23:18:30 +1100 Subject: [PATCH 1/4] Ability to access internal lineage graph --- sqllineage/runner.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sqllineage/runner.py b/sqllineage/runner.py index 113d0360..912ff95e 100644 --- a/sqllineage/runner.py +++ b/sqllineage/runner.py @@ -3,6 +3,8 @@ from collections import OrderedDict from typing import Any, Dict, List, Optional, Tuple +import networkx as nx + from sqllineage import DEFAULT_DIALECT, SQLPARSE_DIALECT from sqllineage.config import SQLLineageConfig from sqllineage.core.holders import SQLLineageHolder @@ -154,6 +156,13 @@ def intermediate_tables(self) -> List[Table]: """ return sorted(self._sql_holder.intermediate_tables, key=lambda x: str(x)) + @lazy_property + def graph(self) -> nx.DiGraph: + """ + the fully parsed lineage graph :class:`networkx.DiGraph` + """ + return self._sql_holder.graph.copy() + @lazy_method def get_column_lineage( self, exclude_path_ending_in_subquery=True, exclude_subquery_columns=False From bfbedfef8d33e1e765b666bc47d2b27cf5783104 Mon Sep 17 00:00:00 2001 From: Tobias Ruby Date: Sat, 8 Feb 2025 23:19:24 +1100 Subject: [PATCH 2/4] swap graph operations to work on rustworkx --- setup.py | 1 + sqllineage/core/holders.py | 29 +++++++++++++++++++++++++---- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/setup.py b/setup.py index 2e819e21..0c9955cd 100644 --- a/setup.py +++ b/setup.py @@ -68,6 +68,7 @@ def run(self) -> None: install_requires=[ "sqlparse==0.5.3", "networkx>=2.4", + "rustworkx>=0.16.0", "sqlfluff==3.3.1", "sqlalchemy>=2.0.0", ], diff --git a/sqllineage/core/holders.py b/sqllineage/core/holders.py index d8938161..47f949f4 100644 --- a/sqllineage/core/holders.py +++ b/sqllineage/core/holders.py @@ -1,8 +1,9 @@ import itertools -from typing import Dict, List, Optional, Set, Tuple, Union +from typing import Dict, List, Optional, Set, Tuple, Union, cast import networkx as nx from networkx import DiGraph +import rustworkx as rx from sqllineage.core.metadata_provider import MetaDataProvider from sqllineage.core.models import Column, Path, Schema, SubQuery, Table @@ -13,7 +14,9 @@ class ColumnLineageMixin: def get_column_lineage( - self, exclude_path_ending_in_subquery=True, exclude_subquery_columns=False + self, + exclude_path_ending_in_subquery: bool = True, + exclude_subquery_columns: bool = False, ) -> Set[Tuple[Column, ...]]: """ :param exclude_path_ending_in_subquery: exclude_subquery rename to exclude_path_ending_in_subquery @@ -38,9 +41,27 @@ def get_column_lineage( node for node in target_columns if isinstance(node.parent, Table) } columns = set() + + # Computing on networkx graph can be slow for very large graphs. + # We convert the graph to rustworkx which is significantly faster. + rx_graph = cast(rx.PyDiGraph, rx.networkx_converter(self.graph)) + + # rustworkx is based on indices only, meaning we have to keep a mapping + # between indices and actual nodes for both ways. + rx_mapping: Dict[Column, int] = { + node: index for index, node in enumerate(rx_graph.nodes()) + } + rx_mapping_inv: Dict[int, Column] = {v: k for k, v in rx_mapping.items()} + for source, target in itertools.product(source_columns, target_columns): - simple_paths = list(nx.all_simple_paths(self.graph, source, target)) - for path in simple_paths: + simple_paths = list( + rx.digraph_all_simple_paths( + rx_graph, rx_mapping[source], rx_mapping[target] + ) + ) + for rx_path in simple_paths: + path = [rx_mapping_inv[p] for p in rx_path] + if exclude_subquery_columns: path = [ node for node in path if not isinstance(node.parent, SubQuery) From bb2d9376bbaea57ce41cdf6517b8f57b20e6deda Mon Sep 17 00:00:00 2001 From: Tobias Ruby Date: Sat, 8 Feb 2025 23:29:51 +1100 Subject: [PATCH 3/4] fix import order --- sqllineage/core/holders.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqllineage/core/holders.py b/sqllineage/core/holders.py index 47f949f4..535e18ce 100644 --- a/sqllineage/core/holders.py +++ b/sqllineage/core/holders.py @@ -2,8 +2,8 @@ from typing import Dict, List, Optional, Set, Tuple, Union, cast import networkx as nx -from networkx import DiGraph import rustworkx as rx +from networkx import DiGraph from sqllineage.core.metadata_provider import MetaDataProvider from sqllineage.core.models import Column, Path, Schema, SubQuery, Table From 2db876954bb34fb6077a8ccd890099a19774c320 Mon Sep 17 00:00:00 2001 From: Tobias Ruby Date: Sat, 8 Feb 2025 23:37:03 +1100 Subject: [PATCH 4/4] fix test coverage --- tests/core/test_runner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/core/test_runner.py b/tests/core/test_runner.py index aec0d521..92701f15 100644 --- a/tests/core/test_runner.py +++ b/tests/core/test_runner.py @@ -17,6 +17,7 @@ def test_runner_dummy(): assert str(runner) assert runner.to_cytoscape() is not None assert runner.to_cytoscape(level=LineageLevel.COLUMN) is not None + assert runner.graph is not None def test_statements_trim_comment():