diff --git a/Makefile b/Makefile index 30ac86e891..7908567f0a 100644 --- a/Makefile +++ b/Makefile @@ -359,6 +359,25 @@ test-python-universal-singlestore-online: -k "test_retrieve_online_documents" \ sdk/python/tests/integration/online_store/test_universal_online.py +test-python-universal-couchbase-online: + PYTHONPATH='.' \ + FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.couchbase_repo_configuration \ + PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.couchbase \ + python -m pytest -n 8 --integration \ + -k "not test_universal_cli and \ + not test_go_feature_server and \ + not test_feature_logging and \ + not test_reorder_columns and \ + not test_logged_features_validation and \ + not test_lambda_materialization_consistency and \ + not test_offline_write and \ + not test_push_features_to_offline_store and \ + not gcs_registry and \ + not s3_registry and \ + not test_universal_types and \ + not test_snowflake" \ + sdk/python/tests + test-python-universal: python -m pytest -n 8 --integration sdk/python/tests diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index b7faf526c2..91ef61dac9 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -107,6 +107,7 @@ * [Remote](reference/online-stores/remote.md) * [PostgreSQL (contrib)](reference/online-stores/postgres.md) * [Cassandra + Astra DB (contrib)](reference/online-stores/cassandra.md) + * [Couchbase (contrib)](reference/online-stores/couchbase.md) * [MySQL (contrib)](reference/online-stores/mysql.md) * [Hazelcast (contrib)](reference/online-stores/hazelcast.md) * [ScyllaDB (contrib)](reference/online-stores/scylladb.md) diff --git a/docs/reference/online-stores/README.md b/docs/reference/online-stores/README.md index cdb9c37c1d..5df4710434 100644 --- a/docs/reference/online-stores/README.md +++ b/docs/reference/online-stores/README.md @@ -46,6 +46,10 @@ Please see [Online Store](../../getting-started/components/online-store.md) for [cassandra.md](cassandra.md) {% endcontent-ref %} +{% content-ref url="couchbase.md" %} +[couchbase.md](couchbase.md) +{% endcontent-ref %} + {% content-ref url="mysql.md" %} [mysql.md](mysql.md) {% endcontent-ref %} @@ -60,6 +64,7 @@ Please see [Online Store](../../getting-started/components/online-store.md) for {% content-ref url="remote.md" %} [remote.md](remote.md) +{% endcontent-ref %} {% content-ref url="singlestore.md" %} [singlestore.md](singlestore.md) diff --git a/docs/reference/online-stores/couchbase.md b/docs/reference/online-stores/couchbase.md new file mode 100644 index 0000000000..ff8822d85d --- /dev/null +++ b/docs/reference/online-stores/couchbase.md @@ -0,0 +1,78 @@ +# Couchbase Online Store +> NOTE: +> This is a community-contributed online store that is in alpha development. It is not officially supported by the Feast project. + +## Description +The [Couchbase](https://www.couchbase.com/) online store provides support for materializing feature values into a Couchbase Operational cluster for serving online features in real-time. + +* Only the latest feature values are persisted +* Features are stored in a document-oriented format + +The data model for using Couchbase as an online store follows a document format: +* Document ID: `{project}:{table_name}:{entity_key_hex}:{feature_name}` +* Document Content: + * `metadata`: + * `event_ts` (ISO formatted timestamp) + * `created_ts` (ISO formatted timestamp) + * `feature_name` (String) + * `value` (Base64 encoded protobuf binary) + + +## Getting started +In order to use this online store, you'll need to run `pip install 'feast[couchbase]'`. You can then get started with the command `feast init REPO_NAME -t couchbase`. + +To get started with Couchbase Capella Operational: +1. [Sign up for a Couchbase Capella account](https://docs.couchbase.com/cloud/get-started/create-account.html#sign-up-free-tier) +2. [Deploy an Operational cluster](https://docs.couchbase.com/cloud/get-started/create-account.html#getting-started) +3. [Create a bucket](https://docs.couchbase.com/cloud/clusters/data-service/manage-buckets.html#add-bucket) + - This can be named anything, but must correspond to the bucket described in the `feature_store.yaml` configuration file. +4. [Create cluster access credentials](https://docs.couchbase.com/cloud/clusters/manage-database-users.html#create-database-credentials) + - These credentials should have full access to the bucket created in step 3. +5. [Configure allowed IP addresses](https://docs.couchbase.com/cloud/clusters/allow-ip-address.html) + - You must allow the IP address of the machine running Feast. + +## Example +{% code title="feature_store.yaml" %} +```yaml +project: my_feature_repo +registry: data/registry.db +provider: local +online_store: + type: couchbase + connection_string: couchbase://127.0.0.1 # Couchbase connection string, copied from 'Connect' page in Couchbase Capella console + user: Administrator # Couchbase username from access credentials + password: password # Couchbase password from access credentials + bucket_name: feast # Couchbase bucket name, defaults to feast + kv_port: 11210 # Couchbase key-value port, defaults to 11210. Required if custom ports are used. +entity_key_serialization_version: 2 +``` +{% endcode %} + +The full set of configuration options is available in `CouchbaseOnlineStoreConfig`. + + +## Functionality Matrix +The set of functionality supported by online stores is described in detail [here](overview.md#functionality). +Below is a matrix indicating which functionality is supported by the Couchbase online store. + +| | Couchbase | +| :-------------------------------------------------------- | :-------- | +| write feature values to the online store | yes | +| read feature values from the online store | yes | +| update infrastructure (e.g. tables) in the online store | yes | +| teardown infrastructure (e.g. tables) in the online store | yes | +| generate a plan of infrastructure changes | no | +| support for on-demand transforms | yes | +| readable by Python SDK | yes | +| readable by Java | no | +| readable by Go | no | +| support for entityless feature views | yes | +| support for concurrent writing to the same key | yes | +| support for ttl (time to live) at retrieval | no | +| support for deleting expired data | no | +| collocated by feature view | yes | +| collocated by feature service | no | +| collocated by entity key | no | + +To compare this set of functionality against other online stores, please see the full [functionality matrix](overview.md#functionality-matrix). + diff --git a/sdk/python/docs/source/feast.infra.online_stores.contrib.couchbase_online_store.rst b/sdk/python/docs/source/feast.infra.online_stores.contrib.couchbase_online_store.rst new file mode 100644 index 0000000000..63ae72ffbb --- /dev/null +++ b/sdk/python/docs/source/feast.infra.online_stores.contrib.couchbase_online_store.rst @@ -0,0 +1,21 @@ +feast.infra.online\_stores.contrib.couchbase\_online\_store package +=================================================================== + +Submodules +---------- + +feast.infra.online\_stores.contrib.couchbase\_online\_store.couchbase module +---------------------------------------------------------------------------- + +.. automodule:: feast.infra.online_stores.contrib.couchbase_online_store.couchbase + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: feast.infra.online_stores.contrib.couchbase_online_store + :members: + :undoc-members: + :show-inheritance: diff --git a/sdk/python/docs/source/feast.infra.online_stores.contrib.rst b/sdk/python/docs/source/feast.infra.online_stores.contrib.rst index 2403b5b8d4..ac47a34b44 100644 --- a/sdk/python/docs/source/feast.infra.online_stores.contrib.rst +++ b/sdk/python/docs/source/feast.infra.online_stores.contrib.rst @@ -8,6 +8,7 @@ Subpackages :maxdepth: 4 feast.infra.online_stores.contrib.cassandra_online_store + feast.infra.online_stores.contrib.couchbase_online_store feast.infra.online_stores.contrib.hazelcast_online_store feast.infra.online_stores.contrib.hbase_online_store feast.infra.online_stores.contrib.ikv_online_store @@ -24,6 +25,14 @@ feast.infra.online\_stores.contrib.cassandra\_repo\_configuration module :undoc-members: :show-inheritance: +feast.infra.online\_stores.contrib.couchbase\_repo\_configuration module +------------------------------------------------------------------------ + +.. automodule:: feast.infra.online_stores.contrib.couchbase_repo_configuration + :members: + :undoc-members: + :show-inheritance: + feast.infra.online\_stores.contrib.elasticsearch module ------------------------------------------------------- diff --git a/sdk/python/docs/source/feast.infra.online_stores.rst b/sdk/python/docs/source/feast.infra.online_stores.rst index 801d187a7c..608e9fef6a 100644 --- a/sdk/python/docs/source/feast.infra.online_stores.rst +++ b/sdk/python/docs/source/feast.infra.online_stores.rst @@ -84,6 +84,14 @@ feast.infra.online\_stores.sqlite module :undoc-members: :show-inheritance: +feast.infra.online\_stores.vector\_store module +----------------------------------------------- + +.. automodule:: feast.infra.online_stores.vector_store + :members: + :undoc-members: + :show-inheritance: + Module contents --------------- diff --git a/sdk/python/docs/source/feast.permissions.client.rst b/sdk/python/docs/source/feast.permissions.client.rst index f346801210..84e58bdc2d 100644 --- a/sdk/python/docs/source/feast.permissions.client.rst +++ b/sdk/python/docs/source/feast.permissions.client.rst @@ -20,10 +20,10 @@ feast.permissions.client.auth\_client\_manager module :undoc-members: :show-inheritance: -feast.permissions.client.auth\_client\_manager\_factory module --------------------------------------------------------------- +feast.permissions.client.client\_auth\_token module +--------------------------------------------------- -.. automodule:: feast.permissions.client.auth_client_manager_factory +.. automodule:: feast.permissions.client.client_auth_token :members: :undoc-members: :show-inheritance: @@ -44,6 +44,14 @@ feast.permissions.client.http\_auth\_requests\_wrapper module :undoc-members: :show-inheritance: +feast.permissions.client.intra\_comm\_authentication\_client\_manager module +---------------------------------------------------------------------------- + +.. automodule:: feast.permissions.client.intra_comm_authentication_client_manager + :members: + :undoc-members: + :show-inheritance: + feast.permissions.client.kubernetes\_auth\_client\_manager module ----------------------------------------------------------------- diff --git a/sdk/python/docs/source/feast.protos.feast.core.rst b/sdk/python/docs/source/feast.protos.feast.core.rst index 9d079953c1..78398e54dc 100644 --- a/sdk/python/docs/source/feast.protos.feast.core.rst +++ b/sdk/python/docs/source/feast.protos.feast.core.rst @@ -244,6 +244,22 @@ feast.protos.feast.core.Policy\_pb2\_grpc module :undoc-members: :show-inheritance: +feast.protos.feast.core.Project\_pb2 module +------------------------------------------- + +.. automodule:: feast.protos.feast.core.Project_pb2 + :members: + :undoc-members: + :show-inheritance: + +feast.protos.feast.core.Project\_pb2\_grpc module +------------------------------------------------- + +.. automodule:: feast.protos.feast.core.Project_pb2_grpc + :members: + :undoc-members: + :show-inheritance: + feast.protos.feast.core.Registry\_pb2 module -------------------------------------------- diff --git a/sdk/python/docs/source/feast.rst b/sdk/python/docs/source/feast.rst index b8c04ebde6..ea34c3d8dd 100644 --- a/sdk/python/docs/source/feast.rst +++ b/sdk/python/docs/source/feast.rst @@ -28,6 +28,14 @@ feast.aggregation module :undoc-members: :show-inheritance: +feast.arrow\_error\_handler module +---------------------------------- + +.. automodule:: feast.arrow_error_handler + :members: + :undoc-members: + :show-inheritance: + feast.base\_feature\_view module -------------------------------- @@ -196,6 +204,14 @@ feast.flags\_helper module :undoc-members: :show-inheritance: +feast.grpc\_error\_interceptor module +------------------------------------- + +.. automodule:: feast.grpc_error_interceptor + :members: + :undoc-members: + :show-inheritance: + feast.importer module --------------------- @@ -244,6 +260,14 @@ feast.online\_response module :undoc-members: :show-inheritance: +feast.project module +-------------------- + +.. automodule:: feast.project + :members: + :undoc-members: + :show-inheritance: + feast.project\_metadata module ------------------------------ @@ -292,6 +316,14 @@ feast.repo\_operations module :undoc-members: :show-inheritance: +feast.rest\_error\_handler module +--------------------------------- + +.. automodule:: feast.rest_error_handler + :members: + :undoc-members: + :show-inheritance: + feast.saved\_dataset module --------------------------- diff --git a/sdk/python/feast/infra/online_stores/contrib/couchbase_online_store/README.md b/sdk/python/feast/infra/online_stores/contrib/couchbase_online_store/README.md new file mode 100644 index 0000000000..df1b7a1382 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/couchbase_online_store/README.md @@ -0,0 +1,98 @@ +# Couchbase Online Store +> NOTE: +> This is a community-contributed online store that is in alpha development. It is not officially supported by the Feast project. + +This contribution makes it possible to use [Couchbase Capella Operational](https://docs.couchbase.com/cloud/get-started/intro.html) as an online store for Feast. + + +### Get Started with Couchbase Capella Operational +You'll need a Couchbase Capella Operational cluster to use this online store. Follow the steps below to get started: +1. [Create a Couchbase Capella account](https://docs.couchbase.com/cloud/get-started/create-account.html#sign-up-free-tier) +2. [Deploy an Operational cluster](https://docs.couchbase.com/cloud/get-started/create-account.html#getting-started) +3. [Create a bucket](https://docs.couchbase.com/cloud/clusters/data-service/manage-buckets.html#add-bucket) + - This can be named anything, but must correspond to the bucket described in the `feature_store.yaml` configuration file. + - The default bucket name is `feast`. +4. [Create cluster access credentials](https://docs.couchbase.com/cloud/clusters/manage-database-users.html#create-database-credentials) + - These credentials should have full access to the bucket created in step 3. +5. [Configure allowed IP addresses](https://docs.couchbase.com/cloud/clusters/allow-ip-address.html) + - You must allow the IP address of the machine running Feast. + +### Use Couchbase Online Store with Feast + +#### Create a feature repository + +```shell +feast init feature_repo +cd feature_repo +``` + +#### Edit `feature_store.yaml` + +Set the `online_store` type to `couchbase`, and fill in the required fields as shown below. + +```yaml +project: feature_repo +registry: data/registry.db +provider: local +online_store: + type: couchbase + connection_string: couchbase://127.0.0.1 # Couchbase connection string, copied from 'Connect' page in Couchbase Capella console + user: Administrator # Couchbase username from access credentials + password: password # Couchbase password from access credentials + bucket_name: feast # Couchbase bucket name, defaults to feast + kv_port: 11210 # Couchbase key-value port, defaults to 11210. Required if custom ports are used. +entity_key_serialization_version: 2 +``` + +#### Apply the feature definitions in [`example.py`](https://github.com/feast-dev/feast/blob/master/go/internal/test/feature_repo/example.py) + +```shell +feast -c feature_repo apply +``` +##### Output +``` +Registered entity driver_id +Registered feature view driver_hourly_stats_view +Deploying infrastructure for driver_hourly_stats_view +``` + +### Materialize Latest Data to Couchbase Online Feature Store +```shell +$ CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S") +$ feast -c feature_repo materialize-incremental $CURRENT_TIME +``` +#### Output +``` +Materializing 1 feature views from 2022-04-16 15:30:39+05:30 to 2022-04-19 15:31:04+05:30 into the Couchbase online store. + +driver_hourly_stats_view from 2022-04-16 15:30:39+05:30 to 2022-04-19 15:31:04+05:30: +100%|████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 120.59it/s] +``` + +### Fetch the latest features for some entity id +```python +from pprint import pprint +from feast import FeatureStore + +store = FeatureStore(repo_path=".") +feature_vector = store.get_online_features( + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + ], + entity_rows=[ + {"driver_id": 1004}, + {"driver_id": 1005}, + ], +).to_dict() +pprint(feature_vector) + +``` +#### Output +```python +{'acc_rate': [0.01390857808291912, 0.4063614010810852], + 'avg_daily_trips': [69, 706], + 'conv_rate': [0.6624961495399475, 0.7595928311347961], + 'driver_id': [1004, 1005]} +``` diff --git a/sdk/python/feast/infra/online_stores/contrib/couchbase_online_store/__init__.py b/sdk/python/feast/infra/online_stores/contrib/couchbase_online_store/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/infra/online_stores/contrib/couchbase_online_store/couchbase.py b/sdk/python/feast/infra/online_stores/contrib/couchbase_online_store/couchbase.py new file mode 100644 index 0000000000..91ce56a5ca --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/couchbase_online_store/couchbase.py @@ -0,0 +1,315 @@ +import base64 +import logging +import warnings +from datetime import datetime +from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple + +import pytz +from couchbase.auth import PasswordAuthenticator +from couchbase.cluster import Cluster +from couchbase.exceptions import ( + CollectionAlreadyExistsException, + DocumentNotFoundException, + ScopeAlreadyExistsException, +) +from couchbase.options import ClusterOptions +from pydantic import StrictStr + +from feast import Entity, FeatureView, RepoConfig +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel + +logger = logging.getLogger(__name__) +warnings.simplefilter("once", RuntimeWarning) + + +class CouchbaseOnlineStoreConfig(FeastConfigBaseModel): + """ + Configuration for the Couchbase online store. + """ + + type: Literal["couchbase"] = "couchbase" + + connection_string: Optional[StrictStr] = None + user: Optional[StrictStr] = None + password: Optional[StrictStr] = None + bucket_name: Optional[StrictStr] = None + kv_port: Optional[int] = None + + +class CouchbaseOnlineStore(OnlineStore): + """ + An online store implementation that uses Couchbase. + """ + + _cluster = None + + def _get_conn(self, config: RepoConfig, scope_name: str, collection_name: str): + """ + Obtain a connection to the Couchbase cluster and get the specific scope and collection. + """ + online_store_config = config.online_store + assert isinstance(online_store_config, CouchbaseOnlineStoreConfig) + + if not self._cluster: + self._cluster = Cluster( + f"{online_store_config.connection_string or 'couchbase://127.0.0.1'}:{online_store_config.kv_port or '11210'}", + ClusterOptions( + PasswordAuthenticator( + online_store_config.user or "Administrator", + online_store_config.password or "password", + ), + network="external", + ), + ) + + self.bucket = self._cluster.bucket( + online_store_config.bucket_name or "feast" + ) + + # Get the specific scope and collection + scope = self.bucket.scope(scope_name) + self.collection = scope.collection(collection_name) + + return self.collection + + def online_write_batch( + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + """ + Write a batch of feature data to the online Couchbase store. + + Args: + config: The RepoConfig for the current FeatureStore. + table: Feast FeatureView. + data: a list of quadruplets containing Feature data. Each + quadruplet contains an Entity Key, a dict containing feature + values, an event timestamp for the row, and + the created timestamp for the row if it exists. + progress: Optional function to be called once every mini-batch of + rows is written to the online store. Can be used to + display progress. + """ + warnings.warn( + "This online store is an experimental feature in alpha development. " + "Some functionality may still be unstable so functionality can change in the future.", + RuntimeWarning, + ) + project = config.project + scope_name = f"{project}_{table.name}_scope" + collection_name = f"{project}_{table.name}_collection" + collection = self._get_conn(config, scope_name, collection_name) + + for entity_key, values, timestamp, created_ts in data: + entity_key_str = serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ).hex() + timestamp = _to_naive_utc(timestamp).isoformat() # Convert to ISO format + if created_ts is not None: + created_ts = _to_naive_utc( + created_ts + ).isoformat() # Convert to ISO format + + for feature_name, val in values.items(): + document_id = _document_id(project, table, entity_key_str, feature_name) + + # Serialize the Protobuf to binary and then encode it in base64 + binary_value = val.SerializeToString() + base64_value = base64.b64encode(binary_value).decode("utf-8") + + # Store metadata and base64-encoded Protobuf binary in JSON-compatible format + document_content = { + "metadata": { + "event_ts": timestamp, + "created_ts": created_ts, + "feature_name": feature_name, + }, + "value": base64_value, # Store binary as base64 encoded string + } + + try: + collection.upsert( + document_id, document_content + ) # Upsert the document + except Exception as e: + logger.exception(f"Error upserting document {document_id}: {e}") + + if progress: + progress(1) + + def online_read( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + """ + Read feature values pertaining to the requested entities from + the online store. + + Args: + config: The RepoConfig for the current FeatureStore. + table: Feast FeatureView. + entity_keys: a list of entity keys that should be read + from the FeatureStore. + requested_features: Optional list of feature names to read. + """ + warnings.warn( + "This online store is an experimental feature in alpha development. " + "Some functionality may still be unstable so functionality can change in the future.", + RuntimeWarning, + ) + project = config.project + + scope_name = f"{project}_{table.name}_scope" + collection_name = f"{project}_{table.name}_collection" + + collection = self._get_conn(config, scope_name, collection_name) + + result: List[Tuple[Optional[datetime], Optional[Dict[str, Any]]]] = [] + for entity_key in entity_keys: + entity_key_str = serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ).hex() + try: + features = {} + for feature_name in requested_features or []: + document_id = _document_id( + project, table, entity_key_str, feature_name + ) + + # Fetch metadata and value (base64-encoded binary) + doc = collection.get(document_id) + content = doc.content_as[dict] # Get the document content as a dict + event_ts_str = content["metadata"]["event_ts"] + + # Convert event_ts from string (ISO format) to datetime object + event_ts = datetime.fromisoformat(event_ts_str) + + base64_value = content["value"] + + # Decode base64 back to Protobuf binary and then to ValueProto + binary_data = base64.b64decode(base64_value) + value = ValueProto() + value.ParseFromString(binary_data) # Parse protobuf data + + # Add the decoded value to the features dictionary + features[feature_name] = value + + result.append((event_ts, features)) + except DocumentNotFoundException: + result.append((None, None)) + + return result + + def update( + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): + """ + Update schema on DB, by creating and destroying tables accordingly. + + Args: + config: The RepoConfig for the current FeatureStore. + tables_to_delete: Tables to delete from the Online Store. + tables_to_keep: Tables to keep in the Online Store. + entities_to_delete: Entities to delete from the Online Store. + entities_to_keep: Entities to keep in the Online Store. + partial: Whether to partially update the schema. + """ + warnings.warn( + "This online store is an experimental feature in alpha development. " + "Some functionality may still be unstable so functionality can change in the future.", + RuntimeWarning, + ) + project = config.project + + for table in tables_to_keep: + scope_name = f"{project}_{table.name}_scope" + collection_name = f"{project}_{table.name}_collection" + self._get_conn(config, scope_name, collection_name) + cm = self.bucket.collections() + + # Check and create scope + try: + cm.create_scope(scope_name) + logger.info(f"Created scope: {scope_name}") + except ScopeAlreadyExistsException: + logger.error(f"Scope {scope_name} already exists") + except Exception as e: + logger.error(f"Error creating scope {scope_name}: {e}") + + # Check and create collection + try: + cm.create_collection(scope_name, collection_name) + logger.info( + f"Created collection: {collection_name} in scope: {scope_name}" + ) + except CollectionAlreadyExistsException: + logger.error( + f"Collection {collection_name} already exists in {scope_name}" + ) + except Exception as e: + logger.error(f"Error creating collection {collection_name}: {e}") + + def teardown( + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], + ): + """ + Delete tables from the database. + + Args: + config: The RepoConfig for the current FeatureStore. + tables: Tables to delete from the feature repo. + entities: Entities to delete from the feature repo. + """ + warnings.warn( + "This online store is an experimental feature in alpha development. " + "Some functionality may still be unstable so functionality can change in the future.", + RuntimeWarning, + ) + project = config.project + + for table in tables: + scope_name = f"{project}_{table.name}_scope" + collection_name = f"{project}_{table.name}_collection" + self._get_conn(config, scope_name, collection_name) + cm = self.bucket.collections() + try: + # dropping the scope will also drop the nested collection(s) + cm.drop_scope(scope_name) + except Exception as e: + logger.error(f"Error removing collection or scope: {e}") + + +def _document_id( + project: str, table: FeatureView, entity_key_str: str, feature_name: str +) -> str: + return f"{project}:{table.name}:{entity_key_str}:{feature_name}" + + +def _to_naive_utc(ts: datetime): + if ts.tzinfo is None: + return ts + else: + return ts.astimezone(pytz.utc).replace(tzinfo=None) diff --git a/sdk/python/feast/infra/online_stores/contrib/couchbase_repo_configuration.py b/sdk/python/feast/infra/online_stores/contrib/couchbase_repo_configuration.py new file mode 100644 index 0000000000..e099e6ae1b --- /dev/null +++ b/sdk/python/feast/infra/online_stores/contrib/couchbase_repo_configuration.py @@ -0,0 +1,10 @@ +from tests.integration.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, +) +from tests.integration.feature_repos.universal.online_store.couchbase import ( + CouchbaseOnlineStoreCreator, +) + +FULL_REPO_CONFIGS = [ + IntegrationTestRepoConfig(online_store_creator=CouchbaseOnlineStoreCreator), +] diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index b2b9374aa9..604915e7fa 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -68,6 +68,7 @@ "remote": "feast.infra.online_stores.remote.RemoteOnlineStore", "singlestore": "feast.infra.online_stores.contrib.singlestore_online_store.singlestore.SingleStoreOnlineStore", "qdrant": "feast.infra.online_stores.contrib.qdrant.QdrantOnlineStore", + "couchbase": "feast.infra.online_stores.contrib.couchbase_online_store.couchbase.CouchbaseOnlineStore", } OFFLINE_STORE_CLASS_FOR_TYPE = { diff --git a/sdk/python/requirements/py3.10-ci-requirements.txt b/sdk/python/requirements/py3.10-ci-requirements.txt index 8c940ba84e..9ac4937582 100644 --- a/sdk/python/requirements/py3.10-ci-requirements.txt +++ b/sdk/python/requirements/py3.10-ci-requirements.txt @@ -126,6 +126,8 @@ comm==0.2.2 # via # ipykernel # ipywidgets +couchbase==4.3.2 + # via feast (setup.py) coverage[toml]==7.6.4 # via pytest-cov cryptography==42.0.8 @@ -189,7 +191,7 @@ executing==2.1.0 # via stack-data faiss-cpu==1.9.0 # via feast (setup.py) -fastapi==0.115.3 +fastapi==0.115.4 # via feast (setup.py) fastjsonschema==2.20.0 # via nbformat @@ -209,7 +211,7 @@ fsspec==2024.9.0 # dask geomet==0.2.1.post1 # via cassandra-driver -google-api-core[grpc]==2.21.0 +google-api-core[grpc]==2.22.0 # via # feast (setup.py) # google-cloud-bigquery @@ -258,7 +260,7 @@ googleapis-common-protos[grpc]==1.65.0 # google-api-core # grpc-google-iam-v1 # grpcio-status -great-expectations==0.18.21 +great-expectations==0.18.22 # via feast (setup.py) grpc-google-iam-v1==0.13.1 # via google-cloud-bigtable @@ -342,7 +344,7 @@ iniconfig==2.0.0 # via pytest ipykernel==6.29.5 # via jupyterlab -ipython==8.28.0 +ipython==8.29.0 # via # great-expectations # ipykernel @@ -561,7 +563,7 @@ pbr==6.1.0 # via mock pexpect==4.9.0 # via ipython -pip==24.2 +pip==24.3.1 # via pip-tools pip-tools==7.4.1 # via feast (setup.py) @@ -886,7 +888,7 @@ sqlparams==6.1.0 # via singlestoredb stack-data==0.6.3 # via ipython -starlette==0.41.0 +starlette==0.41.2 # via fastapi substrait==0.23.0 # via ibis-substrait @@ -932,7 +934,7 @@ tornado==6.4.1 # jupyterlab # notebook # terminado -tqdm==4.66.5 +tqdm==4.66.6 # via # feast (setup.py) # great-expectations @@ -953,7 +955,7 @@ traitlets==5.14.3 # nbformat trino==0.330.0 # via feast (setup.py) -typeguard==4.3.0 +typeguard==4.4.0 # via feast (setup.py) types-cffi==1.16.0.20240331 # via types-pyopenssl @@ -977,7 +979,7 @@ types-redis==4.6.0.20241004 # via feast (setup.py) types-requests==2.30.0.0 # via feast (setup.py) -types-setuptools==75.2.0.20241019 +types-setuptools==75.2.0.20241025 # via # feast (setup.py) # types-cffi @@ -1058,7 +1060,7 @@ websocket-client==1.8.0 # kubernetes websockets==13.1 # via uvicorn -werkzeug==3.0.5 +werkzeug==3.0.6 # via moto wheel==0.44.0 # via diff --git a/sdk/python/requirements/py3.10-requirements.txt b/sdk/python/requirements/py3.10-requirements.txt index 94c5d3945a..dd2ed6951c 100644 --- a/sdk/python/requirements/py3.10-requirements.txt +++ b/sdk/python/requirements/py3.10-requirements.txt @@ -35,7 +35,7 @@ dill==0.3.9 # via feast (setup.py) exceptiongroup==1.2.2 # via anyio -fastapi==0.115.3 +fastapi==0.115.4 # via feast (setup.py) fsspec==2024.10.0 # via dask @@ -74,7 +74,6 @@ numpy==1.26.4 # feast (setup.py) # dask # pandas - # pyarrow packaging==24.1 # via # dask @@ -92,7 +91,7 @@ protobuf==4.25.5 # via feast (setup.py) psutil==6.1.0 # via feast (setup.py) -pyarrow==17.0.0 +pyarrow==18.0.0 # via # feast (setup.py) # dask-expr @@ -133,7 +132,7 @@ sniffio==1.3.1 # via anyio sqlalchemy[mypy]==2.0.36 # via feast (setup.py) -starlette==0.41.0 +starlette==0.41.2 # via fastapi tabulate==0.9.0 # via feast (setup.py) @@ -147,9 +146,9 @@ toolz==1.0.0 # via # dask # partd -tqdm==4.66.5 +tqdm==4.66.6 # via feast (setup.py) -typeguard==4.3.0 +typeguard==4.4.0 # via feast (setup.py) typing-extensions==4.12.2 # via diff --git a/sdk/python/requirements/py3.11-ci-requirements.txt b/sdk/python/requirements/py3.11-ci-requirements.txt index 4d5d8a7188..feaafa36e3 100644 --- a/sdk/python/requirements/py3.11-ci-requirements.txt +++ b/sdk/python/requirements/py3.11-ci-requirements.txt @@ -124,6 +124,8 @@ comm==0.2.2 # via # ipykernel # ipywidgets +couchbase==4.3.2 + # via feast (setup.py) coverage[toml]==7.6.4 # via pytest-cov cryptography==42.0.8 @@ -182,7 +184,7 @@ executing==2.1.0 # via stack-data faiss-cpu==1.9.0 # via feast (setup.py) -fastapi==0.115.3 +fastapi==0.115.4 # via feast (setup.py) fastjsonschema==2.20.0 # via nbformat @@ -202,7 +204,7 @@ fsspec==2024.9.0 # dask geomet==0.2.1.post1 # via cassandra-driver -google-api-core[grpc]==2.21.0 +google-api-core[grpc]==2.22.0 # via # feast (setup.py) # google-cloud-bigquery @@ -251,7 +253,7 @@ googleapis-common-protos[grpc]==1.65.0 # google-api-core # grpc-google-iam-v1 # grpcio-status -great-expectations==0.18.21 +great-expectations==0.18.22 # via feast (setup.py) grpc-google-iam-v1==0.13.1 # via google-cloud-bigtable @@ -333,7 +335,7 @@ iniconfig==2.0.0 # via pytest ipykernel==6.29.5 # via jupyterlab -ipython==8.28.0 +ipython==8.29.0 # via # great-expectations # ipykernel @@ -552,7 +554,7 @@ pbr==6.1.0 # via mock pexpect==4.9.0 # via ipython -pip==24.2 +pip==24.3.1 # via pip-tools pip-tools==7.4.1 # via feast (setup.py) @@ -877,7 +879,7 @@ sqlparams==6.1.0 # via singlestoredb stack-data==0.6.3 # via ipython -starlette==0.41.0 +starlette==0.41.2 # via fastapi substrait==0.23.0 # via ibis-substrait @@ -913,7 +915,7 @@ tornado==6.4.1 # jupyterlab # notebook # terminado -tqdm==4.66.5 +tqdm==4.66.6 # via # feast (setup.py) # great-expectations @@ -934,7 +936,7 @@ traitlets==5.14.3 # nbformat trino==0.330.0 # via feast (setup.py) -typeguard==4.3.0 +typeguard==4.4.0 # via feast (setup.py) types-cffi==1.16.0.20240331 # via types-pyopenssl @@ -958,7 +960,7 @@ types-redis==4.6.0.20241004 # via feast (setup.py) types-requests==2.30.0.0 # via feast (setup.py) -types-setuptools==75.2.0.20241019 +types-setuptools==75.2.0.20241025 # via # feast (setup.py) # types-cffi @@ -1034,7 +1036,7 @@ websocket-client==1.8.0 # kubernetes websockets==13.1 # via uvicorn -werkzeug==3.0.5 +werkzeug==3.0.6 # via moto wheel==0.44.0 # via diff --git a/sdk/python/requirements/py3.11-requirements.txt b/sdk/python/requirements/py3.11-requirements.txt index e2a8589e77..c9833ca07b 100644 --- a/sdk/python/requirements/py3.11-requirements.txt +++ b/sdk/python/requirements/py3.11-requirements.txt @@ -33,7 +33,7 @@ dask-expr==1.1.16 # via dask dill==0.3.9 # via feast (setup.py) -fastapi==0.115.3 +fastapi==0.115.4 # via feast (setup.py) fsspec==2024.10.0 # via dask @@ -72,7 +72,6 @@ numpy==1.26.4 # feast (setup.py) # dask # pandas - # pyarrow packaging==24.1 # via # dask @@ -90,7 +89,7 @@ protobuf==4.25.5 # via feast (setup.py) psutil==6.1.0 # via feast (setup.py) -pyarrow==17.0.0 +pyarrow==18.0.0 # via # feast (setup.py) # dask-expr @@ -131,7 +130,7 @@ sniffio==1.3.1 # via anyio sqlalchemy[mypy]==2.0.36 # via feast (setup.py) -starlette==0.41.0 +starlette==0.41.2 # via fastapi tabulate==0.9.0 # via feast (setup.py) @@ -143,9 +142,9 @@ toolz==1.0.0 # via # dask # partd -tqdm==4.66.5 +tqdm==4.66.6 # via feast (setup.py) -typeguard==4.3.0 +typeguard==4.4.0 # via feast (setup.py) typing-extensions==4.12.2 # via diff --git a/sdk/python/requirements/py3.9-ci-requirements.txt b/sdk/python/requirements/py3.9-ci-requirements.txt index 2ba384e205..30eab84822 100644 --- a/sdk/python/requirements/py3.9-ci-requirements.txt +++ b/sdk/python/requirements/py3.9-ci-requirements.txt @@ -128,6 +128,8 @@ comm==0.2.2 # via # ipykernel # ipywidgets +couchbase==4.3.2 + # via feast (setup.py) coverage[toml]==7.6.4 # via pytest-cov cryptography==42.0.8 @@ -191,7 +193,7 @@ executing==2.1.0 # via stack-data faiss-cpu==1.9.0 # via feast (setup.py) -fastapi==0.115.3 +fastapi==0.115.4 # via feast (setup.py) fastjsonschema==2.20.0 # via nbformat @@ -211,7 +213,7 @@ fsspec==2024.9.0 # dask geomet==0.2.1.post1 # via cassandra-driver -google-api-core[grpc]==2.21.0 +google-api-core[grpc]==2.22.0 # via # feast (setup.py) # google-cloud-bigquery @@ -260,7 +262,7 @@ googleapis-common-protos[grpc]==1.65.0 # google-api-core # grpc-google-iam-v1 # grpcio-status -great-expectations==0.18.21 +great-expectations==0.18.22 # via feast (setup.py) grpc-google-iam-v1==0.13.1 # via google-cloud-bigtable @@ -569,7 +571,7 @@ pbr==6.1.0 # via mock pexpect==4.9.0 # via ipython -pip==24.2 +pip==24.3.1 # via pip-tools pip-tools==7.4.1 # via feast (setup.py) @@ -894,7 +896,7 @@ sqlparams==6.1.0 # via singlestoredb stack-data==0.6.3 # via ipython -starlette==0.41.0 +starlette==0.41.2 # via fastapi substrait==0.23.0 # via ibis-substrait @@ -940,7 +942,7 @@ tornado==6.4.1 # jupyterlab # notebook # terminado -tqdm==4.66.5 +tqdm==4.66.6 # via # feast (setup.py) # great-expectations @@ -961,7 +963,7 @@ traitlets==5.14.3 # nbformat trino==0.330.0 # via feast (setup.py) -typeguard==4.3.0 +typeguard==4.4.0 # via feast (setup.py) types-cffi==1.16.0.20240331 # via types-pyopenssl @@ -985,7 +987,7 @@ types-redis==4.6.0.20241004 # via feast (setup.py) types-requests==2.30.0.0 # via feast (setup.py) -types-setuptools==75.2.0.20241019 +types-setuptools==75.2.0.20241025 # via # feast (setup.py) # types-cffi @@ -1069,7 +1071,7 @@ websocket-client==1.8.0 # kubernetes websockets==13.1 # via uvicorn -werkzeug==3.0.5 +werkzeug==3.0.6 # via moto wheel==0.44.0 # via diff --git a/sdk/python/requirements/py3.9-requirements.txt b/sdk/python/requirements/py3.9-requirements.txt index 7f8eecd6f8..ec46a195c1 100644 --- a/sdk/python/requirements/py3.9-requirements.txt +++ b/sdk/python/requirements/py3.9-requirements.txt @@ -35,7 +35,7 @@ dill==0.3.9 # via feast (setup.py) exceptiongroup==1.2.2 # via anyio -fastapi==0.115.3 +fastapi==0.115.4 # via feast (setup.py) fsspec==2024.10.0 # via dask @@ -76,7 +76,6 @@ numpy==1.26.4 # feast (setup.py) # dask # pandas - # pyarrow packaging==24.1 # via # dask @@ -94,7 +93,7 @@ protobuf==4.25.5 # via feast (setup.py) psutil==6.1.0 # via feast (setup.py) -pyarrow==17.0.0 +pyarrow==18.0.0 # via # feast (setup.py) # dask-expr @@ -135,7 +134,7 @@ sniffio==1.3.1 # via anyio sqlalchemy[mypy]==2.0.36 # via feast (setup.py) -starlette==0.41.0 +starlette==0.41.2 # via fastapi tabulate==0.9.0 # via feast (setup.py) @@ -149,9 +148,9 @@ toolz==1.0.0 # via # dask # partd -tqdm==4.66.5 +tqdm==4.66.6 # via feast (setup.py) -typeguard==4.3.0 +typeguard==4.4.0 # via feast (setup.py) typing-extensions==4.12.2 # via diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/couchbase.py b/sdk/python/tests/integration/feature_repos/universal/online_store/couchbase.py new file mode 100644 index 0000000000..f2ba12da8d --- /dev/null +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/couchbase.py @@ -0,0 +1,149 @@ +import time +from typing import Dict + +import requests +from testcontainers.core.container import DockerContainer +from testcontainers.core.waiting_utils import wait_for_logs + +from tests.integration.feature_repos.universal.online_store_creator import ( + OnlineStoreCreator, +) + + +class CouchbaseOnlineStoreCreator(OnlineStoreCreator): + def __init__(self, project_name: str, **kwargs): + super().__init__(project_name) + # Using the latest Couchbase Enterprise version + self.container = DockerContainer( + "couchbase/server:enterprise-7.6.3" + ).with_exposed_ports( + "8091", # REST/HTTP interface - mgmt + "8092", # Views - C api + "8093", # Query - n1ql + "8094", # Search + "8095", # Analytics + "8096", # Eventing + "11210", # Key-Value + ) + self.username = "Administrator" + self.password = "password" + self.bucket_name = f"feast_{project_name}" + + def create_online_store(self) -> Dict[str, object]: + self.container.start() + + # Wait for Couchbase server to be ready + log_string_to_wait_for = "Starting Couchbase Server" + wait_for_logs( + container=self.container, predicate=log_string_to_wait_for, timeout=120 + ) + + # Get the exposed ports + rest_port = self.container.get_exposed_port("8091") + views_port = self.container.get_exposed_port("8092") + query_port = self.container.get_exposed_port("8093") + kv_port = self.container.get_exposed_port("11210") + base_url = f"http://127.0.0.1:{rest_port}" + + port_map = { + "rest": rest_port, + "views": views_port, + "query": query_port, + "kv": kv_port, + } + + # Wait for the server to be fully available + self._wait_for_server_ready(base_url) + + # Initialize the cluster + self._initialize_cluster(base_url, port_map) + + # Create bucket + self._create_bucket(base_url) + + # Wait for the credentials to be valid + time.sleep(5) + + # Return the configuration for Feast + return { + "type": "couchbase", + "connection_string": "couchbase://127.0.0.1", + "user": self.username, + "password": self.password, + "bucket_name": self.bucket_name, + "kv_port": int(kv_port), + } + + def _wait_for_server_ready(self, base_url: str, timeout: int = 120): + start_time = time.time() + while True: + try: + response = requests.get(f"{base_url}/pools") + if response.status_code == 200: + break + except requests.RequestException: + pass + + if time.time() - start_time > timeout: + raise TimeoutError( + f"Couchbase server failed to start after {timeout} seconds" + ) + + time.sleep(1) + + def _initialize_cluster(self, base_url: str, ports: Dict[str, int]): + # Initialize services + services_data = {"services": "kv,n1ql,index"} + requests.post(f"{base_url}/node/controller/setupServices", data=services_data) + + # Initialize memory quotas + quota_data = {"memoryQuota": "256", "indexMemoryQuota": "256"} + requests.post(f"{base_url}/pools/default", data=quota_data) + + # Set administrator credentials + credentials_data = { + "username": self.username, + "password": self.password, + "port": "SAME", + } + requests.post(f"{base_url}/settings/web", data=credentials_data) + + # Initialize index storage mode + index_data = {"storageMode": "memory_optimized"} + requests.post( + f"{base_url}/settings/indexes", + data=index_data, + auth=(self.username, self.password), + ) + + # Set up alternate addresses + payload = { + "hostname": "127.0.0.1", + "kv": ports["kv"], # KV service port + "n1ql": ports["query"], # Query service port + "capi": ports["views"], # Views service port + "mgmt": ports["rest"], # REST/HTTP interface port + } + + requests.put( + f"{base_url}/node/controller/setupAlternateAddresses/external", + data=payload, + auth=(self.username, self.password), + ) + + def _create_bucket(self, base_url: str): + bucket_data = { + "name": self.bucket_name, + "bucketType": "couchbase", + "ramQuotaMB": "128", + "durabilityMinLevel": "none", + } + + requests.post( + f"{base_url}/pools/default/buckets", + data=bucket_data, + auth=(self.username, self.password), + ) + + def teardown(self): + self.container.stop() diff --git a/setup.py b/setup.py index b335d39c2b..5a6581cc85 100644 --- a/setup.py +++ b/setup.py @@ -142,6 +142,8 @@ SINGLESTORE_REQUIRED = ["singlestoredb"] +COUCHBASE_REQUIRED = ["couchbase==4.3.2"] + MSSQL_REQUIRED = ["ibis-framework[mssql]>=9.0.0,<10"] FAISS_REQUIRED = ["faiss-cpu>=1.7.0,<2"] @@ -214,6 +216,7 @@ + ELASTICSEARCH_REQUIRED + SQLITE_VEC_REQUIRED + SINGLESTORE_REQUIRED + + COUCHBASE_REQUIRED + OPENTELEMETRY + FAISS_REQUIRED + QDRANT_REQUIRED @@ -285,6 +288,7 @@ "elasticsearch": ELASTICSEARCH_REQUIRED, "sqlite_vec": SQLITE_VEC_REQUIRED, "singlestore": SINGLESTORE_REQUIRED, + "couchbase": COUCHBASE_REQUIRED, "opentelemetry": OPENTELEMETRY, "faiss": FAISS_REQUIRED, "qdrant": QDRANT_REQUIRED