Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for DynamoDB and S3 registry #1483

Merged
merged 38 commits into from
Jul 3, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
9e0e7c7
Add support for DynamoDB and S3 registry
leonid133 Apr 27, 2021
44aadd8
rcu and wcu as a parameter of dynamodb online store
leonid133 Apr 27, 2021
6383791
fix linter
leonid133 May 4, 2021
73ff67a
aws dependency to extras
leonid133 May 18, 2021
aa6d0da
FEAST_S3_ENDPOINT_URL
leonid133 May 18, 2021
0a87050
tests
leonid133 May 18, 2021
3b8bb31
merge from master
leonid133 May 18, 2021
00e8675
fix signature, after merge
leonid133 May 18, 2021
6a99cd9
aws default region name configurable
leonid133 May 18, 2021
32dc799
merge from master
leonid133 Jun 11, 2021
db616c4
add offlinestore config type to test
leonid133 Jun 11, 2021
8dcbd5a
review changes
leonid133 Jun 11, 2021
fee93dd
merge from master
leonid133 Jun 18, 2021
2bbe268
Merge branch 'master' of https://github.com/feast-dev/feast into feat…
leonid133 Jun 18, 2021
5d33a79
Merge branch 'master' of https://github.com/feast-dev/feast into feat…
leonid133 Jun 18, 2021
24c44ee
merge latest from master
leonid133 Jun 23, 2021
7b99cde
review requested changes
leonid133 Jun 23, 2021
3a985b0
integration test for Dynamo
leonid133 Jun 23, 2021
6973581
change the rest of table_name to table_instance (where table_name is …
leonid133 Jun 28, 2021
e928424
fix DynamoDBOnlineStore commit
leonid133 Jun 28, 2021
59d7e4c
move client to _initialize_dynamodb
leonid133 Jun 28, 2021
594b932
rename document_id to entity_id and Row to entity_id
leonid133 Jun 28, 2021
15a787c
The default value is None
leonid133 Jun 28, 2021
7eaa654
Remove Datastore from the docstring.
leonid133 Jun 28, 2021
1468117
get rid of the return call from S3RegistryStore
leonid133 Jun 28, 2021
5dbe429
merge two exceptions
leonid133 Jun 29, 2021
986d45e
For ci requirement
leonid133 Jun 29, 2021
79d85c7
remove configuration from test
leonid133 Jun 29, 2021
f50b2fb
feast-integration-tests for tests
leonid133 Jun 29, 2021
509c521
change test path
leonid133 Jun 29, 2021
cd67973
add fixture feature_store_with_s3_registry to test
leonid133 Jun 29, 2021
5466d20
merge from master
leonid133 Jun 29, 2021
3d1b78c
region required
leonid133 Jun 29, 2021
ff8d635
Merge branch 'master' of https://github.com/feast-dev/feast into feat…
leonid133 Jun 29, 2021
57a607c
Address the rest of the comments
Jul 2, 2021
e9422ea
Merge branch 'master' into feature/online_dynamodb
Jul 2, 2021
3cd9597
Update to_table to to_arrow
Jul 2, 2021
124b337
Merge branch 'master' into feature/online_dynamodb
Jul 3, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added docs/specs/dynamodb_online_example.monopic
Binary file not shown.
Binary file added docs/specs/dynamodb_online_example.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def materialize_incremental_command(end_ts: str, views: List[str]):
@click.option(
"--template",
"-t",
type=click.Choice(["local", "gcp"], case_sensitive=False),
type=click.Choice(["local", "gcp", "aws"], case_sensitive=False),
help="Specify a template for the created project",
default="local",
)
Expand Down
20 changes: 20 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,26 @@ def __init__(self, name, project=None):
super().__init__(f"Feature table {name} does not exist")


class FeatureBucketNotExist(FeastObjectNotFoundException):
tsotnet marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self, bucket, project=None):
if project:
super().__init__(
f"Feature bucket {bucket} does not exist in project {project}"
)
else:
super().__init__(f"Feature bucket {bucket} does not exist")


class FeatureBucketForbiddenAccess(FeastObjectNotFoundException):
def __init__(self, bucket, project=None):
if project:
super().__init__(
f"Private Registry Bucket {bucket} forbidden Access in project {project}"
)
else:
super().__init__(f"Private Registry Bucket {bucket} forbidden Access")
leonid133 marked this conversation as resolved.
Show resolved Hide resolved


class FeastProviderLoginError(Exception):
"""Error class that indicates a user has not authenticated with their provider."""

Expand Down
219 changes: 219 additions & 0 deletions sdk/python/feast/infra/aws_dynamodb_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import boto3
tsotnet marked this conversation as resolved.
Show resolved Hide resolved
import mmh3
import pandas
from botocore.exceptions import ClientError
tsotnet marked this conversation as resolved.
Show resolved Hide resolved

from feast import FeatureTable, utils
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.offline_stores.helpers import get_offline_store_from_sources
from feast.infra.provider import (
Provider,
RetrievalJob,
_convert_arrow_to_proto,
_get_column_names,
_run_field_mapping,
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.repo_config import DynamoDbOnlineStoreConfig, RepoConfig


class AwsDynamodbProvider(Provider):
_wcu: int
leonid133 marked this conversation as resolved.
Show resolved Hide resolved
_rcu: int

def __init__(self, config: RepoConfig):
assert isinstance(config.online_store, DynamoDbOnlineStoreConfig)
if config and config.online_store and config.online_store.rcu:
self._rcu = config.online_store.rcu
else:
self._rcu = 5
leonid133 marked this conversation as resolved.
Show resolved Hide resolved

if config and config.online_store and config.online_store.wcu:
self._wcu = config.online_store.wcu
else:
self._wcu = 5

def _initialize_dynamodb(self):
return boto3.resource("dynamodb")

def update_infra(
self,
project: str,
tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
):
dynamodb = self._initialize_dynamodb()

for table_name in tables_to_keep:
table = None
try:
table = dynamodb.create_table(
TableName=table_name.name,
KeySchema=[
{"AttributeName": "Row", "KeyType": "HASH"},
{"AttributeName": "Project", "KeyType": "RANGE"},
],
AttributeDefinitions=[
{"AttributeName": "Row", "AttributeType": "S"},
{"AttributeName": "Project", "AttributeType": "S"},
],
ProvisionedThroughput={
"ReadCapacityUnits": self._rcu,
"WriteCapacityUnits": self._wcu,
},
leonid133 marked this conversation as resolved.
Show resolved Hide resolved
)
table.meta.client.get_waiter("table_exists").wait(
TableName=table_name.name
)
except ClientError as ce:
print(ce)
if ce.response["Error"]["Code"] == "ResourceNotFoundException":
table = dynamodb.Table(table_name.name)

for table_name in tables_to_delete:
table = dynamodb.Table(table_name.name)
table.delete()

def teardown_infra(
self,
project: str,
tables: Sequence[Union[FeatureTable, FeatureView]],
entities: Sequence[Entity],
) -> None:
dynamodb = self._initialize_dynamodb()

for table_name in tables:
table = dynamodb.Table(table_name)
table.delete()

def online_write_batch(
self,
project: str,
table: Union[FeatureTable, FeatureView],
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
) -> None:
dynamodb = self._initialize_dynamodb()

table_instance = dynamodb.Table(table.name)
with table_instance.batch_writer() as batch:
for entity_key, features, timestamp, created_ts in data:
document_id = compute_datastore_entity_id(entity_key) # TODO check id
# TODO compression encoding
batch.put_item(
Item={
"Row": document_id, # PartitionKey
"Project": project, # SortKey
"event_ts": str(utils.make_tzaware(timestamp)),
"values": {
k: v.SerializeToString()
for k, v in features.items() # Serialized Features
},
}
)

def online_read(
self,
project: str,
table: Union[FeatureTable, FeatureView],
entity_keys: List[EntityKeyProto],
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
dynamodb = self._initialize_dynamodb()

result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
for entity_key in entity_keys:
table_instace = dynamodb.Table(table.name)
document_id = compute_datastore_entity_id(entity_key) # TODO check id
response = table_instace.get_item(
Key={"Row": document_id, "Project": project}
)
value = response["Item"]

if value is not None:
res = {}
for feature_name, value_bin in value["values"].items():
val = ValueProto()
val.ParseFromString(value_bin.value)
res[feature_name] = val
result.append((value["event_ts"], res))
else:
result.append((None, None))
return result

def materialize_single_feature_view(
self,
feature_view: FeatureView,
start_date: datetime,
end_date: datetime,
registry: Registry,
project: str,
) -> None:
entities = []
for entity_name in feature_view.entities:
entities.append(registry.get_entity(entity_name, project))

(
join_key_columns,
feature_name_columns,
event_timestamp_column,
created_timestamp_column,
) = _get_column_names(feature_view, entities)

start_date = utils.make_tzaware(start_date)
end_date = utils.make_tzaware(end_date)

offline_store = get_offline_store_from_sources([feature_view.input])
table = offline_store.pull_latest_from_table_or_query(
data_source=feature_view.input,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
event_timestamp_column=event_timestamp_column,
created_timestamp_column=created_timestamp_column,
start_date=start_date,
end_date=end_date,
)

if feature_view.input.field_mapping is not None:
table = _run_field_mapping(table, feature_view.input.field_mapping)

join_keys = [entity.join_key for entity in entities]
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

self.online_write_batch(project, feature_view, rows_to_write, None)

feature_view.materialization_intervals.append((start_date, end_date))
registry.apply_feature_view(feature_view, project)

@staticmethod
def get_historical_features(
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pandas.DataFrame, str],
registry: Registry,
project: str,
) -> RetrievalJob:
# TODO implement me
pass


def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str:
"""
Compute Datastore Entity id given Feast Entity Key.

Remember that Datastore Entity is a concept from the Datastore data model, that has nothing to
do with the Entity concept we have in Feast.
"""
return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()
4 changes: 4 additions & 0 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ def get_provider(config: RepoConfig, repo_path: Path) -> Provider:
from feast.infra.gcp import GcpProvider

return GcpProvider(config)
elif config.provider == "aws_dynamodb":
from feast.infra.aws_dynamodb_provider import AwsDynamodbProvider

return AwsDynamodbProvider(config)
elif config.provider == "local":
from feast.infra.local import LocalProvider

Expand Down
73 changes: 73 additions & 0 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
from feast.entity import Entity
from feast.errors import (
EntityNotFoundException,
FeatureBucketForbiddenAccess,
FeatureBucketNotExist,
FeatureTableNotFoundException,
FeatureViewNotFoundException,
)
Expand Down Expand Up @@ -56,6 +58,8 @@ def __init__(self, registry_path: str, repo_path: Path, cache_ttl: timedelta):
uri = urlparse(registry_path)
if uri.scheme == "gs":
self._registry_store: RegistryStore = GCSRegistryStore(registry_path)
elif uri.scheme == "s3":
self._registry_store = S3RegistryStore(registry_path)
elif uri.scheme == "file" or uri.scheme == "":
self._registry_store = LocalRegistryStore(
repo_path=repo_path, registry_path_string=registry_path
Expand Down Expand Up @@ -495,3 +499,72 @@ def _write_registry(self, registry_proto: RegistryProto):
file_obj.seek(0)
blob.upload_from_file(file_obj)
return


class S3RegistryStore(RegistryStore):
def __init__(self, uri: str):
self._uri = urlparse(uri)
self._bucket = self._uri.hostname
self._key = self._uri.path.lstrip("/")
return
leonid133 marked this conversation as resolved.
Show resolved Hide resolved
tsotnet marked this conversation as resolved.
Show resolved Hide resolved

def get_registry_proto(self):
import boto3
import botocore
tsotnet marked this conversation as resolved.
Show resolved Hide resolved

file_obj = TemporaryFile()
registry_proto = RegistryProto()
s3 = boto3.resource("s3")
try:
bucket = s3.Bucket(self._bucket)
s3.meta.client.head_bucket(Bucket=bucket.name)
leonid133 marked this conversation as resolved.
Show resolved Hide resolved
except botocore.client.ClientError as e:
# If a client error is thrown, then check that it was a 404 error.
# If it was a 404 error, then the bucket does not exist.
error_code = int(e.response["Error"]["Code"])
if error_code == 404:
raise FeatureBucketNotExist(self._bucket)
else:
raise FeatureBucketForbiddenAccess(self._bucket)

try:
obj = bucket.Object(self._key)
obj.download_fileobj(file_obj)
file_obj.seek(0)
registry_proto.ParseFromString(file_obj.read())
return registry_proto
except botocore.exceptions.ClientError as e:
if e.response["Error"]["Code"] == "404":
raise FileNotFoundError(
f'Registry not found at path "{self._uri.geturl()}". Have you run "feast apply"?'
)
else:
raise FileNotFoundError(
f'Registry is not able to locate data under path "{self._uri.geturl()}" with [original error]: {e.response}'
)
tsotnet marked this conversation as resolved.
Show resolved Hide resolved

def update_registry_proto(
self, updater: Optional[Callable[[RegistryProto], RegistryProto]] = None
):
try:
registry_proto = self.get_registry_proto()
except FileNotFoundError:
registry_proto = RegistryProto()
registry_proto.registry_schema_version = REGISTRY_SCHEMA_VERSION
if updater:
registry_proto = updater(registry_proto)
self._write_registry(registry_proto)
return
leonid133 marked this conversation as resolved.
Show resolved Hide resolved

def _write_registry(self, registry_proto: RegistryProto):
import boto3
leonid133 marked this conversation as resolved.
Show resolved Hide resolved

registry_proto.version_id = str(uuid.uuid4())
registry_proto.last_updated.FromDatetime(datetime.utcnow())
# we have already checked the bucket exists so no need to do it again
file_obj = TemporaryFile()
file_obj.write(registry_proto.SerializeToString())
file_obj.seek(0)
s3 = boto3.client("s3")
s3.put_object(Bucket=self._bucket, Body=file_obj, Key=self._key)
leonid133 marked this conversation as resolved.
Show resolved Hide resolved
return
leonid133 marked this conversation as resolved.
Show resolved Hide resolved
Loading