From 9533fbedfd1201d37c0247c2e076624835483936 Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Wed, 24 Mar 2021 21:47:56 -0700 Subject: [PATCH 1/4] support parquet ingestion Signed-off-by: Oleg Avdeev --- sdk/python/feast/cli.py | 24 ++- sdk/python/feast/feature_store.py | 24 ++- sdk/python/feast/infra/local_sqlite.py | 12 ++ sdk/python/feast/offline_store.py | 23 ++- .../tests/cli/example_feature_repo_2.py | 27 ++++ sdk/python/tests/cli/test_e2e_local.py | 75 ++++++++++ sdk/python/tests/cli/test_online_retrieval.py | 4 +- sdk/python/tests/cli/test_partial_apply.py | 4 +- sdk/python/tests/cli/utils.py | 13 +- sdk/python/tests/driver_test_data.py | 140 ++++++++++++++++++ sdk/python/tests/test_historical_retrieval.py | 105 ++----------- 11 files changed, 337 insertions(+), 114 deletions(-) create mode 100644 sdk/python/tests/cli/example_feature_repo_2.py create mode 100644 sdk/python/tests/cli/test_e2e_local.py create mode 100644 sdk/python/tests/driver_test_data.py diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 12aeada765..450e579991 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -15,16 +15,19 @@ import json import logging import sys +from datetime import datetime from pathlib import Path -from typing import Dict +from typing import Dict, List import click import pkg_resources import yaml +from pytz import utc from feast.client import Client from feast.config import Config from feast.entity import Entity +from feast.feature_store import FeatureStore from feast.feature_table import FeatureTable from feast.loaders.yaml import yaml_loader from feast.repo_config import load_repo_config @@ -389,5 +392,24 @@ def registry_dump_command(repo_path: str): registry_dump(repo_config) +@cli.command("materialize") +@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True)) +@click.argument("start_ts") +@click.argument("end_ts") +@click.option( + "--views", "-v", help="Feature views to materialize", multiple=True, +) +def materialize_command(repo_path: str, start_ts: str, end_ts: str, views: List[str]): + """ + Prints contents of the metadata registry + """ + store = FeatureStore(repo_path=repo_path) + store.materialize( + feature_views=None if not views else views, + start_date=datetime.fromisoformat(start_ts).replace(tzinfo=utc), + end_date=datetime.fromisoformat(end_ts).replace(tzinfo=utc), + ) + + if __name__ == "__main__": cli() diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index bff787dfaa..1bb78b2ffc 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -19,7 +19,6 @@ import pandas as pd import pyarrow -from feast.data_source import FileSource from feast.entity import Entity from feast.feature_view import FeatureView from feast.infra.provider import Provider, get_provider @@ -51,10 +50,12 @@ class FeatureStore: """ config: RepoConfig + repo_path: Optional[str] def __init__( self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None, ): + self.repo_path = repo_path if repo_path is not None and config is not None: raise ValueError("You cannot specify both repo_path and config") if config is not None: @@ -225,10 +226,6 @@ def materialize( # TODO paging large loads for feature_view in feature_views_to_materialize: - if isinstance(feature_view.input, FileSource): - raise NotImplementedError( - "This function is not yet implemented for File data sources" - ) ( entity_names, feature_names, @@ -451,6 +448,18 @@ def _convert_arrow_to_proto( table: pyarrow.Table, 
feature_view: FeatureView ) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]: rows_to_write = [] + + def _coerce_datetime(ts): + """ + Depending on underlying ts resolution, arrow to_pydict() sometimes returns pandas timestamp + type (for ns resolution), and sometimes you get python datetime (for us resolution). + """ + + if isinstance(ts, pd.Timestamp): + return ts.to_pydatetime() + else: + return ts + for row in zip(*table.to_pydict().values()): entity_key = EntityKeyProto() for entity_name in feature_view.entities: @@ -466,12 +475,13 @@ def _convert_arrow_to_proto( event_timestamp_idx = table.column_names.index( feature_view.input.event_timestamp_column ) - event_timestamp = row[event_timestamp_idx] + event_timestamp = _coerce_datetime(row[event_timestamp_idx]) + if feature_view.input.created_timestamp_column is not None: created_timestamp_idx = table.column_names.index( feature_view.input.created_timestamp_column ) - created_timestamp = row[created_timestamp_idx] + created_timestamp = _coerce_datetime(row[created_timestamp_idx]) else: created_timestamp = None diff --git a/sdk/python/feast/infra/local_sqlite.py b/sdk/python/feast/infra/local_sqlite.py index eda37bdeac..5035cb7406 100644 --- a/sdk/python/feast/infra/local_sqlite.py +++ b/sdk/python/feast/infra/local_sqlite.py @@ -3,6 +3,8 @@ from datetime import datetime from typing import Dict, List, Optional, Sequence, Tuple, Union +import pytz + from feast import FeatureTable, FeatureView from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.provider import Provider @@ -15,6 +17,13 @@ def _table_id(project: str, table: Union[FeatureTable, FeatureView]) -> str: return f"{project}_{table.name}" +def _to_naive_utc(ts: datetime): + if ts.tzinfo is None: + return ts + else: + return ts.astimezone(pytz.utc).replace(tzinfo=None) + + class LocalSqlite(Provider): _db_path: str @@ -64,6 +73,9 @@ def online_write_batch( for entity_key, values, timestamp, created_ts in data: for feature_name, val in values.items(): entity_key_bin = serialize_entity_key(entity_key) + timestamp = _to_naive_utc(timestamp) + if created_ts is not None: + created_ts = _to_naive_utc(created_ts) conn.execute( f""" diff --git a/sdk/python/feast/offline_store.py b/sdk/python/feast/offline_store.py index 08c8cb3808..0cac44b166 100644 --- a/sdk/python/feast/offline_store.py +++ b/sdk/python/feast/offline_store.py @@ -295,7 +295,26 @@ def pull_latest_from_table_or_query( start_date: datetime, end_date: datetime, ) -> pyarrow.Table: - pass + assert isinstance(data_source, FileSource) + source_df = pd.read_parquet(data_source.path) + + ts_columns = ( + [event_timestamp_column, created_timestamp_column] + if created_timestamp_column is not None + else [event_timestamp_column] + ) + source_df.sort_values(by=ts_columns, inplace=True) + + filtered_df = source_df[ + (source_df[event_timestamp_column] >= start_date) + & (source_df[event_timestamp_column] < end_date) + ] + last_values_df = filtered_df.groupby(by=entity_names).last() + last_values_df.reset_index(inplace=True) + + return pyarrow.Table.from_pandas( + last_values_df[entity_names + feature_names + ts_columns] + ) @staticmethod def get_historical_features( @@ -569,5 +588,7 @@ def _get_requested_feature_views_to_features_dict( def get_offline_store(config: RepoConfig) -> Type[OfflineStore]: if config.provider == "gcp": return BigQueryOfflineStore + elif config.provider == "local": + return FileOfflineStore else: raise ValueError(config) diff --git 
a/sdk/python/tests/cli/example_feature_repo_2.py b/sdk/python/tests/cli/example_feature_repo_2.py new file mode 100644 index 0000000000..aac3b193c4 --- /dev/null +++ b/sdk/python/tests/cli/example_feature_repo_2.py @@ -0,0 +1,27 @@ +from google.protobuf.duration_pb2 import Duration + +from feast import Entity, Feature, FeatureView, ValueType +from feast.data_source import FileSource + +driver_hourly_stats = FileSource( + path="%PARQUET_PATH%", # replaced by the test + event_timestamp_column="datetime", + created_timestamp_column="created", +) + +driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",) + + +driver_hourly_stats = FeatureView( + name="driver_hourly_stats", + entities=["driver_id"], + ttl=Duration(seconds=86400 * 1), + features=[ + Feature(name="conv_rate", dtype=ValueType.FLOAT), + Feature(name="acc_rate", dtype=ValueType.FLOAT), + Feature(name="avg_daily_trips", dtype=ValueType.INT64), + ], + online=True, + input=driver_hourly_stats, + tags={}, +) diff --git a/sdk/python/tests/cli/test_e2e_local.py b/sdk/python/tests/cli/test_e2e_local.py new file mode 100644 index 0000000000..de2f941813 --- /dev/null +++ b/sdk/python/tests/cli/test_e2e_local.py @@ -0,0 +1,75 @@ +import os +import tempfile +from datetime import datetime, timedelta + +import pandas as pd + +import tests.driver_test_data as driver_data +from tests.cli.utils import CliRunner, get_example_repo + + +def _get_last_feature_row(df: pd.DataFrame, driver_id): + filtered = df[df["driver_id"] == driver_id] + max_ts = filtered.loc[filtered["datetime"].idxmax()]["datetime"] + filtered_by_ts = filtered[filtered["datetime"] == max_ts] + return filtered_by_ts.loc[filtered_by_ts["created"].idxmax()] + + +class TestLocalEndToEnd: + def test_basic(self) -> None: + """ + Add another table to existing repo using partial apply API. Make sure both the table + applied via CLI apply and the new table are passing RW test. 
+ """ + + runner = CliRunner() + with tempfile.TemporaryDirectory() as data_dir: + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + + driver_entities = [1001, 1002, 1003, 1004, 1005] + driver_df = driver_data.create_driver_hourly_stats_df( + driver_entities, start_date, end_date + ) + + driver_stats_path = os.path.join(data_dir, "driver_stats.parquet") + driver_df.to_parquet( + path=driver_stats_path, allow_truncated_timestamps=True + ) + + with runner.local_repo( + get_example_repo("example_feature_repo_2.py").replace( + "%PARQUET_PATH%", driver_stats_path + ) + ) as store: + + repo_path = store.repo_path + + r = runner.run( + [ + "materialize", + str(repo_path), + start_date.isoformat(), + end_date.isoformat(), + ], + cwd=repo_path, + ) + assert r.returncode == 0 + result = store.get_online_features( + feature_refs=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:avg_daily_trips", + ], + entity_rows=[{"driver_id": 1001}], + ) + + assert "driver_hourly_stats:avg_daily_trips" in result.to_dict() + + assert "driver_hourly_stats:conv_rate" in result.to_dict() + assert ( + abs( + result.to_dict()["driver_hourly_stats:conv_rate"][0] + - _get_last_feature_row(driver_df, 1001)["conv_rate"] + ) + < 0.01 + ) diff --git a/sdk/python/tests/cli/test_online_retrieval.py b/sdk/python/tests/cli/test_online_retrieval.py index 43fb47a038..f7fd6c9517 100644 --- a/sdk/python/tests/cli/test_online_retrieval.py +++ b/sdk/python/tests/cli/test_online_retrieval.py @@ -4,13 +4,13 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto -from tests.cli.utils import CliRunner +from tests.cli.utils import CliRunner, get_example_repo class TestOnlineRetrieval: def test_basic(self) -> None: runner = CliRunner() - with runner.local_repo("example_feature_repo_1.py") as store: + with runner.local_repo(get_example_repo("example_feature_repo_1.py")) as store: # Write some data to two tables registry = store._get_registry() diff --git a/sdk/python/tests/cli/test_partial_apply.py b/sdk/python/tests/cli/test_partial_apply.py index 21f4aa1187..b36c5eb62e 100644 --- a/sdk/python/tests/cli/test_partial_apply.py +++ b/sdk/python/tests/cli/test_partial_apply.py @@ -2,7 +2,7 @@ from feast import BigQuerySource, Feature, FeatureView, ValueType from tests.cli.online_read_write_test import basic_rw_test -from tests.cli.utils import CliRunner +from tests.cli.utils import CliRunner, get_example_repo class TestOnlineRetrieval: @@ -13,7 +13,7 @@ def test_basic(self) -> None: """ runner = CliRunner() - with runner.local_repo("example_feature_repo_1.py") as store: + with runner.local_repo(get_example_repo("example_feature_repo_1.py")) as store: driver_locations_source = BigQuerySource( table_ref="rh_prod.ride_hailing_co.drivers", diff --git a/sdk/python/tests/cli/utils.py b/sdk/python/tests/cli/utils.py index 286ab0eaee..55ff6b9b7f 100644 --- a/sdk/python/tests/cli/utils.py +++ b/sdk/python/tests/cli/utils.py @@ -12,6 +12,10 @@ from feast.feature_store import FeatureStore +def get_example_repo(example_repo_py) -> str: + return (Path(__file__).parent / example_repo_py).read_text() + + class CliRunner: """ NB. We can't use test runner helper from click here, since it doesn't start a new Python @@ -27,9 +31,8 @@ def local_repo(self, example_repo_py: str): """ Convenience method to set up all the boilerplate for a local feature repo. 
""" - project_id = "".join( - "test" + random.choice(string.ascii_lowercase + string.digits) - for _ in range(10) + project_id = "test" + "".join( + random.choice(string.ascii_lowercase + string.digits) for _ in range(10) ) with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name: @@ -53,9 +56,7 @@ def local_repo(self, example_repo_py: str): ) repo_example = repo_path / "example.py" - repo_example.write_text( - (Path(__file__).parent / example_repo_py).read_text() - ) + repo_example.write_text(example_repo_py) result = self.run(["apply", str(repo_path)], cwd=repo_path) assert result.returncode == 0 diff --git a/sdk/python/tests/driver_test_data.py b/sdk/python/tests/driver_test_data.py new file mode 100644 index 0000000000..538616e41d --- /dev/null +++ b/sdk/python/tests/driver_test_data.py @@ -0,0 +1,140 @@ +import numpy as np +import pandas as pd + +from feast.offline_store import ENTITY_DF_EVENT_TIMESTAMP_COL + + +def create_orders_df( + customers, drivers, start_date, end_date, order_count +) -> pd.DataFrame: + """ + Example df generated by this function: + + | order_id | driver_id | customer_id | order_is_success | event_timestamp | + +----------+-----------+-------------+------------------+---------------------+ + | 100 | 5004 | 1007 | 0 | 2021-03-10 19:31:15 | + | 101 | 5003 | 1006 | 0 | 2021-03-11 22:02:50 | + | 102 | 5010 | 1005 | 0 | 2021-03-13 00:34:24 | + | 103 | 5010 | 1001 | 1 | 2021-03-14 03:05:59 | + """ + df = pd.DataFrame() + df["order_id"] = [order_id for order_id in range(100, 100 + order_count)] + df["driver_id"] = np.random.choice(drivers, order_count) + df["customer_id"] = np.random.choice(customers, order_count) + df["order_is_success"] = np.random.randint(0, 2, size=order_count).astype(np.int32) + df[ENTITY_DF_EVENT_TIMESTAMP_COL] = [ + pd.Timestamp(dt, unit="ms", tz="UTC").round("ms") + for dt in pd.date_range(start=start_date, end=end_date, periods=order_count) + ] + df.sort_values( + by=[ENTITY_DF_EVENT_TIMESTAMP_COL, "order_id", "driver_id", "customer_id"], + inplace=True, + ) + return df + + +def create_driver_hourly_stats_df(drivers, start_date, end_date) -> pd.DataFrame: + """ + Example df generated by this function: + + | datetime | driver_id | conv_rate | acc_rate | avg_daily_trips | created | + |------------------+-----------+-----------+----------+-----------------+------------------| + | 2021-03-17 19:31 | 5010 | 0.229297 | 0.685843 | 861 | 2021-03-24 19:34 | + | 2021-03-17 20:31 | 5010 | 0.781655 | 0.861280 | 769 | 2021-03-24 19:34 | + | 2021-03-17 21:31 | 5010 | 0.150333 | 0.525581 | 778 | 2021-03-24 19:34 | + | 2021-03-17 22:31 | 5010 | 0.951701 | 0.228883 | 570 | 2021-03-24 19:34 | + | 2021-03-17 23:31 | 5010 | 0.819598 | 0.262503 | 473 | 2021-03-24 19:34 | + | | ... | ... | ... | ... 
| |
+    | 2021-03-24 16:31 | 5001      | 0.061585  | 0.658140 | 477             | 2021-03-24 19:34 |
+    | 2021-03-24 17:31 | 5001      | 0.088949  | 0.303897 | 618             | 2021-03-24 19:34 |
+    | 2021-03-24 18:31 | 5001      | 0.096652  | 0.747421 | 480             | 2021-03-24 19:34 |
+    | 2021-03-17 19:31 | 5005      | 0.142936  | 0.707596 | 466             | 2021-03-24 19:34 |
+    | 2021-03-17 19:31 | 5005      | 0.142936  | 0.707596 | 466             | 2021-03-24 19:34 |
+    """
+    df_hourly = pd.DataFrame(
+        {
+            "datetime": [
+                pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
+                for dt in pd.date_range(
+                    start=start_date, end=end_date, freq="1H", closed="left"
+                )
+            ]
+        }
+    )
+    df_all_drivers = pd.DataFrame()
+
+    for driver in drivers:
+        df_hourly_copy = df_hourly.copy()
+        df_hourly_copy["driver_id"] = driver
+        df_all_drivers = pd.concat([df_hourly_copy, df_all_drivers])
+
+    df_all_drivers.reset_index(drop=True, inplace=True)
+    rows = df_all_drivers["datetime"].count()
+
+    df_all_drivers["conv_rate"] = np.random.random(size=rows).astype(np.float32)
+    df_all_drivers["acc_rate"] = np.random.random(size=rows).astype(np.float32)
+    df_all_drivers["avg_daily_trips"] = np.random.randint(0, 1000, size=rows).astype(
+        np.int32
+    )
+    df_all_drivers["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms"))
+
+    # Create duplicate rows that should be filtered by created timestamp
+    # TODO: These duplicate rows are indirectly being filtered out by the point in time join already. We need to
+    # inject a bad row at a timestamp where we know it will get joined to the entity dataframe, and then test that
+    # we are actually filtering it with the created timestamp
+    late_row = df_all_drivers.iloc[int(rows / 2)]
+    df_all_drivers = df_all_drivers.append(late_row).append(late_row)
+
+    return df_all_drivers
+
+
+def create_customer_daily_profile_df(customers, start_date, end_date) -> pd.DataFrame:
+    """
+    Example df generated by this function:
+
+    | datetime         | customer_id | current_balance | avg_passenger_count | lifetime_trip_count | created          |
+    |------------------+-------------+-----------------+---------------------+---------------------+------------------|
+    | 2021-03-17 19:31 | 1010        | 0.889188        | 0.049057            | 412                 | 2021-03-24 19:38 |
+    | 2021-03-18 19:31 | 1010        | 0.979273        | 0.212630            | 639                 | 2021-03-24 19:38 |
+    | 2021-03-19 19:31 | 1010        | 0.976549        | 0.176881            | 70                  | 2021-03-24 19:38 |
+    | 2021-03-20 19:31 | 1010        | 0.273697        | 0.325012            | 68                  | 2021-03-24 19:38 |
+    | 2021-03-21 19:31 | 1010        | 0.438262        | 0.313009            | 192                 | 2021-03-24 19:38 |
+    |                  | ...         | ...             | ...                 | ...                 |                  |
+    | 2021-03-19 19:31 | 1001        | 0.738860        | 0.857422            | 344                 | 2021-03-24 19:38 |
+    | 2021-03-20 19:31 | 1001        | 0.848397        | 0.745989            | 106                 | 2021-03-24 19:38 |
+    | 2021-03-21 19:31 | 1001        | 0.301552        | 0.185873            | 812                 | 2021-03-24 19:38 |
+    | 2021-03-22 19:31 | 1001        | 0.943030        | 0.561219            | 322                 | 2021-03-24 19:38 |
+    | 2021-03-23 19:31 | 1001        | 0.354919        | 0.810093            | 273                 | 2021-03-24 19:38 |
+    """
+    df_daily = pd.DataFrame(
+        {
+            "datetime": [
+                pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
+                for dt in pd.date_range(
+                    start=start_date, end=end_date, freq="1D", closed="left"
+                )
+            ]
+        }
+    )
+    df_all_customers = pd.DataFrame()
+
+    for customer in customers:
+        df_daily_copy = df_daily.copy()
+        df_daily_copy["customer_id"] = customer
+        df_all_customers = pd.concat([df_daily_copy, df_all_customers])
+
+    df_all_customers.reset_index(drop=True, inplace=True)
+
+    rows = df_all_customers["datetime"].count()
+
+    df_all_customers["current_balance"] = np.random.random(size=rows).astype(np.float32)
+    df_all_customers["avg_passenger_count"] = np.random.random(size=rows).astype(
+        np.float32
+    )
+    df_all_customers["lifetime_trip_count"] = np.random.randint(
+        0, 1000, size=rows
+    ).astype(np.int32)
+
+    # TODO: Remove created timestamp in order to test whether it's really optional
+    df_all_customers["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms"))
+    return df_all_customers
diff --git a/sdk/python/tests/test_historical_retrieval.py b/sdk/python/tests/test_historical_retrieval.py
index 75bc7c4c77..e5333eb66d 100644
--- a/sdk/python/tests/test_historical_retrieval.py
+++ b/sdk/python/tests/test_historical_retrieval.py
@@ -9,6 +9,7 @@
 from google.cloud import bigquery
 from pandas.testing import assert_frame_equal
 
+import tests.driver_test_data as driver_data
 from feast.data_source import BigQuerySource, FileSource
 from feast.entity import Entity
 from feast.feature import Feature
@@ -30,67 +31,12 @@ def generate_entities(date):
     after_end_date = end_date + timedelta(days=7)
     customer_entities = [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010]
     driver_entities = [5001, 5002, 5003, 5004, 5005, 5006, 5007, 5008, 5009, 5010]
-    orders_df = create_orders_df(
+    orders_df = driver_data.create_orders_df(
         customer_entities, driver_entities, before_start_date, after_end_date, 20
     )
     return customer_entities, driver_entities, end_date, orders_df, start_date
 
 
-def create_orders_df(customers, drivers, start_date, end_date, order_count):
-    df = pd.DataFrame()
-    df["order_id"] = [order_id for order_id in range(100, 100 + order_count)]
-    df["driver_id"] = np.random.choice(drivers, order_count)
-    df["customer_id"] = np.random.choice(customers, order_count)
-    df["order_is_success"] = np.random.randint(0, 2, size=order_count).astype(np.int32)
-    df[ENTITY_DF_EVENT_TIMESTAMP_COL] = [
-        pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
-        for dt in pd.date_range(start=start_date, end=end_date, periods=order_count)
-    ]
-    df.sort_values(
-        by=[ENTITY_DF_EVENT_TIMESTAMP_COL, "order_id", "driver_id", "customer_id"],
-        inplace=True,
-    )
-    return df
-
-
-def create_driver_hourly_stats_df(drivers, start_date, end_date):
-    df_hourly = pd.DataFrame(
-        {
-            "datetime": [
-                pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
-                for dt in pd.date_range(
-                    start=start_date, end=end_date, freq="1H", closed="left"
-                )
-            ]
-        }
-    )
-    df_all_drivers = pd.DataFrame()
-
-    for driver in drivers:
-        df_hourly_copy = df_hourly.copy()
-        df_hourly_copy["driver_id"] = driver
-        df_all_drivers = pd.concat([df_hourly_copy, df_all_drivers])
-
-
df_all_drivers.reset_index(drop=True, inplace=True) - rows = df_all_drivers["datetime"].count() - - df_all_drivers["conv_rate"] = np.random.random(size=rows).astype(np.float32) - df_all_drivers["acc_rate"] = np.random.random(size=rows).astype(np.float32) - df_all_drivers["avg_daily_trips"] = np.random.randint(0, 1000, size=rows).astype( - np.int32 - ) - df_all_drivers["created"] = pd.to_datetime(pd.Timestamp.now().round("ms")) - - # Create duplicate rows that should be filtered by created timestamp - # TODO: These duplicate rows area indirectly being filtered out by the point in time join already. We need to - # inject a bad row at a timestamp where we know it will get joined to the entity dataframe, and then test that - # we are actually filtering it with the created timestamp - late_row = df_all_drivers.iloc[int(rows / 2)] - df_all_drivers = df_all_drivers.append(late_row).append(late_row) - - return df_all_drivers - - def stage_driver_hourly_stats_parquet_source(directory, df): # Write to disk driver_stats_path = os.path.join(directory, "driver_stats.parquet") @@ -121,41 +67,6 @@ def create_driver_hourly_stats_feature_view(source): return driver_stats_feature_view -def create_customer_daily_profile_df(customers, start_date, end_date): - df_daily = pd.DataFrame( - { - "datetime": [ - pd.Timestamp(dt, unit="ms", tz="UTC").round("ms") - for dt in pd.date_range( - start=start_date, end=end_date, freq="1D", closed="left" - ) - ] - } - ) - df_all_customers = pd.DataFrame() - - for customer in customers: - df_daily_copy = df_daily.copy() - df_daily_copy["customer_id"] = customer - df_all_customers = pd.concat([df_daily_copy, df_all_customers]) - - df_all_customers.reset_index(drop=True, inplace=True) - - rows = df_all_customers["datetime"].count() - - df_all_customers["current_balance"] = np.random.random(size=rows).astype(np.float32) - df_all_customers["avg_passenger_count"] = np.random.random(size=rows).astype( - np.float32 - ) - df_all_customers["lifetime_trip_count"] = np.random.randint( - 0, 1000, size=rows - ).astype(np.int32) - - # TODO: Remove created timestamp in order to test whether its really optional - df_all_customers["created"] = pd.to_datetime(pd.Timestamp.now().round("ms")) - return df_all_customers - - def stage_customer_daily_profile_parquet_source(directory, df): customer_profile_path = os.path.join(directory, "customer_profile.parquet") df.to_parquet(path=customer_profile_path, allow_truncated_timestamps=True) @@ -301,10 +212,12 @@ def test_historical_features_from_parquet_sources(): ) = generate_entities(start_date) with TemporaryDirectory() as temp_dir: - driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date) + driver_df = driver_data.create_driver_hourly_stats_df( + driver_entities, start_date, end_date + ) driver_source = stage_driver_hourly_stats_parquet_source(temp_dir, driver_df) driver_fv = create_driver_hourly_stats_feature_view(driver_source) - customer_df = create_customer_daily_profile_df( + customer_df = driver_data.create_customer_daily_profile_df( customer_entities, start_date, end_date ) customer_source = stage_customer_daily_profile_parquet_source( @@ -387,7 +300,9 @@ def test_historical_features_from_bigquery_sources(): entity_df_query = f"SELECT * FROM {gcp_project}.{table_id}" # Driver Feature View - driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date) + driver_df = driver_data.create_driver_hourly_stats_df( + driver_entities, start_date, end_date + ) driver_table_id = 
f"{gcp_project}.{bigquery_dataset}.driver_hourly" stage_driver_hourly_stats_bigquery_source(driver_df, driver_table_id) driver_source = BigQuerySource( @@ -398,7 +313,7 @@ def test_historical_features_from_bigquery_sources(): driver_fv = create_driver_hourly_stats_feature_view(driver_source) # Customer Feature View - customer_df = create_customer_daily_profile_df( + customer_df = driver_data.create_customer_daily_profile_df( customer_entities, start_date, end_date ) customer_table_id = f"{gcp_project}.{bigquery_dataset}.customer_profile" From 14b7ce7011b7abaa23035bc9cae2a286536fef4a Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Thu, 25 Mar 2021 10:31:23 -0700 Subject: [PATCH 2/4] better comments Signed-off-by: Oleg Avdeev --- sdk/python/feast/cli.py | 2 +- sdk/python/feast/feature_store.py | 9 ++++++-- sdk/python/feast/offline_store.py | 2 ++ .../tests/cli/example_feature_repo_2.py | 2 +- sdk/python/tests/cli/test_e2e_local.py | 22 ++++++++++++++----- 5 files changed, 28 insertions(+), 9 deletions(-) diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 450e579991..d9234bab46 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -401,7 +401,7 @@ def registry_dump_command(repo_path: str): ) def materialize_command(repo_path: str, start_ts: str, end_ts: str, views: List[str]): """ - Prints contents of the metadata registry + Run a (non-incremental) materialization job to ingest data into the online store. """ store = FeatureStore(repo_path=repo_path) store.materialize( diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 1bb78b2ffc..814d2e7dde 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -451,8 +451,13 @@ def _convert_arrow_to_proto( def _coerce_datetime(ts): """ - Depending on underlying ts resolution, arrow to_pydict() sometimes returns pandas timestamp - type (for ns resolution), and sometimes you get python datetime (for us resolution). + Depending on underlying time resolution, arrow to_pydict() sometimes returns pandas + timestamp type (for nanosecond resolution), and sometimes you get standard python datetime + (for microsecond resolution). + + While pandas timestamp class is a subclass of python datetime, it doesn't always behave the + same way. We convert it to normal datetime so that consumers downstream don't have to deal + with these qiurks. 
""" if isinstance(ts, pd.Timestamp): diff --git a/sdk/python/feast/offline_store.py b/sdk/python/feast/offline_store.py index 0cac44b166..682cad5b5a 100644 --- a/sdk/python/feast/offline_store.py +++ b/sdk/python/feast/offline_store.py @@ -310,6 +310,8 @@ def pull_latest_from_table_or_query( & (source_df[event_timestamp_column] < end_date) ] last_values_df = filtered_df.groupby(by=entity_names).last() + + # make driver_id a normal column again last_values_df.reset_index(inplace=True) return pyarrow.Table.from_pandas( diff --git a/sdk/python/tests/cli/example_feature_repo_2.py b/sdk/python/tests/cli/example_feature_repo_2.py index aac3b193c4..e188eb6ff4 100644 --- a/sdk/python/tests/cli/example_feature_repo_2.py +++ b/sdk/python/tests/cli/example_feature_repo_2.py @@ -4,7 +4,7 @@ from feast.data_source import FileSource driver_hourly_stats = FileSource( - path="%PARQUET_PATH%", # replaced by the test + path="%PARQUET_PATH%", # placeholder to be replaced by the test event_timestamp_column="datetime", created_timestamp_column="created", ) diff --git a/sdk/python/tests/cli/test_e2e_local.py b/sdk/python/tests/cli/test_e2e_local.py index de2f941813..b132683427 100644 --- a/sdk/python/tests/cli/test_e2e_local.py +++ b/sdk/python/tests/cli/test_e2e_local.py @@ -1,6 +1,7 @@ import os import tempfile from datetime import datetime, timedelta +from pathlib import Path import pandas as pd @@ -9,6 +10,7 @@ def _get_last_feature_row(df: pd.DataFrame, driver_id): + """ Manually extract last feature value from a dataframe for a given driver_id """ filtered = df[df["driver_id"] == driver_id] max_ts = filtered.loc[filtered["datetime"].idxmax()]["datetime"] filtered_by_ts = filtered[filtered["datetime"] == max_ts] @@ -18,12 +20,16 @@ def _get_last_feature_row(df: pd.DataFrame, driver_id): class TestLocalEndToEnd: def test_basic(self) -> None: """ - Add another table to existing repo using partial apply API. Make sure both the table - applied via CLI apply and the new table are passing RW test. + 1. Create a repo. + 2. Apply + 3. Ingest some data to online store from parquet + 4. Read from the online store to make sure it made it there. """ runner = CliRunner() with tempfile.TemporaryDirectory() as data_dir: + + # Generate some test data in parquet format. end_date = datetime.now().replace(microsecond=0, second=0, minute=0) start_date = end_date - timedelta(days=15) @@ -37,24 +43,30 @@ def test_basic(self) -> None: path=driver_stats_path, allow_truncated_timestamps=True ) + # Note that runner takes care of running apply/teardown for us here. + # We patch python code in example_feature_repo_2.py to set the path to Parquet files. 
            with runner.local_repo(
                get_example_repo("example_feature_repo_2.py").replace(
                    "%PARQUET_PATH%", driver_stats_path
                )
            ) as store:
 
-                repo_path = store.repo_path
+                assert store.repo_path is not None
 
+                # feast materialize
                 r = runner.run(
                     [
                         "materialize",
-                        str(repo_path),
+                        str(store.repo_path),
                         start_date.isoformat(),
                         end_date.isoformat(),
                     ],
-                    cwd=repo_path,
+                    cwd=Path(store.repo_path),
                 )
+
                 assert r.returncode == 0
+
+                # Read features back
                 result = store.get_online_features(
                     feature_refs=[
                         "driver_hourly_stats:conv_rate",

From 7548444b3300ee1472e6513e2d90213455c6bd68 Mon Sep 17 00:00:00 2001
From: Oleg Avdeev
Date: Thu, 25 Mar 2021 11:10:25 -0700
Subject: [PATCH 3/4] Update sdk/python/feast/feature_store.py

Co-authored-by: Willem Pienaar <6728866+woop@users.noreply.github.com>
Signed-off-by: Oleg Avdeev
---
 sdk/python/feast/feature_store.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py
index cbda4ddde5..c0cba2d80f 100644
--- a/sdk/python/feast/feature_store.py
+++ b/sdk/python/feast/feature_store.py
@@ -510,7 +510,7 @@ def _coerce_datetime(ts):
 
         While pandas timestamp class is a subclass of python datetime, it doesn't always behave the
         same way. We convert it to normal datetime so that consumers downstream don't have to deal
-        with these qiurks.
+        with these quirks.
         """
 
         if isinstance(ts, pd.Timestamp):

From 46e817dca620571efaad0ed2b688c2974da3ddbf Mon Sep 17 00:00:00 2001
From: Oleg Avdeev
Date: Thu, 25 Mar 2021 11:18:23 -0700
Subject: [PATCH 4/4] better help for materialize command

Signed-off-by: Oleg Avdeev
---
 sdk/python/feast/cli.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py
index d9234bab46..b5dd92b618 100644
--- a/sdk/python/feast/cli.py
+++ b/sdk/python/feast/cli.py
@@ -401,7 +401,12 @@ def registry_dump_command(repo_path: str):
 )
 def materialize_command(repo_path: str, start_ts: str, end_ts: str, views: List[str]):
     """
-    Run a (non-incremental) materialization job to ingest data into the online store.
+    Run a (non-incremental) materialization job to ingest data into the online store. Feast
+    will read all data between START_TS and END_TS from the offline store and write it to the
+    online store. If you don't specify feature view names using --views, all registered Feature
+    Views will be materialized.
+
+    START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
     """
     store = FeatureStore(repo_path=repo_path)
    store.materialize(
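
Usage sketch (not taken from the patches themselves): the materialize command added in this
series is a thin CLI wrapper over the FeatureStore Python API. Assuming a hypothetical repo at
./my_repo that has already been applied with `feast apply`, and the driver_hourly_stats view
from example_feature_repo_2.py above, the CLI call and its rough Python equivalent would be:

    # Shell, via the new CLI command (repo path and dates are placeholders):
    #   feast materialize ./my_repo 2021-03-10T00:00:00 2021-03-24T00:00:00 -v driver_hourly_stats
    #
    # Python equivalent, mirroring what materialize_command does in cli.py:
    from datetime import datetime

    from pytz import utc

    from feast.feature_store import FeatureStore

    store = FeatureStore(repo_path="./my_repo")  # hypothetical repo path
    store.materialize(
        feature_views=["driver_hourly_stats"],  # None materializes all registered views
        start_date=datetime.fromisoformat("2021-03-10T00:00:00").replace(tzinfo=utc),
        end_date=datetime.fromisoformat("2021-03-24T00:00:00").replace(tzinfo=utc),
    )

Note that the timestamps are made timezone-aware here, just as the CLI does with
datetime.fromisoformat(...).replace(tzinfo=utc); the local sqlite provider then normalizes them
back to naive UTC via _to_naive_utc before writing.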