Skip to content

Commit

Permalink
feat(ingest): upgrade feast (#6186)
Browse files Browse the repository at this point in the history
Co-authored-by: Harshal Sheth <[email protected]>
  • Loading branch information
cburroughs and hsheth2 authored Nov 3, 2022
1 parent c3c2b27 commit 39c84c2
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 69 deletions.
12 changes: 7 additions & 5 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import sys
from typing import Dict, Set

import setuptools
Expand Down Expand Up @@ -95,8 +96,9 @@ def get_long_description():
kafka_protobuf = {
"networkx>=2.6.2",
# Required to generate protobuf python modules from the schema downloaded from the schema registry
"grpcio==1.44.0",
"grpcio-tools==1.44.0",
# NOTE: potential conflict with feast also depending on grpcio
"grpcio>=1.44.0,<2",
"grpcio-tools>=1.44.0,<2",
}

sql_common = {
Expand Down Expand Up @@ -254,7 +256,7 @@ def get_long_description():
# https://github.com/elastic/elasticsearch-py/issues/1639#issuecomment-883587433
"elasticsearch": {"elasticsearch==7.13.4"},
"feast-legacy": {"docker"},
"feast": {"feast==0.18.0", "flask-openid>=1.3.0"},
"feast": {"feast~=0.26.0", "flask-openid>=1.3.0"},
"glue": aws_common,
# hdbcli is supported officially by SAP, sqlalchemy-hana is built on top but not officially supported
"hana": sql_common
Expand Down Expand Up @@ -397,7 +399,7 @@ def get_long_description():
"delta-lake",
"druid",
"elasticsearch",
"feast",
"feast" if sys.version_info >= (3, 8) else None,
"iceberg",
"ldap",
"looker",
Expand Down Expand Up @@ -425,6 +427,7 @@ def get_long_description():
"unity-catalog"
# airflow is added below
]
if plugin
for dependency in plugins[plugin]
),
}
Expand All @@ -444,7 +447,6 @@ def get_long_description():
"clickhouse",
"delta-lake",
"druid",
"feast",
"feast-legacy",
"hana",
"hive",
Expand Down
82 changes: 53 additions & 29 deletions metadata-ingestion/src/datahub/ingestion/source/feast.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
import sys

if sys.version_info < (3, 8):
raise ImportError("Feast is only supported on Python 3.8+")

from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple, Union
from typing import Dict, Iterable, List, Optional, Tuple, Union

import feast.types
from feast import (
BigQuerySource,
Entity,
Feature,
FeatureStore,
FeatureView,
Field as FeastField,
FileSource,
KafkaSource,
KinesisSource,
OnDemandFeatureView,
RequestSource,
SnowflakeSource,
ValueType,
)
from feast.data_source import DataSource, RequestDataSource
from feast.data_source import DataSource
from pydantic import Field

import datahub.emitter.mce_builder as builder
Expand Down Expand Up @@ -46,7 +54,7 @@
)

# FIXME: ValueType module cannot be used as a type
_field_type_mapping: Dict[ValueType, str] = {
_field_type_mapping: Dict[Union[ValueType, feast.types.FeastType], str] = {
ValueType.UNKNOWN: MLFeatureDataType.UNKNOWN,
ValueType.BYTES: MLFeatureDataType.BYTE,
ValueType.STRING: MLFeatureDataType.TEXT,
Expand All @@ -65,11 +73,26 @@
ValueType.BOOL_LIST: MLFeatureDataType.SEQUENCE,
ValueType.UNIX_TIMESTAMP_LIST: MLFeatureDataType.SEQUENCE,
ValueType.NULL: MLFeatureDataType.UNKNOWN,
feast.types.Invalid: MLFeatureDataType.UNKNOWN,
feast.types.Bytes: MLFeatureDataType.BYTE,
feast.types.String: MLFeatureDataType.TEXT,
feast.types.Int32: MLFeatureDataType.ORDINAL,
feast.types.Int64: MLFeatureDataType.ORDINAL,
feast.types.Float64: MLFeatureDataType.CONTINUOUS,
feast.types.Float32: MLFeatureDataType.CONTINUOUS,
feast.types.Bool: MLFeatureDataType.BINARY,
feast.types.UnixTimestamp: MLFeatureDataType.TIME,
feast.types.Array: MLFeatureDataType.SEQUENCE, # type: ignore
feast.types.Invalid: MLFeatureDataType.UNKNOWN,
}


class FeastRepositorySourceConfig(ConfigModel):
path: str = Field(description="Path to Feast repository")
fs_yaml_file: Optional[str] = Field(
default=None,
description="Path to the `feature_store.yaml` file used to configure the feature store",
)
environment: str = Field(
default=DEFAULT_ENV, description="Environment to use when constructing URNs"
)
Expand All @@ -85,7 +108,7 @@ class FeastRepositorySource(Source):
This plugin extracts:
- Entities as [`MLPrimaryKey`](https://datahubproject.io/docs/graphql/objects#mlprimarykey)
- Features as [`MLFeature`](https://datahubproject.io/docs/graphql/objects#mlfeature)
- Fields as [`MLFeature`](https://datahubproject.io/docs/graphql/objects#mlfeature)
- Feature views and on-demand feature views as [`MLFeatureTable`](https://datahubproject.io/docs/graphql/objects#mlfeaturetable)
- Batch and stream source details as [`Dataset`](https://datahubproject.io/docs/graphql/objects#dataset)
- Column types associated with each entity and feature
Expand All @@ -97,12 +120,16 @@ class FeastRepositorySource(Source):

def __init__(self, config: FeastRepositorySourceConfig, ctx: PipelineContext):
super().__init__(ctx)

self.source_config = config
self.report = SourceReport()
self.feature_store = FeatureStore(self.source_config.path)
self.feature_store = FeatureStore(
repo_path=self.source_config.path,
fs_yaml_file=self.source_config.fs_yaml_file,
)

def _get_field_type(self, field_type: ValueType, parent_name: str) -> str:
def _get_field_type(
self, field_type: Union[ValueType, feast.types.FeastType], parent_name: str
) -> str:
"""
Maps types encountered in Feast to corresponding schema types.
"""
Expand All @@ -128,26 +155,24 @@ def _get_data_source_details(self, source: DataSource) -> Tuple[str, str]:

if isinstance(source, FileSource):
platform = "file"

name = source.path.replace("://", ".").replace("/", ".")

if isinstance(source, BigQuerySource):
elif isinstance(source, BigQuerySource):
platform = "bigquery"
name = source.table

if isinstance(source, KafkaSource):
elif isinstance(source, KafkaSource):
platform = "kafka"
name = source.kafka_options.topic

if isinstance(source, KinesisSource):
elif isinstance(source, KinesisSource):
platform = "kinesis"
name = (
f"{source.kinesis_options.region}:{source.kinesis_options.stream_name}"
)

if isinstance(source, RequestDataSource):
elif isinstance(source, RequestSource):
platform = "request"
name = source.name
elif isinstance(source, SnowflakeSource):
platform = "snowflake"
name = source.table

return platform, name

Expand Down Expand Up @@ -214,25 +239,24 @@ def _get_feature_workunit(
self,
# FIXME: FeatureView and OnDemandFeatureView cannot be used as a type
feature_view: Union[FeatureView, OnDemandFeatureView],
feature: Feature,
field: FeastField,
) -> MetadataWorkUnit:
"""
Generate an MLFeature work unit for a Feast feature.
"""
feature_view_name = f"{self.feature_store.project}.{feature_view.name}"

feature_snapshot = MLFeatureSnapshot(
urn=builder.make_ml_feature_urn(feature_view_name, feature.name),
urn=builder.make_ml_feature_urn(feature_view_name, field.name),
aspects=[StatusClass(removed=False)],
)

feature_sources = []

if isinstance(feature_view, FeatureView):
feature_sources = self._get_data_sources(feature_view)
elif isinstance(feature_view, OnDemandFeatureView):
if feature_view.input_request_data_sources is not None:
for request_source in feature_view.input_request_data_sources.values():
if feature_view.source_request_sources is not None:
for request_source in feature_view.source_request_sources.values():
source_platform, source_name = self._get_data_source_details(
request_source
)
Expand All @@ -245,10 +269,10 @@ def _get_feature_workunit(
)
)

if feature_view.input_feature_view_projections is not None:
if feature_view.source_feature_view_projections is not None:
for (
feature_view_projection
) in feature_view.input_feature_view_projections.values():
) in feature_view.source_feature_view_projections.values():
feature_view_source = self.feature_store.get_feature_view(
feature_view_projection.name
)
Expand All @@ -257,15 +281,15 @@ def _get_feature_workunit(

feature_snapshot.aspects.append(
MLFeaturePropertiesClass(
description=feature.labels.get("description"),
dataType=self._get_field_type(feature.dtype, feature.name),
description=field.tags.get("description"),
dataType=self._get_field_type(field.dtype, field.name),
sources=feature_sources,
)
)

mce = MetadataChangeEvent(proposedSnapshot=feature_snapshot)

return MetadataWorkUnit(id=feature.name, mce=mce)
return MetadataWorkUnit(id=field.name, mce=mce)

def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkUnit:
"""
Expand Down Expand Up @@ -359,8 +383,8 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:

yield work_unit

for feature in feature_view.features:
work_unit = self._get_feature_workunit(feature_view, feature)
for field in feature_view.features:
work_unit = self._get_feature_workunit(feature_view, field)
self.report.report_workunit(work_unit)

yield work_unit
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,4 @@ online_store:
path: data/online_store.db
offline_store:
type: file
flags:
alpha_features: true
on_demand_transforms: true
entity_key_serialization_version: 2
Original file line number Diff line number Diff line change
@@ -1,63 +1,74 @@
from datetime import timedelta

import feast.types
import pandas as pd
from feast import Entity, Feature, FeatureView, FileSource, ValueType
from feast.data_source import RequestDataSource
from feast import Entity, FeatureView, Field, FileSource, RequestSource, ValueType
from feast.on_demand_feature_view import on_demand_feature_view

driver_hourly_stats_source = FileSource(
name="driver_hourly_stats_source",
path="data/driver_stats_with_string.parquet",
event_timestamp_column="event_timestamp",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)

driver_entity = Entity(
name="driver_id", value_type=ValueType.INT64, description="Driver ID"
driver = Entity(
# It would be the modern Feast pattern to call this `driver`, but the
# golden tests have the name as `driver_id`
name="driver_id",
join_keys=["driver_id"],
value_type=ValueType.INT64,
description="Driver ID",
)

driver_hourly_stats_view = FeatureView(
name="driver_hourly_stats",
entities=["driver_id"],
entities=[driver],
ttl=timedelta(days=7),
features=[
Feature(
schema=[
Field(
name="conv_rate",
dtype=ValueType.FLOAT,
labels=dict(description="Conv rate"),
dtype=feast.types.Float64,
tags=dict(description="Conv rate"),
),
Feature(
name="acc_rate", dtype=ValueType.FLOAT, labels=dict(description="Acc rate")
Field(
name="acc_rate",
dtype=feast.types.Float64,
tags=dict(description="Acc rate"),
),
Feature(
Field(
name="avg_daily_trips",
dtype=ValueType.INT64,
labels=dict(description="Avg daily trips"),
dtype=feast.types.Int64,
tags=dict(description="Avg daily trips"),
),
Feature(
Field(
name="string_feature",
dtype=ValueType.STRING,
labels=dict(description="String feature"),
dtype=feast.types.String,
tags=dict(description="String feature"),
),
],
online=True,
batch_source=driver_hourly_stats_source,
source=driver_hourly_stats_source,
tags={},
)

input_request = RequestDataSource(
input_request = RequestSource(
name="vals_to_add",
schema={"val_to_add": ValueType.INT64, "val_to_add_2": ValueType.INT64},
schema=[
Field(name="val_to_add", dtype=feast.types.Int64),
Field(name="val_to_add_2", dtype=feast.types.Int64),
],
)


@on_demand_feature_view( # type: ignore
inputs={
"driver_hourly_stats": driver_hourly_stats_view,
"vals_to_add": input_request,
},
features=[
Feature(name="conv_rate_plus_val1", dtype=ValueType.DOUBLE),
Feature(name="conv_rate_plus_val2", dtype=ValueType.DOUBLE),
sources=[
driver_hourly_stats_view,
input_request,
],
schema=[
Field(name="conv_rate_plus_val1", dtype=feast.types.Float64),
Field(name="conv_rate_plus_val2", dtype=feast.types.Float64),
],
)
def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import os
import sys

import pytest
from freezegun import freeze_time
Expand All @@ -8,11 +8,12 @@

FROZEN_TIME = "2020-04-14 07:00:00"

pytestmark = pytest.mark.skipif(
sys.version_info < (3, 8), reason="requires python 3.8 or higher"
)


@freeze_time(FROZEN_TIME)
@pytest.mark.skipif(
os.getenv("AIRFLOW1_TEST") == "true", reason="feast requires Airflow 2.0 or newer"
)
def test_feast_repository_ingest(pytestconfig, tmp_path, mock_time):
test_resources_dir = pytestconfig.rootpath / "tests/integration/feast"
output_path = tmp_path / "feast_repository_mces.json"
Expand Down

0 comments on commit 39c84c2

Please sign in to comment.