Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: OptimalBits/bull
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v3.13.0
Choose a base ref
...
head repository: OptimalBits/bull
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v3.14.0
Choose a head ref
  • 19 commits
  • 9 files changed
  • 13 contributors

Commits on Jan 19, 2020

  1. fix: call Queue.isReady() before job promote is called

    Fotis Tsokos authored and Fotis Tsokos committed Jan 19, 2020
    Copy the full SHA
    4b3abca View commit details

Commits on Feb 14, 2020

  1. Update Job#moveToFailed link

    emilianoLeite authored and Emiliano Leite committed Feb 14, 2020
    Copy the full SHA
    9b6f9b7 View commit details

Commits on Feb 16, 2020

  1. Merge pull request #1653 from emilianoLeite/develop

    [REFERENCE] Update `Job#moveToFailed` link
    manast authored Feb 16, 2020

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    95bc627 View commit details

Commits on Mar 1, 2020

  1. Merge pull request #1619 from chocof/uninitialized_queue_promote_fix

    fix: call Queue.isReady() before job promote is called
    manast authored Mar 1, 2020

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    ec8ad57 View commit details

Commits on Mar 19, 2020

  1. Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    b06f4b2 View commit details

Commits on Mar 21, 2020

  1. Update REFERENCE.md (#1668)

    docs: correct spelling of milliseconds
    Akintunde102 authored Mar 21, 2020

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    dbe2549 View commit details
  2. Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    4b6031d View commit details
  3. docs: Update CHANGELOG.md (#1652)

    Format changes as a list
    dhritzkiv authored Mar 21, 2020

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    4a6268b View commit details
  4. Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    5ecc30b View commit details
  5. Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    626f21f View commit details
  6. docs: Update REFERENCE.md (#1595)

    Spelling Correction
    Akintunde102 authored Mar 21, 2020

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    3a6a519 View commit details
  7. Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    031b231 View commit details
  8. Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    c61cd84 View commit details
  9. Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    c06af98 View commit details

Commits on Apr 23, 2020

  1. Copy the full SHA
    10a9eae View commit details

Commits on May 7, 2020

  1. Copy the full SHA
    635210f View commit details

Commits on May 8, 2020

  1. Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    1898b5a View commit details
  2. docs: update changelog

    manast committed May 8, 2020
    Copy the full SHA
    7f9056a View commit details
  3. 3.14.0

    manast committed May 8, 2020
    Copy the full SHA
    a4aa27c View commit details
Showing with 254 additions and 91 deletions.
  1. +13 −3 CHANGELOG.md
  2. +1 −1 README.md
  3. +28 −5 REFERENCE.md
  4. +52 −0 lib/commands/removeJobs-7.lua
  5. +88 −76 lib/job.js
  6. +22 −5 lib/queue.js
  7. +29 −0 lib/scripts.js
  8. +1 −1 package.json
  9. +20 −0 test/test_job.js
16 changes: 13 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
# Changelog

## v.3.14.0

- feat(queue): add removeJobs function
- fix: clamp negative job delay values to 0 to prevent thrashing
- fix: use DEFAULT_JOB_NAME (#1585)
- fix: remove the lazy client error handler on close (#1605)
- fix: prevent exceeding the maximum stack call size when emptying large queues (#1660)

[Changes](https://github.com/OptimalBits/bull/compare/v3.13.0...v3.14.0)

## v.3.13.0

feat: add "preventParsingData" job option to prevent data parsing
fix: queue.clean clean job logs as well
fix: whenCurrentJobsFinished should wait for all jobs
- feat: add "preventParsingData" job option to prevent data parsing
- fix: queue.clean clean job logs as well
- fix: whenCurrentJobsFinished should wait for all jobs

[Changes](https://github.com/OptimalBits/bull/compare/v3.12.1...v3.13.0)

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -420,7 +420,7 @@ and being restarted as a result. Locking is implemented internally by creating a
`lockRenewTime` (which is usually half `lockDuration`). If `lockDuration` elapses before the lock can be renewed,
the job will be considered stalled and is automatically restarted; it will be __double processed__. This can happen when:
1. The Node process running your job processor unexpectedly terminates.
2. Your job processor was too CPU-intensive and stalled the Node event loop, and as a result, Bull couldn't renew the job lock (see #488 for how we might better detect this). You can fix this by breaking your job processor into smaller parts so that no single part can block the Node event loop. Alternatively, you can pass a larger value for the `lockDuration` setting (with the tradeoff being that it will take longer to recognize a real stalled job).
2. Your job processor was too CPU-intensive and stalled the Node event loop, and as a result, Bull couldn't renew the job lock (see [#488](https://github.com/OptimalBits/bull/issues/488) for how we might better detect this). You can fix this by breaking your job processor into smaller parts so that no single part can block the Node event loop. Alternatively, you can pass a larger value for the `lockDuration` setting (with the tradeoff being that it will take longer to recognize a real stalled job).

As such, you should always listen for the `stalled` event and log this to your error monitoring system, as this means your jobs are likely getting double-processed.

33 changes: 28 additions & 5 deletions REFERENCE.md
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@
- [Queue#resume](#queueresume)
- [Queue#whenCurrentJobsFinished](#queuewhencurrentjobsfinished)
- [Queue#count](#queuecount)
- [Queue#removeJobs](#queueremovejobs)
- [Queue#empty](#queueempty)
- [Queue#clean](#queueclean)
- [Queue#close](#queueclose)
@@ -42,7 +43,7 @@
- [Job#promote](#jobpromote)
- [Job#finished](#jobfinished)
- [Job#moveToCompleted](#jobMoveToCompleted)
- [Job#moveToFailed](#moveToFailed)
- [Job#moveToFailed](#jobMoveToFailed)

- [Events](#events)
- [Global events](#global-events)
@@ -75,6 +76,7 @@ interface RateLimiter {
max: number, // Max number of jobs processed
duration: number, // per duration in milliseconds
bounceBack: boolean = false; // When jobs get rate limited, they stay in the waiting queue and are not moved to the delayed queue
}
```

`RedisOpts` are passed directly to ioredis constructor, check [ioredis](https://github.com/luin/ioredis/blob/master/API.md)
@@ -197,7 +199,7 @@ const emailQueue = new Queue('email');
emailQueue.process('sendEmail', 25, sendEmail);
```

Specifying `*` as the process name will make it the default processor for all named jobs.
Specifying `*` as the process name will make it the default processor for all named jobs.
It is frequently used to process all named jobs from one process function:

```js
@@ -249,7 +251,7 @@ interface JobOpts {
priority: number; // Optional priority value. ranges from 1 (highest priority) to MAX_INT (lowest priority). Note that
// using priorities has a slight impact on performance, so do not use it if not required.

delay: number; // An amount of miliseconds to wait until this job can be processed. Note that for accurate delays, both
delay: number; // An amount of milliseconds to wait until this job can be processed. Note that for accurate delays, both
// server and clients should have their clocks synchronized. [optional].

attempts: number; // The total number of attempts to try the job until it completes.
@@ -307,7 +309,7 @@ pause(isLocal?: boolean, doNotWaitActive?: boolean): Promise

Returns a promise that resolves when the queue is paused. A paused queue will not process new jobs until resumed, but current jobs being processed will continue until they are finalized. The pause can be either global or local. If global, all workers in all queue instances for a given queue will be paused. If local, just this worker will stop processing new jobs after the current lock expires. This can be useful to stop a worker from taking new jobs prior to shutting down.

If `doNotWaitActive` is `true`, `pause` will *not* wait for any active jobs to finish before resolving. Otherwise, `pause` *will* wait for active jobs to finish. See [Queue#whenCurrentJobsFinished](#queuewhencurrentjobsfinished) for more information.
If `doNotWaitActive` is `true`, `pause` will _not_ wait for any active jobs to finish before resolving. Otherwise, `pause` _will_ wait for active jobs to finish. See [Queue#whenCurrentJobsFinished](#queuewhencurrentjobsfinished) for more information.

Pausing a queue that is already paused does nothing.

@@ -345,6 +347,25 @@ Returns a promise that returns the number of jobs in the queue, waiting or delay

---

### Queue#removeJobs

```ts
removeJobs(pattern: string): Promise<void>
```

Removes all the jobs which jobId matches the given pattern. The pattern must follow redis glob-style pattern (syntax)[https://redis.io/commands/keys]

Example:
```js
myQueue.removeJobs('?oo*').then(function() {
console.log('done removing jobs');
});
```

Will remove jobs with ids such as: "boo", "foofighter", etc.

---

### Queue#empty

```ts
@@ -425,7 +446,9 @@ parameter. If the specified job cannot be located, the promise will be resolved
getJobs(types: string[], start?: number, end?: number, asc?: boolean): Promise<Job[]>
```

Returns a promise that will return an array of job instances of the given types. Optional parameters for range and ordering are provided.
Returns a promise that will return an array of job instances of the given types. Optional parameters for range and ordering are provided.

Note: The `start` and `end` options are applied **per job type**. For example, if there are 10 jobs in state `completed` and 10 jobs in state `active`, `getJobs(['completed', 'active'], 0, 4)` will yield an array with 10 entries, representing the first 5 completed jobs (0 - 4) and the first 5 active jobs (0 - 4).

---

52 changes: 52 additions & 0 deletions lib/commands/removeJobs-7.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
--[[
Remove all jobs matching a given pattern from all the queues they may be in as well as all its data.
In order to be able to remove any job, they must be unlocked.
Input:
KEYS[1] 'active',
KEYS[2] 'wait',
KEYS[3] 'delayed',
KEYS[4] 'paused',
KEYS[5] 'completed',
KEYS[6] 'failed',
KEYS[7] 'priority',
ARGV[1] prefix
ARGV[2] pattern
ARGV[3] cursor
Events:
'removed'
]]

-- TODO PUBLISH global events 'removed'

local rcall = redis.call
local result = rcall("SCAN", ARGV[3], "MATCH", ARGV[1] .. ARGV[2])
local cursor = result[1];
local jobKeys = result[2];
local removed = {}

local prefixLen = string.len(ARGV[1]) + 1
for i, jobKey in ipairs(jobKeys) do
local keyTypeResp = rcall("TYPE", jobKey)
if keyTypeResp["ok"] == "hash" then
local jobId = string.sub(jobKey, prefixLen)
local lockKey = jobKey .. ':lock'
local lock = redis.call("GET", lockKey)
if not lock then
rcall("LREM", KEYS[1], 0, jobId)
rcall("LREM", KEYS[2], 0, jobId)
rcall("ZREM", KEYS[3], jobId)
rcall("LREM", KEYS[4], 0, jobId)
rcall("ZREM", KEYS[5], jobId)
rcall("ZREM", KEYS[6], jobId)
rcall("ZREM", KEYS[7], jobId)
rcall("DEL", jobKey)
rcall("DEL", jobKey .. ':logs')
table.insert(removed, jobId)
end
end
end
return {cursor, removed}

Loading