Skip to content

Commit

Permalink
feat(ingest): support knowledge links in business glossary (datahub-project#6375)
Browse files Browse the repository at this point in the history

Co-authored-by: Shirshanka Das <[email protected]>
Co-authored-by: MohdSiddique Bagwan <[email protected]>
Co-authored-by: Harshal Sheth <[email protected]>
  • Loading branch information
4 people authored and cccs-Dustin committed Feb 1, 2023
1 parent 8d6498f commit a66390c
Show file tree
Hide file tree
Showing 8 changed files with 775 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ url: "https://github.com/datahub-project/datahub/"
nodes:
- name: Classification
description: A set of terms related to Data Classification
knowledge_links:
- label: Wiki link for classification
url: "https://en.wikipedia.org/wiki/Classification"
terms:
- name: Sensitive
description: Sensitive Data
Expand Down Expand Up @@ -110,3 +113,6 @@ nodes:
source_url: "https://spec.edmcouncil.org/fibo/ontology/FBC/ProductsAndServices/ClientsAndAccounts/Account"
related_terms:
- House.Kitchen
knowledge_links:
- url: "https://en.wikipedia.org/wiki/Spoon"
label: Wiki link
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import logging
import pathlib
import time
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional, Union

import pydantic
from pydantic import validator
from pydantic.fields import Field

Expand All @@ -14,6 +17,7 @@
make_group_urn,
make_user_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.decorators import ( # SourceCapability,; capability,
SupportStatus,
config_class,
Expand All @@ -22,6 +26,7 @@
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit, UsageStatsWorkUnit
from datahub.utilities.urn_encoder import UrnEncoder

logger = logging.getLogger(__name__)

Expand All @@ -40,6 +45,11 @@ class Owners(ConfigModel):
groups: Optional[List[str]]


class KnowledgeCard(ConfigModel):
    # One knowledge link that can be listed under `knowledge_links` on a
    # glossary node or a glossary term.
    # Both fields may be omitted in the glossary file; they then stay None.
    url: Optional[str] = None
    label: Optional[str] = None


class GlossaryTermConfig(ConfigModel):
id: Optional[str]
name: str
Expand All @@ -53,6 +63,7 @@ class GlossaryTermConfig(ConfigModel):
values: Optional[List[str]]
related_terms: Optional[List[str]]
custom_properties: Optional[Dict[str, str]]
knowledge_links: Optional[List[KnowledgeCard]]


class GlossaryNodeConfig(ConfigModel):
Expand All @@ -62,6 +73,7 @@ class GlossaryNodeConfig(ConfigModel):
owners: Optional[Owners]
terms: Optional[List[GlossaryTermConfig]]
nodes: Optional[List["GlossaryNodeConfig"]]
knowledge_links: Optional[List[KnowledgeCard]]


GlossaryNodeConfig.update_forward_refs()
Expand All @@ -77,7 +89,9 @@ class DefaultConfig(ConfigModel):


class BusinessGlossarySourceConfig(ConfigModel):
file: str = Field(description="Path to business glossary file to ingest.")
file: pydantic.FilePath = Field(
description="Path to business glossary file to ingest."
)
enable_auto_id: bool = Field(
description="Generate id field from GlossaryNode and GlossaryTerm's name field",
default=False,
Expand All @@ -101,6 +115,10 @@ def create_id(path: List[str], default_id: Optional[str], enable_auto_id: bool)
return default_id # No need to create id from path as default_id is provided

id_: str = ".".join(path)

if UrnEncoder.contains_reserved_char(id_):
enable_auto_id = True

if enable_auto_id:
id_ = datahub_guid({"path": id_})
return id_
Expand Down Expand Up @@ -153,14 +171,13 @@ def get_owners(owners: Owners) -> models.OwnershipClass:

def get_mces(
glossary: BusinessGlossaryConfig, ingestion_config: BusinessGlossarySourceConfig
) -> List[models.MetadataChangeEventClass]:
events: List[models.MetadataChangeEventClass] = []
) -> Iterable[Union[MetadataChangeProposalWrapper, models.MetadataChangeEventClass]]:
path: List[str] = []
root_owners = get_owners(glossary.owners)

if glossary.nodes:
for node in glossary.nodes:
events += get_mces_from_node(
yield from get_mces_from_node(
node,
path + [node.name],
parentNode=None,
Expand All @@ -171,7 +188,7 @@ def get_mces(

if glossary.terms:
for term in glossary.terms:
events += get_mces_from_term(
yield from get_mces_from_term(
term,
path + [term.name],
parentNode=None,
Expand All @@ -180,21 +197,47 @@ def get_mces(
ingestion_config=ingestion_config,
)

return events


def get_mce_from_snapshot(snapshot: Any) -> models.MetadataChangeEventClass:
    """Wrap *snapshot* in a ``MetadataChangeEventClass`` as its proposed snapshot."""
    event = models.MetadataChangeEventClass(proposedSnapshot=snapshot)
    return event


def make_institutional_memory_mcp(
    urn: str, knowledge_cards: List[KnowledgeCard]
) -> Optional[MetadataChangeProposalWrapper]:
    """Build the institutional-memory aspect proposal for one entity.

    Args:
        urn: URN of the entity the links are attached to.
        knowledge_cards: Link definitions parsed from the glossary file.

    Returns:
        A ``MetadataChangeProposalWrapper`` carrying an
        ``InstitutionalMemoryClass`` aspect with one element per complete
        card, or ``None`` when no card has both a label and a url.
    """
    # Epoch milliseconds, computed once so every link emitted in this aspect
    # carries the same creation time.
    created_at_ms = int(time.time() * 1000.0)

    elements: List[models.InstitutionalMemoryMetadataClass] = []
    for knowledge_card in knowledge_cards:
        if not (knowledge_card.label and knowledge_card.url):
            # A link needs both parts to be usable; tell the user instead of
            # dropping the entry without a trace.
            logger.warning(
                "Skipping knowledge link on %s: both 'label' and 'url' are "
                "required (label=%r, url=%r)",
                urn,
                knowledge_card.label,
                knowledge_card.url,
            )
            continue

        elements.append(
            models.InstitutionalMemoryMetadataClass(
                url=knowledge_card.url,
                description=knowledge_card.label,
                createStamp=models.AuditStampClass(
                    time=created_at_ms,
                    actor="urn:li:corpuser:datahub",
                    message="ingestion bot",
                ),
            )
        )

    if not elements:
        return None

    return MetadataChangeProposalWrapper(
        entityUrn=urn,
        aspect=models.InstitutionalMemoryClass(elements=elements),
    )


def get_mces_from_node(
glossaryNode: GlossaryNodeConfig,
path: List[str],
parentNode: Optional[str],
parentOwners: models.OwnershipClass,
defaults: DefaultConfig,
ingestion_config: BusinessGlossarySourceConfig,
) -> List[models.MetadataChangeEventClass]:
) -> Iterable[Union[MetadataChangeProposalWrapper, models.MetadataChangeEventClass]]:
node_urn = make_glossary_node_urn(
path, glossaryNode.id, ingestion_config.enable_auto_id
)
Expand All @@ -212,10 +255,18 @@ def get_mces_from_node(
urn=node_urn,
aspects=[node_info, node_owners, valid_status],
)
mces = [get_mce_from_snapshot(node_snapshot)]
yield get_mce_from_snapshot(node_snapshot)

if glossaryNode.knowledge_links is not None:
mcp: Optional[MetadataChangeProposalWrapper] = make_institutional_memory_mcp(
node_urn, glossaryNode.knowledge_links
)
if mcp is not None:
yield mcp

if glossaryNode.nodes:
for node in glossaryNode.nodes:
mces += get_mces_from_node(
yield from get_mces_from_node(
node,
path + [node.name],
parentNode=node_urn,
Expand All @@ -226,15 +277,14 @@ def get_mces_from_node(

if glossaryNode.terms:
for term in glossaryNode.terms:
mces += get_mces_from_term(
yield from get_mces_from_term(
glossaryTerm=term,
path=path + [term.name],
parentNode=node_urn,
parentOwnership=node_owners,
defaults=defaults,
ingestion_config=ingestion_config,
)
return mces


def get_mces_from_term(
Expand All @@ -244,7 +294,7 @@ def get_mces_from_term(
parentOwnership: models.OwnershipClass,
defaults: DefaultConfig,
ingestion_config: BusinessGlossarySourceConfig,
) -> List[models.MetadataChangeEventClass]:
) -> Iterable[Union[models.MetadataChangeEventClass, MetadataChangeProposalWrapper]]:
term_urn = make_glossary_term_urn(
path, glossaryTerm.id, ingestion_config.enable_auto_id
)
Expand Down Expand Up @@ -338,14 +388,18 @@ def get_mces_from_term(
ownership = get_owners(glossaryTerm.owners)
aspects.append(ownership)

term_browse = models.BrowsePathsClass(paths=["/" + "/".join(path)])
aspects.append(term_browse)

term_snapshot: models.GlossaryTermSnapshotClass = models.GlossaryTermSnapshotClass(
urn=term_urn,
aspects=aspects,
)
return [get_mce_from_snapshot(term_snapshot)]
yield get_mce_from_snapshot(term_snapshot)

if glossaryTerm.knowledge_links:
mcp: Optional[MetadataChangeProposalWrapper] = make_institutional_memory_mcp(
term_urn, glossaryTerm.knowledge_links
)
if mcp is not None:
yield mcp


def populate_path_vs_id(glossary: BusinessGlossaryConfig) -> None:
Expand Down Expand Up @@ -388,18 +442,25 @@ def create(cls, config_dict, ctx):
config = BusinessGlossarySourceConfig.parse_obj(config_dict)
return cls(ctx, config)

def load_glossary_config(self, file_name: pathlib.Path) -> BusinessGlossaryConfig:
    """Load *file_name* and parse its contents into a ``BusinessGlossaryConfig``."""
    raw_config = load_config_file(file_name)
    return BusinessGlossaryConfig.parse_obj(raw_config)

def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]:
    """Yield one work unit per event produced from the configured glossary file.

    The glossary file named by ``self.config.file`` is loaded, passed through
    ``populate_path_vs_id``, and then turned into events by ``get_mces``.
    Snapshot events (``MetadataChangeEventClass``) and aspect proposals
    (``MetadataChangeProposalWrapper``) are each wrapped in a
    ``MetadataWorkUnit``; every work unit is recorded on ``self.report``
    before it is yielded.
    """
    glossary_config = self.load_glossary_config(self.config.file)
    populate_path_vs_id(glossary_config)

    for event in get_mces(glossary_config, ingestion_config=self.config):
        if isinstance(event, models.MetadataChangeEventClass):
            wu = MetadataWorkUnit(f"{event.proposedSnapshot.urn}", mce=event)
        elif isinstance(event, MetadataChangeProposalWrapper):
            wu = MetadataWorkUnit(
                id=f"{event.entityType}-{event.aspectName}-{event.entityUrn}",
                mcp=event,
            )
        else:
            # Any other event type is ignored, as before.
            continue

        # Report both kinds of work unit; previously the proposal branch was
        # yielded without being reported.
        self.report.report_workunit(wu)
        yield wu

def get_report(self):
return self.report
7 changes: 6 additions & 1 deletion metadata-ingestion/src/datahub/utilities/urn_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

# NOTE: Frontend relies on encoding these three characters. Specifically, we decode and encode schema fields for column level lineage.
# If this changes, make appropriate changes to datahub-web-react/src/app/lineage/utils/columnLineageUtils.ts
RESERVED_CHARS = [",", "(", ")"]
RESERVED_CHARS = {",", "(", ")"}
RESERVED_CHARS_EXTENDED = RESERVED_CHARS.union({"%"})


class UrnEncoder:
Expand All @@ -19,3 +20,7 @@ def encode_string(s: str) -> str:
def encode_char(c: str) -> str:
assert len(c) == 1, "Invalid input, Expected single character"
return urllib.parse.quote(c) if c in RESERVED_CHARS else c

@staticmethod
def contains_reserved_char(value: str) -> bool:
    """Return True if *value* has any character in ``RESERVED_CHARS_EXTENDED``."""
    return not RESERVED_CHARS_EXTENDED.isdisjoint(value)
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
version: 1
source: DataHub
owners:
users:
- mjames
url: "https://github.com/datahub-project/datahub/"
nodes:
- name: Classification
description: A set of terms related to Data Classification
knowledge_links:
- label: Wiki link for classification
url: "https://en.wikipedia.org/wiki/Classification"
terms:
- name: Sensitive
description: Sensitive Data
custom_properties:
is_confidential: false
knowledge_links:
- label: Google Link
url: "https://www.google.com"
- name: Confidential
description: Confidential Data
custom_properties:
is_confidential: true
- name: Highly Confidential
description: Highly Confidential Data
custom_properties:
is_confidential: true
- name: Personal Information
description: All terms related to personal information
owners:
users:
- mjames
terms:
- name: Email
description: An individual's email address
inherits:
- Classification.Confidential
owners:
groups:
- Trust and Safety
- name: Address
description: A physical address
- name: Gender
description: The gender identity of the individual
inherits:
- Classification.Sensitive
- name: Clients And Accounts
description: Provides basic concepts such as account, account holder, account provider, relationship manager that are commonly used by financial services providers to describe customers and to determine counterparty identities
owners:
groups:
- finance
terms:
- name: Account
description: Container for records associated with a business arrangement for regular transactions and services
term_source: "EXTERNAL"
source_ref: FIBO
source_url: "https://spec.edmcouncil.org/fibo/ontology/FBC/ProductsAndServices/ClientsAndAccounts/Account"
inherits:
- Classification.Highly Confidential
contains:
- Clients And Accounts.Balance
- name: Balance
description: Amount of money available or owed
term_source: "EXTERNAL"
source_ref: FIBO
source_url: "https://spec.edmcouncil.org/fibo/ontology/FBC/ProductsAndServices/ClientsAndAccounts/Balance"
- name: KPIs
description: Common Business KPIs
terms:
- name: CSAT %
description: Customer Satisfaction Score
Loading

0 comments on commit a66390c

Please sign in to comment.