Skip to content

Commit

Permalink
feat(ingest): infer aspect name from type in get_aspect
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 committed Sep 23, 2022
1 parent d53cbbb commit fe926e3
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 245 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
# Imports for metadata model classes
from datahub.metadata.schema_classes import (
AuditStampClass,
ChangeTypeClass,
EditableSchemaFieldInfoClass,
EditableSchemaMetadataClass,
GlobalTagsClass,
Expand Down Expand Up @@ -45,9 +44,8 @@ def get_simple_field_path_from_v2_field_path(field_path: str) -> str:
graph = DataHubGraph(DatahubClientConfig(server=gms_endpoint))


current_editable_schema_metadata = graph.get_aspect_v2(
current_editable_schema_metadata = graph.get_aspect(
entity_urn=dataset_urn,
aspect="editableSchemaMetadata",
aspect_type=EditableSchemaMetadataClass,
)

Expand Down Expand Up @@ -94,10 +92,7 @@ def get_simple_field_path_from_v2_field_path(field_path: str) -> str:

if need_write:
event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="editableSchemaMetadata",
aspect=current_editable_schema_metadata,
)
graph.emit(event)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,8 @@ def get_simple_field_path_from_v2_field_path(field_path: str) -> str:
graph = DataHubGraph(DatahubClientConfig(server=gms_endpoint))


current_editable_schema_metadata = graph.get_aspect_v2(
entity_urn=dataset_urn,
aspect="editableSchemaMetadata",
aspect_type=EditableSchemaMetadataClass,
current_editable_schema_metadata = graph.get_aspect(
entity_urn=dataset_urn, aspect_type=EditableSchemaMetadataClass
)


Expand Down
19 changes: 4 additions & 15 deletions metadata-ingestion/examples/library/dataset_add_documentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
# Imports for metadata model classes
from datahub.metadata.schema_classes import (
AuditStampClass,
ChangeTypeClass,
EditableDatasetPropertiesClass,
InstitutionalMemoryClass,
InstitutionalMemoryMetadataClass,
Expand Down Expand Up @@ -40,10 +39,8 @@
gms_endpoint = "http://localhost:8080"
graph = DataHubGraph(config=DatahubClientConfig(server=gms_endpoint))

current_editable_properties = graph.get_aspect_v2(
entity_urn=dataset_urn,
aspect="editableDatasetProperties",
aspect_type=EditableDatasetPropertiesClass,
current_editable_properties = graph.get_aspect(
entity_urn=dataset_urn, aspect_type=EditableDatasetPropertiesClass
)

need_write = False
Expand All @@ -60,10 +57,7 @@

if need_write:
event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="editableDatasetProperties",
aspect=current_editable_properties,
)
graph.emit(event)
Expand All @@ -73,10 +67,8 @@
log.info("Documentation already exists and is identical, omitting write")


current_institutional_memory = graph.get_aspect_v2(
entity_urn=dataset_urn,
aspect="institutionalMemory",
aspect_type=InstitutionalMemoryClass,
current_institutional_memory = graph.get_aspect(
entity_urn=dataset_urn, aspect_type=InstitutionalMemoryClass
)

need_write = False
Expand All @@ -94,10 +86,7 @@

if need_write:
event = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="institutionalMemory",
aspect=current_institutional_memory,
)
graph.emit(event)
Expand Down
10 changes: 2 additions & 8 deletions metadata-ingestion/examples/library/dataset_add_owner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

# Imports for metadata model classes
from datahub.metadata.schema_classes import (
ChangeTypeClass,
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
Expand All @@ -34,10 +33,8 @@
graph = DataHubGraph(DatahubClientConfig(server=gms_endpoint))


current_owners: Optional[OwnershipClass] = graph.get_aspect_v2(
entity_urn=dataset_urn,
aspect="ownership",
aspect_type=OwnershipClass,
current_owners: Optional[OwnershipClass] = graph.get_aspect(
entity_urn=dataset_urn, aspect_type=OwnershipClass
)


Expand All @@ -56,10 +53,7 @@

if need_write:
event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="ownership",
aspect=current_owners,
)
graph.emit(event)
Expand Down
12 changes: 2 additions & 10 deletions metadata-ingestion/examples/library/dataset_add_tag.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph

# Imports for metadata model classes
from datahub.metadata.schema_classes import (
ChangeTypeClass,
GlobalTagsClass,
TagAssociationClass,
)
from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
Expand All @@ -24,9 +20,8 @@

dataset_urn = make_dataset_urn(platform="hive", name="realestate_db.sales", env="PROD")

current_tags: Optional[GlobalTagsClass] = graph.get_aspect_v2(
current_tags: Optional[GlobalTagsClass] = graph.get_aspect(
entity_urn=dataset_urn,
aspect="globalTags",
aspect_type=GlobalTagsClass,
)

Expand All @@ -46,10 +41,7 @@

if need_write:
event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="globalTags",
aspect=current_tags,
)
graph.emit(event)
Expand Down
10 changes: 2 additions & 8 deletions metadata-ingestion/examples/library/dataset_add_term.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
# Imports for metadata model classes
from datahub.metadata.schema_classes import (
AuditStampClass,
ChangeTypeClass,
GlossaryTermAssociationClass,
GlossaryTermsClass,
)
Expand All @@ -25,10 +24,8 @@

dataset_urn = make_dataset_urn(platform="hive", name="realestate_db.sales", env="PROD")

current_terms: Optional[GlossaryTermsClass] = graph.get_aspect_v2(
entity_urn=dataset_urn,
aspect="glossaryTerms",
aspect_type=GlossaryTermsClass,
current_terms: Optional[GlossaryTermsClass] = graph.get_aspect(
entity_urn=dataset_urn, aspect_type=GlossaryTermsClass
)

term_to_add = make_term_urn("Classification.HighlyConfidential")
Expand All @@ -52,10 +49,7 @@

if need_write:
event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="glossaryTerms",
aspect=current_terms,
)
graph.emit(event)
Expand Down
108 changes: 38 additions & 70 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ def __init__(self, config: Union[DatahubClientConfig, DataHubGraphConfig]) -> No
self.server_id = "missing"
return
try:
client_id: Optional[TelemetryClientIdClass] = self.get_aspect_v2(
"urn:li:telemetry:clientId", TelemetryClientIdClass, "telemetryClientId"
client_id: Optional[TelemetryClientIdClass] = self.get_aspect(
"urn:li:telemetry:clientId", TelemetryClientIdClass
)
self.server_id = client_id.clientId if client_id else "missing"
except Exception as e:
Expand Down Expand Up @@ -119,60 +119,39 @@ def _post_generic(self, url: str, payload_dict: Dict) -> Dict:
"Unable to get metadata from DataHub", {"message": str(e)}
) from e

@deprecated(
reason="Use get_aspect_v2 instead which makes aspect_type_name truly optional"
)
def get_aspect(
self,
entity_urn: str,
aspect: str,
aspect_type_name: Optional[str],
aspect_type: Type[Aspect],
) -> Optional[Aspect]:
return self.get_aspect_v2(
entity_urn=entity_urn,
aspect=aspect,
aspect_type=aspect_type,
aspect_type_name=aspect_type_name,
)

def get_aspect_v2(
self,
entity_urn: str,
aspect_type: Type[Aspect],
aspect: str,
aspect_type_name: Optional[str] = None,
version: int = 0,
) -> Optional[Aspect]:
"""
Get an aspect for an entity.
:param str entity_urn: The urn of the entity
:param Type[Aspect] aspect_type: The type class of the aspect being requested (e.g. datahub.metadata.schema_classes.DatasetProperties)
:param str aspect: The name of the aspect being requested (e.g. schemaMetadata, datasetProperties, etc.)
:param Optional[str] aspect_type_name: The fully qualified classname of the aspect being requested. Typically not needed and extracted automatically from the class directly. (e.g. com.linkedin.common.DatasetProperties)
:param version: The version of the aspect to retrieve. The default of 0 means latest. Versions > 0 go from oldest to newest, so 1 is the oldest.
:return: the Aspect as a dictionary if present, None if no aspect was found (HTTP status 404)
:rtype: Optional[Aspect]
:raises HttpError: if the HTTP response is not a 200 or a 404
"""

aspect = aspect_type.ASPECT_NAME
url: str = f"{self._gms_server}/aspects/{Urn.url_encode(entity_urn)}?aspect={aspect}&version={version}"
response = self._session.get(url)
if response.status_code == 404:
# not found
return None
response.raise_for_status()
response_json = response.json()
if not aspect_type_name:
record_schema: RecordSchema = aspect_type.__getattribute__(
aspect_type, "RECORD_SCHEMA"
)
if not record_schema:
logger.warning(
f"Failed to infer type name of the aspect from the aspect type class {aspect_type}. Please provide an aspect_type_name. Continuing, but this will fail."
)
else:
aspect_type_name = record_schema.fullname.replace(".pegasus2avro", "")

# Figure out what field to look in.
record_schema: RecordSchema = aspect_type.__getattribute__(
aspect_type, "RECORD_SCHEMA"
)
aspect_type_name = record_schema.fullname.replace(".pegasus2avro", "")

# Deserialize the aspect json into the aspect type.
aspect_json = response_json.get("aspect", {}).get(aspect_type_name)
if aspect_json:
# need to apply a transform to the response to match rest.li and avro serialization
Expand All @@ -183,64 +162,53 @@ def get_aspect_v2(
f"Failed to find {aspect_type_name} in response {response_json}"
)

@deprecated(reason="Use get_aspect instead which makes aspect string name optional")
def get_aspect_v2(
self,
entity_urn: str,
aspect_type: Type[Aspect],
aspect: str,
aspect_type_name: Optional[str] = None,
version: int = 0,
) -> Optional[Aspect]:
assert aspect_type.ASPECT_NAME == aspect
return self.get_aspect(
entity_urn=entity_urn,
aspect_type=aspect_type,
version=version,
)

def get_config(self) -> Dict[str, Any]:
return self._get_generic(f"{self.config.server}/config")

def get_ownership(self, entity_urn: str) -> Optional[OwnershipClass]:
return self.get_aspect_v2(
entity_urn=entity_urn,
aspect="ownership",
aspect_type=OwnershipClass,
)
return self.get_aspect(entity_urn=entity_urn, aspect_type=OwnershipClass)

def get_schema_metadata(self, entity_urn: str) -> Optional[SchemaMetadataClass]:
return self.get_aspect_v2(
entity_urn=entity_urn,
aspect="schemaMetadata",
aspect_type=SchemaMetadataClass,
)
return self.get_aspect(entity_urn=entity_urn, aspect_type=SchemaMetadataClass)

def get_domain_properties(self, entity_urn: str) -> Optional[DomainPropertiesClass]:
return self.get_aspect_v2(
entity_urn=entity_urn,
aspect="domainProperties",
aspect_type=DomainPropertiesClass,
)
return self.get_aspect(entity_urn=entity_urn, aspect_type=DomainPropertiesClass)

def get_dataset_properties(
self, entity_urn: str
) -> Optional[DatasetPropertiesClass]:
return self.get_aspect_v2(
entity_urn=entity_urn,
aspect="datasetProperties",
aspect_type=DatasetPropertiesClass,
return self.get_aspect(
entity_urn=entity_urn, aspect_type=DatasetPropertiesClass
)

def get_tags(self, entity_urn: str) -> Optional[GlobalTagsClass]:
return self.get_aspect_v2(
entity_urn=entity_urn,
aspect="globalTags",
aspect_type=GlobalTagsClass,
)
return self.get_aspect(entity_urn=entity_urn, aspect_type=GlobalTagsClass)

def get_glossary_terms(self, entity_urn: str) -> Optional[GlossaryTermsClass]:
return self.get_aspect_v2(
entity_urn=entity_urn,
aspect="glossaryTerms",
aspect_type=GlossaryTermsClass,
)
return self.get_aspect(entity_urn=entity_urn, aspect_type=GlossaryTermsClass)

def get_domain(self, entity_urn: str) -> Optional[DomainsClass]:
return self.get_aspect_v2(
entity_urn=entity_urn,
aspect="domains",
aspect_type=DomainsClass,
)
return self.get_aspect(entity_urn=entity_urn, aspect_type=DomainsClass)

def get_browse_path(self, entity_urn: str) -> Optional[BrowsePathsClass]:
return self.get_aspect_v2(
return self.get_aspect(
entity_urn=entity_urn,
aspect="browsePaths",
aspect_type=BrowsePathsClass,
)

Expand Down
3 changes: 1 addition & 2 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -1196,9 +1196,8 @@ def get_s3_tags() -> Optional[GlobalTagsClass]:
logger.debug(
"Connected to DatahubApi, grabbing current tags to maintain."
)
current_tags: Optional[GlobalTagsClass] = self.ctx.graph.get_aspect_v2(
current_tags: Optional[GlobalTagsClass] = self.ctx.graph.get_aspect(
entity_urn=dataset_urn,
aspect="globalTags",
aspect_type=GlobalTagsClass,
)
if current_tags:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,8 @@ def get_s3_tags(
return None
if ctx.graph is not None:
logger.debug("Connected to DatahubApi, grabbing current tags to maintain.")
current_tags: Optional[GlobalTagsClass] = ctx.graph.get_aspect_v2(
current_tags: Optional[GlobalTagsClass] = ctx.graph.get_aspect(
entity_urn=dataset_urn,
aspect="globalTags",
aspect_type=GlobalTagsClass,
)
if current_tags:
Expand Down
Loading

0 comments on commit fe926e3

Please sign in to comment.