feat(ingest): add entity type inference to mcpw #5880

Merged (1 commit) on Sep 11, 2022
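In short, the wrapper can now be built from just an entity urn and an aspect: the entity type is inferred from the urn, changeType defaults to UPSERT, and an as_workunit() helper is added. A minimal sketch of the resulting call pattern (the bigquery urn and the StatusClass aspect below are illustrative values, not taken from this PR):

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import ChangeTypeClass, StatusClass

mcpw = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.my_dataset.my_table,PROD)",
    aspect=StatusClass(removed=False),
)
assert mcpw.entityType == "dataset"               # inferred from the urn
assert mcpw.changeType == ChangeTypeClass.UPSERT  # new default
assert mcpw.aspectName == "status"                # derived from the aspect (pre-existing behaviour)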
41 changes: 35 additions & 6 deletions metadata-ingestion/src/datahub/emitter/mcp.py
@@ -1,6 +1,6 @@
import dataclasses
import json
- from typing import Union
+ from typing import TYPE_CHECKING, Union

from datahub.emitter.serialization_helper import pre_json_transform
from datahub.metadata.schema_classes import (
@@ -12,6 +12,12 @@
SystemMetadataClass,
_Aspect,
)
from datahub.utilities.urns.urn import guess_entity_type

if TYPE_CHECKING:
from datahub.ingestion.api.workunit import MetadataWorkUnit

_ENTITY_TYPE_UNSET = "ENTITY_TYPE_UNSET"


def _make_generic_aspect(codegen_obj: DictWrapper) -> GenericAspectClass:
@@ -24,13 +30,12 @@ def _make_generic_aspect(codegen_obj: DictWrapper) -> GenericAspectClass:

@dataclasses.dataclass
class MetadataChangeProposalWrapper:
- # TODO: remove manually aspectName from the codebase
+ # TODO: remove manually set aspectName from the codebase
# TODO: (after) remove aspectName field from this class
- # TODO: infer entityType from entityUrn
- # TODO: set changeType's default to UPSERT
+ # TODO: remove manually set entityType from the codebase

- entityType: str
- changeType: Union[str, ChangeTypeClass]
+ entityType: str = _ENTITY_TYPE_UNSET
+ changeType: Union[str, ChangeTypeClass] = ChangeTypeClass.UPSERT
entityUrn: Union[None, str] = None
entityKeyAspect: Union[None, _Aspect] = None
auditHeader: Union[None, KafkaAuditHeaderClass] = None
@@ -39,6 +44,22 @@ class MetadataChangeProposalWrapper:
systemMetadata: Union[None, SystemMetadataClass] = None

def __post_init__(self) -> None:
if self.entityUrn and self.entityType == _ENTITY_TYPE_UNSET:
self.entityType = guess_entity_type(self.entityUrn)
elif self.entityUrn and self.entityType:
guessed_entity_type = guess_entity_type(self.entityUrn).lower()
# Entity type checking is actually case insensitive.
# Note that urns are case sensitive, but entity types are not.
if self.entityType.lower() != guessed_entity_type:
raise ValueError(
f"entityType {self.entityType} does not match the entity type {guessed_entity_type} from entityUrn {self.entityUrn}",
)
elif self.entityType == _ENTITY_TYPE_UNSET:
raise ValueError("entityType must be set if entityUrn is not set")

if not self.entityUrn and not self.entityKeyAspect:
raise ValueError("entityUrn or entityKeyAspect must be set")

if not self.aspectName and self.aspect:
self.aspectName = self.aspect.get_aspect_name()
elif (
@@ -88,3 +109,11 @@ def to_obj(self, tuples: bool = False) -> dict:

# TODO: add a from_obj method. Implementing this would require us to
# inspect the aspectName field to determine which class to deserialize into.

def as_workunit(self) -> "MetadataWorkUnit":
from datahub.ingestion.api.workunit import MetadataWorkUnit

# TODO: If the aspect is a timeseries aspect, we should do some
# customization of the ID here.

return MetadataWorkUnit(id=f"{self.entityUrn}-{self.aspectName}", mcp=self)
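The inference itself is delegated to guess_entity_type from datahub.utilities.urns.urn, which is not shown in this diff. A rough sketch of the behaviour that helper needs to provide (an assumption for illustration, not its actual implementation):

def guess_entity_type(urn: str) -> str:
    # DataHub urns look like "urn:li:<entityType>:<id>", so the entity
    # type is the third colon-separated field.
    assert urn.startswith("urn:li:"), "urns must start with urn:li:"
    return urn.split(":", maxsplit=3)[2]

guess_entity_type("urn:li:dataset:(urn:li:dataPlatform:hive,db.table,PROD)")  # -> "dataset"

Note that as_workunit() keys the work unit on f"{entityUrn}-{aspectName}", so the sql_common change below ends up with a urn-based id instead of the old hand-built f"{view}-viewProperties" id.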
14 changes: 4 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
@@ -1291,16 +1291,10 @@ def _process_view(
view_properties_aspect = ViewPropertiesClass(
materialized=False, viewLanguage="SQL", viewLogic=view_definition_string
)
- view_properties_wu = MetadataWorkUnit(
- id=f"{view}-viewProperties",
- mcp=MetadataChangeProposalWrapper(
- entityType="dataset",
- changeType=ChangeTypeClass.UPSERT,
- entityUrn=dataset_urn,
- aspectName="viewProperties",
- aspect=view_properties_aspect,
- ),
- )
+ view_properties_wu = MetadataChangeProposalWrapper(
+ entityUrn=dataset_urn,
+ aspect=view_properties_aspect,
+ ).as_workunit()
self.report.report_workunit(view_properties_wu)
yield view_properties_wu

(file path not shown)
@@ -188,15 +188,15 @@ def _run(
aspect=assertionResult,
)

- # Emit Result! (timseries aspect)
+ # Emit Result! (timeseries aspect)
emitter.emit_mcp(dataset_assertionResult_mcp)
logger.info("Metadata sent to datahub.")
result = "DataHub notification succeeded"
except Exception as e:
result = "DataHub notification failed"
if self.graceful_exceptions:
logger.error(e)
logger.info("Supressing error because graceful_exceptions is set")
logger.info("Suppressing error because graceful_exceptions is set")
else:
raise

11 changes: 11 additions & 0 deletions metadata-ingestion/tests/unit/test_mcp_builder.py
@@ -1,5 +1,7 @@
import datahub.emitter.mcp_builder as builder
import datahub.metadata.schema_classes as models
from datahub.emitter.mce_builder import datahub_guid
from datahub.emitter.mcp import MetadataChangeProposalWrapper


def test_guid_generator():
@@ -42,3 +44,12 @@ def test_guid_generators():

guid = key.guid()
assert guid == guid_datahub


def test_mcpw_inference():
mcpw = MetadataChangeProposalWrapper(
entityUrn="urn:li:dataset:(urn:li:dataPlatform:bigquery,harshal-playground-306419.test_schema.excess_deaths_derived,PROD)",
aspect=models.DomainsClass(domains=["urn:li:domain:health"]),
)
assert mcpw.entityType == "dataset"
assert mcpw.aspectName == "domains"
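The new test covers the happy path only. The mismatch branch added in __post_init__ could be exercised with something like the following sketch (not part of this PR; pytest assumed):

import pytest

def test_mcpw_entity_type_mismatch():
    # An explicit entityType that disagrees with the urn should be rejected.
    with pytest.raises(ValueError):
        MetadataChangeProposalWrapper(
            entityType="container",
            entityUrn="urn:li:dataset:(urn:li:dataPlatform:bigquery,some_project.some_schema.some_table,PROD)",
            aspect=models.DomainsClass(domains=["urn:li:domain:health"]),
        )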
18 changes: 14 additions & 4 deletions metadata-ingestion/tests/unit/test_usage_common.py
@@ -5,6 +5,7 @@

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import BucketDuration, get_time_bucket
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.usage.usage_common import (
@@ -18,6 +19,15 @@
_TestAggregatedDataset = GenericAggregatedDataset[_TestTableRef]


def _simple_urn_builder(resource):
return make_dataset_urn_with_platform_instance(
"snowflake",
resource.lower(),
"snowflake-dev",
"DEV",
)
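Presumably the old lambda x: x builder had to go because the wrapper now infers the entity type from the urn, so the builder must return a real dataset urn rather than the bare table name. With the helper above, the output should look roughly like this (exact shape depends on make_dataset_urn_with_platform_instance):

_simple_urn_builder("TEST_DB.TEST_SCHEMA.TEST_TABLE")
# -> "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake-dev.test_db.test_schema.test_table,DEV)"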


def test_add_one_query_without_columns():
test_email = "[email protected]"
test_query = "select * from test"
@@ -149,7 +159,7 @@ def test_make_usage_workunit():
)
wu: MetadataWorkUnit = ta.make_usage_workunit(
bucket_duration=BucketDuration.DAY,
- urn_builder=lambda x: x,
+ urn_builder=_simple_urn_builder,
top_n_queries=10,
format_sql_queries=False,
include_top_n_queries=True,
@@ -181,7 +191,7 @@ def test_query_formatting():
)
wu: MetadataWorkUnit = ta.make_usage_workunit(
bucket_duration=BucketDuration.DAY,
- urn_builder=lambda x: x,
+ urn_builder=_simple_urn_builder,
top_n_queries=10,
format_sql_queries=True,
include_top_n_queries=True,
@@ -212,7 +222,7 @@ def test_query_trimming():
)
wu: MetadataWorkUnit = ta.make_usage_workunit(
bucket_duration=BucketDuration.DAY,
- urn_builder=lambda x: x,
+ urn_builder=_simple_urn_builder,
top_n_queries=top_n_queries,
format_sql_queries=False,
include_top_n_queries=True,
@@ -248,7 +258,7 @@ def test_make_usage_workunit_include_top_n_queries():
)
wu: MetadataWorkUnit = ta.make_usage_workunit(
bucket_duration=BucketDuration.DAY,
- urn_builder=lambda x: x,
+ urn_builder=_simple_urn_builder,
top_n_queries=10,
format_sql_queries=False,
include_top_n_queries=False,