From aaaac39f920d83976a9bc585bc2a181cfb3de6cd Mon Sep 17 00:00:00 2001 From: Breno Costa Date: Thu, 2 May 2024 21:43:30 +0200 Subject: [PATCH 1/4] feat: Adding get_online_features_async to feature store sdk Signed-off-by: Breno Costa --- sdk/python/feast/feature_store.py | 233 ++++++++++++++++-- .../feast/infra/online_stores/online_store.py | 25 ++ sdk/python/feast/infra/online_stores/redis.py | 132 ++++++++-- .../feast/infra/passthrough_provider.py | 16 ++ sdk/python/feast/infra/provider.py | 24 ++ sdk/python/tests/foo_provider.py | 9 + .../unit/infra/online_store/test_redis.py | 75 ++++++ 7 files changed, 462 insertions(+), 52 deletions(-) create mode 100644 sdk/python/tests/unit/infra/online_store/test_redis.py diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index f45dbb1bc8..270ea45a26 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1550,6 +1550,54 @@ def get_online_features( native_entity_values=True, ) + @log_exceptions_and_usage + async def get_online_features_async( + self, + features: Union[List[str], FeatureService], + entity_rows: List[Dict[str, Any]], + full_feature_names: bool = False, + ) -> OnlineResponse: + """ + [Alpha] Retrieves the latest online feature data asynchronously. + + Note: This method will download the full feature registry the first time it is run. If you are using a + remote registry like GCS or S3 then that may take a few seconds. The registry remains cached up to a TTL + duration (which can be set to infinity). If the cached registry is stale (more time than the TTL has + passed), then a new registry will be downloaded synchronously by this method. This download may + introduce latency to online feature retrieval. In order to avoid synchronous downloads, please call + refresh_registry() prior to the TTL being reached. Remember it is possible to set the cache TTL to + infinity (cache forever). + + Args: + features: The list of features that should be retrieved from the online store. These features can be + specified either as a list of string feature references or as a feature service. String feature + references must have format "feature_view:feature", e.g. "customer_fv:daily_transactions". + entity_rows: A list of dictionaries where each key-value is an entity-name, entity-value pair. + full_feature_names: If True, feature names will be prefixed with the corresponding feature view name, + changing them from the format "feature" to "feature_view__feature" (e.g. "daily_transactions" + changes to "customer_fv__daily_transactions"). + + Returns: + OnlineResponse containing the feature data in records. + + Raises: + Exception: No entity with the specified name exists. + """ + columnar: Dict[str, List[Any]] = {k: [] for k in entity_rows[0].keys()} + for entity_row in entity_rows: + for key, value in entity_row.items(): + try: + columnar[key].append(value) + except KeyError as e: + raise ValueError("All entity_rows must have the same keys.") from e + + return await self._get_online_features_async( + features=features, + entity_values=columnar, + full_feature_names=full_feature_names, + native_entity_values=True, + ) + def _get_online_request_context( self, features: Union[List[str], FeatureService], full_feature_names: bool ): @@ -1609,7 +1657,7 @@ def _get_online_request_context( entityless_case, ) - def _get_online_features( + def _prepare_entities_to_read_from_online_store( self, features: Union[List[str], FeatureService], entity_values: Mapping[ @@ -1619,7 +1667,7 @@ def _get_online_features( native_entity_values: bool = True, ): ( - _feature_refs, + feature_refs, requested_on_demand_feature_views, entity_name_to_join_key_map, entity_type_map, @@ -1694,6 +1742,40 @@ def _get_online_features( [DUMMY_ENTITY_VAL] * num_rows, DUMMY_ENTITY.value_type ) + return ( + join_key_values, + grouped_refs, + entity_name_to_join_key_map, + requested_on_demand_feature_views, + feature_refs, + requested_result_row_names, + online_features_response, + ) + + def _get_online_features( + self, + features: Union[List[str], FeatureService], + entity_values: Mapping[ + str, Union[Sequence[Any], Sequence[Value], RepeatedValue] + ], + full_feature_names: bool = False, + native_entity_values: bool = True, + ): + ( + join_key_values, + grouped_refs, + entity_name_to_join_key_map, + requested_on_demand_feature_views, + feature_refs, + requested_result_row_names, + online_features_response, + ) = self._prepare_entities_to_read_from_online_store( + features=features, + entity_values=entity_values, + full_feature_names=full_feature_names, + native_entity_values=native_entity_values, + ) + provider = self._get_provider() for table, requested_features in grouped_refs: # Get the correct set of entity values with the correct join keys. @@ -1724,7 +1806,71 @@ def _get_online_features( if requested_on_demand_feature_views: self._augment_response_with_on_demand_transforms( online_features_response, - _feature_refs, + feature_refs, + requested_on_demand_feature_views, + full_feature_names, + ) + + self._drop_unneeded_columns( + online_features_response, requested_result_row_names + ) + return OnlineResponse(online_features_response) + + async def _get_online_features_async( + self, + features: Union[List[str], FeatureService], + entity_values: Mapping[ + str, Union[Sequence[Any], Sequence[Value], RepeatedValue] + ], + full_feature_names: bool = False, + native_entity_values: bool = True, + ): + ( + join_key_values, + grouped_refs, + entity_name_to_join_key_map, + requested_on_demand_feature_views, + feature_refs, + requested_result_row_names, + online_features_response, + ) = self._prepare_entities_to_read_from_online_store( + features=features, + entity_values=entity_values, + full_feature_names=full_feature_names, + native_entity_values=native_entity_values, + ) + + provider = self._get_provider() + for table, requested_features in grouped_refs: + # Get the correct set of entity values with the correct join keys. + table_entity_values, idxs = self._get_unique_entities( + table, + join_key_values, + entity_name_to_join_key_map, + ) + + # Fetch feature data for the minimum set of Entities. + feature_data = await self._read_from_online_store_async( + table_entity_values, + provider, + requested_features, + table, + ) + + # Populate the result_rows with the Features from the OnlineStore inplace. + self._populate_response_from_feature_data( + feature_data, + idxs, + online_features_response, + full_feature_names, + requested_features, + table, + ) + + if requested_on_demand_feature_views: + self._augment_response_with_on_demand_transforms( + online_features_response, + feature_refs, requested_on_demand_feature_views, full_feature_names, ) @@ -1965,38 +2111,24 @@ def _get_unique_entities( ) return unique_entities, indexes - def _read_from_online_store( + def _get_entity_key_protos( self, entity_rows: Iterable[Mapping[str, Value]], - provider: Provider, - requested_features: List[str], - table: FeatureView, - ) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]: - """Read and process data from the OnlineStore for a given FeatureView. - - This method guarantees that the order of the data in each element of the - List returned is the same as the order of `requested_features`. - - This method assumes that `provider.online_read` returns data for each - combination of Entities in `entity_rows` in the same order as they - are provided. - """ + ) -> List[EntityKeyProto]: # Instantiate one EntityKeyProto per Entity. entity_key_protos = [ EntityKeyProto(join_keys=row.keys(), entity_values=row.values()) for row in entity_rows ] + return entity_key_protos - # Fetch data for Entities. - read_rows = provider.online_read( - config=self.config, - table=table, - entity_keys=entity_key_protos, - requested_features=requested_features, - ) - - # Each row is a set of features for a given entity key. We only need to convert - # the data to Protobuf once. + def _convert_rows_to_protobuf( + self, + requested_features: List[str], + read_rows: List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]], + ) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]: + # Each row is a set of features for a given entity key. + # We only need to convert the data to Protobuf once. null_value = Value() read_row_protos = [] for read_row in read_rows: @@ -2023,6 +2155,53 @@ def _read_from_online_store( read_row_protos.append((event_timestamps, statuses, values)) return read_row_protos + def _read_from_online_store( + self, + entity_rows: Iterable[Mapping[str, Value]], + provider: Provider, + requested_features: List[str], + table: FeatureView, + ) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]: + """Read and process data from the OnlineStore for a given FeatureView. + + This method guarantees that the order of the data in each element of the + List returned is the same as the order of `requested_features`. + + This method assumes that `provider.online_read` returns data for each + combination of Entities in `entity_rows` in the same order as they + are provided. + """ + entity_key_protos = self._get_entity_key_protos(entity_rows) + + # Fetch data for Entities. + read_rows = provider.online_read( + config=self.config, + table=table, + entity_keys=entity_key_protos, + requested_features=requested_features, + ) + + return self._convert_rows_to_protobuf(requested_features, read_rows) + + async def _read_from_online_store_async( + self, + entity_rows: Iterable[Mapping[str, Value]], + provider: Provider, + requested_features: List[str], + table: FeatureView, + ) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]: + entity_key_protos = self._get_entity_key_protos(entity_rows) + + # Fetch data for Entities. + read_rows = await provider.online_read_async( + config=self.config, + table=table, + entity_keys=entity_key_protos, + requested_features=requested_features, + ) + + return self._convert_rows_to_protobuf(requested_features, read_rows) + def _retrieve_from_online_store( self, provider: Provider, diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index 2a81e37042..7dd03a8417 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -80,6 +80,31 @@ def online_read( """ pass + async def online_read_async( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + """ + Reads features values for the given entity keys asynchronously. + + Args: + config: The config for the current feature store. + table: The feature view whose feature values should be read. + entity_keys: The list of entity keys for which feature values should be read. + requested_features: The list of features that should be read. + + Returns: + A list of the same length as entity_keys. Each item in the list is a tuple where the first + item is the event timestamp for the row, and the second item is a dict mapping feature names + to values, which are returned in proto format. + """ + raise NotImplementedError( + f"Online store {self.__class__.__name__} does not support online read async" + ) + @abstractmethod def update( self, diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 6f6c2fb45c..181588d6f1 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -42,6 +42,7 @@ try: from redis import Redis + from redis import asyncio as redis_asyncio from redis.cluster import ClusterNode, RedisCluster from redis.sentinel import Sentinel except ImportError as e: @@ -90,6 +91,9 @@ class RedisOnlineStore(OnlineStore): """ _client: Optional[Union[Redis, RedisCluster]] = None + _client_async: Optional[Union[redis_asyncio.Redis, redis_asyncio.RedisCluster]] = ( + None + ) def delete_entity_values(self, config: RepoConfig, join_keys: List[str]): client = self._get_client(config.online_store) @@ -234,6 +238,30 @@ def _get_client(self, online_store_config: RedisOnlineStoreConfig): self._client = Redis(**kwargs) return self._client + async def _get_client_async(self, online_store_config: RedisOnlineStoreConfig): + if not self._client_async: + startup_nodes, kwargs = self._parse_connection_string( + online_store_config.connection_string + ) + if online_store_config.redis_type == RedisType.redis_cluster: + kwargs["startup_nodes"] = [ + redis_asyncio.cluster.ClusterNode(**node) for node in startup_nodes + ] + self._client_async = redis_asyncio.RedisCluster(**kwargs) + elif online_store_config.redis_type == RedisType.redis_sentinel: + sentinel_hosts = [] + for item in startup_nodes: + sentinel_hosts.append((item["host"], int(item["port"]))) + + sentinel = redis_asyncio.Sentinel(sentinel_hosts, **kwargs) + master = sentinel.master_for(online_store_config.sentinel_master) + self._client_async = master + else: + kwargs["host"] = startup_nodes[0]["host"] + kwargs["port"] = startup_nodes[0]["port"] + self._client_async = redis_asyncio.Redis(**kwargs) + return self._client_async + @log_exceptions_and_usage(online_store="redis") def online_write_batch( self, @@ -304,6 +332,48 @@ def online_write_batch( if progress: progress(len(results)) + def _generate_entity_redis_keys( + self, config: RepoConfig, entity_keys: List[EntityKeyProto] + ) -> List[bytes]: + keys = [] + for entity_key in entity_keys: + redis_key_bin = _redis_key( + config.project, + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + keys.append(redis_key_bin) + return keys + + def _generate_feature_hset_keys( + self, + feature_view: FeatureView, + requested_features: Optional[List[str]] = None, + ) -> Tuple[List[str], List[str]]: + if not requested_features: + requested_features = [f.name for f in feature_view.features] + + ts_key = f"_ts:{feature_view.name}" + requested_features.append(ts_key) + + hset_keys = [_mmh3(f"{feature_view.name}:{k}") for k in requested_features] + hset_keys.append(ts_key) + return requested_features, hset_keys + + def _convert_redis_values_to_protobuf( + self, + redis_values: List[List[ByteString]], + feature_view: str, + requested_features: List[str], + ): + result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + for values in redis_values: + features = self._get_features_for_entity( + values, feature_view, requested_features + ) + result.append(features) + return result + @log_exceptions_and_usage(online_store="redis") def online_read( self, @@ -316,39 +386,51 @@ def online_read( assert isinstance(online_store_config, RedisOnlineStoreConfig) client = self._get_client(online_store_config) - feature_view = table.name - project = config.project + feature_view = table - result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + requested_features, hset_keys = self._generate_feature_hset_keys( + feature_view, requested_features + ) + keys = self._generate_entity_redis_keys(config, entity_keys) - if not requested_features: - requested_features = [f.name for f in table.features] + with client.pipeline(transaction=False) as pipe: + for redis_key_bin in keys: + pipe.hmget(redis_key_bin, hset_keys) + with tracing_span(name="remote_call"): + redis_values = pipe.execute() - hset_keys = [_mmh3(f"{feature_view}:{k}") for k in requested_features] + return self._convert_redis_values_to_protobuf( + redis_values, feature_view.name, requested_features + ) - ts_key = f"_ts:{feature_view}" - hset_keys.append(ts_key) - requested_features.append(ts_key) + @log_exceptions_and_usage(online_store="redis") + async def online_read_async( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + online_store_config = config.online_store + assert isinstance(online_store_config, RedisOnlineStoreConfig) - keys = [] - for entity_key in entity_keys: - redis_key_bin = _redis_key( - project, - entity_key, - entity_key_serialization_version=config.entity_key_serialization_version, - ) - keys.append(redis_key_bin) - with client.pipeline(transaction=False) as pipe: + client = await self._get_client_async(online_store_config) + feature_view = table + + requested_features, hset_keys = self._generate_feature_hset_keys( + feature_view, requested_features + ) + keys = self._generate_entity_redis_keys(config, entity_keys) + + async with client.pipeline(transaction=False) as pipe: for redis_key_bin in keys: pipe.hmget(redis_key_bin, hset_keys) with tracing_span(name="remote_call"): - redis_values = pipe.execute() - for values in redis_values: - features = self._get_features_for_entity( - values, feature_view, requested_features - ) - result.append(features) - return result + redis_values = await pipe.execute() + + return self._convert_redis_values_to_protobuf( + redis_values, feature_view.name, requested_features + ) def _get_features_for_entity( self, diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 2f3e30018a..97c2820d41 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -188,6 +188,22 @@ def online_read( ) return result + @log_exceptions_and_usage(sampler=RatioSampler(ratio=0.001)) + async def online_read_async( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List: + set_usage_attribute("provider", self.__class__.__name__) + result = [] + if self.online_store: + result = await self.online_store.online_read_async( + config, table, entity_keys, requested_features + ) + return result + @log_exceptions_and_usage(sampler=RatioSampler(ratio=0.001)) def retrieve_online_documents( self, diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 02fba0c1f6..68d36da17f 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -229,6 +229,30 @@ def online_read( """ pass + @abstractmethod + async def online_read_async( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + """ + Reads features values for the given entity keys asynchronously. + + Args: + config: The config for the current feature store. + table: The feature view whose feature values should be read. + entity_keys: The list of entity keys for which feature values should be read. + requested_features: The list of features that should be read. + + Returns: + A list of the same length as entity_keys. Each item in the list is a tuple where the first + item is the event timestamp for the row, and the second item is a dict mapping feature names + to values, which are returned in proto format. + """ + pass + @abstractmethod def retrieve_saved_dataset( self, config: RepoConfig, dataset: SavedDataset diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py index f869d82e11..eb7fe5d6ac 100644 --- a/sdk/python/tests/foo_provider.py +++ b/sdk/python/tests/foo_provider.py @@ -82,6 +82,15 @@ def online_read( ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: return [] + async def online_read_async( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + return [] + def retrieve_saved_dataset(self, config: RepoConfig, dataset: SavedDataset): pass diff --git a/sdk/python/tests/unit/infra/online_store/test_redis.py b/sdk/python/tests/unit/infra/online_store/test_redis.py new file mode 100644 index 0000000000..aac3c629c0 --- /dev/null +++ b/sdk/python/tests/unit/infra/online_store/test_redis.py @@ -0,0 +1,75 @@ +import pytest + +from feast import Entity, FeatureView, Field, FileSource, RepoConfig +from feast.infra.online_stores.redis import RedisOnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.types import Int32 + + +@pytest.fixture +def redis_online_store() -> RedisOnlineStore: + return RedisOnlineStore() + + +@pytest.fixture +def repo_config(): + return RepoConfig( + provider="local", + project="test", + entity_key_serialization_version=2, + registry="dummy_registry.db", + ) + + +@pytest.fixture +def feature_view(): + file_source = FileSource(name="my_file_source", path="test.parquet") + entity = Entity(name="entity", join_keys=["entity"]) + feature_view = FeatureView( + name="feature_view_1", + entities=[entity], + schema=[ + Field(name="feature_10", dtype=Int32), + Field(name="feature_11", dtype=Int32), + Field(name="feature_12", dtype=Int32), + ], + source=file_source, + ) + return feature_view + + +def test_generate_entity_redis_keys(redis_online_store: RedisOnlineStore, repo_config): + entity_keys = [ + EntityKeyProto(join_keys=["entity"], entity_values=[ValueProto(int32_val=1)]), + ] + + actual = redis_online_store._generate_entity_redis_keys(repo_config, entity_keys) + expected = [ + b"\x02\x00\x00\x00entity\x03\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00test" + ] + assert actual == expected + + +def test_generate_hset_keys_for_features( + redis_online_store: RedisOnlineStore, feature_view +): + actual = redis_online_store._generate_feature_hset_keys(feature_view) + expected = ( + ["feature_10", "feature_11", "feature_12", "_ts:feature_view_1"], + [b"&m_9", b"\xc37\x9a\xbf", b"wr\xb5d", b" \xf0v\xde", "_ts:feature_view_1"], + ) + assert actual == expected + + +def test_generate_hset_keys_for_features_with_requested_features( + redis_online_store: RedisOnlineStore, feature_view +): + actual = redis_online_store._generate_feature_hset_keys( + feature_view=feature_view, requested_features=["my-feature-view:feature1"] + ) + expected = ( + ["my-feature-view:feature1", "_ts:feature_view_1"], + [b"Si\x86J", b" \xf0v\xde", "_ts:feature_view_1"], + ) + assert actual == expected From 5dd611524881286efa7cebdf1f6f5f157a372667 Mon Sep 17 00:00:00 2001 From: Breno Costa Date: Thu, 2 May 2024 22:49:03 +0200 Subject: [PATCH 2/4] add more unit tests Signed-off-by: Breno Costa --- sdk/python/feast/infra/online_stores/redis.py | 12 ++-- .../unit/infra/online_store/test_redis.py | 61 ++++++++++++++++++- 2 files changed, 64 insertions(+), 9 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 181588d6f1..69383c6cfd 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -332,7 +332,7 @@ def online_write_batch( if progress: progress(len(results)) - def _generate_entity_redis_keys( + def _generate_redis_keys_for_entities( self, config: RepoConfig, entity_keys: List[EntityKeyProto] ) -> List[bytes]: keys = [] @@ -345,7 +345,7 @@ def _generate_entity_redis_keys( keys.append(redis_key_bin) return keys - def _generate_feature_hset_keys( + def _generate_hset_keys_for_features( self, feature_view: FeatureView, requested_features: Optional[List[str]] = None, @@ -388,10 +388,10 @@ def online_read( client = self._get_client(online_store_config) feature_view = table - requested_features, hset_keys = self._generate_feature_hset_keys( + requested_features, hset_keys = self._generate_hset_keys_for_features( feature_view, requested_features ) - keys = self._generate_entity_redis_keys(config, entity_keys) + keys = self._generate_redis_keys_for_entities(config, entity_keys) with client.pipeline(transaction=False) as pipe: for redis_key_bin in keys: @@ -417,10 +417,10 @@ async def online_read_async( client = await self._get_client_async(online_store_config) feature_view = table - requested_features, hset_keys = self._generate_feature_hset_keys( + requested_features, hset_keys = self._generate_hset_keys_for_features( feature_view, requested_features ) - keys = self._generate_entity_redis_keys(config, entity_keys) + keys = self._generate_redis_keys_for_entities(config, entity_keys) async with client.pipeline(transaction=False) as pipe: for redis_key_bin in keys: diff --git a/sdk/python/tests/unit/infra/online_store/test_redis.py b/sdk/python/tests/unit/infra/online_store/test_redis.py index aac3c629c0..8aeabc7a5a 100644 --- a/sdk/python/tests/unit/infra/online_store/test_redis.py +++ b/sdk/python/tests/unit/infra/online_store/test_redis.py @@ -1,4 +1,5 @@ import pytest +from google.protobuf.timestamp_pb2 import Timestamp from feast import Entity, FeatureView, Field, FileSource, RepoConfig from feast.infra.online_stores.redis import RedisOnlineStore @@ -44,7 +45,9 @@ def test_generate_entity_redis_keys(redis_online_store: RedisOnlineStore, repo_c EntityKeyProto(join_keys=["entity"], entity_values=[ValueProto(int32_val=1)]), ] - actual = redis_online_store._generate_entity_redis_keys(repo_config, entity_keys) + actual = redis_online_store._generate_redis_keys_for_entities( + repo_config, entity_keys + ) expected = [ b"\x02\x00\x00\x00entity\x03\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00test" ] @@ -54,7 +57,7 @@ def test_generate_entity_redis_keys(redis_online_store: RedisOnlineStore, repo_c def test_generate_hset_keys_for_features( redis_online_store: RedisOnlineStore, feature_view ): - actual = redis_online_store._generate_feature_hset_keys(feature_view) + actual = redis_online_store._generate_hset_keys_for_features(feature_view) expected = ( ["feature_10", "feature_11", "feature_12", "_ts:feature_view_1"], [b"&m_9", b"\xc37\x9a\xbf", b"wr\xb5d", b" \xf0v\xde", "_ts:feature_view_1"], @@ -65,7 +68,7 @@ def test_generate_hset_keys_for_features( def test_generate_hset_keys_for_features_with_requested_features( redis_online_store: RedisOnlineStore, feature_view ): - actual = redis_online_store._generate_feature_hset_keys( + actual = redis_online_store._generate_hset_keys_for_features( feature_view=feature_view, requested_features=["my-feature-view:feature1"] ) expected = ( @@ -73,3 +76,55 @@ def test_generate_hset_keys_for_features_with_requested_features( [b"Si\x86J", b" \xf0v\xde", "_ts:feature_view_1"], ) assert actual == expected + + +def test_convert_redis_values_to_protobuf( + redis_online_store: RedisOnlineStore, feature_view +): + requested_features = [ + "feature_view_1:feature_10", + "feature_view_1:feature_11", + "_ts:feature_view_1", + ] + values = [ + [ + ValueProto(int32_val=1).SerializeToString(), + ValueProto(int32_val=2).SerializeToString(), + Timestamp().SerializeToString(), + ] + ] + + features = redis_online_store._convert_redis_values_to_protobuf( + redis_values=values, + feature_view=feature_view.name, + requested_features=requested_features, + ) + assert isinstance(features, list) + assert len(features) == 1 + + timestamp, features = features[0] + assert features["feature_view_1:feature_10"].int32_val == 1 + assert features["feature_view_1:feature_11"].int32_val == 2 + + +def test_get_features_for_entity(redis_online_store: RedisOnlineStore, feature_view): + requested_features = [ + "feature_view_1:feature_10", + "feature_view_1:feature_11", + "_ts:feature_view_1", + ] + values = [ + ValueProto(int32_val=1).SerializeToString(), + ValueProto(int32_val=2).SerializeToString(), + Timestamp().SerializeToString(), + ] + + timestamp, features = redis_online_store._get_features_for_entity( + values=values, + feature_view=feature_view.name, + requested_features=requested_features, + ) + assert "feature_view_1:feature_10" in features + assert "feature_view_1:feature_11" in features + assert features["feature_view_1:feature_10"].int32_val == 1 + assert features["feature_view_1:feature_11"].int32_val == 2 From e0b2df21d83daec42bd1d0af5d592b9a5d6fff4d Mon Sep 17 00:00:00 2001 From: Breno Costa Date: Wed, 8 May 2024 23:47:27 +0200 Subject: [PATCH 3/4] fix redis key generation Signed-off-by: Breno Costa --- README.md | 2 +- sdk/python/feast/infra/online_stores/redis.py | 5 +++-- sdk/python/feast/online_response.py | 6 +++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index e9b7ff4743..a1e06774da 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@

-
+
[![unit-tests](https://github.com/feast-dev/feast/actions/workflows/unit_tests.yml/badge.svg?branch=master&event=push)](https://github.com/feast-dev/feast/actions/workflows/unit_tests.yml) [![integration-tests-and-build](https://github.com/feast-dev/feast/actions/workflows/master_only.yml/badge.svg?branch=master&event=push)](https://github.com/feast-dev/feast/actions/workflows/master_only.yml) diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 69383c6cfd..f681d8473e 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -353,11 +353,12 @@ def _generate_hset_keys_for_features( if not requested_features: requested_features = [f.name for f in feature_view.features] + hset_keys = [_mmh3(f"{feature_view.name}:{k}") for k in requested_features] + ts_key = f"_ts:{feature_view.name}" + hset_keys.append(ts_key) requested_features.append(ts_key) - hset_keys = [_mmh3(f"{feature_view.name}:{k}") for k in requested_features] - hset_keys.append(ts_key) return requested_features, hset_keys def _convert_redis_values_to_protobuf( diff --git a/sdk/python/feast/online_response.py b/sdk/python/feast/online_response.py index 050b374340..a4e5694127 100644 --- a/sdk/python/feast/online_response.py +++ b/sdk/python/feast/online_response.py @@ -50,7 +50,7 @@ def to_dict(self, include_event_timestamps: bool = False) -> Dict[str, Any]: Converts GetOnlineFeaturesResponse features into a dictionary form. Args: - is_with_event_timestamps: bool Optionally include feature timestamps in the dictionary + include_event_timestamps: bool Optionally include feature timestamps in the dictionary """ response: Dict[str, List[Any]] = {} @@ -74,7 +74,7 @@ def to_df(self, include_event_timestamps: bool = False) -> pd.DataFrame: Converts GetOnlineFeaturesResponse features into Panda dataframe form. Args: - is_with_event_timestamps: bool Optionally include feature timestamps in the dataframe + include_event_timestamps: bool Optionally include feature timestamps in the dataframe """ return pd.DataFrame(self.to_dict(include_event_timestamps)) @@ -84,7 +84,7 @@ def to_arrow(self, include_event_timestamps: bool = False) -> pa.Table: Converts GetOnlineFeaturesResponse features into pyarrow Table. Args: - is_with_event_timestamps: bool Optionally include feature timestamps in the table + include_event_timestamps: bool Optionally include feature timestamps in the table """ return pa.Table.from_pydict(self.to_dict(include_event_timestamps)) From 410d5eb3f466c079814814b196eb17fe862d4f07 Mon Sep 17 00:00:00 2001 From: Breno Costa Date: Wed, 8 May 2024 23:55:28 +0200 Subject: [PATCH 4/4] fix unit tests Signed-off-by: Breno Costa --- sdk/python/tests/unit/infra/online_store/test_redis.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/tests/unit/infra/online_store/test_redis.py b/sdk/python/tests/unit/infra/online_store/test_redis.py index 8aeabc7a5a..c26c2f25c5 100644 --- a/sdk/python/tests/unit/infra/online_store/test_redis.py +++ b/sdk/python/tests/unit/infra/online_store/test_redis.py @@ -60,7 +60,7 @@ def test_generate_hset_keys_for_features( actual = redis_online_store._generate_hset_keys_for_features(feature_view) expected = ( ["feature_10", "feature_11", "feature_12", "_ts:feature_view_1"], - [b"&m_9", b"\xc37\x9a\xbf", b"wr\xb5d", b" \xf0v\xde", "_ts:feature_view_1"], + [b"&m_9", b"\xc37\x9a\xbf", b"wr\xb5d", "_ts:feature_view_1"], ) assert actual == expected @@ -73,7 +73,7 @@ def test_generate_hset_keys_for_features_with_requested_features( ) expected = ( ["my-feature-view:feature1", "_ts:feature_view_1"], - [b"Si\x86J", b" \xf0v\xde", "_ts:feature_view_1"], + [b"Si\x86J", "_ts:feature_view_1"], ) assert actual == expected