Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest): correctly handle transformer patch semantics #6505

Merged
merged 2 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def create(
return cls(config, ctx)

@staticmethod
def get_browse_paths_to_set(
def _merge_with_server_browse_paths(
graph: DataHubGraph, urn: str, mce_browse_paths: Optional[BrowsePathsClass]
) -> Optional[BrowsePathsClass]:
if not mce_browse_paths or not mce_browse_paths.paths:
Expand Down Expand Up @@ -82,12 +82,11 @@ def transform_aspect(

if self.config.semantics == TransformerSemantics.PATCH:
assert self.ctx.graph
patch_browse_paths: Optional[
BrowsePathsClass
] = AddDatasetBrowsePathTransformer.get_browse_paths_to_set(
self.ctx.graph, entity_urn, browse_paths
return cast(
Optional[Aspect],
AddDatasetBrowsePathTransformer._merge_with_server_browse_paths(
self.ctx.graph, entity_urn, browse_paths
),
)
if patch_browse_paths is not None:
browse_paths = patch_browse_paths

return cast(Optional[Aspect], browse_paths)
else:
return cast(Aspect, browse_paths)
Original file line number Diff line number Diff line change
Expand Up @@ -52,41 +52,23 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetOwnership
return cls(config, ctx)

@staticmethod
def get_patch_ownership_aspect(
def _merge_with_server_ownership(
graph: DataHubGraph, urn: str, mce_ownership: Optional[OwnershipClass]
) -> Optional[OwnershipClass]:
if not mce_ownership or not mce_ownership.owners:
# nothing to add, no need to consult server
# If there are no owners to add, we don't need to patch anything.
return None
assert mce_ownership

# Merge the transformed ownership with existing server ownership.
# The transformed ownership takes precedence, which may change the ownership type.

server_ownership = graph.get_ownership(entity_urn=urn)
if server_ownership:
# compute patch
# we only include owners who are not present in the server ownership
# if owner ids match, but the ownership type differs, we prefer the transformer's opinion
owners_to_add: List[OwnerClass] = []
needs_update = False
server_owner_ids = [o.owner for o in server_ownership.owners]
for owner in mce_ownership.owners:
if owner.owner not in server_owner_ids:
owners_to_add.append(owner)
else:
# we need to check if the type matches, and if it doesn't, update it
for server_owner in server_ownership.owners:
if (
owner.owner == server_owner.owner
and owner.type != server_owner.type
):
server_owner.type = owner.type
needs_update = True

if owners_to_add or needs_update:
mce_ownership.owners = server_ownership.owners + owners_to_add
return mce_ownership
else:
return None
else:
return mce_ownership
owners = {owner.owner: owner for owner in server_ownership.owners}
owners.update({owner.owner: owner for owner in mce_ownership.owners})
mce_ownership.owners = list(owners.values())

return mce_ownership

def transform_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
Expand All @@ -108,18 +90,16 @@ def transform_aspect(
if owners_to_add is not None:
out_ownership_aspect.owners.extend(owners_to_add)

patch_ownership: Optional[OwnershipClass] = None
if self.config.semantics == TransformerSemantics.PATCH:
assert self.ctx.graph
patch_ownership = AddDatasetOwnership.get_patch_ownership_aspect(
self.ctx.graph, entity_urn, out_ownership_aspect
return cast(
Optional[Aspect],
self._merge_with_server_ownership(
self.ctx.graph, entity_urn, out_ownership_aspect
),
)

return (
cast(Aspect, patch_ownership)
if patch_ownership is not None
else cast(Aspect, out_ownership_aspect)
)
else:
return cast(Aspect, out_ownership_aspect)


class DatasetOwnershipBaseConfig(TransformerSemanticsConfigModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetPropertie
return cls(config, ctx)

@staticmethod
def get_patch_dataset_properties_aspect(
def _merge_with_server_properties(
graph: DataHubGraph,
entity_urn: str,
dataset_properties_aspect: Optional[DatasetPropertiesClass],
Expand Down Expand Up @@ -108,15 +108,11 @@ def transform_aspect(
if self.config.semantics == TransformerSemantics.PATCH:
assert self.ctx.graph
patch_dataset_properties_aspect = (
AddDatasetProperties.get_patch_dataset_properties_aspect(
AddDatasetProperties._merge_with_server_properties(
self.ctx.graph, entity_urn, out_dataset_properties_aspect
)
)
out_dataset_properties_aspect = (
patch_dataset_properties_aspect
if patch_dataset_properties_aspect is not None
else out_dataset_properties_aspect
)
return cast(Optional[Aspect], patch_dataset_properties_aspect)

return cast(Aspect, out_dataset_properties_aspect)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,34 +36,25 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetTags":
return cls(config, ctx)

@staticmethod
def get_patch_global_tags_aspect(
def _merge_with_server_global_tags(
graph: DataHubGraph, urn: str, global_tags_aspect: Optional[GlobalTagsClass]
) -> Optional[GlobalTagsClass]:
if not global_tags_aspect or not global_tags_aspect.tags:
# nothing to add, no need to consult server
return global_tags_aspect
return None

# Merge the transformed tags with existing server tags.
# The transformed tags take precedence, which may change the tag context.
server_global_tags_aspect = graph.get_tags(entity_urn=urn)
# No server aspect to patch
if server_global_tags_aspect is None:
return global_tags_aspect

# Compute patch
# We only include tags which are not present in the server tag list
server_tag_urns: List[str] = [
tag_association.tag for tag_association in server_global_tags_aspect.tags
]
tags_to_add: List[TagAssociationClass] = [
tag_association
for tag_association in global_tags_aspect.tags
if tag_association.tag not in server_tag_urns
]
# Let's patch
patch_global_tags_aspect: GlobalTagsClass = GlobalTagsClass(tags=[])
patch_global_tags_aspect.tags.extend(server_global_tags_aspect.tags)
patch_global_tags_aspect.tags.extend(tags_to_add)

return patch_global_tags_aspect
if server_global_tags_aspect:
global_tags_aspect.tags = list(
{
**{tag.tag: tag for tag in server_global_tags_aspect.tags},
**{tag.tag: tag for tag in global_tags_aspect.tags},
}.values()
)

return global_tags_aspect

def transform_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
Expand All @@ -78,18 +69,16 @@ def transform_aspect(
if tags_to_add is not None:
out_global_tags_aspect.tags.extend(tags_to_add)

patch_global_tags_aspect: Optional[GlobalTagsClass] = None
if self.config.semantics == TransformerSemantics.PATCH:
assert self.ctx.graph
patch_global_tags_aspect = AddDatasetTags.get_patch_global_tags_aspect(
self.ctx.graph, entity_urn, out_global_tags_aspect
return cast(
Optional[Aspect],
AddDatasetTags._merge_with_server_global_tags(
self.ctx.graph, entity_urn, out_global_tags_aspect
),
)

return (
cast(Optional[Aspect], patch_global_tags_aspect)
if patch_global_tags_aspect is not None
else cast(Optional[Aspect], out_global_tags_aspect)
)
return cast(Aspect, out_global_tags_aspect)


class SimpleDatasetTagConfig(TransformerSemanticsConfigModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,38 +43,27 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetTerms":
return cls(config, ctx)

@staticmethod
def get_patch_glossary_terms_aspect(
def _merge_with_server_glossary_terms(
graph: DataHubGraph,
urn: str,
glossary_terms_aspect: Optional[GlossaryTermsClass],
) -> Optional[GlossaryTermsClass]:
if not glossary_terms_aspect or not glossary_terms_aspect.terms:
# nothing to add, no need to consult server
return glossary_terms_aspect
return None

# Merge the transformed terms with existing server terms.
# The transformed terms take precedence, which may change the term context.
server_glossary_terms_aspect = graph.get_glossary_terms(entity_urn=urn)
# No server glossary_terms_aspect to compute a patch
if server_glossary_terms_aspect is None:
return glossary_terms_aspect

# Compute patch
server_term_urns: List[str] = [
term.urn for term in server_glossary_terms_aspect.terms
]
# We only include terms which are not present in the server_term_urns list
terms_to_add: List[GlossaryTermAssociationClass] = [
term
for term in glossary_terms_aspect.terms
if term.urn not in server_term_urns
]
# Let's patch
patch_glossary_terms_aspect: GlossaryTermsClass = GlossaryTermsClass(
terms=[], auditStamp=glossary_terms_aspect.auditStamp
)
patch_glossary_terms_aspect.terms.extend(server_glossary_terms_aspect.terms)
patch_glossary_terms_aspect.terms.extend(terms_to_add)
if server_glossary_terms_aspect is not None:
glossary_terms_aspect.terms = list(
{
**{term.urn: term for term in server_glossary_terms_aspect.terms},
**{term.urn: term for term in glossary_terms_aspect.terms},
}.values()
)

return patch_glossary_terms_aspect
return glossary_terms_aspect

def transform_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
Expand Down Expand Up @@ -103,15 +92,12 @@ def transform_aspect(
patch_glossary_terms: Optional[GlossaryTermsClass] = None
if self.config.semantics == TransformerSemantics.PATCH:
assert self.ctx.graph
patch_glossary_terms = AddDatasetTerms.get_patch_glossary_terms_aspect(
patch_glossary_terms = AddDatasetTerms._merge_with_server_glossary_terms(
self.ctx.graph, entity_urn, out_glossary_terms
)

return (
cast(Optional[Aspect], patch_glossary_terms)
if patch_glossary_terms is not None
else cast(Optional[Aspect], out_glossary_terms)
)
return cast(Optional[Aspect], patch_glossary_terms)
else:
return cast(Aspect, out_glossary_terms)


class SimpleDatasetTermsConfig(TransformerSemanticsConfigModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def get_domain_class(
return domain_class

@staticmethod
def get_domains_to_set(
def _merge_with_server_domains(
graph: DataHubGraph, urn: str, mce_domain: Optional[DomainsClass]
) -> Optional[DomainsClass]:
if not mce_domain or not mce_domain.domains:
Expand Down Expand Up @@ -108,15 +108,10 @@ def transform_aspect(
assert self.ctx.graph
patch_domain_aspect: Optional[
DomainsClass
] = AddDatasetDomain.get_domains_to_set(
] = AddDatasetDomain._merge_with_server_domains(
self.ctx.graph, entity_urn, domain_aspect
)
# This will pass the mypy lint
domain_aspect = (
patch_domain_aspect
if patch_domain_aspect is not None
else domain_aspect
)
return cast(Optional[Aspect], patch_domain_aspect)

return cast(Optional[Aspect], domain_aspect)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ def transform_aspect(
) -> Optional[Aspect]:
in_ownership_aspect = cast(OwnershipClass, aspect)
if in_ownership_aspect is None:
return cast(Aspect, in_ownership_aspect)
return None

in_ownership_aspect.owners = []

return cast(Aspect, in_ownership_aspect)
14 changes: 7 additions & 7 deletions metadata-ingestion/tests/unit/test_transform_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ def test_ownership_patching_intersect(mock_time):
mce_ownership = gen_owners(["baz", "foo"])
mock_graph.get_ownership.return_value = server_ownership

test_ownership = AddDatasetOwnership.get_patch_ownership_aspect(
test_ownership = AddDatasetOwnership._merge_with_server_ownership(
mock_graph, "test_urn", mce_ownership
)
assert test_ownership and test_ownership.owners
Expand All @@ -858,7 +858,7 @@ def test_ownership_patching_with_nones(mock_time):
mock_graph = mock.MagicMock()
mce_ownership = gen_owners(["baz", "foo"])
mock_graph.get_ownership.return_value = None
test_ownership = AddDatasetOwnership.get_patch_ownership_aspect(
test_ownership = AddDatasetOwnership._merge_with_server_ownership(
mock_graph, "test_urn", mce_ownership
)
assert test_ownership and test_ownership.owners
Expand All @@ -867,7 +867,7 @@ def test_ownership_patching_with_nones(mock_time):

server_ownership = gen_owners(["baz", "foo"])
mock_graph.get_ownership.return_value = server_ownership
test_ownership = AddDatasetOwnership.get_patch_ownership_aspect(
test_ownership = AddDatasetOwnership._merge_with_server_ownership(
mock_graph, "test_urn", None
)
assert not test_ownership
Expand All @@ -877,7 +877,7 @@ def test_ownership_patching_with_empty_mce_none_server(mock_time):
mock_graph = mock.MagicMock()
mce_ownership = gen_owners([])
mock_graph.get_ownership.return_value = None
test_ownership = AddDatasetOwnership.get_patch_ownership_aspect(
test_ownership = AddDatasetOwnership._merge_with_server_ownership(
mock_graph, "test_urn", mce_ownership
)
# nothing to add, so we omit writing
Expand All @@ -889,7 +889,7 @@ def test_ownership_patching_with_empty_mce_nonempty_server(mock_time):
server_ownership = gen_owners(["baz", "foo"])
mce_ownership = gen_owners([])
mock_graph.get_ownership.return_value = server_ownership
test_ownership = AddDatasetOwnership.get_patch_ownership_aspect(
test_ownership = AddDatasetOwnership._merge_with_server_ownership(
mock_graph, "test_urn", mce_ownership
)
# nothing to add, so we omit writing
Expand All @@ -901,7 +901,7 @@ def test_ownership_patching_with_different_types_1(mock_time):
server_ownership = gen_owners(["baz", "foo"], models.OwnershipTypeClass.PRODUCER)
mce_ownership = gen_owners(["foo"], models.OwnershipTypeClass.DATAOWNER)
mock_graph.get_ownership.return_value = server_ownership
test_ownership = AddDatasetOwnership.get_patch_ownership_aspect(
test_ownership = AddDatasetOwnership._merge_with_server_ownership(
mock_graph, "test_urn", mce_ownership
)
assert test_ownership and test_ownership.owners
Expand All @@ -919,7 +919,7 @@ def test_ownership_patching_with_different_types_2(mock_time):
server_ownership = gen_owners(["baz", "foo"], models.OwnershipTypeClass.PRODUCER)
mce_ownership = gen_owners(["foo", "baz"], models.OwnershipTypeClass.DATAOWNER)
mock_graph.get_ownership.return_value = server_ownership
test_ownership = AddDatasetOwnership.get_patch_ownership_aspect(
test_ownership = AddDatasetOwnership._merge_with_server_ownership(
mock_graph, "test_urn", mce_ownership
)
assert test_ownership and test_ownership.owners
Expand Down