Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gcp: bulk request for gcp object storage deletion #193

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 39 additions & 4 deletions rohmu/object_storage/google.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

from contextlib import contextmanager
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.errors import BatchError, HttpError
from googleapiclient.http import (
BatchHttpRequest,
build_http,
HttpRequest,
MediaDownloadProgress,
Expand Down Expand Up @@ -45,12 +46,13 @@
GoogleObjectStorageConfig as Config,
)
from rohmu.typing import AnyPath, Metadata
from rohmu.util import get_total_size_from_content_range
from rohmu.util import batched, get_total_size_from_content_range
from typing import (
Any,
BinaryIO,
Callable,
cast,
Collection,
Iterable,
Iterator,
Optional,
Expand Down Expand Up @@ -302,13 +304,15 @@ def _bucket_client(self) -> Iterator[Any]:
self.gs_bucket_client = self.gs.buckets()
yield self.gs_bucket_client

def _retry_on_reset(self, request: HttpRequest, action: Callable[[], ResType], retry_reporter: Reporter) -> ResType:
def _retry_on_reset(
self, request: HttpRequest | BatchHttpRequest, action: Callable[[], ResType], retry_reporter: Reporter
) -> ResType:
retries = 60
retry_wait = 2.0
while True:
try:
return action()
except (IncompleteRead, HttpError, ssl.SSLEOFError, socket.timeout, OSError, socket.gaierror) as ex:
except (IncompleteRead, HttpError, BatchError, ssl.SSLEOFError, socket.timeout, OSError, socket.gaierror) as ex:
# Note that socket.timeout and ssl.SSLEOFError inherit from OSError
# and the order of handling the errors here needs to be correct
if not retries:
Expand All @@ -327,6 +331,9 @@ def _retry_on_reset(self, request: HttpRequest, action: Callable[[], ResType], r
# getaddrinfo sometimes fails with "Name or service not known"
elif isinstance(ex, socket.gaierror) and ex.errno != socket.EAI_NONAME:
raise
# batch request or response has a wrong format
elif isinstance(ex, BatchError):
raise

self.log.warning("%s failed: %s (%s), retrying in %.2fs", action, ex.__class__.__name__, ex, retry_wait)

Expand Down Expand Up @@ -463,6 +470,34 @@ def delete_key(self, key: str) -> None:
reporter.report(self.stats)
self.notifier.object_deleted(key)

def delete_keys(self, keys: Collection[str]) -> None:
self.log.debug("Deleting %i keys", len(keys))
reporter = Reporter(StorageOperation.delete_key)
with self._object_client() as object_resource:
for keys_batch in batched(keys, 1000): # Cannot delete more than 1000 objects at a time
assert self.gs is not None
batch_request = self.gs.new_batch_http_request(callback=self._delete_keys_callback)
for key in keys_batch:
path = self.format_key_for_backend(key)
self.log.debug("Deleting key: %r", path)
batch_request.add(object_resource.delete(bucket=self.bucket_name, object=path), request_id=key)

self._retry_on_reset(batch_request, batch_request.execute, retry_reporter=reporter)

for key in keys_batch:
self.log.debug("Deleted key: %r", key)
self.notifier.object_deleted(key)

reporter.report(self.stats)

def _delete_keys_callback(self, request_id: str, response: HttpRequest, exception: HttpError | None) -> None:
if exception is not None:
self.log.error(
"Received server error %r for request_id %s, Google API delete operation",
exception.resp["status"],
request_id,
)

def get_contents_to_fileobj(
self,
key: str,
Expand Down
42 changes: 41 additions & 1 deletion test/object_storage/test_google.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,45 @@ def test_store_file_object() -> None:
)


@pytest.mark.parametrize(
("total_keys,expected_bulk_request_count"),
(
(0, 0),
(1, 1),
(1000, 1),
(1001, 2),
(2000, 2),
(2001, 3),
(10_000, 10),
),
)
def test_delete_keys(total_keys: int, expected_bulk_request_count: int) -> None:
notifier = MagicMock()
test_keys = _generate_keys(total_keys)
with ExitStack() as stack:
stack.enter_context(patch("rohmu.object_storage.google.get_credentials"))
stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._create_object_store_if_needed_unwrapped"))
mock_retry_on_reset = stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._retry_on_reset"))
transfer = GoogleTransfer(
project_id="test-project-id",
bucket_name="test-bucket",
notifier=notifier,
)
mock_client = stack.enter_context(patch.object(transfer, "_object_client"))
mock_request = _mock_request([], resumable=None)
mock_client.return_value.__enter__.return_value.delete.return_value = mock_request

transfer.delete_keys(keys=test_keys)

assert notifier.object_deleted.call_count == total_keys
notifier.object_deleted.assert_has_calls([call(key) for key in test_keys])
assert mock_retry_on_reset.call_count == expected_bulk_request_count


def _generate_keys(total: int, prefix: str = "test_key_") -> list[str]:
return [f"{prefix}{i+1}" for i in range(total)]


def test_upload_size_unknown_to_reporter() -> None:
notifier = MagicMock()
with ExitStack() as stack:
Expand Down Expand Up @@ -253,7 +292,7 @@ def test_get_contents_to_fileobj_raises_error_on_invalid_byte_range() -> None:
)


def _mock_request(calls: list[tuple[str, bytes]]) -> Mock:
def _mock_request(calls: list[tuple[str, bytes]], resumable: bool | None = None) -> Mock:
results = []
for call_content_range, call_content in calls:
response = Mock()
Expand All @@ -268,6 +307,7 @@ def _mock_request(calls: list[tuple[str, bytes]]) -> Mock:
request = Mock()
request.headers = {}
request.http.request = http_call
request.resumable = resumable
return request


Expand Down
Loading