diff --git a/metadata-ingestion/docs/sources/business-glossary/datahub-business-glossary.md b/metadata-ingestion/docs/sources/business-glossary/datahub-business-glossary.md index 6b2dbcef25b285..da1be4be8a6a34 100644 --- a/metadata-ingestion/docs/sources/business-glossary/datahub-business-glossary.md +++ b/metadata-ingestion/docs/sources/business-glossary/datahub-business-glossary.md @@ -36,6 +36,7 @@ The business glossary source file should be a `.yml` file with the following top - **inherits**: (optional) List of **GlossaryTerm** that this term inherits from - **contains**: (optional) List of **GlossaryTerm** that this term contains - **custom_properties**: A map of key/value pairs of arbitrary custom properties +- **domain**: (optional) domain name or domain urn You can also view an example business glossary file checked in [here](../../../examples/bootstrap_data/business_glossary.yml) diff --git a/metadata-ingestion/examples/bootstrap_data/business_glossary.yml b/metadata-ingestion/examples/bootstrap_data/business_glossary.yml index a80f2fec849244..b3fc7002584de3 100644 --- a/metadata-ingestion/examples/bootstrap_data/business_glossary.yml +++ b/metadata-ingestion/examples/bootstrap_data/business_glossary.yml @@ -23,6 +23,7 @@ nodes: description: Highly Confidential Data custom_properties: is_confidential: true + domain: Marketing - name: PersonalInformation description: All terms related to personal information owners: diff --git a/metadata-ingestion/src/datahub/ingestion/source/metadata/business_glossary.py b/metadata-ingestion/src/datahub/ingestion/source/metadata/business_glossary.py index 068e107c762799..db138d761904f8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/metadata/business_glossary.py +++ b/metadata-ingestion/src/datahub/ingestion/source/metadata/business_glossary.py @@ -18,6 +18,7 @@ make_user_urn, ) from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.decorators import ( # SourceCapability,; capability, SupportStatus, config_class, @@ -26,6 +27,8 @@ ) from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit, UsageStatsWorkUnit +from datahub.ingestion.graph.client import DataHubGraph +from datahub.utilities.registries.domain_registry import DomainRegistry from datahub.utilities.urn_encoder import UrnEncoder logger = logging.getLogger(__name__) @@ -64,6 +67,7 @@ class GlossaryTermConfig(ConfigModel): related_terms: Optional[List[str]] custom_properties: Optional[Dict[str, str]] knowledge_links: Optional[List[KnowledgeCard]] + domain: Optional[str] class GlossaryNodeConfig(ConfigModel): @@ -170,7 +174,9 @@ def get_owners(owners: Owners) -> models.OwnershipClass: def get_mces( - glossary: BusinessGlossaryConfig, ingestion_config: BusinessGlossarySourceConfig + glossary: BusinessGlossaryConfig, + ingestion_config: BusinessGlossarySourceConfig, + ctx: PipelineContext, ) -> Iterable[Union[MetadataChangeProposalWrapper, models.MetadataChangeEventClass]]: path: List[str] = [] root_owners = get_owners(glossary.owners) @@ -184,6 +190,7 @@ def get_mces( parentOwners=root_owners, defaults=glossary, ingestion_config=ingestion_config, + ctx=ctx, ) if glossary.terms: @@ -195,6 +202,7 @@ def get_mces( parentOwnership=root_owners, defaults=glossary, ingestion_config=ingestion_config, + ctx=ctx, ) @@ -230,6 +238,12 @@ def make_institutional_memory_mcp( return None +def make_domain_mcp( + term_urn: str, domain_aspect: models.DomainsClass +) -> MetadataChangeProposalWrapper: + return MetadataChangeProposalWrapper(entityUrn=term_urn, aspect=domain_aspect) + + def get_mces_from_node( glossaryNode: GlossaryNodeConfig, path: List[str], @@ -237,6 +251,7 @@ def get_mces_from_node( parentOwners: models.OwnershipClass, defaults: DefaultConfig, ingestion_config: BusinessGlossarySourceConfig, + ctx: PipelineContext, ) -> Iterable[Union[MetadataChangeProposalWrapper, models.MetadataChangeEventClass]]: node_urn = make_glossary_node_urn( path, glossaryNode.id, ingestion_config.enable_auto_id @@ -273,6 +288,7 @@ def get_mces_from_node( parentOwners=node_owners, defaults=defaults, ingestion_config=ingestion_config, + ctx=ctx, ) if glossaryNode.terms: @@ -284,9 +300,26 @@ def get_mces_from_node( parentOwnership=node_owners, defaults=defaults, ingestion_config=ingestion_config, + ctx=ctx, ) +def get_domain_class( + graph: Optional[DataHubGraph], domains: List[str] +) -> models.DomainsClass: + # FIXME: In the ideal case, the domain registry would be an instance variable so that it + # preserves its cache across calls to this function. However, the current implementation + # requires the full list of domains to be passed in at instantiation time, so we can't + # actually do that. + domain_registry: DomainRegistry = DomainRegistry( + cached_domains=[k for k in domains], graph=graph + ) + domain_class = models.DomainsClass( + domains=[domain_registry.get_domain_urn(domain) for domain in domains] + ) + return domain_class + + def get_mces_from_term( glossaryTerm: GlossaryTermConfig, path: List[str], @@ -294,6 +327,7 @@ def get_mces_from_term( parentOwnership: models.OwnershipClass, defaults: DefaultConfig, ingestion_config: BusinessGlossarySourceConfig, + ctx: PipelineContext, ) -> Iterable[Union[models.MetadataChangeEventClass, MetadataChangeProposalWrapper]]: term_urn = make_glossary_term_urn( path, glossaryTerm.id, ingestion_config.enable_auto_id @@ -388,6 +422,11 @@ def get_mces_from_term( ownership = get_owners(glossaryTerm.owners) aspects.append(ownership) + if glossaryTerm.domain is not None: + yield make_domain_mcp( + term_urn, get_domain_class(ctx.graph, [glossaryTerm.domain]) + ) + term_snapshot: models.GlossaryTermSnapshotClass = models.GlossaryTermSnapshotClass( urn=term_urn, aspects=aspects, @@ -450,7 +489,9 @@ def load_glossary_config(self, file_name: pathlib.Path) -> BusinessGlossaryConfi def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]: glossary_config = self.load_glossary_config(self.config.file) populate_path_vs_id(glossary_config) - for event in get_mces(glossary_config, ingestion_config=self.config): + for event in get_mces( + glossary_config, ingestion_config=self.config, ctx=self.ctx + ): if isinstance(event, models.MetadataChangeEventClass): wu = MetadataWorkUnit(f"{event.proposedSnapshot.urn}", mce=event) self.report.report_workunit(wu) diff --git a/metadata-ingestion/tests/integration/business-glossary/business_glossary.yml b/metadata-ingestion/tests/integration/business-glossary/business_glossary.yml index 9550960282872c..da238701e718d4 100644 --- a/metadata-ingestion/tests/integration/business-glossary/business_glossary.yml +++ b/metadata-ingestion/tests/integration/business-glossary/business_glossary.yml @@ -26,6 +26,8 @@ nodes: description: Highly Confidential Data custom_properties: is_confidential: true + domain: Marketing + - name: Personal Information description: All terms related to personal information owners: diff --git a/metadata-ingestion/tests/integration/business-glossary/glossary_events_golden.json b/metadata-ingestion/tests/integration/business-glossary/glossary_events_golden.json index 8bd977993dca70..9b60f863b6d29c 100644 --- a/metadata-ingestion/tests/integration/business-glossary/glossary_events_golden.json +++ b/metadata-ingestion/tests/integration/business-glossary/glossary_events_golden.json @@ -145,6 +145,20 @@ "runId": "datahub-business-glossary-2020_04_14-07_00_00" } }, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Classification.Highly Confidential", + "changeType": "UPSERT", + "aspectName": "domains", + "aspect": { + "value": "{\"domains\": [\"urn:li:domain:Marketing\"]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "datahub-business-glossary-2020_04_14-07_00_00" + } +}, { "proposedSnapshot": { "com.linkedin.pegasus2avro.metadata.snapshot.GlossaryTermSnapshot": { diff --git a/metadata-ingestion/tests/integration/business-glossary/glossary_to_file.yml b/metadata-ingestion/tests/integration/business-glossary/glossary_to_file.yml deleted file mode 100644 index 4e7c4977f073fb..00000000000000 --- a/metadata-ingestion/tests/integration/business-glossary/glossary_to_file.yml +++ /dev/null @@ -1,10 +0,0 @@ -source: - type: datahub-business-glossary - config: - # Coordinates - file: ./business_glossary.yml - -sink: - type: file - config: - filename: glossary_events.json diff --git a/metadata-ingestion/tests/integration/business-glossary/test_business_glossary.py b/metadata-ingestion/tests/integration/business-glossary/test_business_glossary.py index 2ac9cca972badd..dd42d88bf1193f 100644 --- a/metadata-ingestion/tests/integration/business-glossary/test_business_glossary.py +++ b/metadata-ingestion/tests/integration/business-glossary/test_business_glossary.py @@ -1,39 +1,69 @@ -import shutil -from typing import List +from typing import Any, Dict, List import pytest from freezegun import freeze_time +from datahub.ingestion.graph.client import DatahubClientConfig +from datahub.ingestion.run.pipeline import Pipeline from datahub.ingestion.source.metadata import business_glossary from tests.test_helpers import mce_helpers -from tests.test_helpers.click_helpers import run_datahub_cmd FROZEN_TIME = "2020-04-14 07:00:00" +def get_default_recipe( + glossary_yml_file_path: str, event_output_file_path: str +) -> Dict[str, Any]: + return { + "source": { + "type": "datahub-business-glossary", + "config": {"file": glossary_yml_file_path}, + }, + "sink": { + "type": "file", + "config": { + "filename": event_output_file_path, + }, + }, + } + + @freeze_time(FROZEN_TIME) @pytest.mark.integration -def test_glossary_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time): +def test_glossary_ingest( + mock_datahub_graph, docker_compose_runner, pytestconfig, tmp_path, mock_time +): test_resources_dir = pytestconfig.rootpath / "tests/integration/business-glossary" - # Run the metadata ingestion pipeline. - config_file = (test_resources_dir / "glossary_to_file.yml").resolve() - shutil.copy(test_resources_dir / "business_glossary.yml", tmp_path) - run_datahub_cmd( - ["ingest", "--strict-warnings", "-c", f"{config_file}"], tmp_path=tmp_path - ) - # These paths change from one instance run of the clickhouse docker to the other, and the FROZEN_TIME does not apply to these. + # These paths change from one instance run of the clickhouse docker to the other, + # and the FROZEN_TIME does not apply to these. ignore_paths: List[str] = [ r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['metadata_modification_time'\]", r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['data_paths'\]", r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['metadata_path'\]", ] + + output_mces_path: str = f"{tmp_path}/glossary_events.json" + golden_mces_path: str = f"{test_resources_dir}/glossary_events_golden.json" + + pipeline = Pipeline.create( + get_default_recipe( + glossary_yml_file_path=f"{test_resources_dir}/business_glossary.yml", + event_output_file_path=output_mces_path, + ) + ) + pipeline.ctx.graph = mock_datahub_graph( + DatahubClientConfig() + ) # Mock to resolve domain + pipeline.run() + pipeline.raise_from_status() + # Verify the output. mce_helpers.check_golden_file( pytestconfig, ignore_paths=ignore_paths, - output_path=tmp_path / "glossary_events.json", - golden_path=test_resources_dir / "glossary_events_golden.json", + output_path=output_mces_path, + golden_path=golden_mces_path, )