feat(ingest): upgrade feast #6186

Merged · 3 commits · Nov 3, 2022
12 changes: 7 additions & 5 deletions metadata-ingestion/setup.py
@@ -1,4 +1,5 @@
import os
import sys
from typing import Dict, Set

import setuptools
@@ -95,8 +96,9 @@ def get_long_description():
kafka_protobuf = {
"networkx>=2.6.2",
# Required to generate protobuf python modules from the schema downloaded from the schema registry
"grpcio==1.44.0",
"grpcio-tools==1.44.0",
# NOTE: potential conflict with feast also depending on grpcio
"grpcio>=1.44.0,<2",
"grpcio-tools>=1.44.0,<2",
}

sql_common = {
@@ -254,7 +256,7 @@ def get_long_description():
# https://github.com/elastic/elasticsearch-py/issues/1639#issuecomment-883587433
"elasticsearch": {"elasticsearch==7.13.4"},
"feast-legacy": {"docker"},
"feast": {"feast==0.18.0", "flask-openid>=1.3.0"},
"feast": {"feast~=0.26.0", "flask-openid>=1.3.0"},
"glue": aws_common,
# hdbcli is supported officially by SAP, sqlalchemy-hana is built on top but not officially supported
"hana": sql_common
@@ -397,7 +399,7 @@ def get_long_description():
"delta-lake",
"druid",
"elasticsearch",
"feast",
"feast" if sys.version_info >= (3, 8) else None,
"iceberg",
"ldap",
"looker",
@@ -425,6 +427,7 @@ def get_long_description():
"unity-catalog"
# airflow is added below
]
if plugin
for dependency in plugins[plugin]
),
}
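The two changes above work as a pair: the dev-requirements list now emits None for feast on Python 3.7 (where feast 0.26 cannot be installed), and the new `if plugin` clause drops that placeholder before the comprehension indexes into `plugins`. A minimal standalone sketch of the pattern, with illustrative names rather than the real setup.py contents:

import sys

plugins = {
    "feast": {"feast~=0.26.0"},
    "ldap": {"python-ldap>=2.4"},
}

dev_requirements = {
    dependency
    for plugin in [
        "ldap",
        # None on Python 3.7, where feast 0.26 is unavailable.
        "feast" if sys.version_info >= (3, 8) else None,
    ]
    if plugin  # drop the None placeholder before indexing into plugins
    for dependency in plugins[plugin]
}

print(sorted(dev_requirements))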
@@ -444,7 +447,6 @@ def get_long_description():
"clickhouse",
"delta-lake",
"druid",
"feast",
"feast-legacy",
"hana",
"hive",
82 changes: 53 additions & 29 deletions metadata-ingestion/src/datahub/ingestion/source/feast.py
@@ -1,19 +1,27 @@
import sys

if sys.version_info < (3, 8):
raise ImportError("Feast is only supported on Python 3.8+")

from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple, Union
from typing import Dict, Iterable, List, Optional, Tuple, Union

import feast.types
from feast import (
BigQuerySource,
Entity,
Feature,
FeatureStore,
FeatureView,
Field as FeastField,
FileSource,
KafkaSource,
KinesisSource,
OnDemandFeatureView,
RequestSource,
SnowflakeSource,
ValueType,
)
from feast.data_source import DataSource, RequestDataSource
from feast.data_source import DataSource
from pydantic import Field

import datahub.emitter.mce_builder as builder
@@ -46,7 +54,7 @@
)

# FIXME: ValueType module cannot be used as a type
_field_type_mapping: Dict[ValueType, str] = {
_field_type_mapping: Dict[Union[ValueType, feast.types.FeastType], str] = {
ValueType.UNKNOWN: MLFeatureDataType.UNKNOWN,
ValueType.BYTES: MLFeatureDataType.BYTE,
ValueType.STRING: MLFeatureDataType.TEXT,
@@ -65,11 +73,26 @@
ValueType.BOOL_LIST: MLFeatureDataType.SEQUENCE,
ValueType.UNIX_TIMESTAMP_LIST: MLFeatureDataType.SEQUENCE,
ValueType.NULL: MLFeatureDataType.UNKNOWN,
feast.types.Invalid: MLFeatureDataType.UNKNOWN,
feast.types.Bytes: MLFeatureDataType.BYTE,
feast.types.String: MLFeatureDataType.TEXT,
feast.types.Int32: MLFeatureDataType.ORDINAL,
feast.types.Int64: MLFeatureDataType.ORDINAL,
feast.types.Float64: MLFeatureDataType.CONTINUOUS,
feast.types.Float32: MLFeatureDataType.CONTINUOUS,
feast.types.Bool: MLFeatureDataType.BINARY,
feast.types.UnixTimestamp: MLFeatureDataType.TIME,
feast.types.Array: MLFeatureDataType.SEQUENCE, # type: ignore
}


class FeastRepositorySourceConfig(ConfigModel):
path: str = Field(description="Path to Feast repository")
fs_yaml_file: Optional[str] = Field(
default=None,
description="Path to the `feature_store.yaml` file used to configure the feature store",
)
environment: str = Field(
default=DEFAULT_ENV, description="Environment to use when constructing URNs"
)
@@ -85,7 +108,7 @@ class FeastRepositorySource(Source):
This plugin extracts:

- Entities as [`MLPrimaryKey`](https://datahubproject.io/docs/graphql/objects#mlprimarykey)
- Features as [`MLFeature`](https://datahubproject.io/docs/graphql/objects#mlfeature)
- Fields as [`MLFeature`](https://datahubproject.io/docs/graphql/objects#mlfeature)
- Feature views and on-demand feature views as [`MLFeatureTable`](https://datahubproject.io/docs/graphql/objects#mlfeaturetable)
- Batch and stream source details as [`Dataset`](https://datahubproject.io/docs/graphql/objects#dataset)
- Column types associated with each entity and feature
@@ -97,12 +120,16 @@ class FeastRepositorySource(Source):

def __init__(self, config: FeastRepositorySourceConfig, ctx: PipelineContext):
super().__init__(ctx)

self.source_config = config
self.report = SourceReport()
self.feature_store = FeatureStore(self.source_config.path)
self.feature_store = FeatureStore(
repo_path=self.source_config.path,
fs_yaml_file=self.source_config.fs_yaml_file,
)

def _get_field_type(self, field_type: ValueType, parent_name: str) -> str:
def _get_field_type(
self, field_type: Union[ValueType, feast.types.FeastType], parent_name: str
) -> str:
"""
Maps types encountered in Feast to corresponding schema types.
"""
@@ -128,26 +155,24 @@ def _get_data_source_details(self, source: DataSource) -> Tuple[str, str]:

if isinstance(source, FileSource):
platform = "file"

name = source.path.replace("://", ".").replace("/", ".")

if isinstance(source, BigQuerySource):
elif isinstance(source, BigQuerySource):
platform = "bigquery"
name = source.table

if isinstance(source, KafkaSource):
elif isinstance(source, KafkaSource):
platform = "kafka"
name = source.kafka_options.topic

if isinstance(source, KinesisSource):
elif isinstance(source, KinesisSource):
platform = "kinesis"
name = (
f"{source.kinesis_options.region}:{source.kinesis_options.stream_name}"
)

if isinstance(source, RequestDataSource):
elif isinstance(source, RequestSource):
platform = "request"
name = source.name
elif isinstance(source, SnowflakeSource):
platform = "snowflake"
name = source.table

return platform, name
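The (platform, name) pair feeds dataset URN construction for each batch or stream source. A simplified sketch of that downstream use (the exact wiring in this file differs slightly):

import datahub.emitter.mce_builder as builder

# E.g. a FileSource with path "data/driver_stats_with_string.parquet"
# yields platform "file" and a dot-separated name:
platform, name = "file", "data.driver_stats_with_string.parquet"
print(builder.make_dataset_urn(platform, name, "PROD"))
# -> urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)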

@@ -214,25 +239,24 @@ def _get_feature_workunit(
self,
# FIXME: FeatureView and OnDemandFeatureView cannot be used as a type
feature_view: Union[FeatureView, OnDemandFeatureView],
feature: Feature,
field: FeastField,
) -> MetadataWorkUnit:
"""
Generate an MLFeature work unit for a Feast feature.
"""
feature_view_name = f"{self.feature_store.project}.{feature_view.name}"

feature_snapshot = MLFeatureSnapshot(
urn=builder.make_ml_feature_urn(feature_view_name, feature.name),
urn=builder.make_ml_feature_urn(feature_view_name, field.name),
aspects=[StatusClass(removed=False)],
)

feature_sources = []

if isinstance(feature_view, FeatureView):
feature_sources = self._get_data_sources(feature_view)
elif isinstance(feature_view, OnDemandFeatureView):
if feature_view.input_request_data_sources is not None:
for request_source in feature_view.input_request_data_sources.values():
if feature_view.source_request_sources is not None:
for request_source in feature_view.source_request_sources.values():
source_platform, source_name = self._get_data_source_details(
request_source
)
@@ -245,10 +269,10 @@ def _get_feature_workunit(
)
)

if feature_view.input_feature_view_projections is not None:
if feature_view.source_feature_view_projections is not None:
for (
feature_view_projection
) in feature_view.input_feature_view_projections.values():
) in feature_view.source_feature_view_projections.values():
feature_view_source = self.feature_store.get_feature_view(
feature_view_projection.name
)
@@ -257,15 +281,15 @@ def _get_feature_workunit(

feature_snapshot.aspects.append(
MLFeaturePropertiesClass(
description=feature.labels.get("description"),
dataType=self._get_field_type(feature.dtype, feature.name),
description=field.tags.get("description"),
dataType=self._get_field_type(field.dtype, field.name),
sources=feature_sources,
)
)

mce = MetadataChangeEvent(proposedSnapshot=feature_snapshot)

return MetadataWorkUnit(id=feature.name, mce=mce)
return MetadataWorkUnit(id=field.name, mce=mce)

def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkUnit:
"""
@@ -359,8 +383,8 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:

yield work_unit

for feature in feature_view.features:
work_unit = self._get_feature_workunit(feature_view, feature)
for field in feature_view.features:
work_unit = self._get_feature_workunit(feature_view, field)
self.report.report_workunit(work_unit)

yield work_unit
Binary file not shown.
@@ -5,6 +5,4 @@ online_store:
path: data/online_store.db
offline_store:
type: file
flags:
alpha_features: true
on_demand_transforms: true
entity_key_serialization_version: 2
@@ -1,63 +1,74 @@
from datetime import timedelta

import feast.types
import pandas as pd
from feast import Entity, Feature, FeatureView, FileSource, ValueType
from feast.data_source import RequestDataSource
from feast import Entity, FeatureView, Field, FileSource, RequestSource, ValueType
from feast.on_demand_feature_view import on_demand_feature_view

driver_hourly_stats_source = FileSource(
name="driver_hourly_stats_source",
path="data/driver_stats_with_string.parquet",
event_timestamp_column="event_timestamp",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)

driver_entity = Entity(
name="driver_id", value_type=ValueType.INT64, description="Driver ID"
driver = Entity(
# The modern Feast pattern would be to name this entity `driver`, but the
# golden test files expect the name `driver_id`
name="driver_id",
join_keys=["driver_id"],
value_type=ValueType.INT64,
description="Driver ID",
)

driver_hourly_stats_view = FeatureView(
name="driver_hourly_stats",
entities=["driver_id"],
entities=[driver],
ttl=timedelta(days=7),
features=[
Feature(
schema=[
Field(
name="conv_rate",
dtype=ValueType.FLOAT,
labels=dict(description="Conv rate"),
dtype=feast.types.Float64,
tags=dict(description="Conv rate"),
),
Feature(
name="acc_rate", dtype=ValueType.FLOAT, labels=dict(description="Acc rate")
Field(
name="acc_rate",
dtype=feast.types.Float64,
tags=dict(description="Acc rate"),
),
Feature(
Field(
name="avg_daily_trips",
dtype=ValueType.INT64,
labels=dict(description="Avg daily trips"),
dtype=feast.types.Int64,
tags=dict(description="Avg daily trips"),
),
Feature(
Field(
name="string_feature",
dtype=ValueType.STRING,
labels=dict(description="String feature"),
dtype=feast.types.String,
tags=dict(description="String feature"),
),
],
online=True,
batch_source=driver_hourly_stats_source,
source=driver_hourly_stats_source,
tags={},
)

input_request = RequestDataSource(
input_request = RequestSource(
name="vals_to_add",
schema={"val_to_add": ValueType.INT64, "val_to_add_2": ValueType.INT64},
schema=[
Field(name="val_to_add", dtype=feast.types.Int64),
Field(name="val_to_add_2", dtype=feast.types.Int64),
],
)


@on_demand_feature_view( # type: ignore
inputs={
"driver_hourly_stats": driver_hourly_stats_view,
"vals_to_add": input_request,
},
features=[
Feature(name="conv_rate_plus_val1", dtype=ValueType.DOUBLE),
Feature(name="conv_rate_plus_val2", dtype=ValueType.DOUBLE),
sources=[
driver_hourly_stats_view,
input_request,
],
schema=[
Field(name="conv_rate_plus_val1", dtype=feast.types.Float64),
Field(name="conv_rate_plus_val2", dtype=feast.types.Float64),
],
)
def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
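    # The function body is collapsed in the original diff (it is unchanged by
    # this PR); the lines below are an assumed, representative body in the
    # style of Feast's stock demo repo, not necessarily this repo's exact code.
    df = pd.DataFrame()
    df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
    df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
    return df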
@@ -1,4 +1,4 @@
import os
import sys

import pytest
from freezegun import freeze_time
@@ -8,11 +8,12 @@

FROZEN_TIME = "2020-04-14 07:00:00"

pytestmark = pytest.mark.skipif(
sys.version_info < (3, 8), reason="requires python 3.8 or higher"
)


@freeze_time(FROZEN_TIME)
@pytest.mark.skipif(
os.getenv("AIRFLOW1_TEST") == "true", reason="feast requires Airflow 2.0 or newer"
)
def test_feast_repository_ingest(pytestconfig, tmp_path, mock_time):
test_resources_dir = pytestconfig.rootpath / "tests/integration/feast"
output_path = tmp_path / "feast_repository_mces.json"