From c7a48efcf502a8b37bbd809d0823a17aef60cd01 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Fri, 12 Apr 2019 14:30:40 +0100 Subject: [PATCH 1/5] [AIRFLOW-4255] Make GCS Hook Backwards compatible --- airflow/contrib/hooks/gcs_hook.py | 28 +++++++++++++++++++++++---- tests/contrib/hooks/test_gcs_hook.py | 29 ++++++++++++++++++---------- 2 files changed, 43 insertions(+), 14 deletions(-) diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py index 3cc6efd36aa61..af4d31fbd027d 100644 --- a/airflow/contrib/hooks/gcs_hook.py +++ b/airflow/contrib/hooks/gcs_hook.py @@ -20,6 +20,7 @@ import gzip as gz import os import shutil +import warnings from google.cloud import storage @@ -173,7 +174,8 @@ def download(self, bucket, object, filename=None): # pylint:disable=redefined-builtin def upload(self, bucket, object, filename, - mime_type='application/octet-stream', gzip=False): + mime_type='application/octet-stream', gzip=False, + multipart=False, num_retries=0): """ Uploads a local file to Google Cloud Storage. @@ -187,8 +189,15 @@ def upload(self, bucket, object, filename, :type mime_type: str :param gzip: Option to compress file for upload :type gzip: bool + :param multipart: Deprecated parameter. Multipart would be handled automatically + :type multipart: bool or int + :param num_retries: Deprecated parameter. Retries would be handled automatically + :type num_retries: int """ + warnings.warn("'multipart' and 'num_retries' parameters have been deprecated." + " They are handled automatically by the Storage client", DeprecationWarning) + if gzip: filename_gz = filename + '.gz' @@ -256,7 +265,7 @@ def is_updated_after(self, bucket, object, ts): return False - def delete(self, bucket, object): + def delete(self, bucket, object, generation=None): """ Deletes an object from the bucket. @@ -264,7 +273,12 @@ def delete(self, bucket, object): :type bucket: str :param object: name of the object to delete :type object: str + :param generation: Deprecated parameter + :type generation: str """ + + warnings.warn("'generation' parameter is no longer supported", DeprecationWarning) + client = self.get_conn() bucket = client.get_bucket(bucket_name=bucket) blob = bucket.blob(blob_name=object) @@ -477,7 +491,8 @@ def insert_bucket_acl(self, bucket, entity, role, user_project=None): self.log.info('A new ACL entry created in bucket: %s', bucket) - def insert_object_acl(self, bucket, object_name, entity, role, user_project=None): + def insert_object_acl(self, bucket, object_name, entity, role, generation=None, + user_project=None): """ Creates a new ACL entry on the specified object. See: https://cloud.google.com/storage/docs/json_api/v1/objectAccessControls/insert @@ -496,10 +511,13 @@ def insert_object_acl(self, bucket, object_name, entity, role, user_project=None :param role: The access permission for the entity. Acceptable values are: "OWNER", "READER". :type role: str + :param generation: (Deprecated) Parameter is no longer supported. + :type generation: str :param user_project: (Optional) The project to be billed for this request. Required for Requester Pays buckets. 
:type user_project: str """ + warnings.warn("'generation' parameter is no longer supported", DeprecationWarning) self.log.info('Creating a new ACL entry for object: %s in bucket: %s', object_name, bucket) client = self.get_conn() @@ -515,7 +533,7 @@ def insert_object_acl(self, bucket, object_name, entity, role, user_project=None self.log.info('A new ACL entry created for object: %s in bucket: %s', object_name, bucket) - def compose(self, bucket, source_objects, destination_object): + def compose(self, bucket, source_objects, destination_object, num_retries=5): """ Composes a list of existing object into a new object in the same storage bucket @@ -533,6 +551,8 @@ def compose(self, bucket, source_objects, destination_object): :param destination_object: The path of the object if given. :type destination_object: str """ + warnings.warn("'num_retries' parameter is Deprecated. Retries are " + "now handled automatically", DeprecationWarning) if not source_objects or not len(source_objects): raise ValueError('source_objects cannot be empty.') diff --git a/tests/contrib/hooks/test_gcs_hook.py b/tests/contrib/hooks/test_gcs_hook.py index 40590564d8fb1..a9e763b0aa794 100644 --- a/tests/contrib/hooks/test_gcs_hook.py +++ b/tests/contrib/hooks/test_gcs_hook.py @@ -17,7 +17,8 @@ # specific language governing permissions and limitations # under the License. -import unittest +import six + import tempfile import os @@ -27,6 +28,11 @@ from tests.contrib.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id from google.cloud import storage from google.cloud import exceptions +if six.PY2: + # Need `assertWarns` back-ported from unittest2 + import unittest2 as unittest +else: + import unittest BASE_STRING = 'airflow.contrib.hooks.gcp_api_base_hook.{}' GCS_STRING = 'airflow.contrib.hooks.gcs_hook.{}' @@ -235,7 +241,8 @@ def test_delete(self, mock_service, mock_bucket): delete_method = get_blob_method.return_value.delete delete_method.return_value = blob_to_be_deleted - response = self.gcs_hook.delete(bucket=test_bucket, object=test_object) + with self.assertWarns(DeprecationWarning): + response = self.gcs_hook.delete(bucket=test_bucket, object=test_object) self.assertIsNone(response) @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn')) @@ -386,11 +393,12 @@ def test_compose_without_destination_object(self, mock_service): test_destination_object = None with self.assertRaises(ValueError) as e: - self.gcs_hook.compose( - bucket=test_bucket, - source_objects=test_source_objects, - destination_object=test_destination_object - ) + with self.assertWarns(DeprecationWarning): + self.gcs_hook.compose( + bucket=test_bucket, + source_objects=test_source_objects, + destination_object=test_destination_object + ) self.assertEqual( str(e.exception), @@ -422,9 +430,10 @@ def test_upload(self, mock_service): .blob.return_value.upload_from_filename upload_method.return_value = None - response = self.gcs_hook.upload(test_bucket, - test_object, - self.testfile.name) + with self.assertWarns(DeprecationWarning): + response = self.gcs_hook.upload(test_bucket, + test_object, + self.testfile.name) self.assertIsNone(response) upload_method.assert_called_once_with( From df23e1f0c4a8e810aa4bf24eb7c037e49cb0b2e5 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Fri, 12 Apr 2019 14:36:17 +0100 Subject: [PATCH 2/5] Update UPDATING.md --- UPDATING.md | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/UPDATING.md b/UPDATING.md index 84145a32242ae..5061d536d52ac 100644 --- a/UPDATING.md 
+++ b/UPDATING.md
@@ -27,24 +27,11 @@ assists users migrating to a new version.
 
 ### Changes to GoogleCloudStorageHook
 
 * the discovery-based api (`googleapiclient.discovery`) used in `GoogleCloudStorageHook` is now replaced by the recommended client based api (`google-cloud-storage`). To know the difference between both the libraries, read https://cloud.google.com/apis/docs/client-libraries-explained. PR: [#5054](https://github.com/apache/airflow/pull/5054)
-* as a part of this replacement, the `multipart` & `num_retries` parameters for `GoogleCloudStorageHook.upload` method has been removed:
+* as a part of this replacement, the `multipart` & `num_retries` parameters for the `GoogleCloudStorageHook.upload` method have been deprecated.
 
-  **Old**:
-  ```python
-  def upload(self, bucket, object, filename,
-             mime_type='application/octet-stream', gzip=False,
-             multipart=False, num_retries=0):
-  ```
-
-  **New**:
-  ```python
-  def upload(self, bucket, object, filename,
-             mime_type='application/octet-stream', gzip=False):
-  ```
-
-  The client library uses multipart upload automatically if the object/blob size is more than 8 MB - [source code](https://github.com/googleapis/google-cloud-python/blob/11c543ce7dd1d804688163bc7895cf592feb445f/storage/google/cloud/storage/blob.py#L989-L997).
+  The client library uses multipart upload automatically if the object/blob size is more than 8 MB - [source code](https://github.com/googleapis/google-cloud-python/blob/11c543ce7dd1d804688163bc7895cf592feb445f/storage/google/cloud/storage/blob.py#L989-L997). The client also handles retries automatically.
 
-* the `generation` parameter is no longer supported in `GoogleCloudStorageHook.delete` and `GoogleCloudStorageHook.insert_object_acl`.
+* the `generation` parameter is deprecated in `GoogleCloudStorageHook.delete` and `GoogleCloudStorageHook.insert_object_acl`.
 
 ### Changes to CloudantHook

From 0ac1b8518b8ce7c75c793050458f217c48d29e53 Mon Sep 17 00:00:00 2001
From: Kaxil Naik
Date: Fri, 12 Apr 2019 14:44:52 +0100
Subject: [PATCH 3/5] Add option to stop warnings

---
 airflow/contrib/hooks/gcs_hook.py    | 32 +++++++++----------
 tests/contrib/hooks/test_gcs_hook.py | 47 ++++++++++++++++++++++++----
 2 files changed, 57 insertions(+), 22 deletions(-)

diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index af4d31fbd027d..ba71dd3900d96 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -175,7 +175,7 @@ def download(self, bucket, object, filename=None):
     # pylint:disable=redefined-builtin
     def upload(self, bucket, object, filename,
                mime_type='application/octet-stream', gzip=False,
-               multipart=False, num_retries=0):
+               multipart=None, num_retries=None):
         """
         Uploads a local file to Google Cloud Storage.
 
@@ -189,14 +189,15 @@ def upload(self, bucket, object, filename,
         :type mime_type: str
         :param gzip: Option to compress file for upload
         :type gzip: bool
-        :param multipart: Deprecated parameter. Multipart would be handled automatically
-        :type multipart: bool or int
-        :param num_retries: Deprecated parameter. Retries would be handled automatically
-        :type num_retries: int
         """
 
-        warnings.warn("'multipart' and 'num_retries' parameters have been deprecated."
-                      " They are handled automatically by the Storage client", DeprecationWarning)
+        if multipart is not None:
+            warnings.warn("'multipart' parameter is deprecated."
+ " It is handled automatically by the Storage client", DeprecationWarning) + + if num_retries is not None: + warnings.warn("'num_retries' parameter is deprecated." + " It is handled automatically by the Storage client", DeprecationWarning) if gzip: filename_gz = filename + '.gz' @@ -273,11 +274,10 @@ def delete(self, bucket, object, generation=None): :type bucket: str :param object: name of the object to delete :type object: str - :param generation: Deprecated parameter - :type generation: str """ - warnings.warn("'generation' parameter is no longer supported", DeprecationWarning) + if generation is not None: + warnings.warn("'generation' parameter is no longer supported", DeprecationWarning) client = self.get_conn() bucket = client.get_bucket(bucket_name=bucket) @@ -511,13 +511,12 @@ def insert_object_acl(self, bucket, object_name, entity, role, generation=None, :param role: The access permission for the entity. Acceptable values are: "OWNER", "READER". :type role: str - :param generation: (Deprecated) Parameter is no longer supported. - :type generation: str :param user_project: (Optional) The project to be billed for this request. Required for Requester Pays buckets. :type user_project: str """ - warnings.warn("'generation' parameter is no longer supported", DeprecationWarning) + if generation is not None: + warnings.warn("'generation' parameter is no longer supported", DeprecationWarning) self.log.info('Creating a new ACL entry for object: %s in bucket: %s', object_name, bucket) client = self.get_conn() @@ -533,7 +532,7 @@ def insert_object_acl(self, bucket, object_name, entity, role, generation=None, self.log.info('A new ACL entry created for object: %s in bucket: %s', object_name, bucket) - def compose(self, bucket, source_objects, destination_object, num_retries=5): + def compose(self, bucket, source_objects, destination_object, num_retries=None): """ Composes a list of existing object into a new object in the same storage bucket @@ -551,8 +550,9 @@ def compose(self, bucket, source_objects, destination_object, num_retries=5): :param destination_object: The path of the object if given. :type destination_object: str """ - warnings.warn("'num_retries' parameter is Deprecated. Retries are " - "now handled automatically", DeprecationWarning) + if num_retries is not None: + warnings.warn("'num_retries' parameter is Deprecated. Retries are " + "now handled automatically", DeprecationWarning) if not source_objects or not len(source_objects): raise ValueError('source_objects cannot be empty.') diff --git a/tests/contrib/hooks/test_gcs_hook.py b/tests/contrib/hooks/test_gcs_hook.py index a9e763b0aa794..8eb0100d370a5 100644 --- a/tests/contrib/hooks/test_gcs_hook.py +++ b/tests/contrib/hooks/test_gcs_hook.py @@ -393,18 +393,32 @@ def test_compose_without_destination_object(self, mock_service): test_destination_object = None with self.assertRaises(ValueError) as e: - with self.assertWarns(DeprecationWarning): - self.gcs_hook.compose( - bucket=test_bucket, - source_objects=test_source_objects, - destination_object=test_destination_object - ) + self.gcs_hook.compose( + bucket=test_bucket, + source_objects=test_source_objects, + destination_object=test_destination_object + ) self.assertEqual( str(e.exception), 'bucket and destination_object cannot be empty.' 
) + # Test Deprecation warnings for deprecated parameters + @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn')) + def test_compose_deprecated_params(self, mock_service): + test_bucket = 'test_bucket' + test_source_objects = ['test_object_1', 'test_object_2', 'test_object_3'] + test_destination_object = 'test_object_composed' + + with self.assertWarns(DeprecationWarning): + self.gcs_hook.compose( + bucket=test_bucket, + source_objects=test_source_objects, + destination_object=test_destination_object, + num_retries=5 + ) + class TestGoogleCloudStorageHookUpload(unittest.TestCase): def setUp(self): @@ -456,3 +470,24 @@ def test_upload_gzip(self, mock_service): gzip=True) self.assertFalse(os.path.exists(self.testfile.name + '.gz')) self.assertIsNone(response) + + @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn')) + def test_upload_deprecated_params(self, mock_service): + test_bucket = 'test_bucket' + test_object = 'test_object' + + upload_method = mock_service.return_value.get_bucket.return_value\ + .blob.return_value.upload_from_filename + upload_method.return_value = None + + with self.assertWarns(DeprecationWarning): + self.gcs_hook.upload(test_bucket, + test_object, + self.testfile.name, + multipart=True) + + with self.assertWarns(DeprecationWarning): + self.gcs_hook.upload(test_bucket, + test_object, + self.testfile.name, + num_retries=2) From b9f1c3ecf61e11871ccad3b24ee4eaf3882da2cc Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Fri, 12 Apr 2019 16:30:25 +0100 Subject: [PATCH 4/5] Update test_gcs_hook.py --- tests/contrib/hooks/test_gcs_hook.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tests/contrib/hooks/test_gcs_hook.py b/tests/contrib/hooks/test_gcs_hook.py index 8eb0100d370a5..b13b1779b17e6 100644 --- a/tests/contrib/hooks/test_gcs_hook.py +++ b/tests/contrib/hooks/test_gcs_hook.py @@ -241,8 +241,7 @@ def test_delete(self, mock_service, mock_bucket): delete_method = get_blob_method.return_value.delete delete_method.return_value = blob_to_be_deleted - with self.assertWarns(DeprecationWarning): - response = self.gcs_hook.delete(bucket=test_bucket, object=test_object) + response = self.gcs_hook.delete(bucket=test_bucket, object=test_object) self.assertIsNone(response) @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn')) @@ -444,10 +443,9 @@ def test_upload(self, mock_service): .blob.return_value.upload_from_filename upload_method.return_value = None - with self.assertWarns(DeprecationWarning): - response = self.gcs_hook.upload(test_bucket, - test_object, - self.testfile.name) + response = self.gcs_hook.upload(test_bucket, + test_object, + self.testfile.name) self.assertIsNone(response) upload_method.assert_called_once_with( From f4d2db01c07871ab6e5c6492b4cc77433c0eabc6 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Sat, 13 Apr 2019 20:05:36 +0100 Subject: [PATCH 5/5] Add tests --- airflow/contrib/hooks/gcs_hook.py | 2 +- tests/contrib/hooks/test_gcs_hook.py | 125 ++++++++++++++++++++++++++- 2 files changed, 124 insertions(+), 3 deletions(-) diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py index ba71dd3900d96..b64cccce32194 100644 --- a/airflow/contrib/hooks/gcs_hook.py +++ b/airflow/contrib/hooks/gcs_hook.py @@ -335,7 +335,7 @@ def list(self, bucket, versions=None, maxResults=None, prefix=None, delimiter=No def get_size(self, bucket, object): """ - Gets the size of a file in Google Cloud Storage. + Gets the size of a file in Google Cloud Storage in bytes. 
         :param bucket: The Google cloud storage bucket where the object is.
         :type bucket: str
diff --git a/tests/contrib/hooks/test_gcs_hook.py b/tests/contrib/hooks/test_gcs_hook.py
index b13b1779b17e6..64d458bb3a28d 100644
--- a/tests/contrib/hooks/test_gcs_hook.py
+++ b/tests/contrib/hooks/test_gcs_hook.py
@@ -16,9 +16,8 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+import io
 import six
-
 import tempfile
 import os
 
@@ -229,6 +228,40 @@ def test_rewrite(self, mock_service, mock_bucket):
         rewrite_method.assert_called_once_with(
             source=source_blob)
 
+    def test_rewrite_empty_source_bucket(self):
+        source_bucket = None
+        source_object = 'test-source-object'
+        destination_bucket = 'test-dest-bucket'
+        destination_object = 'test-dest-object'
+
+        with self.assertRaises(ValueError) as e:
+            self.gcs_hook.rewrite(source_bucket=source_bucket,
+                                  source_object=source_object,
+                                  destination_bucket=destination_bucket,
+                                  destination_object=destination_object)
+
+        self.assertEqual(
+            str(e.exception),
+            'source_bucket and source_object cannot be empty.'
+        )
+
+    def test_rewrite_empty_source_object(self):
+        source_bucket = 'test-source-bucket'
+        source_object = None
+        destination_bucket = 'test-dest-bucket'
+        destination_object = 'test-dest-object'
+
+        with self.assertRaises(ValueError) as e:
+            self.gcs_hook.rewrite(source_bucket=source_bucket,
+                                  source_object=source_object,
+                                  destination_bucket=destination_bucket,
+                                  destination_object=destination_object)
+
+        self.assertEqual(
+            str(e.exception),
+            'source_bucket and source_object cannot be empty.'
+        )
+
     @mock.patch('google.cloud.storage.Bucket')
     @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn'))
     def test_delete(self, mock_service, mock_bucket):
@@ -257,6 +290,55 @@ def test_delete_nonexisting_object(self, mock_service):
         with self.assertRaises(exceptions.NotFound):
             self.gcs_hook.delete(bucket=test_bucket, object=test_object)
 
+    @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn'))
+    def test_object_get_size(self, mock_service):
+        test_bucket = 'test_bucket'
+        test_object = 'test_object'
+        returned_file_size = 1200
+
+        get_bucket_method = mock_service.return_value.get_bucket
+        get_blob_method = get_bucket_method.return_value.get_blob
+        get_blob_method.return_value.size = returned_file_size
+
+        response = self.gcs_hook.get_size(bucket=test_bucket, object=test_object)
+
+        self.assertEqual(response, returned_file_size)
+        get_blob_method.return_value.reload.assert_called_once_with()
+
+    @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn'))
+    def test_object_get_crc32c(self, mock_service):
+        test_bucket = 'test_bucket'
+        test_object = 'test_object'
+        returned_file_crc32c = "xgdNfQ=="
+
+        get_bucket_method = mock_service.return_value.get_bucket
+        get_blob_method = get_bucket_method.return_value.get_blob
+        get_blob_method.return_value.crc32c = returned_file_crc32c
+
+        response = self.gcs_hook.get_crc32c(bucket=test_bucket, object=test_object)
+
+        self.assertEqual(response, returned_file_crc32c)
+
+        # Check that reload method is called
+        get_blob_method.return_value.reload.assert_called_once_with()
+
+    @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn'))
+    def test_object_get_md5hash(self, mock_service):
+        test_bucket = 'test_bucket'
+        test_object = 'test_object'
+        returned_file_md5hash = "leYUJBUWrRtks1UeUFONJQ=="
+
+        get_bucket_method = mock_service.return_value.get_bucket
+        get_blob_method = get_bucket_method.return_value.get_blob
+        get_blob_method.return_value.md5_hash = returned_file_md5hash
+
+        response = self.gcs_hook.get_md5hash(bucket=test_bucket, object=test_object)
+
+        self.assertEqual(response, returned_file_md5hash)
+
+        # Check that reload method is called
+        get_blob_method.return_value.reload.assert_called_once_with()
+
     @mock.patch('google.cloud.storage.Bucket')
     @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn'))
     def test_create_bucket(self, mock_service, mock_bucket):
@@ -418,6 +500,45 @@ def test_compose_deprecated_params(self, mock_service):
             num_retries=5
         )
 
+    @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn'))
+    def test_download_as_string(self, mock_service):
+        test_bucket = 'test_bucket'
+        test_object = 'test_object'
+        test_object_bytes = io.BytesIO(b"input")
+
+        download_method = mock_service.return_value.get_bucket.return_value \
+            .blob.return_value.download_as_string
+        download_method.return_value = test_object_bytes
+
+        response = self.gcs_hook.download(bucket=test_bucket,
+                                          object=test_object,
+                                          filename=None)
+
+        self.assertEqual(response, test_object_bytes)
+        download_method.assert_called_once_with()
+
+    @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn'))
+    def test_download_to_file(self, mock_service):
+        test_bucket = 'test_bucket'
+        test_object = 'test_object'
+        test_object_bytes = io.BytesIO(b"input")
+        test_file = 'test_file'
+
+        download_filename_method = mock_service.return_value.get_bucket.return_value \
+            .blob.return_value.download_to_filename
+        download_filename_method.return_value = None
+
+        download_as_a_string_method = mock_service.return_value.get_bucket.return_value \
+            .blob.return_value.download_as_string
+        download_as_a_string_method.return_value = test_object_bytes
+
+        response = self.gcs_hook.download(bucket=test_bucket,
+                                          object=test_object,
+                                          filename=test_file)
+
+        self.assertEqual(response, test_object_bytes)
+        download_filename_method.assert_called_once_with(test_file)
+
 
 class TestGoogleCloudStorageHookUpload(unittest.TestCase):
     def setUp(self):
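
A note on the pattern this series settles on: defaulting each legacy argument to `None` and warning only when a caller explicitly sets it keeps old call sites working while staying silent for new ones. The sketch below demonstrates that behaviour outside Airflow; `upload` here is a hypothetical stand-in for `GoogleCloudStorageHook.upload`, not the hook itself, and it assumes only the standard-library `warnings` and `unittest` modules.

```python
import unittest
import warnings


def upload(bucket, object_name, filename, multipart=None, num_retries=None):
    # Hypothetical stand-in for GoogleCloudStorageHook.upload: the None
    # defaults let us warn only when a deprecated argument is actually
    # passed, which is the "option to stop warnings" from PATCH 3/5.
    if multipart is not None:
        warnings.warn("'multipart' parameter is deprecated."
                      " It is handled automatically by the Storage client",
                      DeprecationWarning)
    if num_retries is not None:
        warnings.warn("'num_retries' parameter is deprecated."
                      " It is handled automatically by the Storage client",
                      DeprecationWarning)
    # ... the real method would upload the file here ...


class TestDeprecationPattern(unittest.TestCase):
    def test_new_style_call_is_silent(self):
        # Escalate DeprecationWarning to an error: a call that omits the
        # deprecated arguments must not warn at all.
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            upload('test_bucket', 'test_object', '/tmp/input.txt')

    def test_old_style_call_warns(self):
        # Passing a deprecated argument triggers exactly the warning that
        # assertWarns checks in the patched test suite (Python 3; the
        # unittest2 backport provides assertWarns on Python 2).
        with self.assertWarns(DeprecationWarning):
            upload('test_bucket', 'test_object', '/tmp/input.txt',
                   multipart=True)


if __name__ == '__main__':
    unittest.main()
```

Running code under `python -W error::DeprecationWarning` is an equally quick way to surface call sites that still pass the deprecated arguments.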