Skip to content

Commit

Permalink
merge from master
Browse files Browse the repository at this point in the history
  • Loading branch information
leonid133 committed Apr 24, 2021
2 parents 913c65f + 762aeb0 commit d63be24
Show file tree
Hide file tree
Showing 15 changed files with 292 additions and 59 deletions.
13 changes: 3 additions & 10 deletions sdk/python/docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,24 @@ Feast Python API Documentation


Feature Store
---------------------------
==================

.. automodule:: feast.feature_store
:members:
:undoc-members:
:show-inheritance:

Config
==================

.. automodule:: feast.repo_config
:members:
:exclude-members: load_repo_config
:exclude-members: load_repo_config, FeastBaseModel

Data Source
==================

.. automodule:: feast.data_source
:members:
:exclude-members: KafkaOptions, KafkaSource, KinesisOptions, KinesisSource


Entity
Expand All @@ -38,12 +37,6 @@ Feature View
.. automodule:: feast.feature_view
:members:

Feature Table
==================

.. automodule:: feast.feature_table
:members:

Feature
==================

Expand Down
17 changes: 17 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,20 @@ def __init__(self, bucket, project=None):

class FeastProviderLoginError(Exception):
    """Error class that indicates a user has not authenticated with their provider.

    The class adds no behavior of its own; it only gives this failure a
    distinct exception type.
    """


class FeastProviderNotImplementedError(Exception):
    """Error class that indicates the named provider is not implemented.

    Args:
        provider_name: Name of the provider, embedded in the error message.
    """

    def __init__(self, provider_name):
        message = f"Provider '{provider_name}' is not implemented"
        super().__init__(message)


class FeastProviderModuleImportError(Exception):
    """Error class that indicates a provider module could not be imported.

    Args:
        module_name: Name of the module, embedded in the error message.
    """

    def __init__(self, module_name):
        message = f"Could not import provider module '{module_name}'"
        super().__init__(message)


class FeastProviderClassImportError(Exception):
    """Error class that indicates a provider class could not be imported from its module.

    Args:
        module_name: Name of the module the import was attempted from.
        class_name: Name of the provider class that could not be imported.
    """

    def __init__(self, module_name, class_name):
        message = (
            f"Could not import provider '{class_name}' from module '{module_name}'"
        )
        super().__init__(message)
5 changes: 4 additions & 1 deletion sdk/python/feast/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ def __init__(self, name: str, feature_table: str):
def from_proto(cls, proto: FeatureRefProto):
"""
Construct a feature reference from the given FeatureReference proto
Arg:
Args:
proto: Protobuf FeatureReference to construct from
Returns:
FeatureRef that refers to the given feature
Expand All @@ -124,6 +125,7 @@ def from_str(cls, feature_ref_str: str):
String feature reference should be in the format feature_table:feature,
where "feature_table" and "feature" are the feature table name and feature name
respectively.
Args:
feature_ref_str: String representation of the feature reference
Returns:
Expand All @@ -144,6 +146,7 @@ def from_str(cls, feature_ref_str: str):
def to_proto(self) -> FeatureRefProto:
"""
Convert and return this feature table reference to protobuf.
Returns:
Protobuf representation of this feature table reference.
"""
Expand Down
49 changes: 25 additions & 24 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@
class FeatureStore:
"""
A FeatureStore object is used to define, create, and retrieve features.
Args:
repo_path: Path to a `feature_store.yaml` used to configure the feature store
config (RepoConfig): Configuration object used to configure the feature store
"""

config: RepoConfig
Expand All @@ -50,12 +54,6 @@ class FeatureStore:
def __init__(
self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None,
):
""" Initializes a new FeatureStore object. Used to manage a feature store.
Args:
repo_path: Path to a `feature_store.yaml` used to configure the feature store
config (RepoConfig): Configuration object used to configure the feature store
"""
if repo_path is not None and config is not None:
raise ValueError("You cannot specify both repo_path and config")
if config is not None:
Expand Down Expand Up @@ -188,11 +186,13 @@ def apply(
infrastructure (e.g., create tables in an online store) in order to reflect these new definitions. All
operations are idempotent, meaning they can safely be rerun.
Args: objects (List[Union[FeatureView, Entity]]): A list of FeatureView or Entity objects that should be
registered
Args:
objects (List[Union[FeatureView, Entity]]): A list of FeatureView or Entity objects that should be
registered
Examples:
Register a single Entity and FeatureView.
>>> from feast.feature_store import FeatureStore
>>> from feast import Entity, FeatureView, Feature, ValueType, FileSource
>>> from datetime import timedelta
Expand All @@ -217,20 +217,24 @@ def apply(
if isinstance(objects, Entity) or isinstance(objects, FeatureView):
objects = [objects]
views_to_update = []
entities_to_update = []
for ob in objects:
if isinstance(ob, FeatureView):
self._registry.apply_feature_view(ob, project=self.config.project)
self._registry.apply_feature_view(ob, project=self.project)
views_to_update.append(ob)
elif isinstance(ob, Entity):
self._registry.apply_entity(ob, project=self.config.project)
self._registry.apply_entity(ob, project=self.project)
entities_to_update.append(ob)
else:
raise ValueError(
f"Unknown object type ({type(ob)}) provided as part of apply() call"
)
self._get_provider().update_infra(
project=self.config.project,
project=self.project,
tables_to_delete=[],
tables_to_keep=views_to_update,
entities_to_delete=[],
entities_to_keep=entities_to_update,
partial=True,
)

Expand Down Expand Up @@ -263,6 +267,7 @@ def get_historical_features(
Examples:
Retrieve historical features using a BigQuery SQL entity dataframe
>>> from feast.feature_store import FeatureStore
>>>
>>> fs = FeatureStore(config=RepoConfig(provider="gcp"))
Expand All @@ -275,9 +280,7 @@ def get_historical_features(
"""
self._tele.log("get_historical_features")

all_feature_views = self._registry.list_feature_views(
project=self.config.project
)
all_feature_views = self._registry.list_feature_views(project=self.project)
try:
feature_views = _get_requested_feature_views(
feature_refs, all_feature_views
Expand Down Expand Up @@ -319,6 +322,7 @@ def materialize_incremental(
Examples:
Materialize all features into the online store up to 5 minutes ago.
>>> from datetime import datetime, timedelta
>>> from feast.feature_store import FeatureStore
>>>
Expand All @@ -330,13 +334,11 @@ def materialize_incremental(
feature_views_to_materialize = []
if feature_views is None:
feature_views_to_materialize = self._registry.list_feature_views(
self.config.project
self.project
)
else:
for name in feature_views:
feature_view = self._registry.get_feature_view(
name, self.config.project
)
feature_view = self._registry.get_feature_view(name, self.project)
feature_views_to_materialize.append(feature_view)

# TODO paging large loads
Expand Down Expand Up @@ -378,6 +380,7 @@ def materialize(
Examples:
Materialize all features into the online store over the interval
from 3 hours ago to 10 minutes ago.
>>> from datetime import datetime, timedelta
>>> from feast.feature_store import FeatureStore
>>>
Expand All @@ -396,13 +399,11 @@ def materialize(
feature_views_to_materialize = []
if feature_views is None:
feature_views_to_materialize = self._registry.list_feature_views(
self.config.project
self.project
)
else:
for name in feature_views:
feature_view = self._registry.get_feature_view(
name, self.config.project
)
feature_view = self._registry.get_feature_view(name, self.project)
feature_views_to_materialize.append(feature_view)

# TODO paging large loads
Expand Down Expand Up @@ -445,7 +446,7 @@ def get_online_features(
>>> entity_rows = [{"customer_id": 0},{"customer_id": 1}]
>>>
>>> online_response = store.get_online_features(
>>> feature_refs, entity_rows, project="my_project")
>>> feature_refs, entity_rows)
>>> online_response_dict = online_response.to_dict()
>>> print(online_response_dict)
{'sales:daily_transactions': [1.1,1.2], 'sales:customer_id': [0,1]}
Expand Down Expand Up @@ -481,7 +482,7 @@ def get_online_features(
result_rows.append(_entity_row_to_field_values(entity_row_proto))

all_feature_views = self._registry.list_feature_views(
project=self.config.project, allow_cache=True
project=self.project, allow_cache=True
)

grouped_refs = _group_refs(feature_refs, all_feature_views)
Expand Down
8 changes: 7 additions & 1 deletion sdk/python/feast/infra/aws_dynamodb_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from botocore.exceptions import ClientError

from feast import FeatureTable, utils
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.offline_stores.helpers import get_offline_store_from_sources
Expand Down Expand Up @@ -35,6 +36,8 @@ def update_infra(
project: str,
tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
):
dynamodb = self._initialize_dynamodb()
Expand Down Expand Up @@ -70,7 +73,10 @@ def update_infra(
table.delete()

def teardown_infra(
self, project: str, tables: Sequence[Union[FeatureTable, FeatureView]]
self,
project: str,
tables: Sequence[Union[FeatureTable, FeatureView]],
entities: Sequence[Entity],
) -> None:
dynamodb = self._initialize_dynamodb()

Expand Down
8 changes: 7 additions & 1 deletion sdk/python/feast/infra/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from google.auth.exceptions import DefaultCredentialsError

from feast import FeatureTable, utils
from feast.entity import Entity
from feast.errors import FeastProviderLoginError
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
Expand Down Expand Up @@ -55,6 +56,8 @@ def update_infra(
project: str,
tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
):
from google.cloud import datastore
Expand All @@ -77,7 +80,10 @@ def update_infra(
client.delete(key)

def teardown_infra(
self, project: str, tables: Sequence[Union[FeatureTable, FeatureView]]
self,
project: str,
tables: Sequence[Union[FeatureTable, FeatureView]],
entities: Sequence[Entity],
) -> None:
client = self._initialize_client()

Expand Down
8 changes: 7 additions & 1 deletion sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pytz

from feast import FeatureTable, utils
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.offline_stores.helpers import get_offline_store_from_sources
Expand Down Expand Up @@ -50,6 +51,8 @@ def update_infra(
project: str,
tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
):
conn = self._get_conn()
Expand All @@ -65,7 +68,10 @@ def update_infra(
conn.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}")

def teardown_infra(
self, project: str, tables: Sequence[Union[FeatureTable, FeatureView]]
self,
project: str,
tables: Sequence[Union[FeatureTable, FeatureView]],
entities: Sequence[Entity],
) -> None:
os.unlink(self._db_path)

Expand Down
Loading

0 comments on commit d63be24

Please sign in to comment.