Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix feature service inference logic #3089

Merged
42 changes: 34 additions & 8 deletions sdk/python/feast/feature_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,29 +86,55 @@ def __init__(
self.feature_view_projections.append(feature_grouping.projection)

def infer_features(self, fvs_to_update: Optional[Dict[str, FeatureView]] = None):
"""
Infers the features for the projections of this feature service, and updates this feature
service in place.

This method is necessary since feature services may rely on feature views which require
feature inference.
"""
for feature_grouping in self._features:
if isinstance(feature_grouping, BaseFeatureView):
# For feature services that depend on an unspecified feature view, apply inferred schema
projection = feature_grouping.projection

if fvs_to_update and feature_grouping.name in fvs_to_update:
if feature_grouping.projection.desired_features:
desired_features = set(
feature_grouping.projection.desired_features
)
# There are three situations to be handled. First, the projection specifies
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be easier to read if you have the cases within the blocks themselves, instead of all in one block before the if starts

# desired features, in which case we should select those desired features.
# Second, the projection does not specify any desired features but has
# already selected features, in which case nothing needs to be done. And
# third, the projection does not specify any desired features but has not
# yet selected features (since the original feature view did not yet have
# features), in which case we should select all possible inferred features.
if projection.desired_features:
# First case, so we select the specific desired features.
desired_features = set(projection.desired_features)
actual_features = set(
[
f.name
for f in fvs_to_update[feature_grouping.name].features
]
)
assert desired_features.issubset(actual_features)

# We need to set the features for the projection at this point so we ensure we're starting with
# an empty list.
feature_grouping.projection.features = []
projection.features = []
for f in fvs_to_update[feature_grouping.name].features:
if f.name in desired_features:
feature_grouping.projection.features.append(f)
projection.features.append(f)
elif not projection.desired_features and projection.features:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be cleaner imo if you had early returns instead of the elif and else.. and maybe some example definitions

e.g.

if projection.desired_features:
  # The projection wants to reference inferred features. Validate they exist
   # Example: FeatureService(features=[fv_with_no_schema[["feature"]]])
   ...
   return

if projection.features:
  # The projection only references features from a FV's known schema (not inferred). 
  # Example 1: FeatureService(features=[fv_with_schema[["feature"]]])
  # Example 2: FeatureService(features=[fv_with_schema])
  return

# The projection wants all features from a FV that has an inferred schema
# Example: FeatureService(features=[fv_with_no_schema])
...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if we can exit earlier than this, i.e. at line 98 instead.

# Second cass, so nothing needs to be done. In case something went wrong
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Second cass, so nothing needs to be done. In case something went wrong
# Second case, so nothing needs to be done. In case something went wrong

# during feature inference, we check that the selected features still exist.
actual_features = set(
[
f.name
for f in fvs_to_update[feature_grouping.name].features
]
)
assert projection.features.issubset(actual_features)
else:
feature_grouping.projection.features = fvs_to_update[
# Third case, so all inferred features will be selected.
projection.features = fvs_to_update[
feature_grouping.name
].features
else:
Expand Down
4 changes: 4 additions & 0 deletions sdk/python/feast/feature_view_projection.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ class FeatureViewProjection:
name: The unique name of the feature view from which this projection is created.
name_alias: An optional alias for the name.
features: The list of features represented by the feature view projection.
desired_features: The list of features that this feature view projection intends to select.
If empty, the projection intends to select all features. This attribute is only used
for feature service inference. It should only be set if the underlying feature view
is not ready to be projected, i.e. still needs to go through feature inference.
join_key_map: A map to modify join key columns during retrieval of this feature
view projection.
"""
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from datetime import timedelta

from feast import Entity, FeatureService, FeatureView, Field, FileSource
from feast.types import Float32, Int32, Int64

# Example feature repo: two feature views that declare their schema explicitly, plus two
# feature services built on top of them (one selecting whole views, one selecting single
# features).

# File source backing the driver stats feature view. The path is a placeholder (see the
# inline "Fake path" note).
driver_hourly_stats = FileSource(
path="data/driver_stats.parquet", # Fake path
timestamp_field="event_timestamp",
created_timestamp_column="created",
)

# Entity used by the driver stats feature view.
driver = Entity(
name="driver_id",
)

# Driver stats feature view with an explicitly declared schema.
# NOTE(review): the schema lists "driver_id" next to the three stat columns; presumably it is
# treated as the entity join key rather than as a feature - confirm.
driver_hourly_stats_view = FeatureView(
name="driver_hourly_stats",
entities=[driver],
ttl=timedelta(days=1),
schema=[
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64),
Field(name="driver_id", dtype=Int32),
],
online=True,
source=driver_hourly_stats,
tags={},
)

# File source backing the global stats feature view. The path is a placeholder as well.
global_daily_stats = FileSource(
path="data/global_stats.parquet", # Fake path
timestamp_field="event_timestamp",
created_timestamp_column="created",
)

# Global stats feature view: declared with an empty entity list and an explicit two-field
# schema.
global_stats_feature_view = FeatureView(
name="global_daily_stats",
entities=[],
ttl=timedelta(days=1),
schema=[
Field(name="num_rides", dtype=Int32),
Field(name="avg_ride_length", dtype=Float32),
],
online=True,
source=global_daily_stats,
tags={},
)

# Feature service that references both feature views as a whole.
all_stats_service = FeatureService(
name="all_stats",
features=[driver_hourly_stats_view, global_stats_feature_view],
tags={"release": "production"},
)

# Feature service that selects a single named feature from each feature view.
some_stats_service = FeatureService(
name="some_stats",
features=[
driver_hourly_stats_view[["conv_rate"]],
global_stats_feature_view[["num_rides"]],
],
tags={"release": "production"},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from datetime import timedelta

from feast import Entity, FeatureService, FeatureView, FileSource

# Example feature repo: the same two feature views and feature services, but the feature
# views are declared without a schema.
# NOTE(review): presumably their features must be obtained through feature inference from the
# source data - confirm.

# File source backing the driver stats feature view. The path is a textual placeholder that
# the test substitutes with a real parquet file path before the repo is used.
driver_hourly_stats = FileSource(
path="%PARQUET_PATH%", # placeholder to be replaced by the test
timestamp_field="event_timestamp",
created_timestamp_column="created",
)

# Entity used by the driver stats feature view.
driver = Entity(
name="driver_id",
)

# Driver stats feature view; no schema is declared here.
driver_hourly_stats_view = FeatureView(
name="driver_hourly_stats",
entities=[driver],
ttl=timedelta(days=1),
online=True,
source=driver_hourly_stats,
tags={},
)

# File source backing the global stats feature view; its path is also a placeholder that the
# test substitutes.
global_daily_stats = FileSource(
path="%PARQUET_PATH_GLOBAL%", # placeholder to be replaced by the test
timestamp_field="event_timestamp",
created_timestamp_column="created",
)

# Global stats feature view: empty entity list and no declared schema.
global_stats_feature_view = FeatureView(
name="global_daily_stats",
entities=[],
ttl=timedelta(days=1),
online=True,
source=global_daily_stats,
tags={},
)

# Feature service that references both feature views as a whole.
all_stats_service = FeatureService(
name="all_stats",
features=[driver_hourly_stats_view, global_stats_feature_view],
tags={"release": "production"},
)

# Feature service that selects a single named feature from each feature view, even though
# neither view declares a schema up front.
some_stats_service = FeatureService(
name="some_stats",
features=[
driver_hourly_stats_view[["conv_rate"]],
global_stats_feature_view[["num_rides"]],
],
tags={"release": "production"},
)
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
60 changes: 60 additions & 0 deletions sdk/python/tests/unit/infra/test_inference_unit_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ def test_feature_view_inference_on_feature_columns(simple_dataset_1):


def test_update_feature_services_with_inferred_features(simple_dataset_1):
"""
Tests that a feature service that references feature views without specified features will
be updated with the correct projections after feature inference.
"""
with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source:
entity1 = Entity(name="test1", join_keys=["id_join_key"])
feature_view_1 = FeatureView(
Expand Down Expand Up @@ -338,4 +342,60 @@ def test_update_feature_services_with_inferred_features(simple_dataset_1):
assert len(feature_service.feature_view_projections[1].features) == 3


def test_update_feature_services_with_specified_features(simple_dataset_1):
    """
    Checks that a feature service built on feature views with explicitly specified features
    exposes the expected projections before feature inference and keeps them afterwards.
    """
    with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as source:
        entity = Entity(name="test1", join_keys=["id_join_key"])
        float_view = FeatureView(
            name="test1",
            entities=[entity],
            schema=[
                Field(name="float_col", dtype=Float32),
                Field(name="id_join_key", dtype=Int64),
            ],
            source=source,
        )
        int_view = FeatureView(
            name="test2",
            entities=[entity],
            schema=[
                Field(name="int64_col", dtype=Int64),
                Field(name="id_join_key", dtype=Int64),
            ],
            source=source,
        )

        # One projection selects a single named feature, the other takes the whole view.
        service = FeatureService(
            name="fs_1", features=[float_view[["float_col"]], int_view]
        )

        # Before inference: two projections, each with one selected feature and nothing
        # left pending in desired_features.
        initial_projections = service.feature_view_projections
        assert len(initial_projections) == 2
        for projection in initial_projections:
            assert len(projection.features) == 1
            assert len(projection.desired_features) == 0

        config = RepoConfig(
            provider="local", project="test", entity_key_serialization_version=2
        )
        update_feature_views_with_inferred_features_and_entities(
            [float_view, int_view],
            [entity],
            config,
        )
        assert len(float_view.features) == 1
        assert len(int_view.features) == 1

        views_by_name = {
            float_view.name: float_view,
            int_view.name: int_view,
        }
        service.infer_features(fvs_to_update=views_by_name)

        # After inference: the selected features are unchanged.
        assert len(service.feature_view_projections[0].features) == 1
        assert len(service.feature_view_projections[1].features) == 1


# TODO(felixwang9817): Add tests that interact with field mapping.
Empty file.
96 changes: 96 additions & 0 deletions sdk/python/tests/unit/local_feast_tests/test_feature_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import os
import tempfile
from datetime import datetime, timedelta

from feast.driver_test_data import (
create_driver_hourly_stats_df,
create_global_daily_stats_df,
)
from tests.utils.basic_read_write_test import basic_rw_test
from tests.utils.cli_repo_creator import CliRunner, get_example_repo


def test_apply_without_fv_inference() -> None:
    """
    Tests that feature services based on feature views that do not require inference can be applied correctly.

    Both feature services defined in the example repo are fetched back from the store, and
    every projection of each service is checked for the expected number of selected features
    and for an empty ``desired_features`` list.
    """
    runner = CliRunner()
    with runner.local_repo(
        get_example_repo("example_feature_repo_with_feature_service_2.py"), "file"
    ) as store:
        assert len(store.list_feature_services()) == 2

        # "all_stats" references both feature views as a whole.
        fs = store.get_feature_service("all_stats")
        assert len(fs.feature_view_projections) == 2
        assert len(fs.feature_view_projections[0].features) == 3
        assert len(fs.feature_view_projections[0].desired_features) == 0
        assert len(fs.feature_view_projections[1].features) == 2
        assert len(fs.feature_view_projections[1].desired_features) == 0
        assert len(fs.tags) == 1
        assert fs.tags["release"] == "production"

        # "some_stats" selects exactly one feature from each feature view.
        fs = store.get_feature_service("some_stats")
        assert len(fs.feature_view_projections) == 2
        assert len(fs.feature_view_projections[0].features) == 1
        assert len(fs.feature_view_projections[0].desired_features) == 0
        # Check the second projection as well; previously index 0 was asserted twice,
        # leaving the second projection unverified.
        assert len(fs.feature_view_projections[1].features) == 1
        assert len(fs.feature_view_projections[1].desired_features) == 0


def test_apply_with_fv_inference() -> None:
    """
    Tests that feature services based on feature views that require inference can be applied correctly.

    Driver and global stats data is generated into a temporary directory, and the placeholder
    paths in the example repo are replaced with the generated parquet files. Both feature
    services are then fetched back from the store, and every projection of each service is
    checked for the expected number of selected features and for an empty
    ``desired_features`` list.
    """
    runner = CliRunner()
    with tempfile.TemporaryDirectory() as data_dir:
        # Generate test data.
        end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
        start_date = end_date - timedelta(days=15)

        driver_entities = [1001, 1002, 1003, 1004, 1005]
        driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date)
        driver_stats_path = os.path.join(data_dir, "driver_stats.parquet")
        driver_df.to_parquet(path=driver_stats_path, allow_truncated_timestamps=True)

        global_df = create_global_daily_stats_df(start_date, end_date)
        global_stats_path = os.path.join(data_dir, "global_stats.parquet")
        global_df.to_parquet(path=global_stats_path, allow_truncated_timestamps=True)

        # Point the example repo at the freshly generated parquet files.
        with runner.local_repo(
            get_example_repo("example_feature_repo_with_feature_service_3.py")
            .replace("%PARQUET_PATH%", driver_stats_path)
            .replace("%PARQUET_PATH_GLOBAL%", global_stats_path),
            "file",
        ) as store:
            assert len(store.list_feature_services()) == 2

            # "all_stats" references both feature views as a whole.
            fs = store.get_feature_service("all_stats")
            assert len(fs.feature_view_projections) == 2
            assert len(fs.feature_view_projections[0].features) == 3
            assert len(fs.feature_view_projections[0].desired_features) == 0
            assert len(fs.feature_view_projections[1].features) == 2
            assert len(fs.feature_view_projections[1].desired_features) == 0
            assert len(fs.tags) == 1
            assert fs.tags["release"] == "production"

            # "some_stats" selects exactly one feature from each feature view.
            fs = store.get_feature_service("some_stats")
            assert len(fs.feature_view_projections) == 2
            assert len(fs.feature_view_projections[0].features) == 1
            assert len(fs.feature_view_projections[0].desired_features) == 0
            # Check the second projection as well; previously index 0 was asserted twice,
            # leaving the second projection unverified.
            assert len(fs.feature_view_projections[1].features) == 1
            assert len(fs.feature_view_projections[1].desired_features) == 0


def test_read() -> None:
    """
    Checks that feature values can be read back correctly through a feature service.
    """
    runner = CliRunner()
    example_repo = get_example_repo("example_feature_repo_with_feature_service.py")
    with runner.local_repo(example_repo, "file") as store:
        # Delegate the write-then-read round trip to the shared helper.
        basic_rw_test(
            store,
            view_name="driver_locations",
            feature_service_name="driver_locations_service",
        )

This file was deleted.

Loading