Skip to content

Commit

Permalink
fix: fixed issue with publish ignoring 'drain' event
Browse files Browse the repository at this point in the history
fix #129
  • Loading branch information
Richard Klafter committed Aug 21, 2021
1 parent 1193825 commit e195d9b
Showing 1 changed file with 9 additions and 7 deletions.
16 changes: 9 additions & 7 deletions src/ChannelWrapper.js
Expand Up @@ -355,11 +355,12 @@ export default class ChannelWrapper extends EventEmitter {
.then(() => {
const encodedMessage = this._json ? new Buffer.from(JSON.stringify(message.content)) : message.content;

let result = true;
const sendPromise = (() => {
switch (message.type) {
case 'publish':
return new Promise(function(resolve, reject) {
const result = channel.publish(message.exchange, message.routingKey, encodedMessage,
result = channel.publish(message.exchange, message.routingKey, encodedMessage,
message.options,
err => {
if(err) {
Expand All @@ -371,24 +372,25 @@ export default class ChannelWrapper extends EventEmitter {
});
case 'sendToQueue':
return new Promise(function(resolve, reject) {
const result = channel.sendToQueue(message.queue, encodedMessage, message.options, err => {
result = channel.sendToQueue(message.queue, encodedMessage, message.options, err => {
if(err) {
reject(err);
} else {
setImmediate(() => resolve(result));
}
});
});

/* istanbul ignore next */
default:
throw new Error(`Unhandled message type ${message.type}`);
}
})();

// Send some more!
this._publishQueuedMessages(workerNumber);

if(result) {
this._publishQueuedMessages(workerNumber);
} else {
this._channel.once('drain', ()=>this._publishQueuedMessages(workerNumber))
}
return sendPromise;
})
.then(
Expand Down Expand Up @@ -482,4 +484,4 @@ function removeUnconfirmedMessage(arr, message) {
}
const removed = arr.splice(toRemove, 1);
return removed[0];
}
}

0 comments on commit e195d9b

Please sign in to comment.