Fix for historical field mappings #2252

Merged
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -50,7 +50,7 @@ Setting up your development environment for Feast Python SDK / CLI:
 3. _Recommended:_ Create a virtual environment to isolate development dependencies to be installed
 ```sh
 # create & activate a virtual environment
-python -v venv venv/
+python -m venv venv/
 source venv/bin/activate
 ```
26 changes: 26 additions & 0 deletions sdk/python/feast/driver_test_data.py
@@ -264,3 +264,29 @@ def create_global_daily_stats_df(start_date, end_date) -> pd.DataFrame:
     # TODO: Remove created timestamp in order to test whether its really optional
     df_daily["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms"))
     return df_daily
+
+
+def create_field_mapping_df(start_date, end_date) -> pd.DataFrame:
+    """
+    Example df generated by this function:
+    | event_timestamp  | column_name | created          |
+    |------------------+-------------+------------------|
+    | 2021-03-17 19:00 | 99          | 2021-03-24 19:38 |
+    | 2021-03-17 19:00 | 22          | 2021-03-24 19:38 |
+    | 2021-03-17 19:00 | 7           | 2021-03-24 19:38 |
+    | 2021-03-17 19:00 | 45          | 2021-03-24 19:38 |
+    """
+    size = 10
+    df = pd.DataFrame()
+    df["column_name"] = np.random.randint(1, 100, size=size).astype(np.int32)
+    df[DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL] = [
+        _convert_event_timestamp(
+            pd.Timestamp(dt, unit="ms", tz="UTC").round("ms"),
+            EventTimestampType(idx % 4),
+        )
+        for idx, dt in enumerate(
+            pd.date_range(start=start_date, end=end_date, periods=size)
+        )
+    ]
+    df["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms"))
+    return df
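For reference, the new fixture can be exercised on its own. A minimal sketch (the date range is arbitrary; assumes the SDK from this branch is importable):

```python
from datetime import datetime, timedelta

import numpy as np

from feast import driver_test_data

end_date = datetime.utcnow()
start_date = end_date - timedelta(days=7)

df = driver_test_data.create_field_mapping_df(start_date, end_date)
assert len(df) == 10
# "column_name" is the raw source column; the field mapping under test
# exposes it to consumers as "feature_name".
assert df["column_name"].dtype == np.int32
```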
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/bigquery.py
@@ -598,7 +598,7 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str]
 {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
 {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
 {% for feature in featureview.features %}
-{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
+{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
 {% endfor %}
 FROM {{ featureview.table_subquery }}
 WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}'
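On the left of `as`, `{{ feature }}` still selects the raw source column (the query context now carries source column names, see offline_utils.py below); the alias goes through `featureview.field_mapping`, so the result surfaces under the mapped feature name. A standalone Jinja2 sketch of just that aliasing expression (context values are invented; this is not Feast's actual call site):

```python
from jinja2 import Template

fragment = Template(
    "{{ feature }} as "
    "{% if full_feature_names %}{{ name }}__{{ field_mapping.get(feature, feature) }}"
    "{% else %}{{ field_mapping.get(feature, feature) }}{% endif %}"
)

rendered = fragment.render(
    feature="column_name",                          # raw source column
    name="field_mapping",                           # feature view name
    field_mapping={"column_name": "feature_name"},  # source column -> feature
    full_feature_names=False,
)
print(rendered)  # column_name as feature_name
```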
@@ -699,7 +699,7 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str]
 SELECT
 {{featureview.name}}__entity_row_unique_id
 {% for feature in featureview.features %}
-,{% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}
+,{% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}
 {% endfor %}
 FROM {{ featureview.name }}__cleaned
 ) USING ({{featureview.name}}__entity_row_unique_id)
12 changes: 10 additions & 2 deletions sdk/python/feast/infra/offline_stores/offline_utils.py
@@ -85,6 +85,7 @@ class FeatureViewQueryContext:
     ttl: int
     entities: List[str]
     features: List[str]  # feature reference format
+    field_mapping: Dict[str, str]
     event_timestamp_column: str
     created_timestamp_column: Optional[str]
     table_subquery: str
@@ -144,7 +145,10 @@ def get_feature_view_query_context(
             name=feature_view.projection.name_to_use(),
             ttl=ttl_seconds,
             entities=join_keys,
-            features=features,
+            features=[
+                reverse_field_mapping.get(feature, feature) for feature in features
+            ],
+            field_mapping=feature_view.input.field_mapping,
             event_timestamp_column=reverse_field_mapping.get(
                 event_timestamp_column, event_timestamp_column
             ),
@@ -175,7 +179,11 @@ def build_point_in_time_query(
     final_output_feature_names = list(entity_df_columns)
     final_output_feature_names.extend(
         [
-            (f"{fv.name}__{feature}" if full_feature_names else feature)
+            (
+                f"{fv.name}__{fv.field_mapping.get(feature, feature)}"
+                if full_feature_names
+                else fv.field_mapping.get(feature, feature)
+            )
            for fv in feature_view_query_contexts
            for feature in fv.features
        ]
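Together these changes carry the mapping in both directions: `reverse_field_mapping` (already built in `get_feature_view_query_context` for the timestamp columns) turns requested feature names back into source column names for the generated SQL, while `field_mapping` turns them forward again when naming the output columns. A self-contained sketch of that round trip (values are illustrative):

```python
# The two directions of the mapping, reduced to plain dicts:
field_mapping = {"column_name": "feature_name"}                   # source column -> feature
reverse_field_mapping = {v: k for k, v in field_mapping.items()}  # feature -> source column

features = ["feature_name"]  # as requested via "field_mapping:feature_name"

# The generated SQL reads the raw source column...
query_columns = [reverse_field_mapping.get(f, f) for f in features]
assert query_columns == ["column_name"]

# ...while the final output is labelled with the mapped feature name again.
output_names = [field_mapping.get(c, c) for c in query_columns]
assert output_names == ["feature_name"]
```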
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/redshift.py
@@ -563,7 +563,7 @@ def _get_entity_df_event_timestamp_range(
 {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
 {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
 {% for feature in featureview.features %}
-{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
+{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
 {% endfor %}
 FROM {{ featureview.table_subquery }}
 WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}'
@@ -664,7 +664,7 @@ def _get_entity_df_event_timestamp_range(
 SELECT
 {{featureview.name}}__entity_row_unique_id
 {% for feature in featureview.features %}
-,{% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}
+,{% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}
 {% endfor %}
 FROM {{ featureview.name }}__cleaned
 ) USING ({{featureview.name}}__entity_row_unique_id)
2 changes: 1 addition & 1 deletion sdk/python/setup.py
@@ -132,7 +132,7 @@
     + AWS_REQUIRED
 )

-DEV_REQUIRED = ["mypy-protobuf==1.*", "grpcio-testing==1.*"] + CI_REQUIRED
+DEV_REQUIRED = ["mypy-protobuf>=1.*", "grpcio-testing==1.*"] + CI_REQUIRED

 # Get git repo root directory
 repo_root = str(pathlib.Path(__file__).resolve().parent.parent.parent)
@@ -35,6 +35,7 @@
     create_customer_daily_profile_feature_view,
     create_driver_age_request_feature_view,
     create_driver_hourly_stats_feature_view,
+    create_field_mapping_feature_view,
     create_global_stats_feature_view,
     create_location_stats_feature_view,
     create_order_feature_view,
@@ -126,6 +127,7 @@ def construct_universal_datasets(
         order_count=20,
     )
     global_df = driver_test_data.create_global_daily_stats_df(start_time, end_time)
+    field_mapping_df = driver_test_data.create_field_mapping_df(start_time, end_time)
     entity_df = orders_df[
         [
             "customer_id",
@@ -143,6 +145,7 @@
         "location": location_df,
         "orders": orders_df,
         "global": global_df,
+        "field_mapping": field_mapping_df,
         "entity": entity_df,
     }

@@ -180,12 +183,20 @@ def construct_universal_data_sources(
         event_timestamp_column="event_timestamp",
         created_timestamp_column="created",
     )
+    field_mapping_ds = data_source_creator.create_data_source(
+        datasets["field_mapping"],
+        destination_name="field_mapping",
+        event_timestamp_column="event_timestamp",
+        created_timestamp_column="created",
+        field_mapping={"column_name": "feature_name"},
+    )
     return {
         "customer": customer_ds,
         "driver": driver_ds,
         "location": location_ds,
         "orders": orders_ds,
         "global": global_ds,
+        "field_mapping": field_mapping_ds,
     }


@@ -210,6 +221,9 @@ def construct_universal_feature_views(
         "driver_age_request_fv": create_driver_age_request_feature_view(),
         "order": create_order_feature_view(data_sources["orders"]),
         "location": create_location_stats_feature_view(data_sources["location"]),
+        "field_mapping": create_field_mapping_feature_view(
+            data_sources["field_mapping"]
+        ),
     }
@@ -217,3 +217,13 @@ def create_location_stats_feature_view(source, infer_features: bool = False):
         ttl=timedelta(days=2),
     )
     return location_stats_feature_view
+
+
+def create_field_mapping_feature_view(source):
+    return FeatureView(
+        name="field_mapping",
+        entities=[],
+        features=[Feature(name="feature_name", dtype=ValueType.INT32)],
+        batch_source=source,
+        ttl=timedelta(days=2),
+    )
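The view's schema declares `feature_name`, which only exists once the source's `field_mapping` has been applied; the raw table never has such a column. A wiring sketch against a concrete file-based source (the parquet path is hypothetical, and the `FileSource` keyword names are an assumption based on the constructors used elsewhere in this suite):

```python
from datetime import timedelta

from feast import Feature, FeatureView, FileSource, ValueType

# Hypothetical parquet file with columns: event_timestamp, column_name, created.
source = FileSource(
    path="/tmp/field_mapping.parquet",
    event_timestamp_column="event_timestamp",
    created_timestamp_column="created",
    field_mapping={"column_name": "feature_name"},  # raw column -> exposed feature
)

fv = FeatureView(
    name="field_mapping",
    entities=[],
    features=[Feature(name="feature_name", dtype=ValueType.INT32)],  # mapped name
    batch_source=source,
    ttl=timedelta(days=2),
)
```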
@@ -82,6 +82,8 @@ def get_expected_training_df(
     location_fv: FeatureView,
     global_df: pd.DataFrame,
     global_fv: FeatureView,
+    field_mapping_df: pd.DataFrame,
+    field_mapping_fv: FeatureView,
     entity_df: pd.DataFrame,
     event_timestamp: str,
     full_feature_names: bool = False,
@@ -102,6 +104,10 @@
     global_records = convert_timestamp_records_to_utc(
         global_df.to_dict("records"), global_fv.batch_source.event_timestamp_column
     )
+    field_mapping_records = convert_timestamp_records_to_utc(
+        field_mapping_df.to_dict("records"),
+        field_mapping_fv.batch_source.event_timestamp_column,
+    )
     entity_rows = convert_timestamp_records_to_utc(
         entity_df.to_dict("records"), event_timestamp
     )
@@ -156,6 +162,13 @@
             ts_end=order_record[event_timestamp],
         )

+        field_mapping_record = find_asof_record(
+            field_mapping_records,
+            ts_key=field_mapping_fv.batch_source.event_timestamp_column,
+            ts_start=order_record[event_timestamp] - field_mapping_fv.ttl,
+            ts_end=order_record[event_timestamp],
+        )
+
         entity_row.update(
             {
                 (
@@ -197,6 +210,16 @@
             }
         )

+        # get field_mapping_record by column name, but label by feature name
+        entity_row.update(
+            {
+                (
+                    f"field_mapping__{feature}" if full_feature_names else feature
+                ): field_mapping_record.get(column, None)
+                for (column, feature) in field_mapping_fv.input.field_mapping.items()
+            }
+        )
+
     # Convert records back to pandas dataframe
     expected_df = pd.DataFrame(entity_rows)
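The expected rows are looked up under the raw column name but written back under the mapped feature name, mirroring what the offline stores now produce. The comprehension from the hunk above, reduced to standalone values (all invented):

```python
field_mapping = {"column_name": "feature_name"}
field_mapping_record = {"event_timestamp": None, "column_name": 42}
full_feature_names = False

update = {
    (f"field_mapping__{feature}" if full_feature_names else feature):
        field_mapping_record.get(column, None)
    for (column, feature) in field_mapping.items()
}
assert update == {"feature_name": 42}
```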
@@ -213,6 +236,7 @@
             "customer_profile__current_balance": "float32",
             "customer_profile__avg_passenger_count": "float32",
             "global_stats__avg_ride_length": "float32",
+            "field_mapping__feature_name": "int32",
         }
     else:
         expected_column_types = {
@@ -221,6 +245,7 @@
             "current_balance": "float32",
             "avg_passenger_count": "float32",
             "avg_ride_length": "float32",
+            "feature_name": "int32",
         }

     for col, typ in expected_column_types.items():
@@ -311,6 +336,8 @@ def test_historical_features(environment, universal_data_sources, full_feature_n
         feature_views["location"],
         datasets["global"],
         feature_views["global"],
+        datasets["field_mapping"],
+        feature_views["field_mapping"],
         entity_df_with_request_data,
         event_timestamp,
         full_feature_names,
@@ -336,6 +363,7 @@
             "global_stats:num_rides",
             "global_stats:avg_ride_length",
             "driver_age:driver_age",
+            "field_mapping:feature_name",
         ],
         full_feature_names=full_feature_names,
     )
@@ -404,6 +432,7 @@ def test_historical_features_with_missing_request_data(
             "conv_rate_plus_100:conv_rate_plus_val_to_add",
             "global_stats:num_rides",
             "global_stats:avg_ride_length",
+            "field_mapping:feature_name",
         ],
         full_feature_names=full_feature_names,
     )
@@ -419,6 +448,7 @@
             "driver_age:driver_age",
             "global_stats:num_rides",
             "global_stats:avg_ride_length",
+            "field_mapping:feature_name",
         ],
         full_feature_names=full_feature_names,
     )
@@ -452,6 +482,7 @@ def test_historical_features_with_entities_from_query(
             "order:order_is_success",
             "global_stats:num_rides",
             "global_stats:avg_ride_length",
+            "field_mapping:feature_name",
         ],
         full_feature_names=full_feature_names,
     )
@@ -477,6 +508,8 @@
         feature_views["location"],
         datasets["global"],
         feature_views["global"],
+        datasets["field_mapping"],
+        feature_views["field_mapping"],
         datasets["entity"],
         event_timestamp,
         full_feature_names,
@@ -538,6 +571,7 @@ def test_historical_features_persisting(
             "order:order_is_success",
             "global_stats:num_rides",
             "global_stats:avg_ride_length",
+            "field_mapping:feature_name",
         ],
         full_feature_names=full_feature_names,
     )
@@ -561,6 +595,8 @@
         feature_views["location"],
         datasets["global"],
         feature_views["global"],
+        datasets["field_mapping"],
+        feature_views["field_mapping"],
         entity_df,
         event_timestamp,
         full_feature_names,