Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adding get_online_features_async to feature store sdk #4172

Merged
merged 4 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 206 additions & 27 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1550,6 +1550,54 @@ def get_online_features(
native_entity_values=True,
)

@log_exceptions_and_usage
async def get_online_features_async(
self,
features: Union[List[str], FeatureService],
entity_rows: List[Dict[str, Any]],
full_feature_names: bool = False,
) -> OnlineResponse:
"""
[Alpha] Retrieves the latest online feature data asynchronously.

Note: This method will download the full feature registry the first time it is run. If you are using a
remote registry like GCS or S3 then that may take a few seconds. The registry remains cached up to a TTL
duration (which can be set to infinity). If the cached registry is stale (more time than the TTL has
passed), then a new registry will be downloaded synchronously by this method. This download may
introduce latency to online feature retrieval. In order to avoid synchronous downloads, please call
refresh_registry() prior to the TTL being reached. Remember it is possible to set the cache TTL to
infinity (cache forever).

Args:
features: The list of features that should be retrieved from the online store. These features can be
specified either as a list of string feature references or as a feature service. String feature
references must have format "feature_view:feature", e.g. "customer_fv:daily_transactions".
entity_rows: A list of dictionaries where each key-value is an entity-name, entity-value pair.
full_feature_names: If True, feature names will be prefixed with the corresponding feature view name,
changing them from the format "feature" to "feature_view__feature" (e.g. "daily_transactions"
changes to "customer_fv__daily_transactions").

Returns:
OnlineResponse containing the feature data in records.

Raises:
Exception: No entity with the specified name exists.
"""
columnar: Dict[str, List[Any]] = {k: [] for k in entity_rows[0].keys()}
for entity_row in entity_rows:
for key, value in entity_row.items():
try:
columnar[key].append(value)
except KeyError as e:
raise ValueError("All entity_rows must have the same keys.") from e

return await self._get_online_features_async(
features=features,
entity_values=columnar,
full_feature_names=full_feature_names,
native_entity_values=True,
)

def _get_online_request_context(
self, features: Union[List[str], FeatureService], full_feature_names: bool
):
Expand Down Expand Up @@ -1609,7 +1657,7 @@ def _get_online_request_context(
entityless_case,
)

def _get_online_features(
def _prepare_entities_to_read_from_online_store(
self,
features: Union[List[str], FeatureService],
entity_values: Mapping[
Expand All @@ -1619,7 +1667,7 @@ def _get_online_features(
native_entity_values: bool = True,
):
(
_feature_refs,
feature_refs,
requested_on_demand_feature_views,
entity_name_to_join_key_map,
entity_type_map,
Expand Down Expand Up @@ -1694,6 +1742,40 @@ def _get_online_features(
[DUMMY_ENTITY_VAL] * num_rows, DUMMY_ENTITY.value_type
)

return (
join_key_values,
grouped_refs,
entity_name_to_join_key_map,
requested_on_demand_feature_views,
feature_refs,
requested_result_row_names,
online_features_response,
)

def _get_online_features(
self,
features: Union[List[str], FeatureService],
entity_values: Mapping[
str, Union[Sequence[Any], Sequence[Value], RepeatedValue]
],
full_feature_names: bool = False,
native_entity_values: bool = True,
):
(
join_key_values,
grouped_refs,
entity_name_to_join_key_map,
requested_on_demand_feature_views,
feature_refs,
requested_result_row_names,
online_features_response,
) = self._prepare_entities_to_read_from_online_store(
features=features,
entity_values=entity_values,
full_feature_names=full_feature_names,
native_entity_values=native_entity_values,
)

provider = self._get_provider()
for table, requested_features in grouped_refs:
# Get the correct set of entity values with the correct join keys.
Expand Down Expand Up @@ -1724,7 +1806,71 @@ def _get_online_features(
if requested_on_demand_feature_views:
self._augment_response_with_on_demand_transforms(
online_features_response,
_feature_refs,
feature_refs,
requested_on_demand_feature_views,
full_feature_names,
)

self._drop_unneeded_columns(
online_features_response, requested_result_row_names
)
return OnlineResponse(online_features_response)

async def _get_online_features_async(
self,
features: Union[List[str], FeatureService],
entity_values: Mapping[
str, Union[Sequence[Any], Sequence[Value], RepeatedValue]
],
full_feature_names: bool = False,
native_entity_values: bool = True,
):
(
join_key_values,
grouped_refs,
entity_name_to_join_key_map,
requested_on_demand_feature_views,
feature_refs,
requested_result_row_names,
online_features_response,
) = self._prepare_entities_to_read_from_online_store(
features=features,
entity_values=entity_values,
full_feature_names=full_feature_names,
native_entity_values=native_entity_values,
)

provider = self._get_provider()
for table, requested_features in grouped_refs:
# Get the correct set of entity values with the correct join keys.
table_entity_values, idxs = self._get_unique_entities(
table,
join_key_values,
entity_name_to_join_key_map,
)

# Fetch feature data for the minimum set of Entities.
feature_data = await self._read_from_online_store_async(
table_entity_values,
provider,
requested_features,
table,
)

# Populate the result_rows with the Features from the OnlineStore inplace.
self._populate_response_from_feature_data(
feature_data,
idxs,
online_features_response,
full_feature_names,
requested_features,
table,
)

if requested_on_demand_feature_views:
self._augment_response_with_on_demand_transforms(
online_features_response,
feature_refs,
requested_on_demand_feature_views,
full_feature_names,
)
Expand Down Expand Up @@ -1965,38 +2111,24 @@ def _get_unique_entities(
)
return unique_entities, indexes

def _read_from_online_store(
def _get_entity_key_protos(
self,
entity_rows: Iterable[Mapping[str, Value]],
provider: Provider,
requested_features: List[str],
table: FeatureView,
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
"""Read and process data from the OnlineStore for a given FeatureView.

This method guarantees that the order of the data in each element of the
List returned is the same as the order of `requested_features`.

This method assumes that `provider.online_read` returns data for each
combination of Entities in `entity_rows` in the same order as they
are provided.
"""
) -> List[EntityKeyProto]:
# Instantiate one EntityKeyProto per Entity.
entity_key_protos = [
EntityKeyProto(join_keys=row.keys(), entity_values=row.values())
for row in entity_rows
]
return entity_key_protos

# Fetch data for Entities.
read_rows = provider.online_read(
config=self.config,
table=table,
entity_keys=entity_key_protos,
requested_features=requested_features,
)

# Each row is a set of features for a given entity key. We only need to convert
# the data to Protobuf once.
def _convert_rows_to_protobuf(
self,
requested_features: List[str],
read_rows: List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]],
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
# Each row is a set of features for a given entity key.
# We only need to convert the data to Protobuf once.
null_value = Value()
read_row_protos = []
for read_row in read_rows:
Expand All @@ -2023,6 +2155,53 @@ def _read_from_online_store(
read_row_protos.append((event_timestamps, statuses, values))
return read_row_protos

def _read_from_online_store(
self,
entity_rows: Iterable[Mapping[str, Value]],
provider: Provider,
requested_features: List[str],
table: FeatureView,
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
"""Read and process data from the OnlineStore for a given FeatureView.

This method guarantees that the order of the data in each element of the
List returned is the same as the order of `requested_features`.

This method assumes that `provider.online_read` returns data for each
combination of Entities in `entity_rows` in the same order as they
are provided.
"""
entity_key_protos = self._get_entity_key_protos(entity_rows)

# Fetch data for Entities.
read_rows = provider.online_read(
config=self.config,
table=table,
entity_keys=entity_key_protos,
requested_features=requested_features,
)

return self._convert_rows_to_protobuf(requested_features, read_rows)

async def _read_from_online_store_async(
self,
entity_rows: Iterable[Mapping[str, Value]],
provider: Provider,
requested_features: List[str],
table: FeatureView,
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
entity_key_protos = self._get_entity_key_protos(entity_rows)

# Fetch data for Entities.
read_rows = await provider.online_read_async(
config=self.config,
table=table,
entity_keys=entity_key_protos,
requested_features=requested_features,
)

return self._convert_rows_to_protobuf(requested_features, read_rows)

def _retrieve_from_online_store(
self,
provider: Provider,
Expand Down
25 changes: 25 additions & 0 deletions sdk/python/feast/infra/online_stores/online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,31 @@ def online_read(
"""
pass

async def online_read_async(
self,
config: RepoConfig,
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
"""
Reads features values for the given entity keys asynchronously.

Args:
config: The config for the current feature store.
table: The feature view whose feature values should be read.
entity_keys: The list of entity keys for which feature values should be read.
requested_features: The list of features that should be read.

Returns:
A list of the same length as entity_keys. Each item in the list is a tuple where the first
item is the event timestamp for the row, and the second item is a dict mapping feature names
to values, which are returned in proto format.
"""
raise NotImplementedError(
f"Online store {self.__class__.__name__} does not support online read async"
)

@abstractmethod
def update(
self,
Expand Down
Loading
Loading