From 05932815957115cc85a872729a12e90550787667 Mon Sep 17 00:00:00 2001 From: kidig Date: Wed, 23 Mar 2022 03:20:54 +0300 Subject: [PATCH] make code close to redbeat-0.10.0 fix logger name to 'celery.beat' add sentinel_kwargs options use lock.extend replace pexpire --- beatlock/__init__.py | 2 +- beatlock/schedulers.py | 82 ++++++++++++++++++++++++++++++------------ setup.py | 2 +- 3 files changed, 62 insertions(+), 24 deletions(-) diff --git a/beatlock/__init__.py b/beatlock/__init__.py index b577074..35a6b58 100644 --- a/beatlock/__init__.py +++ b/beatlock/__init__.py @@ -1,2 +1,2 @@ __author__ = "Dmitry Gerasimenko" -__version__ = "0.0.2" +__version__ = "0.0.3" diff --git a/beatlock/schedulers.py b/beatlock/schedulers.py index cfd694d..2dede36 100644 --- a/beatlock/schedulers.py +++ b/beatlock/schedulers.py @@ -12,12 +12,33 @@ from redis.client import StrictRedis from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_exponential -logger = get_logger(__name__) +logger = get_logger('celery.beat') CELERY_4_OR_GREATER = CELERY_VERSION[0] >= 4 - -class RetryingConnection(object): +# Copy from: +# https://github.com/andymccurdy/redis-py/blob/master/redis/lock.py#L33 +# KEYS[1] - lock name +# ARGS[1] - token +# ARGS[2] - additional milliseconds +# return 1 if the locks time was extended, otherwise 0 +LUA_EXTEND_TO_SCRIPT = """ + local token = redis.call('get', KEYS[1]) + if not token or token ~= ARGV[1] then + return 0 + end + local expiration = redis.call('pttl', KEYS[1]) + if not expiration then + expiration = 0 + end + if expiration < 0 then + return 0 + end + redis.call('pexpire', KEYS[1], ARGV[2]) + return 1 +""" + +class RetryingConnection: """A proxy for the Redis connection that delegates all the calls to underlying Redis connection while retrying on connection or time-out error. """ @@ -78,18 +99,20 @@ def get_redis(app=None): app = app_or_default(app) conf = ensure_conf(app) if not hasattr(app, 'beatlock_redis') or app.beatlock_redis is None: - redis_options = conf.app.conf.get( - 'BEATLOCK_REDIS_OPTIONS', - conf.app.conf.get('BROKER_TRANSPORT_OPTIONS', {}) - ) + redis_options = conf.beatlock_redis_options retry_period = redis_options.get('retry_period') if conf.redis_url.startswith('redis-sentinel') and 'sentinels' in redis_options: from redis.sentinel import Sentinel - sentinel = Sentinel(redis_options['sentinels'], - socket_timeout=redis_options.get('socket_timeout'), - password=redis_options.get('password'), - decode_responses=True) + + sentinel = Sentinel( + redis_options['sentinels'], + socket_timeout=redis_options.get('socket_timeout'), + password=redis_options.get('password'), + db=redis_options.get('db', 0), + decode_responses=True, + sentinel_kwargs=redis_options.get('sentinel_kwargs'), + ) connection = sentinel.master_for(redis_options.get('service_name', 'master')) else: connection = StrictRedis.from_url(conf.redis_url, decode_responses=True) @@ -102,7 +125,7 @@ def get_redis(app=None): return app.beatlock_redis -class BeatLockConfig(object): +class BeatLockConfig: def __init__(self, app=None): self.app = app_or_default(app) self.key_prefix = self.either_or('beatlock_key_prefix', 'beatlock:') @@ -110,6 +133,10 @@ def __init__(self, app=None): self.lock_key = self.either_or('beatlock_lock_key', self.key_prefix + ':lock') self.lock_timeout = self.either_or('beatlock_lock_timeout', None) self.redis_url = self.either_or('beatlock_redis_url', app.conf['BROKER_URL']) + self.redis_use_ssl = self.either_or('beatlock_redis_use_ssl', app.conf['BROKER_USE_SSL']) + self.beatlock_redis_options = self.either_or( + 'beatlock_redis_options', app.conf['BROKER_TRANSPORT_OPTIONS'] + ) if self.lock_disabled: self.lock_key = None @@ -125,31 +152,33 @@ def either_or(self, name, default=None): class BeatLockScheduler(DatabaseScheduler): - lock = None #: The default lock timeout in seconds. lock_timeout = DEFAULT_MAX_INTERVAL * 5 def __init__(self, app, lock_key=None, lock_timeout=None, **kwargs): - ensure_conf(app) + ensure_conf(app) # set app.beatlock_conf self.lock_key = lock_key or app.beatlock_conf.lock_key - self.lock_timeout = (lock_timeout or - app.beatlock_conf.lock_timeout or - self.lock_timeout) + self.lock_timeout = ( + lock_timeout + or app.beatlock_conf.lock_timeout + or self.max_interval * 5 + or self.lock_timeout + ) super().__init__(app, **kwargs) def tick(self, *args, **kwargs): if self.lock: logger.debug('beat: Extending lock...') - get_redis(self.app).pexpire(self.lock_key, int(self.lock_timeout * 1000)) + self.lock.extend(int(self.lock_timeout)) return super().tick(*args, **kwargs) def close(self): if self.lock: - logger.debug('beat: Releasing Lock') + logger.info('beat: Releasing lock') self.lock.release() self.lock = None super().close() @@ -158,8 +187,11 @@ def close(self): def info(self): info = [' . redis -> {}'.format(maybe_sanitize_url(self.app.beatlock_conf.redis_url))] if self.lock_key: - info.append(' . lock -> `{}` {} ({}s)'.format( - self.lock_key, humanize_seconds(self.lock_timeout), self.lock_timeout)) + info.append( + ' . lock -> `{}` {} ({}s)'.format( + self.lock_key, humanize_seconds(self.lock_timeout), self.lock_timeout + ) + ) return '\n'.join(info) @@ -170,11 +202,17 @@ def acquire_distributed_beat_lock(sender=None, **kwargs): return logger.debug('beat: Acquiring lock...') + redis_client = get_redis(scheduler.app) - lock = get_redis(scheduler.app).lock( + lock = redis_client.lock( scheduler.lock_key, timeout=scheduler.lock_timeout, sleep=scheduler.max_interval, ) + # overwrite redis-py's extend script + # which will add additional timeout instead of extend to a new timeout + lock.lua_extend = redis_client.register_script(LUA_EXTEND_TO_SCRIPT) lock.acquire() + logger.info('beat: Acquired lock') + scheduler.lock = lock diff --git a/setup.py b/setup.py index 0a6723c..927680d 100644 --- a/setup.py +++ b/setup.py @@ -32,6 +32,6 @@ 'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7', 'Programming Language :: Python :: 3.8', - 'Development Status :: 2 - Pre-Alpha', + 'Development Status :: 4 - Beta', ], ) \ No newline at end of file