-
Notifications
You must be signed in to change notification settings - Fork 37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle Retry #26
base: master
Are you sure you want to change the base?
Handle Retry #26
Conversation
Would be awesome to get this merged in. Do you need any help reviewing it? |
Hey guys, sorry about the delay. And thanks for submitting this @lsaavedr
I'm also curious about what celery does with the task ID on retry. Does it re-queue with a new ID or re-use the same ID? |
Any progress here? I need this functionality but I do not want to maintain a separate package just for this fix. |
Progress? |
Sorry all, been neglecting this library as I don't use celery much anymore. I'm not sure about this PR. I would like to see some reasoning for the Tests would be appreciated as well. |
Thanks for the response. Can you explain a bit more about how this would introduce a race condition? If you were to implement retry, how would you do it? |
Maybe racing is not such an issue, I haven't looked deeply into how retrying works in celery. This might be an issue as well: celery-singleton/celery_singleton/singleton.py Lines 120 to 124 in 527d227
With this approach the lock would not be removed when |
Thanks. After trying out this MR and hacking around it, I could not get it to work. This project does the same as I was able to modify this library to provide locking post This sample code should be enough to build off of if anyone is interested: class Singleton(Task):
def __init__(self, *args, **kwargs):
self.singleton_backend = RedisBackend(REDIS_URI)
self._lock_key = None
self.max_retries = None # Try forever, change if you must
self.__run = None
@property
def lock_key(self, task_args=None, task_kwargs=None):
if self._lock_key:
return self._lock_key
# Generate your lock key however
return self._lock_key
def lock(self):
lock = self.singleton_backend.lock(self.lock_key, self.request.id, expiry=60*5)
logger.info(f'Attempted lock for {self.lock_key} = {lock}')
if not lock:
"""
Override the task function so it is not called but retried.
"""
def terminated(*args, **kwargs):
self.retry(countdown=60)
# may need to do the same for __call__
self.__run = self.run
self.run = terminated
else:
if self.__run:
self.run = self.__run
return lock
def unlock(self):
unlock = self.singleton_backend.unlock(self.lock_key)
logger.info(f'Attempted unlock for {self.lock_key} = {unlock}')
...
@signals.task_prerun.connect
def connect_task_prerun(sender=None, task_id=None, task=None, args=None, kwargs=None, **e):
if isinstance(task, Singleton):
task.lock()
...
@signals.task_postrun.connect
def connect_task_postrun(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **e):
if isinstance(task, Singleton) and state is not states.RETRY:
task.unlock() |
Any progress?, maybe it could help to test this fix. |
We actually found a simpler way:
Happy to submit a PR if needed |
Is this project abandoned? |
This close definitively pull request GH-8, GH-9 and issues GH-10, GH-20. I think that with some like this can be chain works also.