Add iteration support for long-running jobs #6286

Merged on May 22, 2024 (3 commits)

fatkodima
Member

@fatkodima fatkodima commented May 12, 2024

This is a native implementation of the https://github.com/fatkodima/sidekiq-iteration pattern.
Closes fatkodima/sidekiq-iteration#6.

At a high level, the API will look something like:

class NotifyUsersJob
  include Sidekiq::Job
  include Sidekiq::Job::Iterable

  def build_enumerator(cursor:)
    active_record_records_enumerator(User.all, cursor: cursor)
  end

  def each_iteration(user)
    user.notify_about_something
  end
end

module Job
  module Iterable
    # @private
    class ActiveRecordEnumerator
Member Author

I have a custom implementation of the ActiveRecord enumerator in the gem - https://github.com/fatkodima/sidekiq-iteration/blob/master/lib/sidekiq_iteration/active_record_enumerator.rb

As you can see, the gem's implementation is pretty complex compared to the implementation in this file, and it basically mimics the implementation from Rails itself. The only difference between the gem's implementation and Rails' is support for multiple columns. But that use case is, I believe, pretty rare, and most of the time people iterate just by the primary key.

The Rails implementation is pretty good: it recently got a performance improvement (rails/rails#45414), with a further improvement coming in (rails/rails#51243). I also plan to add support for multiple columns to it. So we can just use the default from Rails.

Wdyt about this?

Collaborator

Ideally you'll have a skeleton interface and people can develop their own Enumerators which work with a specific datastore. ActiveRecord is the primary datastore for 90% of Sidekiq apps though; it would make sense to support it well. I'm ok with a single column if it makes the code far simpler.

Comment on lines 228 to 229
# TODO: determine a better way to detect if the server/capsule is stopping.
Sidekiq.server? && Sidekiq::CLI.instance.launcher.stopping?
Member Author

When the long-running job runs, it checks after each iteration whether it is time to stop the execution and reschedule itself. One of the reasons to stop and reschedule is that Sidekiq is stopping. In the gem, I use the :quiet hook for this - https://github.com/fatkodima/sidekiq-iteration/blob/ffdc784b7400092a25d6528ff883c1fb22384c42/lib/sidekiq_iteration.rb#L55-L58

Is there a better way, one that also works for standalone Sidekiq and capsules?

Collaborator

Nope, this is the best way to do it. I would use the global you set, rather than calling launcher.stopping?.
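
A minimal sketch of the per-iteration stop check discussed in this thread. It is plain Ruby with no Sidekiq dependency; `StopSignal` and `iterate_until_stopped` are hypothetical names standing in for whatever flag the server sets on shutdown, not part of Sidekiq's API.

```ruby
# Hypothetical stand-in for the "server is stopping" flag; not Sidekiq's API.
class StopSignal
  def initialize
    @stopping = false
  end

  def stop!
    @stopping = true
  end

  def stopping?
    @stopping
  end
end

# Runs the block for each [item, cursor] pair. Returns the last completed
# cursor and a status saying whether the run finished or was cut short.
def iterate_until_stopped(pairs, signal)
  last_cursor = nil
  pairs.each do |item, cursor|
    yield item
    last_cursor = cursor
    # Check only after a full iteration so no item is left half-processed.
    return [last_cursor, :interrupted] if signal.stopping?
  end
  [last_cursor, :finished]
end

signal = StopSignal.new
processed = []
pairs = [["a", 0], ["b", 1], ["c", 2]]

result = iterate_until_stopped(pairs, signal) do |item|
  processed << item
  signal.stop! if item == "b" # simulate a shutdown arriving mid-run
end
```

Here the run stops after the second item, and the returned cursor is what a rescheduled job would resume from.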

Comment on lines 103 to 124
def extract_previous_runs_metadata(arguments)
  options =
    if arguments.last.is_a?(Hash) && arguments.last.key?("sidekiq_iteration")
      arguments.pop["sidekiq_iteration"]
    else
      {}
    end

  @executions = options["executions"] || 0
  @cursor_position = options["cursor_position"]
  @times_interrupted = options["times_interrupted"] || 0
  @total_time = options["total_time"] || 0
end
Member Author

The iterable job keeps its state inside the job hash itself (as the last item of the args, to be concrete). So if an interrupted job is run again, for example, we restore its attributes from the previously stored metadata. The same applies when the job raises an error: we save the current state into the job and push it to Redis.

Another approach is to keep the iteration state as a separate structure in Redis (a hash, for example). But then we need to decide on an expiration time for that hash, decide whether to update it after each iteration (+1 Redis call per iteration) or only once in a while, and write a server middleware to set the job's attributes based on that hash.

Which one would you suggest/prefer?

Collaborator

That's a really tough question. It's pretty fundamental to the design and I'm not sure I like the idea of mutating the job payload itself, as it's stored in Redis as a String which can't easily be modified without a JSON parse/dump round trip. I'm leaning toward defining a job iteration Hash, with a key like "it-{jid}". Redis has a number of useful H* commands to update/increment Hash values very quickly.

I think the default should be to update the iteration record in Redis only if the iteration is interrupted or 5 seconds have passed. It should not be updated after every record; if we are processing a million records we don't want a million Redis updates.
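
The throttling rule in the comment above (write state only on interruption or after 5 seconds) could be sketched roughly as follows. `ThrottledCheckpoint` is a hypothetical helper, the `@writes` array stands in for a Redis write, and the clock is injectable only so the example runs without sleeping.

```ruby
# Hypothetical helper: decides when iteration state should be written out.
class ThrottledCheckpoint
  FLUSH_INTERVAL = 5 # seconds, the default suggested in the comment above

  attr_reader :writes

  # `clock` is injectable so the example can run without real delays.
  def initialize(clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @clock = clock
    @last_flush = clock.call
    @writes = []
  end

  # Records the cursor only when forced (interruption) or when the
  # flush interval has elapsed since the previous write.
  def maybe_flush(cursor, force: false)
    now = @clock.call
    return false unless force || now - @last_flush >= FLUSH_INTERVAL

    @writes << cursor # a real implementation would write to Redis here
    @last_flush = now
    true
  end
end

fake_time = 0.0
checkpoint = ThrottledCheckpoint.new(clock: -> { fake_time })

(1..12).each do |cursor|
  fake_time += 1.0 # pretend each record takes one second to process
  checkpoint.maybe_flush(cursor)
end
checkpoint.maybe_flush(12, force: true) # interruption: always persist
```

Twelve one-second iterations produce three writes instead of twelve: two from the interval and one from the forced flush.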


retry_backoff = self.class.get_sidekiq_options.dig("iteration", "retry_backoff") ||
Sidekiq.default_configuration["retry_backoff"]
self.class.perform_in(retry_backoff, *arguments)
Member Author

Is there a better way for the job to reschedule itself than keeping the arguments in the job and then explicitly calling perform_in?

For example, ActiveJob has a helper for a similar purpose - https://api.rubyonrails.org/classes/ActiveJob/Exceptions.html#method-i-retry_job. Maybe we need something similar?

Collaborator

Good question, it might be time to consider an API for this purpose. Generally I don't like the idea of a job performing its own "meta-logic" around scheduling because it makes testing difficult. It's a massive violation of the Single Responsibility Principle. But this API wouldn't be used by the job itself, but rather by your iteration logic.

@fatkodima
Member Author

@mperham Can you please answer the questions when you have time? 🙏

@mperham
Collaborator

mperham commented May 13, 2024

This is a totally new feature to me and not something I've used before so please give me some time to study the code, understand what it is doing and understand how Sidekiq might need to change to make it work well. I apologize for the delay. Test cases would also be helpful to understand how the APIs will be used.

@fatkodima fatkodima marked this pull request as ready for review May 19, 2024 13:09
Sidekiq.default_configuration.dig("iteration", "retry_backoff")

# Preserve original jid.
self.class.set(jid: jid).perform_in(retry_backoff, *@arguments)
Member Author

This will run the client middleware again. We need to decide whether this is a problem.

Collaborator

What do you think about treating it like a retry? Raise a Sidekiq::Job::Interrupted exception or similar and let the retry subsystem reschedule it automatically?
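
To illustrate the control flow suggested above, here is a rough plain-Ruby sketch in which the job raises an exception to ask for rescheduling and a separate runner decides to run it again. All names (`Interrupted`, `CountingJob`, `run_with_reschedule`) are hypothetical, and the runner is only a stand-in for the retry subsystem, not how Sidekiq implements it.

```ruby
# Hypothetical names throughout; this only illustrates the control flow.
class Interrupted < RuntimeError; end

class CountingJob
  attr_reader :cursor

  def initialize(cursor = 0)
    @cursor = cursor
  end

  # Processes up to `budget` items, then raises to signal "reschedule me".
  def perform(total, budget)
    budget.times do
      return :done if @cursor >= total
      @cursor += 1
    end
    raise Interrupted if @cursor < total
    :done
  end
end

# Stand-in for the retry subsystem: it, not the job, decides to run again.
def run_with_reschedule(total, budget)
  cursor = 0
  attempts = 0
  loop do
    attempts += 1
    job = CountingJob.new(cursor)
    begin
      job.perform(total, budget)
      return attempts
    rescue Interrupted
      cursor = job.cursor # resume from the saved position on the next attempt
    end
  end
end

attempts = run_with_reschedule(10, 4)
```

The job itself contains no scheduling logic; it only reports that it could not finish.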

Member Author

Yeah, I don't like the current approach either.
This is exactly how I implemented it initially, but in the tests added for this PR I now need to rescue this exception manually and do all the rescheduling logic for the tests to pass. I will look into whether I can solve this better in the tests.

@fatkodima
Member Author

@mperham This is now ready for review.

state = Sidekiq.redis { |conn| conn.get("it-#{jid}") }

if state
state_hash = Sidekiq.load_json(state)
Member Author

Should we handle errors here?

Collaborator

No, exceptional cases should generate exceptions. I do think you should use native Redis hashes instead of JSON, so you'd use hgetall rather than get.

Member Author

We currently store the iteration cursor as part of this metadata. With JSON, we get the correct data type back when loading. With Redis hashes, values always come back as strings, and the user has to remember to cast them to the correct data type manually. Once people use custom enumerators, or once we support custom columns for the ActiveRecord enumerator, cursors will have types other than integers, and this will be a problem.

Wdyt?

Collaborator

You can make the cursor JSON if it is user-supplied but Sidekiq's internal iteration data can be stored element-by-element in the hash and coerced in Ruby. JSON is meant to be used for interoperability but for our internal data, I'd rather the data be directly addressable in Redis, rather than requiring deserialization.
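
A small sketch of the split described above: internal counters stored as individual string fields and coerced in Ruby, with only the user-supplied cursor encoded as JSON. The field names (`"ex"`, `"rt"`, `"c"`) and both helper methods are illustrative assumptions, and a plain Hash of strings stands in for what HGETALL would return.

```ruby
require "json"

# Flattens iteration state into string fields, as a Redis hash would store them.
# Field names here are illustrative assumptions.
def dump_state(state)
  {
    "ex" => state[:executions].to_s,
    "rt" => state[:runtime].to_s,
    "c" => JSON.generate(state[:cursor]) # user-supplied, so JSON keeps its type
  }
end

# Rebuilds typed values from an all-strings hash.
def load_state(fields)
  {
    executions: Integer(fields.fetch("ex", "0")),
    runtime: Float(fields.fetch("rt", "0")),
    cursor: fields.key?("c") ? JSON.parse(fields["c"]) : nil
  }
end

state = { executions: 3, runtime: 12.5, cursor: ["2024-05-12", 42] }
round_tripped = load_state(dump_state(state))
```

The counters stay directly addressable as hash fields, while a composite cursor such as a date plus an id survives the round trip with its types intact.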

}

Sidekiq.redis do |conn|
conn.set("it-#{jid}", Sidekiq.dump_json(state), ex: STATE_TTL)
Collaborator

@fatkodima
Member Author

Updated with #6286 (comment) and converted to use redis hashes.

module Sidekiq
  module Job
    module Iterable
      class Interrupted < ::RuntimeError; end
Collaborator

I think Sidekiq::Job::Interrupted would read better, wdyt?

end

# @api private
def initialize
Collaborator

This module should include a stopping? method which checks @done on Sidekiq::Processor. Here we use a :lifecycle accessor to store the Processor on the Job.

# In Sidekiq::Processor...
job = SomeJob.new
job.lifecycle = self if job.is_a?(Sidekiq::Job::Iterable) # Processor
job.perform(*args)
job.stopping? # => job.lifecycle.stopping?

It's a little more complex but I'm trying to avoid any more global APIs like Sidekiq.stopping? to be more Ractor-friendly.
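
A self-contained version of the lifecycle idea above, using stand-in classes so it runs without Sidekiq. `FakeProcessor` and `IterableSketch` are hypothetical; the `@done` flag mirrors the one mentioned in the comment, but this is a sketch of the delegation, not the merged implementation.

```ruby
# Stand-ins for Sidekiq::Processor and the iterable module; names are illustrative.
class FakeProcessor
  def initialize
    @done = false
  end

  def terminate
    @done = true
  end

  def stopping?
    @done
  end
end

module IterableSketch
  attr_accessor :lifecycle

  # Delegates to whichever processor is running this job instance,
  # so no global "is the server stopping?" state is needed.
  def stopping?
    lifecycle.stopping?
  end
end

class SomeJob
  include IterableSketch
end

processor = FakeProcessor.new
job = SomeJob.new
job.lifecycle = processor if job.is_a?(IterableSketch)

before = job.stopping?
processor.terminate
after = job.stopping?
```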

filepath = @csv.path
return unless filepath

count = `wc -l < #{filepath}`.strip.to_i
Collaborator

This could be a security issue under the right circumstances. I think you can use Shellwords.shellescape for this filepath and I think you can use a simpler wc -l filepath without the redirection.
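
A minimal sketch of the suggestion above, using `Shellwords.shellescape` from Ruby's standard library and the simpler `wc -l filepath` form. The method names `wc_command` and `count_lines` are illustrative, and `count_lines` assumes a `wc` binary is available on the host.

```ruby
require "shellwords"

# Builds the shell command with the path escaped so that spaces or shell
# metacharacters in the name cannot be interpreted by the shell.
def wc_command(filepath)
  "wc -l #{Shellwords.shellescape(filepath)}"
end

# `wc -l file` prints "<count> <file>"; String#to_i reads the leading number.
def count_lines(filepath)
  `#{wc_command(filepath)}`.strip.to_i
end

command = wc_command("monthly report.csv")
```

Without the redirection, the output includes the file name after the count, which `to_i` ignores.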

@@ -134,6 +137,15 @@ def local(jobinst, jobstr, queue)

private

def process_requeue(jobinst, jobstr)
retry_backoff = jobinst.class.get_sidekiq_options.dig("iteration", "retry_backoff") ||
Collaborator

Remember that sidekiq_options get persisted into the job payload when the job is created; you don't need to dig the value out of get_sidekiq_options, you'd use job.dig("iteration", "retry_backoff").

Collaborator

Oh, I see; you didn't add these to the Job default options. That makes sense if overriding these elements is rare or you want to keep the job payload size as small as possible. If the elements are overridden, the job payload will contain the values so you can do:

job.dig(...) || lifecycle.config.dig(...) # get from specific job or capsule's config
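
The fallback pattern above, spelled out as a runnable sketch. The payload is modeled as a Hash with string keys and the capsule config as a Hash with symbol keys; both shapes, the `retry_backoff_for` helper, and the numbers are assumptions made for illustration.

```ruby
# `job` is the parsed job payload; `config` stands in for the capsule's config.
# A per-job override wins, otherwise the configured default is used.
def retry_backoff_for(job, config)
  job.dig("iteration", "retry_backoff") || config.dig(:iteration, :retry_backoff)
end

config = { iteration: { retry_backoff: 10 } }

overridden = retry_backoff_for({ "iteration" => { "retry_backoff" => 30 } }, config)
inherited = retry_backoff_for({ "class" => "NotifyUsersJob" }, config)
```

Because the override is only present in the payload when a job sets it, jobs that keep the default do not carry the extra keys.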

@fatkodima
Member Author

Addressed feedback.

@@ -79,7 +83,10 @@ def run

def process_one(&block)
@job = fetch
process(@job) if @job
if @job
@job.lifecycle = self if @job.is_a?(Job::Iterable)
Collaborator

Here, job will always be a UnitOfWork so this will never execute. You want to set this right before middleware is called, right after JobClass.new.

Member Author

Right 🤦 The unit tests are too isolated (heavy on mocking), so they weren't able to detect this.

@mperham
Collaborator

mperham commented May 22, 2024

Great work. I'll merge this and spend a little more time polishing it soon.

@mperham mperham merged commit 21953dd into sidekiq:main May 22, 2024
16 checks passed
@mperham
Collaborator

mperham commented May 22, 2024

We'll need a new Iteration wiki page which explains this major new feature. If you have any docs to use as a starting point, please feel free to create it.

@fatkodima fatkodima deleted the iteration branch May 22, 2024 16:46
@fatkodima
Member Author

I have some existing docs at https://github.com/fatkodima/sidekiq-iteration/tree/master/guides, but I am bad at writing them.
I would appreciate it if you could work on that.

@mperham
Collaborator

mperham commented May 22, 2024

No problem, I will take care of it.

@mperham
Collaborator

mperham commented May 22, 2024

Initial page here: https://github.com/sidekiq/sidekiq/wiki/Iteration

@mperham
Collaborator

mperham commented May 22, 2024

I'm refactoring the API a bit and wondering if you considered using to_enum or enum_for instead of build_enumerator. I don't understand Enumerators very well so I'm trying to understand what would be the most idiomatic API.

https://www.rubydoc.info/stdlib/core/Enumerator

@fatkodima
Member Author

build_enumerator is a method that the user should define inside their job. It specifies what we should iterate over and should return an instance of an Enumerator.
The user can use one of the convenience helpers we provide or specify their own (using to_enum/enum_for, for example).
Some examples in https://github.com/fatkodima/sidekiq-iteration/blob/master/guides/custom-enumerator.md.

Let me know if I did not fully get the question.
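
As a rough illustration of a user-defined `build_enumerator` built with `enum_for`, here is a plain-Ruby sketch. It assumes the enumerator yields `[item, cursor]` pairs and resumes after the given cursor, which is the convention described in the sidekiq-iteration guides; `LineImporter` and `each_line_with_cursor` are hypothetical names, and no Sidekiq modules are included.

```ruby
# Hypothetical job-like class; only the enumerator construction matters here.
class LineImporter
  LINES = %w[alpha beta gamma delta].freeze

  # Returns an Enumerator of [item, cursor] pairs, resuming after `cursor`.
  def build_enumerator(cursor:)
    enum_for(:each_line_with_cursor, cursor)
  end

  # Yields each remaining line together with its index, which acts as the cursor.
  def each_line_with_cursor(cursor)
    start = cursor.nil? ? 0 : cursor + 1
    LINES.each_with_index.drop(start).each do |line, index|
      yield line, index
    end
  end
end

fresh = LineImporter.new.build_enumerator(cursor: nil).to_a
resumed = LineImporter.new.build_enumerator(cursor: 1).to_a
```

A fresh run starts at the first line, while a run resumed with cursor 1 starts at the third.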

@mperham
Collaborator

mperham commented May 22, 2024

Why not Job#to_enum, instead of Job#build_enumerator? Do the methods have different meanings?

@fatkodima
Member Author

If you mean why we use the method name build_enumerator rather than to_enum: I'm not sure. With my level of English, Job#to_enum sounds like we are converting the job to an enum, which makes little sense, whereas Job#build_enumerator sounds like a method that builds an enumerator for the job. Also, to_enum is already defined on Object, so we probably should not define our own.

Feel free to provide a code example of how you imagine it working, if that helps.

@freemanoid

I'm surprised this kind of functionality was integrated into sidekiq. I thought you should always break down long jobs into smaller ones if they require sidekiq-iteration. If it was built for ActiveRecord, that’s even more surprising to me.

@mperham
Collaborator

mperham commented May 23, 2024

@freemanoid This provides an API for breaking down a long-running job into discrete chunks without having to fill Redis with thousands of small jobs. Some jobs have to remain serial and cannot be broken down into a set of concurrent jobs.

The ActiveRecord portion is totally optional and does not force Rails upon the user.
