Skip to content

Commit

Permalink
clickhouse: bulk delete dangling object storage files
Browse files Browse the repository at this point in the history
Replace delete_item with delete_items in order
to support bulk deletion of object storage files for gcp.
  • Loading branch information
tilman-aiven committed Sep 17, 2024
1 parent 17f5667 commit bf157ac
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 6 deletions.
7 changes: 7 additions & 0 deletions astacus/coordinator/plugins/clickhouse/object_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def list_items(self) -> list[ObjectStorageItem]: ...
@abstractmethod
def delete_item(self, key: str) -> None: ...

@abstractmethod
def delete_items(self, keys: Sequence[str]) -> None: ...

@abstractmethod
def copy_items_from(self, source: "ObjectStorage", keys: Sequence[str], *, stats: StatsClient | None) -> None: ...

Expand All @@ -68,6 +71,10 @@ def delete_item(self, key: str) -> None:
with self._storage_lock:
self._storage.delete_key(key)

def delete_items(self, keys: Sequence[str]) -> None:
with self._storage_lock:
self._storage.delete_keys(keys)

def copy_items_from(self, source: ObjectStorage, keys: Sequence[str], *, stats: StatsClient | None) -> None:
# In theory this could deadlock if some other place was locking the same two storages
# in the reverse order at the same time. Within the context of backups and restore,
Expand Down
7 changes: 2 additions & 5 deletions astacus/coordinator/plugins/clickhouse/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -918,11 +918,8 @@ def run_sync_step(self, cluster: Cluster, context: StepsContext) -> None:
if disk_kept_path not in disk_available_paths:
# Make sure the non-deleted files are actually in object storage
raise StepFailedError(f"missing object storage file in disk {disk_name!r}: {disk_kept_path!r}")
logger.info("found %d object storage files to remove in disk %r", len(keys_to_remove), disk_name)
for key_to_remove in keys_to_remove:
# We should really have a batch delete operation there, but it's missing from rohmu
logger.debug("deleting object storage file in disk %r : %r", disk_name, key_to_remove)
disk_object_storage.delete_item(key_to_remove)
logger.info("deleting %d object storage files to remove in disk %r", len(keys_to_remove), disk_name)
disk_object_storage.delete_items(keys_to_remove)
finally:
disk_object_storage.close()

Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dependencies = [
"protobuf < 3.21",
"pydantic < 2",
"pyyaml",
"rohmu >= 2.7.0",
"rohmu@git+ssh://[email protected]/Aiven-Open/rohmu#egg=45d6cca2223a0b07288094285df00898357507a8",
"sentry-sdk",
"starlette",
"tabulate",
Expand Down Expand Up @@ -132,6 +132,9 @@ astacus = "astacus.main:main"
[tool.hatch.version]
source = "vcs"

[tool.hatch.metadata]
allow-direct-references = true

[tool.isort]
no_sections = true
force_alphabetical_sort = true
Expand Down
4 changes: 4 additions & 0 deletions tests/unit/coordinator/plugins/clickhouse/object_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ def delete_item(self, key: str) -> None:
logger.info("deleting item: %r", key)
self.items.pop(key)

def delete_items(self, keys: Sequence[str]) -> None:
for key in keys:
self.delete_item(key)

def copy_items_from(self, source: "ObjectStorage", keys: Sequence[str], *, stats: StatsClient | None) -> None:
keys_set = set(keys)
keys_to_copy = len(keys_set)
Expand Down

0 comments on commit bf157ac

Please sign in to comment.