diff --git a/feathr_project/feathr/_feature_registry.py b/feathr_project/feathr/_feature_registry.py index 304d2b5d2..727168a28 100644 --- a/feathr_project/feathr/_feature_registry.py +++ b/feathr_project/feathr/_feature_registry.py @@ -218,13 +218,16 @@ def _parse_anchors(self, anchor_list: List[FeatureAnchor]) -> List[AtlasEntity]: anchor_feature_entities = self._parse_anchor_features(anchor) # then parse the source of that anchor source_entity = self._parse_source(anchor.source) - + anchor_fully_qualified_name = self.project_name+self.registry_delimiter+anchor.name + original_id = self.get_feature_id(anchor_fully_qualified_name ) + original_anchor = self.get_feature_by_guid(original_id) if original_id else None + merged_elements = self._merge_anchor(original_anchor,anchor_feature_entities) anchor_entity = AtlasEntity( name=anchor.name, - qualified_name=self.project_name + self.registry_delimiter + anchor.name, + qualified_name=anchor_fully_qualified_name , attributes={ "source": source_entity.to_json(minimum=True), - "features": [s.to_json(minimum=True) for s in anchor_feature_entities], + "features": merged_elements, "tags": anchor.registry_tags }, typeName=TYPEDEF_ANCHOR, @@ -260,6 +263,30 @@ def _parse_anchors(self, anchor_list: List[FeatureAnchor]) -> List[AtlasEntity]: anchors_batch.append(anchor_entity) return anchors_batch + def _merge_anchor(self,original_anchor:Dict, new_anchor:Dict)->List[Dict[str,any]]: + ''' + Merge the new anchors defined locally with the anchors that is defined in the centralized registry. + ''' + # TODO: This will serve as a quick fix, full fix will work with MVCC, and is in progress. + new_anchor_json_repr = [s.to_json(minimum=True) for s in new_anchor] + if not original_anchor: + # if the anchor is not present on the registry, return json representation of the locally defined anchor. + # sample : [{'guid':'GUID_OF_ANCHOR','typeName':'','qualifiedName':'QUALIFIED_NAME'} + return new_anchor_json_repr + else: + original_anchor_elements = [x for x in original_anchor['entity']['attributes']['features']] + transformed_original_elements = { + x['uniqueAttributes']['qualifiedName']: + { + 'guid':x['guid'], + 'typeName':x['typeName'], + 'qualifiedName':x['uniqueAttributes']['qualifiedName'] + } + for x in original_anchor_elements} + for elem in new_anchor_json_repr: + transformed_original_elements.setdefault(elem['qualifiedName'],elem) + return list(transformed_original_elements.values()) + def _parse_source(self, source: Union[Source, HdfsSource]) -> AtlasEntity: """ parse the input sources diff --git a/feathr_project/test/test_feature_registry.py b/feathr_project/test/test_feature_registry.py index 20eedfb6d..9c38e9579 100644 --- a/feathr_project/test/test_feature_registry.py +++ b/feathr_project/test/test_feature_registry.py @@ -4,6 +4,8 @@ from datetime import datetime, timedelta from pathlib import Path +from test_fixture import registry_test_setup_append, registry_test_setup_partially + import pytest from click.testing import CliRunner from feathr import (FeatureAnchor, FeatureQuery, ObservationSettings, TypedKey, @@ -62,7 +64,33 @@ def test_feathr_register_features_e2e(): output_path=output_path) client.wait_job_to_finish(timeout_sec=900) +def test_feathr_register_features_partially(): + """ + This test will register full set of features into one project, then register another project in two partial registrations. + The length of the return value of get_features_from_registry should be identical. + """ + test_workspace_dir = Path( + __file__).parent.resolve() / "test_user_workspace" + client: FeathrClient = registry_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + client.register_features() + time.sleep(30) + full_registration = client.get_features_from_registry(client.project_name) + + client: FeathrClient = registry_test_setup_partially(os.path.join(test_workspace_dir, "feathr_config.yaml")) + new_project_name = client.project_name + client.register_features() + time.sleep(30) + + + client: FeathrClient = registry_test_setup_append(os.path.join(test_workspace_dir, "feathr_config.yaml")) + client.project_name = new_project_name + client.register_features() + time.sleep(30) + + appended_registration = client.get_features_from_registry(client.project_name) + # after a full registration, another registration should not affect the registered anchor features. + assert len(full_registration.items())==len(appended_registration.items()) def test_get_feature_from_registry(): registry = _FeatureRegistry("mock_project","mock_purview","mock_delimeter") diff --git a/feathr_project/test/test_fixture.py b/feathr_project/test/test_fixture.py index e7fce6d88..692a1f610 100644 --- a/feathr_project/test/test_fixture.py +++ b/feathr_project/test/test_fixture.py @@ -161,6 +161,128 @@ def kafka_test_setup(config_path: str): def registry_test_setup(config_path: str): + # use a new project name every time to make sure all features are registered correctly + now = datetime.now() + os.environ["project_config__project_name"] = ''.join(['feathr_ci_registry','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)]) + + client = FeathrClient(config_path=config_path, project_registry_tag={"for_test_purpose":"true"}) + request_anchor, agg_anchor, derived_feature_list = generate_entities() + + client.build_features(anchor_list=[agg_anchor, request_anchor], derived_feature_list=derived_feature_list) + return client +def registry_test_setup_partially(config_path: str): + + + # use a new project name every time to make sure all features are registered correctly + now = datetime.now() + os.environ["project_config__project_name"] = ''.join(['feathr_ci_registry','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)]) + + client = FeathrClient(config_path=config_path, project_registry_tag={"for_test_purpose":"true"}) + + request_anchor, agg_anchor, derived_feature_list = generate_entities() + agg_anchor.features = agg_anchor.features[:1] + client.build_features(anchor_list=[agg_anchor, request_anchor], derived_feature_list=derived_feature_list) + return client + +def registry_test_setup_append(config_path: str): + + + # use a new project name every time to make sure all features are registered correctly + now = datetime.now() + os.environ["project_config__project_name"] = ''.join(['feathr_ci_registry','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)]) + + client = FeathrClient(config_path=config_path, project_registry_tag={"for_test_purpose":"true"}) + + request_anchor, agg_anchor, derived_feature_list = generate_entities() + agg_anchor.features = agg_anchor.features[1:] + client.build_features(anchor_list=[agg_anchor, request_anchor], derived_feature_list=derived_feature_list) + return client + + +def generate_entities(): + def add_new_dropoff_and_fare_amount_column(df: DataFrame): + df = df.withColumn("new_lpep_dropoff_datetime", col("lpep_dropoff_datetime")) + df = df.withColumn("new_fare_amount", col("fare_amount") + 1000000) + return df + + batch_source = HdfsSource(name="nycTaxiBatchSource", + path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", + event_timestamp_column="lpep_dropoff_datetime", + timestamp_format="yyyy-MM-dd HH:mm:ss", + preprocessing=add_new_dropoff_and_fare_amount_column, + registry_tags={"for_test_purpose":"true"} + ) + + f_trip_distance = Feature(name="f_trip_distance", + feature_type=FLOAT, transform="trip_distance", + registry_tags={"for_test_purpose":"true"} + ) + f_trip_time_duration = Feature(name="f_trip_time_duration", + feature_type=INT32, + transform="(to_unix_timestamp(lpep_dropoff_datetime) - to_unix_timestamp(lpep_pickup_datetime))/60") + + + features = [ + f_trip_distance, + f_trip_time_duration, + Feature(name="f_is_long_trip_distance", + feature_type=BOOLEAN, + transform="cast_float(trip_distance)>30"), + Feature(name="f_day_of_week", + feature_type=INT32, + transform="dayofweek(lpep_dropoff_datetime)"), + ] + + + request_anchor = FeatureAnchor(name="request_features", + source=INPUT_CONTEXT, + features=features, + registry_tags={"for_test_purpose":"true"} + ) + + f_trip_time_distance = DerivedFeature(name="f_trip_time_distance", + feature_type=FLOAT, + input_features=[ + f_trip_distance, f_trip_time_duration], + transform="f_trip_distance * f_trip_time_duration") + + f_trip_time_rounded = DerivedFeature(name="f_trip_time_rounded", + feature_type=INT32, + input_features=[f_trip_time_duration], + transform="f_trip_time_duration % 10") + f_trip_time_rounded_plus = DerivedFeature(name="f_trip_time_rounded_plus", + feature_type=INT32, + input_features=[f_trip_time_rounded], + transform="f_trip_time_rounded + 100") + + location_id = TypedKey(key_column="DOLocationID", + key_column_type=ValueType.INT32, + description="location id in NYC", + full_name="nyc_taxi.location_id") + agg_features = [Feature(name="f_location_avg_fare", + key=location_id, + feature_type=FLOAT, + transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)", + agg_func="AVG", + window="90d")) + ] + + agg_anchor = FeatureAnchor(name="aggregationFeatures", + source=batch_source, + features=agg_features) + + derived_feature_list = [ + f_trip_time_distance, f_trip_time_rounded, f_trip_time_rounded_plus] + + # shuffule the order to make sure they can be parsed correctly + # Those input derived features can be in arbitrary order, but in order to parse the right dependencies, we need to reorder them internally in a certain order. + # This shuffle is to make sure that each time we have random shuffle for the input and make sure the internal sorting algorithm works (we are using topological sort). + random.shuffle(derived_feature_list) + return request_anchor,agg_anchor,derived_feature_list + +def registry_test_setup_append(config_path: str): + + # use a new project name every time to make sure all features are registered correctly now = datetime.now() os.environ["project_config__project_name"] = ''.join(['feathr_ci_registry','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)])