Skip to content

Commit

Permalink
feat: 1) add lineage for mssql stored_procedures; 2) add lineage for mssql objects; 3) add mssql_lineage flag for config
Browse files (browse the repository at this point in the history)
  • Loading branch information
sleeperdeep committed Jul 22, 2024
1 parent 425ec1d commit 8d6b7f9
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@ class ProcedureDependency:
schema: str
name: str
type: str
incoming: int
outgoing: int
incoming: Union[str, int]
outgoing: Union[str, int]
source: str = "mssql"

def __post_init__(self):
self.incoming = int(self.incoming)
self.outgoing = int(self.outgoing)


@dataclass
class ProcedureLineageStream:
Expand Down
33 changes: 17 additions & 16 deletions metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ def __init__(self, config: SQLServerConfig, ctx: PipelineContext):
self.current_database = None
self.table_descriptions: Dict[str, str] = {}
self.column_descriptions: Dict[str, str] = {}
self.full_lineage: Dict[str, Dict[str, List[Dict[str, str]]]] = {}
self.procedures_dependencies: Dict[str, List[Dict[str, str | int]]] = {}
self.full_lineage: Dict[str, List[Dict[str, str]]] = {}
self.procedures_dependencies: Dict[str, List[Dict[str, str]]] = {}
if self.config.include_descriptions:
for inspector in self.get_inspectors():
db_name = self.get_db_name(inspector)
Expand Down Expand Up @@ -274,9 +274,11 @@ def get_ucs(self, inspector: Inspector, key: str) -> List[UpstreamClass]:
if not self.config.mssql_lineage:
return []
result = []
for dest in self.full_lineage.get(key, {}):
for dest in self.full_lineage.get(key, []):
dest_name = self.get_identifier(
schema=dest.get("schema"), entity=dest.get("name"), inspector=inspector # type: ignore[attr-defined]
schema=dest.get("schema", ""),
entity=dest.get("name", ""),
inspector=inspector,
)
dest_urn = make_dataset_urn_with_platform_instance(
self.platform,
Expand All @@ -292,7 +294,7 @@ def get_ucs(self, inspector: Inspector, key: str) -> List[UpstreamClass]:
return result

def _populate_stored_procedures_dependencies(self, conn: Connection) -> None:
_procedures_dependencies: Dict[str, List[Dict[str, str | int]]] = {}
_procedures_dependencies: Dict[str, List[Dict[str, str]]] = {}

trans = conn.begin()

Expand Down Expand Up @@ -563,9 +565,9 @@ def _populate_stored_procedures_dependencies(self, conn: Connection) -> None:
"referenced_schema_name": row["referenced_schema_name"],
"referenced_entity_name": row["referenced_entity_name"],
"referenced_object_type": row["referenced_object_type"],
"is_selected": int(row["is_selected"]),
"is_select_all": int(row["is_select_all"]),
"is_updated": int(row["is_updated"]),
"is_selected": row["is_selected"],
"is_select_all": row["is_select_all"],
"is_updated": row["is_updated"],
}
)

Expand Down Expand Up @@ -597,7 +599,7 @@ def _populate_object_links(self, conn: Connection, db_name: str) -> None:
# {"U ": "USER_TABLE", "V ": "VIEW", "P ": "SQL_STORED_PROCEDURE"}
for row in _links:
_key = f"{db_name}.{row['dst_schema_name']}.{row['dst_object_name']}"
self.full_lineage.setdefault(_key, []).append( # type: ignore[attr-defined, arg-type]
self.full_lineage.setdefault(_key, []).append(
{
"schema": row["src_schema_name"] or row["dst_schema_name"],
"name": row["src_object_name"],
Expand Down Expand Up @@ -895,13 +897,12 @@ def _extract_procedure_dependency(
upstream_dependencies.append(
ProcedureDependency(
flow_id=f"{dependency['referenced_database_name']}.{dependency['referenced_schema_name']}.stored_procedures",
db=str(dependency["referenced_database_name"]),
schema=str(dependency["referenced_schema_name"]),
name=str(dependency["referenced_entity_name"]),
type=str(dependency["referenced_object_type"]),
incoming=int(dependency["is_selected"])
or int(dependency["is_select_all"]),
outgoing=int(dependency["is_updated"]),
db=dependency["referenced_database_name"],
schema=dependency["referenced_schema_name"],
name=dependency["referenced_entity_name"],
type=dependency["referenced_object_type"],
incoming=dependency["is_selected"] or dependency["is_select_all"],
outgoing=dependency["is_updated"],
env=procedure.flow.env,
server=procedure.flow.platform_instance,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,8 +797,8 @@
"customProperties": {
"code": "CREATE PROCEDURE schema_with_lineage.procedure_with_lineage\nAS\nBEGIN\n INSERT INTO destination_table_3\n SELECT ID, original_source\n FROM schema_with_lineage.source_table_2\nEND;\n",
"input parameters": "[]",
"date_created": "2024-07-19 08:24:16.697000",
"date_modified": "2024-07-19 08:24:16.697000"
"date_created": "2024-07-22 06:53:18.693000",
"date_modified": "2024-07-22 06:53:18.693000"
},
"externalUrl": "",
"name": "lineagedb.schema_with_lineage.procedure_with_lineage",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@
"aspect": {
"json": {
"customProperties": {
"job_id": "4ab477a3-0899-4044-81d8-b57a7fb00ae8",
"job_id": "1ac53c0f-b8d0-4f58-8349-f61e4700afb7",
"job_name": "Weekly Demo Data Backup",
"description": "No description available.",
"date_created": "2024-07-19 05:44:50.180000",
"date_modified": "2024-07-19 05:44:50.353000",
"date_created": "2024-07-22 06:53:18.037000",
"date_modified": "2024-07-22 06:53:18.213000",
"step_id": "1",
"step_name": "Set database to read only",
"subsystem": "TSQL",
Expand Down Expand Up @@ -1959,8 +1959,8 @@
"code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n",
"input parameters": "['@ID']",
"parameter @ID": "{'type': 'int'}",
"date_created": "2024-07-19 05:44:50.100000",
"date_modified": "2024-07-19 05:44:50.100000"
"date_created": "2024-07-22 06:53:17.953000",
"date_modified": "2024-07-22 06:53:17.953000"
},
"externalUrl": "",
"name": "demodata.Foo.Proc.With.SpecialChar",
Expand Down Expand Up @@ -4401,8 +4401,8 @@
"customProperties": {
"code": "CREATE PROCEDURE schema_with_lineage.procedure_with_lineage\nAS\nBEGIN\n INSERT INTO destination_table_3\n SELECT ID, original_source\n FROM schema_with_lineage.source_table_2\nEND;\n",
"input parameters": "[]",
"date_created": "2024-07-19 05:44:50.830000",
"date_modified": "2024-07-19 05:44:50.830000"
"date_created": "2024-07-22 06:53:18.693000",
"date_modified": "2024-07-22 06:53:18.693000"
},
"externalUrl": "",
"name": "lineagedb.schema_with_lineage.procedure_with_lineage",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@
"aspect": {
"json": {
"customProperties": {
"job_id": "4ab477a3-0899-4044-81d8-b57a7fb00ae8",
"job_id": "1ac53c0f-b8d0-4f58-8349-f61e4700afb7",
"job_name": "Weekly Demo Data Backup",
"description": "No description available.",
"date_created": "2024-07-19 05:44:50.180000",
"date_modified": "2024-07-19 05:44:50.353000",
"date_created": "2024-07-22 06:53:18.037000",
"date_modified": "2024-07-22 06:53:18.213000",
"step_id": "1",
"step_name": "Set database to read only",
"subsystem": "TSQL",
Expand Down Expand Up @@ -1959,8 +1959,8 @@
"code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n",
"input parameters": "['@ID']",
"parameter @ID": "{'type': 'int'}",
"date_created": "2024-07-19 05:44:50.100000",
"date_modified": "2024-07-19 05:44:50.100000"
"date_created": "2024-07-22 06:53:17.953000",
"date_modified": "2024-07-22 06:53:17.953000"
},
"externalUrl": "",
"name": "demodata.Foo.Proc.With.SpecialChar",
Expand Down Expand Up @@ -4401,8 +4401,8 @@
"customProperties": {
"code": "CREATE PROCEDURE schema_with_lineage.procedure_with_lineage\nAS\nBEGIN\n INSERT INTO destination_table_3\n SELECT ID, original_source\n FROM schema_with_lineage.source_table_2\nEND;\n",
"input parameters": "[]",
"date_created": "2024-07-19 05:44:50.830000",
"date_modified": "2024-07-19 05:44:50.830000"
"date_created": "2024-07-22 06:53:18.693000",
"date_modified": "2024-07-22 06:53:18.693000"
},
"externalUrl": "",
"name": "lineagedb.schema_with_lineage.procedure_with_lineage",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,4 @@ BEGIN
SELECT ID, original_source
FROM schema_with_lineage.source_table_2
END;
GO
GO

0 comments on commit 8d6b7f9

Please sign in to comment.