Skip to content

Commit

Permalink
port to other transformers
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 committed Nov 22, 2022
1 parent 10cea63 commit 959eedb
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def create(
return cls(config, ctx)

@staticmethod
def get_browse_paths_to_set(
def _merge_with_server_browse_paths(
graph: DataHubGraph, urn: str, mce_browse_paths: Optional[BrowsePathsClass]
) -> Optional[BrowsePathsClass]:
if not mce_browse_paths or not mce_browse_paths.paths:
Expand Down Expand Up @@ -82,12 +82,11 @@ def transform_aspect(

if self.config.semantics == TransformerSemantics.PATCH:
assert self.ctx.graph
patch_browse_paths: Optional[
BrowsePathsClass
] = AddDatasetBrowsePathTransformer.get_browse_paths_to_set(
self.ctx.graph, entity_urn, browse_paths
return cast(
Optional[Aspect],
AddDatasetBrowsePathTransformer._merge_with_server_browse_paths(
self.ctx.graph, entity_urn, browse_paths
),
)
if patch_browse_paths is not None:
browse_paths = patch_browse_paths

return cast(Optional[Aspect], browse_paths)
else:
return cast(Aspect, browse_paths)
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetPropertie
return cls(config, ctx)

@staticmethod
def get_patch_dataset_properties_aspect(
def _merge_with_server_properties(
graph: DataHubGraph,
entity_urn: str,
dataset_properties_aspect: Optional[DatasetPropertiesClass],
Expand Down Expand Up @@ -108,15 +108,11 @@ def transform_aspect(
if self.config.semantics == TransformerSemantics.PATCH:
assert self.ctx.graph
patch_dataset_properties_aspect = (
AddDatasetProperties.get_patch_dataset_properties_aspect(
AddDatasetProperties._merge_with_server_properties(
self.ctx.graph, entity_urn, out_dataset_properties_aspect
)
)
out_dataset_properties_aspect = (
patch_dataset_properties_aspect
if patch_dataset_properties_aspect is not None
else out_dataset_properties_aspect
)
return cast(Optional[Aspect], patch_dataset_properties_aspect)

return cast(Aspect, out_dataset_properties_aspect)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,34 +36,25 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetTags":
return cls(config, ctx)

@staticmethod
def get_patch_global_tags_aspect(
def _merge_with_server_global_tags(
graph: DataHubGraph, urn: str, global_tags_aspect: Optional[GlobalTagsClass]
) -> Optional[GlobalTagsClass]:
if not global_tags_aspect or not global_tags_aspect.tags:
# nothing to add, no need to consult server
return global_tags_aspect
return None

# Merge the transformed tags with existing server tags.
# The transformed tags takes precedence, which may change the tag context.
server_global_tags_aspect = graph.get_tags(entity_urn=urn)
# No server aspect to patch
if server_global_tags_aspect is None:
return global_tags_aspect

# Compute patch
# We only include tags which are not present in the server tag list
server_tag_urns: List[str] = [
tag_association.tag for tag_association in server_global_tags_aspect.tags
]
tags_to_add: List[TagAssociationClass] = [
tag_association
for tag_association in global_tags_aspect.tags
if tag_association.tag not in server_tag_urns
]
# Lets patch
patch_global_tags_aspect: GlobalTagsClass = GlobalTagsClass(tags=[])
patch_global_tags_aspect.tags.extend(server_global_tags_aspect.tags)
patch_global_tags_aspect.tags.extend(tags_to_add)

return patch_global_tags_aspect
if server_global_tags_aspect:
global_tags_aspect.tags = list(
{
**{tag.tag: tag for tag in server_global_tags_aspect.tags},
**{tag.tag: tag for tag in global_tags_aspect.tags},
}.values()
)

return global_tags_aspect

def transform_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
Expand All @@ -78,18 +69,16 @@ def transform_aspect(
if tags_to_add is not None:
out_global_tags_aspect.tags.extend(tags_to_add)

patch_global_tags_aspect: Optional[GlobalTagsClass] = None
if self.config.semantics == TransformerSemantics.PATCH:
assert self.ctx.graph
patch_global_tags_aspect = AddDatasetTags.get_patch_global_tags_aspect(
self.ctx.graph, entity_urn, out_global_tags_aspect
return cast(
Optional[Aspect],
AddDatasetTags._merge_with_server_global_tags(
self.ctx.graph, entity_urn, out_global_tags_aspect
),
)

return (
cast(Optional[Aspect], patch_global_tags_aspect)
if patch_global_tags_aspect is not None
else cast(Optional[Aspect], out_global_tags_aspect)
)
return cast(Aspect, out_global_tags_aspect)


class SimpleDatasetTagConfig(TransformerSemanticsConfigModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,38 +43,27 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetTerms":
return cls(config, ctx)

@staticmethod
def get_patch_glossary_terms_aspect(
def _merge_with_server_glossary_terms(
graph: DataHubGraph,
urn: str,
glossary_terms_aspect: Optional[GlossaryTermsClass],
) -> Optional[GlossaryTermsClass]:
if not glossary_terms_aspect or not glossary_terms_aspect.terms:
# nothing to add, no need to consult server
return glossary_terms_aspect
return None

# Merge the transformed terms with existing server terms.
# The transformed terms takes precedence, which may change the term context.
server_glossary_terms_aspect = graph.get_glossary_terms(entity_urn=urn)
# No server glossary_terms_aspect to compute a patch
if server_glossary_terms_aspect is None:
return glossary_terms_aspect

# Compute patch
server_term_urns: List[str] = [
term.urn for term in server_glossary_terms_aspect.terms
]
# We only include terms which are not present in the server_term_urns list
terms_to_add: List[GlossaryTermAssociationClass] = [
term
for term in glossary_terms_aspect.terms
if term.urn not in server_term_urns
]
# Lets patch
patch_glossary_terms_aspect: GlossaryTermsClass = GlossaryTermsClass(
terms=[], auditStamp=glossary_terms_aspect.auditStamp
)
patch_glossary_terms_aspect.terms.extend(server_glossary_terms_aspect.terms)
patch_glossary_terms_aspect.terms.extend(terms_to_add)
if server_glossary_terms_aspect is not None:
glossary_terms_aspect.terms = list(
{
**{term.urn: term for term in server_glossary_terms_aspect.terms},
**{term.urn: term for term in glossary_terms_aspect.terms},
}.values()
)

return patch_glossary_terms_aspect
return glossary_terms_aspect

def transform_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
Expand Down Expand Up @@ -103,15 +92,12 @@ def transform_aspect(
patch_glossary_terms: Optional[GlossaryTermsClass] = None
if self.config.semantics == TransformerSemantics.PATCH:
assert self.ctx.graph
patch_glossary_terms = AddDatasetTerms.get_patch_glossary_terms_aspect(
patch_glossary_terms = AddDatasetTerms._merge_with_server_glossary_terms(
self.ctx.graph, entity_urn, out_glossary_terms
)

return (
cast(Optional[Aspect], patch_glossary_terms)
if patch_glossary_terms is not None
else cast(Optional[Aspect], out_glossary_terms)
)
return cast(Optional[Aspect], patch_glossary_terms)
else:
return cast(Aspect, out_glossary_terms)


class SimpleDatasetTermsConfig(TransformerSemanticsConfigModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def get_domain_class(
return domain_class

@staticmethod
def get_domains_to_set(
def _merge_with_server_domains(
graph: DataHubGraph, urn: str, mce_domain: Optional[DomainsClass]
) -> Optional[DomainsClass]:
if not mce_domain or not mce_domain.domains:
Expand Down Expand Up @@ -108,15 +108,10 @@ def transform_aspect(
assert self.ctx.graph
patch_domain_aspect: Optional[
DomainsClass
] = AddDatasetDomain.get_domains_to_set(
] = AddDatasetDomain._merge_with_server_domains(
self.ctx.graph, entity_urn, domain_aspect
)
# This will pass the mypy lint
domain_aspect = (
patch_domain_aspect
if patch_domain_aspect is not None
else domain_aspect
)
return cast(Optional[Aspect], patch_domain_aspect)

return cast(Optional[Aspect], domain_aspect)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ def transform_aspect(
) -> Optional[Aspect]:
in_ownership_aspect = cast(OwnershipClass, aspect)
if in_ownership_aspect is None:
return cast(Aspect, in_ownership_aspect)
return None

in_ownership_aspect.owners = []

return cast(Aspect, in_ownership_aspect)

0 comments on commit 959eedb

Please sign in to comment.