Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DatastoreTable infra object #2140

Merged
merged 3 commits into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions protos/feast/core/DatastoreTable.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//
// * Copyright 2021 The Feast Authors
// *
// * Licensed under the Apache License, Version 2.0 (the "License");
// * you may not use this file except in compliance with the License.
// * You may obtain a copy of the License at
// *
// * https://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
//

syntax = "proto3";

package feast.core;
option java_package = "feast.proto.core";
option java_outer_classname = "DatastoreTableProto";
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";

import "google/protobuf/wrappers.proto";

// Represents a Datastore table
message DatastoreTable {
// Feast project of the table
string project = 1;

// Name of the table
string name = 2;

// GCP project id
google.protobuf.StringValue project_id = 3;

// Datastore namespace
google.protobuf.StringValue namespace = 4;
}
2 changes: 2 additions & 0 deletions protos/feast/core/InfraObject.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ option java_outer_classname = "InfraObjectProto";
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";

import "feast/core/DynamoDBTable.proto";
import "feast/core/DatastoreTable.proto";

// Represents a set of infrastructure objects managed by Feast
message Infra {
Expand All @@ -37,6 +38,7 @@ message InfraObject {
// The infrastructure object
oneof infra_object {
DynamoDBTable dynamodb_table = 2;
DatastoreTable datastore_table = 3;
CustomInfra custom_infra = 100;
}

Expand Down
119 changes: 102 additions & 17 deletions sdk/python/feast/infra/online_stores/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@

from feast import Entity, FeatureTable, utils
from feast.feature_view import FeatureView
from feast.infra.infra_object import InfraObject
from feast.infra.online_stores.helpers import compute_entity_id
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.core.DatastoreTable_pb2 import (
DatastoreTable as DatastoreTableProto,
)
from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
Expand Down Expand Up @@ -80,8 +85,6 @@ def update(
entities_to_keep: Sequence[Entity],
partial: bool,
):
"""
"""
online_config = config.online_store
assert isinstance(online_config, DatastoreOnlineStoreConfig)
client = self._get_client(online_config)
Expand Down Expand Up @@ -110,9 +113,6 @@ def teardown(
tables: Sequence[Union[FeatureTable, FeatureView]],
entities: Sequence[Entity],
):
"""
There's currently no teardown done for Datastore.
"""
online_config = config.online_store
assert isinstance(online_config, DatastoreOnlineStoreConfig)
client = self._get_client(online_config)
Expand All @@ -128,18 +128,10 @@ def teardown(
client.delete(key)

def _get_client(self, online_config: DatastoreOnlineStoreConfig):

if not self._client:
try:
self._client = datastore.Client(
project=online_config.project_id, namespace=online_config.namespace,
)
except DefaultCredentialsError as e:
raise FeastProviderLoginError(
str(e)
+ '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your '
"local Google Cloud account "
)
self._client = _initialize_client(
online_config.project_id, online_config.namespace
)
return self._client

@log_exceptions_and_usage(online_store="datastore")
Expand Down Expand Up @@ -267,7 +259,7 @@ def online_read(
return result


def _delete_all_values(client, key) -> None:
def _delete_all_values(client, key):
"""
Delete all data under the key path in datastore.
"""
Expand All @@ -279,3 +271,96 @@ def _delete_all_values(client, key) -> None:

for entity in entities:
client.delete(entity.key)


def _initialize_client(
project_id: Optional[str], namespace: Optional[str]
) -> datastore.Client:
try:
client = datastore.Client(project=project_id, namespace=namespace,)
return client
except DefaultCredentialsError as e:
raise FeastProviderLoginError(
str(e)
+ '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your '
"local Google Cloud account "
)


class DatastoreTable(InfraObject):
"""
A Datastore table managed by Feast.

Attributes:
project: The Feast project of the table.
name: The name of the table.
project_id (optional): The GCP project id.
namespace (optional): Datastore namespace.
client: Datastore client.
"""

project: str
name: str
project_id: Optional[str]
namespace: Optional[str]
client: datastore.Client

def __init__(
self,
project: str,
name: str,
project_id: Optional[str] = None,
namespace: Optional[str] = None,
):
self.project = project
self.name = name
self.project_id = project_id
self.namespace = namespace
self.client = _initialize_client(self.project_id, self.namespace)

def to_proto(self) -> InfraObjectProto:
datastore_table_proto = DatastoreTableProto()
datastore_table_proto.project = self.project
datastore_table_proto.name = self.name
if self.project_id:
datastore_table_proto.project_id.FromString(bytes(self.project_id, "utf-8"))
if self.namespace:
datastore_table_proto.namespace.FromString(bytes(self.namespace, "utf-8"))

return InfraObjectProto(
infra_object_class_type="feast.infra.online_stores.datastore.DatastoreTable",
datastore_table=datastore_table_proto,
)

@staticmethod
def from_proto(infra_object_proto: InfraObjectProto) -> Any:
datastore_table = DatastoreTable(
project=infra_object_proto.datastore_table.project,
name=infra_object_proto.datastore_table.name,
)

if infra_object_proto.datastore_table.HasField("project_id"):
datastore_table.project_id = (
infra_object_proto.datastore_table.project_id.SerializeToString()
).decode("utf-8")
if infra_object_proto.datastore_table.HasField("namespace"):
datastore_table.namespace = (
infra_object_proto.datastore_table.namespace.SerializeToString()
).decode("utf-8")

return datastore_table

def update(self):
key = self.client.key("Project", self.project, "Table", self.name)
entity = datastore.Entity(
key=key, exclude_from_indexes=("created_ts", "event_ts", "values")
)
entity.update({"created_ts": datetime.utcnow()})
self.client.put(entity)

def teardown(self):
key = self.client.key("Project", self.project, "Table", self.name)
_delete_all_values(self.client, key)

# Delete the table metadata datastore entity
self.client.delete(key)