Skip to content

Commit

Permalink
fix(worker): add max concurrency from the beginning (#1597) fixes #1589
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Jan 13, 2023
1 parent cc631a6 commit 6f49db3
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 43 deletions.
27 changes: 17 additions & 10 deletions src/classes/worker.ts
Expand Up @@ -369,14 +369,17 @@ export class Worker<
processing.size < this.opts.concurrency &&
(!this.limitUntil || processing.size == 0)
) {
const token = `${this.id}:${tokenPostfix++}`;
processing.set(
this.retryIfFailed<Job<DataType, ResultType, NameType>>(
() => this.getNextJob(token),
this.opts.runRetryDelay,
),
token,
);
const restProcesses = this.opts.concurrency - processing.size;
for (let i = 0; i < restProcesses; i++) {
const token = `${this.id}:${tokenPostfix++}`;
processing.set(
this.retryIfFailed<Job<DataType, ResultType, NameType>>(
() => this.getNextJob(token),
this.opts.runRetryDelay,
),
token,
);
}
}

/*
Expand Down Expand Up @@ -486,7 +489,8 @@ export class Worker<
// If we get the special delayed job ID, we pick the delay as the next
// block timeout.
if (jobId && jobId.startsWith('0:')) {
this.blockTimeout = parseInt(jobId.split(':')[1]);
const expectedBlockTimeout = parseInt(jobId.split(':')[1]) - Date.now();
this.blockTimeout = expectedBlockTimeout > 0 ? expectedBlockTimeout : 1;
}
const [jobData, id, limitUntil, delayUntil] =
await this.scripts.moveToActive(token, jobId);
Expand Down Expand Up @@ -558,7 +562,10 @@ export class Worker<
}

this.limitUntil = Math.max(limitUntil, 0) || 0;
this.blockTimeout = delayUntil;
if (delayUntil) {
const expectedBlockTimeout = Math.max(delayUntil, 0) - Date.now();
this.blockTimeout = expectedBlockTimeout > 0 ? expectedBlockTimeout : 1;
}

if (jobData) {
this.drained = false;
Expand Down
9 changes: 2 additions & 7 deletions src/commands/addJob-8.lua
Expand Up @@ -57,6 +57,7 @@ local parent = args[8]
local parentData

-- Includes
--- @include "includes/addDelayMarkerIfNeeded"
--- @include "includes/addJobWithPriority"
--- @include "includes/getTargetQueueList"
--- @include "includes/trimEvents"
Expand Down Expand Up @@ -142,13 +143,7 @@ elseif (delayedTimestamp ~= 0) then
-- If wait list is empty, and this delayed job is the next one to be processed,
-- then we need to signal the workers by adding a dummy job (jobId 0:delay) to the wait list.
local target = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2])
if rcall("LLEN", target) == 0 then
local nextTimestamp = getNextDelayedTimestamp(KEYS[5])
if not nextTimestamp or (delayedTimestamp < nextTimestamp) then
local delay = delayedTimestamp - tonumber(timestamp)
rcall("LPUSH", target, "0:" .. delay)
end
end
addDelayMarkerIfNeeded(target, KEYS[5])
else
local target = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2])

Expand Down
15 changes: 15 additions & 0 deletions src/commands/includes/addDelayMarkerIfNeeded.lua
@@ -0,0 +1,15 @@
--[[
  Push a "0:<timestamp>" marker job onto the target list when it is empty,
  so that blocked workers wake up in time for the next delayed job.
]]

-- Includes
--- @include "getNextDelayedTimestamp"

local function addDelayMarkerIfNeeded(target, delayedKey)
  -- Only a sleeping (empty-list) worker needs waking; bail out otherwise.
  if rcall("LLEN", target) > 0 then
    return
  end
  local nextTimestamp = getNextDelayedTimestamp(delayedKey)
  if nextTimestamp ~= nil then
    -- Marker encodes the absolute timestamp of the earliest delayed job.
    rcall("LPUSH", target, "0:" .. nextTimestamp)
  end
end
8 changes: 2 additions & 6 deletions src/commands/includes/updateParentDepsIfNeeded.lua
Expand Up @@ -3,6 +3,7 @@
]]

-- Includes
--- @include "addDelayMarkerIfNeeded"
--- @include "addJobWithPriority"
--- @include "getNextDelayedTimestamp"
--- @include "getTargetQueueList"
Expand All @@ -25,12 +26,7 @@ local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDepende
local parentDelayedKey = parentQueueKey .. ":delayed"
rcall("ZADD", parentDelayedKey, score, parentId)

if rcall("LLEN", parentTarget) == 0 then
local nextTimestamp = getNextDelayedTimestamp(parentDelayedKey)
if not nextTimestamp or (delayedTimestamp <= nextTimestamp) then
rcall("LPUSH", parentTarget, "0:" .. delayedTimestamp - tonumber(timestamp))
end
end
addDelayMarkerIfNeeded(parentTarget, parentDelayedKey)
-- Standard or priority add
elseif priority == 0 then
rcall("RPUSH", parentTarget, parentId)
Expand Down
2 changes: 1 addition & 1 deletion src/commands/moveToActive-9.lua
Expand Up @@ -78,5 +78,5 @@ end
-- Return the timestamp for the next delayed job if any.
local nextTimestamp = getNextDelayedTimestamp(KEYS[7])
if (nextTimestamp ~= nil) then
return { 0, 0, 0, nextTimestamp - tonumber(ARGV[2])}
return { 0, 0, 0, nextTimestamp}
end
15 changes: 6 additions & 9 deletions src/commands/moveToDelayed-8.lua
Expand Up @@ -28,9 +28,10 @@
local rcall = redis.call

-- Includes
--- @include "includes/promoteDelayedJobs"
--- @include "includes/addDelayMarkerIfNeeded"
--- @include "includes/getTargetQueueList"
--- @include "includes/getNextDelayedTimestamp"
--- @include "includes/promoteDelayedJobs"

local jobKey = KEYS[5]
if rcall("EXISTS", jobKey) == 1 then
Expand All @@ -54,17 +55,13 @@ if rcall("EXISTS", jobKey) == 1 then
return -3
end

rcall("ZADD", delayedKey, score, jobId)
rcall("XADD", KEYS[6], "*", "event", "delayed", "jobId", jobId, "delay", delayedTimestamp)

-- Check if we need to push a marker job to wake up sleeping workers.
local target = getTargetQueueList(KEYS[8], KEYS[1], KEYS[7])
if rcall("LLEN", target) == 0 then
local nextTimestamp = getNextDelayedTimestamp(delayedKey)
if not nextTimestamp or (delayedTimestamp < nextTimestamp) then
rcall("LPUSH", target, "0:" .. delayedTimestamp - tonumber(ARGV[2]))
end
end
addDelayMarkerIfNeeded(target, delayedKey)

rcall("ZADD", delayedKey, score, jobId)
rcall("XADD", KEYS[6], "*", "event", "delayed", "jobId", jobId, "delay", delayedTimestamp)
return 0
else
return -1
Expand Down
22 changes: 15 additions & 7 deletions tests/test_connection.ts
Expand Up @@ -36,6 +36,7 @@ describe('connection', () => {
[{ host: '10.0.6.161', port: 7379 }],
{
keyPrefix: 'bullmq',
natMap: {},
},
);

Expand Down Expand Up @@ -100,7 +101,9 @@ describe('connection', () => {

describe('when instantiating with a clustered ioredis connection', () => {
it('should not fail when using dsn strings', async () => {
const connection = new IORedis.Cluster(['redis://10.0.6.161:7379']);
const connection = new IORedis.Cluster(['redis://10.0.6.161:7379'], {
natMap: {},
});
const myQueue = new Queue('myqueue', { connection });
connection.disconnect();
});
Expand All @@ -121,11 +124,14 @@ describe('connection', () => {

describe('when using Cluster instance', async () => {
it('throws an error', async () => {
const connection = new IORedis.Cluster([
{
host: 'https://upstash.io',
},
]);
const connection = new IORedis.Cluster(
[
{
host: 'https://upstash.io',
},
],
{ natMap: {} },
);

expect(() => new QueueBase(queueName, { connection })).to.throw(
'BullMQ: Upstash is not compatible with BullMQ.',
Expand All @@ -137,7 +143,9 @@ describe('connection', () => {
it('throws an error', async () => {
const connection = new IORedis.Cluster(
['localhost', 'https://upstash.io'],
{},
{
natMap: {},
},
);

expect(() => new QueueBase(queueName, { connection })).to.throw(
Expand Down
13 changes: 10 additions & 3 deletions tests/test_delay.ts
Expand Up @@ -155,7 +155,7 @@ describe('Delayed jobs', function () {
});

it('should process delayed jobs in correct order respecting delay', async function () {
this.timeout(3500);
this.timeout(7500);
let order = 0;
const numJobs = 12;
const margin = 1.2;
Expand Down Expand Up @@ -184,7 +184,10 @@ describe('Delayed jobs', function () {
};
});

const worker = new Worker(queueName, processor, { connection });
const worker = new Worker(queueName, processor, {
autorun: false,
connection,
});

worker.on('failed', function (job, err) {});

Expand All @@ -197,6 +200,7 @@ describe('Delayed jobs', function () {
}));

await queue.addBulk(jobs);
worker.run();
await processing;
await worker.close();
});
Expand All @@ -205,7 +209,7 @@ describe('Delayed jobs', function () {
this.timeout(35000);
let count = 0;
const numJobs = 50;
const margin = 1.25;
const margin = 1.22;

let processor1, processor2;

Expand Down Expand Up @@ -305,6 +309,7 @@ describe('Delayed jobs', function () {
return;
},
{
autorun: false,
connection,
concurrency,
},
Expand All @@ -314,6 +319,8 @@ describe('Delayed jobs', function () {
});
});

worker.run();

await processing;

expect(processedJobs.length).to.be.equal(countJobs);
Expand Down
96 changes: 96 additions & 0 deletions tests/test_rate_limiter.ts
Expand Up @@ -367,6 +367,102 @@ describe('Rate Limiter', function () {
await worker.close();
});

describe('when there are more added jobs than max limiter', () => {
it('processes jobs as max limiter from the beginning', async function () {
this.timeout(5000);
let parallelJobs = 0;

const processor = async () => {
parallelJobs++;
await delay(700);

parallelJobs--;

expect(parallelJobs).to.be.lessThanOrEqual(100);

return 'success';
};

const worker = new Worker(queueName, processor, {
concurrency: 600,
autorun: false,
limiter: {
max: 100,
duration: 1000,
},
connection,
});

const allCompleted = new Promise(resolve => {
worker.on('completed', after(400, resolve));
});

const jobs = Array(400)
.fill('')
.map((_, index) => {
return {
name: 'test-job',
data: { id: `id-${index}` },
};
});

await queue.addBulk(jobs);

worker.run();
await allCompleted;

await worker.close();
});

describe('when rate limit is max 1', () => {
it('processes jobs as max limiter from the beginning', async function () {
this.timeout(5000);
let parallelJobs = 0;

const processor = async () => {
parallelJobs++;
await delay(700);

parallelJobs--;

expect(parallelJobs).to.be.lessThanOrEqual(1);

return 'success';
};

const worker = new Worker(queueName, processor, {
concurrency: 100,
autorun: false,
limiter: {
max: 1,
duration: 1000,
},
connection,
});

const allCompleted = new Promise(resolve => {
worker.on('completed', after(5, resolve));
});

const jobs = Array(5)
.fill('')
.map((_, index) => {
return {
name: 'test-job',
data: { id: `id-${index}` },
};
});

await queue.addBulk(jobs);

worker.run();
await allCompleted;

await worker.close();
});
});
});

it('should obey priority', async function () {
this.timeout(20000);

Expand Down

0 comments on commit 6f49db3

Please sign in to comment.