From aaaac39f920d83976a9bc585bc2a181cfb3de6cd Mon Sep 17 00:00:00 2001
From: Breno Costa
Date: Thu, 2 May 2024 21:43:30 +0200
Subject: [PATCH 1/4] feat: Adding get_online_features_async to feature store
sdk
Signed-off-by: Breno Costa
---
sdk/python/feast/feature_store.py | 233 ++++++++++++++++--
.../feast/infra/online_stores/online_store.py | 25 ++
sdk/python/feast/infra/online_stores/redis.py | 132 ++++++++--
.../feast/infra/passthrough_provider.py | 16 ++
sdk/python/feast/infra/provider.py | 24 ++
sdk/python/tests/foo_provider.py | 9 +
.../unit/infra/online_store/test_redis.py | 75 ++++++
7 files changed, 462 insertions(+), 52 deletions(-)
create mode 100644 sdk/python/tests/unit/infra/online_store/test_redis.py
diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py
index f45dbb1bc8..270ea45a26 100644
--- a/sdk/python/feast/feature_store.py
+++ b/sdk/python/feast/feature_store.py
@@ -1550,6 +1550,54 @@ def get_online_features(
native_entity_values=True,
)
+ @log_exceptions_and_usage
+ async def get_online_features_async(
+ self,
+ features: Union[List[str], FeatureService],
+ entity_rows: List[Dict[str, Any]],
+ full_feature_names: bool = False,
+ ) -> OnlineResponse:
+ """
+ [Alpha] Retrieves the latest online feature data asynchronously.
+
+ Note: This method will download the full feature registry the first time it is run. If you are using a
+ remote registry like GCS or S3 then that may take a few seconds. The registry remains cached up to a TTL
+ duration (which can be set to infinity). If the cached registry is stale (more time than the TTL has
+ passed), then a new registry will be downloaded synchronously by this method. This download may
+ introduce latency to online feature retrieval. In order to avoid synchronous downloads, please call
+ refresh_registry() prior to the TTL being reached. Remember it is possible to set the cache TTL to
+ infinity (cache forever).
+
+ Args:
+ features: The list of features that should be retrieved from the online store. These features can be
+ specified either as a list of string feature references or as a feature service. String feature
+ references must have format "feature_view:feature", e.g. "customer_fv:daily_transactions".
+ entity_rows: A list of dictionaries where each key-value is an entity-name, entity-value pair.
+ full_feature_names: If True, feature names will be prefixed with the corresponding feature view name,
+ changing them from the format "feature" to "feature_view__feature" (e.g. "daily_transactions"
+ changes to "customer_fv__daily_transactions").
+
+ Returns:
+ OnlineResponse containing the feature data in records.
+
+ Raises:
+ Exception: No entity with the specified name exists.
+ """
+ columnar: Dict[str, List[Any]] = {k: [] for k in entity_rows[0].keys()}
+ for entity_row in entity_rows:
+ for key, value in entity_row.items():
+ try:
+ columnar[key].append(value)
+ except KeyError as e:
+ raise ValueError("All entity_rows must have the same keys.") from e
+
+ return await self._get_online_features_async(
+ features=features,
+ entity_values=columnar,
+ full_feature_names=full_feature_names,
+ native_entity_values=True,
+ )
+
def _get_online_request_context(
self, features: Union[List[str], FeatureService], full_feature_names: bool
):
@@ -1609,7 +1657,7 @@ def _get_online_request_context(
entityless_case,
)
- def _get_online_features(
+ def _prepare_entities_to_read_from_online_store(
self,
features: Union[List[str], FeatureService],
entity_values: Mapping[
@@ -1619,7 +1667,7 @@ def _get_online_features(
native_entity_values: bool = True,
):
(
- _feature_refs,
+ feature_refs,
requested_on_demand_feature_views,
entity_name_to_join_key_map,
entity_type_map,
@@ -1694,6 +1742,40 @@ def _get_online_features(
[DUMMY_ENTITY_VAL] * num_rows, DUMMY_ENTITY.value_type
)
+ return (
+ join_key_values,
+ grouped_refs,
+ entity_name_to_join_key_map,
+ requested_on_demand_feature_views,
+ feature_refs,
+ requested_result_row_names,
+ online_features_response,
+ )
+
+ def _get_online_features(
+ self,
+ features: Union[List[str], FeatureService],
+ entity_values: Mapping[
+ str, Union[Sequence[Any], Sequence[Value], RepeatedValue]
+ ],
+ full_feature_names: bool = False,
+ native_entity_values: bool = True,
+ ):
+ (
+ join_key_values,
+ grouped_refs,
+ entity_name_to_join_key_map,
+ requested_on_demand_feature_views,
+ feature_refs,
+ requested_result_row_names,
+ online_features_response,
+ ) = self._prepare_entities_to_read_from_online_store(
+ features=features,
+ entity_values=entity_values,
+ full_feature_names=full_feature_names,
+ native_entity_values=native_entity_values,
+ )
+
provider = self._get_provider()
for table, requested_features in grouped_refs:
# Get the correct set of entity values with the correct join keys.
@@ -1724,7 +1806,71 @@ def _get_online_features(
if requested_on_demand_feature_views:
self._augment_response_with_on_demand_transforms(
online_features_response,
- _feature_refs,
+ feature_refs,
+ requested_on_demand_feature_views,
+ full_feature_names,
+ )
+
+ self._drop_unneeded_columns(
+ online_features_response, requested_result_row_names
+ )
+ return OnlineResponse(online_features_response)
+
+ async def _get_online_features_async(
+ self,
+ features: Union[List[str], FeatureService],
+ entity_values: Mapping[
+ str, Union[Sequence[Any], Sequence[Value], RepeatedValue]
+ ],
+ full_feature_names: bool = False,
+ native_entity_values: bool = True,
+ ):
+ (
+ join_key_values,
+ grouped_refs,
+ entity_name_to_join_key_map,
+ requested_on_demand_feature_views,
+ feature_refs,
+ requested_result_row_names,
+ online_features_response,
+ ) = self._prepare_entities_to_read_from_online_store(
+ features=features,
+ entity_values=entity_values,
+ full_feature_names=full_feature_names,
+ native_entity_values=native_entity_values,
+ )
+
+ provider = self._get_provider()
+ for table, requested_features in grouped_refs:
+ # Get the correct set of entity values with the correct join keys.
+ table_entity_values, idxs = self._get_unique_entities(
+ table,
+ join_key_values,
+ entity_name_to_join_key_map,
+ )
+
+ # Fetch feature data for the minimum set of Entities.
+ feature_data = await self._read_from_online_store_async(
+ table_entity_values,
+ provider,
+ requested_features,
+ table,
+ )
+
+ # Populate the result_rows with the Features from the OnlineStore inplace.
+ self._populate_response_from_feature_data(
+ feature_data,
+ idxs,
+ online_features_response,
+ full_feature_names,
+ requested_features,
+ table,
+ )
+
+ if requested_on_demand_feature_views:
+ self._augment_response_with_on_demand_transforms(
+ online_features_response,
+ feature_refs,
requested_on_demand_feature_views,
full_feature_names,
)
@@ -1965,38 +2111,24 @@ def _get_unique_entities(
)
return unique_entities, indexes
- def _read_from_online_store(
+ def _get_entity_key_protos(
self,
entity_rows: Iterable[Mapping[str, Value]],
- provider: Provider,
- requested_features: List[str],
- table: FeatureView,
- ) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
- """Read and process data from the OnlineStore for a given FeatureView.
-
- This method guarantees that the order of the data in each element of the
- List returned is the same as the order of `requested_features`.
-
- This method assumes that `provider.online_read` returns data for each
- combination of Entities in `entity_rows` in the same order as they
- are provided.
- """
+ ) -> List[EntityKeyProto]:
# Instantiate one EntityKeyProto per Entity.
entity_key_protos = [
EntityKeyProto(join_keys=row.keys(), entity_values=row.values())
for row in entity_rows
]
+ return entity_key_protos
- # Fetch data for Entities.
- read_rows = provider.online_read(
- config=self.config,
- table=table,
- entity_keys=entity_key_protos,
- requested_features=requested_features,
- )
-
- # Each row is a set of features for a given entity key. We only need to convert
- # the data to Protobuf once.
+ def _convert_rows_to_protobuf(
+ self,
+ requested_features: List[str],
+ read_rows: List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]],
+ ) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
+ # Each row is a set of features for a given entity key.
+ # We only need to convert the data to Protobuf once.
null_value = Value()
read_row_protos = []
for read_row in read_rows:
@@ -2023,6 +2155,53 @@ def _read_from_online_store(
read_row_protos.append((event_timestamps, statuses, values))
return read_row_protos
+ def _read_from_online_store(
+ self,
+ entity_rows: Iterable[Mapping[str, Value]],
+ provider: Provider,
+ requested_features: List[str],
+ table: FeatureView,
+ ) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
+ """Read and process data from the OnlineStore for a given FeatureView.
+
+ This method guarantees that the order of the data in each element of the
+ List returned is the same as the order of `requested_features`.
+
+ This method assumes that `provider.online_read` returns data for each
+ combination of Entities in `entity_rows` in the same order as they
+ are provided.
+ """
+ entity_key_protos = self._get_entity_key_protos(entity_rows)
+
+ # Fetch data for Entities.
+ read_rows = provider.online_read(
+ config=self.config,
+ table=table,
+ entity_keys=entity_key_protos,
+ requested_features=requested_features,
+ )
+
+ return self._convert_rows_to_protobuf(requested_features, read_rows)
+
+ async def _read_from_online_store_async(
+ self,
+ entity_rows: Iterable[Mapping[str, Value]],
+ provider: Provider,
+ requested_features: List[str],
+ table: FeatureView,
+ ) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
+ entity_key_protos = self._get_entity_key_protos(entity_rows)
+
+ # Fetch data for Entities.
+ read_rows = await provider.online_read_async(
+ config=self.config,
+ table=table,
+ entity_keys=entity_key_protos,
+ requested_features=requested_features,
+ )
+
+ return self._convert_rows_to_protobuf(requested_features, read_rows)
+
def _retrieve_from_online_store(
self,
provider: Provider,
diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py
index 2a81e37042..7dd03a8417 100644
--- a/sdk/python/feast/infra/online_stores/online_store.py
+++ b/sdk/python/feast/infra/online_stores/online_store.py
@@ -80,6 +80,31 @@ def online_read(
"""
pass
+ async def online_read_async(
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ entity_keys: List[EntityKeyProto],
+ requested_features: Optional[List[str]] = None,
+ ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
+ """
+ Reads feature values for the given entity keys asynchronously.
+
+ Args:
+ config: The config for the current feature store.
+ table: The feature view whose feature values should be read.
+ entity_keys: The list of entity keys for which feature values should be read.
+ requested_features: The list of features that should be read.
+
+ Returns:
+ A list of the same length as entity_keys. Each item in the list is a tuple where the first
+ item is the event timestamp for the row, and the second item is a dict mapping feature names
+ to values, which are returned in proto format.
+ """
+ raise NotImplementedError(
+ f"Online store {self.__class__.__name__} does not support online read async"
+ )
+
@abstractmethod
def update(
self,
diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py
index 6f6c2fb45c..181588d6f1 100644
--- a/sdk/python/feast/infra/online_stores/redis.py
+++ b/sdk/python/feast/infra/online_stores/redis.py
@@ -42,6 +42,7 @@
try:
from redis import Redis
+ from redis import asyncio as redis_asyncio
from redis.cluster import ClusterNode, RedisCluster
from redis.sentinel import Sentinel
except ImportError as e:
@@ -90,6 +91,9 @@ class RedisOnlineStore(OnlineStore):
"""
_client: Optional[Union[Redis, RedisCluster]] = None
+ _client_async: Optional[Union[redis_asyncio.Redis, redis_asyncio.RedisCluster]] = (
+ None
+ )
def delete_entity_values(self, config: RepoConfig, join_keys: List[str]):
client = self._get_client(config.online_store)
@@ -234,6 +238,30 @@ def _get_client(self, online_store_config: RedisOnlineStoreConfig):
self._client = Redis(**kwargs)
return self._client
+ async def _get_client_async(self, online_store_config: RedisOnlineStoreConfig):
+ if not self._client_async:
+ startup_nodes, kwargs = self._parse_connection_string(
+ online_store_config.connection_string
+ )
+ if online_store_config.redis_type == RedisType.redis_cluster:
+ kwargs["startup_nodes"] = [
+ redis_asyncio.cluster.ClusterNode(**node) for node in startup_nodes
+ ]
+ self._client_async = redis_asyncio.RedisCluster(**kwargs)
+ elif online_store_config.redis_type == RedisType.redis_sentinel:
+ sentinel_hosts = []
+ for item in startup_nodes:
+ sentinel_hosts.append((item["host"], int(item["port"])))
+
+ sentinel = redis_asyncio.Sentinel(sentinel_hosts, **kwargs)
+ master = sentinel.master_for(online_store_config.sentinel_master)
+ self._client_async = master
+ else:
+ kwargs["host"] = startup_nodes[0]["host"]
+ kwargs["port"] = startup_nodes[0]["port"]
+ self._client_async = redis_asyncio.Redis(**kwargs)
+ return self._client_async
+
@log_exceptions_and_usage(online_store="redis")
def online_write_batch(
self,
@@ -304,6 +332,48 @@ def online_write_batch(
if progress:
progress(len(results))
+ def _generate_entity_redis_keys(
+ self, config: RepoConfig, entity_keys: List[EntityKeyProto]
+ ) -> List[bytes]:
+ keys = []
+ for entity_key in entity_keys:
+ redis_key_bin = _redis_key(
+ config.project,
+ entity_key,
+ entity_key_serialization_version=config.entity_key_serialization_version,
+ )
+ keys.append(redis_key_bin)
+ return keys
+
+ def _generate_feature_hset_keys(
+ self,
+ feature_view: FeatureView,
+ requested_features: Optional[List[str]] = None,
+ ) -> Tuple[List[str], List[str]]:
+ if not requested_features:
+ requested_features = [f.name for f in feature_view.features]
+
+ ts_key = f"_ts:{feature_view.name}"
+ requested_features.append(ts_key)
+
+ hset_keys = [_mmh3(f"{feature_view.name}:{k}") for k in requested_features]
+ hset_keys.append(ts_key)
+ return requested_features, hset_keys
+
+ def _convert_redis_values_to_protobuf(
+ self,
+ redis_values: List[List[ByteString]],
+ feature_view: str,
+ requested_features: List[str],
+ ):
+ result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
+ for values in redis_values:
+ features = self._get_features_for_entity(
+ values, feature_view, requested_features
+ )
+ result.append(features)
+ return result
+
@log_exceptions_and_usage(online_store="redis")
def online_read(
self,
@@ -316,39 +386,51 @@ def online_read(
assert isinstance(online_store_config, RedisOnlineStoreConfig)
client = self._get_client(online_store_config)
- feature_view = table.name
- project = config.project
+ feature_view = table
- result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
+ requested_features, hset_keys = self._generate_feature_hset_keys(
+ feature_view, requested_features
+ )
+ keys = self._generate_entity_redis_keys(config, entity_keys)
- if not requested_features:
- requested_features = [f.name for f in table.features]
+ with client.pipeline(transaction=False) as pipe:
+ for redis_key_bin in keys:
+ pipe.hmget(redis_key_bin, hset_keys)
+ with tracing_span(name="remote_call"):
+ redis_values = pipe.execute()
- hset_keys = [_mmh3(f"{feature_view}:{k}") for k in requested_features]
+ return self._convert_redis_values_to_protobuf(
+ redis_values, feature_view.name, requested_features
+ )
- ts_key = f"_ts:{feature_view}"
- hset_keys.append(ts_key)
- requested_features.append(ts_key)
+ @log_exceptions_and_usage(online_store="redis")
+ async def online_read_async(
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ entity_keys: List[EntityKeyProto],
+ requested_features: Optional[List[str]] = None,
+ ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
+ online_store_config = config.online_store
+ assert isinstance(online_store_config, RedisOnlineStoreConfig)
- keys = []
- for entity_key in entity_keys:
- redis_key_bin = _redis_key(
- project,
- entity_key,
- entity_key_serialization_version=config.entity_key_serialization_version,
- )
- keys.append(redis_key_bin)
- with client.pipeline(transaction=False) as pipe:
+ client = await self._get_client_async(online_store_config)
+ feature_view = table
+
+ requested_features, hset_keys = self._generate_feature_hset_keys(
+ feature_view, requested_features
+ )
+ keys = self._generate_entity_redis_keys(config, entity_keys)
+
+ async with client.pipeline(transaction=False) as pipe:
for redis_key_bin in keys:
pipe.hmget(redis_key_bin, hset_keys)
with tracing_span(name="remote_call"):
- redis_values = pipe.execute()
- for values in redis_values:
- features = self._get_features_for_entity(
- values, feature_view, requested_features
- )
- result.append(features)
- return result
+ redis_values = await pipe.execute()
+
+ return self._convert_redis_values_to_protobuf(
+ redis_values, feature_view.name, requested_features
+ )
def _get_features_for_entity(
self,
diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py
index 2f3e30018a..97c2820d41 100644
--- a/sdk/python/feast/infra/passthrough_provider.py
+++ b/sdk/python/feast/infra/passthrough_provider.py
@@ -188,6 +188,22 @@ def online_read(
)
return result
+ @log_exceptions_and_usage(sampler=RatioSampler(ratio=0.001))
+ async def online_read_async(
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ entity_keys: List[EntityKeyProto],
+ requested_features: Optional[List[str]] = None,
+ ) -> List:
+ set_usage_attribute("provider", self.__class__.__name__)
+ result = []
+ if self.online_store:
+ result = await self.online_store.online_read_async(
+ config, table, entity_keys, requested_features
+ )
+ return result
+
@log_exceptions_and_usage(sampler=RatioSampler(ratio=0.001))
def retrieve_online_documents(
self,
diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py
index 02fba0c1f6..68d36da17f 100644
--- a/sdk/python/feast/infra/provider.py
+++ b/sdk/python/feast/infra/provider.py
@@ -229,6 +229,30 @@ def online_read(
"""
pass
+ @abstractmethod
+ async def online_read_async(
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ entity_keys: List[EntityKeyProto],
+ requested_features: Optional[List[str]] = None,
+ ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
+ """
+ Reads feature values for the given entity keys asynchronously.
+
+ Args:
+ config: The config for the current feature store.
+ table: The feature view whose feature values should be read.
+ entity_keys: The list of entity keys for which feature values should be read.
+ requested_features: The list of features that should be read.
+
+ Returns:
+ A list of the same length as entity_keys. Each item in the list is a tuple where the first
+ item is the event timestamp for the row, and the second item is a dict mapping feature names
+ to values, which are returned in proto format.
+ """
+ pass
+
@abstractmethod
def retrieve_saved_dataset(
self, config: RepoConfig, dataset: SavedDataset
diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py
index f869d82e11..eb7fe5d6ac 100644
--- a/sdk/python/tests/foo_provider.py
+++ b/sdk/python/tests/foo_provider.py
@@ -82,6 +82,15 @@ def online_read(
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
return []
+ async def online_read_async(
+ self,
+ config: RepoConfig,
+ table: FeatureView,
+ entity_keys: List[EntityKeyProto],
+ requested_features: Optional[List[str]] = None,
+ ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
+ return []
+
def retrieve_saved_dataset(self, config: RepoConfig, dataset: SavedDataset):
pass
diff --git a/sdk/python/tests/unit/infra/online_store/test_redis.py b/sdk/python/tests/unit/infra/online_store/test_redis.py
new file mode 100644
index 0000000000..aac3c629c0
--- /dev/null
+++ b/sdk/python/tests/unit/infra/online_store/test_redis.py
@@ -0,0 +1,75 @@
+import pytest
+
+from feast import Entity, FeatureView, Field, FileSource, RepoConfig
+from feast.infra.online_stores.redis import RedisOnlineStore
+from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
+from feast.protos.feast.types.Value_pb2 import Value as ValueProto
+from feast.types import Int32
+
+
+@pytest.fixture
+def redis_online_store() -> RedisOnlineStore:
+ return RedisOnlineStore()
+
+
+@pytest.fixture
+def repo_config():
+ return RepoConfig(
+ provider="local",
+ project="test",
+ entity_key_serialization_version=2,
+ registry="dummy_registry.db",
+ )
+
+
+@pytest.fixture
+def feature_view():
+ file_source = FileSource(name="my_file_source", path="test.parquet")
+ entity = Entity(name="entity", join_keys=["entity"])
+ feature_view = FeatureView(
+ name="feature_view_1",
+ entities=[entity],
+ schema=[
+ Field(name="feature_10", dtype=Int32),
+ Field(name="feature_11", dtype=Int32),
+ Field(name="feature_12", dtype=Int32),
+ ],
+ source=file_source,
+ )
+ return feature_view
+
+
+def test_generate_entity_redis_keys(redis_online_store: RedisOnlineStore, repo_config):
+ entity_keys = [
+ EntityKeyProto(join_keys=["entity"], entity_values=[ValueProto(int32_val=1)]),
+ ]
+
+ actual = redis_online_store._generate_entity_redis_keys(repo_config, entity_keys)
+ expected = [
+ b"\x02\x00\x00\x00entity\x03\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00test"
+ ]
+ assert actual == expected
+
+
+def test_generate_hset_keys_for_features(
+ redis_online_store: RedisOnlineStore, feature_view
+):
+ actual = redis_online_store._generate_feature_hset_keys(feature_view)
+ expected = (
+ ["feature_10", "feature_11", "feature_12", "_ts:feature_view_1"],
+ [b"&m_9", b"\xc37\x9a\xbf", b"wr\xb5d", b" \xf0v\xde", "_ts:feature_view_1"],
+ )
+ assert actual == expected
+
+
+def test_generate_hset_keys_for_features_with_requested_features(
+ redis_online_store: RedisOnlineStore, feature_view
+):
+ actual = redis_online_store._generate_feature_hset_keys(
+ feature_view=feature_view, requested_features=["my-feature-view:feature1"]
+ )
+ expected = (
+ ["my-feature-view:feature1", "_ts:feature_view_1"],
+ [b"Si\x86J", b" \xf0v\xde", "_ts:feature_view_1"],
+ )
+ assert actual == expected
From 5dd611524881286efa7cebdf1f6f5f157a372667 Mon Sep 17 00:00:00 2001
From: Breno Costa
Date: Thu, 2 May 2024 22:49:03 +0200
Subject: [PATCH 2/4] add more unit tests
Signed-off-by: Breno Costa
---
sdk/python/feast/infra/online_stores/redis.py | 12 ++--
.../unit/infra/online_store/test_redis.py | 61 ++++++++++++++++++-
2 files changed, 64 insertions(+), 9 deletions(-)
diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py
index 181588d6f1..69383c6cfd 100644
--- a/sdk/python/feast/infra/online_stores/redis.py
+++ b/sdk/python/feast/infra/online_stores/redis.py
@@ -332,7 +332,7 @@ def online_write_batch(
if progress:
progress(len(results))
- def _generate_entity_redis_keys(
+ def _generate_redis_keys_for_entities(
self, config: RepoConfig, entity_keys: List[EntityKeyProto]
) -> List[bytes]:
keys = []
@@ -345,7 +345,7 @@ def _generate_entity_redis_keys(
keys.append(redis_key_bin)
return keys
- def _generate_feature_hset_keys(
+ def _generate_hset_keys_for_features(
self,
feature_view: FeatureView,
requested_features: Optional[List[str]] = None,
@@ -388,10 +388,10 @@ def online_read(
client = self._get_client(online_store_config)
feature_view = table
- requested_features, hset_keys = self._generate_feature_hset_keys(
+ requested_features, hset_keys = self._generate_hset_keys_for_features(
feature_view, requested_features
)
- keys = self._generate_entity_redis_keys(config, entity_keys)
+ keys = self._generate_redis_keys_for_entities(config, entity_keys)
with client.pipeline(transaction=False) as pipe:
for redis_key_bin in keys:
@@ -417,10 +417,10 @@ async def online_read_async(
client = await self._get_client_async(online_store_config)
feature_view = table
- requested_features, hset_keys = self._generate_feature_hset_keys(
+ requested_features, hset_keys = self._generate_hset_keys_for_features(
feature_view, requested_features
)
- keys = self._generate_entity_redis_keys(config, entity_keys)
+ keys = self._generate_redis_keys_for_entities(config, entity_keys)
async with client.pipeline(transaction=False) as pipe:
for redis_key_bin in keys:
diff --git a/sdk/python/tests/unit/infra/online_store/test_redis.py b/sdk/python/tests/unit/infra/online_store/test_redis.py
index aac3c629c0..8aeabc7a5a 100644
--- a/sdk/python/tests/unit/infra/online_store/test_redis.py
+++ b/sdk/python/tests/unit/infra/online_store/test_redis.py
@@ -1,4 +1,5 @@
import pytest
+from google.protobuf.timestamp_pb2 import Timestamp
from feast import Entity, FeatureView, Field, FileSource, RepoConfig
from feast.infra.online_stores.redis import RedisOnlineStore
@@ -44,7 +45,9 @@ def test_generate_entity_redis_keys(redis_online_store: RedisOnlineStore, repo_c
EntityKeyProto(join_keys=["entity"], entity_values=[ValueProto(int32_val=1)]),
]
- actual = redis_online_store._generate_entity_redis_keys(repo_config, entity_keys)
+ actual = redis_online_store._generate_redis_keys_for_entities(
+ repo_config, entity_keys
+ )
expected = [
b"\x02\x00\x00\x00entity\x03\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00test"
]
@@ -54,7 +57,7 @@ def test_generate_entity_redis_keys(redis_online_store: RedisOnlineStore, repo_c
def test_generate_hset_keys_for_features(
redis_online_store: RedisOnlineStore, feature_view
):
- actual = redis_online_store._generate_feature_hset_keys(feature_view)
+ actual = redis_online_store._generate_hset_keys_for_features(feature_view)
expected = (
["feature_10", "feature_11", "feature_12", "_ts:feature_view_1"],
[b"&m_9", b"\xc37\x9a\xbf", b"wr\xb5d", b" \xf0v\xde", "_ts:feature_view_1"],
@@ -65,7 +68,7 @@ def test_generate_hset_keys_for_features(
def test_generate_hset_keys_for_features_with_requested_features(
redis_online_store: RedisOnlineStore, feature_view
):
- actual = redis_online_store._generate_feature_hset_keys(
+ actual = redis_online_store._generate_hset_keys_for_features(
feature_view=feature_view, requested_features=["my-feature-view:feature1"]
)
expected = (
@@ -73,3 +76,55 @@ def test_generate_hset_keys_for_features_with_requested_features(
[b"Si\x86J", b" \xf0v\xde", "_ts:feature_view_1"],
)
assert actual == expected
+
+
+def test_convert_redis_values_to_protobuf(
+ redis_online_store: RedisOnlineStore, feature_view
+):
+ requested_features = [
+ "feature_view_1:feature_10",
+ "feature_view_1:feature_11",
+ "_ts:feature_view_1",
+ ]
+ values = [
+ [
+ ValueProto(int32_val=1).SerializeToString(),
+ ValueProto(int32_val=2).SerializeToString(),
+ Timestamp().SerializeToString(),
+ ]
+ ]
+
+ features = redis_online_store._convert_redis_values_to_protobuf(
+ redis_values=values,
+ feature_view=feature_view.name,
+ requested_features=requested_features,
+ )
+ assert isinstance(features, list)
+ assert len(features) == 1
+
+ timestamp, features = features[0]
+ assert features["feature_view_1:feature_10"].int32_val == 1
+ assert features["feature_view_1:feature_11"].int32_val == 2
+
+
+def test_get_features_for_entity(redis_online_store: RedisOnlineStore, feature_view):
+ requested_features = [
+ "feature_view_1:feature_10",
+ "feature_view_1:feature_11",
+ "_ts:feature_view_1",
+ ]
+ values = [
+ ValueProto(int32_val=1).SerializeToString(),
+ ValueProto(int32_val=2).SerializeToString(),
+ Timestamp().SerializeToString(),
+ ]
+
+ timestamp, features = redis_online_store._get_features_for_entity(
+ values=values,
+ feature_view=feature_view.name,
+ requested_features=requested_features,
+ )
+ assert "feature_view_1:feature_10" in features
+ assert "feature_view_1:feature_11" in features
+ assert features["feature_view_1:feature_10"].int32_val == 1
+ assert features["feature_view_1:feature_11"].int32_val == 2
From e0b2df21d83daec42bd1d0af5d592b9a5d6fff4d Mon Sep 17 00:00:00 2001
From: Breno Costa
Date: Wed, 8 May 2024 23:47:27 +0200
Subject: [PATCH 3/4] fix redis key generation
Signed-off-by: Breno Costa
---
README.md | 2 +-
sdk/python/feast/infra/online_stores/redis.py | 5 +++--
sdk/python/feast/online_response.py | 6 +++---
3 files changed, 7 insertions(+), 6 deletions(-)
diff --git a/README.md b/README.md
index e9b7ff4743..a1e06774da 100644
--- a/README.md
+++ b/README.md
@@ -5,7 +5,7 @@
-
+
[![unit-tests](https://github.com/feast-dev/feast/actions/workflows/unit_tests.yml/badge.svg?branch=master&event=push)](https://github.com/feast-dev/feast/actions/workflows/unit_tests.yml)
[![integration-tests-and-build](https://github.com/feast-dev/feast/actions/workflows/master_only.yml/badge.svg?branch=master&event=push)](https://github.com/feast-dev/feast/actions/workflows/master_only.yml)
diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py
index 69383c6cfd..f681d8473e 100644
--- a/sdk/python/feast/infra/online_stores/redis.py
+++ b/sdk/python/feast/infra/online_stores/redis.py
@@ -353,11 +353,12 @@ def _generate_hset_keys_for_features(
if not requested_features:
requested_features = [f.name for f in feature_view.features]
+ hset_keys = [_mmh3(f"{feature_view.name}:{k}") for k in requested_features]
+
ts_key = f"_ts:{feature_view.name}"
+ hset_keys.append(ts_key)
requested_features.append(ts_key)
- hset_keys = [_mmh3(f"{feature_view.name}:{k}") for k in requested_features]
- hset_keys.append(ts_key)
return requested_features, hset_keys
def _convert_redis_values_to_protobuf(
diff --git a/sdk/python/feast/online_response.py b/sdk/python/feast/online_response.py
index 050b374340..a4e5694127 100644
--- a/sdk/python/feast/online_response.py
+++ b/sdk/python/feast/online_response.py
@@ -50,7 +50,7 @@ def to_dict(self, include_event_timestamps: bool = False) -> Dict[str, Any]:
Converts GetOnlineFeaturesResponse features into a dictionary form.
Args:
- is_with_event_timestamps: bool Optionally include feature timestamps in the dictionary
+ include_event_timestamps: bool Optionally include feature timestamps in the dictionary
"""
response: Dict[str, List[Any]] = {}
@@ -74,7 +74,7 @@ def to_df(self, include_event_timestamps: bool = False) -> pd.DataFrame:
Converts GetOnlineFeaturesResponse features into Panda dataframe form.
Args:
- is_with_event_timestamps: bool Optionally include feature timestamps in the dataframe
+ include_event_timestamps: bool Optionally include feature timestamps in the dataframe
"""
return pd.DataFrame(self.to_dict(include_event_timestamps))
@@ -84,7 +84,7 @@ def to_arrow(self, include_event_timestamps: bool = False) -> pa.Table:
Converts GetOnlineFeaturesResponse features into pyarrow Table.
Args:
- is_with_event_timestamps: bool Optionally include feature timestamps in the table
+ include_event_timestamps: bool Optionally include feature timestamps in the table
"""
return pa.Table.from_pydict(self.to_dict(include_event_timestamps))
From 410d5eb3f466c079814814b196eb17fe862d4f07 Mon Sep 17 00:00:00 2001
From: Breno Costa
Date: Wed, 8 May 2024 23:55:28 +0200
Subject: [PATCH 4/4] fix unit tests
Signed-off-by: Breno Costa
---
sdk/python/tests/unit/infra/online_store/test_redis.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/sdk/python/tests/unit/infra/online_store/test_redis.py b/sdk/python/tests/unit/infra/online_store/test_redis.py
index 8aeabc7a5a..c26c2f25c5 100644
--- a/sdk/python/tests/unit/infra/online_store/test_redis.py
+++ b/sdk/python/tests/unit/infra/online_store/test_redis.py
@@ -60,7 +60,7 @@ def test_generate_hset_keys_for_features(
actual = redis_online_store._generate_hset_keys_for_features(feature_view)
expected = (
["feature_10", "feature_11", "feature_12", "_ts:feature_view_1"],
- [b"&m_9", b"\xc37\x9a\xbf", b"wr\xb5d", b" \xf0v\xde", "_ts:feature_view_1"],
+ [b"&m_9", b"\xc37\x9a\xbf", b"wr\xb5d", "_ts:feature_view_1"],
)
assert actual == expected
@@ -73,7 +73,7 @@ def test_generate_hset_keys_for_features_with_requested_features(
)
expected = (
["my-feature-view:feature1", "_ts:feature_view_1"],
- [b"Si\x86J", b" \xf0v\xde", "_ts:feature_view_1"],
+ [b"Si\x86J", "_ts:feature_view_1"],
)
assert actual == expected