Add GCS Requester Pays bucket support to GCSToS3Operator #32760
 def execute(self, context: Context) -> list[str]:
 # list all files in an Google Cloud Storage bucket
-hook = GCSHook(
+gcs_hook = GCSHook(
Changed names of "hook" and "files" variables to "gcs_hook" and "gcs_files" respectively to differentiate from s3 hook and files.
@@ -506,7 +529,7 @@ def _call_with_retry(f: Callable[[], None]) -> None:
 continue

 client = self.get_conn()
-bucket = client.bucket(bucket_name)
+bucket = client.bucket(bucket_name, user_project=user_project)
You can add support for "requester pays" buckets by specifying user_project when creating a bucket object. All updates in this PR are basically to pass user_project from the operator level to this bucket object factory method.
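A minimal sketch of that plumbing, using stand-in classes rather than the real Airflow or google-cloud-storage ones. FakeBucket, FakeClient, SketchHook, and SketchOperator are hypothetical names introduced for illustration; only the bucket(bucket_name, user_project=...) call shape is taken from the diff above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeBucket:
    """Stand-in for a bucket handle (hypothetical, not the real class)."""

    name: str
    user_project: Optional[str] = None


class FakeClient:
    """Stand-in for the storage client; it only mimics the bucket() factory."""

    def bucket(self, bucket_name: str, user_project: Optional[str] = None) -> FakeBucket:
        return FakeBucket(bucket_name, user_project)


class SketchHook:
    """Hypothetical hook that forwards user_project to the bucket factory."""

    def get_conn(self) -> FakeClient:
        return FakeClient()

    def get_bucket(self, bucket_name: str, user_project: Optional[str] = None) -> FakeBucket:
        client = self.get_conn()
        # Same call shape as the changed line in the diff.
        return client.bucket(bucket_name, user_project=user_project)


class SketchOperator:
    """Hypothetical operator: accepts user_project and passes it down to the hook."""

    def __init__(self, bucket: str, user_project: Optional[str] = None) -> None:
        self.bucket = bucket
        self.user_project = user_project

    def execute(self) -> FakeBucket:
        return SketchHook().get_bucket(self.bucket, user_project=self.user_project)
```

Leaving user_project unset keeps the previous behaviour, because None simply flows through to the factory.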
@@ -639,7 +660,6 @@ def is_updated_before(self, bucket_name: str, object_name: str, ts: datetime) ->
 """
 blob_update_time = self.get_blob_update_time(bucket_name, object_name)
 if blob_update_time is not None:
-
Black formatter removed some superfluous space.
create_gcs_bucket = GCSCreateBucketOperator(
task_id="create_gcs_bucket",
bucket_name=gcs_bucket,
resource={"billing": {"requesterPays": True}},
To test that GCSToS3Operator works with requester pays buckets, you need a bucket with "requester pays" enabled. Luckily, you don't need two GCP accounts to use this feature. You can specify your own GCP project ID as the billable project. If a bucket has "requester pays" enabled and you don't specify a user_project, an error will occur.
Static checks passing locally
@eladkal @o-nikolas Please review this at your earliest convenience!
@@ -503,7 +503,7 @@ def test_delete_bucket(self, mock_service):

 self.gcs_hook.delete_bucket(bucket_name=test_bucket)

-mock_service.return_value.bucket.assert_called_once_with(test_bucket)
+mock_service.return_value.bucket.assert_called_once_with(test_bucket, user_project=None)
Is this necessary? It is the default value
Yeah, I thought it was a little silly, but the assertion method recognizes the difference between specifying None upfront and leaving it as the default value. I only updated this code because CI tests were failing.
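The behaviour described here can be reproduced with a plain unittest.mock.Mock. This is a small sketch, not the project's test: make_bucket is a hypothetical helper that imitates the hook's call shape. A plain mock knows nothing about the real method's defaults, so it compares the recorded arguments literally.

```python
from unittest import mock


def make_bucket(client, bucket_name, user_project=None):
    # Hypothetical helper: like the changed hook code, it always passes the
    # keyword explicitly, even when the value is the default None.
    return client.bucket(bucket_name, user_project=user_project)


client = mock.Mock()
make_bucket(client, "test-bucket")

# Passes: the recorded call includes the explicit keyword argument.
client.bucket.assert_called_once_with("test-bucket", user_project=None)

# The old form of the assertion no longer matches the recorded call.
try:
    client.bucket.assert_called_once_with("test-bucket")
    old_assertion_passed = True
except AssertionError:
    old_assertion_passed = False
```

This is why the test had to change even though the runtime behaviour for existing callers did not.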
# [START howto_transfer_gcs_to_s3]
gcs_to_s3 = GCSToS3Operator(
task_id="gcs_to_s3",
bucket=s3_bucket,
dest_s3_key=s3_key,
bucket=gcs_bucket,
I can't believe that's been there this whole time. Maybe renaming that param to gcs_bucket should be on the todo list
That's a good idea. I'd be happy to change the declaration (keeping support for the old param name and adding a deprecation warning) in another PR.
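One way such a rename could look, sketched on a stand-in class rather than the real operator. SketchGCSToS3Operator and the warning text are hypothetical; the follow-up PR may handle this differently.

```python
import warnings
from typing import Optional


class SketchGCSToS3Operator:
    """Hypothetical stand-in showing a parameter rename with a deprecation path."""

    def __init__(
        self,
        *,
        gcs_bucket: Optional[str] = None,
        bucket: Optional[str] = None,  # old name, kept for backward compatibility
    ) -> None:
        if bucket is not None:
            warnings.warn(
                "The 'bucket' parameter is deprecated; use 'gcs_bucket' instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            # Prefer the new name if both are supplied.
            if gcs_bucket is None:
                gcs_bucket = bucket
        if gcs_bucket is None:
            raise ValueError("gcs_bucket is required")
        self.gcs_bucket = gcs_bucket
```

Existing DAGs that pass bucket= would keep working and see a warning, while new code can use the clearer name.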
Oh yeah, I definitely didn't mean to add to the scope of this one. If you want to take it in another PR, that's great. Ping me in it and I'll review it for you as well.
The AWS team regularly runs "all" of the system tests with a dashboard and fixes any regressions, but it doesn't run the ones that require outside services (like maintaining GCS or MongoDB accounts); otherwise this would have been caught long ago. [Speaking for myself here and not on behalf of anyone else] It would be neat to see if the Google team and the AWS team could work something out to get that combination of tests running somehow.
Related: #31137, #32296
Summary
This PR adds an optional user_project argument to GCSToS3Operator in order to support operations on Google Cloud Storage buckets that have the Requester Pays feature enabled.
Classes and methods affected:
Testing
All affected classes and methods are covered in the below system test.