Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rq jobs go into a bad state when redis hits connection limit #1553

Open
alella opened this issue Aug 30, 2021 · 3 comments · May be fixed by #1568
Open

rq jobs go into a bad state when redis hits connection limit #1553

alella opened this issue Aug 30, 2021 · 3 comments · May be fixed by #1568

Comments

@alella
Copy link
Contributor

alella commented Aug 30, 2021

Event: message in popped from a non empty queue
State: redis server's client connection count < max_connections

- worker.py: work()
  - dequeue_job_and_maintain_ttl
    - job, queue = queue_class.dequeue_any(...)  | message is popped from queue. The job
                                                 | corresponding to the message exists in 
                                                 | Redis and its state is set to queued.
  - execute_job(job, queue)
    - fork_work_horse(job, queue)

Event: New work horse is forked to execute the job
State: redis server's client connection count = max_connections

  - worker.py:fork_work_horse(job, queue)
    - main_work_horse(job, queue)
      - perform_job(job, queue)
        - push_connection(self.connection)
        - prepare_job_execution(job)
          - Creates a new connection here     |New connection can't be created here.
          - set job.hearbeat                  |doesnt work
            - Add to StartedJobRegistry       |doesnt work, job is not moved to StartedJobRegistry

For the same reason, executing the failure callback is also not possible.

At this point the job persists in redis with status="queued", but the job is not in queue. Workers
try to pop the next item in the queue and the whole cycle continues until the queue is empty. The
jobs still persist in redis and there is no way to process them. They also miss out from any
monitoring systems that monitor the rq registries as they are absent from all of them.

This problem might happen often when running rq on a redis server with low client connection limit.

Traceback that caused me to investigate the issue:

Traceback (most recent call last):
  File "<base-path>/site-packages/rq/worker.py", line 1026, in perform_job
    self.prepare_job_execution(job)
  File "<base-path>/site-packages/rq/worker.py", line 908, in prepare_job_execution
    pipeline.execute()
  File "<base-path>/site-packages/redis/client.py", line 4013, in execute
    self.shard_hint)
  File "<base-path>/site-packages/redis/connection.py", line 1192, in get_connection
    connection.connect()
  File "<base-path>/site-packages/redis/connection.py", line 567, in connect
    self.on_connect()
  File "<base-path>/site-packages/redis/connection.py", line 643, in on_connect
    auth_response = self.read_response()
  File "<base-path>/site-packages/redis/connection.py", line 739, in read_response
    response = self._parser.read_response()
  File "<base-path>/site-packages/redis/connection.py", line 340, in read_response
    raise error
redis.exceptions.ConnectionError: max number of clients reached
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "<base-path>/site-packages/rq/worker.py", line 1052, in perform_job
    self.execute_failure_callback(job)
  File "<base-path>/site-packages/rq/worker.py", line 1013, in execute_failure_callback
    job.heartbeat(utcnow(), CALLBACK_TIMEOUT)
  File "<base-path>/site-packages/rq/job.py", line 434, in heartbeat
    connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat))
  File "<base-path>/site-packages/redis/client.py", line 3050, in hset
    return self.execute_command('HSET', name, *items)
  File "<base-path>/site-packages/redis/client.py", line 898, in execute_command
    conn = self.connection or pool.get_connection(command_name, **options)
  File "<base-path>/site-packages/redis/connection.py", line 1192, in get_connection
    connection.connect()
  File "<base-path>/site-packages/redis/connection.py", line 567, in connect
    self.on_connect()
  File "<base-path>/site-packages/redis/connection.py", line 643, in on_connect
    auth_response = self.read_response()
  File "<base-path>/site-packages/redis/connection.py", line 739, in read_response
    response = self._parser.read_response()
  File "<base-path>/site-packages/redis/connection.py", line 340, in read_response
    raise error
redis.exceptions.ConnectionError: max number of clients reached
@selwin
Copy link
Collaborator

selwin commented Sep 7, 2021

Because RQ uses Redis to store worker and job states, it's expected that RQ jobs will go into an inconsistent state if Redis doesn't work properly.

I don't think there's much we can do about this, but I'm open to suggestions. Please reopen the issue if you have any actionable suggestions.

@selwin selwin closed this as completed Sep 7, 2021
@alella
Copy link
Contributor Author

alella commented Sep 20, 2021

Hi @selwin, there are a few ways this could be fixed:

When the job is initialized, it doesn't belong to any registry. The job could be immediately moved into the StartedJobRegistry when poping them from the queue (i.e, with the use of Redis pipelines to pop from the queue and move into StartedJobRegistry atomically). Right now, the job is moved into StartedJobRegistry with a new connection + new process which fails when exceeding the connection limit. This would involve the least amount of code changes.

OR

We dequeue the job when calling job.prepare_for_execution. Instead of doing a pop operation on the queue, we see if the queue is non-empty. When non-empty, initialize a worker that would pop the job from the queue. This may result in multiple workers requesting to pop the same job in Redis.

OR

Introduce a new kind of registry: NewJobRegistry. Right now we have a registry for every Job state except this. Jobs when created are automatically added to NewJobRegistry. When job-keys are popped from the queue they are not removed from NewJobRegistry. job.prepare_for_execution() would change the job's registry from NewJobRegistry to StartedJobRegistry. Using NewJobRegistry gives us a staging area that allows us to recover from Redis failures. A Job is always guaranteed to be found in one of the registries. Stranded jobs could be re-queued through the registry cleanup functions. Cleanup jobs are only executed once every 10 min, this shouldn't block any real-time processing.

If you think any of the solutions could be applied, could you please re-open this issue? I don't have permission to re-open this issue.

@selwin
Copy link
Collaborator

selwin commented Sep 21, 2021

If the issue is about addressing the part when a job gets popped, it could get orphaned because it no longer belongs in any queue/registry (and not Redis errors causes RQ to fail because this is too broad), I'm open to addressing this issue.

So the TLDR is option 1 is best if it can be done. Otherwise option 3.

When the job is initialized, it doesn't belong to any registry. The job could be immediately moved into the StartedJobRegistry when poping them from the queue (i.e, with the use of Redis pipelines to pop from the queue and move into StartedJobRegistry atomically). Right now, the job is moved into StartedJobRegistry with a new connection + new process which fails when exceeding the connection limit. This would involve the least amount of code changes.

This would be the most elegant solution, but I'm not sure this is easily achievable given that RQ uses BLPOP when listening for incoming jobs. We also can't use BLMOVE or BRPOPLPUSH because RQ allows workers to listen to multiple queues at the same time.

We dequeue the job when calling job.prepare_for_execution. Instead of doing a pop operation on the queue, we see if the queue is non-empty. When non-empty, initialize a worker that would pop the job from the queue. This may result in multiple workers requesting to pop the same job in Redis.

Not sure how viable this approach is, this would also change RQ in a lot of places so this would be a huge change.

Introduce a new kind of registry: NewJobRegistry. Right now we have a registry for every Job state except this. Jobs when created are automatically added to NewJobRegistry. When job-keys are popped from the queue they are not removed from NewJobRegistry. job.prepare_for_execution() would change the job's registry from NewJobRegistry to StartedJobRegistry. Using NewJobRegistry gives us a staging area that allows us to recover from Redis failures. A Job is always guaranteed to be found in one of the registries. Stranded jobs could be re-queued through the registry cleanup functions. Cleanup jobs are only executed once every 10 min, this shouldn't block any real-time processing.

Maybe this can be done, I'm willing to accept a PR for this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants