Skip to content

Commit

Permalink
Merge pull request #84 from khorshuheng/missing-import
Browse files Browse the repository at this point in the history
Fix missing imports
  • Loading branch information
khorshuheng authored Jan 9, 2023
2 parents e0d1692 + f616e77 commit 995722b
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 3 deletions.
6 changes: 4 additions & 2 deletions caraml-store-python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,10 @@ def list_feature_tables(

# Get latest feature tables from Feast Core
feature_table_protos = self._core_service.ListFeatureTables(
filter=ListFeatureTablesRequest.Filter(
project=project, labels=labels or dict()
ListFeatureTablesRequest(
filter=ListFeatureTablesRequest.Filter(
project=project, labels=labels or dict()
)
)
)

Expand Down
18 changes: 17 additions & 1 deletion caraml-store-python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Dict, Optional

from feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.data_format import FileFormat
from feast.data_format import FileFormat, StreamFormat


class SourceType(enum.Enum):
Expand Down Expand Up @@ -251,6 +251,22 @@ def from_proto(data_source):
created_timestamp_column=data_source.created_timestamp_column,
date_partition_column=data_source.date_partition_column,
)
elif (
data_source.kafka_options.bootstrap_servers
and data_source.kafka_options.topic
and data_source.kafka_options.message_format
):
data_source_obj = KafkaSource(
field_mapping=data_source.field_mapping,
bootstrap_servers=data_source.kafka_options.bootstrap_servers,
message_format=StreamFormat.from_proto(
data_source.kafka_options.message_format
),
topic=data_source.kafka_options.topic,
event_timestamp_column=data_source.event_timestamp_column,
created_timestamp_column=data_source.created_timestamp_column,
date_partition_column=data_source.date_partition_column,
)
else:
raise ValueError("Could not identify the source type being added")

Expand Down
Empty file.

0 comments on commit 995722b

Please sign in to comment.