Skip to content

Commit

Permalink
fix(rate-limit): take in count priority (#1919) fixes #1915
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed May 30, 2023
1 parent ce86ece commit b8157a3
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 5 deletions.
3 changes: 2 additions & 1 deletion src/classes/scripts.ts
Expand Up @@ -888,10 +888,11 @@ export class Scripts {
this.queue.keys.paused,
this.queue.keys.meta,
this.queue.keys.limiter,
this.queue.keys.priority,
this.queue.keys.events,
];

const args = [jobId, token];
const args = [jobId, token, this.queue.toKey(jobId)];

const pttl = await (<any>client).moveJobFromActiveToWait(keys.concat(args));

Expand Down
2 changes: 1 addition & 1 deletion src/commands/addJob-8.lua
Expand Up @@ -33,7 +33,7 @@
[6] waitChildrenKey key.
[7] parent dependencies key.
[8] parent? {id, queueKey}
[9] repeat job key
[9] repeat job key
ARGV[2] Json stringified job data
ARGV[3] msgpacked options
Expand Down
Expand Up @@ -9,14 +9,17 @@
KEYS[5] paused key
KEYS[6] meta key
KEYS[7] limiter key
KEYS[8] event key
KEYS[8] priority key
KEYS[9] event key
ARGV[1] job id
ARGV[2] lock token
ARGV[3] job id key
]]
local rcall = redis.call

-- Includes
--- @include "includes/addJobWithPriority"
--- @include "includes/getTargetQueueList"

local jobId = ARGV[1]
Expand All @@ -31,11 +34,19 @@ if lockToken == token and pttl > 0 then
local target = getTargetQueueList(KEYS[6], KEYS[2], KEYS[5])

rcall("SREM", KEYS[3], jobId)
rcall("RPUSH", target, jobId)

local priority = tonumber(rcall("HGET", ARGV[3], "priority")) or 0

if priority > 0 then
addJobWithPriority(KEYS[8], priority, target, jobId)
else
rcall("RPUSH", target, jobId)
end

rcall("DEL", lockKey)

-- Emit waiting event
rcall("XADD", KEYS[8], "*", "event", "waiting", "jobId", jobId)
rcall("XADD", KEYS[9], "*", "event", "waiting", "jobId", jobId)
end
end

Expand Down
61 changes: 61 additions & 0 deletions tests/test_rate_limiter.ts
Expand Up @@ -323,6 +323,67 @@ describe('Rate Limiter', function () {
await worker.close();
});

describe('when priority is provided', () => {
it('should obey the rate limit respecting priority', async function () {
this.timeout(6000);

let extraCount = 3;
let priority = 9;
const numJobs = 4;
const dynamicLimit = 250;
const duration = 100;

const worker = new Worker(
queueName,
async job => {
if (job.attemptsMade === 1) {
if (extraCount > 0) {
await queue.add('rate test', {}, { priority });
priority -= 1;
extraCount -= 1;
}
await worker.rateLimit(dynamicLimit);
throw Worker.RateLimitError();
}
},
{
connection,
limiter: {
max: 1,
duration,
},
},
);

const result = new Promise<void>((resolve, reject) => {
queueEvents.on(
'completed',
// after every job has been completed
after(numJobs, async args => {
await worker.close();

try {
expect(args.jobId).to.be.equal('1');
resolve();
} catch (err) {
reject(err);
}
}),
);

queueEvents.on('failed', async err => {
await worker.close();
reject(err);
});
});

await queue.add('rate test', {}, { priority: 10 });

await result;
await worker.close();
});
});

describe('when queue is paused', () => {
it('moves job to paused', async function () {
const dynamicLimit = 250;
Expand Down

0 comments on commit b8157a3

Please sign in to comment.