Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): add entity registry in codegen #6984

Merged
merged 6 commits into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 94 additions & 28 deletions metadata-ingestion/scripts/avro_codegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,70 @@
import types
import unittest.mock
from pathlib import Path
from typing import Any, Dict, Iterable, List, Union
from typing import Any, Dict, Iterable, List, Optional, Union

import avro.schema
import click
import pydantic
import yaml
from avrogen import write_schema_files

ENTITY_CATEGORY_UNSET = "_unset_"


class EntityType(pydantic.BaseModel):
    """A single item from the ``entities`` list of the entity registry file."""

    # Entity name; later recorded on its key aspect as ``keyForEntity``.
    name: str
    # Optional free-text description; None when the registry omits it.
    doc: Optional[str] = None
    # Category label; defaults to the ENTITY_CATEGORY_UNSET sentinel.
    category: str = ENTITY_CATEGORY_UNSET

    # Name of the aspect used to look up this entity's key aspect schema.
    keyAspect: str
    # Names of the aspects listed for this entity in the registry.
    aspects: List[str]


def load_entity_registry(entity_registry_file: Path) -> List[EntityType]:
    """Load the entity registry YAML file and validate its ``entities`` list.

    Args:
        entity_registry_file: Path of the registry YAML file to read.

    Returns:
        One ``EntityType`` per item under the top-level ``entities`` key.

    Raises:
        KeyError: If the document has no ``entities`` key.
    """
    with entity_registry_file.open() as registry_stream:
        registry_document = yaml.safe_load(registry_stream)

    # pydantic validates every raw item against the EntityType model.
    return pydantic.parse_obj_as(List[EntityType], registry_document["entities"])


def load_schema_file(schema_file: Union[str, Path]) -> dict:
    """Read one schema file and return its parsed JSON content.

    Args:
        schema_file: Location of the schema file, as a string or ``Path``.

    Returns:
        The decoded JSON document.

    Raises:
        OSError: If the file cannot be read.
        UnicodeDecodeError: If the file is not valid UTF-8.
        json.JSONDecodeError: If the file content is not valid JSON.
    """
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's default locale encoding.
    raw_schema_text = Path(schema_file).read_text(encoding="utf-8")
    return json.loads(raw_schema_text)


def load_schemas(schemas_path: str) -> Dict[str, dict]:
    """Collect the required schemas and all aspect schemas under a directory.

    Every ``*.avsc`` file below ``schemas_path`` is parsed exactly once. A
    file is kept when its path relative to ``schemas_path`` is one of the
    required schema files, or when its parsed content has a truthy
    ``"Aspect"`` entry.

    Args:
        schemas_path: Root directory that is searched recursively.

    Returns:
        Mapping from file stem (file name without ``.avsc``) to the parsed
        schema, in the order the files were discovered.

    Raises:
        AssertionError: If any required schema file was not found.
    """
    required_schema_files = {
        "mxe/MetadataChangeEvent.avsc",
        "mxe/MetadataChangeProposal.avsc",
        "usage/UsageAggregation.avsc",
        "mxe/MetadataChangeLog.avsc",
        "mxe/PlatformEvent.avsc",
        "platform/event/v1/EntityChangeEvent.avsc",
        "metadata/query/filter/Filter.avsc",  # temporarily added to test reserved keywords support
    }

    # Find all the aspect schemas / other important schemas.
    schemas: Dict[str, dict] = {}
    for schema_file in Path(schemas_path).glob("**/*.avsc"):
        relative_path = schema_file.relative_to(schemas_path).as_posix()
        # Parse once and reuse the result for both the check and the output.
        schema = load_schema_file(schema_file)
        if relative_path in required_schema_files:
            # Tick the file off so leftovers can be reported below.
            required_schema_files.remove(relative_path)
        elif not schema.get("Aspect"):
            # Neither required nor an aspect schema: not part of the output.
            continue
        schemas[schema_file.stem] = schema

    assert not required_schema_files, f"Schema files not found: {required_schema_files}"

    return schemas


def merge_schemas(schemas_obj: List[Any]) -> str:
# Combine schemas.
merged = ["null"] + schemas_obj
Expand Down Expand Up @@ -127,6 +179,7 @@ def annotate_aspects(aspects: List[dict], schema_class_file: Path) -> None:
class _Aspect(DictWrapper):
ASPECT_NAME: str = None # type: ignore
ASPECT_TYPE: str = "default"
ASPECT_INFO: dict = None # type: ignore

def __init__(self):
if type(self) is _Aspect:
Expand All @@ -140,6 +193,10 @@ def get_aspect_name(cls) -> str:
@classmethod
def get_aspect_type(cls) -> str:
return cls.ASPECT_TYPE

@classmethod
def get_aspect_info(cls) -> dict:
return cls.ASPECT_INFO
"""

for aspect in aspects:
Expand Down Expand Up @@ -168,6 +225,9 @@ def get_aspect_type(cls) -> str:
schema_classes_lines[
empty_line
] += f"\n ASPECT_TYPE = '{aspect['Aspect']['type']}'"
schema_classes_lines[empty_line] += f"\n ASPECT_INFO = {aspect['Aspect']}"

schema_classes_lines[empty_line + 1] += "\n"

# Finally, generate a big list of all available aspects.
newline = "\n"
Expand All @@ -178,56 +238,62 @@ def get_aspect_type(cls) -> str:
ASPECT_CLASSES: List[Type[_Aspect]] = [
{f',{newline} '.join(f"{aspect['name']}Class" for aspect in aspects)}
]

KEY_ASPECTS: Dict[str, Type[_Aspect]] = {{
{f',{newline} '.join(f"'{aspect['Aspect']['keyForEntity']}': {aspect['name']}Class" for aspect in aspects if aspect['Aspect'].get('keyForEntity'))}
}}
"""
)

schema_class_file.write_text("\n".join(schema_classes_lines))


@click.command()
@click.argument(
"entity_registry", type=click.Path(exists=True, dir_okay=False), required=True
)
@click.argument(
"schemas_path", type=click.Path(exists=True, file_okay=False), required=True
)
@click.argument("outdir", type=click.Path(), required=True)
def generate(schemas_path: str, outdir: str) -> None:
required_schema_files = {
"mxe/MetadataChangeEvent.avsc",
"mxe/MetadataChangeProposal.avsc",
"usage/UsageAggregation.avsc",
"mxe/MetadataChangeLog.avsc",
"mxe/PlatformEvent.avsc",
"platform/event/v1/EntityChangeEvent.avsc",
"metadata/query/filter/Filter.avsc", # temporarily added to test reserved keywords support
def generate(entity_registry: str, schemas_path: str, outdir: str) -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When folks have custom extensions to the base entity registry, is the idea that they would have merged these in advance?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup

If they have custom extensions to the entity registry, they're almost certainly building this from source anyways, so it's an ok assumption to make

entities = load_entity_registry(Path(entity_registry))
schemas = load_schemas(schemas_path)

# Special handling for aspects.
aspects = {
schema["Aspect"]["name"]: schema
for schema in schemas.values()
if schema.get("Aspect")
}

# Find all the aspect schemas / other important schemas.
aspect_file_stems: List[str] = []
schema_files: List[Path] = []
for schema_file in Path(schemas_path).glob("**/*.avsc"):
relative_path = schema_file.relative_to(schemas_path).as_posix()
if relative_path in required_schema_files:
schema_files.append(schema_file)
required_schema_files.remove(relative_path)
elif load_schema_file(schema_file).get("Aspect"):
aspect_file_stems.append(schema_file.stem)
schema_files.append(schema_file)
for entity in entities:
# This implicitly requires that all keyAspects are resolvable.
aspect = aspects[entity.keyAspect]

assert not required_schema_files, f"Schema files not found: {required_schema_files}"
# This requires that entities cannot share a keyAspect.
assert "keyForEntity" not in aspect["Aspect"]

schemas: Dict[str, dict] = {}
for schema_file in schema_files:
schema = load_schema_file(schema_file)
schemas[Path(schema_file).stem] = schema
aspect["Aspect"]["keyForEntity"] = entity.name
aspect["Aspect"]["entityCategory"] = entity.category
aspect["Aspect"]["entityAspects"] = entity.aspects
if entity.doc is not None:
aspect["Aspect"]["entityDoc"] = entity.doc

merged_schema = merge_schemas(list(schemas.values()))
# Check for unused aspects. We currently have quite a few.
# unused_aspects = set(aspects.keys()) - set().union(
# {entity.keyAspect for entity in entities},
# *(set(entity.aspects) for entity in entities),
# )

merged_schema = merge_schemas(list(schemas.values()))
write_schema_files(merged_schema, outdir)

# Schema files post-processing.
(Path(outdir) / "__init__.py").write_text("# This file is intentionally empty.\n")
add_avro_python3_warning(Path(outdir) / "schema_classes.py")
annotate_aspects(
[schemas[aspect_file_stem] for aspect_file_stem in aspect_file_stems],
list(aspects.values()),
Path(outdir) / "schema_classes.py",
)

Expand Down
3 changes: 2 additions & 1 deletion metadata-ingestion/scripts/codegen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ OUTDIR=./src/datahub/metadata
# Note: this assumes that datahub has already been built with `./gradlew build`.
DATAHUB_ROOT=..
SCHEMAS_ROOT="$DATAHUB_ROOT/metadata-events/mxe-schemas/src/renamed/avro/com/linkedin"
ENTITY_REGISTRY="$DATAHUB_ROOT/metadata-models/src/main/resources/entity-registry.yml"

rm -r $OUTDIR 2>/dev/null || true
python scripts/avro_codegen.py $SCHEMAS_ROOT $OUTDIR
python scripts/avro_codegen.py $ENTITY_REGISTRY $SCHEMAS_ROOT $OUTDIR
16 changes: 0 additions & 16 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,22 +131,6 @@ def dataset_key_to_urn(key: DatasetKeyClass) -> str:
)


def make_container_new_urn(guid: str) -> str:
    """Build a container URN of the form ``urn:dh:container:0:(<guid>)``."""
    return "urn:dh:container:0:({})".format(guid)


def container_new_urn_to_key(dataset_urn: str) -> Optional[ContainerKeyClass]:
    """Extract the guid from a ``urn:dh:container:0:(<guid>)`` URN.

    Args:
        dataset_urn: The URN string to search.

    Returns:
        A ``ContainerKeyClass`` carrying the captured guid, or ``None`` when
        the pattern does not occur in ``dataset_urn``.
    """
    match = re.search(r"urn:dh:container:0:\((.*)\)", dataset_urn)
    if match is None:
        return None
    return ContainerKeyClass(guid=match.group(1))


# def make_container_urn(platform: str, name: str, env: str = DEFAULT_ENV) -> str:
# return f"urn:li:container:({make_data_platform_urn(platform)},{env},{name})"


def make_container_urn(guid: str) -> str:
    """Build a container URN of the form ``urn:li:container:<guid>``."""
    return "urn:li:container:{}".format(guid)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def __init__(self, config: LookerAPIConfig) -> None:
)
except SDKError as e:
raise ConfigurationError(
"Failed to initialize Looker client. Please check your configuration."
f"Failed to connect/authenticate with looker - check your configuration: {e}"
) from e

self.client_stats = LookerAPIStats()
Expand Down
8 changes: 8 additions & 0 deletions metadata-ingestion/tests/unit/serde/test_serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.file import FileSourceConfig, GenericFileSource
from datahub.metadata.schema_classes import (
ASPECT_CLASSES,
KEY_ASPECTS,
MetadataChangeEventClass,
OwnershipClass,
_Aspect,
Expand All @@ -34,6 +36,12 @@ def test_codegen_aspect_name():
assert OwnershipClass.get_aspect_name() == "ownership"


def test_codegen_aspects():
    """Sanity-check that the generated aspect collections are populated."""
    # The thresholds are deliberately loose; they only guard against the
    # generated collections coming out empty or nearly empty.
    aspect_class_count = len(ASPECT_CLASSES)
    assert aspect_class_count > 30

    key_aspect_count = len(KEY_ASPECTS)
    assert key_aspect_count > 10


def test_cannot_instantiated_codegen_aspect():
with pytest.raises(TypeError, match="instantiate"):
_Aspect()
Expand Down