Skip to content

Commit

Permalink
Add type annotations
Browse files: browse the repository at this point in the history
Signed-off-by: Willem Pienaar <[email protected]>
  • Loading branch information
woop committed Aug 22, 2021
1 parent eda2764 commit 5986998
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def evaluate_historical_retrieval():
)

# Read offline parquet data in pyarrow format.
filesystem, path = FileSource.prepare_path(
filesystem, path = FileSource.create_filesystem_and_path(
feature_view.batch_source.path,
feature_view.batch_source.file_options.s3_endpoint_override,
)
Expand Down Expand Up @@ -242,7 +242,7 @@ def pull_latest_from_table_or_query(

# Create lazy function that is only called from the RetrievalJob object
def evaluate_offline_job():
filesystem, path = FileSource.prepare_path(
filesystem, path = FileSource.create_filesystem_and_path(
data_source.path, data_source.file_options.s3_endpoint_override
)
source_df = pd.read_parquet(path, filesystem=filesystem)
Expand Down
13 changes: 8 additions & 5 deletions sdk/python/feast/infra/offline_stores/file_source.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Callable, Dict, Iterable, Optional, Tuple

from pyarrow import fs
from pyarrow._fs import FileSystem
from pyarrow._s3fs import S3FileSystem
from pyarrow.parquet import ParquetFile

from feast import type_map
Expand Down Expand Up @@ -138,7 +139,7 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
def get_table_column_names_and_types(
self, config: RepoConfig
) -> Iterable[Tuple[str, str]]:
filesystem, path = FileSource.prepare_path(
filesystem, path = FileSource.create_filesystem_and_path(
self.path, self._file_options.s3_endpoint_override
)
schema = ParquetFile(
Expand All @@ -147,12 +148,14 @@ def get_table_column_names_and_types(
return zip(schema.names, map(str, schema.types))

@staticmethod
def prepare_path(path: str, s3_endpoint_override: str):
def create_filesystem_and_path(
path: str, s3_endpoint_override: str
) -> Tuple[Optional[FileSystem], str]:
if path.startswith("s3://"):
s3 = fs.S3FileSystem(
s3fs = S3FileSystem(
endpoint_override=s3_endpoint_override if s3_endpoint_override else None
)
return s3, path.replace("s3://", "")
return s3fs, path.replace("s3://", "")
else:
return None, path

Expand Down

0 comments on commit 5986998

Please sign in to comment.