-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathfeatures.py
108 lines (96 loc) · 2.99 KB
/
features.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
from datetime import timedelta
import pandas as pd
from feast import (
Entity,
FeatureView,
FeatureService,
Field,
FileSource,
PushSource,
RequestSource,
ValueType,
)
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Float64, Int64, String
# File-backed source for the per-driver statistics table
# (data/driver_stats_with_string.parquet).
# NOTE(review): the owner value looks like a redacted/placeholder e-mail
# address -- confirm the intended contact before relying on it.
driver_hourly_stats = FileSource(
    description="A table describing the stats of a driver, such as the average daily number of trips.",
    owner="[email protected]",
    path="data/driver_stats_with_string.parquet",
    # Column names the source uses for event time and row-creation time.
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)
# File-backed source for the driver-wide ("global") feature table
# (data/global_features.parquet).
# NOTE(review): the owner value looks like a redacted/placeholder e-mail
# address -- confirm the intended contact before relying on it.
global_features = FileSource(
    description="A table with global features around drivers.",
    owner="[email protected]",
    path="data/global_features.parquet",
    # Column names the source uses for event time and row-creation time.
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)
# Push source layered on top of the driver statistics file source, which it
# uses as its batch source.
driver_stats_push_source = PushSource(
    batch_source=driver_hourly_stats,
    name="driver_stats_push_source",
)
# The driver entity; rows are joined on the "driver_id" key.
driver = Entity(
    description="driver id",
    join_keys=["driver_id"],
    name="driver",
)
# Feature view over the driver statistics, keyed by the driver entity and fed
# by the push source declared in this module.
# NOTE(review): the owner value looks like a redacted/placeholder e-mail
# address -- confirm the intended contact before relying on it.
driver_hourly_stats_view = FeatureView(
    name="driver_hourly_stats",
    owner="[email protected]",
    tags={"production": "True"},
    source=driver_stats_push_source,
    entities=[driver],
    online=True,
    # 100,000 days, i.e. exactly 8,640,000,000 seconds.
    ttl=timedelta(days=100_000),
    schema=[
        Field(name="driver_id", dtype=Int64),
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
        Field(name="string_feature", dtype=String),
    ],
)
# Feature view over the global features source. It declares no entities, so
# its single feature is not keyed by driver.
# NOTE(review): the owner value looks like a redacted/placeholder e-mail
# address -- confirm the intended contact before relying on it.
global_features_view = FeatureView(
    name="global_driver_features",
    owner="[email protected]",
    tags={"production": "True"},
    source=global_features,
    entities=[],
    online=True,
    # 100,000 days, i.e. exactly 8,640,000,000 seconds.
    ttl=timedelta(days=100_000),
    schema=[
        Field(name="total_trips_today_by_all_drivers", dtype=Float32),
    ],
)
# Request-time data source: describes values that are only known when a
# request is made (for example, fields carried by the user-initiated HTTP
# request) rather than stored ahead of time.
input_request = RequestSource(
    schema=[
        Field(name="val_to_add", dtype=Int64),
        Field(name="val_to_add_2", dtype=Int64),
    ],
    name="vals_to_add",
)
# On-demand feature view: produces new features by combining columns from an
# existing feature view with values supplied through the RequestSource.
@on_demand_feature_view(
    schema=[
        Field(name="conv_rate_plus_val1", dtype=Float64),
        Field(name="conv_rate_plus_val2", dtype=Float64),
    ],
    sources=[driver_hourly_stats_view, input_request],
)
def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
    """Add each request-supplied value to ``conv_rate``.

    Args:
        inputs: Frame that provides the ``conv_rate``, ``val_to_add`` and
            ``val_to_add_2`` columns.

    Returns:
        A new frame with two columns, ``conv_rate_plus_val1`` and
        ``conv_rate_plus_val2``, holding the element-wise sums of
        ``conv_rate`` with ``val_to_add`` and ``val_to_add_2`` respectively.
    """
    base_rate = inputs["conv_rate"]
    return pd.DataFrame(
        {
            "conv_rate_plus_val1": base_rate + inputs["val_to_add"],
            "conv_rate_plus_val2": base_rate + inputs["val_to_add_2"],
        }
    )
# Feature service bundling two columns selected from the driver statistics
# view together with the on-demand transformed conversion-rate view.
# NOTE(review): the owner value looks like a redacted/placeholder e-mail
# address -- confirm the intended contact before relying on it.
feature_service = FeatureService(
    owner="[email protected]",
    name="convrate_plus100",
    features=[
        # Only these two columns of the view are exposed by the service.
        driver_hourly_stats_view[["conv_rate", "avg_daily_trips"]],
        transformed_conv_rate,
    ],
)