Skip to content

Commit

Permalink
Raise exception if quorum is impossible (#406)
Browse files Browse the repository at this point in the history
* Raise exception if quorum is impossible

If a majority of Redis masters are down, then attempting to acquire a
Redlock will never succeed.  Raise a new custom exception in this case.

* Unit test QuorumIsImpossible

* Write docstrings for exceptions

* Don't repeat myself

* Consistently order arguments

* Improve method name

* If Redlock acquisition fails, gracefully release

* Bump version number
  • Loading branch information
brainix authored May 27, 2021
1 parent f28fbc7 commit 75ae6a3
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 46 deletions.
4 changes: 3 additions & 1 deletion pottery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@


__title__ = 'pottery'
__version__ = '1.1.8'
__version__ = '1.2.0'
__description__, __long_description__ = (
s.strip() for s in __doc__.split(sep='\n\n', maxsplit=1)
)
Expand All @@ -46,6 +46,7 @@
from .exceptions import KeyExistsError # isort:skip
from .exceptions import RandomKeyError # isort:skip
from .exceptions import PrimitiveError # isort:skip
from .exceptions import QuorumIsImpossible # isort:skip
from .exceptions import QuorumNotAchieved # isort:skip
from .exceptions import TooManyExtensions # isort:skip
from .exceptions import ExtendUnlockedLock # isort:skip
Expand All @@ -72,6 +73,7 @@
'KeyExistsError',
'RandomKeyError',
'PrimitiveError',
'QuorumIsImpossible',
'QuorumNotAchieved',
'TooManyExtensions',
'ExtendUnlockedLock',
Expand Down
16 changes: 16 additions & 0 deletions pottery/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from . import monkey
from .annotations import JSONTypes
from .annotations import RedisValues
from .exceptions import QuorumIsImpossible
from .exceptions import RandomKeyError


Expand Down Expand Up @@ -330,9 +331,11 @@ def __init__(self,
*,
key: str,
masters: Iterable[Redis] = frozenset(),
raise_on_redis_errors: bool = False,
) -> None:
self.key = key
self.masters = frozenset(masters) or self._DEFAULT_MASTERS
self.raise_on_redis_errors = raise_on_redis_errors

@property
@abc.abstractmethod
Expand All @@ -346,3 +349,16 @@ def key(self) -> str:
@key.setter
def key(self, value: str) -> None:
self._key = f'{self.KEY_PREFIX}:{value}'

def _check_enough_masters_up(self,
raise_on_redis_errors: Optional[bool],
redis_errors: List[RedisError],
) -> None:
if raise_on_redis_errors is None:
raise_on_redis_errors = self.raise_on_redis_errors
if raise_on_redis_errors and len(redis_errors) > len(self.masters) // 2:
raise QuorumIsImpossible(
self.key,
self.masters,
redis_errors=redis_errors,
)
29 changes: 22 additions & 7 deletions pottery/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from typing import Optional

from redis import Redis
from redis import RedisError


class PotteryError(Exception):
Expand Down Expand Up @@ -56,27 +57,41 @@ def __str__(self) -> str:
class PrimitiveError(Exception):
'Base exception class for distributed primitives.'

def __init__(self, key: str, masters: Iterable[Redis]) -> None:
def __init__(self,
key: str,
masters: Iterable[Redis],
*,
redis_errors: Iterable[RedisError] = tuple(),
) -> None:
self._key = key
self._masters = masters
self._redis_errors = redis_errors

def __repr__(self) -> str:
return (
f"{self.__class__.__name__}(key='{self._key}', "
f"masters={list(self._masters)})"
f"masters={list(self._masters)}, "
f"redis_errors={list(self._redis_errors)})"
)

def __str__(self) -> str:
return f"key='{self._key}', masters={list(self._masters)}"
return (
f"key='{self._key}', "
f"masters={list(self._masters)}, "
f"redis_errors={list(self._redis_errors)}"
)

class QuorumNotAchieved(PrimitiveError, RuntimeError):
...
'Consensus-based algorithm could not achieve quorum.'

class TooManyExtensions(PrimitiveError, RuntimeError):
...
'Redlock has been extended too many times.'

class ExtendUnlockedLock(PrimitiveError, RuntimeError):
...
'Attempting to extend an unlocked Redlock.'

class ReleaseUnlockedLock(PrimitiveError, RuntimeError):
...
'Attempting to release an unlocked Redlock.'

class QuorumIsImpossible(PrimitiveError, RuntimeError):
'Too many Redis masters threw RedisErrors; quorum can not be achieved.'
47 changes: 35 additions & 12 deletions pottery/nextid.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import logging
from typing import ClassVar
from typing import Iterable
from typing import List
from typing import Optional
from typing import Type
from typing import cast

from redis import Redis
Expand All @@ -38,6 +40,7 @@
from typing_extensions import Final

from .base import Primitive
from .exceptions import QuorumIsImpossible
from .exceptions import QuorumNotAchieved
from .executor import BailOutExecutor

Expand Down Expand Up @@ -90,9 +93,14 @@ def __init__(self,
*,
key: str = KEY,
masters: Iterable[Redis] = frozenset(),
raise_on_redis_errors: bool = False,
num_tries: int = NUM_TRIES,
) -> None:
super().__init__(key=key, masters=masters)
super().__init__(
key=key,
masters=masters,
raise_on_redis_errors=raise_on_redis_errors,
)
self.__register_set_id_script()
self.num_tries = num_tries
self.__init_masters()
Expand Down Expand Up @@ -127,19 +135,16 @@ def __iter__(self) -> 'NextId':
return self

def __next__(self) -> int:
suppressable_errors: List[Type[BaseException]] = [QuorumNotAchieved]
if not self.raise_on_redis_errors:
suppressable_errors.append(QuorumIsImpossible)
for _ in range(self.num_tries):
with contextlib.suppress(QuorumNotAchieved):
with contextlib.suppress(*suppressable_errors):
next_id = self.__current_id + 1
self.__current_id = next_id
return next_id
raise QuorumNotAchieved(self.key, self.masters)

def __repr__(self) -> str:
return (
f'<{self.__class__.__name__} key={self.key} '
f'value={self.__current_id}>'
)

@property
def __current_id(self) -> int:
with concurrent.futures.ThreadPoolExecutor() as executor:
Expand All @@ -148,11 +153,12 @@ def __current_id(self) -> int:
future = executor.submit(master.get, self.key)
futures.add(future)

current_ids = []
current_ids, redis_errors = [], []
for future in concurrent.futures.as_completed(futures):
try:
current_id = int(future.result())
except RedisError as error:
redis_errors.append(error)
_logger.exception(
'%s.__current_id() getter caught %s',
self.__class__.__name__,
Expand All @@ -163,7 +169,12 @@ def __current_id(self) -> int:

if len(current_ids) > len(self.masters) // 2:
return max(current_ids)
raise QuorumNotAchieved(self.key, self.masters)
self._check_enough_masters_up(None, redis_errors)
raise QuorumNotAchieved(
self.key,
self.masters,
redis_errors=redis_errors,
)

@__current_id.setter
def __current_id(self, value: int) -> None:
Expand All @@ -178,11 +189,12 @@ def __current_id(self, value: int) -> None:
)
futures.add(future)

num_masters_set = 0
num_masters_set, redis_errors = 0, []
for future in concurrent.futures.as_completed(futures):
try:
num_masters_set += future.result() == value
except RedisError as error:
redis_errors.append(error)
_logger.exception(
'%s.__current_id() setter caught %s',
self.__class__.__name__,
Expand All @@ -192,7 +204,18 @@ def __current_id(self, value: int) -> None:
if num_masters_set > len(self.masters) // 2: # pragma: no cover
return

raise QuorumNotAchieved(self.key, self.masters)
self._check_enough_masters_up(None, redis_errors)
raise QuorumNotAchieved(
self.key,
self.masters,
redis_errors=redis_errors,
)

def __repr__(self) -> str:
return (
f'<{self.__class__.__name__} key={self.key} '
f'value={self.__current_id}>'
)


if __name__ == '__main__':
Expand Down
Loading

0 comments on commit 75ae6a3

Please sign in to comment.