Skip to content

Commit

Permalink
feat(job): add changePriority method (#1901) ref #1899
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed May 31, 2023
1 parent 8f4d86a commit 9485ad5
Show file tree
Hide file tree
Showing 7 changed files with 343 additions and 7 deletions.
19 changes: 18 additions & 1 deletion docs/gitbook/guide/jobs/delayed.md
Expand Up @@ -20,9 +20,26 @@ await myQueue.add('house', { color: 'white' }, { delay: 5000 });
If you want to process the job after a specific point in time, just add the time remaining to that point in time. For example, let's say you want to process the job on the third of July 2035 at 10:30:

```typescript
const targetTime = new Date("03-07-2035 10:30");
const targetTime = new Date('03-07-2035 10:30');
const delay = Number(targetTime) - Number(new Date());

await myQueue.add('house', { color: 'white' }, { delay });
```

## Change delay

If you want to change the delay after inserting a delayed job, just use **changeDelay** method. For example, let's say you want to change the delay from 2000 to 4000 milliseconds:

```typescript
const job = await Job.create(queue, 'test', { foo: 'bar' }, { delay: 2000 });

await job.changeDelay(4000);
```

{% hint style="warning" %}
Take in count that your job must be into delayed state when you change the delay.
{% endhint %}

## Read more:

- 💡 [Change Delay API Reference](https://api.docs.bullmq.io/classes/Job.html#changeDelay)
24 changes: 23 additions & 1 deletion docs/gitbook/guide/jobs/prioritized.md
Expand Up @@ -6,7 +6,7 @@ Jobs can also include a priority option. Using priorities, job's processing orde
Adding prioritized jobs is a slower operation than the other types of jobs, with a complexity O(n) relative to the number of jobs waiting in the Queue.
{% endhint %}

Note that the priorities go from 1 to MAX\_INT, whereas a lower number is always a higher priority than higher numbers.
Note that the priorities go from 1 to MAX_INT, whereas a lower number is always a higher priority than higher numbers.

Jobs without a priority assigned will get the least priority.

Expand All @@ -24,3 +24,25 @@ await myQueue.add('wall', { color: 'blue' }, { priority: 7 });
```

If several jobs are added with the same priority value, then the jobs within that priority will be processed in FIFO (First in first out) fashion.

## Change priority

If you want to change the priority after inserting a job, just use the **changePriority** method. For example, let's say that you want to change the priority from 16 to 1:

```typescript
const job = await Job.create(queue, 'test2', { foo: 'bar' }, { priority: 16 });

await job.changePriority({
priority: 1,
});
```

or if you want to use lifo option:

```typescript
const job = await Job.create(queue, 'test2', { foo: 'bar' }, { priority: 16 });

await job.changePriority({
lifo: true,
});
```
12 changes: 12 additions & 0 deletions src/classes/job.ts
Expand Up @@ -738,6 +738,18 @@ export class Job<
this.delay = delay;
}

/**
* Change job priority.
*
* @returns void
*/
async changePriority(opts: {
priority?: number;
lifo?: boolean;
}): Promise<void> {
await this.scripts.changePriority(this.id, opts.priority, opts.lifo);
}

/**
* Get this jobs children result values if any.
*
Expand Down
34 changes: 34 additions & 0 deletions src/classes/scripts.ts
Expand Up @@ -527,6 +527,40 @@ export class Scripts {
return keys.concat([delay, JSON.stringify(timestamp), jobId]);
}

async changePriority(
jobId: string,
priority = 0,
lifo = false,
): Promise<void> {
const client = await this.queue.client;

const args = this.changePriorityArgs(jobId, priority, lifo);
const result = await (<any>client).changePriority(args);
if (result < 0) {
throw this.finishedErrors(result, jobId, 'changePriority');
}
}

private changePriorityArgs(
jobId: string,
priority = 0,
lifo = false,
): (string | number)[] {
const keys: (string | number)[] = [
this.queue.keys.wait,
this.queue.keys.paused,
this.queue.keys.meta,
this.queue.keys.priority,
];

return keys.concat([
priority,
this.queue.toKey(jobId),
jobId,
lifo ? 1 : 0,
]);
}

// Note: We have an issue here with jobs using custom job ids
moveToDelayedArgs(
jobId: string,
Expand Down
50 changes: 50 additions & 0 deletions src/commands/changePriority-4.lua
@@ -0,0 +1,50 @@
--[[
Change job priority
Input:
KEYS[1] 'wait',
KEYS[2] 'paused'
KEYS[3] 'meta'
KEYS[4] 'priority'
ARGV[1] priority value
ARGV[2] job key
ARGV[3] job id
ARGV[4] lifo
Output:
0 - OK
-1 - Missing job
]]
local jobKey = ARGV[2]
local jobId = ARGV[3]
local priority = tonumber(ARGV[1])
local rcall = redis.call

-- Includes
--- @include "includes/addJobWithPriority"
--- @include "includes/getTargetQueueList"

if rcall("EXISTS", jobKey) == 1 then
local target = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2])

local numRemovedElements = rcall("LREM", target, -1, jobId)
if numRemovedElements > 0 then
rcall("ZREM", KEYS[4], jobId)

-- Standard or priority add
if priority == 0 then
-- LIFO or FIFO
local pushCmd = ARGV[4] == '1' and 'RPUSH' or 'LPUSH';
rcall(pushCmd, target, jobId)
else
-- Priority add
addJobWithPriority(KEYS[4], priority, target, jobId)
end
end

rcall("HSET", jobKey, "priority", priority)

return 0
else
return -1
end
4 changes: 2 additions & 2 deletions src/commands/includes/getTargetQueueList.lua
Expand Up @@ -5,8 +5,8 @@

local function getTargetQueueList(queueMetaKey, waitKey, pausedKey)
if rcall("HEXISTS", queueMetaKey, "paused") ~= 1 then
return waitKey
return waitKey, false
else
return pausedKey
return pausedKey, true
end
end

0 comments on commit 9485ad5

Please sign in to comment.