feat: Add file write_to_offline_store functionality #2808

Merged 11 commits on Jun 17, 2022
Changes from 8 commits
3 changes: 2 additions & 1 deletion sdk/python/feast/feature_store.py
@@ -1389,8 +1389,9 @@ def write_to_offline_store(
feature_view = self.get_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
table = pa.Table.from_pandas(df)
provider = self._get_provider()
provider.ingest_df_to_offline_store(feature_view, df)
provider.ingest_df_to_offline_store(feature_view, table)

@log_exceptions_and_usage
def get_online_features(
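For context on what this change enables (a sketch, not part of the PR): write_to_offline_store accepts a pandas DataFrame, converts it to a pyarrow Table, and hands it to the provider. A minimal usage sketch, assuming a feature repo with a driver_stats feature view backed by a file source; the repo path and column values are made up, and the columns must match the batch source's schema and order, including the created timestamp column.

from datetime import datetime

import pandas as pd

from feast import FeatureStore

store = FeatureStore(repo_path=".")  # hypothetical feature repo

df = pd.DataFrame(
    {
        "event_timestamp": [datetime.utcnow()],
        "driver_id": [1001],
        "conv_rate": [0.85],
        "acc_rate": [0.91],
        "avg_daily_trips": [5],
        "created": [datetime.utcnow()],
    }
)

# Internally this now does pa.Table.from_pandas(df) and calls
# provider.ingest_df_to_offline_store(feature_view, table).
store.write_to_offline_store("driver_stats", df, allow_registry_cache=False)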
38 changes: 37 additions & 1 deletion sdk/python/feast/infra/offline_stores/file.py
@@ -1,7 +1,7 @@
import uuid
from datetime import datetime
from pathlib import Path
from typing import Callable, List, Optional, Tuple, Union
from typing import Any, Callable, List, Optional, Tuple, Union

import dask.dataframe as dd
import pandas as pd
@@ -404,6 +404,42 @@ def write_logged_features(
existing_data_behavior="overwrite_or_ignore",
)

@staticmethod
def offline_write_batch(
config: RepoConfig,
table: FeatureView,
data: pyarrow.Table,
progress: Optional[Callable[[int], Any]],
):
if not table.batch_source:
raise ValueError(
"feature view does not have a batch source to persist offline data"
)
if not isinstance(config.offline_store, FileOfflineStoreConfig):
raise ValueError(
f"offline store config is of type {type(config.offline_store)} when file type required"
)
if not isinstance(table.batch_source, FileSource):
raise ValueError(
f"feature view batch source is {type(table.batch_source)} not file source"
)
file_options = table.batch_source.file_options
filesystem, path = FileSource.create_filesystem_and_path(
file_options.uri, file_options.s3_endpoint_override
)

prev_table = pyarrow.parquet.read_table(path, memory_map=True)
if prev_table.column_names != data.column_names:
raise ValueError(
f"Input dataframe has incorrect schema or wrong order, expected columns are: {prev_table.column_names}"
)
if data.schema != prev_table.schema:
data = data.cast(prev_table.schema)
new_table = pyarrow.concat_tables([data, prev_table])
writer = pyarrow.parquet.ParquetWriter(path, data.schema, filesystem=filesystem)
writer.write_table(new_table)
writer.close()


def _get_entity_df_event_timestamp_range(
entity_df: Union[pd.DataFrame, str], entity_df_event_timestamp_col: str,
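As a rough, self-contained illustration of the append pattern used in offline_write_batch above (read the existing Parquet file, check column order, cast the incoming table to the existing schema, concatenate, rewrite): the file path and data below are made up, and plain pyarrow is used instead of a Feast FileSource.

import pyarrow as pa
import pyarrow.parquet as pq

path = "/tmp/driver_stats.parquet"  # hypothetical file source path

existing = pa.table(
    {"driver_id": pa.array([1001], pa.int64()), "conv_rate": pa.array([0.5], pa.float32())}
)
pq.write_table(existing, path)

# Incoming data may arrive with inferred types (here conv_rate is float64).
incoming = pa.table({"driver_id": [1002], "conv_rate": [0.7]})

prev_table = pq.read_table(path, memory_map=True)
if prev_table.column_names != incoming.column_names:
    raise ValueError(f"expected columns: {prev_table.column_names}")
if incoming.schema != prev_table.schema:
    incoming = incoming.cast(prev_table.schema)  # e.g. float64 -> float32

pq.write_table(pa.concat_tables([incoming, prev_table]), path)
print(pq.read_table(path).num_rows)  # 2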
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/offline_store.py
@@ -275,7 +275,7 @@ def write_logged_features(
def offline_write_batch(
config: RepoConfig,
table: FeatureView,
Member: nit: this shouldn't be called table; it should be feature_view or something.

Collaborator (author): I was adhering to how we defined it in online_write_batch, but I'll change it.

data: pd.DataFrame,
data: pyarrow.Table,
progress: Optional[Callable[[int], Any]],
):
"""
@@ -287,7 +287,7 @@ def offline_write_batch(
Args:
config: Repo configuration object
table: FeatureView to write the data to.
data: dataframe containing feature data and timestamp column for historical feature retrieval
data: pyarrow table containing feature data and timestamp column for historical feature retrieval
progress: Optional function to be called once every mini-batch of rows is written to
the online store. Can be used to display progress.
"""
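For orientation (not part of this PR), a skeletal custom offline store implementing the updated interface might look like the following; the class name is hypothetical and the other abstract methods are omitted.

from typing import Any, Callable, Optional

import pyarrow

from feast import FeatureView
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.repo_config import RepoConfig


class MyOfflineStore(OfflineStore):  # hypothetical subclass
    @staticmethod
    def offline_write_batch(
        config: RepoConfig,
        table: FeatureView,
        data: pyarrow.Table,  # was pd.DataFrame before this change
        progress: Optional[Callable[[int], Any]],
    ):
        # Validate data against table.batch_source, append it to the backing
        # storage, and report progress through the optional callback.
        ...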
14 changes: 13 additions & 1 deletion sdk/python/feast/infra/passthrough_provider.py
@@ -104,10 +104,14 @@ def offline_write_batch(
self,
config: RepoConfig,
table: FeatureView,
data: pd.DataFrame,
data: pa.Table,
progress: Optional[Callable[[int], Any]],
) -> None:
set_usage_attribute("provider", self.__class__.__name__)

if "created" not in data.column_names:
raise ValueError("input dataframe must have a created timestamp column")

if self.offline_store:
self.offline_store.offline_write_batch(config, table, data, progress)

@@ -143,6 +147,14 @@ def ingest_df(
self.repo_config, feature_view, rows_to_write, progress=None
)

def ingest_df_to_offline_store(self, feature_view: FeatureView, table: pa.Table):
set_usage_attribute("provider", self.__class__.__name__)

if feature_view.batch_source.field_mapping is not None:
table = _run_field_mapping(table, feature_view.batch_source.field_mapping)

self.offline_write_batch(self.repo_config, feature_view, table, None)

def materialize_single_feature_view(
self,
config: RepoConfig,
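The passthrough provider now also applies the batch source's field mapping to the pyarrow table before delegating to offline_write_batch. _run_field_mapping is internal to Feast; as a rough sketch of what such a rename amounts to in plain pyarrow (the mapping below is made up):

import pyarrow as pa

field_mapping = {"rate": "conv_rate"}  # hypothetical source column -> feature name

table = pa.table({"driver_id": [1001], "rate": [0.85]})

# Rename mapped columns; unmapped columns keep their original names.
renamed = table.rename_columns(
    [field_mapping.get(col, col) for col in table.column_names]
)
print(renamed.column_names)  # ['driver_id', 'conv_rate']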
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/provider.py
@@ -127,7 +127,7 @@ def ingest_df(
pass

def ingest_df_to_offline_store(
self, feature_view: FeatureView, df: pd.DataFrame,
self, feature_view: FeatureView, df: pyarrow.Table,
Member: why is this interface changing? Can you add that to the PR description?

Collaborator (author): Sure, left some comments.

):
"""
Ingests a DataFrame directly into the offline store
226 changes: 226 additions & 0 deletions sdk/python/tests/integration/offline_store/test_offline_push.py
@@ -0,0 +1,226 @@
import random
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import pytest

from feast import FeatureView, Field
from feast.types import Float32, Int32
from tests.integration.feature_repos.universal.entities import driver


@pytest.mark.integration
@pytest.mark.universal_online_stores
def test_writing_incorrect_order_fails(environment, universal_data_sources):
Collaborator: can you explain (and maybe add this explanation in a docstring for this test) why exactly we expect it to fail? The name suggests an incorrect order, but from inspecting the underlying data source it looks like the df is not just out of order, it's also missing some columns?

Collaborator (author): Done.

# TODO(kevjumba) handle incorrect order later, for now schema must be in the order that the filesource is in
store = environment.feature_store
_, _, data_sources = universal_data_sources
driver_stats = FeatureView(
name="driver_stats",
entities=["driver"],
schema=[
Field(name="avg_daily_trips", dtype=Int32),
Field(name="conv_rate", dtype=Float32),
],
source=data_sources.driver,
)

now = datetime.utcnow()
ts = pd.Timestamp(now).round("ms")

entity_df = pd.DataFrame.from_dict(
{"driver_id": [1001, 1002], "event_timestamp": [ts - timedelta(hours=3), ts]}
)

store.apply([driver(), driver_stats])
df = store.get_historical_features(
entity_df=entity_df,
features=["driver_stats:conv_rate", "driver_stats:avg_daily_trips"],
full_feature_names=False,
).to_df()

assert df["conv_rate"].isnull().all()
assert df["avg_daily_trips"].isnull().all()

expected_df = pd.DataFrame.from_dict(
{
"driver_id": [1001, 1002],
"event_timestamp": [ts - timedelta(hours=3), ts],
"conv_rate": [random.random(), random.random()],
"avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)],
"created": [ts, ts],
},
)
with pytest.raises(ValueError):
store.write_to_offline_store(
driver_stats.name, expected_df, allow_registry_cache=False
)


@pytest.mark.integration
@pytest.mark.universal_online_stores
def test_writing_incorrect_schema_fails(environment, universal_data_sources):
Collaborator: is this a copy of the above test?

Collaborator (author): This one writes an incorrect schema; I just want to test that both trigger the ValueError.

# TODO(kevjumba) handle incorrect order later, for now schema must be in the order that the filesource is in
store = environment.feature_store
_, _, data_sources = universal_data_sources
driver_stats = FeatureView(
name="driver_stats",
entities=["driver"],
schema=[
Field(name="avg_daily_trips", dtype=Int32),
Field(name="conv_rate", dtype=Float32),
],
source=data_sources.driver,
)

now = datetime.utcnow()
ts = pd.Timestamp(now).round("ms")

entity_df = pd.DataFrame.from_dict(
{"driver_id": [1001, 1002], "event_timestamp": [ts - timedelta(hours=3), ts]}
)

store.apply([driver(), driver_stats])
df = store.get_historical_features(
entity_df=entity_df,
features=["driver_stats:conv_rate", "driver_stats:avg_daily_trips"],
full_feature_names=False,
).to_df()

assert df["conv_rate"].isnull().all()
assert df["avg_daily_trips"].isnull().all()

expected_df = pd.DataFrame.from_dict(
{
"event_timestamp": [ts - timedelta(hours=3), ts],
"driver_id": [1001, 1002],
"conv_rate": [random.random(), random.random()],
"incorrect_schema": [random.randint(0, 10), random.randint(0, 10)],
"created": [ts, ts],
},
)
with pytest.raises(ValueError):
store.write_to_offline_store(
driver_stats.name, expected_df, allow_registry_cache=False
)


@pytest.mark.integration
@pytest.mark.universal_online_stores
def test_writing_consecutively_to_offline_store(environment, universal_data_sources):
store = environment.feature_store
_, _, data_sources = universal_data_sources
driver_stats = FeatureView(
name="driver_stats",
entities=["driver"],
schema=[
Field(name="avg_daily_trips", dtype=Int32),
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
],
source=data_sources.driver,
ttl=timedelta(minutes=10),
)

now = datetime.utcnow()
ts = pd.Timestamp(now, unit="ns")

entity_df = pd.DataFrame.from_dict(
{
"driver_id": [1001, 1001],
"event_timestamp": [ts - timedelta(hours=4), ts - timedelta(hours=3)],
}
)

store.apply([driver(), driver_stats])
df = store.get_historical_features(
entity_df=entity_df,
features=["driver_stats:conv_rate", "driver_stats:avg_daily_trips"],
full_feature_names=False,
).to_df()

assert df["conv_rate"].isnull().all()
assert df["avg_daily_trips"].isnull().all()

first_df = pd.DataFrame.from_dict(
Collaborator: might be worth leaving a comment here indicating that these columns are arranged in exactly the same order as the underlying data source (whose schema can be found in driver_test_data.py).

Collaborator (author): Done.

{
"event_timestamp": [ts - timedelta(hours=4), ts - timedelta(hours=3)],
"driver_id": [1001, 1001],
"conv_rate": [random.random(), random.random()],
"acc_rate": [random.random(), random.random()],
"avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)],
"created": [ts, ts],
},
)
store.write_to_offline_store(
driver_stats.name, first_df, allow_registry_cache=False
)

after_write_df = store.get_historical_features(
entity_df=entity_df,
features=["driver_stats:conv_rate", "driver_stats:avg_daily_trips"],
full_feature_names=False,
).to_df()

assert len(after_write_df) == len(first_df)
assert np.where(
after_write_df["conv_rate"].reset_index(drop=True)
== first_df["conv_rate"].reset_index(drop=True)
)
assert np.where(
after_write_df["avg_daily_trips"].reset_index(drop=True)
== first_df["avg_daily_trips"].reset_index(drop=True)
)

second_df = pd.DataFrame.from_dict(
{
"event_timestamp": [ts - timedelta(hours=1), ts],
"driver_id": [1001, 1001],
"conv_rate": [random.random(), random.random()],
"acc_rate": [random.random(), random.random()],
"avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)],
"created": [ts, ts],
},
)

store.write_to_offline_store(
driver_stats.name, second_df, allow_registry_cache=False
)

entity_df = pd.DataFrame.from_dict(
{
"driver_id": [1001, 1001, 1001, 1001],
"event_timestamp": [
ts - timedelta(hours=4),
ts - timedelta(hours=3),
ts - timedelta(hours=1),
ts,
],
}
)

after_write_df = store.get_historical_features(
entity_df=entity_df,
features=[
"driver_stats:conv_rate",
"driver_stats:acc_rate",
"driver_stats:avg_daily_trips",
],
full_feature_names=False,
).to_df()

expected_df = pd.concat([first_df, second_df])
assert len(after_write_df) == len(expected_df)
assert np.where(
after_write_df["conv_rate"].reset_index(drop=True)
== expected_df["conv_rate"].reset_index(drop=True)
)
assert np.where(
after_write_df["acc_rate"].reset_index(drop=True)
== expected_df["acc_rate"].reset_index(drop=True)
)
assert np.where(
after_write_df["avg_daily_trips"].reset_index(drop=True)
== expected_df["avg_daily_trips"].reset_index(drop=True)
)