Skip to content

Commit

Permalink
Merge branch 'feast-dev:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
ElliotNguyen68 authored Mar 13, 2024
2 parents 001354c + 34cabfb commit 36e2279
Show file tree
Hide file tree
Showing 33 changed files with 1,139 additions and 513 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr_integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ jobs:
ref: refs/pull/${{ github.event.pull_request.number }}/merge
submodules: recursive
- name: Setup Python
uses: actions/setup-python@v3
uses: actions/setup-python@v4
id: setup-python
with:
python-version: ${{ matrix.python-version }}
Expand Down
3 changes: 3 additions & 0 deletions docs/getting-started/concepts/registry.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ registry:
registry_type: sql
path: postgresql://postgres:mysecretpassword@127.0.0.1:55001/feast
cache_ttl_seconds: 60
sqlalchemy_config_kwargs:
echo: false
pool_pre_ping: true
```
This supports any SQLAlchemy compatible database as a backend. The exact schema can be seen in [sql.py](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/registry/sql.py)
Expand Down
3 changes: 3 additions & 0 deletions docs/tutorials/using-scalable-registry.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ registry:
registry_type: sql
path: postgresql://postgres:mysecretpassword@127.0.0.1:55001/feast
cache_ttl_seconds: 60
sqlalchemy_config_kwargs:
echo: false
pool_pre_ping: true
```
Specifically, the registry_type needs to be set to sql in the registry config block. On doing so, the path should refer to the [Database URL](https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls) for the database to be used, as expected by SQLAlchemy. No other additional commands are currently needed to configure this registry.
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/driver_test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def create_driver_hourly_stats_df(drivers, start_date, end_date) -> pd.DataFrame
"event_timestamp": [
pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
for dt in pd.date_range(
start=start_date, end=end_date, freq="1H", inclusive="left"
start=start_date, end=end_date, freq="1h", inclusive="left"
)
]
# include a fixed timestamp for get_historical_features in the quickstart
Expand Down Expand Up @@ -209,7 +209,7 @@ def create_location_stats_df(locations, start_date, end_date) -> pd.DataFrame:
"event_timestamp": [
pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
for dt in pd.date_range(
start=start_date, end=end_date, freq="1H", inclusive="left"
start=start_date, end=end_date, freq="1h", inclusive="left"
)
]
}
Expand Down
5 changes: 5 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,3 +415,8 @@ def __init__(self):
class PushSourceNotFoundException(Exception):
def __init__(self, push_source_name: str):
super().__init__(f"Unable to find push source '{push_source_name}'.")


class ReadOnlyRegistryException(Exception):
def __init__(self):
super().__init__("Registry implementation is read-only.")
8 changes: 7 additions & 1 deletion sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
from feast.infra.registry.sql import SqlRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.online_response import OnlineResponse
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.serving.ServingService_pb2 import (
FieldStatus,
GetOnlineFeaturesResponse,
Expand Down Expand Up @@ -164,6 +165,10 @@ def __init__(
self._registry = SnowflakeRegistry(
registry_config, self.config.project, None
)
elif registry_config and registry_config.registry_type == "remote":
from feast.infra.registry.remote import RemoteRegistry

self._registry = RemoteRegistry(registry_config, self.config.project, None)
else:
r = Registry(self.config.project, registry_config, repo_path=self.repo_path)
r._initialize_registry(self.config.project)
Expand Down Expand Up @@ -741,7 +746,8 @@ def plan(
# Compute the desired difference between the current infra, as stored in the registry,
# and the desired infra.
self._registry.refresh(project=self.project)
current_infra_proto = self._registry.proto().infra.__deepcopy__()
current_infra_proto = InfraProto()
current_infra_proto.CopyFrom(self._registry.proto().infra)
desired_registry_proto = desired_repo_contents.to_registry_proto()
new_infra = self._provider.plan_infra(self.config, desired_registry_proto)
new_infra_proto = new_infra.to_proto()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from feast import flags_helper
from feast.data_source import DataSource
from feast.errors import DataSourceNoNameException
from feast.errors import DataSourceNoNameException, DataSourceNotFoundException
from feast.infra.offline_stores.offline_utils import get_temp_entity_table_name
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.protos.feast.core.SavedDataset_pb2 import (
Expand Down Expand Up @@ -179,6 +179,7 @@ def get_table_query_string(self) -> str:
logger.exception(
"Spark read of file source failed.\n" + traceback.format_exc()
)
raise DataSourceNotFoundException(self.path)
tmp_table_name = get_temp_entity_table_name()
df.createOrReplaceTempView(tmp_table_name)

Expand Down
19 changes: 14 additions & 5 deletions sdk/python/feast/infra/offline_stores/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, List, Literal, Optional, Tuple, Union
from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union

import dask
import dask.dataframe as dd
Expand Down Expand Up @@ -38,10 +38,7 @@
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.usage import log_exceptions_and_usage
from feast.utils import (
_get_requested_feature_views_to_features_dict,
_run_dask_field_mapping,
)
from feast.utils import _get_requested_feature_views_to_features_dict

# FileRetrievalJob will cast string objects to string[pyarrow] from dask version 2023.7.1
# This is not the desired behavior for our use case, so we set the convert-string option to False
Expand Down Expand Up @@ -512,6 +509,18 @@ def _read_datasource(data_source) -> dd.DataFrame:
)


def _run_dask_field_mapping(
table: dd.DataFrame,
field_mapping: Dict[str, str],
):
if field_mapping:
# run field mapping in the forward direction
table = table.rename(columns=field_mapping)
table = table.persist()

return table


def _field_mapping(
df_to_join: dd.DataFrame,
feature_view: FeatureView,
Expand Down
10 changes: 9 additions & 1 deletion sdk/python/feast/infra/offline_stores/file_source.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Callable, Dict, Iterable, List, Optional, Tuple

import pyarrow
from packaging import version
from pyarrow._fs import FileSystem
from pyarrow._s3fs import S3FileSystem
from pyarrow.parquet import ParquetDataset
Expand Down Expand Up @@ -158,7 +160,13 @@ def get_table_column_names_and_types(
# Adding support for different file format path
# based on S3 filesystem
if filesystem is None:
schema = ParquetDataset(path, use_legacy_dataset=False).schema
kwargs = (
{"use_legacy_dataset": False}
if version.parse(pyarrow.__version__) < version.parse("15.0.0")
else {}
)

schema = ParquetDataset(path, **kwargs).schema
if hasattr(schema, "names") and hasattr(schema, "types"):
# Newer versions of pyarrow don't have this method,
# but this field is good enough.
Expand Down
15 changes: 2 additions & 13 deletions sdk/python/feast/infra/offline_stores/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,20 +470,9 @@ def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
return df

def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
pa_table = execute_snowflake_statement(
return execute_snowflake_statement(
self.snowflake_conn, self.to_sql()
).fetch_arrow_all(force_return_table=False)

if pa_table:
return pa_table
else:
empty_result = execute_snowflake_statement(
self.snowflake_conn, self.to_sql()
)

return pyarrow.Table.from_pandas(
pd.DataFrame(columns=[md.name for md in empty_result.description])
)
).fetch_arrow_all(force_return_table=True)

def to_sql(self) -> str:
"""
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/registry/base_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ def delete_feature_view(self, name: str, project: str, commit: bool = True):
@abstractmethod
def get_stream_feature_view(
self, name: str, project: str, allow_cache: bool = False
):
) -> StreamFeatureView:
"""
Retrieves a stream feature view.
Expand Down
Loading

0 comments on commit 36e2279

Please sign in to comment.