Add support for DynamoDB and S3 registry #1483

Merged Jul 3, 2021: 38 commits from feature/online_dynamodb into master

Commits (38)
9e0e7c7
Add support for DynamoDB and S3 registry
leonid133 Apr 27, 2021
44aadd8
rcu and wcu as a parameter of dynamodb online store
leonid133 Apr 27, 2021
6383791
fix linter
leonid133 May 4, 2021
73ff67a
aws dependency to extras
leonid133 May 18, 2021
aa6d0da
FEAST_S3_ENDPOINT_URL
leonid133 May 18, 2021
0a87050
tests
leonid133 May 18, 2021
3b8bb31
merge from master
leonid133 May 18, 2021
00e8675
fix signature, after merge
leonid133 May 18, 2021
6a99cd9
aws default region name configurable
leonid133 May 18, 2021
32dc799
merge from master
leonid133 Jun 11, 2021
db616c4
add offlinestore config type to test
leonid133 Jun 11, 2021
8dcbd5a
review changes
leonid133 Jun 11, 2021
fee93dd
merge from master
leonid133 Jun 18, 2021
2bbe268
Merge branch 'master' of https://github.com/feast-dev/feast into feat…
leonid133 Jun 18, 2021
5d33a79
Merge branch 'master' of https://github.com/feast-dev/feast into feat…
leonid133 Jun 18, 2021
24c44ee
merge latest from master
leonid133 Jun 23, 2021
7b99cde
review requested changes
leonid133 Jun 23, 2021
3a985b0
integration test for Dynamo
leonid133 Jun 23, 2021
6973581
change the rest of table_name to table_instance (where table_name is …
leonid133 Jun 28, 2021
e928424
fix DynamoDBOnlineStore commit
leonid133 Jun 28, 2021
59d7e4c
move client to _initialize_dynamodb
leonid133 Jun 28, 2021
594b932
rename document_id to entity_id and Row to entity_id
leonid133 Jun 28, 2021
15a787c
The default value is None
leonid133 Jun 28, 2021
7eaa654
Remove Datastore from the docstring.
leonid133 Jun 28, 2021
1468117
get rid of the return call from S3RegistryStore
leonid133 Jun 28, 2021
5dbe429
merge two exceptions
leonid133 Jun 29, 2021
986d45e
For ci requirement
leonid133 Jun 29, 2021
79d85c7
remove configuration from test
leonid133 Jun 29, 2021
f50b2fb
feast-integration-tests for tests
leonid133 Jun 29, 2021
509c521
change test path
leonid133 Jun 29, 2021
cd67973
add fixture feature_store_with_s3_registry to test
leonid133 Jun 29, 2021
5466d20
merge from master
leonid133 Jun 29, 2021
3d1b78c
region required
leonid133 Jun 29, 2021
ff8d635
Merge branch 'master' of https://github.com/feast-dev/feast into feat…
leonid133 Jun 29, 2021
57a607c
Address the rest of the comments
Jul 2, 2021
e9422ea
Merge branch 'master' into feature/online_dynamodb
Jul 2, 2021
3cd9597
Update to_table to to_arrow
Jul 2, 2021
124b337
Merge branch 'master' into feature/online_dynamodb
Jul 3, 2021
Binary file added docs/specs/dynamodb_online_example.monopic
Binary file not shown.
Binary file added docs/specs/dynamodb_online_example.png
2 changes: 1 addition & 1 deletion docs/specs/online_store_format.md
@@ -7,7 +7,7 @@ This format is considered part of Feast public API contract; that allows other c

The format is not entirely technology or cloud agnostic. Since users may opt to use different key-value stores as an underlying engine to store feature data, and we don't want to aim for the lowest common denominator across them, we have to provide different "flavors" of this data format, specialized for every supported store.

This version of the Online Store Format supports only Redis as the underlying storage engine. We envision adding more storage engines to this document in the future.
This version of the Online Store Format supports Redis and DynamoDB as storage engines. We envision adding more storage engines to this document in the future.


## Overview
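For orientation, a hedged sketch (not taken from the PR) of a programmatic repo configuration that selects the DynamoDB online store and the S3-backed registry this PR introduces. The "dynamodb" type string, the "region" key, and the bucket name are assumptions not visible in this excerpt, and most repos would declare the same thing in feature_store.yaml instead.

```python
from feast import FeatureStore
from feast.repo_config import RepoConfig

# Assumed values throughout: the "dynamodb" type string and "region" key are
# not shown in this excerpt of the diff, and the bucket name is a placeholder.
config = RepoConfig(
    project="example_project",
    provider="aws",  # dispatches infra calls to the new AwsProvider
    registry="s3://my-feast-bucket/registry.db",  # S3-backed registry path
    online_store={"type": "dynamodb", "region": "us-west-2"},
)
store = FeatureStore(config=config)
```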
2 changes: 1 addition & 1 deletion sdk/python/feast/cli.py
@@ -280,7 +280,7 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List
@click.option(
    "--template",
    "-t",
    type=click.Choice(["local", "gcp"], case_sensitive=False),
    type=click.Choice(["local", "gcp", "aws"], case_sensitive=False),
    help="Specify a template for the created project",
    default="local",
)
10 changes: 10 additions & 0 deletions sdk/python/feast/errors.py
@@ -40,6 +40,16 @@ def __init__(self, name, project=None):
        super().__init__(f"Feature table {name} does not exist")


class S3RegistryBucketNotExist(FeastObjectNotFoundException):
    def __init__(self, bucket):
        super().__init__(f"S3 bucket {bucket} for the Feast registry does not exist")


class S3RegistryBucketForbiddenAccess(FeastObjectNotFoundException):
    def __init__(self, bucket):
        super().__init__(f"S3 bucket {bucket} for the Feast registry can't be accessed")


class FeastProviderLoginError(Exception):
    """Error class that indicates a user has not authenticated with their provider."""

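These two exceptions are presumably raised by the new S3RegistryStore (referenced in the commit list but not shown in this excerpt). A rough, hypothetical sketch of how botocore errors might be mapped onto them, assuming boto3 and placeholder bucket/key names:

```python
import boto3
from botocore.exceptions import ClientError

from feast.errors import S3RegistryBucketForbiddenAccess, S3RegistryBucketNotExist


def _read_registry_bytes(bucket: str, key: str) -> bytes:
    # Hypothetical helper, not part of the PR: illustrates how botocore error
    # codes might be translated into the two new registry exceptions.
    s3 = boto3.resource("s3")
    try:
        return s3.Object(bucket, key).get()["Body"].read()
    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        if error_code == "NoSuchBucket":
            raise S3RegistryBucketNotExist(bucket)
        if error_code in ("AccessDenied", "403"):
            raise S3RegistryBucketForbiddenAccess(bucket)
        raise
```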
141 changes: 141 additions & 0 deletions sdk/python/feast/infra/aws.py
@@ -0,0 +1,141 @@
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import pandas
from tqdm import tqdm

from feast import FeatureTable
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.offline_stores.helpers import get_offline_store_from_config
from feast.infra.online_stores.helpers import get_online_store_from_config
from feast.infra.provider import (
    Provider,
    RetrievalJob,
    _convert_arrow_to_proto,
    _get_column_names,
    _run_field_mapping,
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.repo_config import RepoConfig


class AwsProvider(Provider):
    def __init__(self, config: RepoConfig):
        self.repo_config = config
        self.offline_store = get_offline_store_from_config(config.offline_store)
        self.online_store = get_online_store_from_config(config.online_store)

    def update_infra(
        self,
        project: str,
        tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
        tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
        partial: bool,
    ):
        self.online_store.update(
            config=self.repo_config,
            tables_to_delete=tables_to_delete,
            tables_to_keep=tables_to_keep,
            entities_to_keep=entities_to_keep,
            entities_to_delete=entities_to_delete,
            partial=partial,
        )

    def teardown_infra(
        self,
        project: str,
        tables: Sequence[Union[FeatureTable, FeatureView]],
        entities: Sequence[Entity],
    ) -> None:
        self.online_store.teardown(self.repo_config, tables, entities)

    def online_write_batch(
        self,
        config: RepoConfig,
        table: Union[FeatureTable, FeatureView],
        data: List[
            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
        ],
        progress: Optional[Callable[[int], Any]],
    ) -> None:
        self.online_store.online_write_batch(config, table, data, progress)

    def online_read(
        self,
        config: RepoConfig,
        table: Union[FeatureTable, FeatureView],
        entity_keys: List[EntityKeyProto],
        requested_features: List[str] = None,
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        result = self.online_store.online_read(config, table, entity_keys)

        return result

    def materialize_single_feature_view(
        self,
        config: RepoConfig,
        feature_view: FeatureView,
        start_date: datetime,
        end_date: datetime,
        registry: Registry,
        project: str,
        tqdm_builder: Callable[[int], tqdm],
    ) -> None:
        entities = []
        for entity_name in feature_view.entities:
            entities.append(registry.get_entity(entity_name, project))

        (
            join_key_columns,
            feature_name_columns,
            event_timestamp_column,
            created_timestamp_column,
        ) = _get_column_names(feature_view, entities)

        offline_job = self.offline_store.pull_latest_from_table_or_query(
            config=config,
            data_source=feature_view.input,
            join_key_columns=join_key_columns,
            feature_name_columns=feature_name_columns,
            event_timestamp_column=event_timestamp_column,
            created_timestamp_column=created_timestamp_column,
            start_date=start_date,
            end_date=end_date,
        )

        table = offline_job.to_arrow()

        if feature_view.input.field_mapping is not None:
            table = _run_field_mapping(table, feature_view.input.field_mapping)

        join_keys = [entity.join_key for entity in entities]
        rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

        with tqdm_builder(len(rows_to_write)) as pbar:
            self.online_write_batch(
                self.repo_config, feature_view, rows_to_write, lambda x: pbar.update(x)
            )

    def get_historical_features(
        self,
        config: RepoConfig,
        feature_views: List[FeatureView],
        feature_refs: List[str],
        entity_df: Union[pandas.DataFrame, str],
        registry: Registry,
        project: str,
    ) -> RetrievalJob:
        job = self.offline_store.get_historical_features(
            config=config,
            feature_views=feature_views,
            feature_refs=feature_refs,
            entity_df=entity_df,
            registry=registry,
            project=project,
        )
        return job
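As a usage sketch (not part of the PR): once a repo is configured with the aws provider, the regular SDK calls are routed through AwsProvider, and materialize() ends up in materialize_single_feature_view above, which pulls recent rows from the offline store and writes them to the DynamoDB online store. The repo path and time window below are placeholders.

```python
from datetime import datetime, timedelta

from feast import FeatureStore

# Assumes a feature_store.yaml in the working directory that selects the aws
# provider; the repo path and dates are placeholders.
store = FeatureStore(repo_path=".")

# materialize() is the standard SDK call; with the aws provider it dispatches
# to AwsProvider.materialize_single_feature_view for each feature view.
store.materialize(
    start_date=datetime.utcnow() - timedelta(days=1),
    end_date=datetime.utcnow(),
)
```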
17 changes: 3 additions & 14 deletions sdk/python/feast/infra/online_stores/datastore.py
@@ -16,13 +16,12 @@
from multiprocessing.pool import ThreadPool
from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union

import mmh3
from pydantic import PositiveInt, StrictStr
from pydantic.typing import Literal

from feast import Entity, FeatureTable, utils
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.helpers import compute_entity_id
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
@@ -191,7 +190,7 @@ def _write_minibatch(
):
    entities = []
    for entity_key, features, timestamp, created_ts in data:
        document_id = compute_datastore_entity_id(entity_key)
        document_id = compute_entity_id(entity_key)

        key = client.key(
            "Project", project, "Table", table.name, "Row", document_id,
@@ -236,7 +235,7 @@ def online_read(

        result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
        for entity_key in entity_keys:
            document_id = compute_datastore_entity_id(entity_key)
            document_id = compute_entity_id(entity_key)
            key = client.key(
                "Project", feast_project, "Table", table.name, "Row", document_id
            )
@@ -253,16 +252,6 @@
        return result


def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str:
    """
    Compute Datastore Entity id given Feast Entity Key.

    Remember that Datastore Entity is a concept from the Datastore data model, that has nothing to
    do with the Entity concept we have in Feast.
    """
    return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()


def _delete_all_values(client, key) -> None:
    """
    Delete all data under the key path in datastore.
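The deleted compute_datastore_entity_id helper above was replaced by the shared compute_entity_id in feast.infra.online_stores.helpers, with call sites switched one-for-one. A small sketch of the hashing, mirroring the deleted body; the shared helper is assumed to keep the same semantics, and the driver_id key below is purely illustrative.

```python
import mmh3

from feast.infra.key_encoding_utils import serialize_entity_key
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto


def compute_entity_id(entity_key: EntityKeyProto) -> str:
    # Mirrors the deleted compute_datastore_entity_id body above; the shared
    # helper in feast.infra.online_stores.helpers is assumed to keep the same
    # semantics, since callers were switched without other changes.
    return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()


# Hypothetical entity key for a single driver_id value.
key = EntityKeyProto(
    join_keys=["driver_id"], entity_values=[ValueProto(int64_val=1001)]
)
print(compute_entity_id(key))
```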