Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Yihui/moderate registration conflict #304

Merged
merged 11 commits into from
Jun 1, 2022
34 changes: 31 additions & 3 deletions feathr_project/feathr/_feature_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,16 @@ def _parse_anchors(self, anchor_list: List[FeatureAnchor]) -> List[AtlasEntity]:
anchor_feature_entities = self._parse_anchor_features(anchor)
# then parse the source of that anchor
source_entity = self._parse_source(anchor.source)

anchor_fully_qualified_name = self.project_name+self.registry_delimiter+anchor.name
original_id = self.get_feature_id(anchor_fully_qualified_name )
original_anchor = self.get_feature_by_guid(original_id) if original_id else None
merged_elements = self._merge_anchor(original_anchor,anchor_feature_entities)
anchor_entity = AtlasEntity(
name=anchor.name,
qualified_name=self.project_name + self.registry_delimiter + anchor.name,
qualified_name=anchor_fully_qualified_name ,
attributes={
"source": source_entity.to_json(minimum=True),
"features": [s.to_json(minimum=True) for s in anchor_feature_entities],
"features": merged_elements,
"tags": anchor.registry_tags
},
typeName=TYPEDEF_ANCHOR,
Expand Down Expand Up @@ -260,6 +263,31 @@ def _parse_anchors(self, anchor_list: List[FeatureAnchor]) -> List[AtlasEntity]:
anchors_batch.append(anchor_entity)
return anchors_batch

def _merge_anchor(self,original_anchor:dict, new_anchor:dict)->dict[str,any]:
print('gua?')
xiaoyongzhu marked this conversation as resolved.
Show resolved Hide resolved
'''
Merge the new anchors defined locally with the anchors that is defined in the centralized registry.
'''
# TODO: This will serve as a quick fix, full fix will work with MVCC, and is in progress.
new_anchor_json_repr = [s.to_json(minimum=True) for s in new_anchor]
if not original_anchor:
# if the anchor is not present on the registry, return json representation of the locally defined anchor.
# sample : [{'guid':'GUID_OF_ANCHOR','typeName':'','qualifiedName':'QUALIFIED_NAME'}
return new_anchor_json_repr
else:
original_anchor_elements = [x for x in original_anchor['entity']['attributes']['features']]
YihuiGuo marked this conversation as resolved.
Show resolved Hide resolved
transformed_original_elements = {
x['uniqueAttributes']['qualifiedName']:
{
'guid':x['guid'],
'typeName':x['typeName'],
'qualifiedName':x['uniqueAttributes']['qualifiedName']
}
for x in original_anchor_elements}
for elem in new_anchor_json_repr:
transformed_original_elements.setdefault(elem['qualifiedName'],elem)
return list(transformed_original_elements.values())

def _parse_source(self, source: Union[Source, HdfsSource]) -> AtlasEntity:
"""
parse the input sources
Expand Down
22 changes: 22 additions & 0 deletions feathr_project/test/test_feature_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from datetime import datetime, timedelta
from pathlib import Path

from test_fixture import registry_test_setup_append, registry_test_setup_partially

import pytest
from click.testing import CliRunner
from feathr import (FeatureAnchor, FeatureQuery, ObservationSettings, TypedKey,
Expand Down Expand Up @@ -62,7 +64,27 @@ def test_feathr_register_features_e2e():
output_path=output_path)
client.wait_job_to_finish(timeout_sec=900)

def test_feathr_register_features_partially():
    """
    Register the full feature set, then re-register in two partial passes under a
    new project, and verify the partial registrations don't drop anchor features.
    """
    test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace"
    config_file = os.path.join(test_workspace_dir, "feathr_config.yaml")

    # Full registration: baseline count of registered features.
    client: FeathrClient = registry_test_setup(config_file)
    client.register_features()
    full_registration = client.get_features_from_registry(client.project_name)

    # Partial registration under a fresh project name.
    client = registry_test_setup_partially(config_file)
    new_project_name = client.project_name
    client.register_features()

    # Append the remaining features to that same project.
    client = registry_test_setup_append(config_file)
    client.project_name = new_project_name
    client.register_features()
    appended_registration = client.get_features_from_registry(client.project_name)

    # after a full registration, another registration should not affect the registered anchor features.
    assert len(full_registration.items()) == len(appended_registration.items())

def test_get_feature_from_registry():
registry = _FeatureRegistry("mock_project","mock_purview","mock_delimeter")
Expand Down
122 changes: 122 additions & 0 deletions feathr_project/test/test_fixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,128 @@ def kafka_test_setup(config_path: str):
def registry_test_setup(config_path: str):
    """Return a FeathrClient (fresh project name) with the full generated feature set built."""
    # use a new project name every time to make sure all features are registered correctly
    now = datetime.now()
    unique_name = ''.join(
        ['feathr_ci_registry', '_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)])
    os.environ["project_config__project_name"] = unique_name

    client = FeathrClient(config_path=config_path, project_registry_tag={"for_test_purpose": "true"})
    request_anchor, agg_anchor, derived_feature_list = generate_entities()
    client.build_features(anchor_list=[agg_anchor, request_anchor],
                          derived_feature_list=derived_feature_list)
    return client
def registry_test_setup_partially(config_path: str):
    """Return a FeathrClient that builds only the FIRST aggregation feature (partial set)."""
    # use a new project name every time to make sure all features are registered correctly
    now = datetime.now()
    unique_name = ''.join(
        ['feathr_ci_registry', '_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)])
    os.environ["project_config__project_name"] = unique_name

    client = FeathrClient(config_path=config_path, project_registry_tag={"for_test_purpose": "true"})
    request_anchor, agg_anchor, derived_feature_list = generate_entities()
    # keep only the first aggregation feature to simulate a partial registration
    agg_anchor.features = agg_anchor.features[:1]
    client.build_features(anchor_list=[agg_anchor, request_anchor],
                          derived_feature_list=derived_feature_list)
    return client

def registry_test_setup_append(config_path: str):
    """Return a FeathrClient that builds the REMAINING aggregation features (append step)."""
    # NOTE(review): a second `def registry_test_setup_append` appears later in this
    # file and would shadow this one — confirm and remove the duplicate.
    # use a new project name every time to make sure all features are registered correctly
    now = datetime.now()
    unique_name = ''.join(
        ['feathr_ci_registry', '_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)])
    os.environ["project_config__project_name"] = unique_name

    client = FeathrClient(config_path=config_path, project_registry_tag={"for_test_purpose": "true"})
    request_anchor, agg_anchor, derived_feature_list = generate_entities()
    # drop the first aggregation feature: this complements registry_test_setup_partially
    agg_anchor.features = agg_anchor.features[1:]
    client.build_features(anchor_list=[agg_anchor, request_anchor],
                          derived_feature_list=derived_feature_list)
    return client


def generate_entities():
    """
    Build the shared test fixture: a request-feature anchor, an aggregation anchor,
    and a (shuffled) list of derived features.

    Returns:
        (request_anchor, agg_anchor, derived_feature_list) — project `FeatureAnchor`
        objects and a list of `DerivedFeature`s used by the registry tests.
    """
    def add_new_dropoff_and_fare_amount_column(df: DataFrame):
        # Preprocessing hook for the batch source: adds two derived columns.
        df = df.withColumn("new_lpep_dropoff_datetime", col("lpep_dropoff_datetime"))
        df = df.withColumn("new_fare_amount", col("fare_amount") + 1000000)
        return df

    # NYC taxi sample data hosted on Azure blob storage.
    batch_source = HdfsSource(name="nycTaxiBatchSource",
                              path="wasbs://[email protected]/sample_data/green_tripdata_2020-04.csv",
                              event_timestamp_column="lpep_dropoff_datetime",
                              timestamp_format="yyyy-MM-dd HH:mm:ss",
                              preprocessing=add_new_dropoff_and_fare_amount_column,
                              registry_tags={"for_test_purpose":"true"}
                              )

    f_trip_distance = Feature(name="f_trip_distance",
                              feature_type=FLOAT, transform="trip_distance",
                              registry_tags={"for_test_purpose":"true"}
                              )
    # Trip duration in minutes, derived from the pickup/dropoff timestamps.
    f_trip_time_duration = Feature(name="f_trip_time_duration",
                                   feature_type=INT32,
                                   transform="(to_unix_timestamp(lpep_dropoff_datetime) - to_unix_timestamp(lpep_pickup_datetime))/60")

    # Request-time (INPUT_CONTEXT) features — no entity key required.
    features = [
        f_trip_distance,
        f_trip_time_duration,
        Feature(name="f_is_long_trip_distance",
                feature_type=BOOLEAN,
                transform="cast_float(trip_distance)>30"),
        Feature(name="f_day_of_week",
                feature_type=INT32,
                transform="dayofweek(lpep_dropoff_datetime)"),
    ]

    request_anchor = FeatureAnchor(name="request_features",
                                   source=INPUT_CONTEXT,
                                   features=features,
                                   registry_tags={"for_test_purpose":"true"}
                                   )

    f_trip_time_distance = DerivedFeature(name="f_trip_time_distance",
                                          feature_type=FLOAT,
                                          input_features=[
                                              f_trip_distance, f_trip_time_duration],
                                          transform="f_trip_distance * f_trip_time_duration")

    f_trip_time_rounded = DerivedFeature(name="f_trip_time_rounded",
                                         feature_type=INT32,
                                         input_features=[f_trip_time_duration],
                                         transform="f_trip_time_duration % 10")
    # Chained derived feature (depends on another derived feature) — exercises
    # the registry's dependency resolution.
    f_trip_time_rounded_plus = DerivedFeature(name="f_trip_time_rounded_plus",
                                              feature_type=INT32,
                                              input_features=[f_trip_time_rounded],
                                              transform="f_trip_time_rounded + 100")

    location_id = TypedKey(key_column="DOLocationID",
                           key_column_type=ValueType.INT32,
                           description="location id in NYC",
                           full_name="nyc_taxi.location_id")
    # Keyed aggregation feature: 90-day average fare per dropoff location.
    agg_features = [Feature(name="f_location_avg_fare",
                            key=location_id,
                            feature_type=FLOAT,
                            transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)",
                                                              agg_func="AVG",
                                                              window="90d"))
                    ]

    agg_anchor = FeatureAnchor(name="aggregationFeatures",
                               source=batch_source,
                               features=agg_features)

    derived_feature_list = [
        f_trip_time_distance, f_trip_time_rounded, f_trip_time_rounded_plus]

    # shuffle the order to make sure they can be parsed correctly
    # Those input derived features can be in arbitrary order, but in order to parse the right dependencies, we need to reorder them internally in a certain order.
    # This shuffle is to make sure that each time we have random shuffle for the input and make sure the internal sorting algorithm works (we are using topological sort).
    random.shuffle(derived_feature_list)
    return request_anchor,agg_anchor,derived_feature_list

def registry_test_setup_append(config_path: str):


# use a new project name every time to make sure all features are registered correctly
now = datetime.now()
os.environ["project_config__project_name"] = ''.join(['feathr_ci_registry','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)])
Expand Down