Skip to content

Commit

Permalink
feat(ingest): include instance in container dataPlatform when provided (
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Oct 13, 2022
1 parent 8d02010 commit 09616ee
Show file tree
Hide file tree
Showing 16 changed files with 139 additions and 99 deletions.
59 changes: 47 additions & 12 deletions metadata-ingestion/src/datahub/emitter/mcp_builder.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import hashlib
import json
from typing import Any, Iterable, List, Optional, TypeVar
from typing import Any, Dict, Iterable, List, Optional, TypeVar

from pydantic.fields import Field
from pydantic.main import BaseModel

from datahub.emitter.mce_builder import make_container_urn, make_data_platform_urn
from datahub.emitter.mce_builder import (
make_container_urn,
make_data_platform_urn,
make_dataplatform_instance_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance
Expand All @@ -28,23 +32,51 @@
from datahub.utilities.urns.urn import guess_entity_type


def _stable_guid_from_dict(d: dict) -> str:
    """Return a deterministic GUID (hex MD5) for the given dict.

    The dict is first serialized to canonical JSON — sorted keys and
    compact separators — so that logically-equal dicts always produce
    the same GUID regardless of insertion order.
    """
    canonical_json = json.dumps(
        d,
        sort_keys=True,
        separators=(",", ":"),
        cls=DatahubKeyJSONEncoder,
    )
    digest = hashlib.md5(canonical_json.encode("utf-8"))
    return str(digest.hexdigest())


class DatahubKey(BaseModel):
def guid_dict(self) -> Dict[str, str]:
return self.dict(by_alias=True, exclude_none=True)

def guid(self) -> str:
nonnull_dict = self.dict(by_alias=True, exclude_none=True)
json_key = json.dumps(
nonnull_dict,
separators=(",", ":"),
sort_keys=True,
cls=DatahubKeyJSONEncoder,
)
md5_hash = hashlib.md5(json_key.encode("utf-8"))
return str(md5_hash.hexdigest())
bag = self.guid_dict()
return _stable_guid_from_dict(bag)


class PlatformKey(DatahubKey):
    """Base key for container entities: a platform plus an optional instance."""

    platform: str
    instance: Optional[str] = None

    # BUG: In some of our sources, we incorrectly set the platform instance
    # to the env if no platform instance was specified. Now, we have to maintain
    # backwards compatibility with this bug, which means generating our GUIDs
    # in the same way. Specifically, we need to use the backcompat value if
    # the normal instance value is not set.
    backcompat_instance_for_guid: Optional[str] = Field(default=None, exclude=True)

    def guid_dict(self) -> Dict[str, str]:
        # FIXME: We deliberately avoid exclude_none=True here because we must
        # preserve insertion order in the dict, so that "instance" comes before
        # the keys contributed by subclasses. While guid computation sorts keys,
        # this method is also used to generate custom properties, which are
        # not sorted.
        raw = self.dict(by_alias=True, exclude_none=False)

        # Apply the backcompat substitution described above.
        if self.instance is None:
            raw["instance"] = self.backcompat_instance_for_guid

        # Drop None values only after ordering is fixed.
        return {key: value for key, value in raw.items() if value is not None}


class DatabaseKey(PlatformKey):
database: str
Expand Down Expand Up @@ -173,7 +205,7 @@ def gen_containers(
aspect=ContainerProperties(
name=name,
description=description,
customProperties=container_key.dict(exclude_none=True, by_alias=True),
customProperties=container_key.guid_dict(),
externalUrl=external_url,
qualifiedName=qualified_name,
),
Expand All @@ -196,6 +228,9 @@ def gen_containers(
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspect=DataPlatformInstance(
platform=f"{make_data_platform_urn(container_key.platform)}",
instance=f"{make_dataplatform_instance_urn(container_key.platform, container_key.instance)}"
if container_key.instance
else None,
),
)
wu = MetadataWorkUnit(
Expand Down
6 changes: 2 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -923,10 +923,8 @@ def gen_database_key(self, database: str) -> DatabaseKey:
return DatabaseKey(
database=database,
platform=self.platform,
instance=self.source_config.platform_instance
# keeps backward compatibility when platform instance is missed
if self.source_config.platform_instance is not None
else self.source_config.env,
instance=self.source_config.platform_instance,
backcompat_instance_for_guid=self.source_config.env,
)

def gen_database_containers(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,18 +399,16 @@ def gen_dataset_key(self, db_name: str, schema: str) -> PlatformKey:
project_id=db_name,
dataset_id=schema,
platform=self.platform,
instance=self.config.platform_instance
if self.config.platform_instance is not None
else self.config.env,
instance=self.config.platform_instance,
backcompat_instance_for_guid=self.config.env,
)

def gen_project_id_key(self, database: str) -> PlatformKey:
return ProjectIdKey(
project_id=database,
platform=self.platform,
instance=self.config.platform_instance
if self.config.platform_instance is not None
else self.config.env,
instance=self.config.platform_instance,
backcompat_instance_for_guid=self.config.env,
)

def _gen_domain_urn(self, dataset_name: str) -> Optional[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class ContainerWUCreator:
def __init__(self, platform, platform_instance, env):
self.processed_containers = []
self.platform = platform
self.instance = env if platform_instance is None else platform_instance
self.instance = platform_instance
self.env = env

def create_emit_containers(
self,
Expand All @@ -54,13 +55,15 @@ def gen_folder_key(self, abs_path):
return FolderKey(
platform=self.platform,
instance=self.instance,
backcompat_instance_for_guid=self.env,
folder_abs_path=abs_path,
)

def gen_bucket_key(self, name):
return S3BucketKey(
platform="s3",
instance=self.instance,
backcompat_instance_for_guid=self.env,
bucket_name=name,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -798,18 +798,16 @@ def gen_schema_key(self, db_name: str, schema: str) -> PlatformKey:
database=db_name,
schema=schema,
platform=self.platform,
instance=self.config.platform_instance
if self.config.platform_instance is not None
else self.config.env,
instance=self.config.platform_instance,
backcompat_instance_for_guid=self.config.env,
)

def gen_database_key(self, database: str) -> PlatformKey:
return DatabaseKey(
database=database,
platform=self.platform,
instance=self.config.platform_instance
if self.config.platform_instance is not None
else self.config.env,
instance=self.config.platform_instance,
backcompat_instance_for_guid=self.config.env,
)

def _gen_domain_urn(self, dataset_name: str) -> Optional[str]:
Expand Down
5 changes: 2 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,8 @@ def gen_schema_key(self, db_name: str, schema: str) -> DatabaseKey:
return DatabaseKey(
database=schema,
platform=self.platform,
instance=self.config.platform_instance
if self.config.platform_instance is not None
else self.config.env,
instance=self.config.platform_instance,
backcompat_instance_for_guid=self.config.env,
)

def gen_schema_containers(
Expand Down
10 changes: 4 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1181,18 +1181,16 @@ def gen_schema_key(self, db_name: str, schema: str) -> PlatformKey:
project_id=db_name,
dataset_id=schema,
platform=self.platform,
instance=self.config.platform_instance
if self.config.platform_instance is not None
else self.config.env,
instance=self.config.platform_instance,
backcompat_instance_for_guid=self.config.env,
)

def gen_database_key(self, database: str) -> PlatformKey:
return ProjectIdKey(
project_id=database,
platform=self.platform,
instance=self.config.platform_instance
if self.config.platform_instance is not None
else self.config.env,
instance=self.config.platform_instance,
backcompat_instance_for_guid=self.config.env,
)

def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,18 +566,16 @@ def gen_schema_key(self, db_name: str, schema: str) -> PlatformKey:
database=db_name,
schema=schema,
platform=self.platform,
instance=self.config.platform_instance
if self.config.platform_instance is not None
else self.config.env,
instance=self.config.platform_instance,
backcompat_instance_for_guid=self.config.env,
)

def gen_database_key(self, database: str) -> PlatformKey:
return DatabaseKey(
database=database,
platform=self.platform,
instance=self.config.platform_instance
if self.config.platform_instance is not None
else self.config.env,
instance=self.config.platform_instance,
backcompat_instance_for_guid=self.config.env,
)

def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]:
Expand Down Expand Up @@ -605,6 +603,7 @@ def gen_schema_containers(
database_container_key = self.gen_database_key(database=db_name)

container_workunits = gen_containers(
# TODO: this one is bad
schema_container_key,
schema,
[SqlContainerSubTypes.SCHEMA],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"value": "{\"platform\": \"urn:li:dataPlatform:clickhouse\"}",
"value": "{\"platform\": \"urn:li:dataPlatform:clickhouse\", \"instance\": \"urn:li:dataPlatformInstance:(urn:li:dataPlatform:clickhouse,clickhousetestserver)\"}",
"contentType": "application/json"
},
"systemMetadata": {
Expand Down Expand Up @@ -89,7 +89,7 @@
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"value": "{\"platform\": \"urn:li:dataPlatform:clickhouse\"}",
"value": "{\"platform\": \"urn:li:dataPlatform:clickhouse\", \"instance\": \"urn:li:dataPlatformInstance:(urn:li:dataPlatform:clickhouse,clickhousetestserver)\"}",
"contentType": "application/json"
},
"systemMetadata": {
Expand Down Expand Up @@ -172,11 +172,11 @@
"primary_key": "col_Int64",
"sampling_key": "",
"storage_policy": "default",
"metadata_modification_time": "2022-10-08 00:28:00",
"metadata_modification_time": "2022-10-13 04:56:28",
"total_rows": "10",
"total_bytes": "671",
"data_paths": "['/var/lib/clickhouse/store/3d0/3d0dc2c7-992b-4867-9bce-9272fff6f4ce/']",
"metadata_path": "/var/lib/clickhouse/store/603/603d6700-1767-42bf-9669-b2b86c3de24e/mv_target_table.sql"
"data_paths": "['/var/lib/clickhouse/store/f12/f12ef1cd-d030-4ce1-aa00-2abe38a0af22/']",
"metadata_path": "/var/lib/clickhouse/store/1df/1df0ba79-5a9d-4f93-8462-1e7056c3aea5/mv_target_table.sql"
},
"name": "mv_target_table",
"description": "This is target table for materialized view",
Expand Down Expand Up @@ -340,11 +340,11 @@
"primary_key": "",
"sampling_key": "",
"storage_policy": "default",
"metadata_modification_time": "2022-10-08 00:28:00",
"metadata_modification_time": "2022-10-13 04:56:28",
"total_rows": "0",
"total_bytes": "0",
"data_paths": "['/var/lib/clickhouse/store/490/4900ce0f-0c6d-4dad-8317-647e14fa4fee/']",
"metadata_path": "/var/lib/clickhouse/store/603/603d6700-1767-42bf-9669-b2b86c3de24e/test_data_types.sql"
"data_paths": "['/var/lib/clickhouse/store/938/9383ca3f-c98e-4240-9e0d-0f32939d170b/']",
"metadata_path": "/var/lib/clickhouse/store/1df/1df0ba79-5a9d-4f93-8462-1e7056c3aea5/test_data_types.sql"
},
"name": "test_data_types",
"description": "This table has basic types",
Expand Down Expand Up @@ -951,11 +951,11 @@
"primary_key": "",
"sampling_key": "",
"storage_policy": "",
"metadata_modification_time": "2022-10-08 00:28:00",
"metadata_modification_time": "2022-10-13 04:56:28",
"total_rows": "None",
"total_bytes": "None",
"data_paths": "[]",
"metadata_path": "/var/lib/clickhouse/store/603/603d6700-1767-42bf-9669-b2b86c3de24e/test_dict.sql"
"metadata_path": "/var/lib/clickhouse/store/1df/1df0ba79-5a9d-4f93-8462-1e7056c3aea5/test_dict.sql"
},
"name": "test_dict",
"description": "",
Expand Down Expand Up @@ -1080,11 +1080,11 @@
"primary_key": "",
"sampling_key": "",
"storage_policy": "default",
"metadata_modification_time": "2022-10-08 00:28:00",
"metadata_modification_time": "2022-10-13 04:56:28",
"total_rows": "0",
"total_bytes": "0",
"data_paths": "['/var/lib/clickhouse/store/a35/a358faa5-d2c5-4f95-9557-064345ec20df/']",
"metadata_path": "/var/lib/clickhouse/store/603/603d6700-1767-42bf-9669-b2b86c3de24e/test_nested_data_types.sql"
"data_paths": "['/var/lib/clickhouse/store/f7f/f7f15fe6-10cc-45d6-928e-6eb4ecc7a502/']",
"metadata_path": "/var/lib/clickhouse/store/1df/1df0ba79-5a9d-4f93-8462-1e7056c3aea5/test_nested_data_types.sql"
},
"name": "test_nested_data_types",
"description": "This table has nested types",
Expand Down Expand Up @@ -1327,11 +1327,11 @@
"primary_key": "",
"sampling_key": "",
"storage_policy": "",
"metadata_modification_time": "2022-10-08 00:28:00",
"metadata_modification_time": "2022-10-13 04:56:28",
"total_rows": "None",
"total_bytes": "None",
"data_paths": "['/var/lib/clickhouse/store/3d0/3d0dc2c7-992b-4867-9bce-9272fff6f4ce/']",
"metadata_path": "/var/lib/clickhouse/store/603/603d6700-1767-42bf-9669-b2b86c3de24e/mv_with_target_table.sql",
"data_paths": "['/var/lib/clickhouse/store/f12/f12ef1cd-d030-4ce1-aa00-2abe38a0af22/']",
"metadata_path": "/var/lib/clickhouse/store/1df/1df0ba79-5a9d-4f93-8462-1e7056c3aea5/mv_with_target_table.sql",
"view_definition": "",
"is_view": "True"
},
Expand Down Expand Up @@ -1525,11 +1525,11 @@
"primary_key": "",
"sampling_key": "",
"storage_policy": "",
"metadata_modification_time": "2022-10-08 00:28:00",
"metadata_modification_time": "2022-10-13 04:56:28",
"total_rows": "0",
"total_bytes": "0",
"data_paths": "['/var/lib/clickhouse/store/3b4/3b4a7a66-2669-49b9-aa3d-07443d5ace06/']",
"metadata_path": "/var/lib/clickhouse/store/603/603d6700-1767-42bf-9669-b2b86c3de24e/mv_without_target_table.sql",
"data_paths": "['/var/lib/clickhouse/store/6f6/6f6a1b0a-8873-48b2-a370-1fbf5c04eea8/']",
"metadata_path": "/var/lib/clickhouse/store/1df/1df0ba79-5a9d-4f93-8462-1e7056c3aea5/mv_without_target_table.sql",
"view_definition": "",
"is_view": "True"
},
Expand Down Expand Up @@ -1723,11 +1723,11 @@
"primary_key": "",
"sampling_key": "",
"storage_policy": "",
"metadata_modification_time": "2022-10-08 00:28:00",
"metadata_modification_time": "2022-10-13 04:56:28",
"total_rows": "None",
"total_bytes": "None",
"data_paths": "[]",
"metadata_path": "/var/lib/clickhouse/store/603/603d6700-1767-42bf-9669-b2b86c3de24e/test_view.sql",
"metadata_path": "/var/lib/clickhouse/store/1df/1df0ba79-5a9d-4f93-8462-1e7056c3aea5/test_view.sql",
"view_definition": "",
"is_view": "True"
},
Expand Down
Loading

0 comments on commit 09616ee

Please sign in to comment.