From b548889ff2efc70182b1e808d749764c7b2c1826 Mon Sep 17 00:00:00 2001 From: Yihui Guo Date: Thu, 12 May 2022 22:52:46 +0800 Subject: [PATCH 1/7] Fix another CI test --- feathr_project/test/test_feature_registry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/feathr_project/test/test_feature_registry.py b/feathr_project/test/test_feature_registry.py index 82ffc1585..4abb73b6d 100644 --- a/feathr_project/test/test_feature_registry.py +++ b/feathr_project/test/test_feature_registry.py @@ -41,7 +41,7 @@ def test_feathr_register_features_e2e(): all_features = client.list_registered_features(project_name=client.project_name) all_feature_names = [x['name'] for x in all_features] - assert 'f_is_long_trip_distance' in all_features # test regular ones + assert 'f_is_long_trip_distance' in all_feature_names # test regular ones assert 'f_trip_time_rounded' in all_feature_names # make sure derived features are there assert 'f_location_avg_fare' in all_feature_names # make sure aggregated features are there assert 'f_trip_time_rounded_plus' in all_feature_names # make sure derived features are there From 04f5674f08c40f3c9de2ed39aa5486d5049d36ce Mon Sep 17 00:00:00 2001 From: Yihui Guo Date: Sun, 29 May 2022 08:19:54 +0800 Subject: [PATCH 2/7] quick fix for conflict of feature registration. 
--- feathr_project/feathr/_feature_registry.py | 32 ++++- feathr_project/test/test_feature_registry.py | 22 ++++ feathr_project/test/test_fixture.py | 122 +++++++++++++++++++ 3 files changed, 173 insertions(+), 3 deletions(-) diff --git a/feathr_project/feathr/_feature_registry.py b/feathr_project/feathr/_feature_registry.py index 304d2b5d2..603cd4869 100644 --- a/feathr_project/feathr/_feature_registry.py +++ b/feathr_project/feathr/_feature_registry.py @@ -218,13 +218,16 @@ def _parse_anchors(self, anchor_list: List[FeatureAnchor]) -> List[AtlasEntity]: anchor_feature_entities = self._parse_anchor_features(anchor) # then parse the source of that anchor source_entity = self._parse_source(anchor.source) - + fully_qualified_name = self.project_name+self.registry_delimiter+anchor.name + original_id = self.get_feature_id(fully_qualified_name) + original_anchor = self.get_feature_by_guid(original_id) if original_id else None + merged_elements = self._merge_anchor(original_anchor,anchor_feature_entities) anchor_entity = AtlasEntity( name=anchor.name, - qualified_name=self.project_name + self.registry_delimiter + anchor.name, + qualified_name=fully_qualified_name, attributes={ "source": source_entity.to_json(minimum=True), - "features": [s.to_json(minimum=True) for s in anchor_feature_entities], + "features": merged_elements, "tags": anchor.registry_tags }, typeName=TYPEDEF_ANCHOR, @@ -260,6 +263,29 @@ def _parse_anchors(self, anchor_list: List[FeatureAnchor]) -> List[AtlasEntity]: anchors_batch.append(anchor_entity) return anchors_batch + def _merge_anchor(self,original_anchor, new_anchor): + ''' + This function merges existing anchor with new anchor, in order to fix concurrent conflict. + This will serve as a quick fix, but will not solved it entirely. + Full fix will work with MVCC, and is in progress. 
+ ''' + new_anchor_json_repr = [s.to_json(minimum=True) for s in new_anchor] + if not original_anchor: + return new_anchor_json_repr + else: + original_anchor_elements = [x for x in original_anchor['entity']['attributes']['features']] + transformed_original_elements = { + x['uniqueAttributes']['qualifiedName']: + { + 'guid':x['guid'], + 'typeName':x['typeName'], + 'qualifiedName':x['uniqueAttributes']['qualifiedName'] + } + for x in original_anchor_elements} + for elem in new_anchor_json_repr: + transformed_original_elements.setdefault(elem['qualifiedName'],elem) + return list(transformed_original_elements.values()) + def _parse_source(self, source: Union[Source, HdfsSource]) -> AtlasEntity: """ parse the input sources diff --git a/feathr_project/test/test_feature_registry.py b/feathr_project/test/test_feature_registry.py index 20eedfb6d..d8b59706f 100644 --- a/feathr_project/test/test_feature_registry.py +++ b/feathr_project/test/test_feature_registry.py @@ -4,6 +4,8 @@ from datetime import datetime, timedelta from pathlib import Path +from test_fixture import registry_test_setup_append, registry_test_setup_partially + import pytest from click.testing import CliRunner from feathr import (FeatureAnchor, FeatureQuery, ObservationSettings, TypedKey, @@ -62,7 +64,27 @@ def test_feathr_register_features_e2e(): output_path=output_path) client.wait_job_to_finish(timeout_sec=900) +def test_feathr_register_features_partially(): + """ + This test will register features, get all the registered features, then query a set of already registered features. 
+ """ + test_workspace_dir = Path( + __file__).parent.resolve() / "test_user_workspace" + client: FeathrClient = registry_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + client.register_features() + full_registration = client.get_features_from_registry(client.project_name) + + client: FeathrClient = registry_test_setup_partially(os.path.join(test_workspace_dir, "feathr_config.yaml")) + new_project_name = client.project_name + client.register_features() + + client: FeathrClient = registry_test_setup_append(os.path.join(test_workspace_dir, "feathr_config.yaml")) + client.project_name = new_project_name + client.register_features() + appended_registration = client.get_features_from_registry(client.project_name) + # after a full registration, another registration should not affect the registered anchor features. + assert len(full_registration.items())==len(appended_registration.items()) def test_get_feature_from_registry(): registry = _FeatureRegistry("mock_project","mock_purview","mock_delimeter") diff --git a/feathr_project/test/test_fixture.py b/feathr_project/test/test_fixture.py index e7fce6d88..692a1f610 100644 --- a/feathr_project/test/test_fixture.py +++ b/feathr_project/test/test_fixture.py @@ -161,6 +161,128 @@ def kafka_test_setup(config_path: str): def registry_test_setup(config_path: str): + # use a new project name every time to make sure all features are registered correctly + now = datetime.now() + os.environ["project_config__project_name"] = ''.join(['feathr_ci_registry','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)]) + + client = FeathrClient(config_path=config_path, project_registry_tag={"for_test_purpose":"true"}) + request_anchor, agg_anchor, derived_feature_list = generate_entities() + + client.build_features(anchor_list=[agg_anchor, request_anchor], derived_feature_list=derived_feature_list) + return client +def registry_test_setup_partially(config_path: str): + + + # use a new project name every time 
to make sure all features are registered correctly + now = datetime.now() + os.environ["project_config__project_name"] = ''.join(['feathr_ci_registry','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)]) + + client = FeathrClient(config_path=config_path, project_registry_tag={"for_test_purpose":"true"}) + + request_anchor, agg_anchor, derived_feature_list = generate_entities() + agg_anchor.features = agg_anchor.features[:1] + client.build_features(anchor_list=[agg_anchor, request_anchor], derived_feature_list=derived_feature_list) + return client + +def registry_test_setup_append(config_path: str): + + + # use a new project name every time to make sure all features are registered correctly + now = datetime.now() + os.environ["project_config__project_name"] = ''.join(['feathr_ci_registry','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)]) + + client = FeathrClient(config_path=config_path, project_registry_tag={"for_test_purpose":"true"}) + + request_anchor, agg_anchor, derived_feature_list = generate_entities() + agg_anchor.features = agg_anchor.features[1:] + client.build_features(anchor_list=[agg_anchor, request_anchor], derived_feature_list=derived_feature_list) + return client + + +def generate_entities(): + def add_new_dropoff_and_fare_amount_column(df: DataFrame): + df = df.withColumn("new_lpep_dropoff_datetime", col("lpep_dropoff_datetime")) + df = df.withColumn("new_fare_amount", col("fare_amount") + 1000000) + return df + + batch_source = HdfsSource(name="nycTaxiBatchSource", + path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", + event_timestamp_column="lpep_dropoff_datetime", + timestamp_format="yyyy-MM-dd HH:mm:ss", + preprocessing=add_new_dropoff_and_fare_amount_column, + registry_tags={"for_test_purpose":"true"} + ) + + f_trip_distance = Feature(name="f_trip_distance", + feature_type=FLOAT, transform="trip_distance", + registry_tags={"for_test_purpose":"true"} + 
) + f_trip_time_duration = Feature(name="f_trip_time_duration", + feature_type=INT32, + transform="(to_unix_timestamp(lpep_dropoff_datetime) - to_unix_timestamp(lpep_pickup_datetime))/60") + + + features = [ + f_trip_distance, + f_trip_time_duration, + Feature(name="f_is_long_trip_distance", + feature_type=BOOLEAN, + transform="cast_float(trip_distance)>30"), + Feature(name="f_day_of_week", + feature_type=INT32, + transform="dayofweek(lpep_dropoff_datetime)"), + ] + + + request_anchor = FeatureAnchor(name="request_features", + source=INPUT_CONTEXT, + features=features, + registry_tags={"for_test_purpose":"true"} + ) + + f_trip_time_distance = DerivedFeature(name="f_trip_time_distance", + feature_type=FLOAT, + input_features=[ + f_trip_distance, f_trip_time_duration], + transform="f_trip_distance * f_trip_time_duration") + + f_trip_time_rounded = DerivedFeature(name="f_trip_time_rounded", + feature_type=INT32, + input_features=[f_trip_time_duration], + transform="f_trip_time_duration % 10") + f_trip_time_rounded_plus = DerivedFeature(name="f_trip_time_rounded_plus", + feature_type=INT32, + input_features=[f_trip_time_rounded], + transform="f_trip_time_rounded + 100") + + location_id = TypedKey(key_column="DOLocationID", + key_column_type=ValueType.INT32, + description="location id in NYC", + full_name="nyc_taxi.location_id") + agg_features = [Feature(name="f_location_avg_fare", + key=location_id, + feature_type=FLOAT, + transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)", + agg_func="AVG", + window="90d")) + ] + + agg_anchor = FeatureAnchor(name="aggregationFeatures", + source=batch_source, + features=agg_features) + + derived_feature_list = [ + f_trip_time_distance, f_trip_time_rounded, f_trip_time_rounded_plus] + + # shuffle the order to make sure they can be parsed correctly + # Those input derived features can be in arbitrary order, but in order to parse the right dependencies, we need to reorder them internally in a certain order. 
+ # This shuffle is to make sure that each time we have random shuffle for the input and make sure the internal sorting algorithm works (we are using topological sort). + random.shuffle(derived_feature_list) + return request_anchor,agg_anchor,derived_feature_list + +def registry_test_setup_append(config_path: str): + + # use a new project name every time to make sure all features are registered correctly now = datetime.now() os.environ["project_config__project_name"] = ''.join(['feathr_ci_registry','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)]) From 9da84febde50cabac3ee45048781804a6ec9919d Mon Sep 17 00:00:00 2001 From: Yihui Guo Date: Tue, 31 May 2022 21:37:24 +0800 Subject: [PATCH 3/7] Change comment and type hint --- feathr_project/feathr/_feature_registry.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/feathr_project/feathr/_feature_registry.py b/feathr_project/feathr/_feature_registry.py index 603cd4869..fea1e2360 100644 --- a/feathr_project/feathr/_feature_registry.py +++ b/feathr_project/feathr/_feature_registry.py @@ -263,14 +263,16 @@ def _parse_anchors(self, anchor_list: List[FeatureAnchor]) -> List[AtlasEntity]: anchors_batch.append(anchor_entity) return anchors_batch - def _merge_anchor(self,original_anchor, new_anchor): + def _merge_anchor(self,original_anchor:dict, new_anchor:dict)->dict[str,any]: + print('gua?') ''' - This function merges existing anchor with new anchor, in order to fix concurrent conflict. - This will serve as a quick fix, but will not solved it entirely. - Full fix will work with MVCC, and is in progress. + Merge the new anchors defined locally with the anchors that is defined in the centralized registry. ''' + # TODO: This will serve as a quick fix, full fix will work with MVCC, and is in progress. 
new_anchor_json_repr = [s.to_json(minimum=True) for s in new_anchor] if not original_anchor: + # if the anchor is not present on the registry, return json representation of the locally defined anchor. + # sample : [{'guid':'GUID_OF_ANCHOR','typeName':'','qualifiedName':'QUALIFIED_NAME'}] return new_anchor_json_repr else: original_anchor_elements = [x for x in original_anchor['entity']['attributes']['features']] From 8fe5d7bbffbc68fd4aec4b96ebd594ac1983e489 Mon Sep 17 00:00:00 2001 From: Yihui Guo Date: Tue, 31 May 2022 21:40:16 +0800 Subject: [PATCH 4/7] rename --- feathr_project/feathr/_feature_registry.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/feathr_project/feathr/_feature_registry.py b/feathr_project/feathr/_feature_registry.py index fea1e2360..981fc7352 100644 --- a/feathr_project/feathr/_feature_registry.py +++ b/feathr_project/feathr/_feature_registry.py @@ -218,13 +218,13 @@ def _parse_anchors(self, anchor_list: List[FeatureAnchor]) -> List[AtlasEntity]: anchor_feature_entities = self._parse_anchor_features(anchor) # then parse the source of that anchor source_entity = self._parse_source(anchor.source) - fully_qualified_name = self.project_name+self.registry_delimiter+anchor.name - original_id = self.get_feature_id(fully_qualified_name) + anchor_fully_qualified_name = self.project_name+self.registry_delimiter+anchor.name + original_id = self.get_feature_id(anchor_fully_qualified_name ) original_anchor = self.get_feature_by_guid(original_id) if original_id else None merged_elements = self._merge_anchor(original_anchor,anchor_feature_entities) anchor_entity = AtlasEntity( name=anchor.name, - qualified_name=fully_qualified_name, + qualified_name=anchor_fully_qualified_name , attributes={ "source": source_entity.to_json(minimum=True), "features": merged_elements, From 4f4e21f2ceb8fabfaea8e7cd9a2d297c0d43bb7c Mon Sep 17 00:00:00 2001 From: Yihui Guo Date: Wed, 1 Jun 2022 00:39:28 +0800 Subject: [PATCH 5/7] Try to fix typing issue 
--- feathr_project/feathr/_feature_registry.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/feathr_project/feathr/_feature_registry.py b/feathr_project/feathr/_feature_registry.py index 981fc7352..814e04890 100644 --- a/feathr_project/feathr/_feature_registry.py +++ b/feathr_project/feathr/_feature_registry.py @@ -263,8 +263,7 @@ def _parse_anchors(self, anchor_list: List[FeatureAnchor]) -> List[AtlasEntity]: anchors_batch.append(anchor_entity) return anchors_batch - def _merge_anchor(self,original_anchor:dict, new_anchor:dict)->dict[str,any]: - print('gua?') + def _merge_anchor(self,original_anchor:Dict, new_anchor:Dict)->Dict[str,any]: ''' Merge the new anchors defined locally with the anchors that is defined in the centralized registry. ''' From 378f30d634bb3be2ef1e12d5fd71e89e8780c5f5 Mon Sep 17 00:00:00 2001 From: Yihui Guo Date: Wed, 1 Jun 2022 00:46:06 +0800 Subject: [PATCH 6/7] change dict to list of dict, add comment to test --- feathr_project/feathr/_feature_registry.py | 2 +- feathr_project/test/test_feature_registry.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/feathr_project/feathr/_feature_registry.py b/feathr_project/feathr/_feature_registry.py index 814e04890..727168a28 100644 --- a/feathr_project/feathr/_feature_registry.py +++ b/feathr_project/feathr/_feature_registry.py @@ -263,7 +263,7 @@ def _parse_anchors(self, anchor_list: List[FeatureAnchor]) -> List[AtlasEntity]: anchors_batch.append(anchor_entity) return anchors_batch - def _merge_anchor(self,original_anchor:Dict, new_anchor:Dict)->Dict[str,any]: + def _merge_anchor(self,original_anchor:Dict, new_anchor:Dict)->List[Dict[str,any]]: ''' Merge the new anchors defined locally with the anchors that is defined in the centralized registry. 
''' diff --git a/feathr_project/test/test_feature_registry.py b/feathr_project/test/test_feature_registry.py index d8b59706f..68c5c3449 100644 --- a/feathr_project/test/test_feature_registry.py +++ b/feathr_project/test/test_feature_registry.py @@ -66,7 +66,8 @@ def test_feathr_register_features_e2e(): def test_feathr_register_features_partially(): """ - This test will register features, get all the registered features, then query a set of already registered features. + This test will register full set of features into one project, then register another project in two partial registrations. + The length of the return value of get_features_from_registry should be identical. """ test_workspace_dir = Path( __file__).parent.resolve() / "test_user_workspace" From 354a7094b05424a04796d7a0e5827937ee86a443 Mon Sep 17 00:00:00 2001 From: Yihui Guo Date: Wed, 1 Jun 2022 01:09:55 +0800 Subject: [PATCH 7/7] add sleep to ensure register finish. --- feathr_project/test/test_feature_registry.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/feathr_project/test/test_feature_registry.py b/feathr_project/test/test_feature_registry.py index 68c5c3449..9c38e9579 100644 --- a/feathr_project/test/test_feature_registry.py +++ b/feathr_project/test/test_feature_registry.py @@ -73,15 +73,20 @@ def test_feathr_register_features_partially(): __file__).parent.resolve() / "test_user_workspace" client: FeathrClient = registry_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) client.register_features() + time.sleep(30) full_registration = client.get_features_from_registry(client.project_name) client: FeathrClient = registry_test_setup_partially(os.path.join(test_workspace_dir, "feathr_config.yaml")) new_project_name = client.project_name client.register_features() + time.sleep(30) + client: FeathrClient = registry_test_setup_append(os.path.join(test_workspace_dir, "feathr_config.yaml")) client.project_name = new_project_name client.register_features() + time.sleep(30) + 
appended_registration = client.get_features_from_registry(client.project_name) # after a full registration, another registration should not affect the registered anchor features.