From fb6b807f8b32776d388757ca431d290c03170c66 Mon Sep 17 00:00:00 2001
From: Harry
Date: Sat, 21 Oct 2023 05:13:49 +0700
Subject: [PATCH] feat: Support GCS filesystem for bytewax engine (#3774)

* fix: Support param timeout when persisting

Signed-off-by: Hai Nguyen

* fix: fix java integration test

Signed-off-by: Hai Nguyen

---------

Signed-off-by: Hai Nguyen
---
 .../contrib/bytewax/bytewax_materialization_dataflow.py | 4 +---
 setup.py                                                | 3 ++-
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py
index bf5229303a..fe2a7f35c1 100644
--- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py
+++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py
@@ -2,7 +2,6 @@
 
 import pyarrow as pa
 import pyarrow.parquet as pq
-import s3fs
 from bytewax.dataflow import Dataflow  # type: ignore
 from bytewax.execution import cluster_main
 from bytewax.inputs import ManualInputConfig, distribute
@@ -29,8 +28,7 @@ def __init__(
         self._run_dataflow()
 
     def process_path(self, path):
-        fs = s3fs.S3FileSystem()
-        dataset = pq.ParquetDataset(path, filesystem=fs, use_legacy_dataset=False)
+        dataset = pq.ParquetDataset(path, use_legacy_dataset=False)
         batches = []
         for fragment in dataset.fragments:
             for batch in fragment.to_table().to_batches():
diff --git a/setup.py b/setup.py
index 047100f03e..9fbc2bc2cd 100644
--- a/setup.py
+++ b/setup.py
@@ -91,6 +91,7 @@
     "google-cloud-datastore>=2.1.0,<3",
     "google-cloud-storage>=1.34.0,<3",
     "google-cloud-bigtable>=2.11.0,<3",
+    "gcsfs",
 ]
 
 REDIS_REQUIRED = [
@@ -158,7 +159,7 @@
     "moto",
     "mypy>=0.981,<0.990",
     "avro==1.10.0",
-    "gcsfs>=0.4.0,<=2022.01.0",
+    "gcsfs",
     "urllib3>=1.25.4,<2",
     "psutil==5.9.0",
     "py>=1.11.0",  # https://github.com/pytest-dev/pytest/issues/10420