Skip to content

Commit

Permalink
feat(ingest): add entity registry in codegen (#6984)
Browse files Browse the repository at this point in the history
Co-authored-by: Pedro Silva <[email protected]>
  • Loading branch information
hsheth2 and pedro93 authored Jan 18, 2023
1 parent 84f429c commit cb12910
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 46 deletions.
122 changes: 94 additions & 28 deletions metadata-ingestion/scripts/avro_codegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,70 @@
import types
import unittest.mock
from pathlib import Path
from typing import Any, Dict, Iterable, List, Union
from typing import Any, Dict, Iterable, List, Optional, Union

import avro.schema
import click
import pydantic
import yaml
from avrogen import write_schema_files

ENTITY_CATEGORY_UNSET = "_unset_"


class EntityType(pydantic.BaseModel):
    """One entity definition parsed from the entity registry YAML file."""

    # Entity name as it appears in the registry (e.g. a DataHub entity type).
    name: str
    # Optional human-readable description of the entity.
    doc: Optional[str] = None
    # Category of the entity; defaults to a sentinel so "not set in YAML"
    # is distinguishable from an explicit empty/real value.
    category: str = ENTITY_CATEGORY_UNSET

    # Name of the aspect that acts as this entity's key aspect.
    keyAspect: str
    # Names of all aspects associated with this entity.
    aspects: List[str]


def load_entity_registry(entity_registry_file: Path) -> List[EntityType]:
    """Parse the entity registry YAML file into a list of EntityType models.

    Only the top-level "entities" section of the registry is used.
    """
    raw_registry = yaml.safe_load(entity_registry_file.read_text())
    return pydantic.parse_obj_as(List[EntityType], raw_registry["entities"])


def load_schema_file(schema_file: Union[str, Path]) -> dict:
    """Read an Avro schema file (.avsc) and return it as a parsed dict."""
    return json.loads(Path(schema_file).read_text())


def load_schemas(schemas_path: str) -> Dict[str, dict]:
    """Load all relevant Avro schemas under *schemas_path*, keyed by file stem.

    A schema is included if it is one of the hard-coded required schemas, or
    if it declares an "Aspect" property. Raises AssertionError if any required
    schema file is missing.
    """
    required_schema_files = {
        "mxe/MetadataChangeEvent.avsc",
        "mxe/MetadataChangeProposal.avsc",
        "usage/UsageAggregation.avsc",
        "mxe/MetadataChangeLog.avsc",
        "mxe/PlatformEvent.avsc",
        "platform/event/v1/EntityChangeEvent.avsc",
        "metadata/query/filter/Filter.avsc",  # temporarily added to test reserved keywords support
    }

    # Find all the aspect schemas / other important schemas.
    # Each file is read and parsed exactly once (the previous implementation
    # parsed every aspect schema twice: once to check for "Aspect", and again
    # when building the result dict).
    schemas: Dict[str, dict] = {}
    for schema_file in Path(schemas_path).glob("**/*.avsc"):
        relative_path = schema_file.relative_to(schemas_path).as_posix()
        schema = load_schema_file(schema_file)
        if relative_path in required_schema_files:
            required_schema_files.remove(relative_path)
            schemas[schema_file.stem] = schema
        elif schema.get("Aspect"):
            schemas[schema_file.stem] = schema

    assert not required_schema_files, f"Schema files not found: {required_schema_files}"

    return schemas


def merge_schemas(schemas_obj: List[Any]) -> str:
# Combine schemas.
merged = ["null"] + schemas_obj
Expand Down Expand Up @@ -127,6 +179,7 @@ def annotate_aspects(aspects: List[dict], schema_class_file: Path) -> None:
class _Aspect(DictWrapper):
ASPECT_NAME: str = None # type: ignore
ASPECT_TYPE: str = "default"
ASPECT_INFO: dict = None # type: ignore
def __init__(self):
if type(self) is _Aspect:
Expand All @@ -140,6 +193,10 @@ def get_aspect_name(cls) -> str:
@classmethod
def get_aspect_type(cls) -> str:
return cls.ASPECT_TYPE
@classmethod
def get_aspect_info(cls) -> dict:
return cls.ASPECT_INFO
"""

for aspect in aspects:
Expand Down Expand Up @@ -168,6 +225,9 @@ def get_aspect_type(cls) -> str:
schema_classes_lines[
empty_line
] += f"\n ASPECT_TYPE = '{aspect['Aspect']['type']}'"
schema_classes_lines[empty_line] += f"\n ASPECT_INFO = {aspect['Aspect']}"

schema_classes_lines[empty_line + 1] += "\n"

# Finally, generate a big list of all available aspects.
newline = "\n"
Expand All @@ -178,56 +238,62 @@ def get_aspect_type(cls) -> str:
ASPECT_CLASSES: List[Type[_Aspect]] = [
{f',{newline} '.join(f"{aspect['name']}Class" for aspect in aspects)}
]
KEY_ASPECTS: Dict[str, Type[_Aspect]] = {{
{f',{newline} '.join(f"'{aspect['Aspect']['keyForEntity']}': {aspect['name']}Class" for aspect in aspects if aspect['Aspect'].get('keyForEntity'))}
}}
"""
)

schema_class_file.write_text("\n".join(schema_classes_lines))


@click.command()
@click.argument(
    "entity_registry", type=click.Path(exists=True, dir_okay=False), required=True
)
@click.argument(
    "schemas_path", type=click.Path(exists=True, file_okay=False), required=True
)
@click.argument("outdir", type=click.Path(), required=True)
def generate(entity_registry: str, schemas_path: str, outdir: str) -> None:
    """Generate avrogen Python classes from the Avro schemas and entity registry.

    Reads the entity registry YAML and all schema files, attaches entity
    metadata (name, category, aspects, doc) to each entity's key aspect,
    then writes the generated schema classes into *outdir*.
    """
    entities = load_entity_registry(Path(entity_registry))
    schemas = load_schemas(schemas_path)

    # Special handling for aspects: index every aspect schema by its name.
    aspects = {
        schema["Aspect"]["name"]: schema
        for schema in schemas.values()
        if schema.get("Aspect")
    }

    for entity in entities:
        # This implicitly requires that all keyAspects are resolvable.
        aspect = aspects[entity.keyAspect]

        # This requires that entities cannot share a keyAspect.
        assert (
            "keyForEntity" not in aspect["Aspect"]
        ), f"Key aspect {entity.keyAspect} is used by multiple entities"

        aspect["Aspect"]["keyForEntity"] = entity.name
        aspect["Aspect"]["entityCategory"] = entity.category
        aspect["Aspect"]["entityAspects"] = entity.aspects
        if entity.doc is not None:
            aspect["Aspect"]["entityDoc"] = entity.doc

    # Check for unused aspects. We currently have quite a few.
    # unused_aspects = set(aspects.keys()) - set().union(
    #     {entity.keyAspect for entity in entities},
    #     *(set(entity.aspects) for entity in entities),
    # )

    merged_schema = merge_schemas(list(schemas.values()))
    write_schema_files(merged_schema, outdir)

    # Schema files post-processing.
    (Path(outdir) / "__init__.py").write_text("# This file is intentionally empty.\n")
    add_avro_python3_warning(Path(outdir) / "schema_classes.py")
    annotate_aspects(
        list(aspects.values()),
        Path(outdir) / "schema_classes.py",
    )

Expand Down
3 changes: 2 additions & 1 deletion metadata-ingestion/scripts/codegen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ OUTDIR=./src/datahub/metadata
# Note: this assumes that datahub has already been built with `./gradlew build`.
DATAHUB_ROOT=..
SCHEMAS_ROOT="$DATAHUB_ROOT/metadata-events/mxe-schemas/src/renamed/avro/com/linkedin"
ENTITY_REGISTRY="$DATAHUB_ROOT/metadata-models/src/main/resources/entity-registry.yml"

rm -r $OUTDIR 2>/dev/null || true
python scripts/avro_codegen.py $SCHEMAS_ROOT $OUTDIR
python scripts/avro_codegen.py $ENTITY_REGISTRY $SCHEMAS_ROOT $OUTDIR
16 changes: 0 additions & 16 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,22 +131,6 @@ def dataset_key_to_urn(key: DatasetKeyClass) -> str:
)


def make_container_new_urn(guid: str) -> str:
return f"urn:dh:container:0:({guid})"


def container_new_urn_to_key(dataset_urn: str) -> Optional[ContainerKeyClass]:
pattern = r"urn:dh:container:0:\((.*)\)"
results = re.search(pattern, dataset_urn)
if results is not None:
return ContainerKeyClass(guid=results[1])
return None


# def make_container_urn(platform: str, name: str, env: str = DEFAULT_ENV) -> str:
# return f"urn:li:container:({make_data_platform_urn(platform)},{env},{name})"


def make_container_urn(guid: str) -> str:
return f"urn:li:container:{guid}"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def __init__(self, config: LookerAPIConfig) -> None:
)
except SDKError as e:
raise ConfigurationError(
"Failed to initialize Looker client. Please check your configuration."
f"Failed to connect/authenticate with looker - check your configuration: {e}"
) from e

self.client_stats = LookerAPIStats()
Expand Down
8 changes: 8 additions & 0 deletions metadata-ingestion/tests/unit/serde/test_serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.file import FileSourceConfig, GenericFileSource
from datahub.metadata.schema_classes import (
ASPECT_CLASSES,
KEY_ASPECTS,
MetadataChangeEventClass,
OwnershipClass,
_Aspect,
Expand All @@ -34,6 +36,12 @@ def test_codegen_aspect_name():
assert OwnershipClass.get_aspect_name() == "ownership"


def test_codegen_aspects():
    # Extremely loose lower bounds — this mainly verifies that codegen
    # actually populated both generated lists.
    for generated, lower_bound in [(ASPECT_CLASSES, 30), (KEY_ASPECTS, 10)]:
        assert len(generated) > lower_bound


def test_cannot_instantiated_codegen_aspect():
    # _Aspect is the abstract base for generated aspect classes; constructing
    # it directly must raise a TypeError mentioning "instantiate".
    with pytest.raises(TypeError, match="instantiate"):
        _Aspect()
Expand Down

0 comments on commit cb12910

Please sign in to comment.