diff --git a/astacus/coordinator/plugins/clickhouse/object_storage.py b/astacus/coordinator/plugins/clickhouse/object_storage.py index 48ab7f17..564a1c7c 100644 --- a/astacus/coordinator/plugins/clickhouse/object_storage.py +++ b/astacus/coordinator/plugins/clickhouse/object_storage.py @@ -42,6 +42,9 @@ def list_items(self) -> list[ObjectStorageItem]: ... @abstractmethod def delete_item(self, key: str) -> None: ... + @abstractmethod + def delete_items(self, keys: Sequence[str]) -> None: ... + @abstractmethod def copy_items_from(self, source: "ObjectStorage", keys: Sequence[str], *, stats: StatsClient | None) -> None: ... @@ -68,6 +71,10 @@ def delete_item(self, key: str) -> None: with self._storage_lock: self._storage.delete_key(key) + def delete_items(self, keys: Sequence[str]) -> None: + with self._storage_lock: + self._storage.delete_keys(keys) + def copy_items_from(self, source: ObjectStorage, keys: Sequence[str], *, stats: StatsClient | None) -> None: # In theory this could deadlock if some other place was locking the same two storages # in the reverse order at the same time. Within the context of backups and restore, diff --git a/astacus/coordinator/plugins/clickhouse/steps.py b/astacus/coordinator/plugins/clickhouse/steps.py index 44946ef2..d951ade7 100644 --- a/astacus/coordinator/plugins/clickhouse/steps.py +++ b/astacus/coordinator/plugins/clickhouse/steps.py @@ -918,11 +918,8 @@ def run_sync_step(self, cluster: Cluster, context: StepsContext) -> None: if disk_kept_path not in disk_available_paths: # Make sure the non-deleted files are actually in object storage raise StepFailedError(f"missing object storage file in disk {disk_name!r}: {disk_kept_path!r}") - logger.info("found %d object storage files to remove in disk %r", len(keys_to_remove), disk_name) - for key_to_remove in keys_to_remove: - # We should really have a batch delete operation there, but it's missing from rohmu - logger.debug("deleting object storage file in disk %r : %r", disk_name, key_to_remove) - disk_object_storage.delete_item(key_to_remove) + logger.info("deleting %d object storage files to remove in disk %r", len(keys_to_remove), disk_name) + disk_object_storage.delete_items(keys_to_remove) finally: disk_object_storage.close() diff --git a/pyproject.toml b/pyproject.toml index 2a2c710c..f1c16b1e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ dependencies = [ "protobuf < 3.21", "pydantic < 2", "pyyaml", - "rohmu >= 2.7.0", + "rohmu@git+ssh://git@github.com/Aiven-Open/rohmu#egg=45d6cca2223a0b07288094285df00898357507a8", "sentry-sdk", "starlette", "tabulate", @@ -132,6 +132,9 @@ astacus = "astacus.main:main" [tool.hatch.version] source = "vcs" +[tool.hatch.metadata] +allow-direct-references = true + [tool.isort] no_sections = true force_alphabetical_sort = true diff --git a/tests/unit/coordinator/plugins/clickhouse/object_storage.py b/tests/unit/coordinator/plugins/clickhouse/object_storage.py index e3c50e40..100f86cc 100644 --- a/tests/unit/coordinator/plugins/clickhouse/object_storage.py +++ b/tests/unit/coordinator/plugins/clickhouse/object_storage.py @@ -44,6 +44,10 @@ def delete_item(self, key: str) -> None: logger.info("deleting item: %r", key) self.items.pop(key) + def delete_items(self, keys: Sequence[str]) -> None: + for key in keys: + self.delete_item(key) + def copy_items_from(self, source: "ObjectStorage", keys: Sequence[str], *, stats: StatsClient | None) -> None: keys_set = set(keys) keys_to_copy = len(keys_set)