-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
feat: Add file write_to_offline_store functionality #2808
Changes from 8 commits
58ce39d
656fb9f
83f0d2f
c21f6c2
d4b678a
3e96ac1
f2d77ef
8049390
7a4be44
f283f72
13fa653
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -127,7 +127,7 @@ def ingest_df( | |
pass | ||
|
||
def ingest_df_to_offline_store( | ||
self, feature_view: FeatureView, df: pd.DataFrame, | ||
self, feature_view: FeatureView, df: pyarrow.Table, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. why is this interface changing? can you add that to the PR description? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sure, left some comments |
||
): | ||
""" | ||
Ingests a DataFrame directly into the offline store | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,226 @@ | ||
import random | ||
felixwang9817 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
from datetime import datetime, timedelta | ||
|
||
import numpy as np | ||
import pandas as pd | ||
import pytest | ||
|
||
from feast import FeatureView, Field | ||
from feast.types import Float32, Int32 | ||
from tests.integration.feature_repos.universal.entities import driver | ||
|
||
|
||
@pytest.mark.integration | ||
@pytest.mark.universal_online_stores | ||
def test_writing_incorrect_order_fails(environment, universal_data_sources): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. can you explain (and maybe add this explanation in a docstring for this test) why exactly we expect it to fail? the name suggests an incorrect order, but from inspecting the underlying data source it looks like the df might not just be out of order, but it's also missing some columns? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done. |
||
# TODO(kevjumba) handle incorrect order later, for now schema must be in the order that the filesource is in | ||
store = environment.feature_store | ||
_, _, data_sources = universal_data_sources | ||
driver_stats = FeatureView( | ||
name="driver_stats", | ||
entities=["driver"], | ||
schema=[ | ||
Field(name="avg_daily_trips", dtype=Int32), | ||
Field(name="conv_rate", dtype=Float32), | ||
], | ||
source=data_sources.driver, | ||
) | ||
|
||
now = datetime.utcnow() | ||
ts = pd.Timestamp(now).round("ms") | ||
|
||
entity_df = pd.DataFrame.from_dict( | ||
{"driver_id": [1001, 1002], "event_timestamp": [ts - timedelta(hours=3), ts]} | ||
) | ||
|
||
store.apply([driver(), driver_stats]) | ||
df = store.get_historical_features( | ||
entity_df=entity_df, | ||
features=["driver_stats:conv_rate", "driver_stats:avg_daily_trips"], | ||
full_feature_names=False, | ||
).to_df() | ||
|
||
assert df["conv_rate"].isnull().all() | ||
assert df["avg_daily_trips"].isnull().all() | ||
|
||
expected_df = pd.DataFrame.from_dict( | ||
{ | ||
"driver_id": [1001, 1002], | ||
"event_timestamp": [ts - timedelta(hours=3), ts], | ||
"conv_rate": [random.random(), random.random()], | ||
"avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)], | ||
"created": [ts, ts], | ||
}, | ||
) | ||
with pytest.raises(ValueError): | ||
store.write_to_offline_store( | ||
driver_stats.name, expected_df, allow_registry_cache=False | ||
) | ||
|
||
|
||
@pytest.mark.integration | ||
@pytest.mark.universal_online_stores | ||
def test_writing_incorrect_schema_fails(environment, universal_data_sources): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. is this a copy of the above test? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This writes an incorrect schema. I just want to test that both trigger the ValueError. |
||
# TODO(kevjumba) handle incorrect order later, for now schema must be in the order that the filesource is in | ||
store = environment.feature_store | ||
_, _, data_sources = universal_data_sources | ||
driver_stats = FeatureView( | ||
name="driver_stats", | ||
entities=["driver"], | ||
schema=[ | ||
Field(name="avg_daily_trips", dtype=Int32), | ||
Field(name="conv_rate", dtype=Float32), | ||
], | ||
source=data_sources.driver, | ||
) | ||
|
||
now = datetime.utcnow() | ||
ts = pd.Timestamp(now).round("ms") | ||
|
||
entity_df = pd.DataFrame.from_dict( | ||
{"driver_id": [1001, 1002], "event_timestamp": [ts - timedelta(hours=3), ts]} | ||
) | ||
|
||
store.apply([driver(), driver_stats]) | ||
df = store.get_historical_features( | ||
entity_df=entity_df, | ||
features=["driver_stats:conv_rate", "driver_stats:avg_daily_trips"], | ||
full_feature_names=False, | ||
).to_df() | ||
|
||
assert df["conv_rate"].isnull().all() | ||
assert df["avg_daily_trips"].isnull().all() | ||
|
||
expected_df = pd.DataFrame.from_dict( | ||
{ | ||
"event_timestamp": [ts - timedelta(hours=3), ts], | ||
"driver_id": [1001, 1002], | ||
"conv_rate": [random.random(), random.random()], | ||
"incorrect_schema": [random.randint(0, 10), random.randint(0, 10)], | ||
"created": [ts, ts], | ||
}, | ||
) | ||
with pytest.raises(ValueError): | ||
store.write_to_offline_store( | ||
driver_stats.name, expected_df, allow_registry_cache=False | ||
) | ||
|
||
|
||
@pytest.mark.integration | ||
@pytest.mark.universal_online_stores | ||
def test_writing_consecutively_to_offline_store(environment, universal_data_sources): | ||
felixwang9817 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
store = environment.feature_store | ||
_, _, data_sources = universal_data_sources | ||
driver_stats = FeatureView( | ||
name="driver_stats", | ||
entities=["driver"], | ||
schema=[ | ||
Field(name="avg_daily_trips", dtype=Int32), | ||
Field(name="conv_rate", dtype=Float32), | ||
Field(name="acc_rate", dtype=Float32), | ||
], | ||
source=data_sources.driver, | ||
ttl=timedelta(minutes=10), | ||
) | ||
|
||
now = datetime.utcnow() | ||
ts = pd.Timestamp(now, unit="ns") | ||
|
||
entity_df = pd.DataFrame.from_dict( | ||
{ | ||
"driver_id": [1001, 1001], | ||
"event_timestamp": [ts - timedelta(hours=4), ts - timedelta(hours=3)], | ||
} | ||
) | ||
|
||
store.apply([driver(), driver_stats]) | ||
df = store.get_historical_features( | ||
entity_df=entity_df, | ||
features=["driver_stats:conv_rate", "driver_stats:avg_daily_trips"], | ||
full_feature_names=False, | ||
).to_df() | ||
|
||
assert df["conv_rate"].isnull().all() | ||
assert df["avg_daily_trips"].isnull().all() | ||
|
||
first_df = pd.DataFrame.from_dict( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. might leave a comment here indicating that these columns are arranged in exactly the same order as the underlying DS (whose schema can be found in the data source definition). There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. done. |
||
{ | ||
"event_timestamp": [ts - timedelta(hours=4), ts - timedelta(hours=3)], | ||
"driver_id": [1001, 1001], | ||
"conv_rate": [random.random(), random.random()], | ||
"acc_rate": [random.random(), random.random()], | ||
"avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)], | ||
"created": [ts, ts], | ||
}, | ||
) | ||
store.write_to_offline_store( | ||
driver_stats.name, first_df, allow_registry_cache=False | ||
) | ||
|
||
after_write_df = store.get_historical_features( | ||
entity_df=entity_df, | ||
features=["driver_stats:conv_rate", "driver_stats:avg_daily_trips"], | ||
full_feature_names=False, | ||
).to_df() | ||
|
||
assert len(after_write_df) == len(first_df) | ||
assert np.where( | ||
after_write_df["conv_rate"].reset_index(drop=True) | ||
== first_df["conv_rate"].reset_index(drop=True) | ||
) | ||
assert np.where( | ||
after_write_df["avg_daily_trips"].reset_index(drop=True) | ||
== first_df["avg_daily_trips"].reset_index(drop=True) | ||
) | ||
|
||
second_df = pd.DataFrame.from_dict( | ||
{ | ||
"event_timestamp": [ts - timedelta(hours=1), ts], | ||
"driver_id": [1001, 1001], | ||
"conv_rate": [random.random(), random.random()], | ||
"acc_rate": [random.random(), random.random()], | ||
"avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)], | ||
"created": [ts, ts], | ||
}, | ||
) | ||
|
||
store.write_to_offline_store( | ||
driver_stats.name, second_df, allow_registry_cache=False | ||
) | ||
|
||
entity_df = pd.DataFrame.from_dict( | ||
{ | ||
"driver_id": [1001, 1001, 1001, 1001], | ||
"event_timestamp": [ | ||
ts - timedelta(hours=4), | ||
ts - timedelta(hours=3), | ||
ts - timedelta(hours=1), | ||
ts, | ||
], | ||
} | ||
) | ||
|
||
after_write_df = store.get_historical_features( | ||
entity_df=entity_df, | ||
features=[ | ||
"driver_stats:conv_rate", | ||
"driver_stats:acc_rate", | ||
"driver_stats:avg_daily_trips", | ||
], | ||
full_feature_names=False, | ||
).to_df() | ||
|
||
expected_df = pd.concat([first_df, second_df]) | ||
assert len(after_write_df) == len(expected_df) | ||
assert np.where( | ||
after_write_df["conv_rate"].reset_index(drop=True) | ||
== expected_df["conv_rate"].reset_index(drop=True) | ||
) | ||
assert np.where( | ||
after_write_df["acc_rate"].reset_index(drop=True) | ||
== expected_df["acc_rate"].reset_index(drop=True) | ||
) | ||
assert np.where( | ||
after_write_df["avg_daily_trips"].reset_index(drop=True) | ||
== expected_df["avg_daily_trips"].reset_index(drop=True) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this shouldn't be called
table
; it should be
feature_view
or something. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was adhering to how we defined it in online write batch, but I'll change it.