Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Enable custom predicates for media operations #1385

Merged
merged 5 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ Preview Release

Python Storage 3.0 is currently in a preview state. If you experience that
backwards compatibility for your application is broken with this release for any
reason, please let us know through the Github issues system. Thank you.
reason, please let us know through the Github issues system. While some breaks
of backwards compatibility may be unavoidable due to new features in the major
version release, we will do our best to minimize them. Thank you.

Exception Handling
~~~~~~~~~~~~~~~~~~
Expand Down Expand Up @@ -88,6 +90,9 @@ Miscellaneous

- The BlobWriter class now attempts to terminate an ongoing resumable upload if
the writer exits with an exception.
- Retry behavior is now identical between media operations (uploads and
downloads) and other operations, and custom predicates are now supported for
media operations as well.

Quick Start
-----------
Expand Down
24 changes: 0 additions & 24 deletions google/cloud/storage/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from uuid import uuid4

from google.auth import environment_vars
from google.cloud.storage import _media
from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED
Expand Down Expand Up @@ -625,29 +624,6 @@ def _bucket_bound_hostname_url(host, scheme=None):
return f"{scheme}://{host}"


def _api_core_retry_to_resumable_media_retry(retry):
"""Convert google.api.core.Retry to google.cloud.storage._media.RetryStrategy.

Custom predicates are not translated.

:type retry: google.api_core.Retry
:param retry: (Optional) The google.api_core.Retry object to translate.

:rtype: google.cloud.storage._media.RetryStrategy
:returns: A RetryStrategy with all applicable attributes copied from input.
"""

if retry is not None:
return _media.RetryStrategy(
max_sleep=retry._maximum,
max_cumulative_retry=retry._deadline,
initial_delay=retry._initial,
multiplier=retry._multiplier,
)
else:
return _media.RetryStrategy(max_retries=0)


def _get_invocation_id():
return "gccl-invocation-id/" + str(uuid4())

Expand Down
2 changes: 0 additions & 2 deletions google/cloud/storage/_media/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@
.. _requests: http://docs.python-requests.org/
"""

from google.cloud.storage._media.common import RetryStrategy
from google.cloud.storage._media.common import UPLOAD_CHUNK_SIZE


__all__ = [
"RetryStrategy",
"UPLOAD_CHUNK_SIZE",
]
60 changes: 54 additions & 6 deletions google/cloud/storage/_media/_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import re

from google.cloud.storage._media import _helpers
from google.cloud.storage._media import common
from google.cloud.storage.exceptions import InvalidResponse
from google.cloud.storage.retry import DEFAULT_RETRY


_CONTENT_RANGE_RE = re.compile(
Expand All @@ -45,14 +45,30 @@ class DownloadBase(object):
end (int): The last byte in a range to be downloaded.
headers (Optional[Mapping[str, str]]): Extra headers that should
be sent with the request, e.g. headers for encrypted data.
retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
A None value will disable retries. A google.api_core.retry.Retry
value will enable retries, and the object will configure backoff and
timeout options.

See the retry.py source code and docstrings in this package
(google.cloud.storage.retry) for information on retry types and how
to configure them.

Attributes:
media_url (str): The URL containing the media to be downloaded.
start (Optional[int]): The first byte in a range to be downloaded.
end (Optional[int]): The last byte in a range to be downloaded.
"""

def __init__(self, media_url, stream=None, start=None, end=None, headers=None):
def __init__(
self,
media_url,
stream=None,
start=None,
end=None,
headers=None,
retry=DEFAULT_RETRY,
):
self.media_url = media_url
self._stream = stream
self.start = start
Expand All @@ -61,7 +77,7 @@ def __init__(self, media_url, stream=None, start=None, end=None, headers=None):
headers = {}
self._headers = headers
self._finished = False
self._retry_strategy = common.RetryStrategy()
self._retry_strategy = retry

@property
def finished(self):
Expand Down Expand Up @@ -133,6 +149,15 @@ class Download(DownloadBase):
values are "md5", "crc32c", "auto" and None. The default is "auto",
which will try to detect if the C extension for crc32c is installed
and fall back to md5 otherwise.
retry (Optional[google.api_core.retry.Retry]): How to retry the
RPC. A None value will disable retries. A
google.api_core.retry.Retry value will enable retries, and the
object will configure backoff and timeout options.

See the retry.py source code and docstrings in this package
(google.cloud.storage.retry) for information on retry types and how
to configure them.

"""

def __init__(
Expand All @@ -143,9 +168,10 @@ def __init__(
end=None,
headers=None,
checksum="auto",
retry=DEFAULT_RETRY,
):
super(Download, self).__init__(
media_url, stream=stream, start=start, end=end, headers=headers
media_url, stream=stream, start=start, end=end, headers=headers, retry=retry
)
self.checksum = checksum
if self.checksum == "auto":
Expand Down Expand Up @@ -242,6 +268,14 @@ class ChunkedDownload(DownloadBase):
headers (Optional[Mapping[str, str]]): Extra headers that should
be sent with each request, e.g. headers for data encryption
key headers.
retry (Optional[google.api_core.retry.Retry]): How to retry the
RPC. A None value will disable retries. A
google.api_core.retry.Retry value will enable retries, and the
object will configure backoff and timeout options.

See the retry.py source code and docstrings in this package
(google.cloud.storage.retry) for information on retry types and how
to configure them.

Attributes:
media_url (str): The URL containing the media to be downloaded.
Expand All @@ -253,13 +287,27 @@ class ChunkedDownload(DownloadBase):
ValueError: If ``start`` is negative.
"""

def __init__(self, media_url, chunk_size, stream, start=0, end=None, headers=None):
def __init__(
self,
media_url,
chunk_size,
stream,
start=0,
end=None,
headers=None,
retry=DEFAULT_RETRY,
):
if start < 0:
raise ValueError(
"On a chunked download the starting " "value cannot be negative."
)
super(ChunkedDownload, self).__init__(
media_url, stream=stream, start=start, end=end, headers=headers
media_url,
stream=stream,
start=start,
end=end,
headers=headers,
retry=retry,
)
self.chunk_size = chunk_size
self._bytes_downloaded = 0
Expand Down
33 changes: 2 additions & 31 deletions google/cloud/storage/_media/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
import base64
import hashlib
import logging
import random

from urllib.parse import parse_qs
from urllib.parse import urlencode
from urllib.parse import urlsplit
from urllib.parse import urlunsplit

from google.cloud.storage._media import common
from google.cloud.storage import retry
from google.cloud.storage.exceptions import InvalidResponse


Expand Down Expand Up @@ -101,7 +100,7 @@ def require_status_code(response, status_codes, get_status_code, callback=do_not
"""
status_code = get_status_code(response)
if status_code not in status_codes:
if status_code not in common.RETRYABLE:
if status_code not in retry._RETRYABLE_STATUS_CODES:
callback()
raise InvalidResponse(
response,
Expand All @@ -113,34 +112,6 @@ def require_status_code(response, status_codes, get_status_code, callback=do_not
return status_code


def calculate_retry_wait(base_wait, max_sleep, multiplier=2.0):
"""Calculate the amount of time to wait before a retry attempt.

Wait time grows exponentially with the number of attempts, until
``max_sleep``.

A random amount of jitter (between 0 and 1 seconds) is added to spread out
retry attempts from different clients.

Args:
base_wait (float): The "base" wait time (i.e. without any jitter)
that will be multiplied until it reaches the maximum sleep.
max_sleep (float): Maximum value that a sleep time is allowed to be.
multiplier (float): Multiplier to apply to the base wait.

Returns:
Tuple[float, float]: The new base wait time as well as the wait time
to be applied (with a random amount of jitter between 0 and 1 seconds
added).
"""
new_base_wait = multiplier * base_wait
if new_base_wait > max_sleep:
new_base_wait = max_sleep

jitter_ms = random.randint(0, 1000)
return new_base_wait, new_base_wait + 0.001 * jitter_ms


def _get_metadata_key(checksum_type):
if checksum_type == "md5":
return "md5Hash"
Expand Down
Loading