Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(get-jobs): filter marker #1551

Merged
merged 6 commits into from
Dec 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 4 additions & 15 deletions src/classes/queue-getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,36 +296,25 @@ export class QueueGetters<
end = 1,
asc = false,
): Promise<string[]> {
const client = await this.client;
const multi = client.multi();
const multiCommands: string[] = [];

this.commandByType(types, false, (key, command) => {
switch (command) {
case 'lrange':
multiCommands.push('lrange');
if (asc) {
multi.lrange(key, -(end + 1), -(start + 1));
} else {
multi.lrange(key, start, end);
}
break;
case 'zrange':
multiCommands.push('zrange');
if (asc) {
multi.zrange(key, start, end);
} else {
multi.zrevrange(key, start, end);
}
break;
}
});

const responses = await multi.exec();
const responses = await this.scripts.getRanges(types, start, end, asc);

let results: string[] = [];

responses.forEach((response: any[], index: number) => {
const result = response[1] || [];
responses.forEach((response: string[], index: number) => {
const result = response || [];

if (asc && multiCommands[index] === 'lrange') {
results = results.concat(result.reverse());
Expand Down
31 changes: 31 additions & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
} from '../interfaces';
import {
JobState,
JobType,
FinishedStatus,
FinishedPropValAttribute,
RedisJobOptions,
Expand Down Expand Up @@ -394,6 +395,36 @@ export class Scripts {
return (<any>client).drain(args);
}

private getRangesArgs(
types: JobType[],
start: number,
end: number,
asc: boolean,
): (string | number)[] {
const queueKeys = this.queue.keys;
const transformedTypes = types.map(type => {
return type === 'waiting' ? 'wait' : type;
});

const keys: (string | number)[] = [queueKeys['']];

const args = [start, end, asc ? '1' : '0', ...transformedTypes];

return keys.concat(args);
}

async getRanges(
types: JobType[],
start = 0,
end = 1,
asc = false,
): Promise<[string][]> {
const client = await this.queue.client;
const args = this.getRangesArgs(types, start, end, asc);

return (<any>client).getRanges(args);
}

moveToCompleted<T = any, R = any, N extends string = string>(
job: Job<T, R, N>,
returnvalue: R,
Expand Down
69 changes: 69 additions & 0 deletions src/commands/getRanges-1.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
--[[
Promotes a job that is currently "delayed" to the "waiting" state
Input:
KEYS[1] 'prefix'
ARGV[1] start
ARGV[2] end
ARGV[3] asc
ARGV[4...] types
]]
local rcall = redis.call;
local prefix = KEYS[1]
local rangeStart = tonumber(ARGV[1])
local rangeEnd = tonumber(ARGV[2])
local asc = ARGV[3]
local results = {}

local function getRangeInList(listKey, asc, rangeStart, rangeEnd, results)
if asc == "1" then
local modifiedRangeStart
local modifiedRangeEnd
if rangeStart == -1 then
modifiedRangeStart = 0
else
modifiedRangeStart = -(rangeStart + 1)
end

if rangeEnd == -1 then
modifiedRangeEnd = 0
else
modifiedRangeEnd = -(rangeEnd + 1)
end

results[#results+1] = rcall("LRANGE", listKey,
modifiedRangeEnd,
modifiedRangeStart)
else
results[#results+1] = rcall("LRANGE", listKey, rangeStart, rangeEnd)
end
end

for i = 4, #ARGV do
local stateKey = prefix .. ARGV[i]
if ARGV[i] == "wait" or ARGV[i] == "paused" then
local marker = rcall("LINDEX", stateKey, -1)
if marker and string.sub(marker, 1, 2) == "0:" then
local count = rcall("LLEN", stateKey)
if count > 1 then
rcall("RPOP", stateKey)
getRangeInList(stateKey, asc, rangeStart, rangeEnd, results)
else
results[#results+1] = {}
end
else
getRangeInList(stateKey, asc, rangeStart, rangeEnd, results)
end
elseif ARGV[i] == "active" then
getRangeInList(stateKey, asc, rangeStart, rangeEnd, results)
else
if asc == "1" then
results[#results+1] = rcall("ZRANGE", stateKey, rangeStart, rangeEnd)
else
results[#results+1] = rcall("ZREVRANGE", stateKey, rangeStart, rangeEnd)
end
end
end

return results
26 changes: 26 additions & 0 deletions tests/test_getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,32 @@ describe('Jobs getters', function () {
queue.add('test', { foo: 2 });
});

describe('when marker is present', () => {
describe('when there are delayed jobs and waiting jobs', () => {
it('filters jobIds different than marker', async () => {
await queue.add('test1', { foo: 3 }, { delay: 2000 });
await queue.add('test2', { foo: 2 });

const jobs = await queue.getJobs(['waiting']);

expect(jobs).to.be.an('array');
expect(jobs).to.have.length(1);
expect(jobs[0].name).to.be.equal('test2');
});
});

describe('when there is only one delayed job and get waiting jobs', () => {
it('filters marker and returns an empty array', async () => {
await queue.add('test1', { foo: 3 }, { delay: 2000 });

const jobs = await queue.getJobs(['waiting']);

expect(jobs).to.be.an('array');
expect(jobs).to.have.length(0);
});
});
});

it('should return deduplicated jobs for duplicates types', async function () {
queue.add('test', { foo: 1 });
const jobs = await queue.getJobs(['wait', 'waiting', 'waiting']);
Expand Down