feat(ingest): avoid embedding serialized json in metadata files (data…
hsheth2 authored and cccs-Dustin committed Feb 1, 2023
1 parent 86ddbb7 commit 11e4330
Showing 17 changed files with 67,486 additions and 79,642 deletions.
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
@@ -6,6 +6,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

### Breaking Changes

+- #6742 The metadata file sink's output format no longer contains nested JSON strings for MCP aspects, but instead unpacks the stringified JSON into a real JSON object. The previous sink behavior can be recovered using the `legacy_nested_json_string` option. The file source is backwards compatible and supports both formats.

### Potential Downtime

### Deprecations
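
To make the #6742 change above concrete, here is a minimal before/after sketch of how the file sink renders a single MCP aspect; the `status` aspect and its value are hypothetical examples, derived from the serialization logic in this commit:

```python
# Old output (recoverable with legacy_nested_json_string): the aspect payload
# is a JSON object serialized into a string and embedded in "value".
old_style_aspect = {
    "value": '{"removed": false}',
    "contentType": "application/json",
}

# New default output: the same payload, unpacked into a real JSON object
# under the "json" key.
new_style_aspect = {
    "json": {"removed": False},
}
```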
146,231 changes: 66,725 additions & 79,506 deletions metadata-ingestion/examples/demo_data/demo_data.json

Large diffs are not rendered by default.

21 changes: 7 additions & 14 deletions metadata-ingestion/examples/demo_data/enrich.py
@@ -6,12 +6,12 @@

import csv
import dataclasses
-import json
-import os
import pathlib
import time
-from typing import Dict, List
+from typing import Dict, List, cast

+from datahub.ingestion.sink.file import write_metadata_file as write_mces
+from datahub.ingestion.source.file import read_metadata_file
from datahub.metadata.schema_classes import (
    AuditStampClass,
    CorpUserInfoClass,
@@ -42,17 +42,10 @@ class Directive:
    depends_on: List[str]


-def read_mces(path: os.PathLike) -> List[MetadataChangeEventClass]:
-    with open(path) as f:
-        objs = json.load(f)
-    mces = [MetadataChangeEventClass.from_obj(obj) for obj in objs]
-    return mces
-
-
-def write_mces(path: os.PathLike, mces: List[MetadataChangeEventClass]) -> None:
-    objs = [mce.to_obj() for mce in mces]
-    with open(path, "w") as f:
-        json.dump(objs, f, indent=4)
+def read_mces(path: pathlib.Path) -> List[MetadataChangeEventClass]:
+    objs = read_metadata_file(path)
+    assert all(isinstance(obj, MetadataChangeEventClass) for obj in objs)
+    return cast(List[MetadataChangeEventClass], objs)


def parse_directive(row: Dict) -> Directive:
109 changes: 100 additions & 9 deletions metadata-ingestion/src/datahub/emitter/mcp.py
@@ -1,8 +1,9 @@
import dataclasses
import json
-from typing import TYPE_CHECKING, Union
+from typing import TYPE_CHECKING, Optional, Tuple, Union

-from datahub.emitter.serialization_helper import pre_json_transform
+from datahub.emitter.aspect import ASPECT_MAP, TIMESERIES_ASPECT_MAP
+from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform
from datahub.metadata.schema_classes import (
    ChangeTypeClass,
    DictWrapper,
@@ -19,15 +20,42 @@

_ENTITY_TYPE_UNSET = "ENTITY_TYPE_UNSET"

+_ASPECT_CONTENT_TYPE = "application/json"


def _make_generic_aspect(codegen_obj: DictWrapper) -> GenericAspectClass:
    serialized = json.dumps(pre_json_transform(codegen_obj.to_obj()))
    return GenericAspectClass(
        value=serialized.encode(),
-        contentType="application/json",
+        contentType=_ASPECT_CONTENT_TYPE,
    )


+def _try_from_generic_aspect(
+    aspectName: Optional[str],
+    aspect: Optional[GenericAspectClass],
+) -> Tuple[bool, Optional[_Aspect]]:
+    # The first value in the tuple indicates the success of the conversion,
+    # while the second value is the deserialized aspect.
+
+    if aspect is None:
+        return True, None
+    assert aspectName is not None, "aspectName must be set if aspect is set"
+
+    if aspect.contentType != _ASPECT_CONTENT_TYPE:
+        return False, None
+
+    if aspectName not in ASPECT_MAP:
+        return False, None
+
+    aspect_cls = ASPECT_MAP[aspectName]
+
+    serialized = aspect.value.decode()
+    obj = post_json_transform(json.loads(serialized))
+
+    return True, aspect_cls.from_obj(obj)
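
As a quick illustration of the helper's tuple contract (a sketch only; `_try_from_generic_aspect` is internal to this module and is imported here purely for demonstration, with `status` standing in for any aspect name present in `ASPECT_MAP`):

```python
import json

from datahub.emitter.mcp import _try_from_generic_aspect
from datahub.metadata.schema_classes import GenericAspectClass, StatusClass

generic = GenericAspectClass(
    value=json.dumps({"removed": False}).encode(),
    contentType="application/json",
)

# A known aspect name converts to its codegen'd class.
converted, aspect = _try_from_generic_aspect("status", generic)
assert converted and isinstance(aspect, StatusClass)

# An unknown aspect name cannot be converted, so the caller keeps the
# generic representation instead.
converted, aspect = _try_from_generic_aspect("someUnknownAspect", generic)
assert not converted and aspect is None
```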


@dataclasses.dataclass
class MetadataChangeProposalWrapper:
    # TODO: remove manually set aspectName from the codebase
@@ -104,16 +132,79 @@ def validate(self) -> bool:
            return False
        return True

-    def to_obj(self, tuples: bool = False) -> dict:
-        return self.make_mcp().to_obj(tuples=tuples)
+    def to_obj(self, tuples: bool = False, simplified_structure: bool = False) -> dict:
+        # The simplified_structure parameter is used to make the output
+        # not contain nested JSON strings. Instead, it unpacks the JSON
+        # string into an object.
+
+        obj = self.make_mcp().to_obj(tuples=tuples)
+        if simplified_structure:
+            # Undo the double JSON serialization that happens in the MCP aspect.
+            if (
+                obj.get("aspect")
+                and obj["aspect"].get("contentType") == _ASPECT_CONTENT_TYPE
+            ):
+                obj["aspect"] = {"json": json.loads(obj["aspect"]["value"])}
+        return obj

+    @classmethod
+    def from_obj(
+        cls, obj: dict, tuples: bool = False
+    ) -> Union["MetadataChangeProposalWrapper", MetadataChangeProposalClass]:
+        """
+        Attempt to deserialize into an MCPW, but fall back
+        to a standard MCP if we're missing codegen'd classes for the
+        entity key or aspect.
+        """
+
+        # Redo the double JSON serialization so that the rest of the
+        # deserialization routine works.
+        if obj.get("aspect") and obj["aspect"].get("json"):
+            obj["aspect"] = {
+                "contentType": _ASPECT_CONTENT_TYPE,
+                "value": json.dumps(obj["aspect"]["json"]),
+            }
+
+        mcp = MetadataChangeProposalClass.from_obj(obj, tuples=tuples)
+
+        # We don't know how to deserialize the entity key aspects yet.
+        if mcp.entityKeyAspect is not None:
+            return mcp
+
+        # Try to deserialize the aspect.
+        converted, aspect = _try_from_generic_aspect(mcp.aspectName, mcp.aspect)
+        if converted:
+            return cls(
+                entityType=mcp.entityType,
+                entityUrn=mcp.entityUrn,
+                changeType=mcp.changeType,
+                auditHeader=mcp.auditHeader,
+                aspectName=mcp.aspectName,
+                aspect=aspect,
+                systemMetadata=mcp.systemMetadata,
+            )
+
+        return mcp

-    # TODO: add a from_obj method. Implementing this would require us to
-    # inspect the aspectName field to determine which class to deserialize into.
+    @classmethod
+    def from_obj_require_wrapper(
+        cls, obj: dict, tuples: bool = False
+    ) -> "MetadataChangeProposalWrapper":
+        mcp = cls.from_obj(obj, tuples=tuples)
+        assert isinstance(mcp, cls)
+        return mcp

    def as_workunit(self) -> "MetadataWorkUnit":
        from datahub.ingestion.api.workunit import MetadataWorkUnit

-        # TODO: If the aspect is a timeseries aspect, we should do some
-        # customization of the ID here.
+        if self.aspect and self.aspectName in TIMESERIES_ASPECT_MAP:
+            # TODO: Make this a cleaner interface.
+            ts = getattr(self.aspect, "timestampMillis", None)
+            assert ts is not None
+
+            # If the aspect is a timeseries aspect, include the timestampMillis in the ID.
+            return MetadataWorkUnit(
+                id=f"{self.entityUrn}-{self.aspectName}-{ts}", mcp=self
+            )
+
        return MetadataWorkUnit(id=f"{self.entityUrn}-{self.aspectName}", mcp=self)
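
To show the new simplified structure end to end, a small round-trip sketch (the dataset URN is hypothetical, and `StatusClass` stands in for any aspect with codegen'd classes; `entityType`, `aspectName`, and `changeType` are left to the wrapper's usual inference and defaults):

```python
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import StatusClass

mcpw = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,example.table,PROD)",
    aspect=StatusClass(removed=False),
)

obj = mcpw.to_obj(simplified_structure=True)
# The aspect is now a real JSON object rather than a nested JSON string:
# obj["aspect"] == {"json": {"removed": False}}

# from_obj accepts both the simplified and the legacy structure, and returns
# a wrapper whenever the aspect class is known.
roundtripped = MetadataChangeProposalWrapper.from_obj_require_wrapper(obj)
assert roundtripped.aspect == StatusClass(removed=False)
```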
44 changes: 42 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/sink/file.py
@@ -1,7 +1,7 @@
import json
import logging
import pathlib
-from typing import Union
+from typing import Iterable, Union

from datahub.configuration.common import ConfigModel
from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -16,9 +16,25 @@
logger = logging.getLogger(__name__)


+def _to_obj_for_file(
+    obj: Union[
+        MetadataChangeEvent,
+        MetadataChangeProposal,
+        MetadataChangeProposalWrapper,
+        UsageAggregation,
+    ],
+    simplified_structure: bool = True,
+) -> dict:
+    if isinstance(obj, MetadataChangeProposalWrapper):
+        return obj.to_obj(simplified_structure=simplified_structure)
+    return obj.to_obj()


class FileSinkConfig(ConfigModel):
    filename: str

+    legacy_nested_json_string: bool = False


class FileSink(Sink[FileSinkConfig, SinkReport]):
    def __post_init__(self) -> None:
@@ -40,7 +56,9 @@ def write_record_async(
        write_callback: WriteCallback,
    ) -> None:
        record = record_envelope.record
-        obj = record.to_obj()
+        obj = _to_obj_for_file(
+            record, simplified_structure=not self.config.legacy_nested_json_string
+        )

        if self.wrote_something:
            self.file.write(",\n")
@@ -55,3 +73,25 @@ def write_record_async(
    def close(self):
        self.file.write("\n]")
        self.file.close()


+def write_metadata_file(
+    file: pathlib.Path,
+    records: Iterable[
+        Union[
+            MetadataChangeEvent,
+            MetadataChangeProposal,
+            MetadataChangeProposalWrapper,
+            UsageAggregation,
+        ]
+    ],
+) -> None:
+    # This simplified version of the FileSink can be used for testing purposes.
+    with file.open("w") as f:
+        f.write("[\n")
+        for i, record in enumerate(records):
+            if i > 0:
+                f.write(",\n")
+            obj = _to_obj_for_file(record)
+            json.dump(obj, f, indent=4)
+        f.write("\n]")
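
For context, a usage sketch pairing the new helper with the file source's reader (the output path and URN are illustrative, and the final equality check reflects the round-trip behavior the commit message describes):

```python
import pathlib

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.sink.file import write_metadata_file
from datahub.ingestion.source.file import read_metadata_file
from datahub.metadata.schema_classes import StatusClass

path = pathlib.Path("example_metadata.json")  # hypothetical output path
records = [
    MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,example.table,PROD)",
        aspect=StatusClass(removed=False),
    ),
]

# Writes the new simplified structure by default; the FileSink's
# legacy_nested_json_string option covers the old format when needed.
write_metadata_file(path, records)

# The file source is backwards compatible and reads both formats.
assert list(read_metadata_file(path)) == records
```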
@@ -682,7 +682,7 @@ def create_test_entity_mcps(
        custom_props: Dict[str, str],
        all_nodes_map: Dict[str, DBTNode],
    ) -> Iterable[MetadataWorkUnit]:
-        for node in test_nodes:
+        for node in sorted(test_nodes, key=lambda n: n.dbt_name):
            assertion_urn = mce_builder.make_assertion_urn(
                mce_builder.datahub_guid(
                    {
@@ -727,7 +727,7 @@ def create_test_entity_mcps(
            legacy_skip_source_lineage=self.config.backcompat_skip_source_on_lineage_edge,
        )

-        for upstream_urn in upstream_urns:
+        for upstream_urn in sorted(upstream_urns):
            if self.config.entities_enabled.can_emit_node_type("test"):
                wu = self._make_assertion_from_test(
                    custom_props,
@@ -957,7 +957,8 @@ def create_platform_mces(
            "SOURCE_CONTROL",
            self.config.strip_user_ids_from_email,
        )
-        for node in dbt_nodes:
+        for node in sorted(dbt_nodes, key=lambda n: n.dbt_name):
+
            node_datahub_urn = node.get_urn(
                mce_platform,
                self.config.env,