
Multithread Worker #1803

Draft - ccrvlh wants to merge 25 commits into master

Conversation

@ccrvlh (Collaborator) commented Feb 7, 2023

This is a (very) rough sketch of what a multithreaded worker may look like.

The way this is currently implemented: the main worker process is responsible for dequeuing jobs and monitoring the pool. If the pool has a free slot, it dequeues a job and submits it to the executor; if it doesn't, it waits. It uses the TimerDeathPenalty to enforce the timeout, and receives a pool_size argument (Python's default is CPU * 5; we could adopt something similar).
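
A minimal sketch of that loop, with illustrative names only (wait_for_free_slot, dequeue_job, perform_job and release_slot are placeholders, not the PR's actual methods):

from concurrent.futures import ThreadPoolExecutor

def work_loop(worker, pool_size=10, burst=False):
    # Dequeue only while the pool has a free slot; otherwise wait.
    executor = ThreadPoolExecutor(max_workers=pool_size)
    while True:
        worker.wait_for_free_slot()                    # blocks while all threads are busy
        job = worker.dequeue_job(timeout=worker.dequeue_timeout)
        if job is None:
            if burst:
                break                                  # queue drained: exit in burst mode
            continue
        future = executor.submit(worker.perform_job, job)
        future.add_done_callback(worker.release_slot)  # frees the slot when the job finishes
    executor.shutdown(wait=True)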

To monitor the thread pool, it uses a simple counter protected by a Lock: the number of idle slots (the variable is idle_pool_slot) is decremented as soon as a job occupies a thread and incremented as soon as the job finishes (through a done_callback). The logic was built around an idle pool; we could easily do it the other way round and track running_threads, comparing it against max_workers.
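
A sketch of that counter as a small helper (the _PoolMonitor name is hypothetical; the PR keeps the counter on the worker itself), mainly to illustrate the Lock and the done_callback wiring:

import threading

class _PoolMonitor:
    def __init__(self, pool_size: int):
        self._lock = threading.Lock()
        self._idle_slots = pool_size

    def occupy_slot(self):
        with self._lock:
            self._idle_slots -= 1      # a job just took a thread

    def release_slot(self, future=None):
        # Signature is compatible with Future.add_done_callback(monitor.release_slot).
        with self._lock:
            self._idle_slots += 1      # the job finished, so the slot is free again

    def has_free_slot(self) -> bool:
        with self._lock:
            return self._idle_slots > 0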

I thought this implementation was the simplest (no need to spin up threads manually and kill them one by one), and the one that required the least amount of customization of the main worker code (we like simple 😀). It would be great if I were wrong and there's an even simpler way!

The only reason the work() method was customized is the waiting it does while checking for a free slot in the pool; other than that, it is a copy/paste of the original. Same for the _shutdown() method: it's only there to actually shut down the executor.

I copied the Worker tests and ran all of them using the ThreadpoolWorker. Currently, three of them fail: (1) timeout moves job to failed registry, (2) race condition on dependencies, and (3) result lifespan. I didn't spend time looking into why.

Concurrency has been a big topic for quite a while now. At the moment both the SimpleWorker and the Worker are very mature, but to be honest, it's going to take quite a while before a ThreadpoolWorker becomes as robust and mature as the existing ones (or any other new worker).

However, this topic has been hanging for close to 10 years now. I would favor a simplistic beta release approach, in which not all features are necessarily implemented (dependencies, custom handlers, etc.) and tested. I think this would greatly help us understand the issues with this worker, the room for improvement, and ways to make it as solid as the other ones - we could use warnings and be explicit about its experimental nature.

This can be extremely useful for I/O-bound workloads.
I would love comments, contributions, criticism, and anything else that could help us move forward with adding native concurrency support to RQ - leaving this as a draft so we can discuss on top of it.

Related to #45 #404 #734 #1804
Fixes #1804

@selwin (Collaborator) commented Feb 7, 2023

Concurrency has been a big topic for quite a while now. At the moment both the SimpleWorker and the Worker are very mature, but to be honest, it's going to take quite a while before a ThreadpoolWorker becomes as robust and mature as the existing ones (or any other new worker).

However, this topic has been hanging for close to 10 years now. I would favor a simplistic beta release approach, in which not all features are necessarily implemented (dependencies, custom handlers, etc) and tested.

Agree, we'll definitely label this as beta when first introduced.

Another approach that I think is better, and that builds on the robustness of the existing worker implementations, is the concept of a WorkerPool: a supervisor process that spawns workers and watches over them, spawning new ones whenever necessary (this is how gunicorn works, if I'm not mistaken, so I'm comfortable with this model). I started implementing this a few months ago here: https://github.com/rq/rq/blob/watcher/rq/worker_pool.py

@ccrvlh (Collaborator, Author) commented Feb 7, 2023

Another approach that I think is better, and that builds on the robustness of the existing worker implementations, is the concept of a WorkerPool: a supervisor process that spawns workers and watches over them, spawning new ones whenever necessary

So this would be a bit more similar to what Celery does in which you have a single worker, and you just pass the --concurrency num and the --pool type? Would that mean rewriting the worker to be totally agnostic in terms of the concurrency model?

EDIT: Reading again and looking at the code, it seems this would work with multiprocessing, but I'm not sure about threads, async and gevent, since the pool manages the full worker, right? So this looks like a prefork model in which the supervisor runs workers, instead of a worker that spawns processes.

@selwin (Collaborator) commented Feb 8, 2023

So this would be a bit more similar to what Celery does in which you have a single worker, and you just pass the --concurrency num and the --pool type? Would that mean rewriting the worker to be totally agnostic in terms of the concurrency model?

It's actually the inverse, so we don't need to rewrite any worker code (or very little, hopefully). The WorkerPool simply instantiates workers and only sends management signals (e.g. shutdown) to the workers.

Let's say we start the worker with this CLI command rq workerpool -n 5. Inside the worker pool we'd do something like this:

for _ in range(n):
    worker = Worker()
    worker.start()  # This would run in a separate process like the scheduler https://github.com/rq/rq/blob/master/rq/worker.py#L668

The good part about this concept is that we can keep the worker simple, because it only has to deal with dequeueing and executing its own jobs, keeping track of its own results, etc., which it has been doing for the past 10 years.

The WorkerPool is just a higher level of abstraction that makes it easier for us to start more workers.

Edit: this implementation also leaves room for someone to implement their own multi-threaded worker. If the Worker is to be refactored to make this implementation easier, I'm also open to this idea, but the refactoring has to be done bit by bit; I generally stay away from wholesale changes because they may introduce bugs or instability.

@ccrvlh (Collaborator, Author) commented Feb 9, 2023

It's actually the inverse, so we don't need to rewrite any worker code (or very little, hopefully). The WorkerPool simply instantiates workers and only sends management signals (e.g. shutdown) to the workers.

Got it, makes sense after reading the code more carefully.

this implementation also leaves room for someone to implement their own multi-threaded worker. If the Worker is to be refactored to make this implementation easier

Doesn't the WorkerPool implementation, being a layer of abstraction on top of the worker, reduce the flexibility for custom classes, since it hides details that one would want to customize to make other concurrency models work? This is fine, and it's surely helpful for running a simple pool of workers, but I don't see it as a reason not to build new concurrent workers. I guess this implementation would have to have very strong opinions on how to run multiple workers within it, so it seems that the path to a multithreaded worker (or any other concurrency model) doesn't go through a WorkerPool, but rather through a new Worker class.

One thing to keep in mind is the datastore; that's something to pay attention to, especially in documentation, since this can get confusing:

  • For new users, the "worker" might change slightly - "worker" and "workerpool" are now two concepts that overlap depending on how you look at it. The user can now start a workerpool (a level of abstraction on top of the worker), but without much control over the worker (child) itself, since the top-level process started through the CLI (what we have been calling a worker) is just a pool controlling workers.
  • The datastore would show multiple workers, but as these workers are hidden (they are not being controlled directly by the user), we might end up at a crossroads where you only control (spin up, cancel, shut down, follow the tasks of) the parent. It would definitely be possible to build APIs that provide granular control of the child processes, but again this would mean more new code with a brand new approach.

That all being said, I think this effort is parallel (although related) to the concurrency implementation of other methods (namely threads, gevent, asyncio, and even multiprocessing):

  • As this is running the full worker, more lightweight concurrency patterns probably wouldn't be well suited: it seems hard to justify running a full-blown worker in its own thread, for example, when it could just be throwing jobs at a thread pool with far fewer issues regarding safety, race conditions, etc.
  • This is even more relevant for asyncio and gevent - one wouldn't run 1000 workers (one for each greenlet, for example).

I guess whether the wrapper over the existing worker (workerpool) is preferable over new workers with different concurrency models is debatable. I would tend to prefer the latter, but I understand if the roadmap leans the other way.


the refactoring has to be done bit by bit; I generally stay away from wholesale changes because they may introduce bugs or instability.

This is a fairly different topic, but a very relevant one nonetheless. I understand your point here, although I might have a slightly different view. I guess my main concern is that the bit-by-bit approach favors backwards compatibility over anything else, which greatly reduces flexibility. As this is a 10+ year old codebase, it's very hard to make it mypy-friendly, for example, and there are choices that I think are worth reviewing - I discussed a few of them on the V2 discussion, but they are mainly related to organization, methods privacy/protection, and architectural decisions. The second relevant aspect is related to speed - the bit-by-bit approach tends to be a lot slower, changes are a lot more delicate, and slower may mean "not implemented".

A polished and stable solution necessarily takes time: it needs to be battle-tested and used in a lot of different scenarios by different people. Having "simple" (beta) versions out there is super relevant to gaining that experience and expertise (think of a ThreadPoolWorker that doesn't support job dependencies just yet, or a GeventWorker that doesn't have custom exception handling - just examples).

I tend to favor a path where 1.x is maintained while v2 is being developed in beta. Two good examples of this approach are psycopg3 and sqlalchemy2 that just went through (are going through) a significant change, and have been running on alpha/beta mode (gaining experience) for quite a while. This would allow for a lot more flexibility than continuously incrementing 1.x and keeping things backwards compatible, avoiding legacy code for compatibility reasons.

@selwin (Collaborator) commented Feb 10, 2023

Doesn't the WorkerPool implementation, being a layer of abstraction on top of the worker, reduce the flexibility for custom classes, since it hides details that one would want to customize to make other concurrency models work?

It shouldn't, because the worker pool can pass all configuration options to the underlying worker class.

That all being said, I think this effort is parallel (although related) to the concurrency implementation of other methods (namely threads, gevent, asyncio , and even multiprocessing)

I agree. I'm open to PRs that rearrange or rework worker methods to make developing other kinds of concurrency implementations easier - this is something that I want to support. A few people have maintained gevent-powered workers.

I discussed a few of them on the V2 discussion, but they are mainly related to organization, methods privacy/protection, and architectural decisions.

I'm open to re-organizing the codebase to make it more structured, but it has to be done in PR chunks that I can actually review.

I also understand that there's a pattern of using the _function_name() to indicate private functions/methods, but honestly I don't care too deeply about this and my approach is to just give backwards compatibility guarantees to documented methods.

I guess my main concern is that the bit-by-bit approach favors backwards compatibility over anything else, which greatly reduces flexibility. As this is a 10+ year old codebase, it's very hard to make it mypy-friendly, for example, and there are choices that I think are worth reviewing - I discussed a few of them on the V2 discussion, but they are mainly related to organization, methods privacy/protection, and architectural decisions.

I totally agree, and I realize that I tend to make decisions that prioritize stability and the slow evolution of things.

The second relevant aspect is related to speed - the bit-by-bit approach tends to be a lot slower, changes are a lot more delicate, and slower may mean "not implemented".

Yes, this is an unfortunate side effect and I'm aware of it. But I'm in favor of rearranging methods/functions such that improvements can still be made by subclassing RQ classes.

I tend to favor a path where 1.x is maintained while v2 is being developed in beta. Two good examples of this approach are psycopg3 and sqlalchemy2 that just went through (are going through) a significant change, and have been running on alpha/beta mode (gaining experience) for quite a while. This would allow for a lot more flexibility than continuously incrementing 1.x and keeping things backwards compatible, avoiding legacy code for compatibility reasons.

This is a good idea but I currently don't have the time and bandwidth to support two different code bases :(.

@ccrvlh (Collaborator, Author) commented Feb 10, 2023

@selwin truly appreciate your frankness and openness (and patience :)) on this!
I sure understand your points. I think there are a lot of things we could do without breaking stuff, so let's stick to that.
I'll keep working on some of the workers, try to fix a few bugs, and take a closer look at the pool.

Implementation-wise, I was thinking about this over the last couple of days, and I wonder what your thoughts are.
I guess there are two main ways to go about concurrency: one is more transparent and the other is more explicit, and there are pros/cons to each.

  • Make it 100% explicit, more verbose and less "automatic", which means always using the --worker-class argument and some general parameters, e.g. rq worker -w 'rq.worker.NativeConcurrentWorker' --pool-size 10. The good thing about this is that it keeps the API mostly the same; the downside is that it's a more "manual" way of providing arguments.
  • Make it transparent (like Celery), and map the implementation to the class, e.g. rq worker --pool gevent --pool-size 10. The WorkerPool implementation is a good example of the trade-offs: we could have a new command/concept called WorkerPool, or we could have something like rq worker --pool prefork --pool-size 5.

I guess the first option is easier to develop, since not many other arguments/params are needed and the architecture remains mostly the same. The second option is more natural/easier for the end user.

What are your thoughts?

Also, what is the minimum set of features you would consider for a new worker? By limiting the feature set of a new worker (e.g. the thread worker doesn't work with the scheduler/callbacks/dependencies) we could greatly reduce the complexity and increase the speed of new implementations, which in turn should reduce bugs and unexpected behavior. Another option would be to have a feature implemented, but "unsupported". Apart from the multithreaded worker, I have been using a gevent worker for a while, and a couple of months ago I started an AsyncWorker (I left Flask for FastAPI). What are your thoughts on the bare minimum needed to add these?

This test is generic and doesn't really test the Threadworker
It doesn't make much sense for this to be tested in every worker.
@ccrvlh (Collaborator, Author) commented Feb 11, 2023

Some more tests with dependencies.
Looking into this a bit further, the challenge seems to be: when running in burst mode, the main thread breaks the loop before the parent job finishes executing, so the dependent job never gets enqueued.

One way I can think of to fix this: before actually breaking the work loop on the main thread, we could keep a list of (job, future) tuples and then check whether any job that is currently in a future (i.e. being executed in a thread) has dependents. If so, we wait until those jobs finish before enqueuing the dependents. This would block the main thread, prevent it from breaking the work loop, and allow the parent job to enqueue its dependents.

This shouldn't happen when not in burst mode, since there's always going to be a next loop cycle, so the parent will always have a chance to enqueue the dependent. TBH it sounds too complex, but it's the only idea I have at the moment. Any other ideas are more than welcome.
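
A minimal sketch of that idea, assuming the _current_jobs list of (job, future) tuples from this PR and a job.dependent_ids attribute listing the jobs that depend on it (the helper name is illustrative, not the PR's final code):

from concurrent.futures import wait

def _wait_for_jobs_with_dependents(self):
    """Called before exiting the burst loop: block until any in-flight job
    that has dependents finishes, so its dependents can still be enqueued."""
    pending = [
        future
        for job, future in self._current_jobs         # (job, future) tuples for running jobs
        if job.dependent_ids and not future.done()    # dependent_ids: jobs waiting on this one
    ]
    if not pending:
        return False
    wait(pending)                                     # blocks the main thread until the parents finish
    return True                                       # caller should run one more loop cycle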

@ccrvlh (Collaborator, Author) commented Feb 12, 2023

before actually breaking the work loop on the main thread, we could keep a list of (job, future) tuples and then check whether any job that is currently in a future (i.e. being executed in a thread) has dependents

This seems to work - some more polishing is needed. Tests are passing locally, but it seems something is wrong on the CI; I'll need to take a closer look. I'll mark this as ready for review just so we can iterate on feedback.

@selwin something to keep in mind: the current test architecture was made for a single worker, so to test other workers basically the whole test suite has to be replicated - lots of room for trouble there. We might want to think of a more dynamic way of testing different worker classes in the future.

@ccrvlh ccrvlh marked this pull request as ready for review February 12, 2023 00:11
@selwin (Collaborator) commented Feb 12, 2023

I haven't had the time to look at this PR closely. For a PR this ambitious, I think we'll need time to iterate.

If you think there are a few chunks of logic that should be broken down into separate functions/methods, let me know. We can make a separate PR to get that function/method into master first so it's easier to maintain this PR. I'll try to take a look at this in a few days.

codecov bot commented Feb 16, 2023

Codecov Report

Patch coverage: 97.18% and project coverage change: +0.50 🎉

Comparison is base (6546497) 95.02% compared to head (5e25ccb) 95.52%.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #1803      +/-   ##
==========================================
+ Coverage   95.02%   95.52%   +0.50%     
==========================================
  Files          51       51              
  Lines        7997     8915     +918     
==========================================
+ Hits         7599     8516     +917     
- Misses        398      399       +1     
Impacted Files Coverage Δ
rq/worker.py 88.90% <83.87%> (+0.39%) ⬆️
tests/test_worker.py 98.70% <99.02%> (+0.39%) ⬆️
rq/defaults.py 100.00% <100.00%> (ø)
tests/test_callbacks.py 100.00% <100.00%> (ø)
tests/test_dependencies.py 100.00% <100.00%> (ø)
tests/test_worker_registration.py 100.00% <100.00%> (ø)
tests/test_connection.py 95.45% <0.00%> (-2.28%) ⬇️
tests/fixtures.py 77.07% <0.00%> (+8.91%) ⬆️


@ccrvlh (Collaborator, Author) commented Mar 7, 2023

@selwin do you want to take another look at this to see what other changes you would like?

  • Created bootstrap
  • Moved reorder queues
  • Created teardown

@selwin (Collaborator) commented Mar 7, 2023

Yes, I'd like to take a more in-depth look when I have more time. Thanks for getting this PR this far; expect more changes as we get it into shape ;)

This will be a very big step forward for RQ.

@ccrvlh (Collaborator, Author) commented Mar 7, 2023

Sure, no worries; happy to contribute. Let me know when you want to get back to this. More workers coming soon :)

EDIT: BTW, I'll leave this as a draft while we work on it.

@ccrvlh ccrvlh marked this pull request as draft March 7, 2023 02:50
with self._lock:
    self._idle_threads += operation

def __wait_for_slot(self, wait_interval: float = 0.25):
Collaborator

Is there some kind of push based mechanism instead of polling for free workers?

Collaborator Author

Not that I could find - the ThreadPoolExecutor is a pretty thin wrapper. The only push-based mechanism I saw was the done callback on a single future (Future.add_done_callback).

super(ThreadPoolWorker, self).__init__(*args, **kwargs)

@property
def is_pool_full(self):
Collaborator

I think it would be better if we change this to number_of_available_horses() instead of a simple boolean.

time.sleep(wait_interval)
continue

def __check_pending_dependents(self) -> bool:
Collaborator

I think we need to restructure work() in such a way that this is not needed.

Collaborator Author

The reason I went down this path is that, in this case, things don't happen sequentially, so we need a global way of handling dependencies (thread Y can't work unless thread X has finished) - this is not a challenge for the Worker or the SimpleWorker.

@selwin (Collaborator) commented Mar 9, 2023

I need to think about how to further group some sections inside work() so that you don't need to rewrite so much of it.

From a more meta perspective, since we already have the work horse terminology, I think we should stick to it and call the threads horses.


return bool(completed_jobs)

def execute_job(self, job, queue):
Collaborator

I think the next step would be to make execute_job() on the main Worker class follow this pattern. After executing the job in a separate process, it should return to the main loop and wait for the job to finish before dequeueing the next job.

Collaborator

I'll try to find some time to work on this sometime in the next few days, I think this will be quite complicated.

self.executor = ThreadPoolExecutor(max_workers=self.threadpool_size, thread_name_prefix="rq_workers_")
self._idle_threads = self.threadpool_size
self._lock = threading.Lock()
self._current_jobs: List[Tuple['Job', 'Future']] = [] # type: ignore
Collaborator

We should also bring _current_jobs to the main worker class.

@ccrvlh (Collaborator, Author) commented Mar 9, 2023

A few comments that are relevant to multiple review comments:

There are three main reasons why I don't think going with horse as a generic term for multiple kinds of workers is the best course of action.

The first is that it's not obvious: is a thread a horse? How about a worker that runs each job in a separate process (using multiprocessing, for example)? An async worker running an event loop? A gevent worker? The term has historically been used for forked processes (specifically using fork()), but as we introduce other concepts we would be trying to fit them into the existing model, whereas the existing model doesn't actually support the new ones.

Harder to maintain: not every worker will have the same concepts/structure as the current one. The best example is the SimpleWorker, which is arguably a lot simpler than the ForkWorker; with the current implementation we have 4-6 methods that are not used by the SimpleWorker, so we have to go around the code to understand whether a change in X affects worker Y (this just happened with that Windows issue, where something in the horse logic broke the SimpleWorker). Ideally we should be able to safely change a worker without affecting the others. (BTW: this is also one of the reasons why I think it would make more sense in the future to have the SimpleWorker be the default Worker and the ForkWorker built on top of it: if the Worker class is shared, it should ideally carry only the lowest common denominator across all worker implementations.)

By fitting things from other workers into the main worker, we would most likely cause bloat and less predictability in the current worker implementation - which makes the worker harder to maintain (and gives less freedom to new worker implementations, by "demanding" that they follow the main worker class pattern/signature). Another place where this appears is _current_jobs - the ThreadWorker will have to overwrite it anyway; it's fine to have it on the ForkWorker, but the signature is different and it means something else depending on the worker.

Premature optimization: this is the very first implementation of a logically different worker. Fitting everything this new worker uses into the old one can be seen as optimizing at a time when we don't actually know that much about the requirements for new workers (in my mind an AsyncWorker is crucial for those using FastAPI/Starlite, for example, and a GeventWorker is very convenient and lightweight for simple I/O-bound tasks). Optimizing is difficult at this stage - by the time we find that the GeventWorker needs a new concept to wait for dependencies, for example, we would be changing the main worker code again. Why don't we just give new worker implementations more freedom, and once we've implemented 3-4 workers, look back, see that X and Y are attributes that follow a common pattern, and simplify things by having them on the main Worker class?

Another good example is the dependency logic: for a multithread worker, it needs to check for dependencies outside a specific thread - to avoid a thread working on a job when it actually should be waiting for its dependency to finish (on a different thread). This is not a challenge for the SimpleWorker or the Worker, so we would be working hard to change this logic on the main worker even though it doesn't actually need it, while the logic on the new worker is simple and isolated enough to be easier to maintain.

Lastly, the example of changing the execute_job logic. This is actually needed for the ThreadWorker because jobs run inside a thread, but the current Worker implementation doesn't actually need new logic - another case where trying to fit everything into a super-Worker class would increase complexity; we would still need to write execute_job anyway for a new worker.

This worker has ~200 LOC while the original worker has 1.4k LOC - we are actually not rewriting that much at all. I'm not sure it's worth walking a long, complicated, tricky and risky path to try to reduce those 200 to, say, 150 - at least not in the first iteration of a new implementation.

@selwin (Collaborator) commented Mar 10, 2023

The first is that it's not obvious: is a thread a horse? How about a worker that runs each job in a separate process (using multiprocessing, for example)? An async worker running an event loop? A gevent worker? The term has historically been used for forked processes (specifically using fork()), but as we introduce other concepts we would be trying to fit them into the existing model, whereas the existing model doesn't actually support the new ones.

We need a term to describe the capability of having multiple processes/threads/workers inside a Worker, right? I'm open to suggestions on what we want to call this; maybe we can take inspiration from other similar libraries? So far we have been calling this feature a multithreaded worker, but I think a thread (which has a very specific meaning in the Python language itself) is even more specific than a horse.

Harder to maintain: not every worker will have the same concepts/structure as the current one. The best example is the SimpleWorker, which is arguably a lot simpler than the ForkWorker; with the current implementation we have 4-6 methods that are not used by the SimpleWorker, so we have to go around the code to understand whether a change in X affects worker Y (this just happened with that Windows issue, where something in the horse logic broke the SimpleWorker). Ideally we should be able to safely change a worker without affecting the others.

I understand the concern, but in my experience maintaining RQ (and other projects), breaking things into several method blocks actually helps with the stability of other implementations that subclass the main worker class. There have been several attempts at making multi-threaded (for lack of a better term) workers that override the entire work() method, and they never age well. They never get updated to support the latest features like schedulers, pubsub or any of the new features/bugfixes that get released.

If we were to have multiple worker models in RQ's core library, we'd need to be able to maintain them :).

BTW: this is also one of the reasons why I think it would make more sense in the future to have the SimpleWorker be the default Worker and the ForkWorker built on top of it: if the Worker class is shared, it should ideally carry only the lowest common denominator across all worker implementations.

I agree with you that the SimpleWorker model should be the default. If the Simple term bugs you, we can rename this to something else and update the docs to make this the default (this doesn't need to wait for v2.0).

Premature optimization: this is the very first implementation of a logically different worker. Fitting everything this new worker uses into the old one can be seen as optimizing at a time when we don't actually know that much about the requirements for new workers (in my mind an AsyncWorker is crucial for those using FastAPI/Starlite, for example, and a GeventWorker is very convenient and lightweight for simple I/O-bound tasks). Optimizing is difficult at this stage - by the time we find that the GeventWorker needs a new concept to wait for dependencies, for example, we would be changing the main worker code again. Why don't we just give new worker implementations more freedom, and once we've implemented 3-4 workers, look back, see that X and Y are attributes that follow a common pattern, and simplify things by having them on the main Worker class?

I share your concern, which is why I'm taking this really slowly, one step at a time. It would be good if we can have multiple worker models ready, but I think this will be a big effort so I'm doing what I can based on the implementation that we have.

At the very least, monitoring tools like django-rq (and even RQ's built-in CLI) will need several predefined APIs to:

  • See how many free/working horses are in a worker
  • Show the jobs currently being worked on by a worker

Another good example is the dependency logic: for a multithread worker, it needs to check for dependencies outside a specific thread - to avoid a thread working on a job when it actually should be waiting for its dependency to finish (on a different thread).

This needs to change. A worker in burst mode should only listen for jobs and work until there's nothing left on the queue; it shouldn't need extra dependency logic.

@ccrvlh (Collaborator, Author) commented Mar 10, 2023

We need a term to describe the capability of having multiple processes/threads/workers inside a Worker, right?

To be honest I don't really see a need for that right now. I do understand this may be relevant in the future (mostly for monitoring purposes), but for now I was just thinking of calling the processing unit of this specific worker what it actually is: a thread. This matters anyway, so that later we can call any single job-processing unit a horse or any other name.

I understand the concern, but in my experience maintaining RQ (and other projects), breaking things into several method blocks actually helps with the stability of other implementations that subclass the main worker class.

Not sure I follow; what I meant was that if horse-related methods are not used by everyone, it would make more sense to have the "base" worker (the one everyone inherits from) be as thin as possible. In practice that would mean switching: the SimpleWorker would become Worker, and the current Worker would become ForkWorker. That was the minimum-common-denominator logic I mentioned. This would help Windows folks as well, since the default would then just work on Windows.

There have been several attempts at making multi-threaded (for lack of a better term) workers that override the entire work() method, and they never age well.

To be honest I never quite understood why. I have personally used a slightly updated version of the GeventWorker in production quite a lot, and the logic is actually very similar to this ThreadPoolWorker; hopefully it will become a PR soon. The main challenge was the import/patch logic: the most obvious fix would be to just use an env var, but that's a pretty inconvenient solution, so I needed to dig deeper into other options. I can push a draft so we can look at it as well.

They never get updated to support the latest features like schedulers, pubsub or any of the new features/bugfixes that get released.

That's true. However, I also think it's fair to accept that not all features will necessarily be implemented for all workers every time, especially during the first iterations - Celery is a good benchmark here: their docs explicitly mention which pools support each specific feature.

At the very least, monitoring tools like django-rq (and even RQ's built-in CLI) will need several predefined APIs

  • See how many free/working horses are in a worker
  • Show the jobs currently being worked on by a worker

That's a perfect approach, but arguably a really high standard for the very first iteration of the first worker that implements concurrency to also have to implement generic monitoring capabilities. But let's pursue it; it's definitely a great ambition to have.

This needs to change. A worker in burst mode should only listen for jobs and work until there's nothing left on the queue; it shouldn't need extra dependency logic.

You mean __check_pending_dependents? Sure, let's look at this logic - it was a solution I found because multiple tests were failing: since the work was no longer sequential, one thread would start a job before its dependencies had actually finished.

Let me get back to your specific review comments so we can make some progress and iterate.

@selwin (Collaborator) commented Mar 11, 2023

Not sure I follow; what I meant was that if horse-related methods are not used by everyone, it would make more sense to have the "base" worker (the one everyone inherits from) be as thin as possible. In practice that would mean switching: the SimpleWorker would become Worker, and the current Worker would become ForkWorker. That was the minimum-common-denominator logic I mentioned. This would help Windows folks as well, since the default would then just work on Windows.

This is a good idea, we can refactor things out to a BaseWorker class and make Worker and SimpleWorker subclass that.
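
A rough sketch of that split, purely illustrative (this hierarchy does not exist in RQ at this point; method names are assumptions):

class BaseWorker:
    """Shared, fork-free logic: dequeueing, heartbeats, registries, monitoring."""

    def work(self, burst: bool = False):
        raise NotImplementedError

    def perform_job(self, job, queue):
        ...  # common execution and bookkeeping


class SimpleWorker(BaseWorker):
    """Runs jobs in the same process (no fork), so it also works on Windows."""

    def work(self, burst: bool = False):
        ...  # dequeue and call perform_job() inline


class Worker(BaseWorker):
    """Fork-based worker (the current default): runs each job in a work-horse process."""

    def work(self, burst: bool = False):
        ...  # dequeue, fork a work horse, wait for it to finish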

That's a perfect approach, but arguably a really high standard for the very first iteration of the first worker that implements concurrency to also have to implement generic monitoring capabilities. But let's pursue it; it's definitely a great ambition to have.

Well, nothing prevents other people from writing their own Worker implementation outside of RQ; it has already been done before. But when we actually bring a different type of worker into the core library itself, it would make sense that it conforms to some sort of common protocol, right?

That's what I'm trying to figure out now, bit by bit :).

@selwin (Collaborator) commented Mar 11, 2023

You mean __check_pending_dependents? Sure, let's look at this logic - it was a solution I found because multiple tests were failing: since the work was no longer sequential, one thread would start a job before its dependencies had actually finished.

Yes, we should keep the logic simple:

  1. The worker waits on the queue (with an X-second timeout) when there are processing units (for lack of a better term) available
  2. After successful execution, the processing unit enqueues other jobs (which will be picked up and distributed to other processing units)
  3. If no other jobs arrive within X seconds and burst=True, the worker exits (see the sketch below)
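
A hedged sketch of that flow (the helper names here are placeholders, not existing RQ APIs):

def work(self, burst=False, dequeue_timeout=5):
    while True:
        self.wait_for_free_processing_unit()                 # only dequeue when a unit is available
        result = self.dequeue_job(timeout=dequeue_timeout)   # blocks for up to `dequeue_timeout` seconds
        if result is None:
            if burst:
                break                                        # no job arrived within the timeout: exit
            continue
        job, queue = result
        # On success, the processing unit itself enqueues the job's dependents,
        # which are then picked up by later iterations of this loop.
        self.submit_to_processing_unit(job, queue)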

@selwin (Collaborator) commented Mar 11, 2023

I took a more thorough look at the implementation so far. My feedback:

  1. worker._current_jobs is implemented nicely; we should bring this into RQ and make it a public worker API.
  2. I also like the job_done() callback approach; this should also be brought into other workers.
  3. I think we can remove _idle_threads though; the number of idle threads can be computed by subtracting the number of current jobs from the pool size (see the sketch below).
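
For example, a sketch of deriving the value on demand (assuming the threadpool_size and _current_jobs attributes from this PR):

@property
def idle_threads(self) -> int:
    # Computed from the pool size and the jobs currently in flight,
    # instead of being tracked behind a lock.
    return self.threadpool_size - len(self._current_jobs)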

@ccrvlh (Collaborator, Author) commented Mar 11, 2023

This is a good idea, we can refactor things out to a BaseWorker class and make Worker and SimpleWorker subclass that.

Great, we can start looking at this in a new PR, leaving all common methods on the base class and having specific implementations in each worker. I think the biggest decision is what the Worker class is, since it's the default (I would still favor having the SimpleWorker as the default, but I think that would only be possible in v2, since it's a big behavioural change, even though not a breaking one).

it would make sense that it conforms to some sort of common protocol

Most definitely yes. My point here is that we could implement the protocol step by step, mainly because we are going to learn other things when implementing new workers. So my idea was to do that incrementally, instead of trying to figure out the full protocol right now.

  1. The worker waits on the queue (with an X-second timeout) when there are processing units (for lack of a better term) available
  2. After successful execution, the processing unit enqueues other jobs (which will be picked up and distributed to other processing units)
  3. If no other jobs arrive within X seconds and burst=True, the worker exits

I don't remember exactly why I chose that specific implementation. From what I can remember, with a concurrent worker there was a scenario where two threads were working in parallel when they actually shouldn't have been (because of a dependency). Let me take a look at this again; I need to revisit the dependency/queueing/deferring logic.

worker._current_jobs is implemented nicely; we should bring this into RQ and make it a public worker API.

Sure, I'll work on that on a separate PR then.

I also like the job_done() callback approach, this should also be brought into other workers.

Any ideas on the implementation? The way this works for threads is native, since the callback is attached to the Future object, so for other workers we would have to do it "manually" - maybe in a handle_finished_job or something?

I think we can remove _idle_threads though; the number of idle threads can be computed by subtracting the number of current jobs from the pool size.

That's fine too - I don't see a downside to having the info precalculated (thinking of monitoring), but it's not really that relevant, so we can remove it as well.

@selwin (Collaborator) commented Mar 12, 2023

Most definitely yes. My point here is that we could implement the protocol step by step, mainly because we are going to learn other things when implementing new workers. So my idea was to do that incrementally, instead of trying to figure out the full protocol right now.

Yes, I think we're aligned on this one. We're already doing this in a step-by-step fashion as this PR continues to evolve :). We have three different worker models:

  1. InProcess
  2. Forking
  3. Threaded

We'll try to sketch out the protocols based on these three models and evolve them as needed. _current_jobs is a new one that we need because the ThreadedWorker can work on multiple jobs in parallel.

Any ideas on the implementation? The way this works for threads is native, since the callback is attached to the Future object, so for other workers we would have to do it "manually" - maybe in a handle_finished_job or something?

Yes, I think handle_job_completion() is the next logical thing to implement. It may also absorb some of the common operations performed by handle_job_failure() and handle_job_success().

Maybe its signature can be handle_job_completion(job, queue, started_job_registry, future)? I'm still unsure about the future bit, but the idea is that it's an object containing information about the processing unit. Maybe even a Horse class with these attributes (see the sketch after this list):

  • future: for future based concurrency models
  • process_id: for fork() and spawn() based models - these are also valid concurrency options
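
A sketch of that shape, purely illustrative (neither Horse nor handle_job_completion() exists in RQ at this point; fields and signature are assumptions):

from dataclasses import dataclass
from concurrent.futures import Future
from typing import Optional

@dataclass
class Horse:
    """Describes the processing unit that executed a job."""
    future: Optional[Future] = None    # future-based concurrency models (thread/process pools)
    process_id: Optional[int] = None   # fork()/spawn() based models

def handle_job_completion(self, job, queue, started_job_registry, horse: Horse):
    # Shared bookkeeping that handle_job_success()/handle_job_failure() could delegate to:
    # remove the job from the started registry, update worker state, enqueue dependents, etc.
    ...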

@selwin (Collaborator) commented Mar 12, 2023

As an aside, if we can land this PR, ProcessPoolExecutor has very similar APIs so we'll actually get Process based concurrency in addition to Thread based one 😀.

@ccrvlh (Collaborator, Author) commented Mar 12, 2023

As an aside, if we can land this PR, ProcessPoolExecutor has very similar APIs so we'll actually get Process based concurrency in addition to Thread based one 😀.

Exactly, that was one of the reasons I went with the Executor instead of the threading module directly. The interface is the same. I had written a MultiprocessWorker, but it wasn't working because of the if __name__ == '__main__' thing and I didn't spend time figuring that out.

And actually the logic is very similar for a GeventWorker as well, or even if we want to do fork-based concurrency. So it is a good framework for thinking about concurrency indeed.
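
To illustrate the shared interface, a hedged sketch of parametrizing the executor class (PoolWorker and ProcessPoolWorker are hypothetical; the PR only implements the thread-based version):

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from rq import Worker

class PoolWorker(Worker):
    executor_class = ThreadPoolExecutor

    def __init__(self, *args, pool_size=10, **kwargs):
        super().__init__(*args, **kwargs)
        self.executor = self.executor_class(max_workers=pool_size)

class ProcessPoolWorker(PoolWorker):
    # Note: ProcessPoolExecutor needs picklable callables and, with the spawn
    # start method, an `if __name__ == "__main__"` guard in the entry point.
    executor_class = ProcessPoolExecutor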

@selwin (Collaborator) commented Mar 19, 2023

Just so that we don't work on the same things, I'm working on bringing worker._current_jobs to Worker class.

@selwin (Collaborator) commented Mar 19, 2023

Just so that we don't work on the same things, I'm working on bringing worker._current_jobs to Worker class.

Unless you're already working on this, let me know. I'll work on something else.

@danielbprice commented
It would be really nice to have this. We have been trying to use rq with Tensorflow, and Tensorflow's internal thread pools really seem to get screwed up by the forking that RQ wants to do, and subsequent calls into Tensorflow hang/deadlock. There are some workarounds you can do to postpone TF's threadpool initialization, but the net result is that we're having to load our models post-fork, which is really a performance killer. I'm willing to endure "it's Beta" if I could have a multithreaded worker.

There are relevant limitations to the `ThreadPoolWorker`; here's a non-exhaustive list:

- Dependencies might not behave as expected
- Job registries may not be reliable (especially with jobs that timed out)
@gyandeeps commented Sep 22, 2023

The biggest thing I like about rq is its job reliability, as it maintains the different job registries correctly. Is it possible to have the same level of reliability with the thread pool worker?
We process a Kafka queue and create rq tasks by hand to process those events. Reliability is super important, as we don't want to lose a task without knowing about it.

Collaborator Author

Thanks for the feedback, definitely something to take into account. I've been working on a different branch, since there were other relevant changes needed to allow a more robust design; I'll work on this as well. I do imagine there will be limitations though, especially in the first couple of versions. Hopefully nothing that will hurt reliability... we'll see.

@gyandeeps

Appreciate it @ccrvlh. We are in the process of switching from Celery to rq for the reliability factor.
BTW, I pulled this branch down and tried the thread pool with a task timing out, and was consistently able to get a retry on it. Anything specific you can point out about where you have seen this issue? (Just curious.)

Collaborator Author

It's been a while since I worked on this... I'm mostly working on the v2 branch now (which has this worker setup), but there's still a lot to do. I'll try to run some tests with it this weekend.

@gyandeeps commented Sep 22, 2023

If you can share the link to the branch, I would appreciate it. Just want to see and learn. I tried searching this repo but was not able to find it.

UPDATE: I found it under your fork. Thanks.

Collaborator Author

Sure! It's on my fork only; you might be able to find it from this repo (or from my profile). I'm on mobile, but I'll post the link here when I get to a PC.

@gyandeeps

@ccrvlh just curious about the timeline for this feature, or v2 in general, since we want to roll out rq in prod but multithreading is critical for us. Thanks.

@gyandeeps

Also, can we try to release this with 2.0 under an experimental flag? Based on my tests, things looked good.
cc @selwin

Collaborator

@gyandeeps like I mentioned, I'll only merge this in slowly, one bit at a time to make sure we don't mess anything up.

RQ version 2.0 also changed how jobs are enqueued so this branch is probably stale.

@HarryAnkers commented
Is there any update on this feature? Not sure if an alternative is being worked on elsewhere, or if it has stalled?

This would be a really nice feature.
