diff --git a/docs/specs/dynamodb_online_example.monopic b/docs/specs/dynamodb_online_example.monopic
new file mode 100644
index 0000000000..6531749dd9
Binary files /dev/null and b/docs/specs/dynamodb_online_example.monopic differ
diff --git a/docs/specs/dynamodb_online_example.png b/docs/specs/dynamodb_online_example.png
new file mode 100644
index 0000000000..34121fbff5
Binary files /dev/null and b/docs/specs/dynamodb_online_example.png differ
diff --git a/docs/specs/online_store_format.md b/docs/specs/online_store_format.md
index 107863001d..9f901ae69c 100644
--- a/docs/specs/online_store_format.md
+++ b/docs/specs/online_store_format.md
@@ -7,7 +7,7 @@ This format is considered part of Feast public API contract; that allows other c
 The format is not entirely technology or cloud agnostic. Since users may opt to use different key-value stores as an underlying engine to store feature data, and we don't want to aim for the lowest common denominator across them, we have to provide different "flavors" of this data format, specialized for every supported store.
 
-This version of the Online Store Format supports only Redis as the underlying storage engine. We envision adding more storage engines to this document in the future.
+This version of the Online Store Format supports Redis and DynamoDB as underlying storage engines. We envision adding more storage engines to this document in the future.
 
 ## Overview
diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py
index 71d688556d..04e49c04c7 100644
--- a/sdk/python/feast/cli.py
+++ b/sdk/python/feast/cli.py
@@ -280,7 +280,7 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List
 @click.option(
     "--template",
     "-t",
-    type=click.Choice(["local", "gcp"], case_sensitive=False),
+    type=click.Choice(["local", "gcp", "aws"], case_sensitive=False),
     help="Specify a template for the created project",
     default="local",
 )
diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py
index 5eb0ea8422..1c7e93f9ee 100644
--- a/sdk/python/feast/errors.py
+++ b/sdk/python/feast/errors.py
@@ -40,6 +40,16 @@ def __init__(self, name, project=None):
         super().__init__(f"Feature table {name} does not exist")
 
 
+class S3RegistryBucketNotExist(FeastObjectNotFoundException):
+    def __init__(self, bucket):
+        super().__init__(f"S3 bucket {bucket} for the Feast registry does not exist")
+
+
+class S3RegistryBucketForbiddenAccess(FeastObjectNotFoundException):
+    def __init__(self, bucket):
+        super().__init__(f"S3 bucket {bucket} for the Feast registry can't be accessed")
+
+
 class FeastProviderLoginError(Exception):
     """Error class that indicates a user has not authenticated with their provider."""
 
diff --git a/sdk/python/feast/infra/aws.py b/sdk/python/feast/infra/aws.py
new file mode 100644
index 0000000000..272f39840e
--- /dev/null
+++ b/sdk/python/feast/infra/aws.py
@@ -0,0 +1,141 @@
+from datetime import datetime
+from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
+
+import pandas
+from tqdm import tqdm
+
+from feast import FeatureTable
+from feast.entity import Entity
+from feast.feature_view import FeatureView
+from feast.infra.offline_stores.helpers import get_offline_store_from_config
+from feast.infra.online_stores.helpers import get_online_store_from_config
+from feast.infra.provider import (
+    Provider,
+    RetrievalJob,
+    _convert_arrow_to_proto,
+    _get_column_names,
+    _run_field_mapping,
+)
+from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
+from feast.protos.feast.types.Value_pb2 import Value as ValueProto
+from feast.registry import Registry
+from feast.repo_config import RepoConfig
+
+
+class AwsProvider(Provider):
+    def __init__(self, config: RepoConfig):
+        self.repo_config = config
+        self.offline_store = get_offline_store_from_config(config.offline_store)
+        self.online_store = get_online_store_from_config(config.online_store)
+
+    def update_infra(
+        self,
+        project: str,
+        tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
+        tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
+        entities_to_delete: Sequence[Entity],
+        entities_to_keep: Sequence[Entity],
+        partial: bool,
+    ):
+        self.online_store.update(
+            config=self.repo_config,
+            tables_to_delete=tables_to_delete,
+            tables_to_keep=tables_to_keep,
+            entities_to_keep=entities_to_keep,
+            entities_to_delete=entities_to_delete,
+            partial=partial,
+        )
+
+    def teardown_infra(
+        self,
+        project: str,
+        tables: Sequence[Union[FeatureTable, FeatureView]],
+        entities: Sequence[Entity],
+    ) -> None:
+        self.online_store.teardown(self.repo_config, tables, entities)
+
+    def online_write_batch(
+        self,
+        config: RepoConfig,
+        table: Union[FeatureTable, FeatureView],
+        data: List[
+            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
+        ],
+        progress: Optional[Callable[[int], Any]],
+    ) -> None:
+        self.online_store.online_write_batch(config, table, data, progress)
+
+    def online_read(
+        self,
+        config: RepoConfig,
+        table: Union[FeatureTable, FeatureView],
+        entity_keys: List[EntityKeyProto],
+        requested_features: Optional[List[str]] = None,
+    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
+        result = self.online_store.online_read(
+            config, table, entity_keys, requested_features
+        )
+
+        return result
+
+    def materialize_single_feature_view(
+        self,
+        config: RepoConfig,
+        feature_view: FeatureView,
+        start_date: datetime,
+        end_date: datetime,
+        registry: Registry,
+        project: str,
+        tqdm_builder: Callable[[int], tqdm],
+    ) -> None:
+        entities = []
+        for entity_name in feature_view.entities:
+            entities.append(registry.get_entity(entity_name, project))
+
+        (
+            join_key_columns,
+            feature_name_columns,
+            event_timestamp_column,
+            created_timestamp_column,
+        ) = _get_column_names(feature_view, entities)
+
+        offline_job = self.offline_store.pull_latest_from_table_or_query(
+            config=config,
+            data_source=feature_view.input,
+            join_key_columns=join_key_columns,
+            feature_name_columns=feature_name_columns,
+            event_timestamp_column=event_timestamp_column,
+            created_timestamp_column=created_timestamp_column,
+            start_date=start_date,
+            end_date=end_date,
+        )
+
+        table = offline_job.to_arrow()
+
+        if feature_view.input.field_mapping is not None:
+            table = _run_field_mapping(table, feature_view.input.field_mapping)
+
+        join_keys = [entity.join_key for entity in entities]
+        rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
+
+        with tqdm_builder(len(rows_to_write)) as pbar:
+            self.online_write_batch(
+                self.repo_config, feature_view, rows_to_write, lambda x: pbar.update(x)
+            )
+
+    def get_historical_features(
+        self,
+        config: RepoConfig,
+        feature_views: List[FeatureView],
+        feature_refs: List[str],
+        entity_df: Union[pandas.DataFrame, str],
+        registry: Registry,
+        project: str,
+    ) -> RetrievalJob:
+        job = self.offline_store.get_historical_features(
+            config=config,
+            feature_views=feature_views,
+            feature_refs=feature_refs,
+            entity_df=entity_df,
+            registry=registry,
+            project=project,
+        )
+        return job
diff --git a/sdk/python/feast/infra/online_stores/datastore.py 
b/sdk/python/feast/infra/online_stores/datastore.py index c623af1c1f..e5328a2725 100644 --- a/sdk/python/feast/infra/online_stores/datastore.py +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -16,13 +16,12 @@ from multiprocessing.pool import ThreadPool from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union -import mmh3 from pydantic import PositiveInt, StrictStr from pydantic.typing import Literal from feast import Entity, FeatureTable, utils from feast.feature_view import FeatureView -from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.online_stores.helpers import compute_entity_id from feast.infra.online_stores.online_store import OnlineStore from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto @@ -191,7 +190,7 @@ def _write_minibatch( ): entities = [] for entity_key, features, timestamp, created_ts in data: - document_id = compute_datastore_entity_id(entity_key) + document_id = compute_entity_id(entity_key) key = client.key( "Project", project, "Table", table.name, "Row", document_id, @@ -236,7 +235,7 @@ def online_read( result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] for entity_key in entity_keys: - document_id = compute_datastore_entity_id(entity_key) + document_id = compute_entity_id(entity_key) key = client.key( "Project", feast_project, "Table", table.name, "Row", document_id ) @@ -253,16 +252,6 @@ def online_read( return result -def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str: - """ - Compute Datastore Entity id given Feast Entity Key. - - Remember that Datastore Entity is a concept from the Datastore data model, that has nothing to - do with the Entity concept we have in Feast. - """ - return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex() - - def _delete_all_values(client, key) -> None: """ Delete all data under the key path in datastore. diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py new file mode 100644 index 0000000000..722a081f2e --- /dev/null +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -0,0 +1,182 @@ +# Copyright 2021 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
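+
+# Storage layout, summarizing the implementation below: one DynamoDB table per
+# "<project>.<feature view>" pair, keyed by a string partition key "entity_id"
+# (a murmur3 hash of the serialized entity key), with an "event_ts" string and
+# a "values" map from feature name to serialized ValueProto bytes.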
+from datetime import datetime +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union + +from pydantic import StrictStr +from pydantic.typing import Literal + +from feast import Entity, FeatureTable, FeatureView, utils +from feast.infra.online_stores.helpers import compute_entity_id +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel, RepoConfig + +try: + import boto3 + from botocore.exceptions import ClientError +except ImportError as e: + from feast.errors import FeastExtrasDependencyImportError + + raise FeastExtrasDependencyImportError("aws", str(e)) + + +class DynamoDBOnlineStoreConfig(FeastConfigBaseModel): + """Online store config for DynamoDB store""" + + type: Literal["dynamodb"] = "dynamodb" + """Online store type selector""" + + region: StrictStr + """ AWS Region Name """ + + +class DynamoDBOnlineStore(OnlineStore): + """ + Online feature store for AWS DynamoDB. + """ + + def update( + self, + config: RepoConfig, + tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], + tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): + online_config = config.online_store + assert isinstance(online_config, DynamoDBOnlineStoreConfig) + dynamodb_client, dynamodb_resource = self._initialize_dynamodb(online_config) + + for table_instance in tables_to_keep: + try: + dynamodb_resource.create_table( + TableName=f"{config.project}.{table_instance.name}", + KeySchema=[{"AttributeName": "entity_id", "KeyType": "HASH"}], + AttributeDefinitions=[ + {"AttributeName": "entity_id", "AttributeType": "S"} + ], + BillingMode="PAY_PER_REQUEST", + ) + except ClientError as ce: + # If the table creation fails with ResourceInUseException, + # it means the table already exists or is being created. 
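+                # Swallowing the error here makes repeated `feast apply` runs idempotent.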
+                # Otherwise, re-raise the exception
+                if ce.response["Error"]["Code"] != "ResourceInUseException":
+                    raise
+
+        for table_instance in tables_to_keep:
+            dynamodb_client.get_waiter("table_exists").wait(
+                TableName=f"{config.project}.{table_instance.name}"
+            )
+
+        self._delete_tables_idempotent(dynamodb_resource, config, tables_to_delete)
+
+    def teardown(
+        self,
+        config: RepoConfig,
+        tables: Sequence[Union[FeatureTable, FeatureView]],
+        entities: Sequence[Entity],
+    ):
+        online_config = config.online_store
+        assert isinstance(online_config, DynamoDBOnlineStoreConfig)
+        _, dynamodb_resource = self._initialize_dynamodb(online_config)
+
+        self._delete_tables_idempotent(dynamodb_resource, config, tables)
+
+    def online_write_batch(
+        self,
+        config: RepoConfig,
+        table: Union[FeatureTable, FeatureView],
+        data: List[
+            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
+        ],
+        progress: Optional[Callable[[int], Any]],
+    ) -> None:
+        online_config = config.online_store
+        assert isinstance(online_config, DynamoDBOnlineStoreConfig)
+        _, dynamodb_resource = self._initialize_dynamodb(online_config)
+
+        table_instance = dynamodb_resource.Table(f"{config.project}.{table.name}")
+        with table_instance.batch_writer() as batch:
+            for entity_key, features, timestamp, created_ts in data:
+                entity_id = compute_entity_id(entity_key)
+                batch.put_item(
+                    Item={
+                        "entity_id": entity_id,  # PartitionKey
+                        "event_ts": str(utils.make_tzaware(timestamp)),
+                        "values": {
+                            k: v.SerializeToString()
+                            for k, v in features.items()  # Serialized Features
+                        },
+                    }
+                )
+                if progress:
+                    progress(1)
+
+    def online_read(
+        self,
+        config: RepoConfig,
+        table: Union[FeatureTable, FeatureView],
+        entity_keys: List[EntityKeyProto],
+        requested_features: Optional[List[str]] = None,
+    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
+        online_config = config.online_store
+        assert isinstance(online_config, DynamoDBOnlineStoreConfig)
+        _, dynamodb_resource = self._initialize_dynamodb(online_config)
+
+        result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
+        # The table is the same for every entity key, so look it up once
+        table_instance = dynamodb_resource.Table(f"{config.project}.{table.name}")
+        for entity_key in entity_keys:
+            entity_id = compute_entity_id(entity_key)
+            response = table_instance.get_item(Key={"entity_id": entity_id})
+            value = response.get("Item")
+
+            if value is not None:
+                res = {}
+                for feature_name, value_bin in value["values"].items():
+                    val = ValueProto()
+                    val.ParseFromString(value_bin.value)
+                    res[feature_name] = val
+                # event_ts was stored via str(datetime); parse it back so the
+                # returned tuple matches the declared Optional[datetime] type
+                result.append((datetime.fromisoformat(value["event_ts"]), res))
+            else:
+                result.append((None, None))
+        return result
+
+    def _initialize_dynamodb(self, online_config: DynamoDBOnlineStoreConfig):
+        return (
+            boto3.client("dynamodb", region_name=online_config.region),
+            boto3.resource("dynamodb", region_name=online_config.region),
+        )
+
+    def _delete_tables_idempotent(
+        self,
+        dynamodb_resource,
+        config: RepoConfig,
+        tables: Sequence[Union[FeatureTable, FeatureView]],
+    ):
+        for table_instance in tables:
+            try:
+                table = dynamodb_resource.Table(
+                    f"{config.project}.{table_instance.name}"
+                )
+                table.delete()
+            except ClientError as ce:
+                # If the table deletion fails with ResourceNotFoundException,
+                # it means the table has already been deleted.
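+                # Swallowing the error keeps teardown safe to re-run.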
+                # Otherwise, re-raise the exception
+                if ce.response["Error"]["Code"] != "ResourceNotFoundException":
+                    raise
diff --git a/sdk/python/feast/infra/online_stores/helpers.py b/sdk/python/feast/infra/online_stores/helpers.py
index 9c42c5ea00..788be68b8d 100644
--- a/sdk/python/feast/infra/online_stores/helpers.py
+++ b/sdk/python/feast/infra/online_stores/helpers.py
@@ -5,6 +5,7 @@ import mmh3
 
 from feast import errors
+from feast.infra.key_encoding_utils import serialize_entity_key
 from feast.infra.online_stores.online_store import OnlineStore
 from feast.protos.feast.storage.Redis_pb2 import RedisKeyV2 as RedisKeyProto
 from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
@@ -53,3 +54,12 @@ def _mmh3(key: str):
     """
     key_hash = mmh3.hash(key, signed=False)
     return bytes.fromhex(struct.pack("<Q", key_hash).hex()[:8])
+
+
+def compute_entity_id(entity_key: EntityKeyProto) -> str:
+    """
+    Compute a deterministic entity id, given a Feast entity key, for use as a
+    document/row key in online stores. This id is a murmur3 hash of the
+    serialized `EntityKeyProto`, not the Entity concept we have in Feast.
+    """
+    return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()
diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py
index 8b92374d23..29766c9d9a 100644
--- a/sdk/python/feast/infra/provider.py
+++ b/sdk/python/feast/infra/provider.py
@@ -145,6 +145,10 @@ def get_provider(config: RepoConfig, repo_path: Path) -> Provider:
         from feast.infra.gcp import GcpProvider
 
         return GcpProvider(config)
+    elif config.provider == "aws":
+        from feast.infra.aws import AwsProvider
+
+        return AwsProvider(config)
     elif config.provider == "local":
         from feast.infra.local import LocalProvider
 
diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py
index 9500194d04..53c3cae1e7 100644
--- a/sdk/python/feast/registry.py
+++ b/sdk/python/feast/registry.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import os import uuid from abc import ABC, abstractmethod from datetime import datetime, timedelta @@ -25,6 +26,8 @@ EntityNotFoundException, FeatureTableNotFoundException, FeatureViewNotFoundException, + S3RegistryBucketForbiddenAccess, + S3RegistryBucketNotExist, ) from feast.feature_table import FeatureTable from feast.feature_view import FeatureView @@ -56,6 +59,8 @@ def __init__(self, registry_path: str, repo_path: Path, cache_ttl: timedelta): uri = urlparse(registry_path) if uri.scheme == "gs": self._registry_store: RegistryStore = GCSRegistryStore(registry_path) + elif uri.scheme == "s3": + self._registry_store = S3RegistryStore(registry_path) elif uri.scheme == "file" or uri.scheme == "": self._registry_store = LocalRegistryStore( repo_path=repo_path, registry_path_string=registry_path @@ -537,3 +542,73 @@ def _write_registry(self, registry_proto: RegistryProto): file_obj.seek(0) blob.upload_from_file(file_obj) return + + +class S3RegistryStore(RegistryStore): + def __init__(self, uri: str): + try: + import boto3 + except ImportError as e: + from feast.errors import FeastExtrasDependencyImportError + + raise FeastExtrasDependencyImportError("aws", str(e)) + self._uri = urlparse(uri) + self._bucket = self._uri.hostname + self._key = self._uri.path.lstrip("/") + + self.s3_client = boto3.resource( + "s3", endpoint_url=os.environ.get("FEAST_S3_ENDPOINT_URL") + ) + + def get_registry_proto(self): + file_obj = TemporaryFile() + registry_proto = RegistryProto() + try: + from botocore.exceptions import ClientError + except ImportError as e: + from feast.errors import FeastExtrasDependencyImportError + + raise FeastExtrasDependencyImportError("aws", str(e)) + try: + bucket = self.s3_client.Bucket(self._bucket) + self.s3_client.meta.client.head_bucket(Bucket=bucket.name) + except ClientError as e: + # If a client error is thrown, then check that it was a 404 error. + # If it was a 404 error, then the bucket does not exist. 
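+            # Any other error code (e.g. 403) is treated as forbidden access.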
+            error_code = int(e.response["Error"]["Code"])
+            if error_code == 404:
+                raise S3RegistryBucketNotExist(self._bucket)
+            else:
+                raise S3RegistryBucketForbiddenAccess(self._bucket) from e
+
+        try:
+            obj = bucket.Object(self._key)
+            obj.download_fileobj(file_obj)
+            file_obj.seek(0)
+            registry_proto.ParseFromString(file_obj.read())
+            return registry_proto
+        except ClientError as e:
+            raise FileNotFoundError(
+                f"Error while trying to locate Registry at path {self._uri.geturl()}"
+            ) from e
+
+    def update_registry_proto(
+        self, updater: Optional[Callable[[RegistryProto], RegistryProto]] = None
+    ):
+        try:
+            registry_proto = self.get_registry_proto()
+        except FileNotFoundError:
+            registry_proto = RegistryProto()
+            registry_proto.registry_schema_version = REGISTRY_SCHEMA_VERSION
+        if updater:
+            registry_proto = updater(registry_proto)
+        self._write_registry(registry_proto)
+
+    def _write_registry(self, registry_proto: RegistryProto):
+        registry_proto.version_id = str(uuid.uuid4())
+        registry_proto.last_updated.FromDatetime(datetime.utcnow())
+        # we have already checked the bucket exists so no need to do it again
+        file_obj = TemporaryFile()
+        file_obj.write(registry_proto.SerializeToString())
+        file_obj.seek(0)
+        self.s3_client.Bucket(self._bucket).put_object(Body=file_obj, Key=self._key)
diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py
index 8ef98736f9..5cf17bf729 100644
--- a/sdk/python/feast/repo_config.py
+++ b/sdk/python/feast/repo_config.py
@@ -16,6 +16,7 @@
     "sqlite": "feast.infra.online_stores.sqlite.SqliteOnlineStore",
     "datastore": "feast.infra.online_stores.datastore.DatastoreOnlineStore",
     "redis": "feast.infra.online_stores.redis.RedisOnlineStore",
+    "dynamodb": "feast.infra.online_stores.dynamodb.DynamoDBOnlineStore",
 }
 
 OFFLINE_STORE_CLASS_FOR_TYPE = {
@@ -67,7 +68,7 @@ class RepoConfig(FeastBaseModel):
     """
 
     provider: StrictStr
-    """ str: local or gcp """
+    """ str: local, gcp, or aws """
 
     online_store: Any
     """ OnlineStoreConfig: Online store configuration (optional depending on provider) """
@@ -127,6 +128,8 @@ def _validate_online_store_config(cls, values):
             values["online_store"]["type"] = "sqlite"
         elif values["provider"] == "gcp":
             values["online_store"]["type"] = "datastore"
+        elif values["provider"] == "aws":
+            values["online_store"]["type"] = "dynamodb"
 
         online_store_type = values["online_store"]["type"]
 
@@ -161,7 +164,7 @@ def _validate_offline_store_config(cls, values):
         elif values["provider"] == "gcp":
             values["offline_store"]["type"] = "bigquery"
         elif values["provider"] == "aws":
-            values["offline_store"]["type"] = "redshift"
+            values["offline_store"]["type"] = "file"
 
         offline_store_type = values["offline_store"]["type"]
diff --git a/sdk/python/feast/templates/aws/bootstrap.py b/sdk/python/feast/templates/aws/bootstrap.py
new file mode 100644
index 0000000000..4013ca5a8d
--- /dev/null
+++ b/sdk/python/feast/templates/aws/bootstrap.py
@@ -0,0 +1,35 @@
+def bootstrap():
+    # bootstrap() is called automatically by init_repo() during `feast init`
+
+    import pathlib
+    from datetime import datetime, timedelta
+
+    from feast.driver_test_data import create_driver_hourly_stats_df
+
+    repo_path = pathlib.Path(__file__).parent.absolute()
+    data_path = repo_path / "data"
+    data_path.mkdir(exist_ok=True)
+
+    end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
+    start_date = end_date - timedelta(days=15)
+
+    driver_entities = [1001, 1002, 1003, 1004, 1005]
+    driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date)
+
+    driver_stats_path = data_path / "driver_stats.parquet"
+    driver_df.to_parquet(path=str(driver_stats_path), allow_truncated_timestamps=True)
+
+    example_py_file = repo_path / "example.py"
+    replace_str_in_file(example_py_file, "%PARQUET_PATH%", str(driver_stats_path))
+
+
+def replace_str_in_file(file_path, match_str, sub_str):
+    with open(file_path, "r") as f:
+        contents = f.read()
+    contents = contents.replace(match_str, sub_str)
+    with open(file_path, "wt") as f:
+        f.write(contents)
+
+
+if __name__ == "__main__":
+    bootstrap()
diff --git a/sdk/python/feast/templates/aws/example.py b/sdk/python/feast/templates/aws/example.py
new file mode 100644
index 0000000000..a66dbba120
--- /dev/null
+++ b/sdk/python/feast/templates/aws/example.py
@@ -0,0 +1,36 @@
+# This is an example feature definition file
+
+from google.protobuf.duration_pb2 import Duration
+
+from feast import Entity, Feature, FeatureView, ValueType
+from feast.data_source import FileSource
+
+# Read data from parquet files. Parquet is convenient for local development. For
+# production, you can use your favorite DWH, such as Redshift. See Feast
+# documentation for more info.
+driver_hourly_stats = FileSource(
+    path="%PARQUET_PATH%",
+    event_timestamp_column="datetime",
+    created_timestamp_column="created",
+)
+
+# Define an entity for the driver. You can think of an entity as a primary key
+# used to fetch features.
+driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",)
+
+# Our parquet files contain sample data that includes a driver_id column, timestamps,
+# and three feature columns. Here we define a Feature View that will allow us to
+# serve this data to our model online.
+driver_hourly_stats_view = FeatureView(
+    name="driver_hourly_stats",
+    entities=["driver_id"],
+    ttl=Duration(seconds=86400 * 1),
+    features=[
+        Feature(name="conv_rate", dtype=ValueType.FLOAT),
+        Feature(name="acc_rate", dtype=ValueType.FLOAT),
+        Feature(name="avg_daily_trips", dtype=ValueType.INT64),
+    ],
+    online=True,
+    input=driver_hourly_stats,
+    tags={},
+)
diff --git a/sdk/python/feast/templates/aws/feature_store.yaml b/sdk/python/feast/templates/aws/feature_store.yaml
new file mode 100644
index 0000000000..7f7be8527e
--- /dev/null
+++ b/sdk/python/feast/templates/aws/feature_store.yaml
@@ -0,0 +1,3 @@
+project: my_project
+registry: data/registry.db
+provider: aws
diff --git a/sdk/python/feast/templates/aws/test.py b/sdk/python/feast/templates/aws/test.py
new file mode 100644
index 0000000000..cc2cf7e984
--- /dev/null
+++ b/sdk/python/feast/templates/aws/test.py
@@ -0,0 +1,38 @@
+from datetime import datetime
+
+import pandas as pd
+from example import driver, driver_hourly_stats_view
+
+from feast import FeatureStore
+
+
+def main():
+    pd.set_option("display.max_columns", None)
+    pd.set_option("display.width", 1000)
+
+    # Load the feature store from the current path
+    fs = FeatureStore(repo_path=".")
+
+    # Deploy the feature store to AWS
+    print("Deploying feature store to AWS...")
+    fs.apply([driver, driver_hourly_stats_view])
+
+    # Select features
+    feature_refs = ["driver_hourly_stats:conv_rate", "driver_hourly_stats:acc_rate"]
+
+    print("Loading features into the online store...")
+    fs.materialize_incremental(end_date=datetime.now())
+
+    print("Retrieving online features...")
+
+    # Retrieve features from the online store (DynamoDB)
+    online_features = fs.get_online_features(
+        feature_refs=feature_refs,
+        entity_rows=[{"driver_id": 1001}, {"driver_id": 1002}],
+    ).to_dict()
+
+    
print(pd.DataFrame.from_dict(online_features)) + + +if __name__ == "__main__": + main() diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 293e6804e7..bd51956160 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -71,6 +71,10 @@ "redis-py-cluster==2.1.2", ] +AWS_REQUIRED = [ + "boto3==1.17.*", +] + CI_REQUIRED = [ "cryptography==3.3.2", "flake8", @@ -104,8 +108,10 @@ "google-cloud-storage>=1.20.*", "google-cloud-core==1.4.*", "redis-py-cluster==2.1.2", + "boto3==1.17.*", ] + # README file from Feast repo root directory repo_root = ( subprocess.Popen(["git", "rev-parse", "--show-toplevel"], stdout=subprocess.PIPE) @@ -198,6 +204,7 @@ def run(self): "dev": ["mypy-protobuf==1.*", "grpcio-testing==1.*"], "ci": CI_REQUIRED, "gcp": GCP_REQUIRED, + "aws": AWS_REQUIRED, "redis": REDIS_REQUIRED, }, include_package_data=True, diff --git a/sdk/python/tests/test_cli_aws.py b/sdk/python/tests/test_cli_aws.py new file mode 100644 index 0000000000..2792858e8d --- /dev/null +++ b/sdk/python/tests/test_cli_aws.py @@ -0,0 +1,58 @@ +import random +import string +import tempfile +from pathlib import Path +from textwrap import dedent + +import pytest + +from feast.feature_store import FeatureStore +from tests.cli_utils import CliRunner +from tests.online_read_write_test import basic_rw_test + + +@pytest.mark.integration +def test_basic() -> None: + project_id = "".join( + random.choice(string.ascii_lowercase + string.digits) for _ in range(10) + ) + runner = CliRunner() + with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name: + + repo_path = Path(repo_dir_name) + data_path = Path(data_dir_name) + + repo_config = repo_path / "feature_store.yaml" + + repo_config.write_text( + dedent( + f""" + project: {project_id} + registry: {data_path / "registry.db"} + provider: aws + online_store: + type: dynamodb + region: us-west-2 + """ + ) + ) + + repo_example = repo_path / "example.py" + repo_example.write_text( + (Path(__file__).parent / "example_feature_repo_1.py").read_text() + ) + + result = runner.run(["apply"], cwd=repo_path) + assert result.returncode == 0 + + # Doing another apply should be a no op, and should not cause errors + result = runner.run(["apply"], cwd=repo_path) + assert result.returncode == 0 + + basic_rw_test( + FeatureStore(repo_path=str(repo_path), config=None), + view_name="driver_locations", + ) + + result = runner.run(["teardown"], cwd=repo_path) + assert result.returncode == 0 diff --git a/sdk/python/tests/test_feature_store.py b/sdk/python/tests/test_feature_store.py index 49a3a9a63b..f169c1336a 100644 --- a/sdk/python/tests/test_feature_store.py +++ b/sdk/python/tests/test_feature_store.py @@ -24,6 +24,8 @@ from feast.feature import Feature from feast.feature_store import FeatureStore from feast.feature_view import FeatureView +from feast.infra.offline_stores.file import FileOfflineStoreConfig +from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig from feast.protos.feast.types import Value_pb2 as ValueProto from feast.repo_config import RepoConfig @@ -72,6 +74,19 @@ def feature_store_with_gcs_registry(): ) +@pytest.fixture +def feature_store_with_s3_registry(): + return FeatureStore( + config=RepoConfig( + registry=f"s3://feast-integration-tests/registries/{int(time.time() * 1000)}/registry.db", + project="default", + provider="aws", + online_store=DynamoDBOnlineStoreConfig(region="us-west-2"), + offline_store=FileOfflineStoreConfig(), + 
)
+    )
+
+
 @pytest.mark.parametrize(
     "test_feature_store", [lazy_fixture("feature_store_with_local_registry")],
 )
@@ -101,7 +116,11 @@ def test_apply_entity_success(test_feature_store):
 
 @pytest.mark.integration
 @pytest.mark.parametrize(
-    "test_feature_store", [lazy_fixture("feature_store_with_gcs_registry")],
+    "test_feature_store",
+    [
+        lazy_fixture("feature_store_with_gcs_registry"),
+        lazy_fixture("feature_store_with_s3_registry"),
+    ],
 )
 def test_apply_entity_integration(test_feature_store):
     entity = Entity(
@@ -250,7 +269,11 @@ def test_feature_view_inference_success(test_feature_store, dataframe_source):
 
 @pytest.mark.integration
 @pytest.mark.parametrize(
-    "test_feature_store", [lazy_fixture("feature_store_with_gcs_registry")],
+    "test_feature_store",
+    [
+        lazy_fixture("feature_store_with_gcs_registry"),
+        lazy_fixture("feature_store_with_s3_registry"),
+    ],
 )
 def test_apply_feature_view_integration(test_feature_store):
     # Create Feature Views
diff --git a/sdk/python/tests/test_offline_online_store_consistency.py b/sdk/python/tests/test_offline_online_store_consistency.py
index 6f2fc41841..0fcc368b22 100644
--- a/sdk/python/tests/test_offline_online_store_consistency.py
+++ b/sdk/python/tests/test_offline_online_store_consistency.py
@@ -18,7 +18,9 @@
 from feast.feature import Feature
 from feast.feature_store import FeatureStore
 from feast.feature_view import FeatureView
+from feast.infra.offline_stores.file import FileOfflineStoreConfig
 from feast.infra.online_stores.datastore import DatastoreOnlineStoreConfig
+from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig
 from feast.infra.online_stores.redis import RedisOnlineStoreConfig, RedisType
 from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
 from feast.repo_config import RepoConfig
@@ -167,7 +169,7 @@ def prep_redis_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]:
         join_key="driver_id",
         value_type=ValueType.INT32,
     )
-    with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory():
+    with tempfile.TemporaryDirectory() as repo_dir_name:
         config = RepoConfig(
             registry=str(Path(repo_dir_name) / "registry.db"),
             project=f"test_bq_correctness_{str(uuid.uuid4()).replace('-', '')}",
@@ -184,6 +186,41 @@ def prep_redis_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]:
 
     yield fs, fv
 
 
+@contextlib.contextmanager
+def prep_dynamodb_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]:
+    with tempfile.NamedTemporaryFile(suffix=".parquet") as f:
+        df = create_dataset()
+        f.close()
+        df.to_parquet(f.name)
+        file_source = FileSource(
+            file_format=ParquetFormat(),
+            file_url=f"file://{f.name}",
+            event_timestamp_column="ts",
+            created_timestamp_column="created_ts",
+            date_partition_column="",
+            field_mapping={"ts_1": "ts", "id": "driver_id"},
+        )
+        fv = get_feature_view(file_source)
+        e = Entity(
+            name="driver",
+            description="id for driver",
+            join_key="driver_id",
+            value_type=ValueType.INT32,
+        )
+        with tempfile.TemporaryDirectory() as repo_dir_name:
+            config = RepoConfig(
+                registry=str(Path(repo_dir_name) / "registry.db"),
+                project=f"test_ddb_correctness_{str(uuid.uuid4()).replace('-', '')}",
+                provider="aws",
+                online_store=DynamoDBOnlineStoreConfig(region="us-west-2"),
+                offline_store=FileOfflineStoreConfig(),
+            )
+            fs = FeatureStore(config=config)
+            fs.apply([fv, e])
+
+            yield fs, fv
+
+
 # Checks that both offline & online store values are as expected
 def check_offline_and_online_features(
     fs: FeatureStore,
@@ -264,6 +301,12 @@ def 
test_redis_offline_online_store_consistency(): run_offline_online_store_consistency_test(fs, fv) +@pytest.mark.integration +def test_dynamodb_offline_online_store_consistency(): + with prep_dynamodb_fs_and_fv() as (fs, fv): + run_offline_online_store_consistency_test(fs, fv) + + def test_local_offline_online_store_consistency(): with prep_local_fs_and_fv() as (fs, fv): run_offline_online_store_consistency_test(fs, fv)
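
For reference, a minimal sketch of what a row materialized by `DynamoDBOnlineStore` looks like when read back with plain boto3, outside of Feast. The project name `my_project`, the `driver_hourly_stats` view, the `driver_id` entity, and the `us-west-2` region are assumptions borrowed from the aws template above; the item layout (`entity_id`, `event_ts`, `values`) and `compute_entity_id` come from the store implementation in this diff.

```python
# A sketch under the assumptions above, not part of the patch.
import boto3

from feast.infra.online_stores.helpers import compute_entity_id
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto

dynamodb = boto3.resource("dynamodb", region_name="us-west-2")
# One table per "<project>.<feature view>" pair.
table = dynamodb.Table("my_project.driver_hourly_stats")

# The partition key is the murmur3 hash of the serialized entity key.
entity_key = EntityKeyProto(
    join_keys=["driver_id"], entity_values=[ValueProto(int64_val=1001)]
)
item = table.get_item(Key={"entity_id": compute_entity_id(entity_key)})["Item"]

print(item["event_ts"])  # stored as str(datetime)
for name, blob in item["values"].items():
    value = ValueProto()
    value.ParseFromString(blob.value)  # blob is a boto3 Binary wrapper
    print(name, value)
```

This mirrors what `online_read` does internally, which makes it handy for inspecting materialized data from a shell or the DynamoDB console.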