Skip to content

Commit

Permalink
Design RedisSimpleQueue class (#553)
Browse files Browse the repository at this point in the history
* Design RedisSimpleQueue class

This class is complete with the exception of the `.put()` and `.get()`
methods. `RedisSimpleQueue` will be powered by Redis streams, and the
`.put()` and `.get()` methods will be implemented using `XADD` and
`XREAD`/`XDEL`.

https://redis.io/topics/streams-intro

* Flesh out .put() and .get() methods

* Make note of potential bug in redis-py

* Unit test RedisSimpleQueue

* Document RedisSimpleQueue

* Write docstrings

* Bump version number
  • Loading branch information
brainix authored Dec 27, 2021
1 parent edbdf70 commit e7b133a
Show file tree
Hide file tree
Showing 6 changed files with 284 additions and 2 deletions.
58 changes: 58 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ battle tested in production at scale.
- [Lists ⛓](#lists)
- [Counters 🧮](#counters)
- [Deques 🖇️](#deques)
- [Queues 🚶‍♂️🚶‍♀️🚶‍♂️](#queues)
- [Redlock 🔒](#redlock)
- [synchronize() 👯‍♀️](#synchronize)
- [NextId 🔢](#nextid)
Expand Down Expand Up @@ -305,6 +306,63 @@ can use your `RedisDeque` the same way that you use any other Python `deque`.



## <a name="queues"></a>Queues 🚶‍♂️🚶‍♀️🚶‍♂️

`RedisSimpleQueue` is a Redis-backed multi-producer, multi-consumer FIFO queue
compatible with Python&rsquo;s
[`queue.SimpleQueue`](https://docs.python.org/3/library/queue.html#simplequeue-objects).
In general, use a Python `queue.Queue` if you&rsquo;re using it in one or more
threads, use `multiprocessing.Queue` if you&rsquo;re using it between processes,
and use `RedisSimpleQueue` if you&rsquo;re sharing it across machines or if you
need for your queue to persist across application crashes or restarts.

Instantiate a `RedisSimpleQueue`:

```python
>>> from pottery import RedisSimpleQueue
>>> cars = RedisSimpleQueue(redis=redis, key='cars')
>>>
```

Notice the two keyword arguments to `RedisSimpleQueue()`: The first is your
Redis client. The second is the Redis key name for your queue. Other than
that, you can use your `RedisSimpleQueue` the same way that you use any other
Python `queue.SimpleQueue`.

Check the queue state, put some items in the queue, and get those items back
out:

```python
>>> cars.empty()
True
>>> cars.qsize()
0
>>> cars.put('Jeep')
>>> cars.put('Honda')
>>> cars.put('Audi')
>>> cars.empty()
False
>>> cars.qsize()
3
>>> cars.get()
'Jeep'
>>> cars.get()
'Honda'
>>> cars.get()
'Audi'
>>> cars.empty()
True
>>> cars.qsize()
0
>>>
```

*Limitations:*

1. Items must be JSON serializable.



## <a name="redlock"></a>Redlock 🔒

`Redlock` is a safe and reliable lock to coordinate access to a resource shared
Expand Down
6 changes: 5 additions & 1 deletion pottery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@


__title__: Final[str] = 'pottery'
__version__: Final[str] = '2.2.2'
__version__: Final[str] = '2.3.0'
__description__: Final[str] = __doc__.split(sep='\n\n', maxsplit=1)[0]
__url__: Final[str] = 'https://github.com/brainix/pottery'
__author__: Final[str] = 'Rajiv Bakulesh Shah'
Expand All @@ -45,6 +45,7 @@
from .exceptions import PotteryError # isort:skip
from .exceptions import KeyExistsError # isort:skip
from .exceptions import RandomKeyError # isort:skip
from .exceptions import QueueEmptyError # isort:skip
from .exceptions import PrimitiveError # isort:skip
from .exceptions import QuorumIsImpossible # isort:skip
from .exceptions import QuorumNotAchieved # isort:skip
Expand All @@ -67,13 +68,15 @@
from .deque import RedisDeque
from .dict import RedisDict
from .list import RedisList
from .queue import RedisSimpleQueue
from .set import RedisSet


__all__: Final[Tuple[str, ...]] = (
'PotteryError',
'KeyExistsError',
'RandomKeyError',
'QueueEmptyError',
'PrimitiveError',
'QuorumIsImpossible',
'QuorumNotAchieved',
Expand All @@ -96,5 +99,6 @@
'RedisDeque',
'RedisDict',
'RedisList',
'RedisSimpleQueue',
'RedisSet',
)
4 changes: 4 additions & 0 deletions pottery/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# --------------------------------------------------------------------------- #


from queue import Empty
from typing import Iterable
from typing import Optional

Expand Down Expand Up @@ -53,6 +54,9 @@ def __repr__(self) -> str:
def __str__(self) -> str:
return f'redis={self._redis}'

class QueueEmptyError(PotteryError, Empty):
'Non-blocking .get() or .get_nowait() called on RedisQueue which is empty.'


class PrimitiveError(Exception):
'Base exception class for distributed primitives.'
Expand Down
125 changes: 125 additions & 0 deletions pottery/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# --------------------------------------------------------------------------- #
# queue.py #
# #
# Copyright © 2015-2021, Rajiv Bakulesh Shah, original author. #
# #
# Licensed under the Apache License, Version 2.0 (the "License"); #
# you may not use this file except in compliance with the License. #
# You may obtain a copy of the License at: #
# http://www.apache.org/licenses/LICENSE-2.0 #
# #
# Unless required by applicable law or agreed to in writing, software #
# distributed under the License is distributed on an "AS IS" BASIS, #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
# See the License for the specific language governing permissions and #
# limitations under the License. #
# --------------------------------------------------------------------------- #


import math
import random
import time
from typing import ClassVar
from typing import Optional
from typing import cast

from redis import WatchError

from .base import Base
from .base import JSONTypes
from .exceptions import QueueEmptyError
from .timer import ContextTimer


class RedisSimpleQueue(Base):
RETRY_DELAY: ClassVar[int] = 200

def qsize(self) -> int:
'Return the approximate size of the queue (not reliable!). O(1)'
return self.redis.xlen(self.key)

# Preserve the Open-Closed Principle with name mangling.
# https://youtu.be/miGolgp9xq8?t=2086
# https://stackoverflow.com/a/38534939
__qsize = qsize

def empty(self) -> bool:
'Return True if the queue is empty, False otherwise (not reliable!). O(1)'
return self.__qsize() == 0

def put(self,
item: JSONTypes,
block: bool = True,
timeout: Optional[float] = None,
) -> None:
'''Put the item on the queue. O(1)
The optional 'block' and 'timeout' arguments are ignored, as this method
never blocks. They are provided for compatibility with the queue.Queue
class.
'''
encoded_value = self._encode(item)
self.redis.xadd(self.key, {'item': encoded_value}, id='*')

__put = put

def put_nowait(self, item: JSONTypes) -> None:
'''Put an item into the queue without blocking. O(1)
This is exactly equivalent to `.put(item)` and is only provided for
compatibility with the queue.Queue class.
'''
return self.__put(item, False)

def get(self,
block: bool = True,
timeout: Optional[float] = None,
) -> JSONTypes:
'''Remove and return an item from the queue. O(1)
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until an item is available. If 'timeout' is
a non-negative number, it blocks at most 'timeout' seconds and raises
the Empty exception if no item was available within that time.
Otherwise ('block' is false), return an item if one is immediately
available, else raise the QueueEmptyError exception ('timeout' is
ignored in that case).
'''
redis_block = (timeout or 0.0) if block else 0.0
redis_block = math.floor(redis_block)
with ContextTimer() as timer:
while True:
try:
item = self.__remove_and_return(redis_block)
return item
except (WatchError, IndexError):
if not block or timer.elapsed() / 1000 >= (timeout or 0):
raise QueueEmptyError(redis=self.redis, key=self.key)
delay = random.uniform(0, self.RETRY_DELAY/1000)
time.sleep(delay)

__get = get

def __remove_and_return(self, redis_block: int) -> JSONTypes:
with self._watch() as pipeline:
# XXX: The following line raises WatchError after the socket timeout
# if the RedisQueue is empty and we're not blocking. This feels
# like a bug in redis-py?
returned_value = pipeline.xread({self.key: 0}, count=1, block=redis_block)
# The following line raises IndexError if the RedisQueue is empty
# and we're blocking.
id_ = cast(bytes, returned_value[0][1][0][0])
dict_ = cast(dict, returned_value[0][1][0][1])
pipeline.multi()
pipeline.xdel(self.key, id_)
encoded_value = dict_[b'item']
item = self._decode(encoded_value)
return item

def get_nowait(self) -> JSONTypes:
'''Remove and return an item from the queue without blocking. O(1)
Only get an item if one is immediately available. Otherwise
raise the Empty exception.
'''
return self.__get(False)
3 changes: 2 additions & 1 deletion pottery/redlock.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,8 @@ def log_time_enqueued(timer: ContextTimer, acquired: bool) -> None:
log_time_enqueued(timer, True)
return True
enqueued = True
time.sleep(random.uniform(0, self.RETRY_DELAY/1000))
delay = random.uniform(0, self.RETRY_DELAY/1000)
time.sleep(delay)
if enqueued:
log_time_enqueued(timer, False)
return False
Expand Down
90 changes: 90 additions & 0 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# --------------------------------------------------------------------------- #
# test_queue.py #
# #
# Copyright © 2015-2021, Rajiv Bakulesh Shah, original author. #
# #
# Licensed under the Apache License, Version 2.0 (the "License"); #
# you may not use this file except in compliance with the License. #
# You may obtain a copy of the License at: #
# http://www.apache.org/licenses/LICENSE-2.0 #
# #
# Unless required by applicable law or agreed to in writing, software #
# distributed under the License is distributed on an "AS IS" BASIS, #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
# See the License for the specific language governing permissions and #
# limitations under the License. #
# --------------------------------------------------------------------------- #


from pottery import ContextTimer
from pottery import QueueEmptyError
from pottery import RedisSimpleQueue
from tests.base import TestCase


class QueueTests(TestCase):
def test_put(self):
queue = RedisSimpleQueue()

assert queue.qsize() == 0
assert queue.empty()

for num in range(1, 6):
with self.subTest(num=num):
queue.put(num)
assert queue.qsize() == num
assert not queue.empty()

def test_put_nowait(self):
queue = RedisSimpleQueue()

assert queue.qsize() == 0
assert queue.empty()

for num in range(1, 6):
with self.subTest(num=num):
queue.put_nowait(num)
assert queue.qsize() == num
assert not queue.empty()

def test_get(self):
queue = RedisSimpleQueue()

with self.assertRaises(QueueEmptyError):
queue.get()

for num in range(1, 6):
with self.subTest(num=num):
queue.put(num)
assert queue.get() == num
assert queue.qsize() == 0
assert queue.empty()

with self.assertRaises(QueueEmptyError):
queue.get()

def test_get_nowait(self):
queue = RedisSimpleQueue()

with self.assertRaises(QueueEmptyError):
queue.get_nowait()

for num in range(1, 6):
queue.put(num)

for num in range(1, 6):
with self.subTest(num=num):
assert queue.get_nowait() == num
assert queue.qsize() == 5 - num
assert queue.empty() == (num == 5)

with self.assertRaises(QueueEmptyError):
queue.get_nowait()

def test_get_timeout(self):
queue = RedisSimpleQueue()
timeout = 1

with self.assertRaises(QueueEmptyError), ContextTimer() as timer:
queue.get(timeout=timeout)
assert timer.elapsed() / 1000 >= timeout

0 comments on commit e7b133a

Please sign in to comment.