Add iteration support for long-running jobs #6286
module Job
  module Iterable
    # @private
    class ActiveRecordEnumerator
I have a custom implementation of the ActiveRecord enumerator in the gem - https://github.com/fatkodima/sidekiq-iteration/blob/master/lib/sidekiq_iteration/active_record_enumerator.rb
As you can see, the gem's implementation is pretty complex compared to the implementation in this file, and basically mimics the implementation from Rails itself. The only difference between the gem's implementation and Rails' implementation is support for multiple columns. But that use case is, I believe, pretty rare, and most of the time people iterate just by the primary key.
The Rails implementation is pretty good - it recently got a perf improvement (rails/rails#45414), with a further improvement coming in (rails/rails#51243). I also plan to add support for multiple columns to it. So, we can just use the default from Rails.
Wdyt about this?
Ideally you'll have a skeleton interface and people can develop their own Enumerators which work with a specific datastore. ActiveRecord is the primary datastore for 90% of Sidekiq apps though; it would make sense to support it well. I'm ok with a single column if it makes the code far simpler.
lib/sidekiq/job/iterable.rb
Outdated
# TODO: determine a better way to detect if the server/capsule is stopping.
Sidekiq.server? && Sidekiq::CLI.instance.launcher.stopping?
When the long-running job runs, after each iteration it checks whether it is time to stop the execution and reschedule itself. One of the reasons to stop and reschedule is when Sidekiq is stopping. In the gem, I use the :quiet hook for this - https://github.com/fatkodima/sidekiq-iteration/blob/ffdc784b7400092a25d6528ff883c1fb22384c42/lib/sidekiq_iteration.rb#L55-L58
Is there a better way, one that works for both standalone Sidekiq and capsules?
Nope, this is the best way to do it. I would use the global you set, rather than calling launcher.stopping?.
lib/sidekiq/job/iterable.rb
Outdated
def extract_previous_runs_metadata(arguments)
  options =
    if arguments.last.is_a?(Hash) && arguments.last.key?("sidekiq_iteration")
      arguments.pop["sidekiq_iteration"]
    else
      {}
    end

  @executions = options["executions"] || 0
  @cursor_position = options["cursor_position"]
  @times_interrupted = options["times_interrupted"] || 0
  @total_time = options["total_time"] || 0
end
The iterable job keeps its state inside the job hash metadata itself (as the last item of the args, to be concrete). So if an interrupted job is run again, for example, we restore its attributes from the previously stored metadata. The same happens when the job raises an error - we save the current state into the job and push it to Redis.
Another approach is to keep the iteration state as a separate structure in Redis (e.g. as a hash). But then we need to decide on an expiration time for that hash, decide whether to update it after each iteration (+1 Redis call per iteration) or only once in a while, and write a server middleware to set the job's attributes based on that hash.
Which one would you suggest/prefer?
That's a really tough question. It's pretty fundamental to the design and I'm not sure I like the idea of mutating the job payload itself, as it's stored in Redis as a String which can't easily be modified without a JSON parse/dump round trip. I'm leaning toward defining a job iteration Hash, with a key like "it-{jid}". Redis has a number of useful H* commands to update/increment Hash values very quickly.
I think the default should be to update the iteration record in Redis only if the iteration is interrupted or 5 seconds have passed. It should not be updated after every record; if we are processing a million records we don't want a million Redis updates.
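A minimal sketch of that throttling rule, assuming a small checkpoint object that sits between the iteration loop and Redis. `IterationCheckpoint` and `FakeHashStore` are hypothetical names used only for illustration (the fake store stands in for a Redis connection so the example runs on its own); only the `it-{jid}` key and the 5-second default come from the comment above.

```ruby
# Hypothetical in-memory stand-in for a Redis connection (illustration only).
class FakeHashStore
  attr_reader :writes

  def initialize
    @hashes = Hash.new { |hash, key| hash[key] = {} }
    @writes = 0
  end

  # Mimics the shape of HSET with several field/value pairs:
  # everything is stored as a string, as Redis would do.
  def hset(key, fields)
    @writes += 1
    fields.each { |field, value| @hashes[key][field.to_s] = value.to_s }
  end

  def hgetall(key)
    @hashes[key].dup
  end
end

# Hypothetical helper: persists the cursor only when the job is interrupted
# or when FLUSH_INTERVAL seconds have passed since the last write.
class IterationCheckpoint
  FLUSH_INTERVAL = 5 # seconds

  def initialize(store, jid, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @store = store
    @key = "it-#{jid}"
    @clock = clock
    @last_flush = @clock.call
  end

  # Call after every processed record. Returns true if a write happened.
  def record(cursor, interrupted: false)
    now = @clock.call
    return false unless interrupted || now - @last_flush >= FLUSH_INTERVAL

    @store.hset(@key, "cursor" => cursor)
    @last_flush = now
    true
  end
end
```

With this shape, a loop over a million records calls `record` a million times but touches the store roughly once every five seconds, plus once on interruption.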
lib/sidekiq/job/iterable.rb
Outdated
retry_backoff = self.class.get_sidekiq_options.dig("iteration", "retry_backoff") ||
  Sidekiq.default_configuration["retry_backoff"]
self.class.perform_in(retry_backoff, *arguments)
Is there a better way to reschedule itself than keeping arguments in the job and then explicitly calling perform_in?
For example, ActiveJob has a helper for a similar purpose - https://api.rubyonrails.org/classes/ActiveJob/Exceptions.html#method-i-retry_job. Maybe we need something similar?
Good question, it might be time to consider an API for this purpose. Generally I don't like the idea of a job performing its own "meta-logic" around scheduling because it makes testing difficult. It's a massive violation of the Single Responsibility Principle. But this API wouldn't be used by the job itself, but rather your iteration logic.
@mperham Can you please answer questions, when you have time? 🙏
This is a totally new feature to me and not something I've used before so please give me some time to study the code, understand what it is doing and understand how Sidekiq might need to change to make it work well. I apologize for the delay. Test cases would also be helpful to understand how the APIs will be used.
lib/sidekiq/job/iterable.rb
Outdated
Sidekiq.default_configuration.dig("iteration", "retry_backoff")

# Preserve original jid.
self.class.set(jid: jid).perform_in(retry_backoff, *@arguments)
This will run the client middleware again. We need to decide whether this is a problem.
What do you think about treating it like a retry? Raise a Sidekiq::Job::Interrupted exception or similar and let the retry subsystem reschedule it automatically?
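To make the control flow of that suggestion concrete, here is a toy sketch in which the job only raises and an outer layer decides what to do next. `Interrupted`, `ToyJob` and `run_with_requeue` are hypothetical stand-ins, not Sidekiq classes; a real retry subsystem would persist the cursor and re-enqueue the job rather than loop in-process.

```ruby
# Hypothetical exception, standing in for something like Sidekiq::Job::Interrupted.
class Interrupted < RuntimeError; end

# A toy iterable job: processes items from its cursor onward and raises
# Interrupted once it has handled `stop_after` items with work still remaining.
class ToyJob
  attr_reader :cursor, :processed

  def initialize(items, cursor: 0)
    @items = items
    @cursor = cursor
    @processed = []
  end

  def perform(stop_after:)
    while @cursor < @items.size
      @processed << @items[@cursor]
      @cursor += 1
      raise Interrupted if @processed.size >= stop_after && @cursor < @items.size
    end
  end
end

# Plays the role of the retry subsystem: it rescues the exception and
# starts a fresh job instance from the saved cursor. The job itself
# contains no rescheduling logic.
def run_with_requeue(items, stop_after:)
  cursor = 0
  runs = 0
  seen = []
  loop do
    job = ToyJob.new(items, cursor: cursor)
    runs += 1
    begin
      job.perform(stop_after: stop_after)
      seen.concat(job.processed)
      return [seen, runs]
    rescue Interrupted
      seen.concat(job.processed)
      cursor = job.cursor
    end
  end
end
```

The point of the design is visible in `ToyJob#perform`: it never calls anything like `perform_in`, so it can be tested by asserting that it raises at the right moment.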
Yeah, I don't like the current approach either.
This is exactly how I implemented it initially, but in the tests added for this PR I now need to rescue this exception manually and do all the rescheduling logic for the tests to pass. I will see whether I can solve this better in the tests.
@mperham This is ready now for review.
lib/sidekiq/job/iterable.rb
Outdated
state = Sidekiq.redis { |conn| conn.get("it-#{jid}") }

if state
  state_hash = Sidekiq.load_json(state)
Should we handle errors here?
No, exceptional cases should generate exceptions. I do think you should use native Redis hashes instead of JSON, so you'd use hgetall rather than get.
We currently store the iteration cursor as part of this metadata. When using JSON, we get the correct data type back when loading it. But with Redis hashes we will always get strings as values, and the user will need to remember to manually cast the string value to the correct data type. When people use custom enumerators, or when we start supporting custom columns for the ActiveRecord enumerator in the future, we will get cursor types other than integers, and this will be a problem.
Wdyt?
You can make the cursor JSON if it is user-supplied but Sidekiq's internal iteration data can be stored element-by-element in the hash and coerced in Ruby. JSON is meant to be used for interoperability but for our internal data, I'd rather the data be directly addressable in Redis, rather than requiring deserialization.
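One way to read that suggestion, as a sketch only: internal counters become plain hash fields that are coerced with `to_i` / `to_f` on the way back, and just the user-supplied cursor is JSON-encoded so its type survives. The `IterationState` module and its method names are hypothetical; the field names mirror the instance variables in the snippet earlier in this thread.

```ruby
require "json"

# Hypothetical helpers (illustration only, not part of Sidekiq).
module IterationState
  module_function

  # Flatten state into string fields suitable for a Redis hash.
  # Only the cursor is JSON-encoded; counters stay directly addressable.
  def to_fields(executions:, times_interrupted:, total_time:, cursor:)
    {
      "executions" => executions.to_s,
      "times_interrupted" => times_interrupted.to_s,
      "total_time" => total_time.to_s,
      "cursor" => JSON.generate(cursor)
    }
  end

  # Rebuild typed state from what HGETALL would return (all strings).
  def from_fields(fields)
    {
      executions: fields.fetch("executions", "0").to_i,
      times_interrupted: fields.fetch("times_interrupted", "0").to_i,
      total_time: fields.fetch("total_time", "0").to_f,
      cursor: fields.key?("cursor") ? JSON.parse(fields["cursor"]) : nil
    }
  end
end
```

Because the counters are separate fields, they could be bumped server-side with commands such as HINCRBY without reading the cursor at all, which is the "directly addressable" property mentioned above.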
lib/sidekiq/job/iterable.rb
Outdated
}

Sidekiq.redis do |conn|
  conn.set("it-#{jid}", Sidekiq.dump_json(state), ex: STATE_TTL)
See hmset in Redis. https://redis.io/docs/latest/commands/hmset/
Updated with #6286 (comment) and converted to use redis hashes.
lib/sidekiq/job/iterable.rb
Outdated
module Sidekiq
  module Job
    module Iterable
      class Interrupted < ::RuntimeError; end
I think Sidekiq::Job::Interrupted would read better, wdyt?
end

# @api private
def initialize
This module should include a stopping? method which checks @done on Sidekiq::Processor. Here we use a :lifecycle accessor to store the Processor on the Job.
# In Sidekiq::Processor...
job = SomeJob.new
job.lifecycle = self if job.is_a?(Sidekiq::Job::Iterable) # Processor
job.perform(*args)
job.stopping? # => job.lifecycle.stopping?
It's a little more complex but I'm trying to avoid any more global APIs like Sidekiq.stopping? to be more Ractor-friendly.
filepath = @csv.path
return unless filepath

count = `wc -l < #{filepath}`.strip.to_i
This could be a security issue under the right circumstances. I think you can use Shellwords.shellescape for this filepath and I think you can use a simpler wc -l filepath without the redirection.
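A short sketch of what the escaped call could look like, alongside a pure-Ruby alternative that never touches a shell. The helper names are hypothetical; `Shellwords.shellescape` and `File.foreach` are standard library calls.

```ruby
require "shellwords"

# Hypothetical helper: builds the command string with the path escaped,
# so spaces, quotes, `;`, `$(...)` and similar cannot change the command.
def wc_command(filepath)
  "wc -l #{Shellwords.shellescape(filepath)}"
end

# Hypothetical helper: runs wc and parses the count. Without redirection
# wc prints "<count> <path>", so only the first field is used.
def count_lines_with_wc(filepath)
  output = `#{wc_command(filepath)}`
  output.strip.split.first.to_i
end

# Alternative that avoids the shell entirely, at the cost of reading the
# file in Ruby. Unlike `wc -l`, which counts newline characters, this also
# counts a final line that lacks a trailing newline.
def count_lines_in_ruby(filepath)
  File.foreach(filepath).count
end
```

For very large CSV files shelling out to `wc` is likely faster, which is presumably why it was used; the pure-Ruby version trades speed for having no injection surface at all.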
lib/sidekiq/job_retry.rb
Outdated
@@ -134,6 +137,15 @@ def local(jobinst, jobstr, queue)

private

def process_requeue(jobinst, jobstr)
  retry_backoff = jobinst.class.get_sidekiq_options.dig("iteration", "retry_backoff") ||
Remember that sidekiq_options get persisted into the job payload when the job is created; you don't need to dig the value out of get_sidekiq_options, you'd use job.dig("iteration", "retry_backoff").
Oh, I see; you didn't add these to the Job default options. That makes sense if overriding these elements is rare or you want to keep the job payload size as small as possible. If the elements are overridden, the job payload will contain the values so you can do:
job.dig(...) || lifecycle.config.dig(...) # get from specific job or capsule's config
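Spelled out with plain hashes, the lookup order might look like the following. This is only a sketch: `retry_backoff_for` is a hypothetical helper, and the two hashes stand in for the parsed job payload and the capsule's configuration.

```ruby
# Hypothetical helper: prefer the value persisted in the job payload,
# fall back to the configuration-level default.
def retry_backoff_for(job, config)
  job.dig("iteration", "retry_backoff") || config.dig("iteration", "retry_backoff")
end

# Stand-in data (illustration only).
job_with_override = {"class" => "SomeJob", "iteration" => {"retry_backoff" => 10}}
job_without_override = {"class" => "SomeJob"}
config = {"iteration" => {"retry_backoff" => 0}}

override_value = retry_backoff_for(job_with_override, config)
default_value = retry_backoff_for(job_without_override, config)
```

Note that `0` is truthy in Ruby, so a configured backoff of zero is returned as-is rather than being skipped by `||`.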
Addressed feedback.
lib/sidekiq/processor.rb
Outdated
@@ -79,7 +83,10 @@ def run

def process_one(&block)
  @job = fetch
  process(@job) if @job
  if @job
    @job.lifecycle = self if @job.is_a?(Job::Iterable)
Here, job will always be a UnitOfWork so this will never execute. You want to set this right before middleware is called, right after JobClass.new.
Right 🤦 The unit tests are too isolated (heavy mocking) and weren't able to detect this.
Great work. I'll merge this and spend a little more time polishing it soon.
We'll need a new Iteration wiki page which explains this major new feature. If you have any docs to use as a starting point, please feel free to create it.
I have some existing docs at https://github.com/fatkodima/sidekiq-iteration/tree/master/guides, but I am bad at writing them.
No problem, I will take care of it.
Initial page here: https://github.com/sidekiq/sidekiq/wiki/Iteration
I'm refactoring the API a bit and wondering if you considered using
Let me know if I did not fully get the question.
Why not Job#to_enum, instead of Job#build_enumerator? Do the methods have different meanings?
If you mean why we use You may provide some code example about how you imagine it to work, if that helps.
I'm surprised this kind of functionality was integrated into sidekiq. I thought you should always break down long jobs into smaller ones if they require sidekiq-iteration. If it was built for ActiveRecord, that’s even more surprising to me.
@freemanoid This provides an API for breaking down a long-running job into discrete chunks without having to fill Redis with thousands of small jobs. Some jobs have to remain serial and cannot be broken down into a set of concurrent jobs. The ActiveRecord portion is totally optional and does not force Rails upon the user.
This is a native implementation of the https://github.com/fatkodima/sidekiq-iteration pattern.
Closes fatkodima/sidekiq-iteration#6.
At a high level, the API will look something like: