Commit 3900174
feat(ingest): add utility for converting MCEs to MCPs (datahub-projec…
hsheth2 authored and shirshanka committed Sep 8, 2022
1 parent b369671 commit 3900174
Showing 11 changed files with 105 additions and 531 deletions.
10 changes: 8 additions & 2 deletions metadata-ingestion/src/datahub/cli/check_cli.py
@@ -27,7 +27,10 @@ def check() -> None:
     is_flag=True,
     help="Rewrite the JSON file to its canonical form.",
 )
-def metadata_file(json_file: str, rewrite: bool) -> None:
+@click.option(
+    "--unpack-mces", default=False, is_flag=True, help="Converts MCEs into MCPs"
+)
+def metadata_file(json_file: str, rewrite: bool, unpack_mces: bool) -> None:
     """Check the schema of a metadata (MCE or MCP) JSON file."""
 
     if not rewrite:
@@ -42,7 +45,10 @@ def metadata_file(json_file: str, rewrite: bool) -> None:
                 "type": "file",
                 "config": {"filename": json_file},
                 "extractor": "generic",
-                "extractor_config": {"set_system_metadata": False},
+                "extractor_config": {
+                    "set_system_metadata": False,
+                    "unpack_mces_into_mcps": unpack_mces,
+                },
             },
             "sink": {
                 "type": "file",
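For orientation, an illustrative sketch of invoking the new option from the CLI; the file name is a placeholder and the `--rewrite` flag name is inferred from the option shown above:

import subprocess

# Rewrite a metadata JSON file in canonical form, converting any MCEs in it
# into per-aspect MCPs (assumes the `datahub` CLI is installed and on PATH).
subprocess.run(
    ["datahub", "check", "metadata-file", "events.json", "--rewrite", "--unpack-mces"],
    check=True,
)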
7 changes: 1 addition & 6 deletions metadata-ingestion/src/datahub/cli/cli_utils.py
@@ -17,7 +17,7 @@
 from datahub.emitter.request_helper import _make_curl_command
 from datahub.emitter.serialization_helper import post_json_transform
 from datahub.metadata.schema_classes import _ASPECT_CLASSES, _Aspect
-from datahub.utilities.urns.urn import Urn
+from datahub.utilities.urns.urn import Urn, guess_entity_type
 
 log = logging.getLogger(__name__)
 
Expand Down Expand Up @@ -125,11 +125,6 @@ def first_non_null(ls: List[Optional[str]]) -> Optional[str]:
return next((el for el in ls if el is not None and el.strip() != ""), None)


def guess_entity_type(urn: str) -> str:
assert urn.startswith("urn:li:"), "urns must start with urn:li:"
return urn.split(":")[2]


def get_system_auth() -> Optional[str]:
system_client_id = os.environ.get(ENV_DATAHUB_SYSTEM_CLIENT_ID)
system_client_secret = os.environ.get(ENV_DATAHUB_SYSTEM_CLIENT_SECRET)
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/delete_cli.py
@@ -10,7 +10,6 @@
 from tabulate import tabulate
 
 from datahub.cli import cli_utils
-from datahub.cli.cli_utils import guess_entity_type
 from datahub.emitter import rest_emitter
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.metadata.schema_classes import (
@@ -20,6 +19,7 @@
 )
 from datahub.telemetry import telemetry
 from datahub.upgrade import upgrade
+from datahub.utilities.urns.urn import guess_entity_type
 
 logger = logging.getLogger(__name__)
 
3 changes: 2 additions & 1 deletion metadata-ingestion/src/datahub/cli/put_cli.py
@@ -4,9 +4,10 @@
 
 import click
 
-from datahub.cli.cli_utils import guess_entity_type, post_entity
+from datahub.cli.cli_utils import post_entity
 from datahub.telemetry import telemetry
 from datahub.upgrade import upgrade
+from datahub.utilities.urns.urn import guess_entity_type
 
 logger = logging.getLogger(__name__)
 
20 changes: 18 additions & 2 deletions metadata-ingestion/src/datahub/emitter/mcp_builder.py
@@ -1,6 +1,6 @@
 import hashlib
 import json
-from typing import Any, Iterable, List, Optional, TypeVar, Union
+from typing import Any, Iterable, List, Optional, TypeVar
 
 from pydantic.fields import Field
 from pydantic.main import BaseModel
@@ -16,13 +16,15 @@
     ContainerClass,
     DomainsClass,
     GlobalTagsClass,
+    MetadataChangeEventClass,
     OwnerClass,
     OwnershipClass,
     OwnershipTypeClass,
     SubTypesClass,
     TagAssociationClass,
     _Aspect,
 )
+from datahub.utilities.urns.urn import guess_entity_type
 
 
 class DatahubKey(BaseModel):
@@ -250,7 +252,7 @@ def add_dataset_to_container(
     # FIXME: Union requires two or more type arguments
     container_key: KeyType,
     dataset_urn: str,
-) -> Iterable[Union[MetadataWorkUnit]]:
+) -> Iterable[MetadataWorkUnit]:
     container_urn = make_container_urn(
         guid=container_key.guid(),
     )
@@ -280,3 +282,17 @@ def add_entity_to_container(
     )
     wu = MetadataWorkUnit(id=f"container-{container_urn}-to-{entity_urn}", mcp=mcp)
     yield wu
+
+
+def mcps_from_mce(
+    mce: MetadataChangeEventClass,
+) -> Iterable[MetadataChangeProposalWrapper]:
+    for aspect in mce.proposedSnapshot.aspects:
+        yield MetadataChangeProposalWrapper(
+            entityType=guess_entity_type(mce.proposedSnapshot.urn),
+            changeType=ChangeTypeClass.UPSERT,
+            entityUrn=mce.proposedSnapshot.urn,
+            auditHeader=mce.auditHeader,
+            aspect=aspect,
+            systemMetadata=mce.systemMetadata,
+        )
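An illustrative sketch of the new helper in use; the dataset URN and the single Status aspect below are placeholders built from the generated schema classes:

from datahub.emitter.mcp_builder import mcps_from_mce
from datahub.metadata.schema_classes import (
    DatasetSnapshotClass,
    MetadataChangeEventClass,
    StatusClass,
)

mce = MetadataChangeEventClass(
    proposedSnapshot=DatasetSnapshotClass(
        urn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
        aspects=[StatusClass(removed=False)],
    )
)

# One MCP is emitted per aspect on the snapshot, carrying over the snapshot
# URN, audit header, and system metadata from the MCE.
for mcp in mcps_from_mce(mce):
    print(mcp.entityType, mcp.entityUrn, type(mcp.aspect).__name__)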
16 changes: 15 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/api/workunit.py
@@ -1,5 +1,5 @@
 from dataclasses import dataclass
-from typing import Union, overload
+from typing import Iterable, Union, overload
 
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.source import WorkUnit
@@ -64,6 +64,20 @@ def __init__(
     def get_metadata(self) -> dict:
         return {"metadata": self.metadata}
 
+    def decompose_mce_into_mcps(self) -> Iterable["MetadataWorkUnit"]:
+        from datahub.emitter.mcp_builder import mcps_from_mce
+
+        assert isinstance(self.metadata, MetadataChangeEvent)
+
+        yield from [
+            MetadataWorkUnit(
+                id=self.id,
+                mcp=mcpw,
+                treat_errors_as_warnings=self.treat_errors_as_warnings,
+            )
+            for mcpw in mcps_from_mce(self.metadata)
+        ]
+
 
 @dataclass
 class UsageStatsWorkUnit(WorkUnit):
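The same conversion sketched at the work-unit level, again with a placeholder MCE; each yielded unit wraps one MCP and carries over the original unit's id and treat_errors_as_warnings flag:

from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
    DatasetSnapshotClass,
    MetadataChangeEventClass,
    StatusClass,
)

mce = MetadataChangeEventClass(
    proposedSnapshot=DatasetSnapshotClass(
        urn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
        aspects=[StatusClass(removed=False)],
    )
)

# One MCE-backed work unit becomes one MCP-backed work unit per aspect.
wu = MetadataWorkUnit(id="sample-mce-workunit", mce=mce)
for mcp_wu in wu.decompose_mce_into_mcps():
    print(mcp_wu.id, mcp_wu.metadata.entityUrn)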
@@ -21,6 +21,7 @@
 
 class WorkUnitRecordExtractorConfig(ConfigModel):
     set_system_metadata = True
+    unpack_mces_into_mcps = False
 
 
 class WorkUnitRecordExtractor(
@@ -41,6 +42,13 @@ def get_records(
         ]
     ]:
         if isinstance(workunit, MetadataWorkUnit):
+            if self.config.unpack_mces_into_mcps and isinstance(
+                workunit.metadata, MetadataChangeEvent
+            ):
+                for inner_workunit in workunit.decompose_mce_into_mcps():
+                    yield from self.get_records(inner_workunit)
+                return
+
             if isinstance(
                 workunit.metadata,
                 (
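For reference, a sketch of enabling the new extractor option programmatically via Pipeline.create, mirroring the file-to-file config that the check command assembles above; the file names are placeholders:

from datahub.ingestion.run.pipeline import Pipeline

# Read serialized metadata from a file, unpack any MCEs into per-aspect MCPs,
# and write the result back out through the file sink.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "file",
            "config": {"filename": "mces.json"},
            "extractor": "generic",
            "extractor_config": {
                "set_system_metadata": False,
                "unpack_mces_into_mcps": True,
            },
        },
        "sink": {"type": "file", "config": {"filename": "unpacked_mcps.json"}},
    }
)
pipeline.run()
pipeline.raise_from_status()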
9 changes: 2 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -25,7 +25,7 @@
     SchemaMetadataClass,
     TelemetryClientIdClass,
 )
-from datahub.utilities.urns.urn import Urn
+from datahub.utilities.urns.urn import Urn, guess_entity_type
 
 logger = logging.getLogger(__name__)
 
@@ -111,11 +111,6 @@ def _post_generic(self, url: str, payload_dict: Dict) -> Dict:
                 "Unable to get metadata from DataHub", {"message": str(e)}
             ) from e
 
-    @staticmethod
-    def _guess_entity_type(urn: str) -> str:
-        assert urn.startswith("urn:li:"), "urns must start with urn:li:"
-        return urn.split(":")[2]
-
     @deprecated(
         reason="Use get_aspect_v2 instead which makes aspect_type_name truly optional"
     )
@@ -312,7 +307,7 @@ def get_latest_timeseries_value(
         ]
         query_body = {
             "urn": entity_urn,
-            "entity": self._guess_entity_type(entity_urn),
+            "entity": guess_entity_type(entity_urn),
             "aspect": aspect_name,
             "latestValue": True,
             "filter": {"or": [{"and": filter_criteria}]},
5 changes: 5 additions & 0 deletions metadata-ingestion/src/datahub/utilities/urns/urn.py
@@ -4,6 +4,11 @@
 from datahub.utilities.urns.error import InvalidUrnError
 
 
+def guess_entity_type(urn: str) -> str:
+    assert urn.startswith("urn:li:"), "urns must start with urn:li:"
+    return urn.split(":")[2]
+
+
 class Urn:
     """
     URNs are Globally Unique Identifiers (GUID) used to represent an entity.
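The relocated helper returns the third colon-delimited segment of a URN; a quick illustration with an arbitrary dataset URN:

from datahub.utilities.urns.urn import guess_entity_type

urn = "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)"
assert guess_entity_type(urn) == "dataset"  # the segment after "urn:li:"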