From 0a0d329e3ce4adafdcb9dac50597fc6fe0aa1348 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Fri, 2 Feb 2024 16:20:25 -0500 Subject: [PATCH 01/10] Update artifacts fetcher to download artifacts --- .../python/apache_beam/ml/transforms/utils.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/ml/transforms/utils.py b/sdks/python/apache_beam/ml/transforms/utils.py index fadf611b0e66..96f0f7f2f575 100644 --- a/sdks/python/apache_beam/ml/transforms/utils.py +++ b/sdks/python/apache_beam/ml/transforms/utils.py @@ -18,18 +18,28 @@ __all__ = ['ArtifactsFetcher'] import os +import tempfile import typing import tensorflow_transform as tft +from apache_beam.io.filesystems import FileSystems from apache_beam.ml.transforms import base -class ArtifactsFetcher(): +class ArtifactsFetcher: """ Utility class used to fetch artifacts from the artifact_location passed to the TFTProcessHandlers in MLTransform. + + This is intended to be used for testing purposes only. """ def __init__(self, artifact_location): + tempdir = tempfile.mkdtemp() + self._artifact_location = tempdir + # TODO: Can we use FileSystems.match() here with a * glob pattern? + # using match, does it output files and directories path? + FileSystems.copy(artifact_location, tempdir) + assert os.listdir(tempdir), f"No files found in {artifact_location}" files = os.listdir(artifact_location) files.remove(base._ATTRIBUTE_FILE_NAME) # TODO: https://github.com/apache/beam/issues/29356 @@ -43,9 +53,7 @@ def __init__(self, artifact_location): self._artifact_location = os.path.join(artifact_location, files[0]) self.transform_output = tft.TFTransformOutput(self._artifact_location) - def get_vocab_list( - self, - vocab_filename: str = 'compute_and_apply_vocab') -> typing.List[bytes]: + def get_vocab_list(self, vocab_filename: str) -> typing.List[bytes]: """ Returns list of vocabulary terms created during MLTransform. """ @@ -57,8 +65,7 @@ def get_vocab_list( vocab_filename)) from e return [x.decode('utf-8') for x in vocab_list] - def get_vocab_filepath( - self, vocab_filename: str = 'compute_and_apply_vocab') -> str: + def get_vocab_filepath(self, vocab_filename: str) -> str: """ Return the path to the vocabulary file created during MLTransform. """ From e51bc4ebc135682c88287c38e414923a3d12e73e Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Fri, 2 Feb 2024 16:26:07 -0500 Subject: [PATCH 02/10] Use context for tempdir creation --- .../python/apache_beam/ml/transforms/utils.py | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/sdks/python/apache_beam/ml/transforms/utils.py b/sdks/python/apache_beam/ml/transforms/utils.py index 96f0f7f2f575..ff229842def4 100644 --- a/sdks/python/apache_beam/ml/transforms/utils.py +++ b/sdks/python/apache_beam/ml/transforms/utils.py @@ -34,24 +34,24 @@ class ArtifactsFetcher: This is intended to be used for testing purposes only. """ def __init__(self, artifact_location): - tempdir = tempfile.mkdtemp() - self._artifact_location = tempdir - # TODO: Can we use FileSystems.match() here with a * glob pattern? - # using match, does it output files and directories path? - FileSystems.copy(artifact_location, tempdir) - assert os.listdir(tempdir), f"No files found in {artifact_location}" - files = os.listdir(artifact_location) - files.remove(base._ATTRIBUTE_FILE_NAME) - # TODO: https://github.com/apache/beam/issues/29356 - # Integrate ArtifactFetcher into MLTransform. - if len(files) > 1: - raise NotImplementedError( - "MLTransform may have been utilized alongside transforms written " - "in TensorFlow Transform, in conjunction with those from different " - "frameworks. Currently, retrieving artifacts from this " - "multi-framework setup is not supported.") - self._artifact_location = os.path.join(artifact_location, files[0]) - self.transform_output = tft.TFTransformOutput(self._artifact_location) + with tempfile.TemporaryDirectory() as tempdir: + # TODO: Can we use FileSystems.match() here with a * glob pattern? + # using match, does it output files and directories path? + FileSystems.copy(artifact_location, tempdir) + assert os.listdir(tempdir), f"No files found in {artifact_location}" + artifact_location = tempdir + files = os.listdir(artifact_location) + files.remove(base._ATTRIBUTE_FILE_NAME) + # TODO: https://github.com/apache/beam/issues/29356 + # Integrate ArtifactFetcher into MLTransform. + if len(files) > 1: + raise NotImplementedError( + "MLTransform may have been utilized alongside transforms written " + "in TensorFlow Transform, in conjunction with those from different " + "frameworks. Currently, retrieving artifacts from this " + "multi-framework setup is not supported.") + self._artifact_location = os.path.join(artifact_location, files[0]) + self.transform_output = tft.TFTransformOutput(self._artifact_location) def get_vocab_list(self, vocab_filename: str) -> typing.List[bytes]: """ From b1610875261253d29593a15377e20942325955d8 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Tue, 6 Feb 2024 14:45:57 -0500 Subject: [PATCH 03/10] Refactor artifacts fetcher to support gcs path --- .../python/apache_beam/ml/transforms/utils.py | 59 ++++++++++++------- 1 file changed, 39 insertions(+), 20 deletions(-) diff --git a/sdks/python/apache_beam/ml/transforms/utils.py b/sdks/python/apache_beam/ml/transforms/utils.py index ff229842def4..6e9c2aa06182 100644 --- a/sdks/python/apache_beam/ml/transforms/utils.py +++ b/sdks/python/apache_beam/ml/transforms/utils.py @@ -22,9 +22,25 @@ import typing import tensorflow_transform as tft -from apache_beam.io.filesystems import FileSystems from apache_beam.ml.transforms import base +from google.cloud.storage import Client +from google.cloud.storage import transfer_manager + + +def download_artifacts_from_gcs(bucket_name, prefix, local_path): + """Downloads artifacts from GCS to the local file system. + Args: + bucket_name: The name of the GCS bucket to download from. + folder_name: The name of the folder to download. + local_path: The local path to download the folder to. + """ + client = Client() + bucket = client.get_bucket(bucket_name) + blobs = [blob.name for blob in bucket.list_blobs(prefix=prefix)] + _ = transfer_manager.download_many_to_path( + bucket, blobs, destination_directory=local_path, max_workers=6) + class ArtifactsFetcher: """ @@ -33,25 +49,28 @@ class ArtifactsFetcher: This is intended to be used for testing purposes only. """ - def __init__(self, artifact_location): - with tempfile.TemporaryDirectory() as tempdir: - # TODO: Can we use FileSystems.match() here with a * glob pattern? - # using match, does it output files and directories path? - FileSystems.copy(artifact_location, tempdir) - assert os.listdir(tempdir), f"No files found in {artifact_location}" - artifact_location = tempdir - files = os.listdir(artifact_location) - files.remove(base._ATTRIBUTE_FILE_NAME) - # TODO: https://github.com/apache/beam/issues/29356 - # Integrate ArtifactFetcher into MLTransform. - if len(files) > 1: - raise NotImplementedError( - "MLTransform may have been utilized alongside transforms written " - "in TensorFlow Transform, in conjunction with those from different " - "frameworks. Currently, retrieving artifacts from this " - "multi-framework setup is not supported.") - self._artifact_location = os.path.join(artifact_location, files[0]) - self.transform_output = tft.TFTransformOutput(self._artifact_location) + def __init__(self, artifact_location: str): + tempdir = tempfile.mkdtemp() + if artifact_location.startswith('gs://'): + parts = artifact_location[5:].split('/') + bucket_name = parts[0] + prefix = '/'.join(parts[1:]) + download_artifacts_from_gcs(bucket_name, prefix, tempdir) + + assert os.listdir(tempdir), f"No files found in {artifact_location}" + artifact_location = os.path.join(tempdir, prefix) + files = os.listdir(artifact_location) + files.remove(base._ATTRIBUTE_FILE_NAME) + # TODO: https://github.com/apache/beam/issues/29356 + # Integrate ArtifactFetcher into MLTransform. + if len(files) > 1: + raise NotImplementedError( + "MLTransform may have been utilized alongside transforms written " + "in TensorFlow Transform, in conjunction with those from different " + "frameworks. Currently, retrieving artifacts from this " + "multi-framework setup is not supported.") + self._artifact_location = os.path.join(artifact_location, files[0]) + self.transform_output = tft.TFTransformOutput(self._artifact_location) def get_vocab_list(self, vocab_filename: str) -> typing.List[bytes]: """ From db09a185ed46a0e3a901ebff834d66982f3b3de2 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Tue, 6 Feb 2024 14:51:18 -0500 Subject: [PATCH 04/10] Remove defaults --- .../examples/ml_transform/ml_transform_it_test.py | 3 ++- .../examples/ml_transform/vocab_tfidf_processing.py | 6 +++++- sdks/python/apache_beam/ml/transforms/utils.py | 3 +-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/examples/ml_transform/ml_transform_it_test.py b/sdks/python/apache_beam/examples/ml_transform/ml_transform_it_test.py index 17adb9c7c6ab..14f40eef6885 100644 --- a/sdks/python/apache_beam/examples/ml_transform/ml_transform_it_test.py +++ b/sdks/python/apache_beam/examples/ml_transform/ml_transform_it_test.py @@ -88,9 +88,10 @@ def test_process_large_movie_review_dataset(self): artifacts_fetcher = ArtifactsFetcher(artifact_location=artifact_location) + vocab_filename = f'vocab_{vocab_tfidf_processing.REVIEW_COLUMN}' actual_vocab_list = artifacts_fetcher.get_vocab_list() - expected_artifact_filepath = 'gs://apache-beam-ml/testing/expected_outputs/compute_and_apply_vocab' # pylint: disable=line-too-long + expected_artifact_filepath = f'gs://apache-beam-ml/testing/expected_outputs/{vocab_filename}' # pylint: disable=line-too-long with FileSystems.open(expected_artifact_filepath, 'r') as f: expected_vocab_list = f.readlines() diff --git a/sdks/python/apache_beam/examples/ml_transform/vocab_tfidf_processing.py b/sdks/python/apache_beam/examples/ml_transform/vocab_tfidf_processing.py index 6ecd56892410..b8ae61ce51e5 100644 --- a/sdks/python/apache_beam/examples/ml_transform/vocab_tfidf_processing.py +++ b/sdks/python/apache_beam/examples/ml_transform/vocab_tfidf_processing.py @@ -123,8 +123,12 @@ def preprocess_data( top_k=VOCAB_SIZE, frequency_threshold=10, columns=[REVIEW_COLUMN], + vocab_filename='vocab', split_string_by_delimiter=DELIMITERS)).with_transform( - TFIDF(columns=[REVIEW_COLUMN], vocab_size=VOCAB_SIZE)) + TFIDF( + columns=[REVIEW_COLUMN], + vocab_size=VOCAB_SIZE, + )) data_pcoll = data_pcoll | 'MLTransform' >> ml_transform data_pcoll = ( diff --git a/sdks/python/apache_beam/ml/transforms/utils.py b/sdks/python/apache_beam/ml/transforms/utils.py index 6e9c2aa06182..1c2b883b7e97 100644 --- a/sdks/python/apache_beam/ml/transforms/utils.py +++ b/sdks/python/apache_beam/ml/transforms/utils.py @@ -90,6 +90,5 @@ def get_vocab_filepath(self, vocab_filename: str) -> str: """ return self.transform_output.vocabulary_file_by_name(vocab_filename) - def get_vocab_size( - self, vocab_filename: str = 'compute_and_apply_vocab') -> int: + def get_vocab_size(self, vocab_filename: str) -> int: return self.transform_output.vocabulary_size_by_name(vocab_filename) From 265c0556375794691fcd6ca24cc3ed0ae1bb6822 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Wed, 7 Feb 2024 11:07:29 -0500 Subject: [PATCH 05/10] Pass vocab filename param --- .../apache_beam/examples/ml_transform/ml_transform_it_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/ml_transform/ml_transform_it_test.py b/sdks/python/apache_beam/examples/ml_transform/ml_transform_it_test.py index 14f40eef6885..c765fdf78340 100644 --- a/sdks/python/apache_beam/examples/ml_transform/ml_transform_it_test.py +++ b/sdks/python/apache_beam/examples/ml_transform/ml_transform_it_test.py @@ -89,7 +89,8 @@ def test_process_large_movie_review_dataset(self): artifacts_fetcher = ArtifactsFetcher(artifact_location=artifact_location) vocab_filename = f'vocab_{vocab_tfidf_processing.REVIEW_COLUMN}' - actual_vocab_list = artifacts_fetcher.get_vocab_list() + actual_vocab_list = artifacts_fetcher.get_vocab_list( + vocab_filename=vocab_filename) expected_artifact_filepath = f'gs://apache-beam-ml/testing/expected_outputs/{vocab_filename}' # pylint: disable=line-too-long From 13016581164c7e38679718f5c09f975aecf24cec Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Wed, 7 Feb 2024 13:30:51 -0500 Subject: [PATCH 06/10] Fix path --- .../apache_beam/examples/ml_transform/ml_transform_it_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/ml_transform/ml_transform_it_test.py b/sdks/python/apache_beam/examples/ml_transform/ml_transform_it_test.py index c765fdf78340..96fb3f775671 100644 --- a/sdks/python/apache_beam/examples/ml_transform/ml_transform_it_test.py +++ b/sdks/python/apache_beam/examples/ml_transform/ml_transform_it_test.py @@ -92,7 +92,7 @@ def test_process_large_movie_review_dataset(self): actual_vocab_list = artifacts_fetcher.get_vocab_list( vocab_filename=vocab_filename) - expected_artifact_filepath = f'gs://apache-beam-ml/testing/expected_outputs/{vocab_filename}' # pylint: disable=line-too-long + expected_artifact_filepath = 'gs://apache-beam-ml/testing/expected_outputs/compute_and_apply_vocab' # pylint: disable=line-too-long with FileSystems.open(expected_artifact_filepath, 'r') as f: expected_vocab_list = f.readlines() From 2d8e8b5b5f915ab442bf9465fb83c6fc362de6a7 Mon Sep 17 00:00:00 2001 From: Anand Inguva <34158215+AnandInguva@users.noreply.github.com> Date: Fri, 9 Feb 2024 13:07:29 -0500 Subject: [PATCH 07/10] Update sdks/python/apache_beam/ml/transforms/utils.py Co-authored-by: tvalentyn --- sdks/python/apache_beam/ml/transforms/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/ml/transforms/utils.py b/sdks/python/apache_beam/ml/transforms/utils.py index 1c2b883b7e97..f332fc700e46 100644 --- a/sdks/python/apache_beam/ml/transforms/utils.py +++ b/sdks/python/apache_beam/ml/transforms/utils.py @@ -34,7 +34,7 @@ def download_artifacts_from_gcs(bucket_name, prefix, local_path): bucket_name: The name of the GCS bucket to download from. folder_name: The name of the folder to download. local_path: The local path to download the folder to. - """ + """ client = Client() bucket = client.get_bucket(bucket_name) blobs = [blob.name for blob in bucket.list_blobs(prefix=prefix)] From ac854d6d56a7dc2718e5134ac3e6d41d39dc222d Mon Sep 17 00:00:00 2001 From: Anand Inguva <34158215+AnandInguva@users.noreply.github.com> Date: Fri, 9 Feb 2024 13:07:41 -0500 Subject: [PATCH 08/10] Update sdks/python/apache_beam/ml/transforms/utils.py Co-authored-by: tvalentyn --- sdks/python/apache_beam/ml/transforms/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/ml/transforms/utils.py b/sdks/python/apache_beam/ml/transforms/utils.py index f332fc700e46..8217d3fd95f2 100644 --- a/sdks/python/apache_beam/ml/transforms/utils.py +++ b/sdks/python/apache_beam/ml/transforms/utils.py @@ -32,7 +32,7 @@ def download_artifacts_from_gcs(bucket_name, prefix, local_path): """Downloads artifacts from GCS to the local file system. Args: bucket_name: The name of the GCS bucket to download from. - folder_name: The name of the folder to download. + prefix: Prefix of GCS objects to download. local_path: The local path to download the folder to. """ client = Client() From 9e987cffbeb3d94b386a17fe2e114178458272a9 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Fri, 9 Feb 2024 13:26:08 -0500 Subject: [PATCH 09/10] Remove num_workers --- sdks/python/apache_beam/ml/transforms/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/ml/transforms/utils.py b/sdks/python/apache_beam/ml/transforms/utils.py index 8217d3fd95f2..c9610be41e75 100644 --- a/sdks/python/apache_beam/ml/transforms/utils.py +++ b/sdks/python/apache_beam/ml/transforms/utils.py @@ -39,7 +39,7 @@ def download_artifacts_from_gcs(bucket_name, prefix, local_path): bucket = client.get_bucket(bucket_name) blobs = [blob.name for blob in bucket.list_blobs(prefix=prefix)] _ = transfer_manager.download_many_to_path( - bucket, blobs, destination_directory=local_path, max_workers=6) + bucket, blobs, destination_directory=local_path) class ArtifactsFetcher: From 05a334e83c09ea186fac95bf6240276933b017ad Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Fri, 9 Feb 2024 14:14:38 -0500 Subject: [PATCH 10/10] Fix lint --- sdks/python/apache_beam/ml/transforms/utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/ml/transforms/utils.py b/sdks/python/apache_beam/ml/transforms/utils.py index c9610be41e75..abf4c48fc642 100644 --- a/sdks/python/apache_beam/ml/transforms/utils.py +++ b/sdks/python/apache_beam/ml/transforms/utils.py @@ -21,12 +21,12 @@ import tempfile import typing -import tensorflow_transform as tft -from apache_beam.ml.transforms import base - from google.cloud.storage import Client from google.cloud.storage import transfer_manager +import tensorflow_transform as tft +from apache_beam.ml.transforms import base + def download_artifacts_from_gcs(bucket_name, prefix, local_path): """Downloads artifacts from GCS to the local file system.