Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: OptimalBits/bull
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v3.17.0
Choose a base ref
...
head repository: OptimalBits/bull
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v3.18.0
Choose a head ref
  • 3 commits
  • 7 files changed
  • 1 contributor

Commits on Aug 13, 2020

  1. Verified

    This commit was signed with the committer’s verified signature.
    ljharb Jordan Harband
    Copy the full SHA
    a7f986f View commit details
  2. docs: update CHANGELOG

    manast committed Aug 13, 2020

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature.
    Copy the full SHA
    bcbde16 View commit details
  3. 3.18.0

    manast committed Aug 13, 2020

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature.
    Copy the full SHA
    6db649e View commit details
Showing with 57 additions and 45 deletions.
  1. +6 −0 CHANGELOG.md
  2. +5 −8 lib/commands/index.js
  3. +8 −0 lib/commands/{pause-4.lua → pause-5.lua}
  4. +28 −31 lib/queue.js
  5. +1 −1 lib/scripts.js
  6. +1 −1 package.json
  7. +8 −4 test/test_queue.js
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## v.3.18.0

- feat: make pause forward compatible with bullmq (#1818) (@manast)

[Changes](https://github.com/OptimalBits/bull/compare/v3.17.0...v3.18.0)

## v.3.17.0

- feat: better rate limiter (#1816) (@manast)
13 changes: 5 additions & 8 deletions lib/commands/index.js
Original file line number Diff line number Diff line change
@@ -27,15 +27,12 @@ const _fs = {
module.exports = (function() {
let scripts;

return function(client) {
return utils.isRedisReady(client).then(() => {
scripts = scripts || loadScripts(__dirname);
return async function(client) {
await utils.isRedisReady(client);
scripts = await (scripts || loadScripts(__dirname));

return scripts.then(_scripts => {
return _scripts.forEach(command => {
return client.defineCommand(command.name, command.options);
});
});
return scripts.map(command => {
return client.defineCommand(command.name, command.options);
});
};
})();
8 changes: 8 additions & 0 deletions lib/commands/pause-4.lua → lib/commands/pause-5.lua
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@
KEYS[2] 'paused' or 'wait'
KEYS[3] 'meta-paused'
KEYS[4] 'paused' o 'resumed' event.
KEYS[5] 'meta' this key is only used in BullMQ and above.
ARGV[1] 'paused' or 'resumed'
@@ -20,8 +21,15 @@ end

if ARGV[1] == "paused" then
rcall("SET", KEYS[3], 1)

-- for forwards compatibility
rcall("HSET", KEYS[5], "paused", 1)
else
rcall("DEL", KEYS[3])

-- for forwards compatibility
rcall("HDEL", KEYS[5], "paused")

end

rcall("PUBLISH", KEYS[4], ARGV[1])
59 changes: 28 additions & 31 deletions lib/queue.js
Original file line number Diff line number Diff line change
@@ -90,20 +90,15 @@ const Queue = function Queue(name, url, opts) {
}

if (_.isString(url)) {
opts = _.extend(
{},
{
redis: redisOptsFromUrl(url)
},
opts
);
opts = {
...opts,
redis: { ...redisOptsFromUrl(url), ...(opts || {}).redis }
};
} else {
opts = url;
opts = url || {};
}

opts = _.cloneDeep(opts || {});

if (opts && !_.isObject(opts)) {
if (!_.isObject(opts)) {
throw Error('Options must be a valid object');
}

@@ -142,6 +137,7 @@ const Queue = function Queue(name, url, opts) {
delete opts.redis.keyPrefix;

this.clients = [];

const lazyClient = redisClientGetter(this, opts, (type, client) => {
// bubble up Redis error events
const handler = this.emit.bind(this, 'error');
@@ -490,10 +486,9 @@ Queue.prototype._registerEvent = function(eventName) {

Queue.ErrorMessages = errors.Messages;

Queue.prototype.isReady = function() {
return this._initializing.then(() => {
return this;
});
Queue.prototype.isReady = async function() {
await this._initializing;
return this;
};

async function redisClientDisconnect(client) {
@@ -540,13 +535,11 @@ Queue.prototype.close = function(doNotWaitJobs) {
}

return (this.closing = this.isReady()
.then(
() => this._initializingProcess,
err => {
console.error(err);
isReady = false;
}
)
.then(this._initializingProcess)
.catch(err => {
console.error(err);
isReady = false;
})
.finally(() => this._clearTimers())
.then(() => isReady && this.pause(true, doNotWaitJobs))
.then(() => {
@@ -843,17 +836,21 @@ Queue.prototype.resume = function(isLocal /* Optional */) {
});
};

Queue.prototype.isPaused = function(isLocal) {
Queue.prototype.isPaused = async function(isLocal) {
if (isLocal) {
return Promise.resolve(!!this.paused);
return !!this.paused;
} else {
return this.isReady()
.then(() => {
return this.client.exists(this.keys['meta-paused']);
})
.then(result => {
return result === 1;
});
await this.isReady();
const multi = this.multi();

multi.exists(this.keys['meta-paused']);

// For forward compatibility with BullMQ.
multi.hexists(this.toKey('meta'), 'paused');

const [[, isPaused], [, isPausedNew]] = await multi.exec();

return !!(isPaused || isPausedNew);
}
};

2 changes: 1 addition & 1 deletion lib/scripts.js
Original file line number Diff line number Diff line change
@@ -50,7 +50,7 @@ const scripts = {
}

const keys = _.map(
[src, dst, 'meta-paused', pause ? 'paused' : 'resumed'],
[src, dst, 'meta-paused', pause ? 'paused' : 'resumed', 'meta'],
name => {
return queue.toKey(name);
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bull",
"version": "3.17.0",
"version": "3.18.0",
"description": "Job manager",
"engines": {
"node": ">=10"
12 changes: 8 additions & 4 deletions test/test_queue.js
Original file line number Diff line number Diff line change
@@ -136,7 +136,9 @@ describe('Queue', () => {
});

it('should create a queue with a redis connection string', () => {
const queue = new Queue('connstring', 'redis://123.4.5.67:1234/2');
const queue = new Queue('connstring', 'redis://123.4.5.67:1234/2', {
redis: { connectTimeout: 1000 }
});

expect(queue.client.options.host).to.be.eql('123.4.5.67');
expect(queue.eclient.options.host).to.be.eql('123.4.5.67');
@@ -148,10 +150,12 @@ describe('Queue', () => {
expect(queue.eclient.options.db).to.be.eql(2);

return queue.close();
}).timeout(5000);
});

it('should create a queue with only a hostname', () => {
const queue = new Queue('connstring', 'redis://127.2.3.4');
const queue = new Queue('connstring', 'redis://127.2.3.4', {
redis: { connectTimeout: 1000 }
});

expect(queue.client.options.host).to.be.eql('127.2.3.4');
expect(queue.eclient.options.host).to.be.eql('127.2.3.4');
@@ -162,7 +166,7 @@ describe('Queue', () => {
expect(queue.client.condition.select).to.be.eql(0);
expect(queue.eclient.condition.select).to.be.eql(0);

queue.close().catch((/*err*/) => {
return queue.close().catch((/*err*/) => {
// Swallow error.
});
});