From a62e613da343fa7cc6c22e98b859caa59d88eeb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Mart=C3=ADn=20Vedriel?= Date: Sun, 25 Aug 2024 08:45:25 +0200 Subject: [PATCH] SQS: Fixes #2091 queue_delete() method doesn't actually delete the queue (#2099) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * SQS: wrap logic to resolve queue URL into a reusable method * Create new `_resolve_queue_url` method for reusability. `_new_queue` method retrieves the queue URL from cache or fetches it from SQS, updating the cache if necessary. `_delete` method needs to resolve the queue URL in order to delete the queue from SQS (next commit). Both methods will be able to reuse the same functionality by calling the `_resolve_queue_url` method. * Introduce DoesNotExistQueueException for easier error handling. `_new_queue` method is responsible for creating a new queue when it doesn't exist, utilizing the new exception for clarity. * Unit test coverage for `_resolve_queue_url` method. * SQS: Fix missing queue deletion in Channel._delete * Add call to `delete_queue` using sqs client. `_delete` method is expected to delete the specified queue when called. Previously, this functionality was missing, which has now been corrected. The method raises a `DoesNotExistQueueException` if the specified queue name doesn’t exist. * Update unit tests with new assertion and mock to verify queue deletion. * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- kombu/transport/SQS.py | 38 ++++++++++++++++++++++++++---------- t/unit/transport/test_SQS.py | 16 +++++++++++++++ 2 files changed, 44 insertions(+), 10 deletions(-) diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index 0382009fe..26a69637e 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -192,6 +192,10 @@ class AccessDeniedQueueException(Exception): """ +class DoesNotExistQueueException(Exception): + """The specified queue doesn't exist.""" + + class QoS(virtual.QoS): """Quality of Service guarantees implementation for SQS.""" @@ -358,15 +362,8 @@ def entity_name(self, name, table=CHARS_REPLACE_TABLE): def canonical_queue_name(self, queue_name): return self.entity_name(self.queue_name_prefix + queue_name) - def _new_queue(self, queue, **kwargs): - """Ensure a queue with given name exists in SQS. - - Arguments: - --------- - queue (str): the AMQP queue name - Returns - str: the SQS queue URL - """ + def _resolve_queue_url(self, queue): + """Try to retrieve the SQS queue URL for a given queue name.""" # Translate to SQS name for consistency with initial # _queue_cache population. sqs_qname = self.canonical_queue_name(queue) @@ -386,6 +383,23 @@ def _new_queue(self, queue, **kwargs): "defined in 'predefined_queues'." ).format(sqs_qname)) + raise DoesNotExistQueueException( + f"Queue with name '{sqs_qname}' doesn't exist in SQS" + ) + + def _new_queue(self, queue, **kwargs): + """Ensure a queue with given name exists in SQS. + + Arguments: + --------- + queue (str): the AMQP queue name + Returns + str: the SQS queue URL + """ + try: + return self._resolve_queue_url(queue) + except DoesNotExistQueueException: + sqs_qname = self.canonical_queue_name(queue) attributes = {'VisibilityTimeout': str(self.visibility_timeout)} if sqs_qname.endswith('.fifo'): attributes['FifoQueue'] = 'true' @@ -414,7 +428,11 @@ def _delete(self, queue, *args, **kwargs): """Delete queue by name.""" if self.predefined_queues: return - super()._delete(queue) + + q_url = self._resolve_queue_url(queue) + self.sqs().delete_queue( + QueueUrl=q_url, + ) self._queue_cache.pop(queue, None) def _put(self, queue, message, **kwargs): diff --git a/t/unit/transport/test_SQS.py b/t/unit/transport/test_SQS.py index 0f5971f43..8edc61ce4 100644 --- a/t/unit/transport/test_SQS.py +++ b/t/unit/transport/test_SQS.py @@ -137,6 +137,16 @@ def purge_queue(self, QueueUrl=None): if q.url == QueueUrl: q.messages = [] + def delete_queue(self, QueueUrl=None): + queue_name = None + for key, val in self._queues.items(): + if val.url == QueueUrl: + queue_name = key + break + if queue_name is None: + raise Exception(f"Queue url {QueueUrl} not found") + del self._queues[queue_name] + class test_Channel: @@ -262,6 +272,11 @@ def test_entity_name(self): 'foo-bar-baz_qux_quux' assert self.channel.entity_name('abcdef.fifo') == 'abcdef.fifo' + def test_resolve_queue_url(self): + queue_name = 'unittest_queue' + assert self.sqs_conn_mock._queues[queue_name].url == \ + self.channel._resolve_queue_url(queue_name) + def test_new_queue(self): queue_name = 'new_unittest_queue' self.channel._new_queue(queue_name) @@ -316,6 +331,7 @@ def test_delete(self): self.channel._new_queue(queue_name) self.channel._delete(queue_name) assert queue_name not in self.channel._queue_cache + assert queue_name not in self.sqs_conn_mock._queues def test_get_from_sqs(self): # Test getting a single message