Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion): Business Glossary# Add domain support in GlossaryTerm ingestion #6829

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ The business glossary source file should be a `.yml` file with the following top
- **inherits**: (optional) List of **GlossaryTerm** that this term inherits from
- **contains**: (optional) List of **GlossaryTerm** that this term contains
- **custom_properties**: A map of key/value pairs of arbitrary custom properties
- **domain**: (optional) Name or URN of the domain to associate with this term

You can also view an example business glossary file checked in [here](../../../examples/bootstrap_data/business_glossary.yml)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ nodes:
description: Highly Confidential Data
custom_properties:
is_confidential: true
domain: Marketing
- name: PersonalInformation
description: All terms related to personal information
owners:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
make_user_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import ( # SourceCapability,; capability,
SupportStatus,
config_class,
Expand All @@ -26,6 +27,8 @@
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit, UsageStatsWorkUnit
from datahub.ingestion.graph.client import DataHubGraph
from datahub.utilities.registries.domain_registry import DomainRegistry
from datahub.utilities.urn_encoder import UrnEncoder

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -64,6 +67,7 @@ class GlossaryTermConfig(ConfigModel):
related_terms: Optional[List[str]]
custom_properties: Optional[Dict[str, str]]
knowledge_links: Optional[List[KnowledgeCard]]
domain: Optional[str]


class GlossaryNodeConfig(ConfigModel):
Expand Down Expand Up @@ -170,7 +174,9 @@ def get_owners(owners: Owners) -> models.OwnershipClass:


def get_mces(
glossary: BusinessGlossaryConfig, ingestion_config: BusinessGlossarySourceConfig
glossary: BusinessGlossaryConfig,
ingestion_config: BusinessGlossarySourceConfig,
ctx: PipelineContext,
) -> Iterable[Union[MetadataChangeProposalWrapper, models.MetadataChangeEventClass]]:
path: List[str] = []
root_owners = get_owners(glossary.owners)
Expand All @@ -184,6 +190,7 @@ def get_mces(
parentOwners=root_owners,
defaults=glossary,
ingestion_config=ingestion_config,
ctx=ctx,
)

if glossary.terms:
Expand All @@ -195,6 +202,7 @@ def get_mces(
parentOwnership=root_owners,
defaults=glossary,
ingestion_config=ingestion_config,
ctx=ctx,
)


Expand Down Expand Up @@ -230,13 +238,20 @@ def make_institutional_memory_mcp(
return None


def make_domain_mcp(
    term_urn: str, domain_aspect: models.DomainsClass
) -> MetadataChangeProposalWrapper:
    """Wrap a domains aspect in a change proposal addressed to a glossary term.

    Args:
        term_urn: URN of the glossary term the aspect is attached to.
        domain_aspect: The domains aspect to emit for that term.

    Returns:
        A MetadataChangeProposalWrapper whose entity URN is ``term_urn`` and
        whose aspect is ``domain_aspect``.
    """
    proposal = MetadataChangeProposalWrapper(
        entityUrn=term_urn,
        aspect=domain_aspect,
    )
    return proposal


def get_mces_from_node(
glossaryNode: GlossaryNodeConfig,
path: List[str],
parentNode: Optional[str],
parentOwners: models.OwnershipClass,
defaults: DefaultConfig,
ingestion_config: BusinessGlossarySourceConfig,
ctx: PipelineContext,
) -> Iterable[Union[MetadataChangeProposalWrapper, models.MetadataChangeEventClass]]:
node_urn = make_glossary_node_urn(
path, glossaryNode.id, ingestion_config.enable_auto_id
Expand Down Expand Up @@ -273,6 +288,7 @@ def get_mces_from_node(
parentOwners=node_owners,
defaults=defaults,
ingestion_config=ingestion_config,
ctx=ctx,
)

if glossaryNode.terms:
Expand All @@ -284,16 +300,34 @@ def get_mces_from_node(
parentOwnership=node_owners,
defaults=defaults,
ingestion_config=ingestion_config,
ctx=ctx,
)


def get_domain_class(
    graph: Optional[DataHubGraph], domains: List[str]
) -> models.DomainsClass:
    """Build a domains aspect from a list of domain identifiers.

    Args:
        graph: DataHub graph client handed to the domain registry; may be None.
        domains: Domain identifiers to resolve through the registry.

    Returns:
        A DomainsClass whose ``domains`` list holds one registry-resolved URN
        per entry of ``domains``, in the same order.
    """
    # FIXME: In the ideal case, the domain registry would be an instance variable so that it
    # preserves its cache across calls to this function. However, the current implementation
    # requires the full list of domains to be passed in at instantiation time, so we can't
    # actually do that.
    # NOTE(review): presumably a plain domain name (not a URN) needs a live graph to be
    # resolved -- confirm how DomainRegistry behaves when ``graph`` is None.
    domain_registry: DomainRegistry = DomainRegistry(
        # Pass a copy so the registry never shares the caller's list object.
        cached_domains=list(domains),
        graph=graph,
    )
    return models.DomainsClass(
        domains=[domain_registry.get_domain_urn(domain) for domain in domains]
    )


def get_mces_from_term(
glossaryTerm: GlossaryTermConfig,
path: List[str],
parentNode: Optional[str],
parentOwnership: models.OwnershipClass,
defaults: DefaultConfig,
ingestion_config: BusinessGlossarySourceConfig,
ctx: PipelineContext,
) -> Iterable[Union[models.MetadataChangeEventClass, MetadataChangeProposalWrapper]]:
term_urn = make_glossary_term_urn(
path, glossaryTerm.id, ingestion_config.enable_auto_id
Expand Down Expand Up @@ -388,6 +422,11 @@ def get_mces_from_term(
ownership = get_owners(glossaryTerm.owners)
aspects.append(ownership)

if glossaryTerm.domain is not None:
yield make_domain_mcp(
term_urn, get_domain_class(ctx.graph, [glossaryTerm.domain])
)

term_snapshot: models.GlossaryTermSnapshotClass = models.GlossaryTermSnapshotClass(
urn=term_urn,
aspects=aspects,
Expand Down Expand Up @@ -450,7 +489,9 @@ def load_glossary_config(self, file_name: pathlib.Path) -> BusinessGlossaryConfi
def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]:
glossary_config = self.load_glossary_config(self.config.file)
populate_path_vs_id(glossary_config)
for event in get_mces(glossary_config, ingestion_config=self.config):
for event in get_mces(
glossary_config, ingestion_config=self.config, ctx=self.ctx
):
if isinstance(event, models.MetadataChangeEventClass):
wu = MetadataWorkUnit(f"{event.proposedSnapshot.urn}", mce=event)
self.report.report_workunit(wu)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ nodes:
description: Highly Confidential Data
custom_properties:
is_confidential: true
domain: Marketing

- name: Personal Information
description: All terms related to personal information
owners:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,20 @@
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"entityType": "glossaryTerm",
"entityUrn": "urn:li:glossaryTerm:Classification.Highly Confidential",
"changeType": "UPSERT",
"aspectName": "domains",
"aspect": {
"value": "{\"domains\": [\"urn:li:domain:Marketing\"]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "datahub-business-glossary-2020_04_14-07_00_00"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.GlossaryTermSnapshot": {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,39 +1,69 @@
import shutil
from typing import List
from typing import Any, Dict, List

import pytest
from freezegun import freeze_time

from datahub.ingestion.graph.client import DatahubClientConfig
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.metadata import business_glossary
from tests.test_helpers import mce_helpers
from tests.test_helpers.click_helpers import run_datahub_cmd

FROZEN_TIME = "2020-04-14 07:00:00"


def get_default_recipe(
    glossary_yml_file_path: str, event_output_file_path: str
) -> Dict[str, Any]:
    """Return an ingestion recipe that reads a glossary file and writes events to a file.

    Args:
        glossary_yml_file_path: Path placed under the source's ``file`` config key.
        event_output_file_path: Path placed under the file sink's ``filename`` config key.

    Returns:
        A recipe dict with a ``datahub-business-glossary`` source section and a
        ``file`` sink section.
    """
    source_section: Dict[str, Any] = {
        "type": "datahub-business-glossary",
        "config": {"file": glossary_yml_file_path},
    }
    sink_section: Dict[str, Any] = {
        "type": "file",
        "config": {
            "filename": event_output_file_path,
        },
    }
    return {"source": source_section, "sink": sink_section}


@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_glossary_ingest(
    mock_datahub_graph, docker_compose_runner, pytestconfig, tmp_path, mock_time
):
    """Ingest the sample business glossary into a file sink and diff it against the golden file.

    The pipeline is created in-process (rather than through the CLI) so that its
    graph client can be replaced by a mock, which is used to resolve the
    ``domain`` set on glossary terms.
    """
    test_resources_dir = pytestconfig.rootpath / "tests/integration/business-glossary"

    # These paths change from one instance run of the clickhouse docker to the other,
    # and the FROZEN_TIME does not apply to these.
    # NOTE(review): the mention of clickhouse looks carried over from another test --
    # confirm these ignore paths are still needed for glossary events.
    ignore_paths: List[str] = [
        r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['metadata_modification_time'\]",
        r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['data_paths'\]",
        r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['metadata_path'\]",
    ]

    output_mces_path: str = f"{tmp_path}/glossary_events.json"
    golden_mces_path: str = f"{test_resources_dir}/glossary_events_golden.json"

    # Run the metadata ingestion pipeline.
    pipeline = Pipeline.create(
        get_default_recipe(
            glossary_yml_file_path=f"{test_resources_dir}/business_glossary.yml",
            event_output_file_path=output_mces_path,
        )
    )
    pipeline.ctx.graph = mock_datahub_graph(
        DatahubClientConfig()
    )  # Mock to resolve domain
    pipeline.run()
    pipeline.raise_from_status()

    # Verify the output.
    mce_helpers.check_golden_file(
        pytestconfig,
        ignore_paths=ignore_paths,
        output_path=output_mces_path,
        golden_path=golden_mces_path,
    )


Expand Down