Skip to content

Commit

Permalink
feat(ingest): supports MCEs in domain transformer (datahub-project#6364)
Browse files: browse the repository at this point in the history
  • Loading branch information
hsheth2 authored and cccs-Dustin committed Feb 1, 2023
1 parent 7264555 commit b33d0b8
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 22 deletions.
22 changes: 13 additions & 9 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,21 +342,25 @@ def can_add_aspect(mce: MetadataChangeEventClass, AspectType: Type[Aspect]) -> b

constructor_annotations = get_type_hints(SnapshotType.__init__)
aspect_list_union = typing_inspect.get_args(constructor_annotations["aspects"])[0]
if not isinstance(aspect_list_union, tuple):
supported_aspect_types = typing_inspect.get_args(aspect_list_union)
else:
# On Python 3.6, the union type is represented as a tuple, where
# the first item is typing.Union and the subsequent elements are
# the types within the union.
supported_aspect_types = aspect_list_union[1:]

supported_aspect_types = typing_inspect.get_args(aspect_list_union)

return issubclass(AspectType, supported_aspect_types)


def assert_can_add_aspect(
    mce: MetadataChangeEventClass, AspectType: Type[Aspect]
) -> None:
    """Raise ``AssertionError`` unless ``can_add_aspect(mce, AspectType)`` is true.

    Because this is an explicit ``raise`` rather than an ``assert`` statement,
    the check is still performed when Python runs with ``-O``.

    Args:
        mce: The metadata change event whose proposed snapshot is checked.
        AspectType: The aspect class the caller wants to read or write.

    Raises:
        AssertionError: If ``can_add_aspect`` reports that the snapshot
            cannot hold ``AspectType``.
    """
    if can_add_aspect(mce, AspectType):
        return

    message = f"Cannot add aspect {AspectType} to {type(mce.proposedSnapshot)}"
    raise AssertionError(message)


def get_aspect_if_available(
mce: MetadataChangeEventClass, AspectType: Type[Aspect]
) -> Optional[Aspect]:
assert can_add_aspect(mce, AspectType)
assert_can_add_aspect(mce, AspectType)

all_aspects = mce.proposedSnapshot.aspects
aspects: List[Aspect] = [
Expand All @@ -375,7 +379,7 @@ def get_aspect_if_available(
def remove_aspect_if_available(
mce: MetadataChangeEventClass, aspect_type: Type[Aspect]
) -> bool:
assert can_add_aspect(mce, aspect_type)
assert_can_add_aspect(mce, aspect_type)
# loose type annotations since we checked before
aspects: List[Any] = [
aspect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from abc import ABCMeta, abstractmethod
from typing import Any, Dict, Iterable, List, Optional, Type, Union

import datahub.emitter.mce_builder
import datahub.emitter.mce_builder as builder
from datahub.emitter.aspect import ASPECT_MAP
from datahub.emitter.mce_builder import Aspect
from datahub.emitter.mcp import MetadataChangeProposalWrapper
Expand Down Expand Up @@ -127,9 +127,13 @@ def _transform_or_record_mce(
aspect_type = ASPECT_MAP.get(self.aspect_name())
if aspect_type:
# if we find a type corresponding to the aspect name we look for it in the mce
old_aspect = datahub.emitter.mce_builder.get_aspect_if_available(
mce,
aspect_type,
old_aspect = (
builder.get_aspect_if_available(
mce,
aspect_type,
)
if builder.can_add_aspect(mce, aspect_type)
else None
)
if old_aspect:
if isinstance(self, LegacyMCETransformer):
Expand All @@ -141,7 +145,7 @@ def _transform_or_record_mce(
aspect_name=self.aspect_name(),
aspect=old_aspect,
)
datahub.emitter.mce_builder.set_aspect(
builder.set_aspect(
mce,
aspect_type=aspect_type,
aspect=transformed_aspect,
Expand Down
55 changes: 47 additions & 8 deletions metadata-ingestion/tests/unit/test_transform_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1636,25 +1636,33 @@ def test_pattern_dataset_schema_tags_transformation(mock_time):

def run_dataset_transformer_pipeline(
transformer_type: Type[DatasetTransformer],
aspect: builder.Aspect,
aspect: Optional[builder.Aspect],
config: dict,
pipeline_context: PipelineContext = PipelineContext(run_id="transformer_pipe_line"),
use_mce: bool = False,
) -> List[RecordEnvelope]:

transformer: DatasetTransformer = cast(
DatasetTransformer, transformer_type.create(config, pipeline_context)
)

dataset_mcp = make_generic_dataset_mcp(
aspect=aspect, aspect_name=transformer.aspect_name()
)
dataset: Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]
if use_mce:
dataset = MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
aspects=[],
)
)
else:
assert aspect
dataset = make_generic_dataset_mcp(
aspect=aspect, aspect_name=transformer.aspect_name()
)

outputs = list(
transformer.transform(
[
RecordEnvelope(input, metadata={})
for input in [dataset_mcp, EndOfStream()]
]
[RecordEnvelope(input, metadata={}) for input in [dataset, EndOfStream()]]
)
)
return outputs
Expand Down Expand Up @@ -1688,6 +1696,37 @@ def test_simple_add_dataset_domain(mock_datahub_graph):
assert acryl_domain in transformed_aspect.domains


def test_simple_add_dataset_domain_mce_support(mock_datahub_graph):
    """Check that SimpleAddDatasetDomain handles an MCE input.

    The pipeline is fed a dataset MCE with no aspects (``use_mce=True``).
    The test expects three output records: the MCE with its aspect list
    still empty, then an MCP whose aspect is a ``DomainsClass`` holding both
    configured domains. The third record is not inspected here (presumably
    the end-of-stream marker -- confirm against the pipeline helper).
    """
    acryl_domain = builder.make_domain_urn("acryl.io")
    gslab_domain = builder.make_domain_urn("gslab.io")

    ctx = PipelineContext(run_id="test_simple_add_dataset_domain")
    ctx.graph = mock_datahub_graph(DatahubClientConfig)

    output = run_dataset_transformer_pipeline(
        transformer_type=SimpleAddDatasetDomain,
        aspect=None,
        config={"domains": [gslab_domain, acryl_domain]},
        pipeline_context=ctx,
        use_mce=True,
    )

    assert len(output) == 3

    # First record: the MCE, with no aspect added to its snapshot.
    mce_record = output[0].record
    assert isinstance(mce_record, MetadataChangeEventClass)
    assert isinstance(mce_record.proposedSnapshot, models.DatasetSnapshotClass)
    assert len(mce_record.proposedSnapshot.aspects) == 0

    # Second record: the domains are emitted as a separate MCP.
    mcp_record = output[1].record
    assert isinstance(mcp_record, MetadataChangeProposalWrapper)
    assert mcp_record.aspect is not None
    assert isinstance(mcp_record.aspect, models.DomainsClass)
    domains_aspect = cast(models.DomainsClass, mcp_record.aspect)
    assert len(domains_aspect.domains) == 2
    for expected_domain in (gslab_domain, acryl_domain):
        assert expected_domain in domains_aspect.domains


def test_simple_add_dataset_domain_replace_existing(mock_datahub_graph):
acryl_domain = builder.make_domain_urn("acryl.io")
gslab_domain = builder.make_domain_urn("gslab.io")
Expand Down

0 comments on commit b33d0b8

Please sign in to comment.