Skip to content

Commit

Permalink
Merge pull request #2 from kidig/redbeat-0.10
Browse files Browse the repository at this point in the history
make code close to redbeat-0.10.0
  • Loading branch information
kidig authored Mar 23, 2022
2 parents b6f5649 + 0593281 commit 8062ec1
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 24 deletions.
2 changes: 1 addition & 1 deletion beatlock/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
__author__ = "Dmitry Gerasimenko"
__version__ = "0.0.2"
__version__ = "0.0.3"
82 changes: 60 additions & 22 deletions beatlock/schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,33 @@
from redis.client import StrictRedis
from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_exponential

logger = get_logger(__name__)
logger = get_logger('celery.beat')

CELERY_4_OR_GREATER = CELERY_VERSION[0] >= 4


class RetryingConnection(object):
# Copy from:
# https://github.com/andymccurdy/redis-py/blob/master/redis/lock.py#L33
# KEYS[1] - lock name
# ARGS[1] - token
# ARGS[2] - additional milliseconds
# return 1 if the locks time was extended, otherwise 0
LUA_EXTEND_TO_SCRIPT = """
local token = redis.call('get', KEYS[1])
if not token or token ~= ARGV[1] then
return 0
end
local expiration = redis.call('pttl', KEYS[1])
if not expiration then
expiration = 0
end
if expiration < 0 then
return 0
end
redis.call('pexpire', KEYS[1], ARGV[2])
return 1
"""

class RetryingConnection:
"""A proxy for the Redis connection that delegates all the calls to
underlying Redis connection while retrying on connection or time-out error.
"""
Expand Down Expand Up @@ -78,18 +99,20 @@ def get_redis(app=None):
app = app_or_default(app)
conf = ensure_conf(app)
if not hasattr(app, 'beatlock_redis') or app.beatlock_redis is None:
redis_options = conf.app.conf.get(
'BEATLOCK_REDIS_OPTIONS',
conf.app.conf.get('BROKER_TRANSPORT_OPTIONS', {})
)
redis_options = conf.beatlock_redis_options
retry_period = redis_options.get('retry_period')

if conf.redis_url.startswith('redis-sentinel') and 'sentinels' in redis_options:
from redis.sentinel import Sentinel
sentinel = Sentinel(redis_options['sentinels'],
socket_timeout=redis_options.get('socket_timeout'),
password=redis_options.get('password'),
decode_responses=True)

sentinel = Sentinel(
redis_options['sentinels'],
socket_timeout=redis_options.get('socket_timeout'),
password=redis_options.get('password'),
db=redis_options.get('db', 0),
decode_responses=True,
sentinel_kwargs=redis_options.get('sentinel_kwargs'),
)
connection = sentinel.master_for(redis_options.get('service_name', 'master'))
else:
connection = StrictRedis.from_url(conf.redis_url, decode_responses=True)
Expand All @@ -102,14 +125,18 @@ def get_redis(app=None):
return app.beatlock_redis


class BeatLockConfig(object):
class BeatLockConfig:
def __init__(self, app=None):
self.app = app_or_default(app)
self.key_prefix = self.either_or('beatlock_key_prefix', 'beatlock:')
self.lock_disabled = self.either_or('beatlock_lock_disabled', False)
self.lock_key = self.either_or('beatlock_lock_key', self.key_prefix + ':lock')
self.lock_timeout = self.either_or('beatlock_lock_timeout', None)
self.redis_url = self.either_or('beatlock_redis_url', app.conf['BROKER_URL'])
self.redis_use_ssl = self.either_or('beatlock_redis_use_ssl', app.conf['BROKER_USE_SSL'])
self.beatlock_redis_options = self.either_or(
'beatlock_redis_options', app.conf['BROKER_TRANSPORT_OPTIONS']
)

if self.lock_disabled:
self.lock_key = None
Expand All @@ -125,31 +152,33 @@ def either_or(self, name, default=None):


class BeatLockScheduler(DatabaseScheduler):

lock = None

#: The default lock timeout in seconds.
lock_timeout = DEFAULT_MAX_INTERVAL * 5

def __init__(self, app, lock_key=None, lock_timeout=None, **kwargs):
ensure_conf(app)
ensure_conf(app) # set app.beatlock_conf
self.lock_key = lock_key or app.beatlock_conf.lock_key
self.lock_timeout = (lock_timeout or
app.beatlock_conf.lock_timeout or
self.lock_timeout)
self.lock_timeout = (
lock_timeout
or app.beatlock_conf.lock_timeout
or self.max_interval * 5
or self.lock_timeout
)

super().__init__(app, **kwargs)

def tick(self, *args, **kwargs):
if self.lock:
logger.debug('beat: Extending lock...')
get_redis(self.app).pexpire(self.lock_key, int(self.lock_timeout * 1000))
self.lock.extend(int(self.lock_timeout))

return super().tick(*args, **kwargs)

def close(self):
if self.lock:
logger.debug('beat: Releasing Lock')
logger.info('beat: Releasing lock')
self.lock.release()
self.lock = None
super().close()
Expand All @@ -158,8 +187,11 @@ def close(self):
def info(self):
info = [' . redis -> {}'.format(maybe_sanitize_url(self.app.beatlock_conf.redis_url))]
if self.lock_key:
info.append(' . lock -> `{}` {} ({}s)'.format(
self.lock_key, humanize_seconds(self.lock_timeout), self.lock_timeout))
info.append(
' . lock -> `{}` {} ({}s)'.format(
self.lock_key, humanize_seconds(self.lock_timeout), self.lock_timeout
)
)
return '\n'.join(info)


Expand All @@ -170,11 +202,17 @@ def acquire_distributed_beat_lock(sender=None, **kwargs):
return

logger.debug('beat: Acquiring lock...')
redis_client = get_redis(scheduler.app)

lock = get_redis(scheduler.app).lock(
lock = redis_client.lock(
scheduler.lock_key,
timeout=scheduler.lock_timeout,
sleep=scheduler.max_interval,
)
# overwrite redis-py's extend script
# which will add additional timeout instead of extend to a new timeout
lock.lua_extend = redis_client.register_script(LUA_EXTEND_TO_SCRIPT)
lock.acquire()
logger.info('beat: Acquired lock')

scheduler.lock = lock
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Development Status :: 2 - Pre-Alpha',
'Development Status :: 4 - Beta',
],
)

0 comments on commit 8062ec1

Please sign in to comment.