diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 0adcd27efcf99..300f20efba1d4 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -1,4 +1,5 @@
 import os
+import sys
 from typing import Dict, Set
 
 import setuptools
@@ -95,8 +96,9 @@ def get_long_description():
 kafka_protobuf = {
     "networkx>=2.6.2",
     # Required to generate protobuf python modules from the schema downloaded from the schema registry
-    "grpcio==1.44.0",
-    "grpcio-tools==1.44.0",
+    # NOTE: potential conflict with feast also depending on grpcio
+    "grpcio>=1.44.0,<2",
+    "grpcio-tools>=1.44.0,<2",
 }
 
 sql_common = {
@@ -254,7 +256,7 @@ def get_long_description():
     # https://github.com/elastic/elasticsearch-py/issues/1639#issuecomment-883587433
     "elasticsearch": {"elasticsearch==7.13.4"},
     "feast-legacy": {"docker"},
-    "feast": {"feast==0.18.0", "flask-openid>=1.3.0"},
+    "feast": {"feast~=0.26.0", "flask-openid>=1.3.0"},
     "glue": aws_common,
     # hdbcli is supported officially by SAP, sqlalchemy-hana is built on top but not officially supported
     "hana": sql_common
@@ -397,7 +399,7 @@ def get_long_description():
             "delta-lake",
             "druid",
             "elasticsearch",
-            "feast",
+            "feast" if sys.version_info >= (3, 8) else None,
             "iceberg",
             "ldap",
             "looker",
@@ -425,6 +427,7 @@ def get_long_description():
             "unity-catalog"
             # airflow is added below
         ]
+        if plugin
        for dependency in plugins[plugin]
    ),
 }
@@ -444,7 +447,6 @@ def get_long_description():
             "clickhouse",
             "delta-lake",
             "druid",
-            "feast",
             "feast-legacy",
             "hana",
             "hive",
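The version gate above works in two parts: on Python 3.7 the plugin list contains a `None` placeholder instead of `"feast"`, and the new `if plugin` clause in the set comprehension filters that placeholder out before `plugins[plugin]` is dereferenced. A minimal, self-contained sketch of the pattern (the plugin names and pins here are illustrative, not the real registry):

    import sys

    # Hypothetical plugin registry, for illustration only.
    plugins = {
        "feast": {"feast~=0.26.0"},
        "ldap": {"python-ldap>=2.4"},
    }

    dev_requirements = {
        dependency
        for plugin in [
            "feast" if sys.version_info >= (3, 8) else None,
            "ldap",
        ]
        if plugin  # drops the None placeholder on Python < 3.8
        for dependency in plugins[plugin]
    }

    print(sorted(dev_requirements))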
diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py
index 9671d9b80c14c..43291bc763601 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/feast.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py
@@ -1,19 +1,27 @@
+import sys
+
+if sys.version_info < (3, 8):
+    raise ImportError("Feast is only supported on Python 3.8+")
+
 from dataclasses import dataclass
-from typing import Dict, Iterable, List, Tuple, Union
+from typing import Dict, Iterable, List, Optional, Tuple, Union
 
+import feast.types
 from feast import (
     BigQuerySource,
     Entity,
-    Feature,
     FeatureStore,
     FeatureView,
+    Field as FeastField,
     FileSource,
     KafkaSource,
     KinesisSource,
     OnDemandFeatureView,
+    RequestSource,
+    SnowflakeSource,
     ValueType,
 )
-from feast.data_source import DataSource, RequestDataSource
+from feast.data_source import DataSource
 from pydantic import Field
 
 import datahub.emitter.mce_builder as builder
@@ -46,7 +54,7 @@
 )
 
 # FIXME: ValueType module cannot be used as a type
-_field_type_mapping: Dict[ValueType, str] = {
+_field_type_mapping: Dict[Union[ValueType, feast.types.FeastType], str] = {
     ValueType.UNKNOWN: MLFeatureDataType.UNKNOWN,
     ValueType.BYTES: MLFeatureDataType.BYTE,
     ValueType.STRING: MLFeatureDataType.TEXT,
@@ -65,11 +73,25 @@
     ValueType.BOOL_LIST: MLFeatureDataType.SEQUENCE,
     ValueType.UNIX_TIMESTAMP_LIST: MLFeatureDataType.SEQUENCE,
     ValueType.NULL: MLFeatureDataType.UNKNOWN,
+    feast.types.Invalid: MLFeatureDataType.UNKNOWN,
+    feast.types.Bytes: MLFeatureDataType.BYTE,
+    feast.types.String: MLFeatureDataType.TEXT,
+    feast.types.Int32: MLFeatureDataType.ORDINAL,
+    feast.types.Int64: MLFeatureDataType.ORDINAL,
+    feast.types.Float64: MLFeatureDataType.CONTINUOUS,
+    feast.types.Float32: MLFeatureDataType.CONTINUOUS,
+    feast.types.Bool: MLFeatureDataType.BINARY,
+    feast.types.UnixTimestamp: MLFeatureDataType.TIME,
+    feast.types.Array: MLFeatureDataType.SEQUENCE,  # type: ignore
 }
 
 
 class FeastRepositorySourceConfig(ConfigModel):
     path: str = Field(description="Path to Feast repository")
+    fs_yaml_file: Optional[str] = Field(
+        default=None,
+        description="Path to the `feature_store.yaml` file used to configure the feature store",
+    )
     environment: str = Field(
         default=DEFAULT_ENV, description="Environment to use when constructing URNs"
     )
@@ -85,7 +107,7 @@ class FeastRepositorySource(Source):
     This plugin extracts:
 
     - Entities as [`MLPrimaryKey`](https://datahubproject.io/docs/graphql/objects#mlprimarykey)
-    - Features as [`MLFeature`](https://datahubproject.io/docs/graphql/objects#mlfeature)
+    - Fields as [`MLFeature`](https://datahubproject.io/docs/graphql/objects#mlfeature)
     - Feature views and on-demand feature views as [`MLFeatureTable`](https://datahubproject.io/docs/graphql/objects#mlfeaturetable)
     - Batch and stream source details as [`Dataset`](https://datahubproject.io/docs/graphql/objects#dataset)
     - Column types associated with each entity and feature
@@ -97,12 +119,16 @@ class FeastRepositorySource(Source):
 
     def __init__(self, config: FeastRepositorySourceConfig, ctx: PipelineContext):
         super().__init__(ctx)
-
         self.source_config = config
         self.report = SourceReport()
-        self.feature_store = FeatureStore(self.source_config.path)
+        self.feature_store = FeatureStore(
+            repo_path=self.source_config.path,
+            fs_yaml_file=self.source_config.fs_yaml_file,
+        )
 
-    def _get_field_type(self, field_type: ValueType, parent_name: str) -> str:
+    def _get_field_type(
+        self, field_type: Union[ValueType, feast.types.FeastType], parent_name: str
+    ) -> str:
         """
         Maps types encountered in Feast to corresponding schema types.
         """
@@ -128,26 +154,24 @@ def _get_data_source_details(self, source: DataSource) -> Tuple[str, str]:
         platform = "unknown"
         name = "unknown"
 
         if isinstance(source, FileSource):
             platform = "file"
             name = source.path.replace("://", ".").replace("/", ".")
-
-        if isinstance(source, BigQuerySource):
+        elif isinstance(source, BigQuerySource):
             platform = "bigquery"
             name = source.table
-
-        if isinstance(source, KafkaSource):
+        elif isinstance(source, KafkaSource):
             platform = "kafka"
             name = source.kafka_options.topic
-
-        if isinstance(source, KinesisSource):
+        elif isinstance(source, KinesisSource):
             platform = "kinesis"
             name = (
                 f"{source.kinesis_options.region}:{source.kinesis_options.stream_name}"
             )
-
-        if isinstance(source, RequestDataSource):
+        elif isinstance(source, RequestSource):
             platform = "request"
             name = source.name
-
+        elif isinstance(source, SnowflakeSource):
+            platform = "snowflake"
+            name = source.table
         return platform, name
@@ -214,7 +238,7 @@ def _get_feature_workunit(
         self,
         # FIXME: FeatureView and OnDemandFeatureView cannot be used as a type
         feature_view: Union[FeatureView, OnDemandFeatureView],
-        feature: Feature,
+        field: FeastField,
     ) -> MetadataWorkUnit:
         """
         Generate an MLFeature work unit for a Feast feature.
@@ -222,17 +246,16 @@ def _get_feature_workunit(
         feature_view_name = f"{self.feature_store.project}.{feature_view.name}"
 
         feature_snapshot = MLFeatureSnapshot(
-            urn=builder.make_ml_feature_urn(feature_view_name, feature.name),
+            urn=builder.make_ml_feature_urn(feature_view_name, field.name),
             aspects=[StatusClass(removed=False)],
         )
 
         feature_sources = []
-
         if isinstance(feature_view, FeatureView):
             feature_sources = self._get_data_sources(feature_view)
         elif isinstance(feature_view, OnDemandFeatureView):
-            if feature_view.input_request_data_sources is not None:
-                for request_source in feature_view.input_request_data_sources.values():
+            if feature_view.source_request_sources is not None:
+                for request_source in feature_view.source_request_sources.values():
                     source_platform, source_name = self._get_data_source_details(
                         request_source
                     )
@@ -245,10 +268,10 @@ def _get_feature_workunit(
                     )
                 )
 
-            if feature_view.input_feature_view_projections is not None:
+            if feature_view.source_feature_view_projections is not None:
                 for (
                     feature_view_projection
-                ) in feature_view.input_feature_view_projections.values():
+                ) in feature_view.source_feature_view_projections.values():
                     feature_view_source = self.feature_store.get_feature_view(
                         feature_view_projection.name
                     )
@@ -257,15 +280,15 @@ def _get_feature_workunit(
 
         feature_snapshot.aspects.append(
             MLFeaturePropertiesClass(
-                description=feature.labels.get("description"),
-                dataType=self._get_field_type(feature.dtype, feature.name),
+                description=field.tags.get("description"),
+                dataType=self._get_field_type(field.dtype, field.name),
                 sources=feature_sources,
             )
         )
 
         mce = MetadataChangeEvent(proposedSnapshot=feature_snapshot)
 
-        return MetadataWorkUnit(id=feature.name, mce=mce)
+        return MetadataWorkUnit(id=field.name, mce=mce)
 
     def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkUnit:
         """
@@ -359,8 +382,8 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
 
             yield work_unit
 
-            for feature in feature_view.features:
-                work_unit = self._get_feature_workunit(feature_view, feature)
+            for field in feature_view.features:
+                work_unit = self._get_feature_workunit(feature_view, field)
                 self.report.report_workunit(work_unit)
 
                 yield work_unit
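The new `fs_yaml_file` option surfaces Feast's own `FeatureStore(repo_path=..., fs_yaml_file=...)` split, so a recipe can point at a `feature_store.yaml` kept outside the repository directory; when it is omitted, Feast falls back to the `feature_store.yaml` inside `path` as before. A hypothetical recipe sketch (the paths and the sink are placeholders):

    source:
      type: feast
      config:
        path: "/path/to/feast_repository"
        fs_yaml_file: "/path/to/feature_store.yaml"  # optional
        environment: "PROD"
    sink:
      type: datahub-rest
      config:
        server: "http://localhost:8080"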
diff --git a/metadata-ingestion/tests/integration/feast/feature_store/data/registry.db b/metadata-ingestion/tests/integration/feast/feature_store/data/registry.db
index dbc038017909c..a511ff56c9770 100644
Binary files a/metadata-ingestion/tests/integration/feast/feature_store/data/registry.db and b/metadata-ingestion/tests/integration/feast/feature_store/data/registry.db differ
diff --git a/metadata-ingestion/tests/integration/feast/feature_store/feature_store.yaml b/metadata-ingestion/tests/integration/feast/feature_store/feature_store.yaml
index f68061d824ed8..e7e298b9852f6 100644
--- a/metadata-ingestion/tests/integration/feast/feature_store/feature_store.yaml
+++ b/metadata-ingestion/tests/integration/feast/feature_store/feature_store.yaml
@@ -5,6 +5,4 @@ online_store:
   path: data/online_store.db
 offline_store:
   type: file
-flags:
-  alpha_features: true
-  on_demand_transforms: true
+entity_key_serialization_version: 2
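Feast 0.26 no longer gates on-demand transforms behind alpha flags, so the `flags` block is dead configuration, and recent Feast releases expect `entity_key_serialization_version: 2` rather than the legacy serialization format. The resulting test fixture plausibly reads as follows; the `project`, `registry`, `provider`, and online store `type` values are assumptions, since the hunk only shows the file's tail:

    project: feature_store        # assumed
    registry: data/registry.db    # assumed
    provider: local               # assumed
    online_store:
      type: sqlite                # assumed
      path: data/online_store.db
    offline_store:
      type: file
    entity_key_serialization_version: 2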
diff --git a/metadata-ingestion/tests/integration/feast/feature_store/features.py b/metadata-ingestion/tests/integration/feast/feature_store/features.py
index b91318a823844..a6e6cd3616e92 100644
--- a/metadata-ingestion/tests/integration/feast/feature_store/features.py
+++ b/metadata-ingestion/tests/integration/feast/feature_store/features.py
@@ -1,63 +1,74 @@
 from datetime import timedelta
 
+import feast.types
 import pandas as pd
-from feast import Entity, Feature, FeatureView, FileSource, ValueType
-from feast.data_source import RequestDataSource
+from feast import Entity, FeatureView, Field, FileSource, RequestSource, ValueType
 from feast.on_demand_feature_view import on_demand_feature_view
 
 driver_hourly_stats_source = FileSource(
+    name="driver_hourly_stats_source",
     path="data/driver_stats_with_string.parquet",
-    event_timestamp_column="event_timestamp",
+    timestamp_field="event_timestamp",
     created_timestamp_column="created",
 )
 
-driver_entity = Entity(
-    name="driver_id", value_type=ValueType.INT64, description="Driver ID"
+driver = Entity(
+    # It would be the modern Feast pattern to call this `driver`, but the
+    # golden tests have the name as `driver_id`
+    name="driver_id",
+    join_keys=["driver_id"],
+    value_type=ValueType.INT64,
+    description="Driver ID",
 )
 
 driver_hourly_stats_view = FeatureView(
     name="driver_hourly_stats",
-    entities=["driver_id"],
+    entities=[driver],
     ttl=timedelta(days=7),
-    features=[
-        Feature(
+    schema=[
+        Field(
             name="conv_rate",
-            dtype=ValueType.FLOAT,
-            labels=dict(description="Conv rate"),
+            dtype=feast.types.Float64,
+            tags=dict(description="Conv rate"),
         ),
-        Feature(
-            name="acc_rate", dtype=ValueType.FLOAT, labels=dict(description="Acc rate")
+        Field(
+            name="acc_rate",
+            dtype=feast.types.Float64,
+            tags=dict(description="Acc rate"),
         ),
-        Feature(
+        Field(
             name="avg_daily_trips",
-            dtype=ValueType.INT64,
-            labels=dict(description="Avg daily trips"),
+            dtype=feast.types.Int64,
+            tags=dict(description="Avg daily trips"),
         ),
-        Feature(
+        Field(
             name="string_feature",
-            dtype=ValueType.STRING,
-            labels=dict(description="String feature"),
+            dtype=feast.types.String,
+            tags=dict(description="String feature"),
         ),
     ],
     online=True,
-    batch_source=driver_hourly_stats_source,
+    source=driver_hourly_stats_source,
     tags={},
 )
 
-input_request = RequestDataSource(
+input_request = RequestSource(
     name="vals_to_add",
-    schema={"val_to_add": ValueType.INT64, "val_to_add_2": ValueType.INT64},
+    schema=[
+        Field(name="val_to_add", dtype=feast.types.Int64),
+        Field(name="val_to_add_2", dtype=feast.types.Int64),
+    ],
 )
 
 
 @on_demand_feature_view(  # type: ignore
-    inputs={
-        "driver_hourly_stats": driver_hourly_stats_view,
-        "vals_to_add": input_request,
-    },
-    features=[
-        Feature(name="conv_rate_plus_val1", dtype=ValueType.DOUBLE),
-        Feature(name="conv_rate_plus_val2", dtype=ValueType.DOUBLE),
+    sources=[
+        driver_hourly_stats_view,
+        input_request,
+    ],
+    schema=[
+        Field(name="conv_rate_plus_val1", dtype=feast.types.Float64),
+        Field(name="conv_rate_plus_val2", dtype=feast.types.Float64),
     ],
 )
 def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
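The hunk ends at the transformation's signature because the function body is untouched by the upgrade; only the decorator's keyword arguments change (`inputs`/`features` become `sources`/`schema`, and `ValueType.DOUBLE` becomes `feast.types.Float64`). For context, the body in the standard Feast driver-stats example, which this fixture presumably mirrors, looks like:

    def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
        df = pd.DataFrame()
        df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
        df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
        return df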
diff --git a/metadata-ingestion/tests/integration/feast/test_feast_repository.py b/metadata-ingestion/tests/integration/feast/test_feast_repository.py
index 3e7b0d8b5dc00..eab37f67ed155 100644
--- a/metadata-ingestion/tests/integration/feast/test_feast_repository.py
+++ b/metadata-ingestion/tests/integration/feast/test_feast_repository.py
@@ -1,4 +1,4 @@
-import os
+import sys
 
 import pytest
 from freezegun import freeze_time
@@ -8,11 +8,12 @@
 
 FROZEN_TIME = "2020-04-14 07:00:00"
 
+pytestmark = pytest.mark.skipif(
+    sys.version_info < (3, 8), reason="requires python 3.8 or higher"
+)
+
 
 @freeze_time(FROZEN_TIME)
-@pytest.mark.skipif(
-    os.getenv("AIRFLOW1_TEST") == "true", reason="feast requires Airflow 2.0 or newer"
-)
 def test_feast_repository_ingest(pytestconfig, tmp_path, mock_time):
     test_resources_dir = pytestconfig.rootpath / "tests/integration/feast"
     output_path = tmp_path / "feast_repository_mces.json"
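The module-level `pytestmark` replaces the old per-test guard: the skip condition is now the interpreter version rather than an Airflow environment variable, and it applies to every test in the module. Assuming the ingestion package's test extras are installed, the module runs directly from the `metadata-ingestion` directory:

    pytest tests/integration/feast/test_feast_repository.py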