Support retrieving future jobs in advance, for moving to a short-term queue (e.g. SQS) #315

Open
2 of 6 tasks
gregplaysguitar opened this issue Feb 10, 2023 · 6 comments

@gregplaysguitar
Contributor

Feature description

When configuring worker to retrieve and process jobs, support a per-task-type time offset which controls how far in the future to look for jobs. For example, if the configured offset was 10 minutes, worker would process jobs up to 10 minutes prior to their run_at value. By default, this offset would be null and the behaviour would remain unchanged.

The point of this offset is to allow future jobs to be moved to a more scalable short-term queue. For example, SQS scales out arbitrarily and supports message delays of up to 15 minutes, so a worker task configured to retrieve jobs up to 15 minutes in the future could simply move them to SQS with an appropriate delay, and SQS consumers would do the actual job processing.
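
To make the idea concrete, here's a rough sketch of what such a forwarding task might look like if worker handed jobs over ahead of their run_at. The per-task offset itself is still hypothetical, as are the task name and the QUEUE_URL variable; `helpers.job.run_at` and the AWS SDK v3 calls are existing APIs:

```ts
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";
import type { Task } from "graphile-worker";

const sqs = new SQSClient({});

// Hypothetical: if worker invoked this task up to 15 minutes before run_at,
// the task could convert the time remaining into an SQS message delay.
export const moveToSqs: Task = async (payload, helpers) => {
  const remainingMs = helpers.job.run_at.getTime() - Date.now();
  await sqs.send(
    new SendMessageCommand({
      QueueUrl: process.env.QUEUE_URL!, // assumed environment variable
      MessageBody: JSON.stringify(payload),
      // SQS caps message delays at 15 minutes (900 seconds)
      DelaySeconds: Math.min(900, Math.max(0, Math.round(remainingMs / 1000))),
    }),
  );
};
```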

In combination with the functionality proposed in #99, this change would make worker capable of dealing with orders of magnitude more scale as a job scheduler.

Motivating example

Right now, worker does a great job of scheduling future jobs and processing them efficiently; however, there are limits to how far it can scale on a single-node database. This is particularly noticeable when a large number of jobs are scheduled for the same time. To give a real example - 100,000 notifications scheduled for publishing at 9am on a Monday morning. In practice worker can't process 100k tasks all at once, so task processing gets spread over a period of time - with worker concurrency at, say, 100 and jobs taking 200ms to process, that's roughly 100,000 ÷ 100 × 200ms ≈ 200 seconds of work, so some notifications end up 3-4 minutes late.

With a run_at offset configured to 10 minutes, and a task set up to forward jobs to SQS, worker would have 10 minutes to retrieve and process all 100k jobs from the database rather than having them all suddenly available to process at once. SQS has a limit of 120k in-flight messages so in theory it could actually process these all at once - depending on the limits of the surrounding systems of course!

Supporting development

I [tick all that apply]:

  • am interested in building this feature myself
  • am interested in collaborating on building this feature
  • am willing to help test this feature before it's released
  • am willing to write a test-driven test suite for this feature (before it exists)
  • am a Graphile sponsor ❤️
  • have an active support or consultancy contract with Graphile
@gregplaysguitar
Contributor Author

gregplaysguitar commented Feb 10, 2023

Hi @benjie, long time no see! I wanted to register this idea with you, not in expectation that you'll build it, but just in case you already have plans in this area or thoughts on how it should be done. As you might have gathered, we haven't really touched our scheduling infrastructure for a few years - hence the stagnation of #99 - but it's come up again and we'll be working on it in the next few months. The high-level plan is basically as laid out above:

  • jobs scheduled in postgres (we already wrote a custom query to do this in bulk via an SQS queue, which is another thing that worker could perhaps support out of the box?)
  • future jobs moved from postgres to SQS with a message delay
  • actual job processing in lambda

Ideally, we'd implement batched processing and the offset as first-class worker features, but time constraints may necessitate some custom wrappers in our own codebase. Either way, I'd love to align our efforts with the community, so I'm keen to keep the dialogue open!

Let me know if you have any thoughts on the above - otherwise, I'll keep you posted if we end up with anything we could contribute back.

@benjie
Member

benjie commented Feb 10, 2023

It's an interesting use case, but for efficiency reasons it would have to be done for all tasks a given worker supports. You could then run a dedicated worker that supports only the tasks you want offset.

However… you could achieve this with no worker code changes if you reframe things so that the worker task isn't “send the notification” but instead “send to SQS”. Schedule the run_at 10 minutes earlier, include the target notification time in the payload, and it should Just Work ™️
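
A minimal sketch of that workaround, assuming a made-up send_to_sqs task and payload shape (makeWorkerUtils, addJob and runAt are graphile-worker's actual APIs):

```ts
import { makeWorkerUtils } from "graphile-worker";

// Producer side: shift run_at 10 minutes earlier and put the real delivery
// time in the payload; the hypothetical "send_to_sqs" task then turns the
// remaining time into an SQS DelaySeconds, much like the sketch above.
async function scheduleNotification(notificationId: number, deliverAt: Date) {
  const workerUtils = await makeWorkerUtils({
    connectionString: process.env.DATABASE_URL,
  });
  try {
    await workerUtils.addJob(
      "send_to_sqs",
      { notificationId, deliverAt: deliverAt.toISOString() },
      { runAt: new Date(deliverAt.getTime() - 10 * 60 * 1000) }, // 10 minutes early
    );
  } finally {
    await workerUtils.release();
  }
}

// e.g. schedule a notification for 9am Monday:
// await scheduleNotification(123, new Date("2023-02-13T09:00:00Z"));
```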

I have plans to make a highly optimised task exporter mode for worker whose only job is to take tasks from the DB and pipe them into another queue. I didn’t know SQS had a 15 minute delay window, thanks for letting me know 👍

Oh, and have you seen our new batch jobs? The feature works very differently from your previous proposal (and doesn't preclude us from doing that too), but may be helpful:

https://github.com/graphile/worker#batch-jobs

@gregplaysguitar
Contributor Author

Interesting - it sounds like the optimised task exporter would be the place to have this offset option. Is it something we could help implement? I'm assuming it'd be reading and writing the jobs table in bulk, which would greatly increase efficiency and, for us at least, would make the changes in #99 unnecessary. Our use of worker fits pretty neatly into two categories - relatively infrequent ad-hoc jobs where performance is not a concern, and future-scheduled jobs where we need it to scale out pretty arbitrarily.

We had considered setting run_at early as you suggest, but I'd rather keep that as an implementation detail of the consumer code and not something the producer has to determine. We could do it that way as a starting point, though.

Yeah, I saw batch jobs - it doesn't solve the scaling issue for us, but it's a very nifty feature.

Anyway, let me know if you have any more info on the task exporter feature - it sounds like that could be a great way for us to solve our specific issue while also contributing a feature with broader utility.

@benjie
Member

benjie commented Feb 11, 2023

Anyway let me know if you have any more info on the task exporter feature

Nope, just an intent to do it. The plan is probably to do a TX, grab N jobs (select … for update skip locked, to enable multiple workers to do this in parallel), write them to an external queue via a user function, delete the jobs on success or set run_at in the future on failure, then commit the TX. N will probably be large (1k? need to experiment). With your additional requirements there'd also be a cap on the max run_at value, I guess. I suspect we'd also drop any expensive ordering logic and use a single index and as few conditions as possible instead.
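
Roughly sketched, under those assumptions (exportBatch stands in for the user function, and the query against graphile_worker.jobs is simplified - this is an illustration of the plan, not the real implementation):

```ts
import { Pool } from "pg";

const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const BATCH_SIZE = 1000; // the "N" above - needs experimentation
const MAX_LOOKAHEAD = "15 minutes"; // cap on run_at for the SQS use case

// One pass of the exporter: grab a batch, hand it to the external queue,
// then delete the rows - all inside a single transaction.
async function exportOnce(exportBatch: (jobs: any[]) => Promise<void>) {
  const client = await pool.connect();
  try {
    await client.query("begin");
    // SKIP LOCKED lets several exporters run this in parallel without blocking.
    const { rows: jobs } = await client.query(
      `select * from graphile_worker.jobs
        where locked_at is null
          and run_at <= now() + $2::interval
        limit $1
        for update skip locked`,
      [BATCH_SIZE, MAX_LOOKAHEAD],
    );
    if (jobs.length > 0) {
      await exportBatch(jobs); // user function: push to SQS or another queue
      await client.query("delete from graphile_worker.jobs where id = any($1)", [
        jobs.map((j) => j.id),
      ]);
    }
    await client.query("commit");
  } catch (err) {
    // Roll back so the jobs remain queued (or bump their run_at here instead).
    await client.query("rollback");
    throw err;
  } finally {
    client.release();
  }
}
```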

@gregplaysguitar
Contributor Author

I'm digging into the code for this a bit @benjie, and trying to work out where you'd want the worker and exporter functionality to diverge. I started writing an exporter.ts file alongside worker.ts, but it seems there is a lot of code in worker.ts that you'd probably want to keep for the exporter mode - things like nudge and release behaviour, events, etc.

My current inclination is to expose a separate exporter function, but probably share a fair bit of functionality with worker.ts. This is going to make for a bigger change though as a fair bit of code will need to be altered for reuse. Let me know if you have any preferences there.

I'm also curious as to your thoughts on just deleting the jobs directly, in a transaction, and relying on transaction rollback to reinstate them if the export failed. Is that a terrible idea? I think we can assume the export will be fast and highly reliable, so it makes sense to optimise for that happy path, although we'd want to work in much smaller batches than 1000 I think. There might be some downsides that I'm not aware of though. I do plan to try out some different strategies, but haven't got that far yet.

@benjie
Member

benjie commented Mar 12, 2023

For now I would implement it as a separate thing which addresses its own needs; later on I might consider DRY-ing up parts. Much clearer if it's just standalone functionality IMO. I see the exporter as something entirely separate - it wouldn't use the worker pools, watch mode, cron, etc.

delete from jobs where … order by … returning * in a transaction would be fine, I think 👍
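
For illustration, a sketch of that single-statement variant, reusing the client and exportBatch assumptions from the sketch above (table and columns simplified as before):

```ts
// Delete-and-return in one statement; rolling back the transaction reinstates
// the rows if the export fails.
await client.query("begin");
const { rows: jobs } = await client.query(
  `delete from graphile_worker.jobs
    where id in (
      select id from graphile_worker.jobs
       where locked_at is null
         and run_at <= now() + $2::interval
       order by run_at
       limit $1
       for update skip locked
    )
    returning *`,
  [100, "15 minutes"], // a much smaller batch, per the discussion above
);
await exportBatch(jobs); // if this throws, issue a rollback and the jobs reappear
await client.query("commit");
```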
