Skip to content

Commit

Permalink
feat(sdk): python - add get_aspects_for_entity (datahub-project#5255)
Browse files Browse the repository at this point in the history
Co-authored-by: Shirshanka Das <[email protected]>
  • Loading branch information
2 people authored and maggiehays committed Aug 1, 2022
1 parent 272e060 commit a151f19
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 0 deletions.
33 changes: 33 additions & 0 deletions metadata-ingestion/examples/library/dataset_query_entity_v2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import logging

from datahub.emitter.mce_builder import make_dataset_urn

# Querying aspects requires access to the DataHubGraph (RestEmitter is not enough)
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph

# Imports for metadata model classes
from datahub.metadata.schema_classes import (
DataPlatformInstanceClass,
DatasetKeyClass,
StatusClass,
)

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# URN of the dataset whose aspects are queried below.
dataset_urn = make_dataset_urn(platform="hive", name="realestate_db.sales", env="PROD")

gms_endpoint = "http://localhost:8080"
graph = DataHubGraph(DatahubClientConfig(server=gms_endpoint))

# Query multiple aspects of the entity in a single call. `aspects` and
# `aspect_types` are parallel lists: one aspect name per requested class.
result = graph.get_aspects_for_entity(
    entity_urn=dataset_urn,
    aspects=["status", "dataPlatformInstance", "datasetKey"],
    aspect_types=[StatusClass, DataPlatformInstanceClass, DatasetKeyClass],
)

# Each value in the result is an instance of its aspect class when the aspect
# exists, or None when it does not. `.get` is used so that a key the client
# left out does not raise KeyError.
if result is not None:
    dataset_key = result.get("datasetKey")
    if dataset_key:
        log.info(dataset_key.name)
53 changes: 53 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,59 @@ def get_latest_timeseries_value(
)
return None

def get_aspects_for_entity(
    self,
    entity_urn: str,
    aspects: List[str],
    aspect_types: List[Type[Aspect]],
) -> Optional[Dict[str, Optional[Aspect]]]:
    """
    Get multiple aspects for an entity. To get a single aspect for an entity, use the `get_aspect_v2` method.

    Warning: Do not use this method to determine if an entity exists!
    This method will always return an entity, even if it doesn't exist. This is an issue with how DataHub server
    responds to these calls, and will be fixed automatically when the server-side issue is fixed.

    The returned dictionary is keyed by the aspect name declared in each aspect
    class's ``RECORD_SCHEMA``, not by the entries of ``aspects``; ``aspects`` is
    only used to build the request URL.

    :param str entity_urn: The urn of the entity
    :param List[str] aspects: List of aspect names being requested (e.g. [schemaMetadata, datasetProperties])
    :param List[Type[Aspect]] aspect_types: List of aspect type classes being requested (e.g. [datahub.metadata.schema_classes.DatasetProperties]). Must have the same length as ``aspects``.
    :return: Optionally, a map of aspect_name to aspect_value as a dictionary if present, aspect_value will be set to None if that aspect was not found. Returns None on HTTP status 404.
    :rtype: Optional[Dict[str, Optional[Aspect]]]
    :raises HttpError: if the HTTP response is not a 200 or a 404
    :raises AssertionError: if ``aspects`` and ``aspect_types`` differ in length
    """
    if len(aspects) != len(aspect_types):
        # Raised explicitly instead of via `assert` so the check is not
        # stripped under `python -O`; the exception type is unchanged.
        raise AssertionError(
            f"number of aspects requested ({len(aspects)}) should be the same as number of aspect types provided ({len(aspect_types)})"
        )

    aspects_list = ",".join(aspects)
    url: str = f"{self._gms_server}/entitiesV2/{Urn.url_encode(entity_urn)}?aspects=List({aspects_list})"

    response = self._session.get(url)
    if response.status_code == 404:
        # not found
        return None
    response.raise_for_status()

    # The "aspects" section of the payload is the same for every requested
    # type, so look it up once instead of once per loop iteration.
    aspects_json = response.json().get("aspects", {})

    result: Dict[str, Optional[Aspect]] = {}
    for aspect_type in aspect_types:
        # A missing RECORD_SCHEMA yields None here, so it is reported by the
        # warning below rather than surfacing as an AttributeError.
        record_schema: Optional[RecordSchema] = getattr(
            aspect_type, "RECORD_SCHEMA", None
        )
        if not record_schema:
            logger.warning(
                f"Failed to infer type name of the aspect from the aspect type class {aspect_type}. Continuing, but this will fail."
            )
            # No name to key the result by, so this type is left out.
            continue

        aspect_type_name = record_schema.props["Aspect"]["name"]
        aspect_json = aspects_json.get(aspect_type_name)
        if aspect_json:
            # need to apply a transform to the response to match rest.li and avro serialization
            post_json_obj = post_json_transform(aspect_json)
            result[aspect_type_name] = aspect_type.from_obj(post_json_obj["value"])
        else:
            result[aspect_type_name] = None

    return result

def _get_search_endpoint(self):
    """Return the URL of the entity search action on the configured server."""
    base_url = self.config.server
    return f"{base_url}/entities?action=search"

Expand Down

0 comments on commit a151f19

Please sign in to comment.