From a151f196cd4408fca0db2e0551abf99166e67569 Mon Sep 17 00:00:00 2001 From: Aezo <45879156+aezomz@users.noreply.github.com> Date: Wed, 13 Jul 2022 01:10:07 +0800 Subject: [PATCH] feat(sdk): python - add get_aspects_for_entity (#5255) Co-authored-by: Shirshanka Das --- .../library/dataset_query_entity_v2.py | 33 ++++++++++++ .../src/datahub/ingestion/graph/client.py | 53 +++++++++++++++++++ 2 files changed, 86 insertions(+) create mode 100644 metadata-ingestion/examples/library/dataset_query_entity_v2.py diff --git a/metadata-ingestion/examples/library/dataset_query_entity_v2.py b/metadata-ingestion/examples/library/dataset_query_entity_v2.py new file mode 100644 index 00000000000000..a8b8439079f22c --- /dev/null +++ b/metadata-ingestion/examples/library/dataset_query_entity_v2.py @@ -0,0 +1,33 @@ +import logging + +from datahub.emitter.mce_builder import make_dataset_urn + +# read-modify-write requires access to the DataHubGraph (RestEmitter is not enough) +from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph + +# Imports for metadata model classes +from datahub.metadata.schema_classes import ( + DataPlatformInstanceClass, + DatasetKeyClass, + StatusClass, +) + +log = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + +dataset_urn = make_dataset_urn(platform="hive", name="realestate_db.sales", env="PROD") + +gms_endpoint = "http://localhost:8080" +graph = DataHubGraph(DatahubClientConfig(server=gms_endpoint)) + +# Query multiple aspects from entity +result = graph.get_aspects_for_entity( + entity_urn=dataset_urn, + aspects=["status", "dataPlatformInstance", "datasetKey"], + aspect_types=[StatusClass, DataPlatformInstanceClass, DatasetKeyClass], +) + +# result are typed according to their class if exist +if result is not None: + if result["datasetKey"]: + log.info(result["datasetKey"].name) diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index 2e876bfa8d15bc..9b069f148f7f94 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -288,6 +288,59 @@ def get_latest_timeseries_value( ) return None + def get_aspects_for_entity( + self, + entity_urn: str, + aspects: List[str], + aspect_types: List[Type[Aspect]], + ) -> Optional[Dict[str, Optional[Aspect]]]: + """ + Get multiple aspects for an entity. To get a single aspect for an entity, use the `get_aspect_v2` method. + Warning: Do not use this method to determine if an entity exists! + This method will always return an entity, even if it doesn't exist. This is an issue with how DataHub server + responds to these calls, and will be fixed automatically when the server-side issue is fixed. + + :param str entity_urn: The urn of the entity + :param List[Type[Aspect]] aspect_type_list: List of aspect type classes being requested (e.g. [datahub.metadata.schema_classes.DatasetProperties]) + :param List[str] aspects_list: List of aspect names being requested (e.g. [schemaMetadata, datasetProperties]) + :return: Optionally, a map of aspect_name to aspect_value as a dictionary if present, aspect_value will be set to None if that aspect was not found. Returns None on HTTP status 404. + :rtype: Optional[Dict[str, Optional[Aspect]]] + :raises HttpError: if the HTTP response is not a 200 or a 404 + """ + assert len(aspects) == len( + aspect_types + ), f"number of aspects requested ({len(aspects)}) should be the same as number of aspect types provided ({len(aspect_types)})" + aspects_list = ",".join(aspects) + url: str = f"{self._gms_server}/entitiesV2/{Urn.url_encode(entity_urn)}?aspects=List({aspects_list})" + + response = self._session.get(url) + if response.status_code == 404: + # not found + return None + response.raise_for_status() + response_json = response.json() + + result: Dict[str, Optional[Aspect]] = {} + for aspect_type in aspect_types: + record_schema: RecordSchema = aspect_type.__getattribute__( + aspect_type, "RECORD_SCHEMA" + ) + if not record_schema: + logger.warning( + f"Failed to infer type name of the aspect from the aspect type class {aspect_type}. Continuing, but this will fail." + ) + else: + aspect_type_name = record_schema.props["Aspect"]["name"] + aspect_json = response_json.get("aspects", {}).get(aspect_type_name) + if aspect_json: + # need to apply a transform to the response to match rest.li and avro serialization + post_json_obj = post_json_transform(aspect_json) + result[aspect_type_name] = aspect_type.from_obj(post_json_obj["value"]) + else: + result[aspect_type_name] = None + + return result + def _get_search_endpoint(self): return f"{self.config.server}/entities?action=search"