Handle Retry #26

Open
lsaavedr wants to merge 1 commit into master

Conversation

lsaavedr

This definitively closes pull requests GH-8 and GH-9 and issues GH-10 and GH-20. I think that with something like this, chains can also work.

@lsaavedr changed the title from fix retry to Handle Retry on Aug 11, 2020
@jayfk

jayfk commented Sep 2, 2020

Would be awesome to get this merged in. Do you need any help reviewing it?

@steinitzu
Owner

Hey guys, sorry about the delay, and thanks for submitting this, @lsaavedr.
This looks good at a glance, but I would love some tests to go with it. If either of you wants to take care of that, I'm happy to merge.

  • Verify that the retry works
  • Verify that the lock is held throughout all retries
  • Verify that the lock is released whether the retries eventually succeed or fail (though I'm assuming celery calls on_success/failure here as usual, in which case it doesn't need a test)

I'm also curious about what celery does with the task ID on retry. Does it re-queue with a new ID or re-use the same ID?
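
One hedged way to check that empirically is to log the request ID on every attempt of a retrying task and compare across attempts. A minimal sketch (the broker URL is a placeholder):

from celery import Celery

app = Celery("probe", broker="redis://localhost:6379/0")

@app.task(bind=True, max_retries=2)
def probe(self):
    # Prints the delivery ID on each attempt so the IDs can be compared
    # between the original run and its retries.
    print(f"attempt {self.request.retries}: task id = {self.request.id}")
    if self.request.retries < self.max_retries:
        raise self.retry(countdown=1)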

@JonnieDoe

Any progress here? I need this functionality but I do not want to maintain a separate package just for this fix.

@process0

Progress?

@steinitzu
Owner

Sorry all, been neglecting this library as I don't use celery much anymore.

I'm not sure about this PR. I would like to see some reasoning for the self._unlock_to_super_retry flag; it seems odd to me and looks like it would introduce a race condition.

Tests would be appreciated as well.

@process0

Thanks for the response. Can you explain a bit more about how this would introduce a race condition? If you were to implement retry, how would you do it?

@steinitzu
Owner

Maybe racing is not such an issue; I haven't looked deeply into how retrying works in celery.
Possibly, if the lock times out before the retry call, another task could run and the retry would also go through.
I think a safer way of doing this would be to check whether the existing lock has the same task ID as the current task.

This might be an issue as well:

except Exception:
    # Clear the lock if apply_async fails
    if lock_aquired:
        self.unlock(lock)
    raise

With this approach the lock would not be removed when apply_async fails during retry.
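
A minimal sketch of that same-task-ID check, assuming the lock backend can report which task ID currently holds a key (the method names here are assumptions, not this library's actual API):

def lock_or_reenter(backend, lock_key, task_id):
    # A retry of the same task should pass through its own lock,
    # while a different task must still be blocked.
    holder = backend.get(lock_key)              # assumed: task ID currently holding the lock, or None
    if holder is None:
        return backend.lock(lock_key, task_id)  # assumed: atomic acquire, True on success
    return holder == task_id                    # same task retrying: treat the lock as already ours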

@process0

process0 commented Sep 27, 2021

Thanks. After trying out this MR and hacking around it, I could not get it to work. This project does the same as celery-once (also abandoned) by attempting to lock before apply_async (which then calls send_task).

I was able to modify this library to provide locking post send_task with retry capabilities. I haven't dug too deep into the retry functionality (eta/countdown, etc.), but as far as I can tell a message is sent via apply_async, consumed from the broker, and, if it has an eta/countdown, put into a timer pool in the worker (possibly to be ACK'd when its countdown is reached).

This sample code should be enough to build off of if anyone is interested:

import logging

from celery import Task, signals, states

# RedisBackend and REDIS_URI are placeholders for whatever lock backend
# and connection URI you use.
logger = logging.getLogger(__name__)


class Singleton(Task):
    def __init__(self, *args, **kwargs):
        self.singleton_backend = RedisBackend(REDIS_URI)
        self._lock_key = None
        self.max_retries = None  # Try forever, change if you must
        self.__run = None

    @property
    def lock_key(self):
        if self._lock_key:
            return self._lock_key

        # Generate your lock key however (e.g. from the task name and
        # the request args/kwargs)

        return self._lock_key

    def lock(self):
        lock = self.singleton_backend.lock(self.lock_key, self.request.id, expiry=60*5)
        logger.info(f'Attempted lock for {self.lock_key} = {lock}')

        if not lock:
            # Override the task function so it is not called but retried.
            def terminated(*args, **kwargs):
                self.retry(countdown=60)

            # may need to do the same for __call__
            self.__run = self.run
            self.run = terminated
        else:
            # Lock acquired: restore the original run if a previous
            # attempt replaced it.
            if self.__run:
                self.run = self.__run

        return lock

    def unlock(self):
        unlock = self.singleton_backend.unlock(self.lock_key)
        logger.info(f'Attempted unlock for {self.lock_key} = {unlock}')

...

@signals.task_prerun.connect
def connect_task_prerun(sender=None, task_id=None, task=None, args=None, kwargs=None, **e):
    # Acquire (or fail to acquire) the lock right before the task body runs.
    if isinstance(task, Singleton):
        task.lock()

...

@signals.task_postrun.connect
def connect_task_postrun(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **e):
    # Keep the lock across retries; release it on any other final state.
    if isinstance(task, Singleton) and state != states.RETRY:
        task.unlock()
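
For anyone wiring this up, a hypothetical usage sketch for the class above (the app name, broker URL, and task are placeholders):

from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0")  # placeholder broker URL

@app.task(base=Singleton, bind=True)
def sync_account(self, account_id):
    # When the lock is already held, Singleton.lock() above replaces run
    # with a stub that calls self.retry(countdown=60), so the real body
    # only runs once the lock can be acquired.
    ...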

@Jonamaita

Any progress? Maybe it could help to test this fix.

@elavaud

elavaud commented Mar 3, 2023

We actually found a simpler way:

    def retry(self, *args, **kwargs):
        self.release_lock(task_args=self.request.args, task_kwargs=self.request.kwargs)
        return super().retry(*args, **kwargs)

Happy to submit a PR if needed
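
For context, a hedged sketch of how that override could be wired into a task (RetryableSingleton, the app setup, and the fetch task are hypothetical; release_lock is used exactly as quoted above):

from celery import Celery
from celery_singleton import Singleton

app = Celery("tasks", broker="redis://localhost:6379/0")  # placeholder broker URL

class RetryableSingleton(Singleton):
    def retry(self, *args, **kwargs):
        # Release the singleton lock before delegating to Celery's retry,
        # so the re-queued run can acquire it again.
        self.release_lock(task_args=self.request.args, task_kwargs=self.request.kwargs)
        return super().retry(*args, **kwargs)

@app.task(base=RetryableSingleton, bind=True)
def fetch(self, url):
    try:
        ...  # do the actual work here
    except TimeoutError as exc:
        raise self.retry(exc=exc, countdown=30)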

@jsevillaamd

Is this project abandoned?
Is there any other tool similar to this one?
