-
Notifications
You must be signed in to change notification settings - Fork 14.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add GCS Requester Pays bucket support to GCSToS3Operator #32760
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -197,7 +197,6 @@ def copy( | |
destination_object = destination_object or source_object | ||
|
||
if source_bucket == destination_bucket and source_object == destination_object: | ||
|
||
raise ValueError( | ||
f"Either source/destination bucket or source/destination object must be different, " | ||
f"not both the same: bucket={source_bucket}, object={source_object}" | ||
|
@@ -282,6 +281,7 @@ def download( | |
chunk_size: int | None = None, | ||
timeout: int | None = DEFAULT_TIMEOUT, | ||
num_max_attempts: int | None = 1, | ||
user_project: str | None = None, | ||
) -> bytes: | ||
... | ||
|
||
|
@@ -294,6 +294,7 @@ def download( | |
chunk_size: int | None = None, | ||
timeout: int | None = DEFAULT_TIMEOUT, | ||
num_max_attempts: int | None = 1, | ||
user_project: str | None = None, | ||
) -> str: | ||
... | ||
|
||
|
@@ -305,6 +306,7 @@ def download( | |
chunk_size: int | None = None, | ||
timeout: int | None = DEFAULT_TIMEOUT, | ||
num_max_attempts: int | None = 1, | ||
user_project: str | None = None, | ||
) -> str | bytes: | ||
""" | ||
Downloads a file from Google Cloud Storage. | ||
|
@@ -320,6 +322,8 @@ def download( | |
:param chunk_size: Blob chunk size. | ||
:param timeout: Request timeout in seconds. | ||
:param num_max_attempts: Number of attempts to download the file. | ||
:param user_project: The identifier of the Google Cloud project to bill for the request. | ||
Required for Requester Pays buckets. | ||
""" | ||
# TODO: future improvement check file size before downloading, | ||
# to check for local space availability | ||
|
@@ -330,7 +334,7 @@ def download( | |
try: | ||
num_file_attempts += 1 | ||
client = self.get_conn() | ||
bucket = client.bucket(bucket_name) | ||
bucket = client.bucket(bucket_name, user_project=user_project) | ||
blob = bucket.blob(blob_name=object_name, chunk_size=chunk_size) | ||
|
||
if filename: | ||
|
@@ -395,6 +399,7 @@ def provide_file( | |
object_name: str | None = None, | ||
object_url: str | None = None, | ||
dir: str | None = None, | ||
user_project: str | None = None, | ||
) -> Generator[IO[bytes], None, None]: | ||
""" | ||
Downloads the file to a temporary directory and returns a file handle. | ||
|
@@ -406,13 +411,20 @@ def provide_file( | |
:param object_name: The object to fetch. | ||
:param object_url: File reference url. Must start with "gs: //" | ||
:param dir: The tmp sub directory to download the file to. (passed to NamedTemporaryFile) | ||
:param user_project: The identifier of the Google Cloud project to bill for the request. | ||
Required for Requester Pays buckets. | ||
:return: File handler | ||
""" | ||
if object_name is None: | ||
raise ValueError("Object name can not be empty") | ||
_, _, file_name = object_name.rpartition("/") | ||
with NamedTemporaryFile(suffix=file_name, dir=dir) as tmp_file: | ||
self.download(bucket_name=bucket_name, object_name=object_name, filename=tmp_file.name) | ||
self.download( | ||
bucket_name=bucket_name, | ||
object_name=object_name, | ||
filename=tmp_file.name, | ||
user_project=user_project, | ||
) | ||
tmp_file.flush() | ||
yield tmp_file | ||
|
||
|
@@ -423,6 +435,7 @@ def provide_file_and_upload( | |
bucket_name: str = PROVIDE_BUCKET, | ||
object_name: str | None = None, | ||
object_url: str | None = None, | ||
user_project: str | None = None, | ||
) -> Generator[IO[bytes], None, None]: | ||
""" | ||
Creates temporary file, returns a file handle and uploads the files content on close. | ||
|
@@ -433,6 +446,8 @@ def provide_file_and_upload( | |
:param bucket_name: The bucket to fetch from. | ||
:param object_name: The object to fetch. | ||
:param object_url: File reference url. Must start with "gs: //" | ||
:param user_project: The identifier of the Google Cloud project to bill for the request. | ||
Required for Requester Pays buckets. | ||
:return: File handler | ||
""" | ||
if object_name is None: | ||
|
@@ -442,7 +457,12 @@ def provide_file_and_upload( | |
with NamedTemporaryFile(suffix=file_name) as tmp_file: | ||
yield tmp_file | ||
tmp_file.flush() | ||
self.upload(bucket_name=bucket_name, object_name=object_name, filename=tmp_file.name) | ||
self.upload( | ||
bucket_name=bucket_name, | ||
object_name=object_name, | ||
filename=tmp_file.name, | ||
user_project=user_project, | ||
) | ||
|
||
def upload( | ||
self, | ||
|
@@ -458,6 +478,7 @@ def upload( | |
num_max_attempts: int = 1, | ||
metadata: dict | None = None, | ||
cache_control: str | None = None, | ||
user_project: str | None = None, | ||
) -> None: | ||
""" | ||
Uploads a local file or file data as string or bytes to Google Cloud Storage. | ||
|
@@ -474,6 +495,8 @@ def upload( | |
:param num_max_attempts: Number of attempts to try to upload the file. | ||
:param metadata: The metadata to be uploaded with the file. | ||
:param cache_control: Cache-Control metadata field. | ||
:param user_project: The identifier of the Google Cloud project to bill for the request. | ||
Required for Requester Pays buckets. | ||
""" | ||
|
||
def _call_with_retry(f: Callable[[], None]) -> None: | ||
|
@@ -506,7 +529,7 @@ def _call_with_retry(f: Callable[[], None]) -> None: | |
continue | ||
|
||
client = self.get_conn() | ||
bucket = client.bucket(bucket_name) | ||
bucket = client.bucket(bucket_name, user_project=user_project) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can add support for "requester pays" buckets by specifying |
||
blob = bucket.blob(blob_name=object_name, chunk_size=chunk_size) | ||
|
||
if metadata: | ||
|
@@ -596,7 +619,6 @@ def is_updated_after(self, bucket_name: str, object_name: str, ts: datetime) -> | |
""" | ||
blob_update_time = self.get_blob_update_time(bucket_name, object_name) | ||
if blob_update_time is not None: | ||
|
||
if not ts.tzinfo: | ||
ts = ts.replace(tzinfo=timezone.utc) | ||
self.log.info("Verify object date: %s > %s", blob_update_time, ts) | ||
|
@@ -618,7 +640,6 @@ def is_updated_between( | |
""" | ||
blob_update_time = self.get_blob_update_time(bucket_name, object_name) | ||
if blob_update_time is not None: | ||
|
||
if not min_ts.tzinfo: | ||
min_ts = min_ts.replace(tzinfo=timezone.utc) | ||
if not max_ts.tzinfo: | ||
|
@@ -639,7 +660,6 @@ def is_updated_before(self, bucket_name: str, object_name: str, ts: datetime) -> | |
""" | ||
blob_update_time = self.get_blob_update_time(bucket_name, object_name) | ||
if blob_update_time is not None: | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Black formatter removed some superfluous space. |
||
if not ts.tzinfo: | ||
ts = ts.replace(tzinfo=timezone.utc) | ||
self.log.info("Verify object date: %s < %s", blob_update_time, ts) | ||
|
@@ -681,16 +701,18 @@ def delete(self, bucket_name: str, object_name: str) -> None: | |
|
||
self.log.info("Blob %s deleted.", object_name) | ||
|
||
def delete_bucket(self, bucket_name: str, force: bool = False) -> None: | ||
def delete_bucket(self, bucket_name: str, force: bool = False, user_project: str | None = None) -> None: | ||
""" | ||
Delete a bucket object from the Google Cloud Storage. | ||
|
||
:param bucket_name: name of the bucket which will be deleted | ||
:param force: false not allow to delete non empty bucket, set force=True | ||
allows to delete non empty bucket | ||
:param user_project: The identifier of the Google Cloud project to bill for the request. | ||
Required for Requester Pays buckets. | ||
""" | ||
client = self.get_conn() | ||
bucket = client.bucket(bucket_name) | ||
bucket = client.bucket(bucket_name, user_project=user_project) | ||
|
||
self.log.info("Deleting %s bucket", bucket_name) | ||
try: | ||
|
@@ -707,6 +729,7 @@ def list( | |
prefix: str | List[str] | None = None, | ||
delimiter: str | None = None, | ||
match_glob: str | None = None, | ||
user_project: str | None = None, | ||
): | ||
""" | ||
List all objects from the bucket with the given a single prefix or multiple prefixes. | ||
|
@@ -718,6 +741,8 @@ def list( | |
:param delimiter: (Deprecated) filters objects based on the delimiter (for e.g '.csv') | ||
:param match_glob: (Optional) filters objects based on the glob pattern given by the string | ||
(e.g, ``'**/*/.json'``). | ||
:param user_project: The identifier of the Google Cloud project to bill for the request. | ||
Required for Requester Pays buckets. | ||
:return: a stream of object names matching the filtering criteria | ||
""" | ||
if delimiter and delimiter != "/": | ||
|
@@ -739,6 +764,7 @@ def list( | |
prefix=prefix_item, | ||
delimiter=delimiter, | ||
match_glob=match_glob, | ||
user_project=user_project, | ||
) | ||
) | ||
else: | ||
|
@@ -750,6 +776,7 @@ def list( | |
prefix=prefix, | ||
delimiter=delimiter, | ||
match_glob=match_glob, | ||
user_project=user_project, | ||
) | ||
) | ||
return objects | ||
|
@@ -762,6 +789,7 @@ def _list( | |
prefix: str | None = None, | ||
delimiter: str | None = None, | ||
match_glob: str | None = None, | ||
user_project: str | None = None, | ||
) -> List: | ||
""" | ||
List all objects from the bucket with the give string prefix in name. | ||
|
@@ -773,10 +801,12 @@ def _list( | |
:param delimiter: (Deprecated) filters objects based on the delimiter (for e.g '.csv') | ||
:param match_glob: (Optional) filters objects based on the glob pattern given by the string | ||
(e.g, ``'**/*/.json'``). | ||
:param user_project: The identifier of the Google Cloud project to bill for the request. | ||
Required for Requester Pays buckets. | ||
:return: a stream of object names matching the filtering criteria | ||
""" | ||
client = self.get_conn() | ||
bucket = client.bucket(bucket_name) | ||
bucket = client.bucket(bucket_name, user_project=user_project) | ||
|
||
ids = [] | ||
page_token = None | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed names of "hook" and "files" variables to "gcs_hook" and "gcs_files" respectively to differentiate from s3 hook and files.