Skip to content

Commit

Permalink
Proper fixes for consistency and to make sure entities aren't inferre…
Browse files Browse the repository at this point in the history
…d as features

Signed-off-by: David Y Liu <[email protected]>
  • Loading branch information
mavysavydav committed Nov 17, 2021
1 parent 7032559 commit 44a5a4f
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 124 deletions.
10 changes: 6 additions & 4 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
from feast.inference import (
update_data_sources_with_inferred_event_timestamp_col,
update_entities_with_inferred_types_from_feature_views,
update_feature_views_with_inferred_features,
update_odfvs_with_inferred_features,
)
from feast.infra.provider import Provider, RetrievalJob, get_provider
from feast.on_demand_feature_view import OnDemandFeatureView
Expand Down Expand Up @@ -479,11 +481,11 @@ def apply(
[view.batch_source for view in views_to_update], self.config
)

for view in views_to_update:
view.infer_features_from_batch_source(self.config)
update_feature_views_with_inferred_features(
views_to_update, entities_to_update, self.config
)

for odfv in odfvs_to_update:
odfv.infer_features()
update_odfvs_with_inferred_features(odfvs_to_update)

# Handle all entityless feature views by using DUMMY_ENTITY as a placeholder entity.
DUMMY_ENTITY = Entity(
Expand Down
69 changes: 0 additions & 69 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import re
import warnings
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Type, Union
Expand All @@ -22,7 +21,6 @@
from feast import utils
from feast.base_feature_view import BaseFeatureView
from feast.data_source import DataSource
from feast.errors import RegistryInferenceFailure
from feast.feature import Feature
from feast.feature_view_projection import FeatureViewProjection
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
Expand All @@ -35,7 +33,6 @@
from feast.protos.feast.core.FeatureView_pb2 import (
MaterializationInterval as MaterializationIntervalProto,
)
from feast.repo_config import RepoConfig
from feast.usage import log_exceptions
from feast.value_type import ValueType

Expand Down Expand Up @@ -406,69 +403,3 @@ def most_recent_end_time(self) -> Optional[datetime]:
if len(self.materialization_intervals) == 0:
return None
return max([interval[1] for interval in self.materialization_intervals])

def infer_features_from_batch_source(self, config: RepoConfig):
    """
    Infers the set of features associated to this feature view from the input source.

    Inference only runs when no features were declared explicitly. Every column
    reported by the batch source becomes a feature, except:
      * the batch source's event and created timestamp columns,
      * the entries of ``self.entities``,
      * the field-mapped names of any of the above, and
      * columns whose name starts or ends with a double underscore.

    Args:
        config: Configuration object used to configure the feature store.

    Raises:
        RegistryInferenceFailure: The set of features could not be inferred.
    """
    if self.features:
        # Explicitly declared features take precedence; nothing to infer.
        return

    batch_source = self.batch_source
    field_mapping = batch_source.field_mapping

    reserved_columns = [
        batch_source.event_timestamp_column,
        batch_source.created_timestamp_column,
        *self.entities,
    ]
    columns_to_exclude = set(reserved_columns)
    # A reserved column may also appear under its mapped name; exclude that too.
    columns_to_exclude.update(
        field_mapping[column] for column in reserved_columns if column in field_mapping
    )

    # Double underscores often signal an internal-use column. `search` is required
    # here: `re.match` only tries position 0, so the trailing `__$` alternative
    # would never exclude a name such as "foo__".
    internal_column = re.compile(r"^__|__$")

    for col_name, col_datatype in batch_source.get_table_column_names_and_types(
        config
    ):
        if col_name in columns_to_exclude or internal_column.search(col_name):
            continue
        self.features.append(
            Feature(
                # Use the mapped name when the source column is renamed.
                field_mapping.get(col_name, col_name),
                batch_source.source_datatype_to_feast_value_type()(col_datatype),
            )
        )

    if not self.features:
        raise RegistryInferenceFailure(
            "FeatureView",
            f"Could not infer Features for the FeatureView named {self.name}.",
        )
119 changes: 118 additions & 1 deletion sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
import re
from typing import List

from feast import BigQuerySource, Entity, FileSource, RedshiftSource
import pandas as pd

from feast import BigQuerySource, Entity, Feature, FileSource, RedshiftSource, errors
from feast.data_source import DataSource
from feast.errors import RegistryInferenceFailure
from feast.feature_view import FeatureView
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.repo_config import RepoConfig
from feast.type_map import (
feast_value_type_to_pandas_type,
python_type_to_feast_value_type,
)
from feast.value_type import ValueType


Expand Down Expand Up @@ -118,3 +125,113 @@ def update_data_sources_with_inferred_event_timestamp_col(
{ERROR_MSG_PREFIX} due to an absence of columns that satisfy the criteria.
""",
)


def update_feature_views_with_inferred_features(
    fvs: List[FeatureView], entities: List[Entity], config: RepoConfig
) -> None:
    """
    Infers the set of features associated to each FeatureView and updates the FeatureView with those features.
    Inference occurs through considering each column of the underlying data source as a feature except columns that are
    associated with the data source's timestamp columns and the FeatureView's entity columns.

    Feature views that already declare features are left untouched. Columns whose
    name starts or ends with a double underscore are also skipped.

    Args:
        fvs: The feature views to update in place.
        entities: The entities whose join keys identify entity columns.
        config: Configuration object passed to the batch source when listing its columns.

    Raises:
        KeyError: A feature view without features references an entity name that is
            not present in ``entities``.
        RegistryInferenceFailure: No features could be inferred for a feature view.
    """
    entity_name_to_join_key_map = {entity.name: entity.join_key for entity in entities}

    # Double underscores often signal an internal-use column. `search` is required
    # here: `re.match` only tries position 0, so the trailing `__$` alternative
    # would never exclude a name such as "foo__".
    internal_column = re.compile(r"^__|__$")

    for fv in fvs:
        if fv.features:
            # Explicitly declared features take precedence; nothing to infer.
            continue

        batch_source = fv.batch_source
        field_mapping = batch_source.field_mapping

        timestamp_columns = (
            batch_source.event_timestamp_column,
            batch_source.created_timestamp_column,
        )
        columns_to_exclude = set(timestamp_columns) | {
            entity_name_to_join_key_map[entity_name] for entity_name in fv.entities
        }
        # A timestamp column may also appear under its mapped name; exclude that too.
        columns_to_exclude.update(
            field_mapping[column]
            for column in timestamp_columns
            if column in field_mapping
        )

        for col_name, col_datatype in batch_source.get_table_column_names_and_types(
            config
        ):
            if col_name in columns_to_exclude or internal_column.search(col_name):
                continue
            fv.features.append(
                Feature(
                    # Use the mapped name when the source column is renamed.
                    field_mapping.get(col_name, col_name),
                    batch_source.source_datatype_to_feast_value_type()(col_datatype),
                )
            )

        if not fv.features:
            raise RegistryInferenceFailure(
                "FeatureView",
                f"Could not infer Features for the FeatureView named {fv.name}.",
            )


def update_odfvs_with_inferred_features(odfvs: List[OnDemandFeatureView]) -> None:
    """
    Infers the output features of each OnDemandFeatureView and validates or stores them.

    For every view, an empty DataFrame is built with one typed column per input
    (feature view features under both their prefixed and bare names, then request
    data fields), the view's UDF is run on it, and the columns of the result are
    turned into Features. If the view already declares features they are only
    checked against the inferred ones; otherwise the inferred ones are assigned.

    Raises:
        SpecifiedFeaturesNotPresentError: A declared feature is not among the inferred features.
        RegistryInferenceFailure: The set of features could not be inferred.
    """
    for odfv in odfvs:
        sample_df = pd.DataFrame()

        # Columns coming from the input feature views.
        for input_view in odfv.input_feature_views.values():
            for input_feature in input_view.features:
                pandas_dtype = feast_value_type_to_pandas_type(input_feature.dtype)
                column_names = (
                    f"{input_view.name}__{input_feature.name}",
                    f"{input_feature.name}",
                )
                for column_name in column_names:
                    sample_df[column_name] = pd.Series(dtype=pandas_dtype)

        # Columns coming from request data sources.
        for request_source in odfv.input_request_data_sources.values():
            for field_name, field_type in request_source.schema.items():
                pandas_dtype = feast_value_type_to_pandas_type(field_type)
                sample_df[f"{field_name}"] = pd.Series(dtype=pandas_dtype)

        output_df: pd.DataFrame = odfv.udf.__call__(sample_df)
        inferred_features = [
            Feature(
                name=column,
                dtype=python_type_to_feast_value_type(
                    column, type_name=str(column_dtype)
                ),
            )
            for column, column_dtype in zip(output_df.columns, output_df.dtypes)
        ]

        if not odfv.features:
            odfv.features = inferred_features
        else:
            missing_features = [
                declared
                for declared in odfv.features
                if declared not in inferred_features
            ]
            if missing_features:
                raise errors.SpecifiedFeaturesNotPresentError(
                    [f.name for f in missing_features], odfv.name
                )

        if not odfv.features:
            raise RegistryInferenceFailure(
                "OnDemandFeatureView",
                f"Could not infer Features for the feature view '{odfv.name}'.",
            )
50 changes: 0 additions & 50 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
import dill
import pandas as pd

from feast import errors
from feast.base_feature_view import BaseFeatureView
from feast.data_source import RequestDataSource
from feast.errors import RegistryInferenceFailure
from feast.feature import Feature
from feast.feature_view import FeatureView
from feast.feature_view_projection import FeatureViewProjection
Expand All @@ -23,10 +21,6 @@
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
UserDefinedFunction as UserDefinedFunctionProto,
)
from feast.type_map import (
feast_value_type_to_pandas_type,
python_type_to_feast_value_type,
)
from feast.usage import log_exceptions
from feast.value_type import ValueType

Expand Down Expand Up @@ -187,50 +181,6 @@ def get_transformed_features_df(
df_with_features.drop(columns=columns_to_cleanup, inplace=True)
return df_with_transformed_features

def infer_features(self):
    """
    Infers the output features of this on demand feature view from its inputs.

    An empty DataFrame is built with one typed column per input (feature view
    features under both their prefixed and bare names, then request data fields),
    the UDF is run on it, and the columns of the result are turned into Features.
    Declared features are only checked against the inferred ones; when none are
    declared the inferred ones are assigned.

    Raises:
        SpecifiedFeaturesNotPresentError: A declared feature is not among the inferred features.
        RegistryInferenceFailure: The set of features could not be inferred.
    """
    sample_df = pd.DataFrame()

    # Columns coming from the input feature views.
    for input_view in self.input_feature_views.values():
        for input_feature in input_view.features:
            pandas_dtype = feast_value_type_to_pandas_type(input_feature.dtype)
            column_names = (
                f"{input_view.name}__{input_feature.name}",
                f"{input_feature.name}",
            )
            for column_name in column_names:
                sample_df[column_name] = pd.Series(dtype=pandas_dtype)

    # Columns coming from request data sources.
    for request_source in self.input_request_data_sources.values():
        for field_name, field_type in request_source.schema.items():
            pandas_dtype = feast_value_type_to_pandas_type(field_type)
            sample_df[f"{field_name}"] = pd.Series(dtype=pandas_dtype)

    output_df: pd.DataFrame = self.udf.__call__(sample_df)
    inferred_features = [
        Feature(
            name=column,
            dtype=python_type_to_feast_value_type(column, type_name=str(column_dtype)),
        )
        for column, column_dtype in zip(output_df.columns, output_df.dtypes)
    ]

    if not self.features:
        self.features = inferred_features
    else:
        missing_features = [
            declared for declared in self.features if declared not in inferred_features
        ]
        if missing_features:
            raise errors.SpecifiedFeaturesNotPresentError(
                [f.name for f in missing_features], self.name
            )

    if not self.features:
        raise RegistryInferenceFailure(
            "OnDemandFeatureView",
            f"Could not infer Features for the feature view '{self.name}'.",
        )


@staticmethod
def get_requested_odfvs(feature_refs, project, registry):
all_on_demand_feature_views = registry.list_on_demand_feature_views(
Expand Down

0 comments on commit 44a5a4f

Please sign in to comment.