From fee40e8605d0934d31cad6aaa9bf48d553619e4c Mon Sep 17 00:00:00 2001 From: Tornike Gurgenidze Date: Tue, 2 Jul 2024 01:05:44 +0400 Subject: [PATCH] feat: Move get_online_features to OnlineStore interface (#4319) * move get_online_features to OnlineStore interface Signed-off-by: tokoko * fix pydantic warnings Signed-off-by: tokoko * run ruff format Signed-off-by: tokoko --------- Signed-off-by: tokoko --- sdk/python/feast/feature_store.py | 185 +----------------- .../kubernetes/k8s_materialization_engine.py | 4 +- .../feast/infra/online_stores/online_store.py | 182 ++++++++++++++++- .../feast/infra/passthrough_provider.py | 46 ++++- sdk/python/feast/infra/provider.py | 34 +++- sdk/python/tests/foo_provider.py | 32 ++- .../universal/data_sources/file.py | 2 +- 7 files changed, 303 insertions(+), 182 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index b7e4ef619f..6476af5ac8 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1559,75 +1559,16 @@ def get_online_features( ... ) >>> online_response_dict = online_response.to_dict() """ - if isinstance(entity_rows, list): - columnar: Dict[str, List[Any]] = {k: [] for k in entity_rows[0].keys()} - for entity_row in entity_rows: - for key, value in entity_row.items(): - try: - columnar[key].append(value) - except KeyError as e: - raise ValueError( - "All entity_rows must have the same keys." - ) from e - - entity_rows = columnar + provider = self._get_provider() - ( - join_key_values, - grouped_refs, - entity_name_to_join_key_map, - requested_on_demand_feature_views, - feature_refs, - requested_result_row_names, - online_features_response, - ) = utils._prepare_entities_to_read_from_online_store( + return provider.get_online_features( + config=self.config, + features=features, + entity_rows=entity_rows, registry=self._registry, project=self.project, - features=features, - entity_values=entity_rows, full_feature_names=full_feature_names, - native_entity_values=True, - ) - - provider = self._get_provider() - for table, requested_features in grouped_refs: - # Get the correct set of entity values with the correct join keys. - table_entity_values, idxs = utils._get_unique_entities( - table, - join_key_values, - entity_name_to_join_key_map, - ) - - # Fetch feature data for the minimum set of Entities. - feature_data = self._read_from_online_store( - table_entity_values, - provider, - requested_features, - table, - ) - - # Populate the result_rows with the Features from the OnlineStore inplace. - utils._populate_response_from_feature_data( - feature_data, - idxs, - online_features_response, - full_feature_names, - requested_features, - table, - ) - - if requested_on_demand_feature_views: - utils._augment_response_with_on_demand_transforms( - online_features_response, - feature_refs, - requested_on_demand_feature_views, - full_feature_names, - ) - - utils._drop_unneeded_columns( - online_features_response, requested_result_row_names ) - return OnlineResponse(online_features_response) async def get_online_features_async( self, @@ -1664,75 +1605,16 @@ async def get_online_features_async( Raises: Exception: No entity with the specified name exists. """ - if isinstance(entity_rows, list): - columnar: Dict[str, List[Any]] = {k: [] for k in entity_rows[0].keys()} - for entity_row in entity_rows: - for key, value in entity_row.items(): - try: - columnar[key].append(value) - except KeyError as e: - raise ValueError( - "All entity_rows must have the same keys." - ) from e - - entity_rows = columnar + provider = self._get_provider() - ( - join_key_values, - grouped_refs, - entity_name_to_join_key_map, - requested_on_demand_feature_views, - feature_refs, - requested_result_row_names, - online_features_response, - ) = utils._prepare_entities_to_read_from_online_store( + return await provider.get_online_features_async( + config=self.config, + features=features, + entity_rows=entity_rows, registry=self._registry, project=self.project, - features=features, - entity_values=entity_rows, full_feature_names=full_feature_names, - native_entity_values=True, - ) - - provider = self._get_provider() - for table, requested_features in grouped_refs: - # Get the correct set of entity values with the correct join keys. - table_entity_values, idxs = utils._get_unique_entities( - table, - join_key_values, - entity_name_to_join_key_map, - ) - - # Fetch feature data for the minimum set of Entities. - feature_data = await self._read_from_online_store_async( - table_entity_values, - provider, - requested_features, - table, - ) - - # Populate the result_rows with the Features from the OnlineStore inplace. - utils._populate_response_from_feature_data( - feature_data, - idxs, - online_features_response, - full_feature_names, - requested_features, - table, - ) - - if requested_on_demand_feature_views: - utils._augment_response_with_on_demand_transforms( - online_features_response, - feature_refs, - requested_on_demand_feature_views, - full_feature_names, - ) - - utils._drop_unneeded_columns( - online_features_response, requested_result_row_names ) - return OnlineResponse(online_features_response) def retrieve_online_documents( self, @@ -1806,53 +1688,6 @@ def retrieve_online_documents( ) return OnlineResponse(online_features_response) - def _read_from_online_store( - self, - entity_rows: Iterable[Mapping[str, Value]], - provider: Provider, - requested_features: List[str], - table: FeatureView, - ) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]: - """Read and process data from the OnlineStore for a given FeatureView. - - This method guarantees that the order of the data in each element of the - List returned is the same as the order of `requested_features`. - - This method assumes that `provider.online_read` returns data for each - combination of Entities in `entity_rows` in the same order as they - are provided. - """ - entity_key_protos = utils._get_entity_key_protos(entity_rows) - - # Fetch data for Entities. - read_rows = provider.online_read( - config=self.config, - table=table, - entity_keys=entity_key_protos, - requested_features=requested_features, - ) - - return utils._convert_rows_to_protobuf(requested_features, read_rows) - - async def _read_from_online_store_async( - self, - entity_rows: Iterable[Mapping[str, Value]], - provider: Provider, - requested_features: List[str], - table: FeatureView, - ) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]: - entity_key_protos = utils._get_entity_key_protos(entity_rows) - - # Fetch data for Entities. - read_rows = await provider.online_read_async( - config=self.config, - table=table, - entity_keys=entity_key_protos, - requested_features=requested_features, - ) - - return utils._convert_rows_to_protobuf(requested_features, read_rows) - def _retrieve_from_online_store( self, provider: Provider, diff --git a/sdk/python/feast/infra/materialization/kubernetes/k8s_materialization_engine.py b/sdk/python/feast/infra/materialization/kubernetes/k8s_materialization_engine.py index 2e7129b037..510b6b4e4c 100644 --- a/sdk/python/feast/infra/materialization/kubernetes/k8s_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/kubernetes/k8s_materialization_engine.py @@ -306,7 +306,9 @@ def _create_kubernetes_job(self, job_id, paths, feature_view): def _create_configuration_map(self, job_id, paths, feature_view, namespace): """Create a Kubernetes configmap for this job""" - feature_store_configuration = yaml.dump(self.repo_config.dict(by_alias=True)) + feature_store_configuration = yaml.dump( + self.repo_config.model_dump(by_alias=True) + ) materialization_config = yaml.dump( {"paths": paths, "feature_view": feature_view.name} diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index 05983a494c..9cf2ef95f6 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -14,13 +14,17 @@ from abc import ABC, abstractmethod from datetime import datetime -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple +from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Tuple, Union -from feast import Entity +from feast import Entity, utils +from feast.feature_service import FeatureService from feast.feature_view import FeatureView from feast.infra.infra_object import InfraObject +from feast.infra.registry.base_registry import BaseRegistry +from feast.online_response import OnlineResponse from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import RepeatedValue from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import RepoConfig @@ -105,6 +109,180 @@ async def online_read_async( f"Online store {self.__class__.__name__} does not support online read async" ) + def get_online_features( + self, + config: RepoConfig, + features: Union[List[str], FeatureService], + entity_rows: Union[ + List[Dict[str, Any]], + Mapping[str, Union[Sequence[Any], Sequence[ValueProto], RepeatedValue]], + ], + registry: BaseRegistry, + project: str, + full_feature_names: bool = False, + ) -> OnlineResponse: + if isinstance(entity_rows, list): + columnar: Dict[str, List[Any]] = {k: [] for k in entity_rows[0].keys()} + for entity_row in entity_rows: + for key, value in entity_row.items(): + try: + columnar[key].append(value) + except KeyError as e: + raise ValueError( + "All entity_rows must have the same keys." + ) from e + + entity_rows = columnar + + ( + join_key_values, + grouped_refs, + entity_name_to_join_key_map, + requested_on_demand_feature_views, + feature_refs, + requested_result_row_names, + online_features_response, + ) = utils._prepare_entities_to_read_from_online_store( + registry=registry, + project=project, + features=features, + entity_values=entity_rows, + full_feature_names=full_feature_names, + native_entity_values=True, + ) + + for table, requested_features in grouped_refs: + # Get the correct set of entity values with the correct join keys. + table_entity_values, idxs = utils._get_unique_entities( + table, + join_key_values, + entity_name_to_join_key_map, + ) + + entity_key_protos = utils._get_entity_key_protos(table_entity_values) + + # Fetch data for Entities. + read_rows = self.online_read( + config=config, + table=table, + entity_keys=entity_key_protos, + requested_features=requested_features, + ) + + feature_data = utils._convert_rows_to_protobuf( + requested_features, read_rows + ) + + # Populate the result_rows with the Features from the OnlineStore inplace. + utils._populate_response_from_feature_data( + feature_data, + idxs, + online_features_response, + full_feature_names, + requested_features, + table, + ) + + if requested_on_demand_feature_views: + utils._augment_response_with_on_demand_transforms( + online_features_response, + feature_refs, + requested_on_demand_feature_views, + full_feature_names, + ) + + utils._drop_unneeded_columns( + online_features_response, requested_result_row_names + ) + return OnlineResponse(online_features_response) + + async def get_online_features_async( + self, + config: RepoConfig, + features: Union[List[str], FeatureService], + entity_rows: Union[ + List[Dict[str, Any]], + Mapping[str, Union[Sequence[Any], Sequence[ValueProto], RepeatedValue]], + ], + registry: BaseRegistry, + project: str, + full_feature_names: bool = False, + ) -> OnlineResponse: + if isinstance(entity_rows, list): + columnar: Dict[str, List[Any]] = {k: [] for k in entity_rows[0].keys()} + for entity_row in entity_rows: + for key, value in entity_row.items(): + try: + columnar[key].append(value) + except KeyError as e: + raise ValueError( + "All entity_rows must have the same keys." + ) from e + + entity_rows = columnar + + ( + join_key_values, + grouped_refs, + entity_name_to_join_key_map, + requested_on_demand_feature_views, + feature_refs, + requested_result_row_names, + online_features_response, + ) = utils._prepare_entities_to_read_from_online_store( + registry=registry, + project=project, + features=features, + entity_values=entity_rows, + full_feature_names=full_feature_names, + native_entity_values=True, + ) + + for table, requested_features in grouped_refs: + # Get the correct set of entity values with the correct join keys. + table_entity_values, idxs = utils._get_unique_entities( + table, + join_key_values, + entity_name_to_join_key_map, + ) + + entity_key_protos = utils._get_entity_key_protos(table_entity_values) + + # Fetch data for Entities. + read_rows = await self.online_read_async( + config=config, + table=table, + entity_keys=entity_key_protos, + requested_features=requested_features, + ) + + feature_data = utils._convert_rows_to_protobuf( + requested_features, read_rows + ) + + # Populate the result_rows with the Features from the OnlineStore inplace. + utils._populate_response_from_feature_data( + feature_data, + idxs, + online_features_response, + full_feature_names, + requested_features, + table, + ) + + if requested_on_demand_feature_views: + utils._augment_response_with_on_demand_transforms( + online_features_response, + feature_refs, + requested_on_demand_feature_views, + full_feature_names, + ) + + utils._drop_unneeded_columns( + online_features_response, requested_result_row_names + ) + return OnlineResponse(online_features_response) + @abstractmethod def update( self, diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index bad6f86cc6..c3c3048a89 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -1,5 +1,5 @@ from datetime import datetime, timedelta -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union +from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Tuple, Union import pandas as pd import pyarrow as pa @@ -23,8 +23,10 @@ from feast.infra.online_stores.helpers import get_online_store_from_config from feast.infra.provider import Provider from feast.infra.registry.base_registry import BaseRegistry +from feast.online_response import OnlineResponse from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import RepeatedValue from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import BATCH_ENGINE_CLASS_FOR_TYPE, RepoConfig from feast.saved_dataset import SavedDataset @@ -193,6 +195,48 @@ def online_read( ) return result + def get_online_features( + self, + config: RepoConfig, + features: Union[List[str], FeatureService], + entity_rows: Union[ + List[Dict[str, Any]], + Mapping[str, Union[Sequence[Any], Sequence[ValueProto], RepeatedValue]], + ], + registry: BaseRegistry, + project: str, + full_feature_names: bool = False, + ) -> OnlineResponse: + return self.online_store.get_online_features( + config=config, + features=features, + entity_rows=entity_rows, + registry=registry, + project=project, + full_feature_names=full_feature_names, + ) + + async def get_online_features_async( + self, + config: RepoConfig, + features: Union[List[str], FeatureService], + entity_rows: Union[ + List[Dict[str, Any]], + Mapping[str, Union[Sequence[Any], Sequence[ValueProto], RepeatedValue]], + ], + registry: BaseRegistry, + project: str, + full_feature_names: bool = False, + ) -> OnlineResponse: + return await self.online_store.get_online_features_async( + config=config, + features=features, + entity_rows=entity_rows, + registry=registry, + project=project, + full_feature_names=full_feature_names, + ) + async def online_read_async( self, config: RepoConfig, diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 75afd6bba8..9940af1d02 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -1,7 +1,7 @@ from abc import ABC, abstractmethod from datetime import datetime from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union +from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Tuple, Union import pandas as pd import pyarrow @@ -15,8 +15,10 @@ from feast.infra.infra_object import Infra from feast.infra.offline_stores.offline_store import RetrievalJob from feast.infra.registry.base_registry import BaseRegistry +from feast.online_response import OnlineResponse from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import RepeatedValue from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import RepoConfig from feast.saved_dataset import SavedDataset @@ -230,6 +232,36 @@ def online_read( """ pass + @abstractmethod + def get_online_features( + self, + config: RepoConfig, + features: Union[List[str], FeatureService], + entity_rows: Union[ + List[Dict[str, Any]], + Mapping[str, Union[Sequence[Any], Sequence[ValueProto], RepeatedValue]], + ], + registry: BaseRegistry, + project: str, + full_feature_names: bool = False, + ) -> OnlineResponse: + pass + + @abstractmethod + async def get_online_features_async( + self, + config: RepoConfig, + features: Union[List[str], FeatureService], + entity_rows: Union[ + List[Dict[str, Any]], + Mapping[str, Union[Sequence[Any], Sequence[ValueProto], RepeatedValue]], + ], + registry: BaseRegistry, + project: str, + full_feature_names: bool = False, + ) -> OnlineResponse: + pass + @abstractmethod async def online_read_async( self, diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py index bd1e247a7b..8e8f54db24 100644 --- a/sdk/python/tests/foo_provider.py +++ b/sdk/python/tests/foo_provider.py @@ -1,6 +1,6 @@ from datetime import datetime from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union +from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Tuple, Union import pandas import pyarrow @@ -11,7 +11,9 @@ from feast.infra.offline_stores.offline_store import RetrievalJob from feast.infra.provider import Provider from feast.infra.registry.base_registry import BaseRegistry +from feast.online_response import OnlineResponse from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import RepeatedValue from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.saved_dataset import SavedDataset @@ -138,3 +140,31 @@ def validate_data_source( data_source: DataSource, ): pass + + def get_online_features( + self, + config: RepoConfig, + features: Union[List[str], FeatureService], + entity_rows: Union[ + List[Dict[str, Any]], + Mapping[str, Union[Sequence[Any], Sequence[ValueProto], RepeatedValue]], + ], + registry: BaseRegistry, + project: str, + full_feature_names: bool = False, + ) -> OnlineResponse: + pass + + async def get_online_features_async( + self, + config: RepoConfig, + features: Union[List[str], FeatureService], + entity_rows: Union[ + List[Dict[str, Any]], + Mapping[str, Union[Sequence[Any], Sequence[ValueProto], RepeatedValue]], + ], + registry: BaseRegistry, + project: str, + full_feature_names: bool = False, + ) -> OnlineResponse: + pass diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py index f7ab55d868..4a4a7360d8 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py @@ -381,7 +381,7 @@ def setup(self, registry: RegistryConfig): repo_path = Path(tempfile.mkdtemp()) with open(repo_path / "feature_store.yaml", "w") as outfile: - yaml.dump(config.dict(by_alias=True), outfile) + yaml.dump(config.model_dump(by_alias=True), outfile) repo_path = str(repo_path.resolve()) self.server_port = free_port()