-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Fix feature service inference logic #3089
Changes from 6 commits
1e09ad3
5a5c6f6
07eaba6
51174b3
45ec63c
7a43b8f
40d7965
8dfb6db
c4d9c5a
64dfdef
31115ac
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -86,29 +86,55 @@ def __init__( | |||||
self.feature_view_projections.append(feature_grouping.projection) | ||||||
|
||||||
def infer_features(self, fvs_to_update: Optional[Dict[str, FeatureView]] = None): | ||||||
""" | ||||||
Infers the features for the projections of this feature service, and updates this feature | ||||||
service in place. | ||||||
|
||||||
This method is necessary since feature services may rely on feature views which require | ||||||
feature inference. | ||||||
""" | ||||||
for feature_grouping in self._features: | ||||||
if isinstance(feature_grouping, BaseFeatureView): | ||||||
# For feature services that depend on an unspecified feature view, apply inferred schema | ||||||
projection = feature_grouping.projection | ||||||
|
||||||
if fvs_to_update and feature_grouping.name in fvs_to_update: | ||||||
if feature_grouping.projection.desired_features: | ||||||
desired_features = set( | ||||||
feature_grouping.projection.desired_features | ||||||
) | ||||||
# There are three situations to be handled. First, the projection specifies | ||||||
# desired features, in which case we should select those desired features. | ||||||
# Second, the projection does not specify any desired features but has | ||||||
# already selected features, in which case nothing needs to be done. And | ||||||
# third, the projection does not specify any desired features but has not | ||||||
# yet selected features (since the original feature view did not yet have | ||||||
# features), in which case we should select all possible inferred features. | ||||||
if projection.desired_features: | ||||||
# First case, so we select the specific desired features. | ||||||
desired_features = set(projection.desired_features) | ||||||
actual_features = set( | ||||||
[ | ||||||
f.name | ||||||
for f in fvs_to_update[feature_grouping.name].features | ||||||
] | ||||||
) | ||||||
assert desired_features.issubset(actual_features) | ||||||
|
||||||
# We need to set the features for the projection at this point so we ensure we're starting with | ||||||
# an empty list. | ||||||
feature_grouping.projection.features = [] | ||||||
projection.features = [] | ||||||
for f in fvs_to_update[feature_grouping.name].features: | ||||||
if f.name in desired_features: | ||||||
feature_grouping.projection.features.append(f) | ||||||
projection.features.append(f) | ||||||
elif not projection.desired_features and projection.features: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would be cleaner imo if you had early returns instead of the elif and else.. and maybe some example definitions e.g. if projection.desired_features:
# The projection wants to reference inferred features. Validate they exist
# Example: FeatureService(features=[fv_with_no_schema[["feature"]]])
...
return
if projection.features:
# The projection only references features from a FV's known schema (not inferred).
# Example 1: FeatureService(features=[fv_with_schema[["feature"]]])
# Example 2: FeatureService(features=[fv_with_schema])
return
# The projection wants all features from a FV that has an inferred schema
# Example: FeatureService(features=[fv_with_no_schema])
... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wondering if we can exit earlier than this. i.e in line 98 instead |
||||||
# Second case, so nothing needs to be done. In case something went wrong | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
# during feature inference, we check that the selected features still exist. | ||||||
actual_features = set( | ||||||
[ | ||||||
f.name | ||||||
for f in fvs_to_update[feature_grouping.name].features | ||||||
] | ||||||
) | ||||||
assert projection.features.issubset(actual_features) | ||||||
else: | ||||||
feature_grouping.projection.features = fvs_to_update[ | ||||||
# Third case, so all inferred features will be selected. | ||||||
projection.features = fvs_to_update[ | ||||||
feature_grouping.name | ||||||
].features | ||||||
else: | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
from datetime import timedelta | ||
|
||
from feast import Entity, FeatureService, FeatureView, Field, FileSource | ||
from feast.types import Float32, Int32, Int64 | ||
|
||
# Example feature repo: two feature views with explicitly declared schemas and
# two feature services built on top of them.

# File-backed source for the per-driver statistics.
driver_hourly_stats = FileSource(
    path="data/driver_stats.parquet",  # Fake path
    created_timestamp_column="created",
    timestamp_field="event_timestamp",
)

driver = Entity(name="driver_id")

# The schema is spelled out field by field for this view.
driver_hourly_stats_view = FeatureView(
    name="driver_hourly_stats",
    source=driver_hourly_stats,
    entities=[driver],
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
        Field(name="driver_id", dtype=Int32),
    ],
    ttl=timedelta(days=1),
    online=True,
    tags={},
)

# File-backed source for the global (entity-less) statistics.
global_daily_stats = FileSource(
    path="data/global_stats.parquet",  # Fake path
    created_timestamp_column="created",
    timestamp_field="event_timestamp",
)

# This view declares no entities and also spells out its schema.
global_stats_feature_view = FeatureView(
    name="global_daily_stats",
    source=global_daily_stats,
    entities=[],
    schema=[
        Field(name="num_rides", dtype=Int32),
        Field(name="avg_ride_length", dtype=Float32),
    ],
    ttl=timedelta(days=1),
    online=True,
    tags={},
)

# Service that references both feature views in full.
all_stats_service = FeatureService(
    name="all_stats",
    tags={"release": "production"},
    features=[driver_hourly_stats_view, global_stats_feature_view],
)

# Service that selects a single named feature from each feature view.
some_stats_service = FeatureService(
    name="some_stats",
    tags={"release": "production"},
    features=[
        driver_hourly_stats_view[["conv_rate"]],
        global_stats_feature_view[["num_rides"]],
    ],
)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
from datetime import timedelta | ||
|
||
from feast import Entity, FeatureService, FeatureView, FileSource | ||
|
||
# Example feature repo: two feature views declared WITHOUT a schema and two
# feature services built on top of them. The source paths are placeholders.

# File-backed source for the per-driver statistics.
driver_hourly_stats = FileSource(
    path="%PARQUET_PATH%",  # placeholder to be replaced by the test
    created_timestamp_column="created",
    timestamp_field="event_timestamp",
)

driver = Entity(name="driver_id")

# No schema is given for this view.
driver_hourly_stats_view = FeatureView(
    name="driver_hourly_stats",
    source=driver_hourly_stats,
    entities=[driver],
    ttl=timedelta(days=1),
    online=True,
    tags={},
)

# File-backed source for the global (entity-less) statistics.
global_daily_stats = FileSource(
    path="%PARQUET_PATH_GLOBAL%",  # placeholder to be replaced by the test
    created_timestamp_column="created",
    timestamp_field="event_timestamp",
)

# This view declares no entities and no schema.
global_stats_feature_view = FeatureView(
    name="global_daily_stats",
    source=global_daily_stats,
    entities=[],
    ttl=timedelta(days=1),
    online=True,
    tags={},
)

# Service that references both feature views in full.
all_stats_service = FeatureService(
    name="all_stats",
    tags={"release": "production"},
    features=[driver_hourly_stats_view, global_stats_feature_view],
)

# Service that selects a single named feature from each feature view.
some_stats_service = FeatureService(
    name="some_stats",
    tags={"release": "production"},
    features=[
        driver_hourly_stats_view[["conv_rate"]],
        global_stats_feature_view[["num_rides"]],
    ],
)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
import os | ||
import tempfile | ||
from datetime import datetime, timedelta | ||
|
||
from feast.driver_test_data import ( | ||
create_driver_hourly_stats_df, | ||
create_global_daily_stats_df, | ||
) | ||
from tests.utils.basic_read_write_test import basic_rw_test | ||
from tests.utils.cli_repo_creator import CliRunner, get_example_repo | ||
|
||
|
||
def test_apply_without_fv_inference() -> None:
    """
    Tests that feature services based on feature views that do not require inference can be applied correctly.

    Applies the example repo to a local store and checks, for each of the two
    registered feature services, the number of selected features and of
    still-pending desired features on every feature view projection.
    """
    runner = CliRunner()
    with runner.local_repo(
        get_example_repo("example_feature_repo_with_feature_service_2.py"), "file"
    ) as store:
        assert len(store.list_feature_services()) == 2

        # Service built from the two whole feature views.
        fs = store.get_feature_service("all_stats")
        assert len(fs.feature_view_projections) == 2
        assert len(fs.feature_view_projections[0].features) == 3
        assert len(fs.feature_view_projections[0].desired_features) == 0
        assert len(fs.feature_view_projections[1].features) == 2
        assert len(fs.feature_view_projections[1].desired_features) == 0
        assert len(fs.tags) == 1
        assert fs.tags["release"] == "production"

        # Service built from one selected feature per feature view. Both
        # projections are checked; the second one was previously never
        # inspected because the assertions repeated index 0.
        fs = store.get_feature_service("some_stats")
        assert len(fs.feature_view_projections) == 2
        assert len(fs.feature_view_projections[0].features) == 1
        assert len(fs.feature_view_projections[0].desired_features) == 0
        assert len(fs.feature_view_projections[1].features) == 1
        assert len(fs.feature_view_projections[1].desired_features) == 0
|
||
|
||
def test_apply_with_fv_inference() -> None:
    """
    Tests that feature services based on feature views that require inference can be applied correctly.

    Generates driver and global statistics as parquet files in a temporary
    directory, substitutes their paths into the example repo's placeholders,
    applies the repo to a local store, and checks the number of selected and
    still-pending desired features on every feature view projection.
    """
    runner = CliRunner()
    with tempfile.TemporaryDirectory() as data_dir:
        # Generate test data.
        end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
        start_date = end_date - timedelta(days=15)

        driver_entities = [1001, 1002, 1003, 1004, 1005]
        driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date)
        driver_stats_path = os.path.join(data_dir, "driver_stats.parquet")
        driver_df.to_parquet(path=driver_stats_path, allow_truncated_timestamps=True)

        global_df = create_global_daily_stats_df(start_date, end_date)
        global_stats_path = os.path.join(data_dir, "global_stats.parquet")
        global_df.to_parquet(path=global_stats_path, allow_truncated_timestamps=True)

        # Point the example repo's placeholder paths at the generated files.
        with runner.local_repo(
            get_example_repo("example_feature_repo_with_feature_service_3.py")
            .replace("%PARQUET_PATH%", driver_stats_path)
            .replace("%PARQUET_PATH_GLOBAL%", global_stats_path),
            "file",
        ) as store:
            assert len(store.list_feature_services()) == 2

            # Service built from the two whole feature views.
            fs = store.get_feature_service("all_stats")
            assert len(fs.feature_view_projections) == 2
            assert len(fs.feature_view_projections[0].features) == 3
            assert len(fs.feature_view_projections[0].desired_features) == 0
            assert len(fs.feature_view_projections[1].features) == 2
            assert len(fs.feature_view_projections[1].desired_features) == 0
            assert len(fs.tags) == 1
            assert fs.tags["release"] == "production"

            # Service built from one selected feature per feature view. Both
            # projections are checked; the second one was previously never
            # inspected because the assertions repeated index 0.
            fs = store.get_feature_service("some_stats")
            assert len(fs.feature_view_projections) == 2
            assert len(fs.feature_view_projections[0].features) == 1
            assert len(fs.feature_view_projections[0].desired_features) == 0
            assert len(fs.feature_view_projections[1].features) == 1
            assert len(fs.feature_view_projections[1].desired_features) == 0
|
||
|
||
def test_read() -> None:
    """
    Test that feature values are correctly read through a feature service.

    Applies the example repo to a local store and delegates the actual
    write/read round trip to ``basic_rw_test``.
    """
    example_repo = get_example_repo("example_feature_repo_with_feature_service.py")
    with CliRunner().local_repo(example_repo, "file") as feature_store:
        basic_rw_test(
            feature_store,
            view_name="driver_locations",
            feature_service_name="driver_locations_service",
        )
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be easier to read if you have the cases within the blocks themselves, instead of all in one block before the if starts