diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py
index f23cc62bd8..13927e2856 100644
--- a/sdk/python/feast/cli.py
+++ b/sdk/python/feast/cli.py
@@ -595,6 +595,7 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List
             "cassandra",
             "rockset",
             "hazelcast",
+            "ikv",
         ],
         case_sensitive=False,
     ),
diff --git a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/__init__.py b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py
new file mode 100644
index 0000000000..9d888aad3d
--- /dev/null
+++ b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py
@@ -0,0 +1,300 @@
+from datetime import datetime
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterator,
+    List,
+    Literal,
+    Optional,
+    Sequence,
+    Tuple,
+)
+
+from ikvpy.client import IKVReader, IKVWriter
+from ikvpy.clientoptions import ClientOptions, ClientOptionsBuilder
+from ikvpy.document import IKVDocument, IKVDocumentBuilder
+from ikvpy.factory import create_new_reader, create_new_writer
+from pydantic import StrictStr
+
+from feast import Entity, FeatureView, utils
+from feast.infra.online_stores.helpers import compute_entity_id
+from feast.infra.online_stores.online_store import OnlineStore
+from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
+from feast.protos.feast.types.Value_pb2 import Value as ValueProto
+from feast.repo_config import FeastConfigBaseModel, RepoConfig
+from feast.usage import log_exceptions_and_usage
+
+PRIMARY_KEY_FIELD_NAME: str = "_entity_key"
+EVENT_CREATION_TIMESTAMP_FIELD_NAME: str = "_event_timestamp"
+CREATION_TIMESTAMP_FIELD_NAME: str = "_created_timestamp"
+
+
+class IKVOnlineStoreConfig(FeastConfigBaseModel):
+    """Online store config for IKV store"""
+
+    type: Literal["ikv"] = "ikv"
+    """Online store type selector"""
+
+    account_id: StrictStr
+    """(Required) IKV account id"""
+
+    account_passkey: StrictStr
+    """(Required) IKV account passkey"""
+
+    store_name: StrictStr
+    """(Required) IKV store name"""
+
+    mount_directory: Optional[StrictStr] = None
+    """(Required only for reader) IKV mount point i.e. directory for storing IKV data locally."""
+
+
+class IKVOnlineStore(OnlineStore):
+    """
+    IKV (inlined.io key value) store implementation of the online store interface.
+    """
+
+    # lazy initialization
+    _reader: Optional[IKVReader] = None
+    _writer: Optional[IKVWriter] = None
+
+    @log_exceptions_and_usage(online_store="ikv")
+    def online_write_batch(
+        self,
+        config: RepoConfig,
+        table: FeatureView,
+        data: List[
+            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
+        ],
+        progress: Optional[Callable[[int], Any]],
+    ) -> None:
+        """
+        Writes a batch of feature rows to the online store.
+
+        If a tz-naive timestamp is passed to this method, it is assumed to be UTC.
+
+        Args:
+            config: The config for the current feature store.
+            table: Feature view to which these feature rows correspond.
+            data: A list of quadruplets containing feature data. Each quadruplet contains an entity
+                key, a dict containing feature values, an event timestamp for the row, and the created
+                timestamp for the row if it exists.
+            progress: Function to be called once a batch of rows is written to the online store, used
+                to show progress.
+        """
+        # update() should have been called before to initialize the writer
+        if self._writer is None:
+            return
+
+        for entity_key, features, event_timestamp, _ in data:
+            entity_id: str = compute_entity_id(
+                entity_key,
+                entity_key_serialization_version=config.entity_key_serialization_version,
+            )
+            document: IKVDocument = IKVOnlineStore._create_document(
+                entity_id, table, features, event_timestamp
+            )
+            self._writer.upsert_fields(document)
+            if progress:
+                progress(1)
+
+    @log_exceptions_and_usage(online_store="ikv")
+    def online_read(
+        self,
+        config: RepoConfig,
+        table: FeatureView,
+        entity_keys: List[EntityKeyProto],
+        requested_features: Optional[List[str]] = None,
+    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
+        """
+        Reads feature values for the given entity keys.
+
+        Args:
+            config: The config for the current feature store.
+            table: The feature view whose feature values should be read.
+            entity_keys: The list of entity keys for which feature values should be read.
+            requested_features: The list of features that should be read.
+
+        Returns:
+            A list of the same length as entity_keys. Each item in the list is a tuple where the first
+            item is the event timestamp for the row, and the second item is a dict mapping feature names
+            to values, which are returned in proto format.
+        """
+        if not len(entity_keys):
+            return []
+
+        # create IKV primary keys
+        primary_keys = [
+            compute_entity_id(ek, config.entity_key_serialization_version)
+            for ek in entity_keys
+        ]
+
+        # create IKV field names
+        if requested_features is None:
+            requested_features = []
+
+        field_names: List[Optional[str]] = [None] * (1 + len(requested_features))
+        field_names[0] = EVENT_CREATION_TIMESTAMP_FIELD_NAME
+        for i, fn in enumerate(requested_features):
+            field_names[i + 1] = IKVOnlineStore._create_ikv_field_name(table, fn)
+
+        assert self._reader is not None
+        value_iter = self._reader.multiget_bytes_values(
+            bytes_primary_keys=[],
+            str_primary_keys=primary_keys,
+            field_names=field_names,
+        )
+
+        # decode results
+        return [
+            IKVOnlineStore._decode_fields_for_primary_key(
+                requested_features, value_iter
+            )
+            for _ in range(0, len(primary_keys))
+        ]
+
+    @staticmethod
+    def _decode_fields_for_primary_key(
+        requested_features: List[str], value_iter: Iterator[Optional[bytes]]
+    ) -> Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]:
+        # decode timestamp
+        dt: Optional[datetime] = None
+        dt_bytes = next(value_iter)
+        if dt_bytes:
+            dt = datetime.fromisoformat(str(dt_bytes, "utf-8"))
+
+        # decode other features
+        features = {}
+        for requested_feature in requested_features:
+            value_proto_bytes: Optional[bytes] = next(value_iter)
+            if value_proto_bytes:
+                value_proto = ValueProto()
+                value_proto.ParseFromString(value_proto_bytes)
+                features[requested_feature] = value_proto
+
+        return dt, features
+
+    # called before any read/write requests are issued
+    @log_exceptions_and_usage(online_store="ikv")
+    def update(
+        self,
+        config: RepoConfig,
+        tables_to_delete: Sequence[FeatureView],
+        tables_to_keep: Sequence[FeatureView],
+        entities_to_delete: Sequence[Entity],
+        entities_to_keep: Sequence[Entity],
+        partial: bool,
+    ):
+        """
+        Reconciles cloud resources with the specified set of Feast objects.
+
+        Args:
+            config: The config for the current feature store.
+            tables_to_delete: Feature views whose corresponding infrastructure should be deleted.
+            tables_to_keep: Feature views whose corresponding infrastructure should not be deleted, and
+                may need to be updated.
+            entities_to_delete: Entities whose corresponding infrastructure should be deleted.
+            entities_to_keep: Entities whose corresponding infrastructure should not be deleted, and
+                may need to be updated.
+            partial: If true, tables_to_delete and tables_to_keep are not exhaustive lists, so
+                infrastructure corresponding to other feature views should not be touched.
+        """
+        self._init_clients(config=config)
+        assert self._writer is not None
+
+        # note: we assume tables_to_keep does not overlap with tables_to_delete
+
+        for feature_view in tables_to_delete:
+            # each field in an IKV document is prefixed by the feature-view's name
+            self._writer.drop_fields_by_name_prefix([feature_view.name])
+
+    @log_exceptions_and_usage(online_store="ikv")
+    def teardown(
+        self,
+        config: RepoConfig,
+        tables: Sequence[FeatureView],
+        entities: Sequence[Entity],
+    ):
+        """
+        Tears down all cloud resources for the specified set of Feast objects.
+
+        Args:
+            config: The config for the current feature store.
+            tables: Feature views whose corresponding infrastructure should be deleted.
+            entities: Entities whose corresponding infrastructure should be deleted.
+        """
+        self._init_clients(config=config)
+        assert self._writer is not None
+
+        # drop fields corresponding to these feature views
+        for feature_view in tables:
+            self._writer.drop_fields_by_name_prefix([feature_view.name])
+
+        # shutdown clients
+        self._writer.shutdown()
+        self._writer = None
+
+        if self._reader is not None:
+            self._reader.shutdown()
+            self._reader = None
+
+    @staticmethod
+    def _create_ikv_field_name(feature_view: FeatureView, feature_name: str) -> str:
+        return "{}_{}".format(feature_view.name, feature_name)
+
+    @staticmethod
+    def _create_document(
+        entity_id: str,
+        feature_view: FeatureView,
+        values: Dict[str, ValueProto],
+        event_timestamp: datetime,
+    ) -> IKVDocument:
+        """Converts feast key-value pairs into an IKV document."""
+
+        # initialize builder by inserting the primary key and row creation timestamp
+        event_timestamp_str: str = utils.make_tzaware(event_timestamp).isoformat()
+        builder = (
+            IKVDocumentBuilder()
+            .put_string_field(PRIMARY_KEY_FIELD_NAME, entity_id)
+            .put_bytes_field(
+                EVENT_CREATION_TIMESTAMP_FIELD_NAME, event_timestamp_str.encode("utf-8")
+            )
+        )
+
+        for feature_name, feature_value in values.items():
+            field_name = IKVOnlineStore._create_ikv_field_name(
+                feature_view, feature_name
+            )
+            builder.put_bytes_field(field_name, feature_value.SerializeToString())
+
+        return builder.build()
+
+    def _init_clients(self, config: RepoConfig):
+        """Initializes the reader/writer IKV clients if they have not been created yet."""
+        online_config = config.online_store
+        assert isinstance(online_config, IKVOnlineStoreConfig)
+        client_options = IKVOnlineStore._config_to_client_options(online_config)
+
+        # initialize writer
+        if self._writer is None:
+            self._writer = create_new_writer(client_options)
+
+        # initialize reader, but only if mount_directory is specified
+        if self._reader is None:
+            if online_config.mount_directory and len(online_config.mount_directory) > 0:
+                self._reader = create_new_reader(client_options)
+
+    @staticmethod
+    def _config_to_client_options(config: IKVOnlineStoreConfig) -> ClientOptions:
+        """Converts an IKVOnlineStoreConfig into IKV ClientOptions."""
+        builder = (
+            ClientOptionsBuilder()
+            .with_account_id(config.account_id)
+            .with_account_passkey(config.account_passkey)
+            .with_store_name(config.store_name)
+        )
+
+        if config.mount_directory and len(config.mount_directory) > 0:
+            builder = builder.with_mount_directory(config.mount_directory)
+
+        return builder.build()
diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py
index e8185f4a4a..5e38fd1775 100644
--- a/sdk/python/feast/repo_config.py
+++ b/sdk/python/feast/repo_config.py
@@ -63,6 +63,7 @@
     "mysql": "feast.infra.online_stores.contrib.mysql_online_store.mysql.MySQLOnlineStore",
     "rockset": "feast.infra.online_stores.contrib.rockset_online_store.rockset.RocksetOnlineStore",
     "hazelcast": "feast.infra.online_stores.contrib.hazelcast_online_store.hazelcast_online_store.HazelcastOnlineStore",
+    "ikv": "feast.infra.online_stores.contrib.ikv_online_store.ikv.IKVOnlineStore",
 }
 
 OFFLINE_STORE_CLASS_FOR_TYPE = {
diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py
index 6eb5204161..04e69e04c6 100644
--- a/sdk/python/tests/integration/feature_repos/repo_configuration.py
+++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py
@@ -100,6 +100,14 @@
     "host": os.getenv("ROCKSET_APISERVER", "api.rs2.usw2.rockset.com"),
 }
 
+IKV_CONFIG = {
+    "type": "ikv",
+    "account_id": os.getenv("IKV_ACCOUNT_ID", ""),
+    "account_passkey": os.getenv("IKV_ACCOUNT_PASSKEY", ""),
+    "store_name": os.getenv("IKV_STORE_NAME", ""),
+    "mount_directory": os.getenv("IKV_MOUNT_DIR", ""),
+}
+
 OFFLINE_STORE_TO_PROVIDER_CONFIG: Dict[str, Tuple[str, Type[DataSourceCreator]]] = {
     "file": ("local", FileDataSourceCreator),
     "bigquery": ("gcp", BigQueryDataSourceCreator),
@@ -139,6 +147,11 @@
     # containerized version of Rockset.
     # AVAILABLE_ONLINE_STORES["rockset"] = (ROCKSET_CONFIG, None)
 
+    # Uncomment to test using a private IKV account. Currently not enabled, as
+    # there is no dedicated IKV instance for CI testing and there is no
+    # containerized version of IKV.
+    # AVAILABLE_ONLINE_STORES["ikv"] = (IKV_CONFIG, None)
+
 full_repo_configs_module = os.environ.get(FULL_REPO_CONFIGS_MODULE_ENV_NAME)
 
 if full_repo_configs_module is not None:
diff --git a/setup.py b/setup.py
index 610e2a66ca..b1a79bdd98 100644
--- a/setup.py
+++ b/setup.py
@@ -130,6 +130,10 @@
     "rockset>=1.0.3",
 ]
 
+IKV_REQUIRED = [
+    "ikvpy>=0.0.23",
+]
+
 HAZELCAST_REQUIRED = [
     "hazelcast-python-client>=5.1",
 ]
@@ -372,6 +376,7 @@ def run(self):
         "rockset": ROCKSET_REQUIRED,
         "ibis": IBIS_REQUIRED,
         "duckdb": DUCKDB_REQUIRED,
+        "ikv": IKV_REQUIRED,
     },
     include_package_data=True,
     license="Apache",
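
Note for reviewers: the sketch below is not part of the patch. It illustrates how the new IKVOnlineStoreConfig surface is intended to be used once this change lands. All account values and paths are placeholders, and the feature_store.yaml block in the trailing comment assumes Feast's usual behavior of binding the online_store YAML section onto the config class registered for type "ikv".

# Hypothetical usage sketch; values marked as placeholders are not defined in this PR.
from feast.infra.online_stores.contrib.ikv_online_store.ikv import IKVOnlineStoreConfig

config = IKVOnlineStoreConfig(
    account_id="my-account-id",            # placeholder
    account_passkey="my-account-passkey",  # placeholder
    store_name="my-feast-store",           # placeholder
    # mount_directory is only needed by readers (online_read); writer-only
    # materialization flows can omit it, per IKVOnlineStoreConfig above.
    mount_directory="/tmp/ikv-mount",      # placeholder
)

# Equivalent feature_store.yaml block (assumed standard Feast config binding):
#
# online_store:
#   type: ikv
#   account_id: my-account-id
#   account_passkey: my-account-passkey
#   store_name: my-feast-store
#   mount_directory: /tmp/ikv-mount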