
feat(ingest): avoid embedding serialized json in metadata files #6742

Merged · 21 commits · Dec 29, 2022

Changes from all commits
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
@@ -6,6 +6,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

### Breaking Changes

- #6742 The metadata file sink's output format no longer contains nested JSON strings for MCP aspects, but instead unpacks the stringified JSON into a real JSON object. The previous sink behavior can be recovered using the `legacy_nested_json_string` option. The file source is backwards compatible and supports both formats.

### Potential Downtime

### Deprecations
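To make the format change concrete, here is a minimal sketch of both output modes, assuming the wrapper infers the entity type and aspect name from the URN and aspect as the code below does (the dataset URN and status aspect are illustrative):

```python
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import StatusClass

mcpw = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,example.table,PROD)",
    aspect=StatusClass(removed=False),
)

# New default output: the aspect is a real JSON object.
#   {"aspect": {"json": {"removed": false}}, ...}
simplified = mcpw.to_obj(simplified_structure=True)

# Legacy output: the aspect is a JSON string embedded inside JSON.
#   {"aspect": {"contentType": "application/json", "value": "{\"removed\": false}"}, ...}
legacy = mcpw.to_obj()
```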
146,231 changes: 66,725 additions & 79,506 deletions metadata-ingestion/examples/demo_data/demo_data.json

Large diffs are not rendered by default.

21 changes: 7 additions & 14 deletions metadata-ingestion/examples/demo_data/enrich.py
@@ -6,12 +6,12 @@

import csv
import dataclasses
import json
import os
import pathlib
import time
from typing import Dict, List
from typing import Dict, List, cast

from datahub.ingestion.sink.file import write_metadata_file as write_mces
from datahub.ingestion.source.file import read_metadata_file
from datahub.metadata.schema_classes import (
AuditStampClass,
CorpUserInfoClass,
@@ -42,17 +42,10 @@ class Directive:
depends_on: List[str]


def read_mces(path: os.PathLike) -> List[MetadataChangeEventClass]:
with open(path) as f:
objs = json.load(f)
mces = [MetadataChangeEventClass.from_obj(obj) for obj in objs]
return mces


def write_mces(path: os.PathLike, mces: List[MetadataChangeEventClass]) -> None:
objs = [mce.to_obj() for mce in mces]
with open(path, "w") as f:
json.dump(objs, f, indent=4)
def read_mces(path: pathlib.Path) -> List[MetadataChangeEventClass]:
objs = read_metadata_file(path)
assert all(isinstance(obj, MetadataChangeEventClass) for obj in objs)
return cast(List[MetadataChangeEventClass], objs)


def parse_directive(row: Dict) -> Directive:
109 changes: 100 additions & 9 deletions metadata-ingestion/src/datahub/emitter/mcp.py
@@ -1,8 +1,9 @@
import dataclasses
import json
from typing import TYPE_CHECKING, Union
from typing import TYPE_CHECKING, Optional, Tuple, Union

from datahub.emitter.serialization_helper import pre_json_transform
from datahub.emitter.aspect import ASPECT_MAP, TIMESERIES_ASPECT_MAP
from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform
from datahub.metadata.schema_classes import (
ChangeTypeClass,
DictWrapper,
@@ -19,15 +20,42 @@

_ENTITY_TYPE_UNSET = "ENTITY_TYPE_UNSET"

_ASPECT_CONTENT_TYPE = "application/json"


def _make_generic_aspect(codegen_obj: DictWrapper) -> GenericAspectClass:
serialized = json.dumps(pre_json_transform(codegen_obj.to_obj()))
return GenericAspectClass(
value=serialized.encode(),
contentType="application/json",
contentType=_ASPECT_CONTENT_TYPE,
)


def _try_from_generic_aspect(
aspectName: Optional[str],
aspect: Optional[GenericAspectClass],
) -> Tuple[bool, Optional[_Aspect]]:
# The first value in the tuple indicates the success of the conversion,
# while the second value is the deserialized aspect.

if aspect is None:
return True, None
assert aspectName is not None, "aspectName must be set if aspect is set"

if aspect.contentType != _ASPECT_CONTENT_TYPE:
return False, None

if aspectName not in ASPECT_MAP:
return False, None

aspect_cls = ASPECT_MAP[aspectName]

serialized = aspect.value.decode()
obj = post_json_transform(json.loads(serialized))

return True, aspect_cls.from_obj(obj)
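A quick sketch of this helper's contract, exercising the private function directly (the aspect payload is illustrative, and assumes `status` is present in `ASPECT_MAP`):

```python
from datahub.emitter.mcp import _try_from_generic_aspect
from datahub.metadata.schema_classes import GenericAspectClass

# No aspect at all: the conversion trivially succeeds with None.
ok, aspect = _try_from_generic_aspect(None, None)
assert ok and aspect is None

# A known aspect name deserializes into the codegen'd aspect class.
generic = GenericAspectClass(value=b'{"removed": false}', contentType="application/json")
ok, aspect = _try_from_generic_aspect("status", generic)
assert ok and aspect is not None

# An unrecognized aspect name signals that conversion is not possible.
ok, aspect = _try_from_generic_aspect("someUnknownAspect", generic)
assert not ok and aspect is None
```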


@dataclasses.dataclass
class MetadataChangeProposalWrapper:
# TODO: remove manually set aspectName from the codebase
@@ -104,16 +132,79 @@ def validate(self) -> bool:
return False
return True

def to_obj(self, tuples: bool = False) -> dict:
return self.make_mcp().to_obj(tuples=tuples)
def to_obj(self, tuples: bool = False, simplified_structure: bool = False) -> dict:
# When simplified_structure is set, the output will not contain nested
# JSON strings for aspects; the stringified JSON is unpacked into a
# real JSON object instead.

obj = self.make_mcp().to_obj(tuples=tuples)
if simplified_structure:
# Undo the double JSON serialization that happens in the MCP aspect.
if (
obj.get("aspect")
and obj["aspect"].get("contentType") == _ASPECT_CONTENT_TYPE
):
obj["aspect"] = {"json": json.loads(obj["aspect"]["value"])}
Collaborator: What are the implications of doing this? Is any backwards compatibility broken?

Collaborator (Author): No, this is backwards compatible, since the source has been updated to read either format. I added a flag to the file sink to preserve the old behavior.

return obj

@classmethod
def from_obj(
cls, obj: dict, tuples: bool = False
) -> Union["MetadataChangeProposalWrapper", MetadataChangeProposalClass]:
"""
Attempt to deserialize into an MCPW, but fall back
to a standard MCP if we're missing codegen'd classes for the
entity key or aspect.
"""

# Redo the double JSON serialization so that the rest of the
# deserialization routine works.
if obj.get("aspect") and obj["aspect"].get("json"):
obj["aspect"] = {
"contentType": _ASPECT_CONTENT_TYPE,
"value": json.dumps(obj["aspect"]["json"]),
}

mcp = MetadataChangeProposalClass.from_obj(obj, tuples=tuples)

# We don't know how to deserialize the entity key aspects yet.
if mcp.entityKeyAspect is not None:
return mcp

# Try to deserialize the aspect.
converted, aspect = _try_from_generic_aspect(mcp.aspectName, mcp.aspect)
if converted:
return cls(
entityType=mcp.entityType,
entityUrn=mcp.entityUrn,
changeType=mcp.changeType,
auditHeader=mcp.auditHeader,
aspectName=mcp.aspectName,
aspect=aspect,
systemMetadata=mcp.systemMetadata,
)

return mcp

# TODO: add a from_obj method. Implementing this would require us to
# inspect the aspectName field to determine which class to deserialize into.
@classmethod
def from_obj_require_wrapper(
cls, obj: dict, tuples: bool = False
) -> "MetadataChangeProposalWrapper":
mcp = cls.from_obj(obj, tuples=tuples)
assert isinstance(mcp, cls)
return mcp

def as_workunit(self) -> "MetadataWorkUnit":
from datahub.ingestion.api.workunit import MetadataWorkUnit

# TODO: If the aspect is a timeseries aspect, we should do some
# customization of the ID here.
if self.aspect and self.aspectName in TIMESERIES_ASPECT_MAP:
# TODO: Make this a cleaner interface.
ts = getattr(self.aspect, "timestampMillis", None)
assert ts is not None

# If the aspect is a timeseries aspect, include the timestampMillis in the ID.
return MetadataWorkUnit(
id=f"{self.entityUrn}-{self.aspectName}-{ts}", mcp=self
)

return MetadataWorkUnit(id=f"{self.entityUrn}-{self.aspectName}", mcp=self)
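Putting the new to_obj/from_obj pair together, a round-trip sketch (URN and aspect illustrative):

```python
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import StatusClass

original = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,example.table,PROD)",
    aspect=StatusClass(removed=False),
)

obj = original.to_obj(simplified_structure=True)

# from_obj falls back to a plain MetadataChangeProposalClass when it lacks
# a codegen'd class for the aspect; status is known, so we get the wrapper.
restored = MetadataChangeProposalWrapper.from_obj_require_wrapper(obj)
assert restored.aspect == StatusClass(removed=False)
```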
44 changes: 42 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/sink/file.py
@@ -1,7 +1,7 @@
import json
import logging
import pathlib
from typing import Union
from typing import Iterable, Union

from datahub.configuration.common import ConfigModel
from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -16,9 +16,25 @@
logger = logging.getLogger(__name__)


def _to_obj_for_file(
obj: Union[
MetadataChangeEvent,
MetadataChangeProposal,
MetadataChangeProposalWrapper,
UsageAggregation,
],
simplified_structure: bool = True,
) -> dict:
if isinstance(obj, MetadataChangeProposalWrapper):
return obj.to_obj(simplified_structure=simplified_structure)
return obj.to_obj()


class FileSinkConfig(ConfigModel):
filename: str

legacy_nested_json_string: bool = False


class FileSink(Sink[FileSinkConfig, SinkReport]):
def __post_init__(self) -> None:
@@ -40,7 +56,9 @@ def write_record_async(
write_callback: WriteCallback,
) -> None:
record = record_envelope.record
obj = record.to_obj()
obj = _to_obj_for_file(
record, simplified_structure=not self.config.legacy_nested_json_string
)

if self.wrote_something:
self.file.write(",\n")
@@ -55,3 +73,25 @@ def write_record_async(
def close(self):
self.file.write("\n]")
self.file.close()
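For pipelines that still need the old output, the new config flag opts back in; a minimal sketch:

```python
from datahub.ingestion.sink.file import FileSinkConfig

# Re-enable nested JSON strings for MCP aspects in the output file.
config = FileSinkConfig(filename="output.json", legacy_nested_json_string=True)
```

In a YAML recipe, this corresponds to setting `legacy_nested_json_string: true` under the file sink's config.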


def write_metadata_file(
file: pathlib.Path,
records: Iterable[
Union[
MetadataChangeEvent,
MetadataChangeProposal,
MetadataChangeProposalWrapper,
UsageAggregation,
]
],
) -> None:
# This simplified version of the FileSink can be used for testing purposes.
with file.open("w") as f:
f.write("[\n")
for i, record in enumerate(records):
if i > 0:
f.write(",\n")
obj = _to_obj_for_file(record)
json.dump(obj, f, indent=4)
f.write("\n]")
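A usage sketch for the new helper (URN and aspect illustrative):

```python
import pathlib

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.sink.file import write_metadata_file
from datahub.metadata.schema_classes import DatasetPropertiesClass

records = [
    MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,example.table,PROD)",
        aspect=DatasetPropertiesClass(description="An example dataset."),
    ),
]

# Writes a pretty-printed JSON array using the simplified aspect structure.
write_metadata_file(pathlib.Path("example_metadata.json"), records)
```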
@@ -682,7 +682,7 @@ def create_test_entity_mcps(
custom_props: Dict[str, str],
all_nodes_map: Dict[str, DBTNode],
) -> Iterable[MetadataWorkUnit]:
for node in test_nodes:
for node in sorted(test_nodes, key=lambda n: n.dbt_name):
assertion_urn = mce_builder.make_assertion_urn(
mce_builder.datahub_guid(
{
@@ -727,7 +727,7 @@ def create_test_entity_mcps(
legacy_skip_source_lineage=self.config.backcompat_skip_source_on_lineage_edge,
)

for upstream_urn in upstream_urns:
for upstream_urn in sorted(upstream_urns):
if self.config.entities_enabled.can_emit_node_type("test"):
wu = self._make_assertion_from_test(
custom_props,
@@ -957,7 +957,8 @@ def create_platform_mces(
"SOURCE_CONTROL",
self.config.strip_user_ids_from_email,
)
for node in dbt_nodes:
for node in sorted(dbt_nodes, key=lambda n: n.dbt_name):
node_datahub_urn = node.get_urn(
mce_platform,
self.config.env,