
fix: Fix push sources and add docs / tests pushing via the python feature server #2561

Merged
merged 18 commits on Apr 19, 2022
2 changes: 1 addition & 1 deletion docs/SUMMARY.md
@@ -81,7 +81,7 @@
* [feature\_store.yaml](reference/feature-repository/feature-store-yaml.md)
* [.feastignore](reference/feature-repository/feast-ignore.md)
* [Feature servers](reference/feature-servers/README.md)
* [Local feature server](reference/feature-servers/local-feature-server.md)
* [Python feature server](reference/feature-servers/python-feature-server.md)
* [Go-based feature retrieval](reference/feature-servers/go-feature-retrieval.md)
* [\[Alpha\] Data quality monitoring](reference/dqm.md)
* [\[Alpha\] On demand feature view](reference/alpha-on-demand-feature-view.md)
6 changes: 4 additions & 2 deletions docs/reference/data-sources/push.md
@@ -42,7 +42,7 @@ fv = FeatureView(
name="feature view",
entities=["user_id"],
schema=[Field(name="life_time_value", dtype=Int64)],
stream_source=push_source,
source=push_source,
)
```

@@ -53,6 +53,8 @@ import pandas as pd

fs = FeatureStore(...)
feature_data_frame = pd.DataFrame()
fs.push("push_source", feature_data_frame)
fs.push("push_source_name", feature_data_frame)
```

See also [Python feature server](../feature-servers/python-feature-server.md) for instructions on how to push data to a deployed feature server.
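As a companion to the snippet above, here's a minimal stdlib-only sketch of the column-oriented payload shape that a push expects when sent over HTTP; the push source name and column names are hypothetical, and timestamps must already be strings:

```python
import json
from datetime import datetime

# Column-oriented payload for pushing rows; datetime objects are not
# JSON-encodable, so timestamps are serialized as strings up front.
ts = datetime(2022, 5, 13, 10, 59, 42)
payload = {
    "push_source_name": "driver_hourly_stats_push_source",  # hypothetical name
    "df": {
        "driver_id": [1001],
        "event_timestamp": [str(ts)],
        "conv_rate": [1.0],
    },
}
body = json.dumps(payload)
print(json.loads(body)["df"]["event_timestamp"])  # ['2022-05-13 10:59:42']
```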

2 changes: 1 addition & 1 deletion docs/reference/feature-servers/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

Feast users can choose to retrieve features from a feature server, as opposed to through the Python SDK.

{% page-ref page="local-feature-server.md" %}
{% page-ref page="python-feature-server.md" %}
2 changes: 1 addition & 1 deletion docs/reference/feature-servers/go-feature-retrieval.md
@@ -2,7 +2,7 @@

## Overview

The Go Feature Retrieval component is a Go implementation of the core feature serving logic, embedded in the Python SDK. It supports retrieval of feature references, feature services, and on demand feature views, and can be used either through the Python SDK or the [Python feature server](local-feature-server.md).
The Go Feature Retrieval component is a Go implementation of the core feature serving logic, embedded in the Python SDK. It supports retrieval of feature references, feature services, and on demand feature views, and can be used either through the Python SDK or the [Python feature server](python-feature-server.md).

Currently, this component only supports online serving: it has no offline component, no APIs to create Feast feature repositories or apply configuration to the registry to facilitate online materialization, and no dedicated CLI. It is only meant to expose an online serving API that can be called through the Python SDK to facilitate faster online feature retrieval.

@@ -1,15 +1,23 @@
# Local feature server
# Python feature server

## Overview

The local feature server is an HTTP endpoint that serves features with JSON I/O. This enables users to get features from Feast using any programming language that can make HTTP requests. A [remote feature server](../alpha-aws-lambda-feature-server.md) on AWS Lambda is also available. A remote feature server on GCP Cloud Run is currently being developed.
The feature server is an HTTP endpoint that serves features with JSON I/O. This enables users to write + read features from Feast online stores using any programming language that can make HTTP requests.

## CLI

There is a new CLI command that starts the server: `feast serve`. By default Feast uses port 6566; the port be overridden by a `--port` flag.
There is a CLI command that starts the server: `feast serve`. By default, Feast uses port 6566; the port can be overridden with a `--port` flag.

## Deploying as a service

One can also deploy a feature server by building a Docker image that bundles in the project's `feature_store.yaml`. See the [helm chart](https://github.com/feast-dev/feast/blob/master/infra/charts/feast-python-server) for an example.

A [remote feature server](../alpha-aws-lambda-feature-server.md) on AWS Lambda is available. A remote feature server on GCP Cloud Run is currently being developed.


## Example

### Initializing a feature server
Here's an example of how to start the Python feature server with a local template:

```bash
@@ -41,6 +49,7 @@ INFO: Uvicorn running on http://127.0.0.1:6566 (Press CTRL+C to quit)
09/10/2021 10:42:11 AM INFO:Uvicorn running on http://127.0.0.1:6566 (Press CTRL+C to quit)
```

### Retrieving features from the online store
After the server starts, we can execute cURL commands from another terminal tab:

```bash
@@ -142,3 +151,47 @@ curl -X POST \
}
}' | jq
```

### Pushing features to the online store
You can push data corresponding to a push source to the online store (note that timestamps need to be strings):

```text
curl -X POST "http://localhost:6566/push" -d '{
    "push_source_name": "driver_hourly_stats_push_source",
    "df": {
        "driver_id": [1001],
        "event_timestamp": ["2022-05-13 10:59:42"],
        "created": ["2022-05-13 10:59:42"],
        "conv_rate": [1.0],
        "acc_rate": [1.0],
        "avg_daily_trips": [1000]
    }
}' | jq
```

or equivalently from Python:
```python
import requests
import pandas as pd
from datetime import datetime

event_df = pd.DataFrame.from_dict(
    {
        "driver_id": [1001],
        "event_timestamp": [datetime(2021, 5, 13, 10, 59, 42)],
        "created": [datetime(2021, 5, 13, 10, 59, 42)],
        "conv_rate": [1.0],
        "acc_rate": [1.0],
        "avg_daily_trips": [1000],
        "string_feature": "test2",
    }
)
event_df["event_timestamp"] = event_df["event_timestamp"].astype(str)
event_df["created"] = event_df["created"].astype(str)
requests.post(
    "http://localhost:6566/push",
    json={
        "push_source_name": "driver_stats_push_source",
        "df": event_df.to_dict(),
    },
)
```
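The `astype(str)` calls above are load-bearing: Python's standard JSON encoder rejects `datetime` objects. A stdlib-only illustration (not Feast code):

```python
import json
from datetime import datetime

ts = datetime(2021, 5, 13, 10, 59, 42)

# datetime objects are not JSON-serializable...
try:
    json.dumps({"event_timestamp": [ts]})
    raised = False
except TypeError:
    raised = True

# ...so timestamps are converted to strings first, as the
# example does with astype(str) on the DataFrame columns.
body = json.dumps({"event_timestamp": [str(ts)]})
print(raised, body)
```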
4 changes: 2 additions & 2 deletions sdk/python/feast/data_source.py
@@ -714,10 +714,10 @@ class PushSource(DataSource):
A source that can be used to ingest features on request
"""

name: str
schema: List[Field]
batch_source: DataSource
timestamp_field: str
# TODO(adchia): remove schema + timestamp_field?

def __init__(
self,
@@ -728,7 +728,7 @@ def __init__(
description: Optional[str] = "",
tags: Optional[Dict[str, str]] = None,
owner: Optional[str] = "",
timestamp_field: Optional[str] = "",
timestamp_field: Optional[str] = None,
):
"""
Creates a PushSource object.
11 changes: 7 additions & 4 deletions sdk/python/feast/feature_store.py
@@ -92,7 +92,6 @@

warnings.simplefilter("once", DeprecationWarning)


if TYPE_CHECKING:
from feast.embedded_go.online_features_service import EmbeddedOnlineFeatureServer

@@ -1186,7 +1185,9 @@ def tqdm_builder(length):
)

@log_exceptions_and_usage
def push(self, push_source_name: str, df: pd.DataFrame):
def push(
self, push_source_name: str, df: pd.DataFrame, allow_registry_cache: bool = True
):
"""
Push features to a push source. This updates all the feature views that have the push source as stream source.
Args:
@@ -1195,7 +1196,7 @@ def push(self, push_source_name: str, df: pd.DataFrame):
"""
from feast.data_source import PushSource

all_fvs = self.list_feature_views(allow_cache=True)
all_fvs = self.list_feature_views(allow_cache=allow_registry_cache)

fvs_with_push_sources = {
fv
@@ -1208,7 +1209,9 @@
}

for fv in fvs_with_push_sources:
self.write_to_online_store(fv.name, df, allow_registry_cache=True)
self.write_to_online_store(
fv.name, df, allow_registry_cache=allow_registry_cache
)

@log_exceptions_and_usage
def write_to_online_store(
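To make the dispatch in `push` concrete, here's a hedged, stdlib-only sketch using dummy stand-ins for `PushSource` and `FeatureView` (not the real Feast classes): it selects every feature view whose stream source is a push source with the given name, which is where `write_to_online_store` would then be invoked per view.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional

# Hypothetical stand-ins used only to illustrate the dispatch logic.
@dataclass
class PushSource:
    name: str

@dataclass
class FeatureView:
    name: str
    stream_source: Optional[object] = None

def push(
    feature_views: List[FeatureView],
    push_source_name: str,
    rows: Dict[str, list],
) -> List[str]:
    """Return names of the feature views that would receive the rows."""
    targets = [
        fv
        for fv in feature_views
        if isinstance(fv.stream_source, PushSource)
        and fv.stream_source.name == push_source_name
    ]
    # In Feast, each target is written via write_to_online_store(fv.name, df).
    return [fv.name for fv in targets]

fvs = [
    FeatureView("driver_stats", PushSource("driver_stats_push_source")),
    FeatureView("batch_only"),
]
print(push(fvs, "driver_stats_push_source", {"driver_id": [1001]}))  # ['driver_stats']
```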
4 changes: 3 additions & 1 deletion sdk/python/feast/infra/online_stores/sqlite.py
@@ -230,7 +230,9 @@ def teardown(
def _initialize_conn(db_path: str):
Path(db_path).parent.mkdir(exist_ok=True)
return sqlite3.connect(
db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
db_path,
detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
check_same_thread=False,
)


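For context on the `check_same_thread=False` addition: the Python feature server handles requests on worker threads, and by default sqlite3 refuses to use a connection outside the thread that created it. A standalone sketch (not Feast's store code) of sharing one connection across threads:

```python
import sqlite3
import threading

# With check_same_thread=False, one connection can be used from multiple
# threads; accesses here are sequential (writer finishes before the read).
conn = sqlite3.connect(":memory:", check_same_thread=False)
conn.execute("CREATE TABLE features (key TEXT, value REAL)")

def writer():
    # Without check_same_thread=False, this would raise ProgrammingError.
    conn.execute("INSERT INTO features VALUES ('driver_1001:conv_rate', 0.85)")
    conn.commit()

t = threading.Thread(target=writer)
t.start()
t.join()
print(conn.execute("SELECT value FROM features").fetchall())  # [(0.85,)]
```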
120 changes: 120 additions & 0 deletions sdk/python/tests/integration/e2e/test_python_feature_server.py
@@ -0,0 +1,120 @@
import contextlib
import json
from typing import List

import pytest
from fastapi.testclient import TestClient

from feast.feast_object import FeastObject
from feast.feature_server import get_app
from tests.integration.feature_repos.integration_test_repo_config import (
    IntegrationTestRepoConfig,
)
from tests.integration.feature_repos.repo_configuration import (
    construct_test_environment,
    construct_universal_feature_views,
    construct_universal_test_data,
)
from tests.integration.feature_repos.universal.entities import (
    customer,
    driver,
    location,
)


@pytest.mark.integration
@pytest.mark.universal
def test_get_online_features():
    with setup_python_fs_client() as client:
        request_data_dict = {
            "features": [
                "driver_stats:conv_rate",
                "driver_stats:acc_rate",
                "driver_stats:avg_daily_trips",
            ],
            "entities": {"driver_id": [5001, 5002]},
        }
        response = client.post(
            "/get-online-features", data=json.dumps(request_data_dict)
        )

        # Check entities and features are present
        parsed_response = json.loads(response.text)
        assert "metadata" in parsed_response
        metadata = parsed_response["metadata"]
        expected_features = ["driver_id", "conv_rate", "acc_rate", "avg_daily_trips"]
        response_feature_names = metadata["feature_names"]
        assert len(response_feature_names) == len(expected_features)
        for expected_feature in expected_features:
            assert expected_feature in response_feature_names
        assert "results" in parsed_response
        results = parsed_response["results"]
        for result in results:
            # Same order as in metadata
            assert len(result["statuses"]) == 2  # Requested two entities
            for status in result["statuses"]:
                assert status == "PRESENT"
        results_driver_id_index = response_feature_names.index("driver_id")
        assert (
            results[results_driver_id_index]["values"]
            == request_data_dict["entities"]["driver_id"]
        )


@pytest.mark.integration
@pytest.mark.universal
def test_push():
    with setup_python_fs_client() as client:
        initial_temp = get_temperatures(client, location_ids=[1])[0]
        json_data = json.dumps(
            {
                "push_source_name": "location_stats_push_source",
                "df": {
                    "location_id": [1],
                    "temperature": [initial_temp * 100],
                    "event_timestamp": ["2022-05-13 10:59:42"],
                    "created": ["2022-05-13 10:59:42"],
                },
            }
        )
        response = client.post("/push", data=json_data)

        # Check new pushed temperature is fetched
        assert response.status_code == 200
        assert get_temperatures(client, location_ids=[1]) == [initial_temp * 100]


def get_temperatures(client, location_ids: List[int]):
    get_request_data = {
        "features": ["pushable_location_stats:temperature"],
        "entities": {"location_id": location_ids},
    }
    response = client.post("/get-online-features", data=json.dumps(get_request_data))
    parsed_response = json.loads(response.text)
    assert "metadata" in parsed_response
    metadata = parsed_response["metadata"]
    response_feature_names = metadata["feature_names"]
    assert "results" in parsed_response
    results = parsed_response["results"]
    results_temperature_index = response_feature_names.index("temperature")
    return results[results_temperature_index]["values"]


@contextlib.contextmanager
def setup_python_fs_client():
    config = IntegrationTestRepoConfig()
    environment = construct_test_environment(config)
    fs = environment.feature_store
    try:
        entities, datasets, data_sources = construct_universal_test_data(environment)
        feature_views = construct_universal_feature_views(data_sources)
        feast_objects: List[FeastObject] = []
        feast_objects.extend(feature_views.values())
        feast_objects.extend([driver(), customer(), location()])
        fs.apply(feast_objects)
        fs.materialize(environment.start_date, environment.end_date)
        client = TestClient(get_app(fs))
        yield client
    finally:
        fs.teardown()
        environment.data_source_creator.teardown()
@@ -65,19 +65,19 @@ def conv_rate_plus_100(features_df: pd.DataFrame) -> pd.DataFrame:
def conv_rate_plus_100_feature_view(
sources: Dict[str, Union[RequestSource, FeatureView]],
infer_features: bool = False,
features: Optional[List[Feature]] = None,
features: Optional[List[Field]] = None,
) -> OnDemandFeatureView:
# Test that positional arguments and Features still work for ODFVs.
_features = features or [
Feature(name="conv_rate_plus_100", dtype=ValueType.DOUBLE),
Feature(name="conv_rate_plus_val_to_add", dtype=ValueType.DOUBLE),
Feature(name="conv_rate_plus_100_rounded", dtype=ValueType.INT32),
Field(name="conv_rate_plus_100", dtype=Float64),
Field(name="conv_rate_plus_val_to_add", dtype=Float64),
Field(name="conv_rate_plus_100_rounded", dtype=Int32),
]
return OnDemandFeatureView(
conv_rate_plus_100.__name__,
[] if infer_features else _features,
sources,
conv_rate_plus_100,
name=conv_rate_plus_100.__name__,
schema=[] if infer_features else _features,
sources=sources,
udf=conv_rate_plus_100,
)

