Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NextId #23

Merged
merged 2 commits into from
Nov 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pottery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from .exceptions import TooManyTriesError

from .contexttimer import contexttimer
from .nextid import NextId
from .redlock import Redlock

from .counter import RedisCounter
Expand Down
91 changes: 91 additions & 0 deletions pottery/nextid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#-----------------------------------------------------------------------------#
# nextid.py #
# #
# Copyright © 2015-2016, Rajiv Bakulesh Shah, original author. #
# All rights reserved. #
#-----------------------------------------------------------------------------#
'''Distributed Redis-powered monotonically increasing ID generator.

Rationale and algorithm description:
http://antirez.com/news/102

Lua scripting:
https://github.com/andymccurdy/redis-py#lua-scripting
'''



import concurrent.futures
import contextlib

from redis import Redis



class NextId:
'Distributed Redis-powered monotonically increasing ID generator.'

KEY = 'nextid:current'
NUM_TRIES = 3
default_masters = frozenset({Redis()})

def __init__(self, *, key=KEY, num_tries=NUM_TRIES, masters=default_masters):
self.key = key
self.num_tries = num_tries
self.masters = masters
self._set_id_script = self._register_set_id_script()
self._init_masters()

def _register_set_id_script(self):
master = next(iter(self.masters))
set_id_script = master.register_script('''
local curr = tonumber(redis.call('get', KEYS[1]))
local next = tonumber(ARGV[1])
if curr < next then
redis.call('set', KEYS[1], next)
return next
else
return nil
end
''')
return set_id_script

def _init_masters(self):
with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.masters)) as executor:
futures = {executor.submit(master.setnx, self.key, 0) for master in self.masters}
concurrent.futures.wait(futures)

def __iter__(self):
return self

def __next__(self):
for _ in range(self.num_tries):
with contextlib.suppress(RuntimeError):
next_id = self._current_id + 1
self._current_id = next_id
return next_id
else:
raise RuntimeError('quorum not achieved')

@property
def _current_id(self):
num_masters, current_id = 0, 0
with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.masters)) as executor:
futures = (executor.submit(master.get, self.key) for master in self.masters)
for future in concurrent.futures.as_completed(futures):
num_masters += 1
current_id = max(current_id, int(future.result()))
if num_masters < len(self.masters) // 2 + 1:
raise RuntimeError('quorum not achieved')
else:
return current_id

@_current_id.setter
def _current_id(self, value):
num_masters = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.masters)) as executor:
futures = (executor.submit(self._set_id_script, keys=(self.key,), args=(value,), client=master,) for master in self.masters)
for future in concurrent.futures.as_completed(futures):
num_masters += future.result() == value
if num_masters < len(self.masters) // 2 + 1:
raise RuntimeError('quorum not achieved')
12 changes: 6 additions & 6 deletions pottery/redlock.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ def key(self, value):
def _register_acquired_script(self):
master = next(iter(self.masters))
acquired_script = master.register_script('''
if redis.call("get", KEYS[1]) == ARGV[1] then
local pttl = redis.call("pttl", KEYS[1])
if redis.call('get', KEYS[1]) == ARGV[1] then
local pttl = redis.call('pttl', KEYS[1])
return (pttl > 0) and pttl or 0
else
return 0
Expand All @@ -77,8 +77,8 @@ def _register_acquired_script(self):
def _register_extend_script(self):
master = next(iter(self.masters))
extend_script = master.register_script('''
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("pexpire", KEYS[1], ARGV[2])
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('pexpire', KEYS[1], ARGV[2])
else
return 0
end
Expand All @@ -88,8 +88,8 @@ def _register_extend_script(self):
def _register_release_script(self):
master = next(iter(self.masters))
release_script = master.register_script('''
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
Expand Down
32 changes: 32 additions & 0 deletions tests/test_nextid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#-----------------------------------------------------------------------------#
# test_nextid.py #
# #
# Copyright © 2015-2016, Rajiv Bakulesh Shah, original author. #
# All rights reserved. #
#-----------------------------------------------------------------------------#
'Distributed Redis-powered monotonically increasing ID generator tests.'



from redis import Redis

from pottery import NextId
from tests.base import TestCase



class NextIdTests(TestCase):
'Distributed Redis-powered monotonically increasing ID generator tests.'

def setUp(self):
super().setUp()
self.redis = Redis()
self.redis.delete(NextId.KEY)

def test_nextid(self):
assert not self.redis.exists(NextId.KEY)
ids = NextId()
assert int(self.redis.get(NextId.KEY)) == 0
for id_ in range(1, 10):
with self.subTest(id_=id_):
assert next(ids) == id_