Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support parquet ingestion #1410

Merged
merged 5 commits into from
Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@
import json
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict
from typing import Dict, List

import click
import pkg_resources
import yaml
from pytz import utc

from feast.client import Client
from feast.config import Config
from feast.entity import Entity
from feast.feature_store import FeatureStore
from feast.feature_table import FeatureTable
from feast.loaders.yaml import yaml_loader
from feast.repo_config import load_repo_config
Expand Down Expand Up @@ -389,5 +392,24 @@ def registry_dump_command(repo_path: str):
registry_dump(repo_config)


@cli.command("materialize")
@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True))
@click.argument("start_ts")
@click.argument("end_ts")
@click.option(
    "--views", "-v", help="Feature views to materialize", multiple=True,
)
def materialize_command(repo_path: str, start_ts: str, end_ts: str, views: List[str]):
    """
    Run materialization for the feature repo between START_TS and END_TS

    START_TS and END_TS are ISO 8601 timestamps. A timestamp without a UTC
    offset is interpreted as UTC; one with an offset is converted to UTC.
    When no --views are given, None is passed to the store as the view selection.
    """

    def _parse_utc(ts: str) -> datetime:
        parsed = datetime.fromisoformat(ts)
        if parsed.tzinfo is None:
            # Naive input: label it as UTC without shifting the wall-clock value.
            return parsed.replace(tzinfo=utc)
        # Aware input: convert, so an explicit offset keeps pointing at the same instant.
        return parsed.astimezone(utc)

    store = FeatureStore(repo_path=repo_path)
    store.materialize(
        # click hands over a (possibly empty) tuple for a `multiple=True` option
        feature_views=list(views) if views else None,
        start_date=_parse_utc(start_ts),
        end_date=_parse_utc(end_ts),
    )


if __name__ == "__main__":
cli()
24 changes: 17 additions & 7 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import pandas as pd
import pyarrow

from feast.data_source import FileSource
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.provider import Provider, get_provider
Expand Down Expand Up @@ -51,10 +50,12 @@ class FeatureStore:
"""

config: RepoConfig
repo_path: Optional[str]

def __init__(
self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None,
):
self.repo_path = repo_path
if repo_path is not None and config is not None:
raise ValueError("You cannot specify both repo_path and config")
if config is not None:
Expand Down Expand Up @@ -225,10 +226,6 @@ def materialize(

# TODO paging large loads
for feature_view in feature_views_to_materialize:
if isinstance(feature_view.input, FileSource):
raise NotImplementedError(
"This function is not yet implemented for File data sources"
)
(
entity_names,
feature_names,
Expand Down Expand Up @@ -451,6 +448,18 @@ def _convert_arrow_to_proto(
table: pyarrow.Table, feature_view: FeatureView
) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]:
rows_to_write = []

def _coerce_datetime(ts):
"""
Depending on underlying ts resolution, arrow to_pydict() sometimes returns pandas timestamp
type (for ns resolution), and sometimes you get python datetime (for us resolution).
"""

if isinstance(ts, pd.Timestamp):
return ts.to_pydatetime()
else:
return ts

for row in zip(*table.to_pydict().values()):
entity_key = EntityKeyProto()
for entity_name in feature_view.entities:
Expand All @@ -466,12 +475,13 @@ def _convert_arrow_to_proto(
event_timestamp_idx = table.column_names.index(
feature_view.input.event_timestamp_column
)
event_timestamp = row[event_timestamp_idx]
event_timestamp = _coerce_datetime(row[event_timestamp_idx])

if feature_view.input.created_timestamp_column is not None:
created_timestamp_idx = table.column_names.index(
feature_view.input.created_timestamp_column
)
created_timestamp = row[created_timestamp_idx]
created_timestamp = _coerce_datetime(row[created_timestamp_idx])
else:
created_timestamp = None

Expand Down
12 changes: 12 additions & 0 deletions sdk/python/feast/infra/local_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from datetime import datetime
from typing import Dict, List, Optional, Sequence, Tuple, Union

import pytz

from feast import FeatureTable, FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.provider import Provider
Expand All @@ -15,6 +17,13 @@ def _table_id(project: str, table: Union[FeatureTable, FeatureView]) -> str:
return f"{project}_{table.name}"


def _to_naive_utc(ts: datetime):
if ts.tzinfo is None:
return ts
else:
return ts.astimezone(pytz.utc).replace(tzinfo=None)


class LocalSqlite(Provider):
_db_path: str

Expand Down Expand Up @@ -64,6 +73,9 @@ def online_write_batch(
for entity_key, values, timestamp, created_ts in data:
for feature_name, val in values.items():
entity_key_bin = serialize_entity_key(entity_key)
timestamp = _to_naive_utc(timestamp)
if created_ts is not None:
created_ts = _to_naive_utc(created_ts)

conn.execute(
f"""
Expand Down
23 changes: 22 additions & 1 deletion sdk/python/feast/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,26 @@ def pull_latest_from_table_or_query(
start_date: datetime,
end_date: datetime,
) -> pyarrow.Table:
pass
assert isinstance(data_source, FileSource)
source_df = pd.read_parquet(data_source.path)

ts_columns = (
[event_timestamp_column, created_timestamp_column]
if created_timestamp_column is not None
else [event_timestamp_column]
)
source_df.sort_values(by=ts_columns, inplace=True)

filtered_df = source_df[
(source_df[event_timestamp_column] >= start_date)
& (source_df[event_timestamp_column] < end_date)
]
last_values_df = filtered_df.groupby(by=entity_names).last()
last_values_df.reset_index(inplace=True)

return pyarrow.Table.from_pandas(
last_values_df[entity_names + feature_names + ts_columns]
)

@staticmethod
def get_historical_features(
Expand Down Expand Up @@ -569,5 +588,7 @@ def _get_requested_feature_views_to_features_dict(
def get_offline_store(config: RepoConfig) -> Type[OfflineStore]:
    """
    Pick the offline store class matching `config.provider`.

    "gcp" maps to BigQueryOfflineStore and "local" to FileOfflineStore.
    Any other provider raises a ValueError carrying the config.
    """
    provider = config.provider
    if provider == "gcp":
        return BigQueryOfflineStore
    if provider == "local":
        return FileOfflineStore
    raise ValueError(config)
27 changes: 27 additions & 0 deletions sdk/python/tests/cli/example_feature_repo_2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from google.protobuf.duration_pb2 import Duration

from feast import Entity, Feature, FeatureView, ValueType
from feast.data_source import FileSource

# File-backed source for the feature view below. It has its own name so the
# module-level `driver_hourly_stats` refers only to the FeatureView.
driver_hourly_stats_source = FileSource(
    path="%PARQUET_PATH%",  # replaced by the test
    event_timestamp_column="datetime",
    created_timestamp_column="created",
)

driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",)


driver_hourly_stats = FeatureView(
    name="driver_hourly_stats",
    entities=["driver_id"],
    ttl=Duration(seconds=86400 * 1),
    features=[
        Feature(name="conv_rate", dtype=ValueType.FLOAT),
        Feature(name="acc_rate", dtype=ValueType.FLOAT),
        Feature(name="avg_daily_trips", dtype=ValueType.INT64),
    ],
    online=True,
    input=driver_hourly_stats_source,
    tags={},
)
75 changes: 75 additions & 0 deletions sdk/python/tests/cli/test_e2e_local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import os
import tempfile
from datetime import datetime, timedelta

import pandas as pd

import tests.driver_test_data as driver_data
from tests.cli.utils import CliRunner, get_example_repo


def _get_last_feature_row(df: pd.DataFrame, driver_id):
filtered = df[df["driver_id"] == driver_id]
max_ts = filtered.loc[filtered["datetime"].idxmax()]["datetime"]
filtered_by_ts = filtered[filtered["datetime"] == max_ts]
return filtered_by_ts.loc[filtered_by_ts["created"].idxmax()]


class TestLocalEndToEnd:
    def test_basic(self) -> None:
        """
        Write generated driver stats to a parquet file, run the `materialize` CLI command
        against a local repo that points at that file, and make sure the features read back
        via get_online_features match the latest row of the source data.
        """

        runner = CliRunner()
        with tempfile.TemporaryDirectory() as data_dir:
            # Truncate to the hour so the materialization window has round boundaries
            end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
            start_date = end_date - timedelta(days=15)

            driver_entities = [1001, 1002, 1003, 1004, 1005]
            driver_df = driver_data.create_driver_hourly_stats_df(
                driver_entities, start_date, end_date
            )

            driver_stats_path = os.path.join(data_dir, "driver_stats.parquet")
            driver_df.to_parquet(
                path=driver_stats_path, allow_truncated_timestamps=True
            )

            # The example repo ships with a placeholder path; point it at the file just written
            with runner.local_repo(
                get_example_repo("example_feature_repo_2.py").replace(
                    "%PARQUET_PATH%", driver_stats_path
                )
            ) as store:

                repo_path = store.repo_path

                r = runner.run(
                    [
                        "materialize",
                        str(repo_path),
                        start_date.isoformat(),
                        end_date.isoformat(),
                    ],
                    cwd=repo_path,
                )
                assert r.returncode == 0

                result = store.get_online_features(
                    feature_refs=[
                        "driver_hourly_stats:conv_rate",
                        "driver_hourly_stats:avg_daily_trips",
                    ],
                    entity_rows=[{"driver_id": 1001}],
                )
                # Convert once and reuse for every assertion below
                online_features = result.to_dict()

                assert "driver_hourly_stats:avg_daily_trips" in online_features

                assert "driver_hourly_stats:conv_rate" in online_features
                assert (
                    abs(
                        online_features["driver_hourly_stats:conv_rate"][0]
                        - _get_last_feature_row(driver_df, 1001)["conv_rate"]
                    )
                    < 0.01
                )
4 changes: 2 additions & 2 deletions sdk/python/tests/cli/test_online_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from tests.cli.utils import CliRunner
from tests.cli.utils import CliRunner, get_example_repo


class TestOnlineRetrieval:
def test_basic(self) -> None:
runner = CliRunner()
with runner.local_repo("example_feature_repo_1.py") as store:
with runner.local_repo(get_example_repo("example_feature_repo_1.py")) as store:

# Write some data to two tables
registry = store._get_registry()
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/tests/cli/test_partial_apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from feast import BigQuerySource, Feature, FeatureView, ValueType
from tests.cli.online_read_write_test import basic_rw_test
from tests.cli.utils import CliRunner
from tests.cli.utils import CliRunner, get_example_repo


class TestOnlineRetrieval:
Expand All @@ -13,7 +13,7 @@ def test_basic(self) -> None:
"""

runner = CliRunner()
with runner.local_repo("example_feature_repo_1.py") as store:
with runner.local_repo(get_example_repo("example_feature_repo_1.py")) as store:

driver_locations_source = BigQuerySource(
table_ref="rh_prod.ride_hailing_co.drivers",
Expand Down
13 changes: 7 additions & 6 deletions sdk/python/tests/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
from feast.feature_store import FeatureStore


def get_example_repo(example_repo_py) -> str:
    """
    Read an example repo definition and return its contents as text.

    `example_repo_py` is resolved relative to the directory containing this module.
    """
    here = Path(__file__).parent
    example_path = here / example_repo_py
    return example_path.read_text()


class CliRunner:
"""
NB. We can't use test runner helper from click here, since it doesn't start a new Python
Expand All @@ -27,9 +31,8 @@ def local_repo(self, example_repo_py: str):
"""
Convenience method to set up all the boilerplate for a local feature repo.
"""
project_id = "".join(
"test" + random.choice(string.ascii_lowercase + string.digits)
for _ in range(10)
project_id = "test" + "".join(
random.choice(string.ascii_lowercase + string.digits) for _ in range(10)
)

with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name:
Expand All @@ -53,9 +56,7 @@ def local_repo(self, example_repo_py: str):
)

repo_example = repo_path / "example.py"
repo_example.write_text(
(Path(__file__).parent / example_repo_py).read_text()
)
repo_example.write_text(example_repo_py)

result = self.run(["apply", str(repo_path)], cwd=repo_path)
assert result.returncode == 0
Expand Down
Loading