Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clickhouse: bulk delete dangling object storage files for gcp #237

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions astacus/coordinator/plugins/clickhouse/object_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def list_items(self) -> list[ObjectStorageItem]: ...
@abstractmethod
def delete_item(self, key: str) -> None: ...

@abstractmethod
def delete_items(self, keys: Sequence[str]) -> None: ...

@abstractmethod
def copy_items_from(self, source: "ObjectStorage", keys: Sequence[str], *, stats: StatsClient | None) -> None: ...

Expand All @@ -68,6 +71,10 @@ def delete_item(self, key: str) -> None:
with self._storage_lock:
self._storage.delete_key(key)

def delete_items(self, keys: Sequence[str]) -> None:
with self._storage_lock:
self._storage.delete_keys(keys)

def copy_items_from(self, source: ObjectStorage, keys: Sequence[str], *, stats: StatsClient | None) -> None:
# In theory this could deadlock if some other place was locking the same two storages
# in the reverse order at the same time. Within the context of backups and restore,
Expand Down
5 changes: 1 addition & 4 deletions astacus/coordinator/plugins/clickhouse/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -1035,10 +1035,7 @@ def run_sync_step(self, cluster: Cluster, context: StepsContext) -> None:
# Make sure the non-deleted files are actually in object storage
raise StepFailedError(f"missing object storage file in disk {disk_name!r}: {disk_kept_path!r}")
logger.info("found %d object storage files to remove in disk %r", len(keys_to_remove), disk_name)
for key_to_remove in keys_to_remove:
# We should really have a batch delete operation there, but it's missing from rohmu
logger.debug("deleting object storage file in disk %r : %r", disk_name, key_to_remove)
disk_object_storage.delete_item(key_to_remove)
disk_object_storage.delete_items(keys_to_remove)
finally:
disk_object_storage.close()

Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dependencies = [
"protobuf < 3.21",
"pydantic < 2",
"pyyaml",
"rohmu >= 2.7.0",
"rohmu@git+ssh://[email protected]/Aiven-Open/rohmu#egg=fe750f754d2648564e7130bd790e53a5fdc170b3",
"sentry-sdk",
"starlette",
"tabulate",
Expand Down Expand Up @@ -132,6 +132,9 @@ astacus = "astacus.main:main"
[tool.hatch.version]
source = "vcs"

[tool.hatch.metadata]
allow-direct-references = true

[tool.isort]
no_sections = true
force_alphabetical_sort = true
Expand Down
4 changes: 4 additions & 0 deletions tests/unit/coordinator/plugins/clickhouse/object_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ def delete_item(self, key: str) -> None:
logger.info("deleting item: %r", key)
self.items.pop(key)

def delete_items(self, keys: Sequence[str]) -> None:
for key in keys:
self.delete_item(key)

def copy_items_from(self, source: "ObjectStorage", keys: Sequence[str], *, stats: StatsClient | None) -> None:
keys_set = set(keys)
keys_to_copy = len(keys_set)
Expand Down
Loading