diff --git a/feathr_project/feathr/constants.py b/feathr_project/feathr/constants.py index 13adb785b..3a06ae6d8 100644 --- a/feathr_project/feathr/constants.py +++ b/feathr_project/feathr/constants.py @@ -15,6 +15,7 @@ REGISTRY_TYPEDEF_VERSION="v1" TYPEDEF_SOURCE=f'feathr_source_{REGISTRY_TYPEDEF_VERSION}' +# TODO: change the name from feathr_workspace_ to feathr_project_ TYPEDEF_FEATHR_PROJECT=f'feathr_workspace_{REGISTRY_TYPEDEF_VERSION}' TYPEDEF_DERIVED_FEATURE=f'feathr_derived_feature_{REGISTRY_TYPEDEF_VERSION}' TYPEDEF_ANCHOR=f'feathr_anchor_{REGISTRY_TYPEDEF_VERSION}' diff --git a/feathr_project/feathr/registry/_feature_registry_purview.py b/feathr_project/feathr/registry/_feature_registry_purview.py index 0a8b3af07..ca1f4af96 100644 --- a/feathr_project/feathr/registry/_feature_registry_purview.py +++ b/feathr_project/feathr/registry/_feature_registry_purview.py @@ -11,6 +11,7 @@ from tracemalloc import stop from typing import Dict, List, Optional, Tuple, Union from urllib.parse import urlparse +from time import sleep from azure.identity import DefaultAzureCredential from jinja2 import Template @@ -75,6 +76,7 @@ def _register_feathr_feature_types(self): type_feathr_project = EntityTypeDef( name=TYPEDEF_FEATHR_PROJECT, attributeDefs=[ + # TODO: this should be called "anchors" rather than "anchor_features" to make it less confusing. AtlasAttributeDef( name="anchor_features", typeName=TYPEDEF_ARRAY_ANCHOR, cardinality=Cardinality.SET), AtlasAttributeDef( @@ -219,7 +221,7 @@ def _parse_anchors(self, anchor_list: List[FeatureAnchor]) -> List[AtlasEntity]: # then parse the source of that anchor source_entity = self._parse_source(anchor.source) anchor_fully_qualified_name = self.project_name+self.registry_delimiter+anchor.name - original_id = self.get_feature_id(anchor_fully_qualified_name ) + original_id = self.get_feature_id(anchor_fully_qualified_name, type=TYPEDEF_ANCHOR ) original_anchor = self.get_feature_by_guid(original_id) if original_id else None merged_elements = self._merge_anchor(original_anchor,anchor_feature_entities) anchor_entity = AtlasEntity( @@ -733,18 +735,26 @@ def _delete_all_feathr_entities(self): :param guid: The guid or guids you want to remove. """ - entities = self.purview_client.discovery.search_entities( - "feathr*", limit=20) + # should not be large than this, otherwise the backend might throw out error + batch_delte_size = 100 - # [print(entity) for entity in entities] - guid_list = [entity["id"] for entity in entities] + # use the `query` API so that it can return immediatelly (don't use the search_entity API as it will try to return all the results in a single request) - # should not be large than this, otherwise the backend might throw out error - batch_delte_size = 15 - for i in range(0, len(guid_list), batch_delte_size): - self.purview_client.delete_entity( - guid=guid_list[i:i+batch_delte_size]) + while True: + result = self.purview_client.discovery.query( + "feathr", limit=batch_delte_size) + logger.info("Total number of entities:",result['@search.count'] ) + + # if no results, break: + if result['@search.count'] == 0: + break + entities = result['value'] + guid_list = [entity["id"] for entity in entities] + self.purview_client.delete_entity(guid=guid_list) logger.info("{} feathr entities deleted", batch_delte_size) + # sleep here, otherwise backend might throttle + # process the next batch after sleep + sleep(1) @classmethod def _get_registry_client(self): @@ -753,26 +763,43 @@ def _get_registry_client(self): """ return self.purview_client - def list_registered_features(self, project_name: str = None, limit=50, starting_offset=0) -> List[Dict[str,str]]: + def list_registered_features(self, project_name: str, limit=1000, starting_offset=0) -> List[Dict[str,str]]: """ List all the already registered features. If project_name is not provided or is None, it will return all the registered features; otherwise it will only return only features under this project """ - entities = self.purview_client.discovery.search_entities( - f"entityType:{TYPEDEF_ANCHOR_FEATURE} or entityType:{TYPEDEF_DERIVED_FEATURE}", limit=limit, starting_offset=starting_offset) + feature_list = [] + + if not project_name: + raise RuntimeError("project_name must be specified.") + + # get the corresponding features belongs to a certain project. + # note that we need to use "startswith" to filter out the features that don't belong to this project. + # see syntax here: https://docs.microsoft.com/en-us/rest/api/purview/catalogdataplane/discovery/query#discovery_query_andornested + query_filter = { + "and": [ + { + "or": + [ + {"entityType": TYPEDEF_DERIVED_FEATURE}, + {"entityType": TYPEDEF_ANCHOR_FEATURE} + ] + }, + { + "attributeName": "qualifiedName", + "operator": "startswith", + "attributeValue": project_name + self.registry_delimiter + } + ] + } + result = self.purview_client.discovery.query(filter=query_filter) + + entities = result['value'] + # entities = self.purview_client.discovery.search_entities(query = None, search_filter=query_filter, limit=limit) + for entity in entities: - if project_name: - # if project_name is a valid string, only append entities if the qualified name start with - # project_name+delimiter - qualified_name: str = entity["qualifiedName"] - # split the name based on delimiter - result = qualified_name.split(self.registry_delimiter) - if result[0].casefold() == project_name: - feature_list.append({"name":entity["name"],'id':entity['id'],"qualifiedName":entity['qualifiedName']}) - else: - # otherwise append all the entities - feature_list.append({"name":entity["name"],'id':entity['id'],"qualifiedName":entity['qualifiedName']}) + feature_list.append({"name":entity["name"],'id':entity['id'],"qualifiedName":entity['qualifiedName']}) return feature_list @@ -810,12 +837,27 @@ def get_feature_lineage(self, guid): """ return self.purview_client.get_entity_lineage(guid=guid) - def get_feature_id(self, qualifiedName): + def get_feature_id(self, qualifiedName, type: str): """ Get guid of a feature given its qualifiedName """ - search_term = "qualifiedName:{0}".format(qualifiedName) - entities = self.purview_client.discovery.search_entities(search_term) + # the search term should be full qualified name + # TODO: need to update the calling functions to add `type` field to make it more performant + # purview_client.get_entity(qualifiedName=qualifiedName) might not work here since it requires an additonal typeName parameter + # Currently still use the `query` API to get the result in a "full name match" way. + # self.purview_client.get_entity(qualifiedName=qualifiedName, typeName=type) + + # get the corresponding features belongs to a certain project. + # note that we need to use "eq" to filter exactly this qualified name + # see syntax here: https://docs.microsoft.com/en-us/rest/api/purview/catalogdataplane/discovery/query#discovery_query_andornested + query_filter = { + "attributeName": "qualifiedName", + "operator": "eq", + "attributeValue": qualifiedName + } + result = self.purview_client.discovery.query(keywords = None, filter=query_filter) + entities = result['value'] + # There should be exactly one result, but we don't enforce the check here for entity in entities: if entity.get('qualifiedName') == qualifiedName: return entity.get('id') @@ -829,7 +871,7 @@ def search_features(self, searchTerm): entities = self.purview_client.discovery.search_entities(searchTerm) return entities - def _list_registered_entities_with_details(self, project_name: str = None, entity_type: Union[str, List[str]] = None, limit=50, starting_offset=0,) -> List[Dict]: + def _list_registered_entities_with_details(self, project_name: str, entity_type: Union[str, List[str]] = None, limit=1000, starting_offset=0,) -> List[Dict]: """ List all the already registered entities. entity_type should be one of: SOURCE, DERIVED_FEATURE, ANCHOR, ANCHOR_FEATURE, FEATHR_PROJECT, or a list of those values limit: a maximum 1000 will be enforced at the underlying API @@ -844,30 +886,56 @@ def _list_registered_entities_with_details(self, project_name: str = None, entit raise RuntimeError( f'only SOURCE, DERIVED_FEATURE, ANCHOR, ANCHOR_FEATURE, FEATHR_PROJECT are supported when listing the registered entities, {entity_type} is not one of them.') - # the search grammar is less documented in Atlas/Purview. - # Here's the query grammar: https://atlas.apache.org/2.0.0/Search-Advanced.html - search_string = "".join( - [f" or entityType:{e}" for e in entity_type_list]) - # remvoe the first additional " or " - search_string = search_string[4:] - result_entities = self.purview_client.discovery.search_entities( - search_string, limit=limit, starting_offset=starting_offset) + if project_name is None: + raise RuntimeError("You need to specify a project_name") + # the search grammar: + # https://docs.microsoft.com/en-us/azure/purview/how-to-search-catalog#search-query-syntax + # https://docs.microsoft.com/en-us/rest/api/datacatalog/data-catalog-search-syntax-reference + + # get the corresponding features belongs to a certain project. + # note that we need to use "startswith" to filter out the features that don't belong to this project. + # see syntax here: https://docs.microsoft.com/en-us/rest/api/purview/catalogdataplane/discovery/query#discovery_query_andornested + # this search does the following: + # search all the entities that start with project_name+delimiter for all the search entities + # However, for TYPEDEF_FEATHR_PROJECT, it doesn't have delimiter in the qualifiedName + # Hence if TYPEDEF_FEATHR_PROJECT is in the `entity_type` input, we need to search for that specifically + # and finally "OR" the result to union them + query_filter = { + "or": + [{ + "and": [{ + # this is a list of the entity types that you want to query + "or": [{"entityType": e} for e in entity_type_list] + }, + { + "attributeName": "qualifiedName", + "operator": "startswith", + # use `project_name + self.registry_delimiter` to limit the search results + "attributeValue": project_name + self.registry_delimiter + }]}, + # if we are querying TYPEDEF_FEATHR_PROJECT, then "union" the result by using this query + { + "and": [{ + "or": [{"entityType": TYPEDEF_FEATHR_PROJECT}] if TYPEDEF_FEATHR_PROJECT in entity_type_list else None + }, + { + "attributeName": "qualifiedName", + "operator": "startswith", + "attributeValue": project_name + }]}] + } + # Important properties returned includes: # id (the guid of the entity), name, qualifiedName, @search.score, # and @search.highlights - guid_list = [] - for entity in result_entities: - if project_name: - # if project_name is a valid string, only append entities if the qualified name start with - # project_name+delimiter - qualified_name: str = entity["qualifiedName"] - # split the name based on delimiter - result = qualified_name.split(self.registry_delimiter) - if result[0].casefold() == project_name: - guid_list.append(entity["id"]) - else: - # otherwise append all the entities - guid_list.append(entity["id"]) + # TODO: it might be throttled in the backend and wait for the `pyapacheatlas` to fix this + # https://github.com/wjohnson/pyapacheatlas/issues/206 + # `pyapacheatlas` needs a bit optimization to avoid additional calls. + result_entities = self.purview_client.discovery.search_entities(query=None, search_filter=query_filter, limit = limit) + + # append the guid list. Since we are using project_name + delimiter to search, all the result will be valid. + guid_list = [entity["id"] for entity in result_entities] + entity_res = [] if guid_list is None or len(guid_list)==0 else self.purview_client.get_entity( guid=guid_list)["entities"] return entity_res @@ -879,15 +947,14 @@ def get_features_from_registry(self, project_name: str) -> Tuple[List[FeatureAnc Args: project_name (str): project name. """ - - entities = self._list_registered_entities_with_details(project_name=project_name,entity_type=[TYPEDEF_DERIVED_FEATURE, TYPEDEF_ANCHOR_FEATURE, TYPEDEF_FEATHR_PROJECT]) - if not entities: + all_entities_in_project = self._list_registered_entities_with_details(project_name=project_name,entity_type=[TYPEDEF_DERIVED_FEATURE, TYPEDEF_ANCHOR_FEATURE, TYPEDEF_FEATHR_PROJECT, TYPEDEF_ANCHOR, TYPEDEF_SOURCE]) + if not all_entities_in_project: # if the result is empty return (None, None) # get project entity, the else are feature entities (derived+anchor) - project_entity = [x for x in entities if x['typeName']==TYPEDEF_FEATHR_PROJECT][0] # there's only one available - feature_entities = [x for x in entities if x!=project_entity] + project_entity = [x for x in all_entities_in_project if x['typeName']==TYPEDEF_FEATHR_PROJECT][0] # there's only one available + feature_entities = [x for x in all_entities_in_project if (x['typeName']==TYPEDEF_ANCHOR_FEATURE or x['typeName']==TYPEDEF_DERIVED_FEATURE)] feature_entity_guid_mapping = {x['guid']:x for x in feature_entities} # this is guid for feature anchor (GROUP of anchor features) @@ -900,7 +967,6 @@ def get_features_from_registry(self, project_name: str) -> Tuple[List[FeatureAnc for derived_feature_entity_id in derived_feature_ids: # this will be used to generate DerivedFeature instance derived_feature_key_list = [] - for key in derived_feature_entity_id["attributes"]["key"]: derived_feature_key_list.append(TypedKey(key_column=key["key_column"], key_column_type=key["key_column_type"], full_name=key["full_name"], description=key["description"], key_column_alias=key["key_column_alias"])) @@ -908,30 +974,29 @@ def get_features_from_registry(self, project_name: str) -> Tuple[List[FeatureAnc # for feature anchor (GROUP), input features are splitted into input anchor features & input derived features anchor_feature_guid = [e["guid"] for e in derived_feature_entity_id["attributes"]["input_anchor_features"]] derived_feature_guid = [e["guid"] for e in derived_feature_entity_id["attributes"]["input_derived_features"]] - # for derived features, search all related input features. input_features_guid = self.search_input_anchor_features(derived_feature_guid,feature_entity_guid_mapping) - # chain the input features together - all_input_features = list(itertools.chain.from_iterable( - [self._get_features_by_guid(x) for x in input_features_guid+anchor_feature_guid])) - + # filter out features that is related with this derived feature + all_input_features = self._get_features_by_guid_or_entities(guid_list=input_features_guid+anchor_feature_guid, entity_list=all_entities_in_project) derived_feature_list.append(DerivedFeature(name=derived_feature_entity_id["attributes"]["name"], feature_type=self._get_feature_type_from_hocon(derived_feature_entity_id["attributes"]["type"]), transform=self._get_transformation_from_dict(derived_feature_entity_id["attributes"]['transformation']), key=derived_feature_key_list, input_features= all_input_features, registry_tags=derived_feature_entity_id["attributes"]["tags"])) - anchor_result = self.purview_client.get_entity(guid=anchor_guid)["entities"] + + # anchor_result = self.purview_client.get_entity(guid=anchor_guid)["entities"] + anchor_result = [x for x in all_entities_in_project if x['typeName']==TYPEDEF_ANCHOR] anchor_list = [] + for anchor_entity in anchor_result: feature_guid = [e["guid"] for e in anchor_entity["attributes"]["features"]] anchor_list.append(FeatureAnchor(name=anchor_entity["attributes"]["name"], - source=self._get_source_by_guid(anchor_entity["attributes"]["source"]["guid"]), - features=self._get_features_by_guid(feature_guid), + source=self._get_source_by_guid(anchor_entity["attributes"]["source"]["guid"], entity_list = all_entities_in_project), + features=self._get_features_by_guid_or_entities(guid_list = feature_guid, entity_list=all_entities_in_project), registry_tags=anchor_entity["attributes"]["tags"])) - return (anchor_list, derived_feature_list) def search_input_anchor_features(self,derived_guids,feature_entity_guid_mapping) ->List[str]: @@ -978,9 +1043,13 @@ def feathr_udf2(df) udf_source_code = [line+'\n' for line in udf_source_code_striped] return " ".join(udf_source_code) - def _get_source_by_guid(self, guid) -> Source: + def _get_source_by_guid(self, guid, entity_list) -> Source: + """give a entity list and the target GUID for the source entity, return a python `Source` object. + """ # TODO: currently return HDFS source by default. For JDBC source, it's currently implemented using HDFS Source so we should split in the future - source_entity = self.purview_client.get_entity(guid=guid)["entities"][0] + + # there should be only one entity available + source_entity = [x for x in entity_list if x['guid'] == guid][0] # if source_entity["attributes"]["path"] is INPUT_CONTEXT, it will also be assigned to this returned object return HdfsSource(name=source_entity["attributes"]["name"], @@ -1045,8 +1114,21 @@ def _get_transformation_from_dict(self, input: Dict) -> FeatureType: # no transformation function observed return None - def _get_features_by_guid(self, guid) -> List[FeatureAnchor]: - feature_entities = self.purview_client.get_entity(guid=guid)["entities"] + def _get_features_by_guid_or_entities(self, guid_list, entity_list) -> List[FeatureAnchor]: + """return a python list of the features that are referenced by a list of guids. + If entity_list is provided, use entity_list to reconstruct those features + This is for "anchor feature" only. + """ + if not entity_list: + feature_entities = self.purview_client.get_entity(guid=guid_list)["entities"] + else: + guid_set = set(guid_list) + feature_entities = [x for x in entity_list if x['guid'] in guid_set] + + # raise error if we cannot find all the guid + if len(feature_entities) != len(guid_list): + raise RuntimeError("Number of `feature_entities` is less than provided GUID list for search. The project might be broken.") + feature_list=[] key_list = [] for feature_entity in feature_entities: diff --git a/feathr_project/test/test_feature_registry.py b/feathr_project/test/test_feature_registry.py index 970fccdea..b5647d213 100644 --- a/feathr_project/test/test_feature_registry.py +++ b/feathr_project/test/test_feature_registry.py @@ -76,6 +76,9 @@ def test_feathr_register_features_partially(): client.register_features() time.sleep(30) full_registration = client.get_features_from_registry(client.project_name) + + now = datetime.now() + os.environ["project_config__project_name"] = ''.join(['feathr_ci_registry','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)]) client: FeathrClient = registry_test_setup_partially(os.path.join(test_workspace_dir, "feathr_config.yaml")) new_project_name = client.project_name diff --git a/feathr_project/test/test_fixture.py b/feathr_project/test/test_fixture.py index b7d431d33..145a4de81 100644 --- a/feathr_project/test/test_fixture.py +++ b/feathr_project/test/test_fixture.py @@ -171,12 +171,8 @@ def registry_test_setup(config_path: str): client.build_features(anchor_list=[agg_anchor, request_anchor], derived_feature_list=derived_feature_list) return client def registry_test_setup_partially(config_path: str): - - - # use a new project name every time to make sure all features are registered correctly - now = datetime.now() - os.environ["project_config__project_name"] = ''.join(['feathr_ci_registry','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)]) - + """Register a partial of a project. Will call `generate_entities()` and register only the first anchor feature. + """ client = FeathrClient(config_path=config_path, project_registry_tag={"for_test_purpose":"true"}) request_anchor, agg_anchor, derived_feature_list = generate_entities() @@ -185,11 +181,8 @@ def registry_test_setup_partially(config_path: str): return client def registry_test_setup_append(config_path: str): - - - # use a new project name every time to make sure all features are registered correctly - now = datetime.now() - os.environ["project_config__project_name"] = ''.join(['feathr_ci_registry','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)]) + """Append features to a project. Will call `generate_entities()` and register from the 2nd anchor feature + """ client = FeathrClient(config_path=config_path, project_registry_tag={"for_test_purpose":"true"})