diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_browse_path.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_browse_path.py index d0533af2db7151..55989cf17f2691 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_browse_path.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_browse_path.py @@ -36,7 +36,7 @@ def create( return cls(config, ctx) @staticmethod - def get_browse_paths_to_set( + def _merge_with_server_browse_paths( graph: DataHubGraph, urn: str, mce_browse_paths: Optional[BrowsePathsClass] ) -> Optional[BrowsePathsClass]: if not mce_browse_paths or not mce_browse_paths.paths: @@ -82,12 +82,11 @@ def transform_aspect( if self.config.semantics == TransformerSemantics.PATCH: assert self.ctx.graph - patch_browse_paths: Optional[ - BrowsePathsClass - ] = AddDatasetBrowsePathTransformer.get_browse_paths_to_set( - self.ctx.graph, entity_urn, browse_paths + return cast( + Optional[Aspect], + AddDatasetBrowsePathTransformer._merge_with_server_browse_paths( + self.ctx.graph, entity_urn, browse_paths + ), ) - if patch_browse_paths is not None: - browse_paths = patch_browse_paths - - return cast(Optional[Aspect], browse_paths) + else: + return cast(Aspect, browse_paths) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py index c849de5bafbbd0..6e87902fe219cc 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py @@ -52,41 +52,23 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetOwnership return cls(config, ctx) @staticmethod - def get_patch_ownership_aspect( + def _merge_with_server_ownership( graph: DataHubGraph, urn: str, mce_ownership: Optional[OwnershipClass] ) -> Optional[OwnershipClass]: if not mce_ownership or not mce_ownership.owners: - # nothing to add, no need to consult server + # If there are no owners to add, we don't need to patch anything. return None - assert mce_ownership + + # Merge the transformed ownership with existing server ownership. + # The transformed ownership takes precedence, which may change the ownership type. + server_ownership = graph.get_ownership(entity_urn=urn) if server_ownership: - # compute patch - # we only include owners who are not present in the server ownership - # if owner ids match, but the ownership type differs, we prefer the transformers opinion - owners_to_add: List[OwnerClass] = [] - needs_update = False - server_owner_ids = [o.owner for o in server_ownership.owners] - for owner in mce_ownership.owners: - if owner.owner not in server_owner_ids: - owners_to_add.append(owner) - else: - # we need to check if the type matches, and if it doesn't, update it - for server_owner in server_ownership.owners: - if ( - owner.owner == server_owner.owner - and owner.type != server_owner.type - ): - server_owner.type = owner.type - needs_update = True - - if owners_to_add or needs_update: - mce_ownership.owners = server_ownership.owners + owners_to_add - return mce_ownership - else: - return None - else: - return mce_ownership + owners = {owner.owner: owner for owner in server_ownership.owners} + owners.update({owner.owner: owner for owner in mce_ownership.owners}) + mce_ownership.owners = list(owners.values()) + + return mce_ownership def transform_aspect( self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] @@ -108,18 +90,16 @@ def transform_aspect( if owners_to_add is not None: out_ownership_aspect.owners.extend(owners_to_add) - patch_ownership: Optional[OwnershipClass] = None if self.config.semantics == TransformerSemantics.PATCH: assert self.ctx.graph - patch_ownership = AddDatasetOwnership.get_patch_ownership_aspect( - self.ctx.graph, entity_urn, out_ownership_aspect + return cast( + Optional[Aspect], + self._merge_with_server_ownership( + self.ctx.graph, entity_urn, out_ownership_aspect + ), ) - - return ( - cast(Aspect, patch_ownership) - if patch_ownership is not None - else cast(Aspect, out_ownership_aspect) - ) + else: + return cast(Aspect, out_ownership_aspect) class DatasetOwnershipBaseConfig(TransformerSemanticsConfigModel): diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_properties.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_properties.py index 601a8eb612cd23..8f8e7b66db6a9e 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_properties.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_properties.py @@ -54,7 +54,7 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetPropertie return cls(config, ctx) @staticmethod - def get_patch_dataset_properties_aspect( + def _merge_with_server_properties( graph: DataHubGraph, entity_urn: str, dataset_properties_aspect: Optional[DatasetPropertiesClass], @@ -108,15 +108,11 @@ def transform_aspect( if self.config.semantics == TransformerSemantics.PATCH: assert self.ctx.graph patch_dataset_properties_aspect = ( - AddDatasetProperties.get_patch_dataset_properties_aspect( + AddDatasetProperties._merge_with_server_properties( self.ctx.graph, entity_urn, out_dataset_properties_aspect ) ) - out_dataset_properties_aspect = ( - patch_dataset_properties_aspect - if patch_dataset_properties_aspect is not None - else out_dataset_properties_aspect - ) + return cast(Optional[Aspect], patch_dataset_properties_aspect) return cast(Aspect, out_dataset_properties_aspect) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py index fa077d205426d0..02df9e77a6a6d5 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py @@ -36,34 +36,25 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetTags": return cls(config, ctx) @staticmethod - def get_patch_global_tags_aspect( + def _merge_with_server_global_tags( graph: DataHubGraph, urn: str, global_tags_aspect: Optional[GlobalTagsClass] ) -> Optional[GlobalTagsClass]: if not global_tags_aspect or not global_tags_aspect.tags: # nothing to add, no need to consult server - return global_tags_aspect + return None + # Merge the transformed tags with existing server tags. + # The transformed tags takes precedence, which may change the tag context. server_global_tags_aspect = graph.get_tags(entity_urn=urn) - # No server aspect to patch - if server_global_tags_aspect is None: - return global_tags_aspect - - # Compute patch - # We only include tags which are not present in the server tag list - server_tag_urns: List[str] = [ - tag_association.tag for tag_association in server_global_tags_aspect.tags - ] - tags_to_add: List[TagAssociationClass] = [ - tag_association - for tag_association in global_tags_aspect.tags - if tag_association.tag not in server_tag_urns - ] - # Lets patch - patch_global_tags_aspect: GlobalTagsClass = GlobalTagsClass(tags=[]) - patch_global_tags_aspect.tags.extend(server_global_tags_aspect.tags) - patch_global_tags_aspect.tags.extend(tags_to_add) - - return patch_global_tags_aspect + if server_global_tags_aspect: + global_tags_aspect.tags = list( + { + **{tag.tag: tag for tag in server_global_tags_aspect.tags}, + **{tag.tag: tag for tag in global_tags_aspect.tags}, + }.values() + ) + + return global_tags_aspect def transform_aspect( self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] @@ -78,18 +69,16 @@ def transform_aspect( if tags_to_add is not None: out_global_tags_aspect.tags.extend(tags_to_add) - patch_global_tags_aspect: Optional[GlobalTagsClass] = None if self.config.semantics == TransformerSemantics.PATCH: assert self.ctx.graph - patch_global_tags_aspect = AddDatasetTags.get_patch_global_tags_aspect( - self.ctx.graph, entity_urn, out_global_tags_aspect + return cast( + Optional[Aspect], + AddDatasetTags._merge_with_server_global_tags( + self.ctx.graph, entity_urn, out_global_tags_aspect + ), ) - return ( - cast(Optional[Aspect], patch_global_tags_aspect) - if patch_global_tags_aspect is not None - else cast(Optional[Aspect], out_global_tags_aspect) - ) + return cast(Aspect, out_global_tags_aspect) class SimpleDatasetTagConfig(TransformerSemanticsConfigModel): diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_terms.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_terms.py index 9bb7e23ec155e1..44cbc1a4662c21 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_terms.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_terms.py @@ -43,38 +43,27 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetTerms": return cls(config, ctx) @staticmethod - def get_patch_glossary_terms_aspect( + def _merge_with_server_glossary_terms( graph: DataHubGraph, urn: str, glossary_terms_aspect: Optional[GlossaryTermsClass], ) -> Optional[GlossaryTermsClass]: if not glossary_terms_aspect or not glossary_terms_aspect.terms: # nothing to add, no need to consult server - return glossary_terms_aspect + return None + # Merge the transformed terms with existing server terms. + # The transformed terms takes precedence, which may change the term context. server_glossary_terms_aspect = graph.get_glossary_terms(entity_urn=urn) - # No server glossary_terms_aspect to compute a patch - if server_glossary_terms_aspect is None: - return glossary_terms_aspect - - # Compute patch - server_term_urns: List[str] = [ - term.urn for term in server_glossary_terms_aspect.terms - ] - # We only include terms which are not present in the server_term_urns list - terms_to_add: List[GlossaryTermAssociationClass] = [ - term - for term in glossary_terms_aspect.terms - if term.urn not in server_term_urns - ] - # Lets patch - patch_glossary_terms_aspect: GlossaryTermsClass = GlossaryTermsClass( - terms=[], auditStamp=glossary_terms_aspect.auditStamp - ) - patch_glossary_terms_aspect.terms.extend(server_glossary_terms_aspect.terms) - patch_glossary_terms_aspect.terms.extend(terms_to_add) + if server_glossary_terms_aspect is not None: + glossary_terms_aspect.terms = list( + { + **{term.urn: term for term in server_glossary_terms_aspect.terms}, + **{term.urn: term for term in glossary_terms_aspect.terms}, + }.values() + ) - return patch_glossary_terms_aspect + return glossary_terms_aspect def transform_aspect( self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] @@ -103,15 +92,12 @@ def transform_aspect( patch_glossary_terms: Optional[GlossaryTermsClass] = None if self.config.semantics == TransformerSemantics.PATCH: assert self.ctx.graph - patch_glossary_terms = AddDatasetTerms.get_patch_glossary_terms_aspect( + patch_glossary_terms = AddDatasetTerms._merge_with_server_glossary_terms( self.ctx.graph, entity_urn, out_glossary_terms ) - - return ( - cast(Optional[Aspect], patch_glossary_terms) - if patch_glossary_terms is not None - else cast(Optional[Aspect], out_glossary_terms) - ) + return cast(Optional[Aspect], patch_glossary_terms) + else: + return cast(Aspect, out_glossary_terms) class SimpleDatasetTermsConfig(TransformerSemanticsConfigModel): diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py index 369ba1cea98c72..85317be0b567b0 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py @@ -68,7 +68,7 @@ def get_domain_class( return domain_class @staticmethod - def get_domains_to_set( + def _merge_with_server_domains( graph: DataHubGraph, urn: str, mce_domain: Optional[DomainsClass] ) -> Optional[DomainsClass]: if not mce_domain or not mce_domain.domains: @@ -108,15 +108,10 @@ def transform_aspect( assert self.ctx.graph patch_domain_aspect: Optional[ DomainsClass - ] = AddDatasetDomain.get_domains_to_set( + ] = AddDatasetDomain._merge_with_server_domains( self.ctx.graph, entity_urn, domain_aspect ) - # This will pass the mypy lint - domain_aspect = ( - patch_domain_aspect - if patch_domain_aspect is not None - else domain_aspect - ) + return cast(Optional[Aspect], patch_domain_aspect) return cast(Optional[Aspect], domain_aspect) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/remove_dataset_ownership.py b/metadata-ingestion/src/datahub/ingestion/transformer/remove_dataset_ownership.py index 35b1319c41aaec..f5d71a4340554f 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/remove_dataset_ownership.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/remove_dataset_ownership.py @@ -31,8 +31,7 @@ def transform_aspect( ) -> Optional[Aspect]: in_ownership_aspect = cast(OwnershipClass, aspect) if in_ownership_aspect is None: - return cast(Aspect, in_ownership_aspect) + return None in_ownership_aspect.owners = [] - return cast(Aspect, in_ownership_aspect) diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 6758169f94ef83..c332ee06ee3b06 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -845,7 +845,7 @@ def test_ownership_patching_intersect(mock_time): mce_ownership = gen_owners(["baz", "foo"]) mock_graph.get_ownership.return_value = server_ownership - test_ownership = AddDatasetOwnership.get_patch_ownership_aspect( + test_ownership = AddDatasetOwnership._merge_with_server_ownership( mock_graph, "test_urn", mce_ownership ) assert test_ownership and test_ownership.owners @@ -858,7 +858,7 @@ def test_ownership_patching_with_nones(mock_time): mock_graph = mock.MagicMock() mce_ownership = gen_owners(["baz", "foo"]) mock_graph.get_ownership.return_value = None - test_ownership = AddDatasetOwnership.get_patch_ownership_aspect( + test_ownership = AddDatasetOwnership._merge_with_server_ownership( mock_graph, "test_urn", mce_ownership ) assert test_ownership and test_ownership.owners @@ -867,7 +867,7 @@ def test_ownership_patching_with_nones(mock_time): server_ownership = gen_owners(["baz", "foo"]) mock_graph.get_ownership.return_value = server_ownership - test_ownership = AddDatasetOwnership.get_patch_ownership_aspect( + test_ownership = AddDatasetOwnership._merge_with_server_ownership( mock_graph, "test_urn", None ) assert not test_ownership @@ -877,7 +877,7 @@ def test_ownership_patching_with_empty_mce_none_server(mock_time): mock_graph = mock.MagicMock() mce_ownership = gen_owners([]) mock_graph.get_ownership.return_value = None - test_ownership = AddDatasetOwnership.get_patch_ownership_aspect( + test_ownership = AddDatasetOwnership._merge_with_server_ownership( mock_graph, "test_urn", mce_ownership ) # nothing to add, so we omit writing @@ -889,7 +889,7 @@ def test_ownership_patching_with_empty_mce_nonempty_server(mock_time): server_ownership = gen_owners(["baz", "foo"]) mce_ownership = gen_owners([]) mock_graph.get_ownership.return_value = server_ownership - test_ownership = AddDatasetOwnership.get_patch_ownership_aspect( + test_ownership = AddDatasetOwnership._merge_with_server_ownership( mock_graph, "test_urn", mce_ownership ) # nothing to add, so we omit writing @@ -901,7 +901,7 @@ def test_ownership_patching_with_different_types_1(mock_time): server_ownership = gen_owners(["baz", "foo"], models.OwnershipTypeClass.PRODUCER) mce_ownership = gen_owners(["foo"], models.OwnershipTypeClass.DATAOWNER) mock_graph.get_ownership.return_value = server_ownership - test_ownership = AddDatasetOwnership.get_patch_ownership_aspect( + test_ownership = AddDatasetOwnership._merge_with_server_ownership( mock_graph, "test_urn", mce_ownership ) assert test_ownership and test_ownership.owners @@ -919,7 +919,7 @@ def test_ownership_patching_with_different_types_2(mock_time): server_ownership = gen_owners(["baz", "foo"], models.OwnershipTypeClass.PRODUCER) mce_ownership = gen_owners(["foo", "baz"], models.OwnershipTypeClass.DATAOWNER) mock_graph.get_ownership.return_value = server_ownership - test_ownership = AddDatasetOwnership.get_patch_ownership_aspect( + test_ownership = AddDatasetOwnership._merge_with_server_ownership( mock_graph, "test_urn", mce_ownership ) assert test_ownership and test_ownership.owners