Skip to content

Commit

Permalink
SQS: Fixes #2091 queue_delete() method doesn't actually delete the qu…
Browse files Browse the repository at this point in the history
…eue (#2099)

* SQS: wrap logic to resolve queue URL into a reusable method

* Create new `_resolve_queue_url` method for reusability.

`_new_queue` method retrieves the queue URL from cache
or fetches it from SQS, updating the cache if necessary.

`_delete` method needs to resolve the queue URL in order to
delete the queue from SQS (next commit).

Both methods will be able to reuse the same functionality by
calling the `_resolve_queue_url` method.

* Introduce DoesNotExistQueueException for easier error handling.

`_new_queue` method is responsible for creating a new queue when
 it doesn't exist, utilizing the new exception for clarity.

* Unit test coverage for `_resolve_queue_url` method.

* SQS: Fix missing queue deletion in Channel._delete

* Add call to `delete_queue` using sqs client.

`_delete` method is expected to delete the specified queue when
called. Previously, this functionality was missing, which has
now been corrected.

The method raises a `DoesNotExistQueueException` if the specified
queue name doesn’t exist.

* Update unit tests with new assertion and mock to verify queue deletion.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
ivanprjcts and pre-commit-ci[bot] authored Aug 25, 2024
1 parent ad35ab5 commit a62e613
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 10 deletions.
38 changes: 28 additions & 10 deletions kombu/transport/SQS.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ class AccessDeniedQueueException(Exception):
"""


class DoesNotExistQueueException(Exception):
"""The specified queue doesn't exist."""


class QoS(virtual.QoS):
"""Quality of Service guarantees implementation for SQS."""

Expand Down Expand Up @@ -358,15 +362,8 @@ def entity_name(self, name, table=CHARS_REPLACE_TABLE):
def canonical_queue_name(self, queue_name):
return self.entity_name(self.queue_name_prefix + queue_name)

def _new_queue(self, queue, **kwargs):
"""Ensure a queue with given name exists in SQS.
Arguments:
---------
queue (str): the AMQP queue name
Returns
str: the SQS queue URL
"""
def _resolve_queue_url(self, queue):
"""Try to retrieve the SQS queue URL for a given queue name."""
# Translate to SQS name for consistency with initial
# _queue_cache population.
sqs_qname = self.canonical_queue_name(queue)
Expand All @@ -386,6 +383,23 @@ def _new_queue(self, queue, **kwargs):
"defined in 'predefined_queues'."
).format(sqs_qname))

raise DoesNotExistQueueException(
f"Queue with name '{sqs_qname}' doesn't exist in SQS"
)

def _new_queue(self, queue, **kwargs):
"""Ensure a queue with given name exists in SQS.
Arguments:
---------
queue (str): the AMQP queue name
Returns
str: the SQS queue URL
"""
try:
return self._resolve_queue_url(queue)
except DoesNotExistQueueException:
sqs_qname = self.canonical_queue_name(queue)
attributes = {'VisibilityTimeout': str(self.visibility_timeout)}
if sqs_qname.endswith('.fifo'):
attributes['FifoQueue'] = 'true'
Expand Down Expand Up @@ -414,7 +428,11 @@ def _delete(self, queue, *args, **kwargs):
"""Delete queue by name."""
if self.predefined_queues:
return
super()._delete(queue)

q_url = self._resolve_queue_url(queue)
self.sqs().delete_queue(
QueueUrl=q_url,
)
self._queue_cache.pop(queue, None)

def _put(self, queue, message, **kwargs):
Expand Down
16 changes: 16 additions & 0 deletions t/unit/transport/test_SQS.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,16 @@ def purge_queue(self, QueueUrl=None):
if q.url == QueueUrl:
q.messages = []

def delete_queue(self, QueueUrl=None):
queue_name = None
for key, val in self._queues.items():
if val.url == QueueUrl:
queue_name = key
break
if queue_name is None:
raise Exception(f"Queue url {QueueUrl} not found")
del self._queues[queue_name]


class test_Channel:

Expand Down Expand Up @@ -262,6 +272,11 @@ def test_entity_name(self):
'foo-bar-baz_qux_quux'
assert self.channel.entity_name('abcdef.fifo') == 'abcdef.fifo'

def test_resolve_queue_url(self):
queue_name = 'unittest_queue'
assert self.sqs_conn_mock._queues[queue_name].url == \
self.channel._resolve_queue_url(queue_name)

def test_new_queue(self):
queue_name = 'new_unittest_queue'
self.channel._new_queue(queue_name)
Expand Down Expand Up @@ -316,6 +331,7 @@ def test_delete(self):
self.channel._new_queue(queue_name)
self.channel._delete(queue_name)
assert queue_name not in self.channel._queue_cache
assert queue_name not in self.sqs_conn_mock._queues

def test_get_from_sqs(self):
# Test getting a single message
Expand Down

0 comments on commit a62e613

Please sign in to comment.